merge
authorAleš Křenek <ljocha@ics.muni.cz>
Fri, 23 Mar 2007 14:08:55 +0000 (14:08 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Fri, 23 Mar 2007 14:08:55 +0000 (14:08 +0000)
org.glite.lb.logger/Makefile
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_error.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/logd.c
org.glite.lb.logger/src/send_event.c

index b141272..59ebe41 100644 (file)
@@ -1,7 +1,5 @@
 # defaults
 top_srcdir=.
-builddir=build
-top_builddir=${top_srcdir}/${builddir}
 stagedir=.
 distdir=.
 globalprefix=glite
@@ -14,7 +12,6 @@ glite_location=/opt/glite
 globus_prefix=/opt/globus
 nothrflavour=gcc32
 thrflavour=gcc32pthr
-expat_prefix=/opt/expat
 
 -include Makefile.inc
 -include ../project/version.properties
@@ -71,12 +68,12 @@ GLOBUS_THRLIBS:= -L${globus_prefix}/lib \
        -lglobus_common_${thrflavour} \
        -lglobus_gssapi_gsi_${thrflavour}
 
-ifneq (${expat_prefix},/usr)
-       EXPAT_LIBS:=-L${expat_prefix}/lib
-endif
-EXPAT_LIBS:=${EXPAT_LIBS} -lexpat
-
-EXT_LIBS:= ${EXPAT_LIBS}
+#ifneq (${expat_prefix},/usr)
+#      EXPAT_LIBS:=-L${expat_prefix}/lib
+#endif
+#EXPAT_LIBS:=${EXPAT_LIBS} -lexpat
+#
+#EXT_LIBS:= ${EXPAT_LIBS}
 
 COMMON_LIB:=-lglite_lb_common
 
index 365406b..6b7b3ce 100644 (file)
@@ -280,6 +280,65 @@ event_store_write_ctl(struct event_store *es)
 
 
 /*
+ * event_store_qurantine() 
+ *   - rename damaged event store file 
+ *   - essentially does the same actions as cleanup, but the event store 
+ *     does not have to be empty
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es) 
+{
+       int num;
+       char newname[MAXPATHLEN+1];
+
+       /* find available qurantine name */
+       /* we give it at most 1024 tries */
+       for(num = 0; num < 1024; num++) {
+               struct stat st;
+
+               snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num);
+               newname[MAXPATHLEN] = 0;
+               if(stat(newname, &st) < 0) {
+                       if(errno == ENOENT) {
+                               /* file not found */
+                               break;
+                       } else {
+                               /* some other error with name, probably permanent */
+                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+                               return(-1);
+                               
+                       }
+               } else {
+                       /* the filename is used already */
+               }
+       }
+       if(num >= 1024) {
+               /* new name not found */
+               /* XXX - is there more suitable error? */
+               set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+               return(-1);
+       }
+
+       /* actually rename the file */
+       il_log(LOG_DEBUG, "    renaming damaged event file from %s to %s\n",
+              es->event_file_name, newname);
+       if(rename(es->event_file_name, newname) < 0) {
+               set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+               return(-1);
+       }
+
+       /* clear the counters */
+       es->last_committed_ls = 0;
+       es->last_committed_bs = 0;
+       es->offset = 0;
+
+       return(0);
+}
+
+
+/*
  * event_store_recover()
  *   - recover after restart or catch up when events missing in IPC
  *   - if offset > 0, read everything behind it
@@ -296,6 +355,7 @@ event_store_recover(struct event_store *es)
   FILE *ef;
   struct flock efl;
   char err_msg[128];
+  struct stat stbuf;
 
   assert(es != NULL);
   
@@ -344,6 +404,22 @@ event_store_recover(struct event_store *es)
          return(-1);
   }
 
+  /* check the file modification time and size to avoid unnecessary operations */
+  memset(&stbuf, 0, sizeof(stbuf));
+  if(fstat(fd, &stbuf) < 0) {
+         il_log(LOG_ERR, "    could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
+         fclose(ef);
+         event_store_unlock(es);
+         return -1;
+  } else {
+         if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
+                 il_log(LOG_DEBUG, "  event file not modified since last visit, skipping\n");
+                 fclose(ef);
+                 event_store_unlock(es);
+                 return(0);
+         }
+  }
+
   while(1) { /* try, try, try */
 
          /* get the position in file to be sought */
@@ -435,8 +511,12 @@ event_store_recover(struct event_store *es)
            free(event_s);
     }
     if(msg == NULL) {
-           il_log(LOG_ALERT, "    event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n");
-           break;
+           il_log(LOG_ALERT, "    event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+           /* actually do not bother if quarantine succeeded or not - we could not do more */
+           event_store_quarantine(es);
+           fclose(ef);
+           event_store_unlock(es);
+           return(-1);
     }
     msg->es = es;
 
@@ -482,6 +562,7 @@ event_store_recover(struct event_store *es)
 
   /* due to this little assignment we had to lock the event_store for writing */
   es->offset = last;
+  es->last_modified = stbuf.st_mtime;
   il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
 
   if(msg) 
@@ -509,6 +590,16 @@ event_store_sync(struct event_store *es, long offset)
 
   assert(es != NULL);
 
+  /* Commented out due to the fact that offset as received on socket
+   * has little to do with the real event file at the moment. The
+   * event will be read from file, socket now serves only to notify
+   * about possible event file change.
+   */
+  ret = event_store_recover(es);
+  ret = (ret < 0) ? ret : 0;
+  return(ret);
+
+#if 0
   event_store_lock_ro(es);
   if(es->offset == offset) 
     /* we are up to date */
@@ -548,6 +639,7 @@ event_store_sync(struct event_store *es, long offset)
   }
   event_store_unlock(es);
   return(ret);
