From c613cb182f78572021794718f1431f1024d1817c Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Tue, 18 Jul 2006 11:30:42 +0000 Subject: [PATCH] call edg_wll_IColumnsSQLPart only once per DAG --- org.glite.lb.server/Makefile | 2 +- org.glite.lb.server/interface/lbs_db.h | 2 +- org.glite.lb.server/src/jobstat.c | 59 +------------- org.glite.lb.server/src/jobstat.h | 2 +- org.glite.lb.server/src/lbs_db.c | 9 +-- org.glite.lb.server/src/store.c.T | 103 +++++++++++++++++++++---- org.glite.lb.server/test/test_query_events.cpp | 2 +- 7 files changed, 100 insertions(+), 79 deletions(-) diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index 0216210..2fb12e3 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -250,7 +250,7 @@ else PLUGIN_LIB=glite_lb_plugin.la endif -compile: glite_lb_bkserverd glite_lb_bkindex ${STATIC_LIB_BK} ${PLUGIN_LIB} +compile: glite_lb_bkserverd glite_lb_bkindex ${STATIC_LIB_BK} ${PLUGIN_LIB} store.c check: compile test.xml test.query diff --git a/org.glite.lb.server/interface/lbs_db.h b/org.glite.lb.server/interface/lbs_db.h index de477a3..38c49f6 100644 --- a/org.glite.lb.server/interface/lbs_db.h +++ b/org.glite.lb.server/interface/lbs_db.h @@ -118,7 +118,7 @@ edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufIns * if num. of rows or size of data oversteps the limits, real * multi-row insert is done */ -edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row); +edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row); /** * flush buffered data and free bi structure diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 69d633d..d002abe 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -38,7 +38,6 @@ static void warn (const char* format, ...) UNUSED_VAR ; static char *job_owner(edg_wll_Context,char *); -static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, char **, char**); int js_enable_store = 1; @@ -553,63 +552,13 @@ cleanup: } -/* - * Returns encoded SQL table states record for embryonic DAG subjob - */ - -static edg_wll_ErrorCode states_values_embryonic( - edg_wll_Context ctx, - edg_wlc_JobId jobid, - const edg_wll_RegJobEvent *e, - char **icnames, - char **values) -{ - char *jobid_md5, *stat_enc, *parent_md5; - char *stmt = NULL; - char *icvalues; - intJobStat stat_rec; - intJobStat *stat = &stat_rec; - - init_intJobStat(stat); - if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || - edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; - stat->pub.state = EDG_WLL_JOB_SUBMITTED; - stat->pub.owner = strdup(e->user); - stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; - - jobid_md5 = edg_wlc_JobIdGetUnique(jobid); - parent_md5 = edg_wlc_JobIdGetUnique(e->jobId); - stat_enc = enc_intJobStat(strdup(""), stat); - if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err; - - - if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err; - trio_asprintf(&stmt, - "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s", - jobid_md5, stat->pub.state, 1, stat_enc, - INTSTAT_VERSION, parent_md5, icvalues); - free(icvalues); - -err: - destroy_intJobStat(stat); - free(jobid_md5); - free(stat_enc); - free(parent_md5); - *values = stmt; - return edg_wll_Error(ctx,NULL,NULL); -} - edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, edg_wlc_JobId jobid, - const edg_wll_RegJobEvent *e, + char *icnames, + char *values, edg_wll_bufInsert *bi) { - char *values = NULL; char *stmt = NULL; - char *icnames = NULL; - - if (states_values_embryonic(ctx, jobid, e, &icnames, &values)) - goto cleanup; /* TODO edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub); @@ -617,7 +566,7 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, */ #ifdef LB_BUF - if (edg_wll_bufferedInsert(bi, &values)) + if (edg_wll_bufferedInsert(bi, values)) goto cleanup; #else @@ -632,8 +581,6 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, #endif cleanup: - free(icnames); - free(values); free(stmt); return edg_wll_Error(ctx,NULL,NULL); diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index d424ed7..5f1d958 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -58,7 +58,7 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *, edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *); int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); -edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, edg_wll_bufInsert *bi); +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, edg_wll_bufInsert *bi); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); diff --git a/org.glite.lb.server/src/lbs_db.c b/org.glite.lb.server/src/lbs_db.c index 37f8f41..b0aa285 100644 --- a/org.glite.lb.server/src/lbs_db.c +++ b/org.glite.lb.server/src/lbs_db.c @@ -408,13 +408,11 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi) * adds row of n values into n columns into an insert buffer * if num. of rows or size of data oversteps the limits, real * multi-row insert is done - * Eats the row! No need to free it after the call :) */ -edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row) +edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row) { - bi->rows[bi->rec_num++] = *row; - bi->rec_size += strlen(*row); - *row = NULL; // just to avoid freeing by caller function + bi->rows[bi->rec_num++] = strdup(row); + bi->rec_size += strlen(row); if ((bi->size_limit && bi->rec_size >= bi->size_limit) || (bi->record_limit && bi->rec_num >= bi->record_limit)) @@ -443,7 +441,6 @@ edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi) return edg_wll_Error(bi->ctx,NULL,NULL); free_buffered_insert(bi); - return edg_wll_ResetError(bi->ctx); } diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 0a9a7c4..e91445b 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -276,7 +276,7 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha edg_wll_ResetError(ctx); trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid); - edg_wll_bufferedInsert(bi, &row); // no need to free row + edg_wll_bufferedInsert(bi, row); // no need to free row free(jobstr); free(jobid); @@ -591,26 +591,91 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) return edg_wll_Error(ctx,NULL,NULL); } + +/* + * Returns encoded SQL table states record for embryonic DAG subjob + */ + +static edg_wll_ErrorCode states_values_embryonic( + edg_wll_Context ctx, + edg_wlc_JobId jobid, + const edg_wll_RegJobEvent *e, + char **icnames, + char **values) +{ + char *jobid_md5, *stat_enc, *parent_md5; + char *stmt = NULL; + char *icvalues; + intJobStat stat_rec; + intJobStat *stat = &stat_rec; + + init_intJobStat(stat); + if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || + edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; + stat->pub.state = EDG_WLL_JOB_SUBMITTED; + stat->pub.owner = strdup(e->user); + stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; + + jobid_md5 = edg_wlc_JobIdGetUnique(jobid); + parent_md5 = edg_wlc_JobIdGetUnique(e->jobId); + stat_enc = enc_intJobStat(strdup(""), stat); + if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err; + + + if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err; + trio_asprintf(&stmt, + "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s", + jobid_md5, stat->pub.state, 1, stat_enc, + INTSTAT_VERSION, parent_md5, icvalues); + free(icvalues); + +err: + destroy_intJobStat(stat); + free(jobid_md5); + free(stat_enc); + free(parent_md5); + *values = stmt; + return edg_wll_Error(ctx,NULL,NULL); +} + static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { int i, err = 0; edg_wlc_JobId *subjobs; struct timeval now; char *userid = strdup(strmd5(e->user,NULL)); + char *jobid_md5, *jobid_md5_old; + size_t jobid_len; edg_wll_bufInsert bi_j, bi_s; edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s; + char *icnames, *values; edg_wll_ResetError(ctx); + if (e->nsubjobs == 0) return 0; + if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs"); + if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs))) + return err; + + /* find out icnames and values once, then only change jobids */ + if (states_values_embryonic(ctx, subjobs[0], e, &icnames, &values)) + edg_wll_Error(ctx, NULL, NULL); + jobid_md5_old = edg_wlc_JobIdGetUnique(subjobs[0]); + jobid_len = strlen(jobid_md5_old); + + #ifdef LB_BUF /* init multirows insert mechanism for tables used here */ - if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000, + if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000, "jobid, dg_jobid, userid")) { return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()"); } - if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000, + +// XXX: use icnames here + + if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 4000, 1000, "jobid, status, seq,int_status, version, parent_job, `STD_location`," "`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`," "`TIME_Cancelled`, `TIME_Submitted`")) @@ -619,21 +684,15 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv } #endif - if (e->nsubjobs == 0) return 0; - if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs"); - - if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs))) - return err; gettimeofday(&now,NULL); -/* XXX: increase the overall request timeout. */ -/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */ - ctx->p_tmp_timeout.tv_sec += e->nsubjobs; + /* increase the overall request timeout. */ + ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; for (i=0; insubjobs; i++) { - char *et,*ed,*job_s; + char *et,*ed,*job_s,*p,*p1; /* save jobid-userid relation into jobs table */ #ifdef LB_BUF @@ -643,7 +702,22 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv #endif edg_wll_Error(ctx,&et,&ed); - if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states))) + /* interchange variable parts (jobids) in values */ + /* there are only two occurences of subjob jobid */ + jobid_md5 = edg_wlc_JobIdGetUnique(subjobs[i]); + if (i) { + p = strstr(values, jobid_md5_old); + assert(p); + memcpy(p, jobid_md5, jobid_len); + + p1 = strstr(p + jobid_len, jobid_md5_old); + assert(p1); + memcpy(p1, jobid_md5, jobid_len); + } + free(jobid_md5_old); + jobid_md5_old = jobid_md5; + + if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states))) edg_wll_Error(ctx,&et,&ed); //job_s = edg_wlc_JobIdUnparse(subjobs[i]); @@ -660,6 +734,9 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv edg_wlc_JobIdFree(subjobs[i]); } + free(jobid_md5_old); //free the last one + free(icnames); + free(values); free(subjobs); #ifdef LB_BUF diff --git a/org.glite.lb.server/test/test_query_events.cpp b/org.glite.lb.server/test/test_query_events.cpp index 61311d1..1bc7c74 100644 --- a/org.glite.lb.server/test/test_query_events.cpp +++ b/org.glite.lb.server/test/test_query_events.cpp @@ -139,7 +139,7 @@ int edg_wll_Transaction(edg_wll_Context ctx) { return 0; } int edg_wll_Commit(edg_wll_Context ctx) { return 0; } int edg_wll_Rollback(edg_wll_Context ctx) { return 0; } -edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row) { return (edg_wll_ErrorCode) 0; }; +edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row) { return (edg_wll_ErrorCode) 0; }; } -- 1.8.2.3