call edg_wll_IColumnsSQLPart only once per DAG
authorMiloš Mulač <mulac@civ.zcu.cz>
Tue, 18 Jul 2006 11:30:42 +0000 (11:30 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Tue, 18 Jul 2006 11:30:42 +0000 (11:30 +0000)
org.glite.lb.server/Makefile
org.glite.lb.server/interface/lbs_db.h
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/lbs_db.c
org.glite.lb.server/src/store.c.T
org.glite.lb.server/test/test_query_events.cpp

index 0216210..2fb12e3 100644 (file)
@@ -250,7 +250,7 @@ else
     PLUGIN_LIB=glite_lb_plugin.la
 endif
 
-compile: glite_lb_bkserverd glite_lb_bkindex ${STATIC_LIB_BK} ${PLUGIN_LIB}
+compile: glite_lb_bkserverd glite_lb_bkindex ${STATIC_LIB_BK} ${PLUGIN_LIB} store.c
 
 
 check: compile test.xml test.query
index de477a3..38c49f6 100644 (file)
@@ -118,7 +118,7 @@ edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufIns
  * if num. of rows or size of data oversteps the limits, real
  * multi-row insert is done
  */
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row);
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row);
 
 /**
  * flush buffered data and free bi structure
index 69d633d..d002abe 100644 (file)
@@ -38,7 +38,6 @@
 static void warn (const char* format, ...) UNUSED_VAR ;
 static char *job_owner(edg_wll_Context,char *);
 
-static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId,  const edg_wll_RegJobEvent *e, char **, char**);
 
 int js_enable_store = 1;
 
@@ -553,63 +552,13 @@ cleanup:
 }
 
 
-/*
- * Returns encoded SQL table states record for embryonic DAG subjob
- */
-
-static edg_wll_ErrorCode states_values_embryonic(
-       edg_wll_Context ctx,
-       edg_wlc_JobId jobid,
-       const edg_wll_RegJobEvent *e,
-       char **icnames,
-       char **values)
-{
-       char *jobid_md5, *stat_enc, *parent_md5;
-       char *stmt = NULL;
-       char *icvalues;
-       intJobStat stat_rec;
-       intJobStat *stat = &stat_rec;
-
-       init_intJobStat(stat);
-       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
-               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
-       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
-       stat->pub.owner = strdup(e->user);
-       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
-
-       jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
-       parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
-       stat_enc = enc_intJobStat(strdup(""), stat);
-       if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err;
-
-
-       if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
-       trio_asprintf(&stmt,
-               "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
-               jobid_md5, stat->pub.state, 1, stat_enc,
-               INTSTAT_VERSION, parent_md5, icvalues);
-       free(icvalues);
-
-err:
-       destroy_intJobStat(stat);
-       free(jobid_md5);
-       free(stat_enc);
-       free(parent_md5);
-       *values = stmt;
-       return edg_wll_Error(ctx,NULL,NULL);
-}
-
 edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
         edg_wlc_JobId jobid,
-        const edg_wll_RegJobEvent *e,
+        char *icnames, 
+       char *values,
        edg_wll_bufInsert *bi)
 {
-       char *values = NULL;
        char *stmt = NULL;
-       char *icnames = NULL;
-
-       if (states_values_embryonic(ctx, jobid, e, &icnames, &values))
-               goto cleanup;
 
 /* TODO
                edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
@@ -617,7 +566,7 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
 */
 
 #ifdef LB_BUF
-       if (edg_wll_bufferedInsert(bi, &values))
+       if (edg_wll_bufferedInsert(bi, values))
                goto cleanup;
 #else
 
@@ -632,8 +581,6 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
 #endif
 
 cleanup:
-       free(icnames);
-       free(values);
        free(stmt); 
 
        return edg_wll_Error(ctx,NULL,NULL);
index d424ed7..5f1d958 100644 (file)
@@ -58,7 +58,7 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *,
 edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *);
 int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, edg_wll_bufInsert *bi);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, edg_wll_bufInsert *bi);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
 
 
index 37f8f41..b0aa285 100644 (file)
@@ -408,13 +408,11 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi)
  * adds row of n values into n columns into an insert buffer
  * if num. of rows or size of data oversteps the limits, real
  * multi-row insert is done
- * Eats the row! No need to free it after the call :)
  */
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row)
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row)
 {
-       bi->rows[bi->rec_num++] = *row;
-       bi->rec_size += strlen(*row);
-       *row = NULL;    // just to avoid freeing by caller function
+       bi->rows[bi->rec_num++] = strdup(row);
+       bi->rec_size += strlen(row);
 
        if ((bi->size_limit && bi->rec_size >= bi->size_limit) ||
                (bi->record_limit && bi->rec_num >= bi->record_limit))
@@ -443,7 +441,6 @@ edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi)
                return edg_wll_Error(bi->ctx,NULL,NULL);
        free_buffered_insert(bi);
 
-
        return edg_wll_ResetError(bi->ctx);
 }
 
index 0a9a7c4..e91445b 100644 (file)
@@ -276,7 +276,7 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha
        edg_wll_ResetError(ctx);
 
        trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
