fix for SB#95013 and maybe couple of others
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 11 Jun 2012 12:49:30 +0000 (12:49 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 11 Jun 2012 12:49:30 +0000 (12:49 +0000)
org.glite.lb.logger/interface/interlogd.h
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/send_event.c
org.glite.lb.logger/src/server_msg.c

index c2ef7d0..1420cea 100644 (file)
@@ -180,6 +180,8 @@ struct server_msg {
        char                   *dest;
        char                   *owner;
        time_t                  expires;        /* time (in seconds from epoch) the message expires */
+       int                     use_count;      /* number of queue threads trying to deliver this message */
+       pthread_mutex_t         use_lock;       /* protect the use_count */
 };
 
 
@@ -240,6 +242,8 @@ int server_msg_init(struct server_msg *, il_octet_string_t *);
 int server_msg_is_priority(struct server_msg *);
 #endif
 int server_msg_free(struct server_msg *);
+void server_msg_use(struct server_msg *);
+int server_msg_release(struct server_msg *);
 
 /* general event queue methods */
 struct event_queue *event_queue_create(char *, struct il_output_plugin *);
index f8ec91a..001982d 100644 (file)
@@ -37,6 +37,7 @@ struct event_queue_msg {
   struct server_msg *msg;
   struct event_queue_msg *prev;
   struct event_queue_msg *next;
+  struct event_queue *eq;             /* queue we are currently in */
 };
 
 struct event_queue *
@@ -257,6 +258,8 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
   if(++eq->cur_len > eq->max_len)
          eq->max_len = eq->cur_len;
 
+  el->eq = eq;
+
   event_queue_unlock(eq);
   /* end of critical section */
 
@@ -306,6 +309,7 @@ event_queue_get(struct event_queue *eq, struct queue_thread *me, struct server_m
          me->current = el;
          me->jobid = el->msg->job_id_s;
          *msg = el->msg;
+         server_msg_use(*msg);
   }
   event_queue_unlock(eq);
 
@@ -331,19 +335,37 @@ event_queue_remove(struct event_queue *eq, struct queue_thread *me)
   /* this is critical section */
   event_queue_lock(eq);
 
-  if(el == el->prev) {
-         /* last element */
-         eq->head = NULL;
-         eq->tail_ems = NULL;
-  } else {
-         el->next->prev = el->prev;
-         el->prev->next = el->next;
-         if(el == eq->head) {
-                 eq->head = el->prev;
+  me->current = NULL;
+  me->jobid = NULL;
+
+  if(eq == el->eq) {
+         /* message is still ours, remove from queue */
+         if(el == el->prev) {
+                 /* last element */
+                 eq->head = NULL;
+                 eq->tail_ems = NULL;
+         } else {
+                 el->next->prev = el->prev;
+                 el->prev->next = el->next;
+                 if(el == eq->head) {
+                         eq->head = el->prev;
+                 }
+                 if(el == eq->tail_ems) {
+                         eq->tail_ems = el->prev;
+                 }
          }
-         if(el == eq->tail_ems) {
-                 eq->tail_ems = el->prev;
+         
+         if(--eq->cur_len == 0)
+                 eq->times_empty++;
+         
+         if(eq->cur_len <= queue_size_low) {
+                 eq->throttling = 0;
          }
+  } else {
+         /* not ours anymore */
+         server_msg_release(el->msg);
+         event_queue_unlock(eq);
+         return 0;
   }
 
 #if 0 /* OLD IMPLEMENTATION */
@@ -387,16 +409,7 @@ event_queue_remove(struct event_queue *eq, struct queue_thread *me)
 #endif
 #endif /* OLD IMPLEMENTATION */
 
-  if(--eq->cur_len == 0)
-         eq->times_empty++;
-
-  if(eq->cur_len <= queue_size_low) {
-         eq->throttling = 0;
-  }
   
-  me->current = NULL;
-  me->jobid = NULL;
-
   event_queue_unlock(eq);
   /* end of critical section */
 
@@ -439,7 +452,7 @@ event_queue_move_events(struct event_queue *eq_s,
                                         eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        q = p->prev;
                        /* remove message from the source queue */
-                       if(p->next == p->prev) {
+                       if(p == p->prev) {
                                /* removing last message */
                                eq_s->head = NULL;
                                eq_s->tail_ems = NULL;
@@ -472,6 +485,7 @@ event_queue_move_events(struct event_queue *eq_s,
                                if(++eq_d->cur_len > eq_d->max_len) {
                                        eq_d->max_len = eq_d->cur_len;
                                }
+                               p->eq = eq_d;
                        } else {
                                /* signal that the message was 'delivered' */
                                event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
index 8b34cf5..a7847a2 100644 (file)
@@ -321,6 +321,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
                me->timeout = 0;
              else
                me->timeout = TIMEOUT;
+             server_msg_release(msg);
              return(0);
            }
            
@@ -335,6 +336,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, "  error reading server %s reply: %s", 
                                         eq->dest_name, error_get_msg());
                     }
+                   server_msg_release(msg);
                    return(0);
            }
        }
@@ -364,6 +366,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
            if(!(ENOENT == code_min)) {
                    /* non fatal errors (for us) */
                    me->timeout = TIMEOUT;
+                   server_msg_release(msg);
                    return(0);
            }
        
@@ -374,9 +377,11 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
     default: /* LB_PROTO */
       /* the event was not accepted by the server */
       /* update the event pointer */
-        if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0)
-       /* failure committing message, this is bad */
-       return(-1);
+      if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+             /* failure committing message, this is bad */
+             server_msg_release(msg);
+             return(-1);
+      }
       /* if we have just delivered priority message from the queue, send confirmation */
       ret = 1;
 #if defined(INTERLOGD_EMS)
index 28ce300..8bea3ab 100644 (file)
@@ -142,6 +142,8 @@ server_msg_copy(struct server_msg *src)
 #endif
   msg->expires = src->expires;
   msg->generation = src->generation;
+  msg->use_count = 0;
+  pthread_mutex_init(&msg->use_lock, NULL);
   return(msg);
 }
 
