il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
- il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev);
+ /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
/* remove the message from the source list */
*source_prev = p->prev;
if(eq_d) {
}
static
-int
+long long
fname2index(const char *filename)
{
char *p = rindex(filename, '.');
char *s;
+ long long ret;
if(p == NULL)
return 0;
}
}
- return atoi(p+1)+1;
+ sscanf(p+1,"%lld",&ret);
+ return ret+1;
}
es->rotate_index = filename ? fname2index(filename) : 0;
IL_EVENT_ID_FREE(job_id);
- il_log(LOG_DEBUG, " creating event store for id %s, filename %s, rotate index %d\n",
+ il_log(LOG_DEBUG, " creating event store for id %s, filename %s, rotate index %lld\n",
job_id_s, es->event_file_name, es->rotate_index);
if(pthread_rwlock_init(&es->commit_lock, NULL))
/* change names in event_store */
es->event_file_name = strdup(newname);
es->control_file_name = astrcat(newname, ".ctl");
- es->rotate_index = num + 1;
+ es->rotate_index = 1000*timestamp + num + 1;
return(0);
}
if(enqueue_msg(eq_l, msg) < 0)
break;
#endif
- }
+ }
#ifdef IL_NOTIFICATIONS
eq_b = queue_list_get(msg->dest);
+ /* if the message does not have destination itself, use destination cached for notification id */
+ if(eq_b == NULL) {
+ eq_b = notifid_map_get_dest(msg->job_id_s);
+ if(eq_b == NULL) {
+ /* message has no destination and no destination is known for notification id,
+ * commit it immediately
+ */
+ il_log(LOG_DEBUG, " message has no known destination, will not be sent\n");
+ event_store_commit(es, msg->ev_len, 0, msg->generation);
+ /* if the expiration changed, set new one now, message will be discarded soon */
+ if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
+ notifid_map_set_expiration(msg->job_id_s, msg->expires);
+ }
+ }
+ }
#endif
/* now enqueue to the BS, if neccessary */
il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
if(enqueue_msg(eq_b, msg) < 0)
- break;
+ break;
}
server_msg_free(msg);
msg = NULL;
long offset; /* expected file position of next event */
time_t last_modified; /* time of the last file modification */
int generation; /* cleanup counter, scopes the offset */
- int rotate_index; /* rotation counter */
+ long long rotate_index; /* rotation counter */
struct event_store_list *le; /* points back to the list */
pthread_rwlock_t commit_lock; /* lock to prevent simultaneous updates to last_committed_* */
pthread_rwlock_t offset_lock; /* lock to prevent simultaneous updates offset */