More steady throttling for job collections (waiting per each purged job or subjob).
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 15 Feb 2010 20:09:45 +0000 (20:09 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 15 Feb 2010 20:09:45 +0000 (20:09 +0000)
org.glite.lb.server/src/srv_purge.c

index dfffd53..b61c024 100644 (file)
 
 #define sizofa(a) (sizeof(a)/sizeof((a)[0]))
 
+typedef struct {
+       int naffected_jobs;
+       int parse;
+       int dumpfile;
+
+       int jobs_to_exa;
+       double purge_end;
+       double time_per_job;
+       time_t target_runtime;
+} purge_ctx_t;
+
 extern volatile sig_atomic_t purge_quit;
 
 static const char* const resp_headers[] = {
@@ -48,12 +59,12 @@ static const char* const resp_headers[] = {
 };
 
 static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start, time_t *timeout);
-static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse);
+static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result);
 static int purge_one(edg_wll_Context ctx,edg_wll_JobStat *,int,int,int);
 int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
 static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
 int job_exists(edg_wll_Context ctx, glite_jobid_const_t job);
-static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
+static void purge_throttle(purge_ctx_t *prg);
 
 
 int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
@@ -212,15 +223,14 @@ int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job)
 
 int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result)
 {
-       int     i,parse = 0,dumpfile = -1;
+       int     i;
        glite_jobid_t   job;
        char    *tmpfname = NULL;
-       int     naffected_jobs = 0, ret;
-       double          now, time_per_job, purge_end;
+       int     ret;
+       double          now;
        struct timeval  tp;
-       int             jobs_to_exa;
-       time_t  target_runtime;
        edg_wll_JobStat stat;
+       purge_ctx_t prg;
 
 
        if (!ctx->noAuth) {
@@ -231,9 +241,13 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        edg_wll_ResetError(ctx);
        memset(result, 0, sizeof(*result));
 
+       memset(&prg, 0, sizeof prg);
+       prg.naffected_jobs = 0;
+       prg.parse = 0;
+       prg.dumpfile = -1;
 
        if ( (request->flags & EDG_WLL_PURGE_SERVER_DUMP) && 
-                ((dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) )
+                ((prg.dumpfile = edg_wll_CreateTmpPurgeFile(ctx, &tmpfname)) == -1 ) )
                goto abort;
 
        /* 
@@ -259,25 +273,26 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        /* throttle parameters */
        gettimeofday(&tp, NULL);
        now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
-       purge_end = now + request->target_runtime;
-       target_runtime = request->target_runtime;
-       time_per_job = -1.0;
+       prg.purge_end = now + request->target_runtime;
+       prg.target_runtime = request->target_runtime;
+       prg.time_per_job = -1.0;
 
        if (request->jobs) {
 
-       for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++);
+       for (prg.jobs_to_exa=0; request->jobs[prg.jobs_to_exa]; prg.jobs_to_exa++);
        for (i=0; request->jobs[i] && !purge_quit; i++) {
                if (edg_wlc_JobIdParse(request->jobs[i],&job)) {
                        glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error\n", request->jobs[i]);
-                       parse = 1;
+                       prg.parse = 1;
+                       prg.jobs_to_exa--;
                }
                else {
                        if (check_strict_jobid(ctx,job)) {
                                glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "%s: not my job\n", request->jobs[i]);
-                               parse = 1;
+                               prg.parse = 1;
+                               prg.jobs_to_exa--;
                        }
                        else {
-                               purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
                                if (purge_quit) break;
 
                                memset(&stat,0,sizeof stat);
@@ -286,13 +301,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                                /* job purged meanwhile, ignore */
                                                edg_wll_ResetError(ctx);
                                                glite_jobid_free(job);
+                                               purge_throttle(&prg);
                                                continue;
                                        }
                                        edg_wll_FreeStatus(&stat);
                                        goto abort; 
                                }
 
-                               switch (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+                               switch (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) {
                                        case 0: 
                                                break;
                                        case ENOENT: /* job does not exist, consider purged and ignore */
@@ -305,7 +321,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                        }
                        glite_jobid_free(job);
                }
-               jobs_to_exa--;
        }
        }
        else {
@@ -329,23 +344,24 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                        ? "select dg_jobid from jobs where proxy='1'" 
                        : "select dg_jobid from jobs where server='1'");
 
-               if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
+               if ((prg.jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
                        "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
 
                while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0 && !purge_quit) {
                        if (edg_wlc_JobIdParse(job_s,&job)) {
                                glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "%s: parse error (internal inconsistency !)", job_s);
-                               parse = 1;
+                               prg.parse = 1;
+                               prg.jobs_to_exa--;
                        }
                        else {
                                if (check_strict_jobid(ctx,job)) {
                                        edg_wlc_JobIdFree(job);
                                        free(job_s);
-                                       parse = 1;
+                                       prg.parse = 1;
+                                       prg.jobs_to_exa--;
                                        continue;
                                }
 
-                               purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
                                if (purge_quit) break;
 
                                memset(&stat,0,sizeof stat);
@@ -355,6 +371,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                                edg_wll_ResetError(ctx);
                                                glite_jobid_free(job);
                                                free(job_s);
+                                               purge_throttle(&prg);
                                                continue;
                                        }
                                        edg_wll_FreeStatus(&stat);
@@ -362,7 +379,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                }
 
                                if (purge_check(ctx, &stat, start, timeout)) {
-                                       if (purge_one_with_subjobs(ctx, &stat, dumpfile, request, result, &naffected_jobs, &parse)) {
+                                       if (purge_one_with_subjobs(ctx, &prg, &stat, request, result)) {
                                                edg_wll_FreeStatus(&stat);
                                                if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
                                                        /* job purged meanwhile, ignore */
@@ -379,15 +396,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                edg_wll_FreeStatus(&stat);
                        }
                        free(job_s);
-                       jobs_to_exa--;
                }
                glite_lbu_FreeStmt(&s);
        }
 
 abort:
