implement recovery by jobid from rotated event files
authorMichal Voců <michal@ruk.cuni.cz>
Thu, 2 Apr 2009 09:07:10 +0000 (09:07 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Thu, 2 Apr 2009 09:07:10 +0000 (09:07 +0000)
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/interlogd.h

index 60c6fd6..87f1fe8 100644 (file)
@@ -30,8 +30,6 @@ static char *file_prefix = NULL;
 struct event_store_list {
        struct event_store *es;
        struct event_store_list *next;                  // LL of event_store's
-       struct event_store_list *jobid_next;    // double LL of event_store's for this jobid - forward
-       struct event_store_list *jobid_prev;    // double LL of event_store's for this jobid - backward
 };
 
 
@@ -208,6 +206,8 @@ event_store_create(char *job_id_s, const char *filename)
   es->event_file_name = filename ? strdup(filename) : jobid2eventfile(job_id);
   es->control_file_name = filename ? astrcat(filename, ".ctl") : jobid2controlfile(job_id);
   es->rotate_index = filename ? fname2index(filename) : 0;
+  es->jobid_next = es;
+  es->jobid_prev = es;
   IL_EVENT_ID_FREE(job_id);
 
   il_log(LOG_DEBUG, "  creating event store for id %s, filename %s\n", job_id_s, es->event_file_name);
@@ -450,7 +450,27 @@ static
 int
 event_store_recover_jobid(struct event_store *es)
 {
-       // TODO check locking of store list
+       // es is locked for use already
+       struct event_store *p = es;
+
+       do {
+               event_store_recover(p);
+               if(p != es ) {
+                       event_store_release(p);
+               }
+
+               if(pthread_rwlock_rdlock(&store_list_lock))
+                       abort();
+               p = p->jobid_next;
+               if(p != es) {
+                       if(pthread_rwlock_rdlock(&p->use_lock))
+                               abort();
+               }
+               if(pthread_rwlock_unlock(&store_list_lock))
+                       abort();
+
+
+       } while(p != es);
 }
 
 
@@ -998,8 +1018,8 @@ event_store_clean(struct event_store *es)
 struct event_store *
 event_store_find(char *job_id_s, const char *filename)
 {
-  struct event_store_list *q, *p, *d;
-  struct event_store *es;
+  struct event_store_list *p, *d;
+  struct event_store *es, *q;
 
   if(pthread_rwlock_wrlock(&store_list_lock)) {
          abort();
@@ -1052,17 +1072,17 @@ event_store_find(char *job_id_s, const char *filename)
                  p->next = d->next;
                  d->next = p;
                  // insert behind d in jobid LL
-                 p->jobid_next = d->jobid_next;
-                 p->jobid_prev = d;
-                 d->jobid_next->jobid_prev = p;
-                 d->jobid_next = p;
+                 p->es->jobid_next = d->es->jobid_next;
+                 p->es->jobid_prev = d->es;
+                 d->es->jobid_next->jobid_prev = p->es;
+                 d->es->jobid_next = p->es;
          } else {
-                 struct event_store_list *r;
+                 struct event_store *r;
                  q = NULL;
-                 for(r = d->jobid_next; r != d->jobid_next; r = r->jobid_next) {
-                         if(p->es->rotate_index < r->es->rotate_index)
+                 for(r = d->es->jobid_next; r != d->es->jobid_next; r = r->jobid_next) {
+                         if(p->es->rotate_index < r->rotate_index)
                                  break;
-                         if(r->es->rotate_index > 0)
+                         if(r->rotate_index > 0)
                                  q = r;
                  }
                  // q has the last lesser non-zero index than p
@@ -1070,18 +1090,18 @@ event_store_find(char *job_id_s, const char *filename)
                          p->next = store_list;
                          store_list = p;
                          // insert behind d
-                         p->jobid_next = d->jobid_next;
-                         p->jobid_prev = d;
-                         d->jobid_next->jobid_prev = p;
-                         d->jobid_next = p;
+                         p->es->jobid_next = d->es->jobid_next;
+                         p->es->jobid_prev = d->es;
+                         d->es->jobid_next->jobid_prev = p->es;
+                         d->es->jobid_next = p->es;
                  } else {
                          p->next = q->next;
                          q->next = p;
                          // insert behind q
-                         p->jobid_next = q->jobid_next;
-                         p->jobid_prev = q;
-                         q->jobid_next->jobid_prev = p;
-                         q->jobid_next = p;
+                         p->es->jobid_next = q->jobid_next;
+                         p->es->jobid_prev = q;
+                         q->jobid_next->jobid_prev = p->es;
+                         q->jobid_next = p->es;
                  }
          }
   } else {
@@ -1107,7 +1127,7 @@ event_store_release(struct event_store *es)
 
        if(pthread_rwlock_unlock(&es->use_lock))
                abort();
-       il_log(LOG_DEBUG, "  released lock on %s\n", es->job_id_s);
+       il_log(LOG_DEBUG, "  released lock on %s (%s)\n", es->job_id_s, es->event_file_name);
        return(0);
 }
 
@@ -1453,9 +1473,9 @@ event_store_cleanup()
                  /* remove this event store from LL */
                  (*prev) = slnext;
                  /* remove this event store from jobid's LL */
-                 if(sl->jobid_next != sl) {
-                         sl->jobid_prev->jobid_next = sl->jobid_next;
-                         sl->jobid_next->jobid_prev = sl->jobid_prev;
+                 if(sl->es->jobid_next != sl->es) {
+                         sl->es->jobid_prev->jobid_next = sl->es->jobid_next;
+                         sl->es->jobid_next->jobid_prev = sl->es->jobid_prev;
                  }
                  event_store_free(sl->es);
                  free(sl);
index 834da53..7ce683f 100644 (file)
@@ -126,6 +126,8 @@ struct event_store {
        time_t    last_modified;           /* time of the last file modification */
        int       generation;              /* cleanup counter, scopes the offset */
        int               rotate_index;                    /* rotation counter */
+       struct event_store *jobid_next;    /* double LL of rotated stores - forward */
+       struct event_store *jobid_prev;    /* double LL of rotated stores - backward */
        pthread_rwlock_t commit_lock;      /* lock to prevent simultaneous updates to last_committed_* */
        pthread_rwlock_t offset_lock;      /* lock to prevent simultaneous updates offset */
        pthread_rwlock_t use_lock;         /* lock to prevent struct deallocation */