Background purge feature, fixes timeout errors when using throttled purge.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 3 Jul 2009 18:16:47 +0000 (18:16 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 3 Jul 2009 18:16:47 +0000 (18:16 +0000)
org.glite.lb.common/interface/mini_http.h
org.glite.lb.common/interface/query_rec.h
org.glite.lb.common/src/xml_conversions.c
org.glite.lb.server/src/db_calls.c
org.glite.lb.server/src/lb_proto.c
org.glite.lb.server/src/purge.h
org.glite.lb.server/src/srv_purge.c
org.glite.lb.utils/Makefile
org.glite.lb.utils/doc/glite-lb-purge.8
org.glite.lb.utils/src/purge.c

index 0096568..72c9f8c 100644 (file)
@@ -25,6 +25,7 @@
 
 /* subset of HTTP codes we return */
 #define HTTP_OK                        200
+#define HTTP_ACCEPTED          202
 #define HTTP_BADREQ            400
 #define HTTP_UNAUTH            401
 #define HTTP_NOTFOUND          404
index 3498a01..c8d302d 100644 (file)
@@ -148,6 +148,9 @@ typedef struct _edg_wll_PurgeRequest {
 #define EDG_WLL_PURGE_SERVER_DUMP      4
 /** TODO: stream the dump info to the client */
 #define EDG_WLL_PURGE_CLIENT_DUMP      8
+/** purge the jobs on background,
+    nifty for the throttled purging using target_runtime */
+#define EDG_WLL_PURGE_BACKGROUND       16
 /* ! when addning new constant, add it also to common/xml_conversions.c ! */
 
 /** Desired purge estimated time.
index 7d75088..cc8af3f 100644 (file)
@@ -838,6 +838,14 @@ char *edg_wll_purge_flags_to_string(int flags)
                 }
                 else asprintf(&cflags,"%s","client_dump");
         }
+        if (flags & EDG_WLL_PURGE_BACKGROUND) {
+                if (cflags) {
+                        asprintf(&temp_cflags,"%s+%s",cflags,"background");
+                        free(cflags);
+                        cflags=temp_cflags;
+                }
+                else asprintf(&cflags,"%s","background");
+        }
 
         if (!cflags) cflags = strdup("");
 
@@ -858,6 +866,7 @@ int edg_wll_string_to_purge_flags(char *cflags)
                if (!strcmp(sflag,"list_jobs")) flags = flags | EDG_WLL_PURGE_LIST_JOBS;
                if (!strcmp(sflag,"server_dump")) flags = flags | EDG_WLL_PURGE_SERVER_DUMP;
                if (!strcmp(sflag,"client_dump")) flags = flags | EDG_WLL_PURGE_CLIENT_DUMP;
+               if (!strcmp(sflag,"background")) flags = flags | EDG_WLL_PURGE_BACKGROUND;
                sflag = strtok_r(NULL, "+", &last);
        } 
 
index 4725cb6..4ef3e96 100644 (file)
@@ -45,9 +45,10 @@ int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job)
                fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n");
                 edg_wll_SetError(ctx,ENOENT,dbjob);
         }
-        glite_lbu_FreeStmt(&q);
 
 clean:
+        glite_lbu_FreeStmt(&q);
+
        free(res[0]); free(res[1]);
         free(dbjob);
         free(stmt);
index 773f63d..649592a 100644 (file)
@@ -5,6 +5,8 @@
 #include <string.h>
 #include <ctype.h>
 #include <errno.h>
+#include <signal.h>
+#include <unistd.h>
 
 #include <expat.h>
 
@@ -60,6 +62,8 @@ static const char* const response_headers_html[] = {
         NULL
 };
 
+volatile sig_atomic_t purge_quit = 0;
+
 extern int edg_wll_NotifNewServer(edg_wll_Context,
                                edg_wll_QueryRec const * const *, int flags, char const *,
                                const edg_wll_NotifId, time_t *);
@@ -80,6 +84,7 @@ char *edg_wll_HTTPErrorMessage(int errCode)
        
        switch (errCode) {
                case HTTP_OK: msg = "OK"; break;
+               case HTTP_ACCEPTED: msg = "Accepted"; break;
                case HTTP_BADREQ: msg = "Bad Request"; break;
                case HTTP_UNAUTH: msg = "Unauthorized"; break;
                case HTTP_NOTFOUND: msg = "Not Found"; break;
@@ -375,6 +380,11 @@ static int getJobsRSS(edg_wll_Context ctx, char *feedType, edg_wll_JobStat **sta
        return 0;
 }
 
+static int hup_handler(int sig) {
+       purge_quit = 1;
+       return 0;
+}
+
 edg_wll_ErrorCode edg_wll_ProtoV21(edg_wll_Context ctx,
        char *request,char **headers,char *messageBody,
        char **response,char ***headersOut,char **bodyOut)
@@ -899,7 +909,9 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
                else if (!strncmp(requestPTR,KEY_PURGE_REQUEST,sizeof(KEY_PURGE_REQUEST)-1)) {
                        edg_wll_PurgeRequest    request;
                        edg_wll_PurgeResult     result;
-                       int                     fatal = 0;
+                       struct sigaction        sa;
+                       sigset_t                sset;
+                       int                     fatal = 0, retval;
 
                        ctx->p_tmp_timeout.tv_sec = 86400;  
 
@@ -907,6 +919,37 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
                        memset(&result,0,sizeof(result));
 
                        if ( !parsePurgeRequest(ctx,messageBody,(int (*)()) edg_wll_StringToStat,&request) ) {
+                               /* do throttled purge on background if requested */
+                               if ((request.flags & EDG_WLL_PURGE_BACKGROUND)) {
+                                       retval = fork();
+
+                                       switch (retval) {
+                                       case 0: /* forked cleaner */
+                                               memset(&sa, 0, sizeof(sa));
+                                               sa.sa_handler = hup_handler;
+                                               sigaction(SIGTERM, &sa, NULL);
+                                               sigaction(SIGINT, &sa, NULL);
+
+                                               sigemptyset(&sset);
+                                               sigaddset(&sset, SIGTERM);
+                                               sigaddset(&sset, SIGINT);
+                                               sigprocmask(SIG_UNBLOCK, &sset, NULL);
+                                               sigemptyset(&sset);
+                                               sigaddset(&sset, SIGCHLD);
+                                               sigprocmask(SIG_BLOCK, &sset, NULL);
+                                               break;
+                                       case -1: /* err */
+                                               ret = HTTP_INTERNAL;
+                                               edg_wll_SetError(ctx, errno, "can't fork purge process");
+                                               goto err;
+                                       default: /* client request handler */
+                                               ret = HTTP_ACCEPTED;
+                                               /* to end this parent */
+                                               edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, edg_wll_HTTPErrorMessage(ret));
+                                               goto err;
+                                       }
+                               }
+
                                switch ( edg_wll_PurgeServer(ctx, (const edg_wll_PurgeRequest *)&request, &result)) {
                                        case 0: if (html) ret =  HTTP_NOTIMPL;
                                                else ret = HTTP_OK;
@@ -935,6 +978,15 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
                                        free(result.jobs);
                                }
 
+                               /* forked cleaner sends no results */
+                               if ((request.flags & EDG_WLL_PURGE_BACKGROUND)) {
+                                       *response = NULL;
+                                       free(message);
+                                       message = NULL;
+                                       if (requestPTR) free(requestPTR);
+                                       exit(0);
+                               }
+
                        }
 
                        if ( request.jobs )
