- added support for sandbox upload
authorMiloš Mulač <mulac@civ.zcu.cz>
Mon, 15 May 2006 14:19:04 +0000 (14:19 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Mon, 15 May 2006 14:19:04 +0000 (14:19 +0000)
org.glite.jp.client/src/jpimporter.c

index 7a30ef8..9144967 100644 (file)
@@ -51,6 +51,10 @@ typedef struct {
 #define GLITE_DUMP_IMPORTER_MDIR       "/tmp/jpdump"
 #endif 
 
+#ifndef GLITE_SANDBOX_IMPORTER_MDIR
+#define GLITE_SANDBOX_IMPORTER_MDIR    "/tmp/jpsandbox"
+#endif 
+
 #ifndef GLITE_JPPS
 #define GLITE_JPPS                                     "http://localhost:8901"
 #endif 
@@ -58,22 +62,23 @@ typedef struct {
 
 #define        MAX_REG_CONNS                           500
 
-static int                             debug = 0;
-static int                             die = 0;
-static int                             child_died = 0;
-static int                             poll = 2;
-static char                       *name;
-static char                       *jpps = GLITE_JPPS;
-static char                            reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR;
-static char                            dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR;
-static struct soap        *soap;
-
-static time_t                  cert_mtime;
-static char                       *server_cert = NULL,
-                                          *server_key = NULL,
-                                          *cadir;
+static int             debug = 0;
+static int             die = 0;
+static int             child_died = 0;
+static int             poll = 2;
+static char            *name;
+static char            *jpps = GLITE_JPPS;
+static char            reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR;
+static char            dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR;
+static char            sandbox_mdir[PATH_MAX] = GLITE_SANDBOX_IMPORTER_MDIR;
+static struct soap     *soap;
+
+static time_t          cert_mtime;
+static char            *server_cert = NULL,
+                       *server_key = NULL,
+                       *cadir = NULL;
 static gss_cred_id_t   mycred = GSS_C_NO_CREDENTIAL;
-static char                       *mysubj;
+static char            *mysubj;
 
 
 static struct option opts[] = {
@@ -85,12 +90,13 @@ static struct option opts[] = {
        { "jpps",        1, NULL,    'p'},
        { "reg-mdir",    1, NULL,    'r'},
        { "dump-mdir",   1, NULL,    'd'},
+       { "sandbox-mdir",1, NULL,    's'},
        { "pidfile",     1, NULL,    'i'},
        { "poll",        1, NULL,    't'},
        { NULL,          0, NULL,     0}
 };
 
-static const char *get_opt_string = "hgp:r:d:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
 
 static void usage(char *me)
 {
@@ -103,6 +109,7 @@ static void usage(char *me)
                "\t-p, --jpps         JP primary service server\n"
                "\t-r, --reg-mdir     path to the 'LB maildir' subtree for registrations\n"
                "\t-d, --dump-mdir    path to the 'LB maildir' subtree for LB dumps\n"
+               "\t-d, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
                "\t-i, --pidfile      file to store master pid\n"
                "\t-t, --poll         maildir polling interval (in seconds)\n",
                me);
@@ -123,6 +130,7 @@ static int slave(int (*)(void), const char *);
 static int check_soap_fault(struct soap *, int);
 static int reg_importer(void);
 static int dump_importer(void);
+static int sandbox_importer(void);
 static int parse_msg(char *, msg_pattern_t []);
 static int gftp_put_file(const char *, int);
 
@@ -132,13 +140,12 @@ int main(int argc, char *argv[])
 {
        edg_wll_GssStatus       gss_code;
        struct sigaction        sa;
-       sigset_t                        sset;
-       FILE                       *fpid;
-       pid_t                           reg_pid, dump_pid;
-       int                                     opt;
-       char                       *name,
-                                               pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
-
+       sigset_t                sset;
+       FILE                    *fpid;
+       pid_t                   reg_pid, dump_pid, sandbox_pid;
+       int                     opt;
+       char                    *name,
+                               pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
        glite_gsplugin_Context  plugin_ctx;
 
 
@@ -159,6 +166,7 @@ int main(int argc, char *argv[])
                case 't': poll = atoi(optarg); break;
                case 'r': strcpy(reg_mdir, optarg); break;
                case 'd': strcpy(dump_mdir, optarg); break;
+               case 's': strcpy(sandbox_mdir, optarg); break;
                case 'i': strcpy(pidfile, optarg); break;
                case '?': usage(name); return 1;
        }
@@ -187,6 +195,7 @@ int main(int argc, char *argv[])
                
        edg_wll_MaildirInit(reg_mdir);
        edg_wll_MaildirInit(dump_mdir);
+       edg_wll_MaildirInit(sandbox_mdir);
 
        if ( !debug ) {
                if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); }
@@ -256,6 +265,10 @@ int main(int argc, char *argv[])
                perror("starting dump importer slave");
                exit(1);
        }
+       if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) {
+               perror("starting sandbox importer slave");
+               exit(1);
+       }
 
        while ( !die ) {
 
@@ -286,7 +299,17 @@ int main(int argc, char *argv[])
                                                        exit(1);
                                                }
                                                dprintf(("[master] dump importer slave restarted [%d]\n", dump_pid));
+                                       } else if ( pid == sandbox_pid ) {
+                                               dprintf(("[master] sandbox importer slave died [%d]\n", pid));
+                                               if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die);
+                                               if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) {
+                                                       perror("starting sandbox importer slave");
+                                                       kill(0, SIGINT);
+                                                       exit(1);
+                                               }
+                                               dprintf(("[master] sandbox importer slave restarted [%d]\n", sandbox_pid));
                                        }
