* 2) es->offset is set only by recover() and next().
* 3) Additional recover can not do much harm.
* 4) And next() is only called by the same thread as sync().
- * => no one is messing with us right now */
+ * 5) use_lock is in place, so no cleanup possible
+ * => no one is messing with us right now */
event_store_lock_ro(es);
if(ret < 0)
ret = -1;
else
- /* somehow we suppose that now es->offset >= offset */
- /* in fact it must be es->offset > offset, anything else would be weird */
- ret = (es->offset > offset) ? 0 : 1;
+ if(es->offset <= offset) {
+ /* Apparently there is something wrong - we are receiving an event
+ * which is beyond the end of file. Someone must have removed the file
+ * when we were not looking. The question is - what should we do with the event?
+ * We have to send it, as this is the only one occasion when we see it.
+ * However, we must not allow the es->offset to be set using this event,
+ * as it would point after the end of file. Sort this out in event_store_next().
+ */
+ ret = 1;
+ } else if(es->offset > offset) {
+ /* we have seen at least this event */
+ ret = 0;
+ }
}
event_store_unlock(es);
return(ret);
int
-event_store_next(struct event_store *es, int len)
+event_store_next(struct event_store *es, long offset, int len)
{
assert(es != NULL);
event_store_lock(es);
- es->offset += len;
+ /* Whoa, be careful now. The es->offset points right after the last enqueued event,
+ * but it may not be the offset of the event WE have just enqueued, because:!
+ * 1) someone could have removed the event file behind our back
+ * 2) the file could have been recover()ed and more events read
+ * In either case the offset should not be moved.
+ */
+ if(es->offset == offset) {
+ es->offset += len;
+ }
event_store_unlock(es);
return(0);
(messsage was just to change the delivery address) */
if(msg->len == 0)
return(0);
-
#endif
/* avoid losing signal to thread */
event_queue_cond_lock(eq);
/* insert new event */
- if(event_queue_insert(eq, msg) < 0)
+ if(event_queue_insert(eq, msg) < 0) {
+ event_queue_cond_unlock(eq);
return(-1);
+ }
/* signal thread that we have a new message */
event_queue_signal(eq);
#endif
/* if there was no error, set the next expected event offset */
- event_store_next(es, msg->ev_len);
+ event_store_next(es, offset, msg->ev_len);
+
/* allow cleanup thread to check on this event_store */
event_store_release(es);
int event_store_recover_all(void);
struct event_store *event_store_find(char *);
int event_store_sync(struct event_store *, long);
-int event_store_next(struct event_store *, int);
+int event_store_next(struct event_store *, long, int);
int event_store_commit(struct event_store *, int, int);
int event_store_recover(struct event_store *);
int event_store_release(struct event_store *);
pthread_exit(NULL);
}
+ il_log(LOG_DEBUG, " started new thread for delivery to %s:%d\n", eq->dest_name, eq->dest_port);
+
pthread_cleanup_push(queue_thread_cleanup, q);
event_queue_cond_lock(eq);
{
assert(eq != NULL);
+ event_queue_lock(eq);
+
/* if there is a thread already, just return */
- if(eq->thread_id > 0)
+ if(eq->thread_id > 0) {
+ event_queue_unlock(eq);
return(0);
+ }
/* create the thread itself */
if(pthread_create(&eq->thread_id, NULL, queue_thread, eq) < 0) {
eq->thread_id = 0;
set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
+ event_queue_unlock(eq);
return(-1);
}
/* the thread is never going to be joined */
pthread_detach(eq->thread_id);
+
+ event_queue_unlock(eq);
return(1);
}