From: Michal Voců Date: Tue, 12 May 2009 12:21:34 +0000 (+0000) Subject: initial throttling (queue length limit) implementation X-Git-Tag: glite-lb-client_R_4_0_2_2~17 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=6c76198763f9ffc1c64be79634d1735216ad041b;p=jra1mw.git initial throttling (queue length limit) implementation --- diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index f90afd8..4939f1c 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -131,6 +131,11 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) assert(eq != NULL); + if(queue_size_high > 0 && (eq->cur_len >= queue_size_high || eq->throttling)) { + eq->throttling = 1; + return 1; + } + if((el = malloc(sizeof(*el))) == NULL) return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element")); @@ -265,6 +270,10 @@ event_queue_remove(struct event_queue *eq) if(--eq->cur_len == 0) eq->times_empty++; + if(eq->cur_len <= queue_size_low) { + eq->throttling = 0; + } + event_queue_unlock(eq); /* end of critical section */ @@ -297,17 +306,22 @@ event_queue_move_events(struct event_queue *eq_s, while(p) { if((*cmp_func)(p->msg, data)) { il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n", - p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, + p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */ /* remove the message from the source list */ *source_prev = p->prev; + assert(eq_s->cur_len > 0); + es_s->cur_len--; if(eq_d) { /* append the message at the end of destination list */ p->prev = NULL; *dest_tail = p; dest_tail = &(p->prev); eq_d->tail = p; + if(++eq_d->cur_len > eq_d->max_len) { + eq_d->max_len = eq_d->cur_len; + } } else { /* signal that the message was 'delivered' */ event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s), @@ -323,6 +337,9 @@ event_queue_move_events(struct event_queue *eq_s, } p = *source_prev; } + if(eq_s->cur_len <= queue_size_low) { + eq_s->throttling = 0; + } if(eq_d) event_queue_unlock(eq_d); event_queue_unlock(eq_s); return(0); diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 1a386b4..edfd08c 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -525,6 +525,7 @@ event_store_recover(struct event_store *es) struct server_msg *msg; char *event_s; int fd, ret; + int throttle; long last; FILE *ef; struct flock efl; @@ -700,8 +701,10 @@ event_store_recover(struct event_store *es) /* enqueue all remaining events */ ret = 1; msg = NULL; + throttle = 0; while((event_s=read_event_string(ef)) != NULL) { long last_ls, last_bs; + int r; /* last holds the starting position of event_s in file */ il_log(LOG_DEBUG, " reading event at %ld\n", last); @@ -735,7 +738,7 @@ event_store_recover(struct event_store *es) #ifdef IL_NOTIFICATIONS il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n", - msg->dest, last_dest, eq_b ? eq_b->dest : "none"); + msg->dest, last_dest, eq_b ? eq_b->dest : "none"); /* check message destination */ if(msg->dest == NULL) { /* the message does not have destination itself, use destination cached for notification id */ @@ -772,6 +775,7 @@ event_store_recover(struct event_store *es) il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last); + /* TODO: throttling for the log server queue? */ if(enqueue_msg(eq_l, msg) < 0) { break; } @@ -779,20 +783,25 @@ event_store_recover(struct event_store *es) #endif /* now enqueue to the BS, if necessary */ - if((eq_b != eq_l) && - (last >= last_bs)) { + if(!throttle && (eq_b != eq_l) && (last >= last_bs)) { il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last); - if(enqueue_msg(eq_b, msg) < 0) { + if(r=enqueue_msg(eq_b, msg) < 0) { break; + } else if(r > 0) { + throttle = 1; } } server_msg_free(msg); msg = NULL; /* now last is also the offset behind the last successfully queued event */ - last = ftell(ef); + if(!throttle) { + last = ftell(ef); + } else { + il_log(LOG_DEBUG, " queue max length limit reached, event at %ld throttled\n", ftell(ef)); + } /* ret == 0 means EOF or incomplete event found */ ret = 0; @@ -807,7 +816,7 @@ event_store_recover(struct event_store *es) /* set new destination */ if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) { - ret = -1; + ret = -1; } else { /* move all events with this notif_id from eq_b to eq_dest */ @@ -816,7 +825,7 @@ event_store_recover(struct event_store *es) il_log(LOG_DEBUG, " all messages for notif id %s are now destined to %s\n", es->job_id_s, eq_b->dest); if(event_queue_create_thread(eq_b) < 0) { - ret = -1; + ret = -1; } else { event_queue_cond_lock(eq_b); event_queue_signal(eq_b); diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index 3299541..bf4d026 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -18,6 +18,8 @@ int enqueue_msg(struct event_queue *eq, struct server_msg *msg) { + int ret; + /* fire thread to take care of this queue */ if(event_queue_create_thread(eq) < 0) return(-1); @@ -35,9 +37,9 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) event_queue_cond_lock(eq); /* insert new event */ - if(event_queue_insert(eq, msg) < 0) { + if(ret = event_queue_insert(eq, msg) < 0) { event_queue_cond_unlock(eq); - return(-1); + return ret; } /* signal thread that we have a new message */ @@ -46,7 +48,7 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) /* allow thread to continue */ event_queue_cond_unlock(eq); - return(0); + return ret; } diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index e6ebded..51a5a74 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -58,6 +58,8 @@ static void usage (int status) " -s, --socket non-default path of local socket\n" " -L, --lazy [] be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n" " -p, --parallel [] use parallel streams to the same server\n" + " -q, --queue-low queue length that enables another insertions\n" + " -Q, --queue-high max queue length\n" #ifdef LB_PERF " -n, --nosend PERFTEST: consume events instead of sending\n" " -S, --nosync PERFTEST: do not check logd files for lost events\n" @@ -81,6 +83,8 @@ int bs_only = 0; int lazy_close = 1; int default_close_timeout; size_t max_store_size; +size_t queue_size_low = 0; +size_t queue_size_high = 0; int parallel = 0; #ifdef LB_PERF int nosend = 0, norecover=0, nosync=0, noparse=0; @@ -110,6 +114,8 @@ static struct option const long_options[] = {"lazy", optional_argument, 0, 'L'}, {"max-store", required_argument, 0, 'M'}, {"parallel", optional_argument, 0, 'p'}, + {"queue_size_low", required_argument, 0, 'q'}, + {"queue_size_high", required_argument, 0, 'Q'}, #ifdef LB_PERF {"nosend", no_argument, 0, 'n'}, {"nosync", no_argument, 0, 'S'}, @@ -146,6 +152,8 @@ decode_switches (int argc, char **argv) "l:" /* log server */ "d" /* debug */ "p" /* parallel */ + "q:" + "Q:" #ifdef LB_PERF "n" /* nosend */ "S" /* nosync */ @@ -229,6 +237,14 @@ decode_switches (int argc, char **argv) parallel = 4; break; + case 'q': + queue_size_low = atoi(optarg); + break; + + case 'Q': + queue_size_high = atoi(optarg); + break; + #ifdef LB_PERF case 'n': nosend = 1; @@ -289,6 +305,13 @@ main (int argc, char **argv) i = decode_switches (argc, argv); + /* check for reasonable queue lengths */ + if(queue_size_low == 0 && queue_size_high > 0 || + queue_size_low > queue_size_high) { + fprintf(stderr, "max queue length -Q must be greater than low queue length -q, both or none must be specified!\n"); + exit(EXIT_FAILURE); + } + /* force -b if we do not have log server */ if(log_server == NULL) { log_server = strdup(DEFAULT_LOG_SERVER); @@ -296,7 +319,7 @@ main (int argc, char **argv) } if(init_errors(verbose ? LOG_DEBUG : LOG_WARNING)) { - fprintf(stderr, "Failed to initialize error message subsys. Exiting.\n"); + fprintf(stderr, "Failed to initialize error message subsystem. Exiting.\n"); exit(EXIT_FAILURE); } diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 7210983..50c7264 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -79,6 +79,8 @@ extern int killflg; extern int lazy_close; extern int default_close_timeout; extern size_t max_store_size; +extern size_t queue_size_high; +extern size_t queue_size_low; extern int parallel; #ifdef LB_PERF extern int nosend, nosync, norecover, noparse; @@ -180,6 +182,7 @@ struct event_queue { int times_empty; /* number of times the queue was emptied */ int max_len; /* max queue length */ int cur_len; /* current length */ + int throttling; /* event insertion suspend flag */ };