From: František Dvořák Date: Tue, 23 Sep 2008 10:51:05 +0000 (+0000) Subject: Throttle purging, update docs. X-Git-Tag: myproxy-config-R_2_0_2_1~22 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=7bd883d02f788093d043265765a4794a125ce3e9;p=jra1mw.git Throttle purging, update docs. --- diff --git a/org.glite.lb.client/doc/glite-lb-purge.8 b/org.glite.lb.client/doc/glite-lb-purge.8 index 1df6c4c..848efa4 100644 --- a/org.glite.lb.client/doc/glite-lb-purge.8 +++ b/org.glite.lb.client/doc/glite-lb-purge.8 @@ -32,6 +32,9 @@ Purge CLEARED jobs older than given time. .IP "-n, --cancelled \fiNNN[smhd]\fR" Purge CANCELLED jobs older than given time. +.IP "-e, --done \fiNNN[smhd]\fR" +Purge DONE jobs older than given time. + .IP "-o, --other \fiNNN[smhd]\fR" Purge OTHER (i.e. job in other state than ABORTED, CLEARED or CANCELLED) jobs older than given time. @@ -50,9 +53,6 @@ Dump jobs at the server into a file (see bkserverd). .IP "-i, --client-dump" Receive stream of dumped jobs (NOT implemented yet!). -.IP "-f, --dump-full" -Dump full database records of purged jobs. - .IP "-h, --help" Display help. @@ -62,6 +62,15 @@ Display version. .IP "-d, --debug" Print diagnostics on the \fIstderr\fR. +.IP "-x, --proxy" +Purge L&B proxy. + +.IP "-X, --sock \fIpath\fR" +Purge L&B proxy using given socket path. + +.IP "-t, --target-runtime \fiNNN[smhd]\fR" +Throttle purge to the estimated target runtime. + .\".SH EXAMPLES .\"To appear :o( diff --git a/org.glite.lb.client/src/export.sh b/org.glite.lb.client/src/export.sh index 05f5c19..9ec130d 100644 --- a/org.glite.lb.client/src/export.sh +++ b/org.glite.lb.client/src/export.sh @@ -62,7 +62,7 @@ if [ x"$GLITE_LB_PURGE_ENABLED" = x"true" ]; then X509_USER_CERT="$X509_USER_CERT" X509_USER_KEY="$X509_USER_KEY" $PREFIX/bin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER $GLITE_LB_PURGE_OTHER_OPTIONS fi - find $GLITE_LB_EXPORT_PURGEDIR/ -type f 2>/dev/null | \ + find $GLITE_LB_EXPORT_PURGEDIR/ -type f -name purge_\* 2>/dev/null | \ while read file; do if [ -s $file ]; then if [ x"$GLITE_LB_EXPORT_ENABLED" = x"true" ]; then diff --git a/org.glite.lb.common/interface/query_rec.h b/org.glite.lb.common/interface/query_rec.h index 7fffc55..8a660d1 100644 --- a/org.glite.lb.common/interface/query_rec.h +++ b/org.glite.lb.common/interface/query_rec.h @@ -148,6 +148,9 @@ typedef struct _edg_wll_PurgeRequest { #define EDG_WLL_PURGE_CLIENT_DUMP 8 /* ! when addning new constant, add it also to common/xml_conversions.c ! */ +/** Desired purge estimated time. + */ + time_t target_runtime; /** private request processing data (for the reentrant functions) */ /* TODO */ diff --git a/org.glite.lb.common/src/xml_parse.c.T b/org.glite.lb.common/src/xml_parse.c.T index 1272f75..ce6fafe 100644 --- a/org.glite.lb.common/src/xml_parse.c.T +++ b/org.glite.lb.common/src/xml_parse.c.T @@ -2745,7 +2745,7 @@ int edg_wll_PurgeRequestToXML( const edg_wll_PurgeRequest *request, char **message) { - char *pomA, *pomB, *pomC; + char *pomA, *pomB, *pomC, *pomMSG; if (!request) { *message = NULL; return(-1); } @@ -2759,15 +2759,18 @@ int edg_wll_PurgeRequestToXML( edg_wll_add_time_t_list_to_XMLBody(&pomB, request->timeout, "timeout", edg_wll_StatToString, "\t", 0, EDG_WLL_NUMBER_OF_STATCODES); - trio_asprintf(&pomC,"%s%s%s\t%|Xs\r\n%s", - PURGE_REQUEST_BEGIN,pomA,pomB,edg_wll_purge_flags_to_string(request->flags), + pomC = strdup(""); + edg_wll_add_time_t_to_XMLBody(&pomC, request->target_runtime, "target_runtime", 0); + + trio_asprintf(&pomMSG,"%s%s%s%s\t%|Xs\r\n%s", + PURGE_REQUEST_BEGIN,pomA,pomB,pomC,edg_wll_purge_flags_to_string(request->flags), PURGE_REQUEST_END); free(pomA); free(pomB); + free(pomC); - - *message = pomC; + *message = pomMSG; return 0; } diff --git a/org.glite.lb.server/src/lb_xml_parse.c.T b/org.glite.lb.server/src/lb_xml_parse.c.T index 084fc18..99a4286 100644 --- a/org.glite.lb.server/src/lb_xml_parse.c.T +++ b/org.glite.lb.server/src/lb_xml_parse.c.T @@ -288,7 +288,7 @@ static void startPurgeRequest(void *data, const char *el, const char **attr) case 0: if (strcasecmp(el,"edg_wll_PurgeRequest")) unexp() break; case 1: if (strcasecmp(el,"jobs") && strcasecmp(el,"timeout") - && strcasecmp(el,"flags")) unexp() + && strcasecmp(el,"flags") && strcasecmp(el,"target_runtime")) unexp() else XMLCtx->position = 0; break; @@ -755,6 +755,8 @@ static void endPurgeRequest(void *data, const char *el UNUSED_VAR) s = glite_lbu_UnescapeXML((const char *) XMLCtx->char_buf); XMLCtx->purgeRequestGlobal.flags = edg_wll_string_to_purge_flags(s); free(s); + } else if (!strcmp(XMLCtx->element,"target_runtime")) { + XMLCtx->purgeRequestGlobal.target_runtime = edg_wll_from_string_to_time_t(XMLCtx); } } else if (XMLCtx->level == 3) { diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index e60fe8d..5c670b2 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -285,9 +285,11 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, else { glite_lbu_Statement s; char *job_s; - int res; + int jobs_to_exa; time_t timeout[EDG_WLL_NUMBER_OF_STATCODES], - now = time(NULL); + start = time(NULL); + double now, time_per_job, target_this, purge_end; + struct timeval tp; for (i=0; itimeout[i] < 0 ? @@ -296,10 +298,17 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, request->timeout[EDG_WLL_PURGE_JOBSTAT_OTHER] ) : request->timeout[i]; - if (edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : - "select dg_jobid from jobs where server='1'", &s) < 0) goto abort; - - while ((res = edg_wll_FetchRow(ctx,s,1,NULL,&job_s)) > 0) { + if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : + "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort; + + gettimeofday(&tp, NULL); + now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; + purge_end = now + request->target_runtime; + time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0; + //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n", + // getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job); + + while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) { if (edg_wlc_JobIdParse(job_s,&job)) { fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s); parse = 1; @@ -314,6 +323,21 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, continue; } + /* throttle purging according to the required target_runtime */ + if (request->target_runtime) { + target_this = purge_end - time_per_job * jobs_to_exa; + gettimeofday(&tp, NULL); + now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; + if (target_this > now) { /* enough time */ + //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now); + usleep(1e6*(target_this - now)); + } + if (target_this < now) { /* speed up */ + time_per_job = (purge_end-now)/jobs_to_exa; + } + //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job); + } + memset(&stat,0,sizeof stat); if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { @@ -325,7 +349,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, goto abort; } - if (stat.lastUpdateTime.tv_sec && now-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job)) + if (stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job)) { if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { edg_wll_FreeStatus(&stat); @@ -365,6 +389,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wll_FreeStatus(&stat); free(job_s); } + jobs_to_exa--; } glite_lbu_FreeStmt(&s); } diff --git a/org.glite.lb.utils/src/purge.c b/org.glite.lb.utils/src/purge.c index 8aa88a4..29214d2 100644 --- a/org.glite.lb.utils/src/purge.c +++ b/org.glite.lb.utils/src/purge.c @@ -58,6 +58,7 @@ static struct option opts[] = { { "server", required_argument, NULL, 'm' }, { "proxy", no_argument, NULL, 'x' }, { "sock", required_argument, NULL, 'X' }, + {"target-runtime", required_argument, NULL, 't'}, { NULL, no_argument, NULL, 0 } }; @@ -79,7 +80,8 @@ static void usage(char *me) " -d, --debug diagnostic output\n" " -m, --server L&B server machine name\n" " -x, --proxy purge L&B proxy\n" - " -X, --sock purge L&B proxy using default socket path\n", + " -X, --sock purge L&B proxy using default socket path\n" + " -t, --target-runtime NNN[smhd] throttle purge to the estimated target runtime\n", me); } @@ -112,7 +114,7 @@ int main(int argc,char *argv[]) edg_wll_InitContext(&ctx); /* get arguments */ - while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:",opts,NULL)) != EOF) { + while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:",opts,NULL)) != EOF) { timeout=-1; switch (opt) { @@ -182,6 +184,16 @@ int main(int argc,char *argv[]) ctx->isProxy = 1; edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_SERVE_SOCK, optarg); break; + case 't': + if ((get_timeout(optarg,&timeout) != 0 )) { + printf("Wrong usage of timeout argument.\n"); + usage(me); + return 1; + } + if (timeout >= 0) { + request->target_runtime=timeout; + } + break; case 'h': case '?': usage(me); return 1; } @@ -218,6 +230,7 @@ int main(int argc,char *argv[]) for ( i = 0; request->jobs[i]; i++ ) printf("%s\n", request->jobs[i]); } + printf("- target runtime: %ld\n", request->target_runtime); } if ( server )