--- /dev/null
+/**
+ * il_lbproxy.c
+ * - implementation of IL API calls for LB proxy
+ *
+ */
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <string.h>
+#include <errno.h>
+
+#include "glite/lb/context-int.h"
+#include "glite/lb/events_parse.h"
+#include "glite/lb/escape.h"
+#include "glite/lb/log_proto.h"
+
+#include "il_lbproxy.h"
+#include "lb_xml_parse.h"
+
+
+
+#define FCNTL_ATTEMPTS 5
+#define FCNTL_TIMEOUT 1
+#define FILE_PREFIX "/tmp/lbproxy_events"
+#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock"
+
+char *lbproxy_ilog_socket_path = DEFAULT_SOCKET;
+char *lbproxy_ilog_file_prefix = FILE_PREFIX;
+
+#define tv_sub(a,b) {\
+ (a).tv_usec -= (b).tv_usec;\
+ (a).tv_sec -= (b).tv_sec;\
+ if ((a).tv_usec < 0) {\
+ (a).tv_sec--;\
+ (a).tv_usec += 1000000;\
+ }\
+}
+
+int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner)
+{
+ return 0;
+}
+
+static
+int
+notif_create_ulm(
+ edg_wll_Context context,
+ edg_wll_NotifId reg_id,
+ const char *host,
+ const uint16_t port,
+ const char *owner,
+ const char *notif_data,
+ char **ulm_data,
+ char **reg_id_s)
+{
+ int ret;
+ edg_wll_Event *event=NULL;
+
+ *ulm_data = NULL;
+ *reg_id_s = NULL;
+
+ event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
+
+ gettimeofday(&event->any.timestamp,0);
+ if (context->p_host) event->any.host = strdup(context->p_host);
+ event->any.level = context->p_level;
+ event->any.source = context->p_source;
+ if (context->p_instance) event->notification.src_instance = strdup(context->p_instance);
+ event->notification.notifId = reg_id;
+ if (owner) event->notification.owner = strdup(owner);
+ if (host) event->notification.dest_host = strdup(host);
+ event->notification.dest_port = port;
+ if (notif_data) event->notification.jobstat = strdup(notif_data);
+
+ if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) {
+ edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()");
+ goto out;
+ }
+
+ if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) {
+ edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()");
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ if(event) {
+ edg_wll_FreeEvent(event);
+ free(event);
+ }
+ if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()");
+ return(ret);
+}
+
+
+static
+int
+notif_save_to_file(edg_wll_Context context,
+ const char *event_file,
+ const char *ulm_data,
+ long *filepos)
+{
+ int ret;
+ FILE *outfile;
+ int filedesc;
+ struct flock filelock;
+ int i, filelock_status=-1;
+
+ for(i=0; i < FCNTL_ATTEMPTS; i++) {
+ /* fopen and properly handle the filelock */
+ if ((outfile = fopen(event_file,"a")) == NULL) {
+ edg_wll_SetError(context, ret = errno, "fopen()");
+ goto out;
+ }
+ if ((filedesc = fileno(outfile)) == -1) {
+ edg_wll_SetError(context, ret = errno, "fileno()");
+ goto out1;
+ }
+ filelock.l_type = F_WRLCK;
+ filelock.l_whence = SEEK_SET;
+ filelock.l_start = 0;
+ filelock.l_len = 0;
+ filelock_status=fcntl(filedesc, F_SETLK, &filelock);
+ if(filelock_status < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EACCES:
+ case EINTR:
+ /* lock is held by someone else */
+ sleep(FCNTL_TIMEOUT);
+ break;
+ default:
+ /* other error */
+ edg_wll_SetError(context, ret=errno, "fcntl()");
+ goto out1;
+ }
+ } else {
+ /* lock acquired, break out of the loop */
+ break;
+ }
+ }
+ if (fseek(outfile, 0, SEEK_END) == -1) {
+ edg_wll_SetError(context, ret = errno, "fseek()");
+ goto out1;
+ }
+ if ((*filepos=ftell(outfile)) == -1) {
+ edg_wll_SetError(context, ret = errno, "ftell()");
+ goto out1;
+ }
+ /* write, flush and sync */
+ if (fputs(ulm_data, outfile) == EOF) {
+ edg_wll_SetError(context, ret = errno, "fputs()");
+ goto out1;
+ }
+ if (fflush(outfile) == EOF) {
+ edg_wll_SetError(context, ret = errno, "fflush()");
+ goto out1;
+ }
+ if (fsync(filedesc) < 0) { /* synchronize */
+ edg_wll_SetError(context, ret = errno, "fsync()");
+ goto out1;
+ }
+
+ ret = 0;
+out1:
+ /* close and unlock */
+ fclose(outfile);
+out:
+ if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()");
+ return(ret);
+}
+
+
+static
+ssize_t
+socket_write_full(edg_wll_Context context,
+ int sock,
+ void *buf,
+ size_t bufsize,
+ struct timeval *timeout,
+ ssize_t *total)
+{
+ int ret = 0;
+ ssize_t len;
+
+ *total = 0;
+ while (bufsize > 0) {
+
+ fd_set fds;
+ struct timeval to,before,after;
+
+ if (timeout) {
+ memcpy(&to, timeout, sizeof(to));
+ gettimeofday(&before, NULL);
+ }
+
+ len = write(sock, buf, bufsize);
+ while (len <= 0) {
+ FD_ZERO(&fds);
+ FD_SET(sock, &fds);
+ if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) {
+ edg_wll_SetError(context, ret = errno, "select()");
+ goto out;
+ }
+ len = write(sock, buf, bufsize);
+ }
+ if (timeout) {
+ gettimeofday(&after,NULL);
+ tv_sub(after, before);
+ tv_sub(*timeout,after);
+ if (timeout->tv_sec < 0) {
+ timeout->tv_sec = 0;
+ timeout->tv_usec = 0;
+ }
+ }
+
+ if (len < 0) {
+ edg_wll_SetError(context, ret = errno, "write()");
+ goto out;
+ }
+
+ bufsize -= len;
+ buf += len;
+ *total += len;
+ }
+
+ ret = 0;
+out:
+ if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()");
+ return ret;
+}
+
+
+static
+int
+notif_send_socket(edg_wll_Context context,
+ long filepos,
+ const char *ulm_data)
+{
+ int ret;
+ struct sockaddr_un saddr;
+ int msg_sock, flags, count;
+ struct timeval timeout;
+
+ timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
+ timeout.tv_usec = 0;
+
+ msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
+ if(msg_sock < 0) {
+ edg_wll_SetError(context, ret = errno, "socket()");
+ goto out;
+ }
+
+ memset(&saddr, 0, sizeof(saddr));
+ saddr.sun_family = AF_UNIX;
+ strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
+
+ if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
+ fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+ edg_wll_SetError(context, ret = errno, "fcntl()");
+ goto out1;
+ }
+
+ if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+ if(errno != EISCONN) {
+ edg_wll_SetError(context, ret = errno, "connect()");
+ goto out1;
+ }
+ }
+
+ if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
+ ret = errno;
+ goto out1;
+ }
+
+ if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) {
+ ret = errno;
+ goto out1;
+ }
+
+ ret = 0;
+
+out1:
+ close(msg_sock);
+out:
+ if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()");
+ return(ret);
+}
+
+
+int
+edg_wll_NotifSend_a(edg_wll_Context context,
+ edg_wll_NotifId reg_id,
+ const char *host,
+ int port,
+ const char *owner,
+ const char *notif_data)
+{
+ int ret;
+ long filepos;
+ char *ulm_data, *reg_id_s, *event_file;
+
+ if((ret=notif_create_ulm(context,
+ reg_id,
+ host,
+ port,
+ owner,
+ notif_data,
+ &ulm_data,
+ ®_id_s))) {
+ goto out;
+ }
+
+ asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s);
+ if(event_file == NULL) {
+ edg_wll_SetError(context, ret=ENOMEM, "asprintf()");
+ goto out;
+ }
+
+ if((ret=notif_save_to_file(context,
+ event_file,
+ ulm_data,
+ &filepos))) {
+ goto out;
+ }
+
+ if((ret=notif_send_socket(context,
+ filepos,
+ ulm_data))) {
+ goto out;
+ }
+ ret = 0;
+
+out:
+ if(ulm_data) free(ulm_data);
+ if(reg_id_s) free(reg_id_s);
+ if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()");
+ return(ret);
+}
+
+
+int
+edg_wll_NotifJobStatus_a(edg_wll_Context context,
+ edg_wll_NotifId reg_id,
+ const char *host,
+ int port,
+ const char *owner,
+ const edg_wll_JobStat notif_job_stat)
+{
+ int ret=0;
+ char *xml_data, *xml_esc_data=NULL;
+
+ if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data))
+ goto out;
+
+ if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) {
+ edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()");
+ goto out;
+ }
+
+ ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data);
+
+out:
+ if(xml_data) free(xml_data);
+ if(xml_esc_data) free(xml_esc_data);
+ if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()");
+ return(ret);
+}
--- /dev/null
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <linux/limits.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <signal.h>
+#include <errno.h>
+#include <netdb.h>
+#include <limits.h>
+#include <syslog.h>
+#include <sys/time.h>
+#include <arpa/inet.h>
+#include <arpa/nameser.h>
+#include <resolv.h>
+#include <ares.h>
+#include <sys/ipc.h>
+#include <sys/sem.h>
+
+#include "glite/lb/srvbones.h"
+#include "glite/lb/context.h"
+#include "glite/lb/context-int.h"
+
+#include "il_lbproxy.h"
+#include "lb_http.h"
+#include "lbs_db.h"
+
+
+extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs);
+extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context);
+extern int edg_wll_StoreProtoProxy(edg_wll_Context ctx);
+
+#define DEFAULTCS "lbserver/@localhost:lbserver20"
+/*
+#define DEFAULTCS "lbserver/@localhost:lbproxy"
+*/
+
+#define CON_QUEUE 20 /* accept() */
+#define SLAVE_OVERLOAD 10 /* queue items per slave */
+#define CLNT_TIMEOUT 10 /* keep idle connection that many seconds */
+#define TOTAL_CLNT_TIMEOUT 60 /* one client may ask one slave multiple times */
+ /* but only limited time to avoid DoS attacks */
+#define CLNT_REJECT_TIMEOUT 100000 /* time limit for client rejection in !usec! */
+#define DNS_TIMEOUT 5 /* how long wait for DNS lookup */
+#define SLAVE_CONNS_MAX 500 /* commit suicide after that many connections */
+#define MASTER_TIMEOUT 30 /* maximal time of one-round of master network communication */
+#define SLAVE_TIMEOUT 30 /* maximal time of one-round of slave network communication */
+
+/* file to store pid and generate semaphores key
+ */
+#ifndef GLITE_LBPROXY_PIDFILE
+#define GLITE_LBPROXY_PIDFILE "/var/run/glite-lbproxy.pid"
+#endif
+
+#ifndef GLITE_LBPROXY_SOCK_PREFIX
+#define GLITE_LBPROXY_SOCK_PREFIX "/tmp/lb_proxy_"
+#endif
+
+#ifndef dprintf
+#define dprintf(x) { if (debug) printf x; }
+#endif
+
+#define sizofa(a) (sizeof(a)/sizeof((a)[0]))
+
+
+int debug = 0;
+static const int one = 1;
+static char *dbstring = NULL;
+static char sock_store[PATH_MAX],
+ sock_serve[PATH_MAX];
+static int slaves = 10,
+ semaphores = -1,
+ semset;
+
+
+static struct option opts[] = {
+ {"port", 1, NULL, 'p'},
+ {"debug", 0, NULL, 'd'},
+ {"mysql", 1, NULL, 'm'},
+ {"slaves", 1, NULL, 's'},
+ {"semaphores", 1, NULL, 'l'},
+ {"pidfile", 1, NULL, 'i'},
+ {"proxy-il-sock", 1, NULL, 'X'},
+ {"proxy-il-fprefix", 1, NULL, 'Y'},
+ {NULL,0,NULL,0}
+};
+
+static const char *get_opt_string = "p:dm:s:l:i:X:Y:";
+
+static void usage(char *me)
+{
+ fprintf(stderr,"usage: %s [option]\n"
+ "\t-p, --sock\t path-name to the local socket\n"
+ "\t-m, --mysql\t database connect string\n"
+ "\t-d, --debug\t don't run as daemon, additional diagnostics\n"
+ "\t-s, --slaves\t number of slave servers to fork\n"
+ "\t-l, --semaphores number of semaphores (job locks) to use\n"
+ "\t-i, --pidfile\t file to store master pid\n"
+ "\t--proxy-il-sock\t socket to send events to\n"
+ "\t--proxy-il-fprefix\t file prefix for events\n"
+ ,me);
+}
+
+static void wait_for_open(edg_wll_Context,const char *);
+
+
+/*
+ * SERVER BONES structures and handlers
+ */
+int clnt_data_init(void **);
+
+ /*
+ * Serve & Store handlers
+ */
+int clnt_reject(int);
+int handle_conn(int, struct timeval, void *);
+int accept_serve(int, void *);
+int accept_store(int, void *);
+int clnt_disconnect(int, void *);
+
+#define SRV_SERVE 0
+#define SRV_STORE 1
+static struct glite_srvbones_service service_table[] = {
+ { "serve", -1, handle_conn, accept_serve, clnt_reject, clnt_disconnect },
+ { "store", -1, handle_conn, accept_store, clnt_reject, clnt_disconnect },
+};
+
+struct clnt_data_t {
+ edg_wll_Context ctx;
+ void *mysql;
+};
+
+
+
+int main(int argc, char *argv[])
+{
+ int i;
+ struct sockaddr_un a;
+ int opt;
+ char pidfile[PATH_MAX] = GLITE_LBPROXY_PIDFILE,
+ socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX,
+ *name;
+ FILE *fpid;
+ key_t semkey;
+ edg_wll_Context ctx;
+ struct timeval to;
+
+
+
+ name = strrchr(argv[0],'/');
+ if (name) name++; else name = argv[0];
+
+ if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/glite_lb_proxy.pid", getenv("HOME"));
+
+ while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF) switch (opt) {
+ case 'p': strcpy(socket_path_prefix, optarg); break;
+ case 'd': debug = 1; break;
+ case 'm': dbstring = optarg; break;
+ case 's': slaves = atoi(optarg); break;
+ case 'l': semaphores = atoi(optarg); break;
+ case 'X': lbproxy_ilog_socket_path = strdup(optarg); break;
+ case 'Y': lbproxy_ilog_file_prefix = strdup(optarg); break;
+ case 'i': strcpy(pidfile, optarg); break;
+ case '?': usage(name); return 1;
+ }
+
+ if ( optind < argc ) { usage(name); return 1; }
+
+ setlinebuf(stdout);
+ setlinebuf(stderr);
+
+ fpid = fopen(pidfile,"r");
+ if ( fpid ) {
+ int opid = -1;
+
+ if ( fscanf(fpid,"%d",&opid) == 1 ) {
+ if ( !kill(opid,0) ) {
+ fprintf(stderr,"%s: another instance running, pid = %d\n",argv[0],opid);
+ return 1;
+ }
+ else if (errno != ESRCH) { perror("kill()"); return 1; }
+ }
+ fclose(fpid);
+ } else if (errno != ENOENT) { perror(pidfile); return 1; }
+
+ fpid = fopen(pidfile, "w");
+ if ( !fpid ) { perror(pidfile); return 1; }
+ fprintf(fpid, "%d", getpid());
+ fclose(fpid);
+
+ semkey = ftok(pidfile,0);
+
+ if ( semaphores == -1 ) semaphores = slaves;
+ semset = semget(semkey, 0, 0);
+ if ( semset >= 0 ) semctl(semset, 0, IPC_RMID);
+ semset = semget(semkey, semaphores, IPC_CREAT | 0600);
+ if ( semset < 0 ) { perror("semget()"); return 1; }
+ dprintf(("Using %d semaphores, set id %d\n", semaphores, semset));
+ for ( i = 0; i < semaphores; i++ ) {
+ struct sembuf s;
+
+ s.sem_num = i; s.sem_op = 1; s.sem_flg = 0;
+ if (semop(semset,&s,1) == -1) { perror("semop()"); return 1; }
+ }
+
+ service_table[SRV_SERVE].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+ if ( service_table[SRV_SERVE].conn < 0 ) { perror("socket()"); return 1; }
+ memset(&a, 0, sizeof(a));
+ a.sun_family = AF_UNIX;
+ sprintf(sock_serve, "%s%s", socket_path_prefix, "serve.sock");
+ strcpy(a.sun_path, sock_serve);
+
+ if( connect(service_table[SRV_SERVE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+ if( errno == ECONNREFUSED ) {
+ dprintf(("removing stale input socket %s\n", sock_serve));
+ unlink(sock_serve);
+ }
+ } else { perror("another instance of lb-proxy is running"); return 1; }
+
+ if ( bind(service_table[SRV_SERVE].conn, (struct sockaddr *) &a, sizeof(a)) < 0 ) {
+ char buf[100];
+
+ snprintf(buf, sizeof(buf), "bind(%s)", sock_serve);
+ perror(buf);
+ return 1;
+ }
+
+ if ( listen(service_table[SRV_SERVE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+
+ service_table[SRV_STORE].conn = socket(PF_UNIX, SOCK_STREAM, 0);
+ if ( service_table[SRV_STORE].conn < 0 ) { perror("socket()"); return 1; }
+ memset(&a, 0, sizeof(a));
+ a.sun_family = AF_UNIX;
+ sprintf(sock_store, "%s%s", socket_path_prefix, "store.sock");
+ strcpy(a.sun_path, sock_store);
+
+ if( connect(service_table[SRV_STORE].conn, (struct sockaddr *)&a, sizeof(a.sun_path)) < 0) {
+ if( errno == ECONNREFUSED ) {
+ dprintf(("removing stale input socket %s\n", sock_store));
+ unlink(sock_store);
+ }
+ } else { perror("another instance of lb-proxy is running"); return 1; }
+
+ if ( bind(service_table[SRV_STORE].conn, (struct sockaddr *) &a, sizeof(a))) {
+ char buf[100];
+
+ snprintf(buf, sizeof(buf), "bind(%s)", sock_store);
+ perror(buf);
+ return 1;
+ }
+ if ( listen(service_table[SRV_STORE].conn, CON_QUEUE) ) { perror("listen()"); return 1; }
+
+ dprintf(("Listening at %s, %s ...\n", sock_store, sock_serve));
+
+ if (!dbstring) dbstring = getenv("LBPROXYDB");
+ if (!dbstring) dbstring = DEFAULTCS;
+
+
+ /* Just check the database and let it be. The slaves do the job. */
+ /* XXX: InitContextProxy() !!!
+ * edg_wll_InitContext(&ctx) causes segfault
+ */
+ if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) {
+ perror("InitContext()");
+ return -1;
+ }
+ memset(ctx, 0, sizeof(*ctx));
+ wait_for_open(ctx, dbstring);
+ if (edg_wll_DBCheckVersion(ctx)) {
+ char *et,*ed;
+ edg_wll_Error(ctx,&et,&ed);
+
+ fprintf(stderr,"%s: open database: %s (%s)\n",argv[0],et,ed);
+ return 1;
+ }
+ edg_wll_Close(ctx);
+ edg_wll_FreeContext(ctx);
+
+ if ( !debug ) {
+ if ( daemon(1,0) == -1 ) { perror("deamon()"); exit(1); }
+
+ fpid = fopen(pidfile,"w");
+ if ( !fpid ) { perror(pidfile); return 1; }
+ fprintf(fpid, "%d", getpid());
+ fclose(fpid);
+ openlog(name, LOG_PID, LOG_DAEMON);
+ } else { setpgid(0, getpid()); }
+
+
+ glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_CT, slaves);
+ glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD);
+ glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX);
+ to = (struct timeval){CLNT_TIMEOUT, 0};
+ glite_srvbones_set_param(GLITE_SBPARAM_CLNT_TIMEOUT, &to);
+ to = (struct timeval){TOTAL_CLNT_TIMEOUT, 0};
+ glite_srvbones_set_param(GLITE_SBPARAM_TOTAL_CLNT_TIMEOUT, &to);
+
+ glite_srvbones_run(clnt_data_init, service_table, sizofa(service_table), debug);
+
+
+ semctl(semset, 0, IPC_RMID, 0);
+ unlink(pidfile);
+ for ( i = 0; i < sizofa(service_table); i++ )
+ if ( service_table[i].conn >= 0 ) close(service_table[i].conn);
+ unlink(sock_serve);
+ unlink(sock_store);
+
+
+ return 0;
+}
+
+
+int clnt_data_init(void **data)
+{
+ edg_wll_Context ctx;
+ struct clnt_data_t *cdata;
+
+
+ if ( !(cdata = calloc(1, sizeof(*cdata))) )
+ return -1;
+
+ if ( !(ctx = (edg_wll_Context) malloc(sizeof(*ctx))) ) { free(cdata); return -1; }
+ memset(ctx, 0, sizeof(*ctx));
+
+ dprintf(("[%d] opening database ...\n", getpid()));
+ wait_for_open(ctx, dbstring);
+ cdata->mysql = ctx->mysql;
+ edg_wll_FreeContext(ctx);
+
+ *data = cdata;
+ return 0;
+}
+
+
+int handle_conn(int conn, struct timeval client_start, void *data)
+{
+ struct clnt_data_t *cdata = (struct clnt_data_t *)data;
+ edg_wll_Context ctx;
+ struct timeval total_to = { TOTAL_CLNT_TIMEOUT,0 };
+
+
+ if ( !(ctx = (edg_wll_Context) calloc(1, sizeof(*ctx))) ) {
+ fprintf(stderr, "Couldn't create context");
+ return -1;
+ }
+ cdata->ctx = ctx;
+
+ /* Shared structures (pointers)
+ */
+ ctx->mysql = cdata->mysql;
+
+ /* set globals
+ */
+ ctx->allowAnonymous = 1;
+ ctx->isProxy = 1;
+ ctx->noAuth = 1;
+ ctx->semset = semset;
+ ctx->semaphores = semaphores;
+
+ ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT;
+ ctx->p_tmp_timeout.tv_usec = 0;
+ if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec ) {
+ ctx->p_tmp_timeout.tv_sec = total_to.tv_sec;
+ ctx->p_tmp_timeout.tv_usec = total_to.tv_usec;
+ }
+
+ ctx->poolSize = 1;
+ ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool));
+ ctx->connToUse = 0;
+
+ if ( edg_wll_plain_accept(conn, &ctx->connPool[0].conn) ) {
+ perror("accept");
+ edg_wll_FreeContext(ctx);
+
+ return -1;
+ }
+
+
+ return 0;
+}
+
+
+int accept_store(int conn, void *cdata)
+{
+ edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+ if ( edg_wll_StoreProtoProxy(ctx) ) {
+ char *errt, *errd;
+
+ errt = errd = NULL;
+ switch ( edg_wll_Error(ctx, &errt, &errd) ) {
+ case ETIMEDOUT:
+ case EPIPE:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
+ /* fallthrough
+ */
+ case ENOTCONN:
+ edg_wll_FreeContext(ctx);
+ ctx = NULL;
+ free(errt); free(errd);
+ dprintf(("[%d] Connection closed\n", getpid()));
+ return 1;
+ break;
+
+ case ENOENT:
+ case EINVAL:
+ case EPERM:
+ case EEXIST:
+ case EDG_WLL_ERROR_NOINDEX:
+ case E2BIG:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if ( !debug ) syslog(LOG_ERR, "%s (%s)", errt, errd);
+ break;
+
+ default:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if ( !debug ) syslog(LOG_CRIT, "%s (%s)", errt, errd);
+ return -1;
+ }
+ free(errt); free(errd);
+ }
+
+ return 0;
+}
+
+int accept_serve(int conn, void *cdata)
+{
+ edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+ /*
+ * serve the request
+ */
+ if ( edg_wll_ServerHTTP(ctx) ) {
+ char *errt, *errd;
+
+
+ errt = errd = NULL;
+ switch ( edg_wll_Error(ctx, &errt, &errd) ) {
+ case ETIMEDOUT:
+ case EPIPE:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
+ /* fallthrough
+ */
+ case ENOTCONN:
+ edg_wll_FreeContext(ctx);
+ ctx = NULL;
+ free(errt); free(errd);
+ dprintf(("[%d] Connection closed\n", getpid()));
+ /*
+ * "recoverable" error - return (>0)
+ */
+ return 1;
+ break;
+
+ case ENOENT:
+ case EINVAL:
+ case EPERM:
+ case EEXIST:
+ case EDG_WLL_ERROR_NOINDEX:
+ case E2BIG:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd);
+ /*
+ * no action for non-fatal errors
+ */
+ break;
+
+ default:
+ dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
+ if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd);
+ /*
+ * unknown error - do rather return (<0) (slave will be killed)
+ */
+ return -1;
+ }
+ free(errt); free(errd);
+ }
+
+ return 0;
+}
+
+
+int clnt_disconnect(int conn, void *cdata)
+{
+ edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx;
+
+ edg_wll_FreeContext(ctx);
+
+ return 0;
+}
+
+int clnt_reject(int conn)
+{
+ return 0;
+}
+
+static void wait_for_open(edg_wll_Context ctx, const char *dbstring)
+{
+ char *dbfail_string1, *dbfail_string2;
+
+ dbfail_string1 = dbfail_string2 = NULL;
+
+ while (edg_wll_Open(ctx, (char *) dbstring)) {
+ char *errt,*errd;
+
+ if (dbfail_string1) free(dbfail_string1);
+ edg_wll_Error(ctx,&errt,&errd);
+ asprintf(&dbfail_string1,"%s (%s)\n",errt,errd);
+ if (dbfail_string1 != NULL) {
+ if (dbfail_string2 == NULL || strcmp(dbfail_string1,dbfail_string2)) {
+ if (dbfail_string2) free(dbfail_string2);
+ dbfail_string2 = dbfail_string1;
+ dbfail_string1 = NULL;
+ dprintf(("[%d]: %s\nStill trying ...\n",getpid(),dbfail_string2));
+ if (!debug) syslog(LOG_ERR,dbfail_string2);
+ }
+ }
+ sleep(5);
+ }
+
+ if (dbfail_string1) free(dbfail_string1);
+ if (dbfail_string2 != NULL) {
+ free(dbfail_string2);
+ dprintf(("[%d]: DB connection established\n",getpid()));
+ if (!debug) syslog(LOG_INFO,"DB connection established\n");
+ }
+}