struct event_store_list {
struct event_store *es;
- struct event_store_list *next;
+ struct event_store_list *next; // LL of event_store's
+ struct event_store_list *jobid_next; // double LL of event_store's for this jobid - forward
+ struct event_store_list *jobid_prev; // double LL of event_store's for this jobid - backward
};
*/
static
char *
+astrcat(const char *s1, const char *s2)
+{
+ char *s = malloc(strlen(s1) + strlen(s2) + 1);
+ if(s == NULL)
+ return NULL;
+ *s = 0;
+ strcat(s, s1);
+ strcat(s, s2);
+ return s;
+}
+
+
+static
+char *
jobid2eventfile(IL_EVENT_ID_T job_id)
{
char *buffer;
hash = IL_EVENT_GET_UNIQUE(job_id);
asprintf(&buffer, "%s.%s", file_prefix, hash);
free(hash);
- } else
+ } else
asprintf(&buffer, "%s.default", file_prefix);
-
+
return(buffer);
}
hash = IL_EVENT_GET_UNIQUE(job_id);
snprintf(buffer, 256, "%s.%s.ctl", file_prefix, hash);
free(hash);
- } else
+ } else
snprintf(buffer, 256, "%s.default.ctl", file_prefix);
-
+
return(strdup(buffer));
}
+static
+int
+fname2index(const char *filename)
+{
+ char *p = rindex(filename, '.');
+ char *s;
+
+ if(p == NULL)
+ return 0;
+
+ for(s = p+1; *s != NULL; s++) {
+ if(*s < '0' || *s > '9') {
+ return 0;
+ }
+ }
+
+ return atoi(p+1);
+}
+
static
char *
len = 1024;
while((c=fgetc(file)) != EOF) {
-
+
/* we have to have free room for one byte */
/* if(len - (p - buffer) < 1) */
if(p - buffer >= len) {
*p++ = 0;
break;
} else
- *p++ = (char) c;
+ *p++ = (char) c;
}
if(c != EVENT_SEPARATOR) {
static
struct event_store *
-event_store_create(char *job_id_s)
+event_store_create(char *job_id_s, const char *filename)
{
struct event_store *es;
IL_EVENT_ID_T job_id;
memset(es, 0, sizeof(*es));
- il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s);
-
job_id = NULL;
if(strcmp(job_id_s, "default") && IL_EVENT_ID_PARSE(job_id_s, &job_id)) {
set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "event_store_create: error parsing id");
}
es->job_id_s = strdup(job_id_s);
- es->event_file_name = jobid2eventfile(job_id);
- es->control_file_name = jobid2controlfile(job_id);
+ es->event_file_name = filename ? strdup(filename) : jobid2eventfile(job_id);
+ es->control_file_name = filename ? astrcat(filename, ".ctl") : jobid2controlfile(job_id);
+ es->rotate_index = filename ? fname2index(filename) : 0;
IL_EVENT_ID_FREE(job_id);
- if(pthread_rwlock_init(&es->commit_lock, NULL))
+ il_log(LOG_DEBUG, " creating event store for id %s, filename %s\n", job_id_s, es->event_file_name);
+
+ if(pthread_rwlock_init(&es->commit_lock, NULL))
abort();
- if(pthread_rwlock_init(&es->offset_lock, NULL))
+ if(pthread_rwlock_init(&es->offset_lock, NULL))
abort();
- if(pthread_rwlock_init(&es->use_lock, NULL))
+ if(pthread_rwlock_init(&es->use_lock, NULL))
abort();
return(es);
{
assert(es != NULL);
- if(pthread_rwlock_rdlock(&es->commit_lock))
+ if(pthread_rwlock_rdlock(&es->commit_lock))
abort();
return(0);
{
assert(es != NULL);
- if(pthread_rwlock_wrlock(&es->commit_lock))
+ if(pthread_rwlock_wrlock(&es->commit_lock))
abort();
return(0);
return(-1);
}
- if(fprintf(ctl, "%s\n%ld\n%ld\n",
- es->job_id_s,
+ if(fprintf(ctl, "%s\n%ld\n%ld\n",
+ es->job_id_s,
es->last_committed_ls,
es->last_committed_bs) < 0) {
set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
/*
- * event_store_qurantine()
- * - rename damaged event store file
- * - essentially does the same actions as cleanup, but the event store
+ * event_store_qurantine()
+ * - rename damaged event store file
+ * - essentially does the same actions as cleanup, but the event store
* does not have to be empty
* returns 0 on success, -1 on error
*/
static
int
-event_store_quarantine(struct event_store *es)
+event_store_quarantine(struct event_store *es)
{
+ // TODO enable cleanup of quarantined event_store struct
+ // TODO handle file rotation
+
int num;
char newname[MAXPATHLEN+1];
- /* find available qurantine name */
+ /* find available quarantine name */
/* we give it at most 1024 tries */
for(num = 0; num < 1024; num++) {
struct stat st;
break;
} else {
/* some other error with name, probably permanent */
- set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+ set_error(IL_SYS, errno, "event_store_qurantine: error looking for quarantine filename");
return(-1);
-
+
}
} else {
/* the filename is used already */
/*
+ * event_store_rotate_file()
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_rotate_file(struct event_store *es)
+{
+ int num;
+ char newname[MAXPATHLEN+1];
+
+ /* do not rotate already rotated files */
+ if(es->rotate_index > 0)
+ return 0;
+
+ /* find available name */
+ /* we give it at most 1024 tries */
+ for(num = 0; num < 1024; num++) {
+ struct stat st;
+
+ snprintf(newname, MAXPATHLEN, "%s.%d", es->event_file_name, num);
+ newname[MAXPATHLEN] = 0;
+ if(stat(newname, &st) < 0) {
+ if(errno == ENOENT) {
+ /* file not found */
+ break;
+ } else {
+ /* some other error with name, probably permanent */
+ set_error(IL_SYS, errno, "event_store_rotate_file: error looking for available filename");
+ return(-1);
+
+ }
+ } else {
+ /* the filename is used already */
+ }
+ }
+ if(num >= 1024) {
+ /* new name not found */
+ /* XXX - is there more suitable error? */
+ set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+ return(-1);
+ }
+
+ /* actually rename the file */
+ il_log(LOG_DEBUG, " renaming too large event file from %s to %s\n",
+ es->event_file_name, newname);
+ if(rename(es->event_file_name, newname) < 0) {
+ set_error(IL_SYS, errno, "event_store_rotate_file: error renaming event file");
+ return(-1);
+ }
+
+ /* change names in event_store */
+ es->event_file_name = strdup(newname);
+ es->control_file_name = astrcat(newname, ".ctl");
+
+ return(0);
+}
+
+
+/*
+ * event_store_recover_jobid()
+ * - recover all event stores for given jobid
+ */
+static
+int
+event_store_recover_jobid(struct event_store *es)
+{
+ // TODO check locking of store list
+}
+
+
+/*
* event_store_recover()
* - recover after restart or catch up when events missing in IPC
* - if offset > 0, read everything behind it
struct stat stbuf;
assert(es != NULL);
-
+
#if defined(IL_NOTIFICATIONS)
/* destination queue has to be found for each message separately */
#else
- /* find bookkepping server queue */
+ /* find bookkeeping server queue */
eq_b = queue_list_get(es->job_id_s);
- if(eq_b == NULL)
+ if(eq_b == NULL)
return(-1);
#endif
/* open event file */
ef = fopen(es->event_file_name, "r");
if(ef == NULL) {
- snprintf(err_msg, sizeof(err_msg),
+ snprintf(err_msg, sizeof(err_msg),
"event_store_recover: error opening event file %s",
es->event_file_name);
set_error(IL_SYS, errno, err_msg);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(-1);
}
efl.l_start = 0;
efl.l_len = 0;
if(fcntl(fd, F_SETLKW, &efl) < 0) {
- snprintf(err_msg, sizeof(err_msg),
+ snprintf(err_msg, sizeof(err_msg),
"event_store_recover: error locking event file %s",
es->event_file_name);
set_error(IL_SYS, errno, err_msg);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
fclose(ef);
return(-1);
il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
fclose(ef);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return -1;
} else {
il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n");
fclose(ef);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(0);
}
}
+ /* check the file size, rename it if it is bigger than max_store_size */
+ if(stbuf.st_size > max_store_size) {
+ event_store_rotate_file(es);
+ }
+
while(1) { /* try, try, try */
/* get the position in file to be sought */
last = es->offset;
else {
#if !defined(IL_NOTIFICATIONS)
- if(eq_b == eq_l)
+ if(eq_b == eq_l)
last = es->last_committed_ls;
else
#endif
set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
event_store_unlock(es);
fclose(ef);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(-1);
}
even if the offset points at EOF */
if((c=fgetc(ef)) != EVENT_SEPARATOR) {
/* Houston, we have got a problem */
- il_log(LOG_WARNING,
+ il_log(LOG_WARNING,
" file position %ld does not point at the beginning of event string, backing off!\n",
last);
/* now, where were we? */
set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
event_store_unlock(es);
fclose(ef);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(-1);
}
}
}
- /* now we have:
+ /* now we have:
* - event file opened at position 'last'
* - offset and last_committed_* potentially reset to zero
*/
/* actually do not bother if quarantine succeeded or not - we could not do more */
event_store_quarantine(es);
fclose(ef);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(-1);
}
/* first enqueue to the LS */
if(!bs_only && (last >= last_ls)) {
-
- il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last);
+
+ il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last);
#if !defined(IL_NOTIFICATIONS)
if(enqueue_msg(eq_l, msg) < 0)
#endif
/* now enqueue to the BS, if neccessary */
- if((eq_b != eq_l) &&
+ if((eq_b != eq_l) &&
(last >= last_bs)) {
-
+
il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
-
+
if(enqueue_msg(eq_b, msg) < 0)
break;
}
es->last_modified = stbuf.st_mtime;
il_log(LOG_DEBUG, " event store offset set to %ld\n", last);
- if(msg)
+ if(msg)
server_msg_free(msg);
fclose(ef);
il_log(LOG_DEBUG, " finished reading events with %d\n", ret);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(ret);
* event will be read from file, socket now serves only to notify
* about possible event file change.
*/
- ret = event_store_recover(es);
+ ret = event_store_recover_jobid(es);
ret = (ret < 0) ? ret : 0;
return(ret);
#if 0
event_store_lock_ro(es);
- if(es->offset == offset)
+ if(es->offset == offset)
/* we are up to date */
ret = 1;
else if(es->offset > offset)
/* es->offset < offset, i.e. we have missed some events */
event_store_unlock(es);
ret = event_store_recover(es);
- /* XXX possible room for intervention by another thread - is there
- * any other thread messing with us?
- * 1) After recover() es->offset is set at the end of file.
+ /* XXX possible room for intervention by another thread - is there
+ * any other thread messing with us?
+ * 1) After recover() es->offset is set at the end of file.
* 2) es->offset is set only by recover() and next().
* 3) Additional recover can not do much harm.
* 4) And next() is only called by the same thread as sync().
event_store_lock_ro(es);
if(ret < 0)
ret = -1;
- else
+ else
if(es->offset <= offset) {
/* Apparently there is something wrong - we are receiving an event
* which is beyond the end of file. Someone must have removed the file
event_store_next(struct event_store *es, long offset, int len)
{
assert(es != NULL);
-
+
/* Commented out due to the fact that offset as received on socket
* has little to do with real event file at the moment. es->offset
* handling is left solely to the event_store_recover().
*/
-
+
#if 0
event_store_lock(es);
/* Whoa, be careful now. The es->offset points right after the last enqueued event,
- * but it may not be the offset of the event WE have just enqueued, because:!
+ * but it may not be the offset of the event WE have just enqueued, because:!
* 1) someone could have removed the event file behind our back
* 2) the file could have been recover()ed and more events read
* In either case the offset should not be moved.
}
-/*
+/*
* event_store_commit()
*
*/
{
assert(es != NULL);
- /* do not move counters if event store with this message was cleaned up
+ /* do not move counters if event store with this message was cleaned up
* (this can happen only when moving to quarantine)
*/
/* XXX - assume int access is atomic */
- if(generation != es->generation)
+ if(generation != es->generation)
return 0;
event_store_lock(es);
* Q: How do we know that we can safely remove the files?
* A: When all events from file have been committed both by LS and BS.
*/
-static
+static
int
event_store_clean(struct event_store *es)
{
abort();
}
- /* the file can only be removed when all the events were succesfully sent
+ /* the file can only be removed when all the events were succesfully sent
(ie. committed both by LS and BS */
/* That also implies that the event queues are 'empty' at the moment. */
ef = fopen(es->event_file_name, "r+");
/* if we can not open the event store, it is an error and the struct should be removed */
/* XXX - is it true? */
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno));
return(1);
}
-
+
fd = fileno(ef);
-
+
/* prevent local-logger from writing into event file */
efl.l_type = F_WRLCK;
efl.l_whence = SEEK_SET;
il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n");
fclose(ef);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
if(errno != EACCES &&
errno != EAGAIN) {
}
return(0);
}
-
+
/* now the file should not contain partially written event, so it is safe
to get offset behind last event by seeking the end of file */
if(fseek(ef, 0, SEEK_END) < 0) {
set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
fclose(ef);
return(-1);
}
-
+
last = ftell(ef);
il_log(LOG_DEBUG, " total bytes in file: %d\n", last);
if(es->last_committed_ls < last) {
fclose(ef);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n");
return(0);
some undelivered events referring to that event store */
fclose(ef);
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
return(0);
}
-
+
/* now we are sure that all events were sent and the event queues are empty */
il_log(LOG_INFO, " removing event file %s\n", es->event_file_name);
-
+
/* remove the event file */
unlink(es->event_file_name);
unlink(es->control_file_name);
-
+
/* clear the counters */
es->last_committed_ls = 0;
es->last_committed_bs = 0;
/* unlock the event_store even if it is going to be removed */
event_store_unlock(es);
- if(pthread_rwlock_unlock(&es->offset_lock))
+ if(pthread_rwlock_unlock(&es->offset_lock))
abort();
/* close the event file (that unlocks it as well) */
* --------------------------------
*/
struct event_store *
-event_store_find(char *job_id_s)
+event_store_find(char *job_id_s, const char *filename = NULL)
{
- struct event_store_list *q, *p;
+ struct event_store_list *q, *p, *d;
struct event_store *es;
if(pthread_rwlock_wrlock(&store_list_lock)) {
}
es = NULL;
-
- q = NULL;
+
+ d = NULL;
p = store_list;
-
+
while(p) {
if(strcmp(p->es->job_id_s, job_id_s) == 0) {
- es = p->es;
- if(pthread_rwlock_rdlock(&es->use_lock))
- abort();
- if(pthread_rwlock_unlock(&store_list_lock))
- abort();
- return(es);
- }
-
- q = p;
+ es = p->es;
+ d = p;
+ // if filename was given, compare it as well
+ if(filename == NULL || strcmp(p->es->event_file_name, filename) != 0) {
+ if(pthread_rwlock_rdlock(&es->use_lock))
+ abort();
+ if(pthread_rwlock_unlock(&store_list_lock))
+ abort();
+ return(es);
+ }
+ }
p = p->next;
}
- es = event_store_create(job_id_s);
+ // event store for given jobid and filename was not found, create one
+ es = event_store_create(job_id_s, filename);
if(es == NULL) {
- if(pthread_rwlock_unlock(&store_list_lock))
+ if(pthread_rwlock_unlock(&store_list_lock))
abort();
return(NULL);
}
p = malloc(sizeof(*p));
if(p == NULL) {
set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
- if(pthread_rwlock_unlock(&store_list_lock))
+ if(pthread_rwlock_unlock(&store_list_lock))
abort();
return(NULL);
}
-
- p->next = store_list;
- store_list = p;
-
p->es = es;
+ if(filename != NULL && d != NULL) {
+ // there is another event store for this jobid;
+ // d points to the last event store for this jobid in LL
+ // find proper place to insert new event store
+ if(p->es->rotate_index == 0) {
+ // insert behind d in LL
+ p->next = d->next;
+ d->next = p;
+ // insert behind d in jobid LL
+ p->jobid_next = d->jobid_next;
+ p->jobid_prev = d;
+ d->jobid_next->jobid_prev = p;
+ d->jobid_next = p;
+ } else {
+ struct event_store_list *r;
+ q = NULL;
+ for(r = d->jobid_next; r != d->jobid_next; r = r->jobid_next) {
+ if(p->es->rotate_index < r->es->rotate_index)
+ break;
+ if(r->es->rotate_index > 0)
+ q = r;
+ }
+ // q has the last lesser non-zero index than p
+ if(q == NULL) {
+ p->next = store_list;
+ store_list = p;
+ // insert behind d
+ p->jobid_next = d->jobid_next;
+ p->jobid_prev = d;
+ d->jobid_next->jobid_prev = p;
+ d->jobid_next = p;
+ } else {
+ p->next = q->next;
+ q->next = p;
+ // insert behind q
+ p->jobid_next = q->jobid_next;
+ p->jobid_prev = q;
+ q->jobid_next->jobid_prev = p;
+ q->jobid_next = p;
+ }
+ }
+ } else {
+ // insert at the beginning
+ p->next = store_list;
+ store_list = p;
+ }
+
if(pthread_rwlock_rdlock(&es->use_lock))
abort();
- if(pthread_rwlock_unlock(&store_list_lock))
+ if(pthread_rwlock_unlock(&store_list_lock))
abort();
return(es);
char *dest_name = NULL;
#endif
-
+
il_log(LOG_INFO, " attaching to event file: %s\n", filename);
-
+
if(strstr(filename, "quarantine") != NULL) {
il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n");
return(0);
}
event_s = read_event_string(event_file);
fclose(event_file);
- if(event_s == NULL)
+ if(event_s == NULL)
return(0);
-
+
#if defined(IL_NOTIFICATIONS)
edg_wll_InitContext(&context);
ret=edg_wll_ParseNotifEvent(context, event_s, ¬if_event);
goto out;
}
if(notif_event->notification.notifId == NULL) {
- set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM,
+ set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM,
"event_store_from_file: parse error - no notif id");
ret = -1;
goto out;
goto out;
}
/* XXX: what was that good for?
- if(notif_event->notification.dest_host &&
+ if(notif_event->notification.dest_host &&
(strlen(notif_event->notification.dest_host) > 0)) {
asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port);
}
*/
-
+
#else
job_id_s = edg_wll_GetJobId(event_s);
#endif
ret = 0;
goto out;
}
-
- es=event_store_find(job_id_s);
-
+
+ es = event_store_find(job_id_s, filename);
+
if(es == NULL) {
ret = -1;
goto out;
(es->last_committed_bs == 0) &&
(es->offset == 0)) {
ret = event_store_read_ctl(es);
- } else
+ } else
ret = 0;
-
+
event_store_release(es);
out:
free(notif_event);
}
#endif
- if(event_s) free(event_s);
+ if(event_s) free(event_s);
if(job_id_s) free(job_id_s);
return(ret);
}
set_error(IL_SYS, errno, "event_store_init: error opening event directory");
return(-1);
}
-
+
while((entry=readdir(event_dir))) {
char *s;
/* skip all files that do not match prefix */
- if(strncmp(entry->d_name, p, len) != 0)
+ if(strncmp(entry->d_name, p, len) != 0)
continue;
/* skip all control files */
set_error(IL_SYS, errno, "event_store_init: error opening event directory");
return(-1);
}
-
+
while((entry=readdir(event_dir))) {
char *s;
/* skip all files that do not match prefix */
- if(strncmp(entry->d_name, p, len) != 0)
+ if(strncmp(entry->d_name, p, len) != 0)
continue;
/* find all control files */
/* could not stat file, remove ctl */
strcat(ef, s);
il_log(LOG_DEBUG, " removing stale file %s\n", ef);
- if(unlink(ef))
+ if(unlink(ef))
il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno));
-
+
}
free(ef);
struct event_store_list *sl;
- if(pthread_rwlock_rdlock(&store_list_lock))
+ if(pthread_rwlock_rdlock(&store_list_lock))
abort();
/* recover all event stores */
}
sl = sl->next;
}
-
- if(pthread_rwlock_unlock(&store_list_lock))
+
+ if(pthread_rwlock_unlock(&store_list_lock))
abort();
return(0);
}
-#if 0
+#if 0
int
event_store_remove(struct event_store *es)
{
case 0:
il_log(LOG_DEBUG, " event store not removed, still used\n");
return(0);
-
+
case 1:
if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
set_error(IL_SYS, errno, " event_store_remove: error locking event store list");
/* try to remove event files */
- if(pthread_rwlock_wrlock(&store_list_lock))
+ if(pthread_rwlock_wrlock(&store_list_lock))
abort();
sl = store_list;
int ret;
slnext = sl->next;
-
+
/* one event store at time */
ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
if(ret == EBUSY) {
- il_log(LOG_DEBUG, " event_store %s is in use by another thread\n",
+ il_log(LOG_DEBUG, " event_store %s is in use by another thread\n",
sl->es->job_id_s);
sl = slnext;
continue;
abort();
switch(event_store_clean(sl->es)) {
-
+
case 1:
- /* remove this event store */
+ /* remove this event store from LL */
(*prev) = slnext;
+ /* remove this event store from jobid's LL */
+ if(sl->jobid_next != sl) {
+ sl->jobid_prev->jobid_next = sl->jobid_next;
+ sl->jobid_next->jobid_prev = sl->jobid_prev;
+ }
event_store_free(sl->es);
free(sl);
break;
-
+
case -1:
- il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n",
+ il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n",
sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
/* event_store_release(sl->es); */
clear_error();
/* go on to the next */
-
+
default:
event_store_release(sl->es);
prev = &(sl->next);
break;
}
-
+
sl = slnext;
}
-
- if(pthread_rwlock_unlock(&store_list_lock))
+
+ if(pthread_rwlock_unlock(&store_list_lock))
abort();
-
+
return(0);
}