From 7e57ad4f2344edb74a2e5919d3629c7a3c0e228e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Mon, 12 Jun 2006 16:13:38 +0000 Subject: [PATCH] * added some queue statistics * added thread monitoring startup (if defined LB_PROF) * implemented lazy connection close * added the lazy test case --- org.glite.lb.logger/src/event_queue.c | 6 ++++++ org.glite.lb.logger/src/il_error.c | 10 ++++++++++ org.glite.lb.logger/src/interlogd.c | 20 ++++++++++++++++++-- org.glite.lb.logger/src/interlogd.h | 6 ++++++ org.glite.lb.logger/src/perftest_il.sh | 8 ++++++-- org.glite.lb.logger/src/queue_thread.c | 18 +++++++++++++++++- 6 files changed, 63 insertions(+), 5 deletions(-) diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 7161505..bc9a44e 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -172,6 +172,9 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) eq->mark_prev = el; #endif + if(++eq->cur_len > eq->max_len) + eq->max_len = eq->cur_len; + event_queue_unlock(eq); /* end of critical section */ @@ -255,6 +258,9 @@ event_queue_remove(struct event_queue *eq) eq->tail = NULL; } #endif + if(--eq->cur_len == 0) + eq->times_empty++; + event_queue_unlock(eq); /* end of critical section */ diff --git a/org.glite.lb.logger/src/il_error.c b/org.glite.lb.logger/src/il_error.c index f57b2ed..e608b1c 100644 --- a/org.glite.lb.logger/src/il_error.c +++ b/org.glite.lb.logger/src/il_error.c @@ -8,6 +8,11 @@ #include #include +#ifdef LB_PROF +#include +extern void _start (void), etext (void); +#endif + /* XXX DK: */ #include // SSL header file @@ -49,6 +54,7 @@ error_get_err () return(err); } + int init_errors(int level) { @@ -77,6 +83,10 @@ init_errors(int level) if(level) log_level = level; +#ifdef LB_PROF + monstartup((u_long)&_start, (u_long)&etext); +#endif + return(0); } diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index ce5d522..3281718 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -56,6 +56,7 @@ static void usage (int status) " -b, --book send events to bookkeeping server only\n" " -l, --log-server specify address of log server\n" " -s, --socket non-default path of local socket\n" + " -L, --lazy [] be lazy when closing connections to servers\n" #ifdef LB_PERF " -n, --nosend PERFTEST: consume events instead of sending\n" " -S, --nosync PERFTEST: do not check logd files for lost events\n" @@ -76,6 +77,8 @@ static int debug; static int verbose = 0; char *file_prefix = DEFAULT_PREFIX; int bs_only = 0; +int lazy_close = 0; +int default_close_timeout; #ifdef LB_PERF int nosend = 0, norecover=0, nosync=0, noparse=0; char *event_source = NULL; @@ -101,6 +104,7 @@ static struct option const long_options[] = {"CAdir", required_argument, 0, 'C'}, {"log-server", required_argument, 0, 'l'}, {"socket", required_argument, 0, 's'}, + {"lazy", optional_argument, 0, 'L'}, #ifdef LB_PERF {"nosend", no_argument, 0, 'n'}, {"nosync", no_argument, 0, 'S'}, @@ -146,6 +150,7 @@ decode_switches (int argc, char **argv) "j:" /* num jobs */ #endif #endif + "L::" /* lazy */ "s:", /* socket */ long_options, (int *) 0)) != EOF) { @@ -194,6 +199,14 @@ decode_switches (int argc, char **argv) socket_path = strdup(optarg); break; + case 'L': + lazy_close = 1; + if(optarg) + default_close_timeout = atoi(optarg); + else + default_close_timeout = TIMEOUT; + break; + #ifdef LB_PERF case 'n': nosend = 1; @@ -249,10 +262,10 @@ main (int argc, char **argv) setlinebuf(stdout); setlinebuf(stderr); - i = decode_switches (argc, argv); - if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p); + i = decode_switches (argc, argv); + /* force -b if we do not have log server */ if(log_server == NULL) { log_server = strdup(DEFAULT_LOG_SERVER); @@ -293,6 +306,9 @@ main (int argc, char **argv) il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg()); exit(EXIT_FAILURE); } + if(lazy_close) + il_log(LOG_DEBUG, " using lazy mode when closing connections, timeout %d\n", + default_close_timeout); if (CAcert_dir) setenv("X509_CERT_DIR", CAcert_dir, 1); diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 656e18d..9ec0d55 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -70,6 +70,8 @@ extern char *key_file; extern char *CAcert_dir; extern int bs_only; extern int killflg; +extern int lazy_close; +extern int default_close_timeout; #ifdef LB_PERF extern int nosend, nosync, norecover, noparse; #ifdef PERF_EVENTS_INLINE @@ -136,6 +138,10 @@ struct event_queue { int flush_result; /* result of flush operation */ pthread_cond_t flush_cond; /* condition variable for flush operation */ #endif + /* statistics */ + int times_empty; /* number of times the queue was emptied */ + int max_len; /* max queue length */ + int cur_len; /* current length */ }; diff --git a/org.glite.lb.logger/src/perftest_il.sh b/org.glite.lb.logger/src/perftest_il.sh index b647c42..9879951 100644 --- a/org.glite.lb.logger/src/perftest_il.sh +++ b/org.glite.lb.logger/src/perftest_il.sh @@ -120,7 +120,7 @@ run_test il $numjobs print_result rm -f /tmp/perftest.log.* -echo "d) this test is not yet implemented" +echo "d) this test is not applicable" CONSUMER_ARGS="-d --nosend $COMM_ARGS" echo -n "e)" @@ -219,7 +219,11 @@ run_test il $numjobs print_result rm -f /tmp/perftest.log.* -echo "d) this test is not yet implemented" +COMPONENT_ARGS="-d --lazy=10 --nosync --norecover $COMM_ARGS" +echo -n "d)" +run_test il $numjobs +print_result +rm -f /tmp/perftest.log.* COMPONENT_ARGS="-d $COMM_ARGS" echo -n "e)" diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 8cdfb2c..858487e 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -34,6 +34,7 @@ queue_thread(void *q) struct event_queue *eq = (struct event_queue *)q; int ret, exit; int retrycnt; + int close_timeout; if(init_errors(0) < 0) { il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); @@ -59,7 +60,16 @@ queue_thread(void *q) && (eq->flushing != 1) #endif ) { - ret = event_queue_wait(eq, 0); + if(lazy_close && close_timeout) { + ret = event_queue_wait(eq, close_timeout); + if(ret == 1) {/* timeout? */ + event_queue_close(eq); + il_log(LOG_DEBUG, " connection to %s:%d closed\n", + eq->dest_name, eq->dest_port); + } + close_timeout = 0; + } else + ret = event_queue_wait(eq, 0); if(ret < 0) { /* error waiting */ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); @@ -112,7 +122,13 @@ queue_thread(void *q) } /* switch */ /* we are done for now anyway, so close the queue */ + if((ret == 1) && lazy_close) + close_timeout = default_close_timeout; + else { event_queue_close(eq); + il_log(LOG_DEBUG, " connection to %s:%d closed\n", + eq->dest_name, eq->dest_port); + } } #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) -- 1.8.2.3