defer moving events between queues until final destination was found
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 17 Apr 2009 16:04:54 +0000 (16:04 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 17 Apr 2009 16:04:54 +0000 (16:04 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/interlogd.h

index 9ce47f6..18bdcdc 100644 (file)
@@ -1,6 +1,6 @@
 #ident "$Header$"
 
-/* 
+/*
  *   - general queue handling routines (insert, get)
  */
 
@@ -27,11 +27,6 @@ event_queue_create(char *server_name)
   struct event_queue *eq;
   char *p;
 
-  p = strchr(server_name, ':');
-  
-  if(p) 
-    *p++ = 0;
-
   if((eq = malloc(sizeof(*eq))) == NULL) {
     set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue");
     return(NULL);
@@ -39,11 +34,15 @@ event_queue_create(char *server_name)
 
   memset(eq, 0, sizeof(*eq));
 
-  eq->dest_name = strdup(server_name);
+  eq->dest = strdup(server_name);
 
+  p = strchr(server_name, ':');
+  if(p)
+    *p++ = 0;
+  eq->dest_name = strdup(server_name);
   if(p)
     *(p-1) = ':';
-  
+
 #if defined(IL_NOTIFICATIONS) || defined(IL_WS)
   eq->dest_port = atoi(p);
 #else
@@ -96,6 +95,11 @@ event_queue_free(struct event_queue *eq)
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
   pthread_cond_destroy(&eq->flush_cond);
 #endif
+
+  if(eq->dest_name)
+         free(eq->dest_name);
+  if(eq->dest)
+         free(eq->dest);
   free(eq);
 
   return(0);
@@ -124,10 +128,10 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
 #if defined(INTERLOGD_EMS)
   struct event_queue_msg *tail;
 #endif
-  
+
   assert(eq != NULL);
 
-  if((el = malloc(sizeof(*el))) == NULL) 
+  if((el = malloc(sizeof(*el))) == NULL)
     return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element"));
 
   el->msg = server_msg_copy(msg);
@@ -154,9 +158,9 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
        eq->tail = el;
     }
     eq->tail_ems = el;
-  } else 
+  } else
 #endif
-  { 
+  {
     /* normal messages */
     if(eq->tail)
       eq->tail->prev = el;
@@ -168,7 +172,7 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
 #if defined(INTERLOGD_EMS)
   /* if we are inserting message between mark_prev and mark_this,
      we have to adjust mark_prev accordingly */
-  if(eq->mark_this && (el->prev == eq->mark_this)) 
+  if(eq->mark_this && (el->prev == eq->mark_this))
     eq->mark_prev = el;
 #endif
 
@@ -189,7 +193,7 @@ event_queue_get(struct event_queue *eq, struct server_msg **msg)
 
   assert(eq != NULL);
   assert(msg != NULL);
-  
+
   event_queue_lock(eq);
   el = eq->head;
 #if defined(INTERLOGD_EMS)
@@ -208,7 +212,7 @@ event_queue_get(struct event_queue *eq, struct server_msg **msg)
 }
 
 
-int 
+int
 event_queue_remove(struct event_queue *eq)
 {
   struct event_queue_msg *el;
@@ -258,12 +262,12 @@ event_queue_remove(struct event_queue *eq)
          eq->tail = NULL;
   }
 #endif
-  if(--eq->cur_len == 0) 
+  if(--eq->cur_len == 0)
          eq->times_empty++;
 
   event_queue_unlock(eq);
   /* end of critical section */
-    
+
   server_msg_free(el->msg);
   free(el);
 
@@ -271,9 +275,9 @@ event_queue_remove(struct event_queue *eq)
 }
 
 int
