return(0);
}
-#if defined(IL_NOTIFICATIONS)
-
int
-event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id)
+event_queue_move_events(struct event_queue *eq_s,
+ struct event_queue *eq_d,
+ int (*cmp_func)(struct server_msg *, void *),
+ void *data)
{
struct event_queue_msg *p, **source_prev, **dest_tail;
assert(eq_s != NULL);
- assert(notif_id != NULL);
event_queue_lock(eq_s);
if(eq_d) {
p = *source_prev;
eq_s->tail = NULL;
while(p) {
- if(strcmp(p->msg->job_id_s, notif_id) == 0) {
+ if((*cmp_func)(p->msg, data)) {
il_log(LOG_DEBUG, " moving event at offset %d from %s:%d to %s:%d\n",
- p->msg->offset, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
+ p->msg->offset, eq_s->dest_name, eq_s->dest_port,
+ eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev);
/* remove the message from the source list */
*source_prev = p->prev;
return(0);
}
-#endif
-
-/* Remove all expired events.
- *
- * Returns: -1 - error, 0 -
- */
-int
-event_queue_clean_expired(struct event_queue *eq)
-{
- time_t now = time(NULL);
- struct event_queue_msg *p, **prev;
-
- assert(eq != NULL);
-
- event_queue_lock(eq);
- prev = &(eq->head);
- p = *prev;
- eq->tail = NULL;
- while(p) {
- if((p->msg->expires > 0) && (p->msg->expires < now)) {
- il_log(LOG_DEBUG, " removing expired event at offset %d from %s:%d\n",
- p->msg->offset, eq->dest_name,eq->dest_port);
- /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
- /* remove the message from the list */
- *prev = p->prev;
- /* commit and free the message */
- event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq));
- server_msg_free(p->msg);
- free(p);
- } else {
- /* message stays */
- prev = &(p->prev);
- eq->tail = p;
- }
- p = *prev;
- }
- event_queue_unlock(eq);
- return(0);
-}
#include "glite/lb/lb_perftest.h"
#endif
+static
+int
+cmp_jobid(struct server_msg *msg, void *data)
+{
+ char *job_id_s = (char*)data;
+ return strcmp(msg->job_id_s, job_id_s) == 0;
+}
+
+static
+int
+cmp_jobid_set_exp(struct server_msg *msg, void *data)
+{
+ struct server_msg *m = (struct server_msg *)data;
+
+ if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
+ msg->expires = m->expires;
+ }
+ return 0;
+}
+
+
int
enqueue_msg(struct event_queue *eq, struct server_msg *msg)
{
return(-1);
/* move all events with this notif_id from eq_known to eq */
if(eq_known != NULL) {
- event_queue_move_events(eq_known, eq, msg->job_id_s);
+ event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s);
/* XXX - we should kill the old queue too */
}
}
+
+ /* if the expiration changed, set new one */
+ if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
+ notifid_map_set_expiration(msg->job_id_s, msg->expires);
+ /* set expiration for all events with this notif id */
+ event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg);
+ }
#endif
/* fire thread to take care of this queue */
int event_queue_enqueue(struct event_queue *, char *);
/* helper */
int enqueue_msg(struct event_queue *, struct server_msg *);
-int event_queue_clean_expired(struct event_queue *eq);
+int event_queue_move_events(struct event_queue *, struct event_queue *, int (*)(struct server_msg *, void *), void *);
/* protocol event queue methods */
int event_queue_connect(struct event_queue *);
#if defined(IL_NOTIFICATIONS)
struct event_queue *notifid_map_get_dest(const char *);
int notifid_map_set_dest(const char *, struct event_queue *);
-int event_queue_move_events(struct event_queue *, struct event_queue *, char *);
+time_t notifid_map_get_expiration(const char *);
+int notifid_map_set_expiration(const char *, time_t);
#endif
/* event store functions */
struct event_queue *queue;
char *dest;
struct queue_list *next;
+#if defined(IL_NOTIFICATIONS)
+ time_t expires;
+#endif
};
static struct event_queue *log_queue;
}
-/*
-static
-int
-queue_list_remove(struct queue_list *el, struct queue_list *prev)
-{
- assert(el != NULL);
-
- if(prev)
- prev->next = el->next;
- else
- queues = el->next;
-
- free(el);
- return(1);
-}
-*/
-
-
#if !defined(IL_NOTIFICATIONS)
static
char *
return(current ? current->queue : NULL);
}
+
+int
+queue_list_remove_queue(struct event_queue *eq)
+{
+ assert(eq != NULL);
+
+ free(eq);
+ return(1);
+}
+
+
#if defined(IL_NOTIFICATIONS)
static struct queue_list *notifid_map = NULL;
}
}
+
+time_t
+notifid_map_get_expiration(const char * notif_id)
+{
+ struct queue_list *q;
+
+ queue_list_find(notifid_map, notif_id, &q, NULL);
+ return(q ? q->expires : 0);
+}
+
+
+int
+notifid_map_set_expiration(const char *notif_id, time_t exp)
+{
+ struct queue_list *q;
+
+ if(queue_list_find(notifid_map, notif_id, &q, NULL)) {
+ q->expires = exp;
+ return(1);
+ } else {
+ return(0);
+ }
+}
+
#endif
+
+/* Local Variables: */
+/* c-indentation-style: gnu */
+/* End: */
}
+static time_t now;
+
+static
+int
+cmp_expires(struct server_msg *msg, void *data)
+{
+ time_t *t = (time_t*)data;
+ return msg->expires < *t;
+}
+
static
void *
queue_thread(void *q)
/* discard expired events */
il_log(LOG_DEBUG, " discarding expired events\n");
- event_queue_clean_expired(eq);
+ now = time(NULL);
+ event_queue_move_events(eq, NULL, cmp_expires, &now);
if(!event_queue_empty(eq)) {
/* deliver pending events */
return(0);
}
+
+/* Local Variables: */
+/* c-indentation-style: linux */
+/* End: */
(strlen(notif_event->notification.jobstat) > 0)) {
msg->len = create_msg(event, &msg->msg, &msg->receipt_to, &msg->expires);
}
+ msg->expires = notif_event->notification.expires;
edg_wll_FreeEvent(notif_event);
free(notif_event);
if(msg->len < 0) {