added handling of JPPS created SOAP files
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 19 Aug 2008 12:42:56 +0000 (12:42 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 19 Aug 2008 12:42:56 +0000 (12:42 +0000)
org.glite.lb.logger/src/event_store_http.c
org.glite.lb.logger/src/http.c
org.glite.lb.logger/src/input_queue_socket_http.c
org.glite.lb.logger/src/interlogd.c
org.glite.lb.logger/src/send_event_http.c
org.glite.lb.logger/src/server_msg_http.c

index 0363ec2..a34e0a3 100644 (file)
@@ -71,27 +71,18 @@ jobid2controlfile(char *job_id_s)
   return(buffer);
 }
 
-struct file_reader_data {
-       int fd;
-       size_t max_len;
-       size_t pos;
-};
-
-#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b
-
 static
 int
 file_reader(void *user_data, char *buffer, const int len)
 {
-       int l, m, ret;
+       size_t ret = 0;
        
-       m = IL_RD_VALUE(user_data, max_len) - IL_RD_VALUE(user_data, pos);
-       l = (len > m) ? m : len;
-       if(l > 0) 
-               ret = read(IL_RD_VALUE(user_data, fd), buffer, l);
-       else 
-               ret = 0;
-       IL_RD_VALUE(user_data, pos) += ret;
+       if(len > 0) {
+               ret = fread(buffer, 1, len, (FILE*)user_data);
+               if(ret == 0 && ferror((FILE*)user_data)) {
+                       return -1;
+               } 
+       }
        return ret;
 }
 
@@ -100,32 +91,24 @@ static
 int
 read_event_string(FILE *file, il_http_message_t *msg)
 {
-       struct file_reader_data rd;
-       char s_len[20];
        int  len, ret;
        int fd = fileno(file);
-
-       len = read(fd, s_len, sizeof(s_len));
-       if(len != sizeof(s_len)) {
-               if(len < 0) 
-                       set_error(IL_SYS, errno, "read_event_string: error reading record header");
-               else 
-                       set_error(IL_SYS, EIO, "read_event_string: record header too short");
-               return -1;
+       long start;
+
+       /* remember the start position */
+       start = ftell(file);
+       ret = receive_http(file, file_reader, msg);
+       if(ret < 0) return ret;
+       /* seek at the end of message in case the reader read ahead */
+       len = fseek(file, start + msg->len, SEEK_SET);
+       len = fgetc(file);
+       if(len != '\n') {
+               il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n", 
+                      start+msg->len, len, len);
+               if(msg->data) { free(msg->data); msg->data = NULL; }
+               if(msg->host) { free(msg->host); msg->host = NULL; }
+               return EINVAL;
        }
-       if(s_len[0] != 0 || s_len[sizeof(s_len) - 1] != 0) {
-               set_error(IL_SYS, EINVAL, "read_event_string: invalid record header");
-               return -1;
-       }
-       len = atoi(s_len + 1);
-       if(len < 0) {
-               set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header");
-               return -1;
-       }
-       rd.fd = fd;
-       rd.max_len = len;
-       rd.pos = 0;
-       ret = receive_http(&rd, file_reader, msg);
        return ret;
 }
 
@@ -436,14 +419,14 @@ event_store_recover(struct event_store *es)
                  /* skip all committed or already enqueued events */
                  /* be careful - check, if the offset really points to the
                     beginning of event string */
