From 8a32a0a1a576d31cd73b894952860c8fea69d5bd Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 25 May 2007 15:02:05 +0000 Subject: [PATCH] Initial import - moved from lb.server-bones. --- org.glite.lbjp-utils.server-bones/.cvsignore | 1 + org.glite.lbjp-utils.server-bones/Makefile | 95 +++ .../examples/cnt_example.c | 179 ++++++ .../examples/srv_example.c | 224 +++++++ .../interface/srvbones.h | 92 +++ org.glite.lbjp-utils.server-bones/src/srvbones.c | 661 +++++++++++++++++++++ 6 files changed, 1252 insertions(+) create mode 100644 org.glite.lbjp-utils.server-bones/.cvsignore create mode 100644 org.glite.lbjp-utils.server-bones/Makefile create mode 100644 org.glite.lbjp-utils.server-bones/examples/cnt_example.c create mode 100644 org.glite.lbjp-utils.server-bones/examples/srv_example.c create mode 100644 org.glite.lbjp-utils.server-bones/interface/srvbones.h create mode 100644 org.glite.lbjp-utils.server-bones/src/srvbones.c diff --git a/org.glite.lbjp-utils.server-bones/.cvsignore b/org.glite.lbjp-utils.server-bones/.cvsignore new file mode 100644 index 0000000..378eac2 --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/.cvsignore @@ -0,0 +1 @@ +build diff --git a/org.glite.lbjp-utils.server-bones/Makefile b/org.glite.lbjp-utils.server-bones/Makefile new file mode 100644 index 0000000..100ef16 --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/Makefile @@ -0,0 +1,95 @@ +# defaults +top_srcdir=. +stagedir=. +distdir=. +globalprefix=glite +lbprefix=lb +package=glite-lb-server-bones +version=0.0.1 +PREFIX=/opt/glite + +glite_location=/opt/glite + +CC=gcc + +-include Makefile.inc +-include ../project/version.properties + +version=${module.version} + +VPATH=${top_srcdir}/src:${top_srcdir}/examples + +DEBUG:=-g -O0 -Wall +CFLAGS:= ${DEBUG} -I${top_srcdir}/interface +LDFLAGS:= + +ifdef LB_PROF + CFLAGS:= ${CFLAGS} -pg -g + LDFLAGS:= ${LDFLAGS} -pg +endif + +COMPILE:=libtool --mode=compile ${CC} ${CFLAGS} +LINK:=libtool --mode=link ${CC} -rpath ${stagedir}/lib ${LDFLAGS} +INSTALL:=libtool --mode=install install + +STATICLIB:=libglite_lb_server_bones.a +LTLIB:=libglite_lb_server_bones.la + +OBJS:=srvbones.o +LOBJS:=${OBJS:.o=.lo} + +HDRS:=srvbones.h + +default all: compile + +compile: ${STATICLIB} ${LTLIB} example + +${STATICLIB}: ${OBJS} + ar crv $@ ${OBJS} + ranlib $@ + +${LTLIB}: ${LOBJS} + ${LINK} -o $@ ${LOBJS} + +stage: compile + $(MAKE) install PREFIX=${stagedir} DOSTAGE=yes + +check: + -echo "No unit tests so far." + +example: srv_example cnt_example + +srv_example: srv_example.o + ${LINK} -o $@ ${LTLIB} srv_example.o + +cnt_example: cnt_example.o + ${LINK} -o $@ cnt_example.o + +doc: + +dist: distsrc distbin + +distsrc: + mkdir -p ${top_srcdir}/${package}-${version} + cd ${top_srcdir} && GLOBIGNORE="${package}-${version}" && cp -Rf * ${package}-${version} + cd ${top_srcdir} && tar -czf ${distdir}/${package}-${version}_src.tar.gz --exclude-from=project/tar_exclude ${package}-${version} + rm -rf ${top_srcdir}/${package}-${version} + +distbin: + $(MAKE) install PREFIX=`pwd`/tmpbuilddir${stagedir} + save_dir=`pwd`; cd tmpbuilddir${stagedir} && tar -czf $$save_dir/${top_srcdir}/${distdir}/${package}-${version}_bin.tar.gz *; cd $$save_dir + rm -rf tmpbuilddir + +install: + mkdir -p ${PREFIX}/include/${globalprefix}/${lbprefix} + mkdir -p ${PREFIX}/lib + ${INSTALL} -m 644 ${LTLIB} ${PREFIX}/lib + if [ x${DOSTAGE} = xyes ]; then \ + ${INSTALL} -m 644 ${STATICLIB} ${PREFIX}/lib ; \ + cd ${top_srcdir}/interface && install -m 644 ${HDRS} ${PREFIX}/include/${globalprefix}/${lbprefix} ; \ + fi + +clean: + +%.o: %.c + ${COMPILE} -c $< diff --git a/org.glite.lbjp-utils.server-bones/examples/cnt_example.c b/org.glite.lbjp-utils.server-bones/examples/cnt_example.c new file mode 100644 index 0000000..80b6af3 --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/examples/cnt_example.c @@ -0,0 +1,179 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef dprintf +#define dprintf(x) { if (debug) printf x; fflush(stdout); } +#endif + +#define DEF_MSG "Test message\n" +#define DEF_PORT 9999 + +static struct option opts[] = { + { "help", no_argument, NULL, 'h'}, + { "debug", no_argument, NULL, 'd'}, + { "msg", required_argument, NULL, 'm'}, + { "port", required_argument, NULL, 'p'}, + { "repeat", required_argument, NULL, 'r'}, +}; + +int debug = 0; +int port = DEF_PORT; +char *msg = NULL; + +static int writen(int fd, char *ptr, int nbytes); +static int readln(int fd, char *out); + +static void usage(char *me) +{ + fprintf(stderr, + "usage: %s [option]\n" + " -h, --help print this screen\n" + " -d, --debug prints debug messages\n" + " -m, --msg message to send\n" + " -p, --port service port\n", me); +} + + +int main(int argc, char **argv) +{ + struct sockaddr_in addr; + char buff[512], + *me; + int opt, + sock, + n; + int repeat = 1; + + me = strrchr(argv[0], '/'); + if ( me ) me++; else me = argv[0]; + while ( (opt = getopt_long(argc, argv,"p:m:hdr:", opts, NULL)) != EOF ) + { + switch ( opt ) + { + case 'm': + msg = strdup(optarg); + break; + case 'p': + port = atoi(optarg); + break; + case 'd': debug = 1; break; + case 'r': repeat = atoi(optarg); break; + case 'h': usage(me); return 0; + case '?': usage(me); return 1; + } + } + + bzero((char *) &addr, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + addr.sin_port = htons(port); + if ( (sock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) + { + perror("socket"); + exit(1); + } + if ( connect(sock, (struct sockaddr *) &addr, sizeof(addr)) < 0 ) + { + perror("connect"); + exit(1); + } + n = strlen(msg? msg: DEF_MSG); + for (;repeat; repeat--) { + if ( writen(sock, msg? msg: DEF_MSG, n) != n ) + { + dprintf(("error writing message\n")); + exit(1); + } + printf("reply: "); fflush(stdout); + n = readln(sock, buff); + if ( n < 0 ) + { + perror("read() reply error"); + return 1; + } + writen(0, buff, n); + } + close(sock); + + return 0; +} + +int writen(int fd, char *ptr, int nbytes) +{ + int nleft, nwritten; + + nleft = nbytes; + dprintf(("start writing %d bytes\n", nbytes)); + while ( nleft > 0 ) + { + nwritten = write(fd, ptr, nleft); + dprintf(("written %d bytes", nwritten)); + if ( nwritten <= 0 ) + return (nwritten); + + nleft -= nwritten; + ptr += nwritten; + dprintf((" (left %d bytes)\n", nleft)); + } + + dprintf(("written %d bytes (return: %d)\n", nwritten, nbytes - nleft)); + return (nbytes - nleft); +} + +#define BUFFER_SZ 512 + +int readln(int fd, char *out) +{ + static char buffer[BUFFER_SZ]; + static char *buffer_end = buffer; + int n; + + + dprintf(("reading line\n")); + while ( 1 ) { + if ( buffer_end - buffer ) { + /* buffer contains data + */ + char *endl; + + dprintf(("nonempty buffer\n")); + if ( (endl = memchr(buffer, '\n', buffer_end-buffer)) ) { + int linesz = endl-buffer+1; + + dprintf(("using buffer data\n")); + memcpy(out, buffer, linesz); + if ( endl+1 != buffer_end ) + memmove(buffer, endl+1, buffer_end-endl-1); + buffer_end -= linesz; + return linesz; + } + } + dprintf(("reading...\n")); + n = read(fd, buffer_end, BUFFER_SZ-(buffer_end-buffer)); + if ( n < 0 ) { + if ( errno == EAGAIN ) continue; + dprintf(("reading error\n")); + return n; + } + else if ( n == 0 ) { + int ret = buffer_end-buffer; + dprintf(("end of reading - returning %d bytes\n", ret)); + memcpy(out, buffer, ret); + buffer_end = buffer; + return ret; + } + + dprintf(("read %d bytes\n", n)); + buffer_end += n; + } + + return 0; +} diff --git a/org.glite.lbjp-utils.server-bones/examples/srv_example.c b/org.glite.lbjp-utils.server-bones/examples/srv_example.c new file mode 100644 index 0000000..0a19ab4 --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/examples/srv_example.c @@ -0,0 +1,224 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "srvbones.h" + +#ifndef dprintf +#define dprintf(x) { if (debug) printf x; fflush(stdout); } +#endif + +#define sizofa(a) (sizeof(a)/sizeof((a)[0])) + +int debug = 1; + +static int writen(int fd, char *ptr, int nbytes); +static int readln(int fd, char *out, int nbytes); + +static int new_conn(int, struct timeval *, void *); +static int reject(int); +static int disconnect(int, struct timeval *, void *); + +static int echo(int, struct timeval *, void *); +static int upper_echo(int, struct timeval *, void *); + +#define ECHO_PORT 9999 +#define UPPER_ECHO_PORT 9998 + +#define SRV_ECHO 0 +#define SRV_UPPER_ECHO 1 + +static struct glite_srvbones_service service_table[] = { + { "Echo Service", -1, new_conn, echo, reject, disconnect }, + { "Upper Echo Service", -1, new_conn, upper_echo, reject, disconnect } +}; + +int main(void) +{ + struct sockaddr_in myaddr; + + + if ( ((service_table[SRV_ECHO].conn = socket(AF_INET, SOCK_STREAM, 0)) == -1) + || ((service_table[SRV_UPPER_ECHO].conn = socket(AF_INET, SOCK_STREAM, 0)) == -1) ) + { + perror("socket"); + exit(1); + } + + bzero((char *) &myaddr, sizeof(myaddr)); + myaddr.sin_family = AF_INET; + myaddr.sin_addr.s_addr = htonl(INADDR_ANY); + myaddr.sin_port = htons(ECHO_PORT); + if ( bind(service_table[SRV_ECHO].conn, (struct sockaddr *)&myaddr, sizeof(myaddr)) == -1 ) + { + perror("bind"); + exit(1); + } + bzero((char *) &myaddr, sizeof(myaddr)); + myaddr.sin_family = AF_INET; + myaddr.sin_addr.s_addr = htonl(INADDR_ANY); + myaddr.sin_port = htons(UPPER_ECHO_PORT); + if ( bind(service_table[SRV_UPPER_ECHO].conn, (struct sockaddr *)&myaddr, sizeof(myaddr)) == -1 ) + { + perror("bind"); + exit(1); + } + + if ( listen(service_table[SRV_ECHO].conn, 10) + || listen(service_table[SRV_UPPER_ECHO].conn, 10) ) + { + perror("listen()"); + exit(1); + } + + + glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, 1); + glite_srvbones_run(NULL, service_table, sizofa(service_table), 1); + + + return 0; +} + +int upper_echo(int fd, struct timeval *to, void *data) +{ + int n, i; + char line[80]; + + n = readln(fd, line, 80); + if ( n < 0 ) + { + perror("read() message"); + return n; + } + else if ( n == 0 ) + return ENOTCONN; + + for ( i = 0; i < n; i++ ) + line[i] = toupper(line[i]); + + if ( writen(fd, line, n) != n ) + { + perror("write() message back"); + return -1; + } + + return 0; +} + +int echo(int fd, struct timeval *to, void *data) +{ + int n; + char line[80]; + + n = readln(fd, line, 80); + dprintf(("%d bytes read\n", n)); + if ( n < 0 ) + { + perror("read() message"); + return n; + } + else if ( n == 0 ) + return ENOTCONN; + + if ( writen(fd, line, n) != n ) + { + perror("write() message back"); + return -1; + } + + return 0; +} + +int new_conn(int conn, struct timeval *to, void *cdata) +{ + dprintf(("srv-bones example: new_conn handler\n")); + return 0; +} + +int reject(int conn) +{ + dprintf(("srv-bones example: reject handler\n")); + return 0; +} + +int disconnect(int conn, struct timeval *to, void *cdata) +{ + dprintf(("srv-bones example: disconnect handler\n")); + return 0; +} + +int writen(int fd, char *ptr, int nbytes) +{ + int nleft, nwritten; + + nleft = nbytes; + dprintf(("start writing %d bytes\n", nbytes)); + while ( nleft > 0 ) { + nwritten = write(fd, ptr, nleft); + dprintf(("written %d bytes", nwritten)); + if ( nwritten <= 0 ) + return (nwritten); + + nleft -= nwritten; + ptr += nwritten; + dprintf((" (left %d bytes)\n", nleft)); + } + + dprintf(("written %d bytes (return: %d)\n", nwritten, nbytes - nleft)); + return (nbytes - nleft); +} + +#define BUFFER_SZ 512 + +int readln(int fd, char *out, int nbytes) +{ + static char buffer[BUFFER_SZ]; + static char *buffer_end = buffer; + int n; + + + dprintf(("reading line\n")); + while ( 1 ) { + if ( buffer_end - buffer ) { + /* buffer contains data + */ + char *endl; + + dprintf(("nonempty buffer\n")); + if ( (endl = memchr(buffer, '\n', buffer_end-buffer)) ) { + int linesz = endl-buffer+1; + + memcpy(out, buffer, linesz); + if ( endl+1 != buffer_end ) memmove(buffer, endl+1, buffer_end-endl-1); + buffer_end -= linesz; + return linesz; + } + } + + dprintf(("reding...\n")); + n = read(fd, buffer_end, BUFFER_SZ-(buffer_end-buffer)); + if ( n < 0 ) { + if ( errno == EAGAIN ) n = 0; + else return n; + } + if ( n == 0 ) { + int ret = buffer_end-buffer; + dprintf(("end of reading - returning %d bytes\n", ret)); + memcpy(out, buffer, ret); + buffer_end = buffer; + return ret; + } + dprintf(("read %d bytes\n", n)); + + buffer_end += n; + } + + return 0; +} diff --git a/org.glite.lbjp-utils.server-bones/interface/srvbones.h b/org.glite.lbjp-utils.server-bones/interface/srvbones.h new file mode 100644 index 0000000..f238705 --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/interface/srvbones.h @@ -0,0 +1,92 @@ +#ifndef __ORG_GLITE_LB_SERVER_BONES_BONES_H__ +#define __ORG_GLITE_LB_SERVER_BONES_BONES_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum _glite_srvbones_param_t { + GLITE_SBPARAM_SLAVES_COUNT, /**< number of slaves */ + GLITE_SBPARAM_SLAVE_OVERLOAD, /**< queue items per slave */ + GLITE_SBPARAM_SLAVE_CONNS_MAX, /**< commit suicide after that many connections */ + +/* NULL for timeouts means infinity */ + GLITE_SBPARAM_IDLE_TIMEOUT, /**< keep idle connection that long (timeval) */ + GLITE_SBPARAM_CONNECT_TIMEOUT, /**< timeout for establishing a connection (timeval) */ + GLITE_SBPARAM_REQUEST_TIMEOUT, /**< timeout for a single request (timeval)*/ +} glite_srvbones_param_t; + +typedef int (*slave_data_init_hnd)(void **); + +struct glite_srvbones_service { + char *id; /**< name of the service */ + int conn; /**< listening socket */ + +/** Handler called by slave on a newly established connection, + * i.e. after accept(2). + * \param[in] conn the accepted connection + * \param[inout] timeout don't consume more, update with the remaining time + * \param[inout] user_data arbitrary user data passed among the functions + */ + int (*on_new_conn_hnd)( + int conn, + struct timeval *timeout, + void *user_data + ); + + +/** Handler called by slave to serve each request. + * \param[in] conn connection to work with + * \param[inout] timeout don't consume more, update with the remaining time + * \param[inout] user_data arbitrary user data passed among the functions + * + * \retval 0 OK, connection remains open + * \retval ENOTCON terminated gracefully, bones will clean up + * \retval >0 other POSIX errno, non-fatal error + * \retval <0 fatal error, terminate slave + */ + int (*on_request_hnd)( + int conn, + struct timeval *timeout, + void *user_data + ); + +/** Handler called by master to reject connection on server overload. + * Should kick off the client quickly, not imposing aditional load + * on server or blocking long time. + */ + int (*on_reject_hnd)(int conn); + +/** Handler called by slave before closing the connection. + * Perform server-side cleanup, and terminate the connection gracefully + * if there is a way to do so (the disconnect is server-initiated). + * close(conn) is called by bones then. + * \param[in] conn connection to work with + * \param[inout] timeout don't consume more time + * \param[inout] user_data arbitrary user data passed among the functions + */ + int (*on_disconnect_hnd)( + int conn, + struct timeval *timeout, + void *user_data + ); +}; + +extern int glite_srvbones_set_param(glite_srvbones_param_t param, ...); + + +/** Main server function. + * + * \param[in] slave_data_init_hnd callback initializing user data on every slave + */ +extern int glite_srvbones_run( + slave_data_init_hnd slave_data_init, + struct glite_srvbones_service *service_table, + size_t table_sz, + int dbg); + +#ifdef __cplusplus +} +#endif + +#endif /* __ORG_GLITE_LB_SERVER_BONES_BONES_H__ */ diff --git a/org.glite.lbjp-utils.server-bones/src/srvbones.c b/org.glite.lbjp-utils.server-bones/src/srvbones.c new file mode 100644 index 0000000..1d42a3f --- /dev/null +++ b/org.glite.lbjp-utils.server-bones/src/srvbones.c @@ -0,0 +1,661 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "srvbones.h" + +/* defaults for GLITE_SBPARAM_* */ + +#define SLAVES_COUNT 5 /* default number of slaves */ +#define SLAVE_OVERLOAD 10 /* queue items per slave */ +#define SLAVE_REQS_MAX 500 /* commit suicide after that many connections */ +#define IDLE_TIMEOUT 30 /* keep idle connection that many seconds */ +#define CONNECT_TIMEOUT 5 /* timeout for establishing a connection */ +#define REQUEST_TIMEOUT 10 /* timeout for a single request */ +#define NEW_CLIENT_DURATION 10 /* how long a client is considered new, i.e. busy + connection is not closed to serve other clients */ + +#ifndef dprintf +#define dprintf(x) { if (debug) printf x; } +#endif + +#ifdef LB_PROF +extern void _start (void), etext (void); +#endif + +static int running = 0; +static int debug = 0; +static volatile int die = 0, + child_died = 0; +static unsigned long clnt_dispatched = 0, + clnt_accepted = 0; + +static struct glite_srvbones_service *services; +static int services_ct; + +static int set_slaves_ct = SLAVES_COUNT; +static int set_slave_overload = SLAVE_OVERLOAD; +static int set_slave_reqs_max = SLAVE_REQS_MAX; +static struct timeval set_idle_to = {IDLE_TIMEOUT, 0}; +static struct timeval set_connect_to = {CONNECT_TIMEOUT, 0}; +static struct timeval set_request_to = {REQUEST_TIMEOUT, 0}; + +static int dispatchit(int, int, int); +static int do_sendmsg(int, int, unsigned long, int); +static int do_recvmsg(int, int *, unsigned long *, int *); +static int check_timeout(struct timeval, struct timeval, struct timeval); +static void catchsig(int); +static void catch_chld(int sig); +static int slave(int (*)(void **), int); + +static void glite_srvbones_set_slaves_ct(int); +static void glite_srvbones_set_slave_overload(int); +static void glite_srvbones_set_slave_conns_max(int); +static void set_timeout(struct timeval *,struct timeval *); + +int glite_srvbones_set_param(glite_srvbones_param_t param, ...) +{ + va_list ap; + + if ( running ) { + dprintf(("Attempting to set srv-bones parameter on running server")); + return -1; + } + + va_start(ap, param); + switch ( param ) { + case GLITE_SBPARAM_SLAVES_COUNT: + glite_srvbones_set_slaves_ct(va_arg(ap,int)); break; + case GLITE_SBPARAM_SLAVE_OVERLOAD: + glite_srvbones_set_slave_overload(va_arg(ap,int)); break; + case GLITE_SBPARAM_SLAVE_CONNS_MAX: + glite_srvbones_set_slave_conns_max(va_arg(ap,int)); break; + case GLITE_SBPARAM_IDLE_TIMEOUT: + set_timeout(&set_idle_to,va_arg(ap,struct timeval *)); break; + case GLITE_SBPARAM_CONNECT_TIMEOUT: + set_timeout(&set_connect_to,va_arg(ap,struct timeval *)); break; + case GLITE_SBPARAM_REQUEST_TIMEOUT: + set_timeout(&set_request_to,va_arg(ap,struct timeval *)); break; + } + va_end(ap); + + return 0; +} + +int glite_srvbones_run( + slave_data_init_hnd slave_data_init, + struct glite_srvbones_service *service_table, + size_t table_sz, + int dbg) +{ + struct sigaction sa; + sigset_t sset; + int sock_slave[2], i; + + + assert(service_table); + assert(table_sz > 0); + + services = service_table; + services_ct = table_sz; + debug = dbg; + + setlinebuf(stdout); + setlinebuf(stderr); + dprintf(("Master pid %d\n", getpid())); + + if ( socketpair(AF_UNIX, SOCK_STREAM, 0, sock_slave) ) + { + perror("socketpair()"); + return 1; + } + + memset(&sa, 0, sizeof(sa)); assert(sa.sa_handler == NULL); + sa.sa_handler = catchsig; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + + sa.sa_handler = catch_chld; + sigaction(SIGCHLD, &sa, NULL); + + sa.sa_handler = SIG_IGN; + sigaction(SIGUSR1, &sa, NULL); + + sigemptyset(&sset); + sigaddset(&sset, SIGCHLD); + sigaddset(&sset, SIGTERM); + sigaddset(&sset, SIGINT); + sigprocmask(SIG_BLOCK, &sset, NULL); + + for ( i = 0; i < set_slaves_ct; i++ ) + slave(slave_data_init, sock_slave[1]); + + while ( !die ) + { + fd_set fds; + int ret, mx; + + + FD_ZERO(&fds); + FD_SET(sock_slave[0], &fds); + for ( i = 0, mx = sock_slave[0]; i < services_ct; i++ ) + { + FD_SET(services[i].conn, &fds); + if ( mx < services[i].conn ) mx = services[i].conn; + } + + sigprocmask(SIG_UNBLOCK, &sset, NULL); + ret = select(mx+1, &fds, NULL, NULL, NULL); + sigprocmask(SIG_BLOCK, &sset, NULL); + + if ( ret == -1 && errno != EINTR ) + { + if ( debug ) perror("select()"); + else syslog(LOG_CRIT,"select(): %m"); + + return 1; + } + + if ( child_died ) + { + int pid; + + while ( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) + { + if ( !die ) + { + int newpid = slave(slave_data_init, sock_slave[1]); + dprintf(("[master] Servus mortuus [%d] miraculo resurrexit [%d]\n", pid, newpid)); + } + } + child_died = 0; + continue; + } + + if ( die ) continue; + + + if (FD_ISSET(sock_slave[0],&fds)) { + /* slave accepted a request + */ + unsigned long a; + + if ( (recv(sock_slave[0], &a, sizeof(a), MSG_WAITALL) == sizeof(a)) + && (a <= clnt_dispatched) + && (a > clnt_accepted || clnt_accepted > clnt_dispatched) ) + clnt_accepted = a; + } + + for ( i = 0; i < services_ct; i++ ) + if ( FD_ISSET(services[i].conn, &fds) + && dispatchit(sock_slave[0], services[i].conn ,i) ) + /* Be carefull!!! + * This must break this for cykle but start the + * while (!die) master cykle from the top also + */ + break; + } + + dprintf(("[master] Terminating on signal %d\n", die)); + if (!debug) syslog(LOG_INFO, "Terminating on signal %d\n", die); + kill(0, die); + + return 0; +} + +static int dispatchit(int sock_slave, int sock, int sidx) +{ + struct sockaddr_in a; + unsigned char *pom; + int conn, + alen, ret; + + + alen = sizeof(a); + if ( (conn = accept(sock, (struct sockaddr *)&a, &alen)) < 0 ) + { + if (debug) + { + perror("accept()"); + return 1; + } + else + { + syslog(LOG_ERR, "accept(): %m"); + sleep(5); + return -1; + } + } + + getpeername(conn, (struct sockaddr *)&a, &alen); + pom = (char *) &a.sin_addr.s_addr; + dprintf(("[master] %s connection from %d.%d.%d.%d:%d\n", + services[sidx].id? services[sidx].id: "", + (int)pom[0], (int)pom[1], (int)pom[2], (int)pom[3], + ntohs(a.sin_port))); + + ret = 0; + if ( ( clnt_dispatched < clnt_accepted /* wraparound */ + || clnt_dispatched - clnt_accepted < set_slaves_ct * set_slave_overload) + && !(ret = do_sendmsg(sock_slave, conn, clnt_dispatched++, sidx)) ) + { + /* all done + */ + dprintf(("[master] Dispatched %lu, last known served %lu\n", + clnt_dispatched-1, clnt_accepted)); + } + else + { + services[sidx].on_reject_hnd(conn); + dprintf(("[master] Rejected new connection due to overload\n")); + if ( !debug ) syslog(LOG_ERR, "Rejected new connection due to overload\n"); + } + + close(conn); + if (ret) + { + perror("sendmsg()"); + if ( !debug ) syslog(LOG_ERR, "sendmsg(): %m"); + } + + + return 0; +} + + +static int slave(slave_data_init_hnd data_init_hnd, int sock) +{ + sigset_t sset; + struct sigaction sa; + struct timeval client_done, + client_start, + new_client_duration = { NEW_CLIENT_DURATION, 0 }; + + void *clnt_data = NULL; + int conn = -1, + srv = -1, + req_cnt = 0, + sockflags, + h_errno, + pid, i, + first_request = 0; /* 1 -> first request from connected client expected */ + + + + if ( (pid = fork()) ) return pid; + +#ifdef LB_PROF + monstartup((u_long)&_start, (u_long)&etext); +#endif + + srandom(getpid()+time(NULL)); + + for ( i = 0; i < services_ct; i++ ) + close(services[i].conn); + + sigemptyset(&sset); + sigaddset(&sset, SIGTERM); + sigaddset(&sset, SIGINT); + sigaddset(&sset, SIGUSR1); + + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = catchsig; + sigaction(SIGUSR1, &sa, NULL); + + if ( (sockflags = fcntl(sock, F_GETFL, 0)) < 0 + || fcntl(sock, F_SETFL, sockflags | O_NONBLOCK) < 0 ) + { + dprintf(("[%d] fcntl(master_sock): %s\n", getpid(), strerror(errno))); + if ( !debug ) syslog(LOG_CRIT, "fcntl(master_sock): %m"); + exit(1); + } + + if ( data_init_hnd && data_init_hnd(&clnt_data) ) + /* + * XXX: what if the error remains and master will start new slave + * again and again? + * + * Then we are in a deep shit. + */ + exit(1); + + while ( !die && (req_cnt < set_slave_reqs_max || (conn >= 0 && first_request))) + { + fd_set fds; + int max = sock, + connflags, + newconn = -1, + newsrv = -1; + + enum { KICK_DONT = 0, KICK_IDLE, KICK_LOAD, KICK_HANDLER, KICK_COUNT } + kick_client = KICK_DONT; + + static char * kicks[] = { + "don't kick", + "idle client", + "high load", + "no request handler", + "request count limit reached", + }; + unsigned long seq; + struct timeval now,to; + + + FD_ZERO(&fds); + FD_SET(sock, &fds); + if ( conn >= 0 ) FD_SET(conn, &fds); + if ( conn > sock ) max = conn; + + to = set_idle_to; + sigprocmask(SIG_UNBLOCK, &sset, NULL); + switch (select(max+1, &fds, NULL, NULL, to.tv_sec >= 0 ? &to : NULL)) + { + case -1: + if ( errno != EINTR ) + { + dprintf(("[%d] select(): %s\n", getpid(), strerror(errno))); + if ( !debug ) syslog(LOG_CRIT, "select(): %m"); + exit(1); + } + continue; + + case 0: + if ( conn < 0 ) continue; + + default: + break; + } + sigprocmask(SIG_BLOCK, &sset, NULL); + + gettimeofday(&now,NULL); + + if ( conn >= 0 && FD_ISSET(conn, &fds) ) + { + /* + * serve the request + */ + int rv; + + dprintf(("[%d] incoming request\n", getpid())); + + if ( !services[srv].on_request_hnd ) + { + kick_client = KICK_HANDLER; + } else { + first_request = 0; + to = set_request_to; + if ((rv = services[srv].on_request_hnd(conn,to.tv_sec>=0 ? &to : NULL,clnt_data)) == ENOTCONN) { + if (services[srv].on_disconnect_hnd + && (rv = services[srv].on_disconnect_hnd(conn,NULL,clnt_data))) + { + dprintf(("[%d] disconnect handler: %s, terminating\n",getpid(),strerror(rv))); + exit(1); + } + close(conn); + conn = -1; + srv = -1; + dprintf(("[%d] Connection closed\n", getpid())); + } + else if (rv > 0) { + /* non-fatal error -> close connection and contiue + * XXX: likely to leak resources but can we call on_disconnect_hnd() on error? + */ + close(conn); + conn = -1; + srv = -1; + dprintf(("[%d] %s, connection closed\n",getpid(),strerror(rv))); + continue; + } + else if ( rv < 0 ) { + /* unknown error -> clasified as FATAL -> kill slave + */ + dprintf(("[%d] %s, terminating\n",getpid(),strerror(-rv))); + exit(1); + } + else { + dprintf(("[%d] request done\n", getpid())); + gettimeofday(&client_done, NULL); + } + + if (!check_timeout(new_client_duration,client_start,now)) continue; + + } + } else { + if (conn >= 0 && check_timeout(set_idle_to, client_done, now)) + kick_client = KICK_IDLE; + } + + if ( (conn < 0 || !first_request) && FD_ISSET(sock, &fds) && req_cnt < set_slave_reqs_max ) + { + /* Prefer slaves with no connection, then kick idle clients, + * active ones last. Wait less if we have serviced a request in the meantime. + * Tuned for HZ=100 timer. */ + if ( conn >= 0 ) usleep( kick_client || FD_ISSET(conn, &fds) ? 11000 : 21000); + if ( do_recvmsg(sock, &newconn, &seq, &newsrv) ) switch ( errno ) + { + case EINTR: /* XXX: signals are blocked */ + case EAGAIN: + continue; + default: dprintf(("[%d] recvmsg(): %s\n", getpid(), strerror(errno))); + if (!debug) syslog(LOG_CRIT,"recvmsg(): %m\n"); + exit(1); + } + kick_client = KICK_LOAD; + } + + if (req_cnt >= set_slave_reqs_max && !first_request) kick_client = KICK_COUNT; + + if ( kick_client && conn >= 0 ) + { + if ( services[srv].on_disconnect_hnd ) + services[srv].on_disconnect_hnd(conn, NULL, clnt_data); + close(conn); + conn = -1; + srv = -1; + dprintf(("[%d] Connection closed, %s\n", getpid(), kicks[kick_client])); + } + + if ( newconn >= 0 ) + { + int ret; + + conn = newconn; + srv = newsrv; + gettimeofday(&client_start, NULL); + + switch ( send(sock, &seq, sizeof(seq), 0) ) + { + case -1: + if (debug) perror("send()"); + else syslog(LOG_CRIT, "send(): %m\n"); + exit(1); + + case sizeof(seq): + break; + + default: dprintf(("[%d] send(): incomplete message\n", getpid())); + exit(1); + } + + req_cnt++; + dprintf(("[%d] serving %s connection %lu\n", getpid(), + services[srv].id? services[srv].id: "", seq)); + + connflags = fcntl(conn, F_GETFL, 0); + if ( fcntl(conn, F_SETFL, connflags | O_NONBLOCK) < 0 ) + { + dprintf(("[%d] can't set O_NONBLOCK mode (%s), closing.\n", getpid(), strerror(errno))); + if ( !debug ) syslog(LOG_ERR, "can't set O_NONBLOCK mode (%s), closing.\n", strerror(errno)); + close(conn); + conn = srv = -1; + continue; + } + + to = set_connect_to; + if ( services[srv].on_new_conn_hnd + && (ret = services[srv].on_new_conn_hnd(conn, to.tv_sec >= 0 ? &to : NULL, clnt_data)) ) + { + dprintf(("[%d] Connection not estabilished, err = %d.\n", getpid(),ret)); + if ( !debug ) syslog(LOG_ERR, "Connection not estabilished, err = %d.\n",ret); + close(conn); + conn = srv = -1; + if (ret < 0) exit(1); + continue; + } + gettimeofday(&client_done, NULL); + first_request = 1; + } + } + + if ( die ) + { + dprintf(("[%d] Terminating on signal %d\n", getpid(), die)); + if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die); + } + + if (conn >= 0 && services[srv].on_disconnect_hnd ) + services[srv].on_disconnect_hnd(conn, NULL, clnt_data); + + dprintf(("[%d] Terminating after %d requests\n", getpid(), req_cnt)); + if ( !debug ) syslog(LOG_INFO, "Terminating after %d requests", req_cnt); + + + exit(0); +} + +static void catchsig(int sig) +{ + die = sig; +} + +static void catch_chld(int sig) +{ + child_died = 1; +} + +static int check_timeout(struct timeval timeout, struct timeval before, struct timeval after) +{ + return (timeout.tv_usec <= after.tv_usec - before.tv_usec) ? + (timeout.tv_sec <= after.tv_sec - before.tv_sec) : + (timeout.tv_sec < after.tv_sec - before.tv_sec); +} + +#define MSG_BUFSIZ 30 + +/* + * send socket sock through socket to_sock + */ +static int do_sendmsg(int to_sock, int sock, unsigned long clnt_dispatched, int srv) +{ + struct msghdr msg = {0}; + struct cmsghdr *cmsg; + struct iovec sendiov; + int myfds, /* file descriptors to pass. */ + *fdptr; + char buf[CMSG_SPACE(sizeof myfds)]; /* ancillary data buffer */ + char sendbuf[MSG_BUFSIZ]; /* to store unsigned int + \0 */ + + + memset(sendbuf, 0, sizeof(sendbuf)); + snprintf(sendbuf, sizeof(sendbuf), "%u %lu", srv, clnt_dispatched); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = &sendiov; + msg.msg_iovlen = 1; + sendiov.iov_base = sendbuf; + sendiov.iov_len = sizeof(sendbuf); + + msg.msg_control = buf; + msg.msg_controllen = sizeof buf; + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + fdptr = (int *)CMSG_DATA(cmsg); + *fdptr = sock; + + msg.msg_controllen = cmsg->cmsg_len; + /* send fd to server-slave to do rest of communication */ + if (sendmsg(to_sock, &msg, 0) < 0) + return 1; + + return 0; +} + + +/* receive socket sock through socket from_sock */ +static int do_recvmsg(int from_sock, int *sock, unsigned long *clnt_accepted,int *srv) +{ + struct msghdr msg = {0}; + struct cmsghdr *cmsg; + struct iovec recviov; + int myfds; /* file descriptors to pass. */ + char buf[CMSG_SPACE(sizeof(myfds))]; /* ancillary data buffer */ + char recvbuf[MSG_BUFSIZ]; + + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = &recviov; + msg.msg_iovlen = 1; + recviov.iov_base = recvbuf; + recviov.iov_len = sizeof(recvbuf); + + msg.msg_control = buf; + msg.msg_controllen = sizeof buf; + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + msg.msg_controllen = cmsg->cmsg_len; + + if (recvmsg(from_sock, &msg, 0) < 0) + return 1; + + *sock = *((int *)CMSG_DATA(cmsg)); + sscanf(recvbuf, "%u %lu", srv, clnt_accepted); + + return 0; +} + +static void glite_srvbones_set_slaves_ct(int n) +{ + set_slaves_ct = (n == -1)? SLAVES_COUNT: n; +} + +static void glite_srvbones_set_slave_overload(int n) +{ + set_slave_overload = (n == -1)? SLAVE_OVERLOAD: n; +} + +static void glite_srvbones_set_slave_conns_max(int n) +{ + set_slave_reqs_max = (n == -1)? SLAVE_REQS_MAX: n; +} + +static void set_timeout(struct timeval *to, struct timeval *val) +{ + if (val) { + /* XXX: why not, negative timeouts don't make any sense, IMHO */ + assert(val->tv_sec >= 0); + *to = *val; + } + else to->tv_sec = -1; +} -- 1.8.2.3