merged connpool branch
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 14 Nov 2006 17:39:11 +0000 (17:39 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 14 Nov 2006 17:39:11 +0000 (17:39 +0000)
20 files changed:
org.glite.lb.client-interface/interface/context.h
org.glite.lb.client/Makefile
org.glite.lb.client/examples/user_jobs_threaded.c [new file with mode: 0644]
org.glite.lb.client/src/connection.c
org.glite.lb.client/src/connection.h
org.glite.lb.common/Makefile
org.glite.lb.common/interface/connpool.h [new file with mode: 0644]
org.glite.lb.common/interface/context-int.h
org.glite.lb.common/interface/log_proto.h
org.glite.lb.common/interface/mini_http.h
org.glite.lb.common/src/connpool.c [new file with mode: 0644]
org.glite.lb.common/src/context.c
org.glite.lb.common/src/mini_http.c
org.glite.lb.common/src/param.c
org.glite.lb.server-bones/src/srvbones.c
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/lb_http.c
org.glite.lb.server/src/notification.c
org.glite.lb.server/src/srv_purge.c
org.glite.lb.server/src/stored_master.c

index 747c44c..9c78a55 100644 (file)
@@ -43,7 +43,7 @@ typedef enum _edg_wll_ContextParam {
        EDG_WLL_PARAM_QUERY_JOBS_LIMIT, /**< maximal query jobs result size */
        EDG_WLL_PARAM_QUERY_EVENTS_LIMIT,/**< maximal query events result size */
        EDG_WLL_PARAM_QUERY_RESULTS,    /**< maximal query result size */
-       EDG_WLL_PARAM_QUERY_CONNECTIONS,/**< maximal number of open connections in ctx->connPoll */
+       EDG_WLL_PARAM_CONNPOOL_SIZE,    /**< maximal number of open connections in connectionsHandle.poolSize */
        EDG_WLL_PARAM_NOTIF_SERVER,     /**< default notification server name */
        EDG_WLL_PARAM_NOTIF_SERVER_PORT,/**< default notification server port */
        EDG_WLL_PARAM_NOTIF_TIMEOUT,    /**< notif timeout */
index 63b749c..f1118cd 100644 (file)
@@ -136,7 +136,8 @@ TOOLS:=dump load purge lb_dump_exporter ${LB_PERF_TOOLS}
 EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog
 EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog lbmon flood_proxy dagids stress_context
 
-EXAMPLES_CL=user_jobs job_status
+EXAMPLES_CL=user_jobs job_status 
+EXAMPLES_CL_THR=user_jobs_threaded
 FAKE_EXAMPLES:=job_log_fake
 
 MAN_GZ:=glite-lb-logevent.1.gz
@@ -185,6 +186,9 @@ ${TOOLS} ${EXAMPLES}: %: %.o
 ${EXAMPLES_CL}: %: %.o
        ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} 
 
+${EXAMPLES_CL_THR}: %: %.o
+       ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB} 
+
 ${FAKE_EXAMPLES}: %: %.o ${FAKELIB}
        ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} 
 
@@ -244,7 +248,7 @@ else
 compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
 endif
 
-examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
+examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS}
 
 fake: ${FAKE_EXAMPLES}
 
