glued server+proxy
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 6 Sep 2007 11:52:41 +0000 (11:52 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 6 Sep 2007 11:52:41 +0000 (11:52 +0000)
- compiles, but more work needs to be done to be fully funcional

org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db [new file with mode: 0644]
org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db [new file with mode: 0644]
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/store.c.T
org.glite.lb.server/src/stored_master.c

index 6e512be..a533ec1 100644 (file)
@@ -3,6 +3,9 @@ create table jobs (
        dg_jobid        varchar(255)    binary not null,
        userid          char(32)        binary not null,
        aclid           char(32)        binary null,
+       proxy           bool            not null,
+       server          bool            not null,
+       
 
        primary key (jobid),
        unique (dg_jobid),
diff --git a/org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db b/org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db
new file mode 100644 (file)
index 0000000..25fe402
--- /dev/null
@@ -0,0 +1,19 @@
+# This script is intendent to be used to extend proxy database to 
+# to be able to hold both bkserver and lbproxy jobs.
+# The operation should be non-destructive, i.e. all data should persist
+# and continue to be fully usable.
+
+#!/bin/bash
+
+
+# add columns for job membership (proxy/server) flags
+mysql -u lbserver lbproxy -e "ALTER TABLE jobs ADD proxy bool not null"
+mysql -u lbserver lbproxy -e "ALTER TABLE jobs ADD server bool not null"
+
+# flag all jobs as proxy jobs
+mysql -u lbserver lbproxy -e "update jobs set proxy='1'"
+mysql -u lbserver lbproxy -e "update jobs set server='0'"
+
+# something more clever should come here
+#      - rename DB to lbserver20
+#      - or copy all fields to lbserver20 DB
diff --git a/org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db b/org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db
new file mode 100644 (file)
index 0000000..94cedda
--- /dev/null
@@ -0,0 +1,15 @@
+# This script is intendent to be used to extend bkserver database to 
+# to be able to hold both bkserver and lbproxy jobs.
+# The operation should be non-destructive, i.e. all data should persist
+# and continue to be fully usable.
+
+#!/bin/bash
+
+
+# add columns for job membership (proxy/server) flags
+mysql -u lbserver lbserver20 -e "ALTER TABLE jobs ADD proxy bool not null"
+mysql -u lbserver lbserver20 -e "ALTER TABLE jobs ADD server bool not null"
+
+# flag all jobs as server jobs
+mysql -u lbserver lbserver20 -e "update jobs set proxy='0'"
+mysql -u lbserver lbserver20 -e "update jobs set server='1'"
index 5a5caaa..2074fb6 100644 (file)
@@ -12,6 +12,7 @@
 #include <sys/wait.h>
 #include <sys/socket.h>
 #include <sys/uio.h>
+#include <sys/un.h>
 #include <netinet/in.h>
 #include <signal.h>
 #include <errno.h>
@@ -71,6 +72,11 @@ enum lb_srv_perf_sink sink_mode;
 extern int edg_wll_StoreProto(edg_wll_Context ctx);
 extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs);
 extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context);
+extern int edg_wll_StoreProtoProxy(edg_wll_Context ctx);
+
+extern char *lbproxy_ilog_socket_path;
+extern char *lbproxy_ilog_file_prefix;
+
 
 #ifdef LB_PERF
 extern void _start (void), etext (void);
@@ -102,12 +108,21 @@ extern void _start (void), etext (void);
 #define EDG_BKSERVERD_PIDFILE  "/var/run/edg-bkserverd.pid"
 #endif
 
+#ifndef GLITE_LBPROXY_SOCK_PREFIX
+#define GLITE_LBPROXY_SOCK_PREFIX       "/tmp/lb_proxy_"
+#endif
+
 #ifndef dprintf
 #define dprintf(x)             { if (debug) printf x; }
 #endif
 
 #define sizofa(a)              (sizeof(a)/sizeof((a)[0]))
 
+#define        SERVICE_PROXY           1
+#define SERVICE_SERVER 2
+#define SERVICE_PROXY_SERVER   SERVICE_PROXY+SERVICE_SERVER
+
+
 
 int                                            debug  = 0;
 int                                            rgma_export = 0;
