From: Michal Voců Date: Fri, 24 Oct 2008 15:46:43 +0000 (+0000) Subject: locking fixes, memory leaks X-Git-Tag: test_tag~103 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=d51d79ff2503e71e5095080a1ba20733dc33b217;p=jra1mw.git locking fixes, memory leaks --- diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index b9345d3..5e59e68 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -291,8 +291,8 @@ event_queue_move_events(struct event_queue *eq_s, eq_s->tail = NULL; while(p) { if((*cmp_func)(p->msg, data)) { - il_log(LOG_DEBUG, " moving event at offset %d from %s:%d to %s:%d\n", - p->msg->offset, eq_s->dest_name, eq_s->dest_port, + il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n", + p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); /* remove the message from the source list */ @@ -305,7 +305,8 @@ event_queue_move_events(struct event_queue *eq_s, eq_d->tail = p; } else { /* signal that the message was 'delivered' */ - event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s)); + event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s), + p->msg->generation); /* free the message */ server_msg_free(p->msg); free(p); diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 54ab4cd..4a93172 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -139,7 +139,8 @@ event_store_free(struct event_store *es) if(es->event_file_name) free(es->event_file_name); if(es->control_file_name) free(es->control_file_name); pthread_rwlock_destroy(&es->use_lock); - pthread_rwlock_destroy(&es->update_lock); + pthread_rwlock_destroy(&es->commit_lock); + pthread_rwlock_destroy(&es->offset_lock); free(es); return(0); @@ 
-175,7 +176,9 @@ event_store_create(char *job_id_s) es->control_file_name = jobid2controlfile(job_id); IL_EVENT_ID_FREE(job_id); - if(pthread_rwlock_init(&es->update_lock, NULL)) + if(pthread_rwlock_init(&es->commit_lock, NULL)) + abort(); + if(pthread_rwlock_init(&es->offset_lock, NULL)) abort(); if(pthread_rwlock_init(&es->use_lock, NULL)) abort(); @@ -190,7 +193,7 @@ event_store_lock_ro(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_rdlock(&es->update_lock)) + if(pthread_rwlock_rdlock(&es->commit_lock)) abort(); return(0); @@ -203,7 +206,7 @@ event_store_lock(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_wrlock(&es->update_lock)) + if(pthread_rwlock_wrlock(&es->commit_lock)) abort(); return(0); @@ -216,7 +219,7 @@ event_store_unlock(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_unlock(&es->update_lock)) + if(pthread_rwlock_unlock(&es->commit_lock)) abort(); return(0); } @@ -334,6 +337,9 @@ event_store_quarantine(struct event_store *es) es->last_committed_bs = 0; es->offset = 0; + /* increase cleanup count, this will invalidate all commits from previous generation */ + es->generation++; + return(0); } @@ -373,7 +379,10 @@ event_store_recover(struct event_store *es) eq_l = queue_list_get(NULL); #endif + /* lock the event_store and offset locks */ event_store_lock(es); + if(pthread_rwlock_wrlock(&es->offset_lock)) + abort(); il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name); @@ -385,6 +394,8 @@ event_store_recover(struct event_store *es) es->event_file_name); set_error(IL_SYS, errno, err_msg); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(-1); } @@ -400,6 +411,8 @@ event_store_recover(struct event_store *es) es->event_file_name); set_error(IL_SYS, errno, err_msg); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); fclose(ef); return(-1); } @@ -410,12 +423,16 @@ event_store_recover(struct event_store *es) 
il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno)); fclose(ef); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return -1; } else { if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) { il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n"); fclose(ef); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(0); } } @@ -454,6 +471,8 @@ event_store_recover(struct event_store *es) set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); event_store_unlock(es); fclose(ef); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(-1); } /* the last enqueued event MUST end with EVENT_SEPARATOR, @@ -484,20 +503,38 @@ event_store_recover(struct event_store *es) set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); event_store_unlock(es); fclose(ef); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(-1); } break; } } + /* now we have: + * - event file opened at position 'last' + * - offset and last_committed_* potentially reset to zero + */ + + /* release lock on commits, offset remains locked; + * other threads are allowed to send/remove events, but not insert + */ + event_store_unlock(es); + /* enqueue all remaining events */ ret = 1; msg = NULL; while((event_s=read_event_string(ef)) != NULL) { - + long last_ls, last_bs; + /* last holds the starting position of event_s in file */ il_log(LOG_DEBUG, " reading event at %ld\n", last); + event_store_lock_ro(es); + last_ls = es->last_committed_ls; + last_bs = es->last_committed_bs; + event_store_unlock(es); + /* break from now on means there was some error */ ret = -1; @@ -515,13 +552,15 @@ event_store_recover(struct event_store *es) /* actually do not bother if quarantine succeeded or not - we could not do more */ event_store_quarantine(es); fclose(ef); - event_store_unlock(es); + 
if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(-1); } msg->es = es; + msg->generation = es->generation; /* first enqueue to the LS */ - if(!bs_only && (last >= es->last_committed_ls)) { + if(!bs_only && (last >= last_ls)) { il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last); @@ -537,7 +576,7 @@ event_store_recover(struct event_store *es) /* now enqueue to the BS, if neccessary */ if((eq_b != eq_l) && - (last >= es->last_committed_bs)) { + (last >= last_bs)) { il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); @@ -555,7 +594,6 @@ event_store_recover(struct event_store *es) } /* while */ - /* due to this little assignment we had to lock the event_store for writing */ es->offset = last; es->last_modified = stbuf.st_mtime; il_log(LOG_DEBUG, " event store offset set to %ld\n", last); @@ -566,7 +604,9 @@ event_store_recover(struct event_store *es) fclose(ef); il_log(LOG_DEBUG, " finished reading events with %d\n", ret); - event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); + return(ret); } @@ -671,10 +711,17 @@ event_store_next(struct event_store *es, long offset, int len) * */ int -event_store_commit(struct event_store *es, int len, int ls) +event_store_commit(struct event_store *es, int len, int ls, int generation) { assert(es != NULL); + /* do not move counters if event store with this message was cleaned up + * (this can happen only when moving to quarantine) + */ + /* XXX - assume int access is atomic */ + if(generation != es->generation) + return 0; + event_store_lock(es); if(ls) @@ -717,7 +764,7 @@ event_store_clean(struct event_store *es) /* prevent sender threads from updating */ event_store_lock(es); - + il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s); il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls); il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs); @@ -730,6 +777,11 @@ 
event_store_clean(struct event_store *es) return(0); } + if(fd = pthread_rwlock_wrlock(&es->offset_lock)) { + fprintf(stderr, "Fatal locking error: %s\n", strerror(fd)); + abort(); + } + /* the file can only be removed when all the events were succesfully sent (ie. committed both by LS and BS */ /* That also implies that the event queues are 'empty' at the moment. */ @@ -738,6 +790,8 @@ event_store_clean(struct event_store *es) /* if we can not open the event store, it is an error and the struct should be removed */ /* XXX - is it true? */ event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno)); return(1); } @@ -753,6 +807,8 @@ event_store_clean(struct event_store *es) il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n"); fclose(ef); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); if(errno != EACCES && errno != EAGAIN) { set_error(IL_SYS, errno, "event_store_clean: error locking event file"); @@ -766,6 +822,8 @@ event_store_clean(struct event_store *es) if(fseek(ef, 0, SEEK_END) < 0) { set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file"); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); fclose(ef); return(-1); } @@ -776,6 +834,8 @@ event_store_clean(struct event_store *es) if(es->last_committed_ls < last) { fclose(ef); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n"); return(0); } else if( es->last_committed_ls > last) { @@ -784,6 +844,8 @@ event_store_clean(struct event_store *es) some undelivered events referring to that event store */ fclose(ef); event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); return(0); } @@ -799,8 +861,14 @@ event_store_clean(struct event_store *es) es->last_committed_bs = 0; es->offset = 0; + /* 
increasing the generation count is rather pointless here, because there + are no messages waiting in the queue that would be invalidated */ + /* es->generation++ */ + /* unlock the event_store even if it is going to be removed */ event_store_unlock(es); + if(pthread_rwlock_unlock(&es->offset_lock)) + abort(); /* close the event file (that unlocks it as well) */ fclose(ef); @@ -898,7 +966,6 @@ event_store_from_file(char *filename) edg_wll_Context context; char *dest_name = NULL; - edg_wll_InitContext(&context); #endif il_log(LOG_INFO, " attaching to event file: %s\n", filename); @@ -919,7 +986,10 @@ event_store_from_file(char *filename) return(0); #if defined(IL_NOTIFICATIONS) - if((ret=edg_wll_ParseNotifEvent(context, event_s, &notif_event))) { + edg_wll_InitContext(&context); + ret=edg_wll_ParseNotifEvent(context, event_s, &notif_event); + edg_wll_FreeContext(context); + if(ret) { set_error(IL_LBAPI, ret, "event_store_from_file: could not parse event"); ret = -1; goto out; @@ -935,10 +1005,12 @@ event_store_from_file(char *filename) ret = -1; goto out; } + /* XXX: what was that good for? 
if(notif_event->notification.dest_host && (strlen(notif_event->notification.dest_host) > 0)) { asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port); } + */ #else job_id_s = edg_wll_GetJobId(event_s); diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c index a34e0a3..ebd5523 100644 --- a/org.glite.lb.logger/src/event_store_http.c +++ b/org.glite.lb.logger/src/event_store_http.c @@ -128,7 +128,7 @@ event_store_free(struct event_store *es) if(es->event_file_name) free(es->event_file_name); if(es->control_file_name) free(es->control_file_name); pthread_rwlock_destroy(&es->use_lock); - pthread_rwlock_destroy(&es->update_lock); + pthread_rwlock_destroy(&es->commit_lock); free(es); return(0); @@ -155,7 +155,7 @@ event_store_create(char *job_id_s) es->event_file_name = jobid2eventfile(job_id_s); es->control_file_name = jobid2controlfile(job_id_s); - if(pthread_rwlock_init(&es->update_lock, NULL)) + if(pthread_rwlock_init(&es->commit_lock, NULL)) abort(); if(pthread_rwlock_init(&es->use_lock, NULL)) abort(); @@ -170,7 +170,7 @@ event_store_lock_ro(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_rdlock(&es->update_lock)) + if(pthread_rwlock_rdlock(&es->commit_lock)) abort(); return(0); @@ -183,7 +183,7 @@ event_store_lock(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_wrlock(&es->update_lock)) + if(pthread_rwlock_wrlock(&es->commit_lock)) abort(); return(0); @@ -196,7 +196,7 @@ event_store_unlock(struct event_store *es) { assert(es != NULL); - if(pthread_rwlock_unlock(&es->update_lock)) + if(pthread_rwlock_unlock(&es->commit_lock)) abort(); return(0); } diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index 0721aa5..c3b0307 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -359,7 +359,7 @@ handle_msg(il_octet_string_t *event, long offset) if((es = 
event_store_find(msg->job_id_s)) == NULL) return(-1); msg->es = es; - + #ifdef LB_PERF if(nosync) ret = 1; diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index 8bf4833..b247487 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -57,6 +57,7 @@ static void usage (int status) " -l, --log-server specify address of log server\n" " -s, --socket non-default path of local socket\n" " -L, --lazy [] be lazy when closing connections to servers (default, timeout==0 means turn lazy off)\n" + " -p, --parallel [] use parallel streams to the same server\n" #ifdef LB_PERF " -n, --nosend PERFTEST: consume events instead of sending\n" " -S, --nosync PERFTEST: do not check logd files for lost events\n" @@ -79,6 +80,7 @@ char *file_prefix = DEFAULT_PREFIX; int bs_only = 0; int lazy_close = 1; int default_close_timeout; +int parallel = 0; #ifdef LB_PERF int nosend = 0, norecover=0, nosync=0, noparse=0; char *event_source = NULL; @@ -105,6 +107,7 @@ static struct option const long_options[] = {"log-server", required_argument, 0, 'l'}, {"socket", required_argument, 0, 's'}, {"lazy", optional_argument, 0, 'L'}, + {"parallel", optional_argument, 0, 'p'}, #ifdef LB_PERF {"nosend", no_argument, 0, 'n'}, {"nosync", no_argument, 0, 'S'}, @@ -140,6 +143,7 @@ decode_switches (int argc, char **argv) "b" /* only bookeeping */ "l:" /* log server */ "d" /* debug */ + "p" /* parallel */ #ifdef LB_PERF "n" /* nosend */ "S" /* nosync */ @@ -211,6 +215,13 @@ decode_switches (int argc, char **argv) default_close_timeout = TIMEOUT; break; + case 'p': + if(optarg) + parallel = atoi(optarg); + else + parallel = 4; + break; + #ifdef LB_PERF case 'n': nosend = 1; diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index c0af5b3..d3d81b6 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -78,6 +78,7 @@ extern int bs_only; extern int killflg; extern int 
lazy_close; extern int default_close_timeout; +extern int parallel; #ifdef LB_PERF extern int nosend, nosync, norecover, noparse; #ifdef PERF_EVENTS_INLINE @@ -122,8 +123,9 @@ struct event_store { long last_committed_ls; /* -"- LS */ long offset; /* expected file position of next event */ time_t last_modified; /* time of the last file modification */ - int recovering; /* flag for recovery mode */ - pthread_rwlock_t update_lock; /* lock to prevent simultaneous updates */ + int generation; /* cleanup counter, scopes the offset */ + pthread_rwlock_t commit_lock; /* lock to prevent simultaneous updates to last_committed_* */ + pthread_rwlock_t offset_lock; /* lock to prevent simultaneous updates offset */ pthread_rwlock_t use_lock; /* lock to prevent struct deallocation */ #if defined(IL_NOTIFICATIONS) char *dest; /* host:port destination */ @@ -138,6 +140,7 @@ struct server_msg { int len; int ev_len; struct event_store *es; /* cache for corresponding event store */ + int generation; /* event store genereation */ long receipt_to; /* receiver (long local-logger id - LLLID) of delivery confirmation (for priority messages) */ #if defined(IL_NOTIFICATIONS) char *dest_name; @@ -244,7 +247,7 @@ int event_store_recover_all(void); struct event_store *event_store_find(char *); int event_store_sync(struct event_store *, long); int event_store_next(struct event_store *, long, int); -int event_store_commit(struct event_store *, int, int); +int event_store_commit(struct event_store *, int, int, int); int event_store_recover(struct event_store *); int event_store_release(struct event_store *); /* int event_store_remove(struct event_store *); */ diff --git a/org.glite.lb.logger/src/perftest_il.sh b/org.glite.lb.logger/src/perftest_il.sh index 6121555..318b9e8 100644 --- a/org.glite.lb.logger/src/perftest_il.sh +++ b/org.glite.lb.logger/src/perftest_il.sh @@ -15,7 +15,7 @@ DEBUG=${DEBUG:-0} # CONSUMER_ARGS= # PERFTEST_COMPONENT= # COMPONENT_ARGS= -#LOGJOBS_ARGS="" +LOGJOBS_ARGS="-s 
/tmp/interlogger.perftest" check_test_files || exit 1 diff --git a/org.glite.lb.logger/src/queue_mgr.c b/org.glite.lb.logger/src/queue_mgr.c index 2134a32..2879e14 100644 --- a/org.glite.lb.logger/src/queue_mgr.c +++ b/org.glite.lb.logger/src/queue_mgr.c @@ -89,7 +89,7 @@ queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) return(-1); } el->queue = eq; - el->next = queues; + el->next = *ql; *ql = el; return 0; } diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c index d33d541..d0e546b 100644 --- a/org.glite.lb.logger/src/send_event.c +++ b/org.glite.lb.logger/src/send_event.c @@ -336,7 +336,7 @@ event_queue_send(struct event_queue *eq) default: /* LB_DBERR, LB_PROTO */ /* the event was not accepted by the server */ /* update the event pointer */ - if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) /* failure committing message, this is bad */ return(-1); /* if we have just delivered priority message from the queue, send confirmation */ diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 90dec49..21fa23d 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -144,13 +144,16 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) #if defined(IL_NOTIFICATIONS) - edg_wll_InitContext(&context); /* parse the notification event */ - if((ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event))) { + edg_wll_InitContext(&context); + ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event); + edg_wll_FreeContext(context); + if(ret) { set_error(IL_LBAPI, ret, "server_msg_init: error parsing notification event"); return(-1); } + /* FIXME: check for allocation error */ if(notif_event->notification.dest_host && (strlen(notif_event->notification.dest_host) > 0)) {