infrastructure for event file rotation
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 1 Apr 2009 21:34:30 +0000 (21:34 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 1 Apr 2009 21:34:30 +0000 (21:34 +0000)
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/interlogd.h

index 1b6c657..0c98382 100644 (file)
@@ -29,7 +29,9 @@ static char *file_prefix = NULL;
 
 struct event_store_list {
        struct event_store *es;
-       struct event_store_list *next;
+       struct event_store_list *next;                  // LL of event_store's
+       struct event_store_list *jobid_next;    // double LL of event_store's for this jobid - forward
+       struct event_store_list *jobid_prev;    // double LL of event_store's for this jobid - backward
 };
 
 
@@ -43,6 +45,20 @@ static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER;
  */
 static
 char *
+astrcat(const char *s1, const char *s2)
+{
+       char *s = malloc(strlen(s1) + strlen(s2) + 1);
+       if(s == NULL)
+               return NULL;
+       *s = 0;
+       strcat(s, s1);
+       strcat(s, s2);
+       return s;
+}
+
+
+static
+char *
 jobid2eventfile(IL_EVENT_ID_T job_id)
 {
   char *buffer;
@@ -52,9 +68,9 @@ jobid2eventfile(IL_EVENT_ID_T job_id)
     hash = IL_EVENT_GET_UNIQUE(job_id);
     asprintf(&buffer, "%s.%s", file_prefix, hash);
     free(hash);
-  } else 
+  } else
     asprintf(&buffer, "%s.default", file_prefix);
-    
+
   return(buffer);
 }
 
@@ -70,12 +86,31 @@ jobid2controlfile(IL_EVENT_ID_T job_id)
     hash = IL_EVENT_GET_UNIQUE(job_id);
     snprintf(buffer, 256, "%s.%s.ctl", file_prefix, hash);
     free(hash);
-  } else 
+  } else
     snprintf(buffer, 256, "%s.default.ctl", file_prefix);
-    
+
   return(strdup(buffer));
 }
 
+static
+int
+fname2index(const char *filename)
+{
+       char *p = rindex(filename, '.');
+       char *s;
+
+       if(p == NULL)
+               return 0;
+
+       for(s = p+1; *s != NULL; s++) {
+               if(*s < '0' || *s > '9') {
+                       return 0;
+               }
+       }
+
+       return atoi(p+1);
+}
+
 
 static
 char *
