From a29f33bde46a592b1522214724f5fc5e40fd32fd Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Mon, 25 Oct 2010 19:55:01 +0000 Subject: [PATCH] implementation of parallel delivery to server --- org.glite.lb.logger/Makefile | 6 +- org.glite.lb.logger/interface/interlogd.h | 50 +++--- org.glite.lb.logger/src/event_queue.c | 174 +++++++++++++++++++-- org.glite.lb.logger/src/event_store.c | 2 +- org.glite.lb.logger/src/il_master.c | 3 +- org.glite.lb.logger/src/input_queue_socket.c | 2 +- org.glite.lb.logger/src/interlogd.c | 7 +- org.glite.lb.logger/src/perftest_il.sh | 12 +- org.glite.lb.logger/src/queue_mgr.c | 17 +- org.glite.lb.logger/src/queue_thread.c | 129 ++++++++++----- org.glite.lb.logger/src/recover.c | 4 + org.glite.lb.logger/src/send_event.c | 59 +++---- org.glite.lb.logger/test/IlTestBase.cpp | 9 +- org.glite.lb.logger/test/event_queueTest.cpp | 91 ++++++----- org.glite.lb.logger/test/il_test.cpp | 16 ++ .../test/input_queue_socketTest.cpp | 6 +- org.glite.lb.logger/test/server_msgTest.cpp | 4 +- 17 files changed, 415 insertions(+), 176 deletions(-) diff --git a/org.glite.lb.logger/Makefile b/org.glite.lb.logger/Makefile index 541eca0..22fbfff 100644 --- a/org.glite.lb.logger/Makefile +++ b/org.glite.lb.logger/Makefile @@ -71,7 +71,7 @@ EXT_LIB:=-lglite_lbu_trio -lglite_lbu_log GLITE_GSS_LIB:=-lglite_security_gss -TEST_LIBS:=-L${cppunit_prefix}/${libdir} -lcppunit +TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit TEST_INC:=-I${cppunit_prefix}/include LOGD_OBJS:= logd_proto.o logd.o @@ -106,7 +106,7 @@ INTERLOG_TEST_OBJS:= \ input_queue_socket.o \ input_queue_socketTest.o \ send_event.o \ - plugin_mgr. o \ + plugin_mgr.o \ event_queue.o \ event_queueTest.o \ IlTestBase.o \ @@ -172,7 +172,7 @@ check.ll: -echo commented out -- fix needed check.il: ${INTERLOG_TEST_OBJS} - ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${GLITE_GSS_LIB}_${nothrflavour} ${TEST_LIBS} -lpthread $+ + ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${TEST_LIBS} ${EXT_LIB} -lpthread $+ install: -mkdir -p ${PREFIX}/bin diff --git a/org.glite.lb.logger/interface/interlogd.h b/org.glite.lb.logger/interface/interlogd.h index fc8d95f..8ddb5b9 100644 --- a/org.glite.lb.logger/interface/interlogd.h +++ b/org.glite.lb.logger/interface/interlogd.h @@ -182,18 +182,23 @@ struct server_msg { }; -struct event_queue { +struct queue_thread { + pthread_t thread_id; /* id's of associated threads */ edg_wll_GssConnection gss; /* GSS connection */ + char *jobid; + int timeout; /* queue timeout */ + int first_event_sent; /* connection can be preempted by server */ + struct event_queue_msg *current; /* current message being sent */ +}; + +struct event_queue { char *dest_name; int dest_port; - char *dest; - int timeout; /* queue timeout */ - struct event_queue_msg *tail; /* last message in the queue */ + char *dest; struct event_queue_msg *head; /* first message in the queue */ struct event_queue_msg *tail_ems; /* last priority message in the queue (or NULL) */ - struct event_queue_msg *mark_this; /* mark message for removal */ - struct event_queue_msg *mark_prev; /* predecessor of the marked message */ - pthread_t thread_id; /* id of associated thread */ + int num_threads; /* number of delivery threads */ + struct queue_thread *thread; /* info for delivery threads */ pthread_rwlock_t update_lock; /* mutex for queue updates */ pthread_mutex_t cond_lock; /* mutex for condition variable */ pthread_cond_t ready_cond; /* condition variable for message arrival */ @@ -204,19 +209,18 @@ struct event_queue { int times_empty; /* number of times the queue was emptied */ int max_len; /* max queue length */ int cur_len; /* current length */ - int throttling; /* event insertion suspend flag */ - int first_event_sent; /* connection can be preempted by server */ + int throttling; /* event insertion suspend flag */ /* delivery methods */ - int (*event_queue_connect)(struct event_queue *); - int (*event_queue_send)(struct event_queue *); - int (*event_queue_close)(struct event_queue *); - void *plugin_data; /* opaque data used by output plugins */ + int (*event_queue_connect)(struct event_queue *, struct queue_thread *); + int (*event_queue_send)(struct event_queue *, struct queue_thread *); + int (*event_queue_close)(struct event_queue *, struct queue_thread *); + void *plugin_data; /* opaque data used by output plugins */ }; struct il_output_plugin { - int (*event_queue_connect)(struct event_queue *); - int (*event_queue_send)(struct event_queue *); - int (*event_queue_close)(struct event_queue *); + int (*event_queue_connect)(struct event_queue *, struct queue_thread *); + int (*event_queue_send)(struct event_queue *, struct queue_thread *); + int (*event_queue_close)(struct event_queue *, struct queue_thread *); int (*plugin_init)(char *); int (*plugin_supports_scheme)(const char *); }; @@ -238,27 +242,27 @@ struct event_queue *event_queue_create(char *, struct il_output_plugin *); int event_queue_free(struct event_queue *); int event_queue_empty(struct event_queue *); int event_queue_insert(struct event_queue *, struct server_msg *); -int event_queue_get(struct event_queue *, struct server_msg **); -int event_queue_remove(struct event_queue *); +int event_queue_get(struct event_queue *, struct queue_thread *, struct server_msg **); +int event_queue_remove(struct event_queue *, struct queue_thread *); int event_queue_enqueue(struct event_queue *, char *); /* helper */ int enqueue_msg(struct event_queue *, struct server_msg *); int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *); /* protocol event queue methods */ -int event_queue_connect(struct event_queue *); -int event_queue_send(struct event_queue *); -int event_queue_close(struct event_queue *); +int event_queue_connect(struct event_queue *, struct queue_thread *); +int event_queue_send(struct event_queue *, struct queue_thread *); +int event_queue_close(struct event_queue *, struct queue_thread *); int send_confirmation(long, int); /* thread event queue methods */ -int event_queue_create_thread(struct event_queue *); +int event_queue_create_thread(struct event_queue *, int); int event_queue_lock(struct event_queue *); int event_queue_unlock(struct event_queue *); int event_queue_lock_ro(struct event_queue *); int event_queue_signal(struct event_queue *); int event_queue_wait(struct event_queue *, int); -int event_queue_sleep(struct event_queue *); +int event_queue_sleep(struct event_queue *, int); int event_queue_wakeup(struct event_queue *); int event_queue_cond_lock(struct event_queue *); int event_queue_cond_unlock(struct event_queue *); diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 0f7bd32..58bee20 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -36,6 +36,7 @@ limitations under the License. struct event_queue_msg { struct server_msg *msg; struct event_queue_msg *prev; + struct event_queue_msg *next; }; struct event_queue * @@ -132,9 +133,10 @@ event_queue_free(struct event_queue *eq) if(!event_queue_empty(eq)) return(-1); + /* if(eq->thread_id) pthread_cancel(eq->thread_id); - + */ pthread_rwlock_destroy(&eq->update_lock); pthread_mutex_destroy(&eq->cond_lock); @@ -194,6 +196,26 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) /* this is critical section */ event_queue_lock(eq); + if(NULL == eq->head) { + el->prev = el; + el->next = el; + eq->head = el; + eq->tail_ems = el; + } else { + if(server_msg_is_priority(msg)) { + el->next = eq->tail_ems->next; + el->prev = eq->tail_ems; + if(eq->head == eq->tail_ems) { + eq->head = el; + } + } else { + el->next = eq->head->next; + el->prev = eq->head; + } + el->next->prev = el; + el->prev->next = el; + } +#if 0 /* OLD IMPLEMENTATION */ #if defined(INTERLOGD_EMS) if(server_msg_is_priority(msg)) { /* priority messages go first */ @@ -227,6 +249,7 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) if(eq->mark_this && (el->prev == eq->mark_this)) eq->mark_prev = el; #endif +#endif /* OLD IMPLEMENTATION */ if(++eq->cur_len > eq->max_len) eq->max_len = eq->cur_len; @@ -239,33 +262,59 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) int -event_queue_get(struct event_queue *eq, struct server_msg **msg) +event_queue_get(struct event_queue *eq, struct queue_thread *me, struct server_msg **msg) { struct event_queue_msg *el; + int found; assert(eq != NULL); assert(msg != NULL); event_queue_lock(eq); + if(me->jobid) { + free(me->jobid); + me->jobid = NULL; + me->current = NULL; + } + if(NULL == eq->head) { + event_queue_unlock(eq); + return 0; + } + found = 0; el = eq->head; -#if defined(INTERLOGD_EMS) - /* this message is marked for removal, it is first on the queue */ - eq->mark_this = el; - eq->mark_prev = NULL; -#endif + do { + char *jobid = el->msg->job_id_s; + int i; + + for(i = 0; i < eq->num_threads; i++) { + if(me == eq->thread + i) { + continue; + } + if(eq->thread[i].jobid && strcmp(jobid, eq->thread[i].jobid) == 0) { + break; + } + } + if(i >= eq->num_threads) { + /* no other thread is working on this job */ + found = 1; + break; + } + el = el->prev; + } while(el != eq->head); + if(found) { + me->current = el; + me->jobid = strdup(el->msg->job_id_s); + *msg = el->msg; + } else { + } event_queue_unlock(eq); - if(el == NULL) - return(-1); - - *msg = el->msg; - - return(0); + return found; } int -event_queue_remove(struct event_queue *eq) +event_queue_remove(struct event_queue *eq, struct queue_thread *me) { struct event_queue_msg *el; #if defined(INTERLOGD_EMS) @@ -273,9 +322,31 @@ event_queue_remove(struct event_queue *eq) #endif assert(eq != NULL); + + el = me->current; + if(NULL == el) { + return -1; + } /* this is critical section */ event_queue_lock(eq); + + if(el == el->prev) { + /* last element */ + eq->head = NULL; + eq->tail_ems = NULL; + } else { + el->next->prev = el->prev; + el->prev->next = el->next; + if(el == eq->head) { + eq->head = el->prev; + } + if(el == eq->tail_ems) { + eq->tail_ems = el->prev; + } + } + +#if 0 /* OLD IMPLEMENTATION */ #if defined(INTERLOGD_EMS) el = eq->mark_this; prev = eq->mark_prev; @@ -314,6 +385,8 @@ event_queue_remove(struct event_queue *eq) eq->tail = NULL; } #endif +#endif /* OLD IMPLEMENTATION */ + if(--eq->cur_len == 0) eq->times_empty++; @@ -330,18 +403,87 @@ event_queue_remove(struct event_queue *eq) return(0); } + int event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, int (*cmp_func)(struct server_msg *, void *), void *data) { - struct event_queue_msg *p, **source_prev, **dest_tail; + struct event_queue_msg *p, *q, *last; + /* struct event_queue_msg **source_prev, **dest_tail; */ assert(eq_s != NULL); assert(data != NULL); event_queue_lock(eq_s); + + p = eq_s->head; + if(NULL == p) { + event_queue_unlock(eq_s); + return 0; + } + if(eq_d) { + event_queue_lock(eq_d); + } + + last = p->next; + do { + if((*cmp_func)(p->msg, data)) { + glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, + " moving event at offset %d(%d) from %s:%d to %s:%d", + p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, + eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); + q = p->prev; + /* remove message from the source queue */ + if(p->next == p->prev) { + /* removing last message */ + eq_s->head = NULL; + eq_s->tail_ems = NULL; + } else { + p->next->prev = p->prev; + p->prev->next = p->next; + if(p == eq_s->head) { + eq_s->head = p->prev; + } + if(p == eq_s->tail_ems) { + eq_s->tail_ems = p->prev; + } + } + eq_s->cur_len--; + if(eq_d) { + /* append message to the destination queue */ + if(eq_d->head == NULL) { + eq_d->head = p; + eq_d->tail_ems = p; + p->next = p; + p->prev = p; + } else { + /* priorities are ignored */ + p->next = eq_d->head->next; + p->prev = eq_d->head; + p->next->prev = p; + p->prev->next = p; + + } + if(++eq_d->cur_len > eq_d->max_len) { + eq_d->max_len = eq_d->cur_len; + } + } else { + /* signal that the message was 'delivered' */ + event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s), + p->msg->generation); + /* free the message */ + server_msg_free(p->msg); + free(p); + } + p = q; + } else { + p = p->prev; + } + } while (eq_s->head && p != last); + +#if 0 /* OLD IMPLEMENTATION */ if(eq_d) { event_queue_lock(eq_d); /* dest tail is set to point to the last (NULL) pointer in the list */ @@ -385,6 +527,8 @@ event_queue_move_events(struct event_queue *eq_s, } p = *source_prev; } +#endif /* OLD IMPLEMENTATION */ + if(eq_s->cur_len <= queue_size_low) { eq_s->throttling = 0; } diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 1824899..98f7755 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -850,7 +850,7 @@ event_store_recover(struct event_store *es) glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, " all messages for notif id %s are now destined to %s", es->job_id_s, eq_b->dest); - if(event_queue_create_thread(eq_b) < 0) { + if(event_queue_create_thread(eq_b, parallel) < 0) { ret = -1; } else { event_queue_cond_lock(eq_b); diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index 2cc1018..16079d6 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -35,11 +35,12 @@ limitations under the License. int enqueue_msg(struct event_queue *eq, struct server_msg *msg) +/* global: parallel */ { int ret; /* fire thread to take care of this queue */ - if(event_queue_create_thread(eq) < 0) + if(event_queue_create_thread(eq, parallel) < 0) return(-1); #if defined(IL_NOTIFICATIONS) diff --git a/org.glite.lb.logger/src/input_queue_socket.c b/org.glite.lb.logger/src/input_queue_socket.c index ffcca81..9489fe6 100644 --- a/org.glite.lb.logger/src/input_queue_socket.c +++ b/org.glite.lb.logger/src/input_queue_socket.c @@ -28,7 +28,7 @@ limitations under the License. #include "interlogd.h" -static const int SOCK_QUEUE_MAX = 50; +static const int SOCK_QUEUE_MAX = 100; extern char *socket_path; static int sock; diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index c6db23b..08b2728 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -113,7 +113,7 @@ int default_close_timeout; size_t max_store_size; size_t queue_size_low = 0; size_t queue_size_high = 0; -int parallel = 0; +int parallel = 1; #ifdef LB_PERF int nosend = 0, norecover=0, nosync=0, noparse=0; char *event_source = NULL; @@ -182,9 +182,9 @@ decode_switches (int argc, char **argv) "C:" /* CA dir */ "b" /* only bookeeping */ "i:" /* pidfile*/ - "l:" /* log server */ + "l:" /* log server */ "d" /* debug */ - "p" /* parallel */ + "p::" /* parallel */ "q:" "Q:" "F:" /* conf file */ @@ -593,6 +593,7 @@ main (int argc, char **argv) } #endif + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Using %d threads for parallel delivery.", parallel); glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Entering main loop."); /* do the work */ diff --git a/org.glite.lb.logger/src/perftest_il.sh b/org.glite.lb.logger/src/perftest_il.sh index 45d16df..2f906fc 100644 --- a/org.glite.lb.logger/src/perftest_il.sh +++ b/org.glite.lb.logger/src/perftest_il.sh @@ -17,6 +17,7 @@ # numjobs=10 +parallel=1 # XXX - there must be better way to find stage if [ -z "${GLITE_LOCATION}" ]; then @@ -44,7 +45,7 @@ COMM_ARGS="-s /tmp/interlogger.perftest --file-prefix=/tmp/perftest.log" #TEST_VARIANT= SILENT=0 -while getopts "G:t:n:s" OPTION +while getopts "G:t:n:sp:" OPTION do case "$OPTION" in "G") TEST_GROUP=$OPTARG @@ -59,6 +60,9 @@ do "s") SILENT=1 ;; + "p") parallel=$OPTARG + ;; + esac done @@ -273,7 +277,7 @@ echo "" fi PERFTEST_CONSUMER=$STAGEDIR/bin/glite-lb-bkserverd -CONSUMER_ARGS="--silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503" +CONSUMER_ARGS="-g --silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503" PERFTEST_COMPONENT=$STAGEDIR/bin/glite-lb-interlogd-perf LOGJOBS_ARGS=" -m localhost:10500 $COMM_ARGS" } @@ -316,7 +320,7 @@ group_c_test_x () group_c_test_d () { - COMPONENT_ARGS="-i /tmp/perftest_il.pid -d --lazy=10 --nosync --norecover $COMM_ARGS" + COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d --lazy=10 --nosync --norecover $COMM_ARGS" echo -n "d)" run_test il $numjobs print_result @@ -325,7 +329,7 @@ group_c_test_d () group_c_test_e () { - COMPONENT_ARGS="-i /tmp/perftest_il.pid -d $COMM_ARGS" + COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d $COMM_ARGS" echo -n "e)" run_test il $numjobs print_result diff --git a/org.glite.lb.logger/src/queue_mgr.c b/org.glite.lb.logger/src/queue_mgr.c index ac05324..2bfa4e4 100644 --- a/org.glite.lb.logger/src/queue_mgr.c +++ b/org.glite.lb.logger/src/queue_mgr.c @@ -28,6 +28,7 @@ limitations under the License. #include "interlogd.h" struct queue_list { + struct queue_list *queues; struct event_queue *queue; char *dest; struct queue_list *next; @@ -91,9 +92,8 @@ int queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) { struct queue_list *el; - + assert(dest != NULL); - assert(eq != NULL); assert(ql != NULL); el = malloc(sizeof(*el)); @@ -168,19 +168,18 @@ queue_list_get(char *job_id_s) return(NULL); if(queue_list_find(queues, dest, &q, NULL)) { -#if !defined(IL_NOTIFICATIONS) - free(dest); -#endif - return(q->queue); + /* queue found for given destination */ + eq = q->queue; } else { + /* no queue for given destination found */ eq = event_queue_create(dest, outp); if(eq) queue_list_add(&queues, dest, eq); + } #if !defined(IL_NOTIFICATIONS) - free(dest); + free(dest); #endif - return(eq); - } + return eq; } diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 8aae585..a4bc1dd 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -30,8 +30,10 @@ void queue_thread_cleanup(void *q) { struct event_queue *eq = (struct event_queue *)q; + pthread_t my_id = pthread_self(); + int me; - glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", eq->thread_id); + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", my_id); /* unlock all held locks */ /* FIXME: check that the thread always exits when holding these locks; @@ -41,7 +43,12 @@ queue_thread_cleanup(void *q) */ /* clear thread id */ - eq->thread_id = 0; + for(me = 0; me < eq->num_threads; me++) { + if(my_id == eq->thread[me].thread_id) { + eq->thread[me].thread_id = 0; + break; + } + } } @@ -60,6 +67,8 @@ void * queue_thread(void *q) { struct event_queue *eq = (struct event_queue *)q; + struct queue_thread *me; + pthread_t my_id; int ret, exit; int retrycnt; int close_timeout = 0; @@ -75,6 +84,18 @@ queue_thread(void *q) " started new thread for delivery to %s", eq->dest); + my_id = pthread_self(); + for(me = eq->thread; me - eq->thread < eq->num_threads; me++) { + if(my_id == me->thread_id) { + break; + } + } + if(me - eq->thread >= eq->num_threads) { + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, + "Error looking up thread identity, exiting!"); + pthread_exit(NULL); + } + pthread_cleanup_push(queue_thread_cleanup, q); event_queue_cond_lock(eq); @@ -87,7 +108,7 @@ queue_thread(void *q) /* if there are no events, wait for them */ ret = 0; - while (event_queue_empty(eq) + while (event_queue_empty(eq) #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) && (eq->flushing != 1) #endif @@ -95,7 +116,7 @@ queue_thread(void *q) if(lazy_close && close_timeout) { ret = event_queue_wait(eq, close_timeout); if(ret == 1) {/* timeout? */ - (*eq->event_queue_close)(eq); + (*eq->event_queue_close)(eq, me); glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " connection to %s closed", eq->dest); @@ -105,9 +126,10 @@ queue_thread(void *q) ret = event_queue_wait(eq, exit_timeout); if(ret == 1) { glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, - " thread idle for more than %d seconds, exiting", + " thread %x idle for more than %d seconds, exiting", + me, exit_timeout); - (*eq->event_queue_close)(eq); + (*eq->event_queue_close)(eq, me); event_queue_cond_unlock(eq); pthread_exit((void*)0); } @@ -120,6 +142,7 @@ queue_thread(void *q) event_queue_cond_unlock(eq); pthread_exit((void*)-1); } + /* sleep only once when there is no event available for this particular thread */ } /* END while(empty) */ @@ -128,36 +151,41 @@ queue_thread(void *q) */ event_queue_cond_unlock(eq); - /* discard expired events */ - glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " discarding expired events"); - now = time(NULL); - event_queue_move_events(eq, NULL, cmp_expires, &now); + /* discard expired events - only the first thread */ + if(me == eq->thread) { + glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " thread %x: discarding expired events", me); + now = time(NULL); + event_queue_move_events(eq, NULL, cmp_expires, &now); + } if(!event_queue_empty(eq)) { /* deliver pending events */ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, - " attempting delivery to %s", + " thread %x: attempting delivery to %s", + me, eq->dest); /* connect to server */ - if((ret=(*eq->event_queue_connect)(eq)) == 0) { + if((ret=(*eq->event_queue_connect)(eq, me)) == 0) { /* not connected */ if(error_get_maj() != IL_OK) glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, "queue_thread: %s", error_get_msg()); #if defined(IL_NOTIFICATIONS) glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, - " could not connect to client %s, waiting for retry", + " thread %x: could not connect to client %s, waiting for retry", + me, eq->dest); #else glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, - " could not connect to bookkeeping server %s, waiting for retry", + " thread %x: could not connect to bookkeeping server %s, waiting for retry", + me, eq->dest); #endif retrycnt++; } else { retrycnt = 0; /* connected, send events */ - switch(ret=(*eq->event_queue_send)(eq)) { + switch(ret=(*eq->event_queue_send)(eq, me)) { case 0: /* there was an error and we still have events to send */ @@ -166,13 +194,14 @@ queue_thread(void *q) "queue_thread: %s", error_get_msg()); glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, - " events still waiting"); + " thread %x: events still waiting", me); break; case 1: /* hey, we are done for now */ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, - " all events for %s sent", + " thread %x: all events for %s sent", + me, eq->dest); break; @@ -190,9 +219,10 @@ queue_thread(void *q) if((ret == 1) && lazy_close) close_timeout = default_close_timeout; else { - (*eq->event_queue_close)(eq); + (*eq->event_queue_close)(eq, me); glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, - " connection to %sclosed", + " thread %x: connection to %s closed", + me, eq->dest); } } @@ -218,6 +248,15 @@ queue_thread(void *q) #else #endif + event_queue_lock(eq); + if(me->jobid) { + free(me->jobid); + me->jobid = NULL; + me->current = NULL; + } + event_queue_unlock(eq); + + /* if there was some error with server, sleep for a while */ /* iff !event_queue_empty() */ /* also allow for one more try immediately after server disconnect, @@ -225,8 +264,8 @@ queue_thread(void *q) #ifndef LB_PERF if((ret == 0) && (retrycnt > 0)) { glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, - " sleeping"); - event_queue_sleep(eq); + " thread %x: sleeping", me); + event_queue_sleep(eq, me->timeout); } #endif @@ -249,33 +288,43 @@ queue_thread(void *q) int -event_queue_create_thread(struct event_queue *eq) +event_queue_create_thread(struct event_queue *eq, int num_threads) { pthread_attr_t attr; + int i; assert(eq != NULL); event_queue_lock(eq); - /* if there is a thread already, just return */ - if(eq->thread_id != 0) { - event_queue_unlock(eq); - return(0); + eq->num_threads = num_threads; + + /* create thread data */ + if(NULL == eq->thread) { + eq->thread = calloc(num_threads, sizeof(*eq->thread)); + if(NULL == eq->thread) { + set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating data for threads"); + event_queue_unlock(eq); + return -1; + } } - /* create the thread itself */ - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, 65536); - if(pthread_create(&eq->thread_id, &attr, queue_thread, eq) < 0) { - eq->thread_id = 0; - set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread"); - event_queue_unlock(eq); - return(-1); + /* create the threads itself */ + for(i = 0; i < eq->num_threads; i++) { + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, 65536); + if(eq->thread[i].thread_id == 0) { + if(pthread_create(&eq->thread[i].thread_id, &attr, queue_thread, eq) < 0) { + eq->thread[i].thread_id = 0; + set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread"); + event_queue_unlock(eq); + return(-1); + } + } + /* the thread is never going to be joined */ + pthread_detach(eq->thread[i].thread_id); } - /* the thread is never going to be joined */ - pthread_detach(eq->thread_id); - event_queue_unlock(eq); return(1); @@ -384,7 +433,7 @@ event_queue_wait(struct event_queue *eq, int timeout) } -int event_queue_sleep(struct event_queue *eq) +int event_queue_sleep(struct event_queue *eq, int timeout) { #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) struct timespec ts; @@ -394,7 +443,7 @@ int event_queue_sleep(struct event_queue *eq) assert(eq != NULL); gettimeofday(&tv, NULL); - ts.tv_sec = tv.tv_sec + eq->timeout; + ts.tv_sec = tv.tv_sec + timeout; ts.tv_nsec = 1000 * tv.tv_usec; if((ret=pthread_cond_timedwait(&eq->flush_cond, &eq->cond_lock, &ts)) < 0) { if(ret != ETIMEDOUT) { @@ -406,7 +455,7 @@ int event_queue_sleep(struct event_queue *eq) } } #else - sleep(eq->timeout); + sleep(timeout); #endif return(0); } diff --git a/org.glite.lb.logger/src/recover.c b/org.glite.lb.logger/src/recover.c index 250a40a..20a9d9e 100644 --- a/org.glite.lb.logger/src/recover.c +++ b/org.glite.lb.logger/src/recover.c @@ -87,6 +87,10 @@ recover_thread(void *q) new_creds->name); } } +#ifndef LB_PERF sleep(RECOVER_TIMEOUT); +#else + sleep(2); +#endif } } diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c index b48ab23..979962c 100644 --- a/org.glite.lb.logger/src/send_event.c +++ b/org.glite.lb.logger/src/send_event.c @@ -147,7 +147,7 @@ gss_reader(void *user_data, char *buffer, int max_len) */ static int -get_reply(struct event_queue *eq, char **buf, int *code_min) +get_reply(struct event_queue *eq, struct queue_thread *me, char **buf, int *code_min) { char *msg=NULL; int ret, code; @@ -157,7 +157,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min) tv.tv_sec = TIMEOUT; tv.tv_usec = 0; - data.gss = &eq->gss; + data.gss = &me->gss; data.timeout = &tv; len = read_il_data(&data, &msg, gss_reader); if(len < 0) { @@ -179,7 +179,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min) * Returns: 0 - not connected, timeout set, 1 - OK */ int -event_queue_connect(struct event_queue *eq) +event_queue_connect(struct event_queue *eq, struct queue_thread *me) { int ret; struct timeval tv; @@ -187,12 +187,13 @@ event_queue_connect(struct event_queue *eq) cred_handle_t *local_cred_handle; assert(eq != NULL); + assert(me != NULL); #ifdef LB_PERF if(!nosend) { #endif - if(eq->gss.context == NULL) { + if(me->gss.context == NULL) { tv.tv_sec = TIMEOUT; tv.tv_usec = 0; @@ -208,7 +209,7 @@ event_queue_connect(struct event_queue *eq) glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " trying to connect to %s:%d", eq->dest_name, eq->dest_port); - ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat); + ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &me->gss, &gss_stat); if(pthread_mutex_lock(&cred_handle_lock) < 0) abort(); /* check if we need to release the credentials */ @@ -229,11 +230,11 @@ event_queue_connect(struct event_queue *eq) set_error(IL_DGGSS, ret, (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect"); if (gss_err) free(gss_err); - eq->gss.context = NULL; - eq->timeout = TIMEOUT; + me->gss.context = NULL; + me->timeout = TIMEOUT; return(0); } - eq->first_event_sent = 0; + me->first_event_sent = 0; } #ifdef LB_PERF @@ -245,19 +246,20 @@ event_queue_connect(struct event_queue *eq) int -event_queue_close(struct event_queue *eq) +event_queue_close(struct event_queue *eq, struct queue_thread *me) { assert(eq != NULL); + assert(me != NULL); #ifdef LB_PERF if(!nosend) { #endif - if(eq->gss.context != NULL) { - edg_wll_gss_close(&eq->gss, NULL); - eq->gss.context = NULL; + if(me->gss.context != NULL) { + edg_wll_gss_close(&me->gss, NULL); + me->gss.context = NULL; } - eq->first_event_sent = 0; + me->first_event_sent = 0; #ifdef LB_PERF } #endif @@ -270,14 +272,15 @@ event_queue_close(struct event_queue *eq) * Returns: -1 - system error, 0 - not send, 1 - queue empty */ int -event_queue_send(struct event_queue *eq) +event_queue_send(struct event_queue *eq, struct queue_thread *me) { assert(eq != NULL); + assert(me != NULL); #ifdef LB_PERF if(!nosend) { #endif - if(eq->gss.context == NULL) + if(me->gss.context == NULL) return(0); #ifdef LB_PERF } @@ -294,8 +297,8 @@ event_queue_send(struct event_queue *eq) clear_error(); - if(event_queue_get(eq, &msg) < 0) - return(-1); + if(event_queue_get(eq, me, &msg) == 0) + return(1); glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " trying to deliver event at offset %d for job %s", @@ -307,23 +310,23 @@ event_queue_send(struct event_queue *eq) if (msg->len) { tv.tv_sec = TIMEOUT; tv.tv_usec = 0; - ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); + ret = edg_wll_gss_write_full(&me->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); if(ret < 0) { - if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && eq->first_event_sent ) - eq->timeout = 0; + if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && me->first_event_sent ) + me->timeout = 0; else - eq->timeout = TIMEOUT; + me->timeout = TIMEOUT; return(0); } - if((code = get_reply(eq, &rep, &code_min)) < 0) { + if((code = get_reply(eq, me, &rep, &code_min)) < 0) { /* could not get the reply properly, so try again later */ - if (eq->first_event_sent) { + if (me->first_event_sent) { /* could be expected server connection preemption */ clear_error(); - eq->timeout = 1; + me->timeout = 1; } else { - eq->timeout = TIMEOUT; + me->timeout = TIMEOUT; glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, " error reading server %s reply: %s", eq->dest_name, error_get_msg()); } @@ -354,7 +357,7 @@ event_queue_send(struct event_queue *eq) case LB_PERM: case LB_DBERR: /* non fatal errors (for us) */ - eq->timeout = TIMEOUT; + me->timeout = TIMEOUT; return(0); case LB_OK: @@ -380,8 +383,8 @@ event_queue_send(struct event_queue *eq) "send_event: %s", error_get_msg()); - event_queue_remove(eq); - eq->first_event_sent = 1; + event_queue_remove(eq, me); + me->first_event_sent = 1; break; } /* switch */ diff --git a/org.glite.lb.logger/test/IlTestBase.cpp b/org.glite.lb.logger/test/IlTestBase.cpp index 4ef7301..903f381 100644 --- a/org.glite.lb.logger/test/IlTestBase.cpp +++ b/org.glite.lb.logger/test/IlTestBase.cpp @@ -25,8 +25,15 @@ const char *IlTestBase::msg_enc = " 429\n6 michal\n415 DATE=20040831 const struct server_msg IlTestBase::smsg = { "https://some.host:1234/x67qr549qc", + 0L, (char*)IlTestBase::msg_enc, strlen(IlTestBase::msg_enc), strlen(IlTestBase::msg) + 1, - NULL + NULL, + 0, + 0L, + "some.host", + 1234, + "some.host:1234", + 0 }; diff --git a/org.glite.lb.logger/test/event_queueTest.cpp b/org.glite.lb.logger/test/event_queueTest.cpp index e07c19f..f5c761f 100644 --- a/org.glite.lb.logger/test/event_queueTest.cpp +++ b/org.glite.lb.logger/test/event_queueTest.cpp @@ -23,6 +23,7 @@ extern "C" { struct event_queue_msg { struct server_msg *msg; struct event_queue_msg *prev; + struct event_queue_msg *next; }; } @@ -41,7 +42,15 @@ class event_queueTest: public CppUnit::TestFixture public: void setUp() { server = strdup("localhost:8080"); - eq = event_queue_create(server); + eq = event_queue_create(server, NULL); + threads[0].thread_id = pthread_self(); + threads[0].gss.context = NULL; + threads[0].jobid = strdup("1"); + threads[1].thread_id = (pthread_t)1; + threads[1].gss.context = NULL; + threads[1].jobid = strdup("2"); + eq->thread = threads; + eq->num_threads = 2; free(server); } @@ -49,7 +58,7 @@ public: struct event_queue_msg *mp; struct server_msg *m; - for(mp = eq->head; mp != NULL; ) { + for(mp = eq->head; mp != eq->head; ) { struct event_queue_msg *mq; server_msg_free(mp->msg); @@ -65,12 +74,8 @@ public: CPPUNIT_ASSERT( eq != NULL ); CPPUNIT_ASSERT_EQUAL( string(eq->dest_name), string("localhost") ); CPPUNIT_ASSERT_EQUAL( eq->dest_port, 8081 ); - CPPUNIT_ASSERT( eq->tail == NULL ); CPPUNIT_ASSERT( eq->head == NULL ); CPPUNIT_ASSERT( eq->tail_ems == NULL ); - CPPUNIT_ASSERT( eq->mark_this == NULL ); - CPPUNIT_ASSERT( eq->mark_prev == NULL ); - CPPUNIT_ASSERT( eq->thread_id == 0 ); CPPUNIT_ASSERT( eq->flushing == 0 ); CPPUNIT_ASSERT( eq->flush_result == 0 ); } @@ -83,67 +88,67 @@ public: mp = eq->head; m = mp->msg; CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("2") ); - CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems ); + CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev); mp = mp->prev; m = mp->msg; CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("1") ); + CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems ); + CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev); mp = mp->prev; m = mp->msg; CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("3") ); - CPPUNIT_ASSERT_EQUAL( mp, eq->tail ); - CPPUNIT_ASSERT( mp->prev == NULL ); + CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev); + CPPUNIT_ASSERT_EQUAL( mp->prev, eq->head ); } void testEventQueueGet() { - struct event_queue_msg *mp; - struct server_msg *m,sm; + struct queue_thread *me = threads + 0; + struct server_msg *m; int ret; doSomeInserts(); - mp = eq->head; - eq->head = mp->prev; - eq->tail_ems = NULL; - server_msg_free(mp->msg); - free(mp); - ret = event_queue_get(eq, &m); - CPPUNIT_ASSERT( ret == 0 ); - CPPUNIT_ASSERT( eq->mark_this == eq->head ); - CPPUNIT_ASSERT( eq->mark_prev == NULL ); + ret = event_queue_get(eq, me, &m); + CPPUNIT_ASSERT( ret == 1 ); CPPUNIT_ASSERT_EQUAL( string("1"), string(m->job_id_s) ); - sm = IlTestBase::smsg; - sm.job_id_s = "4"; - sm.receipt_to = 1; - ret = event_queue_insert(eq, &sm); - CPPUNIT_ASSERT( ret == 0 ); - CPPUNIT_ASSERT( eq->mark_prev == eq->head ); - CPPUNIT_ASSERT( eq->mark_this == eq->head->prev ); - ret = event_queue_insert(eq, &sm); - CPPUNIT_ASSERT( ret == 0 ); - CPPUNIT_ASSERT( eq->mark_prev == eq->head->prev ); - CPPUNIT_ASSERT( eq->mark_this == eq->head->prev->prev ); + CPPUNIT_ASSERT( me->current == eq->head->prev ); + CPPUNIT_ASSERT_EQUAL( string("1"), string(me->jobid) ); + CPPUNIT_ASSERT( m == me->current->msg ); } void testEventQueueRemove() { - struct event_queue_msg *mp; - struct server_msg *m,sm; + struct server_msg *m; + struct queue_thread *me = threads + 0; int ret; doSomeInserts(); - ret = event_queue_get(eq, &m); - mp = eq->mark_this->prev; - sm = IlTestBase::smsg; - sm.job_id_s = "4"; - sm.receipt_to = 1; - event_queue_insert(eq, &sm); - ret = event_queue_remove(eq); - CPPUNIT_ASSERT( eq->head->prev == mp ); - CPPUNIT_ASSERT( eq->mark_this == NULL ); - CPPUNIT_ASSERT( eq->mark_prev == NULL ); + threads[1].jobid = strdup("4"); + ret = event_queue_get(eq, me, &m); + CPPUNIT_ASSERT(ret == 1); + ret = event_queue_remove(eq, me); + /* remain 2 */ + CPPUNIT_ASSERT( eq->head->prev == eq->head->next ); + CPPUNIT_ASSERT( eq->head != eq->head->prev ); + CPPUNIT_ASSERT( eq->head == eq->tail_ems ); + ret = event_queue_get(eq, me, &m); + CPPUNIT_ASSERT( ret == 1); + ret = event_queue_remove(eq, me); + /* remain 1 */ + CPPUNIT_ASSERT( ret == 0 ); + CPPUNIT_ASSERT( eq->head != NULL ); + CPPUNIT_ASSERT( eq->head == eq->head->prev ); + CPPUNIT_ASSERT( eq->head == eq->head->next ); + ret = event_queue_get(eq, me, &m); + CPPUNIT_ASSERT( ret == 1 ); + ret = event_queue_remove(eq, me); + /* empty queue */ + CPPUNIT_ASSERT( ret == 0); + CPPUNIT_ASSERT( eq->head == NULL ); } protected: char *server; struct event_queue *eq; + struct queue_thread threads[2]; void doSomeInserts() { struct server_msg m = IlTestBase::smsg; diff --git a/org.glite.lb.logger/test/il_test.cpp b/org.glite.lb.logger/test/il_test.cpp index 3d1d554..6722fba 100644 --- a/org.glite.lb.logger/test/il_test.cpp +++ b/org.glite.lb.logger/test/il_test.cpp @@ -20,8 +20,10 @@ limitations under the License. extern "C" { #include +#if 0 #include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h" #include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h" +#endif #include "glite/security/glite_gss.h" #include "interlogd.h" #include "glite/lb/consumer.h" @@ -37,11 +39,22 @@ extern "C" { int TIMEOUT = DEFAULT_TIMEOUT; +#if 0 gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL; pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER; +#endif +cred_handle_t *cred_handle = NULL; +pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER; char *file_prefix = DEFAULT_PREFIX; int bs_only = 0; +int lazy_close = 1; +int default_close_timeout; +size_t max_store_size; +size_t queue_size_low = 0; +size_t queue_size_high = 0; +int parallel = 0; +int killflg = 0; char *cert_file = NULL; char *key_file = NULL; @@ -49,6 +62,9 @@ char *CAcert_dir = NULL; char *log_server = NULL; char *socket_path = DEFAULT_SOCKET; +extern "C" { + void do_handle_signal() { }; +} int main (int ac,const char *av[]) diff --git a/org.glite.lb.logger/test/input_queue_socketTest.cpp b/org.glite.lb.logger/test/input_queue_socketTest.cpp index dd53f52..3604040 100644 --- a/org.glite.lb.logger/test/input_queue_socketTest.cpp +++ b/org.glite.lb.logger/test/input_queue_socketTest.cpp @@ -71,15 +71,15 @@ public: void input_queue_getTest() { - char *event; + il_octet_string_t *event; long offset; int ret; ret = input_queue_get(&event, &offset, 10); CPPUNIT_ASSERT( ret >= 0 ); CPPUNIT_ASSERT_EQUAL( 0L, offset ); - CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event) ); - free(event); + CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event->data) ); + free(event->data); } }; diff --git a/org.glite.lb.logger/test/server_msgTest.cpp b/org.glite.lb.logger/test/server_msgTest.cpp index 2bdc50d..d7f2198 100644 --- a/org.glite.lb.logger/test/server_msgTest.cpp +++ b/org.glite.lb.logger/test/server_msgTest.cpp @@ -33,7 +33,9 @@ class server_msgTest: public CppUnit::TestFixture public: void setUp() { - msg = server_msg_create((char *)IlTestBase::msg); + il_octet_string d = { len: strlen((char*)IlTestBase::msg), data: (char*)IlTestBase::msg }; + + msg = server_msg_create(&d, 0); } void tearDown() { -- 1.8.2.3