From 448a9cb1ee83082bbe9dc79713835681a1c3658f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Zden=C4=9Bk=20=C5=A0ustr?= Date: Thu, 21 Sep 2006 15:40:41 +0000 Subject: [PATCH] Connection Pool improvements, allowing "holes" in the pool. --- org.glite.lb.client/Makefile | 10 ++- org.glite.lb.client/examples/user_jobs_threaded.c | 103 +++++++++++++++++----- org.glite.lb.client/src/connection.c | 50 ++++++++--- 3 files changed, 128 insertions(+), 35 deletions(-) diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index ba29ed7..81a459f 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -151,7 +151,8 @@ THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la TOOLS:=dump load purge lb_dump_exporter ${LB_PERF_TOOLS} EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog -EXAMPLES_CL=user_jobs job_status user_jobs_threaded +EXAMPLES_CL=user_jobs job_status +EXAMPLES_CL_THR=user_jobs_threaded FAKE_EXAMPLES:=job_log_fake MAN_GZ:=glite-lb-logevent.1.gz @@ -200,6 +201,9 @@ ${TOOLS} ${EXAMPLES}: %: %.o ${EXAMPLES_CL}: %: %.o ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} +${EXAMPLES_CL_THR}: %: %.o + ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB} + ${FAKE_EXAMPLES}: %: %.o ${FAKELIB} ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} @@ -259,7 +263,7 @@ else compile all: ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ} endif -examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} +examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} fake: ${FAKE_EXAMPLES} @@ -313,7 +317,7 @@ endif for p in ${TOOLS} ; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \ done - for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \ + for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \ done ${INSTALL} -m 755 ${top_srcdir}/examples/export.sh "${PREFIX}/examples/glite-lb-export.sh" diff --git a/org.glite.lb.client/examples/user_jobs_threaded.c b/org.glite.lb.client/examples/user_jobs_threaded.c index 8d96ed3..47e0b05 100644 --- a/org.glite.lb.client/examples/user_jobs_threaded.c +++ b/org.glite.lb.client/examples/user_jobs_threaded.c @@ -6,6 +6,8 @@ #include +#include + #include "glite/lb/context.h" #include "glite/lb/xml_conversions.h" #include "glite/lb/consumer.h" @@ -21,32 +23,28 @@ usage(char *me) fprintf(stderr,"usage: %s [-h] [-x] [userid]\n", me); } -int main(int argc,char **argv) -{ +typedef struct { + char *owner; + int proxy; + char *argv_0;} thread_code_args; + +void *thread_code(thread_code_args *arguments) { + edg_wll_Context ctx; char *errt,*errd; edg_wlc_JobId *jobs = NULL; edg_wll_JobStat *states = NULL; int i,j; - char *owner = NULL; user_jobs = edg_wll_UserJobs; - for ( i = 1; i < argc; i++ ) { - if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) { - usage(argv[0]); - exit(0); - } else if ( !strcmp(argv[i], "-x") ) { - user_jobs = edg_wll_UserJobsProxy; - continue; - } - owner = strdup(argv[i]); - break; - } + if (arguments->proxy) { + user_jobs = edg_wll_UserJobsProxy; + } edg_wll_InitContext(&ctx); - if ( user_jobs == edg_wll_UserJobsProxy && owner ) - edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, owner); + if ( user_jobs == edg_wll_UserJobsProxy && arguments->owner ) + edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner); if (user_jobs(ctx,&jobs,&states)) goto err; for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) { @@ -84,7 +82,7 @@ int main(int argc,char **argv) printf("\nFound %d jobs\n",i); err: - free(owner); + free(arguments->owner); if (jobs) { for (i=0; jobs[i]; i++) edg_wlc_JobIdFree(jobs[i]); free(jobs); @@ -96,12 +94,77 @@ err: } if (edg_wll_Error(ctx,&errt,&errd)) { - fprintf(stderr,"%s: %s (%s)\n",argv[0],errt,errd); + fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd); edg_wll_FreeContext(ctx); - return 1; + pthread_exit(NULL); } edg_wll_FreeContext(ctx); - return 0; + + printf("Thread %d exitting\n",(int)pthread_self()); + + pthread_exit(NULL); + + printf("Thread %d out !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n",(int)pthread_self()); +// return 0; +} + +int main(int argc,char **argv) +{ + + #define NUM_THREADS 10 + + thread_code_args arguments = { NULL , 0 , NULL }; + int i,rc,status; + pthread_t threads[NUM_THREADS]; + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + for ( i = 1; i < argc; i++ ) { + if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) { + usage(argv[0]); + exit(0); + } + else if ( !strcmp(argv[i], "-x") ) { + arguments.proxy = 1; + } + + arguments.owner = strdup(argv[i]); + } + + arguments.argv_0 = argv[0]; + + + for (i=0;iconnections->connOpened); assert(cIndex < ctx->connections->connOpened); @@ -40,13 +40,18 @@ static void CloseConnection(edg_wll_Context ctx, int* conn_index) /* if deleted conn was not the last one -> there is a 'hole' and then */ /* 'shake' together connections in pool, no holes are allowed */ - if (cIndex < ctx->connections->connOpened - 1) { + /* */ + /* This principle is unsuitable for multi-threaded applications. Too much waiting for connections */ + /* to unlock. We need to allow "holes" in the pool. */ + +/* if (cIndex < ctx->connections->connOpened - 1) { ctx->connections->connPool[cIndex] = ctx->connections->connPool[ctx->connections->connOpened - 1]; memset(ctx->connections->connPool + ctx->connections->connOpened - 1 , 0, sizeof(edg_wll_ConnPool)); - } + }*/ + ctx->connections->connOpened--; - conn_index[0] = cIndex; + *conn_index = cIndex; } @@ -55,8 +60,9 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) { int i; - for (i=0; iconnections->connOpened;i++) { - if (!strcmp(name, ctx->connections->connPool[i].peerName) && + for (i=0; iconnections->poolSize;i++) { + if ((ctx->connections->connPool[i].peerName != NULL) && + !strcmp(name, ctx->connections->connPool[i].peerName) && (port == ctx->connections->connPool[i].peerPort)) { /* TryLock (next line) is in fact used only @@ -88,8 +94,19 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port) static int AddConnection(edg_wll_Context ctx, char *name, int port) { - int index = ctx->connections->connOpened; - + int i,index = -1; + + for (i = 0; i < ctx->connections->poolSize; i++) { + if (ctx->connections->connPool[i].peerName == NULL) { + if (!edg_wll_connectionTryLock(ctx, i)) { + index = i; // This connection is free and was not locked. We may lock it and use it. + break; + } + } + } + + if (index < 0) return -1; + free(ctx->connections->connPool[index].peerName); // should be empty; just to be sure ctx->connections->connPool[index].peerName = strdup(ctx->srvName); ctx->connections->connPool[index].peerPort = ctx->srvPort; @@ -112,9 +129,11 @@ static void ReleaseConnection(edg_wll_Context ctx, char *name, int port) if ((index = ConnectionIndex(ctx, name, port)) >= 0) CloseConnection(ctx, &index); } - else { /* free the oldest connection*/ + else { /* free the oldest (unlocked) connection */ + assert(ctx->connections->connPool[0].peerName); // Full pool expected - accept non-NULL values only min = ctx->connections->connPool[0].lastUsed.tv_sec; - for (i=0; iconnections->connOpened; i++) { + for (i=0; iconnections->poolSize; i++) { + assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only if (ctx->connections->connPool[i].lastUsed.tv_sec < min) { min = ctx->connections->connPool[i].lastUsed.tv_sec; index = i; @@ -166,10 +185,16 @@ int edg_wll_open(edg_wll_Context ctx, int* connToUse) ReleaseConnection(ctx, NULL, 0); index = AddConnection(ctx, ctx->srvName, ctx->srvPort); + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index); + #endif } /* else - there is cached open connection, reuse it */ - + #ifdef EDG_WLL_CONNPOOL_DEBUG + else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index); + #endif + *connToUse = index; /* XXX support anonymous connections, perhaps add a flag to the connPool @@ -225,7 +250,7 @@ err: *connToUse = -1; ok: - if (*connToUse>-1) edg_wll_connectionLock(ctx, *connToUse); /* Lock the connection */ + if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */ edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */ @@ -374,6 +399,7 @@ int edg_wll_http_send_recv( assert(connToUse >= 0); gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL); + edg_wll_connectionUnlock(ctx, connToUse); return 0; err: -- 1.8.2.3