From 01bff05c9a6d0c8c56c5307427d02bdcc3586966 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Thu, 7 Dec 2006 10:29:46 +0000 Subject: [PATCH] backports from 3.1: - rename corrupted event file to avoid rereading it - communication performance improvements --- org.glite.lb.logger/src/event_store.c | 127 ++++++++++++++++++++++++++- org.glite.lb.logger/src/il_master.c | 5 +- org.glite.lb.logger/src/input_queue_socket.c | 33 +++++-- org.glite.lb.logger/src/interlogd.c | 25 +++++- org.glite.lb.logger/src/interlogd.h | 4 +- org.glite.lb.logger/src/queue_thread.c | 30 ++++++- org.glite.lb.logger/src/send_event.c | 24 +++-- 7 files changed, 225 insertions(+), 23 deletions(-) diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 9f56219..b6ed5f9 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #ifdef HAVE_UNISTD_H #include @@ -279,6 +280,65 @@ event_store_write_ctl(struct event_store *es) /* + * event_store_qurantine() + * - rename damaged event store file + * - essentially does the same actions as cleanup, but the event store + * does not have to be empty + * returns 0 on success, -1 on error + */ +static +int +event_store_quarantine(struct event_store *es) +{ + int num; + char newname[MAXPATHLEN+1]; + + /* find available qurantine name */ + /* we give it at most 1024 tries */ + for(num = 0; num < 1024; num++) { + struct stat st; + + snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num); + newname[MAXPATHLEN] = 0; + if(stat(newname, &st) < 0) { + if(errno == ENOENT) { + /* file not found */ + break; + } else { + /* some other error with name, probably permanent */ + set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename"); + return(-1); + + } + } else { + /* the filename is used already */ + } + } + if(num >= 1024) { + /* new name not found */ + /* XXX - is there more suitable error? */ + set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename"); + return(-1); + } + + /* actually rename the file */ + il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n", + es->event_file_name, newname); + if(rename(es->event_file_name, newname) < 0) { + set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file"); + return(-1); + } + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + return(0); +} + + +/* * event_store_recover() * - recover after restart or catch up when events missing in IPC * - if offset > 0, read everything behind it @@ -428,8 +488,12 @@ event_store_recover(struct event_store *es) msg = server_msg_create(event_s, last); free(event_s); if(msg == NULL) { - il_log(LOG_ALERT, " event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n"); - break; + il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n"); + /* actually do not bother if quarantine succeeded or not - we could not do more */ + event_store_quarantine(es); + fclose(ef); + event_store_unlock(es); + return(-1); } msg->es = es; @@ -704,7 +768,6 @@ event_store_clean(struct event_store *es) } - /* -------------------------------- * event store management functions * -------------------------------- @@ -798,6 +861,11 @@ event_store_from_file(char *filename) il_log(LOG_INFO, " attaching to event file: %s\n", filename); + if(strstr(filename, "quarantine") != NULL) { + il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n"); + return(0); + } + event_file = fopen(filename, "r"); if(event_file == NULL) { set_error(IL_SYS, errno, "event_store_from_file: error opening event file"); @@ -944,6 +1012,59 @@ event_store_init(char *prefix) free(s); } closedir(event_dir); + + /* one more pass - this time remove stale .ctl files */ + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* find all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') { + char *ef; + struct stat st; + + /* is there corresponding event file? */ + ef = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(ef == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name"); + return(-1); + } + + s[0] = 0; + *ef = '\0'; + strcat(ef, dir); + strcat(ef, "/"); + strcat(ef, entry->d_name); + s[0] = '.'; + + if(stat(ef, &st) == 0) { + /* something is there */ + /* XXX - it could be something else than event file, but do not bother now */ + } else { + /* could not stat file, remove ctl */ + strcat(ef, s); + il_log(LOG_DEBUG, " removing stale file %s\n", ef); + if(unlink(ef)) + il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno)); + + } + free(ef); + + } + } + closedir(event_dir); free(dir); } diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index b045112..546fbc7 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -385,7 +385,10 @@ loop() char *msg; long offset; int ret; - + + if(killflg) + return(0); + clear_error(); if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) { diff --git a/org.glite.lb.logger/src/input_queue_socket.c b/org.glite.lb.logger/src/input_queue_socket.c index f183319..becca80 100644 --- a/org.glite.lb.logger/src/input_queue_socket.c +++ b/org.glite.lb.logger/src/input_queue_socket.c @@ -109,8 +109,21 @@ read_event(int sock, long *offset) } /* copy all relevant bytes from buffer */ - for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) - *p++ = buf[i]; + n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len); + if(n) { + /* separator found */ + n--; /* but do not preserve it */ + i = n - p; + p = n; + } else { + /* separator not found */ + i = len; + p += len; + } + /* This was definitely slowing us down: + * for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) + * *p++ = buf[i]; + */ /* remove the data from queue */ if(i > 0) @@ -140,6 +153,7 @@ read_event(int sock, long *offset) return(NULL); } +#if 0 /* this is probably not necessary at all: either len <=0, which was covered before, or 0 <= i < len => p > buffer; @@ -150,6 +164,7 @@ read_event(int sock, long *offset) free(buffer); return(NULL); } +#endif return(buffer); } @@ -181,11 +196,17 @@ input_queue_get(char **buffer, long *offset, int timeout) return(0); case -1: /* error */ - set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); - return(-1); - + switch(errno) { + case EINTR: + il_log(LOG_DEBUG, " interrupted while waiting for event!\n"); + return(0); + + default: + set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); + return(-1); + } default: - break; + break; } if((accepted=accept(sock, NULL, NULL)) < 0) { diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index d2d53b3..ef64a90 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -13,6 +13,7 @@ #include "interlogd.h" #include "glite/lb/consumer.h" +#include "glite/lb/log_proto.h" #include "glite/security/glite_gss.h" #define EXIT_FAILURE 1 @@ -20,14 +21,14 @@ #define DEFAULT_PREFIX "/tmp/notif_events" #define DEFAULT_SOCKET "/tmp/notif_interlogger.sock" #else -#define DEFAULT_PREFIX "/tmp/dglogd.log" +#define DEFAULT_PREFIX EDG_WLL_LOG_PREFIX_DEFAULT #define DEFAULT_SOCKET "/tmp/interlogger.sock" #endif /* The name the program was run with, stripped of any leading path. */ char *program_name; -static int killflg = 0; +int killflg = 0; int TIMEOUT = DEFAULT_TIMEOUT; @@ -53,6 +54,7 @@ static void usage (int status) " -b, --book send events to bookkeeping server only\n" " -l, --log-server specify address of log server\n" " -s, --socket non-default path of local socket\n" + " -L, --lazy [] be lazy when closing connections to servers\n" , program_name, program_name); exit(status); } @@ -63,6 +65,8 @@ static int debug; static int verbose = 0; char *file_prefix = DEFAULT_PREFIX; int bs_only = 0; +int lazy_close = 0; +int default_close_timeout; char *cert_file = NULL; char *key_file = NULL; @@ -83,6 +87,7 @@ static struct option const long_options[] = {"CAdir", required_argument, 0, 'C'}, {"log-server", required_argument, 0, 'l'}, {"socket", required_argument, 0, 's'}, + {"lazy", optional_argument, 0, 'L'}, {NULL, 0, NULL, 0} }; @@ -108,6 +113,7 @@ decode_switches (int argc, char **argv) "b" /* only bookeeping */ "l:" /* log server */ "d" /* debug */ + "L::" /* lazy */ "s:", /* socket */ long_options, (int *) 0)) != EOF) { @@ -156,6 +162,14 @@ decode_switches (int argc, char **argv) socket_path = strdup(optarg); break; + case 'L': + lazy_close = 1; + if(optarg) + default_close_timeout = atoi(optarg); + else + default_close_timeout = TIMEOUT; + break; + default: usage (EXIT_FAILURE); } @@ -183,10 +197,10 @@ main (int argc, char **argv) setlinebuf(stdout); setlinebuf(stderr); - i = decode_switches (argc, argv); - if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p); + i = decode_switches (argc, argv); + /* force -b if we do not have log server */ if(log_server == NULL) { log_server = strdup(DEFAULT_LOG_SERVER); @@ -218,6 +232,9 @@ main (int argc, char **argv) il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg()); exit(EXIT_FAILURE); } + if(lazy_close) + il_log(LOG_DEBUG, " using lazy mode when closing connections, timeout %d\n", + default_close_timeout); if (CAcert_dir) setenv("X509_CERT_DIR", CAcert_dir, 1); diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 64c6889..7c443fe 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -64,7 +64,9 @@ extern char *cert_file; extern char *key_file; extern char *CAcert_dir; extern int bs_only; - +extern int killflg; +extern int lazy_close; +extern int default_close_timeout; /* shared data for thread communication */ #ifdef INTERLOGD_FLUSH diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 9ee4f27..8064ce7 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -33,6 +33,8 @@ queue_thread(void *q) { struct event_queue *eq = (struct event_queue *)q; int ret, exit; + int retrycnt; + int close_timeout; if(init_errors(0) < 0) { il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); @@ -46,6 +48,7 @@ queue_thread(void *q) event_queue_cond_lock(eq); exit = 0; + retrycnt = 0; while(!exit) { clear_error(); @@ -57,7 +60,16 @@ queue_thread(void *q) && (eq->flushing != 1) #endif ) { - ret = event_queue_wait(eq, 0); + if(lazy_close && close_timeout) { + ret = event_queue_wait(eq, close_timeout); + if(ret == 1) {/* timeout? */ + event_queue_close(eq); + il_log(LOG_DEBUG, " connection to %s:%d closed\n", + eq->dest_name, eq->dest_port); + } + close_timeout = 0; + } else + ret = event_queue_wait(eq, 0); if(ret < 0) { /* error waiting */ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); @@ -83,7 +95,9 @@ queue_thread(void *q) #else il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name); #endif + retrycnt++; } else { + retrycnt = 0; /* connected, send events */ switch(ret=event_queue_send(eq)) { @@ -108,7 +122,13 @@ queue_thread(void *q) } /* switch */ /* we are done for now anyway, so close the queue */ + if((ret == 1) && lazy_close) + close_timeout = default_close_timeout; + else { event_queue_close(eq); + il_log(LOG_DEBUG, " connection to %s:%d closed\n", + eq->dest_name, eq->dest_port); + } } #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) @@ -133,8 +153,14 @@ queue_thread(void *q) /* if there was some error with server, sleep for a while */ /* iff !event_queue_empty() */ - if(ret == 0) + /* also allow for one more try immediately after server disconnect, + which may cure server kicking us out after given number of connections */ +#ifndef LB_PERF + if((ret == 0) && (retrycnt > 0)) { + il_log(LOG_WARNING, " sleeping\n"); event_queue_sleep(eq); + } +#endif if(exit) { /* we have to clean up before exiting */ diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c index 7f1eb9d..0040a47 100644 --- a/org.glite.lb.logger/src/send_event.c +++ b/org.glite.lb.logger/src/send_event.c @@ -216,6 +216,7 @@ event_queue_close(struct event_queue *eq) int event_queue_send(struct event_queue *eq) { + int events_sent = 0; assert(eq != NULL); if(eq->gss.context == GSS_C_NO_CONTEXT) @@ -241,15 +242,25 @@ event_queue_send(struct event_queue *eq) tv.tv_usec = 0; ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); if(ret < 0) { - eq->timeout = TIMEOUT; - return(0); + eq->timeout = TIMEOUT; + return(0); + } + if(ret < 0) { + if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) + eq->timeout = 0; + else + eq->timeout = TIMEOUT; + return(0); } if((code = get_reply(eq, &rep, &code_min)) < 0) { - /* could not get the reply properly, so try again later */ - il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg()); - eq->timeout = TIMEOUT; - return(0); + /* could not get the reply properly, so try again later */ + il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg()); + if (events_sent>0) + eq->timeout = 1; + else + eq->timeout = TIMEOUT; + return(0); } il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep); @@ -288,6 +299,7 @@ event_queue_send(struct event_queue *eq) il_log(LOG_ERR, "send_event: %s\n", error_get_msg()); event_queue_remove(eq); + events_sent++; break; } /* switch */ -- 1.8.2.3