@@ -298,7 +302,7 @@ endif
        for p in ${TOOLS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \
        done
-       for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \
+       for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \
        done
        ${INSTALL} -m 755 ${top_srcdir}/src/export.sh "${PREFIX}/sbin/glite-lb-export.sh"
diff --git a/org.glite.lb.client/examples/user_jobs_threaded.c b/org.glite.lb.client/examples/user_jobs_threaded.c
new file mode 100644 (file)
index 0000000..068fecd
--- /dev/null
@@ -0,0 +1,225 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <expat.h>
+
+#include <pthread.h>
+#include <globus_common.h>
+
+#include "glite/lb/context.h"
+#include "glite/lb/xml_conversions.h"
+#include "glite/lb/consumer.h"
+
+int use_proxy = 0;
+
+int (*user_jobs)(edg_wll_Context, edg_wlc_JobId **, edg_wll_JobStat **);
+
+
+
+static const char *get_opt_string = "hxj:t:p:r:s:w:";
+
+static struct option opts[] = {
+       {"help",        0, NULL,        'h'},
+       {"proxy",       0, NULL,        'x'},
+       {"owner",       1, NULL,        'o'},
+       {"num-threads", 1, NULL,        't'},
+       {"port_range",  1, NULL,        'p'},
+       {"repeat",      1, NULL,        'r'},
+       {"rand-start",  1, NULL,        's'},
+//     {"rand-work",   1, NULL,        'w'},
+       {NULL,          0, NULL,        0}
+};
+
+
+
+static void usage(char *me) 
+{
+       fprintf(stderr,"usage: %s [option]\n"
+               "\t-h, --help\t show this help\n"
+               "\t-x, --proxy\t contact proxy (not implemented yet)\n"
+               "\t-o, --owner DN\t show jobs of user with this DN\n"
+               "\t-t, --num-threads N\t number for threads to create\n"
+               "\t-p, --port-range N\t connect to server:port, server:port+10, \n"
+               "\t\t\t ... server:port+N*10 bkservers\n"
+               "\t-r, --repeat N\t repeat query in each slave N-times \n"
+               "\t-s, --rand-start N\t start threads in random interval <0,N> sec\n"
+//             "\t-w, --rand-work N\t simulate random server respose time <0,N> sec\n"
+               "\n"
+       ,me);
+}
+
+
+typedef struct {
+    char *owner;
+    int proxy;
+    int rand_start;
+    int port_range;
+    int repeat;
+    char *argv_0;} thread_code_args;
+
+void *thread_code(thread_code_args *arguments) {
+
+       edg_wll_Context ctx;
+       char            *errt,*errd;
+       edg_wlc_JobId           *jobs = NULL;
+       edg_wll_JobStat         *states = NULL;
+       int             i,j,k,port;
+       long            sl;
+
+
+       sl = (unsigned long) ((double) random()/ RAND_MAX * arguments->rand_start * 1000000);
+       printf("Thread [%d] - sleeping for %ld us\n",pthread_self(),sl);
+       usleep( sl );
+
+       user_jobs = edg_wll_UserJobs;
+
+       if (arguments->proxy) {
+            user_jobs = edg_wll_UserJobsProxy;
+        }
+
+       edg_wll_InitContext(&ctx);
+       if ( user_jobs == edg_wll_UserJobsProxy  && arguments->owner )
+               edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner);
+
+       edg_wll_GetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, &port);
+       // pthread_self tend to return even number, so /7 makes some odd numbers...
+       edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, 
+               port + ((long) pthread_self()/7 % arguments->port_range)*10 );
+
+       for (k=0; k<arguments->repeat; k++) {
+               if (user_jobs(ctx,&jobs,&states)) goto err;
+               for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) {  
+                       char *id = edg_wlc_JobIdUnparse(states[i].jobId),
+                            *st = edg_wll_StatToString(states[i].state);
+                       
+                       if (!states[i].parent_job) {
+                               if (states[i].jobtype == EDG_WLL_STAT_SIMPLE) { 
+                                       printf("      %s .... %s %s\n", id, st, 
+                                               (states[i].state==EDG_WLL_JOB_DONE) ? 
+                                               edg_wll_done_codeToString(states[i].done_code) : "" );
+                               }
+                               else if (states[i].jobtype == EDG_WLL_STAT_DAG) {
+                                       printf("DAG   %s .... %s %s\n", id, st, 
+                                               (states[i].state==EDG_WLL_JOB_DONE) ?
+                                               edg_wll_done_codeToString(states[i].done_code) : "");
+                                       for (j=0; states[j].state != EDG_WLL_JOB_UNDEF; j++) {
+                                               if (states[j].parent_job) {
+                                                       char *par_id = edg_wlc_JobIdUnparse(states[j].parent_job);
+                                                       
+                                                       if (!strcmp(id,par_id)) {
+                                                               char *sub_id = edg_wlc_JobIdUnparse(states[j].jobId),
+                                                                    *sub_st = edg_wll_StatToString(states[j].state);
+                                                               
+                                                               printf(" `-       %s .... %s %s\n", sub_id, sub_st, 
+                                                                       (states[j].state==EDG_WLL_JOB_DONE) ? 
+                                                                       edg_wll_done_codeToString(states[j].done_code) : "");
+                                                               free(sub_id);
+                                                               free(sub_st);
+                                                       }
+                                                       free(par_id);
+                                               }       
+                                       }
+                               }
+                       }
+                               
+                       free(id);
+                       free(st);
+               }
+
+               printf("\nFound %d jobs\n",i);
+       }
+err:
+       free(arguments->owner);
+       if  (jobs) {
+               for (i=0; jobs[i]; i++)  edg_wlc_JobIdFree(jobs[i]);    
+               free(jobs);
+       }
+
+       if  (states) {
+               for (i=0; states[i].state; i++)  edg_wll_FreeStatus(&states[i]);        
+               free(states);
+       }
+
+       if (edg_wll_Error(ctx,&errt,&errd)) {
+               fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd);
+               edg_wll_FreeContext(ctx);
+                pthread_exit(NULL);
+       }
+
+       edg_wll_FreeContext(ctx);
+
+//        printf("Thread %d exitting\n",(int)pthread_self());
+
+        pthread_exit(NULL);
+
+}
+
+int main(int argc,char **argv)
+{
+        thread_code_args arguments = { NULL, 0, 0, 1, 1,NULL };
+       int     i,rc,status,opt;
+       int     thr_num = 10;   // default
+
+
+        while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) {
+                case 'x': arguments.proxy = 1; break;
+                case 'o': arguments.owner = optarg; break;
+                case 't': thr_num = atoi(optarg); break;
+                case 'p': arguments.port_range = atoi(optarg); break;
+                case 'r': arguments.repeat = atoi(optarg); break;
+                case 's': arguments.rand_start = atoi(optarg); break;
+                default : usage(argv[0]); exit(0); break;
+        }
+
+        arguments.argv_0 = argv[0];
+
+       /* Do a thready work */
+       {
+               pthread_t threads[thr_num];
+               pthread_attr_t attr;
+
+               if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS)   {
+                       fputs("globus_module_activate()\n",stderr);
+                       return 1;
+               }
+
+               pthread_attr_init(&attr);
+               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       
+               for (i=0;i<thr_num;i++) {
+                   printf("Running thread %d\n",i);
+                   rc = pthread_create(&threads[i], &attr, (void*)thread_code, &arguments );
+                   if (rc){
+                       printf("ERROR; return code from pthread_create() is %d\n", rc);
+                       exit(-1);
+                   }
+               }
+
+
+
+               //        thread_code(&arguments);
+
+               //        pthread_exit(NULL)
+
+               pthread_attr_destroy(&attr);
+
+               for (i=0;i<thr_num;i++) {
+               //            printf("*** Joining thread %d.\n",i);
+                   rc=pthread_join(threads[i],(void **)&status);
+               //            printf("*** Thread %d joined. (status %d ret. code %d)\n",i,status,rc);
+
+               }
+       }
+
+        printf("Threaded example main() exitting\n");
+
+
+//        exit(0);
+//        pthread_exit(NULL);
+}
+
index 4230b07..809f521 100644 (file)
 #include "glite/lb/consumer.h"
 #include "glite/lb/context-int.h"
 #include "glite/lb/mini_http.h"
+#include "glite/lb/connpool.h"
 
 #include "connection.h"
 
-static void CloseConnection(edg_wll_Context ctx, int conn_index)
+
+static void CloseConnection(edg_wll_Context ctx, int* conn_index)
 {
        /* close connection ad free its structures */
        OM_uint32 min_stat;
+       int cIndex;
+
+        cIndex = *conn_index;
 
-       assert(ctx->connOpened);
-       assert(conn_index < ctx->connOpened);
+       assert(ctx->connections->connOpened);
 
-       edg_wll_gss_close(&ctx->connPool[conn_index].gss, &ctx->p_tmp_timeout);
-       if (ctx->connPool[conn_index].gsiCred) 
-               gss_release_cred(&min_stat, &ctx->connPool[conn_index].gsiCred);
-       free(ctx->connPool[conn_index].peerName);
-       free(ctx->connPool[conn_index].buf);
+       edg_wll_gss_close(&ctx->connections->connPool[cIndex].gss, &ctx->p_tmp_timeout);
+       if (ctx->connections->connPool[cIndex].gsiCred) 
+               gss_release_cred(&min_stat, &ctx->connections->connPool[cIndex].gsiCred);
+       free(ctx->connections->connPool[cIndex].peerName);
+       free(ctx->connections->connPool[cIndex].buf);
        
-       memset(ctx->connPool + conn_index, 0, sizeof(edg_wll_ConnPool));
+       memset(ctx->connections->connPool + cIndex, 0, sizeof(edg_wll_ConnPool));
        
-       /* if deleted conn was not the last one -> there is a 'hole' and then   */
-       /* 'shake' together connections in pool, no holes are allowed           */
-       if (conn_index < ctx->connOpened - 1) { 
-               ctx->connPool[conn_index] = ctx->connPool[ctx->connOpened - 1];
-               memset(ctx->connPool + ctx->connOpened  - 1 , 0, sizeof(edg_wll_ConnPool));
-       }
-       ctx->connOpened--;
+       ctx->connections->connOpened--;
+
+        *conn_index = cIndex;
 }
 
 
@@ -50,10 +50,33 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 {
        int i;
 
-        for (i=0; i<ctx->connOpened;i++) 
-               if (!strcmp(name, ctx->connPool[i].peerName) &&
-                   (port == ctx->connPool[i].peerPort)) return i;
-                                               
+        for (i=0; i<ctx->connections->poolSize;i++) { 
+               if ((ctx->connections->connPool[i].peerName != NULL) &&
+                    !strcmp(name, ctx->connections->connPool[i].peerName) &&
+                  (port == ctx->connections->connPool[i].peerPort)) {
+
+                       /* TryLock (next line) is in fact used only 
+                          to check the mutex status */
+                       switch (edg_wll_connectionTryLock(ctx, i)) {
+                       case 0: 
+                               /* Connection was not locked but now it is. Since we do not
+                                  really know wheter we are interested in that connection, we
+                                  are simply unlocking it now. */
+                               edg_wll_connectionUnlock(ctx, i);
+                               return i;
+
+                       case EBUSY:
+                               /* Connection locked. Do not consider it */
+                               // try to find another free connection
+                               break;
+                       default:
+                               /* Some obscure error occured. Need inspection */
+                               perror("ConnectionIndex() - locking problem \n");
+                               assert(0);
+                       }
+               }
+       }
+       
        return -1;
 }
 
