int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
+static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *);
edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
int js_enable_store = 1;
}
+/*
+ * Returns encoded instatus for embrio DAG subjob
+ */
+
+static char *states_values_embryotic(
+ edg_wlc_JobId jobid,
+ edg_wlc_JobId parent_job)
+{
+ return 0;
+}
+
+
int edg_wll_JobStatus(
edg_wll_Context ctx,
const edg_wlc_JobId job,
}
+static int edg_wll_intJobStatusEmbriotic(
+ edg_wll_Context ctx,
+ edg_wll_Event *e,
+ intJobStat *intstat,
+ int update_db)
+{
+
+/* Local variables */
+ char *string_jobid;
+
+ int intErr = 0;
+ int res;
+ int be_strict = 0;
+ char *errstring = NULL;
+
+
+/* Processing */
+ edg_wll_ResetError(ctx);
+ init_intJobStat(intstat);
+
+/* XXX: integrity check ?? can we skip? */
+ string_jobid = edg_wlc_JobIdUnparse(e->any.jobId);
+ if (string_jobid == NULL || intstat == NULL)
+ return edg_wll_SetError(ctx,EINVAL, NULL);
+ free(string_jobid);
+
+ intstat->pub.owner = strdup(e->any.user);
+
+ res = processEvent(intstat, e, 0, be_strict, &errstring);
+ if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
+ intErr = 1;
+ }
+
+ if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
+ intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+ }
+
+ if (intErr) {
+ destroy_intJobStat(intstat);
+ return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL);
+ } else {
+ /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */
+ intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId);
+ if (!intErr) {
+ if (update_db) {
+ edg_wll_StoreIntStateEmbriotic(ctx, intstat);
+ /* recheck
+ * intJobStat *reread;
+ * edg_wll_LoadIntState(ctx, job, tsq, &reread);
+ * destroy_intJobStat(reread);
+ */
+ }
+ }
+ return edg_wll_SetError(ctx, intErr, NULL);
+ }
+
+}
+
+
/*
* Helper for warning printouts
*/
return edg_wll_Error(ctx,NULL,NULL);
}
+
+static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx,
+ intJobStat *stat)
+{
+ char *jobid_md5, *stat_enc, *parent_md5 = NULL;
+ char *stmt;
+ char *icnames, *icvalues;
+
+ jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId);
+ stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);;
+
+ parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job);
+ if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*");
+
+
+ edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues);
+ trio_asprintf(&stmt,
+ "insert into states"
+ "(jobid,status,seq,int_status,version"
+ ",parent_job%s) "
+ "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+ icnames,
+ jobid_md5, stat->pub.state, 0, stat_enc,
+ INTSTAT_VERSION, parent_md5, icvalues);
+ free(icnames); free(icvalues);
+
+ if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
+
+
+cleanup:
+ free(stmt);
+ free(jobid_md5); free(stat_enc);
+ free(parent_md5);
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
/*
* Retrieve stored job state from states and status_tags DB tables.
* Should be called with the job locked.
}
return edg_wll_Error(ctx, NULL, NULL);
}
+
+
+/*
+ * store embriotic state of DAGs' subjob
+ */
+
+edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx,
+ edg_wll_Event *e)
+{
+ intJobStat jobstat;
+
+
+ if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store))
+ {
+ edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
+ if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
+ destroy_intJobStat(&jobstat);
+ }
+
+ return edg_wll_Error(ctx, NULL, NULL);
+}
static int check_dup(edg_wll_Context,edg_wll_Event *);
static int check_auth(edg_wll_Context,edg_wll_Event *e);
static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+static int register_subjobs_embriotic(edg_wll_Context,const edg_wll_RegJobEvent *);
void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
ctx->allowAnonymous = anon;
return edg_wll_Error(ctx,NULL,NULL);
}
+static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
+{
+ int i,err;
+ edg_wlc_JobId *subjobs;
+ struct timeval now;
+
+ edg_wll_ResetError(ctx);
+ if (e->nsubjobs == 0) return 0;
+ if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
+
+ if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
+ return err;
+
+ gettimeofday(&now,NULL);
+
+/* XXX: increase the overall request timeout. */
+/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */
+ ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
+ if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
+
+ for (i=0; i<e->nsubjobs; i++) {
+ edg_wll_Event e2;
+ int seq;
+ char *et,*ed,*job_s;
+
+/* XXX: shloud be possible to move all static stuff out of the cycle
+ * and free event after the whole loop */
+ memset(&e2,0,sizeof e2);
+ e2.type = EDG_WLL_EVENT_REGJOB;
+ e2.any.jobId = subjobs[i]; subjobs[i] = NULL;
+ memcpy(&e2.regJob.timestamp,&now,sizeof now);
+ e2.any.host = strdup(ctx->srvName);
+ e2.any.level = e->level;
+ e2.any.priority = e->priority;
+ e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
+ e2.any.user = strdup(e->user);
+ e2.any.source = e->source;
+ e2.any.src_instance = strdup(ctx->isProxy ?
+ "L&B proxy" : "L&B server");
+ e2.regJob.ns = strdup(e->ns);
+ edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
+ e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
+ e2.regJob.jdl = strdup("");
+
+
+ if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2)))
+ edg_wll_Error(ctx,&et,&ed);
+
+ if (err) {
+ job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
+ fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
+ syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed);
+ free(job_s); free(et); free(ed);
+ edg_wll_ResetError(ctx);
+ }
+
+ edg_wll_FreeEvent(&e2);
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event)
{
char *stmt;