- pullup INFN CVS changes
authorAleš Křenek <ljocha@ics.muni.cz>
Thu, 7 Oct 2004 14:34:08 +0000 (14:34 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Thu, 7 Oct 2004 14:34:08 +0000 (14:34 +0000)
- comment out unit tests in Makefile until they are added into CVS

15 files changed:
org.glite.lb.logger/Makefile
org.glite.lb.logger/project/version.properties
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/il_master.c
org.glite.lb.logger/src/input_queue_socket.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/logd.c
org.glite.lb.logger/src/logd_proto.c
org.glite.lb.logger/src/logd_proto.h
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/recover.c
org.glite.lb.logger/src/send_event.c
org.glite.lb.logger/src/server_msg.c

index 95c3adc..7cfce1f 100644 (file)
@@ -104,7 +104,9 @@ all compile: glite_lb_logd glite_lb_interlogd glite_lb_notif_interlogd
 stage: compile
        $(MAKE) install PREFIX=${stagedir}
 
-check: check.ll check.il
+check: 
+# do nothing until test/ is really added to CVS
+# check.ll check.il
 
 #check.ll: logd_proto_test.o ll_test.o
 #      ${LINKXX} -o $@ ${COMMON_LIB}_${nothrflavour} ${EXT_LIBS} ${GLOBUS_LIBS} ${TEST_LIBS} $+
index 7732282..544b6e8 100644 (file)
@@ -1,4 +1,4 @@
 #Thu Oct 07 13:57:49 CEST 2004
-module.version=0.1.0
+module.version=0.2.0
 module.build=34
 module.age=0
index 1873bde..d311a1a 100644 (file)
@@ -264,6 +264,7 @@ event_queue_remove(struct event_queue *eq)
 }
 
 #if defined(IL_NOTIFICATIONS)