@@ -61,52 +84,79 @@ static int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 
 static int AddConnection(edg_wll_Context ctx, char *name, int port)
 {
-       int index = ctx->connOpened;
-       
-       free(ctx->connPool[index].peerName);    // should be empty; just to be sure
-       ctx->connPool[index].peerName = strdup(ctx->srvName);
-       ctx->connPool[index].peerPort = ctx->srvPort;
-       ctx->connOpened++;
+       int i,index = -1;
+
+        for (i = 0; i < ctx->connections->poolSize; i++) {
+            if (ctx->connections->connPool[i].peerName == NULL) {
+                if (!edg_wll_connectionTryLock(ctx, i)) {
+                   index = i;  // This connection is free and was not locked. We may lock it and use it.
+                   break;
+                }
+           }
+       }
+
+       if (index < 0) return -1;
+
+       free(ctx->connections->connPool[index].peerName);       // should be empty; just to be sure
+       ctx->connections->connPool[index].peerName = strdup(ctx->srvName);
+       ctx->connections->connPool[index].peerPort = ctx->srvPort;
+       ctx->connections->connOpened++;
 
        return index;
 }
 
 
 
-static void ReleaseConnection(edg_wll_Context ctx, char *name, int port)
+static int ReleaseConnection(edg_wll_Context ctx, char *name, int port)
 {
-       int i, index = 0;
+       int i, index = 0, foundConnToDrop = 0;
        long min;
 
 
-       if (ctx->connOpened == 0) return;       /* nothing to release */
+       edg_wll_ResetError(ctx);
+       if (ctx->connections->connOpened == 0) return 0;        /* nothing to release */
        
        if (name) {
                if ((index = ConnectionIndex(ctx, name, port)) >= 0)
-                       CloseConnection(ctx, index);
+                       CloseConnection(ctx, &index);
        }
-       else {                                  /* free the oldest connection*/
-               min = ctx->connPool[0].lastUsed.tv_sec;
-               for (i=0; i<ctx->connOpened; i++) {
-                       if (ctx->connPool[i].lastUsed.tv_sec < min) {
-                               min = ctx->connPool[i].lastUsed.tv_sec;
-                               index = i;
+       else {                                  /* free the oldest (unlocked) connection */
+               for (i=0; i<ctx->connections->poolSize; i++) {
+                       assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only
+                       if (!edg_wll_connectionTryLock(ctx, i)) {
+                               edg_wll_connectionUnlock(ctx, i);       // Connection unlocked. Consider releasing it
+                               if (foundConnToDrop) {          // This is not the first unlocked connection
+                                       if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
+                                               min = ctx->connections->connPool[i].lastUsed.tv_sec;
+                                               index = i;
+                                               foundConnToDrop++;
+                                       }
+                               }
+                               else {  // This is the first unlocked connection we have found.
+                                       foundConnToDrop++;
+                                       index = i;
+                                       min = ctx->connections->connPool[i].lastUsed.tv_sec;
+                               }
                        }
                }
-               CloseConnection(ctx, index);
+               if (!foundConnToDrop) return edg_wll_SetError(ctx,EAGAIN,"all connections in the connection pool are locked");
+               CloseConnection(ctx, &index);
        }
+       return edg_wll_Error(ctx,NULL,NULL);
 }
                        
 
 
-int edg_wll_close(edg_wll_Context ctx)
+int edg_wll_close(edg_wll_Context ctx, int* connToUse)
 {
        edg_wll_ResetError(ctx);
-       if (ctx->connToUse == -1) return 0;
+       if (*connToUse == -1) return 0;
 
-       CloseConnection(ctx, ctx->connToUse);
+       CloseConnection(ctx, connToUse);
+
+        edg_wll_connectionUnlock(ctx, *connToUse); /* Forgetting the conn. Unlocking is safe. */
                
-       ctx->connToUse = -1;
+       *connToUse = -1;
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -121,7 +171,7 @@ int edg_wll_close_proxy(edg_wll_Context ctx)
 
 
 
-int edg_wll_open(edg_wll_Context ctx)
+int edg_wll_open(edg_wll_Context ctx, int* connToUse)
 {
        int index;
        edg_wll_GssStatus gss_stat;
@@ -129,34 +179,47 @@ int edg_wll_open(edg_wll_Context ctx)
 
        edg_wll_ResetError(ctx);
 
+        edg_wll_poolLock(); /* We are going to search the pool, it has better be locked */
+
        if ( (index = ConnectionIndex(ctx, ctx->srvName, ctx->srvPort)) == -1 ) {
                /* no such open connection in pool */
-               if (ctx->connOpened == ctx->poolSize)
-                       ReleaseConnection(ctx, NULL, 0);
+               if (ctx->connections->connOpened == ctx->connections->poolSize)
+                       if(ReleaseConnection(ctx, NULL, 0)) goto end;
                
                index = AddConnection(ctx, ctx->srvName, ctx->srvPort);
+               if (index < 0) {
+                    edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded");
+                   goto end;
+               }
+
+                #ifdef EDG_WLL_CONNPOOL_DEBUG  
+                    printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index);
+                #endif
                
        }
        /* else - there is cached open connection, reuse it */
-       
-       ctx->connToUse = index;
+        #ifdef EDG_WLL_CONNPOOL_DEBUG  
+            else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index);
+        #endif
+
+       *connToUse = index;
        
        /* XXX support anonymous connections, perhaps add a flag to the connPool
         * struct specifying whether or not this connection shall be authenticated
         * to prevent from repeated calls to edg_wll_gss_acquire_cred_gsi() */
-       if (!ctx->connPool[index].gsiCred && 
+       if (!ctx->connections->connPool[index].gsiCred && 
            edg_wll_gss_acquire_cred_gsi(
               ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
               ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
-              &ctx->connPool[index].gsiCred, NULL, &gss_stat)) {
+              &ctx->connections->connPool[index].gsiCred, NULL, &gss_stat)) {
            edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat);
            goto err;
        }
 
-       if (ctx->connPool[index].gss.context == GSS_C_NO_CONTEXT) {     
-               switch (edg_wll_gss_connect(ctx->connPool[index].gsiCred,
-                               ctx->connPool[index].peerName, ctx->connPool[index].peerPort,
-                               &ctx->p_tmp_timeout,&ctx->connPool[index].gss,
+       if (ctx->connections->connPool[index].gss.context == GSS_C_NO_CONTEXT) {        
+               switch (edg_wll_gss_connect(ctx->connections->connPool[index].gsiCred,
+                               ctx->connections->connPool[index].peerName, ctx->connections->connPool[index].peerPort,
+                               &ctx->p_tmp_timeout,&ctx->connections->connPool[index].gss,
                                &gss_stat)) {
                
                        case EDG_WLL_GSS_OK: 
@@ -190,9 +253,18 @@ int edg_wll_open(edg_wll_Context ctx)
 err:
        /* some error occured; close created connection
         * and free all fields in connPool[index] */
-       CloseConnection(ctx, index);
-       ctx->connToUse = -1;
+       if (index >= 0) CloseConnection(ctx, &index);
+       *connToUse = -1;
 ok:    
+
+        if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */
+
+end:
+
+       edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */
+
+//     xxxxx
+
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -342,39 +414,43 @@ int edg_wll_http_send_recv(
 {
        int     ec;
        char    *ed = NULL;
+       int     connToUse = -1; //Index of the connection to use. Used to be a context member.
 
-       if (edg_wll_open(ctx)) return edg_wll_Error(ctx,NULL,NULL);
-       
-       switch (edg_wll_http_send(ctx,request,req_head,req_body)) {
+       if (edg_wll_open(ctx,&connToUse)) return edg_wll_Error(ctx,NULL,NULL);
+
+       switch (edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) {
                case ENOTCONN:
-                       edg_wll_close(ctx);
-                       if (edg_wll_open(ctx)
-                               || edg_wll_http_send(ctx,request,req_head,req_body))
+                       edg_wll_close(ctx,&connToUse);
+                       if (edg_wll_open(ctx,&connToUse)
+                               || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse]))
                                        goto err;
                        /* fallthrough */
                case 0: break;
                default: goto err;
        }
 
-       switch (edg_wll_http_recv(ctx,response,resp_head,resp_body)) {
+       switch (edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) {
                case ENOTCONN:
-                       edg_wll_close(ctx);
-                       if (edg_wll_open(ctx)
-                               || edg_wll_http_send(ctx,request,req_head,req_body)
-                               || edg_wll_http_recv(ctx,response,resp_head,resp_body))
+                       edg_wll_close(ctx,&connToUse);
+                       if (edg_wll_open(ctx,&connToUse)
+                               || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])
+                               || edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse]))
                                        goto err;
                        /* fallthrough */
                case 0: break;
                default: goto err;
        }
        
-       assert(ctx->connToUse >= 0);
-       gettimeofday(&ctx->connPool[ctx->connToUse].lastUsed, NULL);
+       assert(connToUse >= 0);
+       gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL);
+        edg_wll_connectionUnlock(ctx, connToUse);
+
        return 0;
 
 err:
        ec = edg_wll_Error(ctx,NULL,&ed);
-       edg_wll_close(ctx);
+       edg_wll_close(ctx,&connToUse);
        edg_wll_SetError(ctx,ec,ed);
        free(ed);
        return ec;
index 1b12058..1423d7e 100644 (file)
@@ -3,8 +3,8 @@
 
 #ident "$Header$"
 
-int edg_wll_close(edg_wll_Context ctx);
-int edg_wll_open(edg_wll_Context ctx);
+int edg_wll_close(edg_wll_Context ctx,int *);
+int edg_wll_open(edg_wll_Context ctx,int *);
 int edg_wll_http_send_recv(edg_wll_Context, char *, const char * const *, char *, char **, char ***, char **);
 
 int edg_wll_close_proxy(edg_wll_Context ctx);
index de0d012..ab6dccf 100644 (file)
@@ -98,7 +98,8 @@ INSTALL:=libtool --mode=install install
 OBJS:=${JOBID_OBJS} ${PERF_OBJS} lb_plain_io.o escape.o events.o mini_http.o query_rec.o \
        status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \
        events_parse.o il_string.o il_int.o notifid.o \
-       il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o 
+       il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o \
+       connpool.o 
 LOBJS:=${OBJS:.o=.lo}
 
 TRIO_OBJS:=escape.o trio.o strio.o
@@ -112,7 +113,7 @@ THRLOBJS:=${OBJS:.o=.thr.lo}
 
 HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \
        xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \
-       escape.h ulm_parse.h trio.h lb_maildir.h ${PERF_HDRS}
+       escape.h ulm_parse.h trio.h lb_maildir.h connpool.h ${PERF_HDRS}
 
 STATICLIB:=libglite_lb_common_${nothrflavour}.a
 THRSTATICLIB:=libglite_lb_common_${thrflavour}.a
@@ -253,7 +254,7 @@ il_int_test.o il_string_test.o il_test.o il_msg_test.o: %.o: %.cpp
        ${CXX} -c ${CFLAGS} ${TEST_INC} $<
 
 %.thr.o: %.c
-       ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -o $@ -c $<
+       ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -D_REENTRANT -DGLITE_LB_THREADED -o $@ -c $<
 
 %.h: %.h.T
        rm -f $@
diff --git a/org.glite.lb.common/interface/connpool.h b/org.glite.lb.common/interface/connpool.h
new file mode 100644 (file)
index 0000000..f536a8b
--- /dev/null
@@ -0,0 +1,98 @@
+#include "glite/security/glite_gss.h"
+#include "glite/lb/consumer.h"
+#include "lb_plain_io.h"
+#include "authz.h"
+#include "glite/lb/log_proto.h"
+#ifdef GLITE_LB_THREADED
+  #include <pthread.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef EDG_WLL_CONNPOOL_DECLARED
+#define EDG_WLL_CONNPOOL_DECLARED 1
+
+#define        GLITE_LB_COMMON_CONNPOOL_SIZE   50
+
+
+typedef struct _edg_wll_ConnPool {
+/* address and port where we are connected to */
+        char            *peerName;
+        unsigned int    peerPort;
+
+/* http(s) stream */
+        gss_cred_id_t   gsiCred;
+        edg_wll_GssConnection   gss;
+        char            *buf;
+        int             bufUse,bufSize;
+
+/* timestamp of usage of this entry in ctx.connPool */
+        struct timeval  lastUsed;
+} edg_wll_ConnPool;
+#endif
+
+
+#ifndef EDG_WLL_CONNECIONS_DECLARED
+#define EDG_WLL_CONNECIONS_DECLARED 1
+typedef struct _edg_wll_Connections {
+
+/* connection pool array */
+        edg_wll_ConnPool        *connPool;
+       edg_wll_ConnPool        *serverConnection;      /* this is in fact a single-item array intended for use
+                                                          by the server (since the server only uses one connection)*/
+
+/* pool of connections from client */
+        int             poolSize;       /* number of connections in the pool */
+        int             connOpened;     /* number of opened connections  */
+
+/* Connection pool locks & accessories.*/
+#ifdef GLITE_LB_THREADED
+        pthread_mutex_t poolLock;       /* Global pool lock (to be used for pool-wide operations carried out locally) */
+        pthread_mutex_t *connectionLock;      /* Per-connection lock (used to lock out connections that are in use) */
+#endif 
+       edg_wll_Context *locked_by;     /* identifies contexts that have been used to lock a connection since they
+                                           do probably have a connToUse value stored in them*/
+
+} edg_wll_Connections;
+#endif
+
+
+/* Connections Global Handle */
+extern edg_wll_Connections connectionsHandle;
+
+/** ** Locking Functions (wrappers) **/
+/*  Introduced to simplify debugging and switching betwen single/multi-threaded behavior */
+
+/** Lock (try) the pool */
+int edg_wll_poolTryLock();
+
+/** Lock the pool (blocking) */
+int edg_wll_poolLock();
+
+/** Unlock the pool */
+int edg_wll_poolUnlock();
+
+
+/** Lock (try) a single connection */
+int edg_wll_connectionTryLock(edg_wll_Context ctx, int index);
+
+/** Lock a single connection (blocking) */
+int edg_wll_connectionLock(edg_wll_Context ctx, int index);
+
+/** Unlock a connection */
+int edg_wll_connectionUnlock(edg_wll_Context ctx, int index);
+
+
+/** Free memory used by the pool and lock array */
+void edg_wll_poolFree();
+
+/** Allocate memory for the edg_wll_Connections structure and its properties and return a pointer.
+    in case memory has been already allocated, just return a pointer */
+edg_wll_Connections* edg_wll_initConnections();
+
+#ifdef __cplusplus
+}
+#endif
+
index 8536a8e..6935154 100644 (file)
@@ -7,6 +7,7 @@
 #include "glite/lb/consumer.h"
 #include "lb_plain_io.h"
 #include "authz.h"
+#include "connpool.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -29,23 +30,6 @@ typedef struct _edg_wll_ConnProxy {
 
 
 
-typedef struct _edg_wll_ConnPool {
-/* address and port where we are connected to */
-       char            *peerName;
-       unsigned int    peerPort;
-       
-/* http(s) stream */
-       gss_cred_id_t   gsiCred;
-       edg_wll_GssConnection   gss;
-       char            *buf;
-       int             bufUse,bufSize;
-
-/* timestamp of usage of this entry in ctx.connPool */
-       struct timeval  lastUsed;
-} edg_wll_ConnPool;
-
-
-
 struct _edg_wll_Context {
 /* Error handling */
        int             errCode;        /* recent error code */
@@ -54,7 +38,7 @@ struct _edg_wll_Context {
 /* server part */
 
        void            *mysql;
-       edg_wll_ConnPool        *connPool;
+       edg_wll_Connections     *connections;
        edg_wll_ConnPool        *connPoolNotif;         /* hold _one_ connection from notif-interlogger */
        edg_wll_ConnProxy       *connProxy;             /* holds one plain connection */
 
@@ -99,14 +83,6 @@ struct _edg_wll_Context {
        char            *srvName;
        unsigned int    srvPort;
        
-/* pool of connections from client */
-       int             poolSize;
-       int             connOpened;     /* number of opened connections  */
-       int             connToUse;      /* index of connection that will *
-                                        *  be used by low-level f-cions */
-       // XXX similar variables will be needed for connPoolNotif
-
-       
 /* other client stuff */
        int             notifSock;              /* default client socket        *
                                                 * for receiving notifications  */   
index aeaf306..505df74 100644 (file)
@@ -68,7 +68,7 @@ extern "C" {
 /**
  * default maximal number of simultaneously open connections from one client
  */
-#define EDG_WLL_LOG_CONNECTIONS_DEFAULT                4
+// XXX: not used? #define EDG_WLL_LOG_CONNECTIONS_DEFAULT              4
 
 
 #ifdef __cplusplus
index 2b846f3..1bae8b9 100644 (file)
@@ -4,6 +4,7 @@
 #ident "$Header$"
 
 #include "glite/lb/consumer.h"
+#include "connpool.h"
 
 /* XXX: not a good place for the folowing #def's but we ain't got better currently */
 /** protocol version */
 
 extern edg_wll_ErrorCode edg_wll_http_recv(
        edg_wll_Context,        /* INOUT: context */
-       char **,        /* OUT: first line */
-       char ***,       /* OUT: null terminated array of headers */
-       char **         /* OUT: message body */
+       char **,                /* OUT: first line */
+       char ***,               /* OUT: null terminated array of headers */
+       char **,                /* OUT: message body */
+        edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */
 );
 
 extern edg_wll_ErrorCode edg_wll_http_send(
        edg_wll_Context,        /* INOUT: context */
        const char *,           /* IN: first line */
        char const * const *,   /* IN: headers */
-       const char *            /* IN: message body */
+       const char *,           /* IN: message body */
+        edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */
 );
 
 extern edg_wll_ErrorCode edg_wll_http_recv_proxy(
diff --git a/org.glite.lb.common/src/connpool.c b/org.glite.lb.common/src/connpool.c
new file mode 100644 (file)
index 0000000..fb84222
--- /dev/null
@@ -0,0 +1,277 @@
+#include "connpool.h"
+
+#ifdef GLITE_LB_THREADED
+edg_wll_Connections connectionsHandle = 
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL};
+#endif
+
+#ifndef GLITE_LB_THREADED
+edg_wll_Connections connectionsHandle =
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , NULL};
+#endif
+
+/** Lock (try) the pool */
+int edg_wll_poolTryLock() {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED  /* Threaded version */
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d trying to lock the pool ",pthread_self());
+    #endif
+    RetVal =
+        pthread_mutex_trylock(
+           &connectionsHandle.poolLock
+        );
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("- result %d\n",RetVal);
+    #endif
+  #endif
+
+
+  #ifndef GLITE_LB_THREADED  /* Single-thread version */
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - Trying to lock the pool (pthreads not included)\n");
+    #endif
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Lock the pool (blocking) */
+int edg_wll_poolLock() {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED
+    
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d trying to lock the pool\n",pthread_self());
+    #endif
+    RetVal = pthread_mutex_trylock(&connectionsHandle.poolLock);
+   
+    if (RetVal) {
+
+      #ifdef EDG_WLL_CONNPOOL_DEBUG
+        printf("Thread %d locking the pool\n",pthread_self());
+      #endif
+      RetVal = pthread_mutex_lock(&connectionsHandle.poolLock);
+    }
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d - lock result %d\n",pthread_self(),RetVal);
+    #endif
+  #endif
+
+  #ifndef GLITE_LB_THREADED
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - Locking the pool (pthreads not included)\n");
+    #endif
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Unlock the pool */
+int edg_wll_poolUnlock() {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d unlocking the pool\n",pthread_self());
+    #endif
+    RetVal = pthread_mutex_unlock(&connectionsHandle.poolLock);
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d - unlock result %d\n",pthread_self(),RetVal);
+    #endif
+
+  #endif
+
+  #ifndef GLITE_LB_THREADED
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - Unlocking the pool (pthreads not included)\n");
+    #endif
+
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Lock (try) a single connection */
+int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_trylock(&connectionsHandle.connectionLock[index]); /* Try to lock the connection */
+    if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx;              /* If lock succeeded, store the
+                                                                                  locking context address */
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d trying to lock connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+    #endif
+
+  #endif
+
+  #ifndef GLITE_LB_THREADED
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - Trying to lock connection No. [%d] (pthreads not included)\n",index);
+    #endif
+
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Lock a single connection (blocking) */
+int edg_wll_connectionLock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_lock(&connectionsHandle.connectionLock[index]);     /* Lock the connection (wait if
+                                                                                  not available)*/
+    if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx;               /* If lock succeeded, store the
+                                                                                  locking context address */
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d locking connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+    #endif
+
+  #endif
+
+  #ifndef GLITE_LB_THREADED
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - locking connection No. [%d] (pthreads not included)\n",index);
+    #endif
+
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Unlock a connection */
+int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_unlock(&connectionsHandle.connectionLock[index]);
+    if (!RetVal) connectionsHandle.locked_by[index] = NULL;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d unlocking connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+    #endif
+
+  #endif
+
+  #ifndef GLITE_LB_THREADED
+    RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - unlocking connection No. [%d] (pthreads not included)\n",index);
+    #endif
+
+  #endif
+
+
+  return(RetVal);
+}
+
+
+/** Free all memory used by the pool and lock-related arrays */
+void edg_wll_poolFree() {
+       int i;
+        struct timeval close_timeout = {0, 50000};     /* Declarations taken over from edg_wll_FreeContext() */
+        OM_uint32 min_stat;                            /* Does not seem to have any use whatsoever - neither
+                                                          here nor in edg_wll_FreeContext() */
+
+        #ifdef EDG_WLL_CONNPOOL_DEBUG
+            #ifdef GLITE_LB_THREADED
+                printf("Thread %d ",pthread_self());
+            #endif
+            printf("Entering edg_wll_poolFree\n");
+        #endif
+
+
+        for (i=0; i<connectionsHandle.poolSize; i++) {
+               if (connectionsHandle.connPool[i].peerName) free(connectionsHandle.connPool[i].peerName);
+                edg_wll_gss_close(&connectionsHandle.connPool[i].gss,&close_timeout);
+                if (connectionsHandle.connPool[i].gsiCred)
+                       gss_release_cred(&min_stat, &connectionsHandle.connPool[i].gsiCred);
+                if (connectionsHandle.connPool[i].buf) free(connectionsHandle.connPool[i].buf);
+        }
+
+       edg_wll_poolLock();
+#ifdef GLITE_LB_THREADED
+       free(connectionsHandle.connectionLock);
+       connectionsHandle.connectionLock = NULL;
+#endif
+       free(connectionsHandle.serverConnection);
+       free(connectionsHandle.connPool);
+       free(connectionsHandle.locked_by);
+       connectionsHandle.serverConnection = NULL;
+        connectionsHandle.connPool = NULL;
+        connectionsHandle.locked_by = NULL;
+
+
+}
+
+/** Allocate memory for arrays within the edg_wll_Connections structure */
+edg_wll_Connections* edg_wll_initConnections() {
+
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+        #ifdef GLITE_LB_THREADED
+            printf("Thread %d ",pthread_self());
+        #endif
+        printf("Entering edg_wll_initConnections\n");
+    #endif
+
+    if((connectionsHandle.connPool == NULL) &&
+       (connectionsHandle.poolSize > 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */ 
+
+        #ifdef EDG_WLL_CONNPOOL_DEBUG
+            #ifdef GLITE_LB_THREADED
+                printf("Thread %d ",pthread_self());
+            #endif
+            printf("Initializng connections AND THE CONNECTION POOL\n");
+        #endif
+
+
+        connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool));
+
+        #ifdef GLITE_LB_THREADED
+            connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t));
+        #endif
+
+        connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context));
+
+    }
+    if(connectionsHandle.serverConnection == NULL) {
+        connectionsHandle.serverConnection = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool));
+    }
+
+
+  return (&connectionsHandle);
+}
+
+
index 47408ea..3e0ea8a 100644 (file)
@@ -34,11 +34,12 @@ int edg_wll_InitContext(edg_wll_Context *ctx)
        /* XXX */
        for (i=0; i<EDG_WLL_PARAM__LAST; i++) edg_wll_SetParam(out,i,NULL);
 