-event_queue_move_events(struct event_queue *eq_s, 
-                       struct event_queue *eq_d, 
-                       int (*cmp_func)(struct server_msg *, void *), 
+event_queue_move_events(struct event_queue *eq_s,
+                       struct event_queue *eq_d,
+                       int (*cmp_func)(struct server_msg *, void *),
                        void *data)
 {
        struct event_queue_msg *p, **source_prev, **dest_tail;
@@ -292,7 +296,7 @@ event_queue_move_events(struct event_queue *eq_s,
        while(p) {
          if((*cmp_func)(p->msg, data)) {
                        il_log(LOG_DEBUG, "  moving event at offset %d(%d) from %s:%d to %s:%d\n",
-                              p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, 
+                              p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
                               eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
                        /* remove the message from the source list */
index e584e89..78cc5e4 100644 (file)
@@ -481,6 +481,25 @@ event_store_recover_jobid(struct event_store *es)
        return 0;
 }
 
+static
+int
+cmp_jobid(struct server_msg *msg, void *data)
+{
+       char *job_id_s = (char*)data;
+       return strcmp(msg->job_id_s, job_id_s) == 0;
+}
+
+static
+int
+cmp_jobid_set_exp(struct server_msg *msg, void *data)
+{
+       struct server_msg *m = (struct server_msg *)data;
+
+       if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
+               msg->expires = m->expires;
+       }
+       return 0;
+}
 
 /*
  * event_store_recover()
@@ -500,23 +519,26 @@ event_store_recover(struct event_store *es)
   struct flock efl;
   char err_msg[128];
   struct stat stbuf;
+#if defined(IL_NOTIFICATIONS)
+  char *last_dest = NULL;
+  time_t last_exp = 0;
+#endif
 
   assert(es != NULL);
 
 #if defined(IL_NOTIFICATIONS)
-  /* destination queue has to be found for each message separately */
+  /* destination queue has to be found for each message separately, */
+  /* this is current known destination for our notification id (may be NULL!) */
+  eq_b = notifid_map_get_dest(es->job_id_s);
 #else
+  /* get log server queue */
+  eq_l = queue_list_get(NULL);
   /* find bookkeeping server queue */
   eq_b = queue_list_get(es->job_id_s);
   if(eq_b == NULL)
     return(-1);
 #endif
 
-#if !defined(IL_NOTIFICATIONS)
-  /* get log server queue */
-  eq_l = queue_list_get(NULL);
-#endif
-
   /* lock the event_store and offset locks */
   event_store_lock(es);
   if(pthread_rwlock_wrlock(&es->offset_lock))
@@ -700,44 +722,52 @@ event_store_recover(struct event_store *es)
     msg->es = es;
     msg->generation = es->generation;
 
-    /* first enqueue to the LS */
-    if(!bs_only && (last >= last_ls)) {
-
-      il_log(LOG_DEBUG, "      queuing event at %ld to logging server\n", last);
-
-#if !defined(IL_NOTIFICATIONS)
-      if(enqueue_msg(eq_l, msg) < 0)
-       break;
-#endif
-    }
-
 #ifdef IL_NOTIFICATIONS
-    eq_b = queue_list_get(msg->dest);
-    /* if the message does not have destination itself, use destination cached for notification id */
-    if(eq_b == NULL) {
-       eq_b = notifid_map_get_dest(msg->job_id_s);
+    /* check message destination */
+    if(msg->dest == NULL) {
+        /* the message does not have destination itself, use destination cached for notification id */
        if(eq_b == NULL) {
-               /* message has no destination and no destination is known for notification id,
-                * commit it immediately
-                */
+               /* no destination is known for notification id, commit it immediately */
                il_log(LOG_DEBUG, "    message has no known destination, will not be sent\n");
                        event_store_commit(es, msg->ev_len, 0, msg->generation);
-               /* if the expiration changed, set new one now, message will be discarded soon */
-               if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
-                       notifid_map_set_expiration(msg->job_id_s, msg->expires);
-               }
        }
+    } else {
+        /* check if we know destination for notification id */
+        if(eq_b == NULL) {
+               eq_b = queue_list_get(msg->dest);
+        } else {
+               if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
+                       /* destination changed */
+                       last_dest = msg->dest;
+               }
+        }
+    }
+
+    /* check message expiration */
+    if(last_exp == 0 || last_exp != msg->expires) {
+       last_exp = msg->expires;
+    }
+ #else
+    /* first enqueue to the LS */
+    if(!bs_only && (last >= last_ls)) {
+
+      il_log(LOG_DEBUG, "      queuing event at %ld to logging server\n", last);
+
+      if(enqueue_msg(eq_l, msg) < 0) {
+         break;
+      }
     }
 #endif
 
-    /* now enqueue to the BS, if neccessary */
+    /* now enqueue to the BS, if necessary */
     if((eq_b != eq_l) &&
        (last >= last_bs)) {
 
-      il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
+      il_log(LOG_DEBUG, "      queuing event at %ld to bookkeeping server\n", last);
 
-      if(enqueue_msg(eq_b, msg) < 0)
+      if(enqueue_msg(eq_b, msg) < 0) {
          break;
+      }
     }
     server_msg_free(msg);
     msg = NULL;
@@ -750,6 +780,31 @@ event_store_recover(struct event_store *es)
 
   } /* while */
 
+#if defined(IL_NOTIFICATIONS)
+  /* check if we have to move events to new destination */
+  if(last_dest && strcmp(last_dest, eq_b->dest)) {
+         struct event_queue *eq_dest = queue_list_get(last_dest);
+
+         /* set new destination */
+         if(notifid_map_set_dest(es->job_id_s, eq) < 0) {
+                 ret = -1; break;
+         }
+
+         /* move all events with this notif_id from eq_b to eq_dest */
+         event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
+         eq_b = eq_dest;
+
+         /* XXX - we should kill the old queue too */
+  }
+
+  /* if the expiration changed, set new one */
+  if(last_exp != notifid_map_get_expiration(es->job_id_s)) {
+         notifid_map_set_expiration(es->job_id_s, last_exp);
+         /* set expiration for all events with this notif id */
+         event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg);
+  }
+#endif
+
   es->offset = last;
   es->last_modified = stbuf.st_mtime;
   il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
index 384c89a..3299541 100644 (file)
 #include "glite/lb/lb_perftest.h"
 #endif
 
-static
 int
-cmp_jobid(struct server_msg *msg, void *data) 
-{
-       char *job_id_s = (char*)data;
-       return strcmp(msg->job_id_s, job_id_s) == 0;
-}
-
-static
-int
-cmp_jobid_set_exp(struct server_msg *msg, void *data)
-{
-       struct server_msg *m = (struct server_msg *)data;
-
-       if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
-               msg->expires = m->expires;
-       }
-       return 0;
-}
-
-
-int 
 enqueue_msg(struct event_queue *eq, struct server_msg *msg)
 {
-#if defined(IL_NOTIFICATIONS)
-       struct event_queue *eq_known;
-
-       /* now we have a new event with possibly changed destination,
-          so check for the already known destination and possibly move 
-          events from the original output queue to a new one */
-       eq_known = notifid_map_get_dest(msg->job_id_s);
-       if(eq != eq_known) {
-               /* client has changed delivery address for this notification */
-               if(notifid_map_set_dest(msg->job_id_s, eq) < 0) 
-                       return(-1);
-               /* move all events with this notif_id from eq_known to eq */
-               if(eq_known != NULL) {
-                       event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s);
-                       /* XXX - we should kill the old queue too */
-               }
-       }
-
-       /* if the expiration changed, set new one */
-       if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
-               notifid_map_set_expiration(msg->job_id_s, msg->expires);
-               /* set expiration for all events with this notif id */
-               event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg);
-       }
-#endif
-
        /* fire thread to take care of this queue */
