recompute int_status of subjobs without any events when the version changes
author Miloš Mulač <mulac@civ.zcu.cz>
Sun, 13 Jul 2008 13:20:35 +0000 (13:20 +0000)
committer Miloš Mulač <mulac@civ.zcu.cz>
Sun, 13 Jul 2008 13:20:35 +0000 (13:20 +0000)
- needs some testing

org.glite.lb.server/interface/store.h
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/store.c.T

index 8c55de8..3eb9789 100644 (file)
@@ -32,6 +32,8 @@ int create_reply(const edg_wll_Context,char **);
 int is_job_local(edg_wll_Context, glite_jobid_const_t jobId);
 int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP);
 int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
+edg_wll_ErrorCode intJobStat_embryonic(edg_wll_Context ctx, glite_jobid_const_t jobid, const edg_wll_RegJobEvent *e, intJobStat *stat);
+
 
 
 int edg_wll_delete_event(edg_wll_Context,const char *, int);
index 38609c5..ec92ea2 100644 (file)
@@ -43,6 +43,7 @@
 
 static void warn (const char* format, ...) UNUSED_VAR ;
 static char *job_owner(edg_wll_Context,char *);
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent);
 
 
 int js_enable_store = 1;
@@ -114,14 +115,13 @@ int edg_wll_JobStatusServer(
 
                if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) {
                        memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub));
-                       free(jobstat.pub.owner); jobstat.pub.owner = NULL;
                        destroy_intJobStat_extension(ijsp);
                        free(ijsp);
                } else {
                        if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store, 0)) {
                                goto rollback;
                        }
-                       memcpy(stat, &(ijsp->pub), sizeof(ijsp->pub));
+                       memcpy(stat, &(jobstat.pub), sizeof(jobstat.pub));
                }
                
                if (edg_wll_GetACL(ctx, job, &acl)) goto rollback;
@@ -312,12 +312,11 @@ int edg_wll_JobStatusServer(
 
                whole_cycle = 1;
 rollback:
-               if (!whole_cycle) {
-                               edg_wll_FreeStatus(&jobstat.pub);
-                               jobstat.pub.owner = NULL;
-                               destroy_intJobStat_extension(&jobstat);
+               if (!whole_cycle) { 
+                       edg_wll_FreeStatus(&jobstat.pub);
+                       memset(stat, 0, sizeof(*stat));
                }
-               if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; }
+               destroy_intJobStat_extension(&jobstat);
                if (acl) { edg_wll_FreeAcl(acl); acl = NULL; }
                if (stmt) { free(stmt); stmt = NULL; }
                if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; }
@@ -349,7 +348,7 @@ int edg_wll_intJobStatus(
        int             i, intErr = 0;
        int             res;
        int             be_strict = 0;
-       char            *errstring = NULL;
+               char            *errstring = NULL;
 
        edg_wll_QueryRec        jqr[2];
        edg_wll_QueryRec        **jqra;
@@ -359,20 +358,28 @@ int edg_wll_intJobStatus(
        init_intJobStat(intstat);
 
        string_jobid = edg_wlc_JobIdUnparse(job);
-       if (string_jobid == NULL || intstat == NULL)
+       if (string_jobid == NULL || intstat == NULL) {
+               free(string_jobid);
                return edg_wll_SetError(ctx,EINVAL, NULL);
+       }
+       free(string_jobid);
 
        /* can be already filled by public edg_wll_JobStat() */
        if (intstat->pub.owner == NULL) {
                md5_jobid = edg_wlc_JobIdGetUnique(job);
                if ( !(intstat->pub.owner = job_owner(ctx,md5_jobid)) ) {
                        free(md5_jobid);
-                       free(string_jobid);
                        return edg_wll_Error(ctx,NULL,NULL);
                }
-               free(md5_jobid);
        }
        
+       /* re-lock job from InShareMode to ForUpdate
+        * needed by edg_wll_RestoreSubjobState and edg_wll_StoreIntState
+        */
+       res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid);
+       free(md5_jobid);
+       if (res) return edg_wll_Error(ctx, NULL, NULL);
+
         jqr[0].attr = EDG_WLL_QUERY_ATTR_JOBID;
         jqr[0].op = EDG_WLL_QUERY_OP_EQUAL;
         jqr[0].value.j = (glite_jobid_t)job;
@@ -383,37 +390,45 @@ int edg_wll_intJobStatus(
        jqra[1] = NULL;
 
        if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) {
-               free(string_jobid);
-               free(jqra);
-               free(intstat->pub.owner); intstat->pub.owner = NULL;
-                return edg_wll_Error(ctx, NULL, NULL);
+               if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                       if (edg_wll_RestoreSubjobState(ctx, job, intstat)) {
+                               destroy_intJobStat(intstat);
+                               free(jqra);
+                               free(intstat->pub.owner); intstat->pub.owner = NULL;
+                               return edg_wll_Error(ctx, NULL, NULL);
+                       }
+               }
+               else {
+                       free(jqra);
+                       free(intstat->pub.owner); intstat->pub.owner = NULL;
+                       return edg_wll_Error(ctx, NULL, NULL);
+               }
        }
