From 55c2211d27222a7221bfbd66ef94a395cde47d66 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 22 Jul 2009 18:26:15 +0000 Subject: [PATCH] Purging of the job collections: - purge the root job first - purging subjobs, caring for orphans - not single transaction - code cleanups --- org.glite.lb.server/src/srv_purge.c | 173 ++++++++++++++++++++++++++++-------- 1 file changed, 134 insertions(+), 39 deletions(-) diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 49c47dd..a10cd0b 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -41,9 +41,12 @@ static const char* const resp_headers[] = { NULL }; +static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout); +static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse); static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int); int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job); static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job); +int job_exists(edg_wll_Context ctx, glite_jobid_const_t job); static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime); @@ -200,13 +203,14 @@ int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job) int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result) { int i,parse = 0,dumpfile = -1; - edg_wlc_JobId job; + glite_jobid_t job; char *tmpfname = NULL; int naffected_jobs = 0, ret; double now, time_per_job, purge_end; struct timeval tp; int jobs_to_exa; time_t target_runtime; + edg_wll_JobStat stat; if (!ctx->noAuth) { @@ -265,22 +269,30 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, else { purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); - switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { - case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { - result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs))); - result->jobs[naffected_jobs] = strdup(request->jobs[i]); - result->jobs[naffected_jobs+1] = NULL; - } - naffected_jobs++; + memset(&stat,0,sizeof stat); + if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) { + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + /* job purged meanwhile, ignore */ + edg_wll_ResetError(ctx); + glite_jobid_free(job); + continue; + } + edg_wll_FreeStatus(&stat); + goto abort; + } + + switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) { + case 0: break; case ENOENT: /* job does not exist, consider purged and ignore */ - edg_wll_ResetError(ctx); - break; + edg_wll_ResetError(ctx); + break; default: goto abort; } + edg_wll_FreeStatus(&stat); } - edg_wlc_JobIdFree(job); + glite_jobid_free(job); } jobs_to_exa--; } @@ -310,8 +322,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, parse = 1; } else { - edg_wll_JobStat stat; - if (check_strict_jobid(ctx,job)) { edg_wlc_JobIdFree(job); free(job_s); @@ -322,42 +332,36 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); memset(&stat,0,sizeof stat); - if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */ + if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) { if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { /* job purged meanwhile, ignore */ edg_wll_ResetError(ctx); + glite_jobid_free(job); + free(job_s); continue; } edg_wll_FreeStatus(&stat); goto abort; } - if (timeout[stat.state] >=0 && stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job)) - { - if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { + if (purge_check(ctx, &stat, start, timeout)) { + if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) { edg_wll_FreeStatus(&stat); if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { /* job purged meanwhile, ignore */ edg_wll_ResetError(ctx); + glite_jobid_free(job); + free(job_s); continue; } goto abort; } - - /* XXX: change with the streaming interface */ - if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { - result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs))); - result->jobs[naffected_jobs] = job_s; - result->jobs[naffected_jobs+1] = NULL; - job_s = NULL; - } - naffected_jobs++; } - edg_wlc_JobIdFree(job); + glite_jobid_free(job); edg_wll_FreeStatus(&stat); - free(job_s); } + free(job_s); jobs_to_exa--; } glite_lbu_FreeStmt(&s); @@ -586,6 +590,80 @@ err: } +static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout) { + int to_purge = 0; + int i; + edg_wll_JobStat *substat; + + if (timeout[stat->state] >=0 && stat->lastUpdateTime.tv_sec && start-stat->lastUpdateTime.tv_sec > timeout[stat->state] + // job from this server + && !check_strict_jobid(ctx, stat->jobId) + // if subjob purge only when orphan + && (!stat->parent_job || job_exists(ctx, stat->parent_job) == 0)) { + to_purge = 1; + } + // check the whole collection yet + if (to_purge && stat->children_num && stat->children_states) { + for (i = 0; i < stat->children_num && stat->children_states[i].state != EDG_WLL_JOB_UNDEF; i++) { + substat = stat->children_states + i; + if (timeout[substat->state] >=0 && substat->lastUpdateTime.tv_sec && start-substat->lastUpdateTime.tv_sec > timeout[substat->state]) ; + else { + to_purge = 0; + break; + } + } + } + + return to_purge; +} + + +#define GRAN 32 + +static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) { + glite_jobid_t subjob = NULL; + char *job_s; + int i; + + if (purge_one(ctx,stat->jobId,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL); + +/* XXX: change with the streaming interface */ + if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { + job_s = glite_jobid_unparse(stat->jobId); + if (*njobs % GRAN == 0 || !result->jobs) + result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs))); + result->jobs[*njobs] = job_s; + result->jobs[*njobs+1] = NULL; + } + (*njobs)++; + + /* purge the subjobs */ + if (stat->children_num && stat->children) { + for (i = 0; i < stat->children_num && stat->children[i]; i++) { + if (glite_jobid_parse(stat->children[i], &subjob)) { + fprintf(stderr,"%s: parse error (internal inconsistency !)\n",stat->children[i]); + *parse = 1; + } + if (purge_one(ctx, subjob, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { + glite_jobid_free(subjob); + return edg_wll_Error(ctx, NULL, NULL); + } + glite_jobid_free(subjob); + + if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { + if (*njobs % GRAN == 0 || !result->jobs) + result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs))); + result->jobs[*njobs] = strdup(stat->children[i]); + result->jobs[*njobs+1] = NULL; + } + (*njobs)++; + } + } + + return edg_wll_Error(ctx, NULL, NULL); +} + + int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only) { char *dbjob = NULL; @@ -653,7 +731,7 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i break; } - dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */ + dbjob = glite_jobid_getUnique(job); /* XXX: strict jobid already checked */ if ( purge ) { @@ -792,20 +870,19 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i dumped = 1; while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) { - int event; + int event, ret_dump = 0, i; - event = atoi(res[0]); if (dump >= 0) { - int ret_dump, i; - assert(ret == 10); ret_dump = dump_events( ctx, job, dump, (char **) &res); - for (i=0; i= 0 && ret_dump) goto rollback; + if ( purge ) if (edg_wll_delete_event(ctx,dbjob,event)) goto rollback; } @@ -838,9 +915,9 @@ int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job) edg_wll_ResetError(ctx); - dbjob = edg_wlc_JobIdGetUnique(job); - + dbjob = glite_jobid_getUnique(job); trio_asprintf(&stmt,"update jobs set proxy='0' where jobid='%|Ss'", dbjob); + free(dbjob); return(edg_wll_ExecSQL(ctx,stmt,NULL)); } @@ -853,14 +930,32 @@ int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job) edg_wll_ResetError(ctx); - dbjob = edg_wlc_JobIdGetUnique(job); - + dbjob = glite_jobid_getUnique(job); trio_asprintf(&stmt,"update jobs set server='0' where jobid='%|Ss'", dbjob); + free(dbjob); return(edg_wll_ExecSQL(ctx,stmt,NULL)); } +int job_exists(edg_wll_Context ctx, glite_jobid_const_t job) { + char *stmt, *dbjob; + int retval; + + edg_wll_ResetError(ctx); + + if (check_strict_jobid(ctx, job)) return 0; + + dbjob = glite_jobid_getUnique(job); + trio_asprintf(&stmt, "SELECT jobid FROM jobs WHERE jobid='%|Ss'", dbjob); + retval = edg_wll_ExecSQL(ctx, stmt, NULL); + free(dbjob); + free(stmt); + + return retval; +} + + /* throttle purging according to the required target_runtime */ static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) { struct timeval tp; -- 1.8.2.3