locking fixes, memory leaks
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 24 Oct 2008 15:46:43 +0000 (15:46 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 24 Oct 2008 15:46:43 +0000 (15:46 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/event_store_http.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/perftest_il.sh
org.glite.lb.logger/src/queue_mgr.c
org.glite.lb.logger/src/send_event.c
org.glite.lb.logger/src/server_msg.c

index b9345d3..5e59e68 100644 (file)
@@ -291,8 +291,8 @@ event_queue_move_events(struct event_queue *eq_s,
        eq_s->tail = NULL;
        while(p) {
          if((*cmp_func)(p->msg, data)) {
-                       il_log(LOG_DEBUG, "  moving event at offset %d from %s:%d to %s:%d\n",
-                              p->msg->offset, eq_s->dest_name, eq_s->dest_port, 
+                       il_log(LOG_DEBUG, "  moving event at offset %d(%d) from %s:%d to %s:%d\n",
+                              p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, 
                               eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev);
                        /* remove the message from the source list */
@@ -305,7 +305,8 @@ event_queue_move_events(struct event_queue *eq_s,
                                eq_d->tail = p;
                        } else {
                                /* signal that the message was 'delivered' */
-                               event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s));
+                               event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
+                                                  p->msg->generation);
                                /* free the message */
                                server_msg_free(p->msg);
                                free(p);
index 54ab4cd..4a93172 100644 (file)
@@ -139,7 +139,8 @@ event_store_free(struct event_store *es)
   if(es->event_file_name) free(es->event_file_name);
   if(es->control_file_name) free(es->control_file_name);
   pthread_rwlock_destroy(&es->use_lock);
-  pthread_rwlock_destroy(&es->update_lock);
+  pthread_rwlock_destroy(&es->commit_lock);
+  pthread_rwlock_destroy(&es->offset_lock);
   free(es);
 
   return(0);
@@ -175,7 +176,9 @@ event_store_create(char *job_id_s)
   es->control_file_name = jobid2controlfile(job_id);
   IL_EVENT_ID_FREE(job_id);
 
-  if(pthread_rwlock_init(&es->update_lock, NULL)) 
+  if(pthread_rwlock_init(&es->commit_lock, NULL)) 
+          abort();
+  if(pthread_rwlock_init(&es->offset_lock, NULL)) 
           abort();
   if(pthread_rwlock_init(&es->use_lock, NULL)) 
          abort();
@@ -190,7 +193,7 @@ event_store_lock_ro(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_rdlock(&es->update_lock)) 
+  if(pthread_rwlock_rdlock(&es->commit_lock)) 
     abort();
 
   return(0);
@@ -203,7 +206,7 @@ event_store_lock(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_wrlock(&es->update_lock)) 
+  if(pthread_rwlock_wrlock(&es->commit_lock)) 
     abort();
 
   return(0);
@@ -216,7 +219,7 @@ event_store_unlock(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_unlock(&es->update_lock)) 
+  if(pthread_rwlock_unlock(&es->commit_lock))
     abort();
   return(0);
 }
@@ -334,6 +337,9 @@ event_store_quarantine(struct event_store *es)
        es->last_committed_bs = 0;
        es->offset = 0;
 
+       /* increase cleanup count, this will invalidate all commits from previous generation */
+       es->generation++;
+
        return(0);
 }
 
@@ -373,7 +379,10 @@ event_store_recover(struct event_store *es)
   eq_l = queue_list_get(NULL);
 #endif
 
+  /* lock the event_store and offset locks */
   event_store_lock(es);
+  if(pthread_rwlock_wrlock(&es->offset_lock))
+         abort();
 
   il_log(LOG_DEBUG, "  reading events from %s\n", es->event_file_name);
 
@@ -385,6 +394,8 @@ event_store_recover(struct event_store *es)
                   es->event_file_name);
          set_error(IL_SYS, errno, err_msg);
          event_store_unlock(es);
+         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                 abort();
          return(-1);
   }
 
@@ -400,6 +411,8 @@ event_store_recover(struct event_store *es)
                   es->event_file_name);
          set_error(IL_SYS, errno, err_msg);
          event_store_unlock(es);
