if(es->event_file_name) free(es->event_file_name);
if(es->control_file_name) free(es->control_file_name);
pthread_rwlock_destroy(&es->use_lock);
- pthread_rwlock_destroy(&es->update_lock);
+ pthread_rwlock_destroy(&es->commit_lock);
+ pthread_rwlock_destroy(&es->offset_lock);
free(es);
return(0);
es->control_file_name = jobid2controlfile(job_id);
IL_EVENT_ID_FREE(job_id);
- if(pthread_rwlock_init(&es->update_lock, NULL))
+ if(pthread_rwlock_init(&es->commit_lock, NULL))
+ abort();
+ if(pthread_rwlock_init(&es->offset_lock, NULL))
abort();
if(pthread_rwlock_init(&es->use_lock, NULL))
abort();
{
assert(es != NULL);
- if(pthread_rwlock_rdlock(&es->update_lock))
+ if(pthread_rwlock_rdlock(&es->commit_lock))
abort();
return(0);
{
assert(es != NULL);
- if(pthread_rwlock_wrlock(&es->update_lock))
+ if(pthread_rwlock_wrlock(&es->commit_lock))
abort();
return(0);
{
assert(es != NULL);
- if(pthread_rwlock_unlock(&es->update_lock))
+ if(pthread_rwlock_unlock(&es->commit_lock))
abort();
return(0);
}
es->last_committed_bs = 0;
es->offset = 0;
+ /* increase cleanup count, this will invalidate all commits from previous generation */
+ es->generation++;
+
return(0);
}
eq_l = queue_list_get(NULL);
#endif
+ /* lock the event_store and offset locks */
event_store_lock(es);
+ if(pthread_rwlock_wrlock(&es->offset_lock))
+ abort();
il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name);
es->event_file_name);
set_error(IL_SYS, errno, err_msg);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(-1);
}
es->event_file_name);
set_error(IL_SYS, errno, err_msg);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
fclose(ef);
return(-1);
}
il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
fclose(ef);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return -1;
} else {
if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n");
fclose(ef);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(0);
}
}
set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
event_store_unlock(es);
fclose(ef);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(-1);
}
/* the last enqueued event MUST end with EVENT_SEPARATOR,
set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
event_store_unlock(es);
fclose(ef);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(-1);
}
break;
}
}
+ /* now we have:
+ * - event file opened at position 'last'
+ * - offset and last_committed_* potentially reset to zero
+ */
+
+ /* release lock on commits, offset remains locked;
+ * other threads are allowed to send/remove events, but not insert
+ */
+ event_store_unlock(es);
+
/* enqueue all remaining events */
ret = 1;
msg = NULL;
while((event_s=read_event_string(ef)) != NULL) {
-
+ long last_ls, last_bs;
+
/* last holds the starting position of event_s in file */
il_log(LOG_DEBUG, " reading event at %ld\n", last);
+ event_store_lock_ro(es);
+ last_ls = es->last_committed_ls;
+ last_bs = es->last_committed_bs;
+ event_store_unlock(es);
+
/* break from now on means there was some error */
ret = -1;
/* actually do not bother if quarantine succeeded or not - we could not do more */
event_store_quarantine(es);
fclose(ef);
- event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(-1);
}
msg->es = es;
+ msg->generation = es->generation;
/* first enqueue to the LS */
- if(!bs_only && (last >= es->last_committed_ls)) {
+ if(!bs_only && (last >= last_ls)) {
il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last);
/* now enqueue to the BS, if neccessary */
if((eq_b != eq_l) &&
- (last >= es->last_committed_bs)) {
+ (last >= last_bs)) {
il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
} /* while */
- /* due to this little assignment we had to lock the event_store for writing */
es->offset = last;
es->last_modified = stbuf.st_mtime;
il_log(LOG_DEBUG, " event store offset set to %ld\n", last);
fclose(ef);
il_log(LOG_DEBUG, " finished reading events with %d\n", ret);
- event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
+
return(ret);
}
*
*/
int
-event_store_commit(struct event_store *es, int len, int ls)
+event_store_commit(struct event_store *es, int len, int ls, int generation)
{
assert(es != NULL);
+ /* do not move counters if event store with this message was cleaned up
+ * (this can happen only when moving to quarantine)
+ */
+ /* XXX - assume int access is atomic */
+ if(generation != es->generation)
+ return 0;
+
event_store_lock(es);
if(ls)
/* prevent sender threads from updating */
event_store_lock(es);
-
+
il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s);
il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls);
il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs);
return(0);
}
+ if(fd = pthread_rwlock_wrlock(&es->offset_lock)) {
+ fprintf(stderr, "Fatal locking error: %s\n", strerror(fd));
+ abort();
+ }
+
/* the file can only be removed when all the events were succesfully sent
(ie. committed both by LS and BS */
/* That also implies that the event queues are 'empty' at the moment. */
/* if we can not open the event store, it is an error and the struct should be removed */
/* XXX - is it true? */
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno));
return(1);
}
il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n");
fclose(ef);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
if(errno != EACCES &&
errno != EAGAIN) {
set_error(IL_SYS, errno, "event_store_clean: error locking event file");
if(fseek(ef, 0, SEEK_END) < 0) {
set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
fclose(ef);
return(-1);
}
if(es->last_committed_ls < last) {
fclose(ef);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n");
return(0);
} else if( es->last_committed_ls > last) {
some undelivered events referring to that event store */
fclose(ef);
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
return(0);
}
es->last_committed_bs = 0;
es->offset = 0;
+ /* increasing the generation count is rather pointless here, because there
+ are no messages waiting in the queue that would be invalidated */
+ /* es->generation++ */
+
/* unlock the event_store even if it is going to be removed */
event_store_unlock(es);
+ if(pthread_rwlock_unlock(&es->offset_lock))
+ abort();
/* close the event file (that unlocks it as well) */
fclose(ef);
edg_wll_Context context;
char *dest_name = NULL;
- edg_wll_InitContext(&context);
#endif
il_log(LOG_INFO, " attaching to event file: %s\n", filename);
return(0);
#if defined(IL_NOTIFICATIONS)
- if((ret=edg_wll_ParseNotifEvent(context, event_s, ¬if_event))) {
+ edg_wll_InitContext(&context);
+ ret=edg_wll_ParseNotifEvent(context, event_s, ¬if_event);
+ edg_wll_FreeContext(context);
+ if(ret) {
set_error(IL_LBAPI, ret, "event_store_from_file: could not parse event");
ret = -1;
goto out;
ret = -1;
goto out;
}
+ /* XXX: what was that good for?
if(notif_event->notification.dest_host &&
(strlen(notif_event->notification.dest_host) > 0)) {
asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port);
}
+ */
#else
job_id_s = edg_wll_GetJobId(event_s);
" -l, --log-server <host> specify address of log server\n"
" -s, --socket <path> non-default path of local socket\n"
" -L, --lazy [<timeout>] be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n"
+ " -p, --parallel [<num>] use <num> parallel streams to the same server\n"
#ifdef LB_PERF
" -n, --nosend PERFTEST: consume events instead of sending\n"
" -S, --nosync PERFTEST: do not check logd files for lost events\n"
int bs_only = 0;
int lazy_close = 1;
int default_close_timeout;
+int parallel = 0;
#ifdef LB_PERF
int nosend = 0, norecover=0, nosync=0, noparse=0;
char *event_source = NULL;
{"log-server", required_argument, 0, 'l'},
{"socket", required_argument, 0, 's'},
{"lazy", optional_argument, 0, 'L'},
+ {"parallel", optional_argument, 0, 'p'},
#ifdef LB_PERF
{"nosend", no_argument, 0, 'n'},
{"nosync", no_argument, 0, 'S'},
"b" /* only bookeeping */
"l:" /* log server */
"d" /* debug */
+ "p" /* parallel */
#ifdef LB_PERF
"n" /* nosend */
"S" /* nosync */
default_close_timeout = TIMEOUT;
break;
+ case 'p':
+ if(optarg)
+ parallel = atoi(optarg);
+ else
+ parallel = 4;
+ break;
+
#ifdef LB_PERF
case 'n':
nosend = 1;
extern int killflg;
extern int lazy_close;
extern int default_close_timeout;
+extern int parallel;
#ifdef LB_PERF
extern int nosend, nosync, norecover, noparse;
#ifdef PERF_EVENTS_INLINE
long last_committed_ls; /* -"- LS */
long offset; /* expected file position of next event */
time_t last_modified; /* time of the last file modification */
- int recovering; /* flag for recovery mode */
- pthread_rwlock_t update_lock; /* lock to prevent simultaneous updates */
+ int generation; /* cleanup counter, scopes the offset */
+ pthread_rwlock_t commit_lock; /* lock to prevent simultaneous updates to last_committed_* */
+ pthread_rwlock_t offset_lock; /* lock to prevent simultaneous updates offset */
pthread_rwlock_t use_lock; /* lock to prevent struct deallocation */
#if defined(IL_NOTIFICATIONS)
char *dest; /* host:port destination */
int len;
int ev_len;
struct event_store *es; /* cache for corresponding event store */
+ int generation; /* event store genereation */
long receipt_to; /* receiver (long local-logger id - LLLID) of delivery confirmation (for priority messages) */
#if defined(IL_NOTIFICATIONS)
char *dest_name;
struct event_store *event_store_find(char *);
int event_store_sync(struct event_store *, long);
int event_store_next(struct event_store *, long, int);
-int event_store_commit(struct event_store *, int, int);
+int event_store_commit(struct event_store *, int, int, int);
int event_store_recover(struct event_store *);
int event_store_release(struct event_store *);
/* int event_store_remove(struct event_store *); */