-       if(event_queue_create_thread(eq) < 0) 
+       if(event_queue_create_thread(eq) < 0)
                return(-1);
-       
+
 #if defined(IL_NOTIFICATIONS)
-       /* if there are no data to send, do not send anything 
+       /* if there are no data to send, do not send anything
           (messsage was just to change the delivery address) */
        /* CORRECTION - let the message pass through the output queue
           to commit it properly and keep event_store in sync */
-       /* if(msg->len == 0) 
+       /* if(msg->len == 0)
                return(0);
        */
 #endif
@@ -86,7 +39,7 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
                event_queue_cond_unlock(eq);
                return(-1);
        }
-      
+
        /* signal thread that we have a new message */
        event_queue_signal(eq);
 
@@ -103,7 +56,7 @@ pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
 #endif /* INTERLOGD_FLUSH */
 
 #ifdef INTERLOGD_HANDLE_CMD
-static 
+static
 int
 parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
 {
@@ -125,7 +78,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
                        continue;
                }
                if(strncmp(token, "DG.COMMAND", r - token) == 0) {
-#if defined(INTERLOGD_FLUSH)                   
+#if defined(INTERLOGD_FLUSH)
                        if(strcmp(++r, "\"flush\"")) {
 #endif
                                il_log(LOG_WARNING, "  command %s not implemented\n", r);
@@ -136,7 +89,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
 #endif
                } else if(strncmp(token, "DG.JOBID", r - token) == 0) {
                        char  *p;
-      
+
                        r += 2; /* skip =" */
                        p = index(r, '"');
                        if(p == NULL) { ret = -1; continue; }
@@ -147,7 +100,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
                } else if(strncmp(token, "DG.LLLID", r - token) == 0) {
                        sscanf(++r, "%ld", receipt);
                }
-    
+
        }
        return(0);
 }
