--- /dev/null
+#ident "$Header$"
+
+#include <assert.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/param.h>
+
+#include "glite/lb/events_parse.h"
+
+#include "interlogd.h"
+
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+/* Common path prefix of all event and control files; set by event_store_init()
+ * on its first call and used by jobid2eventfile()/jobid2controlfile(). */
+static char *file_prefix = NULL;
+
+
+/* Node of the singly linked list of event stores known to this module. */
+struct event_store_list {
+ struct event_store *es; /* event store carried by this node */
+ struct event_store_list *next; /* following node, NULL at the end of the list */
+};
+
+
+/* Head of the event store list; new stores are pushed at the front. */
+static struct event_store_list *store_list;
+/* Guards store_list: write-locked by event_store_find() and event_store_cleanup(),
+ * read-locked by event_store_recover_all(). */
+static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER;
+
+
+/* ----------------
+ * helper functions
+ * ----------------
+ */
+/*
+ * jobid2eventfile()
+ *   - build the event file name "<file_prefix>.<job_id_s>", or
+ *     "<file_prefix>.default" when job_id_s is NULL
+ *   - returns a newly allocated string the caller must free(),
+ *     or NULL when the name could not be allocated
+ */
+static
+char *
+jobid2eventfile(const char *job_id_s)
+{
+  char *buffer = NULL;
+  int ret;
+
+  if(job_id_s)
+    ret = asprintf(&buffer, "%s.%s", file_prefix, job_id_s);
+  else
+    ret = asprintf(&buffer, "%s.default", file_prefix);
+
+  /* on failure asprintf() leaves the pointer undefined, do not hand it out */
+  if(ret < 0)
+    return(NULL);
+
+  return(buffer);
+}
+
+
+/*
+ * jobid2controlfile()
+ *   - build the control file name "<file_prefix>.<job_id_s>.ctl", or
+ *     "<file_prefix>.default.ctl" when job_id_s is NULL
+ *   - returns a newly allocated string the caller must free(),
+ *     or NULL when the name could not be allocated
+ */
+static
+char *
+jobid2controlfile(char *job_id_s)
+{
+  char *buffer = NULL;
+  int ret;
+
+  if(job_id_s)
+    ret = asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s);
+  else
+    ret = asprintf(&buffer, "%s.default.ctl", file_prefix);
+
+  /* on failure asprintf() leaves the pointer undefined, do not hand it out */
+  if(ret < 0)
+    return(NULL);
+
+  return(buffer);
+}
+
+/* State handed to file_reader() through its user_data pointer. */
+struct file_reader_data {
+ int fd; /* descriptor the record is read from */
+ size_t max_len; /* total number of bytes file_reader() may deliver */
+ size_t pos; /* number of bytes delivered so far */
+};
+
+/* Access member b of the struct file_reader_data hidden behind the void pointer a. */
+#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b
+
+/*
+ * file_reader()
+ *   - reader callback passed to receive_http(); reads at most len bytes
+ *     from the descriptor in user_data (a struct file_reader_data), never
+ *     delivering more than max_len bytes in total
+ *   - returns the number of bytes read, 0 when the budget is exhausted
+ *     (or len is not positive), or the negative result of read()
+ */
+static
+int
+file_reader(void *user_data, char *buffer, const int len)
+{
+  struct file_reader_data *rd = user_data;
+  size_t remaining;
+  int chunk, ret;
+
+  if(len <= 0 || rd->pos >= rd->max_len)
+    return 0;
+
+  remaining = rd->max_len - rd->pos;
+  chunk = (remaining < (size_t)len) ? (int)remaining : len;
+
+  ret = read(rd->fd, buffer, chunk);
+  /* only successful reads consume the budget; adding -1 to the unsigned
+     position would corrupt it */
+  if(ret > 0)
+    rd->pos += ret;
+
+  return ret;
+}
+
+
+/*
+ * read_event_string()
+ *   - read one record from the event file: a 20 byte header (first and
+ *     last byte zero, decimal body length in between) followed by the
+ *     message, which is parsed by receive_http()
+ *   - the header and body are read with read() on the underlying descriptor
+ *   - returns the result of receive_http(), or -1 with the error set when
+ *     the header is missing, short or malformed
+ */
+static
+int
+read_event_string(FILE *file, il_http_message_t *msg)
+{
+  char header[20];
+  struct file_reader_data reader;
+  int fd = fileno(file);
+  int nread, body_len;
+
+  nread = read(fd, header, sizeof(header));
+  if(nread < 0) {
+    set_error(IL_SYS, errno, "read_event_string: error reading record header");
+    return -1;
+  }
+  if(nread != sizeof(header)) {
+    set_error(IL_SYS, EIO, "read_event_string: record header too short");
+    return -1;
+  }
+
+  /* both sentinels have to be zero bytes */
+  if(header[0] != 0 || header[sizeof(header) - 1] != 0) {
+    set_error(IL_SYS, EINVAL, "read_event_string: invalid record header");
+    return -1;
+  }
+
+  body_len = atoi(header + 1);
+  if(body_len < 0) {
+    set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header");
+    return -1;
+  }
+
+  reader.fd = fd;
+  reader.max_len = body_len;
+  reader.pos = 0;
+
+  return receive_http(&reader, file_reader, msg);
+}
+
+
+
+/* ------------------------------
+ * event_store 'member' functions
+ * ------------------------------
+ */
+/*
+ * event_store_free()
+ *   - release the strings owned by the event store, destroy both of its
+ *     rwlocks and free the structure itself
+ *   - always returns 0
+ */
+static
+int
+event_store_free(struct event_store *es)
+{
+  assert(es != NULL);
+
+  /* free(NULL) is a no-op, so unset members need no guard */
+  free(es->job_id_s);
+  free(es->event_file_name);
+  free(es->control_file_name);
+
+  pthread_rwlock_destroy(&es->use_lock);
+  pthread_rwlock_destroy(&es->update_lock);
+
+  free(es);
+  return 0;
+}
+
+
+/*
+ * event_store_create()
+ *   - allocate a zeroed event store for the given job id, fill in its
+ *     event and control file names and initialize both rwlocks
+ *   - a NULL job_id_s selects the "default" file names and leaves
+ *     es->job_id_s NULL
+ *   - returns the new event store, or NULL with the error set when
+ *     memory could not be allocated; aborts if a lock cannot be initialized
+ */
+static
+struct event_store *
+event_store_create(char *job_id_s)
+{
+  struct event_store *es;
+
+  es = calloc(1, sizeof(*es));
+  if(es == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure");
+    return(NULL);
+  }
+
+  il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s ? job_id_s : "(null)");
+
+  /* strdup(NULL) is undefined, keep the member NULL in that case */
+  if(job_id_s != NULL)
+    es->job_id_s = strdup(job_id_s);
+  es->event_file_name = jobid2eventfile(job_id_s);
+  es->control_file_name = jobid2controlfile(job_id_s);
+
+  if((job_id_s != NULL && es->job_id_s == NULL) ||
+     es->event_file_name == NULL ||
+     es->control_file_name == NULL) {
+    free(es->job_id_s);
+    free(es->event_file_name);
+    free(es->control_file_name);
+    free(es);
+    set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for file names");
+    return(NULL);
+  }
+
+  if(pthread_rwlock_init(&es->update_lock, NULL))
+    abort();
+  if(pthread_rwlock_init(&es->use_lock, NULL))
+    abort();
+
+  return(es);
+}
+
+
+/*
+ * event_store_lock_ro()
+ *   - take the update lock of the event store for reading
+ *   - always returns 0; aborts if the lock cannot be taken
+ */
+static
+int
+event_store_lock_ro(struct event_store *es)
+{
+  int rc;
+
+  assert(es != NULL);
+
+  rc = pthread_rwlock_rdlock(&es->update_lock);
+  if(rc != 0)
+    abort();
+
+  return 0;
+}
+
+
+/*
+ * event_store_lock()
+ *   - take the update lock of the event store for writing
+ *   - always returns 0; aborts if the lock cannot be taken
+ */
+static
+int
+event_store_lock(struct event_store *es)
+{
+  int rc;
+
+  assert(es != NULL);
+
+  rc = pthread_rwlock_wrlock(&es->update_lock);
+  if(rc != 0)
+    abort();
+
+  return 0;
+}
+
+
+/*
+ * event_store_unlock()
+ *   - release the update lock of the event store
+ *   - always returns 0; aborts if the lock cannot be released
+ */
+static
+int
+event_store_unlock(struct event_store *es)
+{
+  int rc;
+
+  assert(es != NULL);
+
+  rc = pthread_rwlock_unlock(&es->update_lock);
+  if(rc != 0)
+    abort();
+
+  return 0;
+}
+
+
+/*
+ * event_store_read_ctl()
+ *   - load the committed byte counts (LS and BS) from the control file
+ *   - a missing control file means a new event file, both counts are 0;
+ *     a control file that cannot be parsed is treated the same way so
+ *     that the counters never hold a partially read record
+ *   - always returns 0
+ */
+static
+int
+event_store_read_ctl(struct event_store *es)
+{
+  FILE *ctl_file;
+  long ls = 0, bs = 0;
+
+  assert(es != NULL);
+
+  event_store_lock(es);
+
+  ctl_file = fopen(es->control_file_name, "r");
+  if(ctl_file != NULL) {
+    /* the first line (job id) is skipped, then the two counters follow */
+    if(fscanf(ctl_file, "%*s\n%ld\n%ld\n", &ls, &bs) != 2) {
+      il_log(LOG_WARNING, " could not parse control file %s, starting from the beginning\n",
+             es->control_file_name);
+      ls = 0;
+      bs = 0;
+    }
+    fclose(ctl_file);
+  }
+
+  es->last_committed_ls = ls;
+  es->last_committed_bs = bs;
+
+  event_store_unlock(es);
+
+  return(0);
+}
+
+
+/*
+ * event_store_write_ctl()
+ *   - rewrite the control file with the job id and both committed
+ *     byte counts, one value per line
+ *   - does no locking itself
+ *   - returns 0 on success, -1 with the error set otherwise
+ */
+static
+int
+event_store_write_ctl(struct event_store *es)
+{
+  FILE *ctl;
+
+  assert(es != NULL);
+
+  ctl = fopen(es->control_file_name, "w");
+  if(ctl == NULL) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file");
+    return(-1);
+  }
+
+  if(fprintf(ctl, "%s\n%ld\n%ld\n",
+             es->job_id_s,
+             es->last_committed_ls,
+             es->last_committed_bs) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
+    /* do not leak the stream on the error path */
+    fclose(ctl);
+    return(-1);
+  }
+
+  if(fclose(ctl) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file");
+    return(-1);
+  }
+
+  return(0);
+}
+
+
+/*
+ * event_store_quarantine()
+ *   - rename damaged event store file to "<event file>.quarantine.<N>",
+ *     using the first N in 0..1023 for which no file exists
+ *   - essentially does the same actions as cleanup, but the event store
+ *     does not have to be empty
+ *   - on success the committed counters and the offset are reset to 0
+ *   returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es)
+{
+  enum { QUARANTINE_MAX_TRIES = 1024 };
+  int num;
+  char newname[MAXPATHLEN+1];
+
+  /* find available quarantine name */
+  for(num = 0; num < QUARANTINE_MAX_TRIES; num++) {
+    struct stat st;
+    int n;
+
+    n = snprintf(newname, sizeof(newname), "%s.quarantine.%d", es->event_file_name, num);
+    if(n < 0 || (size_t)n >= sizeof(newname)) {
+      /* a truncated name would rename the file to something unexpected */
+      set_error(IL_SYS, ENAMETOOLONG, "event_store_quarantine: quarantine filename too long");
+      return(-1);
+    }
+
+    if(stat(newname, &st) < 0) {
+      if(errno == ENOENT) {
+        /* file not found, the name is free */
+        break;
+      }
+      /* some other error with name, probably permanent */
+      set_error(IL_SYS, errno, "event_store_quarantine: error looking for quarantine filename");
+      return(-1);
+    }
+    /* the filename is used already, try the next one */
+  }
+  if(num >= QUARANTINE_MAX_TRIES) {
+    /* new name not found */
+    /* XXX - is there more suitable error? */
+    set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+    return(-1);
+  }
+
+  /* actually rename the file */
+  il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n",
+         es->event_file_name, newname);
+  if(rename(es->event_file_name, newname) < 0) {
+    set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+    return(-1);
+  }
+
+  /* clear the counters */
+  es->last_committed_ls = 0;
+  es->last_committed_bs = 0;
+  es->offset = 0;
+
+  return(0);
+}
+
+
+/*
+ * event_store_recover()
+ *   - recover after restart or catch up when events missing in IPC
+ *   - if offset > 0, read everything behind it
+ *   - if offset == 0, read everything behind last_committed_bs
+ *   - the event store is write-locked and the event file read-locked
+ *     (fcntl) for the duration of the call
+ *   - returns -1 on error,
+ *             0 if the file was not modified since the last visit or at
+ *               least one event was processed and reading stopped at EOF
+ *               or an incomplete event,
+ *             1 if no event could be read at all
+ */
+int
+event_store_recover(struct event_store *es)
+{
+  struct event_queue *eq_l = NULL, *eq_b = NULL;
+  struct server_msg *msg;
+  il_http_message_t hmsg;
+  int fd, ret;
+  long last;
+  FILE *ef;
+  struct flock efl;
+  char err_msg[128];
+  struct stat stbuf;
+
+  assert(es != NULL);
+
+#if defined(IL_NOTIFICATIONS)
+  /* destination queue has to be found for each message separately */
+#else
+  /* find bookkeeping server queue */
+  eq_b = queue_list_get(es->job_id_s);
+  if(eq_b == NULL)
+    return(-1);
+#endif
+
+#if !defined(IL_NOTIFICATIONS)
+  /* get log server queue */
+  eq_l = queue_list_get(NULL);
+#endif
+
+  event_store_lock(es);
+
+  il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name);
+
+  /* open event file */
+  ef = fopen(es->event_file_name, "r");
+  if(ef == NULL) {
+    snprintf(err_msg, sizeof(err_msg),
+             "event_store_recover: error opening event file %s",
+             es->event_file_name);
+    set_error(IL_SYS, errno, err_msg);
+    event_store_unlock(es);
+    return(-1);
+  }
+
+  /* lock the file for reading (we should not read while dglogd is writing) */
+  fd = fileno(ef);
+  efl.l_type = F_RDLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLKW, &efl) < 0) {
+    snprintf(err_msg, sizeof(err_msg),
+             "event_store_recover: error locking event file %s",
+             es->event_file_name);
+    set_error(IL_SYS, errno, err_msg);
+    event_store_unlock(es);
+    fclose(ef);
+    return(-1);
+  }
+
+  /* check the file modification time and size to avoid unnecessary operations */
+  memset(&stbuf, 0, sizeof(stbuf));
+  if(fstat(fd, &stbuf) < 0) {
+    il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
+    fclose(ef);
+    event_store_unlock(es);
+    return -1;
+  } else {
+    if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
+      il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n");
+      fclose(ef);
+      event_store_unlock(es);
+      return(0);
+    }
+  }
+
+  while(1) { /* try, try, try */
+
+    /* get the position in file to be sought */
+    if(es->offset)
+      last = es->offset;
+    else {
+      last = es->last_committed_bs;
+    }
+
+    il_log(LOG_DEBUG, " setting starting file position to %ld\n", last);
+    /* the counter is a long, so it has to be printed with %ld */
+    il_log(LOG_DEBUG, " bytes sent to destination: %ld\n", es->last_committed_bs);
+
+    if(last > 0) {
+      int c;
+
+      /* skip all committed or already enqueued events */
+      /* be careful - check, if the offset really points to the
+         beginning of event string */
+      if(fseek(ef, last, SEEK_SET) < 0) {
+        set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+        event_store_unlock(es);
+        fclose(ef);
+        return(-1);
+      }
+      /* the new event MUST start with 0 */
+      if((c=fgetc(ef)) != 0) {
+        /* Houston, we have got a problem */
+        il_log(LOG_WARNING,
+               " file position %ld does not point at the beginning of event string, backing off!\n",
+               last);
+        /* now, where were we? */
+        if(es->offset) {
+          /* next try will be with
+             last_committed_bs */
+          es->offset = 0;
+        } else {
+          /* this is really weird... back off completely */
+          es->last_committed_ls = es->last_committed_bs = 0;
+        }
+      } else {
+        /* OK, break out of the loop */
+        fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */
+        break;
+      }
+    } else {
+      /* this breaks out of the loop, we are starting at
+       * the beginning of file
+       */
+      if(fseek(ef, 0, SEEK_SET) < 0) {
+        set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+        event_store_unlock(es);
+        fclose(ef);
+        return(-1);
+      }
+      break;
+    }
+  }
+
+  /* enqueue all remaining events */
+  ret = 1;
+  msg = NULL;
+  while(read_event_string(ef, &hmsg) >= 0) {
+
+    /* last holds the starting position of the event in file */
+    il_log(LOG_DEBUG, " reading event at %ld\n", last);
+
+    /* break from now on means there was some error */
+    ret = -1;
+
+    /* create message for server */
+    msg = server_msg_create(&hmsg, last);
+    if(msg == NULL) {
+      il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+      /* actually do not bother if quarantine succeeded or not - we could not do more */
+      event_store_quarantine(es);
+      fclose(ef);
+      event_store_unlock(es);
+      return(-1);
+    }
+    msg->es = es;
+
+    /* first enqueue to the LS */
+    if(!bs_only && (last >= es->last_committed_ls)) {
+
+      il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last);
+
+#if !defined(IL_NOTIFICATIONS)
+      if(enqueue_msg(eq_l, msg) < 0)
+        break;
+#endif
+    }
+
+#ifdef IL_NOTIFICATIONS
+    eq_b = queue_list_get(msg->dest);
+#endif
+
+    /* now enqueue to the BS, if necessary */
+    if((eq_b != eq_l) &&
+       (last >= es->last_committed_bs)) {
+
+      il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
+
+      if(enqueue_msg(eq_b, msg) < 0)
+        break;
+    }
+    server_msg_free(msg);
+    msg = NULL;
+
+    /* now last is also the offset behind the last successfully queued event */
+    last = ftell(ef);
+
+    /* ret == 0 means EOF or incomplete event found */
+    ret = 0;
+
+  } /* while */
+
+  /* due to this little assignment we had to lock the event_store for writing */
+  es->offset = last;
+  es->last_modified = stbuf.st_mtime;
+  il_log(LOG_DEBUG, " event store offset set to %ld\n", last);
+
+  /* msg is still set only when the loop was left by an enqueue failure */
+  if(msg)
+    server_msg_free(msg);
+
+  fclose(ef);
+  il_log(LOG_DEBUG, " finished reading events with %d\n", ret);
+
+  event_store_unlock(es);
+  return(ret);
+}
+
+
+/*
+ * event_store_sync()
+ *   - meant to check the position of event and fill holes from file
+ *   - return 1 if the event is new,
+ *            0 if it was seen before,
+ *           -1 if there was an error
+ *   - this implementation always reports the event as new
+ */
+int
+event_store_sync(struct event_store *es, long offset)
+{
+  /* the offset is not consulted: every event arrives through the socket
+     before it reaches the file, so the file can not hold anything that
+     was not seen here */
+  (void)offset;
+
+  assert(es != NULL);
+
+  return 1;
+}
+
+
+/*
+ * event_store_next()
+ *   - no-op in this implementation, always returns 0
+ */
+int
+event_store_next(struct event_store *es, long offset, int len)
+{
+  /* offsets only serve to detect differences between socket and file,
+     and those can not happen now, so neither argument is used */
+  (void)offset;
+  (void)len;
+
+  assert(es != NULL);
+
+  return 0;
+}
+
+
+/*
+ * event_store_commit()
+ *   - add len to the committed byte count of the logging server (ls != 0)
+ *     or of the bookkeeping server (ls == 0; with bs_only set the LS count
+ *     is advanced as well) and write the control file
+ *   - returns 0 on success, -1 if the control file could not be written
+ */
+int
+event_store_commit(struct event_store *es, int len, int ls)
+{
+  int result = 0;
+
+  assert(es != NULL);
+
+  event_store_lock(es);
+
+  if(!ls) {
+    es->last_committed_bs += len;
+    if(bs_only)
+      es->last_committed_ls += len;
+  } else {
+    es->last_committed_ls += len;
+  }
+
+  if(event_store_write_ctl(es) < 0)
+    result = -1;
+
+  event_store_unlock(es);
+
+  return(result);
+}
+
+
+/*
+ * event_store_clean()
+ *   - remove the event files (event and ctl), if they are not needed anymore
+ *   - returns 0 if event_store is in use, 1 if it was removed (or the event
+ *     file could not be opened at all) and -1 on error
+ *
+ * Q: How do we know that we can safely remove the files?
+ * A: When all events from file have been committed both by LS and BS.
+ */
+static
+int
+event_store_clean(struct event_store *es)
+{
+  long last;
+  int fd;
+  FILE *ef;
+  struct flock efl;
+
+  assert(es != NULL);
+
+  /* prevent sender threads from updating */
+  event_store_lock(es);
+
+  il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s);
+  il_log(LOG_DEBUG, " bytes sent to logging server: %ld\n", es->last_committed_ls);
+  il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %ld\n", es->last_committed_bs);
+
+  /* preliminary check to avoid opening event file */
+  /* if the positions differ, some events still have to be sent */
+  if(es->last_committed_ls != es->last_committed_bs) {
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, " not all events sent, cleanup aborted\n");
+    return(0);
+  }
+
+  /* the file can only be removed when all the events were successfully sent
+     (ie. committed both by LS and BS) */
+  /* That also implies that the event queues are 'empty' at the moment. */
+  ef = fopen(es->event_file_name, "r+");
+  if(ef == NULL) {
+    /* remember the reason before other calls can overwrite errno */
+    int saved_errno = errno;
+
+    /* if we can not open the event store, it is an error and the struct should be removed */
+    /* XXX - is it true? */
+    event_store_unlock(es);
+    il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(saved_errno));
+    return(1);
+  }
+
+  fd = fileno(ef);
+
+  /* prevent local-logger from writing into event file */
+  efl.l_type = F_WRLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLK, &efl) < 0) {
+    /* logging, fclose and unlocking may all change errno */
+    int saved_errno = errno;
+
+    il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n");
+    fclose(ef);
+    event_store_unlock(es);
+    if(saved_errno != EACCES &&
+       saved_errno != EAGAIN) {
+      set_error(IL_SYS, saved_errno, "event_store_clean: error locking event file");
+      return(-1);
+    }
+    /* somebody else holds the lock, try again later */
+    return(0);
+  }
+
+  /* now the file should not contain partially written event, so it is safe
+     to get offset behind last event by seeking the end of file */
+  if(fseek(ef, 0, SEEK_END) < 0) {
+    set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
+    event_store_unlock(es);
+    fclose(ef);
+    return(-1);
+  }
+
+  last = ftell(ef);
+  il_log(LOG_DEBUG, " total bytes in file: %ld\n", last);
+
+  if(es->last_committed_ls < last) {
+    fclose(ef);
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n");
+    return(0);
+  } else if( es->last_committed_ls > last) {
+    il_log(LOG_WARNING, " warning: event file seems to shrink!\n");
+    /* XXX - in that case we can not continue because there may be
+       some undelivered events referring to that event store */
+    fclose(ef);
+    event_store_unlock(es);
+    return(0);
+  }
+
+  /* now we are sure that all events were sent and the event queues are empty */
+  il_log(LOG_INFO, " removing event file %s\n", es->event_file_name);
+
+  /* remove the event file */
+  unlink(es->event_file_name);
+  unlink(es->control_file_name);
+
+  /* clear the counters */
+  es->last_committed_ls = 0;
+  es->last_committed_bs = 0;
+  es->offset = 0;
+
+  /* unlock the event_store even if it is going to be removed */
+  event_store_unlock(es);
+
+  /* close the event file (that unlocks it as well) */
+  fclose(ef);
+
+  /* indicate that it is safe to remove this event_store */
+  return(1);
+}
+
+
+/* --------------------------------
+ * event store management functions
+ * --------------------------------
+ */
+/*
+ * event_store_find()
+ *   - look up the event store with the given job id; when there is none,
+ *     create it and push it at the head of the store list
+ *   - the returned event store has its use_lock held for reading, the
+ *     caller gives it back with event_store_release()
+ *   - returns NULL with the error set when the store could not be created
+ */
+struct event_store *
+event_store_find(char *job_id_s)
+{
+  struct event_store_list *p;
+  struct event_store *es;
+
+  if(pthread_rwlock_wrlock(&store_list_lock)) {
+    abort();
+  }
+
+  for(p = store_list; p != NULL; p = p->next) {
+    if(strcmp(p->es->job_id_s, job_id_s) == 0) {
+      es = p->es;
+      if(pthread_rwlock_rdlock(&es->use_lock))
+        abort();
+      if(pthread_rwlock_unlock(&store_list_lock))
+        abort();
+      return(es);
+    }
+  }
+
+  /* not found, create a new one */
+  es = event_store_create(job_id_s);
+  if(es == NULL) {
+    if(pthread_rwlock_unlock(&store_list_lock))
+      abort();
+    return(NULL);
+  }
+
+  p = malloc(sizeof(*p));
+  if(p == NULL) {
+    /* the store is not reachable from anywhere yet, do not leak it */
+    event_store_free(es);
+    set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
+    if(pthread_rwlock_unlock(&store_list_lock))
+      abort();
+    return(NULL);
+  }
+
+  p->es = es;
+  p->next = store_list;
+  store_list = p;
+
+  if(pthread_rwlock_rdlock(&es->use_lock))
+    abort();
+
+  if(pthread_rwlock_unlock(&store_list_lock))
+    abort();
+
+  return(es);
+}
+
+
+/*
+ * event_store_release()
+ *   - give up the use_lock taken on the event store (by event_store_find()
+ *     or event_store_cleanup())
+ *   - always returns 0; aborts if the lock cannot be released
+ */
+int
+event_store_release(struct event_store *es)
+{
+  assert(es != NULL);
+
+  /* log while the lock is still held: once it is dropped,
+     event_store_cleanup() is free to remove the store and es->job_id_s
+     must not be touched anymore */
+  il_log(LOG_DEBUG, " released lock on %s\n", es->job_id_s);
+
+  if(pthread_rwlock_unlock(&es->use_lock))
+    abort();
+
+  return(0);
+}
+
+
+/*
+ * event_store_from_file()
+ *   - attach to an existing event file: read its first event, take the
+ *     destination (host) of that message as the id and find or create the
+ *     matching event store; counters of a fresh store are loaded from the
+ *     control file
+ *   - quarantine files and files whose first event can not be read
+ *     are skipped
+ *   - returns 0 on success or when the file was skipped, -1 on error
+ */
+int
+event_store_from_file(char *filename)
+{
+  struct event_store *es;
+  FILE *event_file;
+  char *job_id_s = NULL;
+  il_http_message_t hmsg;
+  int ret;
+
+  il_log(LOG_INFO, " attaching to event file: %s\n", filename);
+
+  if(strstr(filename, "quarantine") != NULL) {
+    il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n");
+    return(0);
+  }
+
+  event_file = fopen(filename, "r");
+  if(event_file == NULL) {
+    set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
+    return(-1);
+  }
+  ret = read_event_string(event_file, &hmsg);
+  fclose(event_file);
+  if(ret < 0)
+    return(0);
+
+  /* get id aka dest */
+  job_id_s = hmsg.host;
+
+  il_log(LOG_DEBUG, " message dest: '%s'\n", job_id_s);
+  if(job_id_s == NULL) {
+    il_log(LOG_NOTICE, " skipping file, could not parse event\n");
+    ret = 0;
+    goto out;
+  }
+
+  es=event_store_find(job_id_s);
+
+  if(es == NULL) {
+    ret = -1;
+    goto out;
+  }
+
+  /* only a store that was not touched yet gets its counters from the control file */
+  if((es->last_committed_ls == 0) &&
+     (es->last_committed_bs == 0) &&
+     (es->offset == 0)) {
+    ret = event_store_read_ctl(es);
+  } else
+    ret = 0;
+
+  event_store_release(es);
+
+out:
+  /* job_id_s aliases hmsg.host, so this releases both parts of the message */
+  if(hmsg.data) free(hmsg.data);
+  if(job_id_s) free(job_id_s);
+  return(ret);
+}
+
+
+/*
+ * event_store_init()
+ *   - remember the file prefix (first call only), then scan the directory
+ *     of the prefix twice: the first pass attaches to every matching event
+ *     file, the second pass removes control files that have no
+ *     corresponding event file
+ *   - returns 0 on success, -1 with the error set otherwise
+ */
+int
+event_store_init(char *prefix)
+{
+  int len;
+  char *p, *dir;
+  DIR *event_dir;
+  struct dirent *entry;
+
+  if(file_prefix == NULL) {
+    file_prefix = strdup(prefix);
+    if(file_prefix == NULL) {
+      set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file prefix");
+      return(-1);
+    }
+    store_list = NULL;
+  }
+
+  /* get directory name */
+  p = strrchr(file_prefix, '/');
+  if(p == NULL) {
+    /* NOTE(review): with no directory part every file in "." matches,
+       because the prefix used for comparison is empty - confirm intent */
+    dir = strdup(".");
+    p = "";
+    len = 0;
+  } else {
+    *p = '\0';
+    dir = strdup(file_prefix);
+    *p++ = '/';
+    len = strlen(p);
+  }
+  if(dir == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for directory name");
+    return(-1);
+  }
+
+  /* first pass - read directory and attach to the event files */
+  event_dir = opendir(dir);
+  if(event_dir == NULL) {
+    set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+    free(dir);
+    return(-1);
+  }
+
+  while((entry=readdir(event_dir))) {
+    char *s;
+
+    /* skip all files that do not match prefix */
+    if(strncmp(entry->d_name, p, len) != 0)
+      continue;
+
+    /* skip all control files */
+    if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+       s[4] == '\0')
+      continue;
+
+    s = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+    if(s == NULL) {
+      closedir(event_dir);
+      free(dir);
+      set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name");
+      return(-1);
+    }
+
+    *s = '\0';
+    strcat(s, dir);
+    strcat(s, "/");
+    strcat(s, entry->d_name);
+
+    if(event_store_from_file(s) < 0) {
+      free(dir);
+      free(s);
+      closedir(event_dir);
+      return(-1);
+    }
+
+    free(s);
+  }
+  closedir(event_dir);
+
+  /* one more pass - this time remove stale .ctl files */
+  event_dir = opendir(dir);
+  if(event_dir == NULL) {
+    set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+    free(dir);
+    return(-1);
+  }
+
+  while((entry=readdir(event_dir))) {
+    char *s;
+    char *ef;
+    struct stat st;
+
+    /* skip all files that do not match prefix */
+    if(strncmp(entry->d_name, p, len) != 0)
+      continue;
+
+    /* find all control files */
+    if((s=strstr(entry->d_name, ".ctl")) == NULL ||
+       s[4] != '\0')
+      continue;
+
+    /* is there corresponding event file? */
+    ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+    if(ef == NULL) {
+      closedir(event_dir);
+      free(dir);
+      set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+      return(-1);
+    }
+
+    /* cut the ".ctl" suffix off temporarily to get the event file name */
+    s[0] = 0;
+    *ef = '\0';
+    strcat(ef, dir);
+    strcat(ef, "/");
+    strcat(ef, entry->d_name);
+    s[0] = '.';
+
+    if(stat(ef, &st) == 0) {
+      /* something is there */
+      /* XXX - it could be something else than event file, but do not bother now */
+    } else {
+      /* could not stat file, remove ctl */
+      strcat(ef, s);
+      il_log(LOG_DEBUG, " removing stale file %s\n", ef);
+      if(unlink(ef))
+        il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno));
+    }
+    free(ef);
+  }
+  closedir(event_dir);
+  free(dir);
+
+  return(0);
+}
+
+
+/*
+ * event_store_recover_all()
+ *   - run event_store_recover() on every event store in the list; a
+ *     failure is logged and cleared, the remaining stores are still tried
+ *   - always returns 0
+ */
+int
+event_store_recover_all()
+{
+  struct event_store_list *node;
+
+  if(pthread_rwlock_rdlock(&store_list_lock))
+    abort();
+
+  /* the use_lock of the stores is not taken, store_list_lock is held instead */
+  for(node = store_list; node != NULL; node = node->next) {
+    if(event_store_recover(node->es) >= 0)
+      continue;
+
+    il_log(LOG_ERR, " error recovering event store %s:\n %s\n", node->es->event_file_name, error_get_msg());
+    clear_error();
+  }
+
+  if(pthread_rwlock_unlock(&store_list_lock))
+    abort();
+
+  return(0);
+}
+
+
+/*
+ * event_store_remove()
+ *   - compiled out (#if 0)
+ *   - cleans a single event store and, when event_store_clean() reports it
+ *     removable, unlinks it from the store list and frees it
+ *   - returns 0 if the store is still used, 1 if it was removed, -1 on error
+ * NOTE(review): the pthread_rwlock_* results are compared with "< 0" and
+ * errno is read, while these calls report failures through their
+ * return value - check before enabling this code again.
+ */
+#if 0
+int
+event_store_remove(struct event_store *es)
+{
+ struct event_store_list *p, **q;
+
+ assert(es != NULL);
+
+ switch(event_store_clean(es)) {
+ case 0:
+ il_log(LOG_DEBUG, " event store not removed, still used\n");
+ return(0);
+
+ case 1:
+ if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
+ set_error(IL_SYS, errno, " event_store_remove: error locking event store list");
+ return(-1);
+ }
+
+ p = store_list;
+ q = &store_list;
+
+ /* q always points at the link that refers to p */
+ while(p) {
+ if(p->es == es) {
+ (*q) = p->next;
+ event_store_free(es);
+ free(p);
+ break;
+ }
+ q = &(p->next);
+ p = p->next;
+ }
+
+ if(pthread_rwlock_unlock(&store_list_lock) < 0) {
+ set_error(IL_SYS, errno, " event_store_remove: error unlocking event store list");
+ return(-1);
+ }
+ return(1);
+
+ default:
+ return(-1);
+ }
+ /* not reached */
+ return(0);
+}
+#endif
+
+/*
+ * event_store_cleanup()
+ *   - walk the store list and try to remove the files of every event
+ *     store that is not in use; stores whose files were removed are
+ *     unlinked from the list and freed
+ *   - stores used by another thread are skipped, errors are logged
+ *     and cleared
+ *   - always returns 0
+ */
+int
+event_store_cleanup()
+{
+  struct event_store_list *sl;
+  struct event_store_list *slnext;
+  struct event_store_list **prev;
+
+  /* try to remove event files */
+
+  if(pthread_rwlock_wrlock(&store_list_lock))
+    abort();
+
+  /* prev always points at the link that refers to the current node */
+  prev = &store_list;
+
+  for(sl = store_list; sl != NULL; sl = slnext) {
+    int ret;
+
+    slnext = sl->next;
+
+    /* one event store at time */
+    ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
+    if(ret == EBUSY) {
+      il_log(LOG_DEBUG, " event_store %s is in use by another thread\n",
+             sl->es->job_id_s);
+      /* the node stays in the list, so the link has to move past it;
+         otherwise removing a later node would drop this one as well */
+      prev = &(sl->next);
+      continue;
+    } else if (ret != 0) {
+      /* pthread functions report errors as positive numbers */
+      abort();
+    }
+
+    switch(event_store_clean(sl->es)) {
+
+    case 1:
+      /* remove this event store */
+      (*prev) = slnext;
+      /* a lock must not be destroyed while it is held */
+      if(pthread_rwlock_unlock(&sl->es->use_lock))
+        abort();
+      event_store_free(sl->es);
+      free(sl);
+      break;
+
+    case -1:
+      il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n",
+             sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
+      clear_error();
+      /* go on to the next */
+      /* fallthrough */
+
+    default:
+      event_store_release(sl->es);
+      prev = &(sl->next);
+      break;
+    }
+  }
+
+  if(pthread_rwlock_unlock(&store_list_lock))
+    abort();
+
+  return(0);
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "interlogd.h"
+
+/* backlog passed to listen() on the input socket */
+static const int SOCK_QUEUE_MAX = 50;
+/* path of the UNIX domain input socket, defined elsewhere */
+extern char *socket_path;
+/* prefix of the event file names, defined elsewhere */
+extern char *file_prefix;
+
+/* listening socket created by input_queue_attach() */
+static int sock;
+/* descriptor of the connection accepted by input_queue_get() */
+static int accepted;
+
+/*
+ * Reader callback for receive_http(): user_data points at the socket
+ * descriptor, up to len bytes are received into buffer.
+ * Returns the result of recv().
+ */
+static
+int plain_reader(void *user_data, char *buffer, const int len)
+{
+  const int *sockfd = user_data;
+
+  return (recv(*sockfd, buffer, len, MSG_NOSIGNAL));
+}
+
+
+/*
+ * input_queue_attach()
+ *   - create the UNIX domain input socket at socket_path, bind it and
+ *     start listening
+ *   - a stale socket file (nobody listening) is removed first; when
+ *     another process answers on it, the call fails with EADDRINUSE
+ *   - returns 0 on success, -1 with the error set otherwise; on failure
+ *     the socket is closed again
+ */
+int
+input_queue_attach()
+{
+  struct sockaddr_un saddr;
+
+  if(socket_path == NULL) {
+    set_error(IL_SYS, EINVAL, "input_queue_attach: no socket path specified");
+    return(-1);
+  }
+  /* the path has to fit into sun_path including the terminating zero */
+  if(strlen(socket_path) >= sizeof(saddr.sun_path)) {
+    set_error(IL_SYS, ENAMETOOLONG, "input_queue_attach: socket path too long");
+    return(-1);
+  }
+
+  if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error creating socket");
+    return(-1);
+  }
+
+  memset(&saddr, 0, sizeof(saddr));
+  saddr.sun_family = AF_UNIX;
+  strcpy(saddr.sun_path, socket_path);
+
+  /* test for the presence of the socket and another instance
+     of interlogger listening */
+  if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+    if(errno == ECONNREFUSED) {
+      /* socket present, but no one at the other end; remove it */
+      il_log(LOG_WARNING, " removing stale input socket %s\n", socket_path);
+      unlink(socket_path);
+    }
+    /* ignore other errors for now */
+  } else {
+    /* connection was successful, so bail out - there is
+       another interlogger running */
+    set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running");
+    goto fail;
+  }
+
+  if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error binding socket");
+    goto fail;
+  }
+
+  if (listen(sock, SOCK_QUEUE_MAX)) {
+    set_error(IL_SYS, errno, "input_queue_attach: error listening on socket");
+    goto fail;
+  }
+
+  return(0);
+
+fail:
+  /* do not leak the descriptor; a negative value marks the socket as closed */
+  close(sock);
+  sock = -1;
+  return(-1);
+}
+
+
+/*
+ * Close the input socket (when it has a valid descriptor) and remove
+ * the socket file.
+ */
+void input_queue_detach()
+{
+  if (!(sock < 0)) {
+    close(sock);
+  }
+
+  unlink(socket_path);
+}
+
+
+/*
+ * store_to_file()
+ *   - append the message to the event file "<file_prefix>.<msg->host>"
+ *     as a 20 byte header (zero byte, decimal length, zero byte) followed
+ *     by the message data
+ *   - the file is write-locked (fcntl) while it is written; locking is
+ *     tried five times, one second apart
+ *   - *offset receives the position in the file where the record starts
+ *   - returns 0 on success, -1 with the error set otherwise
+ */
+int
+store_to_file(il_http_message_t *msg, long *offset) {
+  char s_len[20];
+  char filename[PATH_MAX];
+  FILE *outfile = NULL;
+  int i, filedesc, n;
+  int ret = -1;
+
+  if(msg->host == NULL) {
+    /* there is no file to write to without a destination */
+    set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified");
+    return -1;
+  }
+
+  n = snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host);
+  if(n < 0 || (size_t)n >= sizeof(filename)) {
+    set_error(IL_SYS, ENAMETOOLONG, "store_to_file: event file name too long");
+    return -1;
+  }
+  /* the trailing newline of the format does not fit and is cut off,
+     the header ends with a zero byte instead */
+  snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len);
+  s_len[sizeof(s_len) - 1] = 0;
+  s_len[0] = 0;
+
+try_again:
+  if((outfile = fopen(filename, "a")) == NULL) {
+    set_error(IL_SYS, errno, "store_to_file: error opening file");
+    goto cleanup;
+  }
+  if((filedesc = fileno(outfile)) < 0) {
+    set_error(IL_SYS, errno, "store_to_file: error getting file descriptor");
+    goto cleanup;
+  }
+
+  for(i = 0; i < 5; i++) {
+    struct flock filelock;
+    int filelock_status;
+    struct stat statbuf;
+
+    filelock.l_type = F_WRLCK;
+    filelock.l_whence = SEEK_SET;
+    filelock.l_start = 0;
+    filelock.l_len = 0;
+
+    if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) {
+      switch(errno) {
+      case EAGAIN:
+      case EACCES:
+      case EINTR:
+        if((i+1) < 5) sleep(1);
+        break;
+      default:
+        set_error(IL_SYS, errno, "store_to_file: error locking file");
+        goto cleanup;
+      }
+    } else {
+      /* the file may have been removed between opening and locking it */
+      if(stat(filename, &statbuf)) {
+        if(errno == ENOENT) {
+          fclose(outfile);
+          goto try_again;
+        } else {
+          set_error(IL_SYS, errno, "store_file: could not stat file");
+          goto cleanup;
+        }
+      } else {
+        /* success */
+        break;
+      }
+    }
+  }
+
+  if(i == 5) {
+    set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file");
+    goto cleanup;
+  }
+  if(fseek(outfile, 0, SEEK_END) < 0) {
+    set_error(IL_SYS, errno, "store_to_file: error seeking at end of file");
+    goto cleanup;
+  }
+  if((*offset=ftell(outfile)) < 0) {
+    set_error(IL_SYS, errno, "store_to_file: error getting current position");
+    goto cleanup;
+  }
+  if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) {
+    set_error(IL_SYS, errno, "store_to_file: error writing data header to file");
+    goto cleanup;
+  }
+  if(fwrite(msg->data, msg->len, 1, outfile) != 1) {
+    set_error(IL_SYS, errno, "store_to_file: error writing data to file");
+    goto cleanup;
+  }
+  /* buffered data reach the file only now, so this is where a full disk shows up */
+  if(fflush(outfile) != 0) {
+    set_error(IL_SYS, errno, "store_to_file: error flushing data to file");
+    goto cleanup;
+  }
+  ret = 0;
+
+cleanup:
+  if(outfile != NULL && fclose(outfile) != 0 && ret == 0) {
+    set_error(IL_SYS, errno, "store_to_file: error closing file");
+    ret = -1;
+  }
+  return ret;
+}
+
+
+/*
+ * send_reply()
+ *   - write a fixed SOAP envelope with an empty UpdateJobsResponse
+ *     to the descriptor sd
+ *   - returns the result of write()
+ */
+int
+send_reply(int sd)
+{
+  static const char reply[] =
+    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+    "<SOAP-ENV:Envelope"
+    " xmlns:SOAP-ENV=\"http://schemas.xmlsoap.org/soap/envelope/\""
+    " xmlns:SOAP-ENC=\"http://schemas.xmlsoap.org/soap/encoding/\""
+    " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
+    " xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\""
+    " xmlns:ns3=\"http://glite.org/wsdl/types/jp\""
+    " xmlns:ns1=\"http://glite.org/wsdl/services/jp\""
+    " xmlns:ns2=\"http://glite.org/wsdl/elements/jp\">"
+    " <SOAP-ENV:Body>"
+    " <ns2:UpdateJobsResponse>"
+    " </ns2:UpdateJobsResponse>"
+    " </SOAP-ENV:Body>"
+    "</SOAP-ENV:Envelope>";
+  /* NOTE(review): sizeof counts the terminating zero byte, so it is
+     sent as well - confirm the peer expects that */
+  const size_t reply_size = sizeof(reply);
+
+  return(write(sd, reply, reply_size));
+}
+
+
+/*
+ * Returns: -1 on error, 0 if no message available, message length otherwise
+ *
+ */
+#ifdef PERF_EVENTS_INLINE
+/* Performance test variant: events are produced in-process instead of
+ * being received from the socket; the offset is the running sum of the
+ * produced lengths. */
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+  static il_octet_string_t my_buffer;
+  static long o = 0;
+  char *jobid;
+  int len;
+
+  assert(buffer != NULL);
+
+  *buffer = &my_buffer;
+
+  len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid);
+  my_buffer.len = len;
+
+  if(len != 0) {
+    o += len;
+    *offset = o;
+  } else {
+    /* nothing produced, wait as if no message had arrived */
+    sleep(timeout);
+  }
+
+  return(len);
+}
+#else
+/* Socket variant: wait up to timeout seconds (forever when timeout is
+ * negative) for a connection on the input socket, receive one message,
+ * store it to its event file and send the reply. *buffer is pointed at
+ * a static message that is overwritten by the next call. */
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+  static il_http_message_t msg;
+  struct timeval tv;
+  struct timeval *tvp;
+  fd_set fds;
+  int ready, msg_len;
+
+  assert(buffer != NULL);
+
+  *buffer = (il_octet_string_t *)&msg;
+
+  FD_ZERO(&fds);
+  FD_SET(sock, &fds);
+
+  tv.tv_sec = timeout;
+  tv.tv_usec = 0;
+  tvp = (timeout >= 0) ? &tv : NULL;
+
+  ready = select(sock + 1, &fds, NULL, NULL, tvp);
+  if(ready == 0) {
+    /* timeout */
+    return(0);
+  }
+  if(ready == -1) {
+    /* error */
+    if(errno == EINTR) {
+      il_log(LOG_DEBUG, " interrupted while waiting for event!\n");
+      return(0);
+    }
+    set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+    return(-1);
+  }
+
+  accepted = accept(sock, NULL, NULL);
+  if(accepted < 0) {
+    set_error(IL_SYS, errno, "input_queue_get: error accepting connection");
+    return(-1);
+  }
+
+  msg_len = receive_http(&accepted, plain_reader, &msg);
+  if(msg_len < 0) {
+    close(accepted);
+    /* a failure without an error set is reported as "no message" */
+    return (error_get_maj() != IL_OK) ? -1 : 0;
+  }
+
+  if(store_to_file(&msg, offset) < 0) {
+    close(accepted);
+    return -1;
+  }
+
+  send_reply(accepted);
+  close(accepted);
+
+  return(msg.len);
+}
+#endif
+
--- /dev/null
+#ident "$Header$"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+
+/*
+ * - L/B server protocol handling routines
+ */
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/il_string.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+/* State handed to gss_reader() through its user_data pointer. */
+struct reader_data {
+ edg_wll_GssConnection *gss; /* connection the data is read from */
+ struct timeval *timeout; /* timeout passed to edg_wll_gss_read() */
+};
+
+
+/*
+ * Reader callback for receive_http(): read up to max_len bytes from the
+ * GSS connection described by user_data (a struct reader_data).
+ * Returns the result of edg_wll_gss_read(); a negative result is also
+ * recorded with set_error().
+ */
+static
+int
+gss_reader(void *user_data, char *buffer, int max_len)
+{
+  struct reader_data *rd = user_data;
+  edg_wll_GssStatus gss_stat;
+  char *gss_err = NULL;
+  int ret;
+
+  ret = edg_wll_gss_read(rd->gss, buffer, max_len, rd->timeout, &gss_stat);
+  if(ret >= 0)
+    return(ret);
+
+  if(ret != EDG_WLL_GSS_ERROR_GSS) {
+    set_error(IL_DGGSS, ret, "get_reply");
+    return(ret);
+  }
+
+  /* GSS level failure, report the detailed message */
+  edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err);
+  set_error(IL_DGGSS, ret, gss_err);
+  free(gss_err);
+
+  return(ret);
+}
+
+
+/*
+ * Read reply from server.
+ *
+ * eq       - queue whose GSS connection the reply is read from
+ * buf      - always written: receives the reply string of the received
+ *            message, or NULL when there is none or reading failed; the
+ *            caller frees it
+ * code_min - set to 0 on success (no fault flag is filled in yet)
+ *
+ * Returns: -1 - error reading message,
+ *          otherwise the reply code of the received message
+ */
+static
+int
+get_reply(struct event_queue *eq, char **buf, int *code_min)
+{
+	int len;
+	struct timeval tv;
+	struct reader_data data;
+	il_http_message_t msg;
+
+	/* never leave the caller's pointer indeterminate */
+	*buf = NULL;
+
+	tv.tv_sec = TIMEOUT;
+	tv.tv_usec = 0;
+	data.gss = &eq->gss;
+	data.timeout = &tv;
+	len = receive_http(&data, gss_reader, &msg);
+	if(len < 0) {
+		set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply");
+		return(-1);
+	}
+	/* only the reply code and reply string are of interest here */
+	free(msg.data);
+	*buf = msg.reply_string;
+	*code_min = 0; /* XXX fill in flag for fault */
+	return(msg.reply_code);
+}
+
+
+
+/*
+ * Open the GSS connection of the queue to eq->dest_name:eq->dest_port if it
+ * is not open yet (eq->gss.context == NULL).
+ *
+ * The current credentials are pinned through their use counter while the
+ * connection attempt runs; if they were replaced in the meantime and this
+ * was the last user, they are released here.
+ *
+ * Returns: 0 - not connected, timeout set, 1 - OK
+ */
+int
+event_queue_connect(struct event_queue *eq)
+{
+	int ret;
+	struct timeval tv;
+	edg_wll_GssStatus gss_stat;
+	cred_handle_t *local_cred_handle;
+
+	assert(eq != NULL);
+
+#ifdef LB_PERF
+	if(!nosend) {
+#endif
+
+	if(eq->gss.context == NULL) {
+
+		tv.tv_sec = TIMEOUT;
+		tv.tv_usec = 0;
+
+		/* get pointer to the credentials */
+		/* pthread calls report failure with a non-zero error number, never a negative value */
+		if(pthread_mutex_lock(&cred_handle_lock) != 0)
+			abort();
+		local_cred_handle = cred_handle;
+		local_cred_handle->counter++;
+		if(pthread_mutex_unlock(&cred_handle_lock) != 0)
+			abort();
+
+		il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port);
+		ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+		if(pthread_mutex_lock(&cred_handle_lock) != 0)
+			abort();
+		/* check if we need to release the credentials */
+		--local_cred_handle->counter;
+		if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) {
+			edg_wll_gss_release_cred(&local_cred_handle->creds, NULL);
+			free(local_cred_handle);
+			il_log(LOG_DEBUG, " freed credentials, not used anymore\n");
+		}
+		if(pthread_mutex_unlock(&cred_handle_lock) != 0)
+			abort();
+
+		if(ret < 0) {
+			char *gss_err = NULL;
+
+			if (ret == EDG_WLL_GSS_ERROR_GSS)
+				edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err);
+			/* use the fixed text whenever no GSS error text was produced */
+			set_error(IL_DGGSS, ret,
+				  gss_err ? gss_err : "event_queue_connect: edg_wll_gss_connect");
+			free(gss_err);
+			eq->gss.context = NULL;
+			eq->timeout = TIMEOUT;
+			return(0);
+		}
+	}
+
+#ifdef LB_PERF
+	}
+#endif
+
+	return(1);
+}
+
+
+/*
+ * Close the GSS connection of the queue, if one is open, and mark the
+ * queue as disconnected. Always returns 0.
+ */
+int
+event_queue_close(struct event_queue *eq)
+{
+	assert(eq != NULL);
+
+#ifdef LB_PERF
+	/* nothing to close when sending is disabled */
+	if(nosend)
+		return(0);
+#endif
+
+	if(eq->gss.context == NULL)
+		return(0);
+
+	edg_wll_gss_close(&eq->gss, NULL);
+	eq->gss.context = NULL;
+	return(0);
+}
+
+
+/*
+ * Send all events from the queue.
+ *
+ * Each message is written to the queue's GSS connection and the server's
+ * reply is read back. A reply code of 5xx leaves the message in the queue
+ * for a later retry; any other reply commits the event and removes it.
+ * On a failed delivery eq->timeout is set before returning.
+ *
+ * Returns: -1 - system error, 0 - not sent, 1 - queue empty
+ */
+int
+event_queue_send(struct event_queue *eq)
+{
+	int events_sent = 0;
+	assert(eq != NULL);
+
+#ifdef LB_PERF
+	if(!nosend) {
+#endif
+	if(eq->gss.context == NULL)
+		return(0);
+#ifdef LB_PERF
+	}
+#endif
+
+	/* feed the server with events */
+	while (!event_queue_empty(eq)) {
+		struct server_msg *msg;
+		/* starts as NULL so that logging and free() below stay defined even
+		   when no reply string is handed back or strdup() fails */
+		char *rep = NULL;
+		int ret, code, code_min;
+		size_t bytes_sent;
+		struct timeval tv;
+		edg_wll_GssStatus gss_stat;
+
+		clear_error();
+
+		if(event_queue_get(eq, &msg) < 0)
+			return(-1);
+
+		il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
+#ifdef LB_PERF
+		if(!nosend) {
+#endif
+		/* XXX: ljocha -- does it make sense to send empty messages ? */
+		if (msg->len) {
+			tv.tv_sec = TIMEOUT;
+			tv.tv_usec = 0;
+			ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+			if(ret < 0) {
+				if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) {
+					/* broken pipe after earlier successful sends: no retry delay */
+					eq->timeout = 0;
+				} else {
+					il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
+					eq->timeout = TIMEOUT;
+				}
+				return(0);
+			}
+			if((code = get_reply(eq, &rep, &code_min)) < 0) {
+				/* could not get the reply properly, so try again later */
+				if (events_sent>0)
+					eq->timeout = 1;
+				else {
+					eq->timeout = TIMEOUT;
+					il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg());
+				}
+				return(0);
+			}
+		}
+		else { code = 200; code_min = 0; rep = strdup("not sending empty message"); }
+#ifdef LB_PERF
+		} else {
+			/* NOTE(review): skips the first 17 bytes of the message, presumably a header - confirm */
+			glite_wll_perftest_consumeEventIlMsg(msg->msg+17);
+			code = 200;
+			rep = strdup("OK");
+		}
+#endif
+
+		il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep ? rep : "");
+		free(rep);
+
+		/* the reply is back here, decide what to do with message */
+		/* HTTP error codes:
+		   1xx - informational (e.g. 100 Continue)
+		   2xx - successful (e.g. 200 OK)
+		   3xx - redirection (e.g. 301 Moved Permanently)
+		   4xx - client error (e.g. 400 Bad Request)
+		   5xx - server error (e.g. 500 Internal Server Error)
+		*/
+		if(code >= 500 && code < 600) {
+
+			/* non fatal errors (for us), try to deliver later */
+			eq->timeout = TIMEOUT;
+			return(0);
+		}
+
+		/* the message was consumed (successfully or not) */
+		/* update the event pointer */
+		if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0)
+			/* failure committing message, this is bad */
+			return(-1);
+
+		event_queue_remove(eq);
+		events_sent++;
+	} /* while */
+
+	return(1);
+
+} /* event_queue_send */
+
+
+/* this is just not used: placeholder that ignores its arguments and returns 0 */
+int
+send_confirmation(long lllid, int code)
+{
+	(void)lllid;
+	(void)code;
+	return(0);
+}