+         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                 abort();
          fclose(ef);
          return(-1);
   }
@@ -410,12 +423,16 @@ event_store_recover(struct event_store *es)
          il_log(LOG_ERR, "    could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
          fclose(ef);
          event_store_unlock(es);
+         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                 abort();
          return -1;
   } else {
          if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
                  il_log(LOG_DEBUG, "  event file not modified since last visit, skipping\n");
                  fclose(ef);
                  event_store_unlock(es);
+                 if(pthread_rwlock_unlock(&es->offset_lock)) 
+                         abort();
                  return(0);
          }
   }
@@ -454,6 +471,8 @@ event_store_recover(struct event_store *es)
                          set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
                          event_store_unlock(es);
                          fclose(ef);
+                         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                                 abort();
                          return(-1);
                  }
                  /* the last enqueued event MUST end with EVENT_SEPARATOR,
@@ -484,20 +503,38 @@ event_store_recover(struct event_store *es)
                          set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
                          event_store_unlock(es);
                          fclose(ef);
+                         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                                 abort();
                          return(-1);
                  }
                  break;
          }
   }
 
+  /* now we have: 
+   *   - event file opened at position 'last'
+   *   - offset and last_committed_* potentially reset to zero
+   */
+
+  /* release lock on commits, offset remains locked;
+   * other threads are allowed to send/remove events, but not insert
+   */
+  event_store_unlock(es);
+
   /* enqueue all remaining events */
   ret = 1;
   msg = NULL;
   while((event_s=read_event_string(ef)) != NULL) {
-       
+    long last_ls, last_bs;
+
     /* last holds the starting position of event_s in file */
     il_log(LOG_DEBUG, "    reading event at %ld\n", last);
 
+    event_store_lock_ro(es);
+    last_ls = es->last_committed_ls;
+    last_bs = es->last_committed_bs;
+    event_store_unlock(es);
+
     /* break from now on means there was some error */
     ret = -1;
 
@@ -515,13 +552,15 @@ event_store_recover(struct event_store *es)
            /* actually do not bother if quarantine succeeded or not - we could not do more */
            event_store_quarantine(es);
            fclose(ef);
-           event_store_unlock(es);
+           if(pthread_rwlock_unlock(&es->offset_lock)) 
+                   abort();
            return(-1);
     }
     msg->es = es;
+    msg->generation = es->generation;
 
     /* first enqueue to the LS */
-    if(!bs_only && (last >= es->last_committed_ls)) {
+    if(!bs_only && (last >= last_ls)) {
       
       il_log(LOG_DEBUG, "      queueing event at %ld to logging server\n", last);
 
@@ -537,7 +576,7 @@ event_store_recover(struct event_store *es)
 
     /* now enqueue to the BS, if neccessary */
     if((eq_b != eq_l) && 
-       (last >= es->last_committed_bs)) {
+       (last >= last_bs)) {
       
       il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
       
@@ -555,7 +594,6 @@ event_store_recover(struct event_store *es)
 
   } /* while */
 
-  /* due to this little assignment we had to lock the event_store for writing */
   es->offset = last;
   es->last_modified = stbuf.st_mtime;
   il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
@@ -566,7 +604,9 @@ event_store_recover(struct event_store *es)
   fclose(ef);
   il_log(LOG_DEBUG, "  finished reading events with %d\n", ret);
 
-  event_store_unlock(es);
+  if(pthread_rwlock_unlock(&es->offset_lock)) 
+         abort();
+
   return(ret);
 }
 
@@ -671,10 +711,17 @@ event_store_next(struct event_store *es, long offset, int len)
  *
  */
 int
-event_store_commit(struct event_store *es, int len, int ls)
+event_store_commit(struct event_store *es, int len, int ls, int generation)
 {
   assert(es != NULL);
 
+  /* do not move counters if event store with this message was cleaned up 
+   * (this can happen only when moving to quarantine)
+   */
+  /* XXX - assume int access is atomic */
+  if(generation != es->generation) 
+         return 0;
+
   event_store_lock(es);
 
   if(ls)
@@ -717,7 +764,7 @@ event_store_clean(struct event_store *es)
 
   /* prevent sender threads from updating */
   event_store_lock(es);
-  
+
   il_log(LOG_DEBUG, "  trying to cleanup event store %s\n", es->job_id_s);
   il_log(LOG_DEBUG, "    bytes sent to logging server: %d\n", es->last_committed_ls);
   il_log(LOG_DEBUG, "    bytes sent to bookkeeping server: %d\n", es->last_committed_bs);
@@ -730,6 +777,11 @@ event_store_clean(struct event_store *es)
     return(0);
   }
 
+  if(fd = pthread_rwlock_wrlock(&es->offset_lock)) {
+         fprintf(stderr, "Fatal locking error: %s\n", strerror(fd));
+         abort();
+  }
+
   /* the file can only be removed when all the events were succesfully sent 
      (ie. committed both by LS and BS */
   /* That also implies that the event queues are 'empty' at the moment. */
@@ -738,6 +790,8 @@ event_store_clean(struct event_store *es)
     /* if we can not open the event store, it is an error and the struct should be removed */
     /* XXX - is it true? */
     event_store_unlock(es);
+    if(pthread_rwlock_unlock(&es->offset_lock)) 
+           abort();
     il_log(LOG_ERR,  "  event_store_clean: error opening event file: %s\n", strerror(errno));
     return(1);
   }