-       free(jqra);
-
-       for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
-               num_events++);
+       else {
+               free(jqra);
 
-       if (num_events == 0) {
-               free(string_jobid);
-               free(intstat->pub.owner); intstat->pub.owner = NULL;
-               return edg_wll_SetError(ctx,ENOENT,NULL);
-       }
+               for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
+                       num_events++);
 
-       for (i = 0; i < num_events; i++) {
-               res = processEvent(intstat, &events[i], i, be_strict, &errstring);
-               if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
-                       intErr = 1; break;
+               if (num_events == 0) {
+                       free(intstat->pub.owner); intstat->pub.owner = NULL;
+                       return edg_wll_SetError(ctx,ENOENT,NULL);
                }
-       }
-       if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
-               intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
-       }
 
-       free(string_jobid);
+               for (i = 0; i < num_events; i++) {
+                       res = processEvent(intstat, &events[i], i, be_strict, &errstring);
+                       if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
+                               intErr = 1; break;
+                       }
+               }
+               if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
+                       intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+               }
 
-       for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
-       free(events);
 
+               for (i=0; i < num_events ; i++) edg_wll_FreeEvent(&events[i]);
+               free(events);
+       }
 
        if (intErr) {
                destroy_intJobStat(intstat);
@@ -434,12 +449,6 @@ int edg_wll_intJobStatus(
                                intstat->user_fqans[i] = NULL;
                        }
 
-                       // re-lock job from InShareMode to ForUpdate
-                       md5_jobid = edg_wlc_JobIdGetUnique(job);
-                       res = edg_wll_LockJobRowForUpdate(ctx, md5_jobid);
-                       free(md5_jobid);
-                       if (res) return edg_wll_Error(ctx, NULL, NULL);
-
                        edg_wll_StoreIntState(ctx, intstat, tsq);
                        /* recheck
                         * intJobStat *reread;
@@ -453,6 +462,66 @@ int edg_wll_intJobStatus(
 }
 
 
+/*
+ * Regenerate the state of a subjob that has no events of its own, using
+ * the RegJob event of its parent job.
+ *
+ * ctx     - context; carries the error state reported to the caller
+ * job     - id of the subjob whose state is rebuilt
+ * intstat - internal job state to fill (done by intJobStat_embryonic)
+ *
+ * Returns the error code held in ctx (0 on success).
+ */
+edg_wll_ErrorCode edg_wll_RestoreSubjobState(
+       edg_wll_Context         ctx,
+       glite_jobid_const_t     job,
+       intJobStat              *intstat)
+{
+       glite_jobid_t           parent_job = NULL;
+       edg_wll_QueryRec        jqr_p1[2], jqr_p2[2];
+       const edg_wll_QueryRec  *jc[2], *ec[2];
+       edg_wll_Event           *events_p = NULL;
+       int                     i;
+
+
+       /* find job parent */
+       if (get_job_parent(ctx, job, &parent_job)) goto err;
+
+       /* get registration event(s) of parent */
+       jqr_p1[0].attr = EDG_WLL_QUERY_ATTR_JOBID;
+       jqr_p1[0].op = EDG_WLL_QUERY_OP_EQUAL;
+       jqr_p1[0].value.j = parent_job;
+       jqr_p1[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+       jqr_p2[0].attr = EDG_WLL_QUERY_ATTR_EVENT_TYPE;
+       jqr_p2[0].op = EDG_WLL_QUERY_OP_EQUAL;
+       jqr_p2[0].value.i = EDG_WLL_EVENT_REGJOB;
+       jqr_p2[1].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+       /* NULL-terminated condition lists; kept on the stack so that no
+        * exit path has anything to free or to check for allocation failure */
+       jc[0] = jqr_p1;
+       jc[1] = NULL;
+       ec[0] = jqr_p2;
+       ec[1] = NULL;
+
+       /* jc selects the parent job, ec restricts the result to RegJob events */
+       if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jc,
+                               (const edg_wll_QueryRec **)ec, &events_p)) goto err;
+
+       /* recreate job status of subjob; the first event is read as a RegJob
+        * event, so make sure it really is one.  The result of
+        * intJobStat_embryonic is picked up from ctx by the final return. */
+       if (events_p == NULL || events_p[0].type != EDG_WLL_EVENT_REGJOB)
+               edg_wll_SetError(ctx, ENOENT, "no RegJob event of parent job");
+       else
+               intJobStat_embryonic(ctx, job, (const edg_wll_RegJobEvent *) &(events_p[0]), intstat);
+
+       if (events_p) {
+               for (i=0; events_p[i].type != EDG_WLL_EVENT_UNDEF ; i++)
+                       edg_wll_FreeEvent(&events_p[i]);
+               free(events_p);
+       }
+
+err:
+       /* the parent id was only needed to build the query above */
+       if (parent_job) glite_jobid_free(parent_job);
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
 /*
  * Helper for warning printouts
  */
@@ -501,6 +570,48 @@ static char *job_owner(edg_wll_Context ctx,char *md5_jobid)
 }
 
 