@@ -143,6 +158,14 @@ char                                  *cadir = NULL,
                                           *vomsdir = NULL,
                                           *server_key = NULL,
                                           *server_cert = NULL;
+static int             mode = SERVICE_SERVER;
+static char             sock_store[PATH_MAX],
+                        sock_serve[PATH_MAX];
+static int             con_queue = CON_QUEUE;
+static char             host[300];
+static char *           port;
+
+
 
 
 static struct option opts[] = {
@@ -169,7 +192,7 @@ static struct option opts[] = {
        {"super-user",  1, NULL,        'R'},
        {"super-users-file",    1, NULL,'F'},
        {"no-index",    1, NULL,        'x'},
-       {"strict-locking",0, NULL,      'P'},
+       {"strict-locking",0, NULL,      'O'},
        {"limits",      1, NULL,        'L'},
        {"notif-dur",   1, NULL,        'N'},
        {"notif-il-sock",       1, NULL,        'X'},
@@ -182,10 +205,16 @@ static struct option opts[] = {
 #endif
        {"transactions",        1,      NULL,   'b'},
        {"greyjobs",    0,      NULL,   'g'},
+       {"withproxy",   0,      NULL,   'B'},
+       {"proxyonly",   0,      NULL,   'P'},
+       {"sock",        0,      NULL,   'o'},
+       {"con-queue",   1,      NULL,   'q'},
+       {"proxy-il-sock",       1,      NULL,   'W'},
+       {"proxy-il-fprefix",    1,      NULL,   'Z'},
        {NULL,0,NULL,0}
 };
 
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g"
+static const char *get_opt_string = "c:k:C:V:p:a:drm:ns:l:i:S:D:J:jR:F:xOL:N:X:Y:T:t:zb:gPBo:q:W:Z:"
 #ifdef GLITE_LB_SERVER_WITH_WS
        "w:"
 #endif
@@ -235,6 +264,12 @@ static void usage(char *me)
                "\t--perf-sink\t where to sink events\n"
 #endif
                "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n"
+               "\t-P,--proxyonly\t     run only proxy service\n"
+               "\t-B,--withproxy\t     run both server and proxy service\n"
+               "\t-o,--sock\t path-name to the local socket for communication with LB proxy\n"
+               "\t-q,--con-queue\t size of the connection queue (accept)\n"
+               "\t-W,--proxy-il-sock\t socket to send events to\n"
+               "\t-Z,--proxy-il-fprefix\t file prefix for events\n"
 
        ,me);
 }
@@ -272,17 +307,44 @@ int bk_ws_clnt_reject(int);
 int bk_ws_clnt_disconnect(int, struct timeval *, void *);
 #endif         /*GLITE_LB_SERVER_WITH_WS */
 
-#define SRV_SERVE              0
-#define SRV_STORE              1
+        /*
+         *      Proxy handlers
+         */
+int bk_clnt_reject_proxy(int);
+int bk_handle_connection_proxy(int, struct timeval *, void *);
+int bk_clnt_disconnect_proxy(int, struct timeval *, void *);
+
+  
+  #define SERVICE_SERVER_START 0 
+  #define SRV_SERVE                    0
+  #define SRV_STORE                    1
 #ifdef GLITE_LB_SERVER_WITH_WS
-#define SRV_WS                 2
+  #define SRV_WS                       2
+  #define SERVICE_SERVER_SIZE          3
+
+  #define SERVICE_PROXY_START          3
+  #define SRV_SERVE_PROXY              3
+  #define SRV_STORE_PROXY              4
+  #define SERVICE_PROXY_SIZE           2
+#else
+  #define SERVICE_SERVER_SIZE          2
+
+  #define SERVICE_PROXY_START          2
+  #define SRV_SERVE_PROXY              2
+  #define SRV_STORE_PROXY              3
+  #define SERVICE_PROXY_SIZE           2
 #endif /* GLITE_LB_SERVER_WITH_WS */
