From 5953fb821b91aa0d0f2760712be78cc3c90a9e69 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Mon, 15 May 2006 14:19:04 +0000 Subject: [PATCH] - added support for sandbox upload --- org.glite.jp.client/src/jpimporter.c | 168 +++++++++++++++++++++++++++++------ 1 file changed, 142 insertions(+), 26 deletions(-) diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index 7a30ef8..9144967 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -51,6 +51,10 @@ typedef struct { #define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" #endif +#ifndef GLITE_SANDBOX_IMPORTER_MDIR +#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox" +#endif + #ifndef GLITE_JPPS #define GLITE_JPPS "http://localhost:8901" #endif @@ -58,22 +62,23 @@ typedef struct { #define MAX_REG_CONNS 500 -static int debug = 0; -static int die = 0; -static int child_died = 0; -static int poll = 2; -static char *name; -static char *jpps = GLITE_JPPS; -static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; -static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; -static struct soap *soap; - -static time_t cert_mtime; -static char *server_cert = NULL, - *server_key = NULL, - *cadir; +static int debug = 0; +static int die = 0; +static int child_died = 0; +static int poll = 2; +static char *name; +static char *jpps = GLITE_JPPS; +static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; +static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; +static char sandbox_mdir[PATH_MAX] = GLITE_SANDBOX_IMPORTER_MDIR; +static struct soap *soap; + +static time_t cert_mtime; +static char *server_cert = NULL, + *server_key = NULL, + *cadir = NULL; static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; -static char *mysubj; +static char *mysubj; static struct option opts[] = { @@ -85,12 +90,13 @@ static struct option opts[] = { { "jpps", 1, NULL, 'p'}, { "reg-mdir", 1, NULL, 'r'}, { "dump-mdir", 1, NULL, 'd'}, + { "sandbox-mdir",1, NULL, 's'}, { "pidfile", 1, NULL, 'i'}, { "poll", 1, NULL, 't'}, { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d:i:t:c:k:C:"; +static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"; static void usage(char *me) { @@ -103,6 +109,7 @@ static void usage(char *me) "\t-p, --jpps JP primary service server\n" "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n" "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" + "\t-d, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n" "\t-i, --pidfile file to store master pid\n" "\t-t, --poll maildir polling interval (in seconds)\n", me); @@ -123,6 +130,7 @@ static int slave(int (*)(void), const char *); static int check_soap_fault(struct soap *, int); static int reg_importer(void); static int dump_importer(void); +static int sandbox_importer(void); static int parse_msg(char *, msg_pattern_t []); static int gftp_put_file(const char *, int); @@ -132,13 +140,12 @@ int main(int argc, char *argv[]) { edg_wll_GssStatus gss_code; struct sigaction sa; - sigset_t sset; - FILE *fpid; - pid_t reg_pid, dump_pid; - int opt; - char *name, - pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; - + sigset_t sset; + FILE *fpid; + pid_t reg_pid, dump_pid, sandbox_pid; + int opt; + char *name, + pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; glite_gsplugin_Context plugin_ctx; @@ -159,6 +166,7 @@ int main(int argc, char *argv[]) case 't': poll = atoi(optarg); break; case 'r': strcpy(reg_mdir, optarg); break; case 'd': strcpy(dump_mdir, optarg); break; + case 's': strcpy(sandbox_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; case '?': usage(name); return 1; } @@ -187,6 +195,7 @@ int main(int argc, char *argv[]) edg_wll_MaildirInit(reg_mdir); edg_wll_MaildirInit(dump_mdir); + edg_wll_MaildirInit(sandbox_mdir); if ( !debug ) { if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); } @@ -256,6 +265,10 @@ int main(int argc, char *argv[]) perror("starting dump importer slave"); exit(1); } + if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) { + perror("starting sandbox importer slave"); + exit(1); + } while ( !die ) { @@ -286,7 +299,17 @@ int main(int argc, char *argv[]) exit(1); } dprintf(("[master] dump importer slave restarted [%d]\n", dump_pid)); + } else if ( pid == sandbox_pid ) { + dprintf(("[master] sandbox importer slave died [%d]\n", pid)); + if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die); + if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) { + perror("starting sandbox importer slave"); + kill(0, SIGINT); + exit(1); + } + dprintf(("[master] sandbox importer slave restarted [%d]\n", sandbox_pid)); } + } } child_died = 0; @@ -307,9 +330,9 @@ int main(int argc, char *argv[]) static int slave(int (*fn)(void), const char *nm) { struct sigaction sa; - sigset_t sset; - int pid, - conn_cnt = 0; + sigset_t sset; + int pid, + conn_cnt = 0; if ( (pid = fork()) ) return pid; @@ -479,6 +502,99 @@ static int dump_importer(void) } + +static int sandbox_importer(void) +{ + struct _jpelem__StartUpload su_in; + struct _jpelem__StartUploadResponse su_out; + struct _jpelem__CommitUpload cu_in; + struct _jpelem__CommitUploadResponse empty; + static int readnew = 1; + char *msg = NULL, + *fname = NULL, + *aux; + int ret; + int fhnd; + msg_pattern_t tab[] = { + {"jobid", NULL}, + {"file", NULL}, + {"jpps", NULL}, + {"proxy", NULL}, + {NULL, NULL}}; + +#define _job 0 +#define _file 1 +#define _jpps 2 +#define _proxy 3 + + + if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname); + if ( !ret ) { + readnew = ~readnew; + if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname); + else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname); + if ( !ret ) { + readnew = ~readnew; + return 0; + } + } + + if ( ret < 0 ) { + dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc)); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); + return -1; + } + + dprintf(("[%s] sandbox JP import request received\n", name)); + if ( !debug ) syslog(LOG_INFO, "sandbox JP import request received"); + + ret = 0; + if ( parse_msg(msg, tab) < 0 ) { + dprintf(("[%s] Wrong format of message!\n", name)); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message"); + ret = 0; + } else do { + su_in.job = tab[_job].val; + // XXX: defined in org.glite.jp.primary/src/builtin_plugins.h + // shloud use symbolic const... + // do not distinquish between ibs and obs now + su_in.class_ = "urn:org.glite.jp.primary:isb"; + su_in.name = NULL; + su_in.commitBefore = 1000 + time(NULL); + su_in.contentType = "tar/lb"; + dprintf(("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val)); + if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg); + ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out); + ret = check_soap_fault(soap, ret); + /* XXX: grrrrrrr! test it!!!*/ +// if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf(("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore))); + + if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) { + dprintf(("[%s] Can't open sandbox tar file: %s\n", name, tab[_file].val)); + if ( !debug ) syslog(LOG_ERR, "Can't open sandbox tar file: %s", tab[_file].val); + ret = 1; + break; + } + if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break; + close(fhnd); + dprintf(("[%s] File sent, commiting the upload\n", name)); + cu_in.destination = su_out.destination; + ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty); + if ( (ret = check_soap_fault(soap, ret)) ) break; + dprintf(("[%s] Dump upload succesfull\n", name)); + } while (0); + + edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK); + free(fname); + free(msg); + + return 1; +} + + + static int check_soap_fault(struct soap *soap, int err) { struct SOAP_ENV__Detail *detail; -- 1.8.2.3