}
#endif
+
+/* Remove all expired events.
+ *
+ * Returns: -1 - error, 0 -
+ */
+int
+event_queue_clean_expired(struct event_queue *eq)
+{
+ time_t now = time(NULL);
+ struct event_queue_msg *p, **prev;
+
+ assert(eq != NULL);
+
+ event_queue_lock(eq);
+ prev = &(eq->head);
+ p = *prev;
+ eq->tail = NULL;
+ while(p) {
+ if((p->msg->expires > 0) && (p->msg->expires < now)) {
+ il_log(LOG_DEBUG, " removing expired event at offset %d from %s:%d\n",
+ p->msg->offset, eq->dest_name,eq->dest_port);
+ /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
+ /* remove the message from the list */
+ *prev = p->prev;
+ /* commit and free the message */
+ event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq));
+ server_msg_free(p->msg);
+ free(p);
+ } else {
+ /* message stays */
+ prev = &(p->prev);
+ eq->tail = p;
+ }
+ p = *prev;
+ }
+ event_queue_unlock(eq);
+ return(0);
+}
struct event_queue *eq = (struct event_queue *)q;
int ret, exit;
int retrycnt;
- int close_timeout;
+ int close_timeout = 0;
+ int exit_timeout = EXIT_TIMEOUT;
if(init_errors(0) < 0) {
il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
eq->dest_name, eq->dest_port);
}
close_timeout = 0;
- } else
- ret = event_queue_wait(eq, 0);
+ } else {
+ ret = event_queue_wait(eq, exit_timeout);
+ if(ret == 1) {
+ il_log(LOG_INFO, " thread idle for more than %d seconds, exiting\n", exit_timeout);
+ event_queue_close(eq);
+ event_queue_cond_unlock(eq);
+ pthread_exit((void*)0);
+ }
+ }
if(ret < 0) {
/* error waiting */
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
}
} /* END while(empty) */
- il_log(LOG_DEBUG, " attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
/* allow other threads to signal us, ie. insert new events while
* we are sending or request flush operation
*/
event_queue_cond_unlock(eq);
- /* connect to server */
- if((ret=event_queue_connect(eq)) == 0) {
- /* not connected */
- if(error_get_maj() != IL_OK)
- il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+ /* discard expired events */
+ il_log(LOG_DEBUG, " discarding expired events\n");
+ event_queue_clean_expired(eq);
+ if(!event_queue_empty(eq)) {
+
+ /* deliver pending events */
+ il_log(LOG_DEBUG, " attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port);
+ /* connect to server */
+ if((ret=event_queue_connect(eq)) == 0) {
+ /* not connected */
+ if(error_get_maj() != IL_OK)
+ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
#if defined(IL_NOTIFICATIONS)
- il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name);
+ il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name);
#else
- il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
+ il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
#endif
- retrycnt++;
- } else {
- retrycnt = 0;
- /* connected, send events */
- switch(ret=event_queue_send(eq)) {
-
- case 0:
- /* there was an error and we still have events to send */
- if(error_get_maj() != IL_OK)
+ retrycnt++;
+ } else {
+ retrycnt = 0;
+ /* connected, send events */
+ switch(ret=event_queue_send(eq)) {
+
+ case 0:
+ /* there was an error and we still have events to send */
+ if(error_get_maj() != IL_OK)
+ il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
+ il_log(LOG_DEBUG, " events still waiting\n");
+ break;
+
+ case 1:
+ /* hey, we are done for now */
+ il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name);
+ break;
+
+ default:
+ /* internal error */
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
- il_log(LOG_DEBUG, " events still waiting\n");
- break;
-
- case 1:
- /* hey, we are done for now */
- il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name);
- break;
-
- default:
- /* internal error */
- il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
- exit = 1;
- break;
-
- } /* switch */
+ exit = 1;
+ break;
+
+ } /* switch */
- /* we are done for now anyway, so close the queue */
- if((ret == 1) && lazy_close)
- close_timeout = default_close_timeout;
- else {
- event_queue_close(eq);
- il_log(LOG_DEBUG, " connection to %s:%d closed\n",
- eq->dest_name, eq->dest_port);
+ /* we are done for now anyway, so close the queue */
+ if((ret == 1) && lazy_close)
+ close_timeout = default_close_timeout;
+ else {
+ event_queue_close(eq);
+ il_log(LOG_DEBUG, " connection to %s:%d closed\n",
+ eq->dest_name, eq->dest_port);
+ }
}
}
static
int
-create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
+create_msg(il_octet_string_t *ev, char **buffer, long *receipt, time_t *expires)
{
char *p; int len;
char *event = ev->data;
}
#endif
+ if(p = strstr(event, "DG.EXPIRES")) {
+ int n;
+
+ p += 11;
+ *expires = atoi(p);
+ }
len = encode_il_msg(buffer, ev);
if(len < 0) {
set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message");
msg->dest_port = src->dest_port;
msg->dest = strdup(src->dest);
#endif
+ msg->expires = src->expires;
return(msg);
}
msg->job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId);
if(notif_event->notification.jobstat &&
(strlen(notif_event->notification.jobstat) > 0)) {
- msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+ msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
}
edg_wll_FreeEvent(notif_event);
free(notif_event);
return(-1);
}
#else
- msg->len = create_msg(event, &msg->msg, &msg->receipt_to);
+ msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
if(msg->len < 0) {
return(-1);
}