added handling of http messages
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 21 Apr 2008 15:43:04 +0000 (15:43 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 21 Apr 2008 15:43:04 +0000 (15:43 +0000)
org.glite.lb.logger/src/event_store_http.c [new file with mode: 0644]
org.glite.lb.logger/src/http.c [new file with mode: 0644]
org.glite.lb.logger/src/input_queue_socket_http.c [new file with mode: 0644]
org.glite.lb.logger/src/queue_mgr_http.c [new file with mode: 0644]
org.glite.lb.logger/src/send_event_http.c [new file with mode: 0644]
org.glite.lb.logger/src/server_msg_http.c [new file with mode: 0644]

diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c
new file mode 100644 (file)
index 0000000..0363ec2
--- /dev/null
@@ -0,0 +1,1126 @@
+#ident "$Header$"
+
+#include <assert.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/param.h>
+
+#include "glite/lb/events_parse.h"
+
+#include "interlogd.h"
+
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static char *file_prefix = NULL;
+
+
+struct event_store_list {
+       struct event_store *es;
+       struct event_store_list *next;
+};
+
+
+static struct event_store_list *store_list;
+static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER;
+
+
+/* ----------------
+ * helper functions
+ * ----------------
+ */
+static
+char *
+jobid2eventfile(const char *job_id_s)
+{
+  char *buffer;
+
+  if(job_id_s) {
+    asprintf(&buffer, "%s.%s", file_prefix, job_id_s);
+  } else 
+    asprintf(&buffer, "%s.default", file_prefix);
+    
+  return(buffer);
+}
+
+
+static
+char *
+jobid2controlfile(char *job_id_s)
+{
+  char *buffer;
+  char *hash;
+
+  if(job_id_s) {
+    asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s);
+  } else 
+    asprintf(&buffer, "%s.default.ctl", file_prefix);
+    
+  return(buffer);
+}
+
+struct file_reader_data {
+       int fd;
+       size_t max_len;
+       size_t pos;
+};
+
+#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b
+
+static
+int
+file_reader(void *user_data, char *buffer, const int len)
+{
+       int l, m, ret;
+       
+       m = IL_RD_VALUE(user_data, max_len) - IL_RD_VALUE(user_data, pos);
+       l = (len > m) ? m : len;
+       if(l > 0) 
+               ret = read(IL_RD_VALUE(user_data, fd), buffer, l);
+       else 
+               ret = 0;
+       IL_RD_VALUE(user_data, pos) += ret;
+       return ret;
+}
+
+
+static
+int
+read_event_string(FILE *file, il_http_message_t *msg)
+{
+       struct file_reader_data rd;
+       char s_len[20];
+       int  len, ret;
+       int fd = fileno(file);
+
+       len = read(fd, s_len, sizeof(s_len));
+       if(len != sizeof(s_len)) {
+               if(len < 0) 
+                       set_error(IL_SYS, errno, "read_event_string: error reading record header");
+               else 
+                       set_error(IL_SYS, EIO, "read_event_string: record header too short");
+               return -1;
+       }
+       if(s_len[0] != 0 || s_len[sizeof(s_len) - 1] != 0) {
+               set_error(IL_SYS, EINVAL, "read_event_string: invalid record header");
+               return -1;
+       }
+       len = atoi(s_len + 1);
+       if(len < 0) {
+               set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header");
+               return -1;
+       }
+       rd.fd = fd;
+       rd.max_len = len;
+       rd.pos = 0;
+       ret = receive_http(&rd, file_reader, msg);
+       return ret;
+}
+
+
+
+/* ------------------------------
+ * event_store 'member' functions
+ * ------------------------------
+ */
+static
+int
+event_store_free(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(es->job_id_s) free(es->job_id_s);
+  if(es->event_file_name) free(es->event_file_name);
+  if(es->control_file_name) free(es->control_file_name);
+  pthread_rwlock_destroy(&es->use_lock);
+  pthread_rwlock_destroy(&es->update_lock);
+  free(es);
+
+  return(0);
+}
+
+
+static
+struct event_store *
+event_store_create(char *job_id_s)
+{
+  struct event_store *es;
+
+  es = malloc(sizeof(*es));
+  if(es == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure");
+    return(NULL);
+  }
+
+  memset(es, 0, sizeof(*es));
+
+  il_log(LOG_DEBUG, "  creating event store for id %s\n", job_id_s);
+
+  es->job_id_s = strdup(job_id_s);
+  es->event_file_name = jobid2eventfile(job_id_s);
+  es->control_file_name = jobid2controlfile(job_id_s);
+
+  if(pthread_rwlock_init(&es->update_lock, NULL)) 
+          abort();
+  if(pthread_rwlock_init(&es->use_lock, NULL)) 
+         abort();
+
+  return(es);
+}
+
+
+static
+int
+event_store_lock_ro(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_rdlock(&es->update_lock)) 
+    abort();
+
+  return(0);
+}
+
+
+static
+int
+event_store_lock(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_wrlock(&es->update_lock)) 
+    abort();
+
+  return(0);
+}
+
+
+static
+int
+event_store_unlock(struct event_store *es)
+{
+  assert(es != NULL);
+
+  if(pthread_rwlock_unlock(&es->update_lock)) 
+    abort();
+  return(0);
+}
+
+
+static
+int
+event_store_read_ctl(struct event_store *es)
+{
+  FILE *ctl_file;
+
+  assert(es != NULL);
+
+  event_store_lock(es);
+  if((ctl_file = fopen(es->control_file_name, "r")) == NULL) {
+    /* no control file, new event file */
+    es->last_committed_ls = 0;
+    es->last_committed_bs = 0;
+  } else {
+    /* read last seen and last committed counts */
+    fscanf(ctl_file, "%*s\n%ld\n%ld\n",
+          &es->last_committed_ls,
+          &es->last_committed_bs);
+    fclose(ctl_file);
+  }
+  event_store_unlock(es);
+
+  return(0);
+}
+
+
+static
+int
+event_store_write_ctl(struct event_store *es)
+{
+  FILE   *ctl;
+
+  assert(es != NULL);
+
+  ctl = fopen(es->control_file_name, "w");
+  if(ctl == NULL) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file");
+    return(-1);
+  }
+
+  if(fprintf(ctl, "%s\n%ld\n%ld\n", 
+            es->job_id_s, 
+            es->last_committed_ls,
+            es->last_committed_bs) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
+    return(-1);
+  }
+
+  if(fclose(ctl) < 0) {
+    set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file");
+    return(-1);
+  }
+
+  return(0);
+}
+
+
+/*
+ * event_store_qurantine() 
+ *   - rename damaged event store file 
+ *   - essentially does the same actions as cleanup, but the event store 
+ *     does not have to be empty
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_quarantine(struct event_store *es) 
+{
+       int num;
+       char newname[MAXPATHLEN+1];
+
+       /* find available qurantine name */
+       /* we give it at most 1024 tries */
+       for(num = 0; num < 1024; num++) {
+               struct stat st;
+
+               snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num);
+               newname[MAXPATHLEN] = 0;
+               if(stat(newname, &st) < 0) {
+                       if(errno == ENOENT) {
+                               /* file not found */
+                               break;
+                       } else {
+                               /* some other error with name, probably permanent */
+                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+                               return(-1);
+                               
+                       }
+               } else {
+                       /* the filename is used already */
+               }
+       }
+       if(num >= 1024) {
+               /* new name not found */
+               /* XXX - is there more suitable error? */
+               set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+               return(-1);
+       }
+
+       /* actually rename the file */
+       il_log(LOG_DEBUG, "    renaming damaged event file from %s to %s\n",
+              es->event_file_name, newname);
+       if(rename(es->event_file_name, newname) < 0) {
+               set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file");
+               return(-1);
+       }
+
+       /* clear the counters */
+       es->last_committed_ls = 0;
+       es->last_committed_bs = 0;
+       es->offset = 0;
+
+       return(0);
+}
+
+
+/*
+ * event_store_recover()
+ *   - recover after restart or catch up when events missing in IPC
+ *   - if offset > 0, read everything behind it
+ *   - if offset == 0, read everything behind min(last_committed_bs, last_committed_es)
+ */
+int
+event_store_recover(struct event_store *es)
+{
+  struct event_queue *eq_l = NULL, *eq_b = NULL;
+  struct server_msg *msg;
+  il_http_message_t hmsg;
+  char *event_s;
+  int fd, ret;
+  long last;
+  FILE *ef;
+  struct flock efl;
+  char err_msg[128];
+  struct stat stbuf;
+
+  assert(es != NULL);
+  
+#if defined(IL_NOTIFICATIONS)
+  /* destination queue has to be found for each message separately */
+#else
+  /* find bookkepping server queue */
+  eq_b = queue_list_get(es->job_id_s);
+  if(eq_b == NULL) 
+    return(-1);
+#endif
+
+#if !defined(IL_NOTIFICATIONS)
+  /* get log server queue */
+  eq_l = queue_list_get(NULL);
+#endif
+
+  event_store_lock(es);
+
+  il_log(LOG_DEBUG, "  reading events from %s\n", es->event_file_name);
+
+  /* open event file */
+  ef = fopen(es->event_file_name, "r");
+  if(ef == NULL) {
+         snprintf(err_msg, sizeof(err_msg), 
+                  "event_store_recover: error opening event file %s",
+                  es->event_file_name);
+         set_error(IL_SYS, errno, err_msg);
+         event_store_unlock(es);
+         return(-1);
+  }
+
+  /* lock the file for reading (we should not read while dglogd is writing) */
+  fd = fileno(ef);
+  efl.l_type = F_RDLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLKW, &efl) < 0) {
+         snprintf(err_msg, sizeof(err_msg), 
+                  "event_store_recover: error locking event file %s",
+                  es->event_file_name);
+         set_error(IL_SYS, errno, err_msg);
+         event_store_unlock(es);
+         fclose(ef);
+         return(-1);
+  }
+
+  /* check the file modification time and size to avoid unnecessary operations */
+  memset(&stbuf, 0, sizeof(stbuf));
+  if(fstat(fd, &stbuf) < 0) {
+         il_log(LOG_ERR, "    could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
+         fclose(ef);
+         event_store_unlock(es);
+         return -1;
+  } else {
+         if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) {
+                 il_log(LOG_DEBUG, "  event file not modified since last visit, skipping\n");
+                 fclose(ef);
+                 event_store_unlock(es);
+                 return(0);
+         }
+  }
+
+  while(1) { /* try, try, try */
+
+         /* get the position in file to be sought */
+         if(es->offset)
+                 last = es->offset;
+         else {
+                 last = es->last_committed_bs;
+         }
+
+         il_log(LOG_DEBUG, "    setting starting file position to  %ld\n", last);
+         il_log(LOG_DEBUG, "    bytes sent to destination: %d\n", es->last_committed_bs);
+
+         if(last > 0) {
+                 int c;
+
+                 /* skip all committed or already enqueued events */
+                 /* be careful - check, if the offset really points to the
+                    beginning of event string */
+                 if(fseek(ef, last, SEEK_SET) < 0) {
+                         set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+                         event_store_unlock(es);
+                         fclose(ef);
+                         return(-1);
+                 }
+                 /* the new event MUST start with 0 */
+                 if((c=fgetc(ef)) != 0) {
+                         /* Houston, we have got a problem */
+                         il_log(LOG_WARNING, 
+                                "    file position %ld does not point at the beginning of event string, backing off!\n",
+                                last);
+                         /* now, where were we? */
+                         if(es->offset) {
+                                 /* next try will be with
+                                    last_commited_bs */
+                                 es->offset = 0;
+                         } else {
+                                 /* this is really weird... back off completely */
+                                 es->last_committed_ls = es->last_committed_bs = 0;
+                         }
+                 } else {
+                         /* OK, break out of the loop */
+                         fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */
+                         break;
+                 }
+         } else {
+                 /* this breaks out of the loop, we are starting at
+                  * the beginning of file
+                  */
+                 if(fseek(ef, 0, SEEK_SET) < 0) {
+                         set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
+                         event_store_unlock(es);
+                         fclose(ef);
+                         return(-1);
+                 }
+                 break;
+         }
+  }
+
+  /* enqueue all remaining events */
+  ret = 1;
+  msg = NULL;
+  while(read_event_string(ef, &hmsg) >= 0) {
+       
+    /* last holds the starting position of event_s in file */
+    il_log(LOG_DEBUG, "    reading event at %ld\n", last);
+
+    /* break from now on means there was some error */
+    ret = -1;
+
+    /* create message for server */
+    msg = server_msg_create(&hmsg, last);
+    if(msg == NULL) {
+           il_log(LOG_ALERT, "    event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
+           /* actually do not bother if quarantine succeeded or not - we could not do more */
+           event_store_quarantine(es);
+           fclose(ef);
+           event_store_unlock(es);
+           return(-1);
+    }
+    msg->es = es;
+
+    /* first enqueue to the LS */
+    if(!bs_only && (last >= es->last_committed_ls)) {
+      
+      il_log(LOG_DEBUG, "      queueing event at %ld to logging server\n", last);
+
+#if !defined(IL_NOTIFICATIONS)
+      if(enqueue_msg(eq_l, msg) < 0)
+       break;
+#endif
+      }
+
+#ifdef IL_NOTIFICATIONS
+    eq_b = queue_list_get(msg->dest);
+#endif
+
+    /* now enqueue to the BS, if neccessary */
+    if((eq_b != eq_l) && 
+       (last >= es->last_committed_bs)) {
+      
+      il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
+      
+      if(enqueue_msg(eq_b, msg) < 0)
+       break;
+    }
+    server_msg_free(msg);
+    msg = NULL;
+
+    /* now last is also the offset behind the last successfully queued event */
+    last = ftell(ef);
+
+    /* ret == 0 means EOF or incomplete event found */
+    ret = 0;
+
+  } /* while */
+
+  /* due to this little assignment we had to lock the event_store for writing */
+  es->offset = last;
+  es->last_modified = stbuf.st_mtime;
+  il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
+
+  if(msg) 
+    server_msg_free(msg);
+
+  fclose(ef);
+  il_log(LOG_DEBUG, "  finished reading events with %d\n", ret);
+
+  event_store_unlock(es);
+  return(ret);
+}
+
+
+/*
+ * event_store_sync()
+ *   - check the position of event and fill holes from file
+ *   - return 1 if the event is new,
+ *            0 if it was seen before,
+ *           -1 if there was an error
+ */
+int
+event_store_sync(struct event_store *es, long offset)
+{
+       assert(es != NULL);
+
+       /* all events actually come through socket before going to file,
+          so nothing can be found in file that was not seen here */
+       return 1;
+}
+
+
+int
+event_store_next(struct event_store *es, long offset, int len)
+{
+       assert(es != NULL);
+  
+       /* offsets are good only to detect losses (differences between socket and file),
+          which is not possible now */
+       return 0;
+}
+
+
+/* 
+ * event_store_commit()
+ *
+ */
+int
+event_store_commit(struct event_store *es, int len, int ls)
+{
+  assert(es != NULL);
+
+  event_store_lock(es);
+
+  if(ls)
+    es->last_committed_ls += len;
+  else {
+    es->last_committed_bs += len;
+    if (bs_only) es->last_committed_ls += len;
+  }
+
+  if(event_store_write_ctl(es) < 0) {
+    event_store_unlock(es);
+    return(-1);
+  }
+
+  event_store_unlock(es);
+
+
+  return(0);
+}
+
+
+/*
+ * event_store_clean()
+ *  - remove the event files (event and ctl), if they are not needed anymore
+ *  - returns 0 if event_store is in use, 1 if it was removed and -1 on error
+ *
+ * Q: How do we know that we can safely remove the files?
+ * A: When all events from file have been committed both by LS and BS.
+ */
+static 
+int
+event_store_clean(struct event_store *es)
+{
+  long last;
+  int fd;
+  FILE *ef;
+  struct flock efl;
+
+  assert(es != NULL);
+
+  /* prevent sender threads from updating */
+  event_store_lock(es);
+  
+  il_log(LOG_DEBUG, "  trying to cleanup event store %s\n", es->job_id_s);
+  il_log(LOG_DEBUG, "    bytes sent to logging server: %d\n", es->last_committed_ls);
+  il_log(LOG_DEBUG, "    bytes sent to bookkeeping server: %d\n", es->last_committed_bs);
+
+  /* preliminary check to avoid opening event file */
+  /* if the positions differ, some events still have to be sent */
+  if(es->last_committed_ls != es->last_committed_bs) {
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, "  not all events sent, cleanup aborted\n");
+    return(0);
+  }
+
+  /* the file can only be removed when all the events were succesfully sent 
+     (ie. committed both by LS and BS */
+  /* That also implies that the event queues are 'empty' at the moment. */
+  ef = fopen(es->event_file_name, "r+");
+  if(ef == NULL) {
+    /* if we can not open the event store, it is an error and the struct should be removed */
+    /* XXX - is it true? */
+    event_store_unlock(es);
+    il_log(LOG_ERR,  "  event_store_clean: error opening event file: %s\n", strerror(errno));
+    return(1);
+  }
+  
+  fd = fileno(ef);
+  
+  /* prevent local-logger from writing into event file */
+  efl.l_type = F_WRLCK;
+  efl.l_whence = SEEK_SET;
+  efl.l_start = 0;
+  efl.l_len = 0;
+  if(fcntl(fd, F_SETLK, &efl) < 0) {
+    il_log(LOG_DEBUG, "    could not lock event file, cleanup aborted\n");
+    fclose(ef);
+    event_store_unlock(es);
+    if(errno != EACCES &&
+       errno != EAGAIN) {
+      set_error(IL_SYS, errno, "event_store_clean: error locking event file");
+      return(-1);
+    }
+    return(0);
+  }
+  
+  /* now the file should not contain partially written event, so it is safe
+     to get offset behind last event by seeking the end of file */
+  if(fseek(ef, 0, SEEK_END) < 0) {
+    set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
+    event_store_unlock(es);
+    fclose(ef);
+    return(-1);
+  }
+  
+  last = ftell(ef);
+  il_log(LOG_DEBUG, "    total bytes in file: %d\n", last);
+
+  if(es->last_committed_ls < last) {
+    fclose(ef);
+    event_store_unlock(es);
+    il_log(LOG_DEBUG, "    events still waiting in queue, cleanup aborted\n");
+    return(0);
+  } else if( es->last_committed_ls > last) {
+         il_log(LOG_WARNING, "  warning: event file seems to shrink!\n");
+         /* XXX - in that case we can not continue because there may be
+            some undelivered events referring to that event store */
+         fclose(ef);
+         event_store_unlock(es);
+         return(0);
+  }
+  
+  /* now we are sure that all events were sent and the event queues are empty */
+  il_log(LOG_INFO, "    removing event file %s\n", es->event_file_name);
+  
+  /* remove the event file */
+  unlink(es->event_file_name);
+  unlink(es->control_file_name);
+  
+  /* clear the counters */
+  es->last_committed_ls = 0;
+  es->last_committed_bs = 0;
+  es->offset = 0;
+
+  /* unlock the event_store even if it is going to be removed */
+  event_store_unlock(es);
+
+  /* close the event file (that unlocks it as well) */
+  fclose(ef);
+
+  /* indicate that it is safe to remove this event_store */
+  return(1);
+}
+
+
+/* --------------------------------
+ * event store management functions
+ * --------------------------------
+ */
+struct event_store *
+event_store_find(char *job_id_s)
+{
+  struct event_store_list *q, *p;
+  struct event_store *es;
+
+  if(pthread_rwlock_wrlock(&store_list_lock)) {
+         abort();
+  }
+
+  es = NULL;
+  
+  q = NULL;
+  p = store_list;
+  
+  while(p) {
+    if(strcmp(p->es->job_id_s, job_id_s) == 0) {
+      es = p->es;
+      if(pthread_rwlock_rdlock(&es->use_lock))
+             abort();
+      if(pthread_rwlock_unlock(&store_list_lock)) 
+             abort();
+      return(es);
+    }
+
+    q = p;
+    p = p->next;
+  }
+
+  es = event_store_create(job_id_s);
+  if(es == NULL) {
+         if(pthread_rwlock_unlock(&store_list_lock)) 
+                 abort();
+         return(NULL);
+  }
+
+  p = malloc(sizeof(*p));
+  if(p == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
+      if(pthread_rwlock_unlock(&store_list_lock)) 
+             abort();
+    return(NULL);
+  }
+  
+  p->next = store_list;
+  store_list = p;
+    
+  p->es = es;
+
+  if(pthread_rwlock_rdlock(&es->use_lock))
+         abort();
+
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+
+  return(es);
+}
+
+
+int
+event_store_release(struct event_store *es)
+{
+       assert(es != NULL);
+
+       if(pthread_rwlock_unlock(&es->use_lock))
+               abort();
+       il_log(LOG_DEBUG, "  released lock on %s\n", es->job_id_s);
+       return(0);
+}
+
+
+event_store_from_file(char *filename)
+{
+       struct event_store *es;
+       FILE *event_file;
+       char *job_id_s = NULL, *p;
+       il_http_message_t hmsg;
+       int ret;
+       
+       il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
+       
+       if(strstr(filename, "quarantine") != NULL) {
+               il_log(LOG_INFO, "  file name belongs to quarantine, not touching that.\n");
+               return(0);
+       }
+
+       event_file = fopen(filename, "r");
+       if(event_file == NULL) {
+               set_error(IL_SYS, errno, "event_store_from_file: error opening event file");
+               return(-1);
+       }
+       ret = read_event_string(event_file, &hmsg);
+       fclose(event_file);
+       if(ret < 0) 
+               return(0);
+       
+       /* get id aka dest */
+       job_id_s = hmsg.host;
+
+       il_log(LOG_DEBUG, "  message dest: '%s'\n", job_id_s);
+       if(job_id_s == NULL) {
+               il_log(LOG_NOTICE, "  skipping file, could not parse event\n");
+               ret = 0;
+               goto out;
+       }
+       
+       es=event_store_find(job_id_s);
+       
+       if(es == NULL) {
+               ret = -1;
+               goto out;
+       }
+
+       if((es->last_committed_ls == 0) &&
+          (es->last_committed_bs == 0) &&
+          (es->offset == 0)) {
+               ret = event_store_read_ctl(es);
+       } else 
+               ret = 0;
+       
+       event_store_release(es);
+
+out:
+       if(hmsg.data) free(hmsg.data);
+       if(job_id_s) free(job_id_s);
+       return(ret);
+}
+
+
+int
+event_store_init(char *prefix)
+{
+  if(file_prefix == NULL) {
+    file_prefix = strdup(prefix);
+    store_list = NULL;
+  }
+
+  /* read directory and get a list of event files */
+  {
+    int len;
+
+    char *p, *dir;
+    DIR *event_dir;
+    struct dirent *entry;
+
+
+    /* get directory name */
+    p = strrchr(file_prefix, '/');
+    if(p == NULL) {
+      dir = strdup(".");
+      p = "";
+      len = 0;
+    } else {
+      *p = '\0';
+      dir = strdup(file_prefix);
+      *p++ = '/';
+      len = strlen(p);
+    }
+
+    event_dir = opendir(dir);
+    if(event_dir == NULL) {
+      free(dir);
+      set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+      return(-1);
+    }
+    
+    while((entry=readdir(event_dir))) {
+      char *s;
+
+      /* skip all files that do not match prefix */
+      if(strncmp(entry->d_name, p, len) != 0) 
+       continue;
+
+      /* skip all control files */
+      if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+        s[4] == '\0')
+       continue;
+
+      s = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+      if(s == NULL) {
+       free(dir);
+       set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name");
+       return(-1);
+      }
+
+      *s = '\0';
+      strcat(s, dir);
+      strcat(s, "/");
+      strcat(s, entry->d_name);
+
+      if(event_store_from_file(s) < 0) {
+       free(dir);
+       free(s);
+       closedir(event_dir);
+       return(-1);
+      }
+
+      free(s);
+    }
+    closedir(event_dir);
+
+    /* one more pass - this time remove stale .ctl files */
+    event_dir = opendir(dir);
+    if(event_dir == NULL) {
+      free(dir);
+      set_error(IL_SYS, errno, "event_store_init: error opening event directory");
+      return(-1);
+    }
+    
+    while((entry=readdir(event_dir))) {
+      char *s;
+
+      /* skip all files that do not match prefix */
+      if(strncmp(entry->d_name, p, len) != 0) 
+       continue;
+
+      /* find all control files */
+      if((s=strstr(entry->d_name, ".ctl")) != NULL &&
+        s[4] == '\0') {
+             char *ef;
+             struct stat st;
+
+             /* is there corresponding event file? */
+             ef = malloc(strlen(dir) + strlen(entry->d_name) + 2);
+             if(ef == NULL) {
+                     free(dir);
+                     set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name");
+                     return(-1);
+             }
+
+             s[0] = 0;
+             *ef = '\0';
+             strcat(ef, dir);
+             strcat(ef, "/");
+             strcat(ef, entry->d_name);
+             s[0] = '.';
+
+             if(stat(ef, &st) == 0) {
+                     /* something is there */
+                     /* XXX - it could be something else than event file, but do not bother now */
+             } else {
+                     /* could not stat file, remove ctl */
+                     strcat(ef, s);
+                     il_log(LOG_DEBUG, "  removing stale file %s\n", ef);
+                     if(unlink(ef)) 
+                             il_log(LOG_ERR, "  could not remove file %s: %s\n", ef, strerror(errno));
+                     
+             }
+             free(ef);
+
+      }
+    }
+    closedir(event_dir);
+    free(dir);
+  }
+
+  return(0);
+}
+
+
+int
+event_store_recover_all()
+{
+  struct event_store_list *sl;
+
+
+  if(pthread_rwlock_rdlock(&store_list_lock)) 
+         abort();
+
+  /* recover all event stores */
+  sl = store_list;
+  while(sl != NULL) {
+
+         /* recover this event store */
+         /* no need to lock use_lock in event_store, the store_list_lock is in place */
+         if(event_store_recover(sl->es) < 0) {
+                 il_log(LOG_ERR, "  error recovering event store %s:\n    %s\n", sl->es->event_file_name, error_get_msg());
+                 clear_error();
+         }
+         sl = sl->next;
+  }
+  
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+
+  return(0);
+}
+
+
+#if 0 
+int
+event_store_remove(struct event_store *es)
+{
+  struct event_store_list *p, **q;
+
+  assert(es != NULL);
+
+  switch(event_store_clean(es)) {
+  case 0:
+    il_log(LOG_DEBUG, "  event store not removed, still used\n");
+    return(0);
+    
+  case 1:
+    if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
+      set_error(IL_SYS, errno, "  event_store_remove: error locking event store list");
+      return(-1);
+    }
+
+    p = store_list;
+    q = &store_list;
+
+    while(p) {
+      if(p->es == es) {
+       (*q) = p->next;
+       event_store_free(es);
+       free(p);
+       break;
+      }
+      q = &(p->next);
+      p = p->next;
+    }
+
+    if(pthread_rwlock_unlock(&store_list_lock) < 0) {
+      set_error(IL_SYS, errno, "  event_store_remove: error unlocking event store list");
+      return(-1);
+    }
+    return(1);
+
+  default:
+    return(-1);
+  }
+  /* not reached */
+  return(0);
+}
+#endif
+
+int
+event_store_cleanup()
+{
+  struct event_store_list *sl;
+  struct event_store_list *slnext;
+  struct event_store_list **prev;
+
+  /* try to remove event files */
+
+  if(pthread_rwlock_wrlock(&store_list_lock)) 
+         abort();
+
+  sl = store_list;
+  prev = &store_list;
+
+  while(sl != NULL) {
+         int ret;
+
+         slnext = sl->next;
+         
+         /* one event store at time */
+         ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
+         if(ret == EBUSY) {
+                 il_log(LOG_DEBUG, "  event_store %s is in use by another thread\n", 
+                        sl->es->job_id_s);
+                 sl = slnext;
+                 continue;
+         } else if (ret < 0)
+           abort();
+
+         switch(event_store_clean(sl->es)) {
+                 
+         case 1:
+                 /* remove this event store */
+                 (*prev) = slnext;
+                 event_store_free(sl->es);
+                 free(sl);
+                 break;
+                 
+         case -1:
+                 il_log(LOG_ERR, "  error removing event store %s (file %s):\n    %s\n", 
+                        sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
+                 /* event_store_release(sl->es); */
+                 clear_error();
+                 /* go on to the next */
+                 
+         default:
+                 event_store_release(sl->es);
+                 prev = &(sl->next);
+                 break;
+         }
+         
+         sl = slnext;
+  }
+  
+  if(pthread_rwlock_unlock(&store_list_lock)) 
+         abort();
+  
+  return(0);
+}
+
diff --git a/org.glite.lb.logger/src/http.c b/org.glite.lb.logger/src/http.c
new file mode 100644 (file)
index 0000000..415e86e
--- /dev/null
@@ -0,0 +1,196 @@
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+
+#include "interlogd.h"
+
+
+int 
+parse_request(const char *s, il_http_message_t *msg)
+{
+       if(!strncasecmp(s, "HTTP", 4)) {
+               msg->msg_type = IL_HTTP_REPLY;
+       } else if(!strncasecmp(s, "POST", 4)) {
+               msg->msg_type = IL_HTTP_POST;
+       } else if(!strncasecmp(s, "GET", 3)) {
+               msg->msg_type = IL_HTTP_GET;
+       } else {
+               msg->msg_type = IL_HTTP_OTHER;
+       }
+       if(msg->msg_type == IL_HTTP_REPLY) {
+               char *p = strchr(s, ' ');
+
+               if(!p) goto parse_end;
+               p++;
+               msg->reply_code=atoi(p);
+               p = strchr(p, ' ');
+               if(!p) goto parse_end;
+               p++;
+               msg->reply_string = strdup(p);
+
+       parse_end:
+               ;
+       }
+}
+
+
+int
+parse_header(const char *s, il_http_message_t *msg)
+{
+       if(!strncasecmp(s, "Content-Length:", 15)) {
+               msg->content_length = atoi(s + 15);
+       } else if(!strncasecmp(s, "Host:", 5)) {
+               const char *p = s + 4;
+               while(*++p == ' '); /* skip spaces */
+               msg->host = strdup(p);
+       }
+       return(0);
+}
+
+
+#define DEFAULT_CHUNK_SIZE 1024
+
+// read what is available and parse what can be parsed
+// returns the result of read operation of the underlying connection,
+// ie. the number of bytes read or error code
+int
+receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_message_t *msg)
+{
+       static enum { NONE, IN_REQUEST, IN_HEADERS, IN_BODY } state = NONE;
+       int  len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE;
+       char *buffer, *p, *s, *cr;
+       
+       msg->data = NULL;
+       msg->len = 0;
+       state = IN_REQUEST;
+       alen = 0;
+       buffer = NULL;
+       buffer_free = 0;
+       p = NULL;
+       s = NULL;
+
+       do {
+               /* p - first empty position in buffer
+                  alen - size of allocated buffer
+                  len - number of bytes received in last read
+                  s - points behind last scanned CRLF or at buffer start 
+                  buffer_free = alen - (p - buffer) 
+               */
+
+               /* prepare at least chunk_size bytes for next data */
+               if(buffer_free < min_buffer_free) {
+                       char *n;
+                       
+                       alen += min_buffer_free;
+                       n = realloc(buffer, alen);
+                       if(n == NULL) {
+                               free(buffer);
+                               set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+                               return(-1);
+                       }
+                       buffer_free += min_buffer_free;
+                       p = n + (p - buffer);
+                       s = n + (s - buffer);
+                       buffer = n;
+               }
+
+               if(buffer_free > 0) {
+                       len = (*reader)(user_data, p, buffer_free); 
+                       if(len < 0) {
+                               // error
+                               free(buffer);
+                               // set_error(IL_SYS, errno, "receive_http: error reading data");
+                               return -1;
+                       } else if(len == 0) {
+                               // EOF
+                               free(buffer);
+                               set_error(IL_PROTO, errno, "receive_http: error reading data - premature EOF");
+                               return -1;
+                       }
+                       buffer_free -= len;
+                       p+= len;
+               }
+
+
+               switch(state) {
+
+                       // parse buffer, look for CRLFs
+                       //   s - start scan position
+                       //   p - start of current token
+                       //   cr - current CRLF position
+
+               case IN_REQUEST:
+                       if((s < p - 1) &&
+                          (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+                          (cr[1] == '\n')) {
+                               *cr = 0;
+                               parse_request(s, msg);
+                               *cr = '\r';
+                               // change state
+                               state = IN_HEADERS;
+                               // start new tokens (cr < p - 1 -> s < p + 1 <-> s <= p)
+                               s = cr + 2;
+                       } else {
+                         break;
+                       }
+
+               case IN_HEADERS:  
+                       while((state != IN_BODY) &&
+                             (s < p - 1) && 
+                             (cr = (char*)memchr(s, '\r', p - s - 1)) &&
+                             (cr[1] == '\n')) {
+                               if(s == cr) { /* do not consider request starting with CRLF */
+                                       // found CRLFCRLF
+                                       state = IN_BODY;
+                               } else {
+                                       *cr = 0;
+                                       parse_header(s, msg);
+                                       *cr = '\r';
+                               }
+                               // next scan starts after CRLF
+                               s = cr + 2; 
+                       }
+                       if(state == IN_BODY) {
+                               // we found body
+                               // content-length should be set at the moment
+                               if(msg->content_length > 0) {
+                                       int need_free = msg->content_length - (p - s);
+                                       char *n;
+                       
+                                       alen += need_free - buffer_free + 1;
+                                       n = realloc(buffer, alen);
+                                       if(n == NULL) {
+                                               free(buffer);
+                                               set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+                                               return(-1);
+                                       }
+                                       buffer_free = need_free;
+                                       min_buffer_free = 0;
+                                       p = n + (p - buffer);
+                                       s = n + (s - buffer);
+                                       buffer = n;
+                               } else {
+                                       // report error
+                                       free(buffer);
+                                       set_error(IL_PROTO, EINVAL, "receive_http: error reading data - no content length specified\n");
+                                       return -1;
+                               }
+                       }
+                       break;
+                       
+               case IN_BODY:
+                       if(buffer_free == 0) {
+                               // finished reading
+                               *p = 0;
+                               state = NONE;
+                       }
+                       break;
+               }
+       } while(state != NONE);
+       
+       msg->data = buffer;
+       msg->len = p - buffer;
+
+       return 0;
+}
diff --git a/org.glite.lb.logger/src/input_queue_socket_http.c b/org.glite.lb.logger/src/input_queue_socket_http.c
new file mode 100644 (file)
index 0000000..53a519d
--- /dev/null
@@ -0,0 +1,292 @@
+#ident "$Header$"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "interlogd.h"
+
+static const int   SOCK_QUEUE_MAX = 50;
+extern char *socket_path;
+extern char *file_prefix;
+
+static int sock;
+static int accepted;
+
+static
+int plain_reader(void *user_data, char *buffer, const int len)
+{
+       return (recv(*(int*)user_data, buffer, len, MSG_NOSIGNAL));
+}
+
+                
+int 
+input_queue_attach()
+{ 
+  struct sockaddr_un saddr;
+
+  if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error creating socket");
+    return(-1);
+  }
+
+  memset(&saddr, 0, sizeof(saddr));
+  saddr.sun_family = AF_UNIX;
+  strcpy(saddr.sun_path, socket_path);
+
+  /* test for the presence of the socket and another instance 
+     of interlogger listening */
+  if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) {
+         if(errno == ECONNREFUSED) {
+                 /* socket present, but no one at the other end; remove it */
+                 il_log(LOG_WARNING, "  removing stale input socket %s\n", socket_path);
+                 unlink(socket_path);
+         }
+         /* ignore other errors for now */
+  } else {
+         /* connection was successful, so bail out - there is 
+            another interlogger running */
+         set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running");
+         return(-1);
+  }
+  
+  if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_attach: error binding socket");
+    return(-1);
+  }
+
+  if (listen(sock, SOCK_QUEUE_MAX)) {
+    set_error(IL_SYS, errno, "input_queue_attach: error listening on socket");
+    return -1;
+  }
+
+  return(0);
+}
+
+
+void input_queue_detach()
+{
+  if (sock >= 0)
+    close(sock);
+  unlink(socket_path);
+}
+
+
+int
+store_to_file(il_http_message_t *msg, long *offset) {
+       char s_len[20];
+       char filename[PATH_MAX];
+       FILE *outfile;
+       int i, filedesc;
+       int ret = -1;
+
+       if(msg->host == NULL) {
+               set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified");
+       }
+
+       snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host);
+       filename[sizeof(filename) - 1] = 0;
+       snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len);
+       s_len[sizeof(s_len) - 1] = 0;
+       s_len[0] = 0;
+
+try_again:
+       if((outfile = fopen(filename, "a")) == NULL) {
+               set_error(IL_SYS, errno, "store_to_file: error opening file");
+               goto cleanup;
+       }
+       if((filedesc = fileno(outfile)) < 0) {
+               set_error(IL_SYS, errno, "store_to_file: error getting file descriptor");
+               goto cleanup;
+       }
+
+       for(i = 0; i < 5; i++) {
+               struct flock filelock;
+               int filelock_status;
+               struct stat statbuf;
+
+               filelock.l_type = F_WRLCK;
+               filelock.l_whence = SEEK_SET;
+               filelock.l_start = 0;
+               filelock.l_len = 0;
+
+               if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) {
+                       switch(errno) {
+                       case EAGAIN:
+                       case EACCES:
+                       case EINTR:
+                               if((i+1) < 5) sleep(1);
+                               break;
+                       default:
+                               set_error(IL_SYS, errno, "store_to_file: error locking file");
+                               goto cleanup;
+                       }
+               } else {
+                       if(stat(filename, &statbuf)) {
+                               if(errno == ENOENT) {
+                                       fclose(outfile);
+                                       goto try_again;
+                               } else {
+                                       set_error(IL_SYS, errno, "store_file: could not stat file");
+                                       goto cleanup;
+                               }
+                       } else {
+                               /* success */
+                               break;
+                       }
+               }
+       }
+
+       if(i == 5) {
+               set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file");
+               goto cleanup;
+       }
+       if(fseek(outfile, 0, SEEK_END) < 0) {
+               set_error(IL_SYS, errno, "store_to_file: error seeking at end of file");
+               goto cleanup;
+       }
+       if((*offset=ftell(outfile)) < 0) {
+               set_error(IL_SYS, errno, "store_to_file: error getting current position");
+               goto cleanup;
+       }
+       if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) {
+               set_error(IL_SYS, errno, "store_to_file: error writing data header to file");
+               goto cleanup;
+       }
+       if(fwrite(msg->data, msg->len, 1, outfile) != 1) {
+               set_error(IL_SYS, errno, "store_to_file: error writing data to file");
+               goto cleanup;
+       }
+       ret = 0;
+       fflush(outfile);
+
+cleanup:
+       if(outfile) fclose(outfile);
+       return ret;
+}
+
+
+int 
+send_reply(int sd)
+{
+       const char reply[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+               "<SOAP-ENV:Envelope"
+               " xmlns:SOAP-ENV=\"http://schemas.xmlsoap.org/soap/envelope/\""
+               " xmlns:SOAP-ENC=\"http://schemas.xmlsoap.org/soap/encoding/\""
+               " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
+               " xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\""
+               " xmlns:ns3=\"http://glite.org/wsdl/types/jp\""
+               " xmlns:ns1=\"http://glite.org/wsdl/services/jp\""
+               " xmlns:ns2=\"http://glite.org/wsdl/elements/jp\">"
+               " <SOAP-ENV:Body>"
+               "  <ns2:UpdateJobsResponse>"
+               "  </ns2:UpdateJobsResponse>"
+               " </SOAP-ENV:Body>"
+               "</SOAP-ENV:Envelope>";
+       
+       return(write(sd, reply, sizeof(reply)));
+}
+
+
+/*
+ * Returns: -1 on error, 0 if no message available, message length otherwise
+ *
+ */
+#ifdef PERF_EVENTS_INLINE
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+       static long o = 0;
+       int len;
+       char *jobid;
+       static il_octet_string_t my_buffer;
+       
+       assert(buffer != NULL);
+
+       *buffer = &my_buffer;
+
+       len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid);
+       my_buffer.len = len;
+       if(len) {
+               o += len;
+               *offset = o;
+       } else if (len == 0) {
+               sleep(timeout);
+       }
+       return(len);
+}
+#else
+int
+input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
+{
+  fd_set fds;
+  struct timeval tv;
+  int msg_len;
+  static il_http_message_t msg;
+
+  assert(buffer != NULL);
+
+  *buffer = (il_octet_string_t *)&msg;
+
+  FD_ZERO(&fds);
+  FD_SET(sock, &fds);
+  
+  tv.tv_sec = timeout;
+  tv.tv_usec = 0;
+  
+  msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL);
+  switch(msg_len) {
+     
+  case 0: /* timeout */
+    return(0);
+    
+  case -1: /* error */
+         switch(errno) {
+         case EINTR:
+                 il_log(LOG_DEBUG, "  interrupted while waiting for event!\n");
+                 return(0);
+
+         default:
+                 set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+                 return(-1);
+         }
+  default:
+         break;
+  }
+  
+  if((accepted=accept(sock, NULL, NULL)) < 0) {
+    set_error(IL_SYS, errno, "input_queue_get: error accepting connection");
+    return(-1);
+  }
+
+  msg_len = receive_http(&accepted, plain_reader, &msg);
+
+  if(msg_len < 0) {
+         close(accepted);
+         if(error_get_maj() != IL_OK) 
+                 return -1;
+         else
+                 return 0;
+  }
+
+  if(store_to_file(&msg, offset) < 0) {
+         close(accepted);
+         return -1;
+  }
+
+  send_reply(accepted);
+  close(accepted);
+  return(msg.len);
+}
+#endif
+
diff --git a/org.glite.lb.logger/src/queue_mgr_http.c b/org.glite.lb.logger/src/queue_mgr_http.c
new file mode 100644 (file)
index 0000000..59eba4a
--- /dev/null
@@ -0,0 +1,164 @@
+#ident "$Header$"
+
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct queue_list {
+  struct event_queue *queue;
+  char   *dest;
+  struct queue_list *next;
+  time_t expires;
+};
+
+static struct event_queue *log_queue;
+static struct queue_list  *queues;
+
+
+static 
+int
+queue_list_create()
+{
+  queues = NULL;
+
+  return(0);
+}
+
+
+static
+int
+queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev)
+{
+  struct queue_list *q, *p;
+
+  assert(el != NULL);
+
+  *el = NULL;
+  if(prev)
+    *prev = NULL;
+
+  if(ql == NULL) 
+    return(0);
+
+  q = NULL;
+  p = ql;
+
+  while(p) {
+    if(strcmp(p->dest, dest) == 0) {
+      *el = p;
+      if(prev)
+       *prev = q;
+      return(1);
+    }
+
+    q = p;
+    p = p->next;
+  };
+
+  return(0);
+}
+
+
+static
+int
+queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq)
+{
+  struct queue_list *el;
+  
+  assert(dest != NULL);
+  assert(eq != NULL);
+  assert(ql != NULL);
+
+  el = malloc(sizeof(*el));
+  if(el == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue");
+    return(-1);
+  }
+
+  el->dest = strdup(dest);
+  if(el->dest == NULL) {
+    free(el);
+    set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue");
+    return(-1);
+  }
+  el->queue = eq;
+  el->next = queues;
+  *ql = el;
+  return 0;
+}
+
+
+struct event_queue *
+queue_list_get(char *job_id_s)
+{
+  char *dest;
+  struct queue_list *q;
+  struct event_queue *eq;
+  dest = job_id_s;
+
+  if(dest == NULL) 
+    return(NULL);
+  
+  if(queue_list_find(queues, dest, &q, NULL)) {
+    return(q->queue);
+  } else {
+    eq = event_queue_create(dest);
+    if(eq)
+      queue_list_add(&queues, dest, eq);
+    return(eq);
+  }
+}
+
+
+int
+queue_list_is_log(struct event_queue *eq)
+{
+  return(eq == queue_list_get(NULL));
+}
+
+
+int
+queue_list_init(char *ls)
+{
+  return(queue_list_create());
+}
+
+
+static struct queue_list *current;
+
+
+struct event_queue *
+queue_list_first()
+{
+  current = queues;
+  return(current ? current->queue : NULL);
+}
+
+
+struct event_queue *
+queue_list_next()
+{
+  current = current ? current->next : NULL;
+  return(current ? current->queue : NULL);
+}
+
+
+int
+queue_list_remove_queue(struct event_queue *eq)
+{
+  assert(eq != NULL);
+
+  free(eq);
+  return(1);
+}
+
+
+
+/* Local Variables:           */
+/* c-indentation-style: gnu   */
+/* End:                       */
diff --git a/org.glite.lb.logger/src/send_event_http.c b/org.glite.lb.logger/src/send_event_http.c
new file mode 100644 (file)
index 0000000..6eedcaf
--- /dev/null
@@ -0,0 +1,282 @@
+#ident "$Header$"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+
+/*
+ *   - L/B server protocol handling routines 
+ */
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lb/il_string.h"
+#include "glite/lb/context.h"
+
+#include "interlogd.h"
+
+struct reader_data {
+       edg_wll_GssConnection *gss;
+       struct timeval *timeout;
+};
+
+
+static
+int
+gss_reader(void *user_data, char *buffer, int max_len)
+{
+  int ret;
+  struct reader_data *data = (struct reader_data *)user_data;
+  edg_wll_GssStatus gss_stat;
+
+  ret = edg_wll_gss_read(data->gss, buffer, max_len, data->timeout, &gss_stat);
+  if(ret < 0) {
+    char *gss_err = NULL;
+
+    if(ret == EDG_WLL_GSS_ERROR_GSS) {
+      edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err);
+      set_error(IL_DGGSS, ret, gss_err);
+      free(gss_err);
+    } else 
+      set_error(IL_DGGSS, ret, "get_reply");
+  }
+  return(ret);
+}
+
+
+/*
+ * Read reply from server.
+ *  Returns: -1 - error reading message, 
+ *         code > 0 - http status code from server
+ */
+static
+int 
+get_reply(struct event_queue *eq, char **buf, int *code_min)
+{
+  int ret, code;
+  int len;
+  struct timeval tv;
+  struct reader_data data;
+  il_http_message_t msg;
+
+  tv.tv_sec = TIMEOUT;
+  tv.tv_usec = 0;
+  data.gss = &eq->gss;
+  data.timeout = &tv;
+  len = receive_http(&data, gss_reader, &msg);
+  if(len < 0) {
+    set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply");
+    return(-1);
+  }
+  if(msg.data) free(msg.data);
+  if(msg.reply_string) *buf = msg.reply_string;
+  *code_min = 0; /* XXX fill in flag for fault */
+  return(msg.reply_code);
+}
+
+
+
+/*
+ *  Returns: 0 - not connected, timeout set, 1 - OK
+ */
+int 
+event_queue_connect(struct event_queue *eq)
+{
+  int ret;
+  struct timeval tv;
+  edg_wll_GssStatus gss_stat;
+  cred_handle_t *local_cred_handle;
+
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+
+  if(eq->gss.context == NULL) {
+
+    tv.tv_sec = TIMEOUT;
+    tv.tv_usec = 0;
+
+    /* get pointer to the credentials */
+    if(pthread_mutex_lock(&cred_handle_lock) < 0)
+           abort();
+    local_cred_handle = cred_handle;
+    local_cred_handle->counter++;
+    if(pthread_mutex_unlock(&cred_handle_lock) < 0)
+           abort();
+    
+    il_log(LOG_DEBUG, "    trying to connect to %s:%d\n", eq->dest_name, eq->dest_port);
+    ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
+    if(pthread_mutex_lock(&cred_handle_lock) < 0)
+           abort();
+    /* check if we need to release the credentials */
+    --local_cred_handle->counter;
+    if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) {
+           edg_wll_gss_release_cred(&local_cred_handle->creds, NULL);
+           free(local_cred_handle);
+           il_log(LOG_DEBUG, "   freed credentials, not used anymore\n");
+    }
+    if(pthread_mutex_unlock(&cred_handle_lock) < 0) 
+           abort();
+
+    if(ret < 0) {
+      char *gss_err = NULL;
+
+      if (ret == EDG_WLL_GSS_ERROR_GSS)
+        edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err);
+      set_error(IL_DGGSS, ret,
+               (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect");
+      if (gss_err) free(gss_err);
+      eq->gss.context = NULL;
+      eq->timeout = TIMEOUT;
+      return(0);
+    }
+  }
+
+#ifdef LB_PERF
+  }
+#endif
+
+  return(1);
+}
+
+
+int
+event_queue_close(struct event_queue *eq)
+{
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+
+  if(eq->gss.context != NULL) {
+    edg_wll_gss_close(&eq->gss, NULL);
+    eq->gss.context = NULL;
+  }
+#ifdef LB_PERF
+  }
+#endif
+  return(0);
+}
+
+
+/* 
+ * Send all events from the queue.
+ *   Returns: -1 - system error, 0 - not sent, 1 - queue empty
+ */
+int 
+event_queue_send(struct event_queue *eq)
+{
+  int events_sent = 0;
+  assert(eq != NULL);
+
+#ifdef LB_PERF
+  if(!nosend) {
+#endif
+  if(eq->gss.context == NULL)
+    return(0);
+#ifdef LB_PERF
+  }
+#endif
+
+  /* feed the server with events */
+  while (!event_queue_empty(eq)) {
+    struct server_msg *msg;
+    char *rep;
+    int  ret, code, code_min;
+    size_t bytes_sent;
+    struct timeval tv;
+    edg_wll_GssStatus gss_stat;
+
+    clear_error();
+
+    if(event_queue_get(eq, &msg) < 0) 
+      return(-1);
+
+    il_log(LOG_DEBUG, "    trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
+#ifdef LB_PERF
+    if(!nosend) {
+#endif
+        /* XXX: ljocha -- does it make sense to send empty messages ? */
+       if (msg->len) {
+           tv.tv_sec = TIMEOUT;
+           tv.tv_usec = 0;
+           ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
+           if(ret < 0) {
+                   if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) {
+                           eq->timeout = 0;
+                   }  else {
+                           il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
+                           eq->timeout = TIMEOUT;
+                   }
+                   return(0);
+           }
+           if((code = get_reply(eq, &rep, &code_min)) < 0) {
+                   /* could not get the reply properly, so try again later */
+                   if (events_sent>0) 
+                           eq->timeout = 1;
+                   else {
+                           eq->timeout = TIMEOUT;
+                           il_log(LOG_ERR, "  error reading server %s reply:\n    %s\n", eq->dest_name, error_get_msg());
+                   }
+                   return(0);
+           }
+       }
+       else { code = 200; code_min = 0; rep = strdup("not sending empty message"); }
+#ifdef LB_PERF
+    } else {
+           glite_wll_perftest_consumeEventIlMsg(msg->msg+17);
+           code = 200;
+           rep = strdup("OK");
+    }
+#endif
+    
+    il_log(LOG_DEBUG, "    event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
+    free(rep);
+
+    /* the reply is back here, decide what to do with message */
+    /* HTTP error codes:
+       1xx - informational (eg. 100 Continue)
+       2xx - successful (eg. 200 OK)
+       3xx - redirection (eg. 301 Moved Permanently)
+       4xx - client error (eq. 400 Bad Request)
+       5xx - server error (eq. 500 Internal Server Error)
+    */
+    if(code >= 500 && code < 600) {
+
+           /* non fatal errors (for us), try to deliver later */
+           eq->timeout = TIMEOUT;
+           return(0);
+    }
+
+    /* the message was consumed (successfully or not) */
+    /* update the event pointer */
+    if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) 
+           /* failure committing message, this is bad */
+           return(-1);
+    
+    event_queue_remove(eq);
+    events_sent++;
+  } /* while */
+
+  return(1);
+
+} /* send_events */
+
+
+/* this is just not used */
+int
+send_confirmation(long lllid, int code)
+{
+       return 0;
+}
diff --git a/org.glite.lb.logger/src/server_msg_http.c b/org.glite.lb.logger/src/server_msg_http.c
new file mode 100644 (file)
index 0000000..37be900
--- /dev/null
@@ -0,0 +1,127 @@
+#ident "$Header$"
+
+#include <errno.h>
+#include <assert.h>
+#include <string.h>
+
+#include "interlogd.h"
+#include "glite/lb/il_msg.h" 
+#include "glite/lb/events_parse.h"
+#include "glite/lb/context.h"
+
+static
+int 
+create_msg(il_http_message_t *ev, char **buffer, long *receipt, time_t *expires)
+{
+  char *event = ev->data;
+
+  *receipt = 0;
+  *expires = 0;
+
+  *buffer = ev->data;
+  return ev->len;;
+}
+
+
+struct server_msg *
+server_msg_create(il_octet_string_t *event, long offset)
+{
+  struct server_msg *msg;
+
+  msg = malloc(sizeof(*msg));
+  if(msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message");
+    return(NULL);
+  }
+
+  if(server_msg_init(msg, event) < 0) {
+    server_msg_free(msg);
+    return(NULL);
+  }
+  msg->offset = offset;
+
+  return(msg);
+}
+
+
+struct server_msg *
+server_msg_copy(struct server_msg *src)
+{
+  struct server_msg *msg;
+
+  msg = malloc(sizeof(*msg));
+  if(msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message");
+    return(NULL);
+  }
+  
+  msg->msg = malloc(src->len);
+  if(msg->msg == NULL) {
+    set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message");
+    server_msg_free(msg);
+    return(NULL);
+  }
+  msg->len = src->len;
+  memcpy(msg->msg, src->msg, src->len);
+
+  msg->job_id_s = strdup(src->job_id_s);
+  msg->ev_len = src->ev_len;
+  msg->es = src->es;
+  msg->receipt_to = src->receipt_to;
+  msg->offset = src->offset;
+#if defined(IL_NOTIFICATIONS)
+  msg->dest_name = strdup(src->dest_name);
+  msg->dest_port = src->dest_port;
+  msg->dest = strdup(src->dest);
+#endif
+  msg->expires = src->expires;
+  return(msg);
+}
+
+
+int
+server_msg_init(struct server_msg *msg, il_octet_string_t *event)
+{
+       il_http_message_t *hmsg = (il_http_message_t *)event;
+
+       assert(msg != NULL);
+       assert(event != NULL);
+
+       memset(msg, 0, sizeof(*msg));
+
+
+       msg->job_id_s = hmsg->host;
+       if(msg->job_id_s == NULL) {
+               set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id");
+               return -1;
+       }
+       msg->len = create_msg(hmsg, &msg->msg, &msg->receipt_to, &msg->expires);
+       if(msg->len < 0)
+               return -1;
+       /* set this to indicate new data owner */
+       hmsg->data = NULL;
+       msg->ev_len = hmsg->len; /* XXX: add lentgh size too */
+       return 0;
+
+}
+
+
+int
+server_msg_is_priority(struct server_msg *msg)
+{
+  assert(msg != NULL);
+
+  return(msg->receipt_to != 0);
+}
+
+
+int
+server_msg_free(struct server_msg *msg)
+{
+  assert(msg != NULL);
+
+  if(msg->msg) free(msg->msg);
+  if(msg->job_id_s) free(msg->job_id_s);
+  free(msg);
+  return 0;
+}