#define sizofa(a) (sizeof(a)/sizeof((a)[0]))
+typedef struct {
+ int naffected_jobs;
+ int parse;
+ int dumpfile;
+
+ int jobs_to_exa;
+ double purge_end;
+ double time_per_job;
+ time_t target_runtime;
+} purge_ctx_t;
+
extern volatile sig_atomic_t purge_quit;
static const char* const resp_headers[] = {
};
static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout);
-static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse);
+static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result);
static int purge_one(edg_wll_Context ctx,edg_wll_JobStat *,int,int,int);
int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
int job_exists(edg_wll_Context ctx, glite_jobid_const_t job);
-static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
+static void purge_throttle(purge_ctx_t *prg);
int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result)
{
- int i,parse = 0,dumpfile = -1;
+ int i;
glite_jobid_t job;
char *tmpfname = NULL;
- int naffected_jobs = 0, ret;
- double now, time_per_job, purge_end;
+ int ret;
+ double now;
struct timeval tp;
- int jobs_to_exa;
- time_t target_runtime;
edg_wll_JobStat stat;
+ purge_ctx_t prg;
if (!ctx->noAuth) {
edg_wll_ResetError(ctx);
memset(result, 0, sizeof(*result));
+ memset(&prg, 0, sizeof prg);
+ prg.naffected_jobs = 0;
+ prg.parse = 0;
+ prg.dumpfile = -1;
if ( (request->flags & EDG_WLL_PURGE_SERVER_DUMP) &&
- ((dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) )
+ ((prg.dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) )
goto abort;
/*
/* throttle parameters */
gettimeofday(&tp, NULL);
now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
- purge_end = now + request->target_runtime;
- target_runtime = request->target_runtime;
- time_per_job = -1.0;
+ prg.purge_end = now + request->target_runtime;
+ prg.target_runtime = request->target_runtime;
+ prg.time_per_job = -1.0;
if (request->jobs) {
- for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++);
+ for (prg.jobs_to_exa=0; request->jobs[prg.jobs_to_exa]; prg.jobs_to_exa++);
for (i=0; request->jobs[i] && !purge_quit; i++) {
if (edg_wlc_JobIdParse(request->jobs[i],&job)) {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error\n", request->jobs[i]);
- parse = 1;
+ prg.parse = 1;
+ prg.jobs_to_exa--;
}
else {
if (check_strict_jobid(ctx,job)) {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "%s: not my job\n", request->jobs[i]);
- parse = 1;
+ prg.parse = 1;
+ prg.jobs_to_exa--;
}
else {
- purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
if (purge_quit) break;
memset(&stat,0,sizeof stat);
/* job purged meanwhile, ignore */
edg_wll_ResetError(ctx);
glite_jobid_free(job);
+ purge_throttle(&prg);
continue;
}
edg_wll_FreeStatus(&stat);
goto abort;
}
- switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+ switch (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) {
case 0:
break;
case ENOENT: /* job does not exist, consider purged and ignore */
}
glite_jobid_free(job);
}
- jobs_to_exa--;
}
}
else {
? "select dg_jobid from jobs where proxy='1'"
: "select dg_jobid from jobs where server='1'");
- if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
+ if ((prg.jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
"select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0 && !purge_quit) {
if (edg_wlc_JobIdParse(job_s,&job)) {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error (internal inconsistency !)", job_s);
- parse = 1;
+ prg.parse = 1;
+ prg.jobs_to_exa--;
}
else {
if (check_strict_jobid(ctx,job)) {
edg_wlc_JobIdFree(job);
free(job_s);
- parse = 1;
+ prg.parse = 1;
+ prg.jobs_to_exa--;
continue;
}
- purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
if (purge_quit) break;
memset(&stat,0,sizeof stat);
edg_wll_ResetError(ctx);
glite_jobid_free(job);
free(job_s);
+ purge_throttle(&prg);
continue;
}
edg_wll_FreeStatus(&stat);
}
if (purge_check(ctx, &stat, start, timeout)) {
- if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+ if (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) {
edg_wll_FreeStatus(&stat);
if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
/* job purged meanwhile, ignore */
edg_wll_FreeStatus(&stat);
}
free(job_s);
- jobs_to_exa--;
}
glite_lbu_FreeStmt(&s);
}
abort:
- if (parse && !edg_wll_Error(ctx,NULL,NULL))
+ if (prg.parse && !edg_wll_Error(ctx,NULL,NULL))
{
- if ( naffected_jobs ) {
+ if ( prg.naffected_jobs ) {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] Found only jobs not matching server address/port; these were not purged.", getpid());
}
else {
#define GRAN 32
-static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) {
+static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result) {
char *job_s;
int i;
- if (purge_one(ctx,stat,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+ if (purge_one(ctx,stat,prg->dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+ purge_throttle(prg);
/* XXX: change with the streaming interface */
if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
job_s = glite_jobid_unparse(stat->jobId);
- if (*njobs % GRAN == 0 || !result->jobs)
- result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
- result->jobs[*njobs] = job_s;
- result->jobs[*njobs+1] = NULL;
+ if (prg->naffected_jobs % GRAN == 0 || !result->jobs)
+ result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs)));
+ result->jobs[prg->naffected_jobs] = job_s;
+ result->jobs[prg->naffected_jobs+1] = NULL;
}
- (*njobs)++;
+ prg->naffected_jobs++;
/* purge the subjobs */
if (stat->children_num && stat->children) {
for (i = 0; i < stat->children_num && stat->children[i]; i++) {
- if (purge_one(ctx, stat->children_states + i, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+ if (purge_one(ctx, stat->children_states + i, prg->dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
return edg_wll_Error(ctx, NULL, NULL);
}
+ purge_throttle(prg);
if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
- if (*njobs % GRAN == 0 || !result->jobs)
- result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
- result->jobs[*njobs] = strdup(stat->children[i]);
- result->jobs[*njobs+1] = NULL;
+ if (prg->naffected_jobs % GRAN == 0 || !result->jobs)
+ result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs)));
+ result->jobs[prg->naffected_jobs] = strdup(stat->children[i]);
+ result->jobs[prg->naffected_jobs+1] = NULL;
}
- (*njobs)++;
+ prg->naffected_jobs++;
}
}
/* throttle purging according to the required target_runtime */
-static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
+static void purge_throttle(purge_ctx_t *prg) {
struct timeval tp;
double target_this, now;
-
- if (*time_per_job < 0.0) {
- *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0;
- //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
- // getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job);
+ if (prg->time_per_job < 0.0) {
+ prg->time_per_job = prg->jobs_to_exa ? (double)(prg->target_runtime) / prg->jobs_to_exa : 0.0;
+ //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, target runtime: %ld, end: %lf, jobs: %d, s/job: %lf\n",
+ // prg->target_runtime, prg->purge_end, prg->jobs_to_exa, prg->time_per_job);
}
- if (*target_runtime) {
- target_this = purge_end - *time_per_job * jobs_to_exa;
+ if (--prg->jobs_to_exa < 0) prg->jobs_to_exa = 0;
+
+ if (prg->target_runtime) {
+ target_this = prg->purge_end - prg->time_per_job * prg->jobs_to_exa;
gettimeofday(&tp, NULL);
now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
if (target_this > now) { /* enough time */
- //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+ //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, sleeping for %lf seconds...\n", target_this - now);
usleep(1e6*(target_this - now));
}
if (target_this < now) { /* speed up */
- *time_per_job = (purge_end-now)/jobs_to_exa;
+ prg->time_per_job = (prg->purge_end-now)/prg->jobs_to_exa;
/* limit not catched, maximal speed */
- if (*time_per_job <= 0) {
- *time_per_job = 0.0;
- *target_runtime = 0;
+ if (prg->time_per_job <= 0) {
+ prg->time_per_job = 0.0;
+ prg->target_runtime = 0;
}
}
- //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job);
+ //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, s/job: %lf\n", prg->time_per_job);
}
-
}