purge jobs or they flags from DB
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 29 Nov 2007 14:33:59 +0000 (14:33 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 29 Nov 2007 14:33:59 +0000 (14:33 +0000)
org.glite.lb.server/src/db_calls.c
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/srv_purge.c

index 941abdb..203ac2b 100644 (file)
@@ -42,11 +42,8 @@ int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
                 if (strcmp(res[1],"0")) result += DB_SERVER_JOB;
         }
         else {
-                if (ret == 0) result = 0;
-                else {
-                        fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n");
-                        edg_wll_SetError(ctx,ENOENT,dbjob);
-                }
+               fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n");
+                edg_wll_SetError(ctx,ENOENT,dbjob);
         }
         glite_lbu_FreeStmt(&q);
 
index 7ac1a70..0479e2c 100644 (file)
@@ -25,6 +25,7 @@
 /* XXX */
 #define use_db 1
 
+extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId);
 extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
 
 
@@ -271,6 +272,29 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev,
        } else 
 #endif
   {
+       /* Purge proxy flag
+        * XXX: Workaround - if these events arrive on server with shared DB
+        *      it reaches terminal state from proxy point of view and so
+        *      proxy flag have to be removed (cannot call edg_wll_PurgeServerProxy)
+        * - these triggers on events should be replaced by triggers on state change
+        * (remove extern unset_proxy_flag, set it static in srv_purge.c)
+        */
+       switch ( ev->any.type ) {
+       case EDG_WLL_EVENT_CLEAR:
+       case EDG_WLL_EVENT_ABORT:
+               if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
+                                        return(edg_wll_Error(ctx,NULL,NULL));
+               }
+               break;
+       case EDG_WLL_EVENT_CANCEL:
+               if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) 
+                       if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
+                                               return(edg_wll_Error(ctx,NULL,NULL));
+                       }
+               break;
+       default: break;
+       }
+
        if ( newstat->state ) {
                edg_wll_NotifMatch(ctx, newstat);
                edg_wll_FreeStatus(newstat);
index d303f4a..2478ad9 100644 (file)
@@ -43,7 +43,8 @@ static const char* const resp_headers[] = {
 };
 
 static int purge_one(edg_wll_Context ctx,const edg_wlc_JobId,int,int);
-static int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job);
+int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job);
+static int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job);
 
 
 int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
@@ -185,31 +186,15 @@ int edg_wll_CreateFileStorage(edg_wll_Context ctx, char *file_type, char *prefix
 
 int edg_wll_PurgeServerProxy(edg_wll_Context ctx, edg_wlc_JobId job)
 {
-       switch (edg_wll_jobMembership(ctx, job)) {
-               case DB_PROXY_JOB:
-                       switch ( purge_one(ctx, job, -1, 1) ) {
-                               case 0:
-                               case ENOENT:
-                                       edg_wll_ResetError(ctx);
-                                       return 0;
-
-                               default:
-                                       return -1;
-                       }
-                       break;
-               case DB_SERVER_JOB:
-                       // should not happen, however, no action needed
-                       // proxy flag is unset already
+       switch ( purge_one(ctx, job, -1, 1) ) {
+               case 0:
+               case ENOENT:
                        edg_wll_ResetError(ctx);
                        return 0;
-                       break;
-               case DB_PROXY_JOB+DB_SERVER_JOB:
-                       return(unset_proxy_flag(ctx, job));
-                       break;
+
                default:
                        return -1;
-                       break;
-               }
+       }
 }
 
 int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
@@ -295,7 +280,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
                for (i=0; i<EDG_WLL_NUMBER_OF_STATCODES; i++)
                        timeout[i] = request->timeout[i] < 0 ? ctx->purge_timeout[i] : request->timeout[i];
 
-               if (edg_wll_ExecSQL(ctx,"select dg_jobid from jobs",&s) < 0) goto abort;
+               if (edg_wll_ExecSQL(ctx,"select dg_jobid from jobs where server='1'",&s) < 0) goto abort;
                while ((res = edg_wll_FetchRow(ctx,s,1,NULL,&job_s)) > 0) {
                        if (edg_wlc_JobIdParse(job_s,&job)) {
                                fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s);
@@ -443,6 +428,44 @@ int purge_one(edg_wll_Context ctx,const edg_wlc_JobId job,int dump, int purge)
        edg_wll_ResetError(ctx);
        if ( !purge && dump < 0 ) return 0;
 
+       switch (edg_wll_jobMembership(ctx, job)) {
+               case DB_PROXY_JOB:
+                       if (!ctx->isProxy) {
+                               /* should not happen */
+                               return 0;
+                       }
+                       /* continue */
+                       break;
+               case DB_SERVER_JOB:
+                       if (ctx->isProxy) {
+                               /* should not happen */
+                               return 0;
+                       }
+                       /* continue */
+                       break;
+               case DB_PROXY_JOB+DB_SERVER_JOB:
+                       if (ctx->isProxy) {
+                               purge = 0;
+                               if (unset_proxy_flag(ctx, job) < 0) {
+                                       return(edg_wll_Error(ctx,NULL,NULL));
+                               }
+                       }
+                       else {
+                               purge = 0;
+                               if (unset_server_flag(ctx, job) < 0) {
+                                       return(edg_wll_Error(ctx,NULL,NULL));
+                               }
+                       }
+                       break;
+               case 0:
+                       // Zombie job (server=0, proxy=0)? should not happen;
+                       // clear it to keep DB healthy
+                       break;
+               default:
+                       return 0;
+                       break;
+       }
+
        dbjob = edg_wlc_JobIdGetUnique(job);    /* XXX: strict jobid already checked */
        if (edg_wll_LockJob(ctx,job)) goto clean;
 
@@ -615,3 +638,18 @@ int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job)
        return(edg_wll_ExecSQL(ctx,stmt,NULL));
 }
 
+
+int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job)
+{
+       char    *stmt = NULL;
+       char            *dbjob;
+
+       edg_wll_ResetError(ctx);
+
+       dbjob = edg_wlc_JobIdGetUnique(job);
+
+       trio_asprintf(&stmt,"update jobs set server='0' where jobid='%|Ss'", dbjob);
+
+       return(edg_wll_ExecSQL(ctx,stmt,NULL));
+}
+