static void warn (const char* format, ...) UNUSED_VAR ;
static char *job_owner(edg_wll_Context,char *);
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent);
int js_enable_store = 1;
if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) {
memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub));
- free(jobstat.pub.owner); jobstat.pub.owner = NULL;
destroy_intJobStat_extension(ijsp);
free(ijsp);
} else {
if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store, 0)) {
goto rollback;
}
- memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub));
+ memcpy(stat, &(jobstat.pub), sizeof(jobstat.pub));
}
if (edg_wll_GetACL(ctx, job, &acl)) goto rollback;
whole_cycle = 1;
rollback:
- if (!whole_cycle) {
- edg_wll_FreeStatus(&jobstat.pub);
- jobstat.pub.owner = NULL;
- destroy_intJobStat_extension(&jobstat);
+ if (!whole_cycle) {
+ edg_wll_FreeStatus(&jobstat.pub);
+ memset(stat, 0, sizeof(*stat));
}
- if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; }
+ destroy_intJobStat_extension(&jobstat);
if (acl) { edg_wll_FreeAcl(acl); acl = NULL; }
if (stmt) { free(stmt); stmt = NULL; }
if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; }
int i, intErr = 0;
int res;
int be_strict = 0;
- char *errstring = NULL;
+ char *errstring = NULL;
edg_wll_QueryRec jqr[2];
edg_wll_QueryRec **jqra;
init_intJobStat(intstat);
string_jobid = edg_wlc_JobIdUnparse(job);
- if (string_jobid == NULL || intstat == NULL)
+ if (string_jobid == NULL || intstat == NULL) {
+ free(string_jobid);
return edg_wll_SetError(ctx,EINVAL, NULL);
+ }
+ free(string_jobid);
/* can be already filled by public edg_wll_JobStat() */
if (intstat->pub.owner == NULL) {
md5_jobid = edg_wlc_JobIdGetUnique(job);
if ( !(intstat->pub.owner = job_owner(ctx,md5_jobid)) ) {
free(md5_jobid);
- free(string_jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
- free(md5_jobid);
}
+ /* re-lock job from InShareMode to ForUpdate
+ * needed by edg_wll_RestoreSubjobState and edg_wll_StoreIntState
+ */
+ res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid);
+ free(md5_jobid);
+ if (res) return edg_wll_Error(ctx, NULL, NULL);
+
jqr[0].attr = EDG_WLL_QUERY_ATTR_JOBID;
jqr[0].op = EDG_WLL_QUERY_OP_EQUAL;
jqr[0].value.j = (glite_jobid_t)job;
jqra[1] = NULL;
if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) {
- free(string_jobid);
- free(jqra);
- free(intstat->pub.owner); intstat->pub.owner = NULL;
- return edg_wll_Error(ctx, NULL, NULL);
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ if (edg_wll_RestoreSubjobState(ctx, job, intstat)) {
+ destroy_intJobStat(intstat);
+ free(jqra);
+ free(intstat->pub.owner); intstat->pub.owner = NULL;
+ return edg_wll_Error(ctx, NULL, NULL);
+ }
+ }
+ else {
+ free(jqra);
+ free(intstat->pub.owner); intstat->pub.owner = NULL;
+ return edg_wll_Error(ctx, NULL, NULL);
+ }
}
- free(jqra);
-
- for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
- num_events++);
+ else {
+ free(jqra);
- if (num_events == 0) {
- free(string_jobid);
- free(intstat->pub.owner); intstat->pub.owner = NULL;
- return edg_wll_SetError(ctx,ENOENT,NULL);
- }
+ for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
+ num_events++);
- for (i = 0; i < num_events; i++) {
- res = processEvent(intstat, &events[i], i, be_strict, &errstring);
- if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
- intErr = 1; break;
+ if (num_events == 0) {
+ free(intstat->pub.owner); intstat->pub.owner = NULL;
+ return edg_wll_SetError(ctx,ENOENT,NULL);
}
- }
- if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
- intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
- }
- free(string_jobid);
+ for (i = 0; i < num_events; i++) {
+ res = processEvent(intstat, &events[i], i, be_strict, &errstring);
+ if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
+ intErr = 1; break;
+ }
+ }
+ if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
+ intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+ }
- for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
- free(events);
+ for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
+ free(events);
+ }
if (intErr) {
destroy_intJobStat(intstat);
intstat->user_fqans[i] = NULL;
}
- // re-lock job from InShareMode to ForUpdate
- md5_jobid = edg_wlc_JobIdGetUnique(job);
- res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid);
- free(md5_jobid);
- if (res) return edg_wll_Error(ctx, NULL, NULL);
-
edg_wll_StoreIntState(ctx, intstat, tsq);
/* recheck
* intJobStat *reread;
}
+/*
+ * Regenarate state of subjob without any event from its parent JobReg Event
+ */
+edg_wll_ErrorCode edg_wll_RestoreSubjobState(
+ edg_wll_Context ctx,
+ glite_jobid_const_t job,
+ intJobStat *intstat)
+{
+ glite_jobid_t parent_job = NULL;
+ edg_wll_QueryRec jqr_p1[2], jqr_p2[2];
+ edg_wll_QueryRec **ec, **jc;
+ edg_wll_Event *events_p;
+ int err, i;
+
+
+ /* find job parent */
+ if (get_job_parent(ctx, job, &parent_job)) goto err;
+
+ /* get registration event(s) of parent*/
+ jqr_p1[0].attr = EDG_WLL_QUERY_ATTR_JOBID;
+ jqr_p1[0].op = EDG_WLL_QUERY_OP_EQUAL;
+ jqr_p1[0].value.j = parent_job;
+ jqr_p1[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+ jqr_p2[0].attr = EDG_WLL_QUERY_ATTR_EVENT_TYPE;
+ jqr_p2[0].op = EDG_WLL_QUERY_OP_EQUAL;
+ jqr_p2[0].value.i = EDG_WLL_EVENT_REGJOB;
+ jqr_p2[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+ jc = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **));
+ jc[0] = jqr_p1;
+ jc[1] = NULL;
+
+ ec = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **));
+ ec[0] = jqr_p2;
+ ec[1] = NULL;
+
+ if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jc,
+ (const edg_wll_QueryRec **)jc, &events_p)) {
+ free(jc);
+ free(ec);
+ return edg_wll_Error(ctx, NULL, NULL);
+ }
+ free(jc);
+ free(ec);
+
+ /* recreate job status of subjob */
+ err = intJobStat_embryonic(ctx, job, (const edg_wll_RegJobEvent *) &(events_p[0]), intstat);
+
+ for (i=0; events_p[i].type != EDG_WLL_EVENT_UNDEF ; i++)
+ edg_wll_FreeEvent(&events_p[i]);
+ free(events_p);
+
+ if (err) goto err;
+
+err:
+ return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
/*
* Helper for warning printouts
*/
}
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent)
+{
+ glite_lbu_Statement sh = NULL;
+ char *stmt = NULL, *out = NULL;
+ char *md5_jobid = edg_wlc_JobIdGetUnique(job);
+ int ret;
+
+
+ edg_wll_ResetError(ctx);
+ trio_asprintf(&stmt,"select parent_job from states "
+ "where jobid = '%|Ss'" ,md5_jobid);
+
+ if (stmt==NULL) {
+ edg_wll_SetError(ctx,ENOMEM, NULL);
+ goto err;
+ }
+
+ if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0) goto err;
+
+ if (!edg_wll_FetchRow(ctx,sh,1,NULL,&out)) {
+ edg_wll_SetError(ctx,ENOENT,md5_jobid);
+ goto err;
+ }
+
+ ret = glite_jobid_recreate((const char*) ctx->srvName,
+ ctx->srvPort, (const char *) out, parent);
+
+ if (ret) {
+ edg_wll_SetError(ctx,ret,"Error creating jobid");
+ goto err;
+ }
+
+err:
+ if (sh) glite_lbu_FreeStmt(&sh);
+ free(md5_jobid);
+ free(stmt);
+ free(out);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
#if 0
/* XXX went_through went out */
static int eval_expect_update(intJobStat *js, int* went_through, char **expect_from)
edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
- glite_jobid_const_t jobid,
char *icnames,
char *values,
glite_lbu_bufInsert *bi)
}
+edg_wll_ErrorCode intJobStat_embryonic(
+ edg_wll_Context ctx,
+ glite_jobid_const_t jobid,
+ const edg_wll_RegJobEvent *e,
+ intJobStat *stat)
+{
+ if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+ edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
+ stat->pub.state = EDG_WLL_JOB_SUBMITTED;
+ stat->pub.owner = strdup(e->user);
+ stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
+ stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
+ stat->pub.lastUpdateTime = e->timestamp;
+
+err:
+ return edg_wll_Error(ctx,NULL,NULL);
+}
/*
* Returns encoded SQL table states record for embryonic DAG subjob
intJobStat *stat = &stat_rec;
init_intJobStat(stat);
- if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
- edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
- stat->pub.state = EDG_WLL_JOB_SUBMITTED;
- stat->pub.owner = strdup(e->user);
- stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
- stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
- stat->pub.lastUpdateTime = e->timestamp;
+ if (intJobStat_embryonic(ctx, jobid, e, stat)) goto err;
+
jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
stat_enc = enc_intJobStat(strdup(""), stat);
return edg_wll_Error(ctx,NULL,NULL);
}
+
int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
int i, j, err = 0;
edg_wlc_JobId *subjobs = NULL;
- struct timeval now;
char *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL;
size_t jobid_len;
glite_lbu_bufInsert bi_s, *bi_states = &bi_s;
jobid_len = strlen(jobid_md5_old);
- gettimeofday(&now,NULL);
-
/* increase the overall request timeout. */
ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
free(jobid_md5_old);
jobid_md5_old = jobid_md5;
- if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
+ if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, icnames, values, bi_states)))
edg_wll_Error(ctx,&et,&ed);
if (err) {
return edg_wll_Error(ctx,NULL,NULL);
}
+
int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event)
{
char *stmt;