- LB Proxy - the first shot (not working)
authorJiří Škrábal <nykolas@ics.muni.cz>
Tue, 9 Nov 2004 11:02:57 +0000 (11:02 +0000)
committerJiří Škrábal <nykolas@ics.muni.cz>
Tue, 9 Nov 2004 11:02:57 +0000 (11:02 +0000)
org.glite.lb.server/Makefile
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/il_lbproxy.c [new file with mode: 0644]
org.glite.lb.server/src/il_lbproxy.h [new file with mode: 0644]
org.glite.lb.server/src/lb_http.c
org.glite.lb.server/src/lb_http.h
org.glite.lb.server/src/lbproxy.c [new file with mode: 0644]
org.glite.lb.server/src/stored_master.c

index b7ba4a9..1525add 100644 (file)
@@ -88,6 +88,8 @@ SRVBONES_LIB:= -L${stagedir}/lib -lglite_lb_server_bones
 
 COMMON_LIB:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}
 
+LB_PROXY_COMMON_LIB:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}
+
 ifeq ($(GLITE_LB_SERVER_WITH_WS),yes)
        SERVER_SOAP_OBJS:=${GSOAP_FILES_PREFIX}C.o ${GSOAP_FILES_PREFIX}Server.o \
                        ws_plugin.o ws_query.o ws_fault.o ws_typeref.o
@@ -97,6 +99,13 @@ else
        SERVER_WS_LIBS=
 endif
 
+LB_PROXY_OBJS:= lbproxy.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \
+       write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
+       lb_xml_parse_V21.o \
+       lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
+       stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
+       notification.o il_notification.o notif_match.o
+
 SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o \
        write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
        lb_xml_parse_V21.o \
