* added some queue statistics
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 12 Jun 2006 16:13:38 +0000 (16:13 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 12 Jun 2006 16:13:38 +0000 (16:13 +0000)
* added thread monitoring startup (if defined LB_PROF)
* implemented lazy connection close
* added the lazy test case

org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/il_error.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/perftest_il.sh
org.glite.lb.logger/src/queue_thread.c

index 7161505..bc9a44e 100644 (file)
@@ -172,6 +172,9 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
     eq->mark_prev = el;
 #endif
 
+  if(++eq->cur_len > eq->max_len)
+         eq->max_len = eq->cur_len;
+
   event_queue_unlock(eq);
   /* end of critical section */
 
@@ -255,6 +258,9 @@ event_queue_remove(struct event_queue *eq)
          eq->tail = NULL;
   }
 #endif
+  if(--eq->cur_len == 0) 
+         eq->times_empty++;
+
   event_queue_unlock(eq);
   /* end of critical section */
     
index f57b2ed..e608b1c 100644 (file)
@@ -8,6 +8,11 @@
 #include <pthread.h>
 #include <stdlib.h>
 
+#ifdef LB_PROF
+#include <sys/gmon.h>
+extern void _start (void), etext (void);
+#endif
+
 /* XXX DK: */
 #include <err.h> // SSL header file 
 
@@ -49,6 +54,7 @@ error_get_err ()
   return(err);
 }
 
+
 int
 init_errors(int level)
 {
@@ -77,6 +83,10 @@ init_errors(int level)
   if(level)
     log_level = level;
 
+#ifdef LB_PROF
+  monstartup((u_long)&_start, (u_long)&etext);
+#endif
+
   return(0);
 }
 
index ce5d522..3281718 100644 (file)
@@ -56,6 +56,7 @@ static void usage (int status)
               "  -b, --book                 send events to bookkeeping server only\n"
               "  -l, --log-server <host>    specify address of log server\n"
               "  -s, --socket <path>        non-default path of local socket\n"
+              "  -L, --lazy [<timeout>]     be lazy when closing connections to servers\n"
 #ifdef LB_PERF
               "  -n, --nosend               PERFTEST: consume events instead of sending\n"
               "  -S, --nosync               PERFTEST: do not check logd files for lost events\n"
@@ -76,6 +77,8 @@ static int debug;
 static int verbose = 0;
 char *file_prefix = DEFAULT_PREFIX;
 int bs_only = 0;
+int lazy_close = 0;
+int default_close_timeout;
 #ifdef LB_PERF
 int nosend = 0, norecover=0, nosync=0, noparse=0;
 char *event_source = NULL;
@@ -101,6 +104,7 @@ static struct option const long_options[] =
   {"CAdir", required_argument, 0, 'C'},
   {"log-server", required_argument, 0, 'l'},
   {"socket", required_argument, 0, 's'},
+  {"lazy", optional_argument, 0, 'L'},
 #ifdef LB_PERF
   {"nosend", no_argument, 0, 'n'},
   {"nosync", no_argument, 0, 'S'},
@@ -146,6 +150,7 @@ decode_switches (int argc, char **argv)
                           "j:" /* num jobs */
 #endif
 #endif                    
+                          "L::" /* lazy */
                           "s:", /* socket */
                           long_options, (int *) 0)) != EOF)
     {
@@ -194,6 +199,14 @@ decode_switches (int argc, char **argv)
          socket_path = strdup(optarg);
          break;
 
+       case 'L':
+               lazy_close = 1;
+               if(optarg) 
+                       default_close_timeout = atoi(optarg);
+               else
+                       default_close_timeout = TIMEOUT;
+               break;
+
 #ifdef LB_PERF
        case 'n':
                nosend = 1;
@@ -249,10 +262,10 @@ main (int argc, char **argv)
   setlinebuf(stdout);
   setlinebuf(stderr);
 
-  i = decode_switches (argc, argv);
-
   if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p);
 
+  i = decode_switches (argc, argv);
+
   /* force -b if we do not have log server */
   if(log_server == NULL) {
     log_server = strdup(DEFAULT_LOG_SERVER);
@@ -293,6 +306,9 @@ main (int argc, char **argv)
     il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg());
     exit(EXIT_FAILURE);
   }
+  if(lazy_close)
+         il_log(LOG_DEBUG, "  using lazy mode when closing connections, timeout %d\n",
+                default_close_timeout);
 
   if (CAcert_dir)
      setenv("X509_CERT_DIR", CAcert_dir, 1);
index 656e18d..9ec0d55 100644 (file)
@@ -70,6 +70,8 @@ extern char *key_file;
 extern char *CAcert_dir;
 extern int bs_only;
 extern int killflg;
+extern int lazy_close;
+extern int default_close_timeout;
 #ifdef LB_PERF
 extern int nosend, nosync, norecover, noparse;
 #ifdef PERF_EVENTS_INLINE
@@ -136,6 +138,10 @@ struct event_queue {
        int                     flush_result;   /* result of flush operation */
        pthread_cond_t          flush_cond;     /* condition variable for flush operation */
 #endif
+       /* statistics */
+       int                     times_empty;    /* number of times the queue was emptied */
+       int                     max_len;        /* max queue length */
+       int                     cur_len;        /* current length */
 };
 
 
index b647c42..9879951 100644 (file)
@@ -120,7 +120,7 @@ run_test il $numjobs
 print_result
 rm -f /tmp/perftest.log.*
 
-echo "d)  this test is not yet implemented"
+echo "d)  this test is not applicable"
 
 CONSUMER_ARGS="-d --nosend $COMM_ARGS"
 echo -n "e)"
@@ -219,7 +219,11 @@ run_test il $numjobs
 print_result
 rm -f /tmp/perftest.log.*
 
-echo "d) this test is not yet implemented"
+COMPONENT_ARGS="-d  --lazy=10 --nosync --norecover $COMM_ARGS"
+echo -n "d)"
+run_test il $numjobs
+print_result
+rm -f /tmp/perftest.log.*
 
 COMPONENT_ARGS="-d $COMM_ARGS"
 echo -n "e)"
index 8cdfb2c..858487e 100644 (file)
@@ -34,6 +34,7 @@ queue_thread(void *q)
        struct event_queue *eq = (struct event_queue *)q;
        int ret, exit;
        int retrycnt;
+       int close_timeout;
 
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
@@ -59,7 +60,16 @@ queue_thread(void *q)
                       && (eq->flushing != 1)
 #endif
                        ) {
-                       ret = event_queue_wait(eq, 0);
+                       if(lazy_close && close_timeout) {
+                               ret = event_queue_wait(eq, close_timeout);
+                               if(ret == 1) {/* timeout? */
+                                       event_queue_close(eq);
+                                       il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                              eq->dest_name, eq->dest_port);
+                               }
+                               close_timeout = 0;
+                       } else 
+                               ret = event_queue_wait(eq, 0);
                        if(ret < 0) {
                                /* error waiting */
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
@@ -112,7 +122,13 @@ queue_thread(void *q)
                        } /* switch */
                        
                        /* we are done for now anyway, so close the queue */
+                       if((ret == 1) && lazy_close)
+                               close_timeout = default_close_timeout;
+                       else {
                                event_queue_close(eq);
+                               il_log(LOG_DEBUG, "  connection to %s:%d closed\n",
+                                      eq->dest_name, eq->dest_port);
+                       }
                } 
 
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)