+
 int
 event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id)
 {
@@ -276,15 +277,15 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char
        if(eq_d) {
                event_queue_lock(eq_d);
                /* dest tail is set to point to the last (NULL) pointer in the list */
-               dest_tail = (eq_d->tail == NULL) ? &(eq_d->tail) : &(eq_d->tail->prev);
+               dest_tail = (eq_d->head == NULL) ? &(eq_d->head) : &(eq_d->tail->prev);
        }
        source_prev = &(eq_s->head);
        p = *source_prev;
-       eq_s = NULL;
+       eq_s->tail = NULL;
        while(p) {
                if(strcmp(p->msg->job_id_s, notif_id) == 0) {
-                       il_log(LOG_DEBUG, "  moving event with notif id %s from %s to %s\n",
-                              notif_id, eq_s->dest_name, eq_d ? eq_d->dest_name : "trash");
+                       il_log(LOG_DEBUG, "  moving event with notif id %s from %s:%d to %s:%d\n",
+                              notif_id, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
                        /* remove the message from the source list */
                        *source_prev = p->prev;
                        if(eq_d) {
index 2e08d17..295b2f0 100644 (file)
@@ -287,7 +287,7 @@ event_store_write_ctl(struct event_store *es)
 int
 event_store_recover(struct event_store *es)
 {
-  struct event_queue *eq_l, *eq_b;
+  struct event_queue *eq_l = NULL, *eq_b, *eq_b_new;
   struct server_msg *msg;
   char *event_s;
   int fd, ret;
@@ -380,7 +380,7 @@ event_store_recover(struct event_store *es)
     ret = -1;
 
     /* create message for server */
-    msg = server_msg_create(event_s);
+    msg = server_msg_create(event_s, last);
     free(event_s);
     if(msg == NULL) {
       break;
@@ -398,6 +398,15 @@ event_store_recover(struct event_store *es)
 #endif
       }
 
+#ifdef IL_NOTIFICATIONS
+    eq_b_new = queue_list_get(msg->dest);
+    if (eq_b_new != eq_b) {
+           free(es->dest);
+           es->dest = strdup(msg->dest);
+           eq_b = eq_b_new;
+    }
+#endif
+
     /* now enqueue to the BS, if neccessary */
     if((eq_b != eq_l) && 
        (last >= es->last_committed_bs)) {
index c4f95e4..f6414c9 100644 (file)
@@ -29,17 +29,19 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg)
                if(eq_known != NULL) 
                        event_queue_move_events(eq_known, eq, msg->job_id_s);
        }
+#endif
 
+       /* fire thread to take care of this queue */
+       if(event_queue_create_thread(eq) < 0) 
+               return(-1);
+       
+#if defined(IL_NOTIFICATIONS)
        /* if there are no data to send, do not send anything 
           (messsage was just to change the delivery address) */
        if(msg->len == 0) 
                return(0);
 
 #endif
-       /* fire thread to take care of this queue */
-       if(event_queue_create_thread(eq) < 0) 
-               return(-1);
-       
        /* avoid losing signal to thread */
        event_queue_cond_lock(eq);
 
@@ -309,7 +311,7 @@ handle_msg(char *event, long offset)
        int ret;
 
        /* convert event to message for server */
-       if((msg = server_msg_create(event)) == NULL) {
+       if((msg = server_msg_create(event, offset)) == NULL) {
                il_log(LOG_ERR, "    handle_msg: error parsing event '%s':\n      %s\n", event, error_get_msg());
                return(0);
        }
index 0587cc8..f183319 100644 (file)
@@ -11,7 +11,7 @@
 #include "interlogd.h"
 
 
-static const int   SOCK_QUEUE_MAX = 5;
+static const int   SOCK_QUEUE_MAX = 50;
 extern char *socket_path;
 
 static int sock;
index 51c65f5..efe13e7 100644 (file)
@@ -11,8 +11,6 @@
 
 #include <globus_common.h>
 
-#include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h"
-#include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h"
 #include "interlogd.h"
 #include "glite/lb/consumer.h"
 #include "glite/lb/lb_gss.h"
@@ -36,6 +34,8 @@ int TIMEOUT = DEFAULT_TIMEOUT;
 gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL;
 pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
 
+time_t key_mtime = 0, cert_mtime = 0;
+
 static void usage (int status)
 {
        printf("%s - \n"
@@ -222,6 +222,7 @@ main (int argc, char **argv)
   if (CAcert_dir)
      setenv("X509_CERT_DIR", CAcert_dir, 1);
 
+  edg_wll_gss_watch_creds(cert_file,&cert_mtime);
   ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred_handle, NULL, &gss_stat);
   if (ret) {
      char *gss_err = NULL;
index bebda87..3c96828 100644 (file)
@@ -90,6 +90,7 @@ struct event_store {
 
 struct server_msg {
        char                   *job_id_s;       /* necessary for commit */
+       long                    offset;         /* just for printing more information to debug */
        char                   *msg;
        int                     len;
        int                     ev_len;
@@ -128,7 +129,7 @@ struct event_queue {
 
 
 /* server msg methods */
-struct server_msg *server_msg_create(char *);
+struct server_msg *server_msg_create(char *, long);
 struct server_msg *server_msg_copy(struct server_msg *);
 int server_msg_init(struct server_msg *, char *);
 #if defined(INTERLOGD_EMS)
index ad3fbd4..c8c1bfe 100644 (file)
@@ -340,6 +340,7 @@ Copyright (c) 2002 CERN, INFN and CESNET on behalf of the EU DataGrid.\n");
    if (mysignal(SIGTERM, handle_signal) == SIG_ERR) { perror("signal"); exit(1); }
    if (mysignal(SIGCHLD, handle_signal) == SIG_ERR) { perror("signal"); exit(1); }
 
+   edg_wll_gss_watch_creds(cert_file,&cert_mtime);
    /* XXX DK: support noAuth */
    ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred, &my_subject_name,
                                      &gss_stat);
index bb033a7..6235592 100644 (file)
@@ -682,19 +682,27 @@ open_event_file:
                }         
 
                edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket...");
-               if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
-                       if(errno != EISCONN) {
-                               edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                               SYSTEM_ERROR("connect"); 
-                               answer = errno; 
-                               close(msg_sock); 
-                               goto edg_wll_log_proto_server_end;
-                       } else {
+               for (i=0; i < CONNECT_ATTEMPTS; i++) {
+                       if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+                               if ((errno == EAGAIN) || (errno == ETIMEDOUT)) {
+                                       edg_wll_ll_log(LOG_DEBUG,"."); 
+                                       sleep(CONNECT_TIMEOUT);
+                                       continue;
+                               } else if (errno == EISCONN) {
                                        edg_wll_ll_log(LOG_DEBUG,"warning.\n");
-                               edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
+                                       edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
+                                       break;
+                               } else {
+                                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
+                                       SYSTEM_ERROR("connect"); 
+                                       answer = errno; 
+                                       close(msg_sock); 
+                                       goto edg_wll_log_proto_server_end_1;
+                               }
+                       } else {
+                               edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
+                               break;
                        }
-               } else {
-                       edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
                }
 
                edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos));
@@ -704,7 +712,7 @@ open_event_file:
                        edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n");
                        answer = errno; 
                        close(msg_sock); 
-                       goto edg_wll_log_proto_server_end;
+                       goto edg_wll_log_proto_server_end_1;
                } else {
                        edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
                }
@@ -715,7 +723,7 @@ open_event_file:
                        edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error."); 
                        answer = errno; 
                        close(msg_sock); 
-                       goto edg_wll_log_proto_server_end;
+                       goto edg_wll_log_proto_server_end_1;
                } else {
                        edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
                }
@@ -759,6 +767,13 @@ edg_wll_log_proto_server_end:
        edg_wll_ll_log(LOG_INFO,"Done.\n");
 
        return answer;
+
+edg_wll_log_proto_server_end_1:
+       if (event->any.priority) {
+               close(confirm_sock);
+               unlink(confirm_sock_name);
+       }       
+       goto edg_wll_log_proto_server_end;
 }
 
 /*
@@ -837,7 +852,7 @@ void edg_wll_ll_log(int level, const char *fmt, ...) {
     va_end(fmt_args);
 
        if(level <= edg_wll_ll_log_level) 
-               fprintf(stderr, err_text);
+               fprintf(stderr, "[%d] %s", (int) getpid(), err_text);
        if(level <= LOG_ERR) {
         openlog("edg-wl-logd", LOG_PID | LOG_CONS, LOG_DAEMON);
                syslog(level, "%s", err_text);
index f28ee53..b8e167e 100644 (file)
@@ -34,11 +34,19 @@ extern int edg_wll_ll_log_level;
 void edg_wll_ll_log_init(int level);
 void edg_wll_ll_log(int level, const char *fmt, ...);
 
+
 /* fcntl defaults */
 
 #define FCNTL_ATTEMPTS         5
 #define FCNTL_TIMEOUT          1
 
+
+/* connect defaults */
+
+#define CONNECT_ATTEMPTS       50
+#define CONNECT_TIMEOUT                10
+
+
 /* locallogger daemon listen and connect functions prototypes */
 
 int do_listen(int port);
index 801f9ec..5eeff30 100644 (file)
@@ -32,7 +32,7 @@ void *
 queue_thread(void *q)
 {
        struct event_queue *eq = (struct event_queue *)q;
-       int ret, exit;
+       int ret, exit, flushing;
 
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
@@ -52,7 +52,7 @@ queue_thread(void *q)
                ret = 0;
                while (event_queue_empty(eq) 
 #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
-                      && !eq->flushing
+                      && ((flushing=eq->flushing) != 1)
 #endif
                        ) {
                        ret = event_queue_wait(eq, 0);
@@ -70,75 +70,69 @@ queue_thread(void *q)
                 * we are sending or request flush operation
                 */
                event_queue_cond_unlock(eq);
-    
+               
                /* connect to server */
-               if((ret=event_queue_connect(eq)) < 0) {
-                       /* internal error */
-                       il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                       /* this allows for collecting status when flushing; 
-                          immediate exit would not do */
-                       exit = 1;
-                       break;
-               } else if(ret == 0) {
+               if((ret=event_queue_connect(eq)) == 0) {
                        /* not connected */
                        if(error_get_maj() != IL_OK)
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                       il_log(LOG_INFO, "    could not connect to server %s, waiting for retry\n", eq->dest_name);
+#if defined(IL_NOTIFICATIONS)
+                       il_log(LOG_INFO, "    could not connect to client %s, waiting for retry\n", eq->dest_name);
+#else
+                       il_log(LOG_INFO, "    could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
+#endif
                } else {
                        /* connected, send events */
                        switch(ret=event_queue_send(eq)) {
-      
+                               
                        case 0:
                                /* there was an error and we still have events to send */
                                if(error_get_maj() != IL_OK)
                                        il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
                                il_log(LOG_DEBUG, "  events still waiting\n");
                                break;
-      
+                               
                        case 1:
                                /* hey, we are done for now */
                                il_log(LOG_DEBUG, "  all events for %s sent\n", eq->dest_name);
                                break;
-
+                               
                        default:
                                /* internal error */
                                il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
                                exit = 1;      
                                break;
-
+                               
                        } /* switch */
+                       
+                       /* we are done for now anyway, so close the queue */
+                               event_queue_close(eq);
                } 
 
-               /* we are done for now anyway, so close the queue */
-               event_queue_close(eq);
-
+#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
+               if(pthread_mutex_lock(&flush_lock) < 0)
+                       abort();
                event_queue_cond_lock(eq);
 
-#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
                /* Check if we are flushing and if we are, report status to master */
-               if(eq->flushing == 1) {
+               if(flushing == 1) {
                        il_log(LOG_DEBUG, "    flushing mode detected, reporting status\n");
                        /* 0 - events waiting, 1 - events sent, < 0 - some error */
                        eq->flush_result = ret;
                        eq->flushing = 2;
-                       if(pthread_mutex_lock(&flush_lock) < 0)
-                               abort();
                        if(pthread_cond_signal(&flush_cond) < 0)
                                abort();
-                       if(pthread_mutex_unlock(&flush_lock) < 0)
-                               abort();
                }
+               if(pthread_mutex_unlock(&flush_lock) < 0)
+                       abort();
+#else
+               event_queue_cond_lock(eq);
 #endif
 
                /* if there was some error with server, sleep for a while */
                /* iff !event_queue_empty() */
-               if(ret == 0) {
-                       if(event_queue_sleep(eq) < 0) {
-                               il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
-                               event_queue_cond_unlock(eq);
-                               pthread_exit((void*)-1);
-                       }
-               }
+               if(ret == 0) 
+                       event_queue_sleep(eq);
 
                if(exit) {
                        /* we have to clean up before exiting */
index 7ae417e..670c2f1 100644 (file)
@@ -8,11 +8,11 @@
 
 extern char *file_prefix;
 
+extern time_t cert_mtime, key_mtime;
+
 void *
 recover_thread(void *q)
 {
-       time_t cert_mtime = 0, key_mtime = 0;
-
        if(init_errors(0) < 0) {
                il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
                pthread_exit(NULL);
index b3b0193..83de250 100644 (file)
@@ -183,7 +183,7 @@ get_reply(struct event_queue *eq, char **buf, int *code_min)
 
 
 /*
- *  Returns: -1 - internal error, 0 - not connected, timeout set, 1 - OK
+ *  Returns: 0 - not connected, timeout set, 1 - OK
  */
 int 
 event_queue_connect(struct event_queue *eq)
@@ -260,6 +260,8 @@ event_queue_send(struct event_queue *eq)
     if(event_queue_get(eq, &msg) < 0) 
       return(-1);
 
+    il_log(LOG_DEBUG, "    trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
     tv.tv_sec = TIMEOUT;
     tv.tv_usec = 0;
     ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
index 69f8fb7..aa7e75c 100644 (file)
@@ -80,7 +80,7 @@ create_msg(char *event, char **buffer, long *receipt)
 
 
 struct server_msg *
-server_msg_create(char *event)
+server_msg_create(char *event, long offset)
 {
   struct server_msg *msg;
 
@@ -94,6 +94,7 @@ server_msg_create(char *event)
     server_msg_free(msg);
     return(NULL);
   }
+  msg->offset = offset;
 
   return(msg);
 }
@@ -123,6 +124,7 @@ server_msg_copy(struct server_msg *src)
   msg->ev_len = src->ev_len;
   msg->es = src->es;
   msg->receipt_to = src->receipt_to;
+  msg->offset = src->offset;
 #if defined(IL_NOTIFICATIONS)
   msg->dest_name = strdup(src->dest_name);
   msg->dest_port = src->dest_port;