@@ -159,8 +112,8 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
  *  -1 - failure
  */
 
-static 
-int 
+static
+int
 handle_cmd(il_octet_string_t *event, long offset)
 {
        char *job_id_s;
@@ -172,7 +125,7 @@ handle_cmd(il_octet_string_t *event, long offset)
        struct timeval  tv;
 
        /* parse command */
-       if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0) 
+       if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0)
                return(0);
 
 #if defined(INTERLOGD_FLUSH)
@@ -194,7 +147,7 @@ handle_cmd(il_octet_string_t *event, long offset)
                               error_get_msg());
                        clear_error();
                }
-       } else 
+       } else
          /* this call does not fail :-) */
          event_store_recover_all();
 
@@ -258,7 +211,7 @@ handle_cmd(il_octet_string_t *event, long offset)
                        result = (ret == ETIMEDOUT) ? 0 : -1;
                        break;
                }
-               
+
                /* collect results from reporting threads */
                if(job_id_s) {
                        /* find appropriate queue */
@@ -269,7 +222,7 @@ handle_cmd(il_octet_string_t *event, long offset)
                                if(eq->flushing == 2) {
                                        eq->flushing = 0;
                                        num_replies++;
-                                       result = ((result == 1) || (eq->flush_result < 0))  ? 
+                                       result = ((result == 1) || (eq->flush_result < 0))  ?
                                                eq->flush_result : result;
                                }
                                event_queue_cond_unlock(eq);
@@ -283,7 +236,7 @@ handle_cmd(il_octet_string_t *event, long offset)
                                                eq->flushing = 0;
                                                num_replies++;
                                                il_log(LOG_DEBUG, "    thread reply: %d\n", eq->flush_result);
-                                               result = ((result == 1) || (eq->flush_result < 0))  ? 
+                                               result = ((result == 1) || (eq->flush_result < 0))  ?
                                                        eq->flush_result : result;
                                        }
                                        event_queue_cond_unlock(eq);
@@ -297,7 +250,7 @@ handle_cmd(il_octet_string_t *event, long offset)
                        if(eq->flushing == 2) {
                                eq->flushing = 0;
                                num_replies++;
-                               result = ((result == 1) || (eq->flush_result < 0))  ? 
+                               result = ((result == 1) || (eq->flush_result < 0))  ?
                                        eq->flush_result : result;
                        }
                        event_queue_cond_unlock(eq);
