From 42d00d5f3f6c7375ec8ffa22e096396848b9f25d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Zden=C4=9Bk=20=C5=A0ustr?= Date: Mon, 4 Sep 2006 09:35:34 +0000 Subject: [PATCH] Connection pool - threaded version (recommit :-( ) --- org.glite.lb.client/examples/user_jobs_threaded.c | 107 ++++++++++++++++++++++ org.glite.lb.common/interface/connpool.h | 13 ++- org.glite.lb.common/src/connpool.c | 65 +++++++++---- 3 files changed, 163 insertions(+), 22 deletions(-) create mode 100644 org.glite.lb.client/examples/user_jobs_threaded.c diff --git a/org.glite.lb.client/examples/user_jobs_threaded.c b/org.glite.lb.client/examples/user_jobs_threaded.c new file mode 100644 index 0000000..8d96ed3 --- /dev/null +++ b/org.glite.lb.client/examples/user_jobs_threaded.c @@ -0,0 +1,107 @@ +#ident "$Header$" + +#include +#include +#include + +#include + +#include "glite/lb/context.h" +#include "glite/lb/xml_conversions.h" +#include "glite/lb/consumer.h" + +int use_proxy = 0; + +int (*user_jobs)(edg_wll_Context, edg_wlc_JobId **, edg_wll_JobStat **); + + +void +usage(char *me) +{ + fprintf(stderr,"usage: %s [-h] [-x] [userid]\n", me); +} + +int main(int argc,char **argv) +{ + edg_wll_Context ctx; + char *errt,*errd; + edg_wlc_JobId *jobs = NULL; + edg_wll_JobStat *states = NULL; + int i,j; + char *owner = NULL; + + user_jobs = edg_wll_UserJobs; + for ( i = 1; i < argc; i++ ) { + if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) { + usage(argv[0]); + exit(0); + } else if ( !strcmp(argv[i], "-x") ) { + user_jobs = edg_wll_UserJobsProxy; + continue; + } + + owner = strdup(argv[i]); + break; + } + + edg_wll_InitContext(&ctx); + if ( user_jobs == edg_wll_UserJobsProxy && owner ) + edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, owner); + + if (user_jobs(ctx,&jobs,&states)) goto err; + for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) { + char *id = edg_wlc_JobIdUnparse(states[i].jobId), + *st = edg_wll_StatToString(states[i].state); + + if (!states[i].parent_job) { + if (states[i].jobtype == EDG_WLL_STAT_SIMPLE) { + printf(" %s .... %s %s\n", id, st, (states[i].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[i].done_code) : "" ); + } + else if (states[i].jobtype == EDG_WLL_STAT_DAG) { + printf("DAG %s .... %s %s\n", id, st, (states[i].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[i].done_code) : ""); + for (j=0; states[j].state != EDG_WLL_JOB_UNDEF; j++) { + if (states[j].parent_job) { + char *par_id = edg_wlc_JobIdUnparse(states[j].parent_job); + + if (!strcmp(id,par_id)) { + char *sub_id = edg_wlc_JobIdUnparse(states[j].jobId), + *sub_st = edg_wll_StatToString(states[j].state); + + printf(" `- %s .... %s %s\n", sub_id, sub_st, (states[j].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[j].done_code) : ""); + free(sub_id); + free(sub_st); + } + free(par_id); + } + } + } + } + + free(id); + free(st); + } + + printf("\nFound %d jobs\n",i); + +err: + free(owner); + if (jobs) { + for (i=0; jobs[i]; i++) edg_wlc_JobIdFree(jobs[i]); + free(jobs); + } + + if (states) { + for (i=0; states[i].state; i++) edg_wll_FreeStatus(&states[i]); + free(states); + } + + if (edg_wll_Error(ctx,&errt,&errd)) { + fprintf(stderr,"%s: %s (%s)\n",argv[0],errt,errd); + edg_wll_FreeContext(ctx); + return 1; + } + + edg_wll_FreeContext(ctx); + return 0; +} + diff --git a/org.glite.lb.common/interface/connpool.h b/org.glite.lb.common/interface/connpool.h index f9c1df7..af874ac 100644 --- a/org.glite.lb.common/interface/connpool.h +++ b/org.glite.lb.common/interface/connpool.h @@ -2,13 +2,17 @@ #include "glite/lb/consumer.h" #include "lb_plain_io.h" #include "authz.h" -#include "log_proto.h" -#include +#include "glite/lb/log_proto.h" +#ifdef GLITE_LB_THREADED + #include +#endif #ifdef __cplusplus extern "C" { #endif +#define EDG_WLL_CONNPOOL_DEBUG + #ifndef EDG_WLL_CONNPOOL_DECLARED #define EDG_WLL_CONNPOOL_DECLARED 1 @@ -43,15 +47,18 @@ typedef struct _edg_wll_Connections { int connOpened; /* number of opened connections */ /* Connection pool locks & accessories.*/ +#ifdef GLITE_LB_THREADED pthread_mutex_t poolLock; /* Global pool lock (to be used for pool-wide operations carried out locally) */ pthread_mutex_t *connectionLock; /* Per-connection lock (used to lock out connections that are in use) */ +#endif edg_wll_Context *locked_by; /* identifies contexts that have been used to lock a connection since they do probably have a connToUse value stored in them*/ + } edg_wll_Connections; #endif -/* Connections Global Handle */ +/* Connections Global Handle */ extern edg_wll_Connections connectionsHandle; /** ** Locking Functions (wrappers) **/ diff --git a/org.glite.lb.common/src/connpool.c b/org.glite.lb.common/src/connpool.c index 1275b07..e094d28 100644 --- a/org.glite.lb.common/src/connpool.c +++ b/org.glite.lb.common/src/connpool.c @@ -1,19 +1,37 @@ #include "connpool.h" +#ifdef GLITE_LB_THREADED edg_wll_Connections connectionsHandle = { NULL , NULL , EDG_WLL_LOG_CONNECTIONS_DEFAULT , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL}; +#endif +#ifndef GLITE_LB_THREADED +edg_wll_Connections connectionsHandle = + { NULL , NULL , EDG_WLL_LOG_CONNECTIONS_DEFAULT , 0 , NULL}; +#endif /** Lock (try) the pool */ int edg_wll_poolTryLock() { int RetVal; - #ifdef THR - RetVal = pthread_mutex_trylock(&edg_wll_Connections.poolLock); + #ifdef GLITE_LB_THREADED /* Threaded version */ + RetVal = + pthread_mutex_trylock( + &connectionsHandle.poolLock + ); + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d tring to lock the pool - result %d\n",pthread_self(),RetVal); + #endif #endif - #ifndef THR + + #ifndef GLITE_LB_THREADED /* Single-thread version */ RetVal = 0; + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Dummy - Locking the pool (pthreads not included)\n"); + #endif #endif @@ -25,11 +43,16 @@ int RetVal; int edg_wll_poolLock() { int RetVal; - #ifdef THR - RetVal = pthread_mutex_lock(&edg_wll_Connections.poolLock); + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_lock(&connectionsHandle.poolLock); + printf("Debug out edg_wll_poolLock #2\n"); + + #ifdef EDG_WLL_CONNPOOL_DEBUG + printf("Thread %d tring to lock the pool - result %d\n",pthread_self(),RetVal); + #endif #endif - #ifndef THR + #ifndef GLITE_LB_THREADED RetVal = 0; #endif @@ -42,11 +65,11 @@ int RetVal; int edg_wll_poolUnlock() { int RetVal; - #ifdef THR - RetVal = pthread_mutex_unlock(&edg_wll_Connections.poolLock); + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_unlock(&connectionsHandle.poolLock); #endif - #ifndef THR + #ifndef GLITE_LB_THREADED RetVal = 0; #endif @@ -59,13 +82,13 @@ int RetVal; int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) { int RetVal; - #ifdef THR - RetVal = pthread_mutex_trylock(&edg_wll_Connections.connectionLock[index]); /* Try to lock the connection */ + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_trylock(&connectionsHandle.connectionLock[index]); /* Try to lock the connection */ if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the locking context address */ #endif - #ifndef THR + #ifndef GLITE_LB_THREADED RetVal = 0; #endif @@ -78,14 +101,14 @@ int RetVal; int edg_wll_connectionLock(edg_wll_Context ctx, int index) { int RetVal; - #ifdef THR - RetVal = pthread_mutex_lock(&edg_wll_Connections.connectionLock[index]); /* Lock the connection (wait if + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_lock(&connectionsHandle.connectionLock[index]); /* Lock the connection (wait if not available)*/ if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the locking context address */ #endif - #ifndef THR + #ifndef GLITE_LB_THREADED RetVal = 0; #endif @@ -98,12 +121,12 @@ int RetVal; int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) { int RetVal; - #ifdef THR - RetVal = pthread_mutex_unlock(&edg_wll_Connections.connectionLock[index]); + #ifdef GLITE_LB_THREADED + RetVal = pthread_mutex_unlock(&connectionsHandle.connectionLock[index]); if (!RetVal) connectionsHandle.locked_by[index] = NULL; #endif - #ifndef THR + #ifndef GLITE_LB_THREADED RetVal = 0; #endif @@ -131,11 +154,13 @@ void edg_wll_poolFree() { } edg_wll_poolLock(); +#ifdef GLITE_LB_THREADED free(connectionsHandle.connectionLock); + connectionsHandle.connectionLock = NULL; +#endif free(connectionsHandle.serverConnection); free(connectionsHandle.connPool); free(connectionsHandle.locked_by); - connectionsHandle.connectionLock = NULL; connectionsHandle.serverConnection = NULL; connectionsHandle.connPool = NULL; connectionsHandle.locked_by = NULL; @@ -150,7 +175,9 @@ edg_wll_Connections* edg_wll_initConnections() { if((connectionsHandle.connPool == NULL) && (connectionsHandle.poolSize > 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */ connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool)); +#ifdef GLITE_LB_THREADED connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t)); +#endif connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context)); } -- 1.8.2.3