-       out->connPool = (edg_wll_ConnPool *) calloc(out->poolSize, sizeof(edg_wll_ConnPool));
+        out->connections = edg_wll_initConnections();
+//     out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool));
        out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool));
        out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy));
        out->connProxy->conn.sock = -1;
-       out->connToUse = -1;
+//     out->connToUse = -1;
 
        *ctx = out;
 
@@ -73,17 +74,43 @@ void edg_wll_FreeContext(edg_wll_Context ctx)
 }
 #endif
        if (ctx->errDesc) free(ctx->errDesc);
-       if (ctx->connPool) {
+       if (ctx->connections->connPool) {
                int i;
-               
-               for (i=0; i<ctx->poolSize; i++) {
-                       if (ctx->connPool[i].peerName) free(ctx->connPool[i].peerName);
-                       edg_wll_gss_close(&ctx->connPool[i].gss,&close_timeout);
-                       if (ctx->connPool[i].gsiCred)
-                               gss_release_cred(&min_stat, &ctx->connPool[i].gsiCred);
-                       if (ctx->connPool[i].buf) free(ctx->connPool[i].buf);
+
+#ifdef GLITE_LB_THREADED
+                /* Since the introduction of a shared connection pool, the pool
+                   cannot be freed here. We only need to unlock connections that
+                   may have been locked using this context. */
+
+                #ifdef EDG_WLL_CONNPOOL_DEBUG
+                     printf("Running edg_wll_FreeContext - checking for connections locked by the current context.\n");
+               #endif
+
+                edg_wll_poolLock();
+
+               for (i=0; i<ctx->connections->poolSize; i++) {
+       
+                        if (ctx->connections->locked_by[i]==ctx) {
+                                #ifdef EDG_WLL_CONNPOOL_DEBUG
+                                    printf("Unlocking connection No. %d...",i);
+                                #endif
+                               edg_wll_connectionUnlock(ctx, i);
+
+                       }
+               }
+
+                edg_wll_poolUnlock();
+#endif
+
+/*             
+               for (i=0; i<ctx->connections->poolSize; i++) {
+                       if (ctx->connections->connPool[i].peerName) free(ctx->connections->connPool[i].peerName);
+                       edg_wll_gss_close(&ctx->connections->connPool[i].gss,&close_timeout);
+                       if (ctx->connections->connPool[i].gsiCred)
+                               gss_release_cred(&min_stat, &ctx->connections->connPool[i].gsiCred);
+                       if (ctx->connections->connPool[i].buf) free(ctx->connections->connPool[i].buf);
                }       
-               free(ctx->connPool);
+               free(ctx->connections->connPool);*/
        }
        if (ctx->connPoolNotif) {
                if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName);
index 0535aa6..29aa3fc 100644 (file)
@@ -21,7 +21,7 @@
 #define min(x,y)       ((x) < (y) ? (x) : (y))
 #define CONTENT_LENGTH "Content-Length:"
 
-edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut)
+edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut, edg_wll_ConnPool *connPTR)
 {
        char    **hdr = NULL,*first = NULL,*body = NULL;
        enum    { FIRST, HEAD, BODY, DONE }     pstat = FIRST;
@@ -30,23 +30,23 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***
        edg_wll_GssStatus gss_code;
 
 #define bshift(shift) {\
-       memmove(ctx->connPool[ctx->connToUse].buf,ctx->connPool[ctx->connToUse].buf+(shift),ctx->connPool[ctx->connToUse].bufUse-(shift));\
-       ctx->connPool[ctx->connToUse].bufUse -= (shift);\
+       memmove(connPTR->buf,connPTR->buf+(shift),connPTR->bufUse-(shift));\
+       connPTR->bufUse -= (shift);\
 }
        edg_wll_ResetError(ctx);
 
