NULL
};
+static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout);
+static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse);
static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+int job_exists(edg_wll_Context ctx, glite_jobid_const_t job);
static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result)
{
int i,parse = 0,dumpfile = -1;
- edg_wlc_JobId job;
+ glite_jobid_t job;
char *tmpfname = NULL;
int naffected_jobs = 0, ret;
double now, time_per_job, purge_end;
struct timeval tp;
int jobs_to_exa;
time_t target_runtime;
+ edg_wll_JobStat stat;
if (!ctx->noAuth) {
else {
purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
- switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
- case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
- result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
- result->jobs[naffected_jobs] = strdup(request->jobs[i]);
- result->jobs[naffected_jobs+1] = NULL;
- }
- naffected_jobs++;
+ memset(&stat,0,sizeof stat);
+ if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) {
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ /* job purged meanwhile, ignore */
+ edg_wll_ResetError(ctx);
+ glite_jobid_free(job);
+ continue;
+ }
+ edg_wll_FreeStatus(&stat);
+ goto abort;
+ }
+
+ switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+ case 0:
break;
case ENOENT: /* job does not exist, consider purged and ignore */
- edg_wll_ResetError(ctx);
- break;
+ edg_wll_ResetError(ctx);
+ break;
default: goto abort;
}
+ edg_wll_FreeStatus(&stat);
}
- edg_wlc_JobIdFree(job);
+ glite_jobid_free(job);
}
jobs_to_exa--;
}
parse = 1;
}
else {
- edg_wll_JobStat stat;
-
if (check_strict_jobid(ctx,job)) {
edg_wlc_JobIdFree(job);
free(job_s);
purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
memset(&stat,0,sizeof stat);
- if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */
+ if (edg_wll_JobStatusServer(ctx,job,EDG_WLL_STAT_CHILDSTAT | EDG_WLL_STAT_CHILDREN,&stat)) {
if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
/* job purged meanwhile, ignore */
edg_wll_ResetError(ctx);
+ glite_jobid_free(job);
+ free(job_s);
continue;
}
edg_wll_FreeStatus(&stat);
goto abort;
}
- if (timeout[stat.state] >=0 && stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
- {
- if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+ if (purge_check(ctx, &stat, start, timeout)) {
+ if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
edg_wll_FreeStatus(&stat);
if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
/* job purged meanwhile, ignore */
edg_wll_ResetError(ctx);
+ glite_jobid_free(job);
+ free(job_s);
continue;
}
goto abort;
}
-
- /* XXX: change with the streaming interface */
- if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
- result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
- result->jobs[naffected_jobs] = job_s;
- result->jobs[naffected_jobs+1] = NULL;
- job_s = NULL;
- }
- naffected_jobs++;
}
- edg_wlc_JobIdFree(job);
+ glite_jobid_free(job);
edg_wll_FreeStatus(&stat);
- free(job_s);
}
+ free(job_s);
jobs_to_exa--;
}
glite_lbu_FreeStmt(&s);
}
+static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout) {
+ int to_purge = 0;
+ int i;
+ edg_wll_JobStat *substat;
+
+ if (timeout[stat->state] >=0 && stat->lastUpdateTime.tv_sec && start-stat->lastUpdateTime.tv_sec > timeout[stat->state]
+ // job from this server
+ && !check_strict_jobid(ctx, stat->jobId)
+ // if subjob purge only when orphan
+ && (!stat->parent_job || job_exists(ctx, stat->parent_job) == 0)) {
+ to_purge = 1;
+ }
+ // check the whole collection yet
+ if (to_purge && stat->children_num && stat->children_states) {
+ for (i = 0; i < stat->children_num && stat->children_states[i].state != EDG_WLL_JOB_UNDEF; i++) {
+ substat = stat->children_states + i;
+ if (timeout[substat->state] >=0 && substat->lastUpdateTime.tv_sec && start-substat->lastUpdateTime.tv_sec > timeout[substat->state]) ;
+ else {
+ to_purge = 0;
+ break;
+ }
+ }
+ }
+
+ return to_purge;
+}
+
+
+#define GRAN 32
+
+static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) {
+ glite_jobid_t subjob = NULL;
+ char *job_s;
+ int i;
+
+ if (purge_one(ctx,stat->jobId,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+
+/* XXX: change with the streaming interface */
+ if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
+ job_s = glite_jobid_unparse(stat->jobId);
+ if (*njobs % GRAN == 0 || !result->jobs)
+ result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
+ result->jobs[*njobs] = job_s;
+ result->jobs[*njobs+1] = NULL;
+ }
+ (*njobs)++;
+
+ /* purge the subjobs */
+ if (stat->children_num && stat->children) {
+ for (i = 0; i < stat->children_num && stat->children[i]; i++) {
+ if (glite_jobid_parse(stat->children[i], &subjob)) {
+ fprintf(stderr,"%s: parse error (internal inconsistency !)\n",stat->children[i]);
+ *parse = 1;
+ }
+ if (purge_one(ctx, subjob, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+ glite_jobid_free(subjob);
+ return edg_wll_Error(ctx, NULL, NULL);
+ }
+ glite_jobid_free(subjob);
+
+ if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
+ if (*njobs % GRAN == 0 || !result->jobs)
+ result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
+ result->jobs[*njobs] = strdup(stat->children[i]);
+ result->jobs[*njobs+1] = NULL;
+ }
+ (*njobs)++;
+ }
+ }
+
+ return edg_wll_Error(ctx, NULL, NULL);
+}
+
+
int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only)
{
char *dbjob = NULL;
break;
}
- dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */
+ dbjob = glite_jobid_getUnique(job); /* XXX: strict jobid already checked */
if ( purge )
{
dumped = 1;
while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) {
- int event;
+ int event, ret_dump = 0, i;
-
event = atoi(res[0]);
if (dump >= 0) {
- int ret_dump, i;
-
assert(ret == 10);
ret_dump = dump_events( ctx, job, dump, (char **) &res);
- for (i=0; i<sizofa(res); i++) free(res[i]);
- if (ret_dump) goto rollback;
}
+ for (i=0; i<sizofa(res); i++) free(res[i]);
+
+ if (dump >= 0 && ret_dump) goto rollback;
+
if ( purge )
if (edg_wll_delete_event(ctx,dbjob,event)) goto rollback;
}
edg_wll_ResetError(ctx);
- dbjob = edg_wlc_JobIdGetUnique(job);
-
+ dbjob = glite_jobid_getUnique(job);
trio_asprintf(&stmt,"update jobs set proxy='0' where jobid='%|Ss'", dbjob);
+ free(dbjob);
return(edg_wll_ExecSQL(ctx,stmt,NULL));
}
edg_wll_ResetError(ctx);
- dbjob = edg_wlc_JobIdGetUnique(job);
-
+ dbjob = glite_jobid_getUnique(job);
trio_asprintf(&stmt,"update jobs set server='0' where jobid='%|Ss'", dbjob);
+ free(dbjob);
return(edg_wll_ExecSQL(ctx,stmt,NULL));
}
+int job_exists(edg_wll_Context ctx, glite_jobid_const_t job) {
+ char *stmt, *dbjob;
+ int retval;
+
+ edg_wll_ResetError(ctx);
+
+ if (check_strict_jobid(ctx, job)) return 0;
+
+ dbjob = glite_jobid_getUnique(job);
+ trio_asprintf(&stmt, "SELECT jobid FROM jobs WHERE jobid='%|Ss'", dbjob);
+ retval = edg_wll_ExecSQL(ctx, stmt, NULL);
+ free(dbjob);
+ free(stmt);
+
+ return retval;
+}
+
+
/* throttle purging according to the required target_runtime */
static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
struct timeval tp;