@@ -112,6 +121,9 @@ WS_CLIENT_OBJS:= stdsoap2_patched.o test_ws_plugin.o $(GSOAP_FILES_PREFIX)C.o $(
 WS_CLIENT_LIBS:= -L${stagedir}/lib -lglite_lb_client_${nothrflavour}
 
 
+glite_lb_proxy: ${LB_PROXY_OBJS}
+       ${LINK} -o $@ ${LB_PROXY_OBJS} ${COMMON_LIB} ${SRVBONES_LIB} ${EXT_LIBS}
+
 glite_lb_bkserverd: ${SERVER_OBJS} ${SERVER_SOAP_OBJS}
        ${LINK} -o $@ ${SERVER_OBJS} ${SERVER_SOAP_OBJS} ${COMMON_LIB} ${SRVBONES_LIB} ${EXT_LIBS} ${SERVER_WS_LIBS}
 
index 6ff49cc..b4a1c1c 100644 (file)
@@ -70,7 +70,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err;
   if (err) goto err;
 
-  if (newstat.state) {
+  if (!ctx->isProxy && newstat.state) {
          edg_wll_NotifMatch(ctx,&newstat);
          edg_wll_FreeStatus(&newstat);
   }
diff --git a/org.glite.lb.server/src/il_lbproxy.c b/org.glite.lb.server/src/il_lbproxy.c
new file mode 100644 (file)
index 0000000..053ec33
--- /dev/null
@@ -0,0 +1,372 @@
+/**
+ * il_lbproxy.c
+ *   - implementation of IL API calls for LB proxy
+ *
+ */
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <errno.h>
+
+#include "glite/lb/context-int.h"
+#include "glite/lb/events_parse.h"
+#include "glite/lb/escape.h"
+#include "glite/lb/log_proto.h"
+
+#include "il_lbproxy.h"
+#include "lb_xml_parse.h"
+
+
+
+#define FCNTL_ATTEMPTS         5
+#define FCNTL_TIMEOUT          1
+#define FILE_PREFIX             "/tmp/lbproxy_events"
+#define DEFAULT_SOCKET          "/tmp/lbproxy_interlogger.sock"
+
+char *lbproxy_ilog_socket_path = DEFAULT_SOCKET;
+char *lbproxy_ilog_file_prefix = FILE_PREFIX;
+
+#define tv_sub(a,b) {\
+       (a).tv_usec -= (b).tv_usec;\
+       (a).tv_sec -= (b).tv_sec;\
+       if ((a).tv_usec < 0) {\
+               (a).tv_sec--;\
+               (a).tv_usec += 1000000;\
+       }\
+}
+
+int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner)
+{
+       return 0;
+}
+
+static
+int
+notif_create_ulm(
+       edg_wll_Context context,
+       edg_wll_NotifId reg_id,
+       const char      *host,
+       const uint16_t  port,
+       const char      *owner,
+       const char      *notif_data,
+       char            **ulm_data,
+       char            **reg_id_s)
+{
+       int             ret;
+       edg_wll_Event   *event=NULL;
+
+       *ulm_data = NULL;
+       *reg_id_s = NULL;
+
+       event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
+
+       gettimeofday(&event->any.timestamp,0);
+       if (context->p_host) event->any.host = strdup(context->p_host);
+       event->any.level = context->p_level;
+       event->any.source = context->p_source;
+       if (context->p_instance) event->notification.src_instance = strdup(context->p_instance);
+       event->notification.notifId = reg_id;
+       if (owner) event->notification.owner = strdup(owner);
+       if (host) event->notification.dest_host = strdup(host);
+       event->notification.dest_port = port;
+       if (notif_data) event->notification.jobstat = strdup(notif_data);
+
+       if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) {
+               edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()"); 
+               goto out;
+       }
+
+       if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) {
+               edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()");
+               goto out;
+       }
+
+       ret = 0;
+
+out:
+       if(event) { 
+               edg_wll_FreeEvent(event);
+               free(event);
+       }
+       if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()");
+       return(ret);
+}
+
+
+static
+int
+notif_save_to_file(edg_wll_Context     context,
+                  const char         *event_file,
+                  const char         *ulm_data,
+                  long               *filepos)
+{
+       int ret;
+       FILE *outfile;
+       int filedesc;
+       struct flock filelock;
+       int i, filelock_status=-1;
+
+       for(i=0; i < FCNTL_ATTEMPTS; i++) {
+               /* fopen and properly handle the filelock */
+               if ((outfile = fopen(event_file,"a")) == NULL) {
+                       edg_wll_SetError(context, ret = errno, "fopen()");
+                       goto out;
+               }
+               if ((filedesc = fileno(outfile)) == -1) {
+                       edg_wll_SetError(context, ret = errno, "fileno()");
+                       goto out1;
+               }
+               filelock.l_type = F_WRLCK;
+               filelock.l_whence = SEEK_SET;
+               filelock.l_start = 0;
+               filelock.l_len = 0;
+               filelock_status=fcntl(filedesc, F_SETLK, &filelock);
+               if(filelock_status < 0) {
+                       switch(errno) {
+                       case EAGAIN:
+                       case EACCES:
+                       case EINTR:
+                               /* lock is held by someone else */
+                               sleep(FCNTL_TIMEOUT);
+                               break;
+                       default:
+                               /* other error */
+                               edg_wll_SetError(context, ret=errno, "fcntl()");
+                               goto out1;
+                       }
+               } else {
+                       /* lock acquired, break out of the loop */
+                       break;
+               }
+       }
+       if (fseek(outfile, 0, SEEK_END) == -1) {
+               edg_wll_SetError(context, ret = errno, "fseek()");
+               goto out1;
+       }
+       if ((*filepos=ftell(outfile)) == -1) {
+               edg_wll_SetError(context, ret = errno, "ftell()");
+               goto out1;
+       }
+       /* write, flush and sync */
+       if (fputs(ulm_data, outfile) == EOF) {
+               edg_wll_SetError(context, ret = errno, "fputs()");
+               goto out1;
+       }       
+       if (fflush(outfile) == EOF) {
+               edg_wll_SetError(context, ret = errno, "fflush()");
+               goto out1;
+       }
+       if (fsync(filedesc) < 0) { /* synchronize */
+               edg_wll_SetError(context, ret = errno, "fsync()");
+               goto out1;
+       } 
+
+       ret = 0;
+out1:
+       /* close and unlock */
+       fclose(outfile); 
+out:
+       if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()");
+       return(ret);
+}
+
+
+static 
+ssize_t 
+socket_write_full(edg_wll_Context  context,
+                 int              sock,
+                 void            *buf,
+                 size_t           bufsize,
+                 struct timeval  *timeout,
+                 ssize_t         *total)
+{
+       int ret = 0;
+        ssize_t        len;
+
+        *total = 0;
+        while (bufsize > 0) {
+
+               fd_set  fds;
+               struct timeval  to,before,after;
+               
+               if (timeout) {
+                       memcpy(&to, timeout, sizeof(to));
+                       gettimeofday(&before, NULL);
+               }
+
+               len = write(sock, buf, bufsize);
+               while (len <= 0) {
+                       FD_ZERO(&fds);
+                       FD_SET(sock, &fds);
+                       if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) {
+                               edg_wll_SetError(context, ret = errno, "select()");
+                               goto out;
+                       }
+                       len = write(sock, buf, bufsize);
+               }
+               if (timeout) {
+                       gettimeofday(&after,NULL);
+                       tv_sub(after, before);
+                       tv_sub(*timeout,after);
+                       if (timeout->tv_sec < 0) {
+                               timeout->tv_sec = 0;
+                               timeout->tv_usec = 0;
+                       }
+               }
+               
+                if (len < 0) {
+                       edg_wll_SetError(context, ret = errno, "write()");
+                       goto out;
+               }
+
+               bufsize -= len;
+               buf += len;
+                *total += len;
+        }
+
+       ret = 0;
+out:
+       if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()");
+        return ret;
+}
+
+
+static 
+int
+notif_send_socket(edg_wll_Context       context,
+                 long                  filepos,
+                 const char           *ulm_data)
+{
+       int ret;
+       struct sockaddr_un saddr;
+       int msg_sock, flags, count;
+       struct timeval timeout;
+
+       timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
+        timeout.tv_usec = 0;   
+
+       msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
+       if(msg_sock < 0) {
+               edg_wll_SetError(context, ret = errno, "socket()");
+               goto out;
+       }
+
+       memset(&saddr, 0, sizeof(saddr));
+       saddr.sun_family = AF_UNIX;
+       strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
+
+       if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
+           fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+               edg_wll_SetError(context, ret = errno, "fcntl()");
+               goto out1;
+       }
+
+       if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+               if(errno != EISCONN) {
+                       edg_wll_SetError(context, ret = errno, "connect()");
+                       goto out1;
+               }
+       }
+
+       if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
+               ret = errno; 
+               goto out1;
+       }
+
+       if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) {
+               ret = errno; 
+               goto out1;
+       }
+
+       ret = 0;
+
+out1:
+       close(msg_sock);
+out:
+       if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()");
+       return(ret);
+}
+
+
+int
+edg_wll_NotifSend_a(edg_wll_Context       context,
+                 edg_wll_NotifId       reg_id,
+                 const char           *host,
+                  int                   port,
+                 const char           *owner,
+                  const char           *notif_data)
+{
+       int ret;
+       long filepos;
+       char *ulm_data, *reg_id_s, *event_file;
+
+       if((ret=notif_create_ulm(context, 
+                                reg_id, 
+                                host, 
+                                port, 
+                                owner, 
+                                notif_data,
+                                &ulm_data,
+                                &reg_id_s))) {
+               goto out;
+       }
+
+       asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s);
+       if(event_file == NULL) {
+               edg_wll_SetError(context, ret=ENOMEM, "asprintf()");
+               goto out;
+       }
+
+       if((ret=notif_save_to_file(context,
+                                  event_file,
+                                  ulm_data,
+                                  &filepos))) {
+               goto out;
+       }
+
+       if((ret=notif_send_socket(context,
+                                 filepos,
+                                 ulm_data))) {
+               goto out;
+       }
+       ret = 0;
+
+out:
+       if(ulm_data) free(ulm_data);
+       if(reg_id_s) free(reg_id_s);
+       if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()");
+       return(ret);
+}
+
+
+int
+edg_wll_NotifJobStatus_a(edg_wll_Context       context,
+                      edg_wll_NotifId  reg_id,
+                      const char      *host,
+                       int              port,
+                      const char      *owner,
+                      const edg_wll_JobStat notif_job_stat)
+{
+       int ret=0;
+       char   *xml_data, *xml_esc_data=NULL;
+
+       if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data)) 
+               goto out;
+       
+       if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) {
+               edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()");
+               goto out;
+       }
+
+       ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data);
+
+out:
+       if(xml_data) free(xml_data);
+       if(xml_esc_data) free(xml_esc_data);
+       if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()");
+       return(ret);
+}
diff --git a/org.glite.lb.server/src/il_lbproxy.h b/org.glite.lb.server/src/il_lbproxy.h
new file mode 100644 (file)
index 0000000..1f586e6
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef IL_LBPROXY_H
+#define IL_LBPROXY_H
+
+#ifdef __cplusplus
+#extern "C" {
+#endif
+
+extern char *lbproxy_ilog_socket_path;
+extern char *lbproxy_ilog_file_prefix;
+
+int edg_wll_SendEventProxy(
+               edg_wll_Context         context,
+               const char                 *owner);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
index 79e85d5..efe0c41 100644 (file)
@@ -24,7 +24,8 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx)
        edg_wll_ErrorCode       err = 0;
 
 
