From b6730042b842ffc443cbea63abe6d555d96ce441 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Tue, 9 Nov 2004 11:02:57 +0000 Subject: [PATCH] - LB Proxy - the first shot (not working) --- org.glite.lb.server/Makefile | 12 + org.glite.lb.server/src/db_store.c | 2 +- org.glite.lb.server/src/il_lbproxy.c | 372 ++++++++++++++++++++++ org.glite.lb.server/src/il_lbproxy.h | 19 ++ org.glite.lb.server/src/lb_http.c | 10 +- org.glite.lb.server/src/lb_http.h | 1 + org.glite.lb.server/src/lbproxy.c | 537 ++++++++++++++++++++++++++++++++ org.glite.lb.server/src/stored_master.c | 38 +++ 8 files changed, 988 insertions(+), 3 deletions(-) create mode 100644 org.glite.lb.server/src/il_lbproxy.c create mode 100644 org.glite.lb.server/src/il_lbproxy.h create mode 100644 org.glite.lb.server/src/lbproxy.c diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index b7ba4a9..1525add 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -88,6 +88,8 @@ SRVBONES_LIB:= -L${stagedir}/lib -lglite_lb_server_bones COMMON_LIB:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} +LB_PROXY_COMMON_LIB:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} + ifeq ($(GLITE_LB_SERVER_WITH_WS),yes) SERVER_SOAP_OBJS:=${GSOAP_FILES_PREFIX}C.o ${GSOAP_FILES_PREFIX}Server.o \ ws_plugin.o ws_query.o ws_fault.o ws_typeref.o @@ -97,6 +99,13 @@ else SERVER_WS_LIBS= endif +LB_PROXY_OBJS:= lbproxy.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \ + write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ + lb_xml_parse_V21.o \ + lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ + stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ + notification.o il_notification.o notif_match.o + SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o \ write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ lb_xml_parse_V21.o \ @@ -112,6 +121,9 @@ WS_CLIENT_OBJS:= stdsoap2_patched.o test_ws_plugin.o $(GSOAP_FILES_PREFIX)C.o $( WS_CLIENT_LIBS:= -L${stagedir}/lib -lglite_lb_client_${nothrflavour} +glite_lb_proxy: ${LB_PROXY_OBJS} + ${LINK} -o $@ ${LB_PROXY_OBJS} ${COMMON_LIB} ${SRVBONES_LIB} ${EXT_LIBS} + glite_lb_bkserverd: ${SERVER_OBJS} ${SERVER_SOAP_OBJS} ${LINK} -o $@ ${SERVER_OBJS} ${SERVER_SOAP_OBJS} ${COMMON_LIB} ${SRVBONES_LIB} ${EXT_LIBS} ${SERVER_WS_LIBS} diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 6ff49cc..b4a1c1c 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -70,7 +70,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err; if (err) goto err; - if (newstat.state) { + if (!ctx->isProxy && newstat.state) { edg_wll_NotifMatch(ctx,&newstat); edg_wll_FreeStatus(&newstat); } diff --git a/org.glite.lb.server/src/il_lbproxy.c b/org.glite.lb.server/src/il_lbproxy.c new file mode 100644 index 0000000..053ec33 --- /dev/null +++ b/org.glite.lb.server/src/il_lbproxy.c @@ -0,0 +1,372 @@ +/** + * il_lbproxy.c + * - implementation of IL API calls for LB proxy + * + */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/escape.h" +#include "glite/lb/log_proto.h" + +#include "il_lbproxy.h" +#include "lb_xml_parse.h" + + + +#define FCNTL_ATTEMPTS 5 +#define FCNTL_TIMEOUT 1 +#define FILE_PREFIX "/tmp/lbproxy_events" +#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock" + +char *lbproxy_ilog_socket_path = DEFAULT_SOCKET; +char *lbproxy_ilog_file_prefix = FILE_PREFIX; + +#define tv_sub(a,b) {\ + (a).tv_usec -= (b).tv_usec;\ + (a).tv_sec -= (b).tv_sec;\ + if ((a).tv_usec < 0) {\ + (a).tv_sec--;\ + (a).tv_usec += 1000000;\ + }\ +} + +int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner) +{ + return 0; +} + +static +int +notif_create_ulm( + edg_wll_Context context, + edg_wll_NotifId reg_id, + const char *host, + const uint16_t port, + const char *owner, + const char *notif_data, + char **ulm_data, + char **reg_id_s) +{ + int ret; + edg_wll_Event *event=NULL; + + *ulm_data = NULL; + *reg_id_s = NULL; + + event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION); + + gettimeofday(&event->any.timestamp,0); + if (context->p_host) event->any.host = strdup(context->p_host); + event->any.level = context->p_level; + event->any.source = context->p_source; + if (context->p_instance) event->notification.src_instance = strdup(context->p_instance); + event->notification.notifId = reg_id; + if (owner) event->notification.owner = strdup(owner); + if (host) event->notification.dest_host = strdup(host); + event->notification.dest_port = port; + if (notif_data) event->notification.jobstat = strdup(notif_data); + + if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) { + edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()"); + goto out; + } + + if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) { + edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()"); + goto out; + } + + ret = 0; + +out: + if(event) { + edg_wll_FreeEvent(event); + free(event); + } + if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()"); + return(ret); +} + + +static +int +notif_save_to_file(edg_wll_Context context, + const char *event_file, + const char *ulm_data, + long *filepos) +{ + int ret; + FILE *outfile; + int filedesc; + struct flock filelock; + int i, filelock_status=-1; + + for(i=0; i < FCNTL_ATTEMPTS; i++) { + /* fopen and properly handle the filelock */ + if ((outfile = fopen(event_file,"a")) == NULL) { + edg_wll_SetError(context, ret = errno, "fopen()"); + goto out; + } + if ((filedesc = fileno(outfile)) == -1) { + edg_wll_SetError(context, ret = errno, "fileno()"); + goto out1; + } + filelock.l_type = F_WRLCK; + filelock.l_whence = SEEK_SET; + filelock.l_start = 0; + filelock.l_len = 0; + filelock_status=fcntl(filedesc, F_SETLK, &filelock); + if(filelock_status < 0) { + switch(errno) { + case EAGAIN: + case EACCES: + case EINTR: + /* lock is held by someone else */ + sleep(FCNTL_TIMEOUT); + break; + default: + /* other error */ + edg_wll_SetError(context, ret=errno, "fcntl()"); + goto out1; + } + } else { + /* lock acquired, break out of the loop */ + break; + } + } + if (fseek(outfile, 0, SEEK_END) == -1) { + edg_wll_SetError(context, ret = errno, "fseek()"); + goto out1; + } + if ((*filepos=ftell(outfile)) == -1) { + edg_wll_SetError(context, ret = errno, "ftell()"); + goto out1; + } + /* write, flush and sync */ + if (fputs(ulm_data, outfile) == EOF) { + edg_wll_SetError(context, ret = errno, "fputs()"); + goto out1; + } + if (fflush(outfile) == EOF) { + edg_wll_SetError(context, ret = errno, "fflush()"); + goto out1; + } + if (fsync(filedesc) < 0) { /* synchronize */ + edg_wll_SetError(context, ret = errno, "fsync()"); + goto out1; + } + + ret = 0; +out1: + /* close and unlock */ + fclose(outfile); +out: + if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()"); + return(ret); +} + + +static +ssize_t +socket_write_full(edg_wll_Context context, + int sock, + void *buf, + size_t bufsize, + struct timeval *timeout, + ssize_t *total) +{ + int ret = 0; + ssize_t len; + + *total = 0; + while (bufsize > 0) { + + fd_set fds; + struct timeval to,before,after; + + if (timeout) { + memcpy(&to, timeout, sizeof(to)); + gettimeofday(&before, NULL); + } + + len = write(sock, buf, bufsize); + while (len <= 0) { + FD_ZERO(&fds); + FD_SET(sock, &fds); + if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) { + edg_wll_SetError(context, ret = errno, "select()"); + goto out; + } + len = write(sock, buf, bufsize); + } + if (timeout) { + gettimeofday(&after,NULL); + tv_sub(after, before); + tv_sub(*timeout,after); + if (timeout->tv_sec < 0) { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + } + } + + if (len < 0) { + edg_wll_SetError(context, ret = errno, "write()"); + goto out; + } + + bufsize -= len; + buf += len; + *total += len; + } + + ret = 0; +out: + if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()"); + return ret; +} + + +static +int +notif_send_socket(edg_wll_Context context, + long filepos, + const char *ulm_data) +{ + int ret; + struct sockaddr_un saddr; + int msg_sock, flags, count; + struct timeval timeout; + + timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX; + timeout.tv_usec = 0; + + msg_sock = socket(PF_UNIX, SOCK_STREAM, 0); + if(msg_sock < 0) { + edg_wll_SetError(context, ret = errno, "socket()"); + goto out; + } + + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, lbproxy_ilog_socket_path); + + if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 || + fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) { + edg_wll_SetError(context, ret = errno, "fcntl()"); + goto out1; + } + + if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + if(errno != EISCONN) { + edg_wll_SetError(context, ret = errno, "connect()"); + goto out1; + } + } + + if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) { + ret = errno; + goto out1; + } + + if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) { + ret = errno; + goto out1; + } + + ret = 0; + +out1: + close(msg_sock); +out: + if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()"); + return(ret); +} + + +int +edg_wll_NotifSend_a(edg_wll_Context context, + edg_wll_NotifId reg_id, + const char *host, + int port, + const char *owner, + const char *notif_data) +{ + int ret; + long filepos; + char *ulm_data, *reg_id_s, *event_file; + + if((ret=notif_create_ulm(context, + reg_id, + host, + port, + owner, + notif_data, + &ulm_data, + ®_id_s))) { + goto out; + } + + asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s); + if(event_file == NULL) { + edg_wll_SetError(context, ret=ENOMEM, "asprintf()"); + goto out; + } + + if((ret=notif_save_to_file(context, + event_file, + ulm_data, + &filepos))) { + goto out; + } + + if((ret=notif_send_socket(context, + filepos, + ulm_data))) { + goto out; + } + ret = 0; + +out: + if(ulm_data) free(ulm_data); + if(reg_id_s) free(reg_id_s); + if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()"); + return(ret); +} + + +int +edg_wll_NotifJobStatus_a(edg_wll_Context context, + edg_wll_NotifId reg_id, + const char *host, + int port, + const char *owner, + const edg_wll_JobStat notif_job_stat) +{ + int ret=0; + char *xml_data, *xml_esc_data=NULL; + + if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data)) + goto out; + + if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) { + edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()"); + goto out; + } + + ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data); + +out: + if(xml_data) free(xml_data); + if(xml_esc_data) free(xml_esc_data); + if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()"); + return(ret); +} diff --git a/org.glite.lb.server/src/il_lbproxy.h b/org.glite.lb.server/src/il_lbproxy.h new file mode 100644 index 0000000..1f586e6 --- /dev/null +++ b/org.glite.lb.server/src/il_lbproxy.h @@ -0,0 +1,19 @@ +#ifndef IL_LBPROXY_H +#define IL_LBPROXY_H + +#ifdef __cplusplus +#extern "C" { +#endif + +extern char *lbproxy_ilog_socket_path; +extern char *lbproxy_ilog_file_prefix; + +int edg_wll_SendEventProxy( + edg_wll_Context context, + const char *owner); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/org.glite.lb.server/src/lb_http.c b/org.glite.lb.server/src/lb_http.c index 79e85d5..efe0c41 100644 --- a/org.glite.lb.server/src/lb_http.c +++ b/org.glite.lb.server/src/lb_http.c @@ -24,7 +24,8 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) edg_wll_ErrorCode err = 0; - err = edg_wll_http_recv(ctx,&req,&hdr,&body); + if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body); + else err = edg_wll_http_recv(ctx,&req,&hdr,&body); dprintf(("[%d] %s\n",getpid(),req)); if (body) dprintf(("\n%s\n\n",body)); @@ -33,7 +34,12 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) if ((err = edg_wll_Proto(ctx,req,hdr,body,&resp,&hdrOut,&bodyOut))) edg_wll_Error(ctx,NULL,&err_desc); - if (resp) edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut); + if (resp) { + if ( ctx->isProxy ) + edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut); + else + edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut); + } } free(req); diff --git a/org.glite.lb.server/src/lb_http.h b/org.glite.lb.server/src/lb_http.h index c3e012d..76bb883 100644 --- a/org.glite.lb.server/src/lb_http.h +++ b/org.glite.lb.server/src/lb_http.h @@ -8,5 +8,6 @@ #include "glite/lb/consumer.h" int edg_wll_ServerHTTP(edg_wll_Context); +int edg_wll_ServerHTTPProxy(edg_wll_Context); #endif diff --git a/org.glite.lb.server/src/lbproxy.c b/org.glite.lb.server/src/lbproxy.c new file mode 100644 index 0000000..8d79d37 --- /dev/null +++ b/org.glite.lb.server/src/lbproxy.c @@ -0,0 +1,537 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/lb/srvbones.h" +#include "glite/lb/context.h" +#include "glite/lb/context-int.h" + +#include "il_lbproxy.h" +#include "lb_http.h" +#include "lbs_db.h" + + +extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs); +extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); +extern int edg_wll_StoreProtoProxy(edg_wll_Context ctx); + +#define DEFAULTCS "lbserver/@localhost:lbserver20" +/* +#define DEFAULTCS "lbserver/@localhost:lbproxy" +*/ + +#define CON_QUEUE 20 /* accept() */ +#define SLAVE_OVERLOAD 10 /* queue items per slave */ +#define CLNT_TIMEOUT 10 /* keep idle connection that many seconds */ +#define TOTAL_CLNT_TIMEOUT 60 /* one client may ask one slave multiple times */ + /* but only limited time to avoid DoS attacks */ +#define CLNT_REJECT_TIMEOUT 100000 /* time limit for client rejection in !usec! */ +#define DNS_TIMEOUT 5 /* how long wait for DNS lookup */ +#define SLAVE_CONNS_MAX 500 /* commit suicide after that many connections */ +#define MASTER_TIMEOUT 30 /* maximal time of one-round of master network communication */ +#define SLAVE_TIMEOUT 30 /* maximal time of one-round of slave network communication */ + +/* file to store pid and generate semaphores key + */ +#ifndef GLITE_LBPROXY_PIDFILE +#define GLITE_LBPROXY_PIDFILE "/var/run/glite-lbproxy.pid" +#endif + +#ifndef GLITE_LBPROXY_SOCK_PREFIX +#define GLITE_LBPROXY_SOCK_PREFIX "/tmp/lb_proxy_" +#endif + +#ifndef dprintf +#define dprintf(x) { if (debug) printf x; } +#endif + +#define sizofa(a) (sizeof(a)/sizeof((a)[0])) + + +int debug = 0; +static const int one = 1; +static char *dbstring = NULL; +static char sock_store[PATH_MAX], + sock_serve[PATH_MAX]; +static int slaves = 10, + semaphores = -1, + semset; + + +static struct option opts[] = { + {"port", 1, NULL, 'p'}, + {"debug", 0, NULL, 'd'}, + {"mysql", 1, NULL, 'm'}, + {"slaves", 1, NULL, 's'}, + {"semaphores", 1, NULL, 'l'}, + {"pidfile", 1, NULL, 'i'}, + {"proxy-il-sock", 1, NULL, 'X'}, + {"proxy-il-fprefix", 1, NULL, 'Y'}, + {NULL,0,NULL,0} +}; + +static const char *get_opt_string = "p:dm:s:l:i:X:Y:"; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + "\t-p, --sock\t path-name to the local socket\n" + "\t-m, --mysql\t database connect string\n" + "\t-d, --debug\t don't run as daemon, additional diagnostics\n" + "\t-s, --slaves\t number of slave servers to fork\n" + "\t-l, --semaphores number of semaphores (job locks) to use\n" + "\t-i, --pidfile\t file to store master pid\n" + "\t--proxy-il-sock\t socket to send events to\n" + "\t--proxy-il-fprefix\t file prefix for events\n" + ,me); +} + +static void wait_for_open(edg_wll_Context,const char *); + + +/* + * SERVER BONES structures and handlers + */ +int clnt_data_init(void **); + + /* + * Serve & Store handlers + */ +int clnt_reject(int); +int handle_conn(int, struct timeval, void *); +int accept_serve(int, void *); +int accept_store(int, void *); +int clnt_disconnect(int, void *); + +#define SRV_SERVE 0 +#define SRV_STORE 1 +static struct glite_srvbones_service service_table[] = { + { "serve", -1, handle_conn, accept_serve, clnt_reject, clnt_disconnect }, + { "store", -1, handle_conn, accept_store, clnt_reject, clnt_disconnect }, +}; + +struct clnt_data_t { + edg_wll_Context ctx; + void *mysql; +}; + + + +int main(int argc, char *argv[]) +{ + int i; + struct sockaddr_un a; + int opt; + char pidfile[PATH_MAX] = GLITE_LBPROXY_PIDFILE, + socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX, + *name; + FILE *fpid; + key_t semkey; + edg_wll_Context ctx; + struct timeval to; + + + + name = strrchr(argv[0],'/'); + if (name) name++; else name = argv[0]; + + if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/glite_lb_proxy.pid", getenv("HOME")); + + while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF) switch (opt) { + case 'p': strcpy(socket_path_prefix, optarg); break; + case 'd': debug = 1; break; + case 'm': dbstring = optarg; break; + case 's': slaves = atoi(optarg); break; + case 'l': semaphores = atoi(optarg); break; + case 'X': lbproxy_ilog_socket_path = strdup(optarg); break; + case 'Y': lbproxy_ilog_file_prefix = strdup(optarg); break; + case 'i': strcpy(pidfile, optarg); break; + case '?': usage(name); return 1; + } + + if ( optind < argc ) { usage(name); return 1; } + + setlinebuf(stdout); + setlinebuf(stderr); + + fpid = fopen(pidfile,"r"); + if ( fpid ) { + int opid = -1; + + if ( fscanf(fpid,"%d",&opid) == 1 ) { + if ( !kill(opid,0) ) { + fprintf(stderr,"%s: another instance running, pid = %d\n",argv[0],opid); + return 1; + } + else if (errno != ESRCH) { perror("kill()"); return 1; } + } + fclose(fpid); + } else if (errno != ENOENT) { perror(pidfile); return 1; } + + fpid = fopen(pidfile, "w"); + if ( !fpid ) { perror(pidfile); return 1; } + fprintf(fpid, "%d", getpid()); + fclose(fpid); + + semkey = ftok(pidfile,0); + + if ( semaphores == -1 ) semaphores = slaves; + semset = semget(semkey, 0, 0); + if ( semset >= 0 ) semctl(semset, 0, IPC_RMID); + semset = semget(semkey, semaphores, IPC_CREAT | 0600); + if ( semset < 0 ) { perror("semget()"); return 1; } + dprintf(("Using %d semaphores, set id %d\n", semaphores, semset)); + for ( i = 0; i < semaphores; i++ ) { + struct sembuf s; + + s.sem_num = i; s.sem_op = 1; s.sem_flg = 0; + if (semop(semset,&s,1) == -1) { perror("semop()"); return 1; } + } + + service_table[SRV_SERVE].conn = socket(PF_UNIX, SOCK_STREAM, 0); + if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; } + memset(&a, 0, sizeof(a)); + a.sun_family = AF_UNIX; + sprintf(sock_serve, "%s%s", socket_path_prefix, "serve.sock"); + strcpy(a.sun_path, sock_serve); + + if( connect(service_table[SRV_SERVE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) { + if( errno == ECONNREFUSED ) { + dprintf(("removing stale input socket %s\n", sock_serve)); + unlink(sock_serve); + } + } else { perror("another instance of lb-proxy is running"); return 1; } + + if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) < 0 ) { + char buf[100]; + + snprintf(buf, sizeof(buf), "bind(%s)", sock_serve); + perror(buf); + return 1; + } + + if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } + + service_table[SRV_STORE].conn = socket(PF_UNIX, SOCK_STREAM, 0); + if ( service_table[SRV_STORE].conn < 0 ) { perror("socket()"); return 1; } + memset(&a, 0, sizeof(a)); + a.sun_family = AF_UNIX; + sprintf(sock_store, "%s%s", socket_path_prefix, "store.sock"); + strcpy(a.sun_path, sock_store); + + if( connect(service_table[SRV_STORE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) { + if( errno == ECONNREFUSED ) { + dprintf(("removing stale input socket %s\n", sock_store)); + unlink(sock_store); + } + } else { perror("another instance of lb-proxy is running"); return 1; } + + if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a))) { + char buf[100]; + + snprintf(buf, sizeof(buf), "bind(%s)", sock_store); + perror(buf); + return 1; + } + if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; } + + dprintf(("Listening at %s, %s ...\n", sock_store, sock_serve)); + + if (!dbstring) dbstring = getenv("LBPROXYDB"); + if (!dbstring) dbstring = DEFAULTCS; + + + /* Just check the database and let it be. The slaves do the job. */ + /* XXX: InitContextProxy() !!! + * edg_wll_InitContext(&ctx) causes segfault + */ + if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) { + perror("InitContext()"); + return -1; + } + memset(ctx, 0, sizeof(*ctx)); + wait_for_open(ctx, dbstring); + if (edg_wll_DBCheckVersion(ctx)) { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + + fprintf(stderr,"%s: open database: %s (%s)\n",argv[0],et,ed); + return 1; + } + edg_wll_Close(ctx); + edg_wll_FreeContext(ctx); + + if ( !debug ) { + if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); } + + fpid = fopen(pidfile,"w"); + if ( !fpid ) { perror(pidfile); return 1; } + fprintf(fpid, "%d", getpid()); + fclose(fpid); + openlog(name, LOG_PID, LOG_DAEMON); + } else { setpgid(0, getpid()); } + + + glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_CT, slaves); + glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD); + glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX); + to = (struct timeval){CLNT_TIMEOUT, 0}; + glite_srvbones_set_param(GLITE_SBPARAM_CLNT_TIMEOUT, &to); + to = (struct timeval){TOTAL_CLNT_TIMEOUT, 0}; + glite_srvbones_set_param(GLITE_SBPARAM_TOTAL_CLNT_TIMEOUT, &to); + + glite_srvbones_run(clnt_data_init, service_table, sizofa(service_table), debug); + + + semctl(semset, 0, IPC_RMID, 0); + unlink(pidfile); + for ( i = 0; i < sizofa(service_table); i++ ) + if ( service_table[i].conn >= 0 ) close(service_table[i].conn); + unlink(sock_serve); + unlink(sock_store); + + + return 0; +} + + +int clnt_data_init(void **data) +{ + edg_wll_Context ctx; + struct clnt_data_t *cdata; + + + if ( !(cdata = calloc(1, sizeof(*cdata))) ) + return -1; + + if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) { free(cdata); return -1; } + memset(ctx, 0, sizeof(*ctx)); + + dprintf(("[%d] opening database ...\n", getpid())); + wait_for_open(ctx, dbstring); + cdata->mysql = ctx->mysql; + edg_wll_FreeContext(ctx); + + *data = cdata; + return 0; +} + + +int handle_conn(int conn, struct timeval client_start, void *data) +{ + struct clnt_data_t *cdata = (struct clnt_data_t *)data; + edg_wll_Context ctx; + struct timeval total_to = { TOTAL_CLNT_TIMEOUT,0 }; + + + if ( !(ctx = (edg_wll_Context) calloc(1, sizeof(*ctx))) ) { + fprintf(stderr, "Couldn't create context"); + return -1; + } + cdata->ctx = ctx; + + /* Shared structures (pointers) + */ + ctx->mysql = cdata->mysql; + + /* set globals + */ + ctx->allowAnonymous = 1; + ctx->isProxy = 1; + ctx->noAuth = 1; + ctx->semset = semset; + ctx->semaphores = semaphores; + + ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT; + ctx->p_tmp_timeout.tv_usec = 0; + if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec ) { + ctx->p_tmp_timeout.tv_sec = total_to.tv_sec; + ctx->p_tmp_timeout.tv_usec = total_to.tv_usec; + } + + ctx->poolSize = 1; + ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool)); + ctx->connToUse = 0; + + if ( edg_wll_plain_accept(conn, &ctx->connPool[0].conn) ) { + perror("accept"); + edg_wll_FreeContext(ctx); + + return -1; + } + + + return 0; +} + + +int accept_store(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + if ( edg_wll_StoreProtoProxy(ctx) ) { + char *errt, *errd; + + errt = errd = NULL; + switch ( edg_wll_Error(ctx, &errt, &errd) ) { + case ETIMEDOUT: + case EPIPE: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* fallthrough + */ + case ENOTCONN: + edg_wll_FreeContext(ctx); + ctx = NULL; + free(errt); free(errd); + dprintf(("[%d] Connection closed\n", getpid())); + return 1; + break; + + case ENOENT: + case EINVAL: + case EPERM: + case EEXIST: + case EDG_WLL_ERROR_NOINDEX: + case E2BIG: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_ERR, "%s (%s)", errt, errd); + break; + + default: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_CRIT, "%s (%s)", errt, errd); + return -1; + } + free(errt); free(errd); + } + + return 0; +} + +int accept_serve(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + /* + * serve the request + */ + if ( edg_wll_ServerHTTP(ctx) ) { + char *errt, *errd; + + + errt = errd = NULL; + switch ( edg_wll_Error(ctx, &errt, &errd) ) { + case ETIMEDOUT: + case EPIPE: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* fallthrough + */ + case ENOTCONN: + edg_wll_FreeContext(ctx); + ctx = NULL; + free(errt); free(errd); + dprintf(("[%d] Connection closed\n", getpid())); + /* + * "recoverable" error - return (>0) + */ + return 1; + break; + + case ENOENT: + case EINVAL: + case EPERM: + case EEXIST: + case EDG_WLL_ERROR_NOINDEX: + case E2BIG: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* + * no action for non-fatal errors + */ + break; + + default: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); + /* + * unknown error - do rather return (<0) (slave will be killed) + */ + return -1; + } + free(errt); free(errd); + } + + return 0; +} + + +int clnt_disconnect(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + edg_wll_FreeContext(ctx); + + return 0; +} + +int clnt_reject(int conn) +{ + return 0; +} + +static void wait_for_open(edg_wll_Context ctx, const char *dbstring) +{ + char *dbfail_string1, *dbfail_string2; + + dbfail_string1 = dbfail_string2 = NULL; + + while (edg_wll_Open(ctx, (char *) dbstring)) { + char *errt,*errd; + + if (dbfail_string1) free(dbfail_string1); + edg_wll_Error(ctx,&errt,&errd); + asprintf(&dbfail_string1,"%s (%s)\n",errt,errd); + if (dbfail_string1 != NULL) { + if (dbfail_string2 == NULL || strcmp(dbfail_string1,dbfail_string2)) { + if (dbfail_string2) free(dbfail_string2); + dbfail_string2 = dbfail_string1; + dbfail_string1 = NULL; + dprintf(("[%d]: %s\nStill trying ...\n",getpid(),dbfail_string2)); + if (!debug) syslog(LOG_ERR,dbfail_string2); + } + } + sleep(5); + } + + if (dbfail_string1) free(dbfail_string1); + if (dbfail_string2 != NULL) { + free(dbfail_string2); + dprintf(("[%d]: DB connection established\n",getpid())); + if (!debug) syslog(LOG_INFO,"DB connection established\n"); + } +} diff --git a/org.glite.lb.server/src/stored_master.c b/org.glite.lb.server/src/stored_master.c index 7c29f77..52750f5 100644 --- a/org.glite.lb.server/src/stored_master.c +++ b/org.glite.lb.server/src/stored_master.c @@ -10,6 +10,7 @@ #include "glite/lb/il_string.h" #include "glite/lb/lb_gss.h" +#include "glite/lb/lb_plain_io.h" #include "glite/lb/context-int.h" #include "store.h" @@ -57,3 +58,40 @@ int edg_wll_StoreProto(edg_wll_Context ctx) return edg_wll_Error(ctx,NULL,NULL); } + +int edg_wll_StoreProtoProxy(edg_wll_Context ctx) +{ + char fbuf[256], *buf; + int len, ret; + + + edg_wll_ResetError(ctx); + ret = edg_wll_plain_read_full(&ctx->connPool[ctx->connToUse].conn, + fbuf, 17, &ctx->p_tmp_timeout); + if ( ret < 0 ) return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - reading data"); + + len = atoi(fbuf); + if ( len <= 0 ) return edg_wll_SetError(ctx, EINVAL, "message length"); + + buf = malloc(len+1); + if ( !buf ) return edg_wll_SetError(ctx, errno, "StoreProtoProxy()"); + + if ( edg_wll_plain_read_full(&ctx->connPool[ctx->connToUse].conn, + buf, len, &ctx->p_tmp_timeout) < 0) { + free(buf); + return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - reading data"); + } + + buf[len] = 0; + handle_request(ctx, buf, len); + free(buf); + + if ( (len = create_reply(ctx, fbuf, sizeof fbuf)) ) { + if ( edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn, + fbuf, len, &ctx->p_tmp_timeout) < 0 ) + return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - sending reply"); + } + else edg_wll_SetError(ctx, E2BIG, "create_reply()"); + + return edg_wll_Error(ctx, NULL, NULL); +} -- 1.8.2.3