Throttle purging, update docs.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 23 Sep 2008 10:51:05 +0000 (10:51 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 23 Sep 2008 10:51:05 +0000 (10:51 +0000)
org.glite.lb.client/doc/glite-lb-purge.8
org.glite.lb.client/src/export.sh
org.glite.lb.common/interface/query_rec.h
org.glite.lb.common/src/xml_parse.c.T
org.glite.lb.server/src/lb_xml_parse.c.T
org.glite.lb.server/src/srv_purge.c
org.glite.lb.utils/src/purge.c

index 1df6c4c..848efa4 100644 (file)
@@ -32,6 +32,9 @@ Purge CLEARED jobs older than given time.
 .IP "-n, --cancelled \fiNNN[smhd]\fR"
 Purge CANCELLED jobs older than given time.
 
+.IP "-e, --done \fINNN[smhd]\fR"
+Purge DONE jobs older than given time.
+
 .IP "-o, --other \fiNNN[smhd]\fR"
 Purge OTHER (i.e. job in other state than ABORTED, CLEARED or CANCELLED) jobs older than given time.
 
@@ -50,9 +53,6 @@ Dump jobs at the server into a file (see bkserverd).
 .IP "-i, --client-dump"
 Receive stream of dumped jobs (NOT implemented yet!).
 
-.IP "-f, --dump-full"
-Dump full database records of purged jobs.
-
 .IP "-h, --help"
 Display help.
 
@@ -62,6 +62,15 @@ Display version.
 .IP "-d, --debug"
 Print diagnostics on the \fIstderr\fR.
 
+.IP "-x, --proxy"
+Purge L&B proxy.
+
+.IP "-X, --sock \fIpath\fR"
+Purge L&B proxy using given socket path.
+
+.IP "-t, --target-runtime \fINNN[smhd]\fR"
+Throttle purging so that the whole run takes approximately the given time.
+
 
 .\".SH EXAMPLES
 .\"To appear :o(
index 05f5c19..9ec130d 100644 (file)
@@ -62,7 +62,7 @@ if [ x"$GLITE_LB_PURGE_ENABLED" = x"true" ]; then
        X509_USER_CERT="$X509_USER_CERT" X509_USER_KEY="$X509_USER_KEY" $PREFIX/bin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER $GLITE_LB_PURGE_OTHER_OPTIONS
 fi
 
-  find  $GLITE_LB_EXPORT_PURGEDIR/ -type f 2>/dev/null | \
+  find  $GLITE_LB_EXPORT_PURGEDIR/ -type f -name purge_\* 2>/dev/null | \
   while read file; do
     if [ -s $file ]; then
        if [ x"$GLITE_LB_EXPORT_ENABLED" = x"true" ]; then
index 7fffc55..8a660d1 100644 (file)
@@ -148,6 +148,9 @@ typedef struct _edg_wll_PurgeRequest {
 #define EDG_WLL_PURGE_CLIENT_DUMP      8
 /* ! when addning new constant, add it also to common/xml_conversions.c ! */
 
+/** Desired (estimated) total purge runtime in seconds; 0 means no throttling.
+ */
+       time_t target_runtime;
        
 /** private request processing data (for the reentrant functions) */
 /* TODO */
index 1272f75..ce6fafe 100644 (file)
@@ -2745,7 +2745,7 @@ int edg_wll_PurgeRequestToXML(
                const edg_wll_PurgeRequest *request,
                 char **message)
 {
-        char *pomA, *pomB, *pomC;
+        char *pomA, *pomB, *pomC, *pomMSG;
 
 
        if (!request) { *message = NULL; return(-1); }
@@ -2759,15 +2759,18 @@ int edg_wll_PurgeRequestToXML(
        edg_wll_add_time_t_list_to_XMLBody(&pomB, request->timeout, "timeout", edg_wll_StatToString, "\t",
                                                0, EDG_WLL_NUMBER_OF_STATCODES);
 
-       trio_asprintf(&pomC,"%s%s%s\t<flags>%|Xs</flags>\r\n%s",
-                       PURGE_REQUEST_BEGIN,pomA,pomB,edg_wll_purge_flags_to_string(request->flags),
+       pomC = strdup("");
+       edg_wll_add_time_t_to_XMLBody(&pomC, request->target_runtime, "target_runtime", 0);
+
+       trio_asprintf(&pomMSG,"%s%s%s%s\t<flags>%|Xs</flags>\r\n%s",
+                       PURGE_REQUEST_BEGIN,pomA,pomB,pomC,edg_wll_purge_flags_to_string(request->flags),
                        PURGE_REQUEST_END);
 
        free(pomA);
        free(pomB);
+       free(pomC);
 
-
-        *message = pomC;
+        *message = pomMSG;
 
         return 0;
 }
index 084fc18..99a4286 100644 (file)
@@ -288,7 +288,7 @@ static void startPurgeRequest(void *data, const char *el, const char **attr)
                case 0: if (strcasecmp(el,"edg_wll_PurgeRequest")) unexp()
                        break;
                case 1: if (strcasecmp(el,"jobs") && strcasecmp(el,"timeout")
-                               && strcasecmp(el,"flags")) unexp()
+                               && strcasecmp(el,"flags") && strcasecmp(el,"target_runtime")) unexp()
                        else 
                                XMLCtx->position = 0;
                        break;
@@ -755,6 +755,8 @@ static void endPurgeRequest(void *data, const char *el UNUSED_VAR)
                        s = glite_lbu_UnescapeXML((const char *) XMLCtx->char_buf);
                        XMLCtx->purgeRequestGlobal.flags = edg_wll_string_to_purge_flags(s);
                        free(s);
+               } else if (!strcmp(XMLCtx->element,"target_runtime")) {
+                       XMLCtx->purgeRequestGlobal.target_runtime = edg_wll_from_string_to_time_t(XMLCtx);
                }
        }
        else if (XMLCtx->level == 3) {
index e60fe8d..5c670b2 100644 (file)
@@ -285,9 +285,11 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        else {
                glite_lbu_Statement     s;
                char            *job_s;
-               int             res;
+               int             jobs_to_exa;
                time_t          timeout[EDG_WLL_NUMBER_OF_STATCODES],
-                               now = time(NULL);
+                               start = time(NULL);
+               double          now, time_per_job, target_this, purge_end;
+               struct timeval  tp;
 
                for (i=0; i<EDG_WLL_NUMBER_OF_STATCODES; i++)
                        timeout[i] = request->timeout[i] < 0 ? 
@@ -296,10 +298,17 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                        request->timeout[EDG_WLL_PURGE_JOBSTAT_OTHER] ) :
                                request->timeout[i];
 
-               if (edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
-                       "select dg_jobid from jobs where server='1'", &s) < 0) goto abort;
-
-               while ((res = edg_wll_FetchRow(ctx,s,1,NULL,&job_s)) > 0) {
+               if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
+                       "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
+               
+               gettimeofday(&tp, NULL);
+               now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+               purge_end = now + request->target_runtime;
+               time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0;
+               //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
+               //    getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job);
+
+               while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) {
                        if (edg_wlc_JobIdParse(job_s,&job)) {
                                fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s);
                                parse = 1;
@@ -314,6 +323,21 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                        continue;
                                }
 
+                               /* throttle purging according to the required target_runtime */
+                               if (request->target_runtime) {
+                                       target_this = purge_end - time_per_job * jobs_to_exa;
+                                       gettimeofday(&tp, NULL);
+                                       now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+                                       if (target_this > now) {  /* enough time */
+                                               //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+                                               usleep(1e6*(target_this - now));
+                                       }
+                                       if (target_this < now) {   /* speed up */
+                                               time_per_job = (purge_end-now)/jobs_to_exa;
+                                       }
+                                       //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job);
+                               }
+
                                memset(&stat,0,sizeof stat);
                                if (edg_wll_JobStatusServer(ctx,job,0,&stat)) {  /* FIXME: replace by intJobStatus ?? */
                                        if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
@@ -325,7 +349,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                        goto abort; 
                                }
 
-                               if (stat.lastUpdateTime.tv_sec && now-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
+                               if (stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
                                {
                                        if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
                                                edg_wll_FreeStatus(&stat);
@@ -365,6 +389,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                edg_wll_FreeStatus(&stat);
                                free(job_s);
                        }
+                       jobs_to_exa--;
                }
                glite_lbu_FreeStmt(&s);
        }
index 8aa88a4..29214d2 100644 (file)
@@ -58,6 +58,7 @@ static struct option opts[] = {
        { "server",             required_argument, NULL, 'm' },
        { "proxy",              no_argument, NULL, 'x' },
        { "sock",               required_argument, NULL, 'X' },
+       {"target-runtime",      required_argument, NULL, 't'},
        { NULL,                 no_argument, NULL,  0 }
 };
 
@@ -79,7 +80,8 @@ static void usage(char *me)
                "       -d, --debug                 diagnostic output\n"
                "       -m, --server                L&B server machine name\n"
                "       -x, --proxy                 purge L&B proxy\n"
-               "       -X, --sock <path>           purge L&B proxy using default socket path\n",
+               "       -X, --sock <path>           purge L&B proxy using default socket path\n"
+               "       -t, --target-runtime NNN[smhd]  throttle purge to the estimated target runtime\n",
                me);
 }
 
@@ -112,7 +114,7 @@ int main(int argc,char *argv[])
        edg_wll_InitContext(&ctx);
 
        /* get arguments */
-       while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:",opts,NULL)) != EOF) {
+       while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:",opts,NULL)) != EOF) {
                timeout=-1;
 
                switch (opt) {
@@ -182,6 +184,16 @@ int main(int argc,char *argv[])
                        ctx->isProxy = 1; 
                        edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_SERVE_SOCK, optarg); 
                        break;
+               case 't':
+                       if ((get_timeout(optarg,&timeout) != 0 )) {
+                               printf("Wrong usage of timeout argument.\n");
+                               usage(me);
+                               return 1;
+                       }
+                       if (timeout >= 0) {
+                               request->target_runtime=timeout; 
+                       }
+                       break;
                case 'h':
                case '?': usage(me); return 1;
                }
@@ -218,6 +230,7 @@ int main(int argc,char *argv[])
                        for ( i = 0; request->jobs[i]; i++ )
                                printf("%s\n", request->jobs[i]);
                }
+               printf("- target runtime: %ld\n", request->target_runtime);
        }
 
        if ( server )