Purging of the job collections:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 22 Jul 2009 18:26:15 +0000 (18:26 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 22 Jul 2009 18:26:15 +0000 (18:26 +0000)
 - purge the root job first
 - purging subjobs, caring for orphans
 - not single transaction
 - code cleanups

org.glite.lb.server/src/srv_purge.c

index 49c47dd..a10cd0b 100644 (file)
@@ -41,9 +41,12 @@ static const char* const resp_headers[] = {
        NULL
 };
 
+static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout);
+static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse);
 static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
 int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
 static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+int job_exists(edg_wll_Context ctx, glite_jobid_const_t job);
 static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
 
 
@@ -200,13 +203,14 @@ int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job)
 int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result)
 {
        int     i,parse = 0,dumpfile = -1;
-       edg_wlc_JobId   job;
+       glite_jobid_t   job;
        char    *tmpfname = NULL;
        int     naffected_jobs = 0, ret;
        double          now, time_per_job, purge_end;
        struct timeval  tp;
        int             jobs_to_exa;
        time_t  target_runtime;
+       edg_wll_JobStat stat;
 
 
        if (!ctx->noAuth) {
@@ -265,22 +269,30 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                        else {
                                purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
 
-                               switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
-                                       case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
-                                                       result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
-                                                       result->jobs[naffected_jobs] = strdup(request->jobs[i]);
-                                                       result->jobs[naffected_jobs+1] = NULL;
-                                               }
-                                               naffected_jobs++;
+                               memset(&stat,0,sizeof stat);
+                               if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) {
+                                       if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                                               /* job purged meanwhile, ignore */
+                                               edg_wll_ResetError(ctx);
+                                               glite_jobid_free(job);
+                                               continue;
+                                       }
+                                       edg_wll_FreeStatus(&stat);
+                                       goto abort; 
+                               }
+
+                               switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+                                       case 0: 
                                                break;
                                        case ENOENT: /* job does not exist, consider purged and ignore */
-                                                    edg_wll_ResetError(ctx);
-                                                    break;
+                                               edg_wll_ResetError(ctx);
+                                               break;
                                        default: goto abort;
                                }
 
+                               edg_wll_FreeStatus(&stat);
                        }
-                       edg_wlc_JobIdFree(job);
+                       glite_jobid_free(job);
                }
                jobs_to_exa--;
        }
@@ -310,8 +322,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                parse = 1;
                        }
                        else {
-                               edg_wll_JobStat stat;
-
                                if (check_strict_jobid(ctx,job)) {
                                        edg_wlc_JobIdFree(job);
                                        free(job_s);
@@ -322,42 +332,36 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
 
                                memset(&stat,0,sizeof stat);
-                               if (edg_wll_JobStatusServer(ctx,job,0,&stat)) {  /* FIXME: replace by intJobStatus ?? */
+                               if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) {
                                        if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
                                                /* job purged meanwhile, ignore */
                                                edg_wll_ResetError(ctx);
+                                               glite_jobid_free(job);
+                                               free(job_s);
                                                continue;
                                        }
                                        edg_wll_FreeStatus(&stat);
                                        goto abort; 
                                }
 
-                               if (timeout[stat.state] >=0 && stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
-                               {
-                                       if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+                               if (purge_check(ctx, &stat, start, timeout)) {
+                                       if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
                                                edg_wll_FreeStatus(&stat);
                                                if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
                                                        /* job purged meanwhile, ignore */
                                                        edg_wll_ResetError(ctx);
+                                                       glite_jobid_free(job);
+                                                       free(job_s);
                                                        continue;
                                                }
                                                goto abort;
                                        }
-
-                               /* XXX: change with the streaming interface */
-                                       if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
-                                               result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
-                                               result->jobs[naffected_jobs] = job_s;
-                                               result->jobs[naffected_jobs+1] = NULL;
-                                               job_s = NULL;
-                                       }
-                                       naffected_jobs++;
                                }
 
-                               edg_wlc_JobIdFree(job);
+                               glite_jobid_free(job);
                                edg_wll_FreeStatus(&stat);
-                               free(job_s);
                        }
+                       free(job_s);
                        jobs_to_exa--;
                }
                glite_lbu_FreeStmt(&s);
@@ -586,6 +590,80 @@ err:
 }
 
 
