purge: throttle with job list too, improvement, segfault fix
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 13 Oct 2008 13:35:40 +0000 (13:35 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 13 Oct 2008 13:35:40 +0000 (13:35 +0000)
server examples: warnings, clean target

org.glite.lb.server/Makefile
org.glite.lb.server/examples/ws_jobstat.c
org.glite.lb.server/src/srv_purge.c

index 75b8a3b..1113e7e 100644 (file)
@@ -240,6 +240,8 @@ LCAS_PLUGIN_OBJS:=lcas_lb.o
 LCAS_PLUGIN_LOBJS:=${LCAS_PLUGIN_OBJS:.o=.lo}
 LCAS_PLUGIN_LIB:=liblcas_lb.la
 
+EXAMPLES=ws_getversion ws_jobstat ws_query_ex ws_joblog ws_userjobs
+
 glite-lb-bkserverd: ${NSMAP} ${BKSERVER_OBJS}
        ${LINKXX} -o $@ ${BKSERVER_OBJS} ${BKSERVER_LIBS}
 
@@ -307,7 +309,7 @@ test_soap_conv: test_soap_conv.cpp ${WS_CLIENT_OBJS}
        ${CXX} -c ${CFLAGS} ${TEST_INC} $<
        ${LINKXX} -o $@ test_soap_conv.o ${WS_CLIENT_OBJS} ${WS_CLIENT_LIBS} ${TEST_LIBS}
 
-examples: ws_getversion ws_jobstat ws_query_ex ws_joblog ws_userjobs
+examples: ${EXAMPLES}
 
 ws_getversion: ws_getversion.o ${WS_CLIENT_OBJS}
        ${LINK} -o $@ $< ${WS_CLIENT_OBJS} ${WS_CLIENT_LIBS}
@@ -377,7 +379,7 @@ ifdef LB_PERF
 endif
 
 clean:
-       rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} ${LCAS_PLUGIN_LIB} test* ${MAN_GZ}
+       rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} ${LCAS_PLUGIN_LIB} test* ${MAN_GZ} ${EXAMPLES}
        rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
 
 %.c: %.c.T
index 2336b46..553359c 100644 (file)
@@ -1,10 +1,12 @@
 #include <getopt.h>
 #include <stdsoap2.h>
+#include <expat.h>
 
 #include "soap_version.h"
 #include "glite/security/glite_gsplugin.h"
 #include "glite/security/glite_gscompat.h"
 #include "glite/lb/consumer.h"
+#include "glite/lb/xml_conversions.h"
 
 #include "bk_ws_H.h"
 #include "ws_typeref.h"
index 94a7112..76dcab5 100644 (file)
@@ -44,6 +44,7 @@ static const char* const resp_headers[] = {
 static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
 int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
 static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
 
 
 int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
@@ -202,6 +203,10 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        edg_wlc_JobId   job;
        char    *tmpfname = NULL;
        int     naffected_jobs = 0, ret;
+       double          now, time_per_job, purge_end;
+       struct timeval  tp;
+       int             jobs_to_exa;
+       time_t  target_runtime;
 
 
        if (!ctx->noAuth) {
@@ -237,7 +242,17 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
        }
        */
 
-       if (request->jobs) for (i=0; request->jobs[i]; i++) {
+       /* throttle parameters */
+       gettimeofday(&tp, NULL);
+       now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+       purge_end = now + request->target_runtime;
+       target_runtime = request->target_runtime;
+       time_per_job = -1.0;
+
+       if (request->jobs) {
+
+       for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++);
+       for (i=0; request->jobs[i]; i++) {
                if (edg_wlc_JobIdParse(request->jobs[i],&job)) {
                        fprintf(stderr,"%s: parse error\n",request->jobs[i]);
                        parse = 1;
@@ -248,6 +263,8 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                parse = 1;
                        }
                        else {
+                               purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
+
                                switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
                                        case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
                                                        result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
@@ -280,15 +297,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                        }
                        edg_wlc_JobIdFree(job);
                }
+               jobs_to_exa--;
+       }
        }
        else {
                glite_lbu_Statement     s;
                char            *job_s;
-               int             jobs_to_exa;
                time_t          timeout[EDG_WLL_NUMBER_OF_STATCODES],
                                start = time(NULL);
-               double          now, time_per_job, target_this, purge_end;
-               struct timeval  tp;
 
                for (i=0; i<EDG_WLL_NUMBER_OF_STATCODES; i++)
                        timeout[i] = request->timeout[i] < 0 ? 
@@ -299,13 +315,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
 
                if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
                        "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
-               
-               gettimeofday(&tp, NULL);
-               now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
-               purge_end = now + request->target_runtime;
-               time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0;
-               //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
-               //    getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job);
 
                while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) {
                        if (edg_wlc_JobIdParse(job_s,&job)) {
@@ -322,20 +331,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request,
                                        continue;
                                }
 
-                               /* throttle purging according to the required target_runtime */
-                               if (request->target_runtime) {
-                                       target_this = purge_end - time_per_job * jobs_to_exa;
-                                       gettimeofday(&tp, NULL);
-                                       now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
-                                       if (target_this > now) {  /* enough time */
-                                               //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
-                                               usleep(1e6*(target_this - now));
-                                       }
-                                       if (target_this < now) {   /* speed up */
-                                               time_per_job = (purge_end-now)/jobs_to_exa;
-                                       }
-                                       //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job);
-                               }
+                               purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
 
                                memset(&stat,0,sizeof stat);
                                if (edg_wll_JobStatusServer(ctx,job,0,&stat)) {  /* FIXME: replace by intJobStatus ?? */
@@ -620,11 +616,11 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i
 {
        char    *dbjob = NULL;
        char    *stmt = NULL;
-       glite_lbu_Statement     q;
+       glite_lbu_Statement     q = NULL;
        int             ret,dumped = 0;
        char    *res[10];
        char    *prefix = NULL, *suffix = NULL, *root = NULL;
-       char    *prefix_id, *suffix_id;
+       char    *prefix_id = NULL, *suffix_id = NULL;
        int     sql_retval;
 
        edg_wll_ResetError(ctx);
@@ -870,3 +866,36 @@ int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job)
        return(edg_wll_ExecSQL(ctx,stmt,NULL));
 }
 
+
+/* throttle purging according to the required target_runtime */
+static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
+       struct timeval  tp;
+       double target_this, now;
+
+
+       if (*time_per_job < 0.0) {
+               *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0;
+               //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
+               //    getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job);
+       }
+
+       if (*target_runtime) {
+               target_this = purge_end - *time_per_job * jobs_to_exa;
+               gettimeofday(&tp, NULL);
+               now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+               if (target_this > now) {  /* enough time */
+                       //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+                       usleep(1e6*(target_this - now));
+               }
+               if (target_this < now) {   /* speed up */
+                       *time_per_job = (purge_end-now)/jobs_to_exa;
+                       /* limit not catched, maximal speed */
+                       if (*time_per_job <= 0) {
+                               *time_per_job = 0.0;
+                               *target_runtime = 0;
+                       }
+               }
+               //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job);
+       }
+
+}