From: Michal Voců Date: Fri, 14 Jan 2005 14:56:01 +0000 (+0000) Subject: * patches from the LCG (and trunk) X-Git-Tag: glite-lb-logger_R_1_0_1~96 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=2d54acee60a824fca95dbe02eae6db0c781527fd;p=jra1mw.git * patches from the LCG (and trunk) --- diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index d311a1a..a0a018f 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -284,8 +284,9 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char eq_s->tail = NULL; while(p) { if(strcmp(p->msg->job_id_s, notif_id) == 0) { - il_log(LOG_DEBUG, " moving event with notif id %s from %s:%d to %s:%d\n", - notif_id, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1); + il_log(LOG_DEBUG, " moving event at offset %d from %s:%d to %s:%d\n", + p->msg->offset, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1); + il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); /* remove the message from the source list */ *source_prev = p->prev; if(eq_d) { diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 295b2f0..3ee97f3 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -473,14 +473,25 @@ event_store_sync(struct event_store *es, long offset) * 2) es->offset is set only by recover() and next(). * 3) Additional recover can not do much harm. * 4) And next() is only called by the same thread as sync(). 
- * => no one is messing with us right now */ + * 5) use_lock is in place, so no cleanup possible + * => no one is messing with us right now */ event_store_lock_ro(es); if(ret < 0) ret = -1; else - /* somehow we suppose that now es->offset >= offset */ - /* in fact it must be es->offset > offset, anything else would be weird */ - ret = (es->offset > offset) ? 0 : 1; + if(es->offset <= offset) { + /* Apparently there is something wrong - we are receiving an event + * which is beyond the end of file. Someone must have removed the file + * when we were not looking. The question is - what should we do with the event? + * We have to send it, as this is the only occasion when we see it. + * However, we must not allow the es->offset to be set using this event, + * as it would point after the end of file. Sort this out in event_store_next(). + */ + ret = 1; + } else if(es->offset > offset) { + /* we have seen at least this event */ + ret = 0; + } } event_store_unlock(es); return(ret); @@ -488,12 +499,20 @@ event_store_sync(struct event_store *es, long offset) int -event_store_next(struct event_store *es, int len) +event_store_next(struct event_store *es, long offset, int len) { assert(es != NULL); event_store_lock(es); - es->offset += len; + /* Whoa, be careful now. The es->offset points right after the last enqueued event, + * but it may not be the offset of the event WE have just enqueued, because: + * 1) someone could have removed the event file behind our back + * 2) the file could have been recover()ed and more events read + * In either case the offset should not be moved. 
+ */ + if(es->offset == offset) { + es->offset += len; + } event_store_unlock(es); return(0); diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index f6414c9..b045112 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -40,14 +40,15 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) (messsage was just to change the delivery address) */ if(msg->len == 0) return(0); - #endif /* avoid losing signal to thread */ event_queue_cond_lock(eq); /* insert new event */ - if(event_queue_insert(eq, msg) < 0) + if(event_queue_insert(eq, msg) < 0) { + event_queue_cond_unlock(eq); return(-1); + } /* signal thread that we have a new message */ event_queue_signal(eq); @@ -359,7 +360,8 @@ handle_msg(char *event, long offset) #endif /* if there was no error, set the next expected event offset */ - event_store_next(es, msg->ev_len); + event_store_next(es, offset, msg->ev_len); + /* allow cleanup thread to check on this event_store */ event_store_release(es); diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 3c96828..74fc469 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -190,7 +190,7 @@ int event_store_cleanup(); int event_store_recover_all(void); struct event_store *event_store_find(char *); int event_store_sync(struct event_store *, long); -int event_store_next(struct event_store *, int); +int event_store_next(struct event_store *, long, int); int event_store_commit(struct event_store *, int, int); int event_store_recover(struct event_store *); int event_store_release(struct event_store *); diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 5eeff30..3269dd0 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -39,6 +39,8 @@ queue_thread(void *q) pthread_exit(NULL); } + il_log(LOG_DEBUG, " started new thread for 
delivery to %s:%d\n", eq->dest_name, eq->dest_port); + pthread_cleanup_push(queue_thread_cleanup, q); event_queue_cond_lock(eq); @@ -152,19 +154,26 @@ event_queue_create_thread(struct event_queue *eq) { assert(eq != NULL); + event_queue_lock(eq); + /* if there is a thread already, just return */ - if(eq->thread_id > 0) + if(eq->thread_id > 0) { + event_queue_unlock(eq); return(0); + } /* create the thread itself */ if(pthread_create(&eq->thread_id, NULL, queue_thread, eq) < 0) { eq->thread_id = 0; set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread"); + event_queue_unlock(eq); return(-1); } /* the thread is never going to be joined */ pthread_detach(eq->thread_id); + + event_queue_unlock(eq); return(1); }