backports from 3.1:
authorMichal Voců <michal@ruk.cuni.cz>
Thu, 7 Dec 2006 10:29:46 +0000 (10:29 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Thu, 7 Dec 2006 10:29:46 +0000 (10:29 +0000)
  - rename corrupted event file to avoid rereading it
  - communication performance improvements

org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/input_queue_socket.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/send_event.c

index 9f56219..b6ed5f9 100644 (file)
@@ -6,6 +6,7 @@
 #include <string.h>
 #include <stdlib.h>
 #include <sys/types.h>
+#include <sys/stat.h>
 #include <dirent.h>
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
@@ -279,6 +280,65 @@ event_store_write_ctl(struct event_store *es)
 
 
 /*
+ * event_store_qurantine() 
+ *   - rename damaged event store file 
+ *   - essentially does the same actions as cleanup, but the event store 
+ *     does not have to be empty
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es) 
+{
+       int num;
+       char newname[MAXPATHLEN+1];
+
+       /* find available qurantine name */
+       /* we give it at most 1024 tries */
+       for(num = 0; num < 1024; num++) {
+               struct stat st;
+
+               snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num);
+               newname[MAXPATHLEN] = 0;
+               if(stat(newname, &st) < 0) {
+                       if(errno == ENOENT) {
+                               /* file not found */
+                               break;
+                       } else {
+                               /* some other error with name, probably permanent */
+                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+                               return(-1);
+                               
+                       }
+               } else {
+                       /* the filename is used already */
+               }
+       }
+       if(num >= 1024) {
+               /* new name not found */
+               /* XXX - is there more suitable error? */
+               set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+               return(-1);
+       }
+
+       /* actually rename the file */
+       il_log(LOG_DEBUG, "    renaming damaged event file from %s to %s\n",
+              es->event_file_name, newname);
+       if(rename(es->event_file_name, newname) < 0) {
+               set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+               return(-1);
+       }
+
+       /* clear the counters */
+       es->last_committed_ls = 0;
+       es->last_committed_bs = 0;
+       es->offset = 0;
+
+       return(0);
+}
+
+
+/*
  * event_store_recover()
  *   - recover after restart or catch up when events missing in IPC
  *   - if offset > 0, read everything behind it
@@ -428,8 +488,12 @@ event_store_recover(struct event_store *es)
     msg = server_msg_create(event_s, last);
     free(event_s);
     if(msg == NULL) {
-           il_log(LOG_ALERT, "    event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n");
-           break;
+           il_log(LOG_ALERT, "    event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+           /* actually do not bother if quarantine succeeded or not - we could not do more */
+           event_store_quarantine(es);
+           fclose(ef);
+           event_store_unlock(es);
+           return(-1);
     }
     msg->es = es;
 
@@ -704,7 +768,6 @@ event_store_clean(struct event_store *es)
 }
 
 
-
 /* --------------------------------
  * event store management functions
  * --------------------------------
@@ -798,6 +861,11 @@ event_store_from_file(char *filename)
        
        il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
        
+       if(strstr(filename, "quarantine") != NULL) {
+               il_log(LOG_INFO, "  file name belongs to quarantine, not touching that.\n");
+               return(0);
+       }
+
        event_file = fopen(filename, "r");
        if(event_file == NULL) {
                set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
@@ -944,6 +1012,59 @@ event_store_init(char *prefix)
       free(s);
     }
     closedir(event_dir);
+
+    /* one more pass - this time remove stale .ctl files */
+    event_dir = opendir(dir);
+    if(event_dir == NULL) {
+      free(dir);
+      set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+      return(-1);
+    }
+    
+    while((entry=readdir(event_dir))) {
+      char *s;
+
+      /* skip all files that do not match prefix */
+      if(strncmp(entry->d_name, p, len) != 0) 
+       continue;
+
+      /* find all control files */
+      if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+        s[4] == '\0') {
+             char *ef;
+             struct stat st;
+
+             /* is there corresponding event file? */
+             ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+             if(ef == NULL) {
+                     free(dir);
+                     set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+                     return(-1);
+             }
+
+             s[0] = 0;
+             *ef = '\0';
+             strcat(ef, dir);
+             strcat(ef, "/");
+             strcat(ef, entry->d_name);
+             s[0] = '.';
+
+             if(stat(ef, &st) == 0) {
+                     /* something is there */
+                     /* XXX - it could be something else than event file, but do not bother now */
+             } else {
+                     /* could not stat file, remove ctl */
+                     strcat(ef, s);
+                     il_log(LOG_DEBUG, "  removing stale file %s\n", ef);
+                     if(unlink(ef)) 
+                             il_log(LOG_ERR, "  could not remove file %s: %s\n", ef, strerror(errno));
+                     
+             }
+             free(ef);
+
+      }
+    }
+    closedir(event_dir);
     free(dir);
   }
 
