}
 
 #endif
+
+/* Remove all expired events.
+ *
+ * Returns: -1 - error, 0 - 
+ */
+int
+event_queue_clean_expired(struct event_queue *eq)
+{
+       time_t now = time(NULL);
+       struct event_queue_msg *p, **prev;
+
+       assert(eq != NULL);
+
+       event_queue_lock(eq);
+       prev = &(eq->head);
+       p = *prev;
+       eq->tail = NULL;
+       while(p) {
+               if((p->msg->expires > 0) && (p->msg->expires < now)) {
+                       il_log(LOG_DEBUG, "    removing expired event at offset %d from %s:%d\n",
+                              p->msg->offset, eq->dest_name,eq->dest_port);
+                       /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
+                       /* remove the message from the list */
+                       *prev = p->prev;
+                       /* commit and free the message */
+                       event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq));
+                       server_msg_free(p->msg);
+                       free(p);
+               } else {
+                       /* message stays */
+                       prev = &(p->prev);
+                       eq->tail = p;
+               }
+               p = *prev;
+       }
+       event_queue_unlock(eq);
+       return(0);
+}
 
        struct event_queue *eq = (struct event_queue *)q;
        int ret, exit;
        int retrycnt;
-       int close_timeout;
+       int close_timeout = 0;
+       int exit_timeout = EXIT_TIMEOUT;
 
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
                                               eq->dest_name, eq->dest_port);
                                }
                                close_timeout = 0;
-                       } else 
-                               ret = event_queue_wait(eq, 0);
+                       } else {
+                               ret = event_queue_wait(eq, exit_timeout);
+                               if(ret == 1) {
+                                       il_log(LOG_INFO, "  thread idle for more than %d seconds, exiting\n", exit_timeout);
+                                       event_queue_close(eq);
+                                       event_queue_cond_unlock(eq);
+                                       pthread_exit((void*)0);
+                               }
+                       }
                        if(ret < 0) {
                                /* error waiting */
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
                        }
                }  /* END while(empty) */
     
-               il_log(LOG_DEBUG, "  attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
 
                /* allow other threads to signal us, ie. insert new events while
                 * we are sending or request flush operation
                 */
                event_queue_cond_unlock(eq);
                
-               /* connect to server */
-               if((ret=event_queue_connect(eq)) == 0) {
-                       /* not connected */
-                       if(error_get_maj() != IL_OK)
-                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+               /* discard expired events */
+               il_log(LOG_DEBUG, "  discarding expired events\n");
+               event_queue_clean_expired(eq);
+               if(!event_queue_empty(eq)) {
+
+                       /* deliver pending events */
+                       il_log(LOG_DEBUG, "  attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
+                       /* connect to server */
+                       if((ret=event_queue_connect(eq)) == 0) {
+                               /* not connected */
+                               if(error_get_maj() != IL_OK)
+                                       il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
 #if defined(IL_NOTIFICATIONS)
-                       il_log(LOG_INFO, "    could not connect to client %s, waiting for retry\n", eq->dest_name);
+                               il_log(LOG_INFO, "    could not connect to client %s, waiting for retry\n", eq->dest_name);
 #else
-                       il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
+                               il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
 #endif
-                       retrycnt++;
-               } else {
-                       retrycnt = 0;
-                       /* connected, send events */
-                       switch(ret=event_queue_send(eq)) {
-                               
-                       case 0:
-                               /* there was an error and we still have events to send */
-                               if(error_get_maj() != IL_OK)
+                               retrycnt++;
+                       } else {
+                               retrycnt = 0;
+                               /* connected, send events */
+                               switch(ret=event_queue_send(eq)) {
+                                       
+                               case 0:
+                                       /* there was an error and we still have events to send */
+                                       if(error_get_maj() != IL_OK)
+                                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+                                       il_log(LOG_DEBUG, "  events still waiting\n");
+                                       break;
+                                       
+                               case 1:
+                                       /* hey, we are done for now */
+                                       il_log(LOG_DEBUG, "  all events for %s sent\n", eq->dest_name);
+                                       break;
+                                       
+                               default:
+                                       /* internal error */
                                        il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                               il_log(LOG_DEBUG, "  events still waiting\n");
-                               break;
-                               
-                       case 1:
-                               /* hey, we are done for now */
-                               il_log(LOG_DEBUG, "  all events for %s sent\n", eq->dest_name);
-                               break;
-                               
-                       default:
-                               /* internal error */
-                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                               exit = 1;      
-                               break;
-                               
-                       } /* switch */
+                                       exit = 1;      
+                                       break;
+                                       
+                               } /* switch */
                        
-                       /* we are done for now anyway, so close the queue */
-                       if((ret == 1) && lazy_close)
-                               close_timeout = default_close_timeout;
-                       else {
-                               event_queue_close(eq);
-                               il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
-                                      eq->dest_name, eq->dest_port);
+                               /* we are done for now anyway, so close the queue */
+                               if((ret == 1) && lazy_close)
+                                       close_timeout = default_close_timeout;
+                               else {
+                                       event_queue_close(eq);
+                                       il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                              eq->dest_name, eq->dest_port);
+                               }
                        }
                } 
 
 
 
 static
 int 
-create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
+create_msg(il_octet_string_t *ev, char **buffer, long *receipt, time_t *expires)
 {
   char *p;  int  len;
   char *event = ev->data;
   }
 #endif
 
+  if(p = strstr(event, "DG.EXPIRES")) {
+         int n;
+
+         p += 11;
+         *expires = atoi(p);
+  }
   len = encode_il_msg(buffer, ev);
   if(len < 0) {
     set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message");
   msg->dest_port = src->dest_port;
   msg->dest = strdup(src->dest);
 #endif
+  msg->expires = src->expires;
   return(msg);
 }
 
        msg->job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId);
        if(notif_event->notification.jobstat && 
           (strlen(notif_event->notification.jobstat) > 0)) {
-               msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+               msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
        }
        edg_wll_FreeEvent(notif_event);
        free(notif_event);
                return(-1);
        }
 #else
-       msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+       msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
        if(msg->len < 0) {
                return(-1);
        }