From: Michal Voců Date: Fri, 23 Nov 2007 15:30:41 +0000 (+0000) Subject: event expiration and delivery thread termination X-Git-Tag: glite-lb-client_R_3_1_1_1~5 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=999c3f54ebc696b135184e145a5e79c5287e398e;p=jra1mw.git event expiration and delivery thread termination --- diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index bc9a44e..f305a8f 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -320,3 +320,41 @@ event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char } #endif + +/* Remove all expired events. + * + * Returns: -1 - error, 0 - + */ +int +event_queue_clean_expired(struct event_queue *eq) +{ + time_t now = time(NULL); + struct event_queue_msg *p, **prev; + + assert(eq != NULL); + + event_queue_lock(eq); + prev = &(eq->head); + p = *prev; + eq->tail = NULL; + while(p) { + if((p->msg->expires > 0) && (p->msg->expires < now)) { + il_log(LOG_DEBUG, " removing expired event at offset %d from %s:%d\n", + p->msg->offset, eq->dest_name,eq->dest_port); + /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */ + /* remove the message from the list */ + *prev = p->prev; + /* commit and free the message */ + event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq)); + server_msg_free(p->msg); + free(p); + } else { + /* message stays */ + prev = &(p->prev); + eq->tail = p; + } + p = *prev; + } + event_queue_unlock(eq); + return(0); +} diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 810314e..92f8dc1 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -61,7 +61,7 @@ // #define TIMEOUT 5 extern int TIMEOUT; #define INPUT_TIMEOUT (60) - +#define EXIT_TIMEOUT (1*60) extern gss_cred_id_t cred_handle; extern pthread_mutex_t cred_handle_lock; @@ -115,6 +115,7 @@ struct server_msg { int dest_port; char *dest; #endif + time_t expires; /* time (in seconds from epoch) the message expires */ }; @@ -165,6 +166,7 @@ int event_queue_remove(struct event_queue *); int event_queue_enqueue(struct event_queue *, char *); /* helper */ int enqueue_msg(struct event_queue *, struct server_msg *); +int event_queue_clean_expired(struct event_queue *eq); /* protocol event queue methods */ int event_queue_connect(struct event_queue *); diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index cb5424e..3c91c92 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -34,7 +34,8 @@ queue_thread(void *q) struct event_queue *eq = (struct event_queue *)q; int ret, exit; int retrycnt; - int close_timeout; + int close_timeout = 0; + int exit_timeout = EXIT_TIMEOUT; if(init_errors(0) < 0) { il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); @@ -68,8 +69,15 @@ queue_thread(void *q) eq->dest_name, eq->dest_port); } close_timeout = 0; - } else - ret = event_queue_wait(eq, 0); + } else { + ret = event_queue_wait(eq, exit_timeout); + if(ret == 1) { + il_log(LOG_INFO, " thread idle for more than %d seconds, exiting\n", exit_timeout); + event_queue_close(eq); + event_queue_cond_unlock(eq); + pthread_exit((void*)0); + } + } if(ret < 0) { /* error waiting */ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); @@ -78,56 +86,63 @@ queue_thread(void *q) } } /* END while(empty) */ - il_log(LOG_DEBUG, " attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port); /* allow other threads to signal us, ie. insert new events while * we are sending or request flush operation */ event_queue_cond_unlock(eq); - /* connect to server */ - if((ret=event_queue_connect(eq)) == 0) { - /* not connected */ - if(error_get_maj() != IL_OK) - il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + /* discard expired events */ + il_log(LOG_DEBUG, " discarding expired events\n"); + event_queue_clean_expired(eq); + if(!event_queue_empty(eq)) { + + /* deliver pending events */ + il_log(LOG_DEBUG, " attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port); + /* connect to server */ + if((ret=event_queue_connect(eq)) == 0) { + /* not connected */ + if(error_get_maj() != IL_OK) + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); #if defined(IL_NOTIFICATIONS) - il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name); + il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name); #else - il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name); + il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name); #endif - retrycnt++; - } else { - retrycnt = 0; - /* connected, send events */ - switch(ret=event_queue_send(eq)) { - - case 0: - /* there was an error and we still have events to send */ - if(error_get_maj() != IL_OK) + retrycnt++; + } else { + retrycnt = 0; + /* connected, send events */ + switch(ret=event_queue_send(eq)) { + + case 0: + /* there was an error and we still have events to send */ + if(error_get_maj() != IL_OK) + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + il_log(LOG_DEBUG, " events still waiting\n"); + break; + + case 1: + /* hey, we are done for now */ + il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name); + break; + + default: + /* internal error */ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); - il_log(LOG_DEBUG, " events still waiting\n"); - break; - - case 1: - /* hey, we are done for now */ - il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name); - break; - - default: - /* internal error */ - il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); - exit = 1; - break; - - } /* switch */ + exit = 1; + break; + + } /* switch */ - /* we are done for now anyway, so close the queue */ - if((ret == 1) && lazy_close) - close_timeout = default_close_timeout; - else { - event_queue_close(eq); - il_log(LOG_DEBUG, " connection to %s:%d closed\n", - eq->dest_name, eq->dest_port); + /* we are done for now anyway, so close the queue */ + if((ret == 1) && lazy_close) + close_timeout = default_close_timeout; + else { + event_queue_close(eq); + il_log(LOG_DEBUG, " connection to %s:%d closed\n", + eq->dest_name, eq->dest_port); + } } } diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 3b1002d..0f8c988 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -12,7 +12,7 @@ static int -create_msg(il_octet_string_t *ev, char **buffer, long *receipt) +create_msg(il_octet_string_t *ev, char **buffer, long *receipt, time_t *expires) { char *p; int len; char *event = ev->data; @@ -57,6 +57,12 @@ create_msg(il_octet_string_t *ev, char **buffer, long *receipt) } #endif + if(p = strstr(event, "DG.EXPIRES")) { + int n; + + p += 11; + *expires = atoi(p); + } len = encode_il_msg(buffer, ev); if(len < 0) { set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message"); @@ -117,6 +123,7 @@ server_msg_copy(struct server_msg *src) msg->dest_port = src->dest_port; msg->dest = strdup(src->dest); #endif + msg->expires = src->expires; return(msg); } @@ -154,7 +161,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) msg->job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId); if(notif_event->notification.jobstat && (strlen(notif_event->notification.jobstat) > 0)) { - msg->len = create_msg(event, &msg->msg, &msg->receipt_to); + msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires); } edg_wll_FreeEvent(notif_event); free(notif_event); @@ -162,7 +169,7 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) return(-1); } #else - msg->len = create_msg(event, &msg->msg, &msg->receipt_to); + msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires); if(msg->len < 0) { return(-1); }