Purge unknown or failed jobs (=don't fail job state computation):
author František Dvořák <valtri@civ.zcu.cz>
Wed, 26 Aug 2009 11:24:06 +0000 (11:24 +0000)
committer František Dvořák <valtri@civ.zcu.cz>
Wed, 26 Aug 2009 11:24:06 +0000 (11:24 +0000)
 - eventless jobs - UNDEFINED state, always purge
 - jobs without registration - UNDEFINED state, with max event time.
Grey jobs:
 - don't touch stored state
 - don't list them

org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/query.c
org.glite.lb.server/src/srv_purge.c
org.glite.lb.state-machine/interface/intjobstat.h

index 5798301..dbd2260 100644 (file)
@@ -196,7 +196,7 @@ int edg_wll_JobStatusServer(
                                                                glite_jobid_t   subjob;
                                                                intJobStat      js_real;
                                                                char            *name;
-                                                               int             port;
+                                                               unsigned int            port;
 
 
                                                                js = &js_real;
@@ -272,7 +272,7 @@ int edg_wll_JobStatusServer(
                                                                        glite_jobid_t   subjob;
                                                                        intJobStat      js_real;
                                                                        char            *name;
-                                                                       int             port;
+                                                                       unsigned int            port;
 
 
                                                                        js = &js_real;
@@ -403,6 +403,9 @@ int edg_wll_intJobStatus(
 
        edg_wll_QueryRec        jqr[2];
        edg_wll_QueryRec        **jqra;
+       glite_lbu_Statement     sh;
+       char                    *stmt, *out;
+       struct timeval          ts, mints = {tv_sec:0, tv_usec:0};
 
 /* Processing */
        edg_wll_ResetError(ctx);
@@ -443,10 +446,12 @@ int edg_wll_intJobStatus(
        if (edg_wll_QueryEventsServer(ctx,1, (const edg_wll_QueryRec **)jqra, NULL, &events)) {
                if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
                        if (edg_wll_RestoreSubjobState(ctx, job, intstat)) {
-                               destroy_intJobStat(intstat);
-                               free(jqra);
-                               free(intstat->pub.owner); intstat->pub.owner = NULL;
-                               return edg_wll_Error(ctx, NULL, NULL);
+                               if (edg_wll_Error(ctx, NULL, NULL) != ENOENT) {
+                                       destroy_intJobStat(intstat);
+                                       free(jqra);
+                                       free(intstat->pub.owner); intstat->pub.owner = NULL;
+                                       return edg_wll_Error(ctx, NULL, NULL);
+                               }
                        }
                }
                else {
@@ -455,25 +460,29 @@ int edg_wll_intJobStatus(
                        return edg_wll_Error(ctx, NULL, NULL);
                }
        }
-       else {
+       edg_wll_ResetError(ctx);
+
+       {
                free(jqra);
 
-               for (num_events = 0; events[num_events].type != EDG_WLL_EVENT_UNDEF;
+               for (num_events = 0; events && events[num_events].type != EDG_WLL_EVENT_UNDEF;
                        num_events++);
 
-               if (num_events == 0) {
-                       free(intstat->pub.owner); intstat->pub.owner = NULL;
-                       return edg_wll_SetError(ctx,ENOENT,NULL);
-               }
-
                for (i = 0; i < num_events; i++) {
                        res = processEvent(intstat, &events[i], i, be_strict, &errstring);
                        if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
                                intErr = 1; break;
                        }
+                       ts = events[i].any.timestamp;
+                       if (!mints.tv_sec && !mints.tv_usec
+                           || ts.tv_sec < mints.tv_sec
+                           || (ts.tv_sec == mints.tv_sec && ts.tv_usec < mints.tv_usec)) mints = ts;
                }
+               /* no events or status computation error */
                if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
                        intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+                       if (num_events) intstat->pub.lastUpdateTime = mints;
+                       else intstat->pub.lastUpdateTime.tv_sec = 1;
                }
 
 
@@ -489,6 +498,20 @@ int edg_wll_intJobStatus(
                intErr = edg_wlc_JobIdDup(job, &intstat->pub.jobId);
                if (intErr) return edg_wll_SetError(ctx, intErr, NULL);
 
+               /* don't update status of grey jobs */
+               md5_jobid = glite_jobid_getUnique(job);
+               trio_asprintf(&stmt, "select grey from jobs where jobid='%|Ss'", md5_jobid);
+               free(md5_jobid);
+               if (edg_wll_ExecSQL(ctx, stmt, &sh) < 0 ||
+                   (res = edg_wll_FetchRow(ctx, sh, 1, NULL, &out)) < 0) {
+                       free(stmt);
+                       return edg_wll_Error(ctx, NULL, NULL);
+               }
+               if (!out || strcmp(out, "0") != 0) update_db = 0;
+               glite_lbu_FreeStmt(&sh);
+               free(stmt);
+               free(out);
+
                if (update_db) {
                        int tsq = num_events - 1;
                        if (add_fqans && tsq == 0 && ctx->fqans != NULL) {
index 737a860..01f53e8 100644 (file)
@@ -111,7 +111,7 @@ int edg_wll_QueryEventsServer(
        trio_asprintf(&qbase,"SELECT e.event,j.userid,j.dg_jobid,e.code,"
                "e.prog,e.host,u.cert_subj,e.time_stamp,e.usec,e.level,e.arrived,e.seqcode "
                "FROM events e,users u,jobs j%s "
-               "WHERE %se.jobid=j.jobid AND e.userid=u.userid AND e.code != %d "
+               "WHERE %se.jobid=j.jobid AND j.grey=0 AND e.userid=u.userid AND e.code != %d "
                "%s %s %s %s %s %s",
                where_flags & FL_SEL_STATUS ? ",states s"       : "",
                where_flags & FL_SEL_STATUS ? "s.jobid=j.jobid AND " : "",
@@ -385,13 +385,13 @@ int edg_wll_QueryJobsServer(
 
        if ( (where_flags & FL_SEL_STATUS) )
                trio_asprintf(&qbase,"SELECT DISTINCT j.dg_jobid,j.userid "
-                                                "FROM jobs j, states s WHERE j.jobid=s.jobid %s %s AND %s ORDER BY j.jobid", 
+                                                "FROM jobs j, states s WHERE j.jobid=s.jobid AND j.grey=0 %s %s AND %s ORDER BY j.jobid", 
                                                (job_where) ? "AND" : "",
                                                (job_where) ? job_where : "",
                                                (ctx->isProxy) ? "j.proxy='1'" : "j.server='1'");
        else
                trio_asprintf(&qbase,"SELECT DISTINCT j.dg_jobid,j.userid "
-                                                "FROM jobs j WHERE %s %s %s "
+                                                "FROM jobs j WHERE j.grey=0 AND %s %s %s "
                                                 "ORDER BY j.jobid", 
                                                (job_where) ? job_where : "",
                                                (job_where) ? "AND" : "",
index b09e62a..9d523e5 100644 (file)
@@ -666,10 +666,12 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
 
                        // get job prefix/suffix before its state is deleted
                        if ( jobtype == EDG_WLL_NUMBER_OF_JOBTYPES) goto rollback;
-                       if ( get_jobid_suffix(ctx, job, jobtype, &root, &suffix) ) goto rollback;
-                       if ( get_jobid_prefix(ctx, job, jobtype, &prefix) ) goto rollback;
-                       
-               
+                       if (get_jobid_suffix(ctx, job, jobtype, &root, &suffix)
+                        || get_jobid_prefix(ctx, job, jobtype, &prefix)) {
+                               fprintf(stderr,"[%d] unknown job type of the '%s'.\n", getpid(), dbjob);
+                               syslog(LOG_WARNING,"Warning: unknown job type of the '%s'", dbjob);
+                               edg_wll_ResetError(ctx);
+                       }
                }
 
                if ( purge )
@@ -690,7 +692,7 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
                        free(stmt); stmt = NULL;
                }
 
-               if ( purge )
+               if ( purge && prefix && suffix )
                {
                        /* Store zombie prefix */
                
index 8be7476..bb18f2f 100644 (file)
@@ -14,7 +14,7 @@
 /* where Z.XX is version from indent + 1 (version after commit), Y = Z+1 */
 /* and DESCRIPTION is short hit why version changed            */
 
-#define INTSTAT_VERSION "revision 2.7 - usertag sequences"
+#define INTSTAT_VERSION "revision 2.8 - undef status time"
 //                      ".... MAX LENGTH 32 BYTES !! ...."
 
 // Internal error codes