From 3960741f6f7d81c1ed62d600169c10dac3a81a5b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 19 Jul 2007 14:41:34 +0000 Subject: [PATCH] Added support for dumps to the megajob (without transformations yet). Enabled warnings (prevents printf segfaults). --- org.glite.jp.client/Makefile | 2 +- org.glite.jp.client/examples/glite-jp-importer.sh | 4 +- org.glite.jp.client/examples/mill_feed.c | 44 +++++++- org.glite.jp.client/interface/jpimporter.h | 3 +- org.glite.jp.client/src/jpimp_lib.c | 1 + org.glite.jp.client/src/jpimporter.c | 122 +++++++++++++++++----- 6 files changed, 138 insertions(+), 38 deletions(-) diff --git a/org.glite.jp.client/Makefile b/org.glite.jp.client/Makefile index 8ce0358..02a4e84 100644 --- a/org.glite.jp.client/Makefile +++ b/org.glite.jp.client/Makefile @@ -38,7 +38,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} DEBUG:=-g -O0 -DDEBUG -CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} +CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} -W -Wall -Wno-unused-parameter -D_GNU_SOURCE LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib LINK:=libtool --mode=link ${CC} ${LDFLAGS} diff --git a/org.glite.jp.client/examples/glite-jp-importer.sh b/org.glite.jp.client/examples/glite-jp-importer.sh index 2740e05..cfc5849 100644 --- a/org.glite.jp.client/examples/glite-jp-importer.sh +++ b/org.glite.jp.client/examples/glite-jp-importer.sh @@ -81,10 +81,10 @@ while [ 1 ]; do if [ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" ]; then mv $file $GLITE_LB_EXPORT_DUMPDIR_KEEP else - rm $file + rm -f $file fi else - rm $file + rm -f $file fi done diff --git a/org.glite.jp.client/examples/mill_feed.c b/org.glite.jp.client/examples/mill_feed.c index 676a829..3e3b4d4 100644 --- a/org.glite.jp.client/examples/mill_feed.c +++ b/org.glite.jp.client/examples/mill_feed.c @@ -1,3 +1,7 @@ +#include +#include +#include +#include #include #include #include @@ -33,7 +37,9 @@ char perf_ts[100]; static int register_init(); static int register_add(const char *jobid); static void get_time(char *s, size_t maxs, double *t); -static int dump_init(); +static int dump_init(const char *start_jobid); +static int dump_add(const char *filename); +static void dump_done(); static void handler(int sig) { @@ -41,17 +47,19 @@ static void handler(int sig) { signal(sig, SIG_DFL); } + int main(int argc, char *argv[]) { - char start_jobid[256], stop_jobid[256]; + char start_jobid[256], stop_jobid[256], *fn; double ts, ts2; int ret; + FILE *f; get_time(perf_ts, sizeof(perf_ts), &ts); snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts); snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts); if ((ret = register_init()) != 0) return ret; - if ((ret = dump_init()) != 0) return ret; + if ((ret = dump_init(start_jobid)) != 0) return ret; if ((ret = register_add(start_jobid)) != 0) return ret; if (signal(SIGINT, handler) == SIG_ERR) { ret = errno; @@ -65,7 +73,20 @@ int main(int argc, char *argv[]) { if (argc > 1) if ((ret = dump_add(argv[1])) != 0) return ret; } + asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts); + if ((f = fopen(fn, "wt")) == NULL) { + ret = errno; + free(fn); + fprintf(stderr, "Can' create file '%s': %s\n", fn, strerror(errno)); + return ret; + } + free(fn); + fprintf(f, "regs\t%d\n", perf_regs); + fprintf(f, "dumps\t%d\n", perf_dumps); + fclose(f); if ((ret = register_add(stop_jobid)) != 0) return ret; + dump_done(); + get_time(NULL, -1, &ts2); printf("stop: %lf\n", ts2); printf("regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts)); @@ -133,9 +154,11 @@ static int register_add(const char *jobid) { } -static int dump_init() { +static int dump_init(const char *start_jobid) { char *env; + FILE *f; + unlink(PERF_START_FILE); // FIXME: is it OK? (probably different HEAD and branch) env = getenv("GLITE_LB_EXPORT_DUMPDIR"); if (!env) env = EDG_DUMP_STORAGE; @@ -143,7 +166,14 @@ static int dump_init() { mkdir(dump_dir, 0755); perf_dumps = 0; - return 0; + if ((f = fopen(PERF_START_FILE, "wt")) == NULL) { + fprintf(stderr, "Can't create file '" PERF_START_FILE "': %s\n", strerror(errno)); + return EIO; + } + if (start_jobid) fprintf(f, "%s\n", start_jobid); + fclose(f); + + return 0; } @@ -160,3 +190,7 @@ static int dump_add(const char *filename) { perf_dumps++; return ret; } + + +static void dump_done() { +} diff --git a/org.glite.jp.client/interface/jpimporter.h b/org.glite.jp.client/interface/jpimporter.h index a0b0bc7..e26221f 100644 --- a/org.glite.jp.client/interface/jpimporter.h +++ b/org.glite.jp.client/interface/jpimporter.h @@ -15,7 +15,8 @@ #define PERF_JOBID_START_PREFIX "https://start.megajob/" #define PERF_JOBID_STOP_PREFIX "https://stop.megajob/" - +#define PERF_START_FILE "/tmp/jp_megajob_start" +#define PERF_STOP_FILE_FORMAT "/tmp/jp_megajob_%s" #ifdef __cplusplus extern "C" { diff --git a/org.glite.jp.client/src/jpimp_lib.c b/org.glite.jp.client/src/jpimp_lib.c index efebc2ab..974c0dc 100644 --- a/org.glite.jp.client/src/jpimp_lib.c +++ b/org.glite.jp.client/src/jpimp_lib.c @@ -5,6 +5,7 @@ #include #include #include +#include #define COMPILE_WITH_LIBTAR diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index 50047c4..d5f9d41 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -14,6 +14,7 @@ #include #include #include +#include #include "glite/lb/lb_maildir.h" @@ -38,7 +39,6 @@ typedef struct { char *val; } msg_pattern_t; - #ifndef dprintf #define dprintf(FMT, ARGS...) { if (debug) printf(FMT, ##ARGS); } #endif @@ -76,10 +76,14 @@ static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; static char *mysubj; struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0}; #ifdef JP_PERF +typedef struct { + char *id, *name; + long int count, limit; + double start, end; +} perf_t; + int sink = 0; -int perf_regs, perf_dumps, perf_sandboxes; -char *perf_id = NULL; -double perf_start, perf_end; +perf_t perf = {name:NULL,}; #endif @@ -111,6 +115,12 @@ static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:" #include "glite/jp/ws_fault.c" +#ifdef JP_PERF +static void stats_init(perf_t *perf, const char *name); +static void stats_set_jobid(perf_t *perf, const char *jobid); +static void stats_done(perf_t *perf); +#endif + static void usage(char *me) { fprintf(stderr,"usage: %s [option]\n" @@ -137,7 +147,7 @@ static void catchsig(int sig) die = sig; } -static void catch_chld(int sig) +static void catch_chld(int sig __attribute__((unused))) { child_died = 1; } @@ -394,7 +404,7 @@ static int slave(int (*fn)(void), const char *nm) } if ( die ) { - dprintf("[%s] Terminating on signal %d\n", name, getpid(), die); + dprintf("[%s] %d: Terminating on signal %d\n", name, getpid(), die); if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); } dprintf("[%s] Terminating after %d connections\n", name, conn_cnt); @@ -453,25 +463,12 @@ static int reg_importer(void) if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); #ifdef JP_PERF if (sink) { - struct timeval tv; - if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) { - perf_regs = 0; - perf_dumps = 0; - perf_sandboxes = 0; - free(perf_id); - perf_id = strdup(msg + sizeof(PERF_JOBID_START_PREFIX) - 1); - gettimeofday(&tv, NULL); - perf_start = tv.tv_sec + (double)tv.tv_usec / 1000000.0; - dprintf("[statistics] %s\n", perf_id); - } else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) { - gettimeofday(&tv, NULL); - perf_end = tv.tv_sec + (double)tv.tv_usec / 1000000.0; - dprintf("[statistics] %s\n", perf_id); - dprintf("[statistics] start: %lf\n", perf_start); - dprintf("[statistics] stop: %lf\n", perf_end); - dprintf("[statistics] regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (perf_end - perf_start)); - } else perf_regs++; + stats_init(&perf, name); + stats_set_jobid(&perf, msg); + } + else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) stats_done(&perf); + else perf.count++; } if (!(sink & 2)) { #endif @@ -499,7 +496,6 @@ static int dump_importer(void) static int readnew = 1; char *msg = NULL, *fname = NULL, - *aux, *bname; char fspec[PATH_MAX]; int ret; @@ -551,7 +547,45 @@ static int dump_importer(void) dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); #ifdef JP_PERF - if (sink) perf_dumps++; + if (sink) { + /* statistics started by file, ended by count limit (from the appropriate result fikle) */ + FILE *f; + char *fn, item[200]; + int count; + + /* starter */ + if (!perf.name) { + f = fopen(PERF_START_FILE, "rt"); + if (f) { + stats_init(&perf, name); + fscanf(f, "%s", item); + fclose(f); + unlink(PERF_START_FILE); + stats_set_jobid(&perf, item); + } else + dprintf("[%s statistics]: not started, fopen(\"" PERF_START_FILE "\") => %d\n", name, errno); + } + if (perf.name) { + perf.count++; + if (perf.limit) { + dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit); + if (perf.count >= perf.limit) stats_done(&perf); + } else { + dprintf("[%s statistics] done %ld/no limit\n", name, perf.count); + /* stopper */ + asprintf(&fn, PERF_STOP_FILE_FORMAT, perf.id); + f = fopen(fn, "rt"); + free(fn); + if (f) { + fscanf(f, "%s\t%d", item, &count); + fscanf(f, "%s\t%d", item, &count); + dprintf("[%s statistics] expected %d %s\n", name, count, item); + fclose(f); + perf.limit = count; + } + } + } + } if (!(sink & 2)) { #endif ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); @@ -612,8 +646,7 @@ static int sandbox_importer(void) struct _jpelem__CommitUploadResponse empty; static int readnew = 1; char *msg = NULL, - *fname = NULL, - *aux; + *fname = NULL; int ret; int fhnd; msg_pattern_t tab[] = { @@ -667,7 +700,6 @@ static int sandbox_importer(void) dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg); #ifdef JP_PERF - if (sink) perf_sandboxes++; if (!(sink & 2)) { #endif ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); @@ -860,6 +892,38 @@ static int gftp_put_file(const char *url, int fhnd) return (gError == GLOBUS_TRUE)? 1: 0; } +#ifdef JP_PERF +static void stats_init(perf_t *perf, const char *name) { + struct timeval tv; + + memset(perf, 0, sizeof *perf); + perf->count = 0; + perf->name = strdup(name); + gettimeofday(&tv, NULL); + perf->start = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[%s statistics] start detected\n", name); +} + +static void stats_set_jobid(perf_t *perf, const char *jobid) { + perf->id = strdup(jobid + sizeof(PERF_JOBID_START_PREFIX) - 1); + dprintf("[%s statistics] ID %s\n", perf->name, perf->id); +} + +static void stats_done(perf_t *perf) { + struct timeval tv; + + gettimeofday(&tv, NULL); + perf->end = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[%s statistics] %s\n", perf->name, perf->id); + dprintf("[%s statistics] start: %lf\n", perf->name, perf->start); + dprintf("[%s statistics] stop: %lf\n", perf->name, perf->end); + dprintf("[%s statistics] count: %ld (%lf job/day)\n", perf->name, perf->count, 86400.0 * perf->count / (perf->end - perf->start)); + free(perf->id); + free(perf->name); + memset(perf, 0, sizeof *perf); +} +#endif + /* XXX: we don't use it */ SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; -- 1.8.2.3