+#endif
 }
 
 
@@ -556,6 +648,12 @@ event_store_next(struct event_store *es, long offset, int len)
 {
   assert(es != NULL);
   
+  /* Commented out due to the fact that offset as received on socket
+   * has little to do with real event file at the moment. es->offset
+   * handling is left solely to the event_store_recover().
+   */
+   
+#if 0
   event_store_lock(es);
   /* Whoa, be careful now. The es->offset points right after the last enqueued event,
    * but it may not be the offset of the event WE have just enqueued, because:!    
@@ -567,6 +665,7 @@ event_store_next(struct event_store *es, long offset, int len)
          es->offset += len;
   }
   event_store_unlock(es);
+#endif
 
   return(0);
 }
@@ -686,6 +785,11 @@ event_store_clean(struct event_store *es)
     return(0);
   } else if( es->last_committed_ls > last) {
          il_log(LOG_WARNING, "  warning: event file seems to shrink!\n");
+         /* XXX - in that case we can not continue because there may be
+            some undelivered events referring to that event store */
+         fclose(ef);
+         event_store_unlock(es);
+         return(0);
   }
   
   /* now we are sure that all events were sent and the event queues are empty */
@@ -711,7 +815,6 @@ event_store_clean(struct event_store *es)
 }
 
 
-
 /* --------------------------------
  * event store management functions
  * --------------------------------
@@ -805,6 +908,11 @@ event_store_from_file(char *filename)
        
        il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
        
+       if(strstr(filename, "quarantine") != NULL) {
+               il_log(LOG_INFO, "  file name belongs to quarantine, not touching that.\n");
+               return(0);
+       }
+
        event_file = fopen(filename, "r");
        if(event_file == NULL) {
                set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
@@ -1129,7 +1237,7 @@ event_store_cleanup()
          case -1:
                  il_log(LOG_ERR, "  error removing event store %s (file %s):\n    %s\n", 
                         sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
-                 event_store_release(sl->es);
+                 /* event_store_release(sl->es); */
                  clear_error();
                  /* go on to the next */
                  
index e608b1c..1fe9bb9 100644 (file)
@@ -143,6 +143,8 @@ set_error(int code, long minor, char *msg)
          strncpy(err->msg, msg, IL_ERR_MSG_LEN);
   }
 
+  err->msg[IL_ERR_MSG_LEN] = 0; /* OK, malloc()ed IL_ERR_MSG_LEN + 1 */
+
   return(code);
 }
 
index a8527b9..010a8bc 100644 (file)
@@ -57,7 +57,7 @@ static void usage (int status)
               "  -b, --book                 send events to bookkeeping server only\n"
               "  -l, --log-server <host>    specify address of log server\n"
               "  -s, --socket <path>        non-default path of local socket\n"
-              "  -L, --lazy [<timeout>]     be lazy when closing connections to servers\n"
+              "  -L, --lazy [<timeout>]     be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n"
 #ifdef LB_PERF
               "  -n, --nosend               PERFTEST: consume events instead of sending\n"
               "  -S, --nosync               PERFTEST: do not check logd files for lost events\n"
@@ -78,7 +78,7 @@ static int debug;
 static int verbose = 0;
 char *file_prefix = DEFAULT_PREFIX;
 int bs_only = 0;
-int lazy_close = 0;
+int lazy_close = 1;
 int default_close_timeout;
 #ifdef LB_PERF
 int nosend = 0, norecover=0, nosync=0, noparse=0;
@@ -204,6 +204,10 @@ decode_switches (int argc, char **argv)
                lazy_close = 1;
                if(optarg) 
                        default_close_timeout = atoi(optarg);
+                       if(default_close_timeout == 0) {
+                               default_close_timeout = TIMEOUT;
+                               lazy_close = 0;
+                       }
                else
                        default_close_timeout = TIMEOUT;
                break;
