From 84954be80feb2fed58ecdd2209edcbe39067dc86 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Tue, 21 Sep 2004 07:39:45 +0000 Subject: [PATCH] - First version of BK server using "server bones" library and WS protocol. - Under hard development - unstable & buggy --- org.glite.lb.server/Makefile | 39 +- org.glite.lb.server/project/LB.wsdl | 326 ++++ .../project/configure.properties.xml | 4 + org.glite.lb.server/src/bkserverd.c | 1750 ++++++++++---------- org.glite.lb.server/src/ws_fault.c | 56 + org.glite.lb.server/src/ws_plugin.c | 191 +++ org.glite.lb.server/src/ws_plugin.h | 8 + org.glite.lb.server/src/ws_query.c | 105 ++ org.glite.lb.server/src/ws_typemap.dat | 2 + org.glite.lb.server/src/ws_typeref.c.T | 459 +++++ 10 files changed, 2048 insertions(+), 892 deletions(-) create mode 100644 org.glite.lb.server/project/LB.wsdl create mode 100644 org.glite.lb.server/src/ws_fault.c create mode 100644 org.glite.lb.server/src/ws_plugin.c create mode 100644 org.glite.lb.server/src/ws_plugin.h create mode 100644 org.glite.lb.server/src/ws_query.c create mode 100644 org.glite.lb.server/src/ws_typemap.dat create mode 100644 org.glite.lb.server/src/ws_typeref.c.T diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index bf01a86..3e0ef9e 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -16,12 +16,15 @@ nothrflavour=gcc32 thrflavour=gcc32pthr expat_prefix=/opt/expat ares_prefix=/opt/ares +gsoap_prefix=/opt/gsoap -include Makefile.inc +GSOAP_FILES_PREFIX:= bk_ws_ + YACC=bison -y -VPATH=${top_srcdir}/src:${top_srcdir}/test +VPATH=${top_srcdir}/src:${top_srcdir}/test:${top_srcdir}/project AT3=perl -I${top_srcdir}/project ${top_srcdir}/project/at3 TEST_LIBS:=-L${cppunit}/lib -lcppunit @@ -29,7 +32,7 @@ TEST_INC:=-I${cppunit}/include SUFFIXES = .T -DEBUG:=-g -O0 +DEBUG:=-g -O0 -Wall # not yet # -I${voms}/include \ # -I${gacl}/include \ @@ -40,6 +43,7 @@ CFLAGS:= -DNO_VOMS -DNO_GACL \ ${DEBUG} -I${stagedir}/include -I${top_srcdir}/src -I. \ -I${expat_prefix}/include \ -I${ares_prefix}/include \ + -I${gsoap_prefix}/include \ ${COVERAGE_FLAGS} \ -I${mysql_prefix}/include \ -I${globus_prefix}/include/${nothrflavour} @@ -73,6 +77,10 @@ EXT_LIBS:= -L${ares_prefix}/lib -lares \ -L${expat_prefix}/lib -lexpat \ ${GLOBUS_LIBS} +GSOAP_LIBS:= -L${gsoap_prefix}/lib -lgsoap + +SRVBONES_LIB:= -L${stagedir}/lib -lglite_lb_server_bones + COMMON_LIB:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o \ @@ -80,14 +88,17 @@ SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ - notification.o il_notification.o notif_match.o + notification.o il_notification.o notif_match.o \ + ${GSOAP_FILES_PREFIX}C.o ${GSOAP_FILES_PREFIX}Server.o \ + ws_plugin.o ws_query.o ws_fault.o ws_typeref.o + INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o openserver.o \ jobstat.o query.o lock.o get_events.o write2rgma.o index_lex.o \ lb_authz.o store.o bkindex.o glite_lb_bkserverd: ${SERVER_OBJS} - ${LINK} -o $@ ${SERVER_OBJS} ${COMMON_LIB} ${EXT_LIBS} + ${LINK} -o $@ ${SERVER_OBJS} ${COMMON_LIB} ${SRVBONES_LIB} ${GSOAP_LIBS} ${EXT_LIBS} glite_lb_bkindex: ${INDEX_OBJS} ${LINK} -o $@ ${INDEX_OBJS} ${COMMON_LIB} ${EXT_LIBS} @@ -96,7 +107,7 @@ default all: compile compile: glite_lb_bkserverd glite_lb_bkindex -check: test_xml +check: ws_jobstat -echo check.query not complete yet test_xml: test_xml.cpp @@ -109,6 +120,14 @@ lb_xml_parse.c: lb_xml_parse.c.T ${AT3} $< >$@ || rm -f $@ chmod -w $@ >/dev/null +${GSOAP_FILES_PREFIX}H.h ${GSOAP_FILES_PREFIX}C.c ${GSOAP_FILES_PREFIX}Server.c: LB.xh + ${gsoap_prefix}/bin/soapcpp2 -w -c -p ${GSOAP_FILES_PREFIX} LB.xh + +LB.xh: LB.wsdl ws_typemap.dat + cp ${top_srcdir}/project/LB.wsdl . + ${gsoap_prefix}/bin/wsdl2h -c -t ${top_srcdir}/src/ws_typemap.dat -o $@ LB.wsdl + rm LB.wsdl + check.query: test_query_events ./test_query_events @@ -119,6 +138,16 @@ test_query_events: test_query_events.o ${LINKXX} -o $@ test_query_events.o ${query_events_objs} \ ${TEST_LIBS} ${COMMON_LIB} ${EXT_LIBS} +WS_JOBSTAT_OBJS:= ws_jobstat.o test_ws_plugin.o $(GSOAP_FILES_PREFIX)C.o $(GSOAP_FILES_PREFIX)Client.o \ + ws_fault.o ws_typeref.o +WS_JOBSTAT_LIBS:= ${GSOAP_LIBS} -L${stagedir}/lib -lglite_lb_client_${nothrflavour} + +ws_jobstat: ${WS_JOBSTAT_OBJS} + ${LINK} -o $@ ${WS_JOBSTAT_OBJS} ${WS_JOBSTAT_LIBS} ${EXT_LIBS} + +test_ws_plugin.o: ${top_srcdir}/src/ws_plugin.c + ${CC} ${CFLAGS} -DPLUGIN_TEST -c -o $@ ${top_srcdir}/src/ws_plugin.c + doc: stage: compile diff --git a/org.glite.lb.server/project/LB.wsdl b/org.glite.lb.server/project/LB.wsdl new file mode 100644 index 0000000..33e39d1 --- /dev/null +++ b/org.glite.lb.server/project/LB.wsdl @@ -0,0 +1,326 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Retrieve state of a single job. + +Given a job id queries LB for detailed state of the job. + + +Input: + +jobid: Id of the job + +flags: Which fields of the job state should be retrieved: zero or more of CLASSADS, CHILDREN, CHILDSTAT + +Output: state of the job + +Faults: GenericLBFault + + + + + + + Query for jobs satisfying set of conditions. + +Intput: + +conditions: list of lists of query conditions. + Elements of the inner lists refer to a single job attribute, the conditions are or-ed. + Elements of the outer list may refer to different job attributes, they are and-ed. + +flags: Which fields of the job state should be retrieved: see JobStatus + +Output: list of mathching jobs, including their states + +Faults: GenericLBFault + + + + + + + Query all jobs of the current user. + +Input: no input + +Output: list of all user's jobs, including their states + +Faults: GenericLBFault + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Logging and Bookkeeping service + + + + + diff --git a/org.glite.lb.server/project/configure.properties.xml b/org.glite.lb.server/project/configure.properties.xml index 66ea055..453cdae 100644 --- a/org.glite.lb.server/project/configure.properties.xml +++ b/org.glite.lb.server/project/configure.properties.xml @@ -20,6 +20,9 @@ Revision history: $Log$ + Revision 1.2 2004/09/08 13:38:03 akrenek + query event unit test (not complete yet) + Revision 1.1 2004/07/07 12:08:09 akrenek *** empty log message *** @@ -56,6 +59,7 @@ expat_prefix=${with.expat.prefix} ares_prefix=${with.ares.prefix} mysql_prefix=${with.mysql.prefix} cppunit=${with.cppunit.prefix} +gsoap_prefix=${with.gsoap.prefix} diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 4d8f10b..7cffac6 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -1,6 +1,3 @@ -#ident "$Header$" - - #include #include #include @@ -30,6 +27,9 @@ #include +#include + +#include "glite/lb/srvbones.h" #include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h" #include "glite/lb/consumer.h" #include "glite/lb/purge.h" @@ -44,12 +44,14 @@ #include "lbs_db.h" #include "lb_authz.h" #include "il_notification.h" +#include "ws_plugin.h" extern int edg_wll_StoreProto(edg_wll_Context ctx); extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs); -edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); +extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); + + -#define max(x,y) ((x)>(y)?(x):(y)) #define CON_QUEUE 20 /* accept() */ #define SLAVE_OVERLOAD 10 /* queue items per slave */ @@ -78,53 +80,39 @@ edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); #endif #ifndef dprintf -#define dprintf(x) { if (debug) printf x; } +#define dprintf(x) { if (debug) printf x; } #endif - static const int one = 1; - -int debug = 0; -int rgma_export = 0; -static int noAuth = 0; -static int noIndex = 0; -static int strict_locking = 0; -static int hardJobsLimit = 0; -static int hardEventsLimit = 0; -static int hardRespSizeLimit = 0; -static char *dbstring = NULL,*fake_host = NULL; -static int fake_port = 0; - -static char ** super_users = NULL; -char *cadir = NULL, *vomsdir = NULL; - +#define sizofa(a) (sizeof(a)/sizeof((a)[0])) -static int slaves = 10, semaphores = -1, semset; -static char *purgeStorage = EDG_PURGE_STORAGE; -static char *dumpStorage = EDG_DUMP_STORAGE; -static time_t purge_timeout[EDG_WLL_NUMBER_OF_STATCODES]; -static time_t notif_duration = 60*60*24*7; -static edg_wll_QueryRec **job_index; -static edg_wll_IColumnRec *job_index_cols; -static volatile int die = 0, child_died = 0; +int debug = 0; +int rgma_export = 0; +static const int one = 1; +static int noAuth = 0; +static int noIndex = 0; +static int strict_locking = 0; +static int hardJobsLimit = 0; +static int hardEventsLimit = 0; +static int hardRespSizeLimit = 0; +static char *dbstring = NULL,*fake_host = NULL; +static int fake_port = 0; +static char *proxy = NULL; +static char **super_users = NULL; +static int slaves = 10, + semaphores = -1, + semset; +static char *purgeStorage = EDG_PURGE_STORAGE; +static char *dumpStorage = EDG_DUMP_STORAGE; -static int dispatchit(int,int,int); -static int do_sendmsg(int,int,unsigned long,int); -static int do_recvmsg(int,int *,unsigned long *,int *); -static void wait_for_open(edg_wll_Context,const char *); - -static void catchsig(int sig) -{ - die = sig; -} +static time_t purge_timeout[EDG_WLL_NUMBER_OF_STATCODES]; +static time_t notif_duration = 60*60*24*7; -static void catch_chld(int sig) -{ - child_died = 1; -} +static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; +char *cadir = NULL, + *vomsdir = NULL; -static int slave(void *,int); static struct option opts[] = { {"cert", 1, NULL, 'c'}, @@ -132,6 +120,7 @@ static struct option opts[] = { {"CAdir", 1, NULL, 'C'}, {"VOMSdir", 1, NULL, 'V'}, {"port", 1, NULL, 'p'}, + {"wsport", 1, NULL, 'w'}, {"address", 1, NULL, 'a'}, {"debug", 0, NULL, 'd'}, {"rgmaexport", 0, NULL, 'r'}, @@ -161,6 +150,7 @@ static void usage(char *me) "\t-C, --CAdir\t trusted certificates directory\n" "\t-V, --VOMSdir\t trusted VOMS servers certificates directory\n" "\t-p, --port\t port to listen\n" + "\t-w, --wsport\t port to serve the web services requests\n" "\t-m, --mysql\t database connect string\n" "\t-d, --debug\t don't run as daemon, additional diagnostics\n" "\t-r, --rgmaexport write state info to RGMA interface\n" @@ -181,308 +171,80 @@ static void usage(char *me) ,me); } - -static int decrement_timeout(struct timeval *timeout, struct timeval before, struct timeval after) -{ - (*timeout).tv_sec = (*timeout).tv_sec - (after.tv_sec - before.tv_sec); - (*timeout).tv_usec = (*timeout).tv_usec - (after.tv_usec - before.tv_usec); - while ( (*timeout).tv_usec < 0) { - (*timeout).tv_sec--; - (*timeout).tv_usec += 1000000; - } - if ( ((*timeout).tv_sec < 0) || (((*timeout).tv_sec == 0) && ((*timeout).tv_usec == 0)) ) return(1); - else return(0); -} - -static int check_timeout(struct timeval *timeout, struct timeval before, struct timeval after) -{ - return (timeout->tv_usec <= after.tv_usec - before.tv_usec) ? - (timeout->tv_sec <= after.tv_sec - before.tv_sec) : - (timeout->tv_sec < after.tv_sec - before.tv_sec); -} - -static void clnt_reject(void *mycred, int conn) -{ - int flags = fcntl(conn, F_GETFL, 0); - - if (fcntl(conn, F_SETFL, flags | O_NONBLOCK) < 0) - return; - - edg_wll_gss_reject(conn); - return; -} - - -#define MSG_BUFSIZ 15 - -/* send socket sock through socket to_sock */ -static int do_sendmsg(int to_sock, int sock, unsigned long clnt_dispatched,int store) { - - struct msghdr msg = {0}; - struct cmsghdr *cmsg; - int myfds; /* Contains the file descriptors to pass. */ - char buf[CMSG_SPACE(sizeof myfds)]; /* ancillary data buffer */ - int *fdptr; - struct iovec sendiov; - char sendbuf[MSG_BUFSIZ]; /* to store unsigned int + \0 */ - - - snprintf(sendbuf,sizeof(sendbuf),"%c %lu",store?'S':'Q',clnt_dispatched); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = &sendiov; - msg.msg_iovlen = 1; - sendiov.iov_base = sendbuf; - sendiov.iov_len = sizeof(sendbuf); - - msg.msg_control = buf; - msg.msg_controllen = sizeof buf; - - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - fdptr = (int *)CMSG_DATA(cmsg); - *fdptr = sock; - - msg.msg_controllen = cmsg->cmsg_len; - /* send fd to server-slave to do rest of communication */ - if (sendmsg(to_sock, &msg, 0) < 0) - return 1; - - return 0; -} - - -/* receive socket sock through socket from_sock */ -static int do_recvmsg(int from_sock, int *sock, unsigned long *clnt_accepted,int *store) { - - struct msghdr msg = {0}; - struct cmsghdr *cmsg; - int myfds; /* Contains the file descriptors to pass. */ - char buf[CMSG_SPACE(sizeof(myfds))]; /* ancillary data buffer */ - struct iovec recviov; - char recvbuf[MSG_BUFSIZ],op; - - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = &recviov; - msg.msg_iovlen = 1; - recviov.iov_base = recvbuf; - recviov.iov_len = sizeof(recvbuf); - - msg.msg_control = buf; - msg.msg_controllen = sizeof buf; - - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - msg.msg_controllen = cmsg->cmsg_len; - - if (recvmsg(from_sock, &msg, 0) < 0) - return 1; - - *sock = *((int *)CMSG_DATA(cmsg)); - sscanf(recvbuf,"%c %lu",&op,clnt_accepted); - *store = op == 'S'; - - return 0; -} +static void wait_for_open(edg_wll_Context,const char *); +static int decrement_timeout(struct timeval *, struct timeval, struct timeval); +static int read_roots(const char *); +static int asyn_gethostbyaddr(char **, const char *, int, int, struct timeval *); +static int amIroot(const char *); +static int parse_limits(char *, int *, int *, int *); +static int check_mkdir(const char *); -struct asyn_result { - struct hostent *ent; - int err; +/* + * SERVER BONES structures and handlers + */ +int bk_clnt_data_init(void **); + + /* + * Serve & Store handlers + */ +int bk_clnt_reject(int); +int bk_handle_connection(int, struct timeval, void *); +int bk_accept_serve(int, void *); +int bk_accept_store(int, void *); +int bk_clnt_disconnect(int, void *); + + /* + * WS handlers + */ +int bk_handle_ws_connection(int, struct timeval, void *); +int bk_accept_ws(int, void *); +int bk_ws_clnt_reject(int); +int bk_ws_clnt_disconnect(int, void *); + + +#define SRV_SERVE 0 +#define SRV_STORE 1 +#define SRV_WS 2 +static struct glite_srvbones_service service_table[] = { + { "serve", -1, bk_handle_connection, bk_accept_serve, bk_clnt_reject, bk_clnt_disconnect }, + { "store", -1, bk_handle_connection, bk_accept_store, bk_clnt_reject, bk_clnt_disconnect }, + { "WS", -1, bk_handle_ws_connection, bk_accept_ws, bk_ws_clnt_reject, bk_ws_clnt_disconnect } }; -/* ares callback handler for ares_gethostbyaddr() */ -static void callback_handler(void *arg, int status, struct hostent *h) { - struct asyn_result *arp = (struct asyn_result *) arg; - - switch (status) { - case ARES_SUCCESS: - if (h && h->h_name) { - arp->ent->h_name = strdup(h->h_name); - if (arp->ent->h_name == NULL) { - arp->err = NETDB_INTERNAL; - } else { - arp->err = NETDB_SUCCESS; - } - } else { - arp->err = NO_DATA; - } - break; - case ARES_EBADNAME: - case ARES_ENOTFOUND: - arp->err = HOST_NOT_FOUND; - break; - case ARES_ENOTIMP: - arp->err = NO_RECOVERY; - break; - case ARES_ENOMEM: - case ARES_EDESTRUCTION: - default: - arp->err = NETDB_INTERNAL; - break; - } -} - -static void free_hostent(struct hostent *h){ - int i; - - if (h) { - if (h->h_name) free(h->h_name); - if (h->h_aliases) { - for (i=0; h->h_aliases[i]; i++) free(h->h_aliases[i]); - free(h->h_aliases); - } - if (h->h_addr_list) { - for (i=0; h->h_addr_list[i]; i++) free(h->h_addr_list[i]); - free(h->h_addr_list); - } - free(h); - } -} - -static int asyn_gethostbyaddr(char **name, const char *addr,int len, int type, struct timeval *timeout) { - struct asyn_result ar; - ares_channel channel; - int nfds; - fd_set readers, writers; - struct timeval tv, *tvp; - struct timeval start_time,check_time; - - -/* start timer */ - gettimeofday(&start_time,0); - -/* ares init */ - if ( ares_init(&channel) != ARES_SUCCESS ) return(NETDB_INTERNAL); - ar.ent = (struct hostent *) malloc (sizeof(*ar.ent)); - memset((void *) ar.ent, 0, sizeof(*ar.ent)); - -/* query DNS server asynchronously */ - ares_gethostbyaddr(channel, addr, len, type, callback_handler, (void *) &ar); - -/* wait for result */ - while (1) { - FD_ZERO(&readers); - FD_ZERO(&writers); - nfds = ares_fds(channel, &readers, &writers); - if (nfds == 0) - break; - - gettimeofday(&check_time,0); - if (decrement_timeout(timeout, start_time, check_time)) { - ares_destroy(channel); - free_hostent(ar.ent); - return(TRY_AGAIN); - } - start_time = check_time; - - tvp = ares_timeout(channel, timeout, &tv); - - switch ( select(nfds, &readers, &writers, NULL, tvp) ) { - case -1: if (errno != EINTR) { - ares_destroy(channel); - free_hostent(ar.ent); - return NETDB_INTERNAL; - } else - continue; - case 0: - FD_ZERO(&readers); - FD_ZERO(&writers); - /* fallthrough */ - default : ares_process(channel, &readers, &writers); - } - - } - - - ares_destroy(channel); - - if (ar.err == NETDB_SUCCESS) { - *name = strdup(ar.ent->h_name); - free_hostent(ar.ent); - } - return (ar.err); -} - -static int read_roots(const char *file) -{ - FILE *roots = fopen(file,"r"); - char buf[BUFSIZ]; - int cnt = 0; - - if (!roots) { - perror(file); - return 1; - } - - while (!feof(roots)) { - char *nl; - fgets(buf,sizeof buf,roots); - nl = strchr(buf,'\n'); - if (nl) *nl = 0; - - super_users = realloc(super_users, (cnt+1) * sizeof super_users[0]); - super_users[cnt] = strdup(buf); - super_users[++cnt] = NULL; - } - - fclose(roots); - - return 0; -} - -static int amIroot(const char *subj) -{ - int i; +struct clnt_data_t { + edg_wll_Context ctx; + struct soap *soap; + void *mysql; + edg_wll_QueryRec **job_index; + edg_wll_IColumnRec *job_index_cols; +}; - if (!subj) return 0; - for (i=0; super_users && super_users[i]; i++) - if (strcmp(subj,super_users[i]) == 0) return 1; - return 0; -} -static int parse_limits(char *opt, int *j_limit, int *e_limit, int *size_limit) +int main(int argc, char *argv[]) { - return (sscanf(opt, "%d:%d:%d", j_limit, e_limit, size_limit) == 3); -} - -static unsigned long clnt_dispatched=0, clnt_accepted=0; -static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; -static int server_sock,store_sock; + int fd, i; + struct sockaddr_in a; + char *mysubj = NULL; + int opt; + char pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE, + *port, *ws_port, + *name; + FILE *fpid; + key_t semkey; + edg_wll_Context ctx; + OM_uint32 min_stat; + edg_wll_GssStatus gss_code; -static int check_mkdir(const char *); -int main(int argc,char *argv[]) -{ - int fd,i; - struct sockaddr_in a; - struct sigaction sa; - sigset_t sset; - char *mysubj = NULL; - int opt; - char *cert,*key,*port; - char *name,pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE; - int sock_slave[2]; - FILE *fpid; - key_t semkey; - time_t cert_mtime = 0,key_mtime = 0; - edg_wll_Context ctx; - OM_uint32 min_stat; - edg_wll_GssStatus gss_code; name = strrchr(argv[0],'/'); if (name) name++; else name = argv[0]; - asprintf(&port,"%d",GLITE_WMSC_JOBID_DEFAULT_PORT); - cert = key = cadir = vomsdir = NULL; + asprintf(&port, "%d", GLITE_WMSC_JOBID_DEFAULT_PORT); + asprintf(&ws_port, "%d", GLITE_WMSC_JOBID_DEFAULT_PORT+2); + proxy = cadir = vomsdir = NULL; /* no magic here: 1 month, 3 and 7 days */ purge_timeout[EDG_WLL_PURGE_JOBSTAT_OTHER] = 60*60*24*31; @@ -505,13 +267,15 @@ int main(int argc,char *argv[]) if (geteuid()) snprintf(pidfile,sizeof pidfile,"%s/edg-bkserverd.pid", getenv("HOME")); - while ((opt = getopt_long(argc,argv,"a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:",opts,NULL)) != EOF) switch (opt) { + while ((opt = getopt_long(argc,argv,"a:c:k:C:V:p:w:drm:ns:l:L:N:i:S:D:X:",opts,NULL)) != EOF) switch (opt) { case 'a': fake_host = strdup(optarg); break; - case 'c': cert = optarg; break; - case 'k': key = optarg; break; + case 'c': proxy = optarg; break; + case 'k': if ( proxy ) fprintf(stderr,"%s: proxy already defined (cert = key nowadays)\n", argv[0]); + else proxy = optarg; case 'C': cadir = optarg; break; case 'V': vomsdir = optarg; break; case 'p': free(port); port = strdup(optarg); break; + case 'w': free(ws_port); ws_port = strdup(optarg); break; case 'd': debug = 1; break; case 'r': rgma_export = 1; break; case 'm': dbstring = optarg; break; @@ -554,29 +318,31 @@ int main(int argc,char *argv[]) case '?': usage(name); return 1; } - if (optind < argc) { usage(name); return 1; } + if ( optind < argc ) { usage(name); return 1; } setlinebuf(stdout); setlinebuf(stderr); - dprintf(("Master pid %d\n",getpid())); fpid = fopen(pidfile,"r"); - if (fpid) { + if ( fpid ) + { int opid = -1; - if (fscanf(fpid,"%d",&opid) == 1) { - if (!kill(opid,0)) { + + if ( fscanf(fpid,"%d",&opid) == 1 ) + { + if ( !kill(opid,0) ) + { fprintf(stderr,"%s: another instance running, pid = %d\n",argv[0],opid); return 1; - } else if (errno != ESRCH) { - perror("kill()"); return 1; } + else if (errno != ESRCH) { perror("kill()"); return 1; } } fclose(fpid); } else if (errno != ENOENT) { perror(pidfile); return 1; } - fpid = fopen(pidfile,"w"); + fpid = fopen(pidfile, "w"); if (!fpid) { perror(pidfile); return 1; } - fprintf(fpid,"%d",getpid()); + fprintf(fpid, "%d", getpid()); fclose(fpid); semkey = ftok(pidfile,0); @@ -587,29 +353,36 @@ int main(int argc,char *argv[]) if (check_mkdir(purgeStorage)) exit(1); if (semaphores == -1) semaphores = slaves; - semset = semget(semkey,0,0); - if (semset >= 0) semctl(semset,0,IPC_RMID); - semset = semget(semkey,semaphores,IPC_CREAT | 0600); + semset = semget(semkey, 0, 0); + if (semset >= 0) semctl(semset, 0, IPC_RMID); + semset = semget(semkey, semaphores, IPC_CREAT | 0600); if (semset < 0) { perror("semget()"); return 1; } dprintf(("Using %d semaphores, set id %d\n",semaphores,semset)); - for (i=0; ip_tmp_timeout.tv_sec = MASTER_TIMEOUT; - - if (ret == -1 && errno != EINTR) { - if (debug) perror("select()"); - else syslog(LOG_CRIT,"select(): %m"); - return 1; - } - - if (child_died) { - int pid; - while ((pid=waitpid(-1,NULL,WNOHANG))>0) { - if (!die) { - int newpid = slave(mycred,sock_slave[1]); - dprintf(("[master] Servus mortuus [%d] miraculo resurrexit [%d]\n",pid,newpid)); - } - } - child_died = 0; - continue; - } - - if (die) continue; - - if (FD_ISSET(server_sock,&fds) && dispatchit(sock_slave[0],server_sock,0)) continue; - if (FD_ISSET(store_sock,&fds) && dispatchit(sock_slave[0],store_sock,1)) continue; - - if (FD_ISSET(sock_slave[0],&fds)) { -/* slave accepted a request */ - unsigned long a; - - if ((recv(sock_slave[0],&a,sizeof(a),MSG_WAITALL) == sizeof(a)) - && a<=clnt_dispatched - && (a>clnt_accepted || clnt_accepted>clnt_dispatched) - ) clnt_accepted = a; - } - - } - dprintf(("[master] Terminating on signal %d\n",die)); - if (!debug) syslog(LOG_INFO,"Terminating on signal %d\n",die); - kill(0,die); - semctl(semset,0,IPC_RMID,0); - unlink(pidfile); - free(port); - gss_release_cred(&min_stat, &mycred); - return 0; -} - -static int dispatchit(int sock_slave,int sock,int store) +int bk_clnt_data_init(void **data) { - struct sockaddr_in a; - int conn; - unsigned char *pom; - int alen,ret; - - alen=sizeof(a); - conn = accept(sock,(struct sockaddr *) &a,&alen); - - if (conn<0) { - if (debug) { - perror("accept()"); return 1; - } else { - syslog(LOG_ERR,"accept(): %m"); - sleep(5); - return -1; - } - } + edg_wll_Context ctx; + struct clnt_data_t *cdata; + edg_wll_QueryRec **job_index; + edg_wll_IColumnRec *job_index_cols; - alen=sizeof(a); - getpeername(conn,(struct sockaddr *)&a,&alen); - pom = (char *) &a.sin_addr.s_addr; - dprintf(("[master] %s connection from %d.%d.%d.%d:%d\n",store?"store":"query", - (int) pom[0],(int) pom[1],(int) pom[2],(int) pom[3], ntohs(a.sin_port))); + if ( !(cdata = calloc(1, sizeof(*cdata))) ) + return -1; - ret = 0; - if ((clnt_dispatchedmysql = ctx->mysql; - wait_for_open(ctx,dbstring); + if ( edg_wll_QueryJobIndices(ctx, &job_index, NULL) ) + { + char *et, *ed; - mysql = ctx->mysql; - if (edg_wll_QueryJobIndices(ctx,&job_index,NULL)) { - char *et,*ed; edg_wll_Error(ctx,&et,&ed); - dprintf(("[%d]: query_job_indices(): %s: %s, no custom indices available\n",getpid(),et,ed)); if (!debug) syslog(LOG_ERR,"[%d]: query_job_indices(): %s: %s, no custom indices available\n",getpid(),et,ed); free(et); free(ed); - job_index = NULL; } edg_wll_FreeContext(ctx); + cdata->job_index = job_index; - if (job_index) { + if ( job_index ) + { int i,j, k, maxncol, ncol; + ncol = maxncol = 0; - for (i=0; job_index[i]; i++) { - for (j=0; job_index[i][j].attr; j++) maxncol++; - } + for ( i = 0; job_index[i]; i++ ) + for ( j = 0; job_index[i][j].attr; j++ ) + maxncol++; + job_index_cols = calloc(maxncol+1, sizeof(edg_wll_IColumnRec)); - for (i=0; job_index[i]; i++) { - for (j=0; job_index[i][j].attr; j++) { - for (k=0; kjob_index_cols = job_index_cols; } - - while (!die && (conn_cnt < SLAVE_CONNS_MAX || conn >= 0)) { - fd_set fds; - int alen,store,max = sock,newconn = -1; - int connflags,kick_client = 0; - unsigned long seq; - struct timeval dns_to = {DNS_TIMEOUT, 0}, - check_to = {SLAVE_CHECK_SIGNALS, 0}, - total_to = { TOTAL_CLNT_TIMEOUT,0 }, - client_to = { CLNT_TIMEOUT,0 }, now; - char *name = NULL; - struct sockaddr_in a; - edg_wll_GssStatus gss_code; - - - FD_ZERO(&fds); - FD_SET(sock,&fds); - if (conn >= 0) FD_SET(conn,&fds); - if (conn > sock) max = conn; - - sigprocmask(SIG_UNBLOCK,&sset,NULL); - switch (select(max+1,&fds,NULL,NULL,&check_to)) { - case -1: - if (errno != EINTR) { - dprintf(("[%d] select(): %s\n",getpid(),strerror(errno))); - if (!debug) syslog(LOG_CRIT,"select(): %m"); - exit(1); - } - continue; - case 0: if (conn < 0) continue; - default: break; - } - sigprocmask(SIG_BLOCK,&sset,NULL); - - gettimeofday(&now,NULL); - if (conn >= 0 && (check_timeout(&client_to,client_done,now) || - check_timeout(&total_to,client_start,now))) - kick_client = 1; - - if (conn >= 0 && !kick_client && FD_ISSET(conn,&fds)) { - /* serve the request */ - - dprintf(("[%d] incoming request\n",getpid())); - - ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT; - ctx->p_tmp_timeout.tv_usec = 0; - - if (total_to.tv_sec < ctx->p_tmp_timeout.tv_sec) - /* XXX: sec = && usec < never happens */ - { - ctx->p_tmp_timeout.tv_sec = total_to.tv_sec; - ctx->p_tmp_timeout.tv_usec = total_to.tv_usec; - } - - if (store ? edg_wll_StoreProto(ctx) : edg_wll_ServerHTTP(ctx)) { - char *errt,*errd; - errt = errd = NULL; - - switch (edg_wll_Error(ctx,&errt,&errd)) { - case ETIMEDOUT: - /* fallthrough */ - case EDG_WLL_ERROR_GSS: - case EPIPE: - dprintf(("[%d] %s (%s)\n",getpid(),errt,errd)); - if (!debug) syslog(LOG_ERR,"%s (%s)",errt,errd); - /* fallthrough */ - case ENOTCONN: - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); - edg_wll_FreeContext(ctx); - ctx = NULL; - close(conn); - conn = -1; - free(errt); free(errd); - dprintf(("[%d] Connection closed\n",getpid())); - continue; - break; - case ENOENT: - /* fallthrough */ - case EINVAL: - /* fallthrough */ - case EPERM: - case EEXIST: - case EDG_WLL_ERROR_NOINDEX: - case E2BIG: - dprintf(("[%d] %s (%s)\n",getpid(),errt,errd)); - if (!debug) syslog(LOG_ERR,"%s (%s)",errt,errd); - break; /* no action for non-fatal errors */ - default: - dprintf(("[%d] %s (%s)\n",getpid(),errt,errd)); - if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); - exit(1); - } - free(errt); free(errd); - } - - dprintf(("[%d] request done\n",getpid())); - gettimeofday(&client_done,NULL); - continue; + *data = cdata; + return 0; +} - } + +/* + * Creates context (initializes it from global vatiables and data given + * from server_bones) + * gets the connection info + * and accepts the gss connection + */ +int bk_handle_connection(int conn, struct timeval client_start, void *data) +{ + struct clnt_data_t *cdata = (struct clnt_data_t *)data; + edg_wll_Context ctx; + gss_name_t client_name = GSS_C_NO_NAME; + gss_buffer_desc token = GSS_C_EMPTY_BUFFER; + edg_wll_GssStatus gss_code; + OM_uint32 min_stat, + maj_stat; + struct timeval dns_to = {DNS_TIMEOUT, 0}, + total_to = { TOTAL_CLNT_TIMEOUT,0 }, + now; + struct sockaddr_in a; + int alen; + char *server_name = NULL, + *name = NULL; + int h_errno; - if (FD_ISSET(sock,&fds) && conn_cnt < SLAVE_CONNS_MAX) { - if (conn >= 0) usleep(100000 + 1000 * (random() % 200)); - if (do_recvmsg(sock,&newconn,&seq,&store)) switch (errno) { - case EINTR: /* XXX: signals are blocked */ - case EAGAIN: continue; - default: dprintf(("[%d] recvmsg(): %s\n",getpid(),strerror(errno))); - if (!debug) syslog(LOG_CRIT,"recvmsg(): %m\n"); - exit(1); - } - kick_client = 1; - } - if (kick_client && conn >= 0) { - if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) { - struct timeval to = { 0, CLNT_REJECT_TIMEOUT }; - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss,&to); - } - edg_wll_FreeContext(ctx); - close(conn); /* XXX: should not harm */ - conn = -1; - dprintf(("[%d] Idle connection closed\n",getpid())); - } + if ( edg_wll_InitContext(&ctx) ) + { + fprintf(stderr, "Couldn't create context"); + return -1; + } + cdata->ctx = ctx; - if (newconn >= 0) { - OM_uint32 min_stat, maj_stat; - gss_name_t client_name = GSS_C_NO_NAME; - gss_buffer_desc token = GSS_C_EMPTY_BUFFER; - - conn = newconn; - gettimeofday(&client_start,NULL); - client_done.tv_sec = client_start.tv_sec; - client_done.tv_usec = client_start.tv_usec; - - switch (send(sock,&seq,sizeof(seq),0)) { - case -1: - if (debug) perror("send()"); - else syslog(LOG_CRIT,"send(): %m\n"); - exit(1); - case sizeof(seq): break; - default: dprintf(("[%d] send(): incomplete message\n",getpid())); - exit(1); - } - - dprintf(("[%d] serving %s %lu\n",getpid(),store?"store":"query",seq)); - conn_cnt++; - - connflags = fcntl(conn, F_GETFL, 0); - if (fcntl(conn, F_SETFL, connflags | O_NONBLOCK) < 0) { - dprintf(("[%d] can't set O_NONBLOCK mode (%s), closing.\n", - getpid(), strerror(errno))); - if (!debug) syslog(LOG_ERR,"can't set O_NONBLOCK mode (%s), closing.\n", - strerror(errno)); - - close(conn); - conn = -1; - continue; - } - - edg_wll_InitContext(&ctx); - - /* Shared structures (pointers) */ - ctx->mysql = mysql; - ctx->job_index_cols = (void*) job_index_cols; - ctx->job_index = job_index; + /* Shared structures (pointers) + */ + ctx->mysql = cdata->mysql; + ctx->job_index_cols = cdata->job_index_cols; + ctx->job_index = cdata->job_index; - ctx->notifDuration = notif_duration; - ctx->purgeStorage = strdup(purgeStorage); - ctx->dumpStorage = strdup(dumpStorage); - ctx->hardJobsLimit = hardJobsLimit; - ctx->hardEventsLimit = hardEventsLimit; - ctx->semset = semset; - ctx->semaphores = semaphores; - if (noAuth) ctx->noAuth = 1; - ctx->rgma_export = rgma_export; - memcpy(ctx->purge_timeout,purge_timeout,sizeof ctx->purge_timeout); - - ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT; - - ctx->poolSize = 1; - ctx->connPool = calloc(1,sizeof(edg_wll_ConnPool)); - ctx->connToUse = 0; + /* set globals + */ + ctx->notifDuration = notif_duration; + ctx->purgeStorage = strdup(purgeStorage); + ctx->dumpStorage = strdup(dumpStorage); + ctx->hardJobsLimit = hardJobsLimit; + ctx->hardEventsLimit = hardEventsLimit; + ctx->semset = semset; + ctx->semaphores = semaphores; + if ( noAuth ) ctx->noAuth = 1; + ctx->rgma_export = rgma_export; + memcpy(ctx->purge_timeout, purge_timeout, sizeof(ctx->purge_timeout)); + + ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT; + ctx->p_tmp_timeout.tv_usec = 0; + if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec ) + { + ctx->p_tmp_timeout.tv_sec = total_to.tv_sec; + ctx->p_tmp_timeout.tv_usec = total_to.tv_usec; + } - alen = sizeof(a); - getpeername(conn,(struct sockaddr *)&a,&alen); - ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr)); - ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port); - - /* not a critical operation, do not waste all SLAVE_TIMEOUT */ - switch (h_errno = asyn_gethostbyaddr(&name,(char *) &a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), - AF_INET,&dns_to)) { - case NETDB_SUCCESS: - if (name) dprintf(("[%d] connection from %s:%d (%s)\n", getpid(), - inet_ntoa(a.sin_addr), ntohs(a.sin_port), name)); - free(ctx->connPool[ctx->connToUse].peerName); - ctx->connPool[ctx->connToUse].peerName = name; - name = NULL; - break; - default: - if (debug) fprintf(stderr,"gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); - /* else syslog(LOG_ERR,"gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); */ - dprintf(("[%d] connection from %s:%d\n", getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port))); - break; - } + ctx->poolSize = 1; + ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool)); + ctx->connToUse = 0; + + alen = sizeof(a); + getpeername(conn, (struct sockaddr *)&a, &alen); + ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr)); + ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port); + + /* not a critical operation, do not waste all SLAVE_TIMEOUT */ + h_errno = asyn_gethostbyaddr(&name, (char *)&a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), AF_INET, &dns_to); + switch ( h_errno ) + { + case NETDB_SUCCESS: + if (name) dprintf(("[%d] connection from %s:%d (%s)\n", + getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port), name)); + free(ctx->connPool[ctx->connToUse].peerName); + ctx->connPool[ctx->connToUse].peerName = name; + name = NULL; + break; + + default: + if (debug) fprintf(stderr, "gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); + dprintf(("[%d] connection from %s:%d\n", getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port))); + break; + } - gettimeofday(&now,0); - if ( decrement_timeout(&ctx->p_tmp_timeout, client_start, now) ) { - if (debug) fprintf(stderr,"gethostbyaddr() timeout"); - else syslog(LOG_ERR,"gethostbyaddr(): timeout"); - free(name); - continue; - } + gettimeofday(&now,0); + if ( decrement_timeout(&ctx->p_tmp_timeout, client_start, now) ) + { + if (debug) fprintf(stderr, "gethostbyaddr() timeout"); + else syslog(LOG_ERR, "gethostbyaddr(): timeout"); + free(name); + + return -1; + } + if (fake_host) + { + ctx->srvName = strdup(fake_host); + ctx->srvPort = fake_port; + } + else + { + alen = sizeof(a); + getsockname(conn,(struct sockaddr *) &a,&alen); - if (fake_host) { - ctx->srvName = strdup(fake_host); - ctx->srvPort = fake_port; - } - else { - alen = sizeof(a); - getsockname(conn,(struct sockaddr *) &a,&alen); - - dns_to.tv_sec = DNS_TIMEOUT; - dns_to.tv_usec = 0; - switch (h_errno = asyn_gethostbyaddr(&name,(char *) &a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), - AF_INET,&dns_to)) { - case NETDB_SUCCESS: - ctx->srvName = name; - if (server_name != NULL) { - if (strcmp(name, server_name)) { - if (debug) fprintf(stderr, "different server endpoint names (%s,%s)," + dns_to.tv_sec = DNS_TIMEOUT; + dns_to.tv_usec = 0; + h_errno = asyn_gethostbyaddr(&name, + (char *) &a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), + AF_INET,&dns_to); + + switch ( h_errno ) + { + case NETDB_SUCCESS: + ctx->srvName = name; + if ( server_name != NULL ) + { + if ( strcmp(name, server_name)) + { + if (debug) fprintf(stderr, "different server endpoint names (%s,%s)," " check DNS PTR records\n", name, server_name); - else syslog(LOG_ERR,"different server endpoint names (%s,%s)," + else syslog(LOG_ERR,"different server endpoint names (%s,%s)," " check DNS PTR records\n", name, server_name); - } - } else { - server_name = strdup(name); - } - break; - default: - if (debug) fprintf(stderr, "gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); - else syslog(LOG_ERR,"gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); - if (server_name != NULL) { - ctx->srvName = strdup(server_name); - } else { - /* only "GET /jobid" requests refused */ - } - break; } - ctx->srvPort = ntohs(a.sin_port); } - - if (edg_wll_gss_accept(mycred, conn, &ctx->p_tmp_timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code)) { - dprintf(("[%d] Client authentication failed, closing.\n",getpid())); - if (!debug) syslog(LOG_ERR,"Client authentication failed"); - - close(conn); - conn = -1; - edg_wll_FreeContext(ctx); - continue; - } - maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context, &client_name, NULL, NULL, NULL, NULL, NULL, NULL); - if (!GSS_ERROR(maj_stat)) - maj_stat = gss_display_name(&min_stat, client_name, &token, NULL); - - if (!GSS_ERROR(maj_stat)) { - if (ctx->peerName) free(ctx->peerName); - ctx->peerName = (char *)token.value; - memset(&token, 0, sizeof(token)); -/* XXX DK: pujde pouzit lifetime z inquire_context()? - ctx->peerProxyValidity = ASN1_UTCTIME_mktime(X509_get_notAfter(peer)); -*/ + else server_name = strdup(name); + break; + + default: + if ( debug ) + fprintf(stderr, "gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); + else + syslog(LOG_ERR,"gethostbyaddr(%s): %s", inet_ntoa(a.sin_addr), hstrerror(h_errno)); + if ( server_name != NULL ) + ctx->srvName = strdup(server_name); + break; + } + ctx->srvPort = ntohs(a.sin_port); + } + + if ( edg_wll_gss_accept(mycred, conn, &ctx->p_tmp_timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code) ) + { + dprintf(("[%d] Client authentication failed, closing.\n", getpid())); + if (!debug) syslog(LOG_ERR, "Client authentication failed"); + edg_wll_FreeContext(ctx); + return 1; + } + + maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context, + &client_name, NULL, NULL, NULL, NULL, NULL, NULL); + if ( !GSS_ERROR(maj_stat) ) + maj_stat = gss_display_name(&min_stat, client_name, &token, NULL); + + if ( !GSS_ERROR(maj_stat) ) + { + if (ctx->peerName) free(ctx->peerName); + ctx->peerName = (char *)token.value; + memset(&token, 0, sizeof(token)); + /* XXX DK: pujde pouzit lifetime z inquire_context()? + * + ctx->peerProxyValidity = ASN1_UTCTIME_mktime(X509_get_notAfter(peer)); + */ - dprintf(("[%d] client DN: %s\n",getpid(),ctx->peerName)); - } else { - /* XXX DK: Check if the ANONYMOUS flag is set ? */ - dprintf(("[%d] annonymous client\n",getpid())); - } + dprintf(("[%d] client DN: %s\n",getpid(),ctx->peerName)); + } + else + /* XXX DK: Check if the ANONYMOUS flag is set ? + */ + dprintf(("[%d] annonymous client\n",getpid())); - if (client_name != GSS_C_NO_NAME) - gss_release_name(&min_stat, &client_name); - if (token.value) - gss_release_buffer(&min_stat, &token); - - edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, vomsdir, cadir); - if (debug && ctx->vomsGroups.len > 0) { - int i; + if ( client_name != GSS_C_NO_NAME ) + gss_release_name(&min_stat, &client_name); + if ( token.value ) + gss_release_buffer(&min_stat, &token); + + edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, vomsdir, cadir); + if (debug && ctx->vomsGroups.len > 0) + { + int i; - dprintf(("[%d] client's VOMS groups:\n",getpid())); - for (i = 0; i < ctx->vomsGroups.len; i++) - dprintf(("\t%s:%s\n", - ctx->vomsGroups.val[i].vo, - ctx->vomsGroups.val[i].name)); - } - - /* used also to reset start_time after edg_wll_ssl_accept! */ - /* gettimeofday(&start_time,0); */ + dprintf(("[%d] client's VOMS groups:\n",getpid())); + for ( i = 0; i < ctx->vomsGroups.len; i++ ) + dprintf(("\t%s:%s\n", ctx->vomsGroups.val[i].vo, ctx->vomsGroups.val[i].name)); + } + /* used also to reset start_time after edg_wll_ssl_accept! */ + /* gettimeofday(&start_time,0); */ - ctx->noAuth = noAuth || amIroot(ctx->peerName); - switch (noIndex) { - case 0: ctx->noIndex = 0; break; - case 1: ctx->noIndex = amIroot(ctx->peerName); break; - case 2: ctx->noIndex = 1; break; - } - ctx->strict_locking = strict_locking; - } + ctx->noAuth = noAuth || amIroot(ctx->peerName); + switch ( noIndex ) + { + case 0: ctx->noIndex = 0; break; + case 1: ctx->noIndex = amIroot(ctx->peerName); break; + case 2: ctx->noIndex = 1; break; + } + ctx->strict_locking = strict_locking; + + + return 0; +} + +int bk_handle_ws_connection(int conn, struct timeval client_start, void *data) +{ + struct clnt_data_t *cdata = (struct clnt_data_t *) data; + struct soap *soap; + int rv; + + + if ( !(soap = soap_new()) ) + { + fprintf(stderr, "Couldn't create soap environment"); + return -1; + } + + if ( (rv = bk_handle_connection(conn, client_start, data)) ) + { + soap_destroy(soap); + return rv; + } + + cdata->soap = soap; + + soap_init(soap); + soap_set_namespaces(soap, namespaces); + soap_register_plugin_arg(soap, edg_wll_ws_plugin, cdata->ctx); + + + return 0; +} + +int bk_accept_store(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + /* + * serve the request + */ + if ( edg_wll_StoreProto(ctx) ) + { + char *errt, *errd; + + + errt = errd = NULL; + switch ( edg_wll_Error(ctx, &errt, &errd) ) + { + case ETIMEDOUT: + case EDG_WLL_ERROR_GSS: + case EPIPE: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* fallthrough + */ + case ENOTCONN: + edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); + edg_wll_FreeContext(ctx); + ctx = NULL; + free(errt); free(errd); + dprintf(("[%d] Connection closed\n", getpid())); + /* + * "recoverable" error - return (>0) + */ + return 1; + break; + + case ENOENT: + case EINVAL: + case EPERM: + case EEXIST: + case EDG_WLL_ERROR_NOINDEX: + case E2BIG: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* + * no action for non-fatal errors + */ + break; + + default: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); + /* + * unknown error - do rather return (<0) (slave will be killed) + */ + return -1; + } + free(errt); free(errd); + } + + return 0; +} + +int bk_accept_serve(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + /* + * serve the request + */ + if ( edg_wll_ServerHTTP(ctx) ) + { + char *errt, *errd; + + + errt = errd = NULL; + switch ( edg_wll_Error(ctx, &errt, &errd) ) + { + case ETIMEDOUT: + case EDG_WLL_ERROR_GSS: + case EPIPE: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* fallthrough + */ + case ENOTCONN: + edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); + edg_wll_FreeContext(ctx); + ctx = NULL; + free(errt); free(errd); + dprintf(("[%d] Connection closed\n", getpid())); + /* + * "recoverable" error - return (>0) + */ + return 1; + break; + + case ENOENT: + case EINVAL: + case EPERM: + case EEXIST: + case EDG_WLL_ERROR_NOINDEX: + case E2BIG: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* + * no action for non-fatal errors + */ + break; + + default: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); + /* + * unknown error - do rather return (<0) (slave will be killed) + */ + return -1; + } + free(errt); free(errd); } - if (die) { - dprintf(("[%d] Terminating on signal %d\n",getpid(),die)); - if (!debug) syslog(LOG_INFO,"Terminating on signal %d",die); + return 0; +} + + +int bk_accept_ws(int conn, void *cdata) +{ + struct soap *soap = ((struct clnt_data_t *) cdata)->soap; + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + + /* XXX: Is it neccessary? + * + * BEWARE: gSoap is trying to handle this connection -> closes the + * socket after then query is served (or something like that :) + * + soap->socket = conn; + */ + if ( soap_serve(soap) ) + { + char *errt, *errd; + + + errt = errd = NULL; + switch ( edg_wll_Error(ctx, &errt, &errd) ) + { + case ETIMEDOUT: + case EDG_WLL_ERROR_GSS: + case EPIPE: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* fallthrough + */ + case ENOTCONN: + edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL); + edg_wll_FreeContext(ctx); + ctx = NULL; + free(errt); free(errd); + dprintf(("[%d] Connection closed\n", getpid())); + /* + * "recoverable" error - return (>0) + */ + return 1; + break; + + case ENOENT: + case EINVAL: + case EPERM: + case EEXIST: + case EDG_WLL_ERROR_NOINDEX: + case E2BIG: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd); + /* + * no action for non-fatal errors + */ + break; + + default: + dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); + if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); + /* + * unknown error - do rather return (<0) (slave will be killed) + */ + return -1; + } + free(errt); free(errd); + return 1; + } + + return 0; +} + + +int bk_clnt_disconnect(int conn, void *cdata) +{ + edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; + + + if ( ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) + { + struct timeval to = { 0, CLNT_REJECT_TIMEOUT }; + + edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, &to); } - dprintf(("[%d] Terminating after %d connections\n",getpid(),conn_cnt)); - if (!debug) syslog(LOG_INFO,"Terminating after %d connections",conn_cnt); - exit(0); + edg_wll_FreeContext(ctx); + + return 0; +} + +int bk_ws_clnt_disconnect(int conn, void *cdata) +{ + int rv; + + + if ( (rv = bk_clnt_disconnect(conn, cdata)) ) + return rv; + + soap_destroy(((struct clnt_data_t *)cdata)->soap); + + return 0; } +int bk_clnt_reject(int conn) +{ + int flags = fcntl(conn, F_GETFL, 0); + + if ( fcntl(conn, F_SETFL, flags | O_NONBLOCK) < 0 ) + return 1; + + edg_wll_gss_reject(conn); + + return 0; +} + +int bk_ws_clnt_reject(int conn) +{ + return bk_clnt_reject(conn); +} + + static void wait_for_open(edg_wll_Context ctx, const char *dbstring) { char *dbfail_string1, *dbfail_string2; @@ -1296,23 +1092,188 @@ static void wait_for_open(edg_wll_Context ctx, const char *dbstring) } } +static void free_hostent(struct hostent *h){ + int i; + + if (h) { + if (h->h_name) free(h->h_name); + if (h->h_aliases) { + for (i=0; h->h_aliases[i]; i++) free(h->h_aliases[i]); + free(h->h_aliases); + } + if (h->h_addr_list) { + for (i=0; h->h_addr_list[i]; i++) free(h->h_addr_list[i]); + free(h->h_addr_list); + } + free(h); + } +} + +struct asyn_result { + struct hostent *ent; + int err; +}; + +/* ares callback handler for ares_gethostbyaddr() */ +static void callback_handler(void *arg, int status, struct hostent *h) +{ + struct asyn_result *arp = (struct asyn_result *) arg; + + switch (status) { + case ARES_SUCCESS: + if (h && h->h_name) { + arp->ent->h_name = strdup(h->h_name); + if (arp->ent->h_name == NULL) { + arp->err = NETDB_INTERNAL; + } else { + arp->err = NETDB_SUCCESS; + } + } else { + arp->err = NO_DATA; + } + break; + case ARES_EBADNAME: + case ARES_ENOTFOUND: + arp->err = HOST_NOT_FOUND; + break; + case ARES_ENOTIMP: + arp->err = NO_RECOVERY; + break; + case ARES_ENOMEM: + case ARES_EDESTRUCTION: + default: + arp->err = NETDB_INTERNAL; + break; + } +} + +static int asyn_gethostbyaddr(char **name, const char *addr,int len, int type, struct timeval *timeout) +{ + struct asyn_result ar; + ares_channel channel; + int nfds; + fd_set readers, writers; + struct timeval tv, *tvp; + struct timeval start_time,check_time; + + +/* start timer */ + gettimeofday(&start_time,0); + +/* ares init */ + if ( ares_init(&channel) != ARES_SUCCESS ) return(NETDB_INTERNAL); + ar.ent = (struct hostent *) malloc (sizeof(*ar.ent)); + memset((void *) ar.ent, 0, sizeof(*ar.ent)); + +/* query DNS server asynchronously */ + ares_gethostbyaddr(channel, addr, len, type, callback_handler, (void *) &ar); + +/* wait for result */ + while (1) { + FD_ZERO(&readers); + FD_ZERO(&writers); + nfds = ares_fds(channel, &readers, &writers); + if (nfds == 0) + break; + + gettimeofday(&check_time,0); + if (decrement_timeout(timeout, start_time, check_time)) { + ares_destroy(channel); + free_hostent(ar.ent); + return(TRY_AGAIN); + } + start_time = check_time; + + tvp = ares_timeout(channel, timeout, &tv); + + switch ( select(nfds, &readers, &writers, NULL, tvp) ) { + case -1: if (errno != EINTR) { + ares_destroy(channel); + free_hostent(ar.ent); + return NETDB_INTERNAL; + } else + continue; + case 0: + FD_ZERO(&readers); + FD_ZERO(&writers); + /* fallthrough */ + default : ares_process(channel, &readers, &writers); + } + + } + + + ares_destroy(channel); + + if (ar.err == NETDB_SUCCESS) { + *name = strdup(ar.ent->h_name); + free_hostent(ar.ent); + } + return (ar.err); +} + +static int read_roots(const char *file) +{ + FILE *roots = fopen(file,"r"); + char buf[BUFSIZ]; + int cnt = 0; + + if (!roots) { + perror(file); + return 1; + } + + while (!feof(roots)) { + char *nl; + fgets(buf,sizeof buf,roots); + nl = strchr(buf,'\n'); + if (nl) *nl = 0; + + super_users = realloc(super_users, (cnt+1) * sizeof super_users[0]); + super_users[cnt] = strdup(buf); + super_users[++cnt] = NULL; + } + + fclose(roots); + + return 0; +} + +static int amIroot(const char *subj) +{ + int i; + + if (!subj) return 0; + for (i=0; super_users && super_users[i]; i++) + if (strcmp(subj,super_users[i]) == 0) return 1; + + return 0; +} + +static int parse_limits(char *opt, int *j_limit, int *e_limit, int *size_limit) +{ + return (sscanf(opt, "%d:%d:%d", j_limit, e_limit, size_limit) == 3); +} + static int check_mkdir(const char *dir) { struct stat sbuf; - if (stat(dir,&sbuf)) { - if (errno == ENOENT) { - if (mkdir(dir, S_IRWXU)) { - dprintf(("[%d] %s: %s\n", - getpid(),dir,strerror(errno))); - - if (!debug) syslog(LOG_CRIT,"%s: %m",dir); + if ( stat(dir, &sbuf) ) + { + if ( errno == ENOENT ) + { + if ( mkdir(dir, S_IRWXU) ) + { + dprintf(("[%d] %s: %s\n", getpid(), dir, strerror(errno))); + if (!debug) syslog(LOG_CRIT, "%s: %m", dir); return 1; } } - else { - dprintf(("[%d] %s: %s\n",getpid(),strerror(errno))); - if (!debug) syslog(LOG_CRIT,"%s: %m",dir); + else + { + dprintf(("[%d] %s: %s\n", getpid(), dir, strerror(errno))); + if (!debug) syslog(LOG_CRIT, "%s: %m", dir); return 1; } } @@ -1322,4 +1283,19 @@ static int check_mkdir(const char *dir) if (!debug) syslog(LOG_CRIT,"%s: not a directory",dir); return 1; } + + return 0; } + +static int decrement_timeout(struct timeval *timeout, struct timeval before, struct timeval after) +{ + (*timeout).tv_sec = (*timeout).tv_sec - (after.tv_sec - before.tv_sec); + (*timeout).tv_usec = (*timeout).tv_usec - (after.tv_usec - before.tv_usec); + while ( (*timeout).tv_usec < 0) { + (*timeout).tv_sec--; + (*timeout).tv_usec += 1000000; + } + if ( ((*timeout).tv_sec < 0) || (((*timeout).tv_sec == 0) && ((*timeout).tv_usec == 0)) ) return(1); + else return(0); +} + diff --git a/org.glite.lb.server/src/ws_fault.c b/org.glite.lb.server/src/ws_fault.c new file mode 100644 index 0000000..6f7bbc0 --- /dev/null +++ b/org.glite.lb.server/src/ws_fault.c @@ -0,0 +1,56 @@ +#include + +#include "glite/lb/context-int.h" + +#include "bk_ws_H.h" +#include "bk_ws_Stub.h" + + +void edg_wll_ErrToFault(const edg_wll_Context ctx,struct soap *soap) +{ + char *et,*ed; + struct SOAP_ENV__Detail *detail = soap_malloc(soap,sizeof *detail); + struct _GenericLBFault *f = soap_malloc(soap,sizeof *f); + + + f->edgwll__GenericLBFault = soap_malloc(soap,sizeof *f->edgwll__GenericLBFault); + + f->edgwll__GenericLBFault->code = edg_wll_Error(ctx,&et,&ed); + f->edgwll__GenericLBFault->text = soap_malloc(soap,strlen(et)+1); + strcpy(f->edgwll__GenericLBFault->text,et); + free(et); + f->edgwll__GenericLBFault->description = soap_malloc(soap,strlen(ed)+1); + strcpy(f->edgwll__GenericLBFault->description,ed); + free(ed); + + detail->__type = SOAP_TYPE__GenericLBFault; + detail->value = f; + detail->__any = NULL; + + soap_receiver_fault(soap,"shit",NULL); + if (soap->version == 2) soap->fault->SOAP_ENV__Detail = detail; + else soap->fault->detail = detail; +} + + +void edg_wll_FaultToErr(const struct soap *soap,edg_wll_Context ctx) +{ + struct SOAP_ENV__Detail *detail = soap->version == 2 ? + soap->fault->SOAP_ENV__Detail : soap->fault->detail; + + struct edgwll__GenericLBFaultType *f; + + if (detail->__type == SOAP_TYPE__GenericLBFault) { + f = ((struct _GenericLBFault *) detail->value) + ->edgwll__GenericLBFault; + edg_wll_SetError(ctx,f->code,f->description); + } + else { + char *s; + + asprintf(&s,"SOAP: %s", soap->version == 2 ? + soap->fault->SOAP_ENV__Reason : soap->fault->faultstring); + edg_wll_SetError(ctx,EINVAL,s); + free(s); + } +} diff --git a/org.glite.lb.server/src/ws_plugin.c b/org.glite.lb.server/src/ws_plugin.c new file mode 100644 index 0000000..6a3df26 --- /dev/null +++ b/org.glite.lb.server/src/ws_plugin.c @@ -0,0 +1,191 @@ +#include +#include +#include +#include + +#include "glite/lb/lb_gss.h" +#include "glite/lb/context-int.h" + +#ifdef PLUGIN_TEST +extern int edg_wll_open(edg_wll_Context); +#endif + +#include "ws_plugin.h" + +#include "LoggingAndBookkeeping.nsmap" + +#ifdef WS_PLUGIN_DEBUG +# define pdprintf(s) printf s +#else +# define pdprintf(s) +#endif + +static const char plugin_id[] = PLUGIN_ID; + +#ifdef PLUGIN_TEST +static int edg_wll_ws_connect(struct soap *, const char *, const char *, int); +#endif +static void edg_wll_ws_delete(struct soap *, struct soap_plugin *); +static size_t edg_wll_ws_recv(struct soap *, char *, size_t); +static int edg_wll_ws_send(struct soap *, const char *, size_t); + + +int edg_wll_ws_plugin(struct soap *soap, struct soap_plugin *p, void *arg) +{ + /* The parametr (edg_wll_Context) must be given! */ + assert(arg != NULL); + + p->id = plugin_id; + p->data = arg; + p->fdelete = edg_wll_ws_delete; + +#ifdef PLUGIN_TEST + soap->fconnect = edg_wll_ws_connect; +#endif + soap->fsend = edg_wll_ws_send; + soap->frecv = edg_wll_ws_recv; + + return SOAP_OK; +} + +#ifdef PLUGIN_TEST +int edg_wll_ws_connect(struct soap *soap, const char *endpoint, + const char *host, int port) +{ + edg_wll_Context ctx = (edg_wll_Context)soap_lookup_plugin(soap, plugin_id); + + + ctx->srvName = strdup(host); + ctx->srvPort = port; + ctx->p_tmp_timeout = ctx->p_query_timeout; + if ( edg_wll_open(ctx) ) + return edg_wll_Error(ctx, NULL, NULL); + + soap->socket = 2; + + return SOAP_OK; +} +#endif + +static void edg_wll_ws_delete(struct soap *soap, struct soap_plugin *p) +{ + /* + * Keep silly gSOAP happy + */ +} + + +size_t edg_wll_ws_recv(struct soap *soap, char *buf, size_t bufsz) +{ + edg_wll_Context ctx = (edg_wll_Context)soap_lookup_plugin(soap, plugin_id); + edg_wll_GssStatus gss_code; + int len; + + + edg_wll_ResetError(ctx); + if ( ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT ) + { + edg_wll_SetError(ctx, ENOTCONN, NULL); + soap->errnum = ENOTCONN; + return 0; + } + + len = edg_wll_gss_read(&ctx->connPool[ctx->connToUse].gss, + buf, bufsz, &ctx->p_tmp_timeout, &gss_code); + + switch ( len ) + { + case EDG_WLL_GSS_OK: + break; + + case EDG_WLL_GSS_ERROR_GSS: + edg_wll_SetErrorGss(ctx, "receving WS request", &gss_code); + soap->errnum = ENOTCONN; + return 0; + + case EDG_WLL_GSS_ERROR_ERRNO: + edg_wll_SetError(ctx, errno, "edg_wll_gss_read()"); + soap->errnum = errno; + return 0; + + case EDG_WLL_GSS_ERROR_TIMEOUT: + edg_wll_SetError(ctx, ETIMEDOUT, NULL); + soap->errnum = ETIMEDOUT; + return 0; + + case EDG_WLL_GSS_ERROR_EOF: + edg_wll_SetError(ctx, ENOTCONN, NULL); + soap->errnum = ENOTCONN; + return 0; + + /* default: fallthrough */ + } + + pdprintf(("\nWS received:\n%s\n\n", buf)); + return len; +} + +static int edg_wll_ws_send(struct soap *soap, const char *buf, size_t bufsz) +{ + edg_wll_Context ctx = (edg_wll_Context) soap_lookup_plugin(soap, plugin_id); + edg_wll_GssStatus gss_code; + struct sigaction sa, osa; + int total = 0, + ret; + + + edg_wll_ResetError(ctx); + + if ( ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT ) + { + edg_wll_SetError(ctx, ENOTCONN, NULL); + soap->errnum = ENOTCONN; + return SOAP_EOF; + } + + memset(&sa, 0, sizeof(sa)); + assert(sa.sa_handler == NULL); + sa.sa_handler = SIG_IGN; + sigaction(SIGPIPE, &sa, &osa); + + ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss, + (void*)buf, bufsz, + &ctx->p_tmp_timeout, + &total, &gss_code); + + sigaction(SIGPIPE, &osa, NULL); + + switch ( ret ) + { + case EDG_WLL_GSS_OK: + pdprintf(("\nWS sent:\n%s\n\n", buf)); + break; + + case EDG_WLL_GSS_ERROR_TIMEOUT: + edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_ws_send()"); + soap->errnum = ETIMEDOUT; + return SOAP_EOF; + + case EDG_WLL_GSS_ERROR_ERRNO: + if ( errno == EPIPE ) + { + edg_wll_SetError(ctx, ENOTCONN, "edg_wll_ws_send()"); + soap->errnum = ENOTCONN; + } + else + { + edg_wll_SetError(ctx, errno, "edg_wll_ws_send()"); + soap->errnum = errno; + } + return SOAP_EOF; + + case EDG_WLL_GSS_ERROR_GSS: + case EDG_WLL_GSS_ERROR_EOF: + default: + edg_wll_SetError(ctx, ENOTCONN, "edg_wll_ws_send()"); + soap->errnum = ENOTCONN; + return SOAP_EOF; + } + + return SOAP_OK; +} diff --git a/org.glite.lb.server/src/ws_plugin.h b/org.glite.lb.server/src/ws_plugin.h new file mode 100644 index 0000000..9b691be --- /dev/null +++ b/org.glite.lb.server/src/ws_plugin.h @@ -0,0 +1,8 @@ +#ifndef __EDG_WORKLOAD_LOGGING_LBSERVER_WS_PLUGIN_H__ +#define __EDG_WORKLOAD_LOGGING_LBSERVER_WS_PLUGIN_H__ + +#define PLUGIN_ID "GLITE_WS_PLUGIN" + +int edg_wll_ws_plugin(struct soap *, struct soap_plugin *, void *); + +#endif /* __EDG_WORKLOAD_LOGGING_LBSERVER_WS_PLUGIN_H__ */ diff --git a/org.glite.lb.server/src/ws_query.c b/org.glite.lb.server/src/ws_query.c new file mode 100644 index 0000000..3ab7264 --- /dev/null +++ b/org.glite.lb.server/src/ws_query.c @@ -0,0 +1,105 @@ +#include + +#include "glite/lb/context-int.h" +#include "glite/lb/consumer.h" + +#include "jobstat.h" +#include "query.h" +#include "ws_plugin.h" +#include "bk_ws_H.h" + +int edgwll2__JobStatus( + struct soap *soap, + char *jobid, + struct edgwll__JobStatFlags *flags, + struct edgwll2__JobStatusResponse *out) +{ + edg_wll_Context ctx = (edg_wll_Context) soap_lookup_plugin(soap, PLUGIN_ID); + edg_wlc_JobId j; + edg_wll_JobStat s; + + + out->status = soap_malloc(soap, sizeof *out->status); + + if ( edg_wlc_JobIdParse(jobid, &j) ) + { + edg_wll_SetError(ctx, EINVAL, jobid); + edg_wll_ErrToFault(ctx, soap); + return SOAP_FAULT; + } + + if ( edg_wll_JobStatus(ctx, j, 0, &s) ) + { + edg_wll_ErrToFault(ctx, soap); + return SOAP_FAULT; + } + + edg_wll_StatusToSoap(soap, &s, out->status); + + return SOAP_OK; +} + +int edgwll2__QueryJobs( + struct soap *soap, + struct edgwll__QueryConditions *conditions, + struct edgwll__JobStatFlags *flags, + struct edgwll2__QueryJobsResponse *out) +{ + edg_wll_Context ctx = (edg_wll_Context) soap_lookup_plugin(soap, PLUGIN_ID); + edg_wlc_JobId *jobsOut = NULL; + edg_wll_JobStat *statesOut = NULL; + edg_wll_QueryRec **qr = NULL; + int fl, + i, j, + ret = SOAP_FAULT; + + + out->states = soap_malloc(soap, sizeof(*out->states)); + out->jobs = soap_malloc(soap, sizeof(*out->jobs)); + if ( !out->states || !out->jobs ) goto cleanup; + memset(out->states, 0, sizeof(*(out->states))); + memset(out->jobs, 0, sizeof(*(out->jobs))); + + edg_wll_ResetError(ctx); + edg_wll_SoapToJobStatFlags(flags, &fl); + if ( !edg_wll_SoapToQueryCondsExt(conditions, &qr) ) { + edg_wll_SetError(ctx, ENOMEM, "Couldn't create internal structures"); + goto cleanup; + } + if ( edg_wll_QueryJobsServer(ctx, qr, fl, &jobsOut, &statesOut) ) goto cleanup; + if ( edg_wll_JobsQueryResToSoap(jobsOut, statesOut, out) ) goto cleanup; + ret = SOAP_OK; + +cleanup: + if ( qr ) { + for ( i = 0; qr[i]; i++ ) { + for ( j = 0; qr[i][j].attr; j++ ) + edg_wll_QueryRecFree(&qr[i][j]); + free(qr[i]); + } + free(qr); + } + if ( jobsOut ) { + for ( i = 0; jobsOut[i]; i++ ) + edg_wlc_JobIdFree(jobsOut[i]); + free(jobsOut); + } + if ( statesOut ) { + for ( i = 0; statesOut[i].state; i++ ) + edg_wll_FreeStatus(&statesOut[i]); + free(statesOut); + } + if ( ret == SOAP_FAULT ) edg_wll_ErrToFault(ctx, soap); + + return ret; +} + +int edgwll2__UserJobs( + struct soap *soap, + struct edgwll2__UserJobsResponse *out) +{ + out->jobs = NULL; + out->states = NULL; + + return SOAP_OK; +} diff --git a/org.glite.lb.server/src/ws_typemap.dat b/org.glite.lb.server/src/ws_typemap.dat new file mode 100644 index 0000000..c79474a --- /dev/null +++ b/org.glite.lb.server/src/ws_typemap.dat @@ -0,0 +1,2 @@ +edgwll = http://glite.org/wsdl/types/lb +edgwll = http://glite.org/wsdl/services/lb diff --git a/org.glite.lb.server/src/ws_typeref.c.T b/org.glite.lb.server/src/ws_typeref.c.T new file mode 100644 index 0000000..8668a34 --- /dev/null +++ b/org.glite.lb.server/src/ws_typeref.c.T @@ -0,0 +1,459 @@ +#include +#include +#include +#include + +#include "bk_ws_H.h" + +#include "glite/lb/consumer.h" +/* XXX: references only, src and dest share pointers */ + +static void edg_wll_JobStatCodeToSoap(enum edgwll__JobStatCode in, edg_wll_JobStatCode *out) +{ + switch ( in ) + { + case EDG_WLL_JOB_UNDEF: *out = UNDEF; break; +@@@{ + for my $stat ($status->getTypesOrdered) { + my $u = uc $stat; + my $c = getTypeComment $status $stat; + gen qq{ +! case EDG_WLL_JOB_$u: *out = $u; break; /**< $c */ +}; + } +@@@} + } +} + +static void edg_wll_SoapToJobStatCode(enum edgwll__JobStatCode in, edg_wll_JobStatCode *out) +{ + switch ( in ) + { + case UNDEF: *out = EDG_WLL_JOB_UNDEF; break; +@@@{ + for my $stat ($status->getTypesOrdered) { + my $u = uc $stat; + my $c = getTypeComment $status $stat; + gen qq{ +! case $u: *out = EDG_WLL_JOB_$u; break; /**< $c */ +}; + } +@@@} + } +} + +void edg_wll_StatusToSoap(struct soap *soap,edg_wll_JobStat const *src,struct edgwll__JobStat *dest) +{ + int i; + + memset(dest,0,sizeof *dest); + dest->state = src->state; /* XXX: enum mapping */ +@@@{ + selectType $status '_common_'; + for (getFieldsOrdered $status) { + my $f = selectField $status $_; + my $ft = $f->{type}; + my $usuc = $_; + $usuc = $1.uc($2).$3 while ($usuc =~ /([^_]*)_([a-z])(.*)/); + if ($ft eq 'jobid') { + gen "\tdest->$usuc = edg_wlc_JobIdUnparse(src->$_);\n"; + } + elsif ($ft eq 'strlist') { + gen "\tfor (i=0; src->$_ && src->$_\[i]; i++);\n"; + gen "\tdest->__size$usuc = i;\n"; + gen "\tdest->$usuc = src->$_;\n"; + } elsif ($ft eq 'intlist') { + gen "\tdest->__size$usuc = src->$_ ? src->$_\[0] : 0;\n"; + gen "\tdest->$usuc = src->$_ ? src->$_ + 1 : NULL;\n"; + } elsif ($ft eq 'stslist' || $ft eq 'taglist') { +# FIXME + gen "\tdest->__size$usuc = 0;\n"; + gen "\tdest->$usuc = NULL;\n"; + } elsif ($ft eq 'timeval') { + gen "\tdest->$usuc = soap_malloc(soap,sizeof *dest->$usuc);\n"; + gen "\tdest->$usuc->tvSec = src->$_.tv_sec;\n"; + gen "\tdest->$usuc->tvUsec = src->$_.tv_usec;\n"; + } + else { gen "\tdest->$usuc = src->$_;\n"; } + } +@@@} +} + +void edg_wll_SoapToStatus(struct soap *soap,struct edgwll__JobStat const *src,edg_wll_JobStat *dest) +{ + memset(dest,0,sizeof *dest); + dest->state = src->state; /* XXX: enum mapping */ + +@@@{ + selectType $status '_common_'; + for (getFieldsOrdered $status) { + my $f = selectField $status $_; + my $ft = $f->{type}; + my $usuc = $_; + $usuc = $1.uc($2).$3 while ($usuc =~ /([^_]*)_([a-z])(.*)/); + + if ($ft eq 'jobid') { + gen "\tif (src->$usuc) edg_wlc_JobIdParse(src->$usuc,&dest->$_);\n"; + } elsif ($ft eq 'strlist') { + gen "\tsoap_unlink(soap,src->$usuc);\n"; + gen "\tdest->$_ = realloc(src->$usuc,sizeof(*dest->$_) * (src->__size$usuc + 1));\n"; + gen "\tdest->$_\[src->__size$usuc] = NULL;\n"; + } elsif ($ft eq 'intlist') { + gen "\tsoap_unlink(soap,src->$usuc);\n"; + gen "\tdest->$_ = realloc(src->$usuc,sizeof(*dest->$_) * (src->__size$usuc + 1));\n"; + gen "\tmemmove(dest->$_ + 1,dest->$_,sizeof(*dest->$_) * (src->__size$usuc));\n"; + gen "\tdest->$_\[0] = src->__size$usuc;\n"; + } elsif ($ft eq 'stslist' || $ft eq 'taglist') { +# FIXME + gen "\tdest->$_ = NULL;\n"; + } elsif ($ft eq 'timeval') { + gen "\tdest->$_.tv_sec = src->$usuc->tvSec;\n"; + gen "\tdest->$_.tv_usec = src->$usuc->tvUsec;\n"; + } elsif ($ft eq 'string') { + gen "\tdest->$_ = src->$usuc;\n"; + gen "\tsoap_unlink(soap,src->$usuc);\n"; + } else { + gen "\tdest->$_ = src->$usuc;\n"; +} +} +@@@} +} + +void edg_wll_SoapToJobStatFlags(const struct edgwll__JobStatFlags *in, int *out) +{ + int i; + + assert(in); assert(out); + for ( i = 0; i < in->__sizeflag; i++ ) switch ( in->flag[i] ) + { + case CLASSADS: *out |= EDG_WLL_STAT_CLASSADS; break; + case CHILDREN: *out |= EDG_WLL_STAT_CHILDREN; break; + case CHILDSTAT: *out |= EDG_WLL_STAT_CHILDSTAT; break; + } +} + +int edg_wll_JobStatFlagsToSoap(struct soap *soap, const int in, struct edgwll__JobStatFlags *out) +{ + int i = 0; + + assert(out); + memset(out, 0, sizeof(*out)); + if ( in & EDG_WLL_STAT_CLASSADS ) out->__sizeflag++; + if ( in & EDG_WLL_STAT_CHILDREN ) out->__sizeflag++; + if ( in & EDG_WLL_STAT_CHILDSTAT ) out->__sizeflag++; + if ( !out->__sizeflag ) return SOAP_OK; + out->flag = soap_malloc(soap, sizeof(*(out->flag))*out->__sizeflag); + if ( !out->flag ) return SOAP_FAULT; + + if ( in & EDG_WLL_STAT_CLASSADS ) out->flag[i++] = CLASSADS; + if ( in & EDG_WLL_STAT_CHILDREN ) out->flag[i++] = CHILDREN; + if ( in & EDG_WLL_STAT_CHILDSTAT ) out->flag[i++] = CHILDSTAT; + + return SOAP_OK; +} + +static void edg_wll_SoapToAttr(const enum edgwll__QueryAttr in, edg_wll_QueryAttr *out) +{ + switch ( in ) + { + case UNDEF: *out = EDG_WLL_QUERY_ATTR_UNDEF; break; + case JOBID: *out = EDG_WLL_QUERY_ATTR_JOBID; break; + case OWNER: *out = EDG_WLL_QUERY_ATTR_OWNER; break; + case STATUS: *out = EDG_WLL_QUERY_ATTR_STATUS; break; + case LOCATION: *out = EDG_WLL_QUERY_ATTR_LOCATION; break; + case DESTINATION: *out = EDG_WLL_QUERY_ATTR_DESTINATION; break; + case DONECODE: *out = EDG_WLL_QUERY_ATTR_DONECODE; break; + case USERTAG: *out = EDG_WLL_QUERY_ATTR_USERTAG; break; + case TIME: *out = EDG_WLL_QUERY_ATTR_TIME; break; + case LEVEL: *out = EDG_WLL_QUERY_ATTR_LEVEL; break; + case HOST: *out = EDG_WLL_QUERY_ATTR_HOST; break; + case SOURCE: *out = EDG_WLL_QUERY_ATTR_SOURCE; break; + case INSTANCE: *out = EDG_WLL_QUERY_ATTR_INSTANCE; break; + case EVENT_TYPE: *out = EDG_WLL_QUERY_ATTR_EVENT_TYPE; break; + case CHKPT_TAG: *out = EDG_WLL_QUERY_ATTR_CHKPT_TAG; break; + case RESUBMITTED: *out = EDG_WLL_QUERY_ATTR_RESUBMITTED; break; + case PARENT: *out = EDG_WLL_QUERY_ATTR_PARENT; break; + case EXITCODE: *out = EDG_WLL_QUERY_ATTR_EXITCODE; break; + default: *out = in; break; + } +} + +static void edg_wll_AttrToSoap(const edg_wll_QueryAttr in, enum edgwll__QueryAttr *out) +{ + switch ( in ) + { + case EDG_WLL_QUERY_ATTR_UNDEF: *out = UNDEF; break; + case EDG_WLL_QUERY_ATTR_JOBID: *out = JOBID; break; + case EDG_WLL_QUERY_ATTR_OWNER: *out = OWNER; break; + case EDG_WLL_QUERY_ATTR_STATUS: *out = STATUS; break; + case EDG_WLL_QUERY_ATTR_LOCATION: *out = LOCATION; break; + case EDG_WLL_QUERY_ATTR_DESTINATION: *out = DESTINATION; break; + case EDG_WLL_QUERY_ATTR_DONECODE: *out = DONECODE; break; + case EDG_WLL_QUERY_ATTR_USERTAG: *out = USERTAG; break; + case EDG_WLL_QUERY_ATTR_TIME: *out = TIME; break; + case EDG_WLL_QUERY_ATTR_LEVEL: *out = LEVEL; break; + case EDG_WLL_QUERY_ATTR_HOST: *out = HOST; break; + case EDG_WLL_QUERY_ATTR_SOURCE: *out = SOURCE; break; + case EDG_WLL_QUERY_ATTR_INSTANCE: *out = INSTANCE; break; + case EDG_WLL_QUERY_ATTR_EVENT_TYPE: *out = EVENT_TYPE; break; + case EDG_WLL_QUERY_ATTR_CHKPT_TAG: *out = CHKPT_TAG; break; + case EDG_WLL_QUERY_ATTR_RESUBMITTED: *out = RESUBMITTED; break; + case EDG_WLL_QUERY_ATTR_PARENT: *out = PARENT; break; + case EDG_WLL_QUERY_ATTR_EXITCODE: *out = EXITCODE; break; + default: *out = in; break; + } +} + +static void edg_wll_SoapToQueryOp(const enum edgwll__QueryOp in, edg_wll_QueryOp *out) +{ + switch ( in ) + { + case EQUAL: *out = EDG_WLL_QUERY_OP_EQUAL; break; + case LESS: *out = EDG_WLL_QUERY_OP_LESS; break; + case GREATER: *out = EDG_WLL_QUERY_OP_GREATER; break; + case WITHIN: *out = EDG_WLL_QUERY_OP_WITHIN; break; + default: *out = in; break; + } +} + +static void edg_wll_QueryOpToSoap(const enum edgwll__QueryOp in, edg_wll_QueryOp *out) +{ + switch ( in ) + { + case EDG_WLL_QUERY_OP_EQUAL: *out = EQUAL; break; + case EDG_WLL_QUERY_OP_LESS: *out = LESS; break; + case EDG_WLL_QUERY_OP_GREATER: *out = GREATER; break; + case EDG_WLL_QUERY_OP_WITHIN: *out = WITHIN; break; + default: *out = in; break; + } +} + +int edg_wll_SoapToQueryVal( + const edg_wll_QueryAttr attr, + const struct edgwll__QueryRecValue *in, + union edg_wll_QueryVal *out) +{ + assert(attr); assert(in); assert(out); + switch ( attr ) + { + case EDG_WLL_QUERY_ATTR_USERTAG: + case EDG_WLL_QUERY_ATTR_OWNER: + case EDG_WLL_QUERY_ATTR_LOCATION: + case EDG_WLL_QUERY_ATTR_DESTINATION: + case EDG_WLL_QUERY_ATTR_HOST: + case EDG_WLL_QUERY_ATTR_INSTANCE: + out->c = strdup(in->c); + break; + case EDG_WLL_QUERY_ATTR_JOBID: + case EDG_WLL_QUERY_ATTR_PARENT: + edg_wlc_JobIdParse(in->c, &(out->j)); + break; + case EDG_WLL_QUERY_ATTR_TIME: + out->t.tv_sec = in->t->tvSec; + out->t.tv_usec = in->t->tvUsec; + break; + case EDG_WLL_QUERY_ATTR_STATUS: + case EDG_WLL_QUERY_ATTR_DONECODE: + case EDG_WLL_QUERY_ATTR_LEVEL: + case EDG_WLL_QUERY_ATTR_SOURCE: + case EDG_WLL_QUERY_ATTR_EVENT_TYPE: + case EDG_WLL_QUERY_ATTR_RESUBMITTED: + default: + out->i = *(in->i); + break; + } + + return 0; +} + +int edg_wll_QueryValToSoap( + const edg_wll_QueryAttr attr, + union edg_wll_QueryVal *in, + struct edgwll__QueryRecValue *out) +{ + assert(attr); assert(in); assert(out); + if ( (attr == EDG_WLL_QUERY_ATTR_JOBID) || (attr == EDG_WLL_QUERY_ATTR_PARENT) ) { + out->c = edg_wlc_JobIdUnparse(in->j); + } + else + return edg_wll_SoapToQueryVal(attr, out, in); + + return 0; +} + +int edg_wll_SoapToQueryRec( + const enum edgwll__QueryAttr attr, + const struct edgwll__QueryRec *in, + edg_wll_QueryRec *out) +{ + assert(attr); assert(in); assert(out); + memset(out, 0, sizeof(*out)); + edg_wll_SoapToAttr(attr, &out->attr); + switch ( out->attr ) + { + case EDG_WLL_QUERY_ATTR_TIME: + edg_wll_SoapToJobStatCode(*(in->attrid->state), &(out->attr_id.state)); + break; + case EDG_WLL_QUERY_ATTR_USERTAG: + out->attr_id.tag = strdup(in->attrid->tag); + break; + default: + break; + } + edg_wll_SoapToQueryOp(in->op, &(out->op)); + switch ( out->op ) + { + case EDG_WLL_QUERY_OP_WITHIN: + if ( edg_wll_SoapToQueryVal(attr, in->value2, &(out->value2)) ) goto err; + default: + if ( edg_wll_SoapToQueryVal(attr, in->value1, &(out->value)) ) goto err; + break; + } + + return 0; +err: + edg_wll_QueryRecFree(out); + return -1; +} + +int edg_wll_QueryRecToSoap( + struct soap *soap, + const edg_wll_QueryRec *in, + const struct edgwll__QueryRec *out) +{ + return SOAP_OK; +} + +int edg_wll_SoapToQueryConds(const struct edgwll__QueryCondition *in, edg_wll_QueryRec **out) +{ + int i; + edg_wll_QueryRec *qr; + + + assert(in); assert(*out); + if ( !(qr = calloc(in->__sizerecords+1, sizeof(*qr))) ) goto err; + for ( i = 0; i < in->__sizerecords; i++ ) + if ( edg_wll_SoapToQueryRec(in->attr, in->records[i], qr+i) ) goto err; + *out = qr; + return 0; + +err: + if ( qr ) { + for ( i = 0; qr[i].attr; i++ ) edg_wll_QueryRecFree(qr+i); + free(qr); + } + return -1; +} + +int edg_wll_QueryCondsToSoap( + struct soap *soap, + const edg_wll_QueryRec *in, + struct edgwll__QueryCondition *out) +{ + int i; + struct edgwll__QueryCondition qc; + + + assert(out); + if ( !in ) { memset(out, 0, sizeof(*out)); return SOAP_OK; } + memset(qc, 0, sizeof(*qc)); + while ( in[qc.__sizerecords].attr ) qc.__sizerecords++; + if ( qc.__sizerecords ) { memset(out, 0, sizeof(*out)); return SOAP_OK; } + qc.records = soap_malloc(soap, sizeof(*qc.records)*qc.__sizerecords); + if ( !qc.records ) return SOAP_FAULT; + qc.attr = in[0].attr; + for ( i = 0; in[i].attr; i++ ) + if ( edg_wll_QueryRecToSoap(soap, in+i, qc.records[i]) ) + return SOAP_FAULT; + + memcpy(out, &qc, sizeof(*qc)); + return SOAP_OK; + +} + +int edg_wll_SoapToQueryCondsExt(const struct edgwll__QueryConditions *in, edg_wll_QueryRec ***out) +{ + int i; + edg_wll_QueryRec **qr; + + + assert(in); assert(*out); + if ( !(qr = calloc(in->__sizecondition+1, sizeof(*qr))) ) return -1; + for ( i = 0; i < in->__sizecondition; i++ ) + if ( edg_wll_SoapToQueryConds(*(in->condition), qr+i) ) goto err; + + *out = qr; + return 0; + +err: + if ( qr ) { + int j; + for ( i = 0; qr[i]; i++ ) + { + for ( j = 0; qr[i][j].attr; j++ ) edg_wll_QueryRecFree(qr[i]); + free(qr[i]); + } + free(qr); + } + return -1; +} + +int edg_wll_QueryCondsExtToSoap( + struct soap *soap, + const edg_wll_QueryRec **in, + struct edgwll__QueryConditions *out) +{ + int i; + struct edgwll__QueryConditions qc; + + + assert(out); + if ( !in ) { memset(out, 0, sizeof(*out)); return SOAP_OK; } + memset(qc, 0, sizeof(*qc)); + while ( in[qc.__sizecondition] ) qc.__sizecondition++; + if ( !qc.__sizecondition ) { memset(out, 0, sizeof(*out)); return SOAP_OK; } + qc.condition = soap_malloc(soap, sizeof(*qc.condition)*qc.__sizecondition); + if ( !qc.conditions ) return SOAP_FAULT; + for ( i = 0; in[i]; i++ ) + if ( edg_wll_QueryCondsToSoap(soap, in[i], qc.condition[i]) ) + return SOAP_FAULT; + + memcpy(out, &qc, sizeof(*qc)); + return SOAP_OK; +} + +int edg_wll_JobsQueryResToSoap( + struct soap *soap, + edg_wlc_JobId *jobs, + edg_wll_JobStat *states, + struct edgwll2__QueryJobsResponse *out) +{ + int i; + + + assert(out); assert(out->states); assert(out->jobs); + memset(out->jobs, 0, sizeof(*(out->jobs))); + memset(out->states, 0, sizeof(*(out->states))); + + if ( jobs ) { + for ( i = 0; jobs[i]; i++ ) ; + out->jobs->jobs = soap_malloc(soap, sizeof(*(out->jobs->jobs))*i); + if ( !out->jobs->jobs ) return SOAP_FAULT; + out->jobs->__sizejobs = i; + for ( i = 0; jobs[i]; i++ ) { + char *s; + if ( !(s = edg_wlc_JobIdUnparse(jobs[i])) ) return SOAP_FAULT; + if ( !(out->jobs->jobs[i] = soap_strdup(s)) ) return SOAP_FAULT; + free(s); + } + } + if ( states ) { + for ( i = 0; states[i].state; i++ ) ; + out->states->states = soap_malloc(soap, sizeof(*(out->states->states))*i); + if ( !out->states->states ) return SOAP_FAULT; + for ( i = 0; states[i].state; i++ ) + edg_wll_StatusToSoap(soap, states+i, out->states->states[i]); + } + + return SOAP_OK; +} -- 1.8.2.3