@@ -1182,9 +1234,9 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
 
 err:   asprintf(response,"HTTP/1.1 %d %s",ret,edg_wll_HTTPErrorMessage(ret));
        *headersOut = (char **) (html ? response_headers_html : response_headers_dglb);
-       if ((ret != HTTP_OK) && text)
+       if ((ret != HTTP_OK && ret != HTTP_ACCEPTED) && text)
                 *bodyOut = edg_wll_ErrorToText(ctx,ret);
-       else if ((ret != HTTP_OK) && html)
+       else if ((ret != HTTP_OK && ret != HTTP_ACCEPTED) && html)
                *bodyOut = edg_wll_ErrorToHTML(ctx,ret);
        else
                *bodyOut = message;
index 35951bf..6bd038a 100644 (file)
@@ -5,6 +5,8 @@
 
 #include "glite/lb/context.h"
 
+extern volatile sig_atomic_t purge_quit;
+
 /** Server side implementation
  *  besides output to the SSL stream (in the context) it may produce
  *  the server-side dump files
index 49c47dd..b09e62a 100644 (file)
@@ -9,6 +9,7 @@
 #include <string.h>
 #include <time.h>
 #include <errno.h>
+#include <signal.h>
 
 #include "glite/jobid/cjobid.h"
 #include "glite/lbu/trio.h"
@@ -34,6 +35,8 @@
 
 #define sizofa(a) (sizeof(a)/sizeof((a)[0]))
 
+extern volatile sig_atomic_t purge_quit;
+
 static const char* const resp_headers[] = {
        "Cache-Control: no-cache",
        "Server: edg_wll_Server/" PROTO_VERSION "/" COMP_PROTO,
@@ -252,7 +255,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        if (request->jobs) {
 
        for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++);
-       for (i=0; request->jobs[i]; i++) {
+       for (i=0; request->jobs[i] && !purge_quit; i++) {
                if (edg_wlc_JobIdParse(request->jobs[i],&job)) {
                        fprintf(stderr,"%s: parse error\n",request->jobs[i]);
                        parse = 1;
@@ -264,6 +267,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                        }
                        else {
                                purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
+                               if (purge_quit) break;
 
                                switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
                                        case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
@@ -304,7 +308,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
                        "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
 
-               while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) {
+               while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0 && !purge_quit) {
                        if (edg_wlc_JobIdParse(job_s,&job)) {
                                fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s);
                                parse = 1;
@@ -320,6 +324,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                }
 
                                purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
+                               if (purge_quit) break;
 
                                memset(&stat,0,sizeof stat);
                                if (edg_wll_JobStatusServer(ctx,job,0,&stat)) {  /* FIXME: replace by intJobStatus ?? */
