char                   *dest;
        char                   *owner;
        time_t                  expires;        /* time (in seconds from epoch) the message expires */
+       int                     use_count;      /* number of queue threads trying to deliver this message */
+       pthread_mutex_t         use_lock;       /* protect the use_count */
 };
 
 
 int server_msg_is_priority(struct server_msg *);
 #endif
 int server_msg_free(struct server_msg *);
+void server_msg_use(struct server_msg *);
+int server_msg_release(struct server_msg *);
 
 /* general event queue methods */
 struct event_queue *event_queue_create(char *, struct il_output_plugin *);
 
   struct server_msg *msg;
   struct event_queue_msg *prev;
   struct event_queue_msg *next;
+  struct event_queue *eq;             /* queue we are currently in */
 };
 
 struct event_queue *
   if(++eq->cur_len > eq->max_len)
          eq->max_len = eq->cur_len;
 
+  el->eq = eq;
+
   event_queue_unlock(eq);
   /* end of critical section */
 
          me->current = el;
          me->jobid = el->msg->job_id_s;
          *msg = el->msg;
+         server_msg_use(*msg);
   }
   event_queue_unlock(eq);
 
   /* this is critical section */
   event_queue_lock(eq);
 
-  if(el == el->prev) {
-         /* last element */
-         eq->head = NULL;
-         eq->tail_ems = NULL;
-  } else {
-         el->next->prev = el->prev;
-         el->prev->next = el->next;
-         if(el == eq->head) {
-                 eq->head = el->prev;
+  me->current = NULL;
+  me->jobid = NULL;
+
+  if(eq == el->eq) {
+         /* message is still ours, remove from queue */
+         if(el == el->prev) {
+                 /* last element */
+                 eq->head = NULL;
+                 eq->tail_ems = NULL;
+         } else {
+                 el->next->prev = el->prev;
+                 el->prev->next = el->next;
+                 if(el == eq->head) {
+                         eq->head = el->prev;
+                 }
+                 if(el == eq->tail_ems) {
+                         eq->tail_ems = el->prev;
+                 }
          }
-         if(el == eq->tail_ems) {
-                 eq->tail_ems = el->prev;
+         
+         if(--eq->cur_len == 0)
+                 eq->times_empty++;
+         
+         if(eq->cur_len <= queue_size_low) {
+                 eq->throttling = 0;
          }
+  } else {
+         /* not ours anymore */
+         server_msg_release(el->msg);
+         event_queue_unlock(eq);
+         return 0;
   }
 
 #if 0 /* OLD IMPLEMENTATION */
 #endif
 #endif /* OLD IMPLEMENTATION */
 
-  if(--eq->cur_len == 0)
-         eq->times_empty++;
-
-  if(eq->cur_len <= queue_size_low) {
-         eq->throttling = 0;
-  }
   
-  me->current = NULL;
-  me->jobid = NULL;
-
   event_queue_unlock(eq);
   /* end of critical section */
 
                                         eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        q = p->prev;
                        /* remove message from the source queue */
-                       if(p->next == p->prev) {
+                       if(p == p->prev) {
                                /* removing last message */
                                eq_s->head = NULL;
                                eq_s->tail_ems = NULL;
                                if(++eq_d->cur_len > eq_d->max_len) {
                                        eq_d->max_len = eq_d->cur_len;
                                }
+                               p->eq = eq_d;
                        } else {
                                /* signal that the message was 'delivered' */
                                event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
 
                me->timeout = 0;
              else
                me->timeout = TIMEOUT;
+             server_msg_release(msg);
              return(0);
            }
            
                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, "  error reading server %s reply: %s", 
                                         eq->dest_name, error_get_msg());
                     }
+                   server_msg_release(msg);
                    return(0);
            }
        }
            if(!(ENOENT == code_min)) {
                    /* non fatal errors (for us) */
                    me->timeout = TIMEOUT;
+                   server_msg_release(msg);
                    return(0);
            }
        
     default: /* LB_PROTO */
       /* the event was not accepted by the server */
       /* update the event pointer */
-        if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0)
-       /* failure committing message, this is bad */
-       return(-1);
+      if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+             /* failure committing message, this is bad */
+             server_msg_release(msg);
+             return(-1);
+      }
       /* if we have just delivered priority message from the queue, send confirmation */
       ret = 1;
 #if defined(INTERLOGD_EMS)
 
 #endif
   msg->expires = src->expires;
   msg->generation = src->generation;
+  msg->use_count = 0;
+  pthread_mutex_init(&msg->use_lock, NULL);
   return(msg);
 }
 
 
        memset(msg, 0, sizeof(*msg));
 
+       pthread_mutex_init(&msg->use_lock, NULL);
+       msg->use_count = 0;
 
 #if defined(IL_NOTIFICATIONS)
 
 {
   assert(msg != NULL);
 
-  if(msg->msg) free(msg->msg);
-  if(msg->job_id_s) free(msg->job_id_s);
+  if(server_msg_release(msg) <= 0) {
+         pthread_mutex_destroy(&msg->use_lock);
+         if(msg->msg) free(msg->msg);
+         if(msg->job_id_s) free(msg->job_id_s);
 #if defined(IL_NOTIFICATIONS)
-  if(msg->dest_name) free(msg->dest_name);
-  if(msg->dest) free(msg->dest);
-  if(msg->owner) free(msg->owner);
+         if(msg->dest_name) free(msg->dest_name);
+         if(msg->dest) free(msg->dest);
+         if(msg->owner) free(msg->owner);
 #endif
-  free(msg);
+         free(msg);
+  }
   return 0;
 }
+
+
+void
+server_msg_use(struct server_msg *msg)
+{
+       assert(msg != NULL);
+
+       pthread_mutex_lock(&msg->use_lock);
+       (msg->use_count)++;
+       pthread_mutex_unlock(&msg->use_lock);
+}
+
+
+int
+server_msg_release(struct server_msg *msg)
+{
+       int ret;
+
+       assert(msg != NULL);
+       
+       pthread_mutex_lock(&msg->use_lock);
+       ret = --(msg->use_count);
+       pthread_mutex_unlock(&msg->use_lock);
+       return ret;
+}