From 7b31f01d5454b619012c06fc732c4f47d3627be2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 26 Mar 2008 14:05:49 +0000 Subject: [PATCH] merge (and updates due to security API changes) --- org.glite.jp.client/Makefile | 23 +- org.glite.jp.client/config/startup | 10 +- org.glite.jp.client/examples/glite-jp-importer.sh | 15 +- org.glite.jp.client/examples/mill_feed.c | 317 ++++++++++++++++++++++ org.glite.jp.client/interface/jpimporter.h | 17 ++ org.glite.jp.client/project/version.properties | 3 +- org.glite.jp.client/src/jpimp_lib.c | 1 + org.glite.jp.client/src/jpimporter.c | 254 +++++++++++++---- 8 files changed, 572 insertions(+), 68 deletions(-) create mode 100644 org.glite.jp.client/examples/mill_feed.c diff --git a/org.glite.jp.client/Makefile b/org.glite.jp.client/Makefile index bb3e673..a1a869e 100644 --- a/org.glite.jp.client/Makefile +++ b/org.glite.jp.client/Makefile @@ -14,7 +14,6 @@ glite_location=/opt/glite globus_prefix=/opt/globus nothrflavour=gcc32 thrflavour=gcc32pthr -expat_prefix=/opt/expat gsoap_prefix=/software/gsoap-2.6 CC=gcc @@ -25,6 +24,10 @@ STAGETO=include/${globalprefix}/${jpprefix} VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project$:${top_srcdir}/interface:${stagedir}/interface:${top_srcdir}/build +ifdef JP_PERF + JP_PERF_CFLAGS:=-DJP_PERF=1 +endif + GLOBUS_LIBS:=-L${globus_prefix}/lib \ -lglobus_ftp_client_${nothrflavour} \ -lglobus_ftp_control_${nothrflavour} @@ -33,7 +36,7 @@ GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} DEBUG:=-g -O0 -DDEBUG -CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include +CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS} -W -Wall -Wno-unused-parameter -D_GNU_SOURCE LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib LINK:=libtool --mode=link ${CC} ${LDFLAGS} @@ -41,10 +44,12 @@ LTCOMPILE:=libtool --mode=compile ${CC} ${CFLAGS} LINKXX:=libtool --mode=link ${CXX} ${LDFLAGS} INSTALL:=libtool --mode=install install +LIBTAR:=-L${libtar}/lib -ltar + STAGE_HDRS:=jpcl_ctx_int.h HDRS:=jp_client.h jpimporter.h -EXAMPLES:=jpps_upload_files +EXAMPLES:=jpps_upload_files mill_feed LIBOBJS:=jpcl_ctx.o jpimp_lib.o LIBTHROBJS:=${LIBOBJS:.o=.thr.o} @@ -77,18 +82,19 @@ compile: ${daemon} ${LIB} ${EXAMPLES} examples: ${EXAMPLES} -# XXX: should depend on -ltar but we have no dynamic version so far :-( ${LIB}: ${LIBOBJS} - ${LINK} ${version_info} -o $@ ${LIBLOBJS} -rpath ${glite_location}/lib ${LBMAILDIRLIB} + ${LINK} ${version_info} -o $@ ${LIBLOBJS} -rpath ${glite_location}/lib ${LBMAILDIRLIB} ${LIBTAR} ${daemon}: ${OBJS} ${LINK} -o $@ ${OBJS} ${LBMAILDIRLIB} ${GSOAPLIB} ${GLOBUS_LIBS} ${EXAMPLES}: ${LIB} -# XXX: -ltar should be in the library -${EXAMPLES}: %: %.o - ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB} -ltar +jpps_upload_files: %: %.o + ${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB} + +mill_feed: %: %.o + ${LINK} -o $@ $< ${LBMAILDIRLIB} -lglite_lb_common_${nothrflavour} @@ -148,6 +154,7 @@ install: ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin ${INSTALL} -m 644 ${LIB} ${PREFIX}/lib ${INSTALL} -m 644 jpps_upload_files ${PREFIX}/examples/glite-jp-primary-upload_files + ${INSTALL} -m 755 mill_feed ${PREFIX}/examples/glite-jp-mill_feed cd ${top_srcdir}/examples && ${INSTALL} -m 755 glite-jp-importer.sh ${PREFIX}/examples/ ${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-jp-importer cd ${top_srcdir}/interface && ${INSTALL} -m 644 ${HDRS} ${PREFIX}/${STAGETO} diff --git a/org.glite.jp.client/config/startup b/org.glite.jp.client/config/startup index 5d5173e..b38020d 100755 --- a/org.glite.jp.client/config/startup +++ b/org.glite.jp.client/config/startup @@ -2,6 +2,7 @@ GLITE_LOCATION=${GLITE_LOCATION:-/opt/glite} GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-/var/glite} +GLITE_TMP=${GLITE_TMP:-/tmp} [ -f /etc/glite.conf ] && . /etc/glite.conf [ -f $GLITE_LOCATION/etc/glite-wms.conf ] && . $GLITE_LOCATION/etc/glite-wms.conf @@ -36,8 +37,10 @@ start() [ -z "$creds" ] && echo $0: WARNING: No credentials specified. Using default lookup which is dangerous. >&2 - [ -n "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && jpreg_maildir="--reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR " - [ -z "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] && GLITE_LB_EXPORT_JPDUMP_MAILDIR=/tmp/jpdump + [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && GLITE_LB_EXPORT_JPREG_MAILDIR=$GLITE_TMP/jpreg + jpreg_maildir="--reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR " + [ -d "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPREG_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPREG_MAILDIR" + [ -z "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] && GLITE_LB_EXPORT_JPDUMP_MAILDIR=$GLITE_TMP/jpdump jpdump_maildir="--dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR " [ -d "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" [ -n "$GLITE_LB_EXPORT_JPPS" ] && jpps="--jpps $GLITE_LB_EXPORT_JPPS " @@ -55,7 +58,8 @@ start() # -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $creds" \ su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-jp-importer \ - -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs $creds" \ + -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs \ + $creds $GLITE_JP_IMPORTER_ARGS" \ && echo " done" || echo " FAILED" } diff --git a/org.glite.jp.client/examples/glite-jp-importer.sh b/org.glite.jp.client/examples/glite-jp-importer.sh index f6347bf..eb40277 100644 --- a/org.glite.jp.client/examples/glite-jp-importer.sh +++ b/org.glite.jp.client/examples/glite-jp-importer.sh @@ -32,7 +32,11 @@ fi # dump directory of bkserver if [ -z "$GLITE_LB_EXPORT_DUMPDIR" ]; then GLITE_LB_EXPORT_DUMPDIR=/tmp/dump - echo "GLITE_LB_EXPORT_DUMPDIR not specified (-D arguent of the bkserver), used $GLITE_LB_EXPORT_DUMPDIR" + echo "GLITE_LB_EXPORT_DUMPDIR not specified (-D argument of the bkserver), used $GLITE_LB_EXPORT_DUMPDIR" +fi +if [ -z "$GLITE_LB_EXPORT_PURGEDIR" ]; then + GLITE_LB_EXPORT_PURGEDIR=/tmp/purge + echo "GLITE_LB_EXPORT_PURGEDIR not specified (-S argument of the bkserver), used $GLITE_LB_EXPORT_PURGEDIR" fi # LB maildir for job registration if [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ]; then @@ -67,7 +71,7 @@ fi echo "Using cert args $CERT_ARGS" -$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs> $LOGDIR/jp-importer.log 2>&1 & +$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs$GLITE_JP_IMPORTER_ARGS > $LOGDIR/jp-importer.log 2>&1 & JP_PID=$! trap "kill $JP_PID; exit 0" SIGINT @@ -75,16 +79,17 @@ trap "kill $JP_PID; exit 0" SIGINT while [ 1 ]; do $PREFIX/bin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER -s - for file in $GLITE_LB_EXPORT_DUMPDIR/*; do + list=`ls $GLITE_LB_EXPORT_PURGEDIR/* 2>/dev/null` + for file in $list; do if [ -s $file ]; then $PREFIX/bin/glite-lb-dump_exporter -d $file -s $GLITE_LB_EXPORT_JOBSDIR -m $GLITE_LB_EXPORT_JPDUMP_MAILDIR if [ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" ]; then mv $file $GLITE_LB_EXPORT_DUMPDIR_KEEP else - rm $file + rm -f $file fi else - rm $file + rm -f $file fi done diff --git a/org.glite.jp.client/examples/mill_feed.c b/org.glite.jp.client/examples/mill_feed.c new file mode 100644 index 0000000..c24c9e5 --- /dev/null +++ b/org.glite.jp.client/examples/mill_feed.c @@ -0,0 +1,317 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "jp_client.h" +#include "jpimporter.h" +#include "glite/lb/lb_maildir.h" +#include "glite/jobid/cjobid.h" + + +#define USER "Job Generator Buddy" +#define BKSERVER "funny.zcu.cz" +#define BKPORT 9000 +#ifndef EDG_DUMP_STORAGE +#define EDG_DUMP_STORAGE "/tmp/dump" +#endif +#ifndef EDG_PURGE_STORAGE +#define EDG_PURGE_STORAGE "/tmp/purge" +#endif + + +char *jpreg_dir; +char *dump_dir; +char *user; +int do_exit = 0; +int perf_regs, perf_dumps; +char perf_ts[100]; +char *dump; char **dump_index; size_t dump_tokens; +int speed = 0; +double duration = 0.0; + +static struct option opts[] = { + { "help", 0, NULL, 'h'}, + { "reg-mdir", 1, NULL, 'R'}, + { "dump-mdir", 1, NULL, 'D'}, + { "break", 1, NULL, 'b'}, + { "dump", 1, NULL, 'd'}, +// { "sandbox-mdir",1, NULL, 's'}, + { NULL, 0, NULL, 0} +}; +static const char *get_opt_string = "hR:D:b:d:"; + +static int register_init(); +static int register_add(const char *jobid, char **new_jobid); +static void get_time(char *s, size_t maxs, double *t); +static int dump_init(const char *start_jobid, const char *filenmae); +static int dump_add(const char *filename, const char *jobid); +static void dump_done(); + + +static void handler(int sig) { + do_exit = sig; + signal(sig, SIG_DFL); +} + + +static void usage(const char *program) { + fprintf(stderr, "Usage: %s [OPTIONS]\n" + "\t-R,--reg-mdir\n" + "\t-D,--dump-mdir\n" +// "\t-s,--sandbox-mdir\n" + "\t-b,--break speed (jobs/day)\n" + "\t-d,--dump dump file\n" + , program); +} + + +int main(int argc, char *argv[]) { + char start_jobid[256], stop_jobid[256], *fn; + double ts, ts2, last, now; + int ret, opt; + FILE *f; + char *jobid, *dumpfile = NULL; + + while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF) + switch (opt) { + case 'h': usage(argv[0]); return 0; + case 'R': jpreg_dir = strdup(optarg); break; + case 'D': dump_dir = strdup(optarg); break; + case 'b': speed = atoi(optarg); if (speed) duration = 24.0*3600.0*1000000.0/speed; break; + case 'd': dumpfile = optarg; break; + default: printf("opt: %c\n", opt); usage(argv[0]); return 1; + } + + get_time(perf_ts, sizeof(perf_ts), &ts); + snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts); + snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts); + + if ((ret = register_init()) != 0) return ret; + if ((ret = dump_init(start_jobid, dumpfile)) != 0) return ret; + if ((ret = register_add(start_jobid, NULL)) != 0) return ret; + if (signal(SIGINT, handler) == SIG_ERR) { + ret = errno; + fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno)); + return ret; + } + if (speed) printf("speed: %d jobs/day (delay %lf)\n", speed, duration / 1000000.0); + else printf("speed: unlimited\n"); + printf("dump: %s\n", dumpfile ? dumpfile : "(none)"); + printf("reg-mdir: %s\n", jpreg_dir); + printf("dump-mdir: %s\n", dump_dir); + printf("start: %lf\n", ts); + printf("%s\n", start_jobid); + last = ts; + while (!do_exit) { + struct timeval tv; + + if ((ret = register_add(NULL, &jobid)) != 0) return ret; +// printf("%s\n", jobid); + if (dumpfile) { + if ((ret = dump_add(dumpfile, jobid)) != 0) return ret; +// printf(" dumped %s\n", dumpfile); + } + free(jobid); + gettimeofday(&tv, NULL); + now = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + if (now < last + duration) usleep(last + duration - now); + last = now; + } + if ((ret = register_add(stop_jobid, NULL)) != 0) return ret; + asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts); + if ((f = fopen(fn, "wt")) == NULL) { + ret = errno; + free(fn); + fprintf(stderr, "Can' create file '%s': %s\n", fn, strerror(errno)); + return ret; + } + free(fn); + fprintf(f, "reg-imp\t%d\n", perf_regs); + fprintf(f, "dump-imp\t%d\n", perf_dumps); + fclose(f); + dump_done(); + + get_time(NULL, -1, &ts2); + printf("stop: %lf\n", ts2); + printf("regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts)); + printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts)); + printf("%s\n", stop_jobid); + + return 0; +} + + +static void get_time(char *s, size_t maxs, double *t) { + struct timeval tv; + struct tm tm; + + gettimeofday(&tv, NULL); + if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + gmtime_r(&tv.tv_sec, &tm); + if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm); +} + + +static int register_init() { + char *env; + + if (!jpreg_dir) { + env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR"); + if (env) jpreg_dir = strdup(env); + else jpreg_dir = strdup(GLITE_REG_IMPORTER_MDIR); + } + + + // TODO: better from certificate + env = getenv("GLITE_USER"); + if (!env) env = USER; + user = strdup(env); + + if (edg_wll_MaildirInit(jpreg_dir) != 0) { + fprintf(stderr, "maildir init on %s failed\n", jpreg_dir); + return EIO; + } + + perf_regs = 0; + return 0; +} + + +static int register_add(const char *jobid, char **new_jobid) { + glite_jobid_t j; + char *tmpjobid, *msg; + + if (!jobid) { + if (glite_jobid_create(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = glite_jobid_unparse(j)) == NULL) { + fprintf(stderr, "Can't create jobid\n"); + return EIO; + } + glite_jobid_free(j); + } else tmpjobid = strdup(jobid); + asprintf(&msg, "%s\n%s", tmpjobid, user); + if (new_jobid) *new_jobid = tmpjobid; + else free(tmpjobid); + if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) { + fprintf(stderr, "Can't store message: %s\n", lbm_errdesc); + return EIO; + } + free(msg); + + perf_regs++; + return 0; +} + + +static int dump_init(const char *start_jobid, const char *filename) { + char *env, *ptr, *delim; + FILE *f; + long ssize; + size_t i, dump_maxtokens, size; + int ret; + + unlink(PERF_START_FILE); + + dump = NULL; + dump_index = NULL; + dump_tokens = 0; + if (filename) { + if ((f = fopen(filename, "rt")) == NULL) { + fprintf(stderr, "Can't open '%s': %s\n", filename, strerror(errno)); + return EIO; + } + if (fseek(f, 0, SEEK_END) == -1 || (ssize = ftell(f)) == -1 || fseek(f, 0, SEEK_SET) == -1) { + fprintf(stderr, "Can't get position in '%s': %s\n", filename, strerror(errno)); + return EIO; + } + dump = malloc(size = ssize); + if (fread(dump, size, 1, f) != 1) { + ret = errno; + fprintf(stderr, "Error reading %ld bytes from file: %s\n", ssize, strerror(errno)); + return ret; + } + fclose(f); + + dump_maxtokens = 1024; + dump_index = malloc(sizeof(char *) * dump_maxtokens); + i = 0; + ptr = dump; + do { + if (dump_tokens >= dump_maxtokens) { + dump_maxtokens *= 2; + dump_index = realloc(dump_index, sizeof(char *) * dump_maxtokens); + } + delim = strstr(ptr, "DG.JOBID=\""); + if (delim != ptr) { + dump_index[dump_tokens++] = ptr; + if (delim) { + delim[10] = '\0'; + ptr = delim + 11; + } else ptr = NULL; + } + if (ptr) ptr = strchr(ptr, '\"'); + } while (ptr && ptr[0]); + } +//for (i = 0; i < dump_tokens; i++) printf("####%s\n", dump_index[i]); + + if (!dump_dir) { + // wrong purging to GLITE_LB_EXPORT_DUMPDIR on older versions + env = getenv("GLITE_LB_EXPORT_PURGEDIR"); + if (env) dump_dir = strdup(env); + else dump_dir = strdup(EDG_DUMP_STORAGE); + } + mkdir(dump_dir, 0755); + perf_dumps = 0; + + if ((f = fopen(PERF_START_FILE, "wt")) == NULL) { + fprintf(stderr, "Can't create file '" PERF_START_FILE "': %s\n", strerror(errno)); + return EIO; + } + if (start_jobid) fprintf(f, "%s\n", start_jobid); + fclose(f); + + return 0; +} + + +static int dump_add(const char *filename, const char *jobid) { + char *fn; + int ret; + size_t i; + FILE *f; + + ret = 0; + asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps); + if ((f = fopen(fn , "wt")) == NULL) { + ret = errno; + fprintf(stderr, "Can't create file '%s': %s\n", fn, strerror(errno)); + goto err; + } + for (i = 0; i < dump_tokens; i++) { + if (fputs(dump_index[i], f) == EOF || (i + 1 < dump_tokens && (fputs(jobid, f) == EOF))) { + ret = errno; + fprintf(stderr, "Can't write to '%s': %s\n", fn, strerror(errno)); + goto err_close; + } + } + + perf_dumps++; +err_close: + fclose(f); +err: + free(fn); + return ret; +} + + +static void dump_done() { + free(dump_index); + free(dump); + dump_tokens = 0; +} diff --git a/org.glite.jp.client/interface/jpimporter.h b/org.glite.jp.client/interface/jpimporter.h index 93f7c72..019014f 100644 --- a/org.glite.jp.client/interface/jpimporter.h +++ b/org.glite.jp.client/interface/jpimporter.h @@ -1,6 +1,23 @@ #ifndef __GLITE_JPIMPORTER__ #define __GLITE_JPIMPORTER__ +#ifndef GLITE_REG_IMPORTER_MDIR +#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg" +#endif + +#ifndef GLITE_DUMP_IMPORTER_MDIR +#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" +#endif + +#ifndef GLITE_SANDBOX_IMPORTER_MDIR +#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" +#endif + +#define PERF_JOBID_START_PREFIX "https://start.megajob/START-" +#define PERF_JOBID_STOP_PREFIX "https://stop.megajob/STOP-" +#define PERF_START_FILE "/tmp/jp_megajob_start" +#define PERF_STOP_FILE_FORMAT "/tmp/jp_megajob_%s" + #ifdef __cplusplus extern "C" { #endif diff --git a/org.glite.jp.client/project/version.properties b/org.glite.jp.client/project/version.properties index 46eb5a1..d706b06 100644 --- a/org.glite.jp.client/project/version.properties +++ b/org.glite.jp.client/project/version.properties @@ -1,2 +1,3 @@ -module.version=1.2.0 +# $Header$ +module.version=1.2.1 module.age=1 diff --git a/org.glite.jp.client/src/jpimp_lib.c b/org.glite.jp.client/src/jpimp_lib.c index efebc2ab..974c0dc 100644 --- a/org.glite.jp.client/src/jpimp_lib.c +++ b/org.glite.jp.client/src/jpimp_lib.c @@ -5,6 +5,7 @@ #include #include #include +#include #define COMPILE_WITH_LIBTAR diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index bc27574..94a21d1 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include "glite/lb/lb_maildir.h" @@ -24,6 +26,8 @@ #include "glite/security/glite_gscompat.h" #include "globus_ftp_client.h" +#include "jp_client.h" +#include "jpimporter.h" #if GSOAP_VERSION <= 20602 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob @@ -35,7 +39,6 @@ typedef struct { char *val; } msg_pattern_t; - #ifndef dprintf #define dprintf(FMT, ARGS...) { if (debug) printf(FMT, ##ARGS); } #endif @@ -46,26 +49,16 @@ typedef struct { #define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid" #endif -#ifndef GLITE_REG_IMPORTER_MDIR -#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg" -#endif - -#ifndef GLITE_DUMP_IMPORTER_MDIR -#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" -#endif - -#ifndef GLITE_SANDBOX_IMPORTER_MDIR -#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" -#endif - #ifndef GLITE_JPPS #define GLITE_JPPS "http://localhost:8901" #endif - #define MAX_REG_CONNS 500 #define JPPS_NO_RESPONSE_TIMEOUT 120 - +#define JPREG_REPEAT_TIMEOUT 300 +#define JPREG_GIVUP_TIMEOUT 3000 +#define JP_REPEAT_TIMEOUT 360 +#define JP_GIVUP_TIMEOUT 3600 static int debug = 0; static int die = 0; @@ -84,8 +77,16 @@ static char *server_cert = NULL, *server_key = NULL, *cadir = NULL; static edg_wll_GssCred mycred = NULL; -static char *mysubj; -struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0}; +#ifdef JP_PERF +typedef struct { + char *id, *name; + long int count, limit; + double start, end; +} perf_t; + +int sink = 0; +perf_t perf = {name:NULL,}; +#endif static struct option opts[] = { @@ -101,13 +102,28 @@ static struct option opts[] = { { "pidfile", 1, NULL, 'i'}, { "poll", 1, NULL, 't'}, { "store", 1, NULL, 'S'}, + { "store", 1, NULL, 'S'}, +#ifdef JP_PERF + { "perf-sink", 1, NULL, 'K'}, +#endif { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"; +static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:" +#ifdef JP_PERF + "K:" +#endif +; #include "glite/jp/ws_fault.c" +#ifdef JP_PERF +static void stats_init(perf_t *perf, const char *name); +static void stats_set_jobid(perf_t *perf, const char *jobid); +static void stats_get_limit(perf_t *perf, const char *name); +static void stats_done(perf_t *perf); +#endif + static void usage(char *me) { fprintf(stderr,"usage: %s [option]\n" @@ -121,9 +137,12 @@ static void usage(char *me) "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n" "\t-i, --pidfile file to store master pid\n" - "\t-t, --poll maildir polling interval (in seconds)\n", - "\t-S, --store keep uploaded jobs in this directory\n", - me); + "\t-t, --poll maildir polling interval (in seconds)\n" + "\t-S, --store keep uploaded jobs in this directory\n" +#ifdef JP_PERF + "\t-K, --perf-sink 1=stats, 2=without WS calls, 3=stats+without WS\n" +#endif + , me); } static void catchsig(int sig) @@ -131,7 +150,7 @@ static void catchsig(int sig) die = sig; } -static void catch_chld(int sig) +static void catch_chld(int sig __attribute__((unused))) { child_died = 1; } @@ -143,7 +162,7 @@ static int dump_importer(void); static int sandbox_importer(void); static int parse_msg(char *, msg_pattern_t []); static int gftp_put_file(const char *, int); - +static int refresh_connection(struct soap *soap); int main(int argc, char *argv[]) @@ -157,7 +176,6 @@ int main(int argc, char *argv[]) char *name, pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; glite_gsplugin_Context plugin_ctx; - edg_wll_GssCred cred; name = strrchr(argv[0],'/'); if (name) name++; else name = argv[0]; @@ -179,6 +197,9 @@ int main(int argc, char *argv[]) case 'd': strcpy(dump_mdir, optarg); break; case 's': strcpy(sandbox_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; +#ifdef JP_PERF + case 'K': sink = atoi(optarg); break; +#endif case '?': usage(name); return 1; } if ( optind < argc ) { usage(name); return 1; } @@ -237,8 +258,8 @@ int main(int argc, char *argv[]) " - unable to watch them for changes!\n", argv[0]); if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); edg_wll_gss_watch_creds(server_cert, &cert_mtime); - if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) { - dprintf("[master] Server identity: %s\n", mysubj); + if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &gss_code) ) { + dprintf("[master] Server identity: %s\n", mycred->name); } else { char *errmsg; edg_wll_gss_get_error(&gss_code, "edg_wll_gss_acquire_cred_gsi()", &errmsg); @@ -269,13 +290,6 @@ int main(int argc, char *argv[]) soap_set_namespaces(soap, jpps__namespaces); glite_gsplugin_init_context(&plugin_ctx); - if (edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &cred, NULL, NULL) != 0) { - perror("can't acquire credentials"); - exit(1); - } - glite_gsplugin_set_timeout(plugin_ctx, &to); - glite_gsplugin_set_credential(plugin_ctx, mycred); - soap_register_plugin_arg(soap, glite_gsplugin,plugin_ctx); if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) { @@ -371,7 +385,11 @@ static int slave(int (*fn)(void), const char *nm) dprintf("[%s] slave started - pid [%d]\n", name, getpid()); - while ( !die && conn_cnt < MAX_REG_CONNS ) { +#ifdef JP_PERF + while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) { +#else + while ( !die && (conn_cnt < MAX_REG_CONNS) ) { +#endif int ret = fn(); if ( ret > 0 ) conn_cnt++; @@ -384,7 +402,7 @@ static int slave(int (*fn)(void), const char *nm) } if ( die ) { - dprintf("[%s] Terminating on signal %d\n", name, getpid(), die); + dprintf("[%s] %d: Terminating on signal %d\n", name, getpid(), die); if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); } dprintf("[%s] Terminating after %d connections\n", name, conn_cnt); @@ -405,11 +423,11 @@ static int reg_importer(void) *aux; if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname); if ( !ret ) { readnew = !readnew; if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname); if ( !ret ) { readnew = !readnew; return 0; @@ -420,12 +438,6 @@ static int reg_importer(void) dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; - } - - if ( ret < 0 ) { - dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); - if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); - return -1; } else if ( ret > 0 ) { dprintf("[%s] JP registration request received\n", name); if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); @@ -441,8 +453,30 @@ static int reg_importer(void) in.owner = aux; dprintf("[%s] Registering '%s'\n", name, msg); if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); +#ifdef JP_PERF + if ((sink & 1)) { + if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) { + stats_init(&perf, name); + stats_set_jobid(&perf, msg); + } + if (perf.name && !perf.limit) stats_get_limit(&perf, name); + } + if (!(sink & 2)) { +#endif + refresh_connection(soap); ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; +#ifdef JP_PERF + } else ret = 0; + if (perf.name && ret == 0) { + perf.count++; + if (perf.limit) { + dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit); + if (perf.count >= perf.limit) stats_done(&perf); + } else + dprintf("[%s statistics] done %ld/no limit\n", name, perf.count); + } +#endif } while (0); edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); free(fname); @@ -462,7 +496,6 @@ static int dump_importer(void) static int readnew = 1; char *msg = NULL, *fname = NULL, - *aux, *bname; char fspec[PATH_MAX]; int ret; @@ -480,11 +513,11 @@ static int dump_importer(void) if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname); if ( !ret ) { readnew = !readnew; if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname); if ( !ret ) { readnew = !readnew; return 0; @@ -513,6 +546,29 @@ static int dump_importer(void) su_in.contentType = "text/lb"; dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); +#ifdef JP_PERF + if ((sink & 1)) { + /* statistics started by file, ended by count limit (from the appropriate result fikle) */ + FILE *f; + char item[200]; + + /* starter */ + if (!perf.name) { + f = fopen(PERF_START_FILE, "rt"); + if (f) { + stats_init(&perf, name); + fscanf(f, "%s", item); + fclose(f); + unlink(PERF_START_FILE); + stats_set_jobid(&perf, item); + } else + dprintf("[%s statistics]: not started/too much dumps: %s\n", name, strerror(errno)); + } + if (perf.name && !perf.limit) stats_get_limit(&perf, name); + } + if (!(sink & 2)) { +#endif + refresh_connection(soap); ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)); @@ -532,9 +588,21 @@ static int dump_importer(void) close(fhnd); dprintf("[%s] File sent, commiting the upload\n", name); cu_in.destination = su_out.destination; + refresh_connection(soap); ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Dump upload succesfull\n", name); +#ifdef JP_PERF + } else ret = 0; + if (perf.name && ret == 0) { + perf.count++; + if (perf.limit) { + dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit); + if (perf.count >= perf.limit) stats_done(&perf); + } else + dprintf("[%s statistics] done %ld/no limit\n", name, perf.count); + } +#endif if (store && *store) { bname = strdup(tab[_file].val); snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname)); @@ -568,8 +636,7 @@ static int sandbox_importer(void) struct _jpelem__CommitUploadResponse empty; static int readnew = 1; char *msg = NULL, - *fname = NULL, - *aux; + *fname = NULL; int ret; int fhnd; msg_pattern_t tab[] = { @@ -586,11 +653,11 @@ static int sandbox_importer(void) if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname); if ( !ret ) { readnew = !readnew; if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname); if ( !ret ) { readnew = !readnew; return 0; @@ -622,6 +689,10 @@ static int sandbox_importer(void) su_in.contentType = "tar/lb"; dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg); +#ifdef JP_PERF + if (!(sink & 2)) { +#endif + refresh_connection(soap); ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); ret = check_soap_fault(soap, ret); /* XXX: grrrrrrr! test it!!!*/ @@ -638,9 +709,13 @@ static int sandbox_importer(void) close(fhnd); dprintf("[%s] File sent, commiting the upload\n", name); cu_in.destination = su_out.destination; + refresh_connection(soap); ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; dprintf("[%s] Dump upload succesfull\n", name); +#ifdef JP_PERF + } else ret = 0; +#endif } while (0); edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); @@ -760,7 +835,7 @@ static int gftp_put_file(const char *url, int fhnd) put_file_err("Could not initialise operation attributes"); if ( globus_ftp_client_operationattr_set_authorization( - &op_attr, server_cert? mycred: GSS_C_NO_CREDENTIAL, + &op_attr, server_cert? mycred->gss_cred: GSS_C_NO_CREDENTIAL, NULL, "", 0, NULL) != GLOBUS_SUCCESS ) put_file_err("Could not set authorization procedure"); @@ -809,6 +884,83 @@ static int gftp_put_file(const char *url, int fhnd) return (gError == GLOBUS_TRUE)? 1: 0; } + +static int refresh_connection(struct soap *soap) { + struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0}; + edg_wll_GssCred newcred; + edg_wll_GssStatus gss_code; + glite_gsplugin_Context gp_ctx; + + gp_ctx = glite_gsplugin_get_context(soap); + glite_gsplugin_set_timeout(gp_ctx, &to); + + switch ( edg_wll_gss_watch_creds(server_cert, &cert_mtime) ) { + case 0: break; + case 1: + if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &newcred, &gss_code) ) { + dprintf("[%s] reloading credentials successful\n", name); + edg_wll_gss_release_cred(&mycred, &gss_code); + mycred = newcred; + glite_gsplugin_set_credential(gp_ctx, newcred); + } else { dprintf("[%s] reloading credentials failed, using old ones\n", name); } + break; + case -1: dprintf("[%s] edg_wll_gss_watch_creds failed\n", name); break; + } + + return 0; +} + + +#ifdef JP_PERF +static void stats_init(perf_t *perf, const char *name) { + struct timeval tv; + + memset(perf, 0, sizeof *perf); + perf->count = 0; + perf->name = strdup(name); + gettimeofday(&tv, NULL); + perf->start = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[%s statistics] start detected\n", name); +} + +static void stats_set_jobid(perf_t *perf, const char *jobid) { + perf->id = strdup(jobid + sizeof(PERF_JOBID_START_PREFIX) - 1); + dprintf("[%s statistics] ID %s\n", perf->name, perf->id); +} + +static void stats_get_limit(perf_t *perf, const char *name) { + FILE *f; + char *fn, item[200]; + int count; + + /* stopper */ + asprintf(&fn, PERF_STOP_FILE_FORMAT, perf->id); + f = fopen(fn, "rt"); + free(fn); + if (f) { + fscanf(f, "%s\t%d", item, &count); + if (strcasecmp(item, name) != 0) fscanf(f, "%s\t%d", item, &count); + dprintf("[%s statistics] expected %d %s\n", name, count, item); + fclose(f); + perf->limit = count; + } +} + +static void stats_done(perf_t *perf) { + struct timeval tv; + + gettimeofday(&tv, NULL); + perf->end = tv.tv_sec + (double)tv.tv_usec / 1000000.0; + dprintf("[%s statistics] %s\n", perf->name, perf->id); + dprintf("[%s statistics] start: %lf\n", perf->name, perf->start); + dprintf("[%s statistics] stop: %lf\n", perf->name, perf->end); + dprintf("[%s statistics] count: %ld (%lf job/day)\n", perf->name, perf->count, 86400.0 * perf->count / (perf->end - perf->start)); + free(perf->id); + free(perf->name); + memset(perf, 0, sizeof *perf); +} +#endif + /* XXX: we don't use it */ SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; -- 1.8.2.3