struct event_queue_msg *p, **source_prev, **dest_tail;
assert(eq_s != NULL);
+ assert(data != NULL);
event_queue_lock(eq_s);
if(eq_d) {
p = *source_prev;
eq_s->tail = NULL;
while(p) {
- if((*cmp_func)(p->msg, data)) {
- il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
- p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
- eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
+ if((*cmp_func)(p->msg, data)) {
+ il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
+ p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+ eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
/* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
/* remove the message from the source list */
*source_prev = p->prev;
int
cmp_jobid(struct server_msg *msg, void *data)
{
+ assert(msg != NULL);
+ assert(data != NULL);
+
char *job_id_s = (char*)data;
return strcmp(msg->job_id_s, job_id_s) == 0;
}
+struct cmp_exp_data {
+ char *job_id_s;
+ time_t expires;
+};
+
static
int
cmp_jobid_set_exp(struct server_msg *msg, void *data)
{
- struct server_msg *m = (struct server_msg *)data;
+ struct cmp_exp_data *m = (struct cmp_exp_data *)data;
+
+ assert(msg != NULL);
+ assert(data != NULL);
if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
msg->expires = m->expires;
msg->generation = es->generation;
#ifdef IL_NOTIFICATIONS
+ il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n",
+ msg->dest, last_dest, eq_b ? eq_b->dest : "none");
/* check message destination */
if(msg->dest == NULL) {
- /* the message does not have destination itself, use destination cached for notification id */
- if(eq_b == NULL) {
- /* no destination is known for notification id, commit it immediately */
- il_log(LOG_DEBUG, " message has no known destination, will not be sent\n");
- event_store_commit(es, msg->ev_len, 0, msg->generation);
- }
+ /* the message does not have destination itself, use destination cached for notification id */
+ if(eq_b == NULL) {
+ /* no destination is known for notification id, commit it immediately */
+ il_log(LOG_DEBUG, " message has no known destination, will not be sent\n");
+ event_store_commit(es, msg->ev_len, 0, msg->generation);
+ }
} else {
/* check if we know destination for notification id */
if(eq_b == NULL) {
eq_b = queue_list_get(msg->dest);
- } else {
- if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
- /* destination changed */
- last_dest = msg->dest;
- }
+ if(notifid_map_set_dest(es->job_id_s, eq_b) < 0) {
+ break;
+ }
}
+ /* remember last message destination */
+ if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
+ /* destination changed */
+ if(last_dest) {
+ free(last_dest);
+ }
+ last_dest = strdup(msg->dest);
+ }
}
/* check message expiration */
if(last_exp == 0 || last_exp != msg->expires) {
last_exp = msg->expires;
}
- #else
+#else
/* first enqueue to the LS */
if(!bs_only && (last >= last_ls)) {
#if defined(IL_NOTIFICATIONS)
/* check if we have to move events to new destination */
+ il_log(LOG_DEBUG, " last destination %s, last known destination %s\n", last_dest, eq_b->dest);
if(last_dest && strcmp(last_dest, eq_b->dest)) {
struct event_queue *eq_dest = queue_list_get(last_dest);
/* set new destination */
if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) {
ret = -1;
- }
-
- /* move all events with this notif_id from eq_b to eq_dest */
- event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
- eq_b = eq_dest;
+ } else {
- /* XXX - we should kill the old queue too */
+ /* move all events with this notif_id from eq_b to eq_dest */
+ event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
+ eq_b = eq_dest;
+ il_log(LOG_DEBUG, " all messages for notif id %s are now destined to %s\n",
+ es->job_id_s, eq_b->dest);
+ if(event_queue_create_thread(eq_b) < 0) {
+ ret = -1;
+ } else {
+ event_queue_cond_lock(eq_b);
+ event_queue_signal(eq_b);
+ event_queue_cond_unlock(eq_b);
+ }
+ }
+ }
+ if(last_dest) {
+ free(last_dest);
+ last_dest = NULL;
}
/* if the expiration changed, set new one */
if(last_exp != notifid_map_get_expiration(es->job_id_s)) {
+ struct cmp_exp_data data;
+
notifid_map_set_expiration(es->job_id_s, last_exp);
/* set expiration for all events with this notif id */
- event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg);
+ data.job_id_s = es->job_id_s;
+ data.expires = last_exp;
+ event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, &data);
}
#endif