Fixes in statistics (uniq without /, print stats only once after done).
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 24 Jul 2007 13:54:07 +0000 (13:54 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 24 Jul 2007 13:54:07 +0000 (13:54 +0000)
Implement break feature in the feeder (and added options).

org.glite.jp.client/examples/mill_feed.c
org.glite.jp.client/interface/jpimporter.h
org.glite.jp.client/src/jpimporter.c

index ca06dad..1f2c562 100644 (file)
@@ -8,6 +8,7 @@
 #include <errno.h>
 #include <string.h>
 #include <time.h>
+#include <getopt.h>
 #include "jp_client.h"
 #include "jpimporter.h" 
 #include "glite/lb/lb_maildir.h" 
@@ -33,7 +34,19 @@ int do_exit = 0;
 int perf_regs, perf_dumps;
 char perf_ts[100];
 char *dump; char **dump_index; size_t dump_tokens;
-
+int speed = 0;
+double duration = 0.0;
+
+static struct option opts[] = {
+       { "help",        0, NULL,    'h'},
+       { "reg-mdir",    1, NULL,    'R'},
+       { "dump-mdir",   1, NULL,    'D'},
+       { "break",       1, NULL,    'b'},
+       { "dump",        1, NULL,    'd'},
+//     { "sandbox-mdir",1, NULL,    's'},
+       { NULL,          0, NULL,     0}
+};
+static const char *get_opt_string = "hR:D:b:d:";
 
 static int register_init();
 static int register_add(const char *jobid, char **new_jobid);
@@ -49,32 +62,67 @@ static void handler(int sig) {
 }
 
 
+static void usage(const char *program) {
+       fprintf(stderr, "Usage: %s [OPTIONS]\n"
+               "\t-R,--reg-mdir\n"
+               "\t-D,--dump-mdir\n"
+//             "\t-s,--sandbox-mdir\n"
+               "\t-b,--break      speed (jobs/day)\n"
+               "\t-d,--dump       dump file\n"
+       , program);
+}
+
+
 int main(int argc, char *argv[]) {
        char start_jobid[256], stop_jobid[256], *fn;
-       double ts, ts2;
-       int ret;
+       double ts, ts2, last, now;
+       int ret, opt;
        FILE *f;
-       char *jobid;
+       char *jobid, *dumpfile = NULL;
+
+       while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF)
+       switch (opt) {
+               case 'h': usage(argv[0]); return 0;
+               case 'R': jpreg_dir = strdup(optarg); break;
+               case 'D': dump_dir = strdup(optarg); break;
+               case 'b': speed = atoi(optarg); if (speed) duration = 24.0*3600.0*1000000.0/speed; break;
+               case 'd': dumpfile = optarg; break;
+               default: printf("opt: %c\n", opt); usage(argv[0]); return 1;
+       }
 
        get_time(perf_ts, sizeof(perf_ts), &ts);
        snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
        snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
 
        if ((ret = register_init()) != 0) return ret;
-       if ((ret = dump_init(start_jobid, argc > 1 ? argv[1] : NULL)) != 0) return ret;
+       if ((ret = dump_init(start_jobid, dumpfile)) != 0) return ret;
        if ((ret = register_add(start_jobid, NULL)) != 0) return ret;
        if (signal(SIGINT, handler) == SIG_ERR) {
                ret = errno;
                fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno));
                return ret;
        }
+       if (speed) printf("speed:     %d jobs/day (delay %lf)\n", speed, duration / 1000000.0);
+       else printf("speed:     unlimited\n");
+       printf("reg-mdir:  %s\n", jpreg_dir);
+       printf("dump-mdir: %s\n", dump_dir);
+       printf("start:     %lf\n", ts);
        printf("%s\n", start_jobid);
-       printf("start: %lf\n", ts);
+       last = ts;
        while (!do_exit) {
+               struct timeval tv;
+
                if ((ret = register_add(NULL, &jobid)) != 0) return ret;
-               if (argc > 1)
-                       if ((ret = dump_add(argv[1], jobid)) != 0) return ret;
+//             printf("%s\n", jobid);
+               if (dumpfile) {
+                       if ((ret = dump_add(dumpfile, jobid)) != 0) return ret;
+//                     printf("  dumped %s\n", dumpfile);
+               }
                free(jobid);
+               gettimeofday(&tv, NULL);
+               now = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+               if (now < last + duration) usleep(last + duration - now);
+               last = now;
        }
        asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts);
        if ((f = fopen(fn, "wt")) == NULL) {
@@ -114,9 +162,12 @@ static void get_time(char *s, size_t maxs, double *t) {
 static int register_init() {
         char *env;
 
-        env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
-        if (!env) env = GLITE_REG_IMPORTER_MDIR;
-        jpreg_dir = strdup(env);
+        if (!jpreg_dir) {
+               env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
+               if (env) jpreg_dir = strdup(env);
+               else jpreg_dir = strdup(GLITE_REG_IMPORTER_MDIR);
+       }
+        
 
        // TODO: better from certificate        
         env = getenv("GLITE_USER");
@@ -210,9 +261,11 @@ static int dump_init(const char *start_jobid, const char *filename) {
 //for (i = 0; i < dump_tokens; i++) printf("####%s\n", dump_index[i]);
 
        // FIXME: is it OK? (probably different HEAD and branch)
-        env = getenv("GLITE_LB_EXPORT_DUMPDIR");
-        if (!env) env = EDG_DUMP_STORAGE;
-        dump_dir = strdup(env);
+       if (!dump_dir) {
+               env = getenv("GLITE_LB_EXPORT_DUMPDIR");
+               if (env) dump_dir = strdup(env);
+               else dump_dir = strdup(EDG_DUMP_STORAGE);
+       }
        mkdir(dump_dir, 0755);
        perf_dumps = 0;
 
index be3ecdb..019014f 100644 (file)
@@ -13,8 +13,8 @@
 #define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
 #endif 
 
-#define PERF_JOBID_START_PREFIX "https://start.megajob/START/"
-#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/STOP/"
+#define PERF_JOBID_START_PREFIX "https://start.megajob/START-"
+#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/STOP-"
 #define PERF_START_FILE                "/tmp/jp_megajob_start"
 #define PERF_STOP_FILE_FORMAT   "/tmp/jp_megajob_%s"
 
index 140f898..cc33312 100644 (file)
@@ -467,7 +467,7 @@ static int reg_importer(void)
                                        stats_init(&perf, name);
                                        stats_set_jobid(&perf, msg);
                                }
-                               else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) stats_done(&perf);
+                               else if (perf.name && strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) stats_done(&perf);
                        }
                        if (!(sink & 2)) {
 #endif
@@ -475,7 +475,7 @@ static int reg_importer(void)
                        if ( (ret = check_soap_fault(soap, ret)) ) break;
 #ifdef JP_PERF
                        } else ret = 0;
-                       if ((sink & 1) && ret == 0) {
+                       if (perf.name && ret == 0) {
                                perf.count++;
                                dprintf("[%s statistics] done %ld\n", name, perf.count);
                        }