int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId);
+static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId, char **, char**);
edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
int js_enable_store = 1;
}
-/*
- * Returns encoded instatus for embrio DAG subjob
- */
-
-static char *states_values_embryotic(
- edg_wlc_JobId jobid,
- edg_wlc_JobId parent_job)
-{
- return 0;
-}
-
-
int edg_wll_JobStatus(
edg_wll_Context ctx,
const edg_wlc_JobId job,
}
-static int edg_wll_intJobStatusEmbriotic(
- edg_wll_Context ctx,
- edg_wll_Event *e,
- intJobStat *intstat,
- int update_db)
-{
-
-/* Local variables */
- char *string_jobid;
-
- int intErr = 0;
- int res;
- int be_strict = 0;
- char *errstring = NULL;
-
-
-/* Processing */
- edg_wll_ResetError(ctx);
- init_intJobStat(intstat);
-
-/* XXX: integrity check ?? can we skip? */
- string_jobid = edg_wlc_JobIdUnparse(e->any.jobId);
- if (string_jobid == NULL || intstat == NULL)
- return edg_wll_SetError(ctx,EINVAL, NULL);
- free(string_jobid);
-
- intstat->pub.owner = strdup(e->any.user);
-
- res = processEvent(intstat, e, 0, be_strict, &errstring);
- if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
- intErr = 1;
- }
-
- if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
- intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
- }
-
- if (intErr) {
- destroy_intJobStat(intstat);
- return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL);
- } else {
- /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */
- intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId);
- if (!intErr) {
- if (update_db) {
- edg_wll_StoreIntStateEmbriotic(ctx, intstat);
- /* recheck
- * intJobStat *reread;
- * edg_wll_LoadIntState(ctx, job, tsq, &reread);
- * destroy_intJobStat(reread);
- */
- }
- }
- return edg_wll_SetError(ctx, intErr, NULL);
- }
-
-}
-
/*
* Helper for warning printouts
}
-static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx,
- intJobStat *stat)
+/*
+ * Returns encoded SQL table states record for embryonic DAG subjob
+ */
+
+static edg_wll_ErrorCode states_values_embryonic(
+ edg_wll_Context ctx,
+ edg_wlc_JobId jobid,
+ edg_wlc_JobId parent_job,
+ char **icnames,
+ char **values)
{
- char *jobid_md5, *stat_enc, *parent_md5 = NULL;
- char *stmt;
- char *icnames, *icvalues;
+ char *jobid_md5, *stat_enc, *parent_md5;
+ char *stmt = NULL;
+ char *icvalues;
+ intJobStat stat_rec;
+ intJobStat *stat = &stat_rec;
- jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId);
- stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);;
+ init_intJobStat(stat);
+ if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+ edg_wlc_JobIdDup(parent_job, &stat->pub.parent_job)) goto err;
+ stat->pub.state = EDG_WLL_JOB_SUBMITTED;
- parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job);
- if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*");
+ jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
+ parent_md5 = edg_wlc_JobIdGetUnique(parent_job);
+ stat_enc = enc_intJobStat(strdup(""), stat);
+ if (jobid_md5 || parent_md5 == NULL || stat_enc == NULL) goto err;
+
+
+ if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
+ trio_asprintf(&stmt,
+ "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+ jobid_md5, stat->pub.state, 1, stat_enc,
+ INTSTAT_VERSION, parent_md5, icvalues);
+ free(icvalues);
+
+err:
+ destroy_intJobStat(stat);
+ free(jobid_md5); free(stat_enc);
+ free(parent_md5);
+ *values = stmt;
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
+ edg_wlc_JobId jobid,
+ edg_wlc_JobId parent_job)
+{
+ char *values = NULL;
+ char *stmt;
+ char *icnames, *icvalues;
- edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues);
+ if (states_values_embryonic(ctx, jobid, parent_job, &icnames, &values))
+ goto cleanup;
+
+/* TODO
+ edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
+ if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
+*/
trio_asprintf(&stmt,
"insert into states"
"(jobid,status,seq,int_status,version"
",parent_job%s) "
- "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
- icnames,
- jobid_md5, stat->pub.state, 0, stat_enc,
- INTSTAT_VERSION, parent_md5, icvalues);
+ "values %s",
+ icnames, values);
free(icnames); free(icvalues);
if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
cleanup:
free(stmt);
- free(jobid_md5); free(stat_enc);
- free(parent_md5);
+ free(values);
return edg_wll_Error(ctx,NULL,NULL);
}
return edg_wll_Error(ctx, NULL, NULL);
}
-
-/*
- * store embriotic state of DAGs' subjob
- */
-
-edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx,
- edg_wll_Event *e)
-{
- intJobStat jobstat;
-
-
- if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store))
- {
- edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
- if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
- destroy_intJobStat(&jobstat);
- }
-
- return edg_wll_Error(ctx, NULL, NULL);
-}
int i,err;
edg_wlc_JobId *subjobs;
struct timeval now;
- edg_wll_Event e2;
edg_wll_ResetError(ctx);
ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
- memset(&e2,0,sizeof e2);
- e2.type = EDG_WLL_EVENT_REGJOB;
- memcpy(&e2.regJob.timestamp,&now,sizeof now);
- e2.any.host = strdup(ctx->srvName);
- e2.any.level = e->level;
- e2.any.priority = e->priority;
- e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
- e2.any.user = strdup(e->user);
- e2.any.source = e->source;
- e2.any.src_instance = strdup(ctx->isProxy ?
- "L&B proxy" : "L&B server");
- e2.regJob.ns = strdup(e->ns);
- edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
- e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
- e2.regJob.jdl = strdup("");
-
for (i=0; i<e->nsubjobs; i++) {
char *et,*ed,*job_s;
- e2.any.jobId = subjobs[i]; subjobs[i] = NULL;
- // XXX: timespamp each subjob with actual time??
-
- if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2)))
+ if ((err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e->jobId)))
edg_wll_Error(ctx,&et,&ed);
if (err) {
- job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
+ job_s = edg_wlc_JobIdUnparse(subjobs[i]);
fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed);
free(job_s); free(et); free(ed);
edg_wll_ResetError(ctx);
}
- edg_wlc_JobIdFree(e2.any.jobId);
+ edg_wlc_JobIdFree(subjobs[i]);
}
- edg_wll_FreeEvent(&e2);
free(subjobs);
return edg_wll_Error(ctx,NULL,NULL);