GLITE_GSS_LIB:=-lglite_security_gss
-TEST_LIBS:=-L${cppunit_prefix}/${libdir} -lcppunit
+TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit
TEST_INC:=-I${cppunit_prefix}/include
LOGD_OBJS:= logd_proto.o logd.o
input_queue_socket.o \
input_queue_socketTest.o \
send_event.o \
- plugin_mgr. o \
+ plugin_mgr.o \
event_queue.o \
event_queueTest.o \
IlTestBase.o \
-echo commented out -- fix needed
check.il: ${INTERLOG_TEST_OBJS}
- ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${GLITE_GSS_LIB}_${nothrflavour} ${TEST_LIBS} -lpthread $+
+ ${LINKXX} -o $@ ${COMMON_LIB}_${thrflavour} ${TEST_LIBS} ${EXT_LIB} -lpthread $+
install:
-mkdir -p ${PREFIX}/bin
};
-struct event_queue {
+struct queue_thread {
+ pthread_t thread_id; /* id's of associated threads */
edg_wll_GssConnection gss; /* GSS connection */
+ char *jobid;
+ int timeout; /* queue timeout */
+ int first_event_sent; /* connection can be preempted by server */
+ struct event_queue_msg *current; /* current message being sent */
+};
+
+struct event_queue {
char *dest_name;
int dest_port;
- char *dest;
- int timeout; /* queue timeout */
- struct event_queue_msg *tail; /* last message in the queue */
+ char *dest;
struct event_queue_msg *head; /* first message in the queue */
struct event_queue_msg *tail_ems; /* last priority message in the queue (or NULL) */
- struct event_queue_msg *mark_this; /* mark message for removal */
- struct event_queue_msg *mark_prev; /* predecessor of the marked message */
- pthread_t thread_id; /* id of associated thread */
+ int num_threads; /* number of delivery threads */
+ struct queue_thread *thread; /* info for delivery threads */
pthread_rwlock_t update_lock; /* mutex for queue updates */
pthread_mutex_t cond_lock; /* mutex for condition variable */
pthread_cond_t ready_cond; /* condition variable for message arrival */
int times_empty; /* number of times the queue was emptied */
int max_len; /* max queue length */
int cur_len; /* current length */
- int throttling; /* event insertion suspend flag */
- int first_event_sent; /* connection can be preempted by server */
+ int throttling; /* event insertion suspend flag */
/* delivery methods */
- int (*event_queue_connect)(struct event_queue *);
- int (*event_queue_send)(struct event_queue *);
- int (*event_queue_close)(struct event_queue *);
- void *plugin_data; /* opaque data used by output plugins */
+ int (*event_queue_connect)(struct event_queue *, struct queue_thread *);
+ int (*event_queue_send)(struct event_queue *, struct queue_thread *);
+ int (*event_queue_close)(struct event_queue *, struct queue_thread *);
+ void *plugin_data; /* opaque data used by output plugins */
};
struct il_output_plugin {
- int (*event_queue_connect)(struct event_queue *);
- int (*event_queue_send)(struct event_queue *);
- int (*event_queue_close)(struct event_queue *);
+ int (*event_queue_connect)(struct event_queue *, struct queue_thread *);
+ int (*event_queue_send)(struct event_queue *, struct queue_thread *);
+ int (*event_queue_close)(struct event_queue *, struct queue_thread *);
int (*plugin_init)(char *);
int (*plugin_supports_scheme)(const char *);
};
int event_queue_free(struct event_queue *);
int event_queue_empty(struct event_queue *);
int event_queue_insert(struct event_queue *, struct server_msg *);
-int event_queue_get(struct event_queue *, struct server_msg **);
-int event_queue_remove(struct event_queue *);
+int event_queue_get(struct event_queue *, struct queue_thread *, struct server_msg **);
+int event_queue_remove(struct event_queue *, struct queue_thread *);
int event_queue_enqueue(struct event_queue *, char *);
/* helper */
int enqueue_msg(struct event_queue *, struct server_msg *);
int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *);
/* protocol event queue methods */
-int event_queue_connect(struct event_queue *);
-int event_queue_send(struct event_queue *);
-int event_queue_close(struct event_queue *);
+int event_queue_connect(struct event_queue *, struct queue_thread *);
+int event_queue_send(struct event_queue *, struct queue_thread *);
+int event_queue_close(struct event_queue *, struct queue_thread *);
int send_confirmation(long, int);
/* thread event queue methods */
-int event_queue_create_thread(struct event_queue *);
+int event_queue_create_thread(struct event_queue *, int);
int event_queue_lock(struct event_queue *);
int event_queue_unlock(struct event_queue *);
int event_queue_lock_ro(struct event_queue *);
int event_queue_signal(struct event_queue *);
int event_queue_wait(struct event_queue *, int);
-int event_queue_sleep(struct event_queue *);
+int event_queue_sleep(struct event_queue *, int);
int event_queue_wakeup(struct event_queue *);
int event_queue_cond_lock(struct event_queue *);
int event_queue_cond_unlock(struct event_queue *);
struct event_queue_msg {
struct server_msg *msg;
struct event_queue_msg *prev;
+ struct event_queue_msg *next;
};
struct event_queue *
if(!event_queue_empty(eq))
return(-1);
+ /*
if(eq->thread_id)
pthread_cancel(eq->thread_id);
-
+ */
pthread_rwlock_destroy(&eq->update_lock);
pthread_mutex_destroy(&eq->cond_lock);
/* this is critical section */
event_queue_lock(eq);
+ if(NULL == eq->head) {
+ el->prev = el;
+ el->next = el;
+ eq->head = el;
+ eq->tail_ems = el;
+ } else {
+ if(server_msg_is_priority(msg)) {
+ el->next = eq->tail_ems->next;
+ el->prev = eq->tail_ems;
+ if(eq->head == eq->tail_ems) {
+ eq->head = el;
+ }
+ } else {
+ el->next = eq->head->next;
+ el->prev = eq->head;
+ }
+ el->next->prev = el;
+ el->prev->next = el;
+ }
+#if 0 /* OLD IMPLEMENTATION */
#if defined(INTERLOGD_EMS)
if(server_msg_is_priority(msg)) {
/* priority messages go first */
if(eq->mark_this && (el->prev == eq->mark_this))
eq->mark_prev = el;
#endif
+#endif /* OLD IMPLEMENTATION */
if(++eq->cur_len > eq->max_len)
eq->max_len = eq->cur_len;
int
-event_queue_get(struct event_queue *eq, struct server_msg **msg)
+event_queue_get(struct event_queue *eq, struct queue_thread *me, struct server_msg **msg)
{
struct event_queue_msg *el;
+ int found;
assert(eq != NULL);
assert(msg != NULL);
event_queue_lock(eq);
+ if(me->jobid) {
+ free(me->jobid);
+ me->jobid = NULL;
+ me->current = NULL;
+ }
+ if(NULL == eq->head) {
+ event_queue_unlock(eq);
+ return 0;
+ }
+ found = 0;
el = eq->head;
-#if defined(INTERLOGD_EMS)
- /* this message is marked for removal, it is first on the queue */
- eq->mark_this = el;
- eq->mark_prev = NULL;
-#endif
+ do {
+ char *jobid = el->msg->job_id_s;
+ int i;
+
+ for(i = 0; i < eq->num_threads; i++) {
+ if(me == eq->thread + i) {
+ continue;
+ }
+ if(eq->thread[i].jobid && strcmp(jobid, eq->thread[i].jobid) == 0) {
+ break;
+ }
+ }
+ if(i >= eq->num_threads) {
+ /* no other thread is working on this job */
+ found = 1;
+ break;
+ }
+ el = el->prev;
+ } while(el != eq->head);
+ if(found) {
+ me->current = el;
+ me->jobid = strdup(el->msg->job_id_s);
+ *msg = el->msg;
+ } else {
+ }
event_queue_unlock(eq);
- if(el == NULL)
- return(-1);
-
- *msg = el->msg;
-
- return(0);
+ return found;
}
int
-event_queue_remove(struct event_queue *eq)
+event_queue_remove(struct event_queue *eq, struct queue_thread *me)
{
struct event_queue_msg *el;
#if defined(INTERLOGD_EMS)
#endif
assert(eq != NULL);
+
+ el = me->current;
+ if(NULL == el) {
+ return -1;
+ }
/* this is critical section */
event_queue_lock(eq);
+
+ if(el == el->prev) {
+ /* last element */
+ eq->head = NULL;
+ eq->tail_ems = NULL;
+ } else {
+ el->next->prev = el->prev;
+ el->prev->next = el->next;
+ if(el == eq->head) {
+ eq->head = el->prev;
+ }
+ if(el == eq->tail_ems) {
+ eq->tail_ems = el->prev;
+ }
+ }
+
+#if 0 /* OLD IMPLEMENTATION */
#if defined(INTERLOGD_EMS)
el = eq->mark_this;
prev = eq->mark_prev;
eq->tail = NULL;
}
#endif
+#endif /* OLD IMPLEMENTATION */
+
if(--eq->cur_len == 0)
eq->times_empty++;
return(0);
}
+
int
event_queue_move_events(struct event_queue *eq_s,
struct event_queue *eq_d,
int (*cmp_func)(struct server_msg *, void *),
void *data)
{
- struct event_queue_msg *p, **source_prev, **dest_tail;
+ struct event_queue_msg *p, *q, *last;
+ /* struct event_queue_msg **source_prev, **dest_tail; */
assert(eq_s != NULL);
assert(data != NULL);
event_queue_lock(eq_s);
+
+ p = eq_s->head;
+ if(NULL == p) {
+ event_queue_unlock(eq_s);
+ return 0;
+ }
+ if(eq_d) {
+ event_queue_lock(eq_d);
+ }
+
+ last = p->next;
+ do {
+ if((*cmp_func)(p->msg, data)) {
+ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
+ " moving event at offset %d(%d) from %s:%d to %s:%d",
+ p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+ eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
+ q = p->prev;
+ /* remove message from the source queue */
+ if(p->next == p->prev) {
+ /* removing last message */
+ eq_s->head = NULL;
+ eq_s->tail_ems = NULL;
+ } else {
+ p->next->prev = p->prev;
+ p->prev->next = p->next;
+ if(p == eq_s->head) {
+ eq_s->head = p->prev;
+ }
+ if(p == eq_s->tail_ems) {
+ eq_s->tail_ems = p->prev;
+ }
+ }
+ eq_s->cur_len--;
+ if(eq_d) {
+ /* append message to the destination queue */
+ if(eq_d->head == NULL) {
+ eq_d->head = p;
+ eq_d->tail_ems = p;
+ p->next = p;
+ p->prev = p;
+ } else {
+ /* priorities are ignored */
+ p->next = eq_d->head->next;
+ p->prev = eq_d->head;
+ p->next->prev = p;
+ p->prev->next = p;
+
+ }
+ if(++eq_d->cur_len > eq_d->max_len) {
+ eq_d->max_len = eq_d->cur_len;
+ }
+ } else {
+ /* signal that the message was 'delivered' */
+ event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
+ p->msg->generation);
+ /* free the message */
+ server_msg_free(p->msg);
+ free(p);
+ }
+ p = q;
+ } else {
+ p = p->prev;
+ }
+ } while (eq_s->head && p != last);
+
+#if 0 /* OLD IMPLEMENTATION */
if(eq_d) {
event_queue_lock(eq_d);
/* dest tail is set to point to the last (NULL) pointer in the list */
}
p = *source_prev;
}
+#endif /* OLD IMPLEMENTATION */
+
if(eq_s->cur_len <= queue_size_low) {
eq_s->throttling = 0;
}
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO,
" all messages for notif id %s are now destined to %s",
es->job_id_s, eq_b->dest);
- if(event_queue_create_thread(eq_b) < 0) {
+ if(event_queue_create_thread(eq_b, parallel) < 0) {
ret = -1;
} else {
event_queue_cond_lock(eq_b);
int
enqueue_msg(struct event_queue *eq, struct server_msg *msg)
+/* global: parallel */
{
int ret;
/* fire thread to take care of this queue */
- if(event_queue_create_thread(eq) < 0)
+ if(event_queue_create_thread(eq, parallel) < 0)
return(-1);
#if defined(IL_NOTIFICATIONS)
#include "interlogd.h"
-static const int SOCK_QUEUE_MAX = 50;
+static const int SOCK_QUEUE_MAX = 100;
extern char *socket_path;
static int sock;
size_t max_store_size;
size_t queue_size_low = 0;
size_t queue_size_high = 0;
-int parallel = 0;
+int parallel = 1;
#ifdef LB_PERF
int nosend = 0, norecover=0, nosync=0, noparse=0;
char *event_source = NULL;
"C:" /* CA dir */
"b" /* only bookeeping */
"i:" /* pidfile*/
- "l:" /* log server */
+ "l:" /* log server */
"d" /* debug */
- "p" /* parallel */
+ "p::" /* parallel */
"q:"
"Q:"
"F:" /* conf file */
}
#endif
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Using %d threads for parallel delivery.", parallel);
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "Entering main loop.");
/* do the work */
#
numjobs=10
+parallel=1
# XXX - there must be better way to find stage
if [ -z "${GLITE_LOCATION}" ]; then
#TEST_VARIANT=
SILENT=0
-while getopts "G:t:n:s" OPTION
+while getopts "G:t:n:sp:" OPTION
do
case "$OPTION" in
"G") TEST_GROUP=$OPTARG
"s") SILENT=1
;;
+ "p") parallel=$OPTARG
+ ;;
+
esac
done
fi
PERFTEST_CONSUMER=$STAGEDIR/bin/glite-lb-bkserverd
-CONSUMER_ARGS="--silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503"
+CONSUMER_ARGS="-g --silent -S /tmp -D /tmp -t 1 -d --perf-sink=1 -p 10500 -w 10503"
PERFTEST_COMPONENT=$STAGEDIR/bin/glite-lb-interlogd-perf
LOGJOBS_ARGS=" -m localhost:10500 $COMM_ARGS"
}
group_c_test_d ()
{
- COMPONENT_ARGS="-i /tmp/perftest_il.pid -d --lazy=10 --nosync --norecover $COMM_ARGS"
+ COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d --lazy=10 --nosync --norecover $COMM_ARGS"
echo -n "d)"
run_test il $numjobs
print_result
group_c_test_e ()
{
- COMPONENT_ARGS="-i /tmp/perftest_il.pid -d $COMM_ARGS"
+ COMPONENT_ARGS="-p $parallel -i /tmp/perftest_il.pid -d $COMM_ARGS"
echo -n "e)"
run_test il $numjobs
print_result
#include "interlogd.h"
struct queue_list {
+ struct queue_list *queues;
struct event_queue *queue;
char *dest;
struct queue_list *next;
queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
{
struct queue_list *el;
-
+
assert(dest != NULL);
- assert(eq != NULL);
assert(ql != NULL);
el = malloc(sizeof(*el));
return(NULL);
if(queue_list_find(queues, dest, &q, NULL)) {
-#if !defined(IL_NOTIFICATIONS)
- free(dest);
-#endif
- return(q->queue);
+ /* queue found for given destination */
+ eq = q->queue;
} else {
+ /* no queue for given destination found */
eq = event_queue_create(dest, outp);
if(eq)
queue_list_add(&queues, dest, eq);
+ }
#if !defined(IL_NOTIFICATIONS)
- free(dest);
+ free(dest);
#endif
- return(eq);
- }
+ return eq;
}
queue_thread_cleanup(void *q)
{
struct event_queue *eq = (struct event_queue *)q;
+ pthread_t my_id = pthread_self();
+ int me;
- glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", eq->thread_id);
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "thread %d exits", my_id);
/* unlock all held locks */
/* FIXME: check that the thread always exits when holding these locks;
*/
/* clear thread id */
- eq->thread_id = 0;
+ for(me = 0; me < eq->num_threads; me++) {
+ if(my_id == eq->thread[me].thread_id) {
+ eq->thread[me].thread_id = 0;
+ break;
+ }
+ }
}
queue_thread(void *q)
{
struct event_queue *eq = (struct event_queue *)q;
+ struct queue_thread *me;
+ pthread_t my_id;
int ret, exit;
int retrycnt;
int close_timeout = 0;
" started new thread for delivery to %s",
eq->dest);
+ my_id = pthread_self();
+ for(me = eq->thread; me - eq->thread < eq->num_threads; me++) {
+ if(my_id == me->thread_id) {
+ break;
+ }
+ }
+ if(me - eq->thread >= eq->num_threads) {
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR,
+ "Error looking up thread identity, exiting!");
+ pthread_exit(NULL);
+ }
+
pthread_cleanup_push(queue_thread_cleanup, q);
event_queue_cond_lock(eq);
/* if there are no events, wait for them */
ret = 0;
- while (event_queue_empty(eq)
+ while (event_queue_empty(eq)
#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
&& (eq->flushing != 1)
#endif
if(lazy_close && close_timeout) {
ret = event_queue_wait(eq, close_timeout);
if(ret == 1) {/* timeout? */
- (*eq->event_queue_close)(eq);
+ (*eq->event_queue_close)(eq, me);
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
" connection to %s closed",
eq->dest);
ret = event_queue_wait(eq, exit_timeout);
if(ret == 1) {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO,
- " thread idle for more than %d seconds, exiting",
+ " thread %x idle for more than %d seconds, exiting",
+ me,
exit_timeout);
- (*eq->event_queue_close)(eq);
+ (*eq->event_queue_close)(eq, me);
event_queue_cond_unlock(eq);
pthread_exit((void*)0);
}
event_queue_cond_unlock(eq);
pthread_exit((void*)-1);
}
+ /* sleep only once when there is no event available for this particular thread */
} /* END while(empty) */
*/
event_queue_cond_unlock(eq);
- /* discard expired events */
- glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " discarding expired events");
- now = time(NULL);
- event_queue_move_events(eq, NULL, cmp_expires, &now);
+ /* discard expired events - only the first thread */
+ if(me == eq->thread) {
+ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, " thread %x: discarding expired events", me);
+ now = time(NULL);
+ event_queue_move_events(eq, NULL, cmp_expires, &now);
+ }
if(!event_queue_empty(eq)) {
/* deliver pending events */
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
- " attempting delivery to %s",
+ " thread %x: attempting delivery to %s",
+ me,
eq->dest);
/* connect to server */
- if((ret=(*eq->event_queue_connect)(eq)) == 0) {
+ if((ret=(*eq->event_queue_connect)(eq, me)) == 0) {
/* not connected */
if(error_get_maj() != IL_OK)
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN,
"queue_thread: %s", error_get_msg());
#if defined(IL_NOTIFICATIONS)
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO,
- " could not connect to client %s, waiting for retry",
+ " thread %x: could not connect to client %s, waiting for retry",
+ me,
eq->dest);
#else
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_INFO,
- " could not connect to bookkeeping server %s, waiting for retry",
+ " thread %x: could not connect to bookkeeping server %s, waiting for retry",
+ me,
eq->dest);
#endif
retrycnt++;
} else {
retrycnt = 0;
/* connected, send events */
- switch(ret=(*eq->event_queue_send)(eq)) {
+ switch(ret=(*eq->event_queue_send)(eq, me)) {
case 0:
/* there was an error and we still have events to send */
"queue_thread: %s",
error_get_msg());
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
- " events still waiting");
+ " thread %x: events still waiting", me);
break;
case 1:
/* hey, we are done for now */
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
- " all events for %s sent",
+ " thread %x: all events for %s sent",
+ me,
eq->dest);
break;
if((ret == 1) && lazy_close)
close_timeout = default_close_timeout;
else {
- (*eq->event_queue_close)(eq);
+ (*eq->event_queue_close)(eq, me);
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
- " connection to %sclosed",
+ " thread %x: connection to %s closed",
+ me,
eq->dest);
}
}
#else
#endif
+ event_queue_lock(eq);
+ if(me->jobid) {
+ free(me->jobid);
+ me->jobid = NULL;
+ me->current = NULL;
+ }
+ event_queue_unlock(eq);
+
+
/* if there was some error with server, sleep for a while */
/* iff !event_queue_empty() */
/* also allow for one more try immediately after server disconnect,
#ifndef LB_PERF
if((ret == 0) && (retrycnt > 0)) {
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
- " sleeping");
- event_queue_sleep(eq);
+ " thread %x: sleeping", me);
+ event_queue_sleep(eq, me->timeout);
}
#endif
int
-event_queue_create_thread(struct event_queue *eq)
+event_queue_create_thread(struct event_queue *eq, int num_threads)
{
pthread_attr_t attr;
+ int i;
assert(eq != NULL);
event_queue_lock(eq);
- /* if there is a thread already, just return */
- if(eq->thread_id != 0) {
- event_queue_unlock(eq);
- return(0);
+ eq->num_threads = num_threads;
+
+ /* create thread data */
+ if(NULL == eq->thread) {
+ eq->thread = calloc(num_threads, sizeof(*eq->thread));
+ if(NULL == eq->thread) {
+ set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating data for threads");
+ event_queue_unlock(eq);
+ return -1;
+ }
}
- /* create the thread itself */
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, 65536);
- if(pthread_create(&eq->thread_id, &attr, queue_thread, eq) < 0) {
- eq->thread_id = 0;
- set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
- event_queue_unlock(eq);
- return(-1);
+ /* create the threads itself */
+ for(i = 0; i < eq->num_threads; i++) {
+ pthread_attr_init(&attr);
+ pthread_attr_setstacksize(&attr, 65536);
+ if(eq->thread[i].thread_id == 0) {
+ if(pthread_create(&eq->thread[i].thread_id, &attr, queue_thread, eq) < 0) {
+ eq->thread[i].thread_id = 0;
+ set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread");
+ event_queue_unlock(eq);
+ return(-1);
+ }
+ }
+ /* the thread is never going to be joined */
+ pthread_detach(eq->thread[i].thread_id);
}
- /* the thread is never going to be joined */
- pthread_detach(eq->thread_id);
-
event_queue_unlock(eq);
return(1);
}
-int event_queue_sleep(struct event_queue *eq)
+int event_queue_sleep(struct event_queue *eq, int timeout)
{
#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
struct timespec ts;
assert(eq != NULL);
gettimeofday(&tv, NULL);
- ts.tv_sec = tv.tv_sec + eq->timeout;
+ ts.tv_sec = tv.tv_sec + timeout;
ts.tv_nsec = 1000 * tv.tv_usec;
if((ret=pthread_cond_timedwait(&eq->flush_cond, &eq->cond_lock, &ts)) < 0) {
if(ret != ETIMEDOUT) {
}
}
#else
- sleep(eq->timeout);
+ sleep(timeout);
#endif
return(0);
}
new_creds->name);
}
}
+#ifndef LB_PERF
sleep(RECOVER_TIMEOUT);
+#else
+ sleep(2);
+#endif
}
}
*/
static
int
-get_reply(struct event_queue *eq, char **buf, int *code_min)
+get_reply(struct event_queue *eq, struct queue_thread *me, char **buf, int *code_min)
{
char *msg=NULL;
int ret, code;
tv.tv_sec = TIMEOUT;
tv.tv_usec = 0;
- data.gss = &eq->gss;
+ data.gss = &me->gss;
data.timeout = &tv;
len = read_il_data(&data, &msg, gss_reader);
if(len < 0) {
* Returns: 0 - not connected, timeout set, 1 - OK
*/
int
-event_queue_connect(struct event_queue *eq)
+event_queue_connect(struct event_queue *eq, struct queue_thread *me)
{
int ret;
struct timeval tv;
cred_handle_t *local_cred_handle;
assert(eq != NULL);
+ assert(me != NULL);
#ifdef LB_PERF
if(!nosend) {
#endif
- if(eq->gss.context == NULL) {
+ if(me->gss.context == NULL) {
tv.tv_sec = TIMEOUT;
tv.tv_usec = 0;
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
" trying to connect to %s:%d",
eq->dest_name, eq->dest_port);
- ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+ ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &me->gss, &gss_stat);
if(pthread_mutex_lock(&cred_handle_lock) < 0)
abort();
/* check if we need to release the credentials */
set_error(IL_DGGSS, ret,
(ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect");
if (gss_err) free(gss_err);
- eq->gss.context = NULL;
- eq->timeout = TIMEOUT;
+ me->gss.context = NULL;
+ me->timeout = TIMEOUT;
return(0);
}
- eq->first_event_sent = 0;
+ me->first_event_sent = 0;
}
#ifdef LB_PERF
int
-event_queue_close(struct event_queue *eq)
+event_queue_close(struct event_queue *eq, struct queue_thread *me)
{
assert(eq != NULL);
+ assert(me != NULL);
#ifdef LB_PERF
if(!nosend) {
#endif
- if(eq->gss.context != NULL) {
- edg_wll_gss_close(&eq->gss, NULL);
- eq->gss.context = NULL;
+ if(me->gss.context != NULL) {
+ edg_wll_gss_close(&me->gss, NULL);
+ me->gss.context = NULL;
}
- eq->first_event_sent = 0;
+ me->first_event_sent = 0;
#ifdef LB_PERF
}
#endif
* Returns: -1 - system error, 0 - not send, 1 - queue empty
*/
int
-event_queue_send(struct event_queue *eq)
+event_queue_send(struct event_queue *eq, struct queue_thread *me)
{
assert(eq != NULL);
+ assert(me != NULL);
#ifdef LB_PERF
if(!nosend) {
#endif
- if(eq->gss.context == NULL)
+ if(me->gss.context == NULL)
return(0);
#ifdef LB_PERF
}
clear_error();
- if(event_queue_get(eq, &msg) < 0)
- return(-1);
+ if(event_queue_get(eq, me, &msg) == 0)
+ return(1);
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
" trying to deliver event at offset %d for job %s",
if (msg->len) {
tv.tv_sec = TIMEOUT;
tv.tv_usec = 0;
- ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+ ret = edg_wll_gss_write_full(&me->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
if(ret < 0) {
- if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && eq->first_event_sent )
- eq->timeout = 0;
+ if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && me->first_event_sent )
+ me->timeout = 0;
else
- eq->timeout = TIMEOUT;
+ me->timeout = TIMEOUT;
return(0);
}
- if((code = get_reply(eq, &rep, &code_min)) < 0) {
+ if((code = get_reply(eq, me, &rep, &code_min)) < 0) {
/* could not get the reply properly, so try again later */
- if (eq->first_event_sent) {
+ if (me->first_event_sent) {
/* could be expected server connection preemption */
clear_error();
- eq->timeout = 1;
+ me->timeout = 1;
} else {
- eq->timeout = TIMEOUT;
+ me->timeout = TIMEOUT;
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, " error reading server %s reply: %s",
eq->dest_name, error_get_msg());
}
case LB_PERM:
case LB_DBERR:
/* non fatal errors (for us) */
- eq->timeout = TIMEOUT;
+ me->timeout = TIMEOUT;
return(0);
case LB_OK:
"send_event: %s",
error_get_msg());
- event_queue_remove(eq);
- eq->first_event_sent = 1;
+ event_queue_remove(eq, me);
+ me->first_event_sent = 1;
break;
} /* switch */
const struct server_msg IlTestBase::smsg = {
"https://some.host:1234/x67qr549qc",
+ 0L,
(char*)IlTestBase::msg_enc,
strlen(IlTestBase::msg_enc),
strlen(IlTestBase::msg) + 1,
- NULL
+ NULL,
+ 0,
+ 0L,
+ "some.host",
+ 1234,
+ "some.host:1234",
+ 0
};
struct event_queue_msg {
struct server_msg *msg;
struct event_queue_msg *prev;
+ struct event_queue_msg *next;
};
}
public:
void setUp() {
server = strdup("localhost:8080");
- eq = event_queue_create(server);
+ eq = event_queue_create(server, NULL);
+ threads[0].thread_id = pthread_self();
+ threads[0].gss.context = NULL;
+ threads[0].jobid = strdup("1");
+ threads[1].thread_id = (pthread_t)1;
+ threads[1].gss.context = NULL;
+ threads[1].jobid = strdup("2");
+ eq->thread = threads;
+ eq->num_threads = 2;
free(server);
}
struct event_queue_msg *mp;
struct server_msg *m;
- for(mp = eq->head; mp != NULL; ) {
+ for(mp = eq->head; mp != eq->head; ) {
struct event_queue_msg *mq;
server_msg_free(mp->msg);
CPPUNIT_ASSERT( eq != NULL );
CPPUNIT_ASSERT_EQUAL( string(eq->dest_name), string("localhost") );
CPPUNIT_ASSERT_EQUAL( eq->dest_port, 8081 );
- CPPUNIT_ASSERT( eq->tail == NULL );
CPPUNIT_ASSERT( eq->head == NULL );
CPPUNIT_ASSERT( eq->tail_ems == NULL );
- CPPUNIT_ASSERT( eq->mark_this == NULL );
- CPPUNIT_ASSERT( eq->mark_prev == NULL );
- CPPUNIT_ASSERT( eq->thread_id == 0 );
CPPUNIT_ASSERT( eq->flushing == 0 );
CPPUNIT_ASSERT( eq->flush_result == 0 );
}
mp = eq->head;
m = mp->msg;
CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("2") );
- CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems );
+ CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
mp = mp->prev;
m = mp->msg;
CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("1") );
+ CPPUNIT_ASSERT_EQUAL( mp, eq->tail_ems );
+ CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
mp = mp->prev;
m = mp->msg;
CPPUNIT_ASSERT_EQUAL( string(m->job_id_s), string("3") );
- CPPUNIT_ASSERT_EQUAL( mp, eq->tail );
- CPPUNIT_ASSERT( mp->prev == NULL );
+ CPPUNIT_ASSERT_EQUAL( mp, mp->next->prev);
+ CPPUNIT_ASSERT_EQUAL( mp->prev, eq->head );
}
void testEventQueueGet() {
- struct event_queue_msg *mp;
- struct server_msg *m,sm;
+ struct queue_thread *me = threads + 0;
+ struct server_msg *m;
int ret;
doSomeInserts();
- mp = eq->head;
- eq->head = mp->prev;
- eq->tail_ems = NULL;
- server_msg_free(mp->msg);
- free(mp);
- ret = event_queue_get(eq, &m);
- CPPUNIT_ASSERT( ret == 0 );
- CPPUNIT_ASSERT( eq->mark_this == eq->head );
- CPPUNIT_ASSERT( eq->mark_prev == NULL );
+ ret = event_queue_get(eq, me, &m);
+ CPPUNIT_ASSERT( ret == 1 );
CPPUNIT_ASSERT_EQUAL( string("1"), string(m->job_id_s) );
- sm = IlTestBase::smsg;
- sm.job_id_s = "4";
- sm.receipt_to = 1;
- ret = event_queue_insert(eq, &sm);
- CPPUNIT_ASSERT( ret == 0 );
- CPPUNIT_ASSERT( eq->mark_prev == eq->head );
- CPPUNIT_ASSERT( eq->mark_this == eq->head->prev );
- ret = event_queue_insert(eq, &sm);
- CPPUNIT_ASSERT( ret == 0 );
- CPPUNIT_ASSERT( eq->mark_prev == eq->head->prev );
- CPPUNIT_ASSERT( eq->mark_this == eq->head->prev->prev );
+ CPPUNIT_ASSERT( me->current == eq->head->prev );
+ CPPUNIT_ASSERT_EQUAL( string("1"), string(me->jobid) );
+ CPPUNIT_ASSERT( m == me->current->msg );
}
void testEventQueueRemove() {
- struct event_queue_msg *mp;
- struct server_msg *m,sm;
+ struct server_msg *m;
+ struct queue_thread *me = threads + 0;
int ret;
doSomeInserts();
- ret = event_queue_get(eq, &m);
- mp = eq->mark_this->prev;
- sm = IlTestBase::smsg;
- sm.job_id_s = "4";
- sm.receipt_to = 1;
- event_queue_insert(eq, &sm);
- ret = event_queue_remove(eq);
- CPPUNIT_ASSERT( eq->head->prev == mp );
- CPPUNIT_ASSERT( eq->mark_this == NULL );
- CPPUNIT_ASSERT( eq->mark_prev == NULL );
+ threads[1].jobid = strdup("4");
+ ret = event_queue_get(eq, me, &m);
+ CPPUNIT_ASSERT(ret == 1);
+ ret = event_queue_remove(eq, me);
+ /* remain 2 */
+ CPPUNIT_ASSERT( eq->head->prev == eq->head->next );
+ CPPUNIT_ASSERT( eq->head != eq->head->prev );
+ CPPUNIT_ASSERT( eq->head == eq->tail_ems );
+ ret = event_queue_get(eq, me, &m);
+ CPPUNIT_ASSERT( ret == 1);
+ ret = event_queue_remove(eq, me);
+ /* remain 1 */
+ CPPUNIT_ASSERT( ret == 0 );
+ CPPUNIT_ASSERT( eq->head != NULL );
+ CPPUNIT_ASSERT( eq->head == eq->head->prev );
+ CPPUNIT_ASSERT( eq->head == eq->head->next );
+ ret = event_queue_get(eq, me, &m);
+ CPPUNIT_ASSERT( ret == 1 );
+ ret = event_queue_remove(eq, me);
+ /* empty queue */
+ CPPUNIT_ASSERT( ret == 0);
+ CPPUNIT_ASSERT( eq->head == NULL );
}
protected:
char *server;
struct event_queue *eq;
+ struct queue_thread threads[2];
void doSomeInserts() {
struct server_msg m = IlTestBase::smsg;
extern "C" {
#include <pthread.h>
+#if 0
#include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h"
#include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h"
+#endif
#include "glite/security/glite_gss.h"
#include "interlogd.h"
#include "glite/lb/consumer.h"
int TIMEOUT = DEFAULT_TIMEOUT;
+#if 0
gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL;
pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
+#endif
+cred_handle_t *cred_handle = NULL;
+pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
char *file_prefix = DEFAULT_PREFIX;
int bs_only = 0;
+int lazy_close = 1;
+int default_close_timeout;
+size_t max_store_size;
+size_t queue_size_low = 0;
+size_t queue_size_high = 0;
+int parallel = 0;
+int killflg = 0;
char *cert_file = NULL;
char *key_file = NULL;
char *log_server = NULL;
char *socket_path = DEFAULT_SOCKET;
+extern "C" {
+ void do_handle_signal() { };
+}
int
main (int ac,const char *av[])
void input_queue_getTest() {
- char *event;
+ il_octet_string_t *event;
long offset;
int ret;
ret = input_queue_get(&event, &offset, 10);
CPPUNIT_ASSERT( ret >= 0 );
CPPUNIT_ASSERT_EQUAL( 0L, offset );
- CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event) );
- free(event);
+ CPPUNIT_ASSERT_EQUAL( string(IlTestBase::msg), string(event->data) );
+ free(event->data);
}
};
public:
void setUp() {
- msg = server_msg_create((char *)IlTestBase::msg);
+ il_octet_string d = { len: strlen((char*)IlTestBase::msg), data: (char*)IlTestBase::msg };
+
+ msg = server_msg_create(&d, 0);
}
void tearDown() {