+
+
+
 static struct glite_srvbones_service service_table[] = {
        { "serve",      -1, bk_handle_connection, bk_accept_serve, bk_clnt_reject, bk_clnt_disconnect },
        { "store",      -1, bk_handle_connection, bk_accept_store, bk_clnt_reject, bk_clnt_disconnect },
 #ifdef GLITE_LB_SERVER_WITH_WS
-       { "WS",         -1, bk_handle_ws_connection, bk_accept_ws, bk_ws_clnt_reject, bk_ws_clnt_disconnect }
+       { "WS",         -1, bk_handle_ws_connection, bk_accept_ws, bk_ws_clnt_reject, bk_ws_clnt_disconnect },
 #endif /* GLITE_LB_SERVER_WITH_WS */
+       { "serve_proxy",        -1, bk_handle_connection_proxy, bk_accept_serve, bk_clnt_reject_proxy, bk_clnt_disconnect_proxy },
+       { "store_proxy",        -1, bk_handle_connection_proxy, bk_accept_store, bk_clnt_reject_proxy, bk_clnt_disconnect_proxy }
 };
 
 struct clnt_data_t {
@@ -294,6 +356,7 @@ struct clnt_data_t {
        void                               *mysql;
        edg_wll_QueryRec          **job_index;
        edg_wll_IColumnRec         *job_index_cols;
+       int                     mode;
 };
 
 
@@ -306,7 +369,6 @@ int main(int argc, char *argv[])
        char                       *mysubj = NULL;
        int                                     opt;
        char                            pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE,
-                                          *port,
                                           *name;
 #ifdef GLITE_LB_SERVER_WITH_WS
        char                       *ws_port;
@@ -318,6 +380,7 @@ int main(int argc, char *argv[])
        struct timeval          to;
        int                     request_timeout = REQUEST_TIMEOUT;
        int                     silent = 0;
+       char                    socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX;
 
 
        /* keep this at start of main() ! */
@@ -339,9 +402,6 @@ int main(int argc, char *argv[])
        purge_timeout[EDG_WLL_JOB_ABORTED] = 60*60*24*7;
        purge_timeout[EDG_WLL_JOB_CANCELLED] = 60*60*24*7;
 
-       if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/edg-bkserverd.pid",
-                       getenv("HOME"));
-
        while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) {
                case 'a': fake_host = strdup(optarg); break;
                case 'b': transactions = atoi(optarg); break;
@@ -394,7 +454,7 @@ int main(int argc, char *argv[])
                case 'x': noIndex = atoi(optarg);
                          if (noIndex < 0 || noIndex > 2) { usage(name); return 1; }
                          break;
-               case 'P': strict_locking = 1;
+               case 'O': strict_locking = 1;
                          break;
                case 'T': count_statistics = atoi(optarg);
                          break;
@@ -406,6 +466,18 @@ int main(int argc, char *argv[])
 #endif
                case 'g': greyjobs = strict_locking = 1;
                          break;
+               case 'P': mode = SERVICE_PROXY;
+                         break;
+               case 'B': mode = SERVICE_PROXY_SERVER;;
+                         break;
+               case 'o': strcpy(socket_path_prefix, optarg);
+                         break;
+               case 'q': con_queue = atoi(optarg); 
+                         break;
+               case 'W': lbproxy_ilog_socket_path = strdup(optarg); 
+                         break;
+               case 'Z': lbproxy_ilog_file_prefix = strdup(optarg);
+                         break;
                case '?': usage(name); return 1;
        }
 
@@ -414,6 +486,11 @@ int main(int argc, char *argv[])
        setlinebuf(stdout);
        setlinebuf(stderr);
 