-                 if(fseek(ef, last, SEEK_SET) < 0) {
+                 if(fseek(ef, last - 1, SEEK_SET) < 0) {
                          set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
                          event_store_unlock(es);
                          fclose(ef);
                          return(-1);
                  }
-                 /* the new event MUST start with 0 */
-                 if((c=fgetc(ef)) != 0) {
+                 /* the last enqueued event MUST end with \n */
+                 if((c=fgetc(ef)) != '\n') {
                          /* Houston, we have got a problem */
                          il_log(LOG_WARNING, 
                                 "    file position %ld does not point at the beginning of event string, backing off!\n",
@@ -459,7 +442,6 @@ event_store_recover(struct event_store *es)
                          }
                  } else {
                          /* OK, break out of the loop */
-                         fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */
                          break;
                  }
          } else {
@@ -488,7 +470,7 @@ event_store_recover(struct event_store *es)
     ret = -1;
 
     /* create message for server */
-    msg = server_msg_create(&hmsg, last);
+    msg = server_msg_create((il_octet_string_t*)&hmsg, last);
     if(msg == NULL) {
            il_log(LOG_ALERT, "    event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
            /* actually do not bother if quarantine succeeded or not - we could not do more */
@@ -502,7 +484,7 @@ event_store_recover(struct event_store *es)
     /* first enqueue to the LS */
     if(!bs_only && (last >= es->last_committed_ls)) {
       
-      il_log(LOG_DEBUG, "      queueing event at %ld to logging server\n", last);
+           il_log(LOG_DEBUG, "      queueing event at %ld to server %s\n", last, eq_l->dest_name);
 
 #if !defined(IL_NOTIFICATIONS)
       if(enqueue_msg(eq_l, msg) < 0)
@@ -518,7 +500,7 @@ event_store_recover(struct event_store *es)
     if((eq_b != eq_l) && 
        (last >= es->last_committed_bs)) {
       
-      il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
+           il_log(LOG_DEBUG, "      queueing event at %ld to server %s\n", last, eq_b->dest_name);
       
       if(enqueue_msg(eq_b, msg) < 0)
        break;
@@ -560,11 +542,16 @@ event_store_recover(struct event_store *es)
 int
 event_store_sync(struct event_store *es, long offset)
 {
+       int ret;
+
        assert(es != NULL);
 
-       /* all events actually come through socket before going to file,
-          so nothing can be found in file that was not seen here */
-       return 1;
+       /* all events are actually read from file, the event on socket
+        * is ignored and serves just to notify us about file change
+        */
+       ret = event_store_recover(es);
+       ret = (ret < 0) ? ret : 0;
+       return(ret);
 }
 
 
index 415e86e..c9fb89b 100644 (file)
@@ -61,8 +61,9 @@ receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_
        int  len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE;
        char *buffer, *p, *s, *cr;
        
-       msg->data = NULL;
-       msg->len = 0;
+       memset(msg, 0, sizeof(*msg));
+       // msg->data = NULL;
+       // msg->len = 0;
        state = IN_REQUEST;
        alen = 0;
        buffer = NULL;
index 53a519d..939c45f 100644 (file)
@@ -7,12 +7,7 @@
 #include <errno.h>
 #include <assert.h>
 #include <string.h>
-#include <unistd.h>
-#include <fcntl.h>
 #include <stdio.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <errno.h>
 
 #include "interlogd.h"
 
@@ -82,121 +77,6 @@ void input_queue_detach()
 }
 
 
-int
-store_to_file(il_http_message_t *msg, long *offset) {
-       char s_len[20];
-       char filename[PATH_MAX];
-       FILE *outfile;
-       int i, filedesc;
-       int ret = -1;
-
-       if(msg->host == NULL) {
-               set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified");
-       }
-
-       snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host);
-       filename[sizeof(filename) - 1] = 0;
-       snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len);
-       s_len[sizeof(s_len) - 1] = 0;
-       s_len[0] = 0;
-
-try_again:
-       if((outfile = fopen(filename, "a")) == NULL) {
-               set_error(IL_SYS, errno, "store_to_file: error opening file");
-               goto cleanup;
-       }
-       if((filedesc = fileno(outfile)) < 0) {
-               set_error(IL_SYS, errno, "store_to_file: error getting file descriptor");
-               goto cleanup;
-       }
-
-       for(i = 0; i < 5; i++) {
-               struct flock filelock;
-               int filelock_status;
-               struct stat statbuf;
-
-               filelock.l_type = F_WRLCK;
-               filelock.l_whence = SEEK_SET;
-               filelock.l_start = 0;
-               filelock.l_len = 0;
-
-               if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) {
-                       switch(errno) {
-                       case EAGAIN:
-                       case EACCES:
-                       case EINTR:
-                               if((i+1) < 5) sleep(1);
-                               break;
-                       default:
-                               set_error(IL_SYS, errno, "store_to_file: error locking file");
-                               goto cleanup;
-                       }
-               } else {
-                       if(stat(filename, &statbuf)) {
-                               if(errno == ENOENT) {
-                                       fclose(outfile);
-                                       goto try_again;
-                               } else {
-                                       set_error(IL_SYS, errno, "store_file: could not stat file");
-                                       goto cleanup;
-                               }
-                       } else {
-                               /* success */
-                               break;
-                       }
-               }
-       }
-
-       if(i == 5) {
-               set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file");
-               goto cleanup;
-       }
-       if(fseek(outfile, 0, SEEK_END) < 0) {
-               set_error(IL_SYS, errno, "store_to_file: error seeking at end of file");
-               goto cleanup;
-       }
-       if((*offset=ftell(outfile)) < 0) {
-               set_error(IL_SYS, errno, "store_to_file: error getting current position");
-               goto cleanup;
-       }
-       if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) {
-               set_error(IL_SYS, errno, "store_to_file: error writing data header to file");
-               goto cleanup;
-       }
-       if(fwrite(msg->data, msg->len, 1, outfile) != 1) {
-               set_error(IL_SYS, errno, "store_to_file: error writing data to file");
-               goto cleanup;
-       }
-       ret = 0;
-       fflush(outfile);
-
-cleanup:
-       if(outfile) fclose(outfile);
-       return ret;
-}
-
-
-int 
-send_reply(int sd)
-{
-       const char reply[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
-               "<SOAP-ENV:Envelope"
-               " xmlns:SOAP-ENV=\"http://schemas.xmlsoap.org/soap/envelope/\""
-               " xmlns:SOAP-ENC=\"http://schemas.xmlsoap.org/soap/encoding/\""
-               " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
-               " xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\""
-               " xmlns:ns3=\"http://glite.org/wsdl/types/jp\""
-               " xmlns:ns1=\"http://glite.org/wsdl/services/jp\""
-               " xmlns:ns2=\"http://glite.org/wsdl/elements/jp\">"
-               " <SOAP-ENV:Body>"
-               "  <ns2:UpdateJobsResponse>"
-               "  </ns2:UpdateJobsResponse>"
-               " </SOAP-ENV:Body>"
-               "</SOAP-ENV:Envelope>";
-       
-       return(write(sd, reply, sizeof(reply)));
-}
-
 
 /*
  * Returns: -1 on error, 0 if no message available, message length otherwise
@@ -279,13 +159,8 @@ input_queue_get(il_octet_string_t **buffer, long *offset, int timeout)
                  return 0;
   }
 
-  if(store_to_file(&msg, offset) < 0) {
-         close(accepted);
-         return -1;
-  }
-
-  send_reply(accepted);
   close(accepted);
+  *offset = -1;
   return(msg.len);
 }
 #endif
index 0ee2bde..8bf4833 100644 (file)
@@ -290,6 +290,12 @@ main (int argc, char **argv)
     exit(EXIT_FAILURE);
   }
 
+  if(!debug &&
+     (daemon(0,0) < 0)) {
+    perror("daemon");
+    exit(EXIT_FAILURE);
+  }
+
 #ifdef LB_PERF
   /* this must be called after installing signal handlers */
   glite_wll_perftest_init(NULL, /* host */
@@ -342,12 +348,6 @@ main (int argc, char **argv)
      exit(EXIT_FAILURE);
   }
   
-  if(!debug &&
-     (daemon(0,0) < 0)) {
-    perror("daemon");
-    exit(EXIT_FAILURE);
-  }
-
 #ifndef PERF_EMPTY
   /* find all unsent events waiting in files */
 #ifdef LB_PERF
index 6eedcaf..3c90562 100644 (file)
@@ -252,7 +252,7 @@ event_queue_send(struct event_queue *eq)
        4xx - client error (eq. 400 Bad Request)
        5xx - server error (eq. 500 Internal Server Error)
     */
-    if(code >= 500 && code < 600) {
+    if(code >= 100 && code < 200) {
 
            /* non fatal errors (for us), try to deliver later */
            eq->timeout = TIMEOUT;
index 37be900..8bd3623 100644 (file)
@@ -100,7 +100,8 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
                return -1;
        /* set this to indicate new data owner */
        hmsg->data = NULL;
-       msg->ev_len = hmsg->len; /* XXX: add lentgh size too */
+       hmsg->host = NULL;
+       msg->ev_len = hmsg->len + 1; /* must add separator size too */
        return 0;
 
 }