initial throttling (queue length limit) implementation
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 12 May 2009 12:21:34 +0000 (12:21 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 12 May 2009 12:21:34 +0000 (12:21 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h

index f90afd8..4939f1c 100644 (file)
@@ -131,6 +131,11 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
 
   assert(eq != NULL);
 
+  if(queue_size_high > 0 && (eq->cur_len >= queue_size_high || eq->throttling)) {
+         eq->throttling = 1;
+         return 1;
+  }
+
   if((el = malloc(sizeof(*el))) == NULL)
     return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element"));
 
@@ -265,6 +270,10 @@ event_queue_remove(struct event_queue *eq)
   if(--eq->cur_len == 0)
          eq->times_empty++;
 
+  if(eq->cur_len <= queue_size_low) {
+         eq->throttling = 0;
+  }
+
   event_queue_unlock(eq);
   /* end of critical section */
 
@@ -297,17 +306,22 @@ event_queue_move_events(struct event_queue *eq_s,
        while(p) {
                if((*cmp_func)(p->msg, data)) {
                        il_log(LOG_DEBUG, "      moving event at offset %d(%d) from %s:%d to %s:%d\n",
-                          p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, 
+                          p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
                           eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
                        /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
                        /* remove the message from the source list */
                        *source_prev = p->prev;
+                       assert(eq_s->cur_len > 0);
+                       es_s->cur_len--;
                        if(eq_d) {
                                /* append the message at the end of destination list */
                                p->prev = NULL;
                                *dest_tail = p;
                                dest_tail = &(p->prev);
                                eq_d->tail = p;
+                               if(++eq_d->cur_len > eq_d->max_len) {
+                                       eq_d->max_len = eq_d->cur_len;
+                               }
                        } else {
                                /* signal that the message was 'delivered' */
                                event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
@@ -323,6 +337,9 @@ event_queue_move_events(struct event_queue *eq_s,
                }
                p = *source_prev;
        }
+       if(eq_s->cur_len <= queue_size_low) {
+               eq_s->throttling = 0;
+       }
        if(eq_d) event_queue_unlock(eq_d);
        event_queue_unlock(eq_s);
        return(0);
index 1a386b4..edfd08c 100644 (file)
@@ -525,6 +525,7 @@ event_store_recover(struct event_store *es)
   struct server_msg *msg;
   char *event_s;
   int fd, ret;
+  int throttle;
   long last;
   FILE *ef;
   struct flock efl;
@@ -700,8 +701,10 @@ event_store_recover(struct event_store *es)
   /* enqueue all remaining events */
   ret = 1;
   msg = NULL;
+  throttle = 0;
   while((event_s=read_event_string(ef)) != NULL) {
     long last_ls, last_bs;
+    int r;
 
     /* last holds the starting position of event_s in file */
     il_log(LOG_DEBUG, "    reading event at %ld\n", last);
@@ -735,7 +738,7 @@ event_store_recover(struct event_store *es)
 
 #ifdef IL_NOTIFICATIONS
     il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n",
-          msg->dest, last_dest, eq_b ? eq_b->dest : "none"); 
+          msg->dest, last_dest, eq_b ? eq_b->dest : "none");
     /* check message destination */
     if(msg->dest == NULL) {
             /* the message does not have destination itself, use destination cached for notification id */
@@ -772,6 +775,7 @@ event_store_recover(struct event_store *es)
 
       il_log(LOG_DEBUG, "      queuing event at %ld to logging server\n", last);
 
+      /* TODO: throttling for the log server queue? */
       if(enqueue_msg(eq_l, msg) < 0) {
          break;
       }
@@ -779,20 +783,25 @@ event_store_recover(struct event_store *es)
 #endif
 
     /* now enqueue to the BS, if necessary */
-    if((eq_b != eq_l) &&
-       (last >= last_bs)) {
+    if(!throttle && (eq_b != eq_l) && (last >= last_bs)) {
 
       il_log(LOG_DEBUG, "      queuing event at %ld to bookkeeping server\n", last);
 
-      if(enqueue_msg(eq_b, msg) < 0) {
+      if(r=enqueue_msg(eq_b, msg) < 0) {
          break;
+      } else if(r > 0) {
+             throttle = 1;
       }
     }
     server_msg_free(msg);
     msg = NULL;
 
     /* now last is also the offset behind the last successfully queued event */
-    last = ftell(ef);
+    if(!throttle) {
+           last = ftell(ef);
+    } else {
+           il_log(LOG_DEBUG, "      queue max length limit reached, event at %ld throttled\n", ftell(ef));
+    }
 
     /* ret == 0 means EOF or incomplete event found */
     ret = 0;
@@ -807,7 +816,7 @@ event_store_recover(struct event_store *es)
 
          /* set new destination */
          if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) {
-                 ret = -1; 
+                 ret = -1;
          } else {
 
                  /* move all events with this notif_id from eq_b to eq_dest */
@@ -816,7 +825,7 @@ event_store_recover(struct event_store *es)
                  il_log(LOG_DEBUG, "    all messages for notif id %s are now destined to %s\n",
                         es->job_id_s, eq_b->dest);
                  if(event_queue_create_thread(eq_b) < 0) {
-                         ret = -1; 
+                         ret = -1;
                  } else {
                          event_queue_cond_lock(eq_b);
                          event_queue_signal(eq_b);
index 3299541..bf4d026 100644 (file)
@@ -18,6 +18,8 @@
 int
 enqueue_msg(struct event_queue *eq, struct server_msg *msg)
 {
+       int ret;
+
        /* fire thread to take care of this queue */
        if(event_queue_create_thread(eq) < 0)
                return(-1);
@@ -35,9 +37,9 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
        event_queue_cond_lock(eq);
 
        /* insert new event */
-       if(event_queue_insert(eq, msg) < 0) {
+       if(ret = event_queue_insert(eq, msg) < 0) {
                event_queue_cond_unlock(eq);
-               return(-1);
+               return ret;
        }
 
        /* signal thread that we have a new message */
@@ -46,7 +48,7 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
        /* allow thread to continue */
        event_queue_cond_unlock(eq);
 
-       return(0);
+       return ret;
 }
 
 
index e6ebded..51a5a74 100644 (file)
@@ -58,6 +58,8 @@ static void usage (int status)
               "  -s, --socket <path>        non-default path of local socket\n"
               "  -L, --lazy [<timeout>]     be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n"
               "  -p, --parallel [<num>]     use <num> parallel streams to the same server\n"
+              "  -q, --queue-low <num>      queue length that enables another insertions\n"
+              "  -Q, --queue-high <num>     max queue length\n"
 #ifdef LB_PERF
               "  -n, --nosend               PERFTEST: consume events instead of sending\n"
               "  -S, --nosync               PERFTEST: do not check logd files for lost events\n"
@@ -81,6 +83,8 @@ int bs_only = 0;
 int lazy_close = 1;
 int default_close_timeout;
 size_t max_store_size;
+size_t queue_size_low = 0;
+size_t queue_size_high = 0;
 int parallel = 0;
 #ifdef LB_PERF
 int nosend = 0, norecover=0, nosync=0, noparse=0;
@@ -110,6 +114,8 @@ static struct option const long_options[] =
   {"lazy", optional_argument, 0, 'L'},
   {"max-store", required_argument, 0, 'M'},
   {"parallel", optional_argument, 0, 'p'},
+  {"queue_size_low", required_argument, 0, 'q'},
+  {"queue_size_high", required_argument, 0, 'Q'},
 #ifdef LB_PERF
   {"nosend", no_argument, 0, 'n'},
   {"nosync", no_argument, 0, 'S'},
@@ -146,6 +152,8 @@ decode_switches (int argc, char **argv)
                "l:" /* log server */
                           "d" /* debug */
                           "p" /* parallel */
+                          "q:"
+                          "Q:"
 #ifdef LB_PERF
                           "n" /* nosend */
                           "S" /* nosync */
@@ -229,6 +237,14 @@ decode_switches (int argc, char **argv)
                        parallel = 4;
                break;
 
+       case 'q':
+               queue_size_low = atoi(optarg);
+               break;
+
+       case 'Q':
+               queue_size_high = atoi(optarg);
+               break;
+
 #ifdef LB_PERF
        case 'n':
                nosend = 1;
@@ -289,6 +305,13 @@ main (int argc, char **argv)
 
   i = decode_switches (argc, argv);
 
+  /* check for reasonable queue lengths */
+  if(queue_size_low == 0 && queue_size_high > 0 ||
+     queue_size_low > queue_size_high) {
+         fprintf(stderr, "max queue length -Q must be greater than low queue length -q, both or none must be specified!\n");
+         exit(EXIT_FAILURE);
+  }
+
   /* force -b if we do not have log server */
   if(log_server == NULL) {
     log_server = strdup(DEFAULT_LOG_SERVER);
@@ -296,7 +319,7 @@ main (int argc, char **argv)
   }
 
   if(init_errors(verbose ? LOG_DEBUG : LOG_WARNING)) {
-    fprintf(stderr, "Failed to initialize error message subsys. Exiting.\n");
+    fprintf(stderr, "Failed to initialize error message subsystem. Exiting.\n");
     exit(EXIT_FAILURE);
   }
 
index 7210983..50c7264 100644 (file)
@@ -79,6 +79,8 @@ extern int killflg;
 extern int lazy_close;
 extern int default_close_timeout;
 extern size_t max_store_size;
+extern size_t queue_size_high;
+extern size_t queue_size_low;
 extern int parallel;
 #ifdef LB_PERF
 extern int nosend, nosync, norecover, noparse;
@@ -180,6 +182,7 @@ struct event_queue {
        int                     times_empty;    /* number of times the queue was emptied */
        int                     max_len;        /* max queue length */
        int                     cur_len;        /* current length */
+       int                     throttling;     /* event insertion suspend flag */
 };