+       if (mode & SERVICE_PROXY) dprintf(("\nStaring LB proxy service\n"));
+       if (mode & SERVICE_SERVER) dprintf(("\nStaring LB server service\n"));
+
+       if (geteuid()) snprintf(pidfile,sizeof pidfile, "%s/edg-bkserverd.pid", getenv("HOME"));
+
        fpid = fopen(pidfile,"r");
        if ( fpid )
        {
@@ -438,17 +515,18 @@ int main(int argc, char *argv[])
 
        semkey = ftok(pidfile,0);
 
-       if (check_mkdir(dumpStorage)) exit(1);
-       if (check_mkdir(purgeStorage)) exit(1);
-       if ( jpreg ) {
-               if ( edg_wll_MaildirInit(jpregDir) ) {
-                       dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc));
-                       if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc);
-                       exit(1);
+       if (mode & SERVICE_SERVER) {
+               if (check_mkdir(dumpStorage)) exit(1);
+               if (check_mkdir(purgeStorage)) exit(1);
+               if ( jpreg ) {
+                       if ( edg_wll_MaildirInit(jpregDir) ) {
+                               dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc));
+                               if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc);
+                               exit(1);
+                       }
                }
        }
 
-
        if (semaphores == -1) semaphores = slaves;
        semset = semget(semkey, 0, 0);
        if (semset >= 0) semctl(semset, 0, IPC_RMID);
@@ -462,112 +540,169 @@ int main(int argc, char *argv[])
                s.sem_num = i; s.sem_op = 1; s.sem_flg = 0;
                if (semop(semset,&s,1) == -1) { perror("semop()"); return 1; }
        }
+       
+       if (mode & SERVICE_SERVER) {
+               if ( fake_host )
+               {
+                       char    *p = strchr(fake_host,':');
 
-       if ( fake_host )
-       {
-               char    *p = strchr(fake_host,':');
-
-               if (p)
+                       if (p)
+                       {
+                               *p = 0;
+                               fake_port = atoi(p+1);
+                       }
+                       else fake_port = atoi(port);
+               }
+               else
                {
-                       *p = 0;
-                       fake_port = atoi(p+1);
+                       char    buf[300];
+
+                       edg_wll_gss_gethostname(buf,sizeof buf);
+                       buf[sizeof buf - 1] = 0;
+                       fake_host = strdup(buf);
+                       fake_port = atoi(port); 
                }
-               else fake_port = atoi(port);
-       }
-       else
-       {
-               char    buf[300];
 
-               edg_wll_gss_gethostname(buf,sizeof buf);
-               buf[sizeof buf - 1] = 0;
-               fake_host = strdup(buf);
-               fake_port = atoi(port); 
+               dprintf(("Server address: %s:%d\n", fake_host, fake_port));
        }
+       if ((mode & SERVICE_SERVER)) {
+               service_table[SRV_SERVE].conn = socket(PF_INET, SOCK_STREAM, 0);
+               if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; }
+               a.sin_family = AF_INET;
+               a.sin_port = htons(atoi(port));
+               a.sin_addr.s_addr = INADDR_ANY;
+               setsockopt(service_table[SRV_SERVE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+               if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) )
+               { 
+                       char    buf[100];
+
+                       snprintf(buf,sizeof(buf),"bind(%d)",atoi(port));
+                       perror(buf);
+                       return 1;
+               }
+               if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+
+               service_table[SRV_STORE].conn = socket(PF_INET, SOCK_STREAM, 0);
+               if ( service_table[SRV_STORE].conn < 0) { perror("socket()"); return 1; }
+               a.sin_family = AF_INET;
+               a.sin_port = htons(atoi(port)+1);
+               a.sin_addr.s_addr = INADDR_ANY;
+               setsockopt(service_table[SRV_STORE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+               if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a)))
+               {
+                       char    buf[100];
 
-       dprintf(("server address: %s:%d\n", fake_host, fake_port));
+                       snprintf(buf,sizeof(buf), "bind(%d)", atoi(port)+1);
+                       perror(buf);
+                       return 1;
+               }
+               if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
 
