From e7ab9ef63a124c1eba91b0a893e24e0c26ccf10e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Tue, 13 May 2008 15:40:08 +0000 Subject: [PATCH] enable to start more slaves consuming LB dumps --- org.glite.jp.client/src/jpimporter.c | 52 +++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/org.glite.jp.client/src/jpimporter.c b/org.glite.jp.client/src/jpimporter.c index 4195963..3cd04a6 100644 --- a/org.glite.jp.client/src/jpimporter.c +++ b/org.glite.jp.client/src/jpimporter.c @@ -60,6 +60,9 @@ typedef struct { #define JPREG_GIVUP_TIMEOUT 3000 #define JP_REPEAT_TIMEOUT 360 #define JP_GIVUP_TIMEOUT 3600 +#define PID_POOL_SIZE 20 +#define DUMP_SLAVES_DEFAULT 1 + static int debug = 0; static int die = 0; @@ -99,6 +102,7 @@ static struct option opts[] = { { "jpps", 1, NULL, 'p'}, { "reg-mdir", 1, NULL, 'r'}, { "dump-mdir", 1, NULL, 'd'}, + { "dump-slaves", 1, NULL, 'D'}, { "sandbox-mdir",1, NULL, 's'}, { "pidfile", 1, NULL, 'i'}, { "poll", 1, NULL, 't'}, @@ -110,7 +114,7 @@ static struct option opts[] = { { NULL, 0, NULL, 0} }; -static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:" +static const char *get_opt_string = "hgp:r:d:D:s:i:t:c:k:C:" #ifdef JP_PERF "K:" #endif @@ -136,6 +140,7 @@ static void usage(char *me) "\t-p, --jpps JP primary service server\n" "\t-r, --reg-mdir path to the 'LB maildir' subtree for registrations\n" "\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n" + "\t-D, --dump-slaves number of slaves processing LB dumps\n" "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n" "\t-i, --pidfile file to store master pid\n" "\t-t, --poll maildir polling interval (in seconds)\n" @@ -172,7 +177,9 @@ int main(int argc, char *argv[]) struct sigaction sa; sigset_t sset; FILE *fpid; - pid_t reg_pid, dump_pid, sandbox_pid; + pid_t reg_pid, sandbox_pid; + pid_t dump_pids[PID_POOL_SIZE]; + int dump_slaves = DEFAULT_DUMP_SLAVES_NUMBER, i; int opt; char *name, pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE; @@ -196,6 +203,7 @@ int main(int argc, char *argv[]) case 'S': store = optarg; break; case 'r': strcpy(reg_mdir, optarg); break; case 'd': strcpy(dump_mdir, optarg); break; + case 'D': dump_slaves = atoi(optarg); break; case 's': strcpy(sandbox_mdir, optarg); break; case 'i': strcpy(pidfile, optarg); break; #ifdef JP_PERF @@ -205,6 +213,13 @@ int main(int argc, char *argv[]) } if ( optind < argc ) { usage(name); return 1; } + if (dump_slaves > PID_POOL_SIZE) { + fprintf(stderr,"Maximum number of dump slaves is %d\n", PID_POOL_SIZE); + return(1); + } + + memset(&dump_pids,0,sizeof(dump_pids)); + setlinebuf(stdout); setlinebuf(stderr); @@ -297,9 +312,11 @@ int main(int argc, char *argv[]) perror("starting reg importer slave"); exit(1); } - if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) { - perror("starting dump importer slave"); - exit(1); + for (i=0; i < dump_slaves; i++) { + if ( (dump_pids[i] = slave(dump_importer, "dump-imp")) < 0 ) { + perror("starting dump importer slave"); + exit(1); + } } if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) { perror("starting sandbox importer slave"); @@ -326,15 +343,6 @@ int main(int argc, char *argv[]) exit(1); } dprintf("[master] reg importer slave restarted [%d]\n", reg_pid); - } else if ( pid == dump_pid ) { - dprintf("[master] dump importer slave died [%d]\n", pid); - if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die); - if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) { - perror("starting dump importer slave"); - kill(0, SIGINT); - exit(1); - } - dprintf("[master] dump importer slave restarted [%d]\n", dump_pid); } else if ( pid == sandbox_pid ) { dprintf("[master] sandbox importer slave died [%d]\n", pid); if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die); @@ -344,6 +352,19 @@ int main(int argc, char *argv[]) exit(1); } dprintf("[master] sandbox importer slave restarted [%d]\n", sandbox_pid); + } else /* must be in dump_pids */ { + dprintf("[master] dump importer slave died [%d]\n", pid); + if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die); + for (i=0; (i < dump_slaves) && (pid != dump_pids[i]); i++); + assert(i < dump_slaves); // pid should be in pool + + if ( (dump_pids[i] = slave(dump_importer, "dump-imp")) < 0 ) { + perror("starting dump importer slave"); + kill(0, SIGINT); + exit(1); + } + dprintf("[master] dump importer slave restarted [%d]\n", dump_pids[i]); + } } @@ -373,7 +394,7 @@ static int slave(int (*fn)(void), const char *nm) if ( (pid = fork()) ) return pid; - name = (char *)nm; + asprintf(&name,"%s %d",nm,getpid()); memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); sa.sa_handler = catchsig; sigaction(SIGUSR1, &sa, NULL); @@ -674,7 +695,6 @@ static int dump_importer(void) } - static int sandbox_importer(void) { struct _jpelem__StartUpload su_in; -- 1.8.2.3