From: Michal Voců Date: Tue, 21 Apr 2009 13:23:21 +0000 (+0000) Subject: fix thread creation, fix destination mapping X-Git-Tag: glite-lbjp-common-db_R_1_0_0_7~8 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=57a83738edfd2a08ccef1350ec0bac51e4330e8f;p=jra1mw.git fix thread creation, fix destination mapping --- diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 18bdcdc..f90afd8 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -283,6 +283,7 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue_msg *p, **source_prev, **dest_tail; assert(eq_s != NULL); + assert(data != NULL); event_queue_lock(eq_s); if(eq_d) { @@ -294,10 +295,10 @@ event_queue_move_events(struct event_queue *eq_s, p = *source_prev; eq_s->tail = NULL; while(p) { - if((*cmp_func)(p->msg, data)) { - il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n", - p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, - eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); + if((*cmp_func)(p->msg, data)) { + il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n", + p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, + eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */ /* remove the message from the source list */ *source_prev = p->prev; diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index 33ee416..1e9063f 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -485,15 +485,26 @@ static int cmp_jobid(struct server_msg *msg, void *data) { + assert(msg != NULL); + assert(data != NULL); + char *job_id_s = (char*)data; return strcmp(msg->job_id_s, job_id_s) == 0; } +struct cmp_exp_data { + char *job_id_s; + time_t expires; +}; + static int cmp_jobid_set_exp(struct server_msg *msg, void *data) { - struct server_msg *m = (struct server_msg *)data; + struct cmp_exp_data *m = (struct cmp_exp_data *)data; + + assert(msg != NULL); + assert(data != NULL); if(strcmp(msg->job_id_s, m->job_id_s) == 0) { msg->expires = m->expires; @@ -723,31 +734,39 @@ event_store_recover(struct event_store *es) msg->generation = es->generation; #ifdef IL_NOTIFICATIONS + il_log(LOG_DEBUG, "DEBUG: message dest %s, last dest %s, known dest %s\n", + msg->dest, last_dest, eq_b ? eq_b->dest : "none"); /* check message destination */ if(msg->dest == NULL) { - /* the message does not have destination itself, use destination cached for notification id */ - if(eq_b == NULL) { - /* no destination is known for notification id, commit it immediately */ - il_log(LOG_DEBUG, " message has no known destination, will not be sent\n"); - event_store_commit(es, msg->ev_len, 0, msg->generation); - } + /* the message does not have destination itself, use destination cached for notification id */ + if(eq_b == NULL) { + /* no destination is known for notification id, commit it immediately */ + il_log(LOG_DEBUG, " message has no known destination, will not be sent\n"); + event_store_commit(es, msg->ev_len, 0, msg->generation); + } } else { /* check if we know destination for notification id */ if(eq_b == NULL) { eq_b = queue_list_get(msg->dest); - } else { - if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) { - /* destination changed */ - last_dest = msg->dest; - } + if(notifid_map_set_dest(es->job_id_s, eq_b) < 0) { + break; + } } + /* remember last message destination */ + if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) { + /* destination changed */ + if(last_dest) { + free(last_dest); + } + last_dest = strdup(msg->dest); + } } /* check message expiration */ if(last_exp == 0 || last_exp != msg->expires) { last_exp = msg->expires; } - #else +#else /* first enqueue to the LS */ if(!bs_only && (last >= last_ls)) { @@ -782,26 +801,43 @@ event_store_recover(struct event_store *es) #if defined(IL_NOTIFICATIONS) /* check if we have to move events to new destination */ + il_log(LOG_DEBUG, " last destination %s, last known destination %s\n", last_dest, eq_b->dest); if(last_dest && strcmp(last_dest, eq_b->dest)) { struct event_queue *eq_dest = queue_list_get(last_dest); /* set new destination */ if(notifid_map_set_dest(es->job_id_s, eq_dest) < 0) { ret = -1; - } - - /* move all events with this notif_id from eq_b to eq_dest */ - event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s); - eq_b = eq_dest; + } else { - /* XXX - we should kill the old queue too */ + /* move all events with this notif_id from eq_b to eq_dest */ + event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s); + eq_b = eq_dest; + il_log(LOG_DEBUG, " all messages for notif id %s are now destined to %s\n", + es->job_id_s, eq_b->dest); + if(event_queue_create_thread(eq_b) < 0) { + ret = -1; + } else { + event_queue_cond_lock(eq_b); + event_queue_signal(eq_b); + event_queue_cond_unlock(eq_b); + } + } + } + if(last_dest) { + free(last_dest); + last_dest = NULL; } /* if the expiration changed, set new one */ if(last_exp != notifid_map_get_expiration(es->job_id_s)) { + struct cmp_exp_data data; + notifid_map_set_expiration(es->job_id_s, last_exp); /* set expiration for all events with this notif id */ - event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg); + data.job_id_s = es->job_id_s; + data.expires = last_exp; + event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, &data); } #endif diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index edbbad0..7210983 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -158,7 +158,7 @@ struct event_queue { edg_wll_GssConnection gss; /* GSS connection */ char *dest_name; int dest_port; - char *dest; + char *dest; int timeout; /* queue timeout */ struct event_queue_msg *tail; /* last message in the queue */ struct event_queue_msg *head; /* first message in the queue */ diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 20c25e7..da2cb57 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -213,7 +213,7 @@ event_queue_create_thread(struct event_queue *eq) event_queue_lock(eq); /* if there is a thread already, just return */ - if(eq->thread_id > 0) { + if(eq->thread_id != 0) { event_queue_unlock(eq); return(0); }