+
                                }
                        }
                        child_died = 0;
@@ -307,9 +330,9 @@ int main(int argc, char *argv[])
 static int slave(int (*fn)(void), const char *nm)
 {
        struct sigaction        sa;
-       sigset_t                        sset;
-       int                                     pid,
-                                               conn_cnt = 0;
+       sigset_t                sset;
+       int                     pid,
+                               conn_cnt = 0;
 
 
        if ( (pid = fork()) ) return pid;
@@ -479,6 +502,99 @@ static int dump_importer(void)
 }
 
 
+
+static int sandbox_importer(void)
+{
+       struct _jpelem__StartUpload             su_in;
+       struct _jpelem__StartUploadResponse     su_out;
+       struct _jpelem__CommitUpload            cu_in;
+       struct _jpelem__CommitUploadResponse    empty;
+       static int      readnew = 1;
+       char            *msg = NULL,
+                       *fname = NULL,
+                       *aux;
+       int             ret;
+       int             fhnd;
+       msg_pattern_t   tab[] = {
+                               {"jobid", NULL},
+                               {"file", NULL},
+                               {"jpps", NULL},
+                               {"proxy", NULL},
+                               {NULL, NULL}};
+
+#define                        _job   0
+#define                        _file  1
+#define                        _jpps  2
+#define                        _proxy 3
+
+
+       if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
+       else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname);
+       if ( !ret ) { 
+               readnew = ~readnew;
+               if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
+               else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname);
+               if ( !ret ) {
+                       readnew = ~readnew;
+                       return 0;
+               }
+       }
+
+       if ( ret < 0 ) {
+               dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc));
+               if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
+               return -1;
+       }
+
+       dprintf(("[%s] sandbox JP import request received\n", name));
+       if ( !debug ) syslog(LOG_INFO, "sandbox JP import request received");
+
+       ret = 0;
+       if ( parse_msg(msg, tab) < 0 ) {
+               dprintf(("[%s] Wrong format of message!\n", name));
+               if ( !debug ) syslog(LOG_ERR, "Wrong format of message");
+               ret = 0;
+       } else do {
+               su_in.job = tab[_job].val;
+               // XXX: defined in org.glite.jp.primary/src/builtin_plugins.h
+               // shloud use symbolic const...
+               // do not distinquish between ibs and obs now 
+               su_in.class_ = "urn:org.glite.jp.primary:isb";
+               su_in.name = NULL;
+               su_in.commitBefore = 1000 + time(NULL);
+               su_in.contentType = "tar/lb";
+               dprintf(("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val));
+               if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+               ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
+               ret = check_soap_fault(soap, ret);
+               /* XXX: grrrrrrr! test it!!!*/
+//             if ( (ret = check_soap_fault(soap, ret)) ) break;
+               dprintf(("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)));
+
+               if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) {
+                       dprintf(("[%s] Can't open sandbox tar file: %s\n", name, tab[_file].val));
+                       if ( !debug ) syslog(LOG_ERR, "Can't open sandbox tar file: %s", tab[_file].val);
+                       ret = 1;
+                       break;
+               }
+               if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break;
+               close(fhnd);
+               dprintf(("[%s] File sent, commiting the upload\n", name));
+               cu_in.destination = su_out.destination;
+               ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
+               if ( (ret = check_soap_fault(soap, ret)) ) break;
+               dprintf(("[%s] Dump upload succesfull\n", name));
+       } while (0);
+
+       edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
+       free(fname);
+       free(msg);
+
+       return 1;
+}
+
+
+
 static int check_soap_fault(struct soap *soap, int err)
 {
        struct SOAP_ENV__Detail            *detail;