Connection pool - threaded version (recommit :-( )
authorZdeněk Šustr <sustr4@cesnet.cz>
Mon, 4 Sep 2006 09:35:34 +0000 (09:35 +0000)
committerZdeněk Šustr <sustr4@cesnet.cz>
Mon, 4 Sep 2006 09:35:34 +0000 (09:35 +0000)
org.glite.lb.client/examples/user_jobs_threaded.c [new file with mode: 0644]
org.glite.lb.common/interface/connpool.h
org.glite.lb.common/src/connpool.c

diff --git a/org.glite.lb.client/examples/user_jobs_threaded.c b/org.glite.lb.client/examples/user_jobs_threaded.c
new file mode 100644 (file)
index 0000000..8d96ed3
--- /dev/null
@@ -0,0 +1,107 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <expat.h>
+
+#include "glite/lb/context.h"
+#include "glite/lb/xml_conversions.h"
+#include "glite/lb/consumer.h"
+
+int use_proxy = 0;
+
+int (*user_jobs)(edg_wll_Context, edg_wlc_JobId **, edg_wll_JobStat **);
+
+
+void
+usage(char *me)
+{
+       fprintf(stderr,"usage: %s [-h] [-x] [userid]\n", me);
+}
+
+int main(int argc,char **argv)
+{
+       edg_wll_Context ctx;
+       char            *errt,*errd;
+       edg_wlc_JobId           *jobs = NULL;
+       edg_wll_JobStat         *states = NULL;
+       int             i,j;
+       char            *owner = NULL;
+
+       user_jobs = edg_wll_UserJobs;
+       for ( i = 1; i < argc; i++ ) {
+               if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) {
+                       usage(argv[0]);
+                       exit(0);
+               } else if ( !strcmp(argv[i], "-x") ) {
+                       user_jobs = edg_wll_UserJobsProxy;
+                       continue;
+               }
+
+               owner = strdup(argv[i]);
+               break;  
+       }
+
+       edg_wll_InitContext(&ctx);
+       if ( user_jobs == edg_wll_UserJobsProxy  && owner )
+               edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, owner);
+
+       if (user_jobs(ctx,&jobs,&states)) goto err;
+       for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) {  
+               char *id = edg_wlc_JobIdUnparse(states[i].jobId),
+                    *st = edg_wll_StatToString(states[i].state);
+               
+               if (!states[i].parent_job) {
+                       if (states[i].jobtype == EDG_WLL_STAT_SIMPLE) { 
+                               printf("      %s .... %s %s\n", id, st, (states[i].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[i].done_code) : "" );
+                       }
+                       else if (states[i].jobtype == EDG_WLL_STAT_DAG) {
+                               printf("DAG   %s .... %s %s\n", id, st, (states[i].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[i].done_code) : "");
+                               for (j=0; states[j].state != EDG_WLL_JOB_UNDEF; j++) {
+                                       if (states[j].parent_job) {
+                                               char *par_id = edg_wlc_JobIdUnparse(states[j].parent_job);
+                                               
+                                               if (!strcmp(id,par_id)) {
+                                                       char *sub_id = edg_wlc_JobIdUnparse(states[j].jobId),
+                                                            *sub_st = edg_wll_StatToString(states[j].state);
+                                                       
+                                                       printf(" `-       %s .... %s %s\n", sub_id, sub_st, (states[j].state==EDG_WLL_JOB_DONE) ? edg_wll_done_codeToString(states[j].done_code) : "");
+                                                       free(sub_id);
+                                                       free(sub_st);
+                                               }
+                                               free(par_id);
+                                       }       
+                               }
+                       }
+               }
+                       
+               free(id);
+               free(st);
+       }
+
+       printf("\nFound %d jobs\n",i);
+
+err:
+       free(owner);
+       if  (jobs) {
+               for (i=0; jobs[i]; i++)  edg_wlc_JobIdFree(jobs[i]);    
+               free(jobs);
+       }
+
+       if  (states) {
+               for (i=0; states[i].state; i++)  edg_wll_FreeStatus(&states[i]);        
+               free(states);
+       }
+
+       if (edg_wll_Error(ctx,&errt,&errd)) {
+               fprintf(stderr,"%s: %s (%s)\n",argv[0],errt,errd);
+               edg_wll_FreeContext(ctx);
+               return 1;
+       }
+
+       edg_wll_FreeContext(ctx);
+       return 0;
+}
+
index f9c1df7..af874ac 100644 (file)
@@ -2,13 +2,17 @@
 #include "glite/lb/consumer.h"
 #include "lb_plain_io.h"
 #include "authz.h"
-#include "log_proto.h"
-#include <pthread.h>
+#include "glite/lb/log_proto.h"
+#ifdef GLITE_LB_THREADED
+  #include <pthread.h>
+#endif
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+#define EDG_WLL_CONNPOOL_DEBUG
+
 #ifndef EDG_WLL_CONNPOOL_DECLARED
 #define EDG_WLL_CONNPOOL_DECLARED 1
 
@@ -43,15 +47,18 @@ typedef struct _edg_wll_Connections {
         int             connOpened;     /* number of opened connections  */
 
 /* Connection pool locks & accessories.*/
+#ifdef GLITE_LB_THREADED
         pthread_mutex_t poolLock;       /* Global pool lock (to be used for pool-wide operations carried out locally) */
         pthread_mutex_t *connectionLock;      /* Per-connection lock (used to lock out connections that are in use) */
+#endif 
        edg_wll_Context *locked_by;     /* identifies contexts that have been used to lock a connection since they
                                            do probably have a connToUse value stored in them*/
+
 } edg_wll_Connections;
 #endif
-/* Connections Global Handle */
 
 
+/* Connections Global Handle */
 extern edg_wll_Connections connectionsHandle;
 
 /** ** Locking Functions (wrappers) **/
index 1275b07..e094d28 100644 (file)
@@ -1,19 +1,37 @@
 #include "connpool.h"
 
+#ifdef GLITE_LB_THREADED
 edg_wll_Connections connectionsHandle = 
   { NULL , NULL , EDG_WLL_LOG_CONNECTIONS_DEFAULT , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL};
+#endif
 
+#ifndef GLITE_LB_THREADED
+edg_wll_Connections connectionsHandle =
+  { NULL , NULL , EDG_WLL_LOG_CONNECTIONS_DEFAULT , 0 , NULL};
+#endif
 
 /** Lock (try) the pool */
 int edg_wll_poolTryLock() {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_trylock(&edg_wll_Connections.poolLock);
+  #ifdef GLITE_LB_THREADED  /* Threaded version */
+    RetVal =
+        pthread_mutex_trylock(
+           &connectionsHandle.poolLock
+        );
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d tring to lock the pool - result %d\n",pthread_self(),RetVal);
+    #endif
   #endif
 
-  #ifndef THR
+
+  #ifndef GLITE_LB_THREADED  /* Single-thread version */
     RetVal = 0;
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Dummy - Locking the pool (pthreads not included)\n");
+    #endif
   #endif
 
 
@@ -25,11 +43,16 @@ int RetVal;
 int edg_wll_poolLock() {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_lock(&edg_wll_Connections.poolLock);
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_lock(&connectionsHandle.poolLock);
+    printf("Debug out edg_wll_poolLock #2\n");
+
+    #ifdef EDG_WLL_CONNPOOL_DEBUG
+      printf("Thread %d tring to lock the pool - result %d\n",pthread_self(),RetVal);
+    #endif
   #endif
 
-  #ifndef THR
+  #ifndef GLITE_LB_THREADED
     RetVal = 0;
   #endif
 
@@ -42,11 +65,11 @@ int RetVal;
 int edg_wll_poolUnlock() {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_unlock(&edg_wll_Connections.poolLock);
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_unlock(&connectionsHandle.poolLock);
   #endif
 
-  #ifndef THR
+  #ifndef GLITE_LB_THREADED
     RetVal = 0;
   #endif
 
@@ -59,13 +82,13 @@ int RetVal;
 int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_trylock(&edg_wll_Connections.connectionLock[index]); /* Try to lock the connection */
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_trylock(&connectionsHandle.connectionLock[index]); /* Try to lock the connection */
     if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx;              /* If lock succeeded, store the
                                                                                   locking context address */
   #endif
 
-  #ifndef THR
+  #ifndef GLITE_LB_THREADED
     RetVal = 0;
   #endif
 
@@ -78,14 +101,14 @@ int RetVal;
 int edg_wll_connectionLock(edg_wll_Context ctx, int index) {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_lock(&edg_wll_Connections.connectionLock[index]);   /* Lock the connection (wait if
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_lock(&connectionsHandle.connectionLock[index]);     /* Lock the connection (wait if
                                                                                   not available)*/
     if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx;               /* If lock succeeded, store the
                                                                                   locking context address */
   #endif
 
-  #ifndef THR
+  #ifndef GLITE_LB_THREADED
     RetVal = 0;
   #endif
 
@@ -98,12 +121,12 @@ int RetVal;
 int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) {
 int RetVal;
 
-  #ifdef THR
-    RetVal = pthread_mutex_unlock(&edg_wll_Connections.connectionLock[index]);
+  #ifdef GLITE_LB_THREADED
+    RetVal = pthread_mutex_unlock(&connectionsHandle.connectionLock[index]);
     if (!RetVal) connectionsHandle.locked_by[index] = NULL;
   #endif
 
-  #ifndef THR
+  #ifndef GLITE_LB_THREADED
     RetVal = 0;
   #endif
 
@@ -131,11 +154,13 @@ void edg_wll_poolFree() {
         }
 
        edg_wll_poolLock();
+#ifdef GLITE_LB_THREADED
        free(connectionsHandle.connectionLock);
+       connectionsHandle.connectionLock = NULL;
+#endif
        free(connectionsHandle.serverConnection);
        free(connectionsHandle.connPool);
        free(connectionsHandle.locked_by);
-       connectionsHandle.connectionLock = NULL;
        connectionsHandle.serverConnection = NULL;
         connectionsHandle.connPool = NULL;
         connectionsHandle.locked_by = NULL;
@@ -150,7 +175,9 @@ edg_wll_Connections* edg_wll_initConnections() {
   if((connectionsHandle.connPool == NULL) &&
      (connectionsHandle.poolSize > 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */ 
     connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool));
+#ifdef GLITE_LB_THREADED
     connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t));
+#endif
     connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context));
 
   }