merge recent HEAD changes
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 14 Apr 2009 09:05:50 +0000 (09:05 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 14 Apr 2009 09:05:50 +0000 (09:05 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/event_store.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/send_event.c

index 455414f..702e614 100644 (file)
@@ -294,7 +294,7 @@ event_queue_move_events(struct event_queue *eq_s,
                        il_log(LOG_DEBUG, "  moving event at offset %d(%d) from %s:%d to %s:%d\n",
                               p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, 
                               eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
-                       il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev);
+                       /* il_log(LOG_DEBUG, "  current: %x, next: %x\n", p, p->prev); */
                        /* remove the message from the source list */
                        *source_prev = p->prev;
                        if(eq_d) {
index 5aa9919..e584e89 100644 (file)
@@ -94,11 +94,12 @@ jobid2controlfile(IL_EVENT_ID_T job_id)
 }
 
 static
-int
+long long
 fname2index(const char *filename)
 {
        char *p = rindex(filename, '.');
        char *s;
+       long long       ret;
 
        if(p == NULL)
                return 0;
@@ -109,7 +110,8 @@ fname2index(const char *filename)
                }
        }
 
-       return atoi(p+1)+1;
+       sscanf(p+1,"%lld",&ret);
+       return ret+1;
 }
 
 
@@ -211,7 +213,7 @@ event_store_create(char *job_id_s, const char *filename)
   es->rotate_index = filename ? fname2index(filename) : 0;
   IL_EVENT_ID_FREE(job_id);
 
-  il_log(LOG_DEBUG, "  creating event store for id %s, filename %s, rotate index %d\n",
+  il_log(LOG_DEBUG, "  creating event store for id %s, filename %s, rotate index %lld\n",
         job_id_s, es->event_file_name, es->rotate_index);
 
   if(pthread_rwlock_init(&es->commit_lock, NULL))
@@ -440,7 +442,7 @@ event_store_rotate_file(struct event_store *es)
        /* change names in event_store */
        es->event_file_name = strdup(newname);
        es->control_file_name = astrcat(newname, ".ctl");
-       es->rotate_index = num + 1;
+       es->rotate_index = 1000*timestamp + num + 1;
 
        return(0);
 }
@@ -707,10 +709,25 @@ event_store_recover(struct event_store *es)
       if(enqueue_msg(eq_l, msg) < 0)
        break;
 #endif
-      }
+    }
 
 #ifdef IL_NOTIFICATIONS
     eq_b = queue_list_get(msg->dest);
+    /* if the message does not have destination itself, use destination cached for notification id */
+    if(eq_b == NULL) {
+       eq_b = notifid_map_get_dest(msg->job_id_s);
+       if(eq_b == NULL) {
+               /* message has no destination and no destination is known for notification id,
+                * commit it immediately
+                */
+               il_log(LOG_DEBUG, "    message has no known destination, will not be sent\n");
+                       event_store_commit(es, msg->ev_len, 0, msg->generation);
+               /* if the expiration changed, set new one now, message will be discarded soon */
+               if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
+                       notifid_map_set_expiration(msg->job_id_s, msg->expires);
+               }
+       }
+    }
 #endif
 
     /* now enqueue to the BS, if neccessary */
@@ -720,7 +737,7 @@ event_store_recover(struct event_store *es)
       il_log(LOG_DEBUG, "      queueing event at %ld to bookkeeping server\n", last);
 
       if(enqueue_msg(eq_b, msg) < 0)
-       break;
+         break;
     }
     server_msg_free(msg);
     msg = NULL;
index c5faf7e..3b038ee 100644 (file)
@@ -125,7 +125,7 @@ struct event_store {
        long      offset;                  /* expected file position of next event */
        time_t    last_modified;           /* time of the last file modification */
        int       generation;              /* cleanup counter, scopes the offset */
-       int               rotate_index;                    /* rotation counter */
+       long long                 rotate_index;                    /* rotation counter */
        struct  event_store_list *le;      /* points back to the list */
        pthread_rwlock_t commit_lock;      /* lock to prevent simultaneous updates to last_committed_* */
        pthread_rwlock_t offset_lock;      /* lock to prevent simultaneous updates offset */
index 20c8dda..0afd62c 100644 (file)
@@ -308,7 +308,7 @@ event_queue_send(struct event_queue *eq)
                    return(0);
            }
        }
-       else { code = LB_OK; code_min = 0; rep = strdup("not sending emtpy message"); }
+       else { code = LB_OK; code_min = 0; rep = strdup("not sending empty message"); }
 #ifdef LB_PERF
     } else {
            glite_wll_perftest_consumeEventIlMsg(msg->msg+17);