event expiration and delivery thread termination
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 23 Nov 2007 15:30:41 +0000 (15:30 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 23 Nov 2007 15:30:41 +0000 (15:30 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/server_msg.c

index bc9a44e..f305a8f 100644 (file)
@@ -320,3 +320,41 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char
 }
 
 #endif
+
+/* Remove all expired events.
+ *
+ * Returns: -1 - error, 0 - 
+ */
+int
+event_queue_clean_expired(struct event_queue *eq)
+{
+       time_t now = time(NULL);
+       struct event_queue_msg *p, **prev;
+
+       assert(eq != NULL);
+
+       event_queue_lock(eq);
+       prev = &(eq->head);
+       p = *prev;
+       eq->tail = NULL;
+       while(p) {
+               if((p->msg->expires > 0) && (p->msg->expires < now)) {
+                       il_log(LOG_DEBUG, "    removing expired event at offset %d from %s:%d\n",
+                              p->msg->offset, eq->dest_name,eq->dest_port);
+                       /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
+                       /* remove the message from the list */
+                       *prev = p->prev;
+                       /* commit and free the message */
+                       event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq));
+                       server_msg_free(p->msg);
+                       free(p);
+               } else {
+                       /* message stays */
+                       prev = &(p->prev);
+                       eq->tail = p;
+               }
+               p = *prev;
+       }
+       event_queue_unlock(eq);
+       return(0);
+}
index 810314e..92f8dc1 100644 (file)
@@ -61,7 +61,7 @@
 // #define TIMEOUT      5
 extern int TIMEOUT;
 #define INPUT_TIMEOUT (60)
-
+#define EXIT_TIMEOUT (1*60)
 
 extern gss_cred_id_t cred_handle;
 extern pthread_mutex_t cred_handle_lock;
@@ -115,6 +115,7 @@ struct server_msg {
        int                     dest_port;
        char                   *dest;
 #endif
+       time_t                  expires;        /* time (in seconds from epoch) the message expires */
 };
 
 
@@ -165,6 +166,7 @@ int event_queue_remove(struct event_queue *);
 int event_queue_enqueue(struct event_queue *, char *);
 /* helper */
 int enqueue_msg(struct event_queue *, struct server_msg *);
+int event_queue_clean_expired(struct event_queue *eq);
 
 /* protocol event queue methods */
 int event_queue_connect(struct event_queue *);
index cb5424e..3c91c92 100644 (file)
@@ -34,7 +34,8 @@ queue_thread(void *q)
        struct event_queue *eq = (struct event_queue *)q;
        int ret, exit;
        int retrycnt;
-       int close_timeout;
+       int close_timeout = 0;
+       int exit_timeout = EXIT_TIMEOUT;
 
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
@@ -68,8 +69,15 @@ queue_thread(void *q)
                                               eq->dest_name, eq->dest_port);
                                }
                                close_timeout = 0;
-                       } else 
-                               ret = event_queue_wait(eq, 0);
+                       } else {
+                               ret = event_queue_wait(eq, exit_timeout);
+                               if(ret == 1) {
+                                       il_log(LOG_INFO, "  thread idle for more than %d seconds, exiting\n", exit_timeout);
+                                       event_queue_close(eq);
+                                       event_queue_cond_unlock(eq);
+                                       pthread_exit((void*)0);
+                               }
+                       }
                        if(ret < 0) {
                                /* error waiting */
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
@@ -78,56 +86,63 @@ queue_thread(void *q)
                        }
                }  /* END while(empty) */
     
