From: Miloš Mulač Date: Sun, 13 Jul 2008 13:20:35 +0000 (+0000) Subject: recompute int_status of subjobs without any events when version change X-Git-Tag: merge_316_6_dst~20 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=d4344c64ddfbe1efd9fa8fd595ed568a143aee56;p=jra1mw.git recompute int_status of subjobs without any events when version change - needs some testing --- diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index 8c55de8..3eb9789 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -32,6 +32,8 @@ int create_reply(const edg_wll_Context,char **); int is_job_local(edg_wll_Context, glite_jobid_const_t jobId); int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP); int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *); +edg_wll_ErrorCode intJobStat_embryonic(edg_wll_Context ctx, glite_jobid_const_t jobid, const edg_wll_RegJobEvent *e, intJobStat *stat); + int edg_wll_delete_event(edg_wll_Context,const char *, int); diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 38609c5..ec92ea2 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -43,6 +43,7 @@ static void warn (const char* format, ...) UNUSED_VAR ; static char *job_owner(edg_wll_Context,char *); +static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent); int js_enable_store = 1; @@ -114,14 +115,13 @@ int edg_wll_JobStatusServer( if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) { memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub)); - free(jobstat.pub.owner); jobstat.pub.owner = NULL; destroy_intJobStat_extension(ijsp); free(ijsp); } else { if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store, 0)) { goto rollback; } - memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub)); + memcpy(stat, &(jobstat.pub), sizeof(jobstat.pub)); } if (edg_wll_GetACL(ctx, job, &acl)) goto rollback; @@ -312,12 +312,11 @@ int edg_wll_JobStatusServer( whole_cycle = 1; rollback: - if (!whole_cycle) { - edg_wll_FreeStatus(&jobstat.pub); - jobstat.pub.owner = NULL; - destroy_intJobStat_extension(&jobstat); + if (!whole_cycle) { + edg_wll_FreeStatus(&jobstat.pub); + memset(stat, 0, sizeof(*stat)); } - if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; } + destroy_intJobStat_extension(&jobstat); if (acl) { edg_wll_FreeAcl(acl); acl = NULL; } if (stmt) { free(stmt); stmt = NULL; } if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; } @@ -349,7 +348,7 @@ int edg_wll_intJobStatus( int i, intErr = 0; int res; int be_strict = 0; - char *errstring = NULL; + char *errstring = NULL; edg_wll_QueryRec jqr[2]; edg_wll_QueryRec **jqra; @@ -359,20 +358,28 @@ int edg_wll_intJobStatus( init_intJobStat(intstat); string_jobid = edg_wlc_JobIdUnparse(job); - if (string_jobid == NULL || intstat == NULL) + if (string_jobid == NULL || intstat == NULL) { + free(string_jobid); return edg_wll_SetError(ctx,EINVAL, NULL); + } + free(string_jobid); /* can be already filled by public edg_wll_JobStat() */ if (intstat->pub.owner == NULL) { md5_jobid = edg_wlc_JobIdGetUnique(job); if ( !(intstat->pub.owner = job_owner(ctx,md5_jobid)) ) { free(md5_jobid); - free(string_jobid); return edg_wll_Error(ctx,NULL,NULL); } - free(md5_jobid); } + /* re-lock job from InShareMode to ForUpdate + * needed by edg_wll_RestoreSubjobState and edg_wll_StoreIntState + */ + res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid); + free(md5_jobid); + if (res) return edg_wll_Error(ctx, NULL, NULL); + jqr[0].attr = EDG_WLL_QUERY_ATTR_JOBID; jqr[0].op = EDG_WLL_QUERY_OP_EQUAL; jqr[0].value.j = (glite_jobid_t)job; @@ -383,37 +390,45 @@ int edg_wll_intJobStatus( jqra[1] = NULL; if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) { - free(string_jobid); - free(jqra); - free(intstat->pub.owner); intstat->pub.owner = NULL; - return edg_wll_Error(ctx, NULL, NULL); + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + if (edg_wll_RestoreSubjobState(ctx, job, intstat)) { + destroy_intJobStat(intstat); + free(jqra); + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_Error(ctx, NULL, NULL); + } + } + else { + free(jqra); + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_Error(ctx, NULL, NULL); + } } - free(jqra); - - for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF; - num_events++); + else { + free(jqra); - if (num_events == 0) { - free(string_jobid); - free(intstat->pub.owner); intstat->pub.owner = NULL; - return edg_wll_SetError(ctx,ENOENT,NULL); - } + for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF; + num_events++); - for (i = 0; i < num_events; i++) { - res = processEvent(intstat, &events[i], i, be_strict, &errstring); - if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ - intErr = 1; break; + if (num_events == 0) { + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_SetError(ctx,ENOENT,NULL); } - } - if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { - intstat->pub.state = EDG_WLL_JOB_UNKNOWN; - } - free(string_jobid); + for (i = 0; i < num_events; i++) { + res = processEvent(intstat, &events[i], i, be_strict, &errstring); + if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ + intErr = 1; break; + } + } + if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { + intstat->pub.state = EDG_WLL_JOB_UNKNOWN; + } - for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]); - free(events); + for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]); + free(events); + } if (intErr) { destroy_intJobStat(intstat); @@ -434,12 +449,6 @@ int edg_wll_intJobStatus( intstat->user_fqans[i] = NULL; } - // re-lock job from InShareMode to ForUpdate - md5_jobid = edg_wlc_JobIdGetUnique(job); - res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid); - free(md5_jobid); - if (res) return edg_wll_Error(ctx, NULL, NULL); - edg_wll_StoreIntState(ctx, intstat, tsq); /* recheck * intJobStat *reread; @@ -453,6 +462,66 @@ int edg_wll_intJobStatus( } +/* + * Regenarate state of subjob without any event from its parent JobReg Event + */ +edg_wll_ErrorCode edg_wll_RestoreSubjobState( + edg_wll_Context ctx, + glite_jobid_const_t job, + intJobStat *intstat) +{ + glite_jobid_t parent_job = NULL; + edg_wll_QueryRec jqr_p1[2], jqr_p2[2]; + edg_wll_QueryRec **ec, **jc; + edg_wll_Event *events_p; + int err, i; + + + /* find job parent */ + if (get_job_parent(ctx, job, &parent_job)) goto err; + + /* get registration event(s) of parent*/ + jqr_p1[0].attr = EDG_WLL_QUERY_ATTR_JOBID; + jqr_p1[0].op = EDG_WLL_QUERY_OP_EQUAL; + jqr_p1[0].value.j = parent_job; + jqr_p1[1].attr = EDG_WLL_QUERY_ATTR_UNDEF; + + jqr_p2[0].attr = EDG_WLL_QUERY_ATTR_EVENT_TYPE; + jqr_p2[0].op = EDG_WLL_QUERY_OP_EQUAL; + jqr_p2[0].value.i = EDG_WLL_EVENT_REGJOB; + jqr_p2[1].attr = EDG_WLL_QUERY_ATTR_UNDEF; + + jc = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **)); + jc[0] = jqr_p1; + jc[1] = NULL; + + ec = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **)); + ec[0] = jqr_p2; + ec[1] = NULL; + + if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jc, + (const edg_wll_QueryRec **)jc, &events_p)) { + free(jc); + free(ec); + return edg_wll_Error(ctx, NULL, NULL); + } + free(jc); + free(ec); + + /* recreate job status of subjob */ + err = intJobStat_embryonic(ctx, job, (const edg_wll_RegJobEvent *) &(events_p[0]), intstat); + + for (i=0; events_p[i].type != EDG_WLL_EVENT_UNDEF ; i++) + edg_wll_FreeEvent(&events_p[i]); + free(events_p); + + if (err) goto err; + +err: + return edg_wll_Error(ctx, NULL, NULL); +} + + /* * Helper for warning printouts */ @@ -501,6 +570,48 @@ static char *job_owner(edg_wll_Context ctx,char *md5_jobid) } +static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent) +{ + glite_lbu_Statement sh = NULL; + char *stmt = NULL, *out = NULL; + char *md5_jobid = edg_wlc_JobIdGetUnique(job); + int ret; + + + edg_wll_ResetError(ctx); + trio_asprintf(&stmt,"select parent_job from states " + "where jobid = '%|Ss'" ,md5_jobid); + + if (stmt==NULL) { + edg_wll_SetError(ctx,ENOMEM, NULL); + goto err; + } + + if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0) goto err; + + if (!edg_wll_FetchRow(ctx,sh,1,NULL,&out)) { + edg_wll_SetError(ctx,ENOENT,md5_jobid); + goto err; + } + + ret = glite_jobid_recreate((const char*) ctx->srvName, + ctx->srvPort, (const char *) out, parent); + + if (ret) { + edg_wll_SetError(ctx,ret,"Error creating jobid"); + goto err; + } + +err: + if (sh) glite_lbu_FreeStmt(&sh); + free(md5_jobid); + free(stmt); + free(out); + + return edg_wll_Error(ctx,NULL,NULL); +} + + #if 0 /* XXX went_through went out */ static int eval_expect_update(intJobStat *js, int* went_through, char **expect_from) @@ -656,7 +767,6 @@ cleanup: edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, - glite_jobid_const_t jobid, char *icnames, char *values, glite_lbu_bufInsert *bi) diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index b0b916a..240fd98 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -12,8 +12,10 @@ int edg_wll_JobStatusServer(edg_wll_Context, glite_jobid_const_t, int, edg_wll_J int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); -edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, glite_jobid_const_t, char *icnames, char *values, glite_lbu_bufInsert *bi); +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, char *icnames, char *values, glite_lbu_bufInsert *bi); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , glite_jobid_const_t , int, int, intJobStat **); +edg_wll_ErrorCode edg_wll_RestoreSubjobState(edg_wll_Context , glite_jobid_const_t , intJobStat *); + /* update stored job state according to new event */ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, glite_jobid_const_t job, edg_wll_Event *e, int seq, edg_wll_JobStat *stat_out); diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index f774788..359deb9 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -592,6 +592,23 @@ clean: } +edg_wll_ErrorCode intJobStat_embryonic( + edg_wll_Context ctx, + glite_jobid_const_t jobid, + const edg_wll_RegJobEvent *e, + intJobStat *stat) +{ + if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || + edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; + stat->pub.state = EDG_WLL_JOB_SUBMITTED; + stat->pub.owner = strdup(e->user); + stat->pub.jobtype = EDG_WLL_STAT_SIMPLE; + stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; + stat->pub.lastUpdateTime = e->timestamp; + +err: + return edg_wll_Error(ctx,NULL,NULL); +} /* * Returns encoded SQL table states record for embryonic DAG subjob @@ -611,14 +628,9 @@ static edg_wll_ErrorCode states_values_embryonic( intJobStat *stat = &stat_rec; init_intJobStat(stat); - if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || - edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; - stat->pub.state = EDG_WLL_JOB_SUBMITTED; - stat->pub.owner = strdup(e->user); - stat->pub.jobtype = EDG_WLL_STAT_SIMPLE; - stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; - stat->pub.lastUpdateTime = e->timestamp; + if (intJobStat_embryonic(ctx, jobid, e, stat)) goto err; + jobid_md5 = edg_wlc_JobIdGetUnique(jobid); parent_md5 = edg_wlc_JobIdGetUnique(e->jobId); stat_enc = enc_intJobStat(strdup(""), stat); @@ -641,11 +653,11 @@ err: return edg_wll_Error(ctx,NULL,NULL); } + int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { int i, j, err = 0; edg_wlc_JobId *subjobs = NULL; - struct timeval now; char *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL; size_t jobid_len; glite_lbu_bufInsert bi_s, *bi_states = &bi_s; @@ -668,8 +680,6 @@ int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) jobid_len = strlen(jobid_md5_old); - gettimeofday(&now,NULL); - /* increase the overall request timeout. */ ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; @@ -706,7 +716,7 @@ int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) free(jobid_md5_old); jobid_md5_old = jobid_md5; - if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states))) + if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, icnames, values, bi_states))) edg_wll_Error(ctx,&et,&ed); if (err) { @@ -732,6 +742,7 @@ err: return edg_wll_Error(ctx,NULL,NULL); } + int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event) { char *stmt;