-       service_table[SRV_SERVE].conn = socket(PF_INET, SOCK_STREAM, 0);
-       if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; }
-       a.sin_family = AF_INET;
-       a.sin_port = htons(atoi(port));
-       a.sin_addr.s_addr = INADDR_ANY;
-       setsockopt(service_table[SRV_SERVE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
-       if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) )
-       { 
-               char    buf[100];
+#ifdef GLITE_LB_SERVER_WITH_WS
+               service_table[SRV_WS].conn = socket(PF_INET, SOCK_STREAM, 0);
+               if ( service_table[SRV_WS].conn < 0) { perror("socket()"); return 1; }
+               a.sin_family = AF_INET;
+               a.sin_port = htons(atoi(ws_port));
+               a.sin_addr.s_addr = INADDR_ANY;
+               setsockopt(service_table[SRV_WS].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+               if ( bind(service_table[SRV_WS].conn, (struct sockaddr *) &a, sizeof(a)))
+               {
+                       char    buf[100];
 
-               snprintf(buf,sizeof(buf),"bind(%d)",atoi(port));
-               perror(buf);
-               return 1;
-       }
-       if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
-
-       service_table[SRV_STORE].conn = socket(PF_INET, SOCK_STREAM, 0);
-       if ( service_table[SRV_STORE].conn < 0) { perror("socket()"); return 1; }
-       a.sin_family = AF_INET;
-a.sin_port = htons(atoi(port)+1);
-a.sin_addr.s_addr = INADDR_ANY;
-       setsockopt(service_table[SRV_STORE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
-       if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a)))
-       {
-               char    buf[100];
+                       snprintf(buf, sizeof(buf), "bind(%d)", atoi(ws_port));
+                       perror(buf);
+                       return 1;
+               }
+               if ( listen(service_table[SRV_WS].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
 
-               snprintf(buf,sizeof(buf), "bind(%d)", atoi(port)+1);
-               perror(buf);
-               return 1;
-       }
-       if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+#endif /* GLITE_LB_SERVER_WITH_WS */
+
+               if (!server_cert || !server_key)
+                       fprintf(stderr, "%s: key or certificate file not specified"
+                                                       " - unable to watch them for changes!\n", argv[0]);
+
+               if ( cadir ) setenv("X509_CERT_DIR", cadir, 1);
+               edg_wll_gss_watch_creds(server_cert, &cert_mtime);
+               if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) )
+               {
+                       int     i;
+
+                       dprintf(("Server identity: %s\n",mysubj));
+                       server_subject = strdup(mysubj);
+                       for ( i = 0; super_users && super_users[i]; i++ ) ;
+                       super_users = realloc(super_users, (i+2)*sizeof(*super_users));
+                       super_users[i] = mysubj;
+                       super_users[i+1] = NULL;
+               }
+               else {
+                       dprintf(("Server running unauthenticated\n"));
+                       server_subject = strdup("anonymous LB");
+               }
+
+               if ( noAuth ) dprintf(("Server in promiscuous mode\n"));
+               dprintf(("Server listening at %d,%d (accepting protocols: " COMP_PROTO " and compatible) ...\n",atoi(port),atoi(port)+1));
 
 #ifdef GLITE_LB_SERVER_WITH_WS
