* patches from the LCG (and trunk)
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 14 Jan 2005 14:56:01 +0000 (14:56 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 14 Jan 2005 14:56:01 +0000 (14:56 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/queue_thread.c

index d311a1a..a0a018f 100644 (file)
@@ -284,8 +284,9 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char
        eq_s->tail = NULL;
        while(p) {
                if(strcmp(p->msg->job_id_s, notif_id) == 0) {
-                       il_log(LOG_DEBUG, "  moving event with notif id %s from %s:%d to %s:%d\n",
-                              notif_id, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
+                       il_log(LOG_DEBUG, "  moving event at offset %d from %s:%d to %s:%d\n",
+                              p->msg->offset, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
+                       il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev);
                        /* remove the message from the source list */
                        *source_prev = p->prev;
                        if(eq_d) {
index 295b2f0..3ee97f3 100644 (file)
@@ -473,14 +473,25 @@ event_store_sync(struct event_store *es, long offset)
      * 2) es->offset is set only by recover() and next().
      * 3) Additional recover can not do much harm.
      * 4) And next() is only called by the same thread as sync().
-     * => no one is messing with us right now */
+     * 5) use_lock is in place, so no cleanup possible
+      * => no one is messing with us right now */
     event_store_lock_ro(es);
     if(ret < 0)
       ret = -1;
     else 
-      /* somehow we suppose that now es->offset >= offset */
-      /* in fact it must be es->offset > offset, anything else would be weird */
-      ret = (es->offset > offset) ? 0 : 1;
+           if(es->offset <= offset) {
+                   /* Apparently there is something wrong - we are receiving an event
+                    * which is beyond the end of file. Someone must have removed the file
+                    * when we were not looking. The question is - what should we do with the event?
+                    * We have to send it, as this is the only one occasion when we see it.
+                    * However, we must not allow the es->offset to be set using this event,
+                    * as it would point after the end of file. Sort this out in event_store_next().
+                    */
+                   ret = 1;
+           } else if(es->offset > offset) {
+                   /* we have seen at least this event */
+                   ret = 0;
+           }
   }
   event_store_unlock(es);
   return(ret);
@@ -488,12 +499,20 @@ event_store_sync(struct event_store *es, long offset)
 
 
 int
-event_store_next(struct event_store *es, int len)
+event_store_next(struct event_store *es, long offset, int len)
 {
   assert(es != NULL);
   
   event_store_lock(es);
-  es->offset += len;
+  /* Whoa, be careful now. The es->offset points right after the last enqueued event,
+   * but it may not be the offset of the event WE have just enqueued, because:!    
+   *  1) someone could have removed the event file behind our back
+   *  2) the file could have been recover()ed and more events read
+   * In either case the offset should not be moved.
+   */
+  if(es->offset == offset) {
+         es->offset += len;
+  }
   event_store_unlock(es);
 
   return(0);
index f6414c9..b045112 100644 (file)
@@ -40,14 +40,15 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
           (messsage was just to change the delivery address) */
        if(msg->len == 0) 
                return(0);
-
 #endif
        /* avoid losing signal to thread */
        event_queue_cond_lock(eq);
 
        /* insert new event */
-       if(event_queue_insert(eq, msg) < 0)
+       if(event_queue_insert(eq, msg) < 0) {
+               event_queue_cond_unlock(eq);
                return(-1);
+       }
       
        /* signal thread that we have a new message */
        event_queue_signal(eq);
@@ -359,7 +360,8 @@ handle_msg(char *event, long offset)
 #endif
 
        /* if there was no error, set the next expected event offset */
-       event_store_next(es, msg->ev_len);
+       event_store_next(es, offset, msg->ev_len);
+
        /* allow cleanup thread to check on this event_store */
        event_store_release(es);
 
index 3c96828..74fc469 100644 (file)
@@ -190,7 +190,7 @@ int event_store_cleanup();
 int event_store_recover_all(void);
 struct event_store *event_store_find(char *);
 int event_store_sync(struct event_store *, long);
-int event_store_next(struct event_store *, int);
+int event_store_next(struct event_store *, long, int);
 int event_store_commit(struct event_store *, int, int);
 int event_store_recover(struct event_store *);
 int event_store_release(struct event_store *);
index 5eeff30..3269dd0 100644 (file)
@@ -39,6 +39,8 @@ queue_thread(void *q)
                pthread_exit(NULL);
        }
   
+       il_log(LOG_DEBUG, "  started new thread for delivery to %s:%d\n", eq->dest_name, eq->dest_port);
+
        pthread_cleanup_push(queue_thread_cleanup, q); 
 
        event_queue_cond_lock(eq);
@@ -152,19 +154,26 @@ event_queue_create_thread(struct event_queue *eq)
 {
        assert(eq != NULL);
 
+       event_queue_lock(eq);
+
        /* if there is a thread already, just return */
-       if(eq->thread_id > 0)
+       if(eq->thread_id > 0) {
+               event_queue_unlock(eq);
                return(0);
+       }
 
        /* create the thread itself */
        if(pthread_create(&eq->thread_id, NULL, queue_thread, eq) < 0) {
                eq->thread_id = 0;
                set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
+               event_queue_unlock(eq);
                return(-1);
        }
 
        /* the thread is never going to be joined */
        pthread_detach(eq->thread_id);
+       
+       event_queue_unlock(eq);
 
        return(1);
 }