From 6b009b17777d899aeab2535993937a6157fcca6b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Wed, 1 Apr 2009 21:34:30 +0000 Subject: [PATCH] infrastructure for event file rotation --- org.glite.lb.logger/src/event_store.c | 432 +++++++++++++++++++++++----------- org.glite.lb.logger/src/interlogd.h | 1 + 2 files changed, 300 insertions(+), 133 deletions(-) diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 1b6c657..0c98382 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -29,7 +29,9 @@ static char *file_prefix = NULL; struct event_store_list { struct event_store *es; - struct event_store_list *next; + struct event_store_list *next; // LL of event_store's + struct event_store_list *jobid_next; // double LL of event_store's for this jobid - forward + struct event_store_list *jobid_prev; // double LL of event_store's for this jobid - backward }; @@ -43,6 +45,20 @@ static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER; */ static char * +astrcat(const char *s1, const char *s2) +{ + char *s = malloc(strlen(s1) + strlen(s2) + 1); + if(s == NULL) + return NULL; + *s = 0; + strcat(s, s1); + strcat(s, s2); + return s; +} + + +static +char * jobid2eventfile(IL_EVENT_ID_T job_id) { char *buffer; @@ -52,9 +68,9 @@ jobid2eventfile(IL_EVENT_ID_T job_id) hash = IL_EVENT_GET_UNIQUE(job_id); asprintf(&buffer, "%s.%s", file_prefix, hash); free(hash); - } else + } else asprintf(&buffer, "%s.default", file_prefix); - + return(buffer); } @@ -70,12 +86,31 @@ jobid2controlfile(IL_EVENT_ID_T job_id) hash = IL_EVENT_GET_UNIQUE(job_id); snprintf(buffer, 256, "%s.%s.ctl", file_prefix, hash); free(hash); - } else + } else snprintf(buffer, 256, "%s.default.ctl", file_prefix); - + return(strdup(buffer)); } +static +int +fname2index(const char *filename) +{ + char *p = rindex(filename, '.'); + char *s; + + if(p == NULL) + return 0; + + for(s = p+1; *s != NULL; s++) { + if(*s < '0' || *s > '9') { + return 0; + } + } + + return atoi(p+1); +} + static char * @@ -93,7 +128,7 @@ read_event_string(FILE *file) len = 1024; while((c=fgetc(file)) != EOF) { - + /* we have to have free room for one byte */ /* if(len - (p - buffer) < 1) */ if(p - buffer >= len) { @@ -112,7 +147,7 @@ read_event_string(FILE *file) *p++ = 0; break; } else - *p++ = (char) c; + *p++ = (char) c; } if(c != EVENT_SEPARATOR) { @@ -149,7 +184,7 @@ event_store_free(struct event_store *es) static struct event_store * -event_store_create(char *job_id_s) +event_store_create(char *job_id_s, const char *filename) { struct event_store *es; IL_EVENT_ID_T job_id; @@ -162,8 +197,6 @@ event_store_create(char *job_id_s) memset(es, 0, sizeof(*es)); - il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s); - job_id = NULL; if(strcmp(job_id_s, "default") && IL_EVENT_ID_PARSE(job_id_s, &job_id)) { set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "event_store_create: error parsing id"); @@ -172,15 +205,18 @@ event_store_create(char *job_id_s) } es->job_id_s = strdup(job_id_s); - es->event_file_name = jobid2eventfile(job_id); - es->control_file_name = jobid2controlfile(job_id); + es->event_file_name = filename ? strdup(filename) : jobid2eventfile(job_id); + es->control_file_name = filename ? astrcat(filename, ".ctl") : jobid2controlfile(job_id); + es->rotate_index = filename ? fname2index(filename) : 0; IL_EVENT_ID_FREE(job_id); - if(pthread_rwlock_init(&es->commit_lock, NULL)) + il_log(LOG_DEBUG, " creating event store for id %s, filename %s\n", job_id_s, es->event_file_name); + + if(pthread_rwlock_init(&es->commit_lock, NULL)) abort(); - if(pthread_rwlock_init(&es->offset_lock, NULL)) + if(pthread_rwlock_init(&es->offset_lock, NULL)) abort(); - if(pthread_rwlock_init(&es->use_lock, NULL)) + if(pthread_rwlock_init(&es->use_lock, NULL)) abort(); return(es); @@ -193,7 +229,7 @@ event_store_lock_ro(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_rdlock(&es->commit_lock)) + if(pthread_rwlock_rdlock(&es->commit_lock)) abort(); return(0); @@ -206,7 +242,7 @@ event_store_lock(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_wrlock(&es->commit_lock)) + if(pthread_rwlock_wrlock(&es->commit_lock)) abort(); return(0); @@ -265,8 +301,8 @@ event_store_write_ctl(struct event_store *es) return(-1); } - if(fprintf(ctl, "%s\n%ld\n%ld\n", - es->job_id_s, + if(fprintf(ctl, "%s\n%ld\n%ld\n", + es->job_id_s, es->last_committed_ls, es->last_committed_bs) < 0) { set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record"); @@ -283,20 +319,23 @@ event_store_write_ctl(struct event_store *es) /* - * event_store_qurantine() - * - rename damaged event store file - * - essentially does the same actions as cleanup, but the event store + * event_store_qurantine() + * - rename damaged event store file + * - essentially does the same actions as cleanup, but the event store * does not have to be empty * returns 0 on success, -1 on error */ static int -event_store_quarantine(struct event_store *es) +event_store_quarantine(struct event_store *es) { + // TODO enable cleanup of quarantined event_store struct + // TODO handle file rotation + int num; char newname[MAXPATHLEN+1]; - /* find available qurantine name */ + /* find available quarantine name */ /* we give it at most 1024 tries */ for(num = 0; num < 1024; num++) { struct stat st; @@ -309,9 +348,9 @@ event_store_quarantine(struct event_store *es) break; } else { /* some other error with name, probably permanent */ - set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename"); + set_error(IL_SYS, errno, "event_store_qurantine: error looking for quarantine filename"); return(-1); - + } } else { /* the filename is used already */ @@ -345,6 +384,77 @@ event_store_quarantine(struct event_store *es) /* + * event_store_rotate_file() + * returns 0 on success, -1 on error + */ +static +int +event_store_rotate_file(struct event_store *es) +{ + int num; + char newname[MAXPATHLEN+1]; + + /* do not rotate already rotated files */ + if(es->rotate_index > 0) + return 0; + + /* find available name */ + /* we give it at most 1024 tries */ + for(num = 0; num < 1024; num++) { + struct stat st; + + snprintf(newname, MAXPATHLEN, "%s.%d", es->event_file_name, num); + newname[MAXPATHLEN] = 0; + if(stat(newname, &st) < 0) { + if(errno == ENOENT) { + /* file not found */ + break; + } else { + /* some other error with name, probably permanent */ + set_error(IL_SYS, errno, "event_store_rotate_file: error looking for available filename"); + return(-1); + + } + } else { + /* the filename is used already */ + } + } + if(num >= 1024) { + /* new name not found */ + /* XXX - is there more suitable error? */ + set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename"); + return(-1); + } + + /* actually rename the file */ + il_log(LOG_DEBUG, " renaming too large event file from %s to %s\n", + es->event_file_name, newname); + if(rename(es->event_file_name, newname) < 0) { + set_error(IL_SYS, errno, "event_store_rotate_file: error renaming event file"); + return(-1); + } + + /* change names in event_store */ + es->event_file_name = strdup(newname); + es->control_file_name = astrcat(newname, ".ctl"); + + return(0); +} + + +/* + * event_store_recover_jobid() + * - recover all event stores for given jobid + */ +static +int +event_store_recover_jobid(struct event_store *es) +{ + // TODO check locking of store list +} + + +/* * event_store_recover() * - recover after restart or catch up when events missing in IPC * - if offset > 0, read everything behind it @@ -364,13 +474,13 @@ event_store_recover(struct event_store *es) struct stat stbuf; assert(es != NULL); - + #if defined(IL_NOTIFICATIONS) /* destination queue has to be found for each message separately */ #else - /* find bookkepping server queue */ + /* find bookkeeping server queue */ eq_b = queue_list_get(es->job_id_s); - if(eq_b == NULL) + if(eq_b == NULL) return(-1); #endif @@ -389,12 +499,12 @@ event_store_recover(struct event_store *es) /* open event file */ ef = fopen(es->event_file_name, "r"); if(ef == NULL) { - snprintf(err_msg, sizeof(err_msg), + snprintf(err_msg, sizeof(err_msg), "event_store_recover: error opening event file %s", es->event_file_name); set_error(IL_SYS, errno, err_msg); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(-1); } @@ -406,12 +516,12 @@ event_store_recover(struct event_store *es) efl.l_start = 0; efl.l_len = 0; if(fcntl(fd, F_SETLKW, &efl) < 0) { - snprintf(err_msg, sizeof(err_msg), + snprintf(err_msg, sizeof(err_msg), "event_store_recover: error locking event file %s", es->event_file_name); set_error(IL_SYS, errno, err_msg); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); fclose(ef); return(-1); @@ -423,7 +533,7 @@ event_store_recover(struct event_store *es) il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno)); fclose(ef); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return -1; } else { @@ -431,12 +541,17 @@ event_store_recover(struct event_store *es) il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n"); fclose(ef); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(0); } } + /* check the file size, rename it if it is bigger than max_store_size */ + if(stbuf.st_size > max_store_size) { + event_store_rotate_file(es); + } + while(1) { /* try, try, try */ /* get the position in file to be sought */ @@ -444,7 +559,7 @@ event_store_recover(struct event_store *es) last = es->offset; else { #if !defined(IL_NOTIFICATIONS) - if(eq_b == eq_l) + if(eq_b == eq_l) last = es->last_committed_ls; else #endif @@ -471,7 +586,7 @@ event_store_recover(struct event_store *es) set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); event_store_unlock(es); fclose(ef); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(-1); } @@ -479,7 +594,7 @@ event_store_recover(struct event_store *es) even if the offset points at EOF */ if((c=fgetc(ef)) != EVENT_SEPARATOR) { /* Houston, we have got a problem */ - il_log(LOG_WARNING, + il_log(LOG_WARNING, " file position %ld does not point at the beginning of event string, backing off!\n", last); /* now, where were we? */ @@ -503,7 +618,7 @@ event_store_recover(struct event_store *es) set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); event_store_unlock(es); fclose(ef); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(-1); } @@ -511,7 +626,7 @@ event_store_recover(struct event_store *es) } } - /* now we have: + /* now we have: * - event file opened at position 'last' * - offset and last_committed_* potentially reset to zero */ @@ -550,7 +665,7 @@ event_store_recover(struct event_store *es) /* actually do not bother if quarantine succeeded or not - we could not do more */ event_store_quarantine(es); fclose(ef); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(-1); } @@ -559,8 +674,8 @@ event_store_recover(struct event_store *es) /* first enqueue to the LS */ if(!bs_only && (last >= last_ls)) { - - il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last); + + il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last); #if !defined(IL_NOTIFICATIONS) if(enqueue_msg(eq_l, msg) < 0) @@ -573,11 +688,11 @@ event_store_recover(struct event_store *es) #endif /* now enqueue to the BS, if neccessary */ - if((eq_b != eq_l) && + if((eq_b != eq_l) && (last >= last_bs)) { - + il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); - + if(enqueue_msg(eq_b, msg) < 0) break; } @@ -596,13 +711,13 @@ event_store_recover(struct event_store *es) es->last_modified = stbuf.st_mtime; il_log(LOG_DEBUG, " event store offset set to %ld\n", last); - if(msg) + if(msg) server_msg_free(msg); fclose(ef); il_log(LOG_DEBUG, " finished reading events with %d\n", ret); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(ret); @@ -628,13 +743,13 @@ event_store_sync(struct event_store *es, long offset) * event will be read from file, socket now serves only to notify * about possible event file change. */ - ret = event_store_recover(es); + ret = event_store_recover_jobid(es); ret = (ret < 0) ? ret : 0; return(ret); #if 0 event_store_lock_ro(es); - if(es->offset == offset) + if(es->offset == offset) /* we are up to date */ ret = 1; else if(es->offset > offset) @@ -644,9 +759,9 @@ event_store_sync(struct event_store *es, long offset) /* es->offset < offset, i.e. we have missed some events */ event_store_unlock(es); ret = event_store_recover(es); - /* XXX possible room for intervention by another thread - is there - * any other thread messing with us? - * 1) After recover() es->offset is set at the end of file. + /* XXX possible room for intervention by another thread - is there + * any other thread messing with us? + * 1) After recover() es->offset is set at the end of file. * 2) es->offset is set only by recover() and next(). * 3) Additional recover can not do much harm. * 4) And next() is only called by the same thread as sync(). @@ -655,7 +770,7 @@ event_store_sync(struct event_store *es, long offset) event_store_lock_ro(es); if(ret < 0) ret = -1; - else + else if(es->offset <= offset) { /* Apparently there is something wrong - we are receiving an event * which is beyond the end of file. Someone must have removed the file @@ -680,16 +795,16 @@ int event_store_next(struct event_store *es, long offset, int len) { assert(es != NULL); - + /* Commented out due to the fact that offset as received on socket * has little to do with real event file at the moment. es->offset * handling is left solely to the event_store_recover(). */ - + #if 0 event_store_lock(es); /* Whoa, be careful now. The es->offset points right after the last enqueued event, - * but it may not be the offset of the event WE have just enqueued, because:! + * but it may not be the offset of the event WE have just enqueued, because:! * 1) someone could have removed the event file behind our back * 2) the file could have been recover()ed and more events read * In either case the offset should not be moved. @@ -704,7 +819,7 @@ event_store_next(struct event_store *es, long offset, int len) } -/* +/* * event_store_commit() * */ @@ -713,11 +828,11 @@ event_store_commit(struct event_store *es, int len, int ls, int generation) { assert(es != NULL); - /* do not move counters if event store with this message was cleaned up + /* do not move counters if event store with this message was cleaned up * (this can happen only when moving to quarantine) */ /* XXX - assume int access is atomic */ - if(generation != es->generation) + if(generation != es->generation) return 0; event_store_lock(es); @@ -749,7 +864,7 @@ event_store_commit(struct event_store *es, int len, int ls, int generation) * Q: How do we know that we can safely remove the files? * A: When all events from file have been committed both by LS and BS. */ -static +static int event_store_clean(struct event_store *es) { @@ -780,7 +895,7 @@ event_store_clean(struct event_store *es) abort(); } - /* the file can only be removed when all the events were succesfully sent + /* the file can only be removed when all the events were succesfully sent (ie. committed both by LS and BS */ /* That also implies that the event queues are 'empty' at the moment. */ ef = fopen(es->event_file_name, "r+"); @@ -788,14 +903,14 @@ event_store_clean(struct event_store *es) /* if we can not open the event store, it is an error and the struct should be removed */ /* XXX - is it true? */ event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno)); return(1); } - + fd = fileno(ef); - + /* prevent local-logger from writing into event file */ efl.l_type = F_WRLCK; efl.l_whence = SEEK_SET; @@ -805,7 +920,7 @@ event_store_clean(struct event_store *es) il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n"); fclose(ef); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); if(errno != EACCES && errno != EAGAIN) { @@ -814,25 +929,25 @@ event_store_clean(struct event_store *es) } return(0); } - + /* now the file should not contain partially written event, so it is safe to get offset behind last event by seeking the end of file */ if(fseek(ef, 0, SEEK_END) < 0) { set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file"); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); fclose(ef); return(-1); } - + last = ftell(ef); il_log(LOG_DEBUG, " total bytes in file: %d\n", last); if(es->last_committed_ls < last) { fclose(ef); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n"); return(0); @@ -842,18 +957,18 @@ event_store_clean(struct event_store *es) some undelivered events referring to that event store */ fclose(ef); event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); return(0); } - + /* now we are sure that all events were sent and the event queues are empty */ il_log(LOG_INFO, " removing event file %s\n", es->event_file_name); - + /* remove the event file */ unlink(es->event_file_name); unlink(es->control_file_name); - + /* clear the counters */ es->last_committed_ls = 0; es->last_committed_bs = 0; @@ -865,7 +980,7 @@ event_store_clean(struct event_store *es) /* unlock the event_store even if it is going to be removed */ event_store_unlock(es); - if(pthread_rwlock_unlock(&es->offset_lock)) + if(pthread_rwlock_unlock(&es->offset_lock)) abort(); /* close the event file (that unlocks it as well) */ @@ -881,9 +996,9 @@ event_store_clean(struct event_store *es) * -------------------------------- */ struct event_store * -event_store_find(char *job_id_s) +event_store_find(char *job_id_s, const char *filename = NULL) { - struct event_store_list *q, *p; + struct event_store_list *q, *p, *d; struct event_store *es; if(pthread_rwlock_wrlock(&store_list_lock)) { @@ -891,27 +1006,30 @@ event_store_find(char *job_id_s) } es = NULL; - - q = NULL; + + d = NULL; p = store_list; - + while(p) { if(strcmp(p->es->job_id_s, job_id_s) == 0) { - es = p->es; - if(pthread_rwlock_rdlock(&es->use_lock)) - abort(); - if(pthread_rwlock_unlock(&store_list_lock)) - abort(); - return(es); - } - - q = p; + es = p->es; + d = p; + // if filename was given, compare it as well + if(filename == NULL || strcmp(p->es->event_file_name, filename) != 0) { + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(es); + } + } p = p->next; } - es = event_store_create(job_id_s); + // event store for given jobid and filename was not found, create one + es = event_store_create(job_id_s, filename); if(es == NULL) { - if(pthread_rwlock_unlock(&store_list_lock)) + if(pthread_rwlock_unlock(&store_list_lock)) abort(); return(NULL); } @@ -919,20 +1037,63 @@ event_store_find(char *job_id_s) p = malloc(sizeof(*p)); if(p == NULL) { set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store"); - if(pthread_rwlock_unlock(&store_list_lock)) + if(pthread_rwlock_unlock(&store_list_lock)) abort(); return(NULL); } - - p->next = store_list; - store_list = p; - p->es = es; + if(filename != NULL && d != NULL) { + // there is another event store for this jobid; + // d points to the last event store for this jobid in LL + // find proper place to insert new event store + if(p->es->rotate_index == 0) { + // insert behind d in LL + p->next = d->next; + d->next = p; + // insert behind d in jobid LL + p->jobid_next = d->jobid_next; + p->jobid_prev = d; + d->jobid_next->jobid_prev = p; + d->jobid_next = p; + } else { + struct event_store_list *r; + q = NULL; + for(r = d->jobid_next; r != d->jobid_next; r = r->jobid_next) { + if(p->es->rotate_index < r->es->rotate_index) + break; + if(r->es->rotate_index > 0) + q = r; + } + // q has the last lesser non-zero index than p + if(q == NULL) { + p->next = store_list; + store_list = p; + // insert behind d + p->jobid_next = d->jobid_next; + p->jobid_prev = d; + d->jobid_next->jobid_prev = p; + d->jobid_next = p; + } else { + p->next = q->next; + q->next = p; + // insert behind q + p->jobid_next = q->jobid_next; + p->jobid_prev = q; + q->jobid_next->jobid_prev = p; + q->jobid_next = p; + } + } + } else { + // insert at the beginning + p->next = store_list; + store_list = p; + } + if(pthread_rwlock_rdlock(&es->use_lock)) abort(); - if(pthread_rwlock_unlock(&store_list_lock)) + if(pthread_rwlock_unlock(&store_list_lock)) abort(); return(es); @@ -965,9 +1126,9 @@ event_store_from_file(char *filename) char *dest_name = NULL; #endif - + il_log(LOG_INFO, " attaching to event file: %s\n", filename); - + if(strstr(filename, "quarantine") != NULL) { il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n"); return(0); @@ -980,9 +1141,9 @@ event_store_from_file(char *filename) } event_s = read_event_string(event_file); fclose(event_file); - if(event_s == NULL) + if(event_s == NULL) return(0); - + #if defined(IL_NOTIFICATIONS) edg_wll_InitContext(&context); ret=edg_wll_ParseNotifEvent(context, event_s, ¬if_event); @@ -993,7 +1154,7 @@ event_store_from_file(char *filename) goto out; } if(notif_event->notification.notifId == NULL) { - set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "event_store_from_file: parse error - no notif id"); ret = -1; goto out; @@ -1004,12 +1165,12 @@ event_store_from_file(char *filename) goto out; } /* XXX: what was that good for? - if(notif_event->notification.dest_host && + if(notif_event->notification.dest_host && (strlen(notif_event->notification.dest_host) > 0)) { asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port); } */ - + #else job_id_s = edg_wll_GetJobId(event_s); #endif @@ -1019,9 +1180,9 @@ event_store_from_file(char *filename) ret = 0; goto out; } - - es=event_store_find(job_id_s); - + + es = event_store_find(job_id_s, filename); + if(es == NULL) { ret = -1; goto out; @@ -1031,9 +1192,9 @@ event_store_from_file(char *filename) (es->last_committed_bs == 0) && (es->offset == 0)) { ret = event_store_read_ctl(es); - } else + } else ret = 0; - + event_store_release(es); out: @@ -1043,7 +1204,7 @@ out: free(notif_event); } #endif - if(event_s) free(event_s); + if(event_s) free(event_s); if(job_id_s) free(job_id_s); return(ret); } @@ -1085,12 +1246,12 @@ event_store_init(char *prefix) set_error(IL_SYS, errno, "event_store_init: error opening event directory"); return(-1); } - + while((entry=readdir(event_dir))) { char *s; /* skip all files that do not match prefix */ - if(strncmp(entry->d_name, p, len) != 0) + if(strncmp(entry->d_name, p, len) != 0) continue; /* skip all control files */ @@ -1128,12 +1289,12 @@ event_store_init(char *prefix) set_error(IL_SYS, errno, "event_store_init: error opening event directory"); return(-1); } - + while((entry=readdir(event_dir))) { char *s; /* skip all files that do not match prefix */ - if(strncmp(entry->d_name, p, len) != 0) + if(strncmp(entry->d_name, p, len) != 0) continue; /* find all control files */ @@ -1164,9 +1325,9 @@ event_store_init(char *prefix) /* could not stat file, remove ctl */ strcat(ef, s); il_log(LOG_DEBUG, " removing stale file %s\n", ef); - if(unlink(ef)) + if(unlink(ef)) il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno)); - + } free(ef); @@ -1186,7 +1347,7 @@ event_store_recover_all() struct event_store_list *sl; - if(pthread_rwlock_rdlock(&store_list_lock)) + if(pthread_rwlock_rdlock(&store_list_lock)) abort(); /* recover all event stores */ @@ -1201,15 +1362,15 @@ event_store_recover_all() } sl = sl->next; } - - if(pthread_rwlock_unlock(&store_list_lock)) + + if(pthread_rwlock_unlock(&store_list_lock)) abort(); return(0); } -#if 0 +#if 0 int event_store_remove(struct event_store *es) { @@ -1221,7 +1382,7 @@ event_store_remove(struct event_store *es) case 0: il_log(LOG_DEBUG, " event store not removed, still used\n"); return(0); - + case 1: if(pthread_rwlock_wrlock(&store_list_lock) < 0) { set_error(IL_SYS, errno, " event_store_remove: error locking event store list"); @@ -1265,7 +1426,7 @@ event_store_cleanup() /* try to remove event files */ - if(pthread_rwlock_wrlock(&store_list_lock)) + if(pthread_rwlock_wrlock(&store_list_lock)) abort(); sl = store_list; @@ -1275,11 +1436,11 @@ event_store_cleanup() int ret; slnext = sl->next; - + /* one event store at time */ ret = pthread_rwlock_trywrlock(&sl->es->use_lock); if(ret == EBUSY) { - il_log(LOG_DEBUG, " event_store %s is in use by another thread\n", + il_log(LOG_DEBUG, " event_store %s is in use by another thread\n", sl->es->job_id_s); sl = slnext; continue; @@ -1287,33 +1448,38 @@ event_store_cleanup() abort(); switch(event_store_clean(sl->es)) { - + case 1: - /* remove this event store */ + /* remove this event store from LL */ (*prev) = slnext; + /* remove this event store from jobid's LL */ + if(sl->jobid_next != sl) { + sl->jobid_prev->jobid_next = sl->jobid_next; + sl->jobid_next->jobid_prev = sl->jobid_prev; + } event_store_free(sl->es); free(sl); break; - + case -1: - il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n", + il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n", sl->es->job_id_s, sl->es->event_file_name, error_get_msg()); /* event_store_release(sl->es); */ clear_error(); /* go on to the next */ - + default: event_store_release(sl->es); prev = &(sl->next); break; } - + sl = slnext; } - - if(pthread_rwlock_unlock(&store_list_lock)) + + if(pthread_rwlock_unlock(&store_list_lock)) abort(); - + return(0); } diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index df3ed84..0a13297 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -125,6 +125,7 @@ struct event_store { long offset; /* expected file position of next event */ time_t last_modified; /* time of the last file modification */ int generation; /* cleanup counter, scopes the offset */ + int rotate_index; /* rotation counter */ pthread_rwlock_t commit_lock; /* lock to prevent simultaneous updates to last_committed_* */ pthread_rwlock_t offset_lock; /* lock to prevent simultaneous updates offset */ pthread_rwlock_t use_lock; /* lock to prevent struct deallocation */ -- 1.8.2.3