added notification expiration
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 30 Nov 2007 19:05:22 +0000 (19:05 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 30 Nov 2007 19:05:22 +0000 (19:05 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/queue_mgr.c
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/server_msg.c

index f305a8f..fcfcfbc 100644 (file)
@@ -270,15 +270,15 @@ event_queue_remove(struct event_queue *eq)
   return(0);
 }
 
-#if defined(IL_NOTIFICATIONS)
-
 int
-event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id)
+event_queue_move_events(struct event_queue *eq_s, 
+                       struct event_queue *eq_d, 
+                       int (*cmp_func)(struct server_msg *, void *), 
+                       void *data)
 {
        struct event_queue_msg *p, **source_prev, **dest_tail;
 
        assert(eq_s != NULL);
-       assert(notif_id != NULL);
 
        event_queue_lock(eq_s);
        if(eq_d) {
@@ -290,9 +290,10 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char
        p = *source_prev;
        eq_s->tail = NULL;
        while(p) {
-               if(strcmp(p->msg->job_id_s, notif_id) == 0) {
+         if((*cmp_func)(p->msg, data)) {
                        il_log(LOG_DEBUG, "  moving event at offset %d from %s:%d to %s:%d\n",
-                              p->msg->offset, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
+                              p->msg->offset, eq_s->dest_name, eq_s->dest_port, 
+                              eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev);
                        /* remove the message from the source list */
                        *source_prev = p->prev;
@@ -319,42 +320,3 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char
        return(0);
 }
 
-#endif
-
-/* Remove all expired events.
- *
- * Returns: -1 - error, 0 - 
- */
-int
-event_queue_clean_expired(struct event_queue *eq)
-{
-       time_t now = time(NULL);
-       struct event_queue_msg *p, **prev;
-
-       assert(eq != NULL);
-
-       event_queue_lock(eq);
-       prev = &(eq->head);
-       p = *prev;
-       eq->tail = NULL;
-       while(p) {
-               if((p->msg->expires > 0) && (p->msg->expires < now)) {
-                       il_log(LOG_DEBUG, "    removing expired event at offset %d from %s:%d\n",
-                              p->msg->offset, eq->dest_name,eq->dest_port);
-                       /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
-                       /* remove the message from the list */
-                       *prev = p->prev;
-                       /* commit and free the message */
-                       event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq));
-                       server_msg_free(p->msg);
-                       free(p);
-               } else {
-                       /* message stays */
-                       prev = &(p->prev);
-                       eq->tail = p;
-               }
-               p = *prev;
-       }
-       event_queue_unlock(eq);
-       return(0);
-}
index 793959c..e394e5a 100644 (file)
 #include "glite/lb/lb_perftest.h"
 #endif
 
+static
+int
+cmp_jobid(struct server_msg *msg, void *data) 
+{
+       char *job_id_s = (char*)data;
+       return strcmp(msg->job_id_s, job_id_s) == 0;
+}
+
+static
+int
+cmp_jobid_set_exp(struct server_msg *msg, void *data)
+{
+       struct server_msg *m = (struct server_msg *)data;
+
+       if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
+               msg->expires = m->expires;
+       }
+       return 0;
+}
+
+
 int 
 enqueue_msg(struct event_queue *eq, struct server_msg *msg)
 {
@@ -31,10 +52,17 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
                        return(-1);
                /* move all events with this notif_id from eq_known to eq */
                if(eq_known != NULL) {
-                       event_queue_move_events(eq_known, eq, msg->job_id_s);
+                       event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s);
                        /* XXX - we should kill the old queue too */
                }
        }
+
+       /* if the expiration changed, set new one */
+       if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
+               notifid_map_set_expiration(msg->job_id_s, msg->expires);
+               /* set expiration for all events with this notif id */
+               event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg);
+       }
 #endif
 
        /* fire thread to take care of this queue */
index 92f8dc1..cf4d634 100644 (file)
@@ -166,7 +166,7 @@ int event_queue_remove(struct event_queue *);
 int event_queue_enqueue(struct event_queue *, char *);
 /* helper */
 int enqueue_msg(struct event_queue *, struct server_msg *);
