recompute int_status of subjobs without any events when version change
authorMiloš Mulač <mulac@civ.zcu.cz>
Fri, 29 Aug 2008 10:34:37 +0000 (10:34 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Fri, 29 Aug 2008 10:34:37 +0000 (10:34 +0000)
- merge from head

org.glite.lb.server/interface/store.h
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/store.c.T

index 17e71ac..7e6fdc2 100644 (file)
@@ -46,6 +46,7 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
 int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *);
+edg_wll_ErrorCode intJobStat_embryonic(edg_wll_Context ctx, edg_wlc_JobId jobid, const edg_wll_RegJobEvent *e, intJobStat *stat);
 
 
 int edg_wll_delete_event(edg_wll_Context,const char *, int);
index f2ae024..01123a8 100644 (file)
@@ -37,6 +37,7 @@
 
 static void warn (const char* format, ...) UNUSED_VAR ;
 static char *job_owner(edg_wll_Context,char *);
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, edg_wlc_JobId job, edg_wlc_JobId *parent);
 
 
 int js_enable_store = 1;
@@ -344,15 +345,17 @@ int edg_wll_intJobStatus(
        init_intJobStat(intstat);
 
        string_jobid = edg_wlc_JobIdUnparse(job);
-       if (string_jobid == NULL || intstat == NULL)
+       if (string_jobid == NULL || intstat == NULL) {
+               free(string_jobid);
                return edg_wll_SetError(ctx,EINVAL, NULL);
+       }
+       free(string_jobid);
 
        /* can be already filled by public edg_wll_JobStat() */
        if (intstat->pub.owner == NULL) {
                md5_jobid = edg_wlc_JobIdGetUnique(job);
                if ( !(intstat->pub.owner = job_owner(ctx,md5_jobid)) ) {
                        free(md5_jobid);
-                       free(string_jobid);
                        return edg_wll_Error(ctx,NULL,NULL);
                }
                free(md5_jobid);
@@ -368,39 +371,45 @@ int edg_wll_intJobStatus(
        jqra[1] = NULL;
 
        if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) {
-               free(string_jobid);
-               free(jqra);
-               free(intstat->pub.owner); intstat->pub.owner = NULL;
-                return edg_wll_Error(ctx, NULL, NULL);
-       }
-       free(jqra);
-
-       for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
-               num_events++);
-
-       if (num_events == 0) {
-               free(string_jobid);
-               free(intstat->pub.owner); intstat->pub.owner = NULL;
-               return edg_wll_SetError(ctx,ENOENT,NULL);
+               if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                       if (edg_wll_RestoreSubjobState(ctx, job, intstat)) {
+                               destroy_intJobStat(intstat);
+                               free(jqra);
+                               free(intstat->pub.owner); intstat->pub.owner = NULL;
+                               return edg_wll_Error(ctx, NULL, NULL);
+                       }
+               }
+               else {
+                       free(jqra);
+                       free(intstat->pub.owner); intstat->pub.owner = NULL;
+                       return edg_wll_Error(ctx, NULL, NULL);
+               }
        }
+       else {
+               free(jqra);
 
-       edg_wll_SortEvents(events);
+               for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
+                       num_events++);
 
-       for (i = 0; i < num_events; i++) {
-               res = processEvent(intstat, &events[i], i, be_strict, &errstring);
-               if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
-                       intErr = 1; break;
+               if (num_events == 0) {
+                       free(intstat->pub.owner); intstat->pub.owner = NULL;
+                       return edg_wll_SetError(ctx,ENOENT,NULL);
                }
-       }
-       if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
-               intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
-       }
 
-       free(string_jobid);
+               for (i = 0; i < num_events; i++) {
+                       res = processEvent(intstat, &events[i], i, be_strict, &errstring);
+                       if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
+                               intErr = 1; break;
+                       }
+               }
+               if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
+                       intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+               }
 
-       for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
-       free(events);
 
+               for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
+               free(events);
+       }
 
        if (intErr) {
                destroy_intJobStat(intstat);
@@ -433,6 +442,66 @@ int edg_wll_intJobStatus(
 }
 
 
+/* 
+ * Regenarate state of subjob without any event from its parent JobReg Event 
+ */
+edg_wll_ErrorCode edg_wll_RestoreSubjobState(
+       edg_wll_Context         ctx,
+       edg_wlc_JobId           job,
+       intJobStat              *intstat)
+{
+       edg_wlc_JobId           parent_job = NULL;
+       edg_wll_QueryRec        jqr_p1[2], jqr_p2[2];
+       edg_wll_QueryRec        **ec, **jc;
+       edg_wll_Event           *events_p;
+       int                     err, i;
+
+       
+       /* find job parent */
+       if (get_job_parent(ctx, job, &parent_job)) goto err;
+
+       /* get registration event(s) of parent*/
+       jqr_p1[0].attr = EDG_WLL_QUERY_ATTR_JOBID;
+       jqr_p1[0].op = EDG_WLL_QUERY_OP_EQUAL;
+       jqr_p1[0].value.j = parent_job;
+       jqr_p1[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+       jqr_p2[0].attr = EDG_WLL_QUERY_ATTR_EVENT_TYPE;
+       jqr_p2[0].op = EDG_WLL_QUERY_OP_EQUAL;
+       jqr_p2[0].value.i = EDG_WLL_EVENT_REGJOB;
+       jqr_p2[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+       jc = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **));
+       jc[0] = jqr_p1;
+       jc[1] = NULL;
+
+       ec = (edg_wll_QueryRec **) malloc (2 * sizeof(edg_wll_QueryRec **));
+       ec[0] = jqr_p2;
+       ec[1] = NULL;
+
+       if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jc, 
+                               (const edg_wll_QueryRec **)ec, &events_p)) {
+               free(jc);
+               free(ec);
+               return edg_wll_Error(ctx, NULL, NULL);
+       }
+       free(jc);
+       free(ec);
+
+       /* recreate job status of subjob */
+       err = intJobStat_embryonic(ctx, job, (const edg_wll_RegJobEvent *) &(events_p[0]), intstat);
+
+       for (i=0; events_p[i].type != EDG_WLL_EVENT_UNDEF ; i++) 
+               edg_wll_FreeEvent(&events_p[i]);
+       free(events_p);
+
+       if (err) goto err;
+
+err:
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
 /*
  * Helper for warning printouts
  */
