From 3f2eaec39679138db9c9708b7e3aa32353fbfbbb Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Fri, 30 Nov 2007 19:05:22 +0000 Subject: [PATCH] added notification expiration --- org.glite.lb.logger/src/event_queue.c | 52 ++++------------------------- org.glite.lb.logger/src/il_master.c | 30 ++++++++++++++++- org.glite.lb.logger/src/interlogd.h | 5 +-- org.glite.lb.logger/src/queue_mgr.c | 60 ++++++++++++++++++++++++---------- org.glite.lb.logger/src/queue_thread.c | 17 +++++++++- org.glite.lb.logger/src/server_msg.c | 1 + 6 files changed, 98 insertions(+), 67 deletions(-) diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index f305a8f..fcfcfbc 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -270,15 +270,15 @@ event_queue_remove(struct event_queue *eq) return(0); } -#if defined(IL_NOTIFICATIONS) - int -event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id) +event_queue_move_events(struct event_queue *eq_s, + struct event_queue *eq_d, + int (*cmp_func)(struct server_msg *, void *), + void *data) { struct event_queue_msg *p, **source_prev, **dest_tail; assert(eq_s != NULL); - assert(notif_id != NULL); event_queue_lock(eq_s); if(eq_d) { @@ -290,9 +290,10 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char p = *source_prev; eq_s->tail = NULL; while(p) { - if(strcmp(p->msg->job_id_s, notif_id) == 0) { + if((*cmp_func)(p->msg, data)) { il_log(LOG_DEBUG, " moving event at offset %d from %s:%d to %s:%d\n", - p->msg->offset, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1); + p->msg->offset, eq_s->dest_name, eq_s->dest_port, + eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); /* remove the message from the source list */ *source_prev = p->prev; @@ -319,42 +320,3 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char return(0); } -#endif - -/* Remove all expired events. - * - * Returns: -1 - error, 0 - - */ -int -event_queue_clean_expired(struct event_queue *eq) -{ - time_t now = time(NULL); - struct event_queue_msg *p, **prev; - - assert(eq != NULL); - - event_queue_lock(eq); - prev = &(eq->head); - p = *prev; - eq->tail = NULL; - while(p) { - if((p->msg->expires > 0) && (p->msg->expires < now)) { - il_log(LOG_DEBUG, " removing expired event at offset %d from %s:%d\n", - p->msg->offset, eq->dest_name,eq->dest_port); - /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */ - /* remove the message from the list */ - *prev = p->prev; - /* commit and free the message */ - event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq)); - server_msg_free(p->msg); - free(p); - } else { - /* message stays */ - prev = &(p->prev); - eq->tail = p; - } - p = *prev; - } - event_queue_unlock(eq); - return(0); -} diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index 793959c..e394e5a 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -15,6 +15,27 @@ #include "glite/lb/lb_perftest.h" #endif +static +int +cmp_jobid(struct server_msg *msg, void *data) +{ + char *job_id_s = (char*)data; + return strcmp(msg->job_id_s, job_id_s) == 0; +} + +static +int +cmp_jobid_set_exp(struct server_msg *msg, void *data) +{ + struct server_msg *m = (struct server_msg *)data; + + if(strcmp(msg->job_id_s, m->job_id_s) == 0) { + msg->expires = m->expires; + } + return 0; +} + + int enqueue_msg(struct event_queue *eq, struct server_msg *msg) { @@ -31,10 +52,17 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) return(-1); /* move all events with this notif_id from eq_known to eq */ if(eq_known != NULL) { - event_queue_move_events(eq_known, eq, msg->job_id_s); + event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s); /* XXX - we should kill the old queue too */ } } + + /* if the expiration changed, set new one */ + if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) { + notifid_map_set_expiration(msg->job_id_s, msg->expires); + /* set expiration for all events with this notif id */ + event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg); + } #endif /* fire thread to take care of this queue */ diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 92f8dc1..cf4d634 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -166,7 +166,7 @@ int event_queue_remove(struct event_queue *); int event_queue_enqueue(struct event_queue *, char *); /* helper */ int enqueue_msg(struct event_queue *, struct server_msg *); -int event_queue_clean_expired(struct event_queue *eq); +int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *); /* protocol event queue methods */ int event_queue_connect(struct event_queue *); @@ -201,7 +201,8 @@ int queue_list_is_log(struct event_queue *); #if defined(IL_NOTIFICATIONS) struct event_queue *notifid_map_get_dest(const char *); int notifid_map_set_dest(const char *, struct event_queue *); -int event_queue_move_events(struct event_queue *, struct event_queue *, char *); +time_t notifid_map_get_expiration(const char *); +int notifid_map_set_expiration(const char *, time_t); #endif /* event store functions */ diff --git a/org.glite.lb.logger/src/queue_mgr.c b/org.glite.lb.logger/src/queue_mgr.c index abae1f3..b76c397 100644 --- a/org.glite.lb.logger/src/queue_mgr.c +++ b/org.glite.lb.logger/src/queue_mgr.c @@ -12,6 +12,9 @@ struct queue_list { struct event_queue *queue; char *dest; struct queue_list *next; +#if defined(IL_NOTIFICATIONS) + time_t expires; +#endif }; static struct event_queue *log_queue; @@ -91,24 +94,6 @@ queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) } -/* -static -int -queue_list_remove(struct queue_list *el, struct queue_list *prev) -{ - assert(el != NULL); - - if(prev) - prev->next = el->next; - else - queues = el->next; - - free(el); - return(1); -} -*/ - - #if !defined(IL_NOTIFICATIONS) static char * @@ -213,6 +198,17 @@ queue_list_next() return(current ? current->queue : NULL); } + +int +queue_list_remove_queue(struct event_queue *eq) +{ + assert(eq != NULL); + + free(eq); + return(1); +} + + #if defined(IL_NOTIFICATIONS) static struct queue_list *notifid_map = NULL; @@ -241,4 +237,32 @@ notifid_map_set_dest(const char *notif_id, struct event_queue *eq) } } + +time_t +notifid_map_get_expiration(const char * notif_id) +{ + struct queue_list *q; + + queue_list_find(notifid_map, notif_id, &q, NULL); + return(q ? q->expires : 0); +} + + +int +notifid_map_set_expiration(const char *notif_id, time_t exp) +{ + struct queue_list *q; + + if(queue_list_find(notifid_map, notif_id, &q, NULL)) { + q->expires = exp; + return(1); + } else { + return(0); + } +} + #endif + +/* Local Variables: */ +/* c-indentation-style: gnu */ +/* End: */ diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index 3c91c92..b75c501 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -27,6 +27,16 @@ queue_thread_cleanup(void *q) } +static time_t now; + +static +int +cmp_expires(struct server_msg *msg, void *data) +{ + time_t *t = (time_t*)data; + return msg->expires < *t; +} + static void * queue_thread(void *q) @@ -94,7 +104,8 @@ queue_thread(void *q) /* discard expired events */ il_log(LOG_DEBUG, " discarding expired events\n"); - event_queue_clean_expired(eq); + now = time(NULL); + event_queue_move_events(eq, NULL, cmp_expires, &now); if(!event_queue_empty(eq)) { /* deliver pending events */ @@ -409,3 +420,7 @@ int event_queue_cond_unlock(struct event_queue *eq) return(0); } + +/* Local Variables: */ +/* c-indentation-style: linux */ +/* End: */ diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 0f8c988..e9578a1 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -163,6 +163,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) (strlen(notif_event->notification.jobstat) > 0)) { msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires); } + msg->expires = notif_event->notification.expires; edg_wll_FreeEvent(notif_event); free(notif_event); if(msg->len < 0) { -- 1.8.2.3