@@ -93,7 +128,7 @@ read_event_string(FILE *file)
   len = 1024;
 
   while((c=fgetc(file)) != EOF) {
-    
+
     /* we have to have free room for one byte */
     /* if(len - (p - buffer) < 1) */
     if(p - buffer >= len) {
@@ -112,7 +147,7 @@ read_event_string(FILE *file)
       *p++ = 0;
       break;
     } else
-      *p++ = (char) c; 
+      *p++ = (char) c;
   }
 
   if(c != EVENT_SEPARATOR) {
@@ -149,7 +184,7 @@ event_store_free(struct event_store *es)
 
 static
 struct event_store *
-event_store_create(char *job_id_s)
+event_store_create(char *job_id_s, const char *filename)
 {
   struct event_store *es;
   IL_EVENT_ID_T job_id;
@@ -162,8 +197,6 @@ event_store_create(char *job_id_s)
 
   memset(es, 0, sizeof(*es));
 
-  il_log(LOG_DEBUG, "  creating event store for id %s\n", job_id_s);
-
   job_id = NULL;
   if(strcmp(job_id_s, "default") && IL_EVENT_ID_PARSE(job_id_s, &job_id)) {
     set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "event_store_create: error parsing id");
@@ -172,15 +205,18 @@ event_store_create(char *job_id_s)
   }
 
   es->job_id_s = strdup(job_id_s);
-  es->event_file_name = jobid2eventfile(job_id);
-  es->control_file_name = jobid2controlfile(job_id);
+  es->event_file_name = filename ? strdup(filename) : jobid2eventfile(job_id);
+  es->control_file_name = filename ? astrcat(filename, ".ctl") : jobid2controlfile(job_id);
+  es->rotate_index = filename ? fname2index(filename) : 0;
   IL_EVENT_ID_FREE(job_id);
 
-  if(pthread_rwlock_init(&es->commit_lock, NULL)) 
+  il_log(LOG_DEBUG, "  creating event store for id %s, filename %s\n", job_id_s, es->event_file_name);
+
+  if(pthread_rwlock_init(&es->commit_lock, NULL))
           abort();
-  if(pthread_rwlock_init(&es->offset_lock, NULL)) 
+  if(pthread_rwlock_init(&es->offset_lock, NULL))
           abort();
-  if(pthread_rwlock_init(&es->use_lock, NULL)) 
+  if(pthread_rwlock_init(&es->use_lock, NULL))
          abort();
 
   return(es);
@@ -193,7 +229,7 @@ event_store_lock_ro(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_rdlock(&es->commit_lock)) 
+  if(pthread_rwlock_rdlock(&es->commit_lock))
     abort();
 
   return(0);
@@ -206,7 +242,7 @@ event_store_lock(struct event_store *es)
 {
   assert(es != NULL);
 
-  if(pthread_rwlock_wrlock(&es->commit_lock)) 
+  if(pthread_rwlock_wrlock(&es->commit_lock))
     abort();
 
   return(0);
@@ -265,8 +301,8 @@ event_store_write_ctl(struct event_store *es)
     return(-1);
   }
 
-  if(fprintf(ctl, "%s\n%ld\n%ld\n", 
-            es->job_id_s, 
+  if(fprintf(ctl, "%s\n%ld\n%ld\n",
+            es->job_id_s,
             es->last_committed_ls,
             es->last_committed_bs) < 0) {
     set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record");
@@ -283,20 +319,23 @@ event_store_write_ctl(struct event_store *es)
 
 
 /*
- * event_store_qurantine() 
- *   - rename damaged event store file 
- *   - essentially does the same actions as cleanup, but the event store 
+ * event_store_qurantine()
+ *   - rename damaged event store file
+ *   - essentially does the same actions as cleanup, but the event store
  *     does not have to be empty
  * returns 0 on success, -1 on error
  */
 static
 int
-event_store_quarantine(struct event_store *es) 
+event_store_quarantine(struct event_store *es)
 {
+       // TODO enable cleanup of quarantined event_store struct
+       // TODO handle file rotation
+
        int num;
        char newname[MAXPATHLEN+1];
 
-       /* find available qurantine name */
+       /* find available quarantine name */
        /* we give it at most 1024 tries */
        for(num = 0; num < 1024; num++) {
                struct stat st;
@@ -309,9 +348,9 @@ event_store_quarantine(struct event_store *es)
                                break;
                        } else {
                                /* some other error with name, probably permanent */
-                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename");
+                               set_error(IL_SYS, errno, "event_store_qurantine: error looking for quarantine filename");
                                return(-1);
-                               
+
                        }
                } else {
                        /* the filename is used already */
@@ -345,6 +384,77 @@ event_store_quarantine(struct event_store *es)
 
 
 /*
+ * event_store_rotate_file()
+ * returns 0 on success, -1 on error
+ */
+static
+int
+event_store_rotate_file(struct event_store *es)
+{
+       int num;
+       char newname[MAXPATHLEN+1];
+
+       /* do not rotate already rotated files */
+       if(es->rotate_index > 0)
+               return 0;
+
+       /* find available name */
+       /* we give it at most 1024 tries */
+       for(num = 0; num < 1024; num++) {
+               struct stat st;
+
+               snprintf(newname, MAXPATHLEN, "%s.%d", es->event_file_name, num);
+               newname[MAXPATHLEN] = 0;
+               if(stat(newname, &st) < 0) {
+                       if(errno == ENOENT) {
+                               /* file not found */
+                               break;
+                       } else {
+                               /* some other error with name, probably permanent */
+                               set_error(IL_SYS, errno, "event_store_rotate_file: error looking for available filename");
+                               return(-1);
+
+                       }
+               } else {
+                       /* the filename is used already */
+               }
+       }
+       if(num >= 1024) {
+               /* new name not found */
+               /* XXX - is there more suitable error? */
+               set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename");
+               return(-1);
+       }
+
+       /* actually rename the file */
+       il_log(LOG_DEBUG, "    renaming too large event file from %s to %s\n",
+              es->event_file_name, newname);
+       if(rename(es->event_file_name, newname) < 0) {
+               set_error(IL_SYS, errno, "event_store_rotate_file: error renaming event file");
+               return(-1);
+       }
+
+       /* change names in event_store */
+       es->event_file_name = strdup(newname);
+       es->control_file_name = astrcat(newname, ".ctl");
+
+       return(0);
+}
+
+
+/*
+ * event_store_recover_jobid()
+ *  - recover all event stores for given jobid
+ */
+static
+int
+event_store_recover_jobid(struct event_store *es)
+{
+       // TODO check locking of store list
+}
+
+
+/*
  * event_store_recover()
  *   - recover after restart or catch up when events missing in IPC
  *   - if offset > 0, read everything behind it
@@ -364,13 +474,13 @@ event_store_recover(struct event_store *es)
   struct stat stbuf;
 
   assert(es != NULL);
-  
+
 #if defined(IL_NOTIFICATIONS)
   /* destination queue has to be found for each message separately */
 #else
-  /* find bookkepping server queue */
+  /* find bookkeeping server queue */
   eq_b = queue_list_get(es->job_id_s);
-  if(eq_b == NULL) 
+  if(eq_b == NULL)
     return(-1);
 #endif
 
@@ -389,12 +499,12 @@ event_store_recover(struct event_store *es)
   /* open event file */
   ef = fopen(es->event_file_name, "r");
   if(ef == NULL) {
-         snprintf(err_msg, sizeof(err_msg), 
+         snprintf(err_msg, sizeof(err_msg),
                   "event_store_recover: error opening event file %s",
                   es->event_file_name);
          set_error(IL_SYS, errno, err_msg);
          event_store_unlock(es);
-         if(pthread_rwlock_unlock(&es->offset_lock)) 
+         if(pthread_rwlock_unlock(&es->offset_lock))
                  abort();
          return(-1);
   }
@@ -406,12 +516,12 @@ event_store_recover(struct event_store *es)
   efl.l_start = 0;
   efl.l_len = 0;
   if(fcntl(fd, F_SETLKW, &efl) < 0) {
-         snprintf(err_msg, sizeof(err_msg), 
+         snprintf(err_msg, sizeof(err_msg),
                   "event_store_recover: error locking event file %s",
                   es->event_file_name);
          set_error(IL_SYS, errno, err_msg);
          event_store_unlock(es);
-         if(pthread_rwlock_unlock(&es->offset_lock)) 
+         if(pthread_rwlock_unlock(&es->offset_lock))
                  abort();
          fclose(ef);
          return(-1);
@@ -423,7 +533,7 @@ event_store_recover(struct event_store *es)
          il_log(LOG_ERR, "    could not stat event file %s: %s\n", es->event_file_name, strerror(errno));
          fclose(ef);
          event_store_unlock(es);
-         if(pthread_rwlock_unlock(&es->offset_lock)) 
+         if(pthread_rwlock_unlock(&es->offset_lock))
                  abort();
          return -1;
   } else {
@@ -431,12 +541,17 @@ event_store_recover(struct event_store *es)
                  il_log(LOG_DEBUG, "  event file not modified since last visit, skipping\n");
                  fclose(ef);
                  event_store_unlock(es);
-                 if(pthread_rwlock_unlock(&es->offset_lock)) 
+                 if(pthread_rwlock_unlock(&es->offset_lock))
                          abort();
                  return(0);
          }
   }
 
+  /* check the file size, rename it if it is bigger than max_store_size */
+  if(stbuf.st_size > max_store_size) {
+         event_store_rotate_file(es);
+  }
+
   while(1) { /* try, try, try */
 
          /* get the position in file to be sought */
@@ -444,7 +559,7 @@ event_store_recover(struct event_store *es)
                  last = es->offset;
          else {
 #if !defined(IL_NOTIFICATIONS)
-                 if(eq_b == eq_l) 
+                 if(eq_b == eq_l)
                          last = es->last_committed_ls;
                  else
 #endif
@@ -471,7 +586,7 @@ event_store_recover(struct event_store *es)
                          set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
                          event_store_unlock(es);
                          fclose(ef);
-                         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                         if(pthread_rwlock_unlock(&es->offset_lock))
                                  abort();
                          return(-1);
                  }
@@ -479,7 +594,7 @@ event_store_recover(struct event_store *es)
                     even if the offset points at EOF */
                  if((c=fgetc(ef)) != EVENT_SEPARATOR) {
                          /* Houston, we have got a problem */
-                         il_log(LOG_WARNING, 
+                         il_log(LOG_WARNING,
                                 "    file position %ld does not point at the beginning of event string, backing off!\n",
                                 last);
                          /* now, where were we? */
@@ -503,7 +618,7 @@ event_store_recover(struct event_store *es)
                          set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
                          event_store_unlock(es);
                          fclose(ef);
-                         if(pthread_rwlock_unlock(&es->offset_lock)) 
+                         if(pthread_rwlock_unlock(&es->offset_lock))
                                  abort();
                          return(-1);
                  }
@@ -511,7 +626,7 @@ event_store_recover(struct event_store *es)
          }
   }
 
-  /* now we have: 
+  /* now we have:
    *   - event file opened at position 'last'
    *   - offset and last_committed_* potentially reset to zero
    */
@@ -550,7 +665,7 @@ event_store_recover(struct event_store *es)
            /* actually do not bother if quarantine succeeded or not - we could not do more */
            event_store_quarantine(es);
            fclose(ef);
-           if(pthread_rwlock_unlock(&es->offset_lock)) 
+           if(pthread_rwlock_unlock(&es->offset_lock))
                    abort();
            return(-1);
     }
@@ -559,8 +674,8 @@ event_store_recover(struct event_store *es)
 
     /* first enqueue to the LS */
     if(!bs_only && (last >= last_ls)) {
-      
-      il_log(LOG_DEBUG, "      queueing event at %ld to logging server\n", last);
+
+      il_log(LOG_DEBUG, "      queuing event at %ld to logging server\n", last);
 
 #if !defined(IL_NOTIFICATIONS)
       if(enqueue_msg(eq_l, msg) < 0)
@@ -573,11 +688,11 @@ event_store_recover(struct event_store *es)
 #endif
 
     /* now enqueue to the BS, if neccessary */
-    if((eq_b != eq_l) && 
+    if((eq_b != eq_l) &&
        (last >= last_bs)) {
-      
+
       il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
-      
+
       if(enqueue_msg(eq_b, msg) < 0)
        break;
     }
@@ -596,13 +711,13 @@ event_store_recover(struct event_store *es)
   es->last_modified = stbuf.st_mtime;
   il_log(LOG_DEBUG, "  event store offset set to %ld\n", last);
 
-  if(msg) 
+  if(msg)
     server_msg_free(msg);
 
   fclose(ef);
   il_log(LOG_DEBUG, "  finished reading events with %d\n", ret);
 
-  if(pthread_rwlock_unlock(&es->offset_lock)) 
+  if(pthread_rwlock_unlock(&es->offset_lock))
          abort();
 
   return(ret);
@@ -628,13 +743,13 @@ event_store_sync(struct event_store *es, long offset)
    * event will be read from file, socket now serves only to notify
    * about possible event file change.
    */
-  ret = event_store_recover(es);
+  ret = event_store_recover_jobid(es);
   ret = (ret < 0) ? ret : 0;
   return(ret);
 
 #if 0
   event_store_lock_ro(es);
-  if(es->offset == offset) 
+  if(es->offset == offset)
     /* we are up to date */
     ret = 1;
   else if(es->offset > offset)
@@ -644,9 +759,9 @@ event_store_sync(struct event_store *es, long offset)
     /* es->offset < offset, i.e. we have missed some events */
     event_store_unlock(es);
     ret = event_store_recover(es);
-    /* XXX possible room for intervention by another thread - is there 
-     * any other thread messing with us? 
-     * 1) After recover() es->offset is set at the end of file. 
+    /* XXX possible room for intervention by another thread - is there
+     * any other thread messing with us?
+     * 1) After recover() es->offset is set at the end of file.
      * 2) es->offset is set only by recover() and next().
      * 3) Additional recover can not do much harm.
      * 4) And next() is only called by the same thread as sync().
@@ -655,7 +770,7 @@ event_store_sync(struct event_store *es, long offset)
     event_store_lock_ro(es);
     if(ret < 0)
       ret = -1;
-    else 
+    else
            if(es->offset <= offset) {
                    /* Apparently there is something wrong - we are receiving an event
                     * which is beyond the end of file. Someone must have removed the file
@@ -680,16 +795,16 @@ int
 event_store_next(struct event_store *es, long offset, int len)
 {
   assert(es != NULL);
-  
+
   /* Commented out due to the fact that offset as received on socket
    * has little to do with real event file at the moment. es->offset
    * handling is left solely to the event_store_recover().
    */
-   
+
 #if 0
   event_store_lock(es);
   /* Whoa, be careful now. The es->offset points right after the last enqueued event,
-   * but it may not be the offset of the event WE have just enqueued, because:!    
+   * but it may not be the offset of the event WE have just enqueued, because:!
    *  1) someone could have removed the event file behind our back
    *  2) the file could have been recover()ed and more events read
    * In either case the offset should not be moved.
@@ -704,7 +819,7 @@ event_store_next(struct event_store *es, long offset, int len)
 }
 
 
-/* 
+/*
  * event_store_commit()
  *
  */
@@ -713,11 +828,11 @@ event_store_commit(struct event_store *es, int len, int ls, int generation)
 {
   assert(es != NULL);
 
-  /* do not move counters if event store with this message was cleaned up 
+  /* do not move counters if event store with this message was cleaned up
    * (this can happen only when moving to quarantine)
    */
   /* XXX - assume int access is atomic */
-  if(generation != es->generation) 
+  if(generation != es->generation)
          return 0;
 
   event_store_lock(es);
@@ -749,7 +864,7 @@ event_store_commit(struct event_store *es, int len, int ls, int generation)
  * Q: How do we know that we can safely remove the files?
  * A: When all events from file have been committed both by LS and BS.
  */
-static 
+static
 int
 event_store_clean(struct event_store *es)
 {
@@ -780,7 +895,7 @@ event_store_clean(struct event_store *es)
          abort();
   }
 
-  /* the file can only be removed when all the events were succesfully sent 
+  /* the file can only be removed when all the events were succesfully sent
      (ie. committed both by LS and BS */
   /* That also implies that the event queues are 'empty' at the moment. */
   ef = fopen(es->event_file_name, "r+");
@@ -788,14 +903,14 @@ event_store_clean(struct event_store *es)
     /* if we can not open the event store, it is an error and the struct should be removed */
     /* XXX - is it true? */
     event_store_unlock(es);
-    if(pthread_rwlock_unlock(&es->offset_lock)) 
+    if(pthread_rwlock_unlock(&es->offset_lock))
            abort();
     il_log(LOG_ERR,  "  event_store_clean: error opening event file: %s\n", strerror(errno));
     return(1);
   }
-  
+
   fd = fileno(ef);
-  
+
   /* prevent local-logger from writing into event file */
   efl.l_type = F_WRLCK;
   efl.l_whence = SEEK_SET;
@@ -805,7 +920,7 @@ event_store_clean(struct event_store *es)
     il_log(LOG_DEBUG, "    could not lock event file, cleanup aborted\n");
     fclose(ef);
     event_store_unlock(es);
-    if(pthread_rwlock_unlock(&es->offset_lock)) 
+    if(pthread_rwlock_unlock(&es->offset_lock))
            abort();
     if(errno != EACCES &&
        errno != EAGAIN) {
@@ -814,25 +929,25 @@ event_store_clean(struct event_store *es)
     }
     return(0);
   }
-  
+
   /* now the file should not contain partially written event, so it is safe
      to get offset behind last event by seeking the end of file */
   if(fseek(ef, 0, SEEK_END) < 0) {
     set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file");
     event_store_unlock(es);
-    if(pthread_rwlock_unlock(&es->offset_lock)) 
+    if(pthread_rwlock_unlock(&es->offset_lock))
            abort();
     fclose(ef);
     return(-1);
   }
-  
+
   last = ftell(ef);
   il_log(LOG_DEBUG, "    total bytes in file: %d\n", last);
 
   if(es->last_committed_ls < last) {
     fclose(ef);
     event_store_unlock(es);
-    if(pthread_rwlock_unlock(&es->offset_lock)) 
+    if(pthread_rwlock_unlock(&es->offset_lock))
            abort();
     il_log(LOG_DEBUG, "    events still waiting in queue, cleanup aborted\n");
     return(0);
@@ -842,18 +957,18 @@ event_store_clean(struct event_store *es)
             some undelivered events referring to that event store */
          fclose(ef);
          event_store_unlock(es);
-         if(pthread_rwlock_unlock(&es->offset_lock)) 
+         if(pthread_rwlock_unlock(&es->offset_lock))
                  abort();
          return(0);
   }
-  
+
   /* now we are sure that all events were sent and the event queues are empty */
   il_log(LOG_INFO, "    removing event file %s\n", es->event_file_name);
-  
+
   /* remove the event file */
   unlink(es->event_file_name);
   unlink(es->control_file_name);
-  
+
   /* clear the counters */
   es->last_committed_ls = 0;
   es->last_committed_bs = 0;
@@ -865,7 +980,7 @@ event_store_clean(struct event_store *es)
 
   /* unlock the event_store even if it is going to be removed */
   event_store_unlock(es);
-  if(pthread_rwlock_unlock(&es->offset_lock)) 
+  if(pthread_rwlock_unlock(&es->offset_lock))
          abort();
 
   /* close the event file (that unlocks it as well) */
@@ -881,9 +996,9 @@ event_store_clean(struct event_store *es)
  * --------------------------------
  */
 struct event_store *
-event_store_find(char *job_id_s)
+event_store_find(char *job_id_s, const char *filename = NULL)
 {
-  struct event_store_list *q, *p;
+  struct event_store_list *q, *p, *d;
   struct event_store *es;
 
   if(pthread_rwlock_wrlock(&store_list_lock)) {
@@ -891,27 +1006,30 @@ event_store_find(char *job_id_s)
   }
 
   es = NULL;
-  
-  q = NULL;
+
+  d = NULL;
   p = store_list;
-  
+
   while(p) {
     if(strcmp(p->es->job_id_s, job_id_s) == 0) {
-      es = p->es;
-      if(pthread_rwlock_rdlock(&es->use_lock))
-             abort();
-      if(pthread_rwlock_unlock(&store_list_lock)) 
-             abort();
-      return(es);
-    }
-
-    q = p;
+               es = p->es;
+           d = p;
+       // if filename was given, compare it as well
+       if(filename == NULL || strcmp(p->es->event_file_name, filename) != 0) {
+               if(pthread_rwlock_rdlock(&es->use_lock))
+                       abort();
+               if(pthread_rwlock_unlock(&store_list_lock))
+                       abort();
+               return(es);
+       }
+       }
     p = p->next;
   }
 
-  es = event_store_create(job_id_s);
+  // event store for given jobid and filename was not found, create one
+  es = event_store_create(job_id_s, filename);
   if(es == NULL) {
-         if(pthread_rwlock_unlock(&store_list_lock)) 
+         if(pthread_rwlock_unlock(&store_list_lock))
                  abort();
          return(NULL);
   }
@@ -919,20 +1037,63 @@ event_store_find(char *job_id_s)
   p = malloc(sizeof(*p));
   if(p == NULL) {
     set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store");
-      if(pthread_rwlock_unlock(&store_list_lock)) 
+      if(pthread_rwlock_unlock(&store_list_lock))
              abort();
     return(NULL);
   }
-  
-  p->next = store_list;
-  store_list = p;
-    
   p->es = es;
 
+  if(filename != NULL && d != NULL) {
+         // there is another event store for this jobid;
+         //    d points to the last event store for this jobid in LL
+         // find proper place to insert new event store
+         if(p->es->rotate_index == 0) {
+                 // insert behind d in LL
+                 p->next = d->next;
+                 d->next = p;
+                 // insert behind d in jobid LL
+                 p->jobid_next = d->jobid_next;
+                 p->jobid_prev = d;
+                 d->jobid_next->jobid_prev = p;
+                 d->jobid_next = p;
+         } else {
+                 struct event_store_list *r;
+                 q = NULL;
+                 for(r = d->jobid_next; r != d->jobid_next; r = r->jobid_next) {
+                         if(p->es->rotate_index < r->es->rotate_index)
+                                 break;
+                         if(r->es->rotate_index > 0)
+                                 q = r;
+                 }
+                 // q has the last lesser non-zero index than p
+                 if(q == NULL) {
+                         p->next = store_list;
+                         store_list = p;
+                         // insert behind d
+                         p->jobid_next = d->jobid_next;
+                         p->jobid_prev = d;
+                         d->jobid_next->jobid_prev = p;
+                         d->jobid_next = p;
+                 } else {
+                         p->next = q->next;
+                         q->next = p;
+                         // insert behind q
+                         p->jobid_next = q->jobid_next;
+                         p->jobid_prev = q;
+                         q->jobid_next->jobid_prev = p;
+                         q->jobid_next = p;
+                 }
+         }
+  } else {
+         // insert at the beginning
+         p->next = store_list;
+         store_list = p;
+  }
+
   if(pthread_rwlock_rdlock(&es->use_lock))
          abort();
 
-  if(pthread_rwlock_unlock(&store_list_lock)) 
+  if(pthread_rwlock_unlock(&store_list_lock))
          abort();
 
   return(es);
@@ -965,9 +1126,9 @@ event_store_from_file(char *filename)
        char *dest_name = NULL;
 
 #endif
-       
+
        il_log(LOG_INFO, "  attaching to event file: %s\n", filename);
-       
+
        if(strstr(filename, "quarantine") != NULL) {
                il_log(LOG_INFO, "  file name belongs to quarantine, not touching that.\n");
                return(0);
@@ -980,9 +1141,9 @@ event_store_from_file(char *filename)
        }
        event_s = read_event_string(event_file);
        fclose(event_file);
-       if(event_s == NULL) 
+       if(event_s == NULL)
                return(0);
-       
+
 #if defined(IL_NOTIFICATIONS)
        edg_wll_InitContext(&context);
        ret=edg_wll_ParseNotifEvent(context, event_s, &notif_event);
@@ -993,7 +1154,7 @@ event_store_from_file(char *filename)
                goto out;
        }
        if(notif_event->notification.notifId == NULL) {
-               set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, 
+               set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM,
                          "event_store_from_file: parse error - no notif id");
                ret = -1;
                goto out;
@@ -1004,12 +1165,12 @@ event_store_from_file(char *filename)
                goto out;
        }
        /*  XXX: what was that good for?
-       if(notif_event->notification.dest_host && 
+       if(notif_event->notification.dest_host &&
           (strlen(notif_event->notification.dest_host) > 0)) {
                asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port);
        }
        */
-       
+
 #else
        job_id_s = edg_wll_GetJobId(event_s);
 #endif
@@ -1019,9 +1180,9 @@ event_store_from_file(char *filename)
                ret = 0;
                goto out;
        }
-       
-       es=event_store_find(job_id_s);
-       
+
+       es = event_store_find(job_id_s, filename);
+
        if(es == NULL) {
                ret = -1;
                goto out;
@@ -1031,9 +1192,9 @@ event_store_from_file(char *filename)
           (es->last_committed_bs == 0) &&
           (es->offset == 0)) {
                ret = event_store_read_ctl(es);
-       } else 
+       } else
                ret = 0;
-       
+
        event_store_release(es);
 
 out:
@@ -1043,7 +1204,7 @@ out:
                free(notif_event);
        }
 #endif
-       if(event_s) free(event_s); 
+       if(event_s) free(event_s);
        if(job_id_s) free(job_id_s);
        return(ret);
 }
