merge (and updates due to security API changes)
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 26 Mar 2008 14:05:49 +0000 (14:05 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 26 Mar 2008 14:05:49 +0000 (14:05 +0000)
org.glite.jp.client/Makefile
org.glite.jp.client/config/startup
org.glite.jp.client/examples/glite-jp-importer.sh
org.glite.jp.client/examples/mill_feed.c [new file with mode: 0644]
org.glite.jp.client/interface/jpimporter.h
org.glite.jp.client/project/version.properties
org.glite.jp.client/src/jpimp_lib.c
org.glite.jp.client/src/jpimporter.c

index bb3e673..a1a869e 100644 (file)
@@ -14,7 +14,6 @@ glite_location=/opt/glite
 globus_prefix=/opt/globus
 nothrflavour=gcc32
 thrflavour=gcc32pthr
-expat_prefix=/opt/expat
 gsoap_prefix=/software/gsoap-2.6
 
 CC=gcc
@@ -25,6 +24,10 @@ STAGETO=include/${globalprefix}/${jpprefix}
 
 VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project$:${top_srcdir}/interface:${stagedir}/interface:${top_srcdir}/build
 
+ifdef JP_PERF
+       JP_PERF_CFLAGS:=-DJP_PERF=1
+endif
+
 GLOBUS_LIBS:=-L${globus_prefix}/lib \
        -lglobus_ftp_client_${nothrflavour} \
        -lglobus_ftp_control_${nothrflavour}
@@ -33,7 +36,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour}
 
 DEBUG:=-g -O0 -DDEBUG
 
-CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include
+CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} -W -Wall -Wno-unused-parameter -D_GNU_SOURCE
 LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib
 
 LINK:=libtool --mode=link ${CC} ${LDFLAGS} 
@@ -41,10 +44,12 @@ LTCOMPILE:=libtool --mode=compile ${CC} ${CFLAGS}
 LINKXX:=libtool --mode=link ${CXX} ${LDFLAGS} 
 INSTALL:=libtool --mode=install install
 
+LIBTAR:=-L${libtar}/lib -ltar
+
 STAGE_HDRS:=jpcl_ctx_int.h
 HDRS:=jp_client.h jpimporter.h
 
-EXAMPLES:=jpps_upload_files
+EXAMPLES:=jpps_upload_files mill_feed
 
 LIBOBJS:=jpcl_ctx.o jpimp_lib.o
 LIBTHROBJS:=${LIBOBJS:.o=.thr.o}
@@ -77,18 +82,19 @@ compile: ${daemon} ${LIB} ${EXAMPLES}
 
 examples: ${EXAMPLES}
 
-# XXX: should depend on -ltar but we have no dynamic version so far :-(
 ${LIB}: ${LIBOBJS}
-       ${LINK} ${version_info} -o $@ ${LIBLOBJS} -rpath ${glite_location}/lib ${LBMAILDIRLIB} 
+       ${LINK} ${version_info} -o $@ ${LIBLOBJS} -rpath ${glite_location}/lib ${LBMAILDIRLIB} ${LIBTAR}
 
 ${daemon}: ${OBJS}
        ${LINK} -o $@ ${OBJS} ${LBMAILDIRLIB} ${GSOAPLIB} ${GLOBUS_LIBS}
 
 ${EXAMPLES}: ${LIB}
 
-# XXX: -ltar should be in the library
-${EXAMPLES}: %: %.o
-       ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB}  -ltar
+jpps_upload_files: %: %.o
+       ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB} 
+
+mill_feed: %: %.o
+       ${LINK} -o $@ $< ${LBMAILDIRLIB} -lglite_lb_common_${nothrflavour}
 
 
 
@@ -148,6 +154,7 @@ install:
        ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin
        ${INSTALL} -m 644 ${LIB} ${PREFIX}/lib
        ${INSTALL} -m 644 jpps_upload_files ${PREFIX}/examples/glite-jp-primary-upload_files
+       ${INSTALL} -m 755 mill_feed ${PREFIX}/examples/glite-jp-mill_feed
        cd ${top_srcdir}/examples && ${INSTALL} -m 755 glite-jp-importer.sh ${PREFIX}/examples/
        ${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-jp-importer
        cd ${top_srcdir}/interface && ${INSTALL} -m 644 ${HDRS} ${PREFIX}/${STAGETO}
index 5d5173e..b38020d 100755 (executable)
@@ -2,6 +2,7 @@
 
 GLITE_LOCATION=${GLITE_LOCATION:-/opt/glite}
 GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-/var/glite}
