Connection Pool improvements, allowing "holes" in the pool.
authorZdeněk Šustr <sustr4@cesnet.cz>
Thu, 21 Sep 2006 15:40:41 +0000 (15:40 +0000)
committerZdeněk Šustr <sustr4@cesnet.cz>
Thu, 21 Sep 2006 15:40:41 +0000 (15:40 +0000)
org.glite.lb.client/Makefile
org.glite.lb.client/examples/user_jobs_threaded.c
org.glite.lb.client/src/connection.c

index ba29ed7..81a459f 100644 (file)
@@ -151,7 +151,8 @@ THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la
 TOOLS:=dump load purge lb_dump_exporter ${LB_PERF_TOOLS}
 EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog
 
-EXAMPLES_CL=user_jobs job_status user_jobs_threaded 
+EXAMPLES_CL=user_jobs job_status 
+EXAMPLES_CL_THR=user_jobs_threaded
 FAKE_EXAMPLES:=job_log_fake
 
 MAN_GZ:=glite-lb-logevent.1.gz
@@ -200,6 +201,9 @@ ${TOOLS} ${EXAMPLES}: %: %.o
 ${EXAMPLES_CL}: %: %.o
        ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} 
 
+${EXAMPLES_CL_THR}: %: %.o
+       ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB} 
+
 ${FAKE_EXAMPLES}: %: %.o ${FAKELIB}
        ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} 
 
@@ -259,7 +263,7 @@ else
 compile all: ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
 endif
 
-examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
+examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS}
 
 fake: ${FAKE_EXAMPLES}
 
@@ -313,7 +317,7 @@ endif
        for p in ${TOOLS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \
        done
-       for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \
+       for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \
        done
        ${INSTALL} -m 755 ${top_srcdir}/examples/export.sh "${PREFIX}/examples/glite-lb-export.sh"
index 8d96ed3..47e0b05 100644 (file)
@@ -6,6 +6,8 @@
 
 #include <expat.h>
 
+#include <pthread.h>
+
 #include "glite/lb/context.h"
 #include "glite/lb/xml_conversions.h"
 #include "glite/lb/consumer.h"
@@ -21,32 +23,28 @@ usage(char *me)
        fprintf(stderr,"usage: %s [-h] [-x] [userid]\n", me);
 }
 
-int main(int argc,char **argv)
-{
+typedef struct {
+    char *owner;
+    int proxy;
+    char *argv_0;} thread_code_args;
+
+void *thread_code(thread_code_args *arguments) {
+
        edg_wll_Context ctx;
        char            *errt,*errd;
        edg_wlc_JobId           *jobs = NULL;
        edg_wll_JobStat         *states = NULL;
        int             i,j;
-       char            *owner = NULL;
 
        user_jobs = edg_wll_UserJobs;
-       for ( i = 1; i < argc; i++ ) {
-               if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) {
-                       usage(argv[0]);
-                       exit(0);
-               } else if ( !strcmp(argv[i], "-x") ) {
-                       user_jobs = edg_wll_UserJobsProxy;
-                       continue;
-               }
 
-               owner = strdup(argv[i]);
-               break;  
-       }
+       if (arguments->proxy) {
+            user_jobs = edg_wll_UserJobsProxy;
+        }
 
        edg_wll_InitContext(&ctx);
-       if ( user_jobs == edg_wll_UserJobsProxy  && owner )
-               edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, owner);
+       if ( user_jobs == edg_wll_UserJobsProxy  && arguments->owner )
+               edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner);
 
        if (user_jobs(ctx,&jobs,&states)) goto err;
        for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) {  
@@ -84,7 +82,7 @@ int main(int argc,char **argv)
        printf("\nFound %d jobs\n",i);
 
 err:
-       free(owner);
+       free(arguments->owner);
        if  (jobs) {
                for (i=0; jobs[i]; i++)  edg_wlc_JobIdFree(jobs[i]);    
                free(jobs);
@@ -96,12 +94,77 @@ err:
        }
 
        if (edg_wll_Error(ctx,&errt,&errd)) {
-               fprintf(stderr,"%s: %s (%s)\n",argv[0],errt,errd);
+               fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd);
                edg_wll_FreeContext(ctx);
-               return 1;
+                pthread_exit(NULL);
        }
 
        edg_wll_FreeContext(ctx);
-       return 0;
+
+        printf("Thread %d exitting\n",(int)pthread_self());
+
+        pthread_exit(NULL);
+
+        printf("Thread %d out !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n",(int)pthread_self());
+//        return 0;
+}
+
+int main(int argc,char **argv)
+{
+
+        #define NUM_THREADS 10
+
+        thread_code_args arguments = { NULL , 0 , NULL };
+       int     i,rc,status;
+        pthread_t threads[NUM_THREADS];
+       pthread_attr_t attr;
+
+        pthread_attr_init(&attr);
+        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+       for ( i = 1; i < argc; i++ ) {
+               if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) {
+                       usage(argv[0]);
+                       exit(0);
+               }
+                else if ( !strcmp(argv[i], "-x") ) {
+                    arguments.proxy = 1;
+               }
+
+               arguments.owner = strdup(argv[i]);
+       }
+
+        arguments.argv_0 = argv[0];
+
+
+        for (i=0;i<NUM_THREADS;i++) {
+            printf("Running thread %d\n",i);
+            rc = pthread_create(&threads[i], &attr, (void*)thread_code, &arguments );
+            if (rc){
+                printf("ERROR; return code from pthread_create() is %d\n", rc);
+                exit(-1);
+            }
+        }
+
+
+
+//        thread_code(&arguments);
+
+//        pthread_exit(NULL)
+
+        pthread_attr_destroy(&attr);
+
+        for (i=0;i<NUM_THREADS;i++) {
+            printf("*** Joining thread %d.\n",i);
+            rc=pthread_join(threads[i],(void **)&status);
+            printf("*** Thread %d joined. (status %d ret. code %d)\n",i,status,rc);
+
+        }
+
+        printf("Threaded example main() exitting\n");
+
+
+//        exit(0);
+//        pthread_exit(NULL);
 }
 
