assert(eq != NULL);
+ if(queue_size_high > 0 && (eq->cur_len >= queue_size_high || eq->throttling)) {
+ eq->throttling = 1;
+ return 1;
+ }
+
if((el = malloc(sizeof(*el))) == NULL)
return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element"));
if(--eq->cur_len == 0)
eq->times_empty++;
+ if(eq->cur_len <= queue_size_low) {
+ eq->throttling = 0;
+ }
+
event_queue_unlock(eq);
/* end of critical section */
while(p) {
if((*cmp_func)(p->msg, data)) {
il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
- p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+ p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
/* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
/* remove the message from the source list */
*source_prev = p->prev;
+ assert(eq_s->cur_len > 0);
+ es_s->cur_len--;
if(eq_d) {
/* append the message at the end of destination list */
p->prev = NULL;
*dest_tail = p;
dest_tail = &(p->prev);
eq_d->tail = p;
+ if(++eq_d->cur_len > eq_d->max_len) {
+ eq_d->max_len = eq_d->cur_len;
+ }
} else {
/* signal that the message was 'delivered' */
event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
}
p = *source_prev;
}
+ if(eq_s->cur_len <= queue_size_low) {
+ eq_s->throttling = 0;
+ }
if(eq_d) event_queue_unlock(eq_d);
event_queue_unlock(eq_s);
return(0);
struct server_msg *msg;
char *event_s;
int fd, ret;
+ int throttle;
long last;
FILE *ef;
struct flock efl;
/* enqueue all remaining events */
ret = 1;
msg = NULL;
+ throttle = 0;
while((event_s=read_event_string(ef)) != NULL) {
long last_ls, last_bs;
+ int r;
/* last holds the starting position of event_s in file */
il_log(LOG_DEBUG, " reading event at %ld\n", last);
#ifdef IL_NOTIFICATIONS
il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n",
- msg->dest, last_dest, eq_b ? eq_b->dest : "none");
+ msg->dest, last_dest, eq_b ? eq_b->dest : "none");
/* check message destination */
if(msg->dest == NULL) {
/* the message does not have destination itself, use destination cached for notification id */
il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last);
+ /* TODO: throttling for the log server queue? */
if(enqueue_msg(eq_l, msg) < 0) {
break;
}
#endif
/* now enqueue to the BS, if necessary */
- if((eq_b != eq_l) &&
- (last >= last_bs)) {
+ if(!throttle && (eq_b != eq_l) && (last >= last_bs)) {
il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last);
- if(enqueue_msg(eq_b, msg) < 0) {
+ if(r=enqueue_msg(eq_b, msg) < 0) {
break;
+ } else if(r > 0) {
+ throttle = 1;
}
}
server_msg_free(msg);
msg = NULL;
/* now last is also the offset behind the last successfully queued event */
- last = ftell(ef);
+ if(!throttle) {
+ last = ftell(ef);
+ } else {
+ il_log(LOG_DEBUG, " queue max length limit reached, event at %ld throttled\n", ftell(ef));
+ }
/* ret == 0 means EOF or incomplete event found */
ret = 0;
/* set new destination */
if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) {
- ret = -1;
+ ret = -1;
} else {
/* move all events with this notif_id from eq_b to eq_dest */
il_log(LOG_DEBUG, " all messages for notif id %s are now destined to %s\n",
es->job_id_s, eq_b->dest);
if(event_queue_create_thread(eq_b) < 0) {
- ret = -1;
+ ret = -1;
} else {
event_queue_cond_lock(eq_b);
event_queue_signal(eq_b);
int
enqueue_msg(struct event_queue *eq, struct server_msg *msg)
{
+ int ret;
+
/* fire thread to take care of this queue */
if(event_queue_create_thread(eq) < 0)
return(-1);
event_queue_cond_lock(eq);
/* insert new event */
- if(event_queue_insert(eq, msg) < 0) {
+ if(ret = event_queue_insert(eq, msg) < 0) {
event_queue_cond_unlock(eq);
- return(-1);
+ return ret;
}
/* signal thread that we have a new message */
/* allow thread to continue */
event_queue_cond_unlock(eq);
- return(0);
+ return ret;
}
" -s, --socket <path> non-default path of local socket\n"
" -L, --lazy [<timeout>] be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n"
" -p, --parallel [<num>] use <num> parallel streams to the same server\n"
+ " -q, --queue-low <num> queue length that enables another insertions\n"
+ " -Q, --queue-high <num> max queue length\n"
#ifdef LB_PERF
" -n, --nosend PERFTEST: consume events instead of sending\n"
" -S, --nosync PERFTEST: do not check logd files for lost events\n"
int lazy_close = 1;
int default_close_timeout;
size_t max_store_size;
+size_t queue_size_low = 0;
+size_t queue_size_high = 0;
int parallel = 0;
#ifdef LB_PERF
int nosend = 0, norecover=0, nosync=0, noparse=0;
{"lazy", optional_argument, 0, 'L'},
{"max-store", required_argument, 0, 'M'},
{"parallel", optional_argument, 0, 'p'},
+ {"queue_size_low", required_argument, 0, 'q'},
+ {"queue_size_high", required_argument, 0, 'Q'},
#ifdef LB_PERF
{"nosend", no_argument, 0, 'n'},
{"nosync", no_argument, 0, 'S'},
"l:" /* log server */
"d" /* debug */
"p" /* parallel */
+ "q:"
+ "Q:"
#ifdef LB_PERF
"n" /* nosend */
"S" /* nosync */
parallel = 4;
break;
+ case 'q':
+ queue_size_low = atoi(optarg);
+ break;
+
+ case 'Q':
+ queue_size_high = atoi(optarg);
+ break;
+
#ifdef LB_PERF
case 'n':
nosend = 1;
i = decode_switches (argc, argv);
+ /* check for reasonable queue lengths */
+ if(queue_size_low == 0 && queue_size_high > 0 ||
+ queue_size_low > queue_size_high) {
+ fprintf(stderr, "max queue length -Q must be greater than low queue length -q, both or none must be specified!\n");
+ exit(EXIT_FAILURE);
+ }
+
/* force -b if we do not have log server */
if(log_server == NULL) {
log_server = strdup(DEFAULT_LOG_SERVER);
}
if(init_errors(verbose ? LOG_DEBUG : LOG_WARNING)) {
- fprintf(stderr, "Failed to initialize error message subsys. Exiting.\n");
+ fprintf(stderr, "Failed to initialize error message subsystem. Exiting.\n");
exit(EXIT_FAILURE);
}
extern int lazy_close;
extern int default_close_timeout;
extern size_t max_store_size;
+extern size_t queue_size_high;
+extern size_t queue_size_low;
extern int parallel;
#ifdef LB_PERF
extern int nosend, nosync, norecover, noparse;
int times_empty; /* number of times the queue was emptied */
int max_len; /* max queue length */
int cur_len; /* current length */
+ int throttling; /* event insertion suspend flag */
};