index 039d5c9..3595dde 100644 (file)
@@ -110,7 +110,7 @@ install:
        done
 
 clean:
-       rm -rfv ${UTILS} statistics ${MAN_GZ} ${MAN8_GZ} *.{lo,o} .libs/
+       rm -rfv ${ALLUTILS} statistics ${MAN_GZ} ${MAN8_GZ} *.{lo,o} .libs/
        rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
 
 %.o: %.c
index 848efa4..4482ba0 100644 (file)
@@ -71,6 +71,9 @@ Purge L&B proxy using given socket path.
 .IP "-t, --target-runtime \fiNNN[smhd]\fR"
 Throttle purge to the estimated target runtime.
 
+.IP "-b, --background \fiN\fR"
+Purge on the background. Default 1 when throttled purging.
+
 
 .\".SH EXAMPLES
 .\"To appear :o(
index 4248528..1dba77c 100644 (file)
@@ -59,6 +59,7 @@ static struct option opts[] = {
        { "proxy",              no_argument, NULL, 'x' },
        { "sock",               required_argument, NULL, 'X' },
        {"target-runtime",      required_argument, NULL, 't'},
+       {"background",          required_argument, NULL, 'b'},
        { NULL,                 no_argument, NULL,  0 }
 };
 
@@ -81,7 +82,8 @@ static void usage(char *me)
                "       -m, --server                L&B server machine name\n"
                "       -x, --proxy                 purge L&B proxy\n"
                "       -X, --sock <path>           purge L&B proxy using default socket path\n"
-               "       -t, --target-runtime NNN[smhd]  throttle purge to the estimated target runtime\n",
+               "       -t, --target-runtime NNN[smhd]  throttle purge to the estimated target runtime\n"
+               "       -b, --background            purge on the background\n",
                me);
 }
 
@@ -89,7 +91,7 @@ int main(int argc,char *argv[])
 {
        edg_wll_PurgeRequest *request;
        edg_wll_PurgeResult *result;
-       int     i, timeout;
+       int     i, timeout, background;
        char *server = NULL;
 
        char *me;
@@ -114,8 +116,9 @@ int main(int argc,char *argv[])
        edg_wll_InitContext(&ctx);
 
        /* get arguments */
-       while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:",opts,NULL)) != EOF) {
+       while ((opt = getopt_long(argc,argv,"a:c:n:e:o:j:m:rlsidhvxX:t:b:",opts,NULL)) != EOF) {
                timeout=-1;
+               background=-1;
 
                switch (opt) {
 
@@ -202,9 +205,15 @@ int main(int argc,char *argv[])
                                request->target_runtime=timeout; 
                        }
                        break;
+               case 'b':
+                       background = atoi(optarg);
+                       break;
                case 'h':
                case '?': usage(me); return 1;
                }
+
+               if ((background == -1 && request->target_runtime) || background)
+                       request->flags |= EDG_WLL_PURGE_BACKGROUND;
        }
 
        /* read the jobIds from file, if wanted */