index 9ec0d55..810314e 100644 (file)
@@ -92,6 +92,7 @@ struct event_store {
        long      last_committed_bs;       /* offset behind event that was last committed by BS */
        long      last_committed_ls;       /*  -"-                                           LS */
        long      offset;                  /* expected file position of next event */
+       time_t    last_modified;           /* time of the last file modification */
        int       recovering;              /* flag for recovery mode */
        pthread_rwlock_t update_lock;      /* lock to prevent simultaneous updates */
        pthread_rwlock_t use_lock;         /* lock to prevent struct deallocation */
index ad3a120..bb1cf67 100644 (file)
@@ -6,6 +6,7 @@
 #include <sys/wait.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <arpa/inet.h>
 #include <signal.h>
 #include <unistd.h> 
 #include <string.h>
@@ -157,6 +158,8 @@ doit(int socket, gss_cred_id_t cred_handle, char *file_name_prefix, int noipc, i
     OM_uint32  min_stat;
     gss_OID    name_type = GSS_C_NO_OID;
     fd_set fdset;
+    struct sockaddr_in peer;
+    socklen_t  alen = sizeof peer;
 
     ret = count = 0;
     FD_ZERO(&fdset);
@@ -164,11 +167,13 @@ doit(int socket, gss_cred_id_t cred_handle, char *file_name_prefix, int noipc, i
     /* accept */
     timeout.tv_sec = ACCEPT_TIMEOUT;
     timeout.tv_usec = 0;
+    getpeername(socket,(struct sockaddr *) &peer,&alen);
     edg_wll_ll_log(LOG_DEBUG,"Accepting connection (remaining timeout %d.%06d sec)\n",
                (int)timeout.tv_sec, (int) timeout.tv_usec);
     if ((ret = edg_wll_gss_accept(cred_handle,socket,&timeout,&con, &gss_stat)) < 0) {
        edg_wll_ll_log(LOG_DEBUG,"timeout after gss_accept is %d.%06d sec\n",
                (int)timeout.tv_sec, (int) timeout.tv_usec);
+        edg_wll_ll_log(LOG_ERR,"%s: edg_wll_gss_accept() failed\n",inet_ntoa(peer.sin_addr));
        return edg_wll_log_proto_server_failure(ret,&gss_stat,"edg_wll_gss_accept() failed\n");
     }
 
@@ -178,18 +183,18 @@ doit(int socket, gss_cred_id_t cred_handle, char *file_name_prefix, int noipc, i
                                                &client_name, NULL, NULL, NULL, NULL,
                                                NULL, NULL);
     if (GSS_ERROR(gss_stat.major_status)) {
-       char *gss_err;
-       edg_wll_gss_get_error(&gss_stat, "Cannot read client identification", &gss_err);
-       edg_wll_ll_log(LOG_WARNING, "%s\n", gss_err);
-       free(gss_err);
+       char *gss_err;
+       edg_wll_gss_get_error(&gss_stat, "Cannot read client identification", &gss_err);
+       edg_wll_ll_log(LOG_WARNING, "%s: %s\n", inet_ntoa(peer.sin_addr),gss_err);
+       free(gss_err);
     } else {
-       gss_stat.major_status = gss_display_name(&gss_stat.minor_status, client_name,
-                                                &gss_token, &name_type);
-       if (GSS_ERROR(gss_stat.major_status)) {
-               char *gss_err;
-               edg_wll_gss_get_error(&gss_stat, "Cannot process client identification", &gss_err);
-               edg_wll_ll_log(LOG_WARNING, "%s\n", gss_err);
-               free(gss_err);
+       gss_stat.major_status = gss_display_name(&gss_stat.minor_status, client_name,
+                                               &gss_token, &name_type);
+       if (GSS_ERROR(gss_stat.major_status)) {
+         char *gss_err;
+         edg_wll_gss_get_error(&gss_stat, "Cannot process client identification", &gss_err);
+         edg_wll_ll_log(LOG_WARNING, "%s: %s\n",inet_ntoa(peer.sin_addr),gss_err);
+         free(gss_err);
        }
     }
 
index 24e9e22..6a98039 100644 (file)
@@ -231,6 +231,7 @@ event_queue_close(struct event_queue *eq)
 int 
 event_queue_send(struct event_queue *eq)
 {
+  int events_sent = 0;
   assert(eq != NULL);
 
 #ifdef LB_PERF
@@ -250,7 +251,6 @@ event_queue_send(struct event_queue *eq)
     size_t bytes_sent;
     struct timeval tv;
     edg_wll_GssStatus gss_stat;
-    int events_sent = 0;
 
     clear_error();
 
@@ -280,7 +280,10 @@ event_queue_send(struct event_queue *eq)
            if((code = get_reply(eq, &rep, &code_min)) < 0) {
                    /* could not get the reply properly, so try again later */
                    il_log(LOG_ERR, "  error reading server %s reply:\n    %s\n", eq->dest_name, error_get_msg());
-                   eq->timeout = TIMEOUT;
+                   if (events_sent>0) 
+                       eq->timeout = 1;
+                   else
+                       eq->timeout = TIMEOUT;
                    return(0);
            }
 #ifdef LB_PERF