From cbfd06d5f565267c1975591c7f283dea1fe9881d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Tue, 14 Nov 2006 17:39:11 +0000 Subject: [PATCH] merged connpool branch --- org.glite.lb.client-interface/interface/context.h | 2 +- org.glite.lb.client/Makefile | 10 +- org.glite.lb.client/examples/user_jobs_threaded.c | 225 ++++++++++++++++++ org.glite.lb.client/src/connection.c | 212 +++++++++++------ org.glite.lb.client/src/connection.h | 4 +- org.glite.lb.common/Makefile | 7 +- org.glite.lb.common/interface/connpool.h | 98 ++++++++ org.glite.lb.common/interface/context-int.h | 28 +-- org.glite.lb.common/interface/log_proto.h | 2 +- org.glite.lb.common/interface/mini_http.h | 11 +- org.glite.lb.common/src/connpool.c | 277 ++++++++++++++++++++++ org.glite.lb.common/src/context.c | 49 +++- org.glite.lb.common/src/mini_http.c | 64 ++--- org.glite.lb.common/src/param.c | 12 +- org.glite.lb.server-bones/src/srvbones.c | 1 - org.glite.lb.server/src/bkserverd.c | 24 +- org.glite.lb.server/src/lb_http.c | 4 +- org.glite.lb.server/src/notification.c | 4 +- org.glite.lb.server/src/srv_purge.c | 2 +- org.glite.lb.server/src/stored_master.c | 4 +- 20 files changed, 862 insertions(+), 178 deletions(-) create mode 100644 org.glite.lb.client/examples/user_jobs_threaded.c create mode 100644 org.glite.lb.common/interface/connpool.h create mode 100644 org.glite.lb.common/src/connpool.c diff --git a/org.glite.lb.client-interface/interface/context.h b/org.glite.lb.client-interface/interface/context.h index 747c44c..9c78a55 100644 --- a/org.glite.lb.client-interface/interface/context.h +++ b/org.glite.lb.client-interface/interface/context.h @@ -43,7 +43,7 @@ typedef enum _edg_wll_ContextParam { EDG_WLL_PARAM_QUERY_JOBS_LIMIT, /**< maximal query jobs result size */ EDG_WLL_PARAM_QUERY_EVENTS_LIMIT,/**< maximal query events result size */ EDG_WLL_PARAM_QUERY_RESULTS, /**< maximal query result size */ - EDG_WLL_PARAM_QUERY_CONNECTIONS,/**< maximal number of open connections in ctx->connPoll */ + EDG_WLL_PARAM_CONNPOOL_SIZE, /**< maximal number of open connections in connectionsHandle.poolSize */ EDG_WLL_PARAM_NOTIF_SERVER, /**< default notification server name */ EDG_WLL_PARAM_NOTIF_SERVER_PORT,/**< default notification server port */ EDG_WLL_PARAM_NOTIF_TIMEOUT, /**< notif timeout */ diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 63b749c..f1118cd 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -136,7 +136,8 @@ TOOLS:=dump load purge lb_dump_exporter ${LB_PERF_TOOLS} EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog lbmon flood_proxy dagids stress_context -EXAMPLES_CL=user_jobs job_status +EXAMPLES_CL=user_jobs job_status +EXAMPLES_CL_THR=user_jobs_threaded FAKE_EXAMPLES:=job_log_fake MAN_GZ:=glite-lb-logevent.1.gz @@ -185,6 +186,9 @@ ${TOOLS} ${EXAMPLES}: %: %.o ${EXAMPLES_CL}: %: %.o ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} +${EXAMPLES_CL_THR}: %: %.o + ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB} + ${FAKE_EXAMPLES}: %: %.o ${FAKELIB} ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} @@ -244,7 +248,7 @@ else compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ} endif -examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} +examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} fake: ${FAKE_EXAMPLES} @@ -298,7 +302,7 @@ endif for p in ${TOOLS} ; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \ done - for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \ + for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \ done ${INSTALL} -m 755 ${top_srcdir}/src/export.sh "${PREFIX}/sbin/glite-lb-export.sh" diff --git a/org.glite.lb.client/examples/user_jobs_threaded.c b/org.glite.lb.client/examples/user_jobs_threaded.c new file mode 100644 index 0000000..068fecd --- /dev/null +++ b/org.glite.lb.client/examples/user_jobs_threaded.c @@ -0,0 +1,225 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "glite/lb/context.h" +#include "glite/lb/xml_conversions.h" +#include "glite/lb/consumer.h" + +int use_proxy = 0; + +int (*user_jobs)(edg_wll_Context, edg_wlc_JobId **, edg_wll_JobStat **); + + + +static const char *get_opt_string = "hxj:t:p:r:s:w:"; + +static struct option opts[] = { + {"help", 0, NULL, 'h'}, + {"proxy", 0, NULL, 'x'}, + {"owner", 1, NULL, 'o'}, + {"num-threads", 1, NULL, 't'}, + {"port_range", 1, NULL, 'p'}, + {"repeat", 1, NULL, 'r'}, + {"rand-start", 1, NULL, 's'}, +// {"rand-work", 1, NULL, 'w'}, + {NULL, 0, NULL, 0} +}; + + + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + "\t-h, --help\t show this help\n" + "\t-x, --proxy\t contact proxy (not implemented yet)\n" + "\t-o, --owner DN\t show jobs of user with this DN\n" + "\t-t, --num-threads N\t number for threads to create\n" + "\t-p, --port-range N\t connect to server:port, server:port+10, \n" + "\t\t\t ... server:port+N*10 bkservers\n" + "\t-r, --repeat N\t repeat query in each slave N-times \n" + "\t-s, --rand-start N\t start threads in random interval <0,N> sec\n" +// "\t-w, --rand-work N\t simulate random server respose time <0,N> sec\n" + "\n" + ,me); +} + + +typedef struct { + char *owner; + int proxy; + int rand_start; + int port_range; + int repeat; + char *argv_0;} thread_code_args; + +void *thread_code(thread_code_args *arguments) { + + edg_wll_Context ctx; + char *errt,*errd; + edg_wlc_JobId *jobs = NULL; + edg_wll_JobStat *states = NULL; + int i,j,k,port; + long sl; + + + sl = (unsigned long) ((double) random()/ RAND_MAX * arguments->rand_start * 1000000); + printf("Thread [%d] - sleeping for %ld us\n",pthread_self(),sl); + usleep( sl ); + + user_jobs = edg_wll_UserJobs; + + if (arguments->proxy) { + user_jobs = edg_wll_UserJobsProxy; + } + + edg_wll_InitContext(&ctx); + if ( user_jobs == edg_wll_UserJobsProxy && arguments->owner ) + edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner); + + edg_wll_GetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, &port); + // pthread_self tend to return even number, so /7 makes some odd numbers... + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, + port + ((long) pthread_self()/7 % arguments->port_range)*10 ); + + for (k=0; krepeat; k++) { + if (user_jobs(ctx,&jobs,&states)) goto err; + for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) { + char *id = edg_wlc_JobIdUnparse(states[i].jobId), + *st = edg_wll_StatToString(states[i].state); + + if (!states[i].parent_job) { + if (states[i].jobtype == EDG_WLL_STAT_SIMPLE) { + printf(" %s .... %s %s\n", id, st, + (states[i].state==EDG_WLL_JOB_DONE) ? + edg_wll_done_codeToString(states[i].done_code) : "" ); + } + else if (states[i].jobtype == EDG_WLL_STAT_DAG) { + printf("DAG %s .... %s %s\n", id, st, + (states[i].state==EDG_WLL_JOB_DONE) ? + edg_wll_done_codeToString(states[i].done_code) : ""); + for (j=0; states[j].state != EDG_WLL_JOB_UNDEF; j++) { + if (states[j].parent_job) { + char *par_id = edg_wlc_JobIdUnparse(states[j].parent_job); + + if (!strcmp(id,par_id)) { + char *sub_id = edg_wlc_JobIdUnparse(states[j].jobId), + *sub_st = edg_wll_StatToString(states[j].state); + + printf(" `- %s .... %s %s\n", sub_id, sub_st, + (states[j].state==EDG_WLL_JOB_DONE) ? + edg_wll_done_codeToString(states[j].done_code) : ""); + free(sub_id); + free(sub_st); + } + free(par_id); + } + } + } + } + + free(id); + free(st); + } + + printf("\nFound %d jobs\n",i); + } +err: + free(arguments->owner); + if (jobs) { + for (i=0; jobs[i]; i++) edg_wlc_JobIdFree(jobs[i]); + free(jobs); + } + + if (states) { + for (i=0; states[i].state; i++) edg_wll_FreeStatus(&states[i]); + free(states); + } + + if (edg_wll_Error(ctx,&errt,&errd)) { + fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd); + edg_wll_FreeContext(ctx); + pthread_exit(NULL); + } + + edg_wll_FreeContext(ctx); + +// printf("Thread %d exitting\n",(int)pthread_self()); + + pthread_exit(NULL); + +} + +int main(int argc,char **argv) +{ + thread_code_args arguments = { NULL, 0, 0, 1, 1,NULL }; + int i,rc,status,opt; + int thr_num = 10; // default + + + while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) { + case 'x': arguments.proxy = 1; break; + case 'o': arguments.owner = optarg; break; + case 't': thr_num = atoi(optarg); break; + case 'p': arguments.port_range = atoi(optarg); break; + case 'r': arguments.repeat = atoi(optarg); break; + case 's': arguments.rand_start = atoi(optarg); break; + default : usage(argv[0]); exit(0); break; + } + + arguments.argv_0 = argv[0]; + + /* Do a thready work */ + { + pthread_t threads[thr_num]; + pthread_attr_t attr; + + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + fputs("globus_module_activate()\n",stderr); + return 1; + } + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + for (i=0;iconnOpened); - assert(conn_index < ctx->connOpened); + assert(ctx->connections->connOpened); - edg_wll_gss_close(&ctx->connPool[conn_index].gss, &ctx->p_tmp_timeout); - if (ctx->connPool[conn_index].gsiCred) - gss_release_cred(&min_stat, &ctx->connPool[conn_index].gsiCred); - free(ctx->connPool[conn_index].peerName); - free(ctx->connPool[conn_index].buf); + edg_wll_gss_close(&ctx->connections->connPool[cIndex].gss, &ctx->p_tmp_timeout); + if (ctx->connections->connPool[cIndex].gsiCred) + gss_release_cred(&min_stat, &ctx->connections->connPool[cIndex].gsiCred); + free(ctx->connections->connPool[cIndex].peerName); + free(ctx->connections->connPool[cIndex].buf); - memset(ctx->connPool + conn_index, 0, sizeof(edg_wll_ConnPool)); + memset(ctx->connections->connPool + cIndex, 0, sizeof(edg_wll_ConnPool)); - /* if deleted conn was not the last one -> there is a 'hole' and then */ - /* 'shake' together connections in pool, no holes are allowed */ - if (conn_index < ctx->connOpened - 1) { - ctx->connPool[conn_index] = ctx->connPool[ctx->connOpened - 1]; - memset(ctx->connPool + ctx->connOpened - 1 , 0, sizeof(edg_wll_ConnPool)); - } - ctx->connOpened--; + ctx->connections->connOpened--; + + *conn_index = cIndex; } @@ -50,10 +50,33 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) { int i; - for (i=0; iconnOpened;i++) - if (!strcmp(name, ctx->connPool[i].peerName) && - (port == ctx->connPool[i].peerPort)) return i; - + for (i=0; iconnections->poolSize;i++) { + if ((ctx->connections->connPool[i].peerName != NULL) && + !strcmp(name, ctx->connections->connPool[i].peerName) && + (port == ctx->connections->connPool[i].peerPort)) { + + /* TryLock (next line) is in fact used only + to check the mutex status */ + switch (edg_wll_connectionTryLock(ctx, i)) { + case 0: + /* Connection was not locked but now it is. Since we do not + really know wheter we are interested in that connection, we + are simply unlocking it now. */ + edg_wll_connectionUnlock(ctx, i); + return i; + + case EBUSY: + /* Connection locked. Do not consider it */ + // try to find another free connection + break; + default: + /* Some obscure error occured. Need inspection */ + perror("ConnectionIndex() - locking problem \n"); + assert(0); + } + } + } + return -1; } @@ -61,52 +84,79 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) static int AddConnection(edg_wll_Context ctx, char *name, int port) { - int index = ctx->connOpened; - - free(ctx->connPool[index].peerName); // should be empty; just to be sure - ctx->connPool[index].peerName = strdup(ctx->srvName); - ctx->connPool[index].peerPort = ctx->srvPort; - ctx->connOpened++; + int i,index = -1; + + for (i = 0; i < ctx->connections->poolSize; i++) { + if (ctx->connections->connPool[i].peerName == NULL) { + if (!edg_wll_connectionTryLock(ctx, i)) { + index = i; // This connection is free and was not locked. We may lock it and use it. + break; + } + } + } + + if (index < 0) return -1; + + free(ctx->connections->connPool[index].peerName); // should be empty; just to be sure + ctx->connections->connPool[index].peerName = strdup(ctx->srvName); + ctx->connections->connPool[index].peerPort = ctx->srvPort; + ctx->connections->connOpened++; return index; } -static void ReleaseConnection(edg_wll_Context ctx, char *name, int port) +static int ReleaseConnection(edg_wll_Context ctx, char *name, int port) { - int i, index = 0; + int i, index = 0, foundConnToDrop = 0; long min; - if (ctx->connOpened == 0) return; /* nothing to release */ + edg_wll_ResetError(ctx); + if (ctx->connections->connOpened == 0) return 0; /* nothing to release */ if (name) { if ((index = ConnectionIndex(ctx, name, port)) >= 0) - CloseConnection(ctx, index); + CloseConnection(ctx, &index); } - else { /* free the oldest connection*/ - min = ctx->connPool[0].lastUsed.tv_sec; - for (i=0; iconnOpened; i++) { - if (ctx->connPool[i].lastUsed.tv_sec < min) { - min = ctx->connPool[i].lastUsed.tv_sec; - index = i; + else { /* free the oldest (unlocked) connection */ + for (i=0; iconnections->poolSize; i++) { + assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only + if (!edg_wll_connectionTryLock(ctx, i)) { + edg_wll_connectionUnlock(ctx, i); // Connection unlocked. Consider releasing it + if (foundConnToDrop) { // This is not the first unlocked connection + if (ctx->connections->connPool[i].lastUsed.tv_sec < min) { + min = ctx->connections->connPool[i].lastUsed.tv_sec; + index = i; + foundConnToDrop++; + } + } + else { // This is the first unlocked connection we have found. + foundConnToDrop++; + index = i; + min = ctx->connections->connPool[i].lastUsed.tv_sec; + } } } - CloseConnection(ctx, index); + if (!foundConnToDrop) return edg_wll_SetError(ctx,EAGAIN,"all connections in the connection pool are locked"); + CloseConnection(ctx, &index); } + return edg_wll_Error(ctx,NULL,NULL); } -int edg_wll_close(edg_wll_Context ctx) +int edg_wll_close(edg_wll_Context ctx, int* connToUse) { edg_wll_ResetError(ctx); - if (ctx->connToUse == -1) return 0; + if (*connToUse == -1) return 0; - CloseConnection(ctx, ctx->connToUse); + CloseConnection(ctx, connToUse); + + edg_wll_connectionUnlock(ctx, *connToUse); /* Forgetting the conn. Unlocking is safe. */ - ctx->connToUse = -1; + *connToUse = -1; return edg_wll_Error(ctx,NULL,NULL); } @@ -121,7 +171,7 @@ int edg_wll_close_proxy(edg_wll_Context ctx) -int edg_wll_open(edg_wll_Context ctx) +int edg_wll_open(edg_wll_Context ctx, int* connToUse) { int index; edg_wll_GssStatus gss_stat; @@ -129,34 +179,47 @@ int edg_wll_open(edg_wll_Context ctx) edg_wll_ResetError(ctx); + edg_wll_poolLock(); /* We are going to search the pool, it has better be locked */ + if ( (index = ConnectionIndex(ctx, ctx->srvName, ctx->srvPort)) == -1 ) { /* no such open connection in pool */ - if (ctx->connOpened == ctx->poolSize) - ReleaseConnection(ctx, NULL, 0); + if (ctx->connections->connOpened == ctx->connections->poolSize) + if(ReleaseConnection(ctx, NULL, 0)) goto end; index = AddConnection(ctx, ctx->srvName, ctx->srvPort); + if (index < 0) { + edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded"); + goto end; + } + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index); + #endif } /* else - there is cached open connection, reuse it */ - - ctx->connToUse = index; + #ifdef EDG_WLL_CONNPOOL_DEBUG + else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index); + #endif + + *connToUse = index; /* XXX support anonymous connections, perhaps add a flag to the connPool * struct specifying whether or not this connection shall be authenticated * to prevent from repeated calls to edg_wll_gss_acquire_cred_gsi() */ - if (!ctx->connPool[index].gsiCred && + if (!ctx->connections->connPool[index].gsiCred && edg_wll_gss_acquire_cred_gsi( ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, - &ctx->connPool[index].gsiCred, NULL, &gss_stat)) { + &ctx->connections->connPool[index].gsiCred, NULL, &gss_stat)) { edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); goto err; } - if (ctx->connPool[index].gss.context == GSS_C_NO_CONTEXT) { - switch (edg_wll_gss_connect(ctx->connPool[index].gsiCred, - ctx->connPool[index].peerName, ctx->connPool[index].peerPort, - &ctx->p_tmp_timeout,&ctx->connPool[index].gss, + if (ctx->connections->connPool[index].gss.context == GSS_C_NO_CONTEXT) { + switch (edg_wll_gss_connect(ctx->connections->connPool[index].gsiCred, + ctx->connections->connPool[index].peerName, ctx->connections->connPool[index].peerPort, + &ctx->p_tmp_timeout,&ctx->connections->connPool[index].gss, &gss_stat)) { case EDG_WLL_GSS_OK: @@ -190,9 +253,18 @@ int edg_wll_open(edg_wll_Context ctx) err: /* some error occured; close created connection * and free all fields in connPool[index] */ - CloseConnection(ctx, index); - ctx->connToUse = -1; + if (index >= 0) CloseConnection(ctx, &index); + *connToUse = -1; ok: + + if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */ + +end: + + edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */ + +// xxxxx + return edg_wll_Error(ctx,NULL,NULL); } @@ -342,39 +414,43 @@ int edg_wll_http_send_recv( { int ec; char *ed = NULL; + int connToUse = -1; //Index of the connection to use. Used to be a context member. - if (edg_wll_open(ctx)) return edg_wll_Error(ctx,NULL,NULL); - - switch (edg_wll_http_send(ctx,request,req_head,req_body)) { + if (edg_wll_open(ctx,&connToUse)) return edg_wll_Error(ctx,NULL,NULL); + + switch (edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) { case ENOTCONN: - edg_wll_close(ctx); - if (edg_wll_open(ctx) - || edg_wll_http_send(ctx,request,req_head,req_body)) + edg_wll_close(ctx,&connToUse); + if (edg_wll_open(ctx,&connToUse) + || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) goto err; /* fallthrough */ case 0: break; default: goto err; } - switch (edg_wll_http_recv(ctx,response,resp_head,resp_body)) { + switch (edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) { case ENOTCONN: - edg_wll_close(ctx); - if (edg_wll_open(ctx) - || edg_wll_http_send(ctx,request,req_head,req_body) - || edg_wll_http_recv(ctx,response,resp_head,resp_body)) + edg_wll_close(ctx,&connToUse); + if (edg_wll_open(ctx,&connToUse) + || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse]) + || edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) goto err; /* fallthrough */ case 0: break; default: goto err; } - assert(ctx->connToUse >= 0); - gettimeofday(&ctx->connPool[ctx->connToUse].lastUsed, NULL); + assert(connToUse >= 0); + gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL); + + edg_wll_connectionUnlock(ctx, connToUse); + return 0; err: ec = edg_wll_Error(ctx,NULL,&ed); - edg_wll_close(ctx); + edg_wll_close(ctx,&connToUse); edg_wll_SetError(ctx,ec,ed); free(ed); return ec; diff --git a/org.glite.lb.client/src/connection.h b/org.glite.lb.client/src/connection.h index 1b12058..1423d7e 100644 --- a/org.glite.lb.client/src/connection.h +++ b/org.glite.lb.client/src/connection.h @@ -3,8 +3,8 @@ #ident "$Header$" -int edg_wll_close(edg_wll_Context ctx); -int edg_wll_open(edg_wll_Context ctx); +int edg_wll_close(edg_wll_Context ctx,int *); +int edg_wll_open(edg_wll_Context ctx,int *); int edg_wll_http_send_recv(edg_wll_Context, char *, const char * const *, char *, char **, char ***, char **); int edg_wll_close_proxy(edg_wll_Context ctx); diff --git a/org.glite.lb.common/Makefile b/org.glite.lb.common/Makefile index de0d012..ab6dccf 100644 --- a/org.glite.lb.common/Makefile +++ b/org.glite.lb.common/Makefile @@ -98,7 +98,8 @@ INSTALL:=libtool --mode=install install OBJS:=${JOBID_OBJS} ${PERF_OBJS} lb_plain_io.o escape.o events.o mini_http.o query_rec.o \ status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \ events_parse.o il_string.o il_int.o notifid.o \ - il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o + il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o \ + connpool.o LOBJS:=${OBJS:.o=.lo} TRIO_OBJS:=escape.o trio.o strio.o @@ -112,7 +113,7 @@ THRLOBJS:=${OBJS:.o=.thr.lo} HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \ xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \ - escape.h ulm_parse.h trio.h lb_maildir.h ${PERF_HDRS} + escape.h ulm_parse.h trio.h lb_maildir.h connpool.h ${PERF_HDRS} STATICLIB:=libglite_lb_common_${nothrflavour}.a THRSTATICLIB:=libglite_lb_common_${thrflavour}.a @@ -253,7 +254,7 @@ il_int_test.o il_string_test.o il_test.o il_msg_test.o: %.o: %.cpp ${CXX} -c ${CFLAGS} ${TEST_INC} $< %.thr.o: %.c - ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -o $@ -c $< + ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -D_REENTRANT -DGLITE_LB_THREADED -o $@ -c $< %.h: %.h.T rm -f $@ diff --git a/org.glite.lb.common/interface/connpool.h b/org.glite.lb.common/interface/connpool.h new file mode 100644 index 0000000..f536a8b --- /dev/null +++ b/org.glite.lb.common/interface/connpool.h @@ -0,0 +1,98 @@ +#include "glite/security/glite_gss.h" +#include "glite/lb/consumer.h" +#include "lb_plain_io.h" +#include "authz.h" +#include "glite/lb/log_proto.h" +#ifdef GLITE_LB_THREADED + #include +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef EDG_WLL_CONNPOOL_DECLARED +#define EDG_WLL_CONNPOOL_DECLARED 1 + +#define GLITE_LB_COMMON_CONNPOOL_SIZE 50 + + +typedef struct _edg_wll_ConnPool { +/* address and port where we are connected to */ + char *peerName; + unsigned int peerPort; + +/* http(s) stream */ + gss_cred_id_t gsiCred; + edg_wll_GssConnection gss; + char *buf; + int bufUse,bufSize; + +/* timestamp of usage of this entry in ctx.connPool */ + struct timeval lastUsed; +} edg_wll_ConnPool; +#endif + + +#ifndef EDG_WLL_CONNECIONS_DECLARED +#define EDG_WLL_CONNECIONS_DECLARED 1 +typedef struct _edg_wll_Connections { + +/* connection pool array */ + edg_wll_ConnPool *connPool; + edg_wll_ConnPool *serverConnection; /* this is in fact a single-item array intended for use + by the server (since the server only uses one connection)*/ + +/* pool of connections from client */ + int poolSize; /* number of connections in the pool */ + int connOpened; /* number of opened connections */ + +/* Connection pool locks & accessories.*/ +#ifdef GLITE_LB_THREADED + pthread_mutex_t poolLock; /* Global pool lock (to be used for pool-wide operations carried out locally) */ + pthread_mutex_t *connectionLock; /* Per-connection lock (used to lock out connections that are in use) */ +#endif + edg_wll_Context *locked_by; /* identifies contexts that have been used to lock a connection since they + do probably have a connToUse value stored in them*/ + +} edg_wll_Connections; +#endif + + +/* Connections Global Handle */ +extern edg_wll_Connections connectionsHandle; + +/** ** Locking Functions (wrappers) **/ +/* Introduced to simplify debugging and switching betwen single/multi-threaded behavior */ + +/** Lock (try) the pool */ +int edg_wll_poolTryLock(); + +/** Lock the pool (blocking) */ +int edg_wll_poolLock(); + +/** Unlock the pool */ +int edg_wll_poolUnlock(); + + +/** Lock (try) a single connection */ +int edg_wll_connectionTryLock(edg_wll_Context ctx, int index); + +/** Lock a single connection (blocking) */ +int edg_wll_connectionLock(edg_wll_Context ctx, int index); + +/** Unlock a connection */ +int edg_wll_connectionUnlock(edg_wll_Context ctx, int index); + + +/** Free memory used by the pool and lock array */ +void edg_wll_poolFree(); + +/** Allocate memory for the edg_wll_Connections structure and its properties and return a pointer. + in case memory has been already allocated, just return a pointer */ +edg_wll_Connections* edg_wll_initConnections(); + +#ifdef __cplusplus +} +#endif + diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index 8536a8e..6935154 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -7,6 +7,7 @@ #include "glite/lb/consumer.h" #include "lb_plain_io.h" #include "authz.h" +#include "connpool.h" #ifdef __cplusplus extern "C" { @@ -29,23 +30,6 @@ typedef struct _edg_wll_ConnProxy { -typedef struct _edg_wll_ConnPool { -/* address and port where we are connected to */ - char *peerName; - unsigned int peerPort; - -/* http(s) stream */ - gss_cred_id_t gsiCred; - edg_wll_GssConnection gss; - char *buf; - int bufUse,bufSize; - -/* timestamp of usage of this entry in ctx.connPool */ - struct timeval lastUsed; -} edg_wll_ConnPool; - - - struct _edg_wll_Context { /* Error handling */ int errCode; /* recent error code */ @@ -54,7 +38,7 @@ struct _edg_wll_Context { /* server part */ void *mysql; - edg_wll_ConnPool *connPool; + edg_wll_Connections *connections; edg_wll_ConnPool *connPoolNotif; /* hold _one_ connection from notif-interlogger */ edg_wll_ConnProxy *connProxy; /* holds one plain connection */ @@ -99,14 +83,6 @@ struct _edg_wll_Context { char *srvName; unsigned int srvPort; -/* pool of connections from client */ - int poolSize; - int connOpened; /* number of opened connections */ - int connToUse; /* index of connection that will * - * be used by low-level f-cions */ - // XXX similar variables will be needed for connPoolNotif - - /* other client stuff */ int notifSock; /* default client socket * * for receiving notifications */ diff --git a/org.glite.lb.common/interface/log_proto.h b/org.glite.lb.common/interface/log_proto.h index aeaf306..505df74 100644 --- a/org.glite.lb.common/interface/log_proto.h +++ b/org.glite.lb.common/interface/log_proto.h @@ -68,7 +68,7 @@ extern "C" { /** * default maximal number of simultaneously open connections from one client */ -#define EDG_WLL_LOG_CONNECTIONS_DEFAULT 4 +// XXX: not used? #define EDG_WLL_LOG_CONNECTIONS_DEFAULT 4 #ifdef __cplusplus diff --git a/org.glite.lb.common/interface/mini_http.h b/org.glite.lb.common/interface/mini_http.h index 2b846f3..1bae8b9 100644 --- a/org.glite.lb.common/interface/mini_http.h +++ b/org.glite.lb.common/interface/mini_http.h @@ -4,6 +4,7 @@ #ident "$Header$" #include "glite/lb/consumer.h" +#include "connpool.h" /* XXX: not a good place for the folowing #def's but we ain't got better currently */ /** protocol version */ @@ -30,16 +31,18 @@ extern edg_wll_ErrorCode edg_wll_http_recv( edg_wll_Context, /* INOUT: context */ - char **, /* OUT: first line */ - char ***, /* OUT: null terminated array of headers */ - char ** /* OUT: message body */ + char **, /* OUT: first line */ + char ***, /* OUT: null terminated array of headers */ + char **, /* OUT: message body */ + edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */ ); extern edg_wll_ErrorCode edg_wll_http_send( edg_wll_Context, /* INOUT: context */ const char *, /* IN: first line */ char const * const *, /* IN: headers */ - const char * /* IN: message body */ + const char *, /* IN: message body */ + edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */ ); extern edg_wll_ErrorCode edg_wll_http_recv_proxy( diff --git a/org.glite.lb.common/src/connpool.c b/org.glite.lb.common/src/connpool.c new file mode 100644 index 0000000..fb84222 --- /dev/null +++ b/org.glite.lb.common/src/connpool.c @@ -0,0 +1,277 @@ +#include "connpool.h" + +#ifdef GLITE_LB_THREADED +edg_wll_Connections connectionsHandle = + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL}; +#endif + +#ifndef GLITE_LB_THREADED +edg_wll_Connections connectionsHandle = + { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , NULL}; +#endif + +/** Lock (try) the pool */ +int edg_wll_poolTryLock() { +int RetVal; + + #ifdef GLITE_LB_THREADED /* Threaded version */ + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d trying to lock the pool ",pthread_self()); + #endif + RetVal = + pthread_mutex_trylock( + &connectionsHandle.poolLock + ); + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("- result %d\n",RetVal); + #endif + #endif + + + #ifndef GLITE_LB_THREADED /* Single-thread version */ + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - Trying to lock the pool (pthreads not included)\n"); + #endif + #endif + + + return(RetVal); +} + + +/** Lock the pool (blocking) */ +int edg_wll_poolLock() { +int RetVal; + + #ifdef GLITE_LB_THREADED + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d trying to lock the pool\n",pthread_self()); + #endif + + RetVal = pthread_mutex_trylock(&connectionsHandle.poolLock); + + if (RetVal) { + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d locking the pool\n",pthread_self()); + #endif + RetVal = pthread_mutex_lock(&connectionsHandle.poolLock); + } + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d - lock result %d\n",pthread_self(),RetVal); + #endif + #endif + + #ifndef GLITE_LB_THREADED + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - Locking the pool (pthreads not included)\n"); + #endif + #endif + + + return(RetVal); +} + + +/** Unlock the pool */ +int edg_wll_poolUnlock() { +int RetVal; + + #ifdef GLITE_LB_THREADED + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d unlocking the pool\n",pthread_self()); + #endif + RetVal = pthread_mutex_unlock(&connectionsHandle.poolLock); + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d - unlock result %d\n",pthread_self(),RetVal); + #endif + + #endif + + #ifndef GLITE_LB_THREADED + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - Unlocking the pool (pthreads not included)\n"); + #endif + + #endif + + + return(RetVal); +} + + +/** Lock (try) a single connection */ +int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_trylock(&connectionsHandle.connectionLock[index]); /* Try to lock the connection */ + if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the + locking context address */ + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d trying to lock connection No. [%d] - result %d\n",pthread_self(),index,RetVal); + #endif + + #endif + + #ifndef GLITE_LB_THREADED + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - Trying to lock connection No. [%d] (pthreads not included)\n",index); + #endif + + #endif + + + return(RetVal); +} + + +/** Lock a single connection (blocking) */ +int edg_wll_connectionLock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_lock(&connectionsHandle.connectionLock[index]); /* Lock the connection (wait if + not available)*/ + if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the + locking context address */ + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d locking connection No. [%d] - result %d\n",pthread_self(),index,RetVal); + #endif + + #endif + + #ifndef GLITE_LB_THREADED + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - locking connection No. [%d] (pthreads not included)\n",index); + #endif + + #endif + + + return(RetVal); +} + + +/** Unlock a connection */ +int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_unlock(&connectionsHandle.connectionLock[index]); + if (!RetVal) connectionsHandle.locked_by[index] = NULL; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d unlocking connection No. [%d] - result %d\n",pthread_self(),index,RetVal); + #endif + + #endif + + #ifndef GLITE_LB_THREADED + RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - unlocking connection No. [%d] (pthreads not included)\n",index); + #endif + + #endif + + + return(RetVal); +} + + +/** Free all memory used by the pool and lock-related arrays */ +void edg_wll_poolFree() { + int i; + struct timeval close_timeout = {0, 50000}; /* Declarations taken over from edg_wll_FreeContext() */ + OM_uint32 min_stat; /* Does not seem to have any use whatsoever - neither + here nor in edg_wll_FreeContext() */ + + #ifdef EDG_WLL_CONNPOOL_DEBUG + #ifdef GLITE_LB_THREADED + printf("Thread %d ",pthread_self()); + #endif + printf("Entering edg_wll_poolFree\n"); + #endif + + + for (i=0; i 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */ + + #ifdef EDG_WLL_CONNPOOL_DEBUG + #ifdef GLITE_LB_THREADED + printf("Thread %d ",pthread_self()); + #endif + printf("Initializng connections AND THE CONNECTION POOL\n"); + #endif + + + connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool)); + + #ifdef GLITE_LB_THREADED + connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t)); + #endif + + connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context)); + + } + if(connectionsHandle.serverConnection == NULL) { + connectionsHandle.serverConnection = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool)); + } + + + return (&connectionsHandle); +} + + diff --git a/org.glite.lb.common/src/context.c b/org.glite.lb.common/src/context.c index 47408ea..3e0ea8a 100644 --- a/org.glite.lb.common/src/context.c +++ b/org.glite.lb.common/src/context.c @@ -34,11 +34,12 @@ int edg_wll_InitContext(edg_wll_Context *ctx) /* XXX */ for (i=0; iconnPool = (edg_wll_ConnPool *) calloc(out->poolSize, sizeof(edg_wll_ConnPool)); + out->connections = edg_wll_initConnections(); +// out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool)); out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool)); out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy)); out->connProxy->conn.sock = -1; - out->connToUse = -1; +// out->connToUse = -1; *ctx = out; @@ -73,17 +74,43 @@ void edg_wll_FreeContext(edg_wll_Context ctx) } #endif if (ctx->errDesc) free(ctx->errDesc); - if (ctx->connPool) { + if (ctx->connections->connPool) { int i; - - for (i=0; ipoolSize; i++) { - if (ctx->connPool[i].peerName) free(ctx->connPool[i].peerName); - edg_wll_gss_close(&ctx->connPool[i].gss,&close_timeout); - if (ctx->connPool[i].gsiCred) - gss_release_cred(&min_stat, &ctx->connPool[i].gsiCred); - if (ctx->connPool[i].buf) free(ctx->connPool[i].buf); + +#ifdef GLITE_LB_THREADED + /* Since the introduction of a shared connection pool, the pool + cannot be freed here. We only need to unlock connections that + may have been locked using this context. */ + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Running edg_wll_FreeContext - checking for connections locked by the current context.\n"); + #endif + + edg_wll_poolLock(); + + for (i=0; iconnections->poolSize; i++) { + + if (ctx->connections->locked_by[i]==ctx) { + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Unlocking connection No. %d...",i); + #endif + edg_wll_connectionUnlock(ctx, i); + + } + } + + edg_wll_poolUnlock(); +#endif + +/* + for (i=0; iconnections->poolSize; i++) { + if (ctx->connections->connPool[i].peerName) free(ctx->connections->connPool[i].peerName); + edg_wll_gss_close(&ctx->connections->connPool[i].gss,&close_timeout); + if (ctx->connections->connPool[i].gsiCred) + gss_release_cred(&min_stat, &ctx->connections->connPool[i].gsiCred); + if (ctx->connections->connPool[i].buf) free(ctx->connections->connPool[i].buf); } - free(ctx->connPool); + free(ctx->connections->connPool);*/ } if (ctx->connPoolNotif) { if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName); diff --git a/org.glite.lb.common/src/mini_http.c b/org.glite.lb.common/src/mini_http.c index 0535aa6..29aa3fc 100644 --- a/org.glite.lb.common/src/mini_http.c +++ b/org.glite.lb.common/src/mini_http.c @@ -21,7 +21,7 @@ #define min(x,y) ((x) < (y) ? (x) : (y)) #define CONTENT_LENGTH "Content-Length:" -edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut) +edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut, edg_wll_ConnPool *connPTR) { char **hdr = NULL,*first = NULL,*body = NULL; enum { FIRST, HEAD, BODY, DONE } pstat = FIRST; @@ -30,23 +30,23 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** edg_wll_GssStatus gss_code; #define bshift(shift) {\ - memmove(ctx->connPool[ctx->connToUse].buf,ctx->connPool[ctx->connToUse].buf+(shift),ctx->connPool[ctx->connToUse].bufUse-(shift));\ - ctx->connPool[ctx->connToUse].bufUse -= (shift);\ + memmove(connPTR->buf,connPTR->buf+(shift),connPTR->bufUse-(shift));\ + connPTR->bufUse -= (shift);\ } edg_wll_ResetError(ctx); - if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) - sock = ctx->connPool[ctx->connToUse].gss.sock; + if (connPTR->gss.context != GSS_C_NO_CONTEXT) + sock = connPTR->gss.sock; else { edg_wll_SetError(ctx,ENOTCONN,NULL); goto error; } - if (!ctx->connPool[ctx->connToUse].buf) ctx->connPool[ctx->connToUse].buf = malloc(ctx->connPool[ctx->connToUse].bufSize = BUFSIZ); + if (!connPTR->buf) connPTR->buf = malloc(connPTR->bufSize = BUFSIZ); do { - len = edg_wll_gss_read(&ctx->connPool[ctx->connToUse].gss, - ctx->connPool[ctx->connToUse].buf+ctx->connPool[ctx->connToUse].bufUse,ctx->connPool[ctx->connToUse].bufSize-ctx->connPool[ctx->connToUse].bufUse,&ctx->p_tmp_timeout, &gss_code); + len = edg_wll_gss_read(&connPTR->gss, + connPTR->buf+connPTR->bufUse,connPTR->bufSize-connPTR->bufUse,&ctx->p_tmp_timeout, &gss_code); switch (len) { case EDG_WLL_GSS_OK: @@ -68,27 +68,27 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** } - ctx->connPool[ctx->connToUse].bufUse += len; + connPTR->bufUse += len; rdmore = 0; while (!rdmore && pstat != DONE) switch (pstat) { char *cr; case FIRST: - if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) && - ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n') + if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) && + connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n') { *cr = 0; - first = strdup(ctx->connPool[ctx->connToUse].buf); - bshift(cr-ctx->connPool[ctx->connToUse].buf+2); + first = strdup(connPTR->buf); + bshift(cr-connPTR->buf+2); pstat = HEAD; } else rdmore = 1; break; case HEAD: - if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) && - ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n') + if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) && + connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n') { - if (cr == ctx->connPool[ctx->connToUse].buf) { + if (cr == connPTR->buf) { bshift(2); pstat = clen ? BODY : DONE; if (clen) body = malloc(clen+1); @@ -97,19 +97,19 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** *cr = 0; hdr = realloc(hdr,(nhdr+2) * sizeof(*hdr)); - hdr[nhdr] = strdup(ctx->connPool[ctx->connToUse].buf); + hdr[nhdr] = strdup(connPTR->buf); hdr[++nhdr] = NULL; - if (!strncasecmp(ctx->connPool[ctx->connToUse].buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1)) - clen = atoi(ctx->connPool[ctx->connToUse].buf+sizeof(CONTENT_LENGTH)-1); + if (!strncasecmp(connPTR->buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1)) + clen = atoi(connPTR->buf+sizeof(CONTENT_LENGTH)-1); - bshift(cr-ctx->connPool[ctx->connToUse].buf+2); + bshift(cr-connPTR->buf+2); } else rdmore = 1; break; case BODY: - if (ctx->connPool[ctx->connToUse].bufUse) { - int m = min(ctx->connPool[ctx->connToUse].bufUse,clen-blen); - memcpy(body+blen,ctx->connPool[ctx->connToUse].buf,m); + if (connPTR->bufUse) { + int m = min(connPTR->bufUse,clen-blen); + memcpy(body+blen,connPTR->buf,m); blen += m; bshift(m); } @@ -297,23 +297,23 @@ static int real_write(edg_wll_Context ctx, edg_wll_GssConnection *con,const char } } -edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body) +edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body, edg_wll_ConnPool *connPTR) { const char* const *h; int len = 0, blen; edg_wll_ResetError(ctx); - if (ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT) + if (connPTR->gss.context == GSS_C_NO_CONTEXT) return edg_wll_SetError(ctx,ENOTCONN,NULL); - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,first,strlen(first)) < 0 || - real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,first,strlen(first)) < 0 || + real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); if (head) for (h=head; *h; h++) - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,*h,strlen(*h)) < 0 || - real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,*h,strlen(*h)) < 0 || + real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); if (body) { @@ -321,13 +321,13 @@ edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const len = strlen(body); blen = sprintf(buf,CONTENT_LENGTH " %d\r\n",len); - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,buf,blen) < 0) + if (real_write(ctx,&connPTR->gss,buf,blen) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); } - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); - if (body && real_write(ctx,&ctx->connPool[ctx->connToUse].gss,body,len) < 0) + if (body && real_write(ctx,&connPTR->gss,body,len) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); return edg_wll_Error(ctx,NULL,NULL); diff --git a/org.glite.lb.common/src/param.c b/org.glite.lb.common/src/param.c index d0b70bd..02d218f 100644 --- a/org.glite.lb.common/src/param.c +++ b/org.glite.lb.common/src/param.c @@ -30,7 +30,7 @@ static const char *myenv[] = { "%sQUERY_JOBS_LIMIT", "%sQUERY_EVENTS_LIMIT", "%sQUERY_RESULTS", - "%sQUERY_CONNECTIONS", + "%sCONNPOOL_SIZE", "%sNOTIF_SERVER", "%sNOTIF_SERVER", "%sNOTIF_TIMEOUT", @@ -238,12 +238,12 @@ int edg_wll_SetParamInt(edg_wll_Context ctx,edg_wll_ContextParam param,int val) return edg_wll_SetError(ctx,EINVAL,"can't parse query result parameter name"); } break; - case EDG_WLL_PARAM_QUERY_CONNECTIONS: + case EDG_WLL_PARAM_CONNPOOL_SIZE: { char *s = mygetenv(param); if (!val && s) val = atoi(s); - ctx->poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT; + connectionsHandle.poolSize = val ? val : GLITE_LB_COMMON_CONNPOOL_SIZE; } break; case EDG_WLL_PARAM_SOURCE: @@ -312,7 +312,7 @@ int edg_wll_SetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...) case EDG_WLL_PARAM_QUERY_JOBS_LIMIT: case EDG_WLL_PARAM_QUERY_EVENTS_LIMIT: case EDG_WLL_PARAM_QUERY_RESULTS: - case EDG_WLL_PARAM_QUERY_CONNECTIONS: + case EDG_WLL_PARAM_CONNPOOL_SIZE: case EDG_WLL_PARAM_SOURCE: return edg_wll_SetParamInt(ctx,param,va_arg(ap,int)); case EDG_WLL_PARAM_HOST: @@ -378,9 +378,9 @@ int edg_wll_GetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...) p_int = va_arg(ap, int *); *p_int = ctx->p_query_results; break; - case EDG_WLL_PARAM_QUERY_CONNECTIONS: + case EDG_WLL_PARAM_CONNPOOL_SIZE: p_int = va_arg(ap, int *); - *p_int = ctx->poolSize; + *p_int = connectionsHandle.poolSize; break; case EDG_WLL_PARAM_SOURCE: p_int = va_arg(ap, int *); diff --git a/org.glite.lb.server-bones/src/srvbones.c b/org.glite.lb.server-bones/src/srvbones.c index cdac73e..54cc196 100644 --- a/org.glite.lb.server-bones/src/srvbones.c +++ b/org.glite.lb.server-bones/src/srvbones.c @@ -396,7 +396,6 @@ static int slave(slave_data_init_hnd data_init_hnd, int sock) { kick_client = KICK_HANDLER; } else { - req_cnt++; first_request = 0; to = set_request_to; if ((rv = services[srv].on_request_hnd(conn,to.tv_sec>=0 ? &to : NULL,clnt_data)) == ENOTCONN) { diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index f634070..a5c4dec 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -790,14 +790,12 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->p_tmp_timeout.tv_sec = timeout->tv_sec; ctx->p_tmp_timeout.tv_usec = timeout->tv_usec; - ctx->poolSize = 1; - ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool)); - ctx->connToUse = 0; + edg_wll_initConnections(); alen = sizeof(a); getpeername(conn, (struct sockaddr *)&a, &alen); - ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr)); - ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port); + ctx->connections->serverConnection->peerName = strdup(inet_ntoa(a.sin_addr)); + ctx->connections->serverConnection->peerPort = ntohs(a.sin_port); ctx->count_statistics = count_statistics; ctx->serverIdentity = strdup(server_subject); @@ -810,8 +808,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) case NETDB_SUCCESS: if (name) dprintf(("[%d] connection from %s:%d (%s)\n", getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port), name)); - free(ctx->connPool[ctx->connToUse].peerName); - ctx->connPool[ctx->connToUse].peerName = name; + free(ctx->connections->serverConnection->peerName); + ctx->connections->serverConnection->peerName = name; name = NULL; break; @@ -876,7 +874,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->srvPort = ntohs(a.sin_port); } - if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code)) ) + if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connections->serverConnection->gss, &gss_code)) ) { if ( ret == EDG_WLL_GSS_ERROR_TIMEOUT ) { @@ -893,7 +891,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) return 1; } - maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context, + maj_stat = gss_inquire_context(&min_stat, ctx->connections->serverConnection->gss.context, &client_name, NULL, NULL, NULL, NULL, NULL, NULL); if ( !GSS_ERROR(maj_stat) ) maj_stat = gss_display_name(&min_stat, client_name, &token, NULL); @@ -920,7 +918,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) if ( token.value ) gss_release_buffer(&min_stat, &token); - if ( edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, server_cert, server_key, vomsdir, cadir) ) + if ( edg_wll_SetVomsGroups(ctx, &ctx->connections->serverConnection->gss, server_cert, server_key, vomsdir, cadir) ) { char *errt, *errd; @@ -988,7 +986,7 @@ int bk_handle_ws_connection(int conn, struct timeval *timeout, void *data) soap_done(soap); goto err; } - gsplugin_ctx->connection = &cdata->ctx->connPool[cdata->ctx->connToUse].gss; + gsplugin_ctx->connection = &cdata->ctx->connections->serverConnection->gss; gsplugin_ctx->cred = mycred; cdata->soap = soap; @@ -1201,8 +1199,8 @@ int bk_clnt_disconnect(int conn, struct timeval *timeout, void *cdata) edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; - if ( ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, timeout); + if ( ctx->connections->serverConnection->gss.context != GSS_C_NO_CONTEXT) + edg_wll_gss_close(&ctx->connections->serverConnection->gss, timeout); edg_wll_FreeContext(ctx); ctx = NULL; diff --git a/org.glite.lb.server/src/lb_http.c b/org.glite.lb.server/src/lb_http.c index 038d679..3e2cb50 100644 --- a/org.glite.lb.server/src/lb_http.c +++ b/org.glite.lb.server/src/lb_http.c @@ -26,7 +26,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body); - else err = edg_wll_http_recv(ctx,&req,&hdr,&body); + else err = edg_wll_http_recv(ctx,&req,&hdr,&body,ctx->connections->serverConnection); dprintf(("[%d] %s\n",getpid(),req)); if (body) dprintf(("\n%s\n\n",body)); @@ -39,7 +39,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) if ( ctx->isProxy ) edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut); else - edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut); + edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut,ctx->connections->serverConnection); } } diff --git a/org.glite.lb.server/src/notification.c b/org.glite.lb.server/src/notification.c index 769011f..18d6287 100644 --- a/org.glite.lb.server/src/notification.c +++ b/org.glite.lb.server/src/notification.c @@ -97,7 +97,7 @@ int edg_wll_NotifNewServer( goto cleanup; } if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) - trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1); + trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } /* Format DB insert statement @@ -203,7 +203,7 @@ int edg_wll_NotifBindServer( goto cleanup; } if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) - trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1); + trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index e36acbe..5a625bd 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -380,7 +380,7 @@ abort: asprintf(&response, "HTTP/1.1 %d %s", ret, edg_wll_HTTPErrorMessage(ret)); - edg_wll_http_send(ctx, response, resp_headers, message); + edg_wll_http_send(ctx, response, resp_headers, message,ctx->connections->serverConnection); return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/stored_master.c b/org.glite.lb.server/src/stored_master.c index 5107454..1a1ce49 100644 --- a/org.glite.lb.server/src/stored_master.c +++ b/org.glite.lb.server/src/stored_master.c @@ -28,7 +28,7 @@ gss_reader(void *user_data, char *buffer, int max_len) int ret, len; edg_wll_GssStatus gss_code; - ret = edg_wll_gss_read_full(&tmp_ctx->connPool[tmp_ctx->connToUse].gss, + ret = edg_wll_gss_read_full(&tmp_ctx->connections->serverConnection->gss, buffer, max_len, &tmp_ctx->p_tmp_timeout, &len, &gss_code); @@ -75,7 +75,7 @@ int edg_wll_StoreProto(edg_wll_Context ctx) free(buf); if ((len = create_reply(ctx,&buf)) > 0) { - if ((ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0) + if ((ret = edg_wll_gss_write_full(&ctx->connections->serverConnection->gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0) edg_wll_SetError(ctx, ret == EDG_WLL_GSS_ERROR_TIMEOUT ? ETIMEDOUT : EDG_WLL_ERROR_GSS, -- 1.8.2.3