implementation of parallel delivery to server
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 25 Oct 2010 19:55:01 +0000 (19:55 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 25 Oct 2010 19:55:01 +0000 (19:55 +0000)
17 files changed:
org.glite.lb.logger/Makefile
org.glite.lb.logger/interface/interlogd.h
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/input_queue_socket.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/perftest_il.sh
org.glite.lb.logger/src/queue_mgr.c
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/recover.c
org.glite.lb.logger/src/send_event.c
org.glite.lb.logger/test/IlTestBase.cpp
org.glite.lb.logger/test/event_queueTest.cpp
org.glite.lb.logger/test/il_test.cpp
org.glite.lb.logger/test/input_queue_socketTest.cpp
org.glite.lb.logger/test/server_msgTest.cpp

index 541eca0..22fbfff 100644 (file)
@@ -71,7 +71,7 @@ EXT_LIB:=-lglite_lbu_trio -lglite_lbu_log
 
 GLITE_GSS_LIB:=-lglite_security_gss
 
-TEST_LIBS:=-L${cppunit_prefix}/${libdir} -lcppunit
+TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit
 TEST_INC:=-I${cppunit_prefix}/include
 
 LOGD_OBJS:= logd_proto.o logd.o
@@ -106,7 +106,7 @@ INTERLOG_TEST_OBJS:= \
        input_queue_socket.o \
        input_queue_socketTest.o \
        send_event.o \
-       plugin_mgr. o \
+       plugin_mgr.o \
        event_queue.o \
        event_queueTest.o \
        IlTestBase.o \
@@ -172,7 +172,7 @@ check.ll:
        -echo commented out -- fix needed
 
 check.il: ${INTERLOG_TEST_OBJS}
-       ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${GLITE_GSS_LIB}_${nothrflavour} ${TEST_LIBS} -lpthread $+
+       ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${TEST_LIBS} ${EXT_LIB} -lpthread $+
 
 install:
        -mkdir -p ${PREFIX}/bin
index fc8d95f..8ddb5b9 100644 (file)
@@ -182,18 +182,23 @@ struct server_msg {
 };
 
 
-struct event_queue {
+struct queue_thread {
+       pthread_t               thread_id;      /* id's of associated threads */
        edg_wll_GssConnection   gss;            /* GSS connection */
+       char                   *jobid;
+       int                     timeout;        /* queue timeout */
+       int                     first_event_sent; /* connection can be preempted by server */
+       struct event_queue_msg *current;        /* current message being sent */
+};
+
+struct event_queue {
        char                   *dest_name;
        int                     dest_port;
-       char                       *dest;
-       int                     timeout;        /* queue timeout */
-       struct event_queue_msg *tail;           /* last message in the queue */
+       char                   *dest;
        struct event_queue_msg *head;           /* first message in the queue */
        struct event_queue_msg *tail_ems;       /* last priority message in the queue (or NULL) */
-       struct event_queue_msg *mark_this;      /* mark message for removal */
-       struct event_queue_msg *mark_prev;      /* predecessor of the marked message */
-       pthread_t               thread_id;      /* id of associated thread */
+       int                     num_threads;    /* number of delivery threads */
+       struct queue_thread    *thread;         /* info for delivery threads */
        pthread_rwlock_t        update_lock;    /* mutex for queue updates */
        pthread_mutex_t         cond_lock;      /* mutex for condition variable */
        pthread_cond_t          ready_cond;     /* condition variable for message arrival */
@@ -204,19 +209,18 @@ struct event_queue {
        int                     times_empty;    /* number of times the queue was emptied */
        int                     max_len;        /* max queue length */
        int                     cur_len;        /* current length */
-       int                                             throttling;             /* event insertion suspend flag */
-       int                                             first_event_sent; /* connection can be preempted by server */
+       int                     throttling;     /* event insertion suspend flag */
        /* delivery methods */
-       int             (*event_queue_connect)(struct event_queue *);
-       int             (*event_queue_send)(struct event_queue *);
-       int             (*event_queue_close)(struct event_queue *);
-       void                               *plugin_data;        /* opaque data used by output plugins */
+       int             (*event_queue_connect)(struct event_queue *, struct queue_thread *);
+       int             (*event_queue_send)(struct event_queue *, struct queue_thread *);
+       int             (*event_queue_close)(struct event_queue *, struct queue_thread *);
+       void                   *plugin_data;    /* opaque data used by output plugins */
 };
 
 struct il_output_plugin {
-       int     (*event_queue_connect)(struct event_queue *);
-       int     (*event_queue_send)(struct event_queue *);
-       int     (*event_queue_close)(struct event_queue *);
+       int     (*event_queue_connect)(struct event_queue *, struct queue_thread *);
+       int     (*event_queue_send)(struct event_queue *, struct queue_thread *);
+       int     (*event_queue_close)(struct event_queue *, struct queue_thread *);
        int             (*plugin_init)(char *);
        int             (*plugin_supports_scheme)(const char *);
 };
@@ -238,27 +242,27 @@ struct event_queue *event_queue_create(char *, struct il_output_plugin *);
 int event_queue_free(struct event_queue *);
 int event_queue_empty(struct event_queue *);
 int event_queue_insert(struct event_queue *, struct server_msg *);
-int event_queue_get(struct event_queue *, struct server_msg **);
-int event_queue_remove(struct event_queue *);
+int event_queue_get(struct event_queue *, struct queue_thread *, struct server_msg **);
+int event_queue_remove(struct event_queue *, struct queue_thread *);
 int event_queue_enqueue(struct event_queue *, char *);
 /* helper */
 int enqueue_msg(struct event_queue *, struct server_msg *);
 int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *);
 
 /* protocol event queue methods */
-int event_queue_connect(struct event_queue *);
-int event_queue_send(struct event_queue *);
-int event_queue_close(struct event_queue *);
+int event_queue_connect(struct event_queue *, struct queue_thread *);
+int event_queue_send(struct event_queue *, struct queue_thread *);
+int event_queue_close(struct event_queue *, struct queue_thread *);
 int send_confirmation(long, int);
 
 /* thread event queue methods */
-int event_queue_create_thread(struct event_queue *);
+int event_queue_create_thread(struct event_queue *, int);
 int event_queue_lock(struct event_queue *);
 int event_queue_unlock(struct event_queue *);
 int event_queue_lock_ro(struct event_queue *);
 int event_queue_signal(struct event_queue *);
 int event_queue_wait(struct event_queue *, int);
-int event_queue_sleep(struct event_queue *);
+int event_queue_sleep(struct event_queue *, int);
 int event_queue_wakeup(struct event_queue *);
 int event_queue_cond_lock(struct event_queue *);
 int event_queue_cond_unlock(struct event_queue *);
index 0f7bd32..58bee20 100644 (file)
@@ -36,6 +36,7 @@ limitations under the License.
 struct event_queue_msg {
   struct server_msg *msg;
   struct event_queue_msg *prev;
+  struct event_queue_msg *next;
 };
 
 struct event_queue *
@@ -132,9 +133,10 @@ event_queue_free(struct event_queue *eq)
   if(!event_queue_empty(eq))
     return(-1);
 
+  /* 
   if(eq->thread_id)
     pthread_cancel(eq->thread_id);
-
+  */
 
   pthread_rwlock_destroy(&eq->update_lock);
   pthread_mutex_destroy(&eq->cond_lock);
@@ -194,6 +196,26 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
 
   /* this is critical section */
   event_queue_lock(eq);
+  if(NULL == eq->head) {
+         el->prev = el;
+         el->next = el;
+         eq->head = el;
+         eq->tail_ems = el;
+  } else {
+         if(server_msg_is_priority(msg)) {
+                 el->next = eq->tail_ems->next;
+                 el->prev = eq->tail_ems;
+                 if(eq->head == eq->tail_ems) {
+                         eq->head = el;
+                 }
+         } else {
+                 el->next = eq->head->next;
+                 el->prev = eq->head;
+         }
+         el->next->prev = el;
+         el->prev->next = el;
+  }
+#if 0  /* OLD IMPLEMENTATION */
 #if defined(INTERLOGD_EMS)
   if(server_msg_is_priority(msg)) {
     /* priority messages go first */
@@ -227,6 +249,7 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
   if(eq->mark_this && (el->prev == eq->mark_this))
     eq->mark_prev = el;
 #endif
+#endif /* OLD IMPLEMENTATION */
 
   if(++eq->cur_len > eq->max_len)
          eq->max_len = eq->cur_len;
@@ -239,33 +262,59 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg)
 
 
 int
-event_queue_get(struct event_queue *eq, struct server_msg **msg)
+event_queue_get(struct event_queue *eq, struct queue_thread *me, struct server_msg **msg)
 {
   struct event_queue_msg *el;
+  int found;
 
   assert(eq != NULL);
   assert(msg != NULL);
 
   event_queue_lock(eq);
+  if(me->jobid) {
+         free(me->jobid);
+         me->jobid = NULL;
+         me->current = NULL;
+  }
+  if(NULL == eq->head) {
+         event_queue_unlock(eq);
+         return 0;
+  }
+  found = 0;
   el = eq->head;
-#if defined(INTERLOGD_EMS)
-  /* this message is marked for removal, it is first on the queue */
-  eq->mark_this = el;
-  eq->mark_prev = NULL;
-#endif
+  do {
+         char *jobid = el->msg->job_id_s;
+         int i;
+         
+         for(i = 0; i < eq->num_threads; i++) {
+                 if(me == eq->thread + i) {
+                         continue;
+                 }
+                 if(eq->thread[i].jobid && strcmp(jobid, eq->thread[i].jobid) == 0) {
+                         break;
+                 }
+         }
+         if(i >= eq->num_threads) {
+                 /* no other thread is working on this job */
+                 found = 1;
+                 break;
+         }
+         el = el->prev;
+  } while(el != eq->head);
+  if(found) {
+         me->current = el;
+         me->jobid = strdup(el->msg->job_id_s);
+         *msg = el->msg;
+  } else {
+  }
   event_queue_unlock(eq);
 
-  if(el == NULL)
-    return(-1);
-
-  *msg = el->msg;
-
-  return(0);
+  return found;
 }
 
 
 int
-event_queue_remove(struct event_queue *eq)
+event_queue_remove(struct event_queue *eq, struct queue_thread *me)
 {
   struct event_queue_msg *el;
 #if defined(INTERLOGD_EMS)
@@ -273,9 +322,31 @@ event_queue_remove(struct event_queue *eq)
 #endif
 
   assert(eq != NULL);
+  
+  el = me->current;
+  if(NULL == el) {
+         return -1;
+  }
 
   /* this is critical section */
   event_queue_lock(eq);
+
+  if(el == el->prev) {
+         /* last element */
+         eq->head = NULL;
+         eq->tail_ems = NULL;
+  } else {
+         el->next->prev = el->prev;
+         el->prev->next = el->next;
+         if(el == eq->head) {
+                 eq->head = el->prev;
+         }
+         if(el == eq->tail_ems) {
+                 eq->tail_ems = el->prev;
+         }
+  }
+
+#if 0 /* OLD IMPLEMENTATION */
 #if defined(INTERLOGD_EMS)
   el = eq->mark_this;
   prev = eq->mark_prev;
@@ -314,6 +385,8 @@ event_queue_remove(struct event_queue *eq)
          eq->tail = NULL;
   }
 #endif
+#endif /* OLD IMPLEMENTATION */
+
   if(--eq->cur_len == 0)
          eq->times_empty++;
 
@@ -330,18 +403,87 @@ event_queue_remove(struct event_queue *eq)
   return(0);
 }
 
+
 int
 event_queue_move_events(struct event_queue *eq_s,
                        struct event_queue *eq_d,
                        int (*cmp_func)(struct server_msg *, void *),
                        void *data)
 {
-       struct event_queue_msg *p, **source_prev, **dest_tail;
+       struct event_queue_msg *p, *q, *last;
+       /* struct event_queue_msg **source_prev, **dest_tail; */
 
        assert(eq_s != NULL);
        assert(data != NULL);
 
        event_queue_lock(eq_s);
+
+       p = eq_s->head;
+       if(NULL == p) {
+               event_queue_unlock(eq_s);
+               return 0;
+       }
+       if(eq_d) {
+               event_queue_lock(eq_d);
+       }
+
+       last = p->next;
+       do {
+               if((*cmp_func)(p->msg, data)) {
+                       glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
+                                        "      moving event at offset %d(%d) from %s:%d to %s:%d",
+                                        p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+                                        eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
+                       q = p->prev;
+                       /* remove message from the source queue */
+                       if(p->next == p->prev) {
+                               /* removing last message */
+                               eq_s->head = NULL;
+                               eq_s->tail_ems = NULL;
+                       } else {
+                               p->next->prev = p->prev;
+                               p->prev->next = p->next;
+                               if(p == eq_s->head) {
+                                       eq_s->head = p->prev;
+                               }
+                               if(p == eq_s->tail_ems) {
+                                       eq_s->tail_ems = p->prev;
+                               }
+                       }
+                       eq_s->cur_len--;
+                       if(eq_d) {
+                               /* append message to the destination queue */
+                               if(eq_d->head == NULL) {
+                                       eq_d->head = p;
+                                       eq_d->tail_ems = p;
+                                       p->next = p;
+                                       p->prev = p;
+                               } else {
+                                       /* priorities are ignored */
+                                       p->next = eq_d->head->next;
+                                       p->prev = eq_d->head;
+                                       p->next->prev = p;
+                                       p->prev->next = p;
+
+                               }
+                               if(++eq_d->cur_len > eq_d->max_len) {
+                                       eq_d->max_len = eq_d->cur_len;
+                               }
+                       } else {
+                               /* signal that the message was 'delivered' */
+                               event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
+                                                  p->msg->generation);
+                               /* free the message */
+                               server_msg_free(p->msg);
+                               free(p);
+                       }
+                       p = q;
+               } else {
+                       p = p->prev;
+               }
+       } while (eq_s->head && p != last);
+
+#if 0 /* OLD IMPLEMENTATION */
        if(eq_d) {
                event_queue_lock(eq_d);
                /* dest tail is set to point to the last (NULL) pointer in the list */
@@ -385,6 +527,8 @@ event_queue_move_events(struct event_queue *eq_s,
                }
                p = *source_prev;
        }
+#endif /* OLD IMPLEMENTATION */
+
        if(eq_s->cur_len <= queue_size_low) {
                eq_s->throttling = 0;
        }
index 1824899..98f7755 100644 (file)
@@ -850,7 +850,7 @@ event_store_recover(struct event_store *es)
                  glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, 
                                   "    all messages for notif id %s are now destined to %s",
                         es->job_id_s, eq_b->dest);
