From bc5b35978e5656ed7e514772d114d03fbce4ff12 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Thu, 7 Oct 2004 14:34:08 +0000 Subject: [PATCH] - pullup INFN CVS changes - comment out unit tests in Makefile until they are added into CVS --- org.glite.lb.logger/Makefile | 4 +- org.glite.lb.logger/project/version.properties | 2 +- org.glite.lb.logger/src/event_queue.c | 9 ++-- org.glite.lb.logger/src/event_store.c | 13 +++++- org.glite.lb.logger/src/il_master.c | 12 +++--- org.glite.lb.logger/src/input_queue_socket.c | 2 +- org.glite.lb.logger/src/interlogd.c | 5 ++- org.glite.lb.logger/src/interlogd.h | 3 +- org.glite.lb.logger/src/logd.c | 1 + org.glite.lb.logger/src/logd_proto.c | 43 ++++++++++++------- org.glite.lb.logger/src/logd_proto.h | 8 ++++ org.glite.lb.logger/src/queue_thread.c | 58 ++++++++++++-------------- org.glite.lb.logger/src/recover.c | 4 +- org.glite.lb.logger/src/send_event.c | 4 +- org.glite.lb.logger/src/server_msg.c | 4 +- 15 files changed, 105 insertions(+), 67 deletions(-) diff --git a/org.glite.lb.logger/Makefile b/org.glite.lb.logger/Makefile index 95c3adc..7cfce1f 100644 --- a/org.glite.lb.logger/Makefile +++ b/org.glite.lb.logger/Makefile @@ -104,7 +104,9 @@ all compile: glite_lb_logd glite_lb_interlogd glite_lb_notif_interlogd stage: compile $(MAKE) install PREFIX=${stagedir} -check: check.ll check.il +check: +# do nothing until test/ is really added to CVS +# check.ll check.il #check.ll: logd_proto_test.o ll_test.o # ${LINKXX} -o $@ ${COMMON_LIB}_${nothrflavour} ${EXT_LIBS} ${GLOBUS_LIBS} ${TEST_LIBS} $+ diff --git a/org.glite.lb.logger/project/version.properties b/org.glite.lb.logger/project/version.properties index 7732282..544b6e8 100644 --- a/org.glite.lb.logger/project/version.properties +++ b/org.glite.lb.logger/project/version.properties @@ -1,4 +1,4 @@ #Thu Oct 07 13:57:49 CEST 2004 -module.version=0.1.0 +module.version=0.2.0 module.build=34 module.age=0 diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 1873bde..d311a1a 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -264,6 +264,7 @@ event_queue_remove(struct event_queue *eq) } #if defined(IL_NOTIFICATIONS) + int event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id) { @@ -276,15 +277,15 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char if(eq_d) { event_queue_lock(eq_d); /* dest tail is set to point to the last (NULL) pointer in the list */ - dest_tail = (eq_d->tail == NULL) ? &(eq_d->tail) : &(eq_d->tail->prev); + dest_tail = (eq_d->head == NULL) ? &(eq_d->head) : &(eq_d->tail->prev); } source_prev = &(eq_s->head); p = *source_prev; - eq_s = NULL; + eq_s->tail = NULL; while(p) { if(strcmp(p->msg->job_id_s, notif_id) == 0) { - il_log(LOG_DEBUG, " moving event with notif id %s from %s to %s\n", - notif_id, eq_s->dest_name, eq_d ? eq_d->dest_name : "trash"); + il_log(LOG_DEBUG, " moving event with notif id %s from %s:%d to %s:%d\n", + notif_id, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1); /* remove the message from the source list */ *source_prev = p->prev; if(eq_d) { diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 2e08d17..295b2f0 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -287,7 +287,7 @@ event_store_write_ctl(struct event_store *es) int event_store_recover(struct event_store *es) { - struct event_queue *eq_l, *eq_b; + struct event_queue *eq_l = NULL, *eq_b, *eq_b_new; struct server_msg *msg; char *event_s; int fd, ret; @@ -380,7 +380,7 @@ event_store_recover(struct event_store *es) ret = -1; /* create message for server */ - msg = server_msg_create(event_s); + msg = server_msg_create(event_s, last); free(event_s); if(msg == NULL) { break; @@ -398,6 +398,15 @@ event_store_recover(struct event_store *es) #endif } +#ifdef IL_NOTIFICATIONS + eq_b_new = queue_list_get(msg->dest); + if (eq_b_new != eq_b) { + free(es->dest); + es->dest = strdup(msg->dest); + eq_b = eq_b_new; + } +#endif + /* now enqueue to the BS, if neccessary */ if((eq_b != eq_l) && (last >= es->last_committed_bs)) { diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index c4f95e4..f6414c9 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -29,17 +29,19 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) if(eq_known != NULL) event_queue_move_events(eq_known, eq, msg->job_id_s); } +#endif + /* fire thread to take care of this queue */ + if(event_queue_create_thread(eq) < 0) + return(-1); + +#if defined(IL_NOTIFICATIONS) /* if there are no data to send, do not send anything (messsage was just to change the delivery address) */ if(msg->len == 0) return(0); #endif - /* fire thread to take care of this queue */ - if(event_queue_create_thread(eq) < 0) - return(-1); - /* avoid losing signal to thread */ event_queue_cond_lock(eq); @@ -309,7 +311,7 @@ handle_msg(char *event, long offset) int ret; /* convert event to message for server */ - if((msg = server_msg_create(event)) == NULL) { + if((msg = server_msg_create(event, offset)) == NULL) { il_log(LOG_ERR, " handle_msg: error parsing event '%s':\n %s\n", event, error_get_msg()); return(0); } diff --git a/org.glite.lb.logger/src/input_queue_socket.c b/org.glite.lb.logger/src/input_queue_socket.c index 0587cc8..f183319 100644 --- a/org.glite.lb.logger/src/input_queue_socket.c +++ b/org.glite.lb.logger/src/input_queue_socket.c @@ -11,7 +11,7 @@ #include "interlogd.h" -static const int SOCK_QUEUE_MAX = 5; +static const int SOCK_QUEUE_MAX = 50; extern char *socket_path; static int sock; diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index 51c65f5..efe13e7 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -11,8 +11,6 @@ #include -#include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h" -#include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h" #include "interlogd.h" #include "glite/lb/consumer.h" #include "glite/lb/lb_gss.h" @@ -36,6 +34,8 @@ int TIMEOUT = DEFAULT_TIMEOUT; gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL; pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER; +time_t key_mtime = 0, cert_mtime = 0; + static void usage (int status) { printf("%s - \n" @@ -222,6 +222,7 @@ main (int argc, char **argv) if (CAcert_dir) setenv("X509_CERT_DIR", CAcert_dir, 1); + edg_wll_gss_watch_creds(cert_file,&cert_mtime); ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred_handle, NULL, &gss_stat); if (ret) { char *gss_err = NULL; diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index bebda87..3c96828 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -90,6 +90,7 @@ struct event_store { struct server_msg { char *job_id_s; /* necessary for commit */ + long offset; /* just for printing more information to debug */ char *msg; int len; int ev_len; @@ -128,7 +129,7 @@ struct event_queue { /* server msg methods */ -struct server_msg *server_msg_create(char *); +struct server_msg *server_msg_create(char *, long); struct server_msg *server_msg_copy(struct server_msg *); int server_msg_init(struct server_msg *, char *); #if defined(INTERLOGD_EMS) diff --git a/org.glite.lb.logger/src/logd.c b/org.glite.lb.logger/src/logd.c index ad3fbd4..c8c1bfe 100644 --- a/org.glite.lb.logger/src/logd.c +++ b/org.glite.lb.logger/src/logd.c @@ -340,6 +340,7 @@ Copyright (c) 2002 CERN, INFN and CESNET on behalf of the EU DataGrid.\n"); if (mysignal(SIGTERM, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } if (mysignal(SIGCHLD, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + edg_wll_gss_watch_creds(cert_file,&cert_mtime); /* XXX DK: support noAuth */ ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred, &my_subject_name, &gss_stat); diff --git a/org.glite.lb.logger/src/logd_proto.c b/org.glite.lb.logger/src/logd_proto.c index bb033a7..6235592 100644 --- a/org.glite.lb.logger/src/logd_proto.c +++ b/org.glite.lb.logger/src/logd_proto.c @@ -682,19 +682,27 @@ open_event_file: } edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket..."); - if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if(errno != EISCONN) { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - SYSTEM_ERROR("connect"); - answer = errno; - close(msg_sock); - goto edg_wll_log_proto_server_end; - } else { + for (i=0; i < CONNECT_ATTEMPTS; i++) { + if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + if ((errno == EAGAIN) || (errno == ETIMEDOUT)) { + edg_wll_ll_log(LOG_DEBUG,"."); + sleep(CONNECT_TIMEOUT); + continue; + } else if (errno == EISCONN) { edg_wll_ll_log(LOG_DEBUG,"warning.\n"); - edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n"); + edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n"); + break; + } else { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + SYSTEM_ERROR("connect"); + answer = errno; + close(msg_sock); + goto edg_wll_log_proto_server_end_1; + } + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + break; } - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); } edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos)); @@ -704,7 +712,7 @@ open_event_file: edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n"); answer = errno; close(msg_sock); - goto edg_wll_log_proto_server_end; + goto edg_wll_log_proto_server_end_1; } else { edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); } @@ -715,7 +723,7 @@ open_event_file: edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error."); answer = errno; close(msg_sock); - goto edg_wll_log_proto_server_end; + goto edg_wll_log_proto_server_end_1; } else { edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); } @@ -759,6 +767,13 @@ edg_wll_log_proto_server_end: edg_wll_ll_log(LOG_INFO,"Done.\n"); return answer; + +edg_wll_log_proto_server_end_1: + if (event->any.priority) { + close(confirm_sock); + unlink(confirm_sock_name); + } + goto edg_wll_log_proto_server_end; } /* @@ -837,7 +852,7 @@ void edg_wll_ll_log(int level, const char *fmt, ...) { va_end(fmt_args); if(level <= edg_wll_ll_log_level) - fprintf(stderr, err_text); + fprintf(stderr, "[%d] %s", (int) getpid(), err_text); if(level <= LOG_ERR) { openlog("edg-wl-logd", LOG_PID | LOG_CONS, LOG_DAEMON); syslog(level, "%s", err_text); diff --git a/org.glite.lb.logger/src/logd_proto.h b/org.glite.lb.logger/src/logd_proto.h index f28ee53..b8e167e 100644 --- a/org.glite.lb.logger/src/logd_proto.h +++ b/org.glite.lb.logger/src/logd_proto.h @@ -34,11 +34,19 @@ extern int edg_wll_ll_log_level; void edg_wll_ll_log_init(int level); void edg_wll_ll_log(int level, const char *fmt, ...); + /* fcntl defaults */ #define FCNTL_ATTEMPTS 5 #define FCNTL_TIMEOUT 1 + +/* connect defaults */ + +#define CONNECT_ATTEMPTS 50 +#define CONNECT_TIMEOUT 10 + + /* locallogger daemon listen and connect functions prototypes */ int do_listen(int port); diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 801f9ec..5eeff30 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -32,7 +32,7 @@ void * queue_thread(void *q) { struct event_queue *eq = (struct event_queue *)q; - int ret, exit; + int ret, exit, flushing; if(init_errors(0) < 0) { il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); @@ -52,7 +52,7 @@ queue_thread(void *q) ret = 0; while (event_queue_empty(eq) #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) - && !eq->flushing + && ((flushing=eq->flushing) != 1) #endif ) { ret = event_queue_wait(eq, 0); @@ -70,75 +70,69 @@ queue_thread(void *q) * we are sending or request flush operation */ event_queue_cond_unlock(eq); - + /* connect to server */ - if((ret=event_queue_connect(eq)) < 0) { - /* internal error */ - il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); - /* this allows for collecting status when flushing; - immediate exit would not do */ - exit = 1; - break; - } else if(ret == 0) { + if((ret=event_queue_connect(eq)) == 0) { /* not connected */ if(error_get_maj() != IL_OK) il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); - il_log(LOG_INFO, " could not connect to server %s, waiting for retry\n", eq->dest_name); +#if defined(IL_NOTIFICATIONS) + il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name); +#else + il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name); +#endif } else { /* connected, send events */ switch(ret=event_queue_send(eq)) { - + case 0: /* there was an error and we still have events to send */ if(error_get_maj() != IL_OK) il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); il_log(LOG_DEBUG, " events still waiting\n"); break; - + case 1: /* hey, we are done for now */ il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name); break; - + default: /* internal error */ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); exit = 1; break; - + } /* switch */ + + /* we are done for now anyway, so close the queue */ + event_queue_close(eq); } - /* we are done for now anyway, so close the queue */ - event_queue_close(eq); - +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + if(pthread_mutex_lock(&flush_lock) < 0) + abort(); event_queue_cond_lock(eq); -#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) /* Check if we are flushing and if we are, report status to master */ - if(eq->flushing == 1) { + if(flushing == 1) { il_log(LOG_DEBUG, " flushing mode detected, reporting status\n"); /* 0 - events waiting, 1 - events sent, < 0 - some error */ eq->flush_result = ret; eq->flushing = 2; - if(pthread_mutex_lock(&flush_lock) < 0) - abort(); if(pthread_cond_signal(&flush_cond) < 0) abort(); - if(pthread_mutex_unlock(&flush_lock) < 0) - abort(); } + if(pthread_mutex_unlock(&flush_lock) < 0) + abort(); +#else + event_queue_cond_lock(eq); #endif /* if there was some error with server, sleep for a while */ /* iff !event_queue_empty() */ - if(ret == 0) { - if(event_queue_sleep(eq) < 0) { - il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); - event_queue_cond_unlock(eq); - pthread_exit((void*)-1); - } - } + if(ret == 0) + event_queue_sleep(eq); if(exit) { /* we have to clean up before exiting */ diff --git a/org.glite.lb.logger/src/recover.c b/org.glite.lb.logger/src/recover.c index 7ae417e..670c2f1 100644 --- a/org.glite.lb.logger/src/recover.c +++ b/org.glite.lb.logger/src/recover.c @@ -8,11 +8,11 @@ extern char *file_prefix; +extern time_t cert_mtime, key_mtime; + void * recover_thread(void *q) { - time_t cert_mtime = 0, key_mtime = 0; - if(init_errors(0) < 0) { il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); pthread_exit(NULL); diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c index b3b0193..83de250 100644 --- a/org.glite.lb.logger/src/send_event.c +++ b/org.glite.lb.logger/src/send_event.c @@ -183,7 +183,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min) /* - * Returns: -1 - internal error, 0 - not connected, timeout set, 1 - OK + * Returns: 0 - not connected, timeout set, 1 - OK */ int event_queue_connect(struct event_queue *eq) @@ -260,6 +260,8 @@ event_queue_send(struct event_queue *eq) if(event_queue_get(eq, &msg) < 0) return(-1); + il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s); + tv.tv_sec = TIMEOUT; tv.tv_usec = 0; ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 69f8fb7..aa7e75c 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -80,7 +80,7 @@ create_msg(char *event, char **buffer, long *receipt) struct server_msg * -server_msg_create(char *event) +server_msg_create(char *event, long offset) { struct server_msg *msg; @@ -94,6 +94,7 @@ server_msg_create(char *event) server_msg_free(msg); return(NULL); } + msg->offset = offset; return(msg); } @@ -123,6 +124,7 @@ server_msg_copy(struct server_msg *src) msg->ev_len = src->ev_len; msg->es = src->es; msg->receipt_to = src->receipt_to; + msg->offset = src->offset; #if defined(IL_NOTIFICATIONS) msg->dest_name = strdup(src->dest_name); msg->dest_port = src->dest_port; -- 1.8.2.3