index b045112..546fbc7 100644 (file)
@@ -385,7 +385,10 @@ loop()
                char *msg;
                long offset;
                int ret;
-    
+
+               if(killflg)
+                       return(0);
+
                clear_error();
                if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) 
                {
index f183319..becca80 100644 (file)
@@ -109,8 +109,21 @@ read_event(int sock, long *offset)
     }
 
     /* copy all relevant bytes from buffer */
-    for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) 
-      *p++ = buf[i];
+    n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len);
+    if(n) {
+           /* separator found */
+           n--; /* but do not preserve it */
+           i = n - p;
+           p = n;
+    } else {
+           /* separator not found */
+           i = len;
+           p += len;
+    }
+   /* This was definitely slowing us down:
+    *    for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) 
+    *    *p++ = buf[i];
+    */
 
     /* remove the data from queue */
     if(i > 0) 
@@ -140,6 +153,7 @@ read_event(int sock, long *offset)
     return(NULL);
   }
 
+#if 0
   /* this is probably not necessary at all:
      either len <=0, which was covered before,
      or 0 <= i < len => p > buffer;
@@ -150,6 +164,7 @@ read_event(int sock, long *offset)
     free(buffer);
     return(NULL);
   }
+#endif
 
   return(buffer);
 }
@@ -181,11 +196,17 @@ input_queue_get(char **buffer, long *offset, int timeout)
     return(0);
     
   case -1: /* error */
-    set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
-    return(-1);
-    
+         switch(errno) {
+         case EINTR:
+                 il_log(LOG_DEBUG, "  interrupted while waiting for event!\n");
+                 return(0);
+
+         default:
+                 set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+                 return(-1);
+         }
   default:
-    break;
+         break;
   }
   
   if((accepted=accept(sock, NULL, NULL)) < 0) {
index d2d53b3..ef64a90 100644 (file)
@@ -13,6 +13,7 @@
 
 #include "interlogd.h"
 #include "glite/lb/consumer.h"
+#include "glite/lb/log_proto.h"
 #include "glite/security/glite_gss.h"
 
 #define EXIT_FAILURE 1
 #define DEFAULT_PREFIX "/tmp/notif_events"
 #define DEFAULT_SOCKET "/tmp/notif_interlogger.sock"
 #else
-#define DEFAULT_PREFIX "/tmp/dglogd.log"
+#define DEFAULT_PREFIX EDG_WLL_LOG_PREFIX_DEFAULT
 #define DEFAULT_SOCKET "/tmp/interlogger.sock"
 #endif
 
 
 /* The name the program was run with, stripped of any leading path. */
 char *program_name;
-static int killflg = 0;
+int killflg = 0;
 
 int TIMEOUT = DEFAULT_TIMEOUT;
 
@@ -53,6 +54,7 @@ static void usage (int status)
               "  -b, --book                 send events to bookkeeping server only\n"
               "  -l, --log-server <host>    specify address of log server\n"
               "  -s, --socket <path>        non-default path of local socket\n"
+              "  -L, --lazy [<timeout>]     be lazy when closing connections to servers\n"
               , program_name, program_name);
        exit(status);
 }
@@ -63,6 +65,8 @@ static int debug;
 static int verbose = 0;
 char *file_prefix = DEFAULT_PREFIX;
 int bs_only = 0;
+int lazy_close = 0;
+int default_close_timeout;
 
 char *cert_file = NULL;
 char *key_file  = NULL;
@@ -83,6 +87,7 @@ static struct option const long_options[] =
   {"CAdir", required_argument, 0, 'C'},
   {"log-server", required_argument, 0, 'l'},
   {"socket", required_argument, 0, 's'},
+  {"lazy", optional_argument, 0, 'L'},
   {NULL, 0, NULL, 0}
 };
 
@@ -108,6 +113,7 @@ decode_switches (int argc, char **argv)
                           "b"  /* only bookeeping */
                            "l:" /* log server */
                           "d" /* debug */
+                          "L::" /* lazy */
                           "s:", /* socket */
                           long_options, (int *) 0)) != EOF)
     {
@@ -156,6 +162,14 @@ decode_switches (int argc, char **argv)
          socket_path = strdup(optarg);
          break;
 
+       case 'L':
+               lazy_close = 1;
+               if(optarg) 
+                       default_close_timeout = atoi(optarg);
+               else
+                       default_close_timeout = TIMEOUT;
+               break;
+
        default:
          usage (EXIT_FAILURE);
        }
@@ -183,10 +197,10 @@ main (int argc, char **argv)
   setlinebuf(stdout);
   setlinebuf(stderr);
 
-  i = decode_switches (argc, argv);
-
   if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p);
 
