fix thread creation, fix destination mapping
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 21 Apr 2009 13:23:21 +0000 (13:23 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 21 Apr 2009 13:23:21 +0000 (13:23 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/queue_thread.c

index 18bdcdc..f90afd8 100644 (file)
@@ -283,6 +283,7 @@ event_queue_move_events(struct event_queue *eq_s,
        struct event_queue_msg *p, **source_prev, **dest_tail;
 
        assert(eq_s != NULL);
+       assert(data != NULL);
 
        event_queue_lock(eq_s);
        if(eq_d) {
@@ -294,10 +295,10 @@ event_queue_move_events(struct event_queue *eq_s,
        p = *source_prev;
        eq_s->tail = NULL;
        while(p) {
-         if((*cmp_func)(p->msg, data)) {
-                       il_log(LOG_DEBUG, "  moving event at offset %d(%d) from %s:%d to %s:%d\n",
-                              p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
-                              eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
+               if((*cmp_func)(p->msg, data)) {
+                       il_log(LOG_DEBUG, "      moving event at offset %d(%d) from %s:%d to %s:%d\n",
+                          p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, 
+                          eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
                        /* remove the message from the source list */
                        *source_prev = p->prev;
index 33ee416..1e9063f 100644 (file)
@@ -485,15 +485,26 @@ static
 int
 cmp_jobid(struct server_msg *msg, void *data)
 {
+       assert(msg != NULL);
+       assert(data != NULL);
+
        char *job_id_s = (char*)data;
        return strcmp(msg->job_id_s, job_id_s) == 0;
 }
 
+struct cmp_exp_data {
+       char *job_id_s;
+       time_t expires;
+};
+
 static
 int
 cmp_jobid_set_exp(struct server_msg *msg, void *data)
 {
-       struct server_msg *m = (struct server_msg *)data;
+       struct cmp_exp_data *m = (struct cmp_exp_data *)data;
+
+       assert(msg != NULL);
+       assert(data != NULL);
 
        if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
                msg->expires = m->expires;
@@ -723,31 +734,39 @@ event_store_recover(struct event_store *es)
     msg->generation = es->generation;
 
 #ifdef IL_NOTIFICATIONS
+    il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n",
+          msg->dest, last_dest, eq_b ? eq_b->dest : "none"); 
     /* check message destination */
     if(msg->dest == NULL) {
-        /* the message does not have destination itself, use destination cached for notification id */
-       if(eq_b == NULL) {
-               /* no destination is known for notification id, commit it immediately */
-               il_log(LOG_DEBUG, "    message has no known destination, will not be sent\n");
-                       event_store_commit(es, msg->ev_len, 0, msg->generation);
-       }
+            /* the message does not have destination itself, use destination cached for notification id */
+           if(eq_b == NULL) {
+                   /* no destination is known for notification id, commit it immediately */
+                   il_log(LOG_DEBUG, "    message has no known destination, will not be sent\n");
+                   event_store_commit(es, msg->ev_len, 0, msg->generation);
+           }
     } else {
         /* check if we know destination for notification id */
         if(eq_b == NULL) {
                eq_b = queue_list_get(msg->dest);
-        } else {
-               if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
-                       /* destination changed */
-                       last_dest = msg->dest;
-               }
+               if(notifid_map_set_dest(es->job_id_s, eq_b) < 0) {
+                       break;
+               }
         }
+       /* remember last message destination */
+       if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
+               /* destination changed */
+               if(last_dest) {
+                       free(last_dest);
+               }
+               last_dest = strdup(msg->dest);
+       }
     }
 
     /* check message expiration */
     if(last_exp == 0 || last_exp != msg->expires) {
        last_exp = msg->expires;
     }
- #else
+#else
     /* first enqueue to the LS */
     if(!bs_only && (last >= last_ls)) {
 
@@ -782,26 +801,43 @@ event_store_recover(struct event_store *es)
 
 #if defined(IL_NOTIFICATIONS)
   /* check if we have to move events to new destination */
+  il_log(LOG_DEBUG, "    last destination %s, last known destination %s\n", last_dest, eq_b->dest);
   if(last_dest && strcmp(last_dest, eq_b->dest)) {
          struct event_queue *eq_dest = queue_list_get(last_dest);
 
          /* set new destination */
          if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) {
                  ret = -1; 
-         }
-
-         /* move all events with this notif_id from eq_b to eq_dest */
-         event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
-         eq_b = eq_dest;
+         } else {
 
-         /* XXX - we should kill the old queue too */
+                 /* move all events with this notif_id from eq_b to eq_dest */
+                 event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
+                 eq_b = eq_dest;
+                 il_log(LOG_DEBUG, "    all messages for notif id %s are now destined to %s\n",
+                        es->job_id_s, eq_b->dest);
+                 if(event_queue_create_thread(eq_b) < 0) {
+                         ret = -1; 
+                 } else {
+                         event_queue_cond_lock(eq_b);
+                         event_queue_signal(eq_b);
+                         event_queue_cond_unlock(eq_b);
+                 }
+         }
+  }
+  if(last_dest) {
+         free(last_dest);
+         last_dest = NULL;
   }
 
   /* if the expiration changed, set new one */
   if(last_exp != notifid_map_get_expiration(es->job_id_s)) {
+         struct cmp_exp_data data;
+
          notifid_map_set_expiration(es->job_id_s, last_exp);
          /* set expiration for all events with this notif id */
-         event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg);
+         data.job_id_s = es->job_id_s;
+         data.expires = last_exp;
+         event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, &data);
   }
 #endif
 
index edbbad0..7210983 100644 (file)
@@ -158,7 +158,7 @@ struct event_queue {
        edg_wll_GssConnection   gss;            /* GSS connection */
        char                   *dest_name;
        int                     dest_port;
-       char                               *dest;
+       char                   *dest;
        int                     timeout;        /* queue timeout */
        struct event_queue_msg *tail;           /* last message in the queue */
        struct event_queue_msg *head;           /* first message in the queue */
index 20c25e7..da2cb57 100644 (file)
@@ -213,7 +213,7 @@ event_queue_create_thread(struct event_queue *eq)
        event_queue_lock(eq);
 
        /* if there is a thread already, just return */
-       if(eq->thread_id > 0) {
+       if(eq->thread_id != 0) {
                event_queue_unlock(eq);
                return(0);
        }