@@ -1085,12 +1246,12 @@ event_store_init(char *prefix)
       set_error(IL_SYS, errno, "event_store_init: error opening event directory");
       return(-1);
     }
-    
+
     while((entry=readdir(event_dir))) {
       char *s;
 
       /* skip all files that do not match prefix */
-      if(strncmp(entry->d_name, p, len) != 0) 
+      if(strncmp(entry->d_name, p, len) != 0)
        continue;
 
       /* skip all control files */
@@ -1128,12 +1289,12 @@ event_store_init(char *prefix)
       set_error(IL_SYS, errno, "event_store_init: error opening event directory");
       return(-1);
     }
-    
+
     while((entry=readdir(event_dir))) {
       char *s;
 
       /* skip all files that do not match prefix */
-      if(strncmp(entry->d_name, p, len) != 0) 
+      if(strncmp(entry->d_name, p, len) != 0)
        continue;
 
       /* find all control files */
@@ -1164,9 +1325,9 @@ event_store_init(char *prefix)
                      /* could not stat file, remove ctl */
                      strcat(ef, s);
                      il_log(LOG_DEBUG, "  removing stale file %s\n", ef);
-                     if(unlink(ef)) 
+                     if(unlink(ef))
                              il_log(LOG_ERR, "  could not remove file %s: %s\n", ef, strerror(errno));
-                     
+
              }
              free(ef);
 