-                 if(event_queue_create_thread(eq_b) < 0) {
+                 if(event_queue_create_thread(eq_b, parallel) < 0) {
                          ret = -1;
                  } else {
                          event_queue_cond_lock(eq_b);
index 2cc1018..16079d6 100644 (file)
@@ -35,11 +35,12 @@ limitations under the License.
 
 int
 enqueue_msg(struct event_queue *eq, struct server_msg *msg)
+/* global: parallel */
 {
        int ret;
 
        /* fire thread to take care of this queue */
-       if(event_queue_create_thread(eq) < 0)
+       if(event_queue_create_thread(eq, parallel) < 0)
                return(-1);
 
 #if defined(IL_NOTIFICATIONS)
index ffcca81..9489fe6 100644 (file)
@@ -28,7 +28,7 @@ limitations under the License.
 #include "interlogd.h"
 
 
-static const int   SOCK_QUEUE_MAX = 50;
+static const int   SOCK_QUEUE_MAX = 100;
 extern char *socket_path;
 
 static int sock;
index c6db23b..08b2728 100644 (file)
@@ -113,7 +113,7 @@ int default_close_timeout;
 size_t max_store_size;
 size_t queue_size_low = 0;
 size_t queue_size_high = 0;
-int parallel = 0;
+int parallel = 1;
 #ifdef LB_PERF
 int nosend = 0, norecover=0, nosync=0, noparse=0;
 char *event_source = NULL;
@@ -182,9 +182,9 @@ decode_switches (int argc, char **argv)
                           "C:"          /* CA dir */
                           "b"  /* only bookeeping */
                           "i:"  /* pidfile*/
-               "l:" /* log server */
+                          "l:" /* log server */
                           "d" /* debug */
-                          "p" /* parallel */
+                          "p::" /* parallel */
                           "q:"
                           "Q:"
                           "F:" /* conf file */
@@ -593,6 +593,7 @@ main (int argc, char **argv)
   }
 #endif
 
+  glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Using %d threads for parallel delivery.", parallel);
   glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Entering main loop.");
 
   /* do the work */
index 45d16df..2f906fc 100644 (file)
@@ -17,6 +17,7 @@
 #
 
 numjobs=10
+parallel=1
 
 # XXX - there must be better way to find stage
 if [ -z "${GLITE_LOCATION}" ]; then
@@ -44,7 +45,7 @@ COMM_ARGS="-s /tmp/interlogger.perftest --file-prefix=/tmp/perftest.log"
 #TEST_VARIANT=
 
 SILENT=0
-while getopts "G:t:n:s" OPTION 
+while getopts "G:t:n:sp:" OPTION 
 do
     case "$OPTION" in 
     "G") TEST_GROUP=$OPTARG
@@ -59,6 +60,9 @@ do
     "s") SILENT=1
     ;;
 