-       service_table[SRV_WS].conn = socket(PF_INET, SOCK_STREAM, 0);
-       if ( service_table[SRV_WS].conn < 0) { perror("socket()"); return 1; }
-       a.sin_family = AF_INET;
-       a.sin_port = htons(atoi(ws_port));
-       a.sin_addr.s_addr = INADDR_ANY;
-       setsockopt(service_table[SRV_WS].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
-       if ( bind(service_table[SRV_WS].conn, (struct sockaddr *) &a, sizeof(a)))
-       {
-               char    buf[100];
+               dprintf(("Server listening at %d (accepting web service protocol) ...\n", atoi(ws_port)));
+#endif /* GLITE_LB_SERVER_WITH_WS */
 
-               snprintf(buf, sizeof(buf), "bind(%d)", atoi(ws_port));
-               perror(buf);
-               return 1;
        }
-       if ( listen(service_table[SRV_WS].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+       if (mode & SERVICE_PROXY) {     /* proxy stuff */
+               struct sockaddr_un      a;
+
+               service_table[SRV_SERVE_PROXY].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+               if ( service_table[SRV_SERVE_PROXY].conn < 0 ) { perror("socket()"); return 1; }
+               memset(&a, 0, sizeof(a));
+               a.sun_family = AF_UNIX;
+               sprintf(sock_serve, "%s%s", socket_path_prefix, "serve.sock");
+               strcpy(a.sun_path, sock_serve);
+
+               if( connect(service_table[SRV_SERVE_PROXY].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+                       if( errno == ECONNREFUSED ) {
+                               dprintf(("removing stale input socket %s\n", sock_serve));
+                               unlink(sock_serve);
+                       }
+               } else { perror("another instance of lb-proxy is running"); return 1; }
 
-#endif /* GLITE_LB_SERVER_WITH_WS */
+               if ( bind(service_table[SRV_SERVE_PROXY].conn, (struct sockaddr *) &a, sizeof(a)) < 0 ) {
+                       char    buf[100];
 
-       if (!server_cert || !server_key)
-               fprintf(stderr, "%s: key or certificate file not specified"
-                                               " - unable to watch them for changes!\n", argv[0]);
+                       snprintf(buf, sizeof(buf), "bind(%s)", sock_serve);
+                       perror(buf);
+                       return 1;
+               }
 
-       if ( cadir ) setenv("X509_CERT_DIR", cadir, 1);
-       edg_wll_gss_watch_creds(server_cert, &cert_mtime);
-       if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) )
-       {
-               int     i;
-
-               dprintf(("Server identity: %s\n",mysubj));
-               server_subject = strdup(mysubj);
-               for ( i = 0; super_users && super_users[i]; i++ ) ;
-               super_users = realloc(super_users, (i+2)*sizeof(*super_users));
-               super_users[i] = mysubj;
-               super_users[i+1] = NULL;
-       }
-       else {
-               dprintf(("Running unauthenticated\n"));
-               server_subject = strdup("anonymous LB");
-       }
+               if ( listen(service_table[SRV_SERVE_PROXY].conn, con_queue) ) { perror("listen()"); return 1; }
 
-       if ( noAuth ) dprintf(("Promiscuous mode\n"));
-       dprintf(("Listening at %d,%d (accepting protocols: " COMP_PROTO " and compatible) ...\n",atoi(port),atoi(port)+1));
+               service_table[SRV_STORE_PROXY].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+               if ( service_table[SRV_STORE_PROXY].conn < 0 ) { perror("socket()"); return 1; }
+               memset(&a, 0, sizeof(a));
+               a.sun_family = AF_UNIX;
+               sprintf(sock_store, "%s%s", socket_path_prefix, "store.sock");
+               strcpy(a.sun_path, sock_store);
 
-#ifdef GLITE_LB_SERVER_WITH_WS
-       dprintf(("Listening at %d (accepting web service protocol) ...\n", atoi(ws_port)));
-#endif /* GLITE_LB_SERVER_WITH_WS */
+               if( connect(service_table[SRV_STORE_PROXY].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+                       if( errno == ECONNREFUSED ) {
+                               dprintf(("removing stale input socket %s\n", sock_store));
+                               unlink(sock_store);
+                       }
+               } else { perror("another instance of lb-proxy is running"); return 1; }
+
+               if ( bind(service_table[SRV_STORE_PROXY].conn, (struct sockaddr *) &a, sizeof(a))) {
+                       char    buf[100];
+
+                       snprintf(buf, sizeof(buf), "bind(%s)", sock_store);
+                       perror(buf);
+                       return 1;
+               }
+               if ( listen(service_table[SRV_STORE_PROXY].conn, con_queue) ) { perror("listen()"); return 1; }
+
+               dprintf(("Proxy listening at %s, %s ...\n", sock_store, sock_serve));
+       }
 
        if (!dbstring) dbstring = getenv("LBDB");
        if (!dbstring) dbstring = strdup(DEFAULTCS);
+               
 
        /* Just check the database and let it be. The slaves do the job. */
        edg_wll_InitContext(&ctx);
@@ -619,19 +754,48 @@ a.sin_addr.s_addr = INADDR_ANY;
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD);
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX);
 
-       to = (struct timeval){CONNECT_TIMEOUT, 0};
-       glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to);
-       to.tv_sec = request_timeout;
+       if (mode & SERVICE_SERVER) {
+               to = (struct timeval){CONNECT_TIMEOUT, 0};
+               glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to);
+               to.tv_sec = request_timeout;
+       }
+       // proxy using default from srvbones, 5s
+
        glite_srvbones_set_param(GLITE_SBPARAM_REQUEST_TIMEOUT, &to);
        to = (struct timeval){IDLE_TIMEOUT, 0};
        glite_srvbones_set_param(GLITE_SBPARAM_IDLE_TIMEOUT, &to);
 
