remove jobs from proxy on state critetions, not on event ones
authorMiloš Mulač <mulac@civ.zcu.cz>
Tue, 4 Dec 2007 10:02:14 +0000 (10:02 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Tue, 4 Dec 2007 10:02:14 +0000 (10:02 +0000)
- encode/decode new (+forgotten) jobstat fields into/from DB

org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/jobstat_supp.c
org.glite.lb.server/src/process_event.c

index 0479e2c..9ebf36b 100644 (file)
@@ -39,7 +39,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   int  seq;
   int   err;
   edg_wll_JobStat      newstat;
-  char                         *srvName;
+  char                         *srvName = NULL;
   unsigned int         srvPort;
 
 
@@ -84,10 +84,12 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
    */
   if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) {
        if (!strcmp(ctx->srvName, srvName)) {
+               free(srvName);
                return 0;
        }
 
   }
+  free(srvName);
 
   /* XXX: if event type is user tag, convert the tag name to lowercase!
    *     (not sure whether to convert a value too is reasonable
@@ -130,7 +132,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
     }
 #endif
 
-    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, ctx->isProxy? NULL: &newstat);
+    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat);
   }
 
   /* XXX: in edg_wll_StepIntState() 
@@ -146,7 +148,10 @@ err:
     edg_wll_FreeEvent(ev);
     free(ev);
   }
-  
+
+  if ( newstat.state ) edg_wll_FreeStatus(&newstat);
+
+
   return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -220,6 +225,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
 err:
 
   free(event);
+  if ( newstat.state ) edg_wll_FreeStatus(&newstat);
   
   return edg_wll_Error(ctx,NULL,NULL);
 }
@@ -251,20 +257,9 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev,
        }
 
        /* LB proxy purge
-        * XXX: Set propper set of states!
-        * TODO: Do the set of states configurable? 
         */
-       switch ( ev->any.type ) {
-       case EDG_WLL_EVENT_CLEAR:
-       case EDG_WLL_EVENT_ABORT:
-               edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
-               break;
-       case EDG_WLL_EVENT_CANCEL:
-               if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) 
+       if (newstat->remove_from_proxy) 
                        edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
-               break;
-       default: break;
-       }
   } else 
 #ifdef LB_PERF
        if( sink_mode == GLITE_LB_SINK_SEND ) {
@@ -272,32 +267,26 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev,
        } else 
 #endif
   {
+       char            *jobIdHost = NULL;
+       unsigned int    jobIdPort;
+
+
        /* Purge proxy flag
-        * XXX: Workaround - if these events arrive on server with shared DB
-        *      it reaches terminal state from proxy point of view and so
-        *      proxy flag have to be removed (cannot call edg_wll_PurgeServerProxy)
-        * - these triggers on events should be replaced by triggers on state change
         * (remove extern unset_proxy_flag, set it static in srv_purge.c)
         */
-       switch ( ev->any.type ) {
-       case EDG_WLL_EVENT_CLEAR:
-       case EDG_WLL_EVENT_ABORT:
-               if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
-                                        return(edg_wll_Error(ctx,NULL,NULL));
-               }
-               break;
-       case EDG_WLL_EVENT_CANCEL:
-               if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) 
+       edg_wlc_JobIdGetServerParts(ev->any.jobId, &jobIdHost, &jobIdPort);
+       if ( newstat->remove_from_proxy && (ctx->srvPort == jobIdPort) && 
+               !strcmp(jobIdHost,ctx->srvName) )
+       {
                        if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
+                                               free(jobIdHost);
                                                return(edg_wll_Error(ctx,NULL,NULL));
                        }
-               break;
-       default: break;
        }
+       free(jobIdHost);
 
        if ( newstat->state ) {
                edg_wll_NotifMatch(ctx, newstat);
-               edg_wll_FreeStatus(newstat);
        }
        if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB ) {
                char *jids, *msg;
index a45c376..faca87c 100644 (file)
  * (includes edg_wll_JobStat API structure)
  */
 
-#define INTSTAT_VERSION "release-3.3.4"
+/* convention: revision X.XX - DESCRIPTION                     */
+/* where X.XX is version from indent + 1 (version after commit) */
+/* and DESCRIPTION is short hit why version changed            */
+
+#define INTSTAT_VERSION "revision 1.31 - proxy merge"
 
 
 // Internal error codes 
index ae9a299..78a6c29 100644 (file)
@@ -499,6 +499,8 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat)
        if (ret) ret = enc_int(ret, stat->suspended);
        if (ret) ret = enc_string(ret, stat->suspend_reason);
        if (ret) ret = enc_int_array(ret, stat->children_hist, EDG_WLL_NUMBER_OF_STATCODES);
+       if (ret) ret = enc_string(ret, stat->failure_reasons);
+       if (ret) ret = enc_int(ret, stat->remove_from_proxy);
        if (ret) ret = enc_string(ret, stat->pbs_state);
        if (ret) ret = enc_string(ret, stat->pbs_queue);
        if (ret) ret = enc_string(ret, stat->pbs_owner);
@@ -576,6 +578,8 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest)
                            stat->children_hist = (int*)calloc(EDG_WLL_NUMBER_OF_STATCODES+1, sizeof(int));
                            dec_int_array(tmp_in, &tmp_in, stat->children_hist);
        }
+        if (tmp_in != NULL) stat->failure_reasons = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->remove_from_proxy = dec_int(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in);
index 42f4faa..d7a150f 100644 (file)
@@ -746,6 +746,7 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                                js->pub.cancelling = 1; break;
                                        case EDG_WLL_CANCEL_DONE:
                                                js->pub.state = EDG_WLL_JOB_CANCELLED;
+                                               js->pub.remove_from_proxy = 1;
                                                rep(js->pub.reason, e->cancel.reason);
                                                rep(js->last_seqcode, e->any.seqcode);
                                                rep(js->pub.location, "none");
@@ -768,6 +769,7 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                        if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK;
                        if (USABLE(res, strict)) {
                                js->pub.state = EDG_WLL_JOB_ABORTED;
+                               js->pub.remove_from_proxy = 1;
                                rep(js->pub.reason, e->abort.reason);
                                rep(js->pub.location, "none");
 
@@ -778,6 +780,7 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                case EDG_WLL_EVENT_CLEAR:
                        if (USABLE(res, strict)) {
                                js->pub.state = EDG_WLL_JOB_CLEARED;
+                               js->pub.remove_from_proxy = 1;
                                rep(js->pub.location, "none");
                                switch (e->clear.reason) {
                                        case EDG_WLL_CLEAR_USER: