From 3f63a39c3ed1211473018a9cde51df8fad864006 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 18 Jul 2007 15:27:17 +0000 Subject: [PATCH] Initial experiments with registrations and mill. --- org.glite.jp.client/Makefile | 14 +- org.glite.jp.client/config/startup | 3 +- org.glite.jp.client/examples/glite-jp-importer.sh | 2 +- org.glite.jp.client/examples/mill_feed.c | 162 ++++++++++++++++++++++ org.glite.jp.client/interface/jpimporter.h | 16 +++ org.glite.jp.client/src/jpimporter.c | 92 +++++++++--- 6 files changed, 265 insertions(+), 24 deletions(-) create mode 100644 org.glite.jp.client/examples/mill_feed.c diff --git a/org.glite.jp.client/Makefile b/org.glite.jp.client/Makefile index cb636fc..8ce0358 100644 --- a/org.glite.jp.client/Makefile +++ b/org.glite.jp.client/Makefile @@ -24,6 +24,10 @@ STAGETO=include/${globalprefix}/${jpprefix} VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project$:${top_srcdir}/interface:${stagedir}/interface:${top_srcdir}/build +ifdef JP_PERF + JP_PERF_CFLAGS:=-DJP_PERF=1 +endif + GLOBUS_LIBS:=-L${globus_prefix}/lib \ -lglobus_common_${nothrflavour} \ -lglobus_gssapi_gsi_${nothrflavour} \ @@ -34,7 +38,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} DEBUG:=-g -O0 -DDEBUG -CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include +CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib LINK:=libtool --mode=link ${CC} ${LDFLAGS} @@ -47,7 +51,7 @@ LIBTAR:=-L${libtar}/lib -ltar STAGE_HDRS:=jpcl_ctx_int.h HDRS:=jp_client.h jpimporter.h -EXAMPLES:=jpps_upload_files +EXAMPLES:=jpps_upload_files mill_feed LIBOBJS:=jpcl_ctx.o jpimp_lib.o LIBTHROBJS:=${LIBOBJS:.o=.thr.o} @@ -88,9 +92,12 @@ ${daemon}: ${OBJS} ${EXAMPLES}: ${LIB} -${EXAMPLES}: %: %.o +jpps_upload_files: %: %.o ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB} +mill_feed: %: %.o + ${LINK} -o $@ $< ${LBMAILDIRLIB} -lglite_lb_common_${nothrflavour} + JobProvenancePS.xh: %.xh: %.wsdl JobProvenanceTypes.wsdl typemap.dat @@ -149,6 +156,7 @@ install: ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin ${INSTALL} -m 644 ${LIB} ${PREFIX}/lib ${INSTALL} -m 644 jpps_upload_files ${PREFIX}/examples/glite-jp-primary-upload_files + ${INSTALL} -m 755 mill_feed ${PREFIX}/examples/glite-jp-mill_feed cd ${top_srcdir}/examples && ${INSTALL} -m 755 glite-jp-importer.sh ${PREFIX}/examples/ ${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-jp-importer cd ${top_srcdir}/interface && ${INSTALL} -m 644 ${HDRS} ${PREFIX}/${STAGETO} diff --git a/org.glite.jp.client/config/startup b/org.glite.jp.client/config/startup index 5d5173e..44018a7 100755 --- a/org.glite.jp.client/config/startup +++ b/org.glite.jp.client/config/startup @@ -55,7 +55,8 @@ start() # -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $creds" \ su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-jp-importer \ - -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs $creds" \ + -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs \ + $creds $GLITE_JP_IMPORTER_ARGS" \ && echo " done" || echo " FAILED" } diff --git a/org.glite.jp.client/examples/glite-jp-importer.sh b/org.glite.jp.client/examples/glite-jp-importer.sh index 4d2f737..2740e05 100644 --- a/org.glite.jp.client/examples/glite-jp-importer.sh +++ b/org.glite.jp.client/examples/glite-jp-importer.sh @@ -67,7 +67,7 @@ fi echo "Using cert args $CERT_ARGS" -$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs> $LOGDIR/jp-importer.log 2>&1 & +$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs$GLITE_JP_IMPORTER_ARGS > $LOGDIR/jp-importer.log 2>&1 & JP_PID=$! trap "kill $JP_PID; exit 0" SIGINT diff --git a/org.glite.jp.client/examples/mill_feed.c b/org.glite.jp.client/examples/mill_feed.c new file mode 100644 index 0000000..676a829 --- /dev/null +++ b/org.glite.jp.client/examples/mill_feed.c @@ -0,0 +1,162 @@ +#include +#include +#include +#include +#include +#include +#include "jp_client.h" +#include "jpimporter.h" +#include "glite/lb/lb_maildir.h" +#include "glite/wmsutils/jobid/cjobid.h" +#include "glite/wmsutils/jobid/cjobid.h" + + +#define USER "Job Generator Buddy" +#define BKSERVER "funny.zcu.cz" +#define BKPORT 9000 +#ifndef EDG_DUMP_STORAGE +#define EDG_DUMP_STORAGE "/tmp/dump" +#endif +#ifndef EDG_PURGE_STORAGE +#define EDG_PURGE_STORAGE "/tmp/purge" +#endif + + +char *jpreg_dir; +char *dump_dir; +char *user; +int do_exit = 0; +int perf_regs, perf_dumps; +char perf_ts[100]; + + +static int register_init(); +static int register_add(const char *jobid); +static void get_time(char *s, size_t maxs, double *t); +static int dump_init(); + + +static void handler(int sig) { + do_exit = sig; + signal(sig, SIG_DFL); +} + +int main(int argc, char *argv[]) { + char start_jobid[256], stop_jobid[256]; + double ts, ts2; + int ret; + + get_time(perf_ts, sizeof(perf_ts), &ts); + snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts); + snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts); + + if ((ret = register_init()) != 0) return ret; + if ((ret = dump_init()) != 0) return ret; + if ((ret = register_add(start_jobid)) != 0) return ret; + if (signal(SIGINT, handler) == SIG_ERR) { + ret = errno; + fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno)); + return ret; + } + printf("%s\n", start_jobid); + printf("start: %lf\n", ts); + while (!do_exit) { + if ((ret = register_add(NULL)) != 0) return ret; + if (argc > 1) + if ((ret = dump_add(argv[1])) != 0) return ret; + } + if ((ret = register_add(stop_jobid)) != 0) return ret; + get_time(NULL, -1, &ts2); + printf("stop: %lf\n", ts2); + printf("regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts)); + printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts)); + printf("%s\n", stop_jobid); + + return 0; +} + + +static void get_time(char *s, size_t maxs, double *t) { + struct timeval tv; + struct tm tm; + + gettimeofday(&tv, NULL); + if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + gmtime_r(&tv.tv_sec, &tm); + if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm); +} + + +static int register_init() { + char *env; + + env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR"); + if (!env) env = GLITE_REG_IMPORTER_MDIR; + jpreg_dir = strdup(env); + + // TODO: better from certificate + env = getenv("GLITE_USER"); + if (!env) env = USER; + user = strdup(env); + + if (edg_wll_MaildirInit(jpreg_dir) != 0) { + fprintf(stderr, "maildir init on %s failed\n", jpreg_dir); + return EIO; + } + + perf_regs = 0; + return 0; +} + + +static int register_add(const char *jobid) { + edg_wlc_JobId j; + char *tmpjobid, *msg; + + if (!jobid) { + if (edg_wlc_JobIdCreate(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = edg_wlc_JobIdUnparse(j)) == NULL) { + fprintf(stderr, "Can't create jobid\n"); + return EIO; + } + edg_wlc_JobIdFree(j); + } else tmpjobid = strdup(jobid); + asprintf(&msg, "%s\n%s", tmpjobid, user); + free(tmpjobid); + if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) { + fprintf(stderr, "Can't store message: %s\n", lbm_errdesc); + return EIO; + } + free(msg); + + perf_regs++; + return 0; +} + + +static int dump_init() { + char *env; + + // FIXME: is it OK? (probably different HEAD and branch) + env = getenv("GLITE_LB_EXPORT_DUMPDIR"); + if (!env) env = EDG_DUMP_STORAGE; + dump_dir = strdup(env); + mkdir(dump_dir, 0755); + perf_dumps = 0; + + return 0; +} + + +static int dump_add(const char *filename) { + char *fn; + int ret; + + asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps); + if ((ret = link(filename, fn)) != 0) { + fprintf(stderr, "Can't link file: %s\n", strerror(errno)); + } + free(fn); + + perf_dumps++; + return ret; +} diff --git a/org.glite.jp.client/interface/jpimporter.h b/org.glite.jp.client/interface/jpimporter.h index 93f7c72..a0b0bc7 100644 --- a/org.glite.jp.client/interface/jpimporter.h +++ b/org.glite.jp.client/interface/jpimporter.h @@ -1,6 +1,22 @@ #ifndef __GLITE_JPIMPORTER__ #define __GLITE_JPIMPORTER__ +#ifndef GLITE_REG_IMPORTER_MDIR +#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg" +#endif + +#ifndef GLITE_DUMP_IMPORTER_MDIR +#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" +#endif + +#ifndef GLITE_SANDBOX_IMPORTER_MDIR +#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" +#endif + +#define PERF_JOBID_START_PREFIX "https://start.megajob/" +#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/" + + #ifdef __cplusplus extern "C" { #endif diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index 31e9a3d..50047c4 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,8 @@ #include "glite/security/glite_gscompat.h" #include "globus_ftp_client.h" +#include "jp_client.h" +#include "jpimporter.h" #if GSOAP_VERSION <= 20602 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob @@ -46,27 +49,13 @@ typedef struct { #define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid" #endif -#ifndef GLITE_REG_IMPORTER_MDIR -#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg" -#endif - -#ifndef GLITE_DUMP_IMPORTER_MDIR -#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" -#endif - -#ifndef GLITE_SANDBOX_IMPORTER_MDIR -#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" -#endif - #ifndef GLITE_JPPS #define GLITE_JPPS "http://localhost:8901" #endif - #define MAX_REG_CONNS 500 #define JPPS_NO_RESPONSE_TIMEOUT 120 - static int debug = 0; static int die = 0; static int child_died = 0; @@ -86,6 +75,12 @@ static char *server_cert = NULL, static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; static char *mysubj; struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0}; +#ifdef JP_PERF +int sink = 0; +int perf_regs, perf_dumps, perf_sandboxes; +char *perf_id = NULL; +double perf_start, perf_end; +#endif static struct option opts[] = { @@ -101,10 +96,18 @@ static struct option opts[] = { { "pidfile", 1, NULL, 'i'}, { "poll", 1, NULL, 't'}, { "store", 1, NULL, 'S'}, + { "store", 1, NULL, 'S'}, +#ifdef JP_PERF + { "perf-sink", 1, NULL, 'K'}, +#endif { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"; +static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:" +#ifdef JP_PERF + "K:" +#endif +; #include "glite/jp/ws_fault.c" @@ -121,9 +124,12 @@ static void usage(char *me) "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n" "\t-i, --pidfile file to store master pid\n" - "\t-t, --poll maildir polling interval (in seconds)\n", - "\t-S, --store keep uploaded jobs in this directory\n", - me); + "\t-t, --poll maildir polling interval (in seconds)\n" + "\t-S, --store keep uploaded jobs in this directory\n" +#ifdef JP_PERF + "\t-K, --perf-sink 1=stats, 2=stats+run w/o WS calls\n" +#endif + , me); } static void catchsig(int sig) @@ -179,6 +185,9 @@ int main(int argc, char *argv[]) case 'd': strcpy(dump_mdir, optarg); break; case 's': strcpy(sandbox_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; +#ifdef JP_PERF + case 'K': sink = atoi(optarg); break; +#endif case '?': usage(name); return 1; } if ( optind < argc ) { usage(name); return 1; } @@ -368,7 +377,11 @@ static int slave(int (*fn)(void), const char *nm) dprintf("[%s] slave started - pid [%d]\n", name, getpid()); - while ( !die && conn_cnt < MAX_REG_CONNS ) { +#ifdef JP_PERF + while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) { +#else + while ( !die && (conn_cnt < MAX_REG_CONNS) ) { +#endif int ret = fn(); if ( ret > 0 ) conn_cnt++; @@ -438,8 +451,35 @@ static int reg_importer(void) in.owner = aux; dprintf("[%s] Registering '%s'\n", name, msg); if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); +#ifdef JP_PERF + if (sink) { + struct timeval tv; + + if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) { + perf_regs = 0; + perf_dumps = 0; + perf_sandboxes = 0; + free(perf_id); + perf_id = strdup(msg + sizeof(PERF_JOBID_START_PREFIX) - 1); + gettimeofday(&tv, NULL); + perf_start = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[statistics] %s\n", perf_id); + } else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) { + gettimeofday(&tv, NULL); + perf_end = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[statistics] %s\n", perf_id); + dprintf("[statistics] start: %lf\n", perf_start); + dprintf("[statistics] stop: %lf\n", perf_end); + dprintf("[statistics] regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (perf_end - perf_start)); + } else perf_regs++; + } + if (!(sink & 2)) { +#endif ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; +#ifdef JP_PERF + } else ret = 0; +#endif } while (0); edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); free(fname); @@ -510,6 +550,10 @@ static int dump_importer(void) su_in.contentType = "text/lb"; dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); +#ifdef JP_PERF + if (sink) perf_dumps++; + if (!(sink & 2)) { +#endif ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)); @@ -532,6 +576,9 @@ static int dump_importer(void) ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Dump upload succesfull\n", name); +#ifdef JP_PERF + } else ret = 0; +#endif if (store && *store) { bname = strdup(tab[_file].val); snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname)); @@ -619,6 +666,10 @@ static int sandbox_importer(void) su_in.contentType = "tar/lb"; dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg); +#ifdef JP_PERF + if (sink) perf_sandboxes++; + if (!(sink & 2)) { +#endif ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); ret = check_soap_fault(soap, ret); /* XXX: grrrrrrr! test it!!!*/ @@ -638,6 +689,9 @@ static int sandbox_importer(void) ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Dump upload succesfull\n", name); +#ifdef JP_PERF + } else ret = 0; +#endif } while (0); edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); -- 1.8.2.3