+GLITE_TMP=${GLITE_TMP:-/tmp}
 
 [ -f /etc/glite.conf ] && . /etc/glite.conf
 [ -f $GLITE_LOCATION/etc/glite-wms.conf ] && . $GLITE_LOCATION/etc/glite-wms.conf
@@ -36,8 +37,10 @@ start()
 
        [ -z "$creds" ] && echo $0: WARNING: No credentials specified. Using default lookup which is dangerous. >&2
 
-       [ -n "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && jpreg_maildir="--reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR "
-       [ -z "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] && GLITE_LB_EXPORT_JPDUMP_MAILDIR=/tmp/jpdump
+       [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && GLITE_LB_EXPORT_JPREG_MAILDIR=$GLITE_TMP/jpreg
+       jpreg_maildir="--reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR "
+       [ -d "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPREG_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPREG_MAILDIR"
+       [ -z "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] && GLITE_LB_EXPORT_JPDUMP_MAILDIR=$GLITE_TMP/jpdump
        jpdump_maildir="--dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR "
        [ -d "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPDUMP_MAILDIR"
        [ -n "$GLITE_LB_EXPORT_JPPS" ] && jpps="--jpps $GLITE_LB_EXPORT_JPPS "
@@ -55,7 +58,8 @@ start()
 #              -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $creds" \
 
        su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-jp-importer \
-               -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs $creds" \
+               -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs \
+               $creds $GLITE_JP_IMPORTER_ARGS" \
        && echo " done" || echo " FAILED"
 }
 
index f6347bf..eb40277 100644 (file)
@@ -32,7 +32,11 @@ fi
 # dump directory of bkserver
 if [ -z "$GLITE_LB_EXPORT_DUMPDIR" ]; then
   GLITE_LB_EXPORT_DUMPDIR=/tmp/dump
-  echo "GLITE_LB_EXPORT_DUMPDIR not specified (-D arguent of the bkserver), used $GLITE_LB_EXPORT_DUMPDIR"
+  echo "GLITE_LB_EXPORT_DUMPDIR not specified (-D argument of the bkserver), used $GLITE_LB_EXPORT_DUMPDIR"
+fi
+if [ -z "$GLITE_LB_EXPORT_PURGEDIR" ]; then
+  GLITE_LB_EXPORT_PURGEDIR=/tmp/purge
+  echo "GLITE_LB_EXPORT_PURGEDIR not specified (-S argument of the bkserver), used $GLITE_LB_EXPORT_PURGEDIR"
 fi
 # LB maildir for job registration
 if [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ]; then
@@ -67,7 +71,7 @@ fi
 
 echo "Using cert args $CERT_ARGS"
 
-$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs> $LOGDIR/jp-importer.log 2>&1 &
+$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs$GLITE_JP_IMPORTER_ARGS > $LOGDIR/jp-importer.log 2>&1 &
 
 JP_PID=$!
 trap "kill $JP_PID; exit 0" SIGINT
@@ -75,16 +79,17 @@ trap "kill $JP_PID; exit 0" SIGINT
 while [ 1 ]; do
   $PREFIX/bin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER -s
 
-  for file in $GLITE_LB_EXPORT_DUMPDIR/*; do
+  list=`ls $GLITE_LB_EXPORT_PURGEDIR/* 2>/dev/null`
+  for file in $list; do
     if [ -s $file ]; then
       $PREFIX/bin/glite-lb-dump_exporter -d $file -s $GLITE_LB_EXPORT_JOBSDIR -m $GLITE_LB_EXPORT_JPDUMP_MAILDIR
       if [ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" ]; then
         mv $file $GLITE_LB_EXPORT_DUMPDIR_KEEP
       else
-        rm $file
+        rm -f $file
       fi
     else
-      rm $file
+      rm -f $file
     fi
   done
 
diff --git a/org.glite.jp.client/examples/mill_feed.c b/org.glite.jp.client/examples/mill_feed.c
new file mode 100644 (file)
index 0000000..c24c9e5
--- /dev/null
@@ -0,0 +1,317 @@
+#include <sys/time.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include <getopt.h>
+#include "jp_client.h"
+#include "jpimporter.h" 
+#include "glite/lb/lb_maildir.h" 
+#include "glite/jobid/cjobid.h" 
+
+
+#define USER "Job Generator Buddy" 
+#define BKSERVER "funny.zcu.cz"
+#define BKPORT 9000
+#ifndef EDG_DUMP_STORAGE
+#define EDG_DUMP_STORAGE       "/tmp/dump"
+#endif
+#ifndef EDG_PURGE_STORAGE
+#define EDG_PURGE_STORAGE      "/tmp/purge"
+#endif
+
+
+char *jpreg_dir;
+char *dump_dir;
+char *user;
+int do_exit = 0;
+int perf_regs, perf_dumps;
+char perf_ts[100];
+char *dump; char **dump_index; size_t dump_tokens;
+int speed = 0;
+double duration = 0.0;
+
+static struct option opts[] = {
+       { "help",        0, NULL,    'h'},
+       { "reg-mdir",    1, NULL,    'R'},
+       { "dump-mdir",   1, NULL,    'D'},
+       { "break",       1, NULL,    'b'},
+       { "dump",        1, NULL,    'd'},
+//     { "sandbox-mdir",1, NULL,    's'},
+       { NULL,          0, NULL,     0}
+};
+static const char *get_opt_string = "hR:D:b:d:";
+
+static int register_init();
+static int register_add(const char *jobid, char **new_jobid);
+static void get_time(char *s, size_t maxs, double *t);
+static int dump_init(const char *start_jobid, const char *filenmae);
+static int dump_add(const char *filename, const char *jobid);
+static void dump_done();
+
+
+static void handler(int sig) {
+       do_exit = sig;
+       signal(sig, SIG_DFL);
+}
+
+
+static void usage(const char *program) {
+       fprintf(stderr, "Usage: %s [OPTIONS]\n"
+               "\t-R,--reg-mdir\n"
+               "\t-D,--dump-mdir\n"
+//             "\t-s,--sandbox-mdir\n"
+               "\t-b,--break      speed (jobs/day)\n"
+               "\t-d,--dump       dump file\n"
+       , program);
+}
+
+
+int main(int argc, char *argv[]) {
+       char start_jobid[256], stop_jobid[256], *fn;
+       double ts, ts2, last, now;
+       int ret, opt;
+       FILE *f;
+       char *jobid, *dumpfile = NULL;
+
+       while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF)
+       switch (opt) {
+               case 'h': usage(argv[0]); return 0;
+               case 'R': jpreg_dir = strdup(optarg); break;
+               case 'D': dump_dir = strdup(optarg); break;
+               case 'b': speed = atoi(optarg); if (speed) duration = 24.0*3600.0*1000000.0/speed; break;
+               case 'd': dumpfile = optarg; break;
+               default: printf("opt: %c\n", opt); usage(argv[0]); return 1;
+       }
+
+       get_time(perf_ts, sizeof(perf_ts), &ts);
+       snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
+       snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
+
+       if ((ret = register_init()) != 0) return ret;
+       if ((ret = dump_init(start_jobid, dumpfile)) != 0) return ret;
+       if ((ret = register_add(start_jobid, NULL)) != 0) return ret;
+       if (signal(SIGINT, handler) == SIG_ERR) {
+               ret = errno;
+               fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno));
+               return ret;
+       }
+       if (speed) printf("speed:     %d jobs/day (delay %lf)\n", speed, duration / 1000000.0);
+       else printf("speed:     unlimited\n");
+       printf("dump:      %s\n", dumpfile ? dumpfile : "(none)");
+       printf("reg-mdir:  %s\n", jpreg_dir);
+       printf("dump-mdir: %s\n", dump_dir);
+       printf("start:     %lf\n", ts);
+       printf("%s\n", start_jobid);
+       last = ts;
+       while (!do_exit) {
+               struct timeval tv;
+
+               if ((ret = register_add(NULL, &jobid)) != 0) return ret;
+//             printf("%s\n", jobid);
+               if (dumpfile) {
+                       if ((ret = dump_add(dumpfile, jobid)) != 0) return ret;
+//                     printf("  dumped %s\n", dumpfile);
+               }
+               free(jobid);
+               gettimeofday(&tv, NULL);
+               now = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+               if (now < last + duration) usleep(last + duration - now);
+               last = now;
+       }
+       if ((ret = register_add(stop_jobid, NULL)) != 0) return ret;
+       asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts);
+       if ((f = fopen(fn, "wt")) == NULL) {
+               ret = errno;
+               free(fn);
+               fprintf(stderr, "Can' create file '%s': %s\n", fn, strerror(errno));
+               return ret;
+       }
+       free(fn);
+       fprintf(f, "reg-imp\t%d\n", perf_regs);
+       fprintf(f, "dump-imp\t%d\n", perf_dumps);
+       fclose(f);
+       dump_done();
+
+       get_time(NULL, -1, &ts2);
+       printf("stop:  %lf\n", ts2);
+       printf("regs:  %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts));
+       printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts));
+       printf("%s\n", stop_jobid);
+
+       return 0;
+}
+
+
+static void get_time(char *s, size_t maxs, double *t) {
+       struct timeval tv;
+       struct tm tm;
+
+       gettimeofday(&tv, NULL);
+       if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       gmtime_r(&tv.tv_sec, &tm);
+       if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm);
+}
+
+
+static int register_init() {
+        char *env;
+
+        if (!jpreg_dir) {
+               env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
+               if (env) jpreg_dir = strdup(env);
+               else jpreg_dir = strdup(GLITE_REG_IMPORTER_MDIR);
+       }
+        
+
+       // TODO: better from certificate        
+        env = getenv("GLITE_USER");
+        if (!env) env = USER;
+        user = strdup(env);
+
+        if (edg_wll_MaildirInit(jpreg_dir) != 0) {
+                fprintf(stderr, "maildir init on %s failed\n", jpreg_dir);
+                return EIO;
+        }
+
+       perf_regs = 0;
+        return 0;       
+}
+
+
+static int register_add(const char *jobid, char **new_jobid) {
+        glite_jobid_t j;
+        char *tmpjobid, *msg;
+
+       if (!jobid) {
+               if (glite_jobid_create(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = glite_jobid_unparse(j)) == NULL) {
+                       fprintf(stderr, "Can't create jobid\n");
+                       return EIO;
+               }
+               glite_jobid_free(j);
+       } else tmpjobid = strdup(jobid);
+        asprintf(&msg, "%s\n%s", tmpjobid, user);
+       if (new_jobid) *new_jobid = tmpjobid;
+       else free(tmpjobid);
+        if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) {
+                fprintf(stderr, "Can't store message: %s\n", lbm_errdesc);
+                return EIO;
+        }
+        free(msg);
+
+       perf_regs++;
+       return 0;
+}
+
+
+static int dump_init(const char *start_jobid, const char *filename) {
+        char *env, *ptr, *delim;
+       FILE *f;
+       long ssize;
+       size_t i, dump_maxtokens, size;
+       int ret;
+
+       unlink(PERF_START_FILE);
+
+       dump = NULL;
+       dump_index = NULL;
+       dump_tokens = 0;
+       if (filename) {
+               if ((f = fopen(filename, "rt")) == NULL) {
+                       fprintf(stderr, "Can't open '%s': %s\n", filename, strerror(errno));
+                       return EIO;
+               }
+               if (fseek(f, 0, SEEK_END) == -1 || (ssize = ftell(f)) == -1 || fseek(f, 0, SEEK_SET) == -1) {
+                       fprintf(stderr, "Can't get position in '%s': %s\n", filename, strerror(errno));
+                       return EIO;
+               }
+               dump = malloc(size = ssize);
+               if (fread(dump, size, 1, f) != 1) {
+                       ret = errno;
+                       fprintf(stderr, "Error reading %ld bytes from file: %s\n", ssize, strerror(errno));
+                       return ret;
+               }
+               fclose(f);
+
+               dump_maxtokens = 1024;
+               dump_index = malloc(sizeof(char *) * dump_maxtokens);
+               i = 0;
+               ptr = dump;
+               do {
+                       if (dump_tokens >= dump_maxtokens) {
+                               dump_maxtokens *= 2;
+                               dump_index = realloc(dump_index, sizeof(char *) * dump_maxtokens);
+                       }
+                       delim = strstr(ptr, "DG.JOBID=\"");
+                       if (delim != ptr) {
+                               dump_index[dump_tokens++] = ptr;
+                               if (delim) {
+                                       delim[10] = '\0';
+                                       ptr = delim + 11;
+                               } else ptr = NULL;
+                       }
+                       if (ptr) ptr = strchr(ptr, '\"');
+               } while (ptr && ptr[0]);
+       }
+//for (i = 0; i < dump_tokens; i++) printf("####%s\n", dump_index[i]);
+
+       if (!dump_dir) {
+               // wrong purging to GLITE_LB_EXPORT_DUMPDIR on older versions
+               env = getenv("GLITE_LB_EXPORT_PURGEDIR");
+               if (env) dump_dir = strdup(env);
+               else dump_dir = strdup(EDG_DUMP_STORAGE);
+       }
+       mkdir(dump_dir, 0755);
+       perf_dumps = 0;
+
+       if ((f = fopen(PERF_START_FILE, "wt")) == NULL) {
+               fprintf(stderr, "Can't create file '" PERF_START_FILE "': %s\n", strerror(errno));
+               return EIO;
+       }
+       if (start_jobid) fprintf(f, "%s\n", start_jobid);
+       fclose(f);
+
+        return 0;
+}
+
+
+static int dump_add(const char *filename, const char *jobid) {
+       char *fn;
+       int ret;
+       size_t i;
+       FILE *f;
+
+       ret = 0;
+       asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps);
+       if ((f = fopen(fn , "wt")) == NULL) {
+               ret = errno;
+               fprintf(stderr, "Can't create file '%s': %s\n", fn, strerror(errno));
+               goto err;
+       }
+       for (i = 0; i < dump_tokens; i++) {
+               if (fputs(dump_index[i], f) == EOF || (i + 1 < dump_tokens && (fputs(jobid, f) == EOF))) {
+                       ret = errno;
+                       fprintf(stderr, "Can't write to '%s': %s\n", fn, strerror(errno));
+                       goto err_close;
+               }
+       }
+
+       perf_dumps++;
+err_close:
+       fclose(f);
+err:
+       free(fn);
+       return ret;
+}
+
+
+static void dump_done() {
+       free(dump_index);
+       free(dump);
+       dump_tokens = 0;
+}
index 93f7c72..019014f 100644 (file)
@@ -1,6 +1,23 @@
 #ifndef __GLITE_JPIMPORTER__
 #define __GLITE_JPIMPORTER__
 
+#ifndef GLITE_REG_IMPORTER_MDIR
+#define GLITE_REG_IMPORTER_MDIR                "/tmp/jpreg"
+#endif 
+
+#ifndef GLITE_DUMP_IMPORTER_MDIR
+#define GLITE_DUMP_IMPORTER_MDIR       "/tmp/jpdump"
+#endif 
+
+#ifndef GLITE_SANDBOX_IMPORTER_MDIR
+#define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
+#endif 
+
+#define PERF_JOBID_START_PREFIX "https://start.megajob/START-"
+#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/STOP-"
+#define PERF_START_FILE                "/tmp/jp_megajob_start"
+#define PERF_STOP_FILE_FORMAT   "/tmp/jp_megajob_%s"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
index efebc2a..974c0dc 100644 (file)
@@ -5,6 +5,7 @@
 #include <assert.h>
 #include <fcntl.h>
 #include <dirent.h>
+#include <stdio.h>
 
 
 #define COMPILE_WITH_LIBTAR
index bc27574..94a21d1 100644 (file)
@@ -6,6 +6,7 @@
 #include <string.h>
 #include <assert.h>
 #include <linux/limits.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <sys/stat.h>
@@ -13,6 +14,7 @@
 #include <syslog.h>
 #include <fcntl.h>
 #include <libgen.h>
+#include <ctype.h>
 
 #include "glite/lb/lb_maildir.h"
 
@@ -24,6 +26,8 @@
 #include "glite/security/glite_gscompat.h"
 
 #include "globus_ftp_client.h"
+#include "jp_client.h"
+#include "jpimporter.h"
 
 #if GSOAP_VERSION <= 20602
 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob
@@ -35,7 +39,6 @@ typedef struct {
        char       *val;
 } msg_pattern_t;
 
-
 #ifndef dprintf
 #define dprintf(FMT, ARGS...)          { if (debug) printf(FMT, ##ARGS); }
 #endif
@@ -46,26 +49,16 @@ typedef struct {
 #define GLITE_JPIMPORTER_PIDFILE       "/var/run/glite-jpimporter.pid"
 #endif 
 
-#ifndef GLITE_REG_IMPORTER_MDIR
-#define GLITE_REG_IMPORTER_MDIR                "/tmp/jpreg"
-#endif 
-
-#ifndef GLITE_DUMP_IMPORTER_MDIR
-#define GLITE_DUMP_IMPORTER_MDIR       "/tmp/jpdump"
-#endif 
-
-#ifndef GLITE_SANDBOX_IMPORTER_MDIR
-#define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
-#endif 
-
 #ifndef GLITE_JPPS
 #define GLITE_JPPS                                     "http://localhost:8901"
 #endif 
 
-
 #define        MAX_REG_CONNS                           500
 #define JPPS_NO_RESPONSE_TIMEOUT               120
-
+#define JPREG_REPEAT_TIMEOUT                   300
+#define JPREG_GIVUP_TIMEOUT                    3000
+#define JP_REPEAT_TIMEOUT                      360
+#define JP_GIVUP_TIMEOUT                       3600
 
 static int             debug = 0;
 static int             die = 0;
@@ -84,8 +77,16 @@ static char          *server_cert = NULL,
                        *server_key = NULL,
                        *cadir = NULL;
 static edg_wll_GssCred mycred = NULL;
-static char            *mysubj;
-struct timeval         to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+#ifdef JP_PERF
+typedef struct {
+       char *id, *name;
+       long int count, limit;
+       double start, end;
+} perf_t;
+
+int                    sink = 0;
+perf_t                 perf = {name:NULL,};
+#endif
 
 
 static struct option opts[] = {
@@ -101,13 +102,28 @@ static struct option opts[] = {
        { "pidfile",     1, NULL,    'i'},
        { "poll",        1, NULL,    't'},
        { "store",       1, NULL,    'S'},
+       { "store",       1, NULL,    'S'},
+#ifdef JP_PERF
+       { "perf-sink",   1, NULL,    'K'},
+#endif
        { NULL,          0, NULL,     0}
 };
 
-static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
+#ifdef JP_PERF
+       "K:"
+#endif
+;
 
 #include "glite/jp/ws_fault.c"
 
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name);
+static void stats_set_jobid(perf_t *perf, const char *jobid);
+static void stats_get_limit(perf_t *perf, const char *name);
+static void stats_done(perf_t *perf);
+#endif
+
 static void usage(char *me)
 {
        fprintf(stderr,"usage: %s [option]\n"
@@ -121,9 +137,12 @@ static void usage(char *me)
                "\t-d, --dump-mdir    path to the 'LB maildir' subtree for LB dumps\n"
                "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
                "\t-i, --pidfile      file to store master pid\n"
-               "\t-t, --poll         maildir polling interval (in seconds)\n",
-               "\t-S, --store        keep uploaded jobs in this directory\n",
-               me);
+               "\t-t, --poll         maildir polling interval (in seconds)\n"
+               "\t-S, --store        keep uploaded jobs in this directory\n"
+#ifdef JP_PERF
+               "\t-K, --perf-sink    1=stats, 2=without WS calls, 3=stats+without WS\n"
+#endif
+               , me);
 }
 
 static void catchsig(int sig)
@@ -131,7 +150,7 @@ static void catchsig(int sig)
        die = sig;
 }
 
-static void catch_chld(int sig)
+static void catch_chld(int sig __attribute__((unused)))
 {
        child_died = 1;
 }
@@ -143,7 +162,7 @@ static int dump_importer(void);
 static int sandbox_importer(void);
 static int parse_msg(char *, msg_pattern_t []);
 static int gftp_put_file(const char *, int);
-
+static int refresh_connection(struct soap *soap);
 
 
 int main(int argc, char *argv[])
@@ -157,7 +176,6 @@ int main(int argc, char *argv[])
        char                    *name,
                                pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
        glite_gsplugin_Context  plugin_ctx;
-       edg_wll_GssCred         cred;
 
        name = strrchr(argv[0],'/');
        if (name) name++; else name = argv[0];
@@ -179,6 +197,9 @@ int main(int argc, char *argv[])
                case 'd': strcpy(dump_mdir, optarg); break;
                case 's': strcpy(sandbox_mdir, optarg); break;
                case 'i': strcpy(pidfile, optarg); break;
+#ifdef JP_PERF
+               case 'K': sink = atoi(optarg); break;
+#endif
                case '?': usage(name); return 1;
        }
        if ( optind < argc ) { usage(name); return 1; }
@@ -237,8 +258,8 @@ int main(int argc, char *argv[])
                                                " - unable to watch them for changes!\n", argv[0]);
        if ( cadir ) setenv("X509_CERT_DIR", cadir, 1);
        edg_wll_gss_watch_creds(server_cert, &cert_mtime);
-       if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) {
-               dprintf("[master] Server identity: %s\n", mysubj);
+       if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &gss_code) ) {
+               dprintf("[master] Server identity: %s\n", mycred->name);
        } else {
                char *errmsg;
                edg_wll_gss_get_error(&gss_code, "edg_wll_gss_acquire_cred_gsi()", &errmsg);
@@ -269,13 +290,6 @@ int main(int argc, char *argv[])
        soap_set_namespaces(soap, jpps__namespaces);
 
        glite_gsplugin_init_context(&plugin_ctx);
-       if (edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &cred, NULL, NULL) != 0) {
-               perror("can't acquire credentials");
-               exit(1);
-       }
-       glite_gsplugin_set_timeout(plugin_ctx, &to);
-       glite_gsplugin_set_credential(plugin_ctx, mycred);
-
        soap_register_plugin_arg(soap, glite_gsplugin,plugin_ctx);
 
        if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) {
@@ -371,7 +385,11 @@ static int slave(int (*fn)(void), const char *nm)
 
        dprintf("[%s] slave started - pid [%d]\n", name, getpid());
 
-       while ( !die && conn_cnt < MAX_REG_CONNS ) {
+#ifdef JP_PERF
+       while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) {
+#else
+       while ( !die && (conn_cnt < MAX_REG_CONNS) ) {
+#endif
                int ret = fn();
 
                if ( ret > 0 ) conn_cnt++;
@@ -384,7 +402,7 @@ static int slave(int (*fn)(void), const char *nm)
        }
 
        if ( die ) {
-               dprintf("[%s] Terminating on signal %d\n", name, getpid(), die);
+               dprintf("[%s] %d: Terminating on signal %d\n", name, getpid(), die);
                if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die);
        }
     dprintf("[%s] Terminating after %d connections\n", name, conn_cnt);
@@ -405,11 +423,11 @@ static int reg_importer(void)
                           *aux;
 
        if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname);
-       else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname);
+       else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname);
        if ( !ret ) { 
                readnew = !readnew;
                if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname);
-               else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname);
+               else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname);
                if ( !ret ) {
                        readnew = !readnew;
                        return 0;
@@ -420,12 +438,6 @@ static int reg_importer(void)
                dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc);
                if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
                return -1;
-       }
-
-       if ( ret < 0 ) {
-               dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc);
-               if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
-               return -1;
        } else if ( ret > 0 ) {
                dprintf("[%s] JP registration request received\n", name);
                if ( !debug ) syslog(LOG_INFO, "JP registration request received\n");
@@ -441,8 +453,30 @@ static int reg_importer(void)
                        in.owner = aux;
                        dprintf("[%s] Registering '%s'\n", name, msg);
                        if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
+#ifdef JP_PERF
+                       if ((sink & 1)) {
+                               if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) {
+                                       stats_init(&perf, name);
+                                       stats_set_jobid(&perf, msg);
+                               }
+                               if (perf.name && !perf.limit) stats_get_limit(&perf, name);
+                       }
+                       if (!(sink & 2)) {
+#endif
+                       refresh_connection(soap);
                        ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty);
                        if ( (ret = check_soap_fault(soap, ret)) ) break;
+#ifdef JP_PERF
+                       } else ret = 0;
+                       if (perf.name && ret == 0) {
+                               perf.count++;
+                               if (perf.limit) {
+                                       dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit);
+                                       if (perf.count >= perf.limit) stats_done(&perf);
+                               } else
+                                       dprintf("[%s statistics] done %ld/no limit\n", name, perf.count);
+                       }
+#endif
                } while (0);
                edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
                free(fname);
@@ -462,7 +496,6 @@ static int dump_importer(void)
        static int              readnew = 1;
        char               *msg = NULL,
                                   *fname = NULL,
-                                  *aux,
                                   *bname;
        char                        fspec[PATH_MAX];
        int                             ret;
@@ -480,11 +513,11 @@ static int dump_importer(void)
 
 
        if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname);
-       else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname);
+       else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname);
        if ( !ret ) { 
                readnew = !readnew;
                if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname);
-               else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname);
+               else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname);
                if ( !ret ) {
                        readnew = !readnew;
                        return 0;
@@ -513,6 +546,29 @@ static int dump_importer(void)
                su_in.contentType = "text/lb";
                dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
+#ifdef JP_PERF
+               if ((sink & 1)) {
+               /* statistics started by file, ended by count limit (from the appropriate result fikle) */
+                       FILE *f;
+                       char item[200];
+
+                       /* starter */
+                       if (!perf.name) {
+                               f = fopen(PERF_START_FILE, "rt");
+                               if (f) {
+                                       stats_init(&perf, name);
+                                       fscanf(f, "%s", item);
+                                       fclose(f);
+                                       unlink(PERF_START_FILE);
+                                       stats_set_jobid(&perf, item);
+                               } else
+                                       dprintf("[%s statistics]: not started/too much dumps: %s\n", name, strerror(errno));
+                       }
+                       if (perf.name && !perf.limit) stats_get_limit(&perf, name);
+               }
+               if (!(sink & 2)) {
+#endif
+               refresh_connection(soap);
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore));
@@ -532,9 +588,21 @@ static int dump_importer(void)
                close(fhnd);
                dprintf("[%s] File sent, commiting the upload\n", name);
                cu_in.destination = su_out.destination;
+               refresh_connection(soap);
                ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+               } else ret = 0;
+               if (perf.name && ret == 0) {
+                       perf.count++;
+                       if (perf.limit) {
+                               dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit);
+                               if (perf.count >= perf.limit) stats_done(&perf);
+                       } else
+                               dprintf("[%s statistics] done %ld/no limit\n", name, perf.count);
+               }
+#endif
                if (store && *store) {
                        bname = strdup(tab[_file].val);
                        snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname));
@@ -568,8 +636,7 @@ static int sandbox_importer(void)
        struct _jpelem__CommitUploadResponse    empty;
        static int      readnew = 1;
        char            *msg = NULL,
-                       *fname = NULL,
-                       *aux;
+                       *fname = NULL;
        int             ret;
        int             fhnd;
        msg_pattern_t   tab[] = {
@@ -586,11 +653,11 @@ static int sandbox_importer(void)
 
 
        if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
-       else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname);
+       else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname);
        if ( !ret ) { 
                readnew = !readnew;
                if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
-               else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname);
+               else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname);
                if ( !ret ) {
                        readnew = !readnew;
                        return 0;
@@ -622,6 +689,10 @@ static int sandbox_importer(void)
                su_in.contentType = "tar/lb";
                dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+#ifdef JP_PERF
+               if (!(sink & 2)) {
+#endif
+               refresh_connection(soap);
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
                ret = check_soap_fault(soap, ret);
                /* XXX: grrrrrrr! test it!!!*/
@@ -638,9 +709,13 @@ static int sandbox_importer(void)
                close(fhnd);
                dprintf("[%s] File sent, commiting the upload\n", name);
                cu_in.destination = su_out.destination;
+               refresh_connection(soap);
                ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+               } else ret = 0;
+#endif
        } while (0);
 
        edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
