#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump"
#endif
+#ifndef GLITE_SANDBOX_IMPORTER_MDIR
+#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox"
+#endif
+
#ifndef GLITE_JPPS
#define GLITE_JPPS "http://localhost:8901"
#endif
#define MAX_REG_CONNS 500
-static int debug = 0;
-static int die = 0;
-static int child_died = 0;
-static int poll = 2;
-static char *name;
-static char *jpps = GLITE_JPPS;
-static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR;
-static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR;
-static struct soap *soap;
-
-static time_t cert_mtime;
-static char *server_cert = NULL,
- *server_key = NULL,
- *cadir;
+static int debug = 0;
+static int die = 0;
+static int child_died = 0;
+static int poll = 2;
+static char *name;
+static char *jpps = GLITE_JPPS;
+static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR;
+static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR;
+static char sandbox_mdir[PATH_MAX] = GLITE_SANDBOX_IMPORTER_MDIR;
+static struct soap *soap;
+
+static time_t cert_mtime;
+static char *server_cert = NULL,
+ *server_key = NULL,
+ *cadir = NULL;
static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL;
-static char *mysubj;
+static char *mysubj;
static struct option opts[] = {
{ "jpps", 1, NULL, 'p'},
{ "reg-mdir", 1, NULL, 'r'},
{ "dump-mdir", 1, NULL, 'd'},
+ { "sandbox-mdir",1, NULL, 's'},
{ "pidfile", 1, NULL, 'i'},
{ "poll", 1, NULL, 't'},
{ NULL, 0, NULL, 0}
};
-static const char *get_opt_string = "hgp:r:d:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
static void usage(char *me)
{
"\t-p, --jpps JP primary service server\n"
"\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n"
"\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n"
+ "\t-d, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
"\t-i, --pidfile file to store master pid\n"
"\t-t, --poll maildir polling interval (in seconds)\n",
me);
static int check_soap_fault(struct soap *, int);
static int reg_importer(void);
static int dump_importer(void);
+static int sandbox_importer(void);
static int parse_msg(char *, msg_pattern_t []);
static int gftp_put_file(const char *, int);
{
edg_wll_GssStatus gss_code;
struct sigaction sa;
- sigset_t sset;
- FILE *fpid;
- pid_t reg_pid, dump_pid;
- int opt;
- char *name,
- pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
-
+ sigset_t sset;
+ FILE *fpid;
+ pid_t reg_pid, dump_pid, sandbox_pid;
+ int opt;
+ char *name,
+ pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
glite_gsplugin_Context plugin_ctx;
case 't': poll = atoi(optarg); break;
case 'r': strcpy(reg_mdir, optarg); break;
case 'd': strcpy(dump_mdir, optarg); break;
+ case 's': strcpy(sandbox_mdir, optarg); break;
case 'i': strcpy(pidfile, optarg); break;
case '?': usage(name); return 1;
}
edg_wll_MaildirInit(reg_mdir);
edg_wll_MaildirInit(dump_mdir);
+ edg_wll_MaildirInit(sandbox_mdir);
if ( !debug ) {
if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); }
perror("starting dump importer slave");
exit(1);
}
+ if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) {
+ perror("starting sandbox importer slave");
+ exit(1);
+ }
while ( !die ) {
exit(1);
}
dprintf(("[master] dump importer slave restarted [%d]\n", dump_pid));
+ } else if ( pid == sandbox_pid ) {
+ dprintf(("[master] sandbox importer slave died [%d]\n", pid));
+ if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die);
+ if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) {
+ perror("starting sandbox importer slave");
+ kill(0, SIGINT);
+ exit(1);
+ }
+ dprintf(("[master] sandbox importer slave restarted [%d]\n", sandbox_pid));
}
+
}
}
child_died = 0;
static int slave(int (*fn)(void), const char *nm)
{
struct sigaction sa;
- sigset_t sset;
- int pid,
- conn_cnt = 0;
+ sigset_t sset;
+ int pid,
+ conn_cnt = 0;
if ( (pid = fork()) ) return pid;
}
+
+static int sandbox_importer(void)
+{
+ struct _jpelem__StartUpload su_in;
+ struct _jpelem__StartUploadResponse su_out;
+ struct _jpelem__CommitUpload cu_in;
+ struct _jpelem__CommitUploadResponse empty;
+ static int readnew = 1;
+ char *msg = NULL,
+ *fname = NULL,
+ *aux;
+ int ret;
+ int fhnd;
+ msg_pattern_t tab[] = {
+ {"jobid", NULL},
+ {"file", NULL},
+ {"jpps", NULL},
+ {"proxy", NULL},
+ {NULL, NULL}};
+
+#define _job 0
+#define _file 1
+#define _jpps 2
+#define _proxy 3
+
+
+ if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname);
+ if ( !ret ) {
+ readnew = ~readnew;
+ if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, &msg, &fname);
+ if ( !ret ) {
+ readnew = ~readnew;
+ return 0;
+ }
+ }
+
+ if ( ret < 0 ) {
+ dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc));
+ if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
+ return -1;
+ }
+
+ dprintf(("[%s] sandbox JP import request received\n", name));
+ if ( !debug ) syslog(LOG_INFO, "sandbox JP import request received");
+
+ ret = 0;
+ if ( parse_msg(msg, tab) < 0 ) {
+ dprintf(("[%s] Wrong format of message!\n", name));
+ if ( !debug ) syslog(LOG_ERR, "Wrong format of message");
+ ret = 0;
+ } else do {
+ su_in.job = tab[_job].val;
+ // XXX: defined in org.glite.jp.primary/src/builtin_plugins.h
+ // shloud use symbolic const...
+ // do not distinquish between ibs and obs now
+ su_in.class_ = "urn:org.glite.jp.primary:isb";
+ su_in.name = NULL;
+ su_in.commitBefore = 1000 + time(NULL);
+ su_in.contentType = "tar/lb";
+ dprintf(("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val));
+ if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+ ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
+ ret = check_soap_fault(soap, ret);
+ /* XXX: grrrrrrr! test it!!!*/
+// if ( (ret = check_soap_fault(soap, ret)) ) break;
+ dprintf(("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore)));
+
+ if ( (fhnd = open(tab[_file].val, O_RDONLY)) < 0 ) {
+ dprintf(("[%s] Can't open sandbox tar file: %s\n", name, tab[_file].val));
+ if ( !debug ) syslog(LOG_ERR, "Can't open sandbox tar file: %s", tab[_file].val);
+ ret = 1;
+ break;
+ }
+ if ( (ret = gftp_put_file(su_out.destination, fhnd)) ) break;
+ close(fhnd);
+ dprintf(("[%s] File sent, commiting the upload\n", name));
+ cu_in.destination = su_out.destination;
+ ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
+ if ( (ret = check_soap_fault(soap, ret)) ) break;
+ dprintf(("[%s] Dump upload succesfull\n", name));
+ } while (0);
+
+ edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
+ free(fname);
+ free(msg);
+
+ return 1;
+}
+
+
+
static int check_soap_fault(struct soap *soap, int err)
{
struct SOAP_ENV__Detail *detail;