From 715008991077db96238bce56f7e3569ce6bb621b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Wed, 10 Aug 2005 12:23:55 +0000 Subject: [PATCH] - new design - prepared to add more slaves (one for registrations one for dumps) --- org.glite.jp.client/src/jpimporter.c | 356 ++++++++++++++++++++++++++--------- 1 file changed, 264 insertions(+), 92 deletions(-) diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index b54aac8..fbc2d91 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -33,32 +34,52 @@ #define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid" #endif -#ifndef GLITE_JPIMPORTER_MDIR -#define GLITE_JPIMPORTER_MDIR "/tmp/jpreg" +#ifndef GLITE_REG_IMPORTER_MDIR +#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg" #endif -static int debug = 0; -static int die = 0; +#ifndef GLITE_DUMP_IMPORTER_MDIR +#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump" +#endif + +#ifndef GLITE_JPPS +#define GLITE_JPPS "http://localhost:8901" +#endif + +#define MAX_REG_CONNS 500 + +static int debug = 0; +static int die = 0; +static int child_died = 0; +static int poll = 2; +static char *jpps = GLITE_JPPS; +static char reg_mdir[PATH_MAX] = GLITE_REG_IMPORTER_MDIR; +static char dump_mdir[PATH_MAX] = GLITE_DUMP_IMPORTER_MDIR; +static struct soap *soap; static struct option opts[] = { { "help", 0, NULL, 'h'}, - { "debug", 0, NULL, 'd'}, + { "debug", 0, NULL, 'g'}, { "jpps", 1, NULL, 'p'}, - { "mdir", 1, NULL, 'm'}, + { "reg-mdir", 1, NULL, 'r'}, + { "dump-mdir", 1, NULL, 'd'}, { "pidfile", 1, NULL, 'i'}, + { "poll", 1, NULL, 't'}, { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hdp:m:i:"; +static const char *get_opt_string = "hgp:r:d::i:t:"; static void usage(char *me) { fprintf(stderr,"usage: %s [option]\n" - "\t-h, --help\t displays this screen\n" - "\t-d, --debug\t don't run as daemon, additional diagnostics\n" - "\t-p, --jpps\t JP primary service server\n" - "\t-m, --mdir\t path to the 'LB maildir' subtree\n" - "\t-i, --pidfile\t file to store master pid\n", + "\t-h, --help displays this screen\n" + "\t-g, --debug don't run as daemon, additional diagnostics\n" + "\t-p, --jpps JP primary service server\n" + "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n" + "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" + "\t-i, --pidfile file to store master pid\n" + "\t-t, --poll maildir polling interval (in seconds)\n", me); } @@ -67,17 +88,28 @@ static void catchsig(int sig) die = sig; } +static void catch_chld(int sig) +{ + child_died = 1; +} + + +static int slave(int (*)(const char *), const char *); +static int check_soap_fault(struct soap *, int, const char *); +static int reg_importer(const char *); +static int dump_importer(const char *); + + + int main(int argc, char *argv[]) { struct sigaction sa; - struct soap *soap; sigset_t sset; FILE *fpid; + pid_t reg_pid, dump_pid; int opt; char *name, - *jpps = "http://localhost:8901", - pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE, - mdir[PATH_MAX] = GLITE_JPIMPORTER_MDIR; + pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; name = strrchr(argv[0],'/'); @@ -88,20 +120,17 @@ int main(int argc, char *argv[]) while ( (opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF ) switch ( opt ) { - case 'd': debug = 1; break; + case 'g': debug = 1; break; case 'h': usage(name); return 0; case 'p': jpps = optarg; break; - case 'm': strcpy(mdir, optarg); break; + case 't': poll = atoi(optarg); break; + case 'r': strcpy(reg_mdir, optarg); break; + case 'd': strcpy(dump_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; case '?': usage(name); return 1; } if ( optind < argc ) { usage(name); return 1; } - soap = soap_new(); - soap_init(soap); - soap_set_namespaces(soap, jpps__namespaces); - soap_register_plugin(soap, glite_gsplugin); - setlinebuf(stdout); setlinebuf(stderr); @@ -140,100 +169,243 @@ int main(int argc, char *argv[]) sigaction(SIGINT, &sa, NULL); sigaction(SIGTERM, &sa, NULL); + sa.sa_handler = catch_chld; + sigaction(SIGCHLD, &sa, NULL); + sa.sa_handler = SIG_IGN; sigaction(SIGUSR1, &sa, NULL); sigemptyset(&sset); + sigaddset(&sset, SIGCHLD); sigaddset(&sset, SIGTERM); sigaddset(&sset, SIGINT); sigprocmask(SIG_BLOCK, &sset, NULL); + soap = soap_new(); + soap_init(soap); + soap_set_namespaces(soap, jpps__namespaces); + soap_register_plugin(soap, glite_gsplugin); + + if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) { + perror("starting reg importer slave"); + exit(1); + } + while ( !die ) { - int ret; - char *msg = NULL; - char *fname = NULL; - ret = edg_wll_MaildirTransStart(mdir, &msg, &fname); - /* XXX: where should unblocking signal besides? */ sigprocmask(SIG_UNBLOCK, &sset, NULL); + sleep(5); sigprocmask(SIG_BLOCK, &sset, NULL); - if ( ret < 0 ) { - dprintf(("edg_wll_MaildirTransStart: %s (%s)\n", strerror(errno), lbm_errdesc)); - if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); - exit(1); - } else if ( ret == 0 ) { - sleep(2); - } else { - struct _jpelem__RegisterJob in; - struct _jpelem__RegisterJobResponse empty; - struct SOAP_ENV__Detail *detail; - struct jptype__genericFault *f; - char *aux, *reason, indent[200] = " "; + if ( child_died ) { + int pid; + + while ( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) { + if ( !die ) { + if ( pid == reg_pid ) { + dprintf(("[master] reg importer slave died [%d]\n", pid)); + if (!debug) syslog(LOG_INFO, "reg importer slave died [%d]\n", die); + if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) { + perror("starting reg importer slave"); + kill(0, SIGINT); + exit(1); + } + dprintf(("[master] reg importer slave restarted [%d]\n", reg_pid)); + } + } + } + child_died = 0; + continue; + } + } + + dprintf(("[master] Terminating on signal %d\n", die)); + if (!debug) syslog(LOG_INFO, "Terminating on signal %d\n", die); + kill(0, die); - dprintf(("JP registration request received\n")); - if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); + unlink(pidfile); - if ( !(aux = strchr(msg, '\n')) ) { - dprintf(("Wrong format of message!\n")); - if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); - free(msg); - continue; - } + return 0; +} + +static int slave(int (*fn)(const char *), const char *nm) +{ + struct sigaction sa; + sigset_t sset; + int pid, + conn_cnt = 0; + + + if ( (pid = fork()) ) return pid; + + memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); + sa.sa_handler = catchsig; + sigaction(SIGUSR1, &sa, NULL); + + sigemptyset(&sset); + sigaddset(&sset, SIGTERM); + sigaddset(&sset, SIGINT); + sigaddset(&sset, SIGUSR1); + sigprocmask(SIG_BLOCK, &sset, NULL); + + dprintf(("[%s] slave started - pid [%d]\n", nm, getpid())); + + while ( !die && conn_cnt < MAX_REG_CONNS ) { + int ret = fn(nm); + + if ( ret > 0 ) conn_cnt++; + else if ( ret < 0 ) exit(1); + else if ( ret == 0 ) { + sigprocmask(SIG_UNBLOCK, &sset, NULL); + sleep(poll); + sigprocmask(SIG_BLOCK, &sset, NULL); + } + } + + if ( die ) { + dprintf(("[%s] Terminating on signal %d\n", nm, getpid(), die)); + if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); + } + dprintf(("[%s] Terminating after %d connections\n", nm, conn_cnt)); + if ( !debug ) syslog(LOG_INFO, "Terminating after %d connections", conn_cnt); + + exit(0); +} + + +static int reg_importer(const char *nm) +{ + struct _jpelem__RegisterJob in; + struct _jpelem__RegisterJobResponse empty; + int ret; + char *msg = NULL, + *fname = NULL, + *aux; + + + ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname); + if ( ret < 0 ) { + dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc)); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); + return -1; + } else if ( ret > 0 ) { + dprintf(("[%s] JP registration request received\n", nm)); + if ( !debug ) syslog(LOG_INFO, "JP registration request received\n"); + + ret = 0; + if ( !(aux = strchr(msg, '\n')) ) { + dprintf(("[%s] Wrong format of message!\n", nm)); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); + ret = 0; + } else { *aux++ = '\0'; in.job = msg; in.owner = aux; + dprintf(("[%s] Registering '%s'\n", nm, msg)); + if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg); ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty); - free(msg); - - switch ( ret ) { - case SOAP_OK: - /* XXX: checks return error code */ - edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_OK); - dprintf(("Job '%s' succesfully registered to JP\n", msg)); - if ( !debug ) syslog(LOG_INFO, "Job '%s' succesfully registered to JP\n", msg); - break; - - case SOAP_FAULT: - case SOAP_SVR_FAULT: - edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED); - if (soap->version == 2) { - detail = soap->fault->SOAP_ENV__Detail; - reason = soap->fault->SOAP_ENV__Reason; - } else { - detail = soap->fault->detail; - reason = soap->fault->faultstring; - } - dprintf(("%s\n", reason)); - assert(detail->__type == SOAP_TYPE__genericFault); -#if GSOAP_VERSION >=20700 - f = ((struct _genericFault *) detail->fault) -#else - f = ((struct _genericFault *) detail->value) -#endif - -> jpelem__genericFault; + ret = check_soap_fault(soap, ret, nm); + } + edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK); + free(fname); + free(msg); + return 1; + } - while ( f ) { - dprintf(("%s%s: %s (%s)\n", indent, f->source, f->text, f->description)); - f = f->reason; - strcat(indent, " "); - } - break; + return 0; +} - default: - soap_print_fault(soap, stderr); - edg_wll_MaildirTransEnd(mdir, fname, LBMD_TRANS_FAILED); - break; - } - free(fname); +static int dump_importer(const char *nm) +{ + struct _jpelem__StartUpload in; + struct _jpelem__StartUploadResponse out; + int ret; + char *msg = NULL, + *fname = NULL, + *aux; + + + ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname); + if ( ret < 0 ) { + dprintf(("[%s] edg_wll_MaildirTransStart: %s (%s)\n", nm, strerror(errno), lbm_errdesc)); + if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc); + return -1; + } else if ( ret > 0 ) { + dprintf(("[%s] dump JP import request received\n", nm)); + if ( !debug ) syslog(LOG_INFO, "dump JP import request received\n"); + + ret = 0; + if ( !(aux = strchr(msg, '\n')) ) { + dprintf(("[%s] Wrong format of message!\n", nm)); + if ( !debug ) syslog(LOG_ERR, "Wrong format of message\n"); + ret = 0; + } else { + *aux++ = '\0'; + in.job = argv[2]; + in.class_ = argv[3]; + in.name = NULL; + in.commitBefore = atoi(argv[4]) + time(NULL); + in.contentType = argv[5]; + dprintf(("[%s] Importing LB dump file '%s'\n", nm, msg)); + if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg); + ret = soap_call___jpsrv__StartUpload(soap, jpps, "", &in, &out); + ret = check_soap_fault(soap, ret, nm); } + edg_wll_MaildirTransEnd(dump_mdir, fname, ret? LBMD_TRANS_FAILED: LBMD_TRANS_OK); + free(fname); + free(msg); + return 1; } - /* XXX: some sort of soap_destroy(soap) */ - dprintf(("Terminating on signal %d\n", die)); - if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d\n", die); + return 0; +} - unlink(pidfile); + +static int check_soap_fault(struct soap *soap, int err, const char *msg_pref) +{ + struct SOAP_ENV__Detail *detail; + struct jptype__genericFault *f; + char *reason, + indent[200] = " "; + + + switch ( err ) { + case SOAP_OK: + dprintf(("[%s] ok\n", msg_pref)); + break; + + case SOAP_FAULT: + case SOAP_SVR_FAULT: + if (soap->version == 2) { + detail = soap->fault->SOAP_ENV__Detail; + reason = soap->fault->SOAP_ENV__Reason; + } else { + detail = soap->fault->detail; + reason = soap->fault->faultstring; + } + dprintf(("[%s] %s\n", msg_pref, reason)); + if ( !debug ) syslog(LOG_ERR, "%s %s", msg_pref, reason); + assert(detail->__type == SOAP_TYPE__genericFault); +#if GSOAP_VERSION >=20700 + f = ((struct _genericFault *) detail->fault) -> jpelem__genericFault; +#else + f = ((struct _genericFault *) detail->value) -> jpelem__genericFault; +#endif + while (f) { + dprintf(("[%s] %s%s: %s (%s)\n", + msg_pref, indent, + f->source, f->text, f->description)); + if ( !debug ) syslog(LOG_ERR, "%s %s%s: %s (%s)", + msg_pref, reason, + f->source, f->text, f->description); + f = f->reason; + strcat(indent, " "); + } + return -1; + + default: soap_print_fault(soap,stderr); + return -1; + } return 0; } -- 1.8.2.3