-       glite_srvbones_run(bk_clnt_data_init, service_table, sizofa(service_table), debug);
+       switch (mode) {
+               case SERVICE_PROXY:
+                       glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_PROXY_START,
+                               SERVICE_PROXY_SIZE, debug);
+                       break;
+               case SERVICE_SERVER:
+                       glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_SERVER_START,
+                               SERVICE_SERVER_SIZE, debug);
+                       break;
+               case SERVICE_PROXY_SERVER:
+                       glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_SERVER_START,
+                               SERVICE_PROXY_SIZE+SERVICE_SERVER_SIZE, debug);
+                       break;
+               default:
+                       assert(0);
+                       break;
+       }
 
 
        semctl(semset, 0, IPC_RMID, 0);
        unlink(pidfile);
-       free(port);
+
+       for ( i = 0; i < sizofa(service_table); i++ )
+               if ( service_table[i].conn >= 0 ) close(service_table[i].conn);
+
+       if (mode & SERVICE_PROXY) {
+               unlink(sock_serve);
+               unlink(sock_store);
+       }
+
+        if (port) free(port);
        edg_wll_gss_release_cred(&mycred, NULL);
 
 
@@ -650,6 +814,8 @@ int bk_clnt_data_init(void **data)
        if ( !(cdata = calloc(1, sizeof(*cdata))) )
                return -1;
 
+       cdata->mode = mode;
+
        if ( edg_wll_InitContext(&ctx) ) 
        {
                free(cdata);
@@ -657,7 +823,6 @@ int bk_clnt_data_init(void **data)
        }
 
        dprintf(("[%d] opening database ...\n", getpid()));
-       if ( !dbstring ) dbstring = getenv("LBDB");
        wait_for_open(ctx, dbstring);
        cdata->mysql = ctx->mysql;
        cdata->use_transactions = ctx->use_transactions;
@@ -720,7 +885,7 @@ int bk_clnt_data_init(void **data)
        return 0;
 }
 
-       
+
 /*
  *     Creates context (initializes it from global vatiables and data given
  *     from server_bones)
@@ -769,6 +934,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
 
        /* Shared structures (pointers)
         */
+       ctx->serverRunning = cdata->mode & SERVICE_SERVER;
+       ctx->proxyRunning = cdata->mode & SERVICE_PROXY;
        ctx->mysql = cdata->mysql;
        ctx->use_transactions = cdata->use_transactions;
        ctx->job_index_cols = cdata->job_index_cols;
@@ -1006,6 +1173,66 @@ err:
 }
 #endif /* GLITE_LB_SERVER_WITH_WS */
 
+
+int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data)
+{
+       struct clnt_data_t *cdata = (struct clnt_data_t *)data;
+       edg_wll_Context         ctx;
+       struct timeval          conn_start, now;
+
+        if ( edg_wll_InitContext(&ctx) ) {
+               dprintf(("Couldn't create context"));
+               return -1;
+       }
+       cdata->ctx = ctx;
+
+       /* Shared structures (pointers)
+        */
+       ctx->serverRunning = cdata->mode & SERVICE_SERVER;
+       ctx->proxyRunning = cdata->mode & SERVICE_PROXY;
+       ctx->mysql = cdata->mysql;
+       
+       /*      set globals
+        */
+       ctx->allowAnonymous = 1;
+       ctx->isProxy = 1;
+       ctx->noAuth = 1;
+       ctx->noIndex = 1;
+       ctx->semset = semset;
+       ctx->semaphores = semaphores;
+
+       ctx->srvName = strdup(host);
+       ctx->srvPort = atoi(port);
+       
+       ctx->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy));
+       if ( !ctx->connProxy ) {
+               perror("calloc");
+               edg_wll_FreeContext(ctx);
+
+               return -1;
+       }
+
+       gettimeofday(&conn_start, 0);
+       if ( edg_wll_plain_accept(conn, &ctx->connProxy->conn) ) {
+               perror("accept");
+               edg_wll_FreeContext(ctx);
+
+               return -1;
+       } 
+
+       gettimeofday(&now, 0);
+       if ( decrement_timeout(timeout, conn_start, now) ) {
+               if (debug) fprintf(stderr, "edg_wll_plain_accept() timeout");
+               else syslog(LOG_ERR, "edg_wll_plain_accept(): timeout");
+
+               return -1;
+       }
+
+
+       return 0;
+}
+
+
 int bk_accept_store(int conn, struct timeval *timeout, void *cdata)
 {
        edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
@@ -1074,6 +1301,7 @@ int bk_accept_store(int conn, struct timeval *timeout, void *cdata)
        return 0;
 }
 