-       err = edg_wll_http_recv(ctx,&req,&hdr,&body);
+       if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body);
+       else err = edg_wll_http_recv(ctx,&req,&hdr,&body);
 
        dprintf(("[%d] %s\n",getpid(),req));
        if (body) dprintf(("\n%s\n\n",body));
@@ -33,7 +34,12 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx)
                if ((err = edg_wll_Proto(ctx,req,hdr,body,&resp,&hdrOut,&bodyOut))) 
                        edg_wll_Error(ctx,NULL,&err_desc);
 
-               if (resp) edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut);
+               if (resp) {
+                       if ( ctx->isProxy )
+                               edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut);
+                       else
+                               edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut);
+               }
        }
 
        free(req);
index c3e012d..76bb883 100644 (file)
@@ -8,5 +8,6 @@
 #include "glite/lb/consumer.h"
 
 int edg_wll_ServerHTTP(edg_wll_Context);
+int edg_wll_ServerHTTPProxy(edg_wll_Context);
 
 #endif
diff --git a/org.glite.lb.server/src/lbproxy.c b/org.glite.lb.server/src/lbproxy.c
new file mode 100644 (file)
index 0000000..8d79d37
--- /dev/null
@@ -0,0 +1,537 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <linux/limits.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <signal.h>
+#include <errno.h>
+#include <netdb.h>
+#include <limits.h>
+#include <syslog.h>
+#include <sys/time.h>
+#include <arpa/inet.h>
+#include <arpa/nameser.h>
+#include <resolv.h>
+#include <ares.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+
+#include "glite/lb/srvbones.h"
+#include "glite/lb/context.h"
+#include "glite/lb/context-int.h"
+
+#include "il_lbproxy.h"
+#include "lb_http.h"
+#include "lbs_db.h"
+
+
+extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs);
+extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context);
+extern int edg_wll_StoreProtoProxy(edg_wll_Context ctx);
+
+#define DEFAULTCS                      "lbserver/@localhost:lbserver20"
+/*
+#define DEFAULTCS                      "lbserver/@localhost:lbproxy"
+*/
+
+#define CON_QUEUE                      20      /* accept() */
+#define SLAVE_OVERLOAD         10      /* queue items per slave */
+#define CLNT_TIMEOUT           10      /* keep idle connection that many seconds */
+#define TOTAL_CLNT_TIMEOUT     60      /* one client may ask one slave multiple times */
+                                       /* but only limited time to avoid DoS attacks */
+#define CLNT_REJECT_TIMEOUT    100000  /* time limit for client rejection in !usec! */
+#define DNS_TIMEOUT            5       /* how long wait for DNS lookup */
+#define SLAVE_CONNS_MAX                500     /* commit suicide after that many connections */
+#define MASTER_TIMEOUT         30      /* maximal time of one-round of master network communication */
+#define SLAVE_TIMEOUT          30      /* maximal time of one-round of slave network communication */
+
+/* file to store pid and generate semaphores key
+ */
+#ifndef GLITE_LBPROXY_PIDFILE
+#define GLITE_LBPROXY_PIDFILE          "/var/run/glite-lbproxy.pid"
+#endif
+
+#ifndef GLITE_LBPROXY_SOCK_PREFIX
+#define GLITE_LBPROXY_SOCK_PREFIX      "/tmp/lb_proxy_"
+#endif
+
+#ifndef dprintf
+#define dprintf(x)                     { if (debug) printf x; }
+#endif
+
+#define sizofa(a)                      (sizeof(a)/sizeof((a)[0]))
+
+
+int                                            debug  = 0;
+static const int               one = 1;
+static char                       *dbstring = NULL;
+static char                            sock_store[PATH_MAX],
+                                               sock_serve[PATH_MAX];
+static int                             slaves = 10,
+                                               semaphores = -1,
+                                               semset;
+
+
+static struct option opts[] = {
+       {"port",                1, NULL,        'p'},
+       {"debug",               0, NULL,        'd'},
+       {"mysql",               1, NULL,        'm'},
+       {"slaves",              1, NULL,        's'},
+       {"semaphores",          1, NULL,        'l'},
+       {"pidfile",             1, NULL,        'i'},
+       {"proxy-il-sock",       1, NULL,        'X'},
+       {"proxy-il-fprefix",    1, NULL,        'Y'},
+       {NULL,0,NULL,0}
+};
+
+static const char *get_opt_string = "p:dm:s:l:i:X:Y:";
+
+static void usage(char *me) 
+{
+       fprintf(stderr,"usage: %s [option]\n"
+               "\t-p, --sock\t path-name to the local socket\n"
+               "\t-m, --mysql\t database connect string\n"
+               "\t-d, --debug\t don't run as daemon, additional diagnostics\n"
+               "\t-s, --slaves\t number of slave servers to fork\n"
+               "\t-l, --semaphores number of semaphores (job locks) to use\n"
+               "\t-i, --pidfile\t file to store master pid\n"
+               "\t--proxy-il-sock\t socket to send events to\n"
+               "\t--proxy-il-fprefix\t file prefix for events\n"
+       ,me);
+}
+
+static void wait_for_open(edg_wll_Context,const char *);
+
+
+/*
+ *     SERVER BONES structures and handlers
+ */
+int clnt_data_init(void **);
+
+       /*
+        *      Serve & Store handlers
+        */
+int clnt_reject(int);
+int handle_conn(int, struct timeval, void *);
+int accept_serve(int, void *);
+int accept_store(int, void *);
+int clnt_disconnect(int, void *);
+
+#define SRV_SERVE              0
+#define SRV_STORE              1
+static struct glite_srvbones_service service_table[] = {
+       { "serve",      -1, handle_conn, accept_serve, clnt_reject, clnt_disconnect },
+       { "store",      -1, handle_conn, accept_store, clnt_reject, clnt_disconnect },
+};
+
+struct clnt_data_t {
+       edg_wll_Context                 ctx;
+       void                               *mysql;
+};
+
+
+
+int main(int argc, char *argv[])
+{
+       int                                     i;
+       struct sockaddr_un      a;
+       int                                     opt;
+       char                            pidfile[PATH_MAX] = GLITE_LBPROXY_PIDFILE,
+                                               socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX,
+                                          *name;
+       FILE                       *fpid;
+       key_t                           semkey;
+       edg_wll_Context         ctx;
+       struct timeval          to;
+
+
+
+       name = strrchr(argv[0],'/');
+       if (name) name++; else name = argv[0];
+
+       if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/glite_lb_proxy.pid", getenv("HOME"));
+
+       while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF) switch (opt) {
+               case 'p': strcpy(socket_path_prefix, optarg); break;
+               case 'd': debug = 1; break;
+               case 'm': dbstring = optarg; break;
+               case 's': slaves = atoi(optarg); break;
+               case 'l': semaphores = atoi(optarg); break;
+               case 'X': lbproxy_ilog_socket_path = strdup(optarg); break;
+               case 'Y': lbproxy_ilog_file_prefix = strdup(optarg); break;
+               case 'i': strcpy(pidfile, optarg); break;
+               case '?': usage(name); return 1;
+       }
+
+       if ( optind < argc ) { usage(name); return 1; }
+
+       setlinebuf(stdout);
+       setlinebuf(stderr);
+
+       fpid = fopen(pidfile,"r");
+       if ( fpid ) {
+               int     opid = -1;
+
+               if ( fscanf(fpid,"%d",&opid) == 1 ) {
+                       if ( !kill(opid,0) ) {
+                               fprintf(stderr,"%s: another instance running, pid = %d\n",argv[0],opid);
+                               return 1;
+                       }
+                       else if (errno != ESRCH) { perror("kill()"); return 1; }
+               }
+               fclose(fpid);
+       } else if (errno != ENOENT) { perror(pidfile); return 1; }
+
+       fpid = fopen(pidfile, "w");
+       if ( !fpid ) { perror(pidfile); return 1; }
+       fprintf(fpid, "%d", getpid());
+       fclose(fpid);
+
+       semkey = ftok(pidfile,0);
+
+       if ( semaphores == -1 ) semaphores = slaves;
+       semset = semget(semkey, 0, 0);
+       if ( semset >= 0 ) semctl(semset, 0, IPC_RMID);
+       semset = semget(semkey, semaphores, IPC_CREAT | 0600);
+       if ( semset < 0 ) { perror("semget()"); return 1; }
+       dprintf(("Using %d semaphores, set id %d\n", semaphores, semset));
+       for ( i = 0; i < semaphores; i++ ) {
+               struct sembuf   s;
+
+               s.sem_num = i; s.sem_op = 1; s.sem_flg = 0;
+               if (semop(semset,&s,1) == -1) { perror("semop()"); return 1; }
+       }
+
+       service_table[SRV_SERVE].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+       if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; }
+       memset(&a, 0, sizeof(a));
+       a.sun_family = AF_UNIX;
+       sprintf(sock_serve, "%s%s", socket_path_prefix, "serve.sock");
+       strcpy(a.sun_path, sock_serve);
+
+       if( connect(service_table[SRV_SERVE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+               if( errno == ECONNREFUSED ) {
+                       dprintf(("removing stale input socket %s\n", sock_serve));
+                       unlink(sock_serve);
+               }
+       } else { perror("another instance of lb-proxy is running"); return 1; }
+
+       if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) < 0 ) {
+               char    buf[100];
+
+               snprintf(buf, sizeof(buf), "bind(%s)", sock_serve);
+               perror(buf);
+               return 1;
+       }
+
+       if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+
+       service_table[SRV_STORE].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+       if ( service_table[SRV_STORE].conn < 0 ) { perror("socket()"); return 1; }
+       memset(&a, 0, sizeof(a));
+       a.sun_family = AF_UNIX;
+       sprintf(sock_store, "%s%s", socket_path_prefix, "store.sock");
+       strcpy(a.sun_path, sock_store);
+
+       if( connect(service_table[SRV_STORE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+               if( errno == ECONNREFUSED ) {
+                       dprintf(("removing stale input socket %s\n", sock_store));
+                       unlink(sock_store);
+               }
+       } else { perror("another instance of lb-proxy is running"); return 1; }
+
+       if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a))) {
+               char    buf[100];
+
+               snprintf(buf, sizeof(buf), "bind(%s)", sock_store);
+               perror(buf);
+               return 1;
+       }
+       if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+
+       dprintf(("Listening at %s, %s ...\n", sock_store, sock_serve));
+
+       if (!dbstring) dbstring = getenv("LBPROXYDB");
+       if (!dbstring) dbstring = DEFAULTCS;
+
+
+       /* Just check the database and let it be. The slaves do the job. */
+       /* XXX: InitContextProxy() !!!
+        * edg_wll_InitContext(&ctx) causes segfault
+        */
+       if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) {
+               perror("InitContext()");
+               return -1;
+       }
+       memset(ctx, 0, sizeof(*ctx));
+       wait_for_open(ctx, dbstring);
+       if (edg_wll_DBCheckVersion(ctx)) {
+               char    *et,*ed;
+               edg_wll_Error(ctx,&et,&ed);
+
+               fprintf(stderr,"%s: open database: %s (%s)\n",argv[0],et,ed);
+               return 1;
+       }
+       edg_wll_Close(ctx);
+       edg_wll_FreeContext(ctx);
+
+       if ( !debug ) {
+               if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); }
+
+               fpid = fopen(pidfile,"w");
+               if ( !fpid ) { perror(pidfile); return 1; }
+               fprintf(fpid, "%d", getpid());
+               fclose(fpid);
+               openlog(name, LOG_PID, LOG_DAEMON);
+       } else { setpgid(0, getpid()); }
+
+
+       glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_CT, slaves);
+       glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD);
+       glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX);
+       to = (struct timeval){CLNT_TIMEOUT, 0};
+       glite_srvbones_set_param(GLITE_SBPARAM_CLNT_TIMEOUT, &to);
+       to = (struct timeval){TOTAL_CLNT_TIMEOUT, 0};
+       glite_srvbones_set_param(GLITE_SBPARAM_TOTAL_CLNT_TIMEOUT, &to);
+
+       glite_srvbones_run(clnt_data_init, service_table, sizofa(service_table), debug);
+
+
+       semctl(semset, 0, IPC_RMID, 0);
+       unlink(pidfile);
+       for ( i = 0; i < sizofa(service_table); i++ )
+               if ( service_table[i].conn >= 0 ) close(service_table[i].conn);
+       unlink(sock_serve);
+       unlink(sock_store);
+
+
+       return 0;
+}
+
+
+int clnt_data_init(void **data)
+{
+       edg_wll_Context                 ctx;
+       struct clnt_data_t         *cdata;
+
+
+       if ( !(cdata = calloc(1, sizeof(*cdata))) )
+               return -1;
+
+       if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) { free(cdata); return -1; }
+       memset(ctx, 0, sizeof(*ctx));
+
+       dprintf(("[%d] opening database ...\n", getpid()));
+       wait_for_open(ctx, dbstring);
+       cdata->mysql = ctx->mysql;
+       edg_wll_FreeContext(ctx);
+
+       *data = cdata;
+       return 0;
+}
+
+       
+int handle_conn(int conn, struct timeval client_start, void *data)
+{
+       struct clnt_data_t *cdata = (struct clnt_data_t *)data;
+       edg_wll_Context         ctx;
+       struct timeval          total_to = { TOTAL_CLNT_TIMEOUT,0 };
+
+
+       if ( !(ctx = (edg_wll_Context) calloc(1, sizeof(*ctx))) ) {
+               fprintf(stderr, "Couldn't create context");
+               return -1;
+       }
+       cdata->ctx = ctx;
+
+       /* Shared structures (pointers)
+        */
+       ctx->mysql = cdata->mysql;
+       
+       /*      set globals
+        */
+       ctx->allowAnonymous = 1;
+       ctx->isProxy = 1;
+       ctx->noAuth = 1;
+       ctx->semset = semset;
+       ctx->semaphores = semaphores;
+
+       ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT;
+       ctx->p_tmp_timeout.tv_usec = 0;
+       if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec ) {
+               ctx->p_tmp_timeout.tv_sec = total_to.tv_sec;
+               ctx->p_tmp_timeout.tv_usec = total_to.tv_usec;
+       }
+       
+       ctx->poolSize = 1;
+       ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool));
+       ctx->connToUse = 0;
+
+       if ( edg_wll_plain_accept(conn, &ctx->connPool[0].conn) ) {
+               perror("accept");
+               edg_wll_FreeContext(ctx);
+
+               return -1;
+       } 
+
+
+       return 0;
+}
+
+
+int accept_store(int conn, void *cdata)
+{
+       edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+       if ( edg_wll_StoreProtoProxy(ctx) ) { 
+               char    *errt, *errd;
+
+               errt = errd = NULL;
+               switch ( edg_wll_Error(ctx, &errt, &errd) ) {
+               case ETIMEDOUT:
+               case EPIPE:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
+                       /*      fallthrough
+                        */
+               case ENOTCONN:
+                       edg_wll_FreeContext(ctx);
+                       ctx = NULL;
+                       free(errt); free(errd);
+                       dprintf(("[%d] Connection closed\n", getpid()));
+                       return 1;
+                       break;
+
+               case ENOENT:
+               case EINVAL:
+               case EPERM:
+               case EEXIST:
+               case EDG_WLL_ERROR_NOINDEX:
+               case E2BIG:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if ( !debug ) syslog(LOG_ERR, "%s (%s)", errt, errd);
+                       break;
+                       
+               default:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if ( !debug ) syslog(LOG_CRIT, "%s (%s)", errt, errd);
+                       return -1;
+               } 
+               free(errt); free(errd);
+       }
+
+       return 0;
+}
+
+int accept_serve(int conn, void *cdata)
+{
+       edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+       /*
+        *      serve the request
+        */
+       if ( edg_wll_ServerHTTP(ctx) ) { 
+               char    *errt, *errd;
+
+               
+               errt = errd = NULL;
+               switch ( edg_wll_Error(ctx, &errt, &errd) ) {
+               case ETIMEDOUT:
+               case EPIPE:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
+                       /*      fallthrough
+                        */
+               case ENOTCONN:
+                       edg_wll_FreeContext(ctx);
+                       ctx = NULL;
+                       free(errt); free(errd);
+                       dprintf(("[%d] Connection closed\n", getpid()));
+                       /*
+                        *      "recoverable" error - return (>0)
+                        */
+                       return 1;
+                       break;
+
+               case ENOENT:
+               case EINVAL:
+               case EPERM:
+               case EEXIST:
+               case EDG_WLL_ERROR_NOINDEX:
+               case E2BIG:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd);
+                       /*
+                        *      no action for non-fatal errors
+                        */
+                       break;
+                       
+               default:
+                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+                       if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd);
+                       /*
+                        *      unknown error - do rather return (<0) (slave will be killed)
+                        */
+                       return -1;
+               } 
+               free(errt); free(errd);
+       }
+
+       return 0;
+}
+
+
+int clnt_disconnect(int conn, void *cdata)
+{
+       edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+       edg_wll_FreeContext(ctx);
+
+       return 0;
+}
+
+int clnt_reject(int conn)
+{
+       return 0;
+}
+
+static void wait_for_open(edg_wll_Context ctx, const char *dbstring)
+{
+       char    *dbfail_string1, *dbfail_string2;
+
+       dbfail_string1 = dbfail_string2 = NULL;
+
+       while (edg_wll_Open(ctx, (char *) dbstring)) {
+               char    *errt,*errd;
+
+               if (dbfail_string1) free(dbfail_string1);
+               edg_wll_Error(ctx,&errt,&errd);
+               asprintf(&dbfail_string1,"%s (%s)\n",errt,errd);
+               if (dbfail_string1 != NULL) {
+                       if (dbfail_string2 == NULL || strcmp(dbfail_string1,dbfail_string2)) {
+                               if (dbfail_string2) free(dbfail_string2);
+                               dbfail_string2 = dbfail_string1;
+                               dbfail_string1 = NULL;
+                               dprintf(("[%d]: %s\nStill trying ...\n",getpid(),dbfail_string2));
+                               if (!debug) syslog(LOG_ERR,dbfail_string2);
+                       }
+               }
+               sleep(5);
+       }
+
+       if (dbfail_string1) free(dbfail_string1);
+       if (dbfail_string2 != NULL) {
+               free(dbfail_string2);
+               dprintf(("[%d]: DB connection established\n",getpid()));
+               if (!debug) syslog(LOG_INFO,"DB connection established\n");
+       }
+}
index 7c29f77..52750f5 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "glite/lb/il_string.h"
 #include "glite/lb/lb_gss.h"