-               il_log(LOG_DEBUG, "  attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
 
                /* allow other threads to signal us, ie. insert new events while
                 * we are sending or request flush operation
                 */
                event_queue_cond_unlock(eq);
                
-               /* connect to server */
-               if((ret=event_queue_connect(eq)) == 0) {
-                       /* not connected */
-                       if(error_get_maj() != IL_OK)
-                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+               /* discard expired events */
+               il_log(LOG_DEBUG, "  discarding expired events\n");
+               event_queue_clean_expired(eq);
+               if(!event_queue_empty(eq)) {
+
+                       /* deliver pending events */
+                       il_log(LOG_DEBUG, "  attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
+                       /* connect to server */
+                       if((ret=event_queue_connect(eq)) == 0) {
+                               /* not connected */
+                               if(error_get_maj() != IL_OK)
+                                       il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
 #if defined(IL_NOTIFICATIONS)
-                       il_log(LOG_INFO, "    could not connect to client %s, waiting for retry\n", eq->dest_name);
+                               il_log(LOG_INFO, "    could not connect to client %s, waiting for retry\n", eq->dest_name);
 #else
-                       il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
+                               il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
 #endif
-                       retrycnt++;
-               } else {
-                       retrycnt = 0;
-                       /* connected, send events */
-                       switch(ret=event_queue_send(eq)) {
-                               
-                       case 0:
-                               /* there was an error and we still have events to send */
-                               if(error_get_maj() != IL_OK)
+                               retrycnt++;
+                       } else {
+                               retrycnt = 0;
+                               /* connected, send events */
+                               switch(ret=event_queue_send(eq)) {
+                                       
+                               case 0:
+                                       /* there was an error and we still have events to send */
+                                       if(error_get_maj() != IL_OK)
+                                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+                                       il_log(LOG_DEBUG, "  events still waiting\n");
+                                       break;
+                                       
+                               case 1:
+                                       /* hey, we are done for now */
+                                       il_log(LOG_DEBUG, "  all events for %s sent\n", eq->dest_name);
+                                       break;
+                                       
+                               default:
+                                       /* internal error */
                                        il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                               il_log(LOG_DEBUG, "  events still waiting\n");
-                               break;
-                               
-                       case 1:
-                               /* hey, we are done for now */
-                               il_log(LOG_DEBUG, "  all events for %s sent\n", eq->dest_name);
-                               break;
-                               
-                       default:
-                               /* internal error */
-                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                               exit = 1;      
-                               break;
-                               
-                       } /* switch */
+                                       exit = 1;      
+                                       break;
+                                       
+                               } /* switch */
                        
-                       /* we are done for now anyway, so close the queue */
-                       if((ret == 1) && lazy_close)
-                               close_timeout = default_close_timeout;
-                       else {
-                               event_queue_close(eq);
-                               il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
-                                      eq->dest_name, eq->dest_port);
+                               /* we are done for now anyway, so close the queue */
+                               if((ret == 1) && lazy_close)
+                                       close_timeout = default_close_timeout;
+                               else {
+                                       event_queue_close(eq);
+                                       il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                              eq->dest_name, eq->dest_port);
+                               }
                        }
                } 
 
index 3b1002d..0f8c988 100644 (file)
@@ -12,7 +12,7 @@
 
 static
 int 
-create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
+create_msg(il_octet_string_t *ev, char **buffer, long *receipt, time_t *expires)
 {
   char *p;  int  len;
   char *event = ev->data;
@@ -57,6 +57,12 @@ create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
   }
 #endif
 
+  if(p = strstr(event, "DG.EXPIRES")) {
+         int n;
+
+         p += 11;
+         *expires = atoi(p);
+  }
   len = encode_il_msg(buffer, ev);
   if(len < 0) {
     set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message");
@@ -117,6 +123,7 @@ server_msg_copy(struct server_msg *src)
   msg->dest_port = src->dest_port;
   msg->dest = strdup(src->dest);
 #endif
+  msg->expires = src->expires;
   return(msg);
 }
 
@@ -154,7 +161,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
        msg->job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId);
        if(notif_event->notification.jobstat && 
           (strlen(notif_event->notification.jobstat) > 0)) {
-               msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+               msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
        }
        edg_wll_FreeEvent(notif_event);
        free(notif_event);
@@ -162,7 +169,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
                return(-1);
        }
 #else
-       msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+       msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
        if(msg->len < 0) {
                return(-1);
        }