#include <string.h>
#include <assert.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include <errno.h>
#include <syslog.h>
#include <linux/limits.h>
#define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid"
#endif
-#ifndef GLITE_JPIMPORTER_MDIR
-#define GLITE_JPIMPORTER_MDIR "/tmp/jpreg"
+#ifndef GLITE_REG_IMPORTER_MDIR
+#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg"
#endif
-static int debug = 0;
-static int die = 0;
+#ifndef GLITE_DUMP_IMPORTER_MDIR
+#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump"
+#endif
+
+#ifndef GLITE_JPPS
+#define GLITE_JPPS "http://localhost:8901"
+#endif
+
+#define MAX_REG_CONNS 500
+
+static int debug = 0;
+static int die = 0;
+static int child_died = 0;
+static int poll = 2;
+static char *jpps = GLITE_JPPS;
+static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR;
+static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR;
+static struct soap *soap;
static struct option opts[] = {
{ "help", 0, NULL, 'h'},
- { "debug", 0, NULL, 'd'},
+ { "debug", 0, NULL, 'g'},
{ "jpps", 1, NULL, 'p'},
- { "mdir", 1, NULL, 'm'},
+ { "reg-mdir", 1, NULL, 'r'},
+ { "dump-mdir", 1, NULL, 'd'},
{ "pidfile", 1, NULL, 'i'},
+ { "poll", 1, NULL, 't'},
{ NULL, 0, NULL, 0}
};
-static const char *get_opt_string = "hdp:m:i:";
+static const char *get_opt_string = "hgp:r:d::i:t:";
static void usage(char *me)
{
fprintf(stderr,"usage: %s [option]\n"
- "\t-h, --help\t displays this screen\n"
- "\t-d, --debug\t don't run as daemon, additional diagnostics\n"
- "\t-p, --jpps\t JP primary service server\n"
- "\t-m, --mdir\t path to the 'LB maildir' subtree\n"
- "\t-i, --pidfile\t file to store master pid\n",
+ "\t-h, --help displays this screen\n"
+ "\t-g, --debug don't run as daemon, additional diagnostics\n"
+ "\t-p, --jpps JP primary service server\n"
+ "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n"
+ "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n"
+ "\t-i, --pidfile file to store master pid\n"
+ "\t-t, --poll maildir polling interval (in seconds)\n",
me);
}
die = sig;
}
+static void catch_chld(int sig)
+{
+ child_died = 1;
+}
+
+
+static int slave(int (*)(const char *), const char *);
+static int check_soap_fault(struct soap *, int, const char *);
+static int reg_importer(const char *);
+static int dump_importer(const char *);
+
+
+
int main(int argc, char *argv[])
{
struct sigaction sa;
- struct soap *soap;
sigset_t sset;
FILE *fpid;
+ pid_t reg_pid, dump_pid;
int opt;
char *name,
- *jpps = "http://localhost:8901",
- pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE,
- mdir[PATH_MAX] = GLITE_JPIMPORTER_MDIR;
+ pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
name = strrchr(argv[0],'/');
while ( (opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF )
switch ( opt ) {
- case 'd': debug = 1; break;
+ case 'g': debug = 1; break;
case 'h': usage(name); return 0;
case 'p': jpps = optarg; break;
- case 'm': strcpy(mdir, optarg); break;
+ case 't': poll = atoi(optarg); break;
+ case 'r': strcpy(reg_mdir, optarg); break;
+ case 'd': strcpy(dump_mdir, optarg); break;
case 'i': strcpy(pidfile, optarg); break;
case '?': usage(name); return 1;
}
if ( optind < argc ) { usage(name); return 1; }
- soap = soap_new();
- soap_init(soap);
- soap_set_namespaces(soap, jpps__namespaces);
- soap_register_plugin(soap, glite_gsplugin);
-
setlinebuf(stdout);
setlinebuf(stderr);
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
+ sa.sa_handler = catch_chld;
+ sigaction(SIGCHLD, &sa, NULL);
+
sa.sa_handler = SIG_IGN;
sigaction(SIGUSR1, &sa, NULL);
sigemptyset(&sset);
+ sigaddset(&sset, SIGCHLD);
sigaddset(&sset, SIGTERM);
sigaddset(&sset, SIGINT);
sigprocmask(SIG_BLOCK, &sset, NULL);
+ soap = soap_new();
+ soap_init(soap);
+ soap_set_namespaces(soap, jpps__namespaces);
+ soap_register_plugin(soap, glite_gsplugin);
+
+ if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) {
+ perror("starting reg importer slave");
+ exit(1);
+ }
+
while ( !die ) {
- int ret;
- char *msg = NULL;
- char *fname = NULL;
- ret = edg_wll_MaildirTransStart(mdir, &msg, &fname);
- /* XXX: where should unblocking signal besides? */
sigprocmask(SIG_UNBLOCK, &sset, NULL);
+ sleep(5);
sigprocmask(SIG_BLOCK, &sset, NULL);
- if ( ret < 0 ) {
- dprintf(("edg_wll_MaildirTransStart: %s (%s)\n", strerror(errno), lbm_errdesc));
- if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
- exit(1);
- } else if ( ret == 0 ) {
- sleep(2);
- } else {
- struct _jpelem__RegisterJob in;
- struct _jpelem__RegisterJobResponse empty;
- struct SOAP_ENV__Detail *detail;
- struct jptype__genericFault *f;
- char *aux, *reason, indent[200] = " ";
+ if ( child_died ) {
+ int pid;
+
+ while ( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) {
+ if ( !die ) {
+ if ( pid == reg_pid ) {
+ dprintf(("[master] reg importer slave died [%d]\n", pid));
+ if (!debug) syslog(LOG_INFO, "reg importer slave died [%d]\n", die);
+ if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) {
+ perror("starting reg importer slave");
+ kill(0, SIGINT);
+ exit(1);
+ }
+ dprintf(("[master] reg importer slave restarted [%d]\n", reg_pid));
+ }
+ }
+ }
+ child_died = 0;
+ continue;
+ }
+ }
+
+ dprintf(("[master] Terminating on signal %d\n", die));
+ if (!debug) syslog(LOG_INFO, "Terminating on signal %d\n", die);
+ kill(0, die);
- dprintf(("JP registration request received\n"));
- if ( !debug ) syslog(LOG_INFO, "JP registration request received\n");
+ unlink(pidfile);
- if ( !(aux = strchr(msg, '\n')) ) {
- dprintf(("Wrong format of message!\n"));
- if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n");
- free(msg);
- continue;
- }
+ return 0;
+}
+
+static int slave(int (*fn)(const char *), const char *nm)
+{
+ struct sigaction sa;
+ sigset_t sset;
+ int pid,
+ conn_cnt = 0;
+
+
+ if ( (pid = fork()) ) return pid;
+
+ memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL);
+ sa.sa_handler = catchsig;
+ sigaction(SIGUSR1, &sa, NULL);
+
+ sigemptyset(&sset);
+ sigaddset(&sset, SIGTERM);
+ sigaddset(&sset, SIGINT);
+ sigaddset(&sset, SIGUSR1);
+ sigprocmask(SIG_BLOCK, &sset, NULL);
+
+ dprintf(("[%s] slave started - pid [%d]\n", nm, getpid()));
+
+ while ( !die && conn_cnt < MAX_REG_CONNS ) {
+ int ret = fn(nm);
+
+ if ( ret > 0 ) conn_cnt++;
+ else if ( ret < 0 ) exit(1);
+ else if ( ret == 0 ) {
+ sigprocmask(SIG_UNBLOCK, &sset, NULL);
+ sleep(poll);
+ sigprocmask(SIG_BLOCK, &sset, NULL);
+ }
+ }
+
+ if ( die ) {
+ dprintf(("[%s] Terminating on signal %d\n", nm, getpid(), die));
+ if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die);
+ }
+ dprintf(("[%s] Terminating after %d connections\n", nm, conn_cnt));
+ if ( !debug ) syslog(LOG_INFO, "Terminating after %d connections", conn_cnt);
+
+ exit(0);
+}
+
+
+static int reg_importer(const char *nm)
+{
+ struct _jpelem__RegisterJob in;
+ struct _jpelem__RegisterJobResponse empty;
+ int ret;
+ char *msg = NULL,
+ *fname = NULL,
+ *aux;
+
+
+ ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname);
+ if ( ret < 0 ) {
+ dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc));
+ if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
+ return -1;
+ } else if ( ret > 0 ) {
+ dprintf(("[%s] JP registration request received\n", nm));
+ if ( !debug ) syslog(LOG_INFO, "JP registration request received\n");
+
+ ret = 0;
+ if ( !(aux = strchr(msg, '\n')) ) {
+ dprintf(("[%s] Wrong format of message!\n", nm));
+ if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n");
+ ret = 0;
+ } else {
*aux++ = '\0';
in.job = msg;
in.owner = aux;
+ dprintf(("[%s] Registering '%s'\n", nm, msg));
+ if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty);
- free(msg);
-
- switch ( ret ) {
- case SOAP_OK:
- /* XXX: checks return error code */
- edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_OK);
- dprintf(("Job '%s' succesfully registered to JP\n", msg));
- if ( !debug ) syslog(LOG_INFO, "Job '%s' succesfully registered to JP\n", msg);
- break;
-
- case SOAP_FAULT:
- case SOAP_SVR_FAULT:
- edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED);
- if (soap->version == 2) {
- detail = soap->fault->SOAP_ENV__Detail;
- reason = soap->fault->SOAP_ENV__Reason;
- } else {
- detail = soap->fault->detail;
- reason = soap->fault->faultstring;
- }
- dprintf(("%s\n", reason));
- assert(detail->__type == SOAP_TYPE__genericFault);
-#if GSOAP_VERSION >=20700
- f = ((struct _genericFault *) detail->fault)
-#else
- f = ((struct _genericFault *) detail->value)
-#endif
- -> jpelem__genericFault;
+ ret = check_soap_fault(soap, ret, nm);
+ }
+ edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK);
+ free(fname);
+ free(msg);
+ return 1;
+ }
- while ( f ) {
- dprintf(("%s%s: %s (%s)\n", indent, f->source, f->text, f->description));
- f = f->reason;
- strcat(indent, " ");
- }
- break;
+ return 0;
+}
- default:
- soap_print_fault(soap, stderr);
- edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED);
- break;
- }
- free(fname);
+static int dump_importer(const char *nm)
+{
+ struct _jpelem__StartUpload in;
+ struct _jpelem__StartUploadResponse out;
+ int ret;
+ char *msg = NULL,
+ *fname = NULL,
+ *aux;
+
+
+ ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname);
+ if ( ret < 0 ) {
+ dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc));
+ if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
+ return -1;
+ } else if ( ret > 0 ) {
+ dprintf(("[%s] dump JP import request received\n", nm));
+ if ( !debug ) syslog(LOG_INFO, "dump JP import request received\n");
+
+ ret = 0;
+ if ( !(aux = strchr(msg, '\n')) ) {
+ dprintf(("[%s] Wrong format of message!\n", nm));
+ if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n");
+ ret = 0;
+ } else {
+ *aux++ = '\0';
+ in.job = argv[2];
+ in.class_ = argv[3];
+ in.name = NULL;
+ in.commitBefore = atoi(argv[4]) + time(NULL);
+ in.contentType = argv[5];
+ dprintf(("[%s] Importing LB dump file '%s'\n", nm, msg));
+ if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
+ ret = soap_call___jpsrv__StartUpload(soap, jpps, "", &in, &out);
+ ret = check_soap_fault(soap, ret, nm);
}
+ edg_wll_MaildirTransEnd(dump_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK);
+ free(fname);
+ free(msg);
+ return 1;
}
- /* XXX: some sort of soap_destroy(soap) */
- dprintf(("Terminating on signal %d\n", die));
- if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d\n", die);
+ return 0;
+}
- unlink(pidfile);
+
+static int check_soap_fault(struct soap *soap, int err, const char *msg_pref)
+{
+ struct SOAP_ENV__Detail *detail;
+ struct jptype__genericFault *f;
+ char *reason,
+ indent[200] = " ";
+
+
+ switch ( err ) {
+ case SOAP_OK:
+ dprintf(("[%s] ok\n", msg_pref));
+ break;
+
+ case SOAP_FAULT:
+ case SOAP_SVR_FAULT:
+ if (soap->version == 2) {
+ detail = soap->fault->SOAP_ENV__Detail;
+ reason = soap->fault->SOAP_ENV__Reason;
+ } else {
+ detail = soap->fault->detail;
+ reason = soap->fault->faultstring;
+ }
+ dprintf(("[%s] %s\n", msg_pref, reason));
+ if ( !debug ) syslog(LOG_ERR, "%s %s", msg_pref, reason);
+ assert(detail->__type == SOAP_TYPE__genericFault);
+#if GSOAP_VERSION >=20700
+ f = ((struct _genericFault *) detail->fault) -> jpelem__genericFault;
+#else
+ f = ((struct _genericFault *) detail->value) -> jpelem__genericFault;
+#endif
+ while (f) {
+ dprintf(("[%s] %s%s: %s (%s)\n",
+ msg_pref, indent,
+ f->source, f->text, f->description));
+ if ( !debug ) syslog(LOG_ERR, "%s %s%s: %s (%s)",
+ msg_pref, reason,
+ f->source, f->text, f->description);
+ f = f->reason;
+ strcat(indent, " ");
+ }
+ return -1;
+
+ default: soap_print_fault(soap,stderr);
+ return -1;
+ }
return 0;
}