-       if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT)
-               sock = ctx->connPool[ctx->connToUse].gss.sock;
+       if (connPTR->gss.context != GSS_C_NO_CONTEXT)
+               sock = connPTR->gss.sock;
        else {
                edg_wll_SetError(ctx,ENOTCONN,NULL);
                goto error;
        }
 
-       if (!ctx->connPool[ctx->connToUse].buf) ctx->connPool[ctx->connToUse].buf = malloc(ctx->connPool[ctx->connToUse].bufSize = BUFSIZ);
+       if (!connPTR->buf) connPTR->buf = malloc(connPTR->bufSize = BUFSIZ);
 
        do {
-               len = edg_wll_gss_read(&ctx->connPool[ctx->connToUse].gss,
-                       ctx->connPool[ctx->connToUse].buf+ctx->connPool[ctx->connToUse].bufUse,ctx->connPool[ctx->connToUse].bufSize-ctx->connPool[ctx->connToUse].bufUse,&ctx->p_tmp_timeout, &gss_code);
+               len = edg_wll_gss_read(&connPTR->gss,
+                       connPTR->buf+connPTR->bufUse,connPTR->bufSize-connPTR->bufUse,&ctx->p_tmp_timeout, &gss_code);
 
                switch (len) {
                        case EDG_WLL_GSS_OK:
@@ -68,27 +68,27 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***
                }
 
 
-               ctx->connPool[ctx->connToUse].bufUse += len;
+               connPTR->bufUse += len;
                rdmore = 0;
 
                while (!rdmore && pstat != DONE) switch (pstat) {
                        char    *cr; 
 
                        case FIRST:
-                               if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
-                                       ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+                               if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) &&
+                                       connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n')
                                {
                                        *cr = 0;
-                                       first = strdup(ctx->connPool[ctx->connToUse].buf);
-                                       bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+                                       first = strdup(connPTR->buf);
+                                       bshift(cr-connPTR->buf+2);
                                        pstat = HEAD;
                                } else rdmore = 1;
                                break;
                        case HEAD:
-                               if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
-                                       ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+                               if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) &&
+                                       connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n')
                                {
-                                       if (cr == ctx->connPool[ctx->connToUse].buf) {
+                                       if (cr == connPTR->buf) {
                                                bshift(2);
                                                pstat = clen ? BODY : DONE;
                                                if (clen) body = malloc(clen+1);
@@ -97,19 +97,19 @@ edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***
 
                                        *cr = 0;
                                        hdr = realloc(hdr,(nhdr+2) * sizeof(*hdr));
-                                       hdr[nhdr] = strdup(ctx->connPool[ctx->connToUse].buf);
+                                       hdr[nhdr] = strdup(connPTR->buf);
                                        hdr[++nhdr] = NULL;
 
-                                       if (!strncasecmp(ctx->connPool[ctx->connToUse].buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1))
-                                               clen = atoi(ctx->connPool[ctx->connToUse].buf+sizeof(CONTENT_LENGTH)-1);
+                                       if (!strncasecmp(connPTR->buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1))
+                                               clen = atoi(connPTR->buf+sizeof(CONTENT_LENGTH)-1);
        
-                                       bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+                                       bshift(cr-connPTR->buf+2);
                                } else rdmore = 1;
                                break;
                        case BODY:
-                               if (ctx->connPool[ctx->connToUse].bufUse) {
-                                       int     m = min(ctx->connPool[ctx->connToUse].bufUse,clen-blen);
-                                       memcpy(body+blen,ctx->connPool[ctx->connToUse].buf,m);
+                               if (connPTR->bufUse) {
+                                       int     m = min(connPTR->bufUse,clen-blen);
+                                       memcpy(body+blen,connPTR->buf,m);
                                        blen += m;
                                        bshift(m);
                                }
@@ -297,23 +297,23 @@ static int real_write(edg_wll_Context ctx, edg_wll_GssConnection *con,const char
        }
 }
 
-edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body)
+edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body, edg_wll_ConnPool *connPTR)
 {
        const char* const *h;
        int     len = 0, blen;
 
        edg_wll_ResetError(ctx);
 
-       if (ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT)
+       if (connPTR->gss.context == GSS_C_NO_CONTEXT)
           return edg_wll_SetError(ctx,ENOTCONN,NULL);
 
-       if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,first,strlen(first)) < 0 ||
-               real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) 
+       if (real_write(ctx,&connPTR->gss,first,strlen(first)) < 0 ||
+               real_write(ctx,&connPTR->gss,"\r\n",2) < 0) 
                return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
 
        if (head) for (h=head; *h; h++) 