@@ -753,6 +807,8 @@ event_store_clean(struct event_store *es)
     il_log(LOG_DEBUG, "    could not lock event file, cleanup aborted\n");
     fclose(ef);
     event_store_unlock(es);
+    if(pthread_rwlock_unlock(&es->offset_lock)) 
+           abort();
     if(errno != EACCES &&
        errno != EAGAIN) {
       set_error(IL_SYS, errno, "event_store_clean: error locking event file");
@@ -766,6 +822,8 @@ event_store_clean(struct event_store *es)
   if(fseek(ef, 0, SEEK_END) < 0) {
     set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
     event_store_unlock(es);
+    if(pthread_rwlock_unlock(&es->offset_lock)) 
+           abort();
     fclose(ef);
     return(-1);
   }
@@ -776,6 +834,8 @@ event_store_clean(struct event_store *es)
   if(es->last_committed_ls < last) {
     fclose(ef);
     event_store_unlock(es);
+    if(pthread_rwlock_unlock(&es->offset_lock)) 
+           abort();
     il_log(LOG_DEBUG, "    events still waiting in queue, cleanup aborted\n");
     return(0);
   } else if( es->last_committed_ls > last) {
@@ -784,6 +844,8 @@ event_store_clean(struct event_store *es)
             some undelivered events referring to that event store */
          fclose(ef);
          event_store_unlock(es);
+         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                 abort();
          return(0);
   }
   
@@ -799,8 +861,14 @@ event_store_clean(struct event_store *es)
   es->last_committed_bs = 0;
   es->offset = 0;
 
+  /* increasing the generation count is rather pointless here, because there
+     are no messages waiting in the queue that would be invalidated */
+  /* es->generation++ */
+
   /* unlock the event_store even if it is going to be removed */
   event_store_unlock(es);
+  if(pthread_rwlock_unlock(&es->offset_lock)) 
+         abort();
 
   /* close the event file (that unlocks it as well) */
   fclose(ef);
@@ -898,7 +966,6 @@ event_store_from_file(char *filename)
        edg_wll_Context context;
        char *dest_name = NULL;
 
-       edg_wll_InitContext(&context);
 #endif
        
        il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
@@ -919,7 +986,10 @@ event_store_from_file(char *filename)
                return(0);
        
 #if defined(IL_NOTIFICATIONS)
-       if((ret=edg_wll_ParseNotifEvent(context, event_s, &notif_event))) {
+       edg_wll_InitContext(&context);
+       ret=edg_wll_ParseNotifEvent(context, event_s, &notif_event);
+       edg_wll_FreeContext(context);
+       if(ret) {
                set_error(IL_LBAPI, ret, "event_store_from_file: could not parse event");
                ret = -1;
                goto out;
@@ -935,10 +1005,12 @@ event_store_from_file(char *filename)
                ret = -1;
                goto out;
        }
+       /*  XXX: what was that good for?
        if(notif_event->notification.dest_host && 
           (strlen(notif_event->notification.dest_host) > 0)) {
                asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port);
        }
+       */
        
 #else
        job_id_s = edg_wll_GetJobId(event_s);
index a34e0a3..ebd5523 100644 (file)
@@ -128,7 +128,7 @@ event_store_free(struct event_store *es)
   if(es->event_file_name) free(es->event_file_name);
   if(es->control_file_name) free(es->control_file_name);
   pthread_rwlock_destroy(&es->use_lock);
-  pthread_rwlock_destroy(&es->update_lock);
+  pthread_rwlock_destroy(&es->commit_lock);
   free(es);
 
   return(0);
@@ -155,7 +155,7 @@ event_store_create(char *job_id_s)
   es->event_file_name = jobid2eventfile(job_id_s);
   es->control_file_name = jobid2controlfile(job_id_s);
 
-  if(pthread_rwlock_init(&es->update_lock, NULL)) 
+  if(pthread_rwlock_init(&es->commit_lock, NULL)) 
           abort();
   if(pthread_rwlock_init(&es->use_lock, NULL)) 
          abort();
@@ -170,7 +170,7 @@ event_store_lock_ro(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_rdlock(&es->update_lock)) 
+  if(pthread_rwlock_rdlock(&es->commit_lock)) 
     abort();
 
   return(0);
@@ -183,7 +183,7 @@ event_store_lock(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_wrlock(&es->update_lock)) 
+  if(pthread_rwlock_wrlock(&es->commit_lock)) 
     abort();
 
   return(0);
@@ -196,7 +196,7 @@ event_store_unlock(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_unlock(&es->update_lock)) 
+  if(pthread_rwlock_unlock(&es->commit_lock)) 
     abort();
   return(0);
 }
index 0721aa5..c3b0307 100644 (file)
@@ -359,7 +359,7 @@ handle_msg(il_octet_string_t *event, long offset)
        if((es = event_store_find(msg->job_id_s)) == NULL) 
                return(-1);
        msg->es = es;
-
+       
 #ifdef LB_PERF
        if(nosync) 
                ret = 1;
index 8bf4833..b247487 100644 (file)
@@ -57,6 +57,7 @@ static void usage (int status)
               "  -l, --log-server <host>    specify address of log server\n"
               "  -s, --socket <path>        non-default path of local socket\n"
               "  -L, --lazy [<timeout>]     be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n"
+              "  -p, --parallel [<num>]     use <num> parallel streams to the same server\n"
 #ifdef LB_PERF
               "  -n, --nosend               PERFTEST: consume events instead of sending\n"
               "  -S, --nosync               PERFTEST: do not check logd files for lost events\n"
@@ -79,6 +80,7 @@ char *file_prefix = DEFAULT_PREFIX;
 int bs_only = 0;
 int lazy_close = 1;
 int default_close_timeout;
+int parallel = 0;
 #ifdef LB_PERF
 int nosend = 0, norecover=0, nosync=0, noparse=0;
 char *event_source = NULL;
@@ -105,6 +107,7 @@ static struct option const long_options[] =
   {"log-server", required_argument, 0, 'l'},
   {"socket", required_argument, 0, 's'},
   {"lazy", optional_argument, 0, 'L'},
+  {"parallel", optional_argument, 0, 'p'},
 #ifdef LB_PERF
   {"nosend", no_argument, 0, 'n'},
   {"nosync", no_argument, 0, 'S'},
@@ -140,6 +143,7 @@ decode_switches (int argc, char **argv)
                           "b"  /* only bookeeping */
                            "l:" /* log server */
                           "d" /* debug */
+                          "p" /* parallel */
 #ifdef LB_PERF
                           "n" /* nosend */
                           "S" /* nosync */
@@ -211,6 +215,13 @@ decode_switches (int argc, char **argv)
                        default_close_timeout = TIMEOUT;
                break;
 
+       case 'p':
+               if(optarg) 
+                       parallel = atoi(optarg);
+               else 
+                       parallel = 4;
+               break;
+
 #ifdef LB_PERF
        case 'n':
                nosend = 1;
index c0af5b3..d3d81b6 100644 (file)
@@ -78,6 +78,7 @@ extern int bs_only;
 extern int killflg;
 extern int lazy_close;
 extern int default_close_timeout;
+extern int parallel;
 #ifdef LB_PERF
 extern int nosend, nosync, norecover, noparse;
 #ifdef PERF_EVENTS_INLINE
@@ -122,8 +123,9 @@ struct event_store {
        long      last_committed_ls;       /*  -"-                                           LS */
        long      offset;                  /* expected file position of next event */
        time_t    last_modified;           /* time of the last file modification */
-       int       recovering;              /* flag for recovery mode */
-       pthread_rwlock_t update_lock;      /* lock to prevent simultaneous updates */
+       int       generation;              /* cleanup counter, scopes the offset */
+       pthread_rwlock_t commit_lock;      /* lock to prevent simultaneous updates to last_committed_* */
+       pthread_rwlock_t offset_lock;      /* lock to prevent simultaneous updates offset */
        pthread_rwlock_t use_lock;         /* lock to prevent struct deallocation */
 #if defined(IL_NOTIFICATIONS)
        char     *dest;                    /* host:port destination */
@@ -138,6 +140,7 @@ struct server_msg {
        int                     len;
        int                     ev_len;
        struct event_store     *es;             /* cache for corresponding event store */
+       int                     generation;     /* event store genereation */
        long                    receipt_to;     /* receiver (long local-logger id - LLLID) of delivery confirmation (for priority messages) */
 #if defined(IL_NOTIFICATIONS)
        char                   *dest_name;
@@ -244,7 +247,7 @@ int event_store_recover_all(void);
 struct event_store *event_store_find(char *);
 int event_store_sync(struct event_store *, long);
 int event_store_next(struct event_store *, long, int);
-int event_store_commit(struct event_store *, int, int);
+int event_store_commit(struct event_store *, int, int, int);
 int event_store_recover(struct event_store *);
 int event_store_release(struct event_store *);
 /* int event_store_remove(struct event_store *); */
index 6121555..318b9e8 100644 (file)
@@ -15,7 +15,7 @@ DEBUG=${DEBUG:-0}
 # CONSUMER_ARGS=
 # PERFTEST_COMPONENT=
 # COMPONENT_ARGS=
-#LOGJOBS_ARGS="
+LOGJOBS_ARGS="-s /tmp/interlogger.perftest
 
 check_test_files || exit 1
 
index 2134a32..2879e14 100644 (file)
@@ -89,7 +89,7 @@ queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
     return(-1);
   }
   el->queue = eq;
-  el->next = queues;
+  el->next = *ql;
   *ql = el;
   return 0;
 }
index d33d541..d0e546b 100644 (file)
@@ -336,7 +336,7 @@ event_queue_send(struct event_queue *eq)
     default: /* LB_DBERR, LB_PROTO */
       /* the event was not accepted by the server */
       /* update the event pointer */
-      if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) 
+        if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0)
        /* failure committing message, this is bad */
        return(-1);
       /* if we have just delivered priority message from the queue, send confirmation */
index 90dec49..21fa23d 100644 (file)
@@ -144,13 +144,16 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
 
 
 #if defined(IL_NOTIFICATIONS)
-       edg_wll_InitContext(&context);
 
        /* parse the notification event */
-       if((ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event))) {
+       edg_wll_InitContext(&context);
+       ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event);
+       edg_wll_FreeContext(context);
+       if(ret) {
                set_error(IL_LBAPI, ret, "server_msg_init: error parsing notification event");
                return(-1);
        }
+
        /* FIXME: check for allocation error */
        if(notif_event->notification.dest_host && 
           (strlen(notif_event->notification.dest_host) > 0)) {