#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
+#include <sys/stat.h>
#include <dirent.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
/*
+ * event_store_quarantine()
+ *   - rename damaged event store file to "<event file name>.quarantine.<N>",
+ *     where N is the first number in 0..1023 for which stat() reports
+ *     that no such file exists
+ *   - essentially does the same actions as cleanup, but the event store
+ *     does not have to be empty
+ *   - on success the committed positions (last_committed_ls,
+ *     last_committed_bs) and the offset of the event store are reset to 0
+ *   returns 0 on success, -1 on error (the reason is passed to set_error())
+ */
+static
+int
+event_store_quarantine(struct event_store *es)
+{
+  /* we give the search for a free name at most this many tries */
+  enum { QUARANTINE_MAX_TRIES = 1024 };
+  char newname[MAXPATHLEN+1];
+  int num;
+
+  /* find available quarantine name */
+  for(num = 0; num < QUARANTINE_MAX_TRIES; num++) {
+    struct stat st;
+    int len;
+
+    len = snprintf(newname, sizeof(newname), "%s.quarantine.%d", es->event_file_name, num);
+    if(len < 0 || (size_t)len >= sizeof(newname)) {
+      /* a truncated name would lose (part of) the ".quarantine.N" suffix,
+         so refuse to rename the event file to it */
+      set_error(IL_SYS, ENAMETOOLONG, "event_store_quarantine: quarantine filename too long");
+      return(-1);
+    }
+
+    if(stat(newname, &st) == 0) {
+      /* the filename is used already, try the next number */
+      continue;
+    }
+    if(errno == ENOENT) {
+      /* file not found, the name is free */
+      break;
+    }
+
+    /* some other error with name, probably permanent */
+    set_error(IL_SYS, errno, "event_store_quarantine: error looking for quarantine filename");
+    return(-1);
+  }
+  if(num >= QUARANTINE_MAX_TRIES) {
+    /* new name not found */
+    /* XXX - is there more suitable error? */
+    set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+    return(-1);
+  }
+
+  /* actually rename the file */
+  /* NOTE: the name was only probed with stat(); rename() will replace a file
+     of that name if one appears in the meantime */
+  il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n",
+         es->event_file_name, newname);
+  if(rename(es->event_file_name, newname) < 0) {
+    set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+    return(-1);
+  }
+
+  /* clear the counters, the event file no longer exists under its name */
+  es->last_committed_ls = 0;
+  es->last_committed_bs = 0;
+  es->offset = 0;
+
+  return(0);
+}
+
+
+/*
* event_store_recover()
* - recover after restart or catch up when events missing in IPC
* - if offset > 0, read everything behind it
msg = server_msg_create(event_s, last);
free(event_s);
if(msg == NULL) {
- il_log(LOG_ALERT, " event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n");
- break;
+ il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+ /* actually do not bother if quarantine succeeded or not - we could not do more */
+ event_store_quarantine(es);
+ fclose(ef);
+ event_store_unlock(es);
+ return(-1);
}
msg->es = es;
}
-
/* --------------------------------
* event store management functions
* --------------------------------
il_log(LOG_INFO, " attaching to event file: %s\n", filename);
+ if(strstr(filename, "quarantine") != NULL) {
+ il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n");
+ return(0);
+ }
+
event_file = fopen(filename, "r");
if(event_file == NULL) {
set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
free(s);
}
closedir(event_dir);
+
+ /* one more pass - this time remove stale .ctl files */
+ event_dir = opendir(dir);
+ if(event_dir == NULL) {
+ free(dir);
+ set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+ return(-1);
+ }
+
+ while((entry=readdir(event_dir))) {
+ char *s;
+
+ /* skip all files that do not match prefix */
+ if(strncmp(entry->d_name, p, len) != 0)
+ continue;
+
+ /* find all control files */
+ if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+ s[4] == '\0') {
+ char *ef;
+ struct stat st;
+
+ /* is there corresponding event file? */
+ ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+ if(ef == NULL) {
+ free(dir);
+ set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+ return(-1);
+ }
+
+ s[0] = 0;
+ *ef = '\0';
+ strcat(ef, dir);
+ strcat(ef, "/");
+ strcat(ef, entry->d_name);
+ s[0] = '.';
+
+ if(stat(ef, &st) == 0) {
+ /* something is there */
+ /* XXX - it could be something else than event file, but do not bother now */
+ } else {
+ /* could not stat file, remove ctl */
+ strcat(ef, s);
+ il_log(LOG_DEBUG, " removing stale file %s\n", ef);
+ if(unlink(ef))
+ il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno));
+
+ }
+ free(ef);
+
+ }
+ }
+ closedir(event_dir);
free(dir);
}
char *msg;
long offset;
int ret;
-
+
+ if(killflg)
+ return(0);
+
clear_error();
if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0)
{
}
/* copy all relevant bytes from buffer */
- for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++)
- *p++ = buf[i];
+ n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len);
+ if(n) {
+ /* separator found */
+ n--; /* but do not preserve it */
+ i = n - p;
+ p = n;
+ } else {
+ /* separator not found */
+ i = len;
+ p += len;
+ }
+ /* This was definitely slowing us down:
+ * for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++)
+ * *p++ = buf[i];
+ */
/* remove the data from queue */
if(i > 0)
return(NULL);
}
+#if 0
/* this is probably not necessary at all:
either len <=0, which was covered before,
or 0 <= i < len => p > buffer;
free(buffer);
return(NULL);
}
+#endif
return(buffer);
}
return(0);
case -1: /* error */
- set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
- return(-1);
-
+ switch(errno) {
+ case EINTR:
+ il_log(LOG_DEBUG, " interrupted while waiting for event!\n");
+ return(0);
+
+ default:
+ set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+ return(-1);
+ }
default:
- break;
+ break;
}
if((accepted=accept(sock, NULL, NULL)) < 0) {
#include "interlogd.h"
#include "glite/lb/consumer.h"
+#include "glite/lb/log_proto.h"
#include "glite/security/glite_gss.h"
#define EXIT_FAILURE 1
#define DEFAULT_PREFIX "/tmp/notif_events"
#define DEFAULT_SOCKET "/tmp/notif_interlogger.sock"
#else
-#define DEFAULT_PREFIX "/tmp/dglogd.log"
+#define DEFAULT_PREFIX EDG_WLL_LOG_PREFIX_DEFAULT
#define DEFAULT_SOCKET "/tmp/interlogger.sock"
#endif
/* The name the program was run with, stripped of any leading path. */
char *program_name;
-static int killflg = 0;
+int killflg = 0;
int TIMEOUT = DEFAULT_TIMEOUT;
" -b, --book send events to bookkeeping server only\n"
" -l, --log-server <host> specify address of log server\n"
" -s, --socket <path> non-default path of local socket\n"
+ " -L, --lazy [<timeout>] be lazy when closing connections to servers\n"
, program_name, program_name);
exit(status);
}
static int verbose = 0;
char *file_prefix = DEFAULT_PREFIX;
int bs_only = 0;
+int lazy_close = 0;
+int default_close_timeout;
char *cert_file = NULL;
char *key_file = NULL;
{"CAdir", required_argument, 0, 'C'},
{"log-server", required_argument, 0, 'l'},
{"socket", required_argument, 0, 's'},
+ {"lazy", optional_argument, 0, 'L'},
{NULL, 0, NULL, 0}
};
"b" /* only bookeeping */
"l:" /* log server */
"d" /* debug */
+ "L::" /* lazy */
"s:", /* socket */
long_options, (int *) 0)) != EOF)
{
socket_path = strdup(optarg);
break;
+ case 'L':
+ lazy_close = 1;
+ if(optarg)
+ default_close_timeout = atoi(optarg);
+ else
+ default_close_timeout = TIMEOUT;
+ break;
+
default:
usage (EXIT_FAILURE);
}
setlinebuf(stdout);
setlinebuf(stderr);
- i = decode_switches (argc, argv);
-
if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p);
+ i = decode_switches (argc, argv);
+
/* force -b if we do not have log server */
if(log_server == NULL) {
log_server = strdup(DEFAULT_LOG_SERVER);
il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg());
exit(EXIT_FAILURE);
}
+ if(lazy_close)
+ il_log(LOG_DEBUG, " using lazy mode when closing connections, timeout %d\n",
+ default_close_timeout);
if (CAcert_dir)
setenv("X509_CERT_DIR", CAcert_dir, 1);
extern char *key_file;
extern char *CAcert_dir;
extern int bs_only;
-
+extern int killflg;
+extern int lazy_close;
+extern int default_close_timeout;
/* shared data for thread communication */
#ifdef INTERLOGD_FLUSH
{
struct event_queue *eq = (struct event_queue *)q;
int ret, exit;
+ int retrycnt;
+ int close_timeout;
if(init_errors(0) < 0) {
il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
event_queue_cond_lock(eq);
exit = 0;
+ retrycnt = 0;
while(!exit) {
clear_error();
&& (eq->flushing != 1)
#endif
) {
- ret = event_queue_wait(eq, 0);
+ if(lazy_close && close_timeout) {
+ ret = event_queue_wait(eq, close_timeout);
+ if(ret == 1) {/* timeout? */
+ event_queue_close(eq);
+ il_log(LOG_DEBUG, " connection to %s:%d closed\n",
+ eq->dest_name, eq->dest_port);
+ }
+ close_timeout = 0;
+ } else
+ ret = event_queue_wait(eq, 0);
if(ret < 0) {
/* error waiting */
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
#else
il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
#endif
+ retrycnt++;
} else {
+ retrycnt = 0;
/* connected, send events */
switch(ret=event_queue_send(eq)) {
} /* switch */
/* we are done for now anyway, so close the queue */
+ if((ret == 1) && lazy_close)
+ close_timeout = default_close_timeout;
+ else {
event_queue_close(eq);
+ il_log(LOG_DEBUG, " connection to %s:%d closed\n",
+ eq->dest_name, eq->dest_port);
+ }
}
#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
/* if there was some error with server, sleep for a while */
/* iff !event_queue_empty() */
- if(ret == 0)
+ /* also allow for one more try immediately after server disconnect,
+ which may cure server kicking us out after given number of connections */
+#ifndef LB_PERF
+ if((ret == 0) && (retrycnt > 0)) {
+ il_log(LOG_WARNING, " sleeping\n");
event_queue_sleep(eq);
+ }
+#endif
if(exit) {
/* we have to clean up before exiting */
int
event_queue_send(struct event_queue *eq)
{
+ int events_sent = 0;
assert(eq != NULL);
if(eq->gss.context == GSS_C_NO_CONTEXT)
tv.tv_usec = 0;
ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
if(ret < 0) {
- eq->timeout = TIMEOUT;
- return(0);
+ eq->timeout = TIMEOUT;
+ return(0);
+ }
+ if(ret < 0) {
+ if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0)
+ eq->timeout = 0;
+ else
+ eq->timeout = TIMEOUT;
+ return(0);
}
if((code = get_reply(eq, &rep, &code_min)) < 0) {
- /* could not get the reply properly, so try again later */
- il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg());
- eq->timeout = TIMEOUT;
- return(0);
+ /* could not get the reply properly, so try again later */
+ il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg());
+ if (events_sent>0)
+ eq->timeout = 1;
+ else
+ eq->timeout = TIMEOUT;
+ return(0);
}
il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
event_queue_remove(eq);
+ events_sent++;
break;
} /* switch */