@@ -481,6 +550,49 @@ static char *job_owner(edg_wll_Context ctx,char *md5_jobid)
 }
 
 
+
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, edg_wlc_JobId job, edg_wlc_JobId *parent)
+{
+       edg_wll_Stmt    sh = NULL;
+       char    *stmt = NULL, *out = NULL;
+       char    *md5_jobid = edg_wlc_JobIdGetUnique(job);
+       int     ret;
+
+       
+       edg_wll_ResetError(ctx);
+       trio_asprintf(&stmt,"select parent_job from states "
+               "where jobid = '%|Ss'" ,md5_jobid);
+
+       if (stmt==NULL) {
+               edg_wll_SetError(ctx,ENOMEM, NULL);
+               goto err;
+       }
+
+       if (edg_wll_ExecStmt(ctx,stmt,&sh) < 0) goto err;
+
+       if (!edg_wll_FetchRow(sh,&out)) {
+               edg_wll_SetError(ctx,ENOENT,md5_jobid);
+               goto err;
+       }
+
+       ret = edg_wlc_JobIdRecreate((const char*) ctx->srvName,
+                       ctx->srvPort, (const char *) out, parent);
+
+       if (ret) {
+               edg_wll_SetError(ctx,ret,"Error creating jobid");
+               goto err;
+       }
+
+err:
+       if (sh) edg_wll_FreeStmt(&sh);
+       free(md5_jobid);
+       free(stmt);
+       free(out);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 #if 0
 /* XXX went_through went out */
 static int eval_expect_update(intJobStat *js, int* went_through, char **expect_from)
@@ -630,7 +742,6 @@ cleanup:
 
 
 edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
-        edg_wlc_JobId jobid,
         char *icnames, 
        char *values,
        edg_wll_bufInsert *bi)
index 9cdd5a6..367df62 100644 (file)
@@ -101,8 +101,9 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *,
 edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *);
 int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, edg_wll_bufInsert *bi);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, char *icnames, char *values, edg_wll_bufInsert *bi);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
+edg_wll_ErrorCode edg_wll_RestoreSubjobState(edg_wll_Context , edg_wlc_JobId , intJobStat *);
 
 edg_wll_ErrorCode edg_wll_StepIntStateParent(edg_wll_Context,edg_wlc_JobId,edg_wll_Event *,int,intJobStat *,edg_wll_JobStat *);
 
index 110eeb4..23c5fa4 100644 (file)
@@ -674,6 +674,25 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 #endif
 
 
+edg_wll_ErrorCode intJobStat_embryonic(
+       edg_wll_Context ctx,
+       edg_wlc_JobId jobid,
+       const edg_wll_RegJobEvent *e,
+       intJobStat *stat)
+{
+       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
+       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
+       stat->pub.owner = strdup(e->user);
+       stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
+       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
+       stat->pub.lastUpdateTime = e->timestamp;
+
+err:
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 /*
  * Returns encoded SQL table states record for embryonic DAG subjob
  */
@@ -692,13 +711,8 @@ static edg_wll_ErrorCode states_values_embryonic(
        intJobStat *stat = &stat_rec;
 
        init_intJobStat(stat);
-       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
-               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
-       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
-       stat->pub.owner = strdup(e->user);
-       stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
-       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
-       stat->pub.lastUpdateTime = e->timestamp;
+
+       if (intJobStat_embryonic(ctx, jobid, e, stat)) goto err;
 
        jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
        parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
@@ -726,7 +740,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 {
        int                     i, err = 0;
        edg_wlc_JobId           *subjobs;
-       struct timeval          now;
        char                    *jobid_md5, *jobid_md5_old;
        size_t                  jobid_len;
 #ifdef LB_BUF
@@ -769,8 +782,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 #endif         
 
 
-       gettimeofday(&now,NULL);
-
        /* increase the overall request timeout. */
        ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
@@ -801,7 +812,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
                free(jobid_md5_old);
                jobid_md5_old = jobid_md5;
 
-               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
+               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, icnames, values, bi_states)))
                        edg_wll_Error(ctx,&et,&ed);
 
 //job_s = edg_wlc_JobIdUnparse(subjobs[i]);