From: Miloš Mulač Date: Mon, 21 Aug 2006 12:39:34 +0000 (+0000) Subject: Zdenek's (who is still low-karmatic) initial implementation of new connPool X-Git-Tag: merge_connpool_src~19 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=abfdd23b5b6ba2ee7d99cc712d8d138efda1fbe0;p=jra1mw.git Zdenek's (who is still low-karmatic) initial implementation of new connPool - compiles, not tested - maily for sharing --- diff --git a/org.glite.lb.client/src/connection.c b/org.glite.lb.client/src/connection.c index 232be5b..dc237cd 100644 --- a/org.glite.lb.client/src/connection.c +++ b/org.glite.lb.client/src/connection.c @@ -16,32 +16,37 @@ #include "glite/lb/consumer.h" #include "glite/lb/context-int.h" #include "glite/lb/mini_http.h" +#include "glite/lb/connpool.h" - -static void CloseConnection(edg_wll_Context ctx, int conn_index) +static void CloseConnection(edg_wll_Context ctx, int* conn_index) { /* close connection ad free its structures */ OM_uint32 min_stat; + int cIndex; + + cIndex = conn_index[0]; - assert(ctx->connOpened); - assert(conn_index < ctx->connOpened); + assert(ctx->connections->connOpened); + assert(cIndex < ctx->connections->connOpened); - edg_wll_gss_close(&ctx->connPool[conn_index].gss, &ctx->p_tmp_timeout); - if (ctx->connPool[conn_index].gsiCred) - gss_release_cred(&min_stat, &ctx->connPool[conn_index].gsiCred); - free(ctx->connPool[conn_index].peerName); - free(ctx->connPool[conn_index].buf); + edg_wll_gss_close(&ctx->connections->connPool[cIndex].gss, &ctx->p_tmp_timeout); + if (ctx->connections->connPool[cIndex].gsiCred) + gss_release_cred(&min_stat, &ctx->connections->connPool[cIndex].gsiCred); + free(ctx->connections->connPool[cIndex].peerName); + free(ctx->connections->connPool[cIndex].buf); - memset(ctx->connPool + conn_index, 0, sizeof(edg_wll_ConnPool)); + memset(ctx->connections->connPool + cIndex, 0, sizeof(edg_wll_ConnPool)); /* if deleted conn was not the last one -> there is a 'hole' and then */ /* 'shake' together connections in pool, no holes are allowed */ - if (conn_index < ctx->connOpened - 1) { - ctx->connPool[conn_index] = ctx->connPool[ctx->connOpened - 1]; - memset(ctx->connPool + ctx->connOpened - 1 , 0, sizeof(edg_wll_ConnPool)); + if (cIndex < ctx->connections->connOpened - 1) { + ctx->connections->connPool[cIndex] = ctx->connections->connPool[ctx->connections->connOpened - 1]; + memset(ctx->connections->connPool + ctx->connections->connOpened - 1 , 0, sizeof(edg_wll_ConnPool)); } - ctx->connOpened--; + ctx->connections->connOpened--; + + conn_index[0] = cIndex; } @@ -50,9 +55,19 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) { int i; - for (i=0; iconnOpened;i++) - if (!strcmp(name, ctx->connPool[i].peerName) && - (port == ctx->connPool[i].peerPort)) return i; + for (i=0; iconnections->connOpened;i++) + /* TryLock (next line) is in fact used only to check the mutex status */ + if (EBUSY & edg_wll_connectionTryLock(ctx, i)) { + /* Connection locked. Do not consider it */ + } + else { + /* Connection was not locked but now it is. Since we do not + really know wheter we are interested in that connection, we + are simply unlocking it now. */ + edg_wll_connectionUnlock(ctx, i); + if (!strcmp(name, ctx->connections->connPool[i].peerName) && + (port == ctx->connections->connPool[i].peerPort)) return i; + } return -1; } @@ -61,12 +76,12 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) static int AddConnection(edg_wll_Context ctx, char *name, int port) { - int index = ctx->connOpened; + int index = ctx->connections->connOpened; - free(ctx->connPool[index].peerName); // should be empty; just to be sure - ctx->connPool[index].peerName = strdup(ctx->srvName); - ctx->connPool[index].peerPort = ctx->srvPort; - ctx->connOpened++; + free(ctx->connections->connPool[index].peerName); // should be empty; just to be sure + ctx->connections->connPool[index].peerName = strdup(ctx->srvName); + ctx->connections->connPool[index].peerPort = ctx->srvPort; + ctx->connections->connOpened++; return index; } @@ -79,34 +94,36 @@ static void ReleaseConnection(edg_wll_Context ctx, char *name, int port) long min; - if (ctx->connOpened == 0) return; /* nothing to release */ + if (ctx->connections->connOpened == 0) return; /* nothing to release */ if (name) { if ((index = ConnectionIndex(ctx, name, port)) >= 0) - CloseConnection(ctx, index); + CloseConnection(ctx, &index); } else { /* free the oldest connection*/ - min = ctx->connPool[0].lastUsed.tv_sec; - for (i=0; iconnOpened; i++) { - if (ctx->connPool[i].lastUsed.tv_sec < min) { - min = ctx->connPool[i].lastUsed.tv_sec; + min = ctx->connections->connPool[0].lastUsed.tv_sec; + for (i=0; iconnections->connOpened; i++) { + if (ctx->connections->connPool[i].lastUsed.tv_sec < min) { + min = ctx->connections->connPool[i].lastUsed.tv_sec; index = i; } } - CloseConnection(ctx, index); + CloseConnection(ctx, &index); } } -int edg_wll_close(edg_wll_Context ctx) +int edg_wll_close(edg_wll_Context ctx, int* connToUse) { edg_wll_ResetError(ctx); - if (ctx->connToUse == -1) return 0; + if (*connToUse == -1) return 0; + + CloseConnection(ctx, connToUse); - CloseConnection(ctx, ctx->connToUse); + edg_wll_connectionUnlock(ctx, *connToUse); /* Forgetting the conn. Unlocking is safe. */ - ctx->connToUse = -1; + *connToUse = -1; return edg_wll_Error(ctx,NULL,NULL); } @@ -121,7 +138,7 @@ int edg_wll_close_proxy(edg_wll_Context ctx) -int edg_wll_open(edg_wll_Context ctx) +int edg_wll_open(edg_wll_Context ctx, int* connToUse) { int index; edg_wll_GssStatus gss_stat; @@ -129,9 +146,11 @@ int edg_wll_open(edg_wll_Context ctx) edg_wll_ResetError(ctx); + edg_wll_poolLock(); /* We are going to search the pool, it has better be locked */ + if ( (index = ConnectionIndex(ctx, ctx->srvName, ctx->srvPort)) == -1 ) { /* no such open connection in pool */ - if (ctx->connOpened == ctx->poolSize) + if (ctx->connections->connOpened == ctx->connections->poolSize) ReleaseConnection(ctx, NULL, 0); index = AddConnection(ctx, ctx->srvName, ctx->srvPort); @@ -139,24 +158,24 @@ int edg_wll_open(edg_wll_Context ctx) } /* else - there is cached open connection, reuse it */ - ctx->connToUse = index; + *connToUse = index; /* XXX support anonymous connections, perhaps add a flag to the connPool * struct specifying whether or not this connection shall be authenticated * to prevent from repeated calls to edg_wll_gss_acquire_cred_gsi() */ - if (!ctx->connPool[index].gsiCred && + if (!ctx->connections->connPool[index].gsiCred && edg_wll_gss_acquire_cred_gsi( ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, - &ctx->connPool[index].gsiCred, NULL, &gss_stat)) { + &ctx->connections->connPool[index].gsiCred, NULL, &gss_stat)) { edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); goto err; } - if (ctx->connPool[index].gss.context == GSS_C_NO_CONTEXT) { - switch (edg_wll_gss_connect(ctx->connPool[index].gsiCred, - ctx->connPool[index].peerName, ctx->connPool[index].peerPort, - &ctx->p_tmp_timeout,&ctx->connPool[index].gss, + if (ctx->connections->connPool[index].gss.context == GSS_C_NO_CONTEXT) { + switch (edg_wll_gss_connect(ctx->connections->connPool[index].gsiCred, + ctx->connections->connPool[index].peerName, ctx->connections->connPool[index].peerPort, + &ctx->p_tmp_timeout,&ctx->connections->connPool[index].gss, &gss_stat)) { case EDG_WLL_GSS_OK: @@ -190,9 +209,16 @@ int edg_wll_open(edg_wll_Context ctx) err: /* some error occured; close created connection * and free all fields in connPool[index] */ - CloseConnection(ctx, index); - ctx->connToUse = -1; + CloseConnection(ctx, &index); + *connToUse = -1; ok: + + if (*connToUse>-1) edg_wll_connectionLock(ctx, *connToUse); /* Lock the connection */ + + edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */ + +// xxxxx + return edg_wll_Error(ctx,NULL,NULL); } @@ -307,39 +333,40 @@ int edg_wll_http_send_recv( { int ec; char *ed = NULL; + int connToUse = -1; //Index of the connection to use. Used to be a context member. - if (edg_wll_open(ctx)) return edg_wll_Error(ctx,NULL,NULL); + if (edg_wll_open(ctx,&connToUse)) return edg_wll_Error(ctx,NULL,NULL); - switch (edg_wll_http_send(ctx,request,req_head,req_body)) { + switch (edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) { case ENOTCONN: - edg_wll_close(ctx); - if (edg_wll_open(ctx) - || edg_wll_http_send(ctx,request,req_head,req_body)) + edg_wll_close(ctx,&connToUse); + if (edg_wll_open(ctx,&connToUse) + || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) goto err; /* fallthrough */ case 0: break; default: goto err; } - switch (edg_wll_http_recv(ctx,response,resp_head,resp_body)) { + switch (edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) { case ENOTCONN: - edg_wll_close(ctx); - if (edg_wll_open(ctx) - || edg_wll_http_send(ctx,request,req_head,req_body) - || edg_wll_http_recv(ctx,response,resp_head,resp_body)) + edg_wll_close(ctx,&connToUse); + if (edg_wll_open(ctx,&connToUse) + || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse]) + || edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) goto err; /* fallthrough */ case 0: break; default: goto err; } - assert(ctx->connToUse >= 0); - gettimeofday(&ctx->connPool[ctx->connToUse].lastUsed, NULL); + assert(connToUse >= 0); + gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL); return 0; err: ec = edg_wll_Error(ctx,NULL,&ed); - edg_wll_close(ctx); + edg_wll_close(ctx,&connToUse); edg_wll_SetError(ctx,ec,ed); free(ed); return ec; diff --git a/org.glite.lb.common/Makefile b/org.glite.lb.common/Makefile index fdc9ac5..6d960c5 100644 --- a/org.glite.lb.common/Makefile +++ b/org.glite.lb.common/Makefile @@ -93,7 +93,8 @@ INSTALL:=libtool --mode=install install OBJS:=${JOBID_OBJS} ${PERF_OBJS} lb_plain_io.o escape.o events.o mini_http.o query_rec.o \ status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \ events_parse.o il_string.o il_int.o notifid.o \ - il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o + il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o \ + connpool.o LOBJS:=${OBJS:.o=.lo} TRIO_OBJS:=escape.o trio.o strio.o @@ -107,7 +108,7 @@ THRLOBJS:=${OBJS:.o=.thr.lo} HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \ xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \ - escape.h ulm_parse.h trio.h lb_maildir.h ${PERF_HDRS} + escape.h ulm_parse.h trio.h lb_maildir.h connpool.h ${PERF_HDRS} STATICLIB:=libglite_lb_common_${nothrflavour}.a THRSTATICLIB:=libglite_lb_common_${thrflavour}.a diff --git a/org.glite.lb.common/interface/connpool.h b/org.glite.lb.common/interface/connpool.h new file mode 100644 index 0000000..6bb18d4 --- /dev/null +++ b/org.glite.lb.common/interface/connpool.h @@ -0,0 +1,109 @@ +#include "glite/security/glite_gss.h" +#include "glite/lb/consumer.h" +#include "lb_plain_io.h" +#include "authz.h" +#include "glite/lb/log_proto.h" +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef EDG_WLL_CONNPOOL_DECLARED +#define EDG_WLL_CONNPOOL_DECLARED 1 + +typedef struct _edg_wll_ConnPool { +/* address and port where we are connected to */ + char *peerName; + unsigned int peerPort; + +/* http(s) stream */ + gss_cred_id_t gsiCred; + edg_wll_GssConnection gss; + char *buf; + int bufUse,bufSize; + +/* timestamp of usage of this entry in ctx.connPool */ + struct timeval lastUsed; +} edg_wll_ConnPool; +#endif + + +#ifndef EDG_WLL_CONNECIONS_DECLARED +#define EDG_WLL_CONNECIONS_DECLARED 1 +typedef struct _edg_wll_Connections { + +/* connection pool array */ + edg_wll_ConnPool *connPool; + edg_wll_ConnPool *serverConnection; /* this is in fact a single-item array intended for use + by the server (since the server only uses one connection)*/ + +/* pool of connections from client */ + int poolSize; /* number of connections in the pool */ + int connOpened; /* number of opened connections */ + +/* Connection pool locks & accessories.*/ + pthread_mutex_t poolLock; /* Global pool lock (to be used for pool-wide operations carried out locally) */ + pthread_mutex_t *connectionLock; /* Per-connection lock (used to lock out connections that are in use) */ + edg_wll_Context *locked_by; /* identifies contexts that have been used to lock a connection since they + do probably have a connToUse value stored in them*/ +} edg_wll_Connections; +#endif +/* Connections Global Handle */ + + +extern edg_wll_Connections connectionsHandle; + +/** ** Locking Functions (wrappers) **/ +/* Introduced to simplify debugging and switching betwen single/multi-threaded behavior */ + +/** Lock (try) the pool */ +int edg_wll_poolTryLock(); + +/** Lock the pool (blocking) */ +int edg_wll_poolLock(); + +/** Unlock the pool */ +int edg_wll_poolUnlock(); + + +/** Lock (try) a single connection */ +int edg_wll_connectionTryLock(edg_wll_Context ctx, int index); + +/** Lock a single connection (blocking) */ +int edg_wll_connectionLock(edg_wll_Context ctx, int index); + +/** Unlock a connection */ +int edg_wll_connectionUnlock(edg_wll_Context ctx, int index); + + +/** Free memory used by the pool and lock array */ +void edg_wll_poolFree(); + +/** Allocate memory for the edg_wll_Connections structure and its properties and return a pointer. + in case memory has been already allocated, just return a pointer */ +edg_wll_Connections* edg_wll_initConnections(); + + +/** Set parameters */ + +/* Not yet defined */ + +/** Make connection (+ establish a lock) */ + +/* Not yet defined */ + +/** Unlock connection after use */ + +/* Not yet defined */ + +/** Forcibly drop the connection */ + +/* Not yet defined */ + + + +#ifdef __cplusplus +} +#endif + diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index 84cbedf..433a71d 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -7,6 +7,7 @@ #include "glite/lb/consumer.h" #include "lb_plain_io.h" #include "authz.h" +#include "connpool.h" #ifdef __cplusplus extern "C" { @@ -29,20 +30,20 @@ typedef struct _edg_wll_ConnProxy { -typedef struct _edg_wll_ConnPool { +/* typedef struct _edg_wll_ConnPool { */ /* address and port where we are connected to */ - char *peerName; - unsigned int peerPort; - -/* http(s) stream */ - gss_cred_id_t gsiCred; - edg_wll_GssConnection gss; - char *buf; - int bufUse,bufSize; +/* char *peerName; */ +/* unsigned int peerPort; */ + + /* http(s) stream */ +/* gss_cred_id_t gsiCred; */ +/* edg_wll_GssConnection gss; */ +/* char *buf; */ +/* int bufUse,bufSize; */ /* timestamp of usage of this entry in ctx.connPool */ - struct timeval lastUsed; -} edg_wll_ConnPool; +/* struct timeval lastUsed; */ +/* } edg_wll_ConnPool; */ @@ -54,7 +55,7 @@ struct _edg_wll_Context { /* server part */ void *mysql; - edg_wll_ConnPool *connPool; + edg_wll_Connections *connections; edg_wll_ConnPool *connPoolNotif; /* hold _one_ connection from notif-interlogger */ edg_wll_ConnProxy *connProxy; /* holds one plain connection */ @@ -99,10 +100,10 @@ struct _edg_wll_Context { unsigned int srvPort; /* pool of connections from client */ - int poolSize; - int connOpened; /* number of opened connections */ - int connToUse; /* index of connection that will * - * be used by low-level f-cions */ +// int poolSize; +// int connOpened; /* number of opened connections */ +// int connToUse; /* index of connection that will * +// * be used by low-level f-cions */ // XXX similar variables will be needed for connPoolNotif diff --git a/org.glite.lb.common/interface/mini_http.h b/org.glite.lb.common/interface/mini_http.h index 2b846f3..1bae8b9 100644 --- a/org.glite.lb.common/interface/mini_http.h +++ b/org.glite.lb.common/interface/mini_http.h @@ -4,6 +4,7 @@ #ident "$Header$" #include "glite/lb/consumer.h" +#include "connpool.h" /* XXX: not a good place for the folowing #def's but we ain't got better currently */ /** protocol version */ @@ -30,16 +31,18 @@ extern edg_wll_ErrorCode edg_wll_http_recv( edg_wll_Context, /* INOUT: context */ - char **, /* OUT: first line */ - char ***, /* OUT: null terminated array of headers */ - char ** /* OUT: message body */ + char **, /* OUT: first line */ + char ***, /* OUT: null terminated array of headers */ + char **, /* OUT: message body */ + edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */ ); extern edg_wll_ErrorCode edg_wll_http_send( edg_wll_Context, /* INOUT: context */ const char *, /* IN: first line */ char const * const *, /* IN: headers */ - const char * /* IN: message body */ + const char *, /* IN: message body */ + edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */ ); extern edg_wll_ErrorCode edg_wll_http_recv_proxy( diff --git a/org.glite.lb.common/src/connpool.c b/org.glite.lb.common/src/connpool.c new file mode 100644 index 0000000..1275b07 --- /dev/null +++ b/org.glite.lb.common/src/connpool.c @@ -0,0 +1,165 @@ +#include "connpool.h" + +edg_wll_Connections connectionsHandle = + { NULL , NULL , EDG_WLL_LOG_CONNECTIONS_DEFAULT , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL}; + + +/** Lock (try) the pool */ +int edg_wll_poolTryLock() { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_trylock(&edg_wll_Connections.poolLock); + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Lock the pool (blocking) */ +int edg_wll_poolLock() { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_lock(&edg_wll_Connections.poolLock); + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Unlock the pool */ +int edg_wll_poolUnlock() { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_unlock(&edg_wll_Connections.poolLock); + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Lock (try) a single connection */ +int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_trylock(&edg_wll_Connections.connectionLock[index]); /* Try to lock the connection */ + if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the + locking context address */ + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Lock a single connection (blocking) */ +int edg_wll_connectionLock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_lock(&edg_wll_Connections.connectionLock[index]); /* Lock the connection (wait if + not available)*/ + if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the + locking context address */ + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Unlock a connection */ +int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) { +int RetVal; + + #ifdef THR + RetVal = pthread_mutex_unlock(&edg_wll_Connections.connectionLock[index]); + if (!RetVal) connectionsHandle.locked_by[index] = NULL; + #endif + + #ifndef THR + RetVal = 0; + #endif + + + return(RetVal); +} + + +/** Free all memory used by the pool and lock-related arrays */ +void edg_wll_poolFree() { + int i; + struct timeval close_timeout = {0, 50000}; /* Declarations taken over from edg_wll_FreeContext() */ + OM_uint32 min_stat; /* Does not seem to have any use whatsoever - neither + here nor in edg_wll_FreeContext() */ + + + printf("edg_wll_poolFree Checkpoint\n"); + + for (i=0; i 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */ + connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool)); + connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t)); + connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context)); + + } + if(connectionsHandle.serverConnection == NULL) { + connectionsHandle.serverConnection = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool)); + } + + + return (&connectionsHandle); +} + + diff --git a/org.glite.lb.common/src/context.c b/org.glite.lb.common/src/context.c index 561e068..36c4d79 100644 --- a/org.glite.lb.common/src/context.c +++ b/org.glite.lb.common/src/context.c @@ -34,11 +34,12 @@ int edg_wll_InitContext(edg_wll_Context *ctx) /* XXX */ for (i=0; iconnPool = (edg_wll_ConnPool *) calloc(out->poolSize, sizeof(edg_wll_ConnPool)); + out->connections = edg_wll_initConnections(); +// out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool)); out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool)); out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy)); out->connProxy->conn.sock = -1; - out->connToUse = -1; +// out->connToUse = -1; *ctx = out; @@ -73,17 +74,26 @@ void edg_wll_FreeContext(edg_wll_Context ctx) } #endif if (ctx->errDesc) free(ctx->errDesc); - if (ctx->connPool) { + if (ctx->connections->connPool) { int i; - - for (i=0; ipoolSize; i++) { - if (ctx->connPool[i].peerName) free(ctx->connPool[i].peerName); - edg_wll_gss_close(&ctx->connPool[i].gss,&close_timeout); - if (ctx->connPool[i].gsiCred) - gss_release_cred(&min_stat, &ctx->connPool[i].gsiCred); - if (ctx->connPool[i].buf) free(ctx->connPool[i].buf); + + /* Since the introduction of a shared connection pool, the pool cannot freed here. + We only need to unlock connections locked using this context. */ + for (i=0; iconnections->poolSize; i++) { + if (ctx->connections->locked_by[i]==ctx) { + edg_wll_connectionUnlock(ctx, i); + } + } + +/* + for (i=0; iconnections->poolSize; i++) { + if (ctx->connections->connPool[i].peerName) free(ctx->connections->connPool[i].peerName); + edg_wll_gss_close(&ctx->connections->connPool[i].gss,&close_timeout); + if (ctx->connections->connPool[i].gsiCred) + gss_release_cred(&min_stat, &ctx->connections->connPool[i].gsiCred); + if (ctx->connections->connPool[i].buf) free(ctx->connections->connPool[i].buf); } - free(ctx->connPool); + free(ctx->connections->connPool);*/ } if (ctx->connPoolNotif) { if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName); diff --git a/org.glite.lb.common/src/mini_http.c b/org.glite.lb.common/src/mini_http.c index fcb3157..ad302a8 100644 --- a/org.glite.lb.common/src/mini_http.c +++ b/org.glite.lb.common/src/mini_http.c @@ -21,7 +21,7 @@ #define min(x,y) ((x) < (y) ? (x) : (y)) #define CONTENT_LENGTH "Content-Length:" -edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut) +edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut, edg_wll_ConnPool *connPTR) { char **hdr = NULL,*first = NULL,*body = NULL; enum { FIRST, HEAD, BODY, DONE } pstat = FIRST; @@ -30,23 +30,23 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** edg_wll_GssStatus gss_code; #define bshift(shift) {\ - memmove(ctx->connPool[ctx->connToUse].buf,ctx->connPool[ctx->connToUse].buf+(shift),ctx->connPool[ctx->connToUse].bufUse-(shift));\ - ctx->connPool[ctx->connToUse].bufUse -= (shift);\ + memmove(connPTR->buf,connPTR->buf+(shift),connPTR->bufUse-(shift));\ + connPTR->bufUse -= (shift);\ } edg_wll_ResetError(ctx); - if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) - sock = ctx->connPool[ctx->connToUse].gss.sock; + if (connPTR->gss.context != GSS_C_NO_CONTEXT) + sock = connPTR->gss.sock; else { edg_wll_SetError(ctx,ENOTCONN,NULL); goto error; } - if (!ctx->connPool[ctx->connToUse].buf) ctx->connPool[ctx->connToUse].buf = malloc(ctx->connPool[ctx->connToUse].bufSize = BUFSIZ); + if (!connPTR->buf) connPTR->buf = malloc(connPTR->bufSize = BUFSIZ); do { - len = edg_wll_gss_read(&ctx->connPool[ctx->connToUse].gss, - ctx->connPool[ctx->connToUse].buf+ctx->connPool[ctx->connToUse].bufUse,ctx->connPool[ctx->connToUse].bufSize-ctx->connPool[ctx->connToUse].bufUse,&ctx->p_tmp_timeout, &gss_code); + len = edg_wll_gss_read(&connPTR->gss, + connPTR->buf+connPTR->bufUse,connPTR->bufSize-connPTR->bufUse,&ctx->p_tmp_timeout, &gss_code); switch (len) { case EDG_WLL_GSS_OK: @@ -67,27 +67,27 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** } - ctx->connPool[ctx->connToUse].bufUse += len; + connPTR->bufUse += len; rdmore = 0; while (!rdmore && pstat != DONE) switch (pstat) { char *cr; case FIRST: - if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) && - ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n') + if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) && + connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n') { *cr = 0; - first = strdup(ctx->connPool[ctx->connToUse].buf); - bshift(cr-ctx->connPool[ctx->connToUse].buf+2); + first = strdup(connPTR->buf); + bshift(cr-connPTR->buf+2); pstat = HEAD; } else rdmore = 1; break; case HEAD: - if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) && - ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n') + if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) && + connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n') { - if (cr == ctx->connPool[ctx->connToUse].buf) { + if (cr == connPTR->buf) { bshift(2); pstat = clen ? BODY : DONE; if (clen) body = malloc(clen+1); @@ -96,19 +96,19 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char *** *cr = 0; hdr = realloc(hdr,(nhdr+2) * sizeof(*hdr)); - hdr[nhdr] = strdup(ctx->connPool[ctx->connToUse].buf); + hdr[nhdr] = strdup(connPTR->buf); hdr[++nhdr] = NULL; - if (!strncasecmp(ctx->connPool[ctx->connToUse].buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1)) - clen = atoi(ctx->connPool[ctx->connToUse].buf+sizeof(CONTENT_LENGTH)-1); + if (!strncasecmp(connPTR->buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1)) + clen = atoi(connPTR->buf+sizeof(CONTENT_LENGTH)-1); - bshift(cr-ctx->connPool[ctx->connToUse].buf+2); + bshift(cr-connPTR->buf+2); } else rdmore = 1; break; case BODY: - if (ctx->connPool[ctx->connToUse].bufUse) { - int m = min(ctx->connPool[ctx->connToUse].bufUse,clen-blen); - memcpy(body+blen,ctx->connPool[ctx->connToUse].buf,m); + if (connPTR->bufUse) { + int m = min(connPTR->bufUse,clen-blen); + memcpy(body+blen,connPTR->buf,m); blen += m; bshift(m); } @@ -296,23 +296,23 @@ static int real_write(edg_wll_Context ctx, edg_wll_GssConnection *con,const char } } -edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body) +edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body, edg_wll_ConnPool *connPTR) { const char* const *h; int len = 0, blen; edg_wll_ResetError(ctx); - if (ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT) + if (connPTR->gss.context == GSS_C_NO_CONTEXT) return edg_wll_SetError(ctx,ENOTCONN,NULL); - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,first,strlen(first)) < 0 || - real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,first,strlen(first)) < 0 || + real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); if (head) for (h=head; *h; h++) - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,*h,strlen(*h)) < 0 || - real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,*h,strlen(*h)) < 0 || + real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); if (body) { @@ -320,13 +320,13 @@ edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const len = strlen(body); blen = sprintf(buf,CONTENT_LENGTH " %d\r\n",len); - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,buf,blen) < 0) + if (real_write(ctx,&connPTR->gss,buf,blen) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); } - if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) + if (real_write(ctx,&connPTR->gss,"\r\n",2) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); - if (body && real_write(ctx,&ctx->connPool[ctx->connToUse].gss,body,len) < 0) + if (body && real_write(ctx,&connPTR->gss,body,len) < 0) return edg_wll_SetError(ctx,errno,"edg_wll_http_send()"); return edg_wll_Error(ctx,NULL,NULL); diff --git a/org.glite.lb.common/src/param.c b/org.glite.lb.common/src/param.c index d0b70bd..82dc590 100644 --- a/org.glite.lb.common/src/param.c +++ b/org.glite.lb.common/src/param.c @@ -243,7 +243,8 @@ int edg_wll_SetParamInt(edg_wll_Context ctx,edg_wll_ContextParam param,int val) char *s = mygetenv(param); if (!val && s) val = atoi(s); - ctx->poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT; +// ctx->connections->poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT; + connectionsHandle.poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT; } break; case EDG_WLL_PARAM_SOURCE: @@ -380,7 +381,7 @@ int edg_wll_GetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...) break; case EDG_WLL_PARAM_QUERY_CONNECTIONS: p_int = va_arg(ap, int *); - *p_int = ctx->poolSize; + *p_int = ctx->connections->poolSize; break; case EDG_WLL_PARAM_SOURCE: p_int = va_arg(ap, int *); diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 16a63dc..972588c 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -72,8 +72,9 @@ extern int edg_wll_StoreProto(edg_wll_Context ctx); extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs); extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); - - +#ifdef LB_PERF +extern void _start (void), etext (void); +#endif #define CON_QUEUE 20 /* accept() */ #define SLAVE_OVERLOAD 10 /* queue items per slave */ @@ -573,6 +574,9 @@ a.sin_addr.s_addr = INADDR_ANY; perror("deamon()"); exit(1); } +#ifdef LB_PERF + monstartup((u_long)&_start, (u_long)&etext); +#endif fpid = fopen(pidfile,"w"); if (!fpid) { perror(pidfile); return 1; } @@ -755,14 +759,15 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->p_tmp_timeout.tv_sec = timeout->tv_sec; ctx->p_tmp_timeout.tv_usec = timeout->tv_usec; - ctx->poolSize = 1; - ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool)); - ctx->connToUse = 0; +// ctx->connections->poolSize = 0; +// ctx->connections->connPool = calloc(1, sizeof(edg_wll_ConnPool)); +// ctx->connToUse = 0; + edg_wll_initConnections(); alen = sizeof(a); getpeername(conn, (struct sockaddr *)&a, &alen); - ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr)); - ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port); + ctx->connections->serverConnection->peerName = strdup(inet_ntoa(a.sin_addr)); + ctx->connections->serverConnection->peerPort = ntohs(a.sin_port); ctx->count_statistics = count_statistics; gettimeofday(&conn_start, 0); @@ -773,8 +778,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) case NETDB_SUCCESS: if (name) dprintf(("[%d] connection from %s:%d (%s)\n", getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port), name)); - free(ctx->connPool[ctx->connToUse].peerName); - ctx->connPool[ctx->connToUse].peerName = name; + free(ctx->connections->serverConnection->peerName); + ctx->connections->serverConnection->peerName = name; name = NULL; break; @@ -839,7 +844,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->srvPort = ntohs(a.sin_port); } - if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code)) ) + if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connections->serverConnection->gss, &gss_code)) ) { if ( ret == EDG_WLL_GSS_ERROR_TIMEOUT ) { @@ -856,7 +861,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) return 1; } - maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context, + maj_stat = gss_inquire_context(&min_stat, ctx->connections->serverConnection->gss.context, &client_name, NULL, NULL, NULL, NULL, NULL, NULL); if ( !GSS_ERROR(maj_stat) ) maj_stat = gss_display_name(&min_stat, client_name, &token, NULL); @@ -883,7 +888,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) if ( token.value ) gss_release_buffer(&min_stat, &token); - if ( edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, server_cert, server_key, vomsdir, cadir) ) + if ( edg_wll_SetVomsGroups(ctx, &ctx->connections->serverConnection->gss, server_cert, server_key, vomsdir, cadir) ) { char *errt, *errd; @@ -951,7 +956,7 @@ int bk_handle_ws_connection(int conn, struct timeval *timeout, void *data) soap_done(soap); goto err; } - gsplugin_ctx->connection = &cdata->ctx->connPool[cdata->ctx->connToUse].gss; + gsplugin_ctx->connection = &cdata->ctx->connections->serverConnection->gss; gsplugin_ctx->cred = mycred; cdata->soap = soap; @@ -1164,8 +1169,8 @@ int bk_clnt_disconnect(int conn, struct timeval *timeout, void *cdata) edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx; - if ( ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT) - edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, timeout); + if ( ctx->connections->serverConnection->gss.context != GSS_C_NO_CONTEXT) + edg_wll_gss_close(&ctx->connections->serverConnection->gss, timeout); edg_wll_FreeContext(ctx); ctx = NULL; @@ -1408,10 +1413,11 @@ static int parse_limits(char *opt, int *j_limit, int *e_limit, int *size_limit) return (sscanf(opt, "%d:%d:%d", j_limit, e_limit, size_limit) == 3); } + static int check_mkdir(const char *dir) { struct stat sbuf; - + if ( stat(dir, &sbuf) ) { if ( errno == ENOENT ) diff --git a/org.glite.lb.server/src/lb_http.c b/org.glite.lb.server/src/lb_http.c index 038d679..3e2cb50 100644 --- a/org.glite.lb.server/src/lb_http.c +++ b/org.glite.lb.server/src/lb_http.c @@ -26,7 +26,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body); - else err = edg_wll_http_recv(ctx,&req,&hdr,&body); + else err = edg_wll_http_recv(ctx,&req,&hdr,&body,ctx->connections->serverConnection); dprintf(("[%d] %s\n",getpid(),req)); if (body) dprintf(("\n%s\n\n",body)); @@ -39,7 +39,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx) if ( ctx->isProxy ) edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut); else - edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut); + edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut,ctx->connections->serverConnection); } } diff --git a/org.glite.lb.server/src/notification.c b/org.glite.lb.server/src/notification.c index 3b6243c..2e4c26e 100644 --- a/org.glite.lb.server/src/notification.c +++ b/org.glite.lb.server/src/notification.c @@ -97,7 +97,7 @@ int edg_wll_NotifNewServer( goto cleanup; } if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) - trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1); + trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } /* Format DB insert statement @@ -203,7 +203,7 @@ int edg_wll_NotifBindServer( goto cleanup; } if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) - trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1); + trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 5b3c117..bfa1aa3 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -380,7 +380,7 @@ abort: asprintf(&response, "HTTP/1.1 %d %s", ret, edg_wll_HTTPErrorMessage(ret)); - edg_wll_http_send(ctx, response, resp_headers, message); + edg_wll_http_send(ctx, response, resp_headers, message,ctx->connections->serverConnection); return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/stored_master.c b/org.glite.lb.server/src/stored_master.c index ed78612..d827822 100644 --- a/org.glite.lb.server/src/stored_master.c +++ b/org.glite.lb.server/src/stored_master.c @@ -28,7 +28,7 @@ gss_reader(void *user_data, char *buffer, int max_len) int ret, len; edg_wll_GssStatus gss_code; - ret = edg_wll_gss_read_full(&tmp_ctx->connPool[tmp_ctx->connToUse].gss, + ret = edg_wll_gss_read_full(&tmp_ctx->connections->serverConnection->gss, buffer, max_len, &tmp_ctx->p_tmp_timeout, &len, &gss_code); @@ -71,7 +71,7 @@ int edg_wll_StoreProto(edg_wll_Context ctx) free(buf); if ((len = create_reply(ctx,&buf)) > 0) { - if ((ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0) + if ((ret = edg_wll_gss_write_full(&ctx->connections->serverConnection->gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0) edg_wll_SetError(ctx, ret == EDG_WLL_GSS_ERROR_TIMEOUT ? ETIMEDOUT : EDG_WLL_ERROR_GSS,