-               if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,*h,strlen(*h)) < 0 ||
-                       real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) 
+               if (real_write(ctx,&connPTR->gss,*h,strlen(*h)) < 0 ||
+                       real_write(ctx,&connPTR->gss,"\r\n",2) < 0) 
                        return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
 
        if (body) {
@@ -321,13 +321,13 @@ edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const
 
                len = strlen(body);
                blen = sprintf(buf,CONTENT_LENGTH " %d\r\n",len);
-               if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,buf,blen) < 0) 
+               if (real_write(ctx,&connPTR->gss,buf,blen) < 0) 
                        return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
        }
 
-       if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0) 
+       if (real_write(ctx,&connPTR->gss,"\r\n",2) < 0) 
                return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
-       if (body && real_write(ctx,&ctx->connPool[ctx->connToUse].gss,body,len) < 0)  
+       if (body && real_write(ctx,&connPTR->gss,body,len) < 0)  
                return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
 
        return edg_wll_Error(ctx,NULL,NULL);
index d0b70bd..02d218f 100644 (file)
@@ -30,7 +30,7 @@ static const char *myenv[] = {
        "%sQUERY_JOBS_LIMIT",
        "%sQUERY_EVENTS_LIMIT",
        "%sQUERY_RESULTS",
-       "%sQUERY_CONNECTIONS",
+       "%sCONNPOOL_SIZE",
        "%sNOTIF_SERVER",
        "%sNOTIF_SERVER",
        "%sNOTIF_TIMEOUT",
@@ -238,12 +238,12 @@ int edg_wll_SetParamInt(edg_wll_Context ctx,edg_wll_ContextParam param,int val)
                                return edg_wll_SetError(ctx,EINVAL,"can't parse query result parameter name");
                        }
                        break;
