Initial experiments with registrations and mill.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 18 Jul 2007 15:27:17 +0000 (15:27 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 18 Jul 2007 15:27:17 +0000 (15:27 +0000)
org.glite.jp.client/Makefile
org.glite.jp.client/config/startup
org.glite.jp.client/examples/glite-jp-importer.sh
org.glite.jp.client/examples/mill_feed.c [new file with mode: 0644]
org.glite.jp.client/interface/jpimporter.h
org.glite.jp.client/src/jpimporter.c

index cb636fc..8ce0358 100644 (file)
@@ -24,6 +24,10 @@ STAGETO=include/${globalprefix}/${jpprefix}
 
 VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project$:${top_srcdir}/interface:${stagedir}/interface:${top_srcdir}/build
 
+ifdef JP_PERF
+       JP_PERF_CFLAGS:=-DJP_PERF=1
+endif
+
 GLOBUS_LIBS:=-L${globus_prefix}/lib \
        -lglobus_common_${nothrflavour} \
        -lglobus_gssapi_gsi_${nothrflavour} \
@@ -34,7 +38,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour}
 
 DEBUG:=-g -O0 -DDEBUG
 
-CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include
+CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS}
 LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib
 
 LINK:=libtool --mode=link ${CC} ${LDFLAGS} 
@@ -47,7 +51,7 @@ LIBTAR:=-L${libtar}/lib -ltar
 STAGE_HDRS:=jpcl_ctx_int.h
 HDRS:=jp_client.h jpimporter.h
 
-EXAMPLES:=jpps_upload_files
+EXAMPLES:=jpps_upload_files mill_feed
 
 LIBOBJS:=jpcl_ctx.o jpimp_lib.o
 LIBTHROBJS:=${LIBOBJS:.o=.thr.o}
@@ -88,9 +92,12 @@ ${daemon}: ${OBJS}
 
 ${EXAMPLES}: ${LIB}
 
-${EXAMPLES}: %: %.o
+jpps_upload_files: %: %.o
        ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB} 
 
+mill_feed: %: %.o
+       ${LINK} -o $@ $< ${LBMAILDIRLIB} -lglite_lb_common_${nothrflavour}
+
 
 
 JobProvenancePS.xh: %.xh: %.wsdl JobProvenanceTypes.wsdl typemap.dat
@@ -149,6 +156,7 @@ install:
        ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin
        ${INSTALL} -m 644 ${LIB} ${PREFIX}/lib
        ${INSTALL} -m 644 jpps_upload_files ${PREFIX}/examples/glite-jp-primary-upload_files
+       ${INSTALL} -m 755 mill_feed ${PREFIX}/examples/glite-jp-mill_feed
        cd ${top_srcdir}/examples && ${INSTALL} -m 755 glite-jp-importer.sh ${PREFIX}/examples/
        ${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-jp-importer
        cd ${top_srcdir}/interface && ${INSTALL} -m 644 ${HDRS} ${PREFIX}/${STAGETO}
index 5d5173e..44018a7 100755 (executable)
@@ -55,7 +55,8 @@ start()
 #              -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $creds" \
 
        su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-jp-importer \
-               -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs $creds" \
+               -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs \
+               $creds $GLITE_JP_IMPORTER_ARGS" \
        && echo " done" || echo " FAILED"
 }
 
index 4d2f737..2740e05 100644 (file)
@@ -67,7 +67,7 @@ fi
 
 echo "Using cert args $CERT_ARGS"
 
-$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs> $LOGDIR/jp-importer.log 2>&1 &
+$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs$GLITE_JP_IMPORTER_ARGS > $LOGDIR/jp-importer.log 2>&1 &
 
 JP_PID=$!
 trap "kill $JP_PID; exit 0" SIGINT
diff --git a/org.glite.jp.client/examples/mill_feed.c b/org.glite.jp.client/examples/mill_feed.c
new file mode 100644 (file)
index 0000000..676a829
--- /dev/null
@@ -0,0 +1,162 @@
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include "jp_client.h"
+#include "jpimporter.h" 
+#include "glite/lb/lb_maildir.h" 
+#include "glite/wmsutils/jobid/cjobid.h" 
+#include "glite/wmsutils/jobid/cjobid.h"
+
+
+#define USER "Job Generator Buddy" 
+#define BKSERVER "funny.zcu.cz"
+#define BKPORT 9000
+#ifndef EDG_DUMP_STORAGE
+#define EDG_DUMP_STORAGE       "/tmp/dump"
+#endif
+#ifndef EDG_PURGE_STORAGE
+#define EDG_PURGE_STORAGE      "/tmp/purge"
+#endif
+
+
+char *jpreg_dir;
+char *dump_dir;
+char *user;
+int do_exit = 0;
+int perf_regs, perf_dumps;
+char perf_ts[100];
+
+
+static int register_init();
+static int register_add(const char *jobid);
+static void get_time(char *s, size_t maxs, double *t);
+static int dump_init();
+
+
+static void handler(int sig) {
+       do_exit = sig;
+       signal(sig, SIG_DFL);
+}
+
+int main(int argc, char *argv[]) {
+       char start_jobid[256], stop_jobid[256];
+       double ts, ts2;
+       int ret;
+
+       get_time(perf_ts, sizeof(perf_ts), &ts);
+       snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
+       snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
+
+       if ((ret = register_init()) != 0) return ret;
+       if ((ret = dump_init()) != 0) return ret;
+       if ((ret = register_add(start_jobid)) != 0) return ret;
+       if (signal(SIGINT, handler) == SIG_ERR) {
+               ret = errno;
+               fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno));
+               return ret;
+       }
+       printf("%s\n", start_jobid);
+       printf("start: %lf\n", ts);
+       while (!do_exit) {
+               if ((ret = register_add(NULL)) != 0) return ret;
+               if (argc > 1)
+                       if ((ret = dump_add(argv[1])) != 0) return ret;
+       }
+       if ((ret = register_add(stop_jobid)) != 0) return ret;
+       get_time(NULL, -1, &ts2);
+       printf("stop:  %lf\n", ts2);
+       printf("regs:  %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts));
+       printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts));
+       printf("%s\n", stop_jobid);
+
+       return 0;
+}
+
+
+static void get_time(char *s, size_t maxs, double *t) {
+       struct timeval tv;
+       struct tm tm;
+
+       gettimeofday(&tv, NULL);
+       if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+       gmtime_r(&tv.tv_sec, &tm);
+       if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm);
+}
+
+
+static int register_init() {
+        char *env;
+
+        env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
+        if (!env) env = GLITE_REG_IMPORTER_MDIR;
+        jpreg_dir = strdup(env);
+
+       // TODO: better from certificate        
+        env = getenv("GLITE_USER");
+        if (!env) env = USER;
+        user = strdup(env);
+
+        if (edg_wll_MaildirInit(jpreg_dir) != 0) {
+                fprintf(stderr, "maildir init on %s failed\n", jpreg_dir);
+                return EIO;
+        }
+
+       perf_regs = 0;
+        return 0;       
+}
+
+
+static int register_add(const char *jobid) {
+        edg_wlc_JobId j;
+        char *tmpjobid, *msg;
+
+       if (!jobid) {
+               if (edg_wlc_JobIdCreate(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = edg_wlc_JobIdUnparse(j)) == NULL) {
+                       fprintf(stderr, "Can't create jobid\n");
+                       return EIO;
+               }
+               edg_wlc_JobIdFree(j);
+       } else tmpjobid = strdup(jobid);
+        asprintf(&msg, "%s\n%s", tmpjobid, user);
+       free(tmpjobid);
+        if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) {
+                fprintf(stderr, "Can't store message: %s\n", lbm_errdesc);
+                return EIO;
+        }
+        free(msg);
+
+       perf_regs++;
+       return 0;
+}
+
+
+static int dump_init() {
+        char *env;
+
+       // FIXME: is it OK? (probably different HEAD and branch)
+        env = getenv("GLITE_LB_EXPORT_DUMPDIR");
+        if (!env) env = EDG_DUMP_STORAGE;
+        dump_dir = strdup(env);
+       mkdir(dump_dir, 0755);
+       perf_dumps = 0;
+
+        return 0;       
+}
+
+
+static int dump_add(const char *filename) {
+       char *fn;
+       int ret;
+
+       asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps);
+       if ((ret = link(filename, fn)) != 0) {
+                fprintf(stderr, "Can't link file: %s\n", strerror(errno));
+       }
+       free(fn);
+
+       perf_dumps++;
+       return ret;
+}
index 93f7c72..a0b0bc7 100644 (file)
@@ -1,6 +1,22 @@
 #ifndef __GLITE_JPIMPORTER__
 #define __GLITE_JPIMPORTER__
 
+#ifndef GLITE_REG_IMPORTER_MDIR
+#define GLITE_REG_IMPORTER_MDIR                "/tmp/jpreg"
+#endif 
+
+#ifndef GLITE_DUMP_IMPORTER_MDIR
+#define GLITE_DUMP_IMPORTER_MDIR       "/tmp/jpdump"
+#endif 
+
+#ifndef GLITE_SANDBOX_IMPORTER_MDIR
+#define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
+#endif 
+
+#define PERF_JOBID_START_PREFIX "https://start.megajob/"
+#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/"
+
+
 #ifdef __cplusplus
 extern "C" {
 #endif
index 31e9a3d..50047c4 100644 (file)
@@ -6,6 +6,7 @@
 #include <string.h>
 #include <assert.h>
 #include <linux/limits.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <sys/stat.h>
@@ -24,6 +25,8 @@
 #include "glite/security/glite_gscompat.h"
 
 #include "globus_ftp_client.h"
+#include "jp_client.h"
+#include "jpimporter.h"
 
 #if GSOAP_VERSION <= 20602
 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob
@@ -46,27 +49,13 @@ typedef struct {
 #define GLITE_JPIMPORTER_PIDFILE       "/var/run/glite-jpimporter.pid"
 #endif 
 
-#ifndef GLITE_REG_IMPORTER_MDIR
-#define GLITE_REG_IMPORTER_MDIR                "/tmp/jpreg"
-#endif 
-
-#ifndef GLITE_DUMP_IMPORTER_MDIR
-#define GLITE_DUMP_IMPORTER_MDIR       "/tmp/jpdump"
-#endif 
-
-#ifndef GLITE_SANDBOX_IMPORTER_MDIR
-#define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
-#endif 
-
 #ifndef GLITE_JPPS
 #define GLITE_JPPS                                     "http://localhost:8901"
 #endif 
 
-
 #define        MAX_REG_CONNS                           500
 #define JPPS_NO_RESPONSE_TIMEOUT               120
 
-
 static int             debug = 0;
 static int             die = 0;
 static int             child_died = 0;
@@ -86,6 +75,12 @@ static char          *server_cert = NULL,
 static gss_cred_id_t   mycred = GSS_C_NO_CREDENTIAL;
 static char            *mysubj;
 struct timeval         to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+#ifdef JP_PERF
+int                    sink = 0;
+int                    perf_regs, perf_dumps, perf_sandboxes;
+char                   *perf_id = NULL;
+double                 perf_start, perf_end;
+#endif
 
 
 static struct option opts[] = {
@@ -101,10 +96,18 @@ static struct option opts[] = {
        { "pidfile",     1, NULL,    'i'},
        { "poll",        1, NULL,    't'},
        { "store",       1, NULL,    'S'},
+       { "store",       1, NULL,    'S'},
+#ifdef JP_PERF
+       { "perf-sink",   1, NULL,    'K'},
+#endif
        { NULL,          0, NULL,     0}
 };
 
-static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
+#ifdef JP_PERF
+       "K:"
+#endif
+;
 
 #include "glite/jp/ws_fault.c"
 
@@ -121,9 +124,12 @@ static void usage(char *me)
                "\t-d, --dump-mdir    path to the 'LB maildir' subtree for LB dumps\n"
                "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
                "\t-i, --pidfile      file to store master pid\n"
-               "\t-t, --poll         maildir polling interval (in seconds)\n",
-               "\t-S, --store        keep uploaded jobs in this directory\n",
-               me);
+               "\t-t, --poll         maildir polling interval (in seconds)\n"
+               "\t-S, --store        keep uploaded jobs in this directory\n"
+#ifdef JP_PERF
+               "\t-K, --perf-sink    1=stats, 2=stats+run w/o WS calls\n"
+#endif
+               , me);
 }
 
 static void catchsig(int sig)
@@ -179,6 +185,9 @@ int main(int argc, char *argv[])
                case 'd': strcpy(dump_mdir, optarg); break;
                case 's': strcpy(sandbox_mdir, optarg); break;
                case 'i': strcpy(pidfile, optarg); break;
+#ifdef JP_PERF
+               case 'K': sink = atoi(optarg); break;
+#endif
                case '?': usage(name); return 1;
        }
        if ( optind < argc ) { usage(name); return 1; }
@@ -368,7 +377,11 @@ static int slave(int (*fn)(void), const char *nm)
 
        dprintf("[%s] slave started - pid [%d]\n", name, getpid());
 
-       while ( !die && conn_cnt < MAX_REG_CONNS ) {
+#ifdef JP_PERF
+       while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) {
+#else
+       while ( !die && (conn_cnt < MAX_REG_CONNS) ) {
+#endif
                int ret = fn();
 
                if ( ret > 0 ) conn_cnt++;
@@ -438,8 +451,35 @@ static int reg_importer(void)
                        in.owner = aux;
                        dprintf("[%s] Registering '%s'\n", name, msg);
                        if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
+#ifdef JP_PERF
+                       if (sink) {
+                               struct timeval tv;
+
+                               if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) {
+                                       perf_regs = 0;
+                                       perf_dumps = 0;
+                                       perf_sandboxes = 0;
+                                       free(perf_id);
+                                       perf_id = strdup(msg + sizeof(PERF_JOBID_START_PREFIX) - 1);
+                                       gettimeofday(&tv, NULL);
+                                       perf_start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+                                       dprintf("[statistics] %s\n", perf_id);
+                               } else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) {
+                                       gettimeofday(&tv, NULL);
+                                       perf_end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+                                       dprintf("[statistics] %s\n", perf_id);
+                                       dprintf("[statistics] start: %lf\n", perf_start);
+                                       dprintf("[statistics] stop:  %lf\n", perf_end);
+                                       dprintf("[statistics] regs:  %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (perf_end - perf_start));
+                               } else perf_regs++;
+                       }
+                       if (!(sink & 2)) {
+#endif
                        ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty);
                        if ( (ret = check_soap_fault(soap, ret)) ) break;
+#ifdef JP_PERF
+                       } else ret = 0;
+#endif
                } while (0);
                edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
                free(fname);
@@ -510,6 +550,10 @@ static int dump_importer(void)
                su_in.contentType = "text/lb";
                dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
+#ifdef JP_PERF
+               if (sink) perf_dumps++;
+               if (!(sink & 2)) {
+#endif
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore));
@@ -532,6 +576,9 @@ static int dump_importer(void)
                ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+               } else ret = 0;
+#endif
                if (store && *store) {
                        bname = strdup(tab[_file].val);
                        snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname));
@@ -619,6 +666,10 @@ static int sandbox_importer(void)
                su_in.contentType = "tar/lb";
                dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val);
                if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+#ifdef JP_PERF
+               if (sink) perf_sandboxes++;
+               if (!(sink & 2)) {
+#endif
                ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
                ret = check_soap_fault(soap, ret);
                /* XXX: grrrrrrr! test it!!!*/
@@ -638,6 +689,9 @@ static int sandbox_importer(void)
                ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
                if ( (ret = check_soap_fault(soap, ret)) ) break;
                dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+               } else ret = 0;
+#endif
        } while (0);
 
        edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);