+static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout) {
+       int to_purge = 0;
+       int i;
+       edg_wll_JobStat *substat;
+
+       if (timeout[stat->state] >=0 && stat->lastUpdateTime.tv_sec && start-stat->lastUpdateTime.tv_sec > timeout[stat->state]
+           // job from this server
+           && !check_strict_jobid(ctx, stat->jobId)
+           // if subjob purge only when orphan
+           && (!stat->parent_job || job_exists(ctx, stat->parent_job) == 0)) {
+               to_purge = 1;
+       }
+       // check the whole collection yet
+       if (to_purge && stat->children_num && stat->children_states) {
+               for (i = 0; i < stat->children_num && stat->children_states[i].state != EDG_WLL_JOB_UNDEF; i++) {
+                       substat = stat->children_states + i;
+                       if (timeout[substat->state] >=0 && substat->lastUpdateTime.tv_sec && start-substat->lastUpdateTime.tv_sec > timeout[substat->state]) ;
+                       else {
+                               to_purge = 0;
+                               break;
+                       }
+               }
+       }
+
+       return to_purge;
+}
+
+
+#define GRAN 32
+
+static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) {
+       glite_jobid_t subjob = NULL;
+       char *job_s;
+       int i;
+
+       if (purge_one(ctx,stat->jobId,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+
+/* XXX: change with the streaming interface */
+       if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
+               job_s = glite_jobid_unparse(stat->jobId);
+               if (*njobs % GRAN == 0 || !result->jobs)
+                       result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
+               result->jobs[*njobs] = job_s;
+               result->jobs[*njobs+1] = NULL;
+       }
+       (*njobs)++;
+
+       /* purge the subjobs */
+       if (stat->children_num && stat->children) {
+               for (i = 0; i < stat->children_num && stat->children[i]; i++) {
+                       if (glite_jobid_parse(stat->children[i], &subjob)) {
+                               fprintf(stderr,"%s: parse error (internal inconsistency !)\n",stat->children[i]);
+                               *parse = 1;
+                       }
+                       if (purge_one(ctx, subjob, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+                               glite_jobid_free(subjob);
+                               return edg_wll_Error(ctx, NULL, NULL);
+                       }
+                       glite_jobid_free(subjob);
+
+                       if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
+                               if (*njobs % GRAN == 0 || !result->jobs)
+                                       result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
+                               result->jobs[*njobs] = strdup(stat->children[i]);
+                               result->jobs[*njobs+1] = NULL;
+                       }
+                       (*njobs)++;
+               }
+       }
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
 int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only)
 {
        char    *dbjob = NULL;
@@ -653,7 +731,7 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
                                break;
                }
 
-               dbjob = edg_wlc_JobIdGetUnique(job);    /* XXX: strict jobid already checked */
+               dbjob = glite_jobid_getUnique(job);     /* XXX: strict jobid already checked */
 
                if ( purge )
                {
@@ -792,20 +870,19 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
 
                dumped = 1;
                while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) {
-                       int     event;
+                       int     event, ret_dump = 0, i;
 
-                       
                        event = atoi(res[0]);
 
                        if (dump >= 0) {
-                               int ret_dump, i;
-
                                assert(ret == 10);
                                ret_dump = dump_events( ctx, job, dump, (char **) &res);
-                               for (i=0; i<sizofa(res); i++) free(res[i]);
-                               if (ret_dump) goto rollback;
                        }
 
+                       for (i=0; i<sizofa(res); i++) free(res[i]);
+
+                       if (dump >= 0 && ret_dump) goto rollback;
+
                        if ( purge ) 
                                if (edg_wll_delete_event(ctx,dbjob,event)) goto rollback;
                }
@@ -838,9 +915,9 @@ int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job)
 
        edg_wll_ResetError(ctx);
 
-       dbjob = edg_wlc_JobIdGetUnique(job);
-
+       dbjob = glite_jobid_getUnique(job);
        trio_asprintf(&stmt,"update jobs set proxy='0' where jobid='%|Ss'", dbjob);
+       free(dbjob);
 
        return(edg_wll_ExecSQL(ctx,stmt,NULL));
 }
@@ -853,14 +930,32 @@ int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job)
 
        edg_wll_ResetError(ctx);
 
-       dbjob = edg_wlc_JobIdGetUnique(job);
-
+       dbjob = glite_jobid_getUnique(job);
        trio_asprintf(&stmt,"update jobs set server='0' where jobid='%|Ss'", dbjob);
+       free(dbjob);
 
        return(edg_wll_ExecSQL(ctx,stmt,NULL));
 }
 
 
+int job_exists(edg_wll_Context ctx, glite_jobid_const_t job) {
+       char *stmt, *dbjob;
+       int retval;
+
+       edg_wll_ResetError(ctx);
+
+       if (check_strict_jobid(ctx, job)) return 0;
+
+       dbjob = glite_jobid_getUnique(job);
+       trio_asprintf(&stmt, "SELECT jobid FROM jobs WHERE jobid='%|Ss'", dbjob);
+       retval = edg_wll_ExecSQL(ctx, stmt, NULL);
+       free(dbjob);
+       free(stmt);
+
+       return retval;
+}
+
+
 /* throttle purging according to the required target_runtime */
 static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
        struct timeval  tp;