+    "p") parallel=$OPTARG
+    ;;
+
     esac
 done
 
@@ -273,7 +277,7 @@ echo ""
 fi
 
 PERFTEST_CONSUMER=$STAGEDIR/bin/glite-lb-bkserverd
-CONSUMER_ARGS="--silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503"
+CONSUMER_ARGS="-g --silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503"
 PERFTEST_COMPONENT=$STAGEDIR/bin/glite-lb-interlogd-perf
 LOGJOBS_ARGS=" -m localhost:10500 $COMM_ARGS"
 }
@@ -316,7 +320,7 @@ group_c_test_x ()
 
 group_c_test_d ()
 {
-    COMPONENT_ARGS="-i /tmp/perftest_il.pid -d  --lazy=10 --nosync --norecover $COMM_ARGS"
+    COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d  --lazy=10 --nosync --norecover $COMM_ARGS"
     echo -n "d)"
     run_test il $numjobs
     print_result
@@ -325,7 +329,7 @@ group_c_test_d ()
 
 group_c_test_e ()
 {
-    COMPONENT_ARGS="-i /tmp/perftest_il.pid -d $COMM_ARGS"
+    COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d $COMM_ARGS"
     echo -n "e)"
     run_test il $numjobs
     print_result
index ac05324..2bfa4e4 100644 (file)
@@ -28,6 +28,7 @@ limitations under the License.
 #include "interlogd.h"
 
 struct queue_list {
+  struct queue_list *queues;
   struct event_queue *queue;
   char   *dest;
   struct queue_list *next;
@@ -91,9 +92,8 @@ int
 queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
 {
   struct queue_list *el;
-  
+
   assert(dest != NULL);
-  assert(eq != NULL);
   assert(ql != NULL);
 
   el = malloc(sizeof(*el));
@@ -168,19 +168,18 @@ queue_list_get(char *job_id_s)
     return(NULL);
   
   if(queue_list_find(queues, dest, &q, NULL)) {
-#if !defined(IL_NOTIFICATIONS)
-    free(dest);
-#endif
-    return(q->queue);
+    /* queue found for given destination */
+    eq = q->queue;
   } else {
+    /* no queue for given destination found */
     eq = event_queue_create(dest, outp);
     if(eq)
       queue_list_add(&queues, dest, eq);
+  }
 #if !defined(IL_NOTIFICATIONS)
-    free(dest);
+  free(dest);
 #endif
-    return(eq);
-  }
+  return eq;
 }
 
 
index 8aae585..a4bc1dd 100644 (file)
@@ -30,8 +30,10 @@ void
 queue_thread_cleanup(void *q)
 {
        struct event_queue *eq = (struct event_queue *)q;
+       pthread_t my_id = pthread_self();
+       int me;
 
-       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", eq->thread_id);
+       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", my_id);
 
        /* unlock all held locks */
        /* FIXME: check that the thread always exits when holding these locks;
@@ -41,7 +43,12 @@ queue_thread_cleanup(void *q)
        */
   
        /* clear thread id */
-       eq->thread_id = 0;
+       for(me = 0; me < eq->num_threads; me++) {
+               if(my_id == eq->thread[me].thread_id) {
+                       eq->thread[me].thread_id = 0;
+                       break;
+               }
+       }
 }
 
 
@@ -60,6 +67,8 @@ void *
 queue_thread(void *q)
 {
        struct event_queue *eq = (struct event_queue *)q;
+       struct queue_thread *me;
+       pthread_t my_id;
        int ret, exit;
        int retrycnt;
        int close_timeout = 0;
@@ -75,6 +84,18 @@ queue_thread(void *q)
                         "  started new thread for delivery to %s",
                         eq->dest);
 
+       my_id = pthread_self();
+       for(me = eq->thread; me - eq->thread < eq->num_threads; me++) {
+               if(my_id == me->thread_id) {
+                       break;
+               }
+       }
+       if(me - eq->thread >= eq->num_threads) {
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR,
+                                "Error looking up thread identity, exiting!");
+               pthread_exit(NULL);
+       }
+
        pthread_cleanup_push(queue_thread_cleanup, q); 
 
        event_queue_cond_lock(eq);
@@ -87,7 +108,7 @@ queue_thread(void *q)
 
                /* if there are no events, wait for them */
                ret = 0;
-               while (event_queue_empty(eq) 
+               while (event_queue_empty(eq)
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
                       && (eq->flushing != 1)
 #endif
@@ -95,7 +116,7 @@ queue_thread(void *q)
                        if(lazy_close && close_timeout) {
                                ret = event_queue_wait(eq, close_timeout);
                                if(ret == 1) {/* timeout? */
-                                       (*eq->event_queue_close)(eq);
+                                       (*eq->event_queue_close)(eq, me);
                                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
                                                         "  connection to %s closed",
                                                         eq->dest);
@@ -105,9 +126,10 @@ queue_thread(void *q)
                                ret = event_queue_wait(eq, exit_timeout);
                                if(ret == 1) {
                                        glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, 
-                                                        "  thread idle for more than %d seconds, exiting", 
+                                                        "  thread %x idle for more than %d seconds, exiting", 
+                                                        me,
                                                         exit_timeout);
-                                       (*eq->event_queue_close)(eq);
+                                       (*eq->event_queue_close)(eq, me);
                                        event_queue_cond_unlock(eq);
                                        pthread_exit((void*)0);
                                }
@@ -120,6 +142,7 @@ queue_thread(void *q)
                                event_queue_cond_unlock(eq);
                                pthread_exit((void*)-1);
                        }
+                       /* sleep only once when there is no event available for this particular thread */
                }  /* END while(empty) */
     
 
@@ -128,36 +151,41 @@ queue_thread(void *q)
                 */
                event_queue_cond_unlock(eq);
                
-               /* discard expired events */
-               glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, "  discarding expired events");
-               now = time(NULL);
-               event_queue_move_events(eq, NULL, cmp_expires, &now);
+               /* discard expired events - only the first thread */
+               if(me == eq->thread) {
+                 glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, "  thread %x: discarding expired events", me);
+                       now = time(NULL);
+                       event_queue_move_events(eq, NULL, cmp_expires, &now);
+               }
                if(!event_queue_empty(eq)) {
 
                        /* deliver pending events */
                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
-                                        "  attempting delivery to %s",
+                                        "  thread %x: attempting delivery to %s",
+                                        me,
                                         eq->dest);
                        /* connect to server */
