From e4b7aeca212403f621c574537d9234fa1f149c68 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 3 Jul 2009 18:16:47 +0000 Subject: [PATCH] Background purge feature, fixes timeout errors when using throttled purge. --- org.glite.lb.common/interface/mini_http.h | 1 + org.glite.lb.common/interface/query_rec.h | 3 ++ org.glite.lb.common/src/xml_conversions.c | 9 +++++ org.glite.lb.server/src/db_calls.c | 3 +- org.glite.lb.server/src/lb_proto.c | 58 +++++++++++++++++++++++++++++-- org.glite.lb.server/src/purge.h | 2 ++ org.glite.lb.server/src/srv_purge.c | 9 +++-- org.glite.lb.utils/Makefile | 2 +- org.glite.lb.utils/doc/glite-lb-purge.8 | 3 ++ org.glite.lb.utils/src/purge.c | 15 ++++++-- 10 files changed, 95 insertions(+), 10 deletions(-) diff --git a/org.glite.lb.common/interface/mini_http.h b/org.glite.lb.common/interface/mini_http.h index 0096568..72c9f8c 100644 --- a/org.glite.lb.common/interface/mini_http.h +++ b/org.glite.lb.common/interface/mini_http.h @@ -25,6 +25,7 @@ /* subset of HTTP codes we return */ #define HTTP_OK 200 +#define HTTP_ACCEPTED 202 #define HTTP_BADREQ 400 #define HTTP_UNAUTH 401 #define HTTP_NOTFOUND 404 diff --git a/org.glite.lb.common/interface/query_rec.h b/org.glite.lb.common/interface/query_rec.h index 3498a01..c8d302d 100644 --- a/org.glite.lb.common/interface/query_rec.h +++ b/org.glite.lb.common/interface/query_rec.h @@ -148,6 +148,9 @@ typedef struct _edg_wll_PurgeRequest { #define EDG_WLL_PURGE_SERVER_DUMP 4 /** TODO: stream the dump info to the client */ #define EDG_WLL_PURGE_CLIENT_DUMP 8 +/** purge the jobs on background, + nifty for the throttled purging using target_runtime */ +#define EDG_WLL_PURGE_BACKGROUND 16 /* ! when addning new constant, add it also to common/xml_conversions.c ! */ /** Desired purge estimated time. 
diff --git a/org.glite.lb.common/src/xml_conversions.c b/org.glite.lb.common/src/xml_conversions.c index 7d75088..cc8af3f 100644 --- a/org.glite.lb.common/src/xml_conversions.c +++ b/org.glite.lb.common/src/xml_conversions.c @@ -838,6 +838,14 @@ char *edg_wll_purge_flags_to_string(int flags) } else asprintf(&cflags,"%s","client_dump"); } + if (flags & EDG_WLL_PURGE_BACKGROUND) { + if (cflags) { + asprintf(&temp_cflags,"%s+%s",cflags,"background"); + free(cflags); + cflags=temp_cflags; + } + else asprintf(&cflags,"%s","background"); + } if (!cflags) cflags = strdup(""); @@ -858,6 +866,7 @@ int edg_wll_string_to_purge_flags(char *cflags) if (!strcmp(sflag,"list_jobs")) flags = flags | EDG_WLL_PURGE_LIST_JOBS; if (!strcmp(sflag,"server_dump")) flags = flags | EDG_WLL_PURGE_SERVER_DUMP; if (!strcmp(sflag,"client_dump")) flags = flags | EDG_WLL_PURGE_CLIENT_DUMP; + if (!strcmp(sflag,"background")) flags = flags | EDG_WLL_PURGE_BACKGROUND; sflag = strtok_r(NULL, "+", &last); } diff --git a/org.glite.lb.server/src/db_calls.c b/org.glite.lb.server/src/db_calls.c index 4725cb6..4ef3e96 100644 --- a/org.glite.lb.server/src/db_calls.c +++ b/org.glite.lb.server/src/db_calls.c @@ -45,9 +45,10 @@ int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job) fprintf(stderr,"Error retrieving proxy&server fields of jobs table. 
Missing column?\n"); edg_wll_SetError(ctx,ENOENT,dbjob); } - glite_lbu_FreeStmt(&q); clean: + glite_lbu_FreeStmt(&q); + free(res[0]); free(res[1]); free(dbjob); free(stmt); diff --git a/org.glite.lb.server/src/lb_proto.c b/org.glite.lb.server/src/lb_proto.c index 773f63d..649592a 100644 --- a/org.glite.lb.server/src/lb_proto.c +++ b/org.glite.lb.server/src/lb_proto.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include @@ -60,6 +62,8 @@ static const char* const response_headers_html[] = { NULL }; +volatile sig_atomic_t purge_quit = 0; + extern int edg_wll_NotifNewServer(edg_wll_Context, edg_wll_QueryRec const * const *, int flags, char const *, const edg_wll_NotifId, time_t *); @@ -80,6 +84,7 @@ char *edg_wll_HTTPErrorMessage(int errCode) switch (errCode) { case HTTP_OK: msg = "OK"; break; + case HTTP_ACCEPTED: msg = "Accepted"; break; case HTTP_BADREQ: msg = "Bad Request"; break; case HTTP_UNAUTH: msg = "Unauthorized"; break; case HTTP_NOTFOUND: msg = "Not Found"; break; @@ -375,6 +380,11 @@ static int getJobsRSS(edg_wll_Context ctx, char *feedType, edg_wll_JobStat **sta return 0; } +/* SIGTERM/SIGINT handler of the forked purge process: only raises the purge_quit flag polled by the purge loops */ +static void hup_handler(int sig) { + purge_quit = 1; +} + edg_wll_ErrorCode edg_wll_ProtoV21(edg_wll_Context ctx, char *request,char **headers,char *messageBody, char **response,char ***headersOut,char **bodyOut) @@ -899,7 +909,9 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx, else if (!strncmp(requestPTR,KEY_PURGE_REQUEST,sizeof(KEY_PURGE_REQUEST)-1)) { edg_wll_PurgeRequest request; edg_wll_PurgeResult result; - int fatal = 0; + struct sigaction sa; + sigset_t sset; + int fatal = 0, retval; ctx->p_tmp_timeout.tv_sec = 86400; @@ -907,6 +919,37 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx, memset(&result,0,sizeof(result)); if ( !parsePurgeRequest(ctx,messageBody,(int (*)()) edg_wll_StringToStat,&request) ) { + /* do throttled purge on background if requested */ + if ((request.flags & EDG_WLL_PURGE_BACKGROUND)) { + retval = fork(); + 
switch (retval) { + case 0: /* forked cleaner */ + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = hup_handler; + sigaction(SIGTERM, &sa, NULL); + sigaction(SIGINT, &sa, NULL); + + sigemptyset(&sset); + sigaddset(&sset, SIGTERM); + sigaddset(&sset, SIGINT); + sigprocmask(SIG_UNBLOCK, &sset, NULL); + sigemptyset(&sset); + sigaddset(&sset, SIGCHLD); + sigprocmask(SIG_BLOCK, &sset, NULL); + break; + case -1: /* err */ + ret = HTTP_INTERNAL; + edg_wll_SetError(ctx, errno, "can't fork purge process"); + goto err; + default: /* client request handler */ + ret = HTTP_ACCEPTED; + /* to end this parent */ + edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, edg_wll_HTTPErrorMessage(ret)); + goto err; + } + } + switch ( edg_wll_PurgeServer(ctx, (const edg_wll_PurgeRequest *)&request, &result)) { case 0: if (html) ret = HTTP_NOTIMPL; else ret = HTTP_OK; @@ -935,6 +978,15 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx, free(result.jobs); } + /* forked cleaner sends no results */ + if ((request.flags & EDG_WLL_PURGE_BACKGROUND)) { + *response = NULL; + free(message); + message = NULL; + if (requestPTR) free(requestPTR); + exit(0); + } + } if ( request.jobs ) @@ -1182,9 +1234,9 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx, err: asprintf(response,"HTTP/1.1 %d %s",ret,edg_wll_HTTPErrorMessage(ret)); *headersOut = (char **) (html ? 
response_headers_html : response_headers_dglb); - if ((ret != HTTP_OK) && text) + if ((ret != HTTP_OK && ret != HTTP_ACCEPTED) && text) *bodyOut = edg_wll_ErrorToText(ctx,ret); - else if ((ret != HTTP_OK) && html) + else if ((ret != HTTP_OK && ret != HTTP_ACCEPTED) && html) *bodyOut = edg_wll_ErrorToHTML(ctx,ret); else *bodyOut = message; diff --git a/org.glite.lb.server/src/purge.h b/org.glite.lb.server/src/purge.h index 35951bf..6bd038a 100644 --- a/org.glite.lb.server/src/purge.h +++ b/org.glite.lb.server/src/purge.h @@ -5,6 +5,8 @@ #include "glite/lb/context.h" +extern volatile sig_atomic_t purge_quit; + /** Server side implementation * besides output to the SSL stream (in the context) it may produce * the server-side dump files diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 49c47dd..b09e62a 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -9,6 +9,7 @@ #include #include #include +#include #include "glite/jobid/cjobid.h" #include "glite/lbu/trio.h" @@ -34,6 +35,8 @@ #define sizofa(a) (sizeof(a)/sizeof((a)[0])) +extern volatile sig_atomic_t purge_quit; + static const char* const resp_headers[] = { "Cache-Control: no-cache", "Server: edg_wll_Server/" PROTO_VERSION "/" COMP_PROTO, @@ -252,7 +255,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, if (request->jobs) { for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++); - for (i=0; request->jobs[i]; i++) { + for (i=0; request->jobs[i] && !purge_quit; i++) { if (edg_wlc_JobIdParse(request->jobs[i],&job)) { fprintf(stderr,"%s: parse error\n",request->jobs[i]); parse = 1; @@ -264,6 +267,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } else { purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); + if (purge_quit) break; switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { case 0: if 
(request->flags & EDG_WLL_PURGE_LIST_JOBS) { @@ -304,7 +308,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort; - while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) { + while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0 && !purge_quit) { if (edg_wlc_JobIdParse(job_s,&job)) { fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s); parse = 1; @@ -320,6 +324,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); + if (purge_quit) break; memset(&stat,0,sizeof stat); if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */ diff --git a/org.glite.lb.utils/Makefile b/org.glite.lb.utils/Makefile index 039d5c9..3595dde 100644 --- a/org.glite.lb.utils/Makefile +++ b/org.glite.lb.utils/Makefile @@ -110,7 +110,7 @@ install: done clean: - rm -rfv ${UTILS} statistics ${MAN_GZ} ${MAN8_GZ} *.{lo,o} .libs/ + rm -rfv ${ALLUTILS} statistics ${MAN_GZ} ${MAN8_GZ} *.{lo,o} .libs/ rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/ %.o: %.c diff --git a/org.glite.lb.utils/doc/glite-lb-purge.8 b/org.glite.lb.utils/doc/glite-lb-purge.8 index 848efa4..4482ba0 100644 --- a/org.glite.lb.utils/doc/glite-lb-purge.8 +++ b/org.glite.lb.utils/doc/glite-lb-purge.8 @@ -71,6 +71,9 @@ Purge L&B proxy using given socket path. .IP "-t, --target-runtime \fiNNN[smhd]\fR" Throttle purge to the estimated target runtime. +.IP "-b, --background \fiN\fR" +Purge on the background. Default 1 when throttled purging. 
+ .\".SH EXAMPLES .\"To appear :o( diff --git a/org.glite.lb.utils/src/purge.c b/org.glite.lb.utils/src/purge.c index 4248528..1dba77c 100644 --- a/org.glite.lb.utils/src/purge.c +++ b/org.glite.lb.utils/src/purge.c @@ -59,6 +59,7 @@ static struct option opts[] = { { "proxy", no_argument, NULL, 'x' }, { "sock", required_argument, NULL, 'X' }, {"target-runtime", required_argument, NULL, 't'}, + {"background", required_argument, NULL, 'b'}, { NULL, no_argument, NULL, 0 } }; @@ -81,7 +82,8 @@ static void usage(char *me) " -m, --server L&B server machine name\n" " -x, --proxy purge L&B proxy\n" " -X, --sock purge L&B proxy using default socket path\n" - " -t, --target-runtime NNN[smhd] throttle purge to the estimated target runtime\n", + " -t, --target-runtime NNN[smhd] throttle purge to the estimated target runtime\n" + " -b, --background N purge on the background (0 = no, default 1 when throttled)\n", me); } @@ -89,7 +91,7 @@ int main(int argc,char *argv[]) { edg_wll_PurgeRequest *request; edg_wll_PurgeResult *result; - int i, timeout; + int i, timeout, background = -1; /* -1: -b not given on the command line */ char *server = NULL; char *me; @@ -114,8 +116,8 @@ int main(int argc,char *argv[]) edg_wll_InitContext(&ctx); /* get arguments */ - while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:",opts,NULL)) != EOF) { + while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:b:",opts,NULL)) != EOF) { timeout=-1; switch (opt) { @@ -202,9 +204,16 @@ int main(int argc,char *argv[]) request->target_runtime=timeout; } break; + case 'b': + background = atoi(optarg); + break; case 'h': case '?': usage(me); return 1; } } + + /* decide once, after all options are parsed: an explicit -b wins, otherwise go background only for a throttled purge */ + if (background > 0 || (background == -1 && request->target_runtime)) + request->flags |= EDG_WLL_PURGE_BACKGROUND; /* read the jobIds from file, if wanted */ -- 1.8.2.3