* use modified il_string interface
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 2 Jun 2006 13:03:17 +0000 (13:03 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 2 Jun 2006 13:03:17 +0000 (13:03 +0000)
* faster read_event

org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/input_queue_socket.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/perftest_il.sh
org.glite.lb.logger/src/server_msg.c

index feab102..365406b 100644 (file)
@@ -426,8 +426,14 @@ event_store_recover(struct event_store *es)
     ret = -1;
 
     /* create message for server */
-    msg = server_msg_create(event_s, last);
-    free(event_s);
+    {
+           il_octet_string_t e;
+
+           e.data = event_s;
+           e.len = strlen(event_s);
+           msg = server_msg_create(&e, last);
+           free(event_s);
+    }
     if(msg == NULL) {
            il_log(LOG_ALERT, "    event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n");
            break;
index e127af6..c09499d 100644 (file)
@@ -128,7 +128,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
 
 static 
 int 
-handle_cmd(char *event, long offset)
+handle_cmd(il_octet_string_t *event, long offset)
 {
        char *job_id_s;
        struct event_queue *eq;
@@ -139,7 +139,7 @@ handle_cmd(char *event, long offset)
        struct timeval  tv;
 
        /* parse command */
-       if(parse_cmd(event, &job_id_s, &receipt, &timeout) < 0) 
+       if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0) 
                return(0);
 
 #if defined(INTERLOGD_FLUSH)
@@ -304,7 +304,7 @@ cmd_error:
 
 static 
 int
-handle_msg(char *event, long offset)
+handle_msg(il_octet_string_t *event, long offset)
 { 
        struct server_msg *msg = NULL;
 #if !defined(IL_NOTIFICATIONS)
@@ -386,10 +386,13 @@ loop()
 {
        /* receive events */
        while(1) {
-               char *msg;
+               il_octet_string_t msg;
                long offset;
                int ret;
     
+               if(killflg)
+                       exit(0);
+
                clear_error();
                if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) 
                {
@@ -408,17 +411,17 @@ loop()
                }
 
 #ifdef PERF_EMPTY
-               glite_wll_perftest_consumeEventString(msg);
-               free(msg);
+               glite_wll_perftest_consumeEventString(msg.data);
+               free(msg.data);
                continue;
 #endif
 
 #ifdef INTERLOGD_HANDLE_CMD            
-               ret = handle_cmd(msg, offset);
+               ret = handle_cmd(&msg, offset);
                if(ret == 0)
 #endif
-                       ret = handle_msg(msg, offset);
-               free(msg);
+                       ret = handle_msg(&msg, offset);
+               free(msg.data);
                if(ret < 0)
                        switch (error_get_maj()) {
                                case IL_SYS:
index 71fc89d..19c81e3 100644 (file)
@@ -68,32 +68,96 @@ void input_queue_detach()
 }
 
 
+#define DEFAULT_CHUNK_SIZE 1024
+
 static
-char *
-read_event(int sock, long *offset)
+int
+read_event(int sock, long *offset, il_octet_string_t *msg)
 {
   char *buffer, *p, *n;
-  int  len, alen;
-  char buf[256];
+  int  len, alen, i, chunk_size = DEFAULT_CHUNK_SIZE;
+  static char buf[1024];
+
+  msg->data = NULL;
+  msg->len = 0;
 
   /* receive offset */
   len = recv(sock, offset, sizeof(*offset), MSG_NOSIGNAL);
   if(len < sizeof(*offset)) {
     set_error(IL_PROTO, errno, "read_event: error reading offset");
-    return(NULL);
+    return(-1);
   }
   
   /* receive event string */
-  buffer=malloc(1024);
+  buffer=malloc(8*chunk_size);
   if(buffer == NULL) {
     set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
-    return(NULL);
+    return(-1);
   }
   p = buffer;
+  alen = 8*chunk_size;
 
-  alen = 1024;
+  /* Variables used here:
+        - buffer points to allocated memory,
+       - alen is the allocated memory size,
+       - p points to the first free location in buffer,
+       - len is the amount actually read by recv,
+       - i is the amount of data belonging to the current event (including separator).
+       - n points to event separator or is NULL
+    Hence:
+         (p - buffer) gives the amount of valid data read so far,
+        (alen - (p - buffer)) is the free space,
+  */ 
+#if 1
+  /* Reading events - optimized version. Attempts to increase chunks read by recv
+   * when there are more data, reads directly into destination memory (instead of 
+   * copying from static buffer) etc.
+   *
+   * For some reason it is not much faster than the old variant.
+   */
+  do {
+         /* prepare at least chunk_size bytes for next data */
+         if(alen - (p - buffer) < chunk_size) {
+                 alen += (chunk_size < 8192) ? 8192 : 8*chunk_size;
+                 n = realloc(buffer, alen);
+                 if(n == NULL) {
+                         free(buffer);
+                         set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+                         return(-1);
+                 }
+                 p = n + (p - buffer);
+                 buffer = n;
+         }
+
+         /* read chunk */
+         if((len=recv(sock, p, chunk_size, MSG_PEEK | MSG_NOSIGNAL)) > 0) {
+                 /* find the end of event, if any */
+                 /* faster (and dirty) way of doing strnchr (which is not in libc, anyway) */
+                 if((n=memccpy(p, p, EVENT_SEPARATOR, len)) != NULL) {
+                         i = n - p; /* length including separator */
+                 } else {
+                         i = len;
+                         /* long event, huh? try reading more data at once */
+                         chunk_size += 1024;
+                 }
+                 /* remove the relevant data from input */
+                 /* i > 0 */
+                 if(recv(sock, p, i, MSG_NOSIGNAL) != i) {
+                         set_error(IL_SYS, errno, "read_event: error reading data");
+                         free(buffer);
+                         return(-1);
+                 }
+                 /* move the pointer to the first free place, separator is considered free space */
+                 p = (n == NULL) ? p + len : n - 1;
+         }
+  } while ( (len > 0) && (n == NULL) );
+
+#else
+  /* Reading events - original version.
+   * Appears to behave quite good, anyway.
+   */
   while((len=recv(sock, buf, sizeof(buf), MSG_PEEK | MSG_NOSIGNAL)) > 0) {
-    int i;
 
     /* we have to be prepared for sizeof(buf) bytes */
     if(alen - (p - buffer) < (int)sizeof(buf)) {
@@ -102,27 +166,41 @@ read_event(int sock, long *offset)
       if(n == NULL) {
        free(buffer);
        set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
-       return(NULL);
+       return(-1);
       }
       p = p - buffer + n;
       buffer = n;
     }
 
     /* copy all relevant bytes from buffer */
-    for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) 
-      *p++ = buf[i];
+    n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len);
+    if(n) {
+           /* separator found */
+           n--; /* but do not preserve it */
+           i = n - p;
+           p = n;
+    } else {
+           /* separator not found */
+           i = len;
+           p += len;
+    }
+   /* This was definitely slowing us down:
+    *    for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) 
+    *    *p++ = buf[i];
+    */
 
     /* remove the data from queue */
     if(i > 0) 
       if(recv(sock, buf, i, MSG_NOSIGNAL) != i) {
        set_error(IL_SYS, errno, "read_event: error reading data");
        free(buffer);
-       return(NULL);
+       return(-1);
       }
     if(i < len)
       /* the event is complete */
       break;
   }
+#endif
 
   /* terminate buffer */
   *p = 0;
@@ -130,16 +208,17 @@ read_event(int sock, long *offset)
   if(len < 0) {
     set_error(IL_SYS, errno, "read_event: error reading data");
     free(buffer);
-    return(NULL);
+    return(-1);
   }
 
   /* if len == 0, we have not encountered EVENT_SEPARATOR and thus the event is not complete */
   if(len == 0) {
     set_error(IL_PROTO, errno, "read_event: error reading data - premature EOF");
     free(buffer);
-    return(NULL);
+    return(-1);
   }
 
+#if 0
   /* this is probably not necessary at all:
      either len <=0, which was covered before,
      or 0 <= i < len => p > buffer;
@@ -148,10 +227,13 @@ read_event(int sock, long *offset)
   if(p == buffer) {
     set_error(IL_PROTO, errno, "read_event: error reading data - no data received");
     free(buffer);
-    return(NULL);
+    return(-1);
   }
+#endif
 
-  return(buffer);
+  msg->data = buffer;
+  msg->len = p - buffer;
+  return(msg->len);
 }
 
 
@@ -161,12 +243,13 @@ read_event(int sock, long *offset)
  */
 #ifdef PERF_EVENTS_INLINE
 int
-input_queue_get(char **buffer, long *offset, int timeout)
+input_queue_get(il_octet_string *buffer, long *offset, int timeout)
 {
        static long o = 0;
        int len;
 
-       len = glite_wll_perftest_produceEventString(buffer);
+       len = glite_wll_perftest_produceEventString(&buffer->data);
+       buffer->len = len;
        if(len) {
                o += len;
                *offset = o;
@@ -177,7 +260,7 @@ input_queue_get(char **buffer, long *offset, int timeout)
 }
 #else
 int
-input_queue_get(char **buffer, long *offset, int timeout)
+input_queue_get(il_octet_string_t *buffer, long *offset, int timeout)
 {
   fd_set fds;
   struct timeval tv;
@@ -198,11 +281,17 @@ input_queue_get(char **buffer, long *offset, int timeout)
     return(0);
     
   case -1: /* error */
-    set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
-    return(-1);
-    
+         switch(errno) {
+         case EINTR:
+                 il_log(LOG_DEBUG, "  interrupted while waiting for event!\n");
+                 return(0);
+
+         default:
+                 set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+                 return(-1);
+         }
   default:
-    break;
+         break;
   }
   
   if((accepted=accept(sock, NULL, NULL)) < 0) {
@@ -210,16 +299,16 @@ input_queue_get(char **buffer, long *offset, int timeout)
     return(-1);
   }
 
-  *buffer = read_event(accepted, offset);
+  read_event(accepted, offset, buffer);
   close(accepted);
 
-  if(*buffer == NULL) {
+  if(buffer->data == NULL) {
     if(error_get_maj() != IL_OK)
       return(-1);
     else
       return(0);
   }
     
-  return(strlen(*buffer));
+  return(buffer->len);
 }
 #endif
index d5b0c32..aff7510 100644 (file)
@@ -30,7 +30,7 @@
 
 /* The name the program was run with, stripped of any leading path. */
 char *program_name;
-static int killflg = 0;
+int killflg = 0;
 
 int TIMEOUT = DEFAULT_TIMEOUT;
 
index 9f91a0d..846d9ea 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "il_error.h"
 #include "glite/security/glite_gss.h"
+#include "glite/lb/il_msg.h"
 
 #include <pthread.h>
 #include <sys/time.h>
@@ -68,6 +69,7 @@ extern char *cert_file;
 extern char *key_file;
 extern char *CAcert_dir;
 extern int bs_only;
+extern int killflg;
 #ifdef LB_PERF
 extern int nosend;
 #ifdef PERF_EVENTS_INLINE
@@ -138,9 +140,9 @@ struct event_queue {
 
 
 /* server msg methods */
-struct server_msg *server_msg_create(char *, long);
+struct server_msg *server_msg_create(il_octet_string_t *, long);
 struct server_msg *server_msg_copy(struct server_msg *);
-int server_msg_init(struct server_msg *, char *);
+int server_msg_init(struct server_msg *, il_octet_string_t *);
 #if defined(INTERLOGD_EMS)
 int server_msg_is_priority(struct server_msg *);
 #endif
@@ -178,7 +180,7 @@ int event_queue_cond_unlock(struct event_queue *);
 /* input queue */
 int input_queue_attach();
 void input_queue_detach();
-int input_queue_get(char **, long *, int);
+int input_queue_get(il_octet_string_t *, long *, int);
 
 /* queue management functions */
 int queue_list_init(char *);
index dcf0ff2..e9562f9 100644 (file)
@@ -22,10 +22,17 @@ run_test il $numjobs
 j=0
 while [[ $j -lt 4 ]]
 do
-    echo -e -n "\t ${PERFTEST_EV_THROUGHPUT[$j]}"      
+    echo -e -n "\t ${PERFTEST_THROUGHPUT[$j]}" 
     j=$((j+1))
 done
 echo ""
+#j=0
+#while [[ $j -lt 4 ]]
+#do
+#    echo -e -n "\t (${PERFTEST_EV_THROUGHPUT[$j]})"   
+#    j=$((j+1))
+#done
+#echo ""
 
 
 #
index 1a90e72..b7a005b 100644 (file)
 
 static
 int 
-create_msg(char *event, char **buffer, long *receipt)
+create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
 {
   char *p;  int  len;
-  
+  char *event = ev->data;
+
   *receipt = 0;
 
 #if defined(INTERLOGD_EMS)
@@ -56,7 +57,7 @@ create_msg(char *event, char **buffer, long *receipt)
   }
 #endif
 
-  len = encode_il_msg(buffer, event);
+  len = encode_il_msg(buffer, ev);
   if(len < 0) {
     set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message");
     return(-1);
@@ -66,7 +67,7 @@ create_msg(char *event, char **buffer, long *receipt)
 
 
 struct server_msg *
-server_msg_create(char *event, long offset)
+server_msg_create(il_octet_string_t *event, long offset)
 {
   struct server_msg *msg;
 
@@ -121,7 +122,7 @@ server_msg_copy(struct server_msg *src)
 
 
 int
-server_msg_init(struct server_msg *msg, char *event)
+server_msg_init(struct server_msg *msg, il_octet_string_t *event)
 {
 #if defined(IL_NOTIFICATIONS)
        edg_wll_Context context;
@@ -130,6 +131,7 @@ server_msg_init(struct server_msg *msg, char *event)
 #endif
 
        assert(msg != NULL);
+       assert(event != NULL);
 
        memset(msg, 0, sizeof(*msg));
 
@@ -164,10 +166,10 @@ server_msg_init(struct server_msg *msg, char *event)
        if(msg->len < 0) {
                return(-1);
        }
-       msg->job_id_s = edg_wll_GetJobId(event);
+       msg->job_id_s = edg_wll_GetJobId(event->data);
 #endif
        /* remember to add event separator to the length */
-       msg->ev_len = strlen(event) + 1;
+       msg->ev_len = event->len + 1;
 
        if(msg->job_id_s == NULL) {
                set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id");