-       edg_wll_bufferedInsert(bi, &row); // no need to free row
+       edg_wll_bufferedInsert(bi, row); // no need to free row
 
        free(jobstr);
        free(jobid);
@@ -591,26 +591,91 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
+/*
+ * Returns encoded SQL table states record for embryonic DAG subjob
+ */
+
+static edg_wll_ErrorCode states_values_embryonic(
+       edg_wll_Context ctx,
+       edg_wlc_JobId jobid,
+       const edg_wll_RegJobEvent *e,
+       char **icnames,
+       char **values)
+{
+       char *jobid_md5, *stat_enc, *parent_md5;
+       char *stmt = NULL;
+       char *icvalues;
+       intJobStat stat_rec;
+       intJobStat *stat = &stat_rec;
+
+       init_intJobStat(stat);
+       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
+       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
+       stat->pub.owner = strdup(e->user);
+       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
+
+       jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
+       parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
+       stat_enc = enc_intJobStat(strdup(""), stat);
+       if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err;
+
+
+       if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
+       trio_asprintf(&stmt,
+               "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
+               jobid_md5, stat->pub.state, 1, stat_enc,
+               INTSTAT_VERSION, parent_md5, icvalues);
+       free(icvalues);
+
+err:
+       destroy_intJobStat(stat);
+       free(jobid_md5);
+       free(stat_enc);
+       free(parent_md5);
+       *values = stmt;
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
 static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
        int                     i, err = 0;
        edg_wlc_JobId           *subjobs;
        struct timeval          now;
        char                    *userid = strdup(strmd5(e->user,NULL));
+       char                    *jobid_md5, *jobid_md5_old;
+       size_t                  jobid_len;
        edg_wll_bufInsert       bi_j, bi_s;
        edg_wll_bufInsert       *bi_jobs = &bi_j, *bi_states = &bi_s;
+       char                    *icnames, *values;
 
 
        edg_wll_ResetError(ctx);
 
+       if (e->nsubjobs == 0) return 0;
+       if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
+       if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
+               return err;
+
+       /* find out icnames and values once, then only change jobids */ 
+       if (states_values_embryonic(ctx, subjobs[0], e, &icnames, &values))
+               edg_wll_Error(ctx, NULL, NULL);
+       jobid_md5_old = edg_wlc_JobIdGetUnique(subjobs[0]);
+       jobid_len = strlen(jobid_md5_old);
+               
+
 #ifdef LB_BUF
        /* init multirows insert mechanism for tables used here */
-       if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000,
+       if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000,
                "jobid, dg_jobid, userid"))
        {
                return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
        }
-       if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000, 
+
+// XXX: use icnames here
+
+       if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 4000, 1000,
                "jobid, status, seq,int_status, version, parent_job, `STD_location`,"
                "`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`,"
                "`TIME_Cancelled`, `TIME_Submitted`"))
@@ -619,21 +684,15 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
        }
 #endif         
 
-       if (e->nsubjobs == 0) return 0;
-       if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
-
-       if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
-               return err;
 
        gettimeofday(&now,NULL);
 
-/* XXX: increase the overall request timeout. */
-/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */
-       ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
+       /* increase the overall request timeout. */
+       ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
 
        for (i=0; i<e->nsubjobs; i++) {
-               char            *et,*ed,*job_s;
+               char            *et,*ed,*job_s,*p,*p1;
 
                /* save jobid-userid relation into jobs table */
 #ifdef LB_BUF
@@ -643,7 +702,22 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 #endif
                        edg_wll_Error(ctx,&et,&ed);
 
-               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states)))
+               /* interchange variable parts (jobids) in values */
+               /* there are only two occurences of subjob jobid */
+               jobid_md5 = edg_wlc_JobIdGetUnique(subjobs[i]);
+               if (i) {
+                       p = strstr(values, jobid_md5_old);              
+                       assert(p);
+                       memcpy(p, jobid_md5, jobid_len);
+
+                       p1 = strstr(p + jobid_len, jobid_md5_old);
+                       assert(p1);
+                       memcpy(p1, jobid_md5, jobid_len);
+               }
+               free(jobid_md5_old);
+               jobid_md5_old = jobid_md5;
+
+               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
                        edg_wll_Error(ctx,&et,&ed);
 
 //job_s = edg_wlc_JobIdUnparse(subjobs[i]);
@@ -660,6 +734,9 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
                edg_wlc_JobIdFree(subjobs[i]);
        }
 
+       free(jobid_md5_old);    //free the last one
+       free(icnames);
+       free(values);
        free(subjobs);
 
 #ifdef LB_BUF
index 61311d1..1bc7c74 100644 (file)
@@ -139,7 +139,7 @@ int edg_wll_Transaction(edg_wll_Context ctx) { return 0; }
 int edg_wll_Commit(edg_wll_Context ctx) { return 0; }
 int edg_wll_Rollback(edg_wll_Context ctx) { return 0; }
 
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row)  { return (edg_wll_ErrorCode) 0; };
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char *row)  { return (edg_wll_ErrorCode) 0; };
        
 }