From: František Dvořák Date: Mon, 15 Feb 2010 20:09:45 +0000 (+0000) Subject: More steady throttling for job collections (waiting per each purged job or subjob). X-Git-Tag: glite-jobid_R_1_0_1_1~15 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=7df010d0f1309d7da0d301463551fc00c9bcd1a7;p=jra1mw.git More steady throttling for job collections (waiting per each purged job or subjob). --- diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index dfffd53..b61c024 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -38,6 +38,17 @@ #define sizofa(a) (sizeof(a)/sizeof((a)[0])) +typedef struct { + int naffected_jobs; + int parse; + int dumpfile; + + int jobs_to_exa; + double purge_end; + double time_per_job; + time_t target_runtime; +} purge_ctx_t; + extern volatile sig_atomic_t purge_quit; static const char* const resp_headers[] = { @@ -48,12 +59,12 @@ static const char* const resp_headers[] = { }; static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout); -static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse); +static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result); static int purge_one(edg_wll_Context ctx,edg_wll_JobStat *,int,int,int); int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job); static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job); int job_exists(edg_wll_Context ctx, glite_jobid_const_t job); -static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime); +static void purge_throttle(purge_ctx_t *prg); int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname) @@ -212,15 +223,14 @@ int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job) int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result) { - int i,parse = 0,dumpfile = -1; + int i; glite_jobid_t job; char *tmpfname = NULL; - int naffected_jobs = 0, ret; - double now, time_per_job, purge_end; + int ret; + double now; struct timeval tp; - int jobs_to_exa; - time_t target_runtime; edg_wll_JobStat stat; + purge_ctx_t prg; if (!ctx->noAuth) { @@ -231,9 +241,13 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_ResetError(ctx); memset(result, 0, sizeof(*result)); + memset(&prg, 0, sizeof prg); + prg.naffected_jobs = 0; + prg.parse = 0; + prg.dumpfile = -1; if ( (request->flags & EDG_WLL_PURGE_SERVER_DUMP) && - ((dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) ) + ((prg.dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) ) goto abort; /* @@ -259,25 +273,26 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, /* throttle parameters */ gettimeofday(&tp, NULL); now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; - purge_end = now + request->target_runtime; - target_runtime = request->target_runtime; - time_per_job = -1.0; + prg.purge_end = now + request->target_runtime; + prg.target_runtime = request->target_runtime; + prg.time_per_job = -1.0; if (request->jobs) { - for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++); + for (prg.jobs_to_exa=0; request->jobs[prg.jobs_to_exa]; prg.jobs_to_exa++); for (i=0; request->jobs[i] && !purge_quit; i++) { if (edg_wlc_JobIdParse(request->jobs[i],&job)) { glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error\n", request->jobs[i]); - parse = 1; + prg.parse = 1; + prg.jobs_to_exa--; } else { if (check_strict_jobid(ctx,job)) { glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "%s: not my job\n", request->jobs[i]); - parse = 1; + prg.parse = 1; + prg.jobs_to_exa--; } else { - purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); if (purge_quit) break; memset(&stat,0,sizeof stat); @@ -286,13 +301,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, /* job purged meanwhile, ignore */ edg_wll_ResetError(ctx); glite_jobid_free(job); + purge_throttle(&prg); continue; } edg_wll_FreeStatus(&stat); goto abort; } - switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) { + switch (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) { case 0: break; case ENOENT: /* job does not exist, consider purged and ignore */ @@ -305,7 +321,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } glite_jobid_free(job); } - jobs_to_exa--; } } else { @@ -329,23 +344,24 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, ? "select dg_jobid from jobs where proxy='1'" : "select dg_jobid from jobs where server='1'"); - if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : + if ((prg.jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort; while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0 && !purge_quit) { if (edg_wlc_JobIdParse(job_s,&job)) { glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error (internal inconsistency !)", job_s); - parse = 1; + prg.parse = 1; + prg.jobs_to_exa--; } else { if (check_strict_jobid(ctx,job)) { edg_wlc_JobIdFree(job); free(job_s); - parse = 1; + prg.parse = 1; + prg.jobs_to_exa--; continue; } - purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); if (purge_quit) break; memset(&stat,0,sizeof stat); @@ -355,6 +371,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_ResetError(ctx); glite_jobid_free(job); free(job_s); + purge_throttle(&prg); continue; } edg_wll_FreeStatus(&stat); @@ -362,7 +379,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } if (purge_check(ctx, &stat, start, timeout)) { - if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) { + if (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) { edg_wll_FreeStatus(&stat); if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { /* job purged meanwhile, ignore */ @@ -379,15 +396,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_FreeStatus(&stat); } free(job_s); - jobs_to_exa--; } glite_lbu_FreeStmt(&s); } abort: - if (parse && !edg_wll_Error(ctx,NULL,NULL)) + if (prg.parse && !edg_wll_Error(ctx,NULL,NULL)) { - if ( naffected_jobs ) { + if ( prg.naffected_jobs ) { glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] Found only jobs not matching server address/port; these were not purged.", getpid()); } else { @@ -630,36 +646,38 @@ static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, #define GRAN 32 -static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) { +static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result) { char *job_s; int i; - if (purge_one(ctx,stat,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL); + if (purge_one(ctx,stat,prg->dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL); + purge_throttle(prg); /* XXX: change with the streaming interface */ if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { job_s = glite_jobid_unparse(stat->jobId); - if (*njobs % GRAN == 0 || !result->jobs) - result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs))); - result->jobs[*njobs] = job_s; - result->jobs[*njobs+1] = NULL; + if (prg->naffected_jobs % GRAN == 0 || !result->jobs) + result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs))); + result->jobs[prg->naffected_jobs] = job_s; + result->jobs[prg->naffected_jobs+1] = NULL; } - (*njobs)++; + prg->naffected_jobs++; /* purge the subjobs */ if (stat->children_num && stat->children) { for (i = 0; i < stat->children_num && stat->children[i]; i++) { - if (purge_one(ctx, stat->children_states + i, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { + if (purge_one(ctx, stat->children_states + i, prg->dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { return edg_wll_Error(ctx, NULL, NULL); } + purge_throttle(prg); if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { - if (*njobs % GRAN == 0 || !result->jobs) - result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs))); - result->jobs[*njobs] = strdup(stat->children[i]); - result->jobs[*njobs+1] = NULL; + if (prg->naffected_jobs % GRAN == 0 || !result->jobs) + result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs))); + result->jobs[prg->naffected_jobs] = strdup(stat->children[i]); + result->jobs[prg->naffected_jobs+1] = NULL; } - (*njobs)++; + prg->naffected_jobs++; } } @@ -982,35 +1000,35 @@ int job_exists(edg_wll_Context ctx, glite_jobid_const_t job) { /* throttle purging according to the required target_runtime */ -static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) { +static void purge_throttle(purge_ctx_t *prg) { struct timeval tp; double target_this, now; - - if (*time_per_job < 0.0) { - *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0; - //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n", - // getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job); + if (prg->time_per_job < 0.0) { + prg->time_per_job = prg->jobs_to_exa ? (double)(prg->target_runtime) / prg->jobs_to_exa : 0.0; + //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, target runtime: %ld, end: %lf, jobs: %d, s/job: %lf\n", + // prg->target_runtime, prg->purge_end, prg->jobs_to_exa, prg->time_per_job); } - if (*target_runtime) { - target_this = purge_end - *time_per_job * jobs_to_exa; + if (--prg->jobs_to_exa < 0) prg->jobs_to_exa = 0; + + if (prg->target_runtime) { + target_this = prg->purge_end - prg->time_per_job * prg->jobs_to_exa; gettimeofday(&tp, NULL); now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; if (target_this > now) { /* enough time */ - //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now); + //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, sleeping for %lf seconds...\n", target_this - now); usleep(1e6*(target_this - now)); } if (target_this < now) { /* speed up */ - *time_per_job = (purge_end-now)/jobs_to_exa; + prg->time_per_job = (prg->purge_end-now)/prg->jobs_to_exa; /* limit not catched, maximal speed */ - if (*time_per_job <= 0) { - *time_per_job = 0.0; - *target_runtime = 0; + if (prg->time_per_job <= 0) { + prg->time_per_job = 0.0; + prg->target_runtime = 0; } } - //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job); + //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, s/job: %lf\n", prg->time_per_job); } - }