char *event_s;
int fd, ret;
int throttle;
- long last;
+ long fpos, last;
FILE *ef;
struct flock efl;
char err_msg[128];
ret = 1;
msg = NULL;
throttle = 0;
+ fpos = last;
while((event_s=read_event_string(ef)) != NULL) {
long last_ls, last_bs;
int r;
- /* last holds the starting position of event_s in file */
- il_log(LOG_DEBUG, " reading event at %ld\n", last);
+ /* fpos holds the starting position of event_s in file */
+ il_log(LOG_DEBUG, " reading event at %ld\n", fpos);
last_ls = es->last_committed_ls;
last_bs = es->last_committed_bs;
/* now enqueue to the BS, if necessary */
if(!throttle && (eq_b != eq_l) && (last >= last_bs)) {
- il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last);
-
if((r=enqueue_msg(eq_b, msg)) < 0) {
break;
} else if(r > 0) {
throttle = 1;
+ il_log(LOG_DEBUG, " queue max length limit reached, event at %ld throttled\n", fpos);
+ } else {
+ il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last);
}
}
server_msg_free(msg);
msg = NULL;
+ fpos = ftell(ef);
/* now last is also the offset behind the last successfully queued event */
if(!throttle) {
- last = ftell(ef);
- } else {
- il_log(LOG_DEBUG, " queue max length limit reached, event at %ld throttled\n", ftell(ef));
+ last = fpos;
}
/* ret == 0 means EOF or incomplete event found */