@@ -305,7 +258,7 @@ handle_cmd(il_octet_string_t *event, long offset)
        }
 
        /* prevent deadlock in next flush */
-       if(pthread_mutex_unlock(&flush_lock) < 0) 
+       if(pthread_mutex_unlock(&flush_lock) < 0)
                abort();
 
 
@@ -320,7 +273,7 @@ handle_cmd(il_octet_string_t *event, long offset)
        }
        if(job_id_s) free(job_id_s);
        result = send_confirmation(receipt, result);
-       if(result <= 0) 
+       if(result <= 0)
                il_log(LOG_ERR, "handle_cmd: error sending status: %s\n", error_get_msg());
        return(1);
 
@@ -335,10 +288,10 @@ cmd_error:
 #endif /* INTERLOGD_HANDLE_CMD */
 
 
-static 
+static
 int
 handle_msg(il_octet_string_t *event, long offset)
-{ 
+{
        struct server_msg *msg = NULL;
 #if !defined(IL_NOTIFICATIONS)
        struct event_queue *eq_l;
@@ -353,17 +306,17 @@ handle_msg(il_octet_string_t *event, long offset)
                il_log(LOG_ERR, "    handle_msg: error parsing event '%s':\n      %s\n", event, error_get_msg());
                return(0);
        }
-  
+
        /* sync event store with IPC (if neccessary)
         * This MUST be called before inserting event into output queue! */
-       if((es = event_store_find(msg->job_id_s, NULL)) == NULL) 
+       if((es = event_store_find(msg->job_id_s, NULL)) == NULL)
                return(-1);
        msg->es = es;
-       
+
 #ifdef LB_PERF
-       if(nosync) 
+       if(nosync)
                ret = 1;
-       else 
+       else
 #endif
                ret = event_store_sync(es, offset);
        /* no longer informative:
@@ -393,7 +346,7 @@ handle_msg(il_octet_string_t *event, long offset)
 #else
        eq_s = queue_list_get(msg->job_id_s);
 #endif
-       if(eq_s == NULL) { 
+       if(eq_s == NULL) {
                il_log(LOG_ERR, "    handle_msg: apropriate queue not found: %s\n", error_get_msg());
                clear_error();
        } else {
@@ -428,7 +381,7 @@ err:
 
 
 
-int 
+int
 loop()
 {
        /* receive events */
@@ -436,12 +389,12 @@ loop()
                il_octet_string_t *msg;
                long offset;
                int ret;
-    
+
                if(killflg)
                        exit(0);
 
                clear_error();
-               if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) 
+               if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0)
                {
                        if(error_get_maj() == IL_PROTO) {
                                il_log(LOG_DEBUG, "  premature EOF while receiving event\n");
@@ -450,7 +403,7 @@ loop()
                                event_store_recover_all();
 #endif
                                continue;
-                       } else 
+                       } else
                                return(-1);
                }
                else if(ret == 0) {
@@ -463,7 +416,7 @@ loop()
                continue;
 #endif
 
-#ifdef INTERLOGD_HANDLE_CMD            
+#ifdef INTERLOGD_HANDLE_CMD
                ret = handle_cmd(msg, offset);
                if(ret == 0)
 #endif
@@ -475,7 +428,7 @@ loop()
                                case IL_NOMEM:
                                        return (ret);
                                        break;
-                               default: 
+                               default:
                                        il_log(LOG_ERR, "Error: %s\n", error_get_msg());
                                        break;
                        }
index b2fdde4..edbbad0 100644 (file)
@@ -158,6 +158,7 @@ struct event_queue {
        edg_wll_GssConnection   gss;            /* GSS connection */
        char                   *dest_name;
        int                     dest_port;
+       char                               *dest;
        int                     timeout;        /* queue timeout */
        struct event_queue_msg *tail;           /* last message in the queue */
        struct event_queue_msg *head;           /* first message in the queue */