-               case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+               case EDG_WLL_PARAM_CONNPOOL_SIZE:
                        {
                                char *s = mygetenv(param);
                                
                                if (!val && s) val = atoi(s);
-                               ctx->poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT;
+                               connectionsHandle.poolSize = val ? val : GLITE_LB_COMMON_CONNPOOL_SIZE;
                        }
                        break;
                case EDG_WLL_PARAM_SOURCE:           
@@ -312,7 +312,7 @@ int edg_wll_SetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...)
                case EDG_WLL_PARAM_QUERY_JOBS_LIMIT:      
                case EDG_WLL_PARAM_QUERY_EVENTS_LIMIT:      
                case EDG_WLL_PARAM_QUERY_RESULTS:
-               case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+               case EDG_WLL_PARAM_CONNPOOL_SIZE:
                case EDG_WLL_PARAM_SOURCE:           
                        return edg_wll_SetParamInt(ctx,param,va_arg(ap,int));
                case EDG_WLL_PARAM_HOST:             
@@ -378,9 +378,9 @@ int edg_wll_GetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...)
                        p_int = va_arg(ap, int *);
                        *p_int = ctx->p_query_results;
                        break;
-               case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+               case EDG_WLL_PARAM_CONNPOOL_SIZE:
                        p_int = va_arg(ap, int *);