+
 int bk_accept_serve(int conn, struct timeval *timeout, void *cdata)
 {
        edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
@@ -1233,6 +1461,22 @@ int bk_ws_clnt_disconnect(int conn, struct timeval *timeout, void *cdata)
 }
 #endif /* GLITE_LB_SERVER_WITH_WS */
 
+
+int bk_clnt_disconnect_proxy(int conn, struct timeval *timeout, void *cdata)
+{
+       edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+       /* XXX: handle the timeout
+        */
+    if ( ctx->connProxy && ctx->connProxy->conn.sock >= 0 )
+               edg_wll_plain_close(&ctx->connProxy->conn);
+
+       edg_wll_FreeContext(ctx);
+       ctx = NULL;
+
+       return 0;
+}
+
 int bk_clnt_reject(int conn)
 {
        int             flags = fcntl(conn, F_GETFL, 0);
@@ -1252,6 +1496,11 @@ int bk_ws_clnt_reject(int conn)
 }
 #endif /* GLITE_LB_SERVER_WITH_WS */
 
+int bk_clnt_reject_proxy(int conn)
+{
+       return 0;
+}
+
 
 static void wait_for_open(edg_wll_Context ctx, const char *dbstring)
 {
index a94df0a..45e55ae 100644 (file)
@@ -265,6 +265,9 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use
        char *jobstr = edg_wlc_JobIdUnparse(job);
        char *jobid = edg_wlc_JobIdGetUnique(job);
        char *stmt;
+       char *srvName;
+       unsigned int    srvPort;
+       int proxy=0, server=0;
 
 /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
  */
@@ -272,8 +275,25 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use
                return edg_wll_SetError(ctx,EINVAL,"store_job()");
 
        edg_wll_ResetError(ctx);
-       trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid) "
-               "values ('%|Ss','%|Ss','%|Ss')",jobid,jobstr,userid);
+
+       if (ctx->isProxy) {
+               proxy = 1;
+
+               /* If host&port in jobId match bkserver hostname and port and bkserver
+                * runs server service, mark the proxy job to belong to bkserver too    
+                */
+               edg_wlc_JobIdGetServerParts(job, &srvName, &srvPort);
+               if ( (ctx->serverRunning) && (ctx->srvPort == srvPort) && 
+                               !strcmp(ctx->srvName, srvName)) {
+                       server=1;
+               }
+       }
+       else {
+               server = 1;
+       }
+       
+       trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,bkserver) "
+               "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server);
 
        if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) {
                if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) 
index 1a1ce49..d2d4be6 100644 (file)
@@ -53,7 +53,17 @@ gss_reader(void *user_data, char *buffer, int max_len)
 }
 
 
-int edg_wll_StoreProto(edg_wll_Context ctx)
+// XXX: for easier merge with RC31_3
+//     after merge, it would be possible to glue
+//     edg_wll_StoreProtoProxy and edg_wll_StoreProtoServer together
+int edg_wll_StoreProto(edg_wll_Context ctx) 
+{
+       if (ctx->isProxy) return(edg_wll_StoreProtoProxy(ctx));
+       else return(edg_wll_StoreProtoServer(ctx));
+}
+
+
+int edg_wll_StoreProtoServer(edg_wll_Context ctx)
 {
        char    *buf;
        int     len,ret;