-                       if((ret=(*eq->event_queue_connect)(eq)) == 0) {
+                       if((ret=(*eq->event_queue_connect)(eq, me)) == 0) {
                                /* not connected */
                                if(error_get_maj() != IL_OK)
                                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, 
                                                         "queue_thread: %s", error_get_msg());
 #if defined(IL_NOTIFICATIONS)
                                glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, 
-                                                "    could not connect to client %s, waiting for retry", 
+                                                "    thread %x: could not connect to client %s, waiting for retry", 
+                                                me,
                                                 eq->dest);
 #else
                                glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO, 
-                                                "    could not connect to bookkeeping server %s, waiting for retry", 
+                                                "    thread %x: could not connect to bookkeeping server %s, waiting for retry", 
+                                                me,
                                                 eq->dest);
 #endif
                                retrycnt++;
                        } else {
                                retrycnt = 0;
                                /* connected, send events */
-                               switch(ret=(*eq->event_queue_send)(eq)) {
+                               switch(ret=(*eq->event_queue_send)(eq, me)) {
                                        
                                case 0:
                                        /* there was an error and we still have events to send */
@@ -166,13 +194,14 @@ queue_thread(void *q)
                                                                 "queue_thread: %s", 
                                                                 error_get_msg());
                                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
-                                                        "  events still waiting");
+                                                        "  thread %x: events still waiting", me);
                                        break;
                                        
                                case 1:
                                        /* hey, we are done for now */
                                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
