From: František Dvořák Date: Mon, 13 Oct 2008 13:35:40 +0000 (+0000) Subject: purge: throttle with job list too, improvement, segfault fix X-Git-Tag: test_tag~142 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=c1d3b6faeee8d10753025f475c606b123e0580f4;p=jra1mw.git purge: throttle with job list too, improvement, segfault fix server examples: warnings, clean target --- diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index 75b8a3b..1113e7e 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -240,6 +240,8 @@ LCAS_PLUGIN_OBJS:=lcas_lb.o LCAS_PLUGIN_LOBJS:=${LCAS_PLUGIN_OBJS:.o=.lo} LCAS_PLUGIN_LIB:=liblcas_lb.la +EXAMPLES=ws_getversion ws_jobstat ws_query_ex ws_joblog ws_userjobs + glite-lb-bkserverd: ${NSMAP} ${BKSERVER_OBJS} ${LINKXX} -o $@ ${BKSERVER_OBJS} ${BKSERVER_LIBS} @@ -307,7 +309,7 @@ test_soap_conv: test_soap_conv.cpp ${WS_CLIENT_OBJS} ${CXX} -c ${CFLAGS} ${TEST_INC} $< ${LINKXX} -o $@ test_soap_conv.o ${WS_CLIENT_OBJS} ${WS_CLIENT_LIBS} ${TEST_LIBS} -examples: ws_getversion ws_jobstat ws_query_ex ws_joblog ws_userjobs +examples: ${EXAMPLES} ws_getversion: ws_getversion.o ${WS_CLIENT_OBJS} ${LINK} -o $@ $< ${WS_CLIENT_OBJS} ${WS_CLIENT_LIBS} @@ -377,7 +379,7 @@ ifdef LB_PERF endif clean: - rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} ${LCAS_PLUGIN_LIB} test* ${MAN_GZ} + rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} ${LCAS_PLUGIN_LIB} test* ${MAN_GZ} ${EXAMPLES} rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/ %.c: %.c.T diff --git a/org.glite.lb.server/examples/ws_jobstat.c b/org.glite.lb.server/examples/ws_jobstat.c index 2336b46..553359c 100644 --- a/org.glite.lb.server/examples/ws_jobstat.c +++ b/org.glite.lb.server/examples/ws_jobstat.c @@ -1,10 +1,12 @@ #include #include +#include #include "soap_version.h" #include "glite/security/glite_gsplugin.h" #include "glite/security/glite_gscompat.h" #include "glite/lb/consumer.h" +#include "glite/lb/xml_conversions.h" #include "bk_ws_H.h" #include "ws_typeref.h" diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 94a7112..76dcab5 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -44,6 +44,7 @@ static const char* const resp_headers[] = { static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int); int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job); static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job); +static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime); int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname) @@ -202,6 +203,10 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, edg_wlc_JobId job; char *tmpfname = NULL; int naffected_jobs = 0, ret; + double now, time_per_job, purge_end; + struct timeval tp; + int jobs_to_exa; + time_t target_runtime; if (!ctx->noAuth) { @@ -237,7 +242,17 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } */ - if (request->jobs) for (i=0; request->jobs[i]; i++) { + /* throttle parameters */ + gettimeofday(&tp, NULL); + now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; + purge_end = now + request->target_runtime; + target_runtime = request->target_runtime; + time_per_job = -1.0; + + if (request->jobs) { + + for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++); + for (i=0; request->jobs[i]; i++) { if (edg_wlc_JobIdParse(request->jobs[i],&job)) { fprintf(stderr,"%s: parse error\n",request->jobs[i]); parse = 1; @@ -248,6 +263,8 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, parse = 1; } else { + purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); + switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) { case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs))); @@ -280,15 +297,14 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, } edg_wlc_JobIdFree(job); } + jobs_to_exa--; + } } else { glite_lbu_Statement s; char *job_s; - int jobs_to_exa; time_t timeout[EDG_WLL_NUMBER_OF_STATCODES], start = time(NULL); - double now, time_per_job, target_this, purge_end; - struct timeval tp; for (i=0; itimeout[i] < 0 ? @@ -299,13 +315,6 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" : "select dg_jobid from jobs where server='1'", &s)) < 0) goto abort; - - gettimeofday(&tp, NULL); - now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; - purge_end = now + request->target_runtime; - time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0; - //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n", - // getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job); while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) { if (edg_wlc_JobIdParse(job_s,&job)) { @@ -322,20 +331,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request, continue; } - /* throttle purging according to the required target_runtime */ - if (request->target_runtime) { - target_this = purge_end - time_per_job * jobs_to_exa; - gettimeofday(&tp, NULL); - now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; - if (target_this > now) { /* enough time */ - //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now); - usleep(1e6*(target_this - now)); - } - if (target_this < now) { /* speed up */ - time_per_job = (purge_end-now)/jobs_to_exa; - } - //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job); - } + purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime); memset(&stat,0,sizeof stat); if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */ @@ -620,11 +616,11 @@ int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, i { char *dbjob = NULL; char *stmt = NULL; - glite_lbu_Statement q; + glite_lbu_Statement q = NULL; int ret,dumped = 0; char *res[10]; char *prefix = NULL, *suffix = NULL, *root = NULL; - char *prefix_id, *suffix_id; + char *prefix_id = NULL, *suffix_id = NULL; int sql_retval; edg_wll_ResetError(ctx); @@ -870,3 +866,36 @@ int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job) return(edg_wll_ExecSQL(ctx,stmt,NULL)); } + +/* throttle purging according to the required target_runtime */ +static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) { + struct timeval tp; + double target_this, now; + + + if (*time_per_job < 0.0) { + *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0; + //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n", + // getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job); + } + + if (*target_runtime) { + target_this = purge_end - *time_per_job * jobs_to_exa; + gettimeofday(&tp, NULL); + now = tp.tv_sec + (double)tp.tv_usec / 1000000.0; + if (target_this > now) { /* enough time */ + //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now); + usleep(1e6*(target_this - now)); + } + if (target_this < now) { /* speed up */ + *time_per_job = (purge_end-now)/jobs_to_exa; + /* limit not catched, maximal speed */ + if (*time_per_job <= 0) { + *time_per_job = 0.0; + *target_runtime = 0; + } + } + //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job); + } + +}