From: Miloš Mulač Date: Thu, 6 Sep 2007 11:52:41 +0000 (+0000) Subject: glued server+proxy X-Git-Tag: glite-yaim-myproxy_R_4_0_1_1~26 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=4fe53608341ad360f88c72999d09e0dcc2d475bf;p=jra1mw.git glued server+proxy - compiles, but more work needs to be done to be fully funcional --- diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index 6e512be..a533ec1 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -3,6 +3,9 @@ create table jobs ( dg_jobid varchar(255) binary not null, userid char(32) binary not null, aclid char(32) binary null, + proxy bool not null, + server bool not null, + primary key (jobid), unique (dg_jobid), diff --git a/org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db b/org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db new file mode 100644 index 0000000..25fe402 --- /dev/null +++ b/org.glite.lb.server/config/glite-lb-migrate_proxy_db2proxy_server_db @@ -0,0 +1,19 @@ +# This script is intendent to be used to extend proxy database to +# to be able to hold both bkserver and lbproxy jobs. +# The operation should be non-destructive, i.e. all data should persist +# and continue to be fully usable. + +#!/bin/bash + + +# add columns for job membership (proxy/server) flags +mysql -u lbserver lbproxy -e "ALTER TABLE jobs ADD proxy bool not null" +mysql -u lbserver lbproxy -e "ALTER TABLE jobs ADD server bool not null" + +# flag all jobs as proxy jobs +mysql -u lbserver lbproxy -e "update jobs set proxy='1'" +mysql -u lbserver lbproxy -e "update jobs set server='0'" + +# something more clever should come here +# - rename DB to lbserver20 +# - or copy all fields to lbserver20 DB diff --git a/org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db b/org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db new file mode 100644 index 0000000..94cedda --- /dev/null +++ b/org.glite.lb.server/config/glite-lb-migrate_server_db2proxy_server_db @@ -0,0 +1,15 @@ +# This script is intendent to be used to extend bkserver database to +# to be able to hold both bkserver and lbproxy jobs. +# The operation should be non-destructive, i.e. all data should persist +# and continue to be fully usable. + +#!/bin/bash + + +# add columns for job membership (proxy/server) flags +mysql -u lbserver lbserver20 -e "ALTER TABLE jobs ADD proxy bool not null" +mysql -u lbserver lbserver20 -e "ALTER TABLE jobs ADD server bool not null" + +# flag all jobs as server jobs +mysql -u lbserver lbserver20 -e "update jobs set proxy='0'" +mysql -u lbserver lbserver20 -e "update jobs set server='1'" diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 5a5caaa..2074fb6 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +72,11 @@ enum lb_srv_perf_sink sink_mode; extern int edg_wll_StoreProto(edg_wll_Context ctx); extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs); extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); +extern int edg_wll_StoreProtoProxy(edg_wll_Context ctx); + +extern char *lbproxy_ilog_socket_path; +extern char *lbproxy_ilog_file_prefix; + #ifdef LB_PERF extern void _start (void), etext (void); @@ -102,12 +108,21 @@ extern void _start (void), etext (void); #define EDG_BKSERVERD_PIDFILE "/var/run/edg-bkserverd.pid" #endif +#ifndef GLITE_LBPROXY_SOCK_PREFIX +#define GLITE_LBPROXY_SOCK_PREFIX "/tmp/lb_proxy_" +#endif + #ifndef dprintf #define dprintf(x) { if (debug) printf x; } #endif #define sizofa(a) (sizeof(a)/sizeof((a)[0])) +#define SERVICE_PROXY 1 +#define SERVICE_SERVER 2 +#define SERVICE_PROXY_SERVER SERVICE_PROXY+SERVICE_SERVER + + int debug = 0; int rgma_export = 0; @@ -143,6 +158,14 @@ char *cadir = NULL, *vomsdir = NULL, *server_key = NULL, *server_cert = NULL; +static int mode = SERVICE_SERVER; +static char sock_store[PATH_MAX], + sock_serve[PATH_MAX]; +static int con_queue = CON_QUEUE; +static char host[300]; +static char * port; + + static struct option opts[] = { @@ -169,7 +192,7 @@ static struct option opts[] = { {"super-user", 1, NULL, 'R'}, {"super-users-file", 1, NULL,'F'}, {"no-index", 1, NULL, 'x'}, - {"strict-locking",0, NULL, 'P'}, + {"strict-locking",0, NULL, 'O'}, {"limits", 1, NULL, 'L'}, {"notif-dur", 1, NULL, 'N'}, {"notif-il-sock", 1, NULL, 'X'}, @@ -182,10 +205,16 @@ static struct option opts[] = { #endif {"transactions", 1, NULL, 'b'}, {"greyjobs", 0, NULL, 'g'}, + {"withproxy", 0, NULL, 'B'}, + {"proxyonly", 0, NULL, 'P'}, + {"sock", 0, NULL, 'o'}, + {"con-queue", 1, NULL, 'q'}, + {"proxy-il-sock", 1, NULL, 'W'}, + {"proxy-il-fprefix", 1, NULL, 'Z'}, {NULL,0,NULL,0} }; -static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g" +static const char *get_opt_string = "c:k:C:V:p:a:drm:ns:l:i:S:D:J:jR:F:xOL:N:X:Y:T:t:zb:gPBo:q:W:Z:" #ifdef GLITE_LB_SERVER_WITH_WS "w:" #endif @@ -235,6 +264,12 @@ static void usage(char *me) "\t--perf-sink\t where to sink events\n" #endif "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n" + "\t-P,--proxyonly\t run only proxy service\n" + "\t-B,--withproxy\t run both server and proxy service\n" + "\t-o,--sock\t path-name to the local socket for communication with LB proxy\n" + "\t-q,--con-queue\t size of the connection queue (accept)\n" + "\t-W,--proxy-il-sock\t socket to send events to\n" + "\t-Z,--proxy-il-fprefix\t file prefix for events\n" ,me); } @@ -272,17 +307,44 @@ int bk_ws_clnt_reject(int); int bk_ws_clnt_disconnect(int, struct timeval *, void *); #endif /*GLITE_LB_SERVER_WITH_WS */ -#define SRV_SERVE 0 -#define SRV_STORE 1 + /* + * Proxy handlers + */ +int bk_clnt_reject_proxy(int); +int bk_handle_connection_proxy(int, struct timeval *, void *); +int bk_clnt_disconnect_proxy(int, struct timeval *, void *); + + + #define SERVICE_SERVER_START 0 + #define SRV_SERVE 0 + #define SRV_STORE 1 #ifdef GLITE_LB_SERVER_WITH_WS -#define SRV_WS 2 + #define SRV_WS 2 + #define SERVICE_SERVER_SIZE 3 + + #define SERVICE_PROXY_START 3 + #define SRV_SERVE_PROXY 3 + #define SRV_STORE_PROXY 4 + #define SERVICE_PROXY_SIZE 2 +#else + #define SERVICE_SERVER_SIZE 2 + + #define SERVICE_PROXY_START 2 + #define SRV_SERVE_PROXY 2 + #define SRV_STORE_PROXY 3 + #define SERVICE_PROXY_SIZE 2 #endif /* GLITE_LB_SERVER_WITH_WS */ + + + static struct glite_srvbones_service service_table[] = { { "serve", -1, bk_handle_connection, bk_accept_serve, bk_clnt_reject, bk_clnt_disconnect }, { "store", -1, bk_handle_connection, bk_accept_store, bk_clnt_reject, bk_clnt_disconnect }, #ifdef GLITE_LB_SERVER_WITH_WS - { "WS", -1, bk_handle_ws_connection, bk_accept_ws, bk_ws_clnt_reject, bk_ws_clnt_disconnect } + { "WS", -1, bk_handle_ws_connection, bk_accept_ws, bk_ws_clnt_reject, bk_ws_clnt_disconnect }, #endif /* GLITE_LB_SERVER_WITH_WS */ + { "serve_proxy", -1, bk_handle_connection_proxy, bk_accept_serve, bk_clnt_reject_proxy, bk_clnt_disconnect_proxy }, + { "store_proxy", -1, bk_handle_connection_proxy, bk_accept_store, bk_clnt_reject_proxy, bk_clnt_disconnect_proxy } }; struct clnt_data_t { @@ -294,6 +356,7 @@ struct clnt_data_t { void *mysql; edg_wll_QueryRec **job_index; edg_wll_IColumnRec *job_index_cols; + int mode; }; @@ -306,7 +369,6 @@ int main(int argc, char *argv[]) char *mysubj = NULL; int opt; char pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE, - *port, *name; #ifdef GLITE_LB_SERVER_WITH_WS char *ws_port; @@ -318,6 +380,7 @@ int main(int argc, char *argv[]) struct timeval to; int request_timeout = REQUEST_TIMEOUT; int silent = 0; + char socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX; /* keep this at start of main() ! */ @@ -339,9 +402,6 @@ int main(int argc, char *argv[]) purge_timeout[EDG_WLL_JOB_ABORTED] = 60*60*24*7; purge_timeout[EDG_WLL_JOB_CANCELLED] = 60*60*24*7; - if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/edg-bkserverd.pid", - getenv("HOME")); - while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) { case 'a': fake_host = strdup(optarg); break; case 'b': transactions = atoi(optarg); break; @@ -394,7 +454,7 @@ int main(int argc, char *argv[]) case 'x': noIndex = atoi(optarg); if (noIndex < 0 || noIndex > 2) { usage(name); return 1; } break; - case 'P': strict_locking = 1; + case 'O': strict_locking = 1; break; case 'T': count_statistics = atoi(optarg); break; @@ -406,6 +466,18 @@ int main(int argc, char *argv[]) #endif case 'g': greyjobs = strict_locking = 1; break; + case 'P': mode = SERVICE_PROXY; + break; + case 'B': mode = SERVICE_PROXY_SERVER;; + break; + case 'o': strcpy(socket_path_prefix, optarg); + break; + case 'q': con_queue = atoi(optarg); + break; + case 'W': lbproxy_ilog_socket_path = strdup(optarg); + break; + case 'Z': lbproxy_ilog_file_prefix = strdup(optarg); + break; case '?': usage(name); return 1; } @@ -414,6 +486,11 @@ int main(int argc, char *argv[]) setlinebuf(stdout); setlinebuf(stderr); + if (mode & SERVICE_PROXY) dprintf(("\nStaring LB proxy service\n")); + if (mode & SERVICE_SERVER) dprintf(("\nStaring LB server service\n")); + + if (geteuid()) snprintf(pidfile,sizeof pidfile, "%s/edg-bkserverd.pid", getenv("HOME")); + fpid = fopen(pidfile,"r"); if ( fpid ) { @@ -438,17 +515,18 @@ int main(int argc, char *argv[]) semkey = ftok(pidfile,0); - if (check_mkdir(dumpStorage)) exit(1); - if (check_mkdir(purgeStorage)) exit(1); - if ( jpreg ) { - if ( edg_wll_MaildirInit(jpregDir) ) { - dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc)); - if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc); - exit(1); + if (mode & SERVICE_SERVER) { + if (check_mkdir(dumpStorage)) exit(1); + if (check_mkdir(purgeStorage)) exit(1); + if ( jpreg ) { + if ( edg_wll_MaildirInit(jpregDir) ) { + dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc)); + if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc); + exit(1); + } } } - if (semaphores == -1) semaphores = slaves; semset = semget(semkey, 0, 0); if (semset >= 0) semctl(semset, 0, IPC_RMID); @@ -462,112 +540,169 @@ int main(int argc, char *argv[]) s.sem_num = i; s.sem_op = 1; s.sem_flg = 0; if (semop(semset,&s,1) == -1) { perror("semop()"); return 1; } } + + if (mode & SERVICE_SERVER) { + if ( fake_host ) + { + char *p = strchr(fake_host,':'); - if ( fake_host ) - { - char *p = strchr(fake_host,':'); - - if (p) + if (p) + { + *p = 0; + fake_port = atoi(p+1); + } + else fake_port = atoi(port); + } + else { - *p = 0; - fake_port = atoi(p+1); + char buf[300]; + + edg_wll_gss_gethostname(buf,sizeof buf); + buf[sizeof buf - 1] = 0; + fake_host = strdup(buf); + fake_port = atoi(port); } - else fake_port = atoi(port); - } - else - { - char buf[300]; - edg_wll_gss_gethostname(buf,sizeof buf); - buf[sizeof buf - 1] = 0; - fake_host = strdup(buf); - fake_port = atoi(port); + dprintf(("Server address: %s:%d\n", fake_host, fake_port)); } + if ((mode & SERVICE_SERVER)) { + service_table[SRV_SERVE].conn = socket(PF_INET, SOCK_STREAM, 0); + if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; } + a.sin_family = AF_INET; + a.sin_port = htons(atoi(port)); + a.sin_addr.s_addr = INADDR_ANY; + setsockopt(service_table[SRV_SERVE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) ) + { + char buf[100]; + + snprintf(buf,sizeof(buf),"bind(%d)",atoi(port)); + perror(buf); + return 1; + } + if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } + + service_table[SRV_STORE].conn = socket(PF_INET, SOCK_STREAM, 0); + if ( service_table[SRV_STORE].conn < 0) { perror("socket()"); return 1; } + a.sin_family = AF_INET; + a.sin_port = htons(atoi(port)+1); + a.sin_addr.s_addr = INADDR_ANY; + setsockopt(service_table[SRV_STORE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a))) + { + char buf[100]; - dprintf(("server address: %s:%d\n", fake_host, fake_port)); + snprintf(buf,sizeof(buf), "bind(%d)", atoi(port)+1); + perror(buf); + return 1; + } + if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } - service_table[SRV_SERVE].conn = socket(PF_INET, SOCK_STREAM, 0); - if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; } - a.sin_family = AF_INET; - a.sin_port = htons(atoi(port)); - a.sin_addr.s_addr = INADDR_ANY; - setsockopt(service_table[SRV_SERVE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); - if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) ) - { - char buf[100]; +#ifdef GLITE_LB_SERVER_WITH_WS + service_table[SRV_WS].conn = socket(PF_INET, SOCK_STREAM, 0); + if ( service_table[SRV_WS].conn < 0) { perror("socket()"); return 1; } + a.sin_family = AF_INET; + a.sin_port = htons(atoi(ws_port)); + a.sin_addr.s_addr = INADDR_ANY; + setsockopt(service_table[SRV_WS].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + if ( bind(service_table[SRV_WS].conn, (struct sockaddr *) &a, sizeof(a))) + { + char buf[100]; - snprintf(buf,sizeof(buf),"bind(%d)",atoi(port)); - perror(buf); - return 1; - } - if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } - - service_table[SRV_STORE].conn = socket(PF_INET, SOCK_STREAM, 0); - if ( service_table[SRV_STORE].conn < 0) { perror("socket()"); return 1; } - a.sin_family = AF_INET; -a.sin_port = htons(atoi(port)+1); -a.sin_addr.s_addr = INADDR_ANY; - setsockopt(service_table[SRV_STORE].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); - if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a))) - { - char buf[100]; + snprintf(buf, sizeof(buf), "bind(%d)", atoi(ws_port)); + perror(buf); + return 1; + } + if ( listen(service_table[SRV_WS].conn, CON_QUEUE) ) { perror("listen()"); return 1; } - snprintf(buf,sizeof(buf), "bind(%d)", atoi(port)+1); - perror(buf); - return 1; - } - if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } +#endif /* GLITE_LB_SERVER_WITH_WS */ + + if (!server_cert || !server_key) + fprintf(stderr, "%s: key or certificate file not specified" + " - unable to watch them for changes!\n", argv[0]); + + if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); + edg_wll_gss_watch_creds(server_cert, &cert_mtime); + if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) + { + int i; + + dprintf(("Server identity: %s\n",mysubj)); + server_subject = strdup(mysubj); + for ( i = 0; super_users && super_users[i]; i++ ) ; + super_users = realloc(super_users, (i+2)*sizeof(*super_users)); + super_users[i] = mysubj; + super_users[i+1] = NULL; + } + else { + dprintf(("Server running unauthenticated\n")); + server_subject = strdup("anonymous LB"); + } + + if ( noAuth ) dprintf(("Server in promiscuous mode\n")); + dprintf(("Server listening at %d,%d (accepting protocols: " COMP_PROTO " and compatible) ...\n",atoi(port),atoi(port)+1)); #ifdef GLITE_LB_SERVER_WITH_WS - service_table[SRV_WS].conn = socket(PF_INET, SOCK_STREAM, 0); - if ( service_table[SRV_WS].conn < 0) { perror("socket()"); return 1; } - a.sin_family = AF_INET; - a.sin_port = htons(atoi(ws_port)); - a.sin_addr.s_addr = INADDR_ANY; - setsockopt(service_table[SRV_WS].conn, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); - if ( bind(service_table[SRV_WS].conn, (struct sockaddr *) &a, sizeof(a))) - { - char buf[100]; + dprintf(("Server listening at %d (accepting web service protocol) ...\n", atoi(ws_port))); +#endif /* GLITE_LB_SERVER_WITH_WS */ - snprintf(buf, sizeof(buf), "bind(%d)", atoi(ws_port)); - perror(buf); - return 1; } - if ( listen(service_table[SRV_WS].conn, CON_QUEUE) ) { perror("listen()"); return 1; } + if (mode & SERVICE_PROXY) { /* proxy stuff */ + struct sockaddr_un a; + + service_table[SRV_SERVE_PROXY].conn = socket(PF_UNIX, SOCK_STREAM, 0); + if ( service_table[SRV_SERVE_PROXY].conn < 0 ) { perror("socket()"); return 1; } + memset(&a, 0, sizeof(a)); + a.sun_family = AF_UNIX; + sprintf(sock_serve, "%s%s", socket_path_prefix, "serve.sock"); + strcpy(a.sun_path, sock_serve); + + if( connect(service_table[SRV_SERVE_PROXY].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) { + if( errno == ECONNREFUSED ) { + dprintf(("removing stale input socket %s\n", sock_serve)); + unlink(sock_serve); + } + } else { perror("another instance of lb-proxy is running"); return 1; } -#endif /* GLITE_LB_SERVER_WITH_WS */ + if ( bind(service_table[SRV_SERVE_PROXY].conn, (struct sockaddr *) &a, sizeof(a)) < 0 ) { + char buf[100]; - if (!server_cert || !server_key) - fprintf(stderr, "%s: key or certificate file not specified" - " - unable to watch them for changes!\n", argv[0]); + snprintf(buf, sizeof(buf), "bind(%s)", sock_serve); + perror(buf); + return 1; + } - if ( cadir ) setenv("X509_CERT_DIR", cadir, 1); - edg_wll_gss_watch_creds(server_cert, &cert_mtime); - if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) - { - int i; - - dprintf(("Server identity: %s\n",mysubj)); - server_subject = strdup(mysubj); - for ( i = 0; super_users && super_users[i]; i++ ) ; - super_users = realloc(super_users, (i+2)*sizeof(*super_users)); - super_users[i] = mysubj; - super_users[i+1] = NULL; - } - else { - dprintf(("Running unauthenticated\n")); - server_subject = strdup("anonymous LB"); - } + if ( listen(service_table[SRV_SERVE_PROXY].conn, con_queue) ) { perror("listen()"); return 1; } - if ( noAuth ) dprintf(("Promiscuous mode\n")); - dprintf(("Listening at %d,%d (accepting protocols: " COMP_PROTO " and compatible) ...\n",atoi(port),atoi(port)+1)); + service_table[SRV_STORE_PROXY].conn = socket(PF_UNIX, SOCK_STREAM, 0); + if ( service_table[SRV_STORE_PROXY].conn < 0 ) { perror("socket()"); return 1; } + memset(&a, 0, sizeof(a)); + a.sun_family = AF_UNIX; + sprintf(sock_store, "%s%s", socket_path_prefix, "store.sock"); + strcpy(a.sun_path, sock_store); -#ifdef GLITE_LB_SERVER_WITH_WS - dprintf(("Listening at %d (accepting web service protocol) ...\n", atoi(ws_port))); -#endif /* GLITE_LB_SERVER_WITH_WS */ + if( connect(service_table[SRV_STORE_PROXY].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) { + if( errno == ECONNREFUSED ) { + dprintf(("removing stale input socket %s\n", sock_store)); + unlink(sock_store); + } + } else { perror("another instance of lb-proxy is running"); return 1; } + + if ( bind(service_table[SRV_STORE_PROXY].conn, (struct sockaddr *) &a, sizeof(a))) { + char buf[100]; + + snprintf(buf, sizeof(buf), "bind(%s)", sock_store); + perror(buf); + return 1; + } + if ( listen(service_table[SRV_STORE_PROXY].conn, con_queue) ) { perror("listen()"); return 1; } + + dprintf(("Proxy listening at %s, %s ...\n", sock_store, sock_serve)); + } if (!dbstring) dbstring = getenv("LBDB"); if (!dbstring) dbstring = strdup(DEFAULTCS); + /* Just check the database and let it be. The slaves do the job. */ edg_wll_InitContext(&ctx); @@ -619,19 +754,48 @@ a.sin_addr.s_addr = INADDR_ANY; glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD); glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX); - to = (struct timeval){CONNECT_TIMEOUT, 0}; - glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to); - to.tv_sec = request_timeout; + if (mode & SERVICE_SERVER) { + to = (struct timeval){CONNECT_TIMEOUT, 0}; + glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to); + to.tv_sec = request_timeout; + } + // proxy using default from srvbones, 5s + glite_srvbones_set_param(GLITE_SBPARAM_REQUEST_TIMEOUT, &to); to = (struct timeval){IDLE_TIMEOUT, 0}; glite_srvbones_set_param(GLITE_SBPARAM_IDLE_TIMEOUT, &to); - glite_srvbones_run(bk_clnt_data_init, service_table, sizofa(service_table), debug); + switch (mode) { + case SERVICE_PROXY: + glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_PROXY_START, + SERVICE_PROXY_SIZE, debug); + break; + case SERVICE_SERVER: + glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_SERVER_START, + SERVICE_SERVER_SIZE, debug); + break; + case SERVICE_PROXY_SERVER: + glite_srvbones_run(bk_clnt_data_init,service_table+SERVICE_SERVER_START, + SERVICE_PROXY_SIZE+SERVICE_SERVER_SIZE, debug); + break; + default: + assert(0); + break; + } semctl(semset, 0, IPC_RMID, 0); unlink(pidfile); - free(port); + + for ( i = 0; i < sizofa(service_table); i++ ) + if ( service_table[i].conn >= 0 ) close(service_table[i].conn); + + if (mode & SERVICE_PROXY) { + unlink(sock_serve); + unlink(sock_store); + } + + if (port) free(port); edg_wll_gss_release_cred(&mycred, NULL); @@ -650,6 +814,8 @@ int bk_clnt_data_init(void **data) if ( !(cdata = calloc(1, sizeof(*cdata))) ) return -1; + cdata->mode = mode; + if ( edg_wll_InitContext(&ctx) ) { free(cdata); @@ -657,7 +823,6 @@ int bk_clnt_data_init(void **data) } dprintf(("[%d] opening database ...\n", getpid())); - if ( !dbstring ) dbstring = getenv("LBDB"); wait_for_open(ctx, dbstring); cdata->mysql = ctx->mysql; cdata->use_transactions = ctx->use_transactions; @@ -720,7 +885,7 @@ int bk_clnt_data_init(void **data) return 0; } - + /* * Creates context (initializes it from global vatiables and data given * from server_bones) @@ -769,6 +934,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) /* Shared structures (pointers) */ + ctx->serverRunning = cdata->mode & SERVICE_SERVER; + ctx->proxyRunning = cdata->mode & SERVICE_PROXY; ctx->mysql = cdata->mysql; ctx->use_transactions = cdata->use_transactions; ctx->job_index_cols = cdata->job_index_cols; @@ -1006,6 +1173,66 @@ err: } #endif /* GLITE_LB_SERVER_WITH_WS */ + +int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data) +{ + struct clnt_data_t *cdata = (struct clnt_data_t *)data; + edg_wll_Context ctx; + struct timeval conn_start, now; + + if ( edg_wll_InitContext(&ctx) ) { + dprintf(("Couldn't create context")); + return -1; + } + cdata->ctx = ctx; + + /* Shared structures (pointers) + */ + ctx->serverRunning = cdata->mode & SERVICE_SERVER; + ctx->proxyRunning = cdata->mode & SERVICE_PROXY; + ctx->mysql = cdata->mysql; + + /* set globals + */ + ctx->allowAnonymous = 1; + ctx->isProxy = 1; + ctx->noAuth = 1; + ctx->noIndex = 1; + ctx->semset = semset; + ctx->semaphores = semaphores; + + ctx->srvName = strdup(host); + ctx->srvPort = atoi(port); + + ctx->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy)); + if ( !ctx->connProxy ) { + perror("calloc"); + edg_wll_FreeContext(ctx); + + return -1; + } + + gettimeofday(&conn_start, 0); + if ( edg_wll_plain_accept(conn, &ctx->connProxy->conn) ) { + perror("accept"); + edg_wll_FreeContext(ctx); + + return -1; + } + + gettimeofday(&now, 0); + if ( decrement_timeout(timeout, conn_start, now) ) { + if (debug) fprintf(stderr, "edg_wll_plain_accept() timeout"); + else syslog(LOG_ERR, "edg_wll_plain_accept(): timeout"); + + return -1; + } + + + return 0; +} + + int bk_accept_store(int conn, struct timeval *timeout, void *cdata) { edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; @@ -1074,6 +1301,7 @@ int bk_accept_store(int conn, struct timeval *timeout, void *cdata) return 0; } + int bk_accept_serve(int conn, struct timeval *timeout, void *cdata) { edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; @@ -1233,6 +1461,22 @@ int bk_ws_clnt_disconnect(int conn, struct timeval *timeout, void *cdata) } #endif /* GLITE_LB_SERVER_WITH_WS */ + +int bk_clnt_disconnect_proxy(int conn, struct timeval *timeout, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + /* XXX: handle the timeout + */ + if ( ctx->connProxy && ctx->connProxy->conn.sock >= 0 ) + edg_wll_plain_close(&ctx->connProxy->conn); + + edg_wll_FreeContext(ctx); + ctx = NULL; + + return 0; +} + int bk_clnt_reject(int conn) { int flags = fcntl(conn, F_GETFL, 0); @@ -1252,6 +1496,11 @@ int bk_ws_clnt_reject(int conn) } #endif /* GLITE_LB_SERVER_WITH_WS */ +int bk_clnt_reject_proxy(int conn) +{ + return 0; +} + static void wait_for_open(edg_wll_Context ctx, const char *dbstring) { diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index a94df0a..45e55ae 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -265,6 +265,9 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use char *jobstr = edg_wlc_JobIdUnparse(job); char *jobid = edg_wlc_JobIdGetUnique(job); char *stmt; + char *srvName; + unsigned int srvPort; + int proxy=0, server=0; /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1 */ @@ -272,8 +275,25 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use return edg_wll_SetError(ctx,EINVAL,"store_job()"); edg_wll_ResetError(ctx); - trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid) " - "values ('%|Ss','%|Ss','%|Ss')",jobid,jobstr,userid); + + if (ctx->isProxy) { + proxy = 1; + + /* If host&port in jobId match bkserver hostname and port and bkserver + * runs server service, mark the proxy job to belong to bkserver too + */ + edg_wlc_JobIdGetServerParts(job, &srvName, &srvPort); + if ( (ctx->serverRunning) && (ctx->srvPort == srvPort) && + !strcmp(ctx->srvName, srvName)) { + server=1; + } + } + else { + server = 1; + } + + trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,bkserver) " + "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server); if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) { if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) diff --git a/org.glite.lb.server/src/stored_master.c b/org.glite.lb.server/src/stored_master.c index 1a1ce49..d2d4be6 100644 --- a/org.glite.lb.server/src/stored_master.c +++ b/org.glite.lb.server/src/stored_master.c @@ -53,7 +53,17 @@ gss_reader(void *user_data, char *buffer, int max_len) } -int edg_wll_StoreProto(edg_wll_Context ctx) +// XXX: for easier merge with RC31_3 +// after merge, it would be possible to glue +// edg_wll_StoreProtoProxy and edg_wll_StoreProtoServer together +int edg_wll_StoreProto(edg_wll_Context ctx) +{ + if (ctx->isProxy) return(edg_wll_StoreProtoProxy(ctx)); + else return(edg_wll_StoreProtoServer(ctx)); +} + + +int edg_wll_StoreProtoServer(edg_wll_Context ctx) { char *buf; int len,ret;