-                       *p_int = ctx->poolSize;
+                       *p_int = connectionsHandle.poolSize;
                        break;
                case EDG_WLL_PARAM_SOURCE:           
                        p_int = va_arg(ap, int *);
index cdac73e..54cc196 100644 (file)
@@ -396,7 +396,6 @@ static int slave(slave_data_init_hnd data_init_hnd, int sock)
                        {
                                kick_client = KICK_HANDLER;
                        } else {
-                               req_cnt++;
                                first_request = 0;
                                to = set_request_to;
                                if ((rv = services[srv].on_request_hnd(conn,to.tv_sec>=0 ? &to : NULL,clnt_data)) == ENOTCONN) {
index f634070..a5c4dec 100644 (file)
@@ -790,14 +790,12 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        ctx->p_tmp_timeout.tv_sec = timeout->tv_sec;
        ctx->p_tmp_timeout.tv_usec = timeout->tv_usec;
        
-       ctx->poolSize = 1;
-       ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool));
-       ctx->connToUse = 0;
+       edg_wll_initConnections();
 
        alen = sizeof(a);
        getpeername(conn, (struct sockaddr *)&a, &alen);
-       ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr));
-       ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port);
+       ctx->connections->serverConnection->peerName = strdup(inet_ntoa(a.sin_addr));
+       ctx->connections->serverConnection->peerPort = ntohs(a.sin_port);
        ctx->count_statistics = count_statistics;
 
        ctx->serverIdentity = strdup(server_subject);
@@ -810,8 +808,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        case NETDB_SUCCESS:
                if (name) dprintf(("[%d] connection from %s:%d (%s)\n",
                                        getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port), name));
-               free(ctx->connPool[ctx->connToUse].peerName);
-               ctx->connPool[ctx->connToUse].peerName = name;
+               free(ctx->connections->serverConnection->peerName);
+               ctx->connections->serverConnection->peerName = name;
                name = NULL;
                break;
 
@@ -876,7 +874,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
                ctx->srvPort = ntohs(a.sin_port);
        }
 
-       if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code)) )
+       if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connections->serverConnection->gss, &gss_code)) )
        {
                if ( ret == EDG_WLL_GSS_ERROR_TIMEOUT )
                {
@@ -893,7 +891,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
                return 1;
        } 
 
-       maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context,
+       maj_stat = gss_inquire_context(&min_stat, ctx->connections->serverConnection->gss.context,
                                                        &client_name, NULL, NULL, NULL, NULL, NULL, NULL);
        if ( !GSS_ERROR(maj_stat) )
                maj_stat = gss_display_name(&min_stat, client_name, &token, NULL);
@@ -920,7 +918,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        if ( token.value )
                gss_release_buffer(&min_stat, &token);
 
-       if ( edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, server_cert, server_key, vomsdir, cadir) )
+       if ( edg_wll_SetVomsGroups(ctx, &ctx->connections->serverConnection->gss, server_cert, server_key, vomsdir, cadir) )
        {
                char *errt, *errd;
 
@@ -988,7 +986,7 @@ int bk_handle_ws_connection(int conn, struct timeval *timeout, void *data)
                soap_done(soap);
                goto err;
        }
-       gsplugin_ctx->connection = &cdata->ctx->connPool[cdata->ctx->connToUse].gss;
+       gsplugin_ctx->connection = &cdata->ctx->connections->serverConnection->gss;
        gsplugin_ctx->cred = mycred;
        cdata->soap = soap;
 
@@ -1201,8 +1199,8 @@ int bk_clnt_disconnect(int conn, struct timeval *timeout, void *cdata)
        edg_wll_Context         ctx = ((struct clnt_data_t *) cdata)->ctx;
 
 
-       if ( ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT)
-               edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, timeout);
+       if ( ctx->connections->serverConnection->gss.context != GSS_C_NO_CONTEXT)
+               edg_wll_gss_close(&ctx->connections->serverConnection->gss, timeout);
        edg_wll_FreeContext(ctx);
        ctx = NULL;
 
index 038d679..3e2cb50 100644 (file)
@@ -26,7 +26,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx)
 
 
        if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body);
-       else err = edg_wll_http_recv(ctx,&req,&hdr,&body);
+       else err = edg_wll_http_recv(ctx,&req,&hdr,&body,ctx->connections->serverConnection);
 
        dprintf(("[%d] %s\n",getpid(),req));
        if (body) dprintf(("\n%s\n\n",body));
@@ -39,7 +39,7 @@ int edg_wll_ServerHTTP(edg_wll_Context ctx)
                        if ( ctx->isProxy )
                                edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut);
                        else
-                               edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut);
+                               edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut,ctx->connections->serverConnection);
                }
        }
 
index 769011f..18d6287 100644 (file)
@@ -97,7 +97,7 @@ int edg_wll_NotifNewServer(
                        goto cleanup;
                }
                if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
-                       trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1);
+                       trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
        }
 
        /*      Format DB insert statement
@@ -203,7 +203,7 @@ int edg_wll_NotifBindServer(
                        goto cleanup;
                }
                if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
-                       trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1);
+                       trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
        }
 
 
index e36acbe..5a625bd 100644 (file)
@@ -380,7 +380,7 @@ abort:
 
        asprintf(&response, "HTTP/1.1 %d %s", ret, edg_wll_HTTPErrorMessage(ret));
 
-       edg_wll_http_send(ctx, response, resp_headers, message);
+       edg_wll_http_send(ctx, response, resp_headers, message,ctx->connections->serverConnection);
 
        return edg_wll_Error(ctx,NULL,NULL);
 }
index 5107454..1a1ce49 100644 (file)
@@ -28,7 +28,7 @@ gss_reader(void *user_data, char *buffer, int max_len)
   int ret, len;
   edg_wll_GssStatus gss_code;
 
-  ret = edg_wll_gss_read_full(&tmp_ctx->connPool[tmp_ctx->connToUse].gss,
+  ret = edg_wll_gss_read_full(&tmp_ctx->connections->serverConnection->gss,
                              buffer, max_len,
                              &tmp_ctx->p_tmp_timeout,
                              &len, &gss_code);
@@ -75,7 +75,7 @@ int edg_wll_StoreProto(edg_wll_Context ctx)
        free(buf);
 
        if ((len = create_reply(ctx,&buf)) > 0) {
-               if ((ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0)
+               if ((ret = edg_wll_gss_write_full(&ctx->connections->serverConnection->gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0)
                        edg_wll_SetError(ctx,
                                ret == EDG_WLL_GSS_ERROR_TIMEOUT ? 
                                        ETIMEDOUT : EDG_WLL_ERROR_GSS,