Added support for dumps to the megajob (without transformations yet).
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 19 Jul 2007 14:41:34 +0000 (14:41 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 19 Jul 2007 14:41:34 +0000 (14:41 +0000)
Enabled warnings (prevents printf segfaults).

org.glite.jp.client/Makefile
org.glite.jp.client/examples/glite-jp-importer.sh
org.glite.jp.client/examples/mill_feed.c
org.glite.jp.client/interface/jpimporter.h
org.glite.jp.client/src/jpimp_lib.c
org.glite.jp.client/src/jpimporter.c

index 8ce0358..02a4e84 100644 (file)
@@ -38,7 +38,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour}
 
 DEBUG:=-g -O0 -DDEBUG
 
-CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS}
+CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} -W -Wall -Wno-unused-parameter -D_GNU_SOURCE
 LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib
 
 LINK:=libtool --mode=link ${CC} ${LDFLAGS} 
index 2740e05..cfc5849 100644 (file)
@@ -81,10 +81,10 @@ while [ 1 ]; do
       if [ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" ]; then
         mv $file $GLITE_LB_EXPORT_DUMPDIR_KEEP
       else
-        rm $file
+        rm -f $file
       fi
     else
-      rm $file
+      rm -f $file
     fi
   done
 
index 676a829..3e3b4d4 100644 (file)
@@ -1,3 +1,7 @@
+#include <sys/time.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
 #include <signal.h>
 #include <stdlib.h>
 #include <stdio.h>
@@ -33,7 +37,9 @@ char perf_ts[100];
 static int register_init();
 static int register_add(const char *jobid);
 static void get_time(char *s, size_t maxs, double *t);
-static int dump_init();
+static int dump_init(const char *start_jobid);
+static int dump_add(const char *filename);
+static void dump_done();
 
 
 static void handler(int sig) {
@@ -41,17 +47,19 @@ static void handler(int sig) {
        signal(sig, SIG_DFL);
 }
 
+
 int main(int argc, char *argv[]) {
-       char start_jobid[256], stop_jobid[256];
+       char start_jobid[256], stop_jobid[256], *fn;
        double ts, ts2;
        int ret;
+       FILE *f;
 
        get_time(perf_ts, sizeof(perf_ts), &ts);
        snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
        snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
 
        if ((ret = register_init()) != 0) return ret;
-       if ((ret = dump_init()) != 0) return ret;
+       if ((ret = dump_init(start_jobid)) != 0) return ret;
        if ((ret = register_add(start_jobid)) != 0) return ret;
        if (signal(SIGINT, handler) == SIG_ERR) {
                ret = errno;
@@ -65,7 +73,20 @@ int main(int argc, char *argv[]) {
                if (argc > 1)
                        if ((ret = dump_add(argv[1])) != 0) return ret;
        }
+       asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts);
+       if ((f = fopen(fn, "wt")) == NULL) {
+               ret = errno;
+               free(fn);
+               fprintf(stderr, "Can' create file '%s': %s\n", fn, strerror(errno));
+               return ret;
+       }
+       free(fn);
+       fprintf(f, "regs\t%d\n", perf_regs);
+       fprintf(f, "dumps\t%d\n", perf_dumps);
+       fclose(f);
        if ((ret = register_add(stop_jobid)) != 0) return ret;
+       dump_done();
+
        get_time(NULL, -1, &ts2);
        printf("stop:  %lf\n", ts2);
        printf("regs:  %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts));
@@ -133,9 +154,11 @@ static int register_add(const char *jobid) {
 }
 
 
-static int dump_init() {
+static int dump_init(const char *start_jobid) {
         char *env;
+       FILE *f;
 
+       unlink(PERF_START_FILE);
        // FIXME: is it OK? (probably different HEAD and branch)
         env = getenv("GLITE_LB_EXPORT_DUMPDIR");
         if (!env) env = EDG_DUMP_STORAGE;
@@ -143,7 +166,14 @@ static int dump_init() {
        mkdir(dump_dir, 0755);
        perf_dumps = 0;
 
-        return 0;       
+       if ((f = fopen(PERF_START_FILE, "wt")) == NULL) {
+               fprintf(stderr, "Can't create file '" PERF_START_FILE "': %s\n", strerror(errno));
+               return EIO;
+       }
+       if (start_jobid) fprintf(f, "%s\n", start_jobid);
+       fclose(f);
+
+        return 0;
 }
 
 
@@ -160,3 +190,7 @@ static int dump_add(const char *filename) {
        perf_dumps++;
        return ret;
 }
+
+
+static void dump_done() {
+}
index a0b0bc7..e26221f 100644 (file)
@@ -15,7 +15,8 @@
 
 #define PERF_JOBID_START_PREFIX "https://start.megajob/"
 #define PERF_JOBID_STOP_PREFIX "https://stop.megajob/"
-
+#define PERF_START_FILE                "/tmp/jp_megajob_start"
+#define PERF_STOP_FILE_FORMAT   "/tmp/jp_megajob_%s"
 
 #ifdef __cplusplus
 extern "C" {
index efebc2a..974c0dc 100644 (file)
@@ -5,6 +5,7 @@
 #include <assert.h>
 #include <fcntl.h>
 #include <dirent.h>
+#include <stdio.h>
 
 
 #define COMPILE_WITH_LIBTAR
index 50047c4..d5f9d41 100644 (file)
@@ -14,6 +14,7 @@
 #include <syslog.h>
 #include <fcntl.h>
 #include <libgen.h>
+#include <ctype.h>
 
 #include "glite/lb/lb_maildir.h"
 
@@ -38,7 +39,6 @@ typedef struct {
        char       *val;
 } msg_pattern_t;
 
-
 #ifndef dprintf
 #define dprintf(FMT, ARGS...)          { if (debug) printf(FMT, ##ARGS); }
 #endif
@@ -76,10 +76,14 @@ static gss_cred_id_t        mycred = GSS_C_NO_CREDENTIAL;
 static char            *mysubj;
 struct timeval         to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
 #ifdef JP_PERF
+typedef struct {
+       char *id, *name;
+       long int count, limit;
+       double start, end;
+} perf_t;
+
 int                    sink = 0;
-int                    perf_regs, perf_dumps, perf_sandboxes;
-char                   *perf_id = NULL;
-double                 perf_start, perf_end;
+perf_t                 perf = {name:NULL,};
 #endif
 
 
@@ -111,6 +115,12 @@ static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
 
 #include "glite/jp/ws_fault.c"
 
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name);
+static void stats_set_jobid(perf_t *perf, const char *jobid);
+static void stats_done(perf_t *perf);
+#endif
+
 static void usage(char *me)
 {
        fprintf(stderr,"usage: %s [option]\n"
@@ -137,7 +147,7 @@ static void catchsig(int sig)
        die = sig;
 }
 
-static void catch_chld(int sig)
+static void catch_chld(int sig __attribute__((unused)))
 {
        child_died = 1;
 }
@@ -394,7 +404,7 @@ static int slave(int (*fn)(void), const char *nm)
        }
 
        if ( die ) {
-               dprintf("[%s] Terminating on signal %d\n", name, getpid(), die);
+               dprintf("[%s] %d: Terminating on signal %d\n", name, getpid(), die);
                if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die);
        }
     dprintf("[%s] Terminating after %d connections\n", name, conn_cnt);
@@ -453,25 +463,12 @@ static int reg_importer(void)
                        if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
 #ifdef JP_PERF
                        if (sink) {
-                               struct timeval tv;
-
                                if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) {
-                                       perf_regs = 0;
-                                       perf_dumps = 0;
-                                       perf_sandboxes = 0;
-                                       free(perf_id);
-                                       perf_id = strdup(msg + sizeof(PERF_JOBID_START_PREFIX) - 1);
-                                       gettimeofday(&tv, NULL);
-                                       perf_start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
-                                       dprintf("[statistics] %s\n", perf_id);
-                               } else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) {
-                                       gettimeofday(&tv, NULL);
-                                       perf_end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
-                                       dprintf("[statistics] %s\n", perf_id);
-                                       dprintf("[statistics] start: %lf\n", perf_start);
-                                       dprintf("[statistics] stop:  %lf\n", perf_end);
-                                       dprintf("[statistics] regs:  %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (perf_end - perf_start));
-                               } else perf_regs++;
+                                       stats_init(&perf, name);
+                                       stats_set_jobid(&perf, msg);
+                               }
+                               else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) stats_done(&perf);
+                               else perf.count++;
                        }
                        if (!(sink & 2)) {
 #endif
@@ -499,7 +496,6 @@ static int dump_importer(void)
        static int              readnew = 1;
        char               *msg = NULL,
                                   *fname = NULL,
-                                  *aux,
                                   *bname;
        char                        fspec[PATH_MAX];
        int                             ret;
@@ -551,7 +547,45 @@ static int dump_importer(void)
                dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
 #ifdef JP_PERF
-               if (sink) perf_dumps++;
+               if (sink) {
+               /* statistics started by file, ended by count limit (from the appropriate result fikle) */
+                       FILE *f;
+                       char *fn, item[200];
+                       int count;
+
+                       /* starter */
+                       if (!perf.name) {
+                               f = fopen(PERF_START_FILE, "rt");
+                               if (f) {
+                                       stats_init(&perf, name);
+                                       fscanf(f, "%s", item);
+                                       fclose(f);
+                                       unlink(PERF_START_FILE);
+                                       stats_set_jobid(&perf, item);
+                               } else
+                                       dprintf("[%s statistics]: not started, fopen(\"" PERF_START_FILE "\") => %d\n", name, errno);
+                       }
+                       if (perf.name) {
+                               perf.count++;
+                               if (perf.limit) {
+                                       dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit);
+                                       if (perf.count >= perf.limit) stats_done(&perf);
+                               } else {
+                                       dprintf("[%s statistics] done %ld/no limit\n", name, perf.count);
+                                       /* stopper */
+                                       asprintf(&fn, PERF_STOP_FILE_FORMAT, perf.id);
+                                       f = fopen(fn, "rt");
+                                       free(fn);
+                                       if (f) {
+                                               fscanf(f, "%s\t%d", item, &count);
+                                               fscanf(f, "%s\t%d", item, &count);
+                                               dprintf("[%s statistics] expected %d %s\n", name, count, item);
+                                               fclose(f);
+                                               perf.limit = count;
+                                       }
+                               }
+                       }
+               }
                if (!(sink & 2)) {
 #endif
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
@@ -612,8 +646,7 @@ static int sandbox_importer(void)
        struct _jpelem__CommitUploadResponse    empty;
        static int      readnew = 1;
        char            *msg = NULL,
-                       *fname = NULL,
-                       *aux;
+                       *fname = NULL;
        int             ret;
        int             fhnd;
        msg_pattern_t   tab[] = {
@@ -667,7 +700,6 @@ static int sandbox_importer(void)
                dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
 #ifdef JP_PERF
-               if (sink) perf_sandboxes++;
                if (!(sink & 2)) {
 #endif
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
@@ -860,6 +892,38 @@ static int gftp_put_file(const char *url, int fhnd)
     return (gError == GLOBUS_TRUE)? 1: 0;
 }
 
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name) {
+       struct timeval tv;
+
+       memset(perf, 0, sizeof *perf);
+       perf->count = 0;
+       perf->name = strdup(name);
+       gettimeofday(&tv, NULL);
+       perf->start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       dprintf("[%s statistics] start detected\n", name);
+}
+
+static void stats_set_jobid(perf_t *perf, const char *jobid) {
+       perf->id = strdup(jobid + sizeof(PERF_JOBID_START_PREFIX) - 1);
+       dprintf("[%s statistics] ID %s\n", perf->name, perf->id);
+}
+
+static void stats_done(perf_t *perf) {
+       struct timeval tv;
+
+       gettimeofday(&tv, NULL);
+       perf->end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       dprintf("[%s statistics] %s\n", perf->name, perf->id);
+       dprintf("[%s statistics] start: %lf\n", perf->name, perf->start);
+       dprintf("[%s statistics] stop:  %lf\n", perf->name, perf->end);
+       dprintf("[%s statistics] count: %ld (%lf job/day)\n", perf->name, perf->count, 86400.0 * perf->count / (perf->end - perf->start));
+       free(perf->id);
+       free(perf->name);
+       memset(perf, 0, sizeof *perf);
+}
+#endif
+
 /* XXX: we don't use it */
 SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} };