-                                                        "  all events for %s sent", 
+                                                        "  thread %x: all events for %s sent", 
+                                                        me,
                                                         eq->dest);
                                        break;
                                        
@@ -190,9 +219,10 @@ queue_thread(void *q)
                                if((ret == 1) && lazy_close)
                                        close_timeout = default_close_timeout;
                                else {
-                                       (*eq->event_queue_close)(eq);
+                                       (*eq->event_queue_close)(eq, me);
                                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
-                                                        "  connection to %sclosed",
+                                                        "  thread %x: connection to %s closed",
+                                                        me,
                                                         eq->dest);
                                }
                        }
@@ -218,6 +248,15 @@ queue_thread(void *q)
 #else
 #endif
 
+               event_queue_lock(eq);
+               if(me->jobid) {
+                       free(me->jobid); 
+                       me->jobid = NULL;
+                       me->current = NULL;
+               }
+               event_queue_unlock(eq);
+
+
                /* if there was some error with server, sleep for a while */
                /* iff !event_queue_empty() */
                /* also allow for one more try immediately after server disconnect,
@@ -225,8 +264,8 @@ queue_thread(void *q)
 #ifndef LB_PERF
                if((ret == 0) && (retrycnt > 0)) {
                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
-                                        "    sleeping");
-                       event_queue_sleep(eq);
+                                        "    thread %x: sleeping", me);
+                       event_queue_sleep(eq, me->timeout);
                }
 #endif
 
@@ -249,33 +288,43 @@ queue_thread(void *q)
 
 
 int
-event_queue_create_thread(struct event_queue *eq)
+event_queue_create_thread(struct event_queue *eq, int num_threads)
 {
        pthread_attr_t attr;
+       int i;
 
        assert(eq != NULL);
 
        event_queue_lock(eq);
 
-       /* if there is a thread already, just return */
-       if(eq->thread_id != 0) {
-               event_queue_unlock(eq);
-               return(0);
+       eq->num_threads = num_threads;
+
+       /* create thread data */
+       if(NULL == eq->thread) {
+               eq->thread = calloc(num_threads, sizeof(*eq->thread));
+               if(NULL == eq->thread) {
+                       set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating data for threads");
+                       event_queue_unlock(eq);
+                       return -1;
+               }
        }
 
-       /* create the thread itself */
-       pthread_attr_init(&attr);
-       pthread_attr_setstacksize(&attr, 65536); 
-       if(pthread_create(&eq->thread_id, &attr, queue_thread, eq) < 0) {
-               eq->thread_id = 0;
-               set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
-               event_queue_unlock(eq);
-               return(-1);
+       /* create the threads itself */
+       for(i = 0; i < eq->num_threads; i++) {
+               pthread_attr_init(&attr);
+               pthread_attr_setstacksize(&attr, 65536); 
+               if(eq->thread[i].thread_id == 0) {
+                       if(pthread_create(&eq->thread[i].thread_id, &attr, queue_thread, eq) < 0) {
+                               eq->thread[i].thread_id = 0;
+                               set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
+                               event_queue_unlock(eq);
+                               return(-1);
+                       }
+               }
+               /* the thread is never going to be joined */
+               pthread_detach(eq->thread[i].thread_id);
        }
 
-       /* the thread is never going to be joined */
-       pthread_detach(eq->thread_id);
-       
        event_queue_unlock(eq);
 
        return(1);
@@ -384,7 +433,7 @@ event_queue_wait(struct event_queue *eq, int timeout)
 }
 
 
-int event_queue_sleep(struct event_queue *eq)
+int event_queue_sleep(struct event_queue *eq, int timeout)
 {
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
        struct timespec ts;
@@ -394,7 +443,7 @@ int event_queue_sleep(struct event_queue *eq)
        assert(eq != NULL);
 
        gettimeofday(&tv, NULL);
-       ts.tv_sec = tv.tv_sec + eq->timeout;
+       ts.tv_sec = tv.tv_sec + timeout;
        ts.tv_nsec = 1000 * tv.tv_usec;
        if((ret=pthread_cond_timedwait(&eq->flush_cond, &eq->cond_lock, &ts)) < 0) {
                if(ret != ETIMEDOUT) {
@@ -406,7 +455,7 @@ int event_queue_sleep(struct event_queue *eq)
                }
        }
 #else
-       sleep(eq->timeout);
+       sleep(timeout);
 #endif
        return(0);
 }
index 250a40a..20a9d9e 100644 (file)
@@ -87,6 +87,10 @@ recover_thread(void *q)
                                                 new_creds->name);
                        }
                }