+#include "glite/lb/lb_plain_io.h"
 #include "glite/lb/context-int.h"
 
 #include "store.h"
@@ -57,3 +58,40 @@ int edg_wll_StoreProto(edg_wll_Context ctx)
 
        return edg_wll_Error(ctx,NULL,NULL);
 }
+
+int edg_wll_StoreProtoProxy(edg_wll_Context ctx)
+{
+       char    fbuf[256], *buf;
+       int             len, ret;
+
+
+       edg_wll_ResetError(ctx);
+       ret = edg_wll_plain_read_full(&ctx->connPool[ctx->connToUse].conn,
+                               fbuf, 17, &ctx->p_tmp_timeout);
+       if ( ret < 0 ) return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - reading data");
+
+       len = atoi(fbuf);
+       if ( len <= 0 ) return edg_wll_SetError(ctx, EINVAL, "message length");
+
+       buf = malloc(len+1);
+       if ( !buf ) return edg_wll_SetError(ctx, errno, "StoreProtoProxy()");
+
+       if ( edg_wll_plain_read_full(&ctx->connPool[ctx->connToUse].conn,
+                               buf, len, &ctx->p_tmp_timeout) < 0) {
+               free(buf);
+               return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - reading data");
+       }
+
+       buf[len] = 0;
+       handle_request(ctx, buf, len);
+       free(buf);
+
+       if ( (len = create_reply(ctx, fbuf, sizeof fbuf)) ) {
+               if ( edg_wll_plain_write_full(&ctx->connPool[ctx->connToUse].conn,
+                                       fbuf, len, &ctx->p_tmp_timeout) < 0 )
+                       return edg_wll_SetError(ctx, errno, "StoreProtoProxy() - sending reply");
+       }
+       else edg_wll_SetError(ctx, E2BIG, "create_reply()");
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}