@@ -160,6 +162,8 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
 
        memset(msg, 0, sizeof(*msg));
 
+       pthread_mutex_init(&msg->use_lock, NULL);
+       msg->use_count = 0;
 
 #if defined(IL_NOTIFICATIONS)
 
@@ -238,13 +242,41 @@ server_msg_free(struct server_msg *msg)
 {
   assert(msg != NULL);
 
-  if(msg->msg) free(msg->msg);
-  if(msg->job_id_s) free(msg->job_id_s);
+  if(server_msg_release(msg) <= 0) {
+         pthread_mutex_destroy(&msg->use_lock);
+         if(msg->msg) free(msg->msg);
+         if(msg->job_id_s) free(msg->job_id_s);
 #if defined(IL_NOTIFICATIONS)
-  if(msg->dest_name) free(msg->dest_name);
-  if(msg->dest) free(msg->dest);
-  if(msg->owner) free(msg->owner);
+         if(msg->dest_name) free(msg->dest_name);
+         if(msg->dest) free(msg->dest);
+         if(msg->owner) free(msg->owner);
 #endif
-  free(msg);
+         free(msg);
+  }
   return 0;
 }
+
+
+void
+server_msg_use(struct server_msg *msg)
+{
+       assert(msg != NULL);
+
+       pthread_mutex_lock(&msg->use_lock);
+       (msg->use_count)++;
+       pthread_mutex_unlock(&msg->use_lock);
+}
+
+
+int
+server_msg_release(struct server_msg *msg)
+{
+       int ret;
+
+       assert(msg != NULL);
+       
+       pthread_mutex_lock(&msg->use_lock);
+       ret = --(msg->use_count);
+       pthread_mutex_unlock(&msg->use_lock);
+       return ret;
+}