+  i = decode_switches (argc, argv);
+
   /* force -b if we do not have log server */
   if(log_server == NULL) {
     log_server = strdup(DEFAULT_LOG_SERVER);
@@ -218,6 +232,9 @@ main (int argc, char **argv)
     il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg());
     exit(EXIT_FAILURE);
   }
+  if(lazy_close)
+         il_log(LOG_DEBUG, "  using lazy mode when closing connections, timeout %d\n",
+                default_close_timeout);
 
   if (CAcert_dir)
      setenv("X509_CERT_DIR", CAcert_dir, 1);
index 64c6889..7c443fe 100644 (file)
@@ -64,7 +64,9 @@ extern char *cert_file;
 extern char *key_file;
 extern char *CAcert_dir;
 extern int bs_only;
-
+extern int killflg;
+extern int lazy_close;
+extern int default_close_timeout;
 
 /* shared data for thread communication */
 #ifdef INTERLOGD_FLUSH
index 9ee4f27..8064ce7 100644 (file)
@@ -33,6 +33,8 @@ queue_thread(void *q)
 {
        struct event_queue *eq = (struct event_queue *)q;
        int ret, exit;
+       int retrycnt;
+       int close_timeout;
 
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
@@ -46,6 +48,7 @@ queue_thread(void *q)
        event_queue_cond_lock(eq);
 
        exit = 0;
+       retrycnt = 0;
        while(!exit) {
     
                clear_error();
@@ -57,7 +60,16 @@ queue_thread(void *q)
                       && (eq->flushing != 1)
 #endif
                        ) {
-                       ret = event_queue_wait(eq, 0);
+                       if(lazy_close && close_timeout) {
+                               ret = event_queue_wait(eq, close_timeout);
+                               if(ret == 1) {/* timeout? */
+                                       event_queue_close(eq);
+                                       il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                              eq->dest_name, eq->dest_port);
+                               }
+                               close_timeout = 0;
+                       } else 
+                               ret = event_queue_wait(eq, 0);
                        if(ret < 0) {
                                /* error waiting */
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
@@ -83,7 +95,9 @@ queue_thread(void *q)
 #else
                        il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
 #endif
+                       retrycnt++;
                } else {
+                       retrycnt = 0;
                        /* connected, send events */
                        switch(ret=event_queue_send(eq)) {
                                
@@ -108,7 +122,13 @@ queue_thread(void *q)
                        } /* switch */
                        
                        /* we are done for now anyway, so close the queue */
+                       if((ret == 1) && lazy_close)
+                               close_timeout = default_close_timeout;
+                       else {
                                event_queue_close(eq);
+                               il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                      eq->dest_name, eq->dest_port);
+                       }
                } 
 
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
@@ -133,8 +153,14 @@ queue_thread(void *q)
 
                /* if there was some error with server, sleep for a while */
                /* iff !event_queue_empty() */
-               if(ret == 0) 
+               /* also allow for one more try immediately after server disconnect,
+                  which may cure server kicking us out after given number of connections */
+#ifndef LB_PERF
+               if((ret == 0) && (retrycnt > 0)) {
+                       il_log(LOG_WARNING, "    sleeping\n");
                        event_queue_sleep(eq);
+               }
+#endif
 
                if(exit) {
                        /* we have to clean up before exiting */
index 7f1eb9d..0040a47 100644 (file)
@@ -216,6 +216,7 @@ event_queue_close(struct event_queue *eq)
 int 
 event_queue_send(struct event_queue *eq)
 {
+  int events_sent = 0;
   assert(eq != NULL);
 
   if(eq->gss.context == GSS_C_NO_CONTEXT)
@@ -241,15 +242,25 @@ event_queue_send(struct event_queue *eq)
     tv.tv_usec = 0;
     ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
     if(ret < 0) {
-      eq->timeout = TIMEOUT;
-      return(0);
+           eq->timeout = TIMEOUT;
+           return(0);
+    }
+    if(ret < 0) {
+           if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0)
+                   eq->timeout = 0;
+           else
+                   eq->timeout = TIMEOUT;
+           return(0);
     }
     
     if((code = get_reply(eq, &rep, &code_min)) < 0) {
-      /* could not get the reply properly, so try again later */
-      il_log(LOG_ERR, "  error reading server %s reply:\n    %s\n", eq->dest_name, error_get_msg());
-      eq->timeout = TIMEOUT;
-      return(0);
+           /* could not get the reply properly, so try again later */
+           il_log(LOG_ERR, "  error reading server %s reply:\n    %s\n", eq->dest_name, error_get_msg());
+           if (events_sent>0) 
+                   eq->timeout = 1;
+           else
+                   eq->timeout = TIMEOUT;
+           return(0);
     }
     
     il_log(LOG_DEBUG, "    event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
@@ -288,6 +299,7 @@ event_queue_send(struct event_queue *eq)
          il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
        
       event_queue_remove(eq);
+      events_sent++;
       break;
       
     } /* switch */