@@ -760,7 +835,7 @@ static int gftp_put_file(const char *url, int fhnd)
                put_file_err("Could not initialise operation attributes");
        
        if ( globus_ftp_client_operationattr_set_authorization(
-                       &op_attr, server_cert? mycred: GSS_C_NO_CREDENTIAL,
+                       &op_attr, server_cert? mycred->gss_cred: GSS_C_NO_CREDENTIAL,
                        NULL, "", 0, NULL) != GLOBUS_SUCCESS )
                put_file_err("Could not set authorization procedure");
 
@@ -809,6 +884,83 @@ static int gftp_put_file(const char *url, int fhnd)
     return (gError == GLOBUS_TRUE)? 1: 0;
 }
 
+
+static int refresh_connection(struct soap *soap) {
+       struct timeval          to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+       edg_wll_GssCred newcred;
+       edg_wll_GssStatus       gss_code;
+       glite_gsplugin_Context gp_ctx;
+
+       gp_ctx = glite_gsplugin_get_context(soap);
+       glite_gsplugin_set_timeout(gp_ctx, &to);
+
+       switch ( edg_wll_gss_watch_creds(server_cert, &cert_mtime) ) {
+       case 0: break;
+       case 1:
+               if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &newcred, &gss_code) ) {
+                       dprintf("[%s] reloading credentials successful\n", name);
+                       edg_wll_gss_release_cred(&mycred, &gss_code);
+                       mycred = newcred;
+                       glite_gsplugin_set_credential(gp_ctx, newcred);
+               } else { dprintf("[%s] reloading credentials failed, using old ones\n", name); }
+               break;
+       case -1: dprintf("[%s] edg_wll_gss_watch_creds failed\n", name); break;
+       }
+
+       return 0;
+}
+
+
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name) {
+       struct timeval tv;
+
+       memset(perf, 0, sizeof *perf);
+       perf->count = 0;
+       perf->name = strdup(name);
+       gettimeofday(&tv, NULL);
+       perf->start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       dprintf("[%s statistics] start detected\n", name);
+}
+
+static void stats_set_jobid(perf_t *perf, const char *jobid) {
+       perf->id = strdup(jobid + sizeof(PERF_JOBID_START_PREFIX) - 1);
+       dprintf("[%s statistics] ID %s\n", perf->name, perf->id);
+}
+
+static void stats_get_limit(perf_t *perf, const char *name) {
+       FILE *f;
+       char *fn, item[200];
+       int count;
+
+       /* stopper */
+       asprintf(&fn, PERF_STOP_FILE_FORMAT, perf->id);
+       f = fopen(fn, "rt");
+       free(fn);
+       if (f) {
+               fscanf(f, "%s\t%d", item, &count);
+               if (strcasecmp(item, name) != 0) fscanf(f, "%s\t%d", item, &count);
+               dprintf("[%s statistics] expected %d %s\n", name, count, item);
+               fclose(f);
+               perf->limit = count;
+       }
+}
+
+static void stats_done(perf_t *perf) {
+       struct timeval tv;
+
+       gettimeofday(&tv, NULL);
+       perf->end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       dprintf("[%s statistics] %s\n", perf->name, perf->id);
+       dprintf("[%s statistics] start: %lf\n", perf->name, perf->start);
+       dprintf("[%s statistics] stop:  %lf\n", perf->name, perf->end);
+       dprintf("[%s statistics] count: %ld (%lf job/day)\n", perf->name, perf->count, 86400.0 * perf->count / (perf->end - perf->start));
+       free(perf->id);
+       free(perf->name);
+       memset(perf, 0, sizeof *perf);
+}
+#endif
+
 /* XXX: we don't use it */
 SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} };