static void warn (const char* format, ...) UNUSED_VAR ;
static char *job_owner(edg_wll_Context,char *);
-static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, char **, char**);
int js_enable_store = 1;
}
-/*
- * Returns encoded SQL table states record for embryonic DAG subjob
- */
-
-static edg_wll_ErrorCode states_values_embryonic(
- edg_wll_Context ctx,
- edg_wlc_JobId jobid,
- const edg_wll_RegJobEvent *e,
- char **icnames,
- char **values)
-{
- char *jobid_md5, *stat_enc, *parent_md5;
- char *stmt = NULL;
- char *icvalues;
- intJobStat stat_rec;
- intJobStat *stat = &stat_rec;
-
- init_intJobStat(stat);
- if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
- edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
- stat->pub.state = EDG_WLL_JOB_SUBMITTED;
- stat->pub.owner = strdup(e->user);
- stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
-
- jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
- parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
- stat_enc = enc_intJobStat(strdup(""), stat);
- if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err;
-
-
- if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
- trio_asprintf(&stmt,
- "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
- jobid_md5, stat->pub.state, 1, stat_enc,
- INTSTAT_VERSION, parent_md5, icvalues);
- free(icvalues);
-
-err:
- destroy_intJobStat(stat);
- free(jobid_md5);
- free(stat_enc);
- free(parent_md5);
- *values = stmt;
- return edg_wll_Error(ctx,NULL,NULL);
-}
-
edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
edg_wlc_JobId jobid,
- const edg_wll_RegJobEvent *e,
+ char *icnames,
+ char *values,
edg_wll_bufInsert *bi)
{
- char *values = NULL;
char *stmt = NULL;
- char *icnames = NULL;
-
- if (states_values_embryonic(ctx, jobid, e, &icnames, &values))
- goto cleanup;
/* TODO
edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
*/
#ifdef LB_BUF
- if (edg_wll_bufferedInsert(bi, &values))
+ if (edg_wll_bufferedInsert(bi, values))
goto cleanup;
#else
#endif
cleanup:
- free(icnames);
- free(values);
free(stmt);
return edg_wll_Error(ctx,NULL,NULL);
edg_wll_ResetError(ctx);
trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
- edg_wll_bufferedInsert(bi, &row); // no need to free row
+ edg_wll_bufferedInsert(bi, row); // no need to free row
free(jobstr);
free(jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
+
+/*
+ * Returns encoded SQL table states record for embryonic DAG subjob
+ */
+
+static edg_wll_ErrorCode states_values_embryonic(
+ edg_wll_Context ctx,
+ edg_wlc_JobId jobid,
+ const edg_wll_RegJobEvent *e,
+ char **icnames,
+ char **values)
+{
+ char *jobid_md5, *stat_enc, *parent_md5;
+ char *stmt = NULL;
+ char *icvalues;
+ intJobStat stat_rec;
+ intJobStat *stat = &stat_rec;
+
+ init_intJobStat(stat);
+ if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+ edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
+ stat->pub.state = EDG_WLL_JOB_SUBMITTED;
+ stat->pub.owner = strdup(e->user);
+ stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
+
+ jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
+ parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
+ stat_enc = enc_intJobStat(strdup(""), stat);
+ if (jobid_md5 == NULL || parent_md5 == NULL || stat_enc == NULL) goto err;
+
+
+ if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
+ trio_asprintf(&stmt,
+ "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
+ jobid_md5, stat->pub.state, 1, stat_enc,
+ INTSTAT_VERSION, parent_md5, icvalues);
+ free(icvalues);
+
+err:
+ destroy_intJobStat(stat);
+ free(jobid_md5);
+ free(stat_enc);
+ free(parent_md5);
+ *values = stmt;
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
int i, err = 0;
edg_wlc_JobId *subjobs;
struct timeval now;
char *userid = strdup(strmd5(e->user,NULL));
+ char *jobid_md5, *jobid_md5_old;
+ size_t jobid_len;
edg_wll_bufInsert bi_j, bi_s;
edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s;
+ char *icnames, *values;
edg_wll_ResetError(ctx);
+ if (e->nsubjobs == 0) return 0;
+ if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
+ if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
+ return err;
+
+ /* find out icnames and values once, then only change jobids */
+ if (states_values_embryonic(ctx, subjobs[0], e, &icnames, &values))
+ edg_wll_Error(ctx, NULL, NULL);
+ jobid_md5_old = edg_wlc_JobIdGetUnique(subjobs[0]);
+ jobid_len = strlen(jobid_md5_old);
+
+
#ifdef LB_BUF
/* init multirows insert mechanism for tables used here */
- if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000,
+ if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000,
"jobid, dg_jobid, userid"))
{
return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
}
- if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000,
+
+// XXX: use icnames here
+
+ if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 4000, 1000,
"jobid, status, seq,int_status, version, parent_job, `STD_location`,"
"`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`,"
"`TIME_Cancelled`, `TIME_Submitted`"))
}
#endif
- if (e->nsubjobs == 0) return 0;
- if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
-
- if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
- return err;
gettimeofday(&now,NULL);
-/* XXX: increase the overall request timeout. */
-/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */
- ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
+ /* increase the overall request timeout. */
+ ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
for (i=0; i<e->nsubjobs; i++) {
- char *et,*ed,*job_s;
+ char *et,*ed,*job_s,*p,*p1;
/* save jobid-userid relation into jobs table */
#ifdef LB_BUF
#endif
edg_wll_Error(ctx,&et,&ed);
- if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states)))
+ /* interchange variable parts (jobids) in values */
+ /* there are only two occurences of subjob jobid */
+ jobid_md5 = edg_wlc_JobIdGetUnique(subjobs[i]);
+ if (i) {
+ p = strstr(values, jobid_md5_old);
+ assert(p);
+ memcpy(p, jobid_md5, jobid_len);
+
+ p1 = strstr(p + jobid_len, jobid_md5_old);
+ assert(p1);
+ memcpy(p1, jobid_md5, jobid_len);
+ }
+ free(jobid_md5_old);
+ jobid_md5_old = jobid_md5;
+
+ if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
edg_wll_Error(ctx,&et,&ed);
//job_s = edg_wlc_JobIdUnparse(subjobs[i]);
edg_wlc_JobIdFree(subjobs[i]);
}
+ free(jobid_md5_old); //free the last one
+ free(icnames);
+ free(values);
free(subjobs);
#ifdef LB_BUF