.IP "-n, --cancelled \fiNNN[smhd]\fR"
Purge CANCELLED jobs older than given time.
+.IP "-e, --done \fiNNN[smhd]\fR"
+Purge DONE jobs older than given time.
+
.IP "-o, --other \fiNNN[smhd]\fR"
Purge OTHER (i.e. job in other state than ABORTED, CLEARED or CANCELLED) jobs older than given time.
.IP "-i, --client-dump"
Receive stream of dumped jobs (NOT implemented yet!).
-.IP "-f, --dump-full"
-Dump full database records of purged jobs.
-
.IP "-h, --help"
Display help.
.IP "-d, --debug"
Print diagnostics on the \fIstderr\fR.
+.IP "-x, --proxy"
+Purge L&B proxy.
+
+.IP "-X, --sock \fIpath\fR"
+Purge L&B proxy using given socket path.
+
+.IP "-t, --target-runtime \fiNNN[smhd]\fR"
+Throttle purge to the estimated target runtime.
+
.\".SH EXAMPLES
.\"To appear :o(
X509_USER_CERT="$X509_USER_CERT" X509_USER_KEY="$X509_USER_KEY" $PREFIX/bin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER $GLITE_LB_PURGE_OTHER_OPTIONS
fi
- find $GLITE_LB_EXPORT_PURGEDIR/ -type f 2>/dev/null | \
+ find $GLITE_LB_EXPORT_PURGEDIR/ -type f -name purge_\* 2>/dev/null | \
while read file; do
if [ -s $file ]; then
if [ x"$GLITE_LB_EXPORT_ENABLED" = x"true" ]; then
#define EDG_WLL_PURGE_CLIENT_DUMP 8
/* ! when addning new constant, add it also to common/xml_conversions.c ! */
+/** Desired purge estimated time.
+ */
+ time_t target_runtime;
/** private request processing data (for the reentrant functions) */
/* TODO */
const edg_wll_PurgeRequest *request,
char **message)
{
- char *pomA, *pomB, *pomC;
+ char *pomA, *pomB, *pomC, *pomMSG;
if (!request) { *message = NULL; return(-1); }
edg_wll_add_time_t_list_to_XMLBody(&pomB, request->timeout, "timeout", edg_wll_StatToString, "\t",
0, EDG_WLL_NUMBER_OF_STATCODES);
- trio_asprintf(&pomC,"%s%s%s\t<flags>%|Xs</flags>\r\n%s",
- PURGE_REQUEST_BEGIN,pomA,pomB,edg_wll_purge_flags_to_string(request->flags),
+ pomC = strdup("");
+ edg_wll_add_time_t_to_XMLBody(&pomC, request->target_runtime, "target_runtime", 0);
+
+ trio_asprintf(&pomMSG,"%s%s%s%s\t<flags>%|Xs</flags>\r\n%s",
+ PURGE_REQUEST_BEGIN,pomA,pomB,pomC,edg_wll_purge_flags_to_string(request->flags),
PURGE_REQUEST_END);
free(pomA);
free(pomB);
+ free(pomC);
-
- *message = pomC;
+ *message = pomMSG;
return 0;
}
case 0: if (strcasecmp(el,"edg_wll_PurgeRequest")) unexp()
break;
case 1: if (strcasecmp(el,"jobs") && strcasecmp(el,"timeout")
- && strcasecmp(el,"flags")) unexp()
+ && strcasecmp(el,"flags") && strcasecmp(el,"target_runtime")) unexp()
else
XMLCtx->position = 0;
break;
s = glite_lbu_UnescapeXML((const char *) XMLCtx->char_buf);
XMLCtx->purgeRequestGlobal.flags = edg_wll_string_to_purge_flags(s);
free(s);
+ } else if (!strcmp(XMLCtx->element,"target_runtime")) {
+ XMLCtx->purgeRequestGlobal.target_runtime = edg_wll_from_string_to_time_t(XMLCtx);
}
}
else if (XMLCtx->level == 3) {
else {
glite_lbu_Statement s;
char *job_s;
- int res;
+ int jobs_to_exa;
time_t timeout[EDG_WLL_NUMBER_OF_STATCODES],
- now = time(NULL);
+ start = time(NULL);
+ double now, time_per_job, target_this, purge_end;
+ struct timeval tp;
for (i=0; i<EDG_WLL_NUMBER_OF_STATCODES; i++)
timeout[i] = request->timeout[i] < 0 ?
request->timeout[EDG_WLL_PURGE_JOBSTAT_OTHER] ) :
request->timeout[i];
- if (edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
- "select dg_jobid from jobs where server='1'", &s) < 0) goto abort;
-
- while ((res = edg_wll_FetchRow(ctx,s,1,NULL,&job_s)) > 0) {
+ if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
+ "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
+
+ gettimeofday(&tp, NULL);
+ now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+ purge_end = now + request->target_runtime;
+ time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0;
+ //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
+ // getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job);
+
+ while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) {
if (edg_wlc_JobIdParse(job_s,&job)) {
fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s);
parse = 1;
continue;
}
+ /* throttle purging according to the required target_runtime */
+ if (request->target_runtime) {
+ target_this = purge_end - time_per_job * jobs_to_exa;
+ gettimeofday(&tp, NULL);
+ now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+ if (target_this > now) { /* enough time */
+ //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+ usleep(1e6*(target_this - now));
+ }
+ if (target_this < now) { /* speed up */
+ time_per_job = (purge_end-now)/jobs_to_exa;
+ }
+ //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job);
+ }
+
memset(&stat,0,sizeof stat);
if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */
if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
goto abort;
}
- if (stat.lastUpdateTime.tv_sec && now-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
+ if (stat.lastUpdateTime.tv_sec && start-stat.lastUpdateTime.tv_sec > timeout[stat.state] && !check_strict_jobid(ctx,job))
{
if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
edg_wll_FreeStatus(&stat);
edg_wll_FreeStatus(&stat);
free(job_s);
}
+ jobs_to_exa--;
}
glite_lbu_FreeStmt(&s);
}
{ "server", required_argument, NULL, 'm' },
{ "proxy", no_argument, NULL, 'x' },
{ "sock", required_argument, NULL, 'X' },
+ {"target-runtime", required_argument, NULL, 't'},
{ NULL, no_argument, NULL, 0 }
};
" -d, --debug diagnostic output\n"
" -m, --server L&B server machine name\n"
" -x, --proxy purge L&B proxy\n"
- " -X, --sock <path> purge L&B proxy using default socket path\n",
+ " -X, --sock <path> purge L&B proxy using default socket path\n"
+ " -t, --target-runtime NNN[smhd] throttle purge to the estimated target runtime\n",
me);
}
edg_wll_InitContext(&ctx);
/* get arguments */
- while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:",opts,NULL)) != EOF) {
+ while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:",opts,NULL)) != EOF) {
timeout=-1;
switch (opt) {
ctx->isProxy = 1;
edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_SERVE_SOCK, optarg);
break;
+ case 't':
+ if ((get_timeout(optarg,&timeout) != 0 )) {
+ printf("Wrong usage of timeout argument.\n");
+ usage(me);
+ return 1;
+ }
+ if (timeout >= 0) {
+ request->target_runtime=timeout;
+ }
+ break;
case 'h':
case '?': usage(me); return 1;
}
for ( i = 0; request->jobs[i]; i++ )
printf("%s\n", request->jobs[i]);
}
+ printf("- target runtime: %ld\n", request->target_runtime);
}
if ( server )