index 603be1a..a40210b 100644 (file)
@@ -25,7 +25,7 @@ static void CloseConnection(edg_wll_Context ctx, int* conn_index)
        OM_uint32 min_stat;
        int cIndex;
 
-        cIndex = conn_index[0];
+        cIndex = *conn_index;
 
        assert(ctx->connections->connOpened);
        assert(cIndex < ctx->connections->connOpened);
@@ -40,13 +40,18 @@ static void CloseConnection(edg_wll_Context ctx, int* conn_index)
        
        /* if deleted conn was not the last one -> there is a 'hole' and then   */
        /* 'shake' together connections in pool, no holes are allowed           */
-       if (cIndex < ctx->connections->connOpened - 1) {        
+       /* */
+       /* This principle is unsuitable for multi-threaded applications. Too much waiting for connections */
+       /* to unlock. We need to allow "holes" in the pool. */
+
+/*     if (cIndex < ctx->connections->connOpened - 1) {        
                ctx->connections->connPool[cIndex] = ctx->connections->connPool[ctx->connections->connOpened - 1];
                memset(ctx->connections->connPool + ctx->connections->connOpened  - 1 , 0, sizeof(edg_wll_ConnPool));
-       }
+       }*/
+
        ctx->connections->connOpened--;
 
-        conn_index[0] = cIndex;
+        *conn_index = cIndex;
 }
 
 
@@ -55,8 +60,9 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 {
        int i;
 
-        for (i=0; i<ctx->connections->connOpened;i++) { 
-               if (!strcmp(name, ctx->connections->connPool[i].peerName) &&
+        for (i=0; i<ctx->connections->poolSize;i++) { 
+               if ((ctx->connections->connPool[i].peerName != NULL) &&
+                    !strcmp(name, ctx->connections->connPool[i].peerName) &&
                   (port == ctx->connections->connPool[i].peerPort)) {
 
                        /* TryLock (next line) is in fact used only 
@@ -88,8 +94,19 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 
 static int AddConnection(edg_wll_Context ctx, char *name, int port)
 {
-       int index = ctx->connections->connOpened;
-       
+       int i,index = -1;
+
+        for (i = 0; i < ctx->connections->poolSize; i++) {
+            if (ctx->connections->connPool[i].peerName == NULL) {
+                if (!edg_wll_connectionTryLock(ctx, i)) {
+                   index = i;  // This connection is free and was not locked. We may lock it and use it.
+                   break;
+                }
+           }
+       }
+
+       if (index < 0) return -1;
+
        free(ctx->connections->connPool[index].peerName);       // should be empty; just to be sure
        ctx->connections->connPool[index].peerName = strdup(ctx->srvName);
        ctx->connections->connPool[index].peerPort = ctx->srvPort;
@@ -112,9 +129,11 @@ static void ReleaseConnection(edg_wll_Context ctx, char *name, int port)
                if ((index = ConnectionIndex(ctx, name, port)) >= 0)
                        CloseConnection(ctx, &index);
        }
-       else {                                  /* free the oldest connection*/
+       else {                                  /* free the oldest (unlocked) connection */
+               assert(ctx->connections->connPool[0].peerName);  // Full pool expected - accept non-NULL values only
                min = ctx->connections->connPool[0].lastUsed.tv_sec;
-               for (i=0; i<ctx->connections->connOpened; i++) {
+               for (i=0; i<ctx->connections->poolSize; i++) {
+                       assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only
                        if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
                                min = ctx->connections->connPool[i].lastUsed.tv_sec;
                                index = i;
@@ -166,10 +185,16 @@ int edg_wll_open(edg_wll_Context ctx, int* connToUse)
                        ReleaseConnection(ctx, NULL, 0);
                
                index = AddConnection(ctx, ctx->srvName, ctx->srvPort);
+                #ifdef EDG_WLL_CONNPOOL_DEBUG  
+                    printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index);
+                #endif
                
        }
        /* else - there is cached open connection, reuse it */
-       
+        #ifdef EDG_WLL_CONNPOOL_DEBUG  
+            else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index);
+        #endif
+
        *connToUse = index;
        
        /* XXX support anonymous connections, perhaps add a flag to the connPool
@@ -225,7 +250,7 @@ err:
        *connToUse = -1;
 ok:    
 
-        if (*connToUse>-1) edg_wll_connectionLock(ctx, *connToUse); /* Lock the connection */
+        if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */
 
        edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */
 
@@ -374,6 +399,7 @@ int edg_wll_http_send_recv(
        
        assert(connToUse >= 0);
        gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL);
+        edg_wll_connectionUnlock(ctx, connToUse);
        return 0;
 
 err: