implementation of notification pool
authorMiloš Mulač <mulac@civ.zcu.cz>
Mon, 2 Feb 2009 16:30:50 +0000 (16:30 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Mon, 2 Feb 2009 16:30:50 +0000 (16:30 +0000)
- seems to work for basic scenario but needs extensive testing

org.glite.lb.client/interface/connection.h
org.glite.lb.client/interface/notification.h
org.glite.lb.client/src/connection.c
org.glite.lb.client/src/notification.c
org.glite.lb.client/src/notify.c
org.glite.lb.common/interface/connpool.h
org.glite.lb.common/interface/context-int.h
org.glite.lb.common/src/connpool.c
org.glite.lb.common/src/context.c

index f8eca35..5bd330d 100644 (file)
@@ -5,6 +5,7 @@
 
 int edg_wll_close(edg_wll_Context ctx,int *);
 int edg_wll_open(edg_wll_Context ctx,int *);
+int edg_wll_accept(edg_wll_Context ctx, int fd);
 int edg_wll_http_send_recv(edg_wll_Context, char *, const char * const *, char *, char **, char ***, char **);
 
 int edg_wll_close_proxy(edg_wll_Context ctx);
@@ -17,6 +18,7 @@ int ConnectionIndex(edg_wll_Context ctx, const char *name, int port);
 int AddConnection(edg_wll_Context ctx, char *name, int port);
 int ReleaseConnection(edg_wll_Context ctx, char *name, int port);
 int CloseConnection(edg_wll_Context ctx, int conn_index);
+int CloseConnectionNotif(edg_wll_Context ctx);
 
 #define PROXY_CONNECT_RETRY 10 /* ms */
 
index 5d8a021..add5370 100644 (file)
 extern "C" {
 #endif
 
+/** allocate memory for notification buffer using block of size XY
+*/
+#define NOTIF_POOL_BUFFER_BLOCK_SIZE   16384
+
 /**
  * \defgroup notifications Notifications handling
  * \brief Notifications handling.
@@ -167,6 +171,15 @@ int edg_wll_NotifCloseFd(
        edg_wll_Context         context
 );
 
+/** Close all connections in notifPool and free it.
+ * \param[in,out] context      context to work with
+ */
+
+int edg_wll_NotifClosePool(
+       edg_wll_Context         context
+);
+
+
 /*
  *@} end of group
  */
index ad5c168..f8ca315 100644 (file)
@@ -49,6 +49,35 @@ int CloseConnection(edg_wll_Context ctx, int conn_index)
 }
 
 
+int CloseConnectionNotif(edg_wll_Context ctx)
+{
+       /* close connection and free its structures */
+       int cIndex,ret = 0;
+
+
+        cIndex = ctx->connNotif->connToUse;
+
+       assert(ctx->connNotif->connOpened);
+
+       if (ctx->connNotif->connPool[cIndex].gss.sock >= 0)
+               edg_wll_gss_close(&ctx->connNotif->connPool[cIndex].gss, &ctx->p_tmp_timeout); // always returns 0
+       if (ctx->connNotif->connPool[cIndex].gsiCred != NULL) 
+               if ( (ret = edg_wll_gss_release_cred(&ctx->connNotif->connPool[cIndex].gsiCred, NULL)) )
+                       edg_wll_SetError(ctx,ret,"error in edg_wll_gss_release_cred()");        
+       free(ctx->connNotif->connPool[cIndex].peerName);
+       free(ctx->connNotif->connPool[cIndex].buf);
+       free(ctx->connNotif->connPool[cIndex].bufOut);
+       free(ctx->connNotif->connPool[cIndex].certfile);
+       
+       memset(ctx->connNotif->connPool + cIndex, 0, sizeof(edg_wll_ConnPool));
+       ctx->connNotif->connPool[cIndex].gss.sock = -1;
+       
+       ctx->connNotif->connOpened--;
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 
 int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 {
@@ -98,7 +127,6 @@ int ConnectionIndex(edg_wll_Context ctx, const char *name, int port)
 }
 
 
-
 int AddConnection(edg_wll_Context ctx, char *name, int port)
 {
        int i,index = -1;
@@ -125,6 +153,25 @@ int AddConnection(edg_wll_Context ctx, char *name, int port)
 }
 
 
+int SetFreeConnectionIndexNotif(edg_wll_Context ctx)
+{
+       int i;
+
+
+       ctx->connNotif->connToUse = -1;
+
+        for (i = 0; i < ctx->connNotif->poolSize; i++) 
+               if (ctx->connNotif->connPool[i].gss.sock == -1) {
+                       ctx->connNotif->connToUse = i;
+                       ctx->connNotif->connOpened++;
+                       assert(!ctx->connNotif->connPool[i].buf);
+                       break;
+               }
+
+       return ctx->connNotif->connToUse;
+}
+
+
 
 int ReleaseConnection(edg_wll_Context ctx, char *name, int port)
 {
@@ -165,6 +212,34 @@ int ReleaseConnection(edg_wll_Context ctx, char *name, int port)
 }
                        
 
+int ReleaseConnectionNotif(edg_wll_Context ctx)
+{
+       int i, index = 0;
+       long min;
+
+
+       edg_wll_ResetError(ctx);
+       if (ctx->connNotif->connOpened == 0) return 0;  /* nothing to release */
+       
+       min = ctx->connNotif->connPool[0].lastUsed.tv_sec;
+
+       /* free the oldest (unlocked) connection */
+       for (i=0; i<ctx->connNotif->poolSize; i++) {
+               assert(ctx->connNotif->connPool[i].gss.sock > -1); // Full pool expected - accept non-NULL values only
+
+               if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
+                       min = ctx->connections->connPool[i].lastUsed.tv_sec;
+                       index = i;
+               }
+       }       
+
+       ctx->connNotif->connToUse = index;
+       CloseConnectionNotif(ctx);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+                       
+
 
 int edg_wll_close(edg_wll_Context ctx, int* connToUse)
 {
@@ -579,3 +654,138 @@ int edg_wll_http_send_recv_proxy(
        
        return edg_wll_Error(ctx,NULL,NULL);
 }
+
+
+int edg_wll_accept(edg_wll_Context ctx, int fd)
+{
+       int recv_sock;
+       edg_wll_GssStatus gss_stat;
+       time_t lifetime = 0;
+       struct stat statinfo;
+       int acquire_cred = 0;
+        struct sockaddr_in     a;
+        socklen_t              alen;
+       edg_wll_GssStatus       gss_code;
+
+       
+       edg_wll_ResetError(ctx);
+       assert(fd > 0);
+
+       alen=sizeof(a);
+       recv_sock = accept(fd,(struct sockaddr *)&a,&alen);
+       if (recv_sock <0) {
+               edg_wll_SetError(ctx, errno, "accept() failed");
+               goto err;
+       }
+
+       if (ctx->connNotif->connOpened == ctx->connNotif->poolSize)     
+               if (ReleaseConnectionNotif(ctx)) goto err;
+       
+       if (SetFreeConnectionIndexNotif(ctx) < 0) {
+               edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded");
+               goto err; 
+       }
+
+        #ifdef EDG_WLL_CONNPOOL_DEBUG  
+               printf("Connection with fd %d accepted. %d in the pool\n",>srvName,ctx->srvPort,ctx->connNotif->connToUse);
+        #endif
+               
+
+       // In case of using a specifically given cert file, stat it and check for the need to reauthenticate
+       if (ctx->p_proxy_filename || ctx->p_cert_filename) {
+               if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile)       {       // Has the file been stated before?
+                       stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, &statinfo);
+                       if (ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile->st_mtime != statinfo.st_mtime)
+                               acquire_cred = 1;       // File has been modified. Need to acquire new creds.
+               }
+               else acquire_cred = 1; 
+       }
+               
+       // Check if credentials exist. If so, check validity
+       if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred) {
+               lifetime = ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred->lifetime;
+               #ifdef EDG_WLL_CONNPOOL_DEBUG   
+                       printf ("Credential exists, lifetime: %d\n", lifetime);
+               #endif
+               if (!lifetime) acquire_cred = 1;        // Credentials exist and lifetime is OK. No need to authenticate.
+       }
+       else {
+                       acquire_cred = 1;       // No credentials exist so far, acquire. 
+       }
+
+
+       if (acquire_cred) {
+               edg_wll_GssCred newcred = NULL;
+               if (edg_wll_gss_acquire_cred_gsi(
+                       ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
+                      ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
+                      &newcred, &gss_stat)) {
+                   edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat);
+                   goto err;
+               } else {
+                       if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred != NULL)
+                               edg_wll_gss_release_cred(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred,&gss_stat);
+                       ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred = newcred;
+                       newcred = NULL;
+
+                       // Credentials Acquired successfully. Storing file identification.
+                       #ifdef EDG_WLL_CONNPOOL_DEBUG   
+                               printf("Cert file: %s\n", ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename);
+                       #endif
+
+                       if (ctx->p_proxy_filename || ctx->p_cert_filename) {
+                               if (!ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile) // Allocate space for certfile stats
+                                       ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile = 
+                                               (struct stat*)calloc(1, sizeof(struct stat));
+                               stat(ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, ctx->connNotif->connPool[ctx->connNotif->connToUse].certfile);
+                       }
+               }
+       }
+
+       assert(ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.context == NULL);
+
+       switch ( edg_wll_gss_accept(ctx->connNotif->connPool[ctx->connNotif->connToUse].gsiCred, recv_sock,
+                       &ctx->p_tmp_timeout, &ctx->connNotif->connPool[ctx->connNotif->connToUse].gss,&gss_code)) {
+
+               case EDG_WLL_GSS_OK:
+                       break;
+               case EDG_WLL_GSS_ERROR_ERRNO:
+                       edg_wll_SetError(ctx,errno,"failed to receive notification");
+                       goto err;
+               case EDG_WLL_GSS_ERROR_GSS:
+                       edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code);
+                       goto err;
+               case EDG_WLL_GSS_ERROR_HERRNO:
+                       { const char *msg1;
+                         char *msg2;
+                         msg1 = hstrerror(errno);
+                         asprintf(&msg2, "edg_wll_gss_connect(): %s", msg1);
+                         edg_wll_SetError(ctx,EDG_WLL_ERROR_DNS, msg2);
+                         free(msg2);
+                       }
+                       break;
+               case EDG_WLL_GSS_ERROR_EOF:
+                       edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection");
+                       goto err;
+               case EDG_WLL_GSS_ERROR_TIMEOUT:
+                       edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification");
+                       goto err;
+               default:
+                       edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification");
+                       goto err;
+       }
+
+       return edg_wll_Error(ctx,NULL,NULL);
+
+
+err:
+       /* some error occured; close created connection
+        * and free all fields in connPool[ctx->connNotif->connToUse] */
+       if (ctx->connNotif->connToUse >= 0) {
+               CloseConnectionNotif(ctx);
+       }
+
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
index 32fac6f..469c398 100644 (file)
@@ -10,6 +10,7 @@
 #include <netdb.h>
 #include <poll.h>
 #include <stdio.h>
+#include <assert.h>
 
 #include "glite/security/glite_gss.h"
 #include "glite/lbu/escape.h"
@@ -446,90 +447,127 @@ err:
 }
 
 
+/* read all data available on active notif connection */
+static int read_data(edg_wll_Context ctx)
+{
+       edg_wll_GssStatus       gss_code;
+       int                     ret;
+       edg_wll_ConnPool        *notif = &(ctx->connNotif->connPool[ctx->connNotif->connToUse]);
+
+
+       do {
+               if (notif->bufUse == notif->bufSize) {
+                       notif->bufSize += NOTIF_POOL_BUFFER_BLOCK_SIZE;
+                       notif->buf = realloc(notif->buf, notif->bufSize);
+                       if (!notif->buf) 
+                               return edg_wll_SetError(ctx, ENOMEM, "read_data()");
+               }
+
+               ret = edg_wll_gss_read(&(notif->gss), notif->buf + notif->bufUse, 
+                       notif->bufSize - notif->bufUse, &ctx->p_tmp_timeout, &gss_code);
+
+               if (ret < 0) {
+                       switch(ret) {
+                               case EDG_WLL_GSS_ERROR_TIMEOUT:
+                                       edg_wll_SetError(ctx, ETIMEDOUT, "read message");
+                                       break;
+                               case EDG_WLL_GSS_ERROR_EOF:
+                                       edg_wll_SetError(ctx, ENOTCONN, NULL);
+                                       break;
+                               default:
+                                       edg_wll_SetError(ctx, EDG_WLL_ERROR_GSS, "read message");
+                                       break;
+                       }
+                       goto err;
+               }       
+               notif->bufUse += ret;
+
+       } while (notif->bufSize == notif->bufUse); 
+
+err:
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
 
-static int gss_reader(void *user_data, char *buffer, int max_len)
+/* copies bytes_to_copy chars from pool buffer to given _allocated_ working buffer
+ */
+static int buffer_reader(void *user_data, char *buffer, const int bytes_to_copy)
 {
-  edg_wll_GssStatus gss_code;
-  edg_wll_Context tmp_ctx = (edg_wll_Context)user_data;
-  int ret;
-  size_t       len;
-
-  ret = edg_wll_gss_read_full(&tmp_ctx->connPoolNotif[0].gss,
-                             buffer, max_len,
-                             &tmp_ctx->p_tmp_timeout,
-                             &len, &gss_code);
-  if(ret < 0)
-    switch(ret) {
-    case EDG_WLL_GSS_ERROR_TIMEOUT:
-      edg_wll_SetError(tmp_ctx, ETIMEDOUT, "read message");
-      break;
-    case EDG_WLL_GSS_ERROR_EOF:
-      edg_wll_SetError(tmp_ctx, ENOTCONN, NULL);
-      break;
-    default:
-      edg_wll_SetError(tmp_ctx, EDG_WLL_ERROR_GSS, "read message");
-      break;
-    }
-
-  return(ret);
+       edg_wll_Context tmp_ctx = (edg_wll_Context)user_data;
+
+
+       if (tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufUse < bytes_to_copy) 
+               return(-1);
+
+       memcpy(buffer,tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].buf + 
+               tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr, bytes_to_copy);
+       tmp_ctx->connNotif->connPool[tmp_ctx->connNotif->connToUse].bufPtr += bytes_to_copy;
+
+       return(bytes_to_copy);
 }
 
 
-static int recv_notif(edg_wll_Context ctx)
+static int recv_notif(edg_wll_Context ctx, char **message)
 {
        int     len;
        
-       if (ctx->connPoolNotif[0].buf) {
-               free(ctx->connPoolNotif[0].buf);
-               ctx->connPoolNotif[0].buf = NULL;
-       }
-       ctx->connPoolNotif[0].bufUse = 0;
-       ctx->connPoolNotif[0].bufSize = 0;
-       
-       len = read_il_data(ctx, &ctx->connPoolNotif[0].buf, gss_reader);
+
+       len = read_il_data(ctx, message, buffer_reader);
        if(len < 0)
          return(len);
        
-       ctx->connPoolNotif[0].bufSize = len+1;
-       ctx->connPoolNotif[0].bufUse = len+1;
-
-       
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
 
-
-static int send_reply(const edg_wll_Context ctx)
+static int prepare_reply(const edg_wll_Context ctx)
 {
-       int     ret, len, err_code, err_code_min = 0;
-       char    *buf, *err_msg = NULL;
-       size_t  total;
-       edg_wll_GssStatus  gss_code;
+       int     len, err_code, err_code_min = 0;
+       char    *err_msg = NULL;
 
        
        err_code = edg_wll_Error(ctx,NULL,&err_msg);
 
        if (!err_msg) err_msg=strdup("OK");
        
-       len = encode_il_reply(&buf, err_code, err_code_min, err_msg);
+       len = encode_il_reply(&(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut),
+                err_code, err_code_min, err_msg);
+
        if(len < 0) {
                edg_wll_SetError(ctx,E2BIG,"create_reply()");
                goto err;
        }
 
-       ret = edg_wll_gss_write_full(&ctx->connPoolNotif[0].gss,
-                                    buf,len,&ctx->p_tmp_timeout,&total, &gss_code);
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = len;
+err:
+       free(err_msg);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+// send pool output buffer nonblockingly
+// XXX: missing partial-sent logic (gss_write does not return written bytes count)
+static int send_reply(const edg_wll_Context ctx)
+{
+       int     ret;
+       edg_wll_GssStatus       gss_code;
+
+
+       ret = edg_wll_gss_write(&ctx->connNotif->connPool[ctx->connNotif->connToUse].gss,
+               ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut, 
+               ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut, &ctx->p_tmp_timeout, &gss_code);
+
        if (ret < 0) {
+               // edg_wll_gss_write does not return how many bytes were written, so
+               // any error is fatal for us and we can close connection
                edg_wll_SetError(ctx,
                                ret == EDG_WLL_GSS_ERROR_TIMEOUT ? 
                                        ETIMEDOUT : EDG_WLL_ERROR_GSS,
-                                       "write reply");
+                                       "send_reply()");
                goto err;
        }
        
 err:
-       if(buf) free(buf);
-       free(err_msg);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -543,321 +581,168 @@ int edg_wll_NotifReceive(
         edg_wll_JobStat         *state_out,
         edg_wll_NotifId         *id_out)
 
-/* pullup from INFN, support multiple messages from interlogger */
-#if 0
+/* NotifReceive */
 {
-       fd_set                  fds;    
-       struct sockaddr_in      a;
-       int                     recv_sock, alen;
+       int                     i, j, fd_num = ctx->connNotif->connOpened + 1;
+       struct _fd_map {
+               struct pollfd   pollfds[fd_num];
+               int             index[fd_num];
+       }                       fd_map;
        edg_wll_Event           *event = NULL;
-       struct timeval          start_time,check_time,tv;
-       char                    *p = NULL, *ucs = NULL,
-                                       *event_char = NULL, *jobstat_char = NULL;
-       int                     ret;
-       edg_wll_GssStatus       gss_code;
+       struct timeval          start_time,check_time;
+       char                    *event_char = NULL, *jobstat_char = NULL, *message = NULL;
        
        
        edg_wll_ResetError(ctx);
 
+       ctx->p_tmp_timeout = *timeout;
        /* start timer */
        gettimeofday(&start_time,0);
-       
+
        if (fd == -1) {
                if (ctx->notifSock == -1) {
-                       edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
-                       goto err;
+                       return edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
                }
                else {
                        fd = ctx->notifSock;
                }
        }
 
-       FD_ZERO(&fds);
-       FD_SET(fd,&fds);
+       fd_map.pollfds[0].fd = fd;
+       fd_map.pollfds[0].events = POLLIN;
+       fd_map.index[0] = -1;
        
-       tv.tv_sec = timeout->tv_sec;
-       tv.tv_usec = timeout->tv_usec;
+       j = 1;
+       for (i=0; i < ctx->connNotif->poolSize; i++) {
+               if (ctx->connNotif->connPool[i].gss.sock != -1) {
+                       fd_map.pollfds[j].fd = ctx->connNotif->connPool[i].gss.sock;
+                       fd_map.pollfds[j].events = POLLIN;
+                       fd_map.index[j] = i;
+                       j++;
+               }
+       }
        
-       switch(select(fd+1, &fds, NULL, NULL, &tv)) {
+       /* XXX   notif_send() & notif_receive() should then migrate to          */
+       /*       client/connection.c and use connPool management f-cions        */
+
+       /* XXX: long-lived contexts may have problems, TODO implement credential reload */
+
+       switch(poll(fd_map.pollfds, fd_num, ctx->p_tmp_timeout.tv_sec*1000+ctx->p_tmp_timeout.tv_usec/1000)) {
                case -1:
-                       edg_wll_SetError(ctx, errno, "select() failed");
+                       edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed");
                        goto err;
                case 0:
-                       edg_wll_SetError(ctx, ETIMEDOUT, "select() timeouted");
+                       edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out");
                        goto err;
                default:
+                       for (i=0; i < fd_num; i++) {
+                               if ((fd_map.pollfds[i].revents & POLLIN)) {
+                                       break;
+                               }
+                       }
+                       if (fd_num == i) { 
+                               /* no events on any socket */
+                               edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor");
+                               goto err;
+                       }
                        break;
        }
 
-/* check time */
+       /* check time */
        gettimeofday(&check_time,0);
-       if (decrement_timeout(&tv, start_time, check_time)) {
+       if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
                edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
                goto err;
        }
-       
-       start_time = check_time;
-       
-       alen=sizeof(a);
-       recv_sock = accept(fd,&a,&alen);
-       if (recv_sock <0) {
-               edg_wll_SetError(ctx, errno, "accept() failed");
-               goto err;
-       }
 
-       ret = edg_wll_gss_accept(ctx->connPool[ctx->connToUse].gsiCred, recv_sock,
-                                &tv, &ctx->connPool[ctx->connToUse].gss, &gss_code);
+       // there is some incomming connection(s)
+       // XXX: what has higher priority? new connection, or data on active connection ?
 
-       if (ret) {
-               edg_wll_SetError(ctx, errno, "GSS authentication failed.");
-               goto err;       
+       if (fd_map.pollfds[0].revents & POLLIN) {       /* new connection */
+               start_time = check_time;
+               
+               if (edg_wll_accept(ctx,fd)) goto err;
+               
+               /* check time */
+               gettimeofday(&check_time,0);
+               if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
+                       edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+                       goto err;
+               }
        }
-       
-/* check time */
-       gettimeofday(&check_time,0);
-       if (decrement_timeout(&tv, start_time, check_time)) {
-               edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
-               goto err;
+       else {  /* data on some of active (NotifPool) connections */
+               // XXX: if data arrives for more connections, which one serve first?
+               //      poor-man solution - take first one on which data arrived
+               for (i=1; i < fd_num; i++) {
+                       if (fd_map.pollfds[i].revents & POLLIN) { 
+                               ctx->connNotif->connToUse = fd_map.index[i];
+                               break;
+                       }
+               }
+               assert(i < fd_num);
        }
        
-       start_time = check_time;
+       /****************************************************************/
+       /* Communication with notif-interlogger                         */
+       /****************************************************************/
        
+       start_time = check_time;
 
-       ctx->p_tmp_timeout = tv;
-               
-       if (recv_notif(ctx)) {
-               /* error set in recv_notif() */
-               goto err;
-       }       
-
-       if (send_reply(ctx)) {
-               /* error set in send_reply() */
+       if (read_data(ctx)) {
+               ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
                goto err;
-       }       
-       
-       p = ctx->connPool[ctx->connToUse].buf;
-       p = get_string(p, &ucs);
-       if (p == NULL) return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading UCS");
-       free(ucs);
-
-       p = get_string(p, &event_char);
-       if (p == NULL) {
-               free(ucs);
-               return edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");;
        }
-       
-       /****************************************************************/
-       /* end of  notif-interlogger message exchange                   */
-       /****************************************************************/
-       
+
        /* check time */
        gettimeofday(&check_time,0);
-       if (decrement_timeout(&tv, start_time, check_time)) {
+       if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
                edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
                goto err;
        }
        start_time = check_time;
-               
-       event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
-       if (edg_wll_ParseNotifEvent(ctx, event_char, &event)) {
-               goto err;
-       }
 
-       jobstat_char = glite_lbu_UnescapeXML((const char *) event->notification.jobstat);
-       if (jobstat_char == NULL) {
-               edg_wll_SetError(ctx, EINVAL, "glite_lbu_UnescapeXML()");
-               goto err;
-       }
-               
-       /* fill in return values
-        */
-       if ( edg_wll_ParseJobStat(ctx, jobstat_char, 
-                               strlen(jobstat_char), state_out)) {
+       if (recv_notif(ctx, &message) < 0) {
+               ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
                goto err;
        }
-       
-       if ( id_out ) {
-               *id_out = event->notification.notifId;
-               event->notification.notifId = NULL;
-       }
-       
-       
-err:
-       if (event) { 
-               edg_wll_FreeEvent(event);
-               // XXX - konzultovat s honikem; podle meho by to free 
-               // mel delat uz edg_wll_FreeEvent
-               //free(event);
-       }
-
-       free(ctx->connPool[ctx->connToUse].buf);
-       ctx->connPool[ctx->connToUse].buf = NULL;
-       ctx->connPool[ctx->connToUse].bufUse = 0;
-       ctx->connPool[ctx->connToUse].bufSize = 0;
-       
-       free(event_char);
-       free(jobstat_char);
 
-       // XXX
-       // konzultovat s Danem
-       /* Dan: ??? */
-       edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, NULL);
-       
-       return edg_wll_Error(ctx,NULL,NULL);
-}
-#endif
-/* NotifReceive */
-{
-       struct pollfd           pollfds[1];
-       struct sockaddr_in      a;
-       int                     recv_sock;
-       socklen_t                       alen;
-       edg_wll_Event           *event = NULL;
-       struct timeval          start_time,check_time,tv;
-       char                    *event_char = NULL, *jobstat_char = NULL;
-       edg_wll_GssStatus       gss_code;
-       
+       /* receive message complete, free input buffer */
+       free(ctx->connNotif->connPool[ctx->connNotif->connToUse].buf);
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].buf = NULL;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtr = 0;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUse = 0;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSize = 0;
        
-       edg_wll_ResetError(ctx);
 
-       /* start timer */
-       gettimeofday(&start_time,0);
-       
-       if (fd == -1) {
-               if (ctx->notifSock == -1) {
-                       edg_wll_SetError(ctx, EINVAL, "No client socket opened.");
-                       goto err;
-               }
-               else {
-                       fd = ctx->notifSock;
-               }
+       if (prepare_reply(ctx) || send_reply(ctx)) { 
+               // fatal error
+               // XXX: output buffer not used between edg_wll_NotifReceive() calls
+               CloseConnectionNotif(ctx);      
+               goto err;       
        }
-       
-       pollfds[0].fd = fd;
-       pollfds[0].events = POLLIN;
-       tv.tv_sec = timeout->tv_sec;
-       tv.tv_usec = timeout->tv_usec;
-       
-       
-select:
-       /* XXX - index 0 is used because of absence of connection management    */
-               /*       to use it, support in client/connection.c needed               */
-       /*       it is better to separate it from ctx->connPool, which is used  */
-       /*       for outgouing messages to server                               */
-       /*       In future it should be in context, so one could use:           */
-       /*       ctx->connPoolNotif[ctx->connPoolNotifToUse]                    */
-       /*       notif_send() & notif_receive() should then migrate to          */
-       /*       client/connection.c and use connPool management f-cions        */
 
-       /* XXX: long-lived contexts may have problems, TODO implement credential reload */
-
-       if (!ctx->connPoolNotif[0].gsiCred) {
-               if (edg_wll_gss_acquire_cred_gsi(
-                       ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
-                       ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
-                       &ctx->connPoolNotif[0].gsiCred,&gss_code))
-               {
-                       edg_wll_SetErrorGss(ctx,"failed aquiring credentials",&gss_code);
-                       goto err;
-               }
+       /* check time */
+       gettimeofday(&check_time,0);
+       if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
+               edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
+               goto err;
        }
-       
-       if (ctx->connPoolNotif[0].gss.context == NULL) 
-       {       
-               int     ret;
-               switch(poll(pollfds, 1, tv.tv_sec*1000+tv.tv_usec/1000)) {
-                       case -1:
-                               edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: poll() failed");
-                               goto err;
-                       case 0:
-                               edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive: poll() timed out");
-                               goto err;
-                       default:
-                               if (!(pollfds[0].revents & POLLIN)) {
-                                       edg_wll_SetError(ctx, errno, "edg_wll_NotifReceive: error on filedescriptor");
-                                       goto err;
-                               }
-                               break;
-               }
-
-               /* check time */
-               gettimeofday(&check_time,0);
-               if (decrement_timeout(&tv, start_time, check_time)) {
-                       edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
-                       goto err;
-               }
-               start_time = check_time;
-               
-               alen=sizeof(a);
-               recv_sock = accept(fd,(struct sockaddr *)&a,&alen);
-               if (recv_sock <0) {
-                       edg_wll_SetError(ctx, errno, "accept() failed");
-                       goto err;
-               }
-
-               ret = edg_wll_gss_accept(ctx->connPoolNotif[0].gsiCred, recv_sock,
-                               &tv, &ctx->connPoolNotif[0].gss,&gss_code);
-
-               switch (ret) {
-                       case EDG_WLL_GSS_OK:
-                               break;
-                       case EDG_WLL_GSS_ERROR_ERRNO:
-                               edg_wll_SetError(ctx,errno,"failed to receive notification");
-                               goto err;
-                       case EDG_WLL_GSS_ERROR_GSS:
-                               edg_wll_SetErrorGss(ctx, "failed to authenticate sender", &gss_code);
-                               goto err;
-                       case EDG_WLL_GSS_ERROR_EOF:
-                               edg_wll_SetError(ctx,ECONNREFUSED,"sender closed the connection");
-                               goto err;
-                       case EDG_WLL_GSS_ERROR_TIMEOUT:
-                               edg_wll_SetError(ctx,ETIMEDOUT,"accepting notification");
-                               goto err;
-                       default:
-                               edg_wll_SetError(ctx, ENOTCONN, "failed to accept notification");
-                               goto err;
-               }
-
-               /* check time */
-               gettimeofday(&check_time,0);
-               if (decrement_timeout(&tv, start_time, check_time)) {
-                       edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
-                       goto err;
-               }
-               start_time = check_time;
-       }       
+       start_time = check_time;
 
-       
-       ctx->p_tmp_timeout = tv;
-               
-       /****************************************************************/
-       /* Communication with notif-interlogger                         */
-       /****************************************************************/
-       
-       if (recv_notif(ctx)) {
-               if (ctx->errCode == ENOTCONN) {
-                       /* other side (interlogger-notif) probably closed connection */
-                       edg_wll_ResetError(ctx);
-                       
-                       edg_wll_gss_close(&ctx->connPoolNotif[0].gss,NULL);
-                       // buffer is freed in recv_notif()
-               
-                       goto select;
-               }                               
-               else {
-                       goto err;       /* error set in recv_notif() */
-               }
-       }       
+       /* response sent, free output buffer */
+       free(ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut);
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufOut = NULL;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufUseOut = 0;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufSizeOut = 0;
+       ctx->connNotif->connPool[ctx->connNotif->connToUse].bufPtrOut = 0;
 
-       if (send_reply(ctx)) {
-               goto err;               /* error set in send_reply() */
-       }
-       
        {
                il_octet_string_t ev;
 
-               if(decode_il_msg(&ev, ctx->connPoolNotif[0].buf) < 0)
+                if ( decode_il_msg(&ev, message) < 0 ) {
+                       CloseConnectionNotif(ctx);
                        return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "decoding event string");
+               }
                event_char = ev.data;
        }
        /****************************************************************/
@@ -866,7 +751,7 @@ select:
        
        /* check time */
        gettimeofday(&check_time,0);
-       if (decrement_timeout(&tv, start_time, check_time)) {
+       if (decrement_timeout(&ctx->p_tmp_timeout, start_time, check_time)) {
                edg_wll_SetError(ctx, ETIMEDOUT, "edg_wll_NotifReceive()");
                goto err;
        }
@@ -889,7 +774,7 @@ select:
                                strlen(jobstat_char), state_out)) {
                goto err;
        }
-       
+
        if (id_out) { 
                *id_out = event->notification.notifId;
                event->notification.notifId = NULL;
@@ -903,11 +788,6 @@ err:
                //free(event);
        }
 
-       free(ctx->connPoolNotif[0].buf);
-       ctx->connPoolNotif[0].buf = NULL;
-       ctx->connPoolNotif[0].bufUse = 0;
-       ctx->connPoolNotif[0].bufSize = 0;
-       
        free(event_char);
        free(jobstat_char);
 
@@ -933,9 +813,6 @@ int edg_wll_NotifCloseFd(
        int err;
        
        if (ctx->notifSock >= 0) {
-               if (ctx->connPoolNotif[0].gss.context != NULL) {
-                       edg_wll_gss_close(&ctx->connPoolNotif[0].gss, NULL);
-               }
                err = close(ctx->notifSock);
                ctx->notifSock = -1;
                
@@ -945,3 +822,17 @@ int edg_wll_NotifCloseFd(
 
        return edg_wll_Error(ctx,NULL,NULL);
 }
+
+int edg_wll_NotifClosePool(
+        edg_wll_Context         ctx)
+{
+       if (ctx->connNotif->connOpened) {
+               for (ctx->connNotif->connToUse=0; ctx->connNotif->connToUse < ctx->connNotif->poolSize; 
+                               ctx->connNotif->connToUse++) {
+                       if (ctx->connNotif->connPool[ctx->connNotif->connToUse].gss.sock != -1) CloseConnectionNotif(ctx);
+                       
+               }
+       }
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
index d75b1ee..6e607b3 100644 (file)
@@ -203,7 +203,7 @@ int main(int argc,char **argv)
                edg_wll_NotifId         nid = NULL;
                int                     c;
                char    *field_arg = "owner",*err;
-               time_t  client_tout = time(NULL) + 60;
+               time_t  client_tout = time(NULL) + 600;
                int     refresh = 0;
                struct timeval          tout;
                time_t  opt_valid = 0,do_refresh = client_tout + 999999999,now;
@@ -269,6 +269,7 @@ int main(int argc,char **argv)
                        tout.tv_usec = 0;
 
                        edg_wll_FreeStatus(&stat);
+                       stat.state = EDG_WLL_JOB_UNDEF;
                
                        if ( (err = edg_wll_NotifReceive(ctx, sock, &tout, &stat, &recv_nid)) ) {
                                edg_wll_NotifIdFree(recv_nid);
@@ -324,6 +325,7 @@ receive_err:
                if (stat.state != EDG_WLL_JOB_UNDEF) edg_wll_FreeStatus(&stat);
                if (nid) edg_wll_NotifIdFree(nid);
                edg_wll_NotifCloseFd(ctx);
+               edg_wll_NotifClosePool(ctx);
 
                if (edg_wll_Error(ctx,&errt,&errd))
                        fprintf(stderr, "%s: %s (%s)\n", me, errt, errd);
index 3393349..8b76f40 100644 (file)
@@ -26,9 +26,10 @@ extern "C" {
 #ifndef EDG_WLL_CONNPOOL_DECLARED
 #define EDG_WLL_CONNPOOL_DECLARED 1
 
-#define        GLITE_LB_COMMON_CONNPOOL_SIZE   50
+#define        GLITE_LB_COMMON_CONNPOOL_SIZE           50
+#define        GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE     50
 
-glite_lb_padded_struct(_edg_wll_ConnPool,15,
+glite_lb_padded_struct(_edg_wll_ConnPool,25,
 /* address and port where we are connected to */
         char            *peerName;
         unsigned int    peerPort;
@@ -37,6 +38,7 @@ glite_lb_padded_struct(_edg_wll_ConnPool,15,
         edg_wll_GssCred   gsiCred;
         edg_wll_GssConnection   gss;
         char            *buf;
+       int             bufPtr;
         int             bufUse;
        int             bufSize;
 
@@ -47,6 +49,11 @@ glite_lb_padded_struct(_edg_wll_ConnPool,15,
 
        struct stat     *certfile;      
 
+/* output buffer */
+       char    *bufOut;
+       int     bufPtrOut;
+       int     bufUseOut;      
+       int     bufSizeOut;
 );
 typedef struct _edg_wll_ConnPool  edg_wll_ConnPool;
 #endif
@@ -64,6 +71,7 @@ typedef struct _edg_wll_Connections {
 /* pool of connections from client */
         int             poolSize;       /* number of connections in the pool */
         int             connOpened;     /* number of opened connections  */
+       int             connToUse;      /* active connection we are working with */
 
 /* Connection pool locks & accessories.*/
 #ifdef GLITE_LB_THREADED
@@ -110,6 +118,9 @@ void edg_wll_poolFree();
     in case memory has been already allocated, just return a pointer */
 edg_wll_Connections* edg_wll_initConnections();
 
+/** Initializes connNotif structure */
+void edg_wll_initConnNotif(edg_wll_Connections *);
+
 
 #ifdef __cplusplus
 }
index b717155..d2ad4dc 100644 (file)
@@ -65,7 +65,7 @@ glite_lb_padded_struct(_edg_wll_Context,150,
        void            *dbctx;
        int             dbcaps;
        edg_wll_Connections     *connections;
-       edg_wll_ConnPool        *connPoolNotif;         /* hold _one_ connection from notif-interlogger */
+       edg_wll_Connections     *connNotif;             /* holds connections from notif-interlogger */
        edg_wll_ConnProxy       *connProxy;             /* holds one plain connection */
 
        edg_wll_QueryRec        **job_index;
index e6f4089..0f287a9 100644 (file)
@@ -2,14 +2,19 @@
 
 #ifdef GLITE_LB_THREADED
 edg_wll_Connections connectionsHandle = 
-  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL};
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , 0,  PTHREAD_MUTEX_INITIALIZER , NULL , NULL};
+edg_wll_Connections connNotifInitializer = 
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE , 0 , 0 , PTHREAD_MUTEX_INITIALIZER , NULL, NULL};
 #endif
 
 #ifndef GLITE_LB_THREADED
 edg_wll_Connections connectionsHandle =
-  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , NULL};
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , 0, NULL};
+edg_wll_Connections connNotifInitializer = 
+  { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_NOTIF_SIZE , 0 , 0 , NULL};
 #endif
 
+
 /** Lock (try) the pool */
 int edg_wll_poolTryLock() {
 int RetVal;
@@ -277,3 +282,13 @@ edg_wll_Connections* edg_wll_initConnections() {
 }
 
 
+void edg_wll_initConnNotif(edg_wll_Connections *conn) 
+{
+       int     i;
+
+       *conn = connNotifInitializer;
+        conn->connPool = (edg_wll_ConnPool *) calloc(conn->poolSize, sizeof(edg_wll_ConnPool));
+        for (i=0; i<conn->poolSize; i++) {
+               conn->connPool[i].gss.sock = -1;
+        }
+}
index 50af048..fe09148 100644 (file)
@@ -38,8 +38,8 @@ int edg_wll_InitContext(edg_wll_Context *ctx)
        out->p_tmp_timeout.tv_usec = out->p_log_timeout.tv_usec;
 
         out->connections = edg_wll_initConnections();
-//     out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool));
-       out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool));
+       out->connNotif = (edg_wll_Connections *) calloc(1, sizeof(edg_wll_Connections));
+       edg_wll_initConnNotif(out->connNotif);
        out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy));
        out->connProxy->conn.sock = -1;
 //     out->connToUse = -1;
@@ -63,6 +63,7 @@ int edg_wll_InitContext(edg_wll_Context *ctx)
 void edg_wll_FreeContext(edg_wll_Context ctx)
 {
        struct timeval close_timeout = {0, 50000};
+       int     i;
 
        if (!ctx) return;
 #ifdef CTXTRACE
@@ -114,13 +115,17 @@ void edg_wll_FreeContext(edg_wll_Context ctx)
                }       
                free(ctx->connections->connPool);*/
        }
-       if (ctx->connPoolNotif) {
-               if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName);
-               edg_wll_gss_close(&ctx->connPoolNotif[0].gss,&close_timeout);
-               if (ctx->connPoolNotif[0].gsiCred)
-                       edg_wll_gss_release_cred(&ctx->connPoolNotif[0].gsiCred, NULL);
-               if (ctx->connPoolNotif[0].buf) free(ctx->connPoolNotif[0].buf);
-               free(ctx->connPoolNotif);
+       if (ctx->connNotif) {
+               for (i=0; i<ctx->connNotif->poolSize; i++) {
+                       if (ctx->connNotif->connPool[i].peerName) free(ctx->connNotif->connPool[i].peerName);
+                       edg_wll_gss_close(&ctx->connNotif->connPool[i].gss,&close_timeout);
+                       if (ctx->connNotif->connPool[i].gsiCred)
+                               edg_wll_gss_release_cred(&ctx->connNotif->connPool[i].gsiCred, NULL);
+                       if (ctx->connNotif->connPool[i].buf) free(ctx->connNotif->connPool[i].buf);
+                       if (ctx->connNotif->connPool[i].bufOut) free(ctx->connNotif->connPool[i].bufOut);
+               }
+               free(ctx->connNotif->connPool);
+               free(ctx->connNotif);
        }
        if ( ctx->connProxy ) {
                if ( ctx->connProxy->buf ) free(ctx->connProxy->buf);