-       if (parse && !edg_wll_Error(ctx,NULL,NULL))
+       if (prg.parse && !edg_wll_Error(ctx,NULL,NULL))
        {
-               if ( naffected_jobs ) {
+               if ( prg.naffected_jobs ) {
                        glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] Found only jobs not matching server address/port; these were not purged.", getpid());
                }
                else {
@@ -630,36 +646,38 @@ static int purge_check(edg_wll_Context ctx, edg_wll_JobStat *stat, time_t start,
 
 #define GRAN 32
 
-static int purge_one_with_subjobs(edg_wll_Context ctx, edg_wll_JobStat *stat, int dumpfile, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result, int *njobs, int *parse) {
+static int purge_one_with_subjobs(edg_wll_Context ctx, purge_ctx_t *prg, edg_wll_JobStat *stat, const edg_wll_PurgeRequest *request, edg_wll_PurgeResult *result) {
        char *job_s;
        int i;
 
-       if (purge_one(ctx,stat,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+       if (purge_one(ctx,stat,prg->dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) return edg_wll_Error(ctx, NULL, NULL);
+       purge_throttle(prg);
 
 /* XXX: change with the streaming interface */
        if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
                job_s = glite_jobid_unparse(stat->jobId);
-               if (*njobs % GRAN == 0 || !result->jobs)
-                       result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
-               result->jobs[*njobs] = job_s;
-               result->jobs[*njobs+1] = NULL;
+               if (prg->naffected_jobs % GRAN == 0 || !result->jobs)
+                       result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs)));
+               result->jobs[prg->naffected_jobs] = job_s;
+               result->jobs[prg->naffected_jobs+1] = NULL;
        }
-       (*njobs)++;
+       prg->naffected_jobs++;
 
        /* purge the subjobs */
        if (stat->children_num && stat->children) {
                for (i = 0; i < stat->children_num && stat->children[i]; i++) {
-                       if (purge_one(ctx, stat->children_states + i, dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
+                       if (purge_one(ctx, stat->children_states + i, prg->dumpfile, request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
                                return edg_wll_Error(ctx, NULL, NULL);
                        }
+                       purge_throttle(prg);
 
                        if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
-                               if (*njobs % GRAN == 0 || !result->jobs)
-                                       result->jobs = realloc(result->jobs,(*njobs+GRAN+1) * sizeof(*(result->jobs)));
-                               result->jobs[*njobs] = strdup(stat->children[i]);
-                               result->jobs[*njobs+1] = NULL;
+                               if (prg->naffected_jobs % GRAN == 0 || !result->jobs)
+                                       result->jobs = realloc(result->jobs,(prg->naffected_jobs+GRAN+1) * sizeof(*(result->jobs)));
+                               result->jobs[prg->naffected_jobs] = strdup(stat->children[i]);
+                               result->jobs[prg->naffected_jobs+1] = NULL;
                        }
-                       (*njobs)++;
+                       prg->naffected_jobs++;
                }
        }
 
@@ -982,35 +1000,35 @@ int job_exists(edg_wll_Context ctx, glite_jobid_const_t job) {
 
 
 /* throttle purging according to the required target_runtime */
-static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
+static void purge_throttle(purge_ctx_t *prg) {
        struct timeval  tp;
        double target_this, now;
 
-
-       if (*time_per_job < 0.0) {
-               *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0;
-               //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
-               //    getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job);
+       if (prg->time_per_job < 0.0) {
+               prg->time_per_job = prg->jobs_to_exa ? (double)(prg->target_runtime) / prg->jobs_to_exa : 0.0;
+               //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, target runtime: %ld, end: %lf, jobs: %d, s/job: %lf\n",
+               //    prg->target_runtime, prg->purge_end, prg->jobs_to_exa, prg->time_per_job);
        }
 
-       if (*target_runtime) {
-               target_this = purge_end - *time_per_job * jobs_to_exa;
+       if (--prg->jobs_to_exa < 0) prg->jobs_to_exa = 0;
+
+       if (prg->target_runtime) {
+               target_this = prg->purge_end - prg->time_per_job * prg->jobs_to_exa;
                gettimeofday(&tp, NULL);
                now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
                if (target_this > now) {  /* enough time */
-                       //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+                       //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, sleeping for %lf seconds...\n", target_this - now);
                        usleep(1e6*(target_this - now));
                }
                if (target_this < now) {   /* speed up */
-                       *time_per_job = (purge_end-now)/jobs_to_exa;
+                       prg->time_per_job = (prg->purge_end-now)/prg->jobs_to_exa;
                        /* limit not catched, maximal speed */
-                       if (*time_per_job <= 0) {
-                               *time_per_job = 0.0;
-                               *target_runtime = 0;
+                       if (prg->time_per_job <= 0) {
+                               prg->time_per_job = 0.0;
+                               prg->target_runtime = 0;
                        }
                }
-               //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job);
+               //glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "purge, s/job: %lf\n", prg->time_per_job);
        }
-
 }