enable to start more slaves consuming LB dumps
authorMiloš Mulač <mulac@civ.zcu.cz>
Tue, 13 May 2008 15:40:08 +0000 (15:40 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Tue, 13 May 2008 15:40:08 +0000 (15:40 +0000)
org.glite.jp.client/src/jpimporter.c

index 4195963..3cd04a6 100644 (file)
@@ -60,6 +60,9 @@ typedef struct {
 #define JPREG_GIVUP_TIMEOUT                    3000
 #define JP_REPEAT_TIMEOUT                      360
 #define JP_GIVUP_TIMEOUT                       3600
+#define PID_POOL_SIZE                          20
+#define DUMP_SLAVES_DEFAULT                    1
+
 
 static int             debug = 0;
 static int             die = 0;
@@ -99,6 +102,7 @@ static struct option opts[] = {
        { "jpps",        1, NULL,    'p'},
        { "reg-mdir",    1, NULL,    'r'},
        { "dump-mdir",   1, NULL,    'd'},
+       { "dump-slaves", 1, NULL,    'D'},
        { "sandbox-mdir",1, NULL,    's'},
        { "pidfile",     1, NULL,    'i'},
        { "poll",        1, NULL,    't'},
@@ -110,7 +114,7 @@ static struct option opts[] = {
        { NULL,          0, NULL,     0}
 };
 
-static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
+static const char *get_opt_string = "hgp:r:d:D:s:i:t:c:k:C:"
 #ifdef JP_PERF
        "K:"
 #endif
@@ -136,6 +140,7 @@ static void usage(char *me)
                "\t-p, --jpps         JP primary service server\n"
                "\t-r, --reg-mdir     path to the 'LB maildir' subtree for registrations\n"
                "\t-d, --dump-mdir    path to the 'LB maildir' subtree for LB dumps\n"
+               "\t-D, --dump-slaves  number of slaves processing LB dumps\n"
                "\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
                "\t-i, --pidfile      file to store master pid\n"
                "\t-t, --poll         maildir polling interval (in seconds)\n"
@@ -172,7 +177,9 @@ int main(int argc, char *argv[])
        struct sigaction        sa;
        sigset_t                sset;
        FILE                    *fpid;
-       pid_t                   reg_pid, dump_pid, sandbox_pid;
+       pid_t                   reg_pid, sandbox_pid;
+       pid_t                   dump_pids[PID_POOL_SIZE];
+       int                     dump_slaves = DEFAULT_DUMP_SLAVES_NUMBER, i;
        int                     opt;
        char                    *name,
                                pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
@@ -196,6 +203,7 @@ int main(int argc, char *argv[])
                case 'S': store = optarg; break;
                case 'r': strcpy(reg_mdir, optarg); break;
                case 'd': strcpy(dump_mdir, optarg); break;
+               case 'D': dump_slaves = atoi(optarg); break;
                case 's': strcpy(sandbox_mdir, optarg); break;
                case 'i': strcpy(pidfile, optarg); break;
 #ifdef JP_PERF
@@ -205,6 +213,13 @@ int main(int argc, char *argv[])
        }
        if ( optind < argc ) { usage(name); return 1; }
 
+       if (dump_slaves > PID_POOL_SIZE) {
+               fprintf(stderr,"Maximum number of dump slaves is %d\n", PID_POOL_SIZE);
+               return(1);
+       }
+
+       memset(&dump_pids,0,sizeof(dump_pids)); 
+
        setlinebuf(stdout);
        setlinebuf(stderr);
 
@@ -297,9 +312,11 @@ int main(int argc, char *argv[])
                perror("starting reg importer slave");
                exit(1);
        }
-       if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) {
-               perror("starting dump importer slave");
-               exit(1);
+       for (i=0; i < dump_slaves; i++) {
+               if ( (dump_pids[i] = slave(dump_importer, "dump-imp")) < 0 ) {
+                       perror("starting dump importer slave");
+                       exit(1);
+               }
        }
        if ( (sandbox_pid = slave(sandbox_importer, "sandbox-imp")) < 0 ) {
                perror("starting sandbox importer slave");
@@ -326,15 +343,6 @@ int main(int argc, char *argv[])
                                                        exit(1);
                                                }
                                                dprintf("[master] reg importer slave restarted [%d]\n", reg_pid);
-                                       } else if ( pid == dump_pid ) {
-                                               dprintf("[master] dump importer slave died [%d]\n", pid);
-                                               if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die);
-                                               if ( (dump_pid = slave(dump_importer, "dump-imp")) < 0 ) {
-                                                       perror("starting dump importer slave");
-                                                       kill(0, SIGINT);
-                                                       exit(1);
-                                               }
-                                               dprintf("[master] dump importer slave restarted [%d]\n", dump_pid);
                                        } else if ( pid == sandbox_pid ) {
                                                dprintf("[master] sandbox importer slave died [%d]\n", pid);
                                                if (!debug) syslog(LOG_INFO, "sandbox importer slave died [%d]\n", die);
@@ -344,6 +352,19 @@ int main(int argc, char *argv[])
                                                        exit(1);
                                                }
                                                dprintf("[master] sandbox importer slave restarted [%d]\n", sandbox_pid);
+                                       } else /* must be in dump_pids */ {
+                                               dprintf("[master] dump importer slave died [%d]\n", pid);
+                                               if (!debug) syslog(LOG_INFO, "dump importer slave died [%d]\n", die);
+                                               for (i=0; (i < dump_slaves) && (pid != dump_pids[i]); i++);
+                                               assert(i < dump_slaves); // pid should be in pool
+
+                                               if ( (dump_pids[i] = slave(dump_importer, "dump-imp")) < 0 ) {
+                                                       perror("starting dump importer slave");
+                                                       kill(0, SIGINT);
+                                                       exit(1);
+                                               }
+                                               dprintf("[master] dump importer slave restarted [%d]\n", dump_pids[i]);
+                               
                                        }
 
                                }
@@ -373,7 +394,7 @@ static int slave(int (*fn)(void), const char *nm)
 
        if ( (pid = fork()) ) return pid;
 
-       name = (char *)nm;
+       asprintf(&name,"%s %d",nm,getpid());
        memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL);
        sa.sa_handler = catchsig;
        sigaction(SIGUSR1, &sa, NULL);
@@ -674,7 +695,6 @@ static int dump_importer(void)
 }
 
 
-
 static int sandbox_importer(void)
 {
        struct _jpelem__StartUpload             su_in;