-int event_queue_clean_expired(struct event_queue *eq);
+int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *); 
 
 /* protocol event queue methods */
 int event_queue_connect(struct event_queue *);
@@ -201,7 +201,8 @@ int queue_list_is_log(struct event_queue *);
 #if defined(IL_NOTIFICATIONS)
 struct event_queue *notifid_map_get_dest(const char *);
 int notifid_map_set_dest(const char *, struct event_queue *);
-int event_queue_move_events(struct event_queue *, struct event_queue *, char *); 
+time_t notifid_map_get_expiration(const char *);
+int notifid_map_set_expiration(const char *, time_t);
 #endif
 
 /* event store functions */
index abae1f3..b76c397 100644 (file)
@@ -12,6 +12,9 @@ struct queue_list {
   struct event_queue *queue;
   char   *dest;
   struct queue_list *next;
+#if defined(IL_NOTIFICATIONS)
+  time_t expires;
+#endif
 };
 
 static struct event_queue *log_queue;
@@ -91,24 +94,6 @@ queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
 }
 
 
-/*
-static
-int
-queue_list_remove(struct queue_list *el, struct queue_list *prev)
-{
-  assert(el != NULL);
-
-  if(prev) 
-    prev->next = el->next;
-  else
-    queues = el->next;
-
-  free(el);
-  return(1);
-}
-*/
-
-
 #if !defined(IL_NOTIFICATIONS)
 static
 char *
@@ -213,6 +198,17 @@ queue_list_next()
   return(current ? current->queue : NULL);
 }
 
+
+int
+queue_list_remove_queue(struct event_queue *eq)
+{
+  assert(eq != NULL);
+
+  free(eq);
+  return(1);
+}
+
+
 #if defined(IL_NOTIFICATIONS)
 
 static struct queue_list  *notifid_map = NULL;
@@ -241,4 +237,32 @@ notifid_map_set_dest(const char *notif_id, struct event_queue *eq)
        }
 }
 
+
+time_t
+notifid_map_get_expiration(const char * notif_id)
+{
+  struct queue_list *q;
+
+  queue_list_find(notifid_map, notif_id, &q, NULL);
+  return(q ? q->expires : 0);
+}
+
+
+int
+notifid_map_set_expiration(const char *notif_id, time_t exp)
+{
+  struct queue_list *q;
+
+  if(queue_list_find(notifid_map, notif_id, &q, NULL)) {
+    q->expires = exp;
+    return(1);
+  } else {
+    return(0);
+  }
+}
+
 #endif
+
+/* Local Variables:           */
+/* c-indentation-style: gnu   */
+/* End:                       */
index 3c91c92..b75c501 100644 (file)
@@ -27,6 +27,16 @@ queue_thread_cleanup(void *q)
 }
 
 
+static time_t now;
+
+static
+int
+cmp_expires(struct server_msg *msg, void *data)
+{
+  time_t *t = (time_t*)data;
+  return msg->expires < *t;
+}
+
 static
 void *
 queue_thread(void *q)
@@ -94,7 +104,8 @@ queue_thread(void *q)
                
                /* discard expired events */
                il_log(LOG_DEBUG, "  discarding expired events\n");
-               event_queue_clean_expired(eq);
+               now = time(NULL);
+               event_queue_move_events(eq, NULL, cmp_expires, &now);
                if(!event_queue_empty(eq)) {
 
                        /* deliver pending events */
@@ -409,3 +420,7 @@ int event_queue_cond_unlock(struct event_queue *eq)
 
        return(0);
 }
+
+/* Local Variables:           */
+/* c-indentation-style: linux */
+/* End:                       */
index 0f8c988..e9578a1 100644 (file)
@@ -163,6 +163,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
           (strlen(notif_event->notification.jobstat) > 0)) {
                msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
        }
+       msg->expires = notif_event->notification.expires;
        edg_wll_FreeEvent(notif_event);
        free(notif_event);
        if(msg->len < 0) {