+/*
+ * Look up the parent job of `job' in the states table.
+ *
+ * ctx    - context; its error state is reset on entry and set on failure
+ * job    - job whose parent is searched for
+ * parent - receives a newly created job id built from ctx->srvName,
+ *          ctx->srvPort and the stored parent_job value; the caller
+ *          releases it
+ *
+ * Returns the error code held in ctx (0 on success, ENOENT when no row
+ * is found for the job).
+ */
+static edg_wll_ErrorCode get_job_parent(edg_wll_Context ctx, glite_jobid_const_t job, glite_jobid_t *parent)
+{
+       glite_lbu_Statement     sh = NULL;
+       char    *stmt = NULL, *out = NULL;
+       char    *md5_jobid = edg_wlc_JobIdGetUnique(job);
+       int     ret;
+
+
+       edg_wll_ResetError(ctx);
+
+       if (md5_jobid == NULL) {
+               edg_wll_SetError(ctx,EINVAL, NULL);
+               goto err;
+       }
+
+       trio_asprintf(&stmt,"select parent_job from states "
+               "where jobid = '%|Ss'" ,md5_jobid);
+
+       if (stmt==NULL) {
+               edg_wll_SetError(ctx,ENOMEM, NULL);
+               goto err;
+       }
+
+       if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0) goto err;
+
+       /* NOTE(review): assumes edg_wll_FetchRow returns < 0 on error (with
+        * the error already stored in ctx) and 0 when there is no row --
+        * confirm against the DB layer */
+       ret = edg_wll_FetchRow(ctx,sh,1,NULL,&out);
+       if (ret < 0) goto err;
+       if (ret == 0 || out == NULL) {
+               edg_wll_SetError(ctx,ENOENT,md5_jobid);
+               goto err;
+       }
+
+       ret = glite_jobid_recreate((const char*) ctx->srvName,
+                       ctx->srvPort, (const char *) out, parent);
+
+       if (ret) {
+               edg_wll_SetError(ctx,ret,"Error creating jobid");
+               goto err;
+       }
+
+err:
+       if (sh) glite_lbu_FreeStmt(&sh);
+       free(md5_jobid);
+       free(stmt);
+       free(out);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 #if 0
 /* XXX went_through went out */
 static int eval_expect_update(intJobStat *js, int* went_through, char **expect_from)
@@ -656,7 +767,6 @@ cleanup:
 
 
 edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