+#ifndef LB_PERF
                sleep(RECOVER_TIMEOUT);
+#else
+               sleep(2);
+#endif
        }
 }
index b48ab23..979962c 100644 (file)
@@ -147,7 +147,7 @@ gss_reader(void *user_data, char *buffer, int max_len)
  */
 static
 int 
-get_reply(struct event_queue *eq, char **buf, int *code_min)
+get_reply(struct event_queue *eq, struct queue_thread *me, char **buf, int *code_min)
 {
   char *msg=NULL;
   int ret, code;
@@ -157,7 +157,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min)
 
   tv.tv_sec = TIMEOUT;
   tv.tv_usec = 0;
-  data.gss = &eq->gss;
+  data.gss = &me->gss;
   data.timeout = &tv;
   len = read_il_data(&data, &msg, gss_reader);
   if(len < 0) {
@@ -179,7 +179,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min)
  *  Returns: 0 - not connected, timeout set, 1 - OK
  */
 int 
-event_queue_connect(struct event_queue *eq)
+event_queue_connect(struct event_queue *eq, struct queue_thread *me)
 {
   int ret;
   struct timeval tv;
@@ -187,12 +187,13 @@ event_queue_connect(struct event_queue *eq)
   cred_handle_t *local_cred_handle;
 
   assert(eq != NULL);
+  assert(me != NULL);
 
 #ifdef LB_PERF
   if(!nosend) {
 #endif
 
-  if(eq->gss.context == NULL) {
+  if(me->gss.context == NULL) {
 
     tv.tv_sec = TIMEOUT;
     tv.tv_usec = 0;
@@ -208,7 +209,7 @@ event_queue_connect(struct event_queue *eq)
     glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
                     "    trying to connect to %s:%d", 
                     eq->dest_name, eq->dest_port);
-    ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+    ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &me->gss, &gss_stat);
     if(pthread_mutex_lock(&cred_handle_lock) < 0)
            abort();
     /* check if we need to release the credentials */
@@ -229,11 +230,11 @@ event_queue_connect(struct event_queue *eq)
       set_error(IL_DGGSS, ret,
                (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect");
       if (gss_err) free(gss_err);
-      eq->gss.context = NULL;
-      eq->timeout = TIMEOUT;
+      me->gss.context = NULL;
+      me->timeout = TIMEOUT;
       return(0);
     }
-    eq->first_event_sent = 0;
+    me->first_event_sent = 0;
   }
 
 #ifdef LB_PERF
@@ -245,19 +246,20 @@ event_queue_connect(struct event_queue *eq)
 
 
 int
-event_queue_close(struct event_queue *eq)
+event_queue_close(struct event_queue *eq, struct queue_thread *me)
 {
   assert(eq != NULL);
+  assert(me != NULL);
 
 #ifdef LB_PERF
   if(!nosend) {
 #endif
 
-  if(eq->gss.context != NULL) {
-    edg_wll_gss_close(&eq->gss, NULL);
-    eq->gss.context = NULL;
+  if(me->gss.context != NULL) {
+    edg_wll_gss_close(&me->gss, NULL);
+    me->gss.context = NULL;
   }
-  eq->first_event_sent = 0;
+  me->first_event_sent = 0;
 #ifdef LB_PERF
   }
 #endif
@@ -270,14 +272,15 @@ event_queue_close(struct event_queue *eq)
  *   Returns: -1 - system error, 0 - not send, 1 - queue empty
  */
 int 
-event_queue_send(struct event_queue *eq)
+event_queue_send(struct event_queue *eq, struct queue_thread *me)
 {
   assert(eq != NULL);
+  assert(me != NULL);
 
 #ifdef LB_PERF
   if(!nosend) {
 #endif
-  if(eq->gss.context == NULL)
+  if(me->gss.context == NULL)
     return(0);
 #ifdef LB_PERF
   }
@@ -294,8 +297,8 @@ event_queue_send(struct event_queue *eq)
 
     clear_error();
 
-    if(event_queue_get(eq, &msg) < 0) 
-      return(-1);
+    if(event_queue_get(eq, me, &msg) == 0) 
+      return(1);
 
     glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
                     "    trying to deliver event at offset %d for job %s", 
@@ -307,23 +310,23 @@ event_queue_send(struct event_queue *eq)
        if (msg->len) {
            tv.tv_sec = TIMEOUT;
            tv.tv_usec = 0;
-           ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+           ret = edg_wll_gss_write_full(&me->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
            if(ret < 0) {
-             if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && eq->first_event_sent )
-               eq->timeout = 0;
+             if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && me->first_event_sent )
+               me->timeout = 0;
              else
-               eq->timeout = TIMEOUT;
+               me->timeout = TIMEOUT;
              return(0);
            }
            
-           if((code = get_reply(eq, &rep, &code_min)) < 0) {
+           if((code = get_reply(eq, me, &rep, &code_min)) < 0) {
                    /* could not get the reply properly, so try again later */
-                   if (eq->first_event_sent) {
+                   if (me->first_event_sent) {
                        /* could be expected server connection preemption */
                        clear_error();
-                       eq->timeout = 1;
+                       me->timeout = 1;
                    } else {
-                       eq->timeout = TIMEOUT;
+                       me->timeout = TIMEOUT;
                        glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, "  error reading server %s reply: %s", 
                                         eq->dest_name, error_get_msg());
                     }
@@ -354,7 +357,7 @@ event_queue_send(struct event_queue *eq)
        case LB_PERM:
        case LB_DBERR:
       /* non fatal errors (for us) */
-      eq->timeout = TIMEOUT;
+      me->timeout = TIMEOUT;
       return(0);
        
     case LB_OK:
@@ -380,8 +383,8 @@ event_queue_send(struct event_queue *eq)
                               "send_event: %s", 
                               error_get_msg());
        
-      event_queue_remove(eq);
-      eq->first_event_sent = 1;
+      event_queue_remove(eq, me);
+      me->first_event_sent = 1;
       break;
       
     } /* switch */
index 4ef7301..903f381 100644 (file)
@@ -25,8 +25,15 @@ const char *IlTestBase::msg_enc = "             429\n6 michal\n415 DATE=20040831
 
 const struct server_msg IlTestBase::smsg = {
        "https://some.host:1234/x67qr549qc",
+       0L,
        (char*)IlTestBase::msg_enc,
        strlen(IlTestBase::msg_enc),
        strlen(IlTestBase::msg) + 1,
-       NULL
+       NULL,
+       0,
+       0L,
+       "some.host",
+       1234,
+       "some.host:1234",
+       0
 };
index e07c19f..f5c761f 100644 (file)
@@ -23,6 +23,7 @@ extern "C" {
 struct event_queue_msg {
   struct server_msg *msg;
   struct event_queue_msg *prev;
+  struct event_queue_msg *next;
 };
 }
 
@@ -41,7 +42,15 @@ class event_queueTest: public CppUnit::TestFixture
 public:
        void setUp() {
                server = strdup("localhost:8080");
-               eq = event_queue_create(server);
+               eq = event_queue_create(server, NULL);
+               threads[0].thread_id = pthread_self();
+               threads[0].gss.context = NULL;
+               threads[0].jobid = strdup("1");
+               threads[1].thread_id = (pthread_t)1;
+               threads[1].gss.context = NULL;
+               threads[1].jobid = strdup("2");
+               eq->thread = threads;
+               eq->num_threads = 2;
                free(server);
        }
 
@@ -49,7 +58,7 @@ public:
                struct event_queue_msg *mp;
                struct server_msg *m;
 
-               for(mp = eq->head; mp != NULL; ) {
+               for(mp = eq->head; mp != eq->head; ) {
                        struct event_queue_msg *mq;
 
                        server_msg_free(mp->msg);
@@ -65,12 +74,8 @@ public:
                CPPUNIT_ASSERT( eq != NULL );
                CPPUNIT_ASSERT_EQUAL( string(eq->dest_name), string("localhost") );
                CPPUNIT_ASSERT_EQUAL( eq->dest_port, 8081 );
-               CPPUNIT_ASSERT( eq->tail == NULL );
                CPPUNIT_ASSERT( eq->head == NULL );
                CPPUNIT_ASSERT( eq->tail_ems == NULL );
-               CPPUNIT_ASSERT( eq->mark_this == NULL );
-               CPPUNIT_ASSERT( eq->mark_prev == NULL );
-               CPPUNIT_ASSERT( eq->thread_id == 0 );
                CPPUNIT_ASSERT( eq->flushing == 0 );
                CPPUNIT_ASSERT( eq->flush_result == 0 );
        }
@@ -83,67 +88,67 @@ public:
                mp = eq->head;
                m = mp->msg;
                CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("2") );
-               CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems );
+               CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
                mp = mp->prev;
                m = mp->msg;
                CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("1") );
+               CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems );
+               CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
                mp = mp->prev;
                m = mp->msg;
                CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("3") );
-               CPPUNIT_ASSERT_EQUAL( mp, eq->tail );
-               CPPUNIT_ASSERT( mp->prev == NULL );
+               CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
+               CPPUNIT_ASSERT_EQUAL( mp->prev, eq->head );
        }
 
        void testEventQueueGet() {
-               struct event_queue_msg *mp;
-               struct server_msg *m,sm;
+               struct queue_thread *me = threads + 0;
+               struct server_msg *m;
                int ret;
 
                doSomeInserts();
-               mp = eq->head;
-               eq->head = mp->prev;
-               eq->tail_ems = NULL;
-               server_msg_free(mp->msg);
-               free(mp);
-               ret = event_queue_get(eq, &m);
-               CPPUNIT_ASSERT( ret == 0 );
-               CPPUNIT_ASSERT( eq->mark_this == eq->head );
-               CPPUNIT_ASSERT( eq->mark_prev == NULL );
+               ret = event_queue_get(eq, me, &m);
+               CPPUNIT_ASSERT( ret == 1 );
                CPPUNIT_ASSERT_EQUAL( string("1"), string(m->job_id_s) );
-               sm = IlTestBase::smsg;
-               sm.job_id_s = "4";
-               sm.receipt_to = 1;
-               ret = event_queue_insert(eq, &sm);
-               CPPUNIT_ASSERT( ret == 0 );
-               CPPUNIT_ASSERT( eq->mark_prev == eq->head );
-               CPPUNIT_ASSERT( eq->mark_this == eq->head->prev );
-               ret = event_queue_insert(eq, &sm);
-               CPPUNIT_ASSERT( ret == 0 );
-               CPPUNIT_ASSERT( eq->mark_prev == eq->head->prev );
-               CPPUNIT_ASSERT( eq->mark_this == eq->head->prev->prev );
+               CPPUNIT_ASSERT( me->current == eq->head->prev );
+               CPPUNIT_ASSERT_EQUAL( string("1"), string(me->jobid) );
+               CPPUNIT_ASSERT( m == me->current->msg );
        }
 
        void testEventQueueRemove() {
-               struct event_queue_msg *mp;
-               struct server_msg *m,sm;
+               struct server_msg *m;
+               struct queue_thread *me = threads + 0;
                int ret;
 
                doSomeInserts();
-               ret = event_queue_get(eq, &m);
-               mp = eq->mark_this->prev;
-               sm = IlTestBase::smsg;
-               sm.job_id_s = "4";
-               sm.receipt_to = 1;
-               event_queue_insert(eq, &sm);
-               ret = event_queue_remove(eq);
-               CPPUNIT_ASSERT( eq->head->prev == mp );
-               CPPUNIT_ASSERT( eq->mark_this == NULL );
-               CPPUNIT_ASSERT( eq->mark_prev == NULL );
+               threads[1].jobid = strdup("4");
+               ret = event_queue_get(eq, me, &m);
+               CPPUNIT_ASSERT(ret == 1);
+               ret = event_queue_remove(eq, me);
+               /* remain 2 */
+               CPPUNIT_ASSERT( eq->head->prev ==  eq->head->next );
+               CPPUNIT_ASSERT( eq->head != eq->head->prev );
+               CPPUNIT_ASSERT( eq->head == eq->tail_ems );
+               ret = event_queue_get(eq, me, &m);
+               CPPUNIT_ASSERT( ret == 1);
+               ret = event_queue_remove(eq, me);
+               /* remain 1 */
+               CPPUNIT_ASSERT( ret == 0 );
+               CPPUNIT_ASSERT( eq->head != NULL );
+               CPPUNIT_ASSERT( eq->head == eq->head->prev );
+               CPPUNIT_ASSERT( eq->head == eq->head->next );
+               ret = event_queue_get(eq, me, &m);
+               CPPUNIT_ASSERT( ret == 1 );
+               ret = event_queue_remove(eq, me);
+               /* empty queue */
+               CPPUNIT_ASSERT( ret == 0);
+               CPPUNIT_ASSERT( eq->head == NULL );
        }
 
 protected:
        char *server;
        struct event_queue *eq;
