From dc28ae1241ab32bd70a5a74bdb5b4247a6cdab2c Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Fri, 29 Aug 2008 10:34:37 +0000 Subject: [PATCH] recompute int_status of subjobs without any events when version change - merge from head --- org.glite.lb.server/interface/store.h | 1 + org.glite.lb.server/src/jobstat.c | 169 ++++++++++++++++++++++++++++------ org.glite.lb.server/src/jobstat.h | 3 +- org.glite.lb.server/src/store.c.T | 33 ++++--- 4 files changed, 165 insertions(+), 41 deletions(-) diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index 17e71ac..7e6fdc2 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -46,6 +46,7 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *); +edg_wll_ErrorCode intJobStat_embryonic(edg_wll_Context ctx, edg_wlc_JobId jobid, const edg_wll_RegJobEvent *e, intJobStat *stat); int edg_wll_delete_event(edg_wll_Context,const char *, int); diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index f2ae024..01123a8 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -37,6 +37,7 @@ static void warn (const char* format, ...) UNUSED_VAR ; static char *job_owner(edg_wll_Context,char *); +static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, edg_wlc_JobId job, edg_wlc_JobId *parent); int js_enable_store = 1; @@ -344,15 +345,17 @@ int edg_wll_intJobStatus( init_intJobStat(intstat); string_jobid = edg_wlc_JobIdUnparse(job); - if (string_jobid == NULL || intstat == NULL) + if (string_jobid == NULL || intstat == NULL) { + free(string_jobid); return edg_wll_SetError(ctx,EINVAL, NULL); + } + free(string_jobid); /* can be already filled by public edg_wll_JobStat() */ if (intstat->pub.owner == NULL) { md5_jobid = edg_wlc_JobIdGetUnique(job); if ( !(intstat->pub.owner = job_owner(ctx,md5_jobid)) ) { free(md5_jobid); - free(string_jobid); return edg_wll_Error(ctx,NULL,NULL); } free(md5_jobid); @@ -368,39 +371,45 @@ int edg_wll_intJobStatus( jqra[1] = NULL; if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) { - free(string_jobid); - free(jqra); - free(intstat->pub.owner); intstat->pub.owner = NULL; - return edg_wll_Error(ctx, NULL, NULL); - } - free(jqra); - - for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF; - num_events++); - - if (num_events == 0) { - free(string_jobid); - free(intstat->pub.owner); intstat->pub.owner = NULL; - return edg_wll_SetError(ctx,ENOENT,NULL); + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + if (edg_wll_RestoreSubjobState(ctx, job, intstat)) { + destroy_intJobStat(intstat); + free(jqra); + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_Error(ctx, NULL, NULL); + } + } + else { + free(jqra); + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_Error(ctx, NULL, NULL); + } } + else { + free(jqra); - edg_wll_SortEvents(events); + for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF; + num_events++); - for (i = 0; i < num_events; i++) { - res = processEvent(intstat, &events[i], i, be_strict, &errstring); - if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ - intErr = 1; break; + if (num_events == 0) { + free(intstat->pub.owner); intstat->pub.owner = NULL; + return edg_wll_SetError(ctx,ENOENT,NULL); } - } - if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { - intstat->pub.state = EDG_WLL_JOB_UNKNOWN; - } - free(string_jobid); + for (i = 0; i < num_events; i++) { + res = processEvent(intstat, &events[i], i, be_strict, &errstring); + if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ + intErr = 1; break; + } + } + if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { + intstat->pub.state = EDG_WLL_JOB_UNKNOWN; + } - for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]); - free(events); + for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]); + free(events); + } if (intErr) { destroy_intJobStat(intstat); @@ -433,6 +442,66 @@ int edg_wll_intJobStatus( } +/* + * Regenarate state of subjob without any event from its parent JobReg Event + */ +edg_wll_ErrorCode edg_wll_RestoreSubjobState( + edg_wll_Context ctx, + edg_wlc_JobId job, + intJobStat *intstat) +{ + edg_wlc_JobId parent_job = NULL; + edg_wll_QueryRec jqr_p1[2], jqr_p2[2]; + edg_wll_QueryRec **ec, **jc; + edg_wll_Event *events_p; + int err, i; + + + /* find job parent */ + if (get_job_parent(ctx, job, &parent_job)) goto err; + + /* get registration event(s) of parent*/ + jqr_p1[0].attr = EDG_WLL_QUERY_ATTR_JOBID; + jqr_p1[0].op = EDG_WLL_QUERY_OP_EQUAL; + jqr_p1[0].value.j = parent_job; + jqr_p1[1].attr = EDG_WLL_QUERY_ATTR_UNDEF; + + jqr_p2[0].attr = EDG_WLL_QUERY_ATTR_EVENT_TYPE; + jqr_p2[0].op = EDG_WLL_QUERY_OP_EQUAL; + jqr_p2[0].value.i = EDG_WLL_EVENT_REGJOB; + jqr_p2[1].attr = EDG_WLL_QUERY_ATTR_UNDEF; + + jc = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **)); + jc[0] = jqr_p1; + jc[1] = NULL; + + ec = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **)); + ec[0] = jqr_p2; + ec[1] = NULL; + + if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jc, + (const edg_wll_QueryRec **)ec, &events_p)) { + free(jc); + free(ec); + return edg_wll_Error(ctx, NULL, NULL); + } + free(jc); + free(ec); + + /* recreate job status of subjob */ + err = intJobStat_embryonic(ctx, job, (const edg_wll_RegJobEvent *) &(events_p[0]), intstat); + + for (i=0; events_p[i].type != EDG_WLL_EVENT_UNDEF ; i++) + edg_wll_FreeEvent(&events_p[i]); + free(events_p); + + if (err) goto err; + +err: + return edg_wll_Error(ctx, NULL, NULL); +} + + /* * Helper for warning printouts */ @@ -481,6 +550,49 @@ static char *job_owner(edg_wll_Context ctx,char *md5_jobid) } + +static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, edg_wlc_JobId job, edg_wlc_JobId *parent) +{ + edg_wll_Stmt sh = NULL; + char *stmt = NULL, *out = NULL; + char *md5_jobid = edg_wlc_JobIdGetUnique(job); + int ret; + + + edg_wll_ResetError(ctx); + trio_asprintf(&stmt,"select parent_job from states " + "where jobid = '%|Ss'" ,md5_jobid); + + if (stmt==NULL) { + edg_wll_SetError(ctx,ENOMEM, NULL); + goto err; + } + + if (edg_wll_ExecStmt(ctx,stmt,&sh) < 0) goto err; + + if (!edg_wll_FetchRow(sh,&out)) { + edg_wll_SetError(ctx,ENOENT,md5_jobid); + goto err; + } + + ret = edg_wlc_JobIdRecreate((const char*) ctx->srvName, + ctx->srvPort, (const char *) out, parent); + + if (ret) { + edg_wll_SetError(ctx,ret,"Error creating jobid"); + goto err; + } + +err: + if (sh) edg_wll_FreeStmt(&sh); + free(md5_jobid); + free(stmt); + free(out); + + return edg_wll_Error(ctx,NULL,NULL); +} + + #if 0 /* XXX went_through went out */ static int eval_expect_update(intJobStat *js, int* went_through, char **expect_from) @@ -630,7 +742,6 @@ cleanup: edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, - edg_wlc_JobId jobid, char *icnames, char *values, edg_wll_bufInsert *bi) diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 9cdd5a6..367df62 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -101,8 +101,9 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *, edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *); int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); -edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, edg_wll_bufInsert *bi); +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, char *icnames, char *values, edg_wll_bufInsert *bi); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); +edg_wll_ErrorCode edg_wll_RestoreSubjobState(edg_wll_Context , edg_wlc_JobId , intJobStat *); edg_wll_ErrorCode edg_wll_StepIntStateParent(edg_wll_Context,edg_wlc_JobId,edg_wll_Event *,int,intJobStat *,edg_wll_JobStat *); diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 110eeb4..23c5fa4 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -674,6 +674,25 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) #endif +edg_wll_ErrorCode intJobStat_embryonic( + edg_wll_Context ctx, + edg_wlc_JobId jobid, + const edg_wll_RegJobEvent *e, + intJobStat *stat) +{ + if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || + edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; + stat->pub.state = EDG_WLL_JOB_SUBMITTED; + stat->pub.owner = strdup(e->user); + stat->pub.jobtype = EDG_WLL_STAT_SIMPLE; + stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; + stat->pub.lastUpdateTime = e->timestamp; + +err: + return edg_wll_Error(ctx,NULL,NULL); +} + + /* * Returns encoded SQL table states record for embryonic DAG subjob */ @@ -692,13 +711,8 @@ static edg_wll_ErrorCode states_values_embryonic( intJobStat *stat = &stat_rec; init_intJobStat(stat); - if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || - edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err; - stat->pub.state = EDG_WLL_JOB_SUBMITTED; - stat->pub.owner = strdup(e->user); - stat->pub.jobtype = EDG_WLL_STAT_SIMPLE; - stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec; - stat->pub.lastUpdateTime = e->timestamp; + + if (intJobStat_embryonic(ctx, jobid, e, stat)) goto err; jobid_md5 = edg_wlc_JobIdGetUnique(jobid); parent_md5 = edg_wlc_JobIdGetUnique(e->jobId); @@ -726,7 +740,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv { int i, err = 0; edg_wlc_JobId *subjobs; - struct timeval now; char *jobid_md5, *jobid_md5_old; size_t jobid_len; #ifdef LB_BUF @@ -769,8 +782,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv #endif - gettimeofday(&now,NULL); - /* increase the overall request timeout. */ ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; @@ -801,7 +812,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv free(jobid_md5_old); jobid_md5_old = jobid_md5; - if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states))) + if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, icnames, values, bi_states))) edg_wll_Error(ctx,&et,&ed); //job_s = edg_wlc_JobIdUnparse(subjobs[i]); -- 1.8.2.3