-        glite_jobid_const_t jobid,
         char *icnames, 
        char *values,
        glite_lbu_bufInsert *bi)
index b0b916a..240fd98 100644 (file)
@@ -12,8 +12,10 @@ int edg_wll_JobStatusServer(edg_wll_Context, glite_jobid_const_t, int, edg_wll_J
 
 int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, glite_jobid_const_t, char *icnames, char *values, glite_lbu_bufInsert *bi);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, char *icnames, char *values, glite_lbu_bufInsert *bi);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , glite_jobid_const_t , int, int, intJobStat **);
+edg_wll_ErrorCode edg_wll_RestoreSubjobState(edg_wll_Context , glite_jobid_const_t , intJobStat *);
+
 
 /* update stored job state according to new event */
 edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, glite_jobid_const_t job, edg_wll_Event *e, int seq, edg_wll_JobStat *stat_out);
index f774788..359deb9 100644 (file)
@@ -592,6 +592,23 @@ clean:
 }
 
 
+/*
+ * Fill an internal job state for an embryonic subjob, i.e. a subjob
+ * known only from the RegJob event of its parent.
+ *
+ * ctx   - context; receives the error when a copy fails
+ * jobid - id of the subjob
+ * e     - RegJob event of the parent; supplies parent id, owner and timestamp
+ * stat  - state to fill; the caller initializes it beforehand and
+ *         destroys it afterwards, also on failure
+ *
+ * Returns the error code held in ctx (0 on success).
+ */
+edg_wll_ErrorCode intJobStat_embryonic(
+       edg_wll_Context ctx,
+       glite_jobid_const_t jobid,
+       const edg_wll_RegJobEvent *e,
+       intJobStat *stat)
+{
+       /* report a failed copy explicitly; otherwise ctx may hold no error
+        * and the caller would take a half-filled state for a success */
+       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) {
+               edg_wll_SetError(ctx, ENOMEM, "cannot duplicate job id");
+               goto err;
+       }
+       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
+       /* strdup(NULL) is undefined; leave the owner unset in that case */
+       stat->pub.owner = e->user ? strdup(e->user) : NULL;
+       if (e->user && stat->pub.owner == NULL) {
+               edg_wll_SetError(ctx, ENOMEM, NULL);
+               goto err;
+       }
+       stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
+       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
+       stat->pub.lastUpdateTime = e->timestamp;
+
+err:
+       return edg_wll_Error(ctx,NULL,NULL);
+}
 
 /*
  * Returns encoded SQL table states record for embryonic DAG subjob
@@ -611,14 +628,9 @@ static edg_wll_ErrorCode states_values_embryonic(
        intJobStat *stat = &stat_rec;
 
        init_intJobStat(stat);
-       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
-               edg_wlc_JobIdDup(e->jobId, &stat->pub.parent_job)) goto err;
-       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
-       stat->pub.owner = strdup(e->user);
-       stat->pub.jobtype = EDG_WLL_STAT_SIMPLE;
-       stat->pub.stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED] = (int)e->timestamp.tv_sec;
-       stat->pub.lastUpdateTime = e->timestamp;
 
+       if (intJobStat_embryonic(ctx, jobid, e, stat)) goto err;
+       
        jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
        parent_md5 = edg_wlc_JobIdGetUnique(e->jobId);
        stat_enc = enc_intJobStat(strdup(""), stat);
@@ -641,11 +653,11 @@ err:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
 int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
        int                     i, j,  err = 0;
        edg_wlc_JobId           *subjobs = NULL;
-       struct timeval          now;
        char                    *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL;
        size_t                  jobid_len;
        glite_lbu_bufInsert     bi_s, *bi_states = &bi_s;
@@ -668,8 +680,6 @@ int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
        jobid_len = strlen(jobid_md5_old);
                
 
-       gettimeofday(&now,NULL);
-
        /* increase the overall request timeout. */
        ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
@@ -706,7 +716,7 @@ int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
                free(jobid_md5_old);
                jobid_md5_old = jobid_md5;
 
-               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
+               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, icnames, values, bi_states)))
                        edg_wll_Error(ctx,&et,&ed);
 
                if (err) {
@@ -732,6 +742,7 @@ err:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
 int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event)
 {
        char    *stmt;