+       struct queue_thread threads[2];
 
        void doSomeInserts() {
                struct server_msg m = IlTestBase::smsg;
index 3d1d554..6722fba 100644 (file)
@@ -20,8 +20,10 @@ limitations under the License.
 
 extern "C" {
 #include <pthread.h>
+#if 0
 #include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h"
 #include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h"
+#endif
 #include "glite/security/glite_gss.h"
 #include "interlogd.h"
 #include "glite/lb/consumer.h"
@@ -37,11 +39,22 @@ extern "C" {
                                                                                 
 int TIMEOUT = DEFAULT_TIMEOUT;
                                                                                 
+#if 0
 gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL;
 pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
+#endif
+cred_handle_t *cred_handle = NULL;
+pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
                                                                                 
 char *file_prefix = DEFAULT_PREFIX;
 int bs_only = 0;
+int lazy_close = 1;
+int default_close_timeout;
+size_t max_store_size;
+size_t queue_size_low = 0;
+size_t queue_size_high = 0;
+int parallel = 0;
+int killflg = 0;
  
 char *cert_file = NULL;
 char *key_file  = NULL;
@@ -49,6 +62,9 @@ char *CAcert_dir = NULL;
 char *log_server = NULL;
 char *socket_path = DEFAULT_SOCKET;
  
+extern "C" {
+       void do_handle_signal() { };
+}
 
 int 
 main (int ac,const char *av[])
index dd53f52..3604040 100644 (file)
@@ -71,15 +71,15 @@ public:
 
 
        void input_queue_getTest() {
-               char *event;
+               il_octet_string_t *event;
                long offset;
                int ret;
 
                ret = input_queue_get(&event, &offset, 10);
                CPPUNIT_ASSERT( ret >= 0 );
                CPPUNIT_ASSERT_EQUAL( 0L, offset );
-               CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event) );
-               free(event);
+               CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event->data) );
+               free(event->data);
        }
 };
 
index 2bdc50d..d7f2198 100644 (file)
@@ -33,7 +33,9 @@ class server_msgTest: public CppUnit::TestFixture
 public:
 
        void setUp() {
-               msg = server_msg_create((char *)IlTestBase::msg);
+               il_octet_string d = { len: strlen((char*)IlTestBase::msg), data: (char*)IlTestBase::msg };
+                                       
+               msg = server_msg_create(&d, 0);
        }
 
        void tearDown() {