From: cvs2svn Date: Fri, 18 May 2007 08:23:34 +0000 (+0000) Subject: This commit was manufactured by cvs2svn to create branch 'glite-jp- X-Git-Tag: glite-jp-client_R_1_2_0_1 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=2521a13cd3d516d5f8ee4b991212daffe29b433a;p=jra1mw.git This commit was manufactured by cvs2svn to create branch 'glite-jp- client_branch_RC31_3'. Sprout from glite-jp-client_branch_1_1_0_RC31 2006-04-02 09:58:05 UTC cvs2svn 'This commit was manufactured by cvs2svn to create branch 'glite-jp-' Cherrypick from master 2007-05-18 08:23:31 UTC Aleš Křenek 'push version': org.glite.jp.client/config/startup org.glite.jp.client/doc/README.jpimporter org.glite.jp.client/examples/glite-jp-importer.sh org.glite.jp.client/examples/jpps_upload_files.c org.glite.jp.client/project/version.properties org.glite.jp.client/src/jpimporter.c --- diff --git a/org.glite.jp.client/config/startup b/org.glite.jp.client/config/startup index a17dd4d..5d5173e 100755 --- a/org.glite.jp.client/config/startup +++ b/org.glite.jp.client/config/startup @@ -42,9 +42,20 @@ start() [ -d "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPDUMP_MAILDIR" [ -n "$GLITE_LB_EXPORT_JPPS" ] && jpps="--jpps $GLITE_LB_EXPORT_JPPS " + [ -n "$GLITE_LB_EXPORT_SANDBOX_MAILDIR" ] && sandbox_maildir="--sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR " + + if [ -n "$GLITE_LB_EXPORT_JOBSDIR_KEEP" ]; then + keep_jobs="--store ${GLITE_LB_EXPORT_JOBSDIR_KEEP} " + [ -d $GLITE_LB_EXPORT_JOBSDIR_KEEP ] || mkdir -p $GLITE_LB_EXPORT_JOBSDIR_KEEP + fi + echo -n Starting glite-jp-importer ... + +# XXX: HEAD +# -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $creds" \ + su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-jp-importer \ - -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $creds" \ + -i $pidfile $jpreg_maildir $jpdump_maildir $jpps $sandbox_maildir $keep_jobs $creds" \ && echo " done" || echo " FAILED" } diff --git a/org.glite.jp.client/doc/README.jpimporter b/org.glite.jp.client/doc/README.jpimporter index 5041ca6..3e8bb2d 100644 --- a/org.glite.jp.client/doc/README.jpimporter +++ b/org.glite.jp.client/doc/README.jpimporter @@ -49,6 +49,7 @@ jpimporter daemon usage: glite-jp-importer [option] messages. -d, --dump-mdir path to the 'LB maildir' subtree for LB dumps This directory is scanned for the file upload msgs. + -s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes -i, --pidfile file to store master pid -t, --poll maildir polling interval (in seconds) Specifies the time interval for which the process diff --git a/org.glite.jp.client/examples/glite-jp-importer.sh b/org.glite.jp.client/examples/glite-jp-importer.sh index 804447e..4d2f737 100644 --- a/org.glite.jp.client/examples/glite-jp-importer.sh +++ b/org.glite.jp.client/examples/glite-jp-importer.sh @@ -21,11 +21,9 @@ if [ -z "$GLITE_LB_EXPORT_JPPS" ]; then echo "Please specify the Job Provanance Primary Storage server." exit 1 fi -# bookkeeping server -if [ -z "$GLITE_LB_EXPORT_BKSERVER" ]; then - echo "Please specify the Book Keeping server." - exit 1 -fi +# book keeping server +GLITE_LB_SERVER_PORT=${GLITE_LB_SERVER_PORT:-9000} +GLITE_LB_EXPORT_BKSERVER=${GLITE_LB_EXPORT_BKSERVER:-localhost:$GLITE_LB_SERVER_PORT} # certificates if [ -z "$X509_USER_CERT" -o -z "$X509_USER_KEY" ]; then echo "Please set X509_USER_CERT and X509_USER_KEY." @@ -41,35 +39,50 @@ if [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ]; then GLITE_LB_EXPORT_JPREG_MAILDIR=/tmp/jpreg echo "GLITE_LB_EXPORT_JPREG_MAILDIR not specified (-J arguent of the bkserver), used $GLITE_LB_EXPORT_JPREG_MAILDIR" fi +# pidfile +[ -n "$GLITE_JP_IMPORTER_PIDFILE" ] && pidfile="-i $GLITE_JP_IMPORTER_PIDFILE " CERT_ARGS="-c $X509_USER_CERT -k $X509_USER_KEY" GLITE_LB_EXPORT_JPDUMP_MAILDIR=${GLITE_LB_EXPORT_JPDUMP_MAILDIR:-/tmp/jpdump} -GLITE_LB_EXPORT_DUMPDIR_OLD=${GLITE_LB_EXPORT_DUMPDIR_OLD:-$GLITE_LB_EXPORT_DUMPDIR.old} -GLITE_LB_EXPORT_EXPORTDIR=${GLITE_LB_EXPORT_EXPORTDIR:-/tmp/lbexport} +GLITE_LB_EXPORT_SANDBOX_MAILDIR=${GLITE_LB_EXPORT_SANDBOX_MAILDIR:-/tmp/jpsandbox} +GLITE_LB_EXPORT_JOBSDIR=${GLITE_LB_EXPORT_JOBSDIR:-/tmp/lbexport} PREFIX=${PREFIX:-`dirname $0`/..} LOGDIR=$GLITE_LOCATION_VAR -GLITE_LB_EXPORT_PURGE_ARGS=${GLITE_LB_EXPORT_PURGE_ARGS:--a 1h -c 1h -n 1h -o 1d} +GLITE_LB_EXPORT_PURGE_ARGS=${GLITE_LB_EXPORT_PURGE_ARGS:---cleared 2d --aborted 2w --cancelled 2w --other 2m} [ -d $GLITE_LB_EXPORT_JPDUMP_MAILDIR ] || mkdir -p $GLITE_LB_EXPORT_JPDUMP_MAILDIR [ -d $GLITE_LB_EXPORT_DUMPDIR ] || mkdir -p $GLITE_LB_EXPORT_DUMPDIR -[ -d $GLITE_LB_EXPORT_DUMPDIR_OLD ] || mkdir -p $GLITE_LB_EXPORT_DUMPDIR_OLD -[ -d $GLITE_LB_EXPORT_EXPORTDIR ] || mkdir -p $GLITE_LB_EXPORT_EXPORTDIR + +[ -d $GLITE_LB_EXPORT_SANDBOX_MAILDIR ] || mkdir -p $GLITE_LB_EXPORT_SANDBOX_MAILDIR + +[ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" -a ! -d $GLITE_LB_EXPORT_DUMPDIR_KEEP ] && mkdir -p $GLITE_LB_EXPORT_DUMPDIR_KEEP +[ -d $GLITE_LB_EXPORT_JOBSDIR ] || mkdir -p $GLITE_LB_EXPORT_JOBSDIR +if [ -n "$GLITE_LB_EXPORT_JOBSDIR_KEEP" ]; then + keep_jobs="--store ${GLITE_LB_EXPORT_JOBSDIR_KEEP} " + [ -d $GLITE_LB_EXPORT_JOBSDIR_KEEP ] || mkdir -p $GLITE_LB_EXPORT_JOBSDIR_KEEP +fi + [ -d $LOGDIR ] || mkdir -p $LOGDIR echo "Using cert args $CERT_ARGS" -$PREFIX/bin/glite-jp-importer -r $GLITE_LB_EXPORT_JPREG_MAILDIR -d $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS -g -p $GLITE_LB_EXPORT_JPPS > $LOGDIR/jp-importer.log 2>&1 & +$PREFIX/bin/glite-jp-importer --reg-mdir $GLITE_LB_EXPORT_JPREG_MAILDIR --dump-mdir $GLITE_LB_EXPORT_JPDUMP_MAILDIR $CERT_ARGS --sandbox-mdir $GLITE_LB_EXPORT_SANDBOX_MAILDIR -g --jpps $GLITE_LB_EXPORT_JPPS $pidfile$keep_jobs> $LOGDIR/jp-importer.log 2>&1 & + JP_PID=$! trap "kill $JP_PID; exit 0" SIGINT while [ 1 ]; do - $PREFIX/sbin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER + $PREFIX/sbin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER -s for file in $GLITE_LB_EXPORT_DUMPDIR/*; do if [ -s $file ]; then - $PREFIX/sbin/glite-lb-lb_dump_exporter -d $file -s $GLITE_LB_EXPORT_EXPORTDIR -m $GLITE_LB_EXPORT_JPDUMP_MAILDIR - mv $file $GLITE_LB_EXPORT_DUMPDIR_OLD + $PREFIX/sbin/glite-lb-lb_dump_exporter -d $file -s $GLITE_LB_EXPORT_JOBSDIR -m $GLITE_LB_EXPORT_JPDUMP_MAILDIR + if [ -n "$GLITE_LB_EXPORT_DUMPDIR_KEEP" ]; then + mv $file $GLITE_LB_EXPORT_DUMPDIR_KEEP + else + rm $file + fi else rm $file fi diff --git a/org.glite.jp.client/examples/jpps_upload_files.c b/org.glite.jp.client/examples/jpps_upload_files.c index 3e5da0b..48ff776 100644 --- a/org.glite.jp.client/examples/jpps_upload_files.c +++ b/org.glite.jp.client/examples/jpps_upload_files.c @@ -23,8 +23,8 @@ int main(int argc, char **argv) { glite_jpcl_context_t ctx; char **files, - *jobid, - *proxy, + *jobid = NULL, + *proxy = NULL, *lbmd = NULL, *jpps = NULL; int i, j; @@ -46,6 +46,11 @@ int main(int argc, char **argv) if ( i >= argc ) { usage(); return 1; } + if ( !proxy && !(proxy = getenv("X509_USER_PROXY")) ) { + perror("-p or X509_USER_PROXY must be set!\n"); + return 1; + } + if ( !(files = calloc(argc-i+1, sizeof(*files))) ) { perror("calloc()"); return 1; @@ -61,7 +66,7 @@ int main(int argc, char **argv) if ( lbmd ) glite_jpcl_SetParam(ctx, GLITE_JPCL_PARAM_LBMAILDIR, lbmd); if ( jpps ) glite_jpcl_SetParam(ctx, GLITE_JPCL_PARAM_JPPS, jpps); - if ( glite_jpimporter_upload_files(ctx, jobid, files, proxy) ) { + if ( glite_jpimporter_upload_files(ctx, jobid, (const char **)files, proxy) ) { char *errt, *errd; glite_jpcl_Error(ctx, &errt, &errd); diff --git a/org.glite.jp.client/project/version.properties b/org.glite.jp.client/project/version.properties index 08e9de3..46eb5a1 100644 --- a/org.glite.jp.client/project/version.properties +++ b/org.glite.jp.client/project/version.properties @@ -1,2 +1,2 @@ -module.version=1.1.0 -module.age=0 +module.version=1.2.0 +module.age=1 diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index 7a30ef8..31e9a3d 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -12,18 +12,19 @@ #include #include #include +#include #include "glite/lb/lb_maildir.h" -#include "glite/security/glite_gsplugin.h" #include "jpps_H.h" #include "jpps_.nsmap" #include "jptype_map.h" +#include "glite/security/glite_gsplugin.h" +#include "glite/security/glite_gscompat.h" #include "globus_ftp_client.h" -#include "soap_version.h" #if GSOAP_VERSION <= 20602 #define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob #endif @@ -36,9 +37,11 @@ typedef struct { #ifndef dprintf -#define dprintf(x) { if (debug) printf x; } +#define dprintf(FMT, ARGS...) { if (debug) printf(FMT, ##ARGS); } #endif +#define check_soap_fault(SOAP, ERR) glite_jp_clientCheckFault((SOAP), (ERR), name, 1) + #ifndef GLITE_JPIMPORTER_PIDFILE #define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid" #endif @@ -51,29 +54,38 @@ typedef struct { #define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" #endif +#ifndef GLITE_SANDBOX_IMPORTER_MDIR +#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" +#endif + #ifndef GLITE_JPPS #define GLITE_JPPS "http://localhost:8901" #endif #define MAX_REG_CONNS 500 - -static int debug = 0; -static int die = 0; -static int child_died = 0; -static int poll = 2; -static char *name; -static char *jpps = GLITE_JPPS; -static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; -static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; -static struct soap *soap; - -static time_t cert_mtime; -static char *server_cert = NULL, - *server_key = NULL, - *cadir; +#define JPPS_NO_RESPONSE_TIMEOUT 120 + + +static int debug = 0; +static int die = 0; +static int child_died = 0; +static int poll = 2; +static char *name; +static char *jpps = GLITE_JPPS; +static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; +static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; +static char sandbox_mdir[PATH_MAX] = GLITE_SANDBOX_IMPORTER_MDIR; +static char *store = NULL; +static struct soap *soap; + +static time_t cert_mtime; +static char *server_cert = NULL, + *server_key = NULL, + *cadir = NULL; static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; -static char *mysubj; +static char *mysubj; +struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0}; static struct option opts[] = { @@ -85,12 +97,16 @@ static struct option opts[] = { { "jpps", 1, NULL, 'p'}, { "reg-mdir", 1, NULL, 'r'}, { "dump-mdir", 1, NULL, 'd'}, + { "sandbox-mdir",1, NULL, 's'}, { "pidfile", 1, NULL, 'i'}, { "poll", 1, NULL, 't'}, + { "store", 1, NULL, 'S'}, { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d:i:t:c:k:C:"; +static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"; + +#include "glite/jp/ws_fault.c" static void usage(char *me) { @@ -103,8 +119,10 @@ static void usage(char *me) "\t-p, --jpps JP primary service server\n" "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n" "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" + "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n" "\t-i, --pidfile file to store master pid\n" "\t-t, --poll maildir polling interval (in seconds)\n", + "\t-S, --store keep uploaded jobs in this directory\n", me); } @@ -120,9 +138,9 @@ static void catch_chld(int sig) static int slave(int (*)(void), const char *); -static int check_soap_fault(struct soap *, int); static int reg_importer(void); static int dump_importer(void); +static int sandbox_importer(void); static int parse_msg(char *, msg_pattern_t []); static int gftp_put_file(const char *, int); @@ -132,13 +150,12 @@ int main(int argc, char *argv[]) { edg_wll_GssStatus gss_code; struct sigaction sa; - sigset_t sset; - FILE *fpid; - pid_t reg_pid, dump_pid; - int opt; - char *name, - pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; - + sigset_t sset; + FILE *fpid; + pid_t reg_pid, dump_pid, sandbox_pid; + int opt; + char *name, + pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; glite_gsplugin_Context plugin_ctx; @@ -157,8 +174,10 @@ int main(int argc, char *argv[]) case 'C': cadir = optarg; break; case 'p': jpps = optarg; break; case 't': poll = atoi(optarg); break; + case 'S': store = optarg; break; case 'r': strcpy(reg_mdir, optarg); break; case 'd': strcpy(dump_mdir, optarg); break; + case 's': strcpy(sandbox_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; case '?': usage(name); return 1; } @@ -187,6 +206,13 @@ int main(int argc, char *argv[]) edg_wll_MaildirInit(reg_mdir); edg_wll_MaildirInit(dump_mdir); + edg_wll_MaildirInit(sandbox_mdir); + if (store && *store) { + if (mkdir(store, 0750) != 0 && errno != EEXIST) { + fprintf(stderr, "Can't create directory %s: %s\n", store, strerror(errno)); + store = NULL; + } + } if ( !debug ) { if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); } @@ -198,13 +224,13 @@ int main(int argc, char *argv[]) openlog(name, LOG_PID, LOG_DAEMON); } else { setpgid(0, getpid()); } - dprintf(("Master pid %d\n", getpid())); + dprintf("Master pid %d\n", getpid()); if ( globus_module_activate(GLOBUS_FTP_CLIENT_MODULE) != GLOBUS_SUCCESS ) { - dprintf(("[master] Could not activate ftp client module\n")); + dprintf("[master] Could not activate ftp client module\n"); if (!debug) syslog(LOG_INFO, "Could not activate ftp client module\n"); exit(1); - } else dprintf(("[master] Ftp client module activated\n")); + } else dprintf("[master] Ftp client module activated\n"); if ( !server_cert || !server_key ) fprintf(stderr, "%s: key or certificate file not specified" @@ -212,13 +238,13 @@ int main(int argc, char *argv[]) if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); edg_wll_gss_watch_creds(server_cert, &cert_mtime); if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) { - dprintf(("[master] Server identity: %s\n", mysubj)); + dprintf("[master] Server identity: %s\n", mysubj); } else { char *errmsg; edg_wll_gss_get_error(&gss_code, "edg_wll_gss_acquire_cred_gsi()", &errmsg); - dprintf(("[master] %s\n", errmsg)); + dprintf("[master] %s\n", errmsg); free(errmsg); - dprintf(("[master] Running unauthenticated\n")); + dprintf("[master] Running unauthenticated\n"); } memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); @@ -245,6 +271,7 @@ int main(int argc, char *argv[]) glite_gsplugin_init_context(&plugin_ctx); if (server_key) plugin_ctx->key_filename = strdup(server_key); if (server_cert) plugin_ctx->cert_filename = strdup(server_cert); + glite_gsplugin_set_timeout(plugin_ctx, &to); soap_register_plugin_arg(soap, glite_gsplugin,plugin_ctx); @@ -256,6 +283,10 @@ int main(int argc, char *argv[]) perror("starting dump importer slave"); exit(1); } + if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) { + perror("starting sandbox importer slave"); + exit(1); + } while ( !die ) { @@ -269,24 +300,34 @@ int main(int argc, char *argv[]) while ( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) { if ( !die ) { if ( pid == reg_pid ) { - dprintf(("[master] reg importer slave died [%d]\n", pid)); + dprintf("[master] reg importer slave died [%d]\n", pid); if (!debug) syslog(LOG_INFO, "reg importer slave died [%d]\n", die); if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) { perror("starting reg importer slave"); kill(0, SIGINT); exit(1); } - dprintf(("[master] reg importer slave restarted [%d]\n", reg_pid)); + dprintf("[master] reg importer slave restarted [%d]\n", reg_pid); } else if ( pid == dump_pid ) { - dprintf(("[master] dump importer slave died [%d]\n", pid)); + dprintf("[master] dump importer slave died [%d]\n", pid); if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die); if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) { perror("starting dump importer slave"); kill(0, SIGINT); exit(1); } - dprintf(("[master] dump importer slave restarted [%d]\n", dump_pid)); + dprintf("[master] dump importer slave restarted [%d]\n", dump_pid); + } else if ( pid == sandbox_pid ) { + dprintf("[master] sandbox importer slave died [%d]\n", pid); + if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die); + if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) { + perror("starting sandbox importer slave"); + kill(0, SIGINT); + exit(1); + } + dprintf("[master] sandbox importer slave restarted [%d]\n", sandbox_pid); } + } } child_died = 0; @@ -294,7 +335,7 @@ int main(int argc, char *argv[]) } } - dprintf(("[master] Terminating on signal %d\n", die)); + dprintf("[master] Terminating on signal %d\n", die); if (!debug) syslog(LOG_INFO, "Terminating on signal %d\n", die); kill(0, die); @@ -307,9 +348,9 @@ int main(int argc, char *argv[]) static int slave(int (*fn)(void), const char *nm) { struct sigaction sa; - sigset_t sset; - int pid, - conn_cnt = 0; + sigset_t sset; + int pid, + conn_cnt = 0; if ( (pid = fork()) ) return pid; @@ -325,7 +366,7 @@ static int slave(int (*fn)(void), const char *nm) sigaddset(&sset, SIGUSR1); sigprocmask(SIG_BLOCK, &sset, NULL); - dprintf(("[%s] slave started - pid [%d]\n", name, getpid())); + dprintf("[%s] slave started - pid [%d]\n", name, getpid()); while ( !die && conn_cnt < MAX_REG_CONNS ) { int ret = fn(); @@ -340,10 +381,10 @@ static int slave(int (*fn)(void), const char *nm) } if ( die ) { - dprintf(("[%s] Terminating on signal %d\n", name, getpid(), die)); + dprintf("[%s] Terminating on signal %d\n", name, getpid(), die); if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); } - dprintf(("[%s] Terminating after %d connections\n", name, conn_cnt)); + dprintf("[%s] Terminating after %d connections\n", name, conn_cnt); if ( !debug ) syslog(LOG_INFO, "Terminating after %d connections", conn_cnt); exit(0); @@ -355,35 +396,52 @@ static int reg_importer(void) struct _jpelem__RegisterJob in; struct _jpelem__RegisterJobResponse empty; int ret; + static int readnew = 1; char *msg = NULL, *fname = NULL, *aux; + if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname); + if ( !ret ) { + readnew = !readnew; + if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname); + if ( !ret ) { + readnew = !readnew; + return 0; + } + } + + if ( ret < 0 ) { + dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); + return -1; + } - ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); if ( ret < 0 ) { - dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc)); + dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; } else if ( ret > 0 ) { - dprintf(("[%s] JP registration request received\n", name)); + dprintf("[%s] JP registration request received\n", name); if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); ret = 0; if ( !(aux = strchr(msg, '\n')) ) { - dprintf(("[%s] Wrong format of message!\n", name)); + dprintf("[%s] Wrong format of message!\n", name); if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); ret = 0; } else do { *aux++ = '\0'; in.job = msg; in.owner = aux; - dprintf(("[%s] Registering '%s'\n", name, msg)); + dprintf("[%s] Registering '%s'\n", name, msg); if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; } while (0); - edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK); + edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); free(fname); free(msg); return 1; @@ -401,7 +459,9 @@ static int dump_importer(void) static int readnew = 1; char *msg = NULL, *fname = NULL, - *aux; + *aux, + *bname; + char fspec[PATH_MAX]; int ret; int fhnd; msg_pattern_t tab[] = { @@ -417,29 +477,29 @@ static int dump_importer(void) if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname); if ( !ret ) { - readnew = ~readnew; + readnew = !readnew; if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); - else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname); if ( !ret ) { - readnew = ~readnew; + readnew = !readnew; return 0; } } if ( ret < 0 ) { - dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc)); + dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; } - dprintf(("[%s] dump JP import request received\n", name)); + dprintf("[%s] dump JP import request received\n", name); if ( !debug ) syslog(LOG_INFO, "dump JP import request received"); ret = 0; if ( parse_msg(msg, tab) < 0 ) { - dprintf(("[%s] Wrong format of message!\n", name)); + dprintf("[%s] Wrong format of message!\n", name); if ( !debug ) syslog(LOG_ERR, "Wrong format of message"); ret = 0; } else do { @@ -448,27 +508,44 @@ static int dump_importer(void) su_in.name = NULL; su_in.commitBefore = 1000 + time(NULL); su_in.contentType = "text/lb"; - dprintf(("[%s] Importing LB dump file '%s'\n", name, tab[_file].val)); + dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val); if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); - ret = check_soap_fault(soap, ret); - /* XXX: grrrrrrr! test it!!!*/ -// if ( (ret = check_soap_fault(soap, ret)) ) break; - dprintf(("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore))); + if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)); + if (su_out.destination == NULL) { + dprintf("[%s] StartUpload returned NULL destination\n", name); + if ( !debug ) syslog(LOG_ERR, "StartUpload returned NULL destination"); + break; + } if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) { - dprintf(("[%s] Can't open dump file: %s\n", name, tab[_file].val)); + dprintf("[%s] Can't open dump file: %s\n", name, tab[_file].val); if ( !debug ) syslog(LOG_ERR, "Can't open dump file: %s", tab[_file].val); ret = 1; break; } if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break; close(fhnd); - dprintf(("[%s] File sent, commiting the upload\n", name)); + dprintf("[%s] File sent, commiting the upload\n", name); cu_in.destination = su_out.destination; ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); if ( (ret = check_soap_fault(soap, ret)) ) break; - dprintf(("[%s] Dump upload succesfull\n", name)); + dprintf("[%s] Dump upload succesfull\n", name); + if (store && *store) { + bname = strdup(tab[_file].val); + snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname)); + free(bname); + if (rename(tab[_file].val, fspec) != 0) + fprintf(stderr, "moving %s to %s failed: %s\n", tab[_file].val, fspec, strerror(errno)); + else + dprintf("[%s] moving %s to %s OK\n", name, tab[_file].val, fspec); + } else { + if (unlink(tab[_file].val) != 0) + fprintf(stderr, "removing %s failed: %s\n", tab[_file].val, strerror(errno)); + else + dprintf("[%s] %s removed\n", name, tab[_file].val); + } } while (0); edg_wll_MaildirTransEnd(dump_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); @@ -479,54 +556,98 @@ static int dump_importer(void) } -static int check_soap_fault(struct soap *soap, int err) + +static int sandbox_importer(void) { - struct SOAP_ENV__Detail *detail; - struct jptype__genericFault *f; - char *reason, - indent[200] = " "; - + struct _jpelem__StartUpload su_in; + struct _jpelem__StartUploadResponse su_out; + struct _jpelem__CommitUpload cu_in; + struct _jpelem__CommitUploadResponse empty; + static int readnew = 1; + char *msg = NULL, + *fname = NULL, + *aux; + int ret; + int fhnd; + msg_pattern_t tab[] = { + {"jobid", NULL}, + {"file", NULL}, + {"jpps", NULL}, + {"proxy", NULL}, + {NULL, NULL}}; - switch ( err ) { - case SOAP_OK: - dprintf(("[%s] ok\n", name)); - break; +#define _job 0 +#define _file 1 +#define _jpps 2 +#define _proxy 3 - case SOAP_FAULT: - case SOAP_SVR_FAULT: - if (soap->version == 2) { - detail = soap->fault->SOAP_ENV__Detail; - reason = soap->fault->SOAP_ENV__Reason; - } else { - detail = soap->fault->detail; - reason = soap->fault->faultstring; - } - dprintf(("[%s] %s\n", name, reason)); - if ( !debug ) syslog(LOG_ERR, "%s", reason); - assert(detail->__type == SOAP_TYPE__genericFault); -#if GSOAP_VERSION >=20700 - f = ((struct _genericFault *) detail->fault) -> jpelem__genericFault; -#else - f = ((struct _genericFault *) detail->value) -> jpelem__genericFault; -#endif - while (f) { - dprintf(("[%s] %s%s: %s (%s)\n", - name, indent, - f->source, f->text, f->description)); - if ( !debug ) syslog(LOG_ERR, "%s%s: %s (%s)", - reason, f->source, f->text, f->description); - f = f->reason; - strcat(indent, " "); + + if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname); + if ( !ret ) { + readnew = !readnew; + if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname); + if ( !ret ) { + readnew = !readnew; + return 0; } - return -1; + } - default: soap_print_fault(soap,stderr); + if ( ret < 0 ) { + dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); return -1; } - return 0; + dprintf("[%s] sandbox JP import request received\n", name); + if ( !debug ) syslog(LOG_INFO, "sandbox JP import request received"); + + ret = 0; + if ( parse_msg(msg, tab) < 0 ) { + dprintf("[%s] Wrong format of message!\n", name); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message"); + ret = 0; + } else do { + su_in.job = tab[_job].val; + // XXX: defined in org.glite.jp.primary/src/builtin_plugins.h + // shloud use symbolic const... + // do not distinquish between ibs and obs now + su_in.class_ = "urn:org.glite.jp.primary:isb"; + su_in.name = NULL; + su_in.commitBefore = 1000 + time(NULL); + su_in.contentType = "tar/lb"; + dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val); + if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg); + ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); + ret = check_soap_fault(soap, ret); + /* XXX: grrrrrrr! test it!!!*/ +// if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)); + + if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) { + dprintf("[%s] Can't open sandbox tar file: %s\n", name, tab[_file].val); + if ( !debug ) syslog(LOG_ERR, "Can't open sandbox tar file: %s", tab[_file].val); + ret = 1; + break; + } + if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break; + close(fhnd); + dprintf("[%s] File sent, commiting the upload\n", name); + cu_in.destination = su_out.destination; + ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); + if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf("[%s] Dump upload succesfull\n", name); + } while (0); + + edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); + free(fname); + free(msg); + + return 1; } + /** Parses every line looking for pattern string and stores the value into * the given variable * @@ -580,7 +701,7 @@ static void gftp_done_cb( { if ( err != GLOBUS_SUCCESS ) { char *tmp = globus_object_printable_to_string(err); - dprintf(("[%s] Error in callback: %s\n", name, tmp)); + dprintf("[%s] Error in callback: %s\n", name, tmp); if ( !debug ) syslog(LOG_ERR, "Error in callback: %s", tmp); gError = GLOBUS_TRUE; globus_libc_free(tmp); @@ -604,7 +725,7 @@ static void gftp_data_cb( int rc; globus_mutex_lock(&gLock); if ( (rc = read(*((int *)user_arg), gBuffer, BUFSZ)) < 0 ) { - dprintf(("[%s] Error reading dump file\n", name)); + dprintf("[%s] Error reading dump file\n", name); if ( !debug ) syslog(LOG_ERR, "Error reading dump file"); gDone = GLOBUS_TRUE; gError = GLOBUS_TRUE; @@ -625,7 +746,7 @@ static int gftp_put_file(const char *url, int fhnd) globus_ftp_client_handleattr_t hnd_attr; #define put_file_err(errs) { \ - dprintf(("[%s] %s\n", name, errs)); \ + dprintf("[%s] %s\n", name, errs); \ if ( !debug ) syslog(LOG_ERR, errs); \ return 1; \ } @@ -648,12 +769,13 @@ static int gftp_put_file(const char *url, int fhnd) globus_cond_init(&gCond, GLOBUS_NULL); gDone = GLOBUS_FALSE; + gError = GLOBUS_FALSE; /* do the op */ if ( globus_ftp_client_put( &hnd, url, &op_attr, GLOBUS_NULL, gftp_done_cb, (void *)&fhnd) != GLOBUS_SUCCESS) { - dprintf(("[%s] Could not start file put\n", name)); + dprintf("[%s] Could not start file put\n", name); if ( !debug ) syslog(LOG_ERR, "Could not start file put"); gError = GLOBUS_TRUE; gDone = GLOBUS_TRUE; @@ -661,7 +783,7 @@ static int gftp_put_file(const char *url, int fhnd) int rc; globus_mutex_lock(&gLock); if ( (rc = read(fhnd, gBuffer, BUFSZ)) < 0 ) { - dprintf(("[%s] Error reading dump file\n", name)); + dprintf("[%s] Error reading dump file\n", name); if ( !debug ) syslog(LOG_ERR, "Error reading dump file"); gDone = GLOBUS_TRUE; gError = GLOBUS_TRUE;