@@ -1186,7 +1347,7 @@ event_store_recover_all()
   struct event_store_list *sl;
 
 
-  if(pthread_rwlock_rdlock(&store_list_lock)) 
+  if(pthread_rwlock_rdlock(&store_list_lock))
          abort();
 
   /* recover all event stores */
@@ -1201,15 +1362,15 @@ event_store_recover_all()
          }
          sl = sl->next;
   }
-  
-  if(pthread_rwlock_unlock(&store_list_lock)) 
+
+  if(pthread_rwlock_unlock(&store_list_lock))
          abort();
 
   return(0);
 }
 
 
-#if 0 
+#if 0
 int
 event_store_remove(struct event_store *es)
 {
@@ -1221,7 +1382,7 @@ event_store_remove(struct event_store *es)
   case 0:
     il_log(LOG_DEBUG, "  event store not removed, still used\n");
     return(0);
-    
+
   case 1:
     if(pthread_rwlock_wrlock(&store_list_lock) < 0) {
       set_error(IL_SYS, errno, "  event_store_remove: error locking event store list");
@@ -1265,7 +1426,7 @@ event_store_cleanup()
 
   /* try to remove event files */
 
-  if(pthread_rwlock_wrlock(&store_list_lock)) 
+  if(pthread_rwlock_wrlock(&store_list_lock))
          abort();
 
   sl = store_list;
@@ -1275,11 +1436,11 @@ event_store_cleanup()
          int ret;
 
          slnext = sl->next;
-         
+
          /* one event store at time */
          ret = pthread_rwlock_trywrlock(&sl->es->use_lock);
          if(ret == EBUSY) {
-                 il_log(LOG_DEBUG, "  event_store %s is in use by another thread\n", 
+                 il_log(LOG_DEBUG, "  event_store %s is in use by another thread\n",
                         sl->es->job_id_s);
                  sl = slnext;
                  continue;
@@ -1287,33 +1448,38 @@ event_store_cleanup()
            abort();
 
          switch(event_store_clean(sl->es)) {
-                 
+
          case 1:
-                 /* remove this event store */
+                 /* remove this event store from LL */
                  (*prev) = slnext;
+                 /* remove this event store from jobid's LL */
+                 if(sl->jobid_next != sl) {
+                         sl->jobid_prev->jobid_next = sl->jobid_next;
+                         sl->jobid_next->jobid_prev = sl->jobid_prev;
+                 }
                  event_store_free(sl->es);
                  free(sl);
                  break;
-                 
+
          case -1:
-                 il_log(LOG_ERR, "  error removing event store %s (file %s):\n    %s\n", 
+                 il_log(LOG_ERR, "  error removing event store %s (file %s):\n    %s\n",
                         sl->es->job_id_s, sl->es->event_file_name, error_get_msg());
                  /* event_store_release(sl->es); */
                  clear_error();
                  /* go on to the next */
-                 
+
          default:
                  event_store_release(sl->es);
                  prev = &(sl->next);
                  break;
          }
-         
+
          sl = slnext;
   }
-  
-  if(pthread_rwlock_unlock(&store_list_lock)) 
+
+  if(pthread_rwlock_unlock(&store_list_lock))
          abort();
-  
+
   return(0);
 }
 
index df3ed84..0a13297 100644 (file)
@@ -125,6 +125,7 @@ struct event_store {
        long      offset;                  /* expected file position of next event */
        time_t    last_modified;           /* time of the last file modification */
        int       generation;              /* cleanup counter, scopes the offset */
+       int               rotate_index;                    /* rotation counter */
        pthread_rwlock_t commit_lock;      /* lock to prevent simultaneous updates to last_committed_* */
        pthread_rwlock_t offset_lock;      /* lock to prevent simultaneous updates offset */
        pthread_rwlock_t use_lock;         /* lock to prevent struct deallocation */