char *dest;
char *owner;
time_t expires; /* time (in seconds from epoch) the message expires */
+ int use_count; /* number of queue threads trying to deliver this message */
+ pthread_mutex_t use_lock; /* protect the use_count */
};
int server_msg_is_priority(struct server_msg *);
#endif
int server_msg_free(struct server_msg *);
+void server_msg_use(struct server_msg *);
+int server_msg_release(struct server_msg *);
/* general event queue methods */
struct event_queue *event_queue_create(char *, struct il_output_plugin *);
struct server_msg *msg;
struct event_queue_msg *prev;
struct event_queue_msg *next;
+ struct event_queue *eq; /* queue we are currently in */
};
struct event_queue *
if(++eq->cur_len > eq->max_len)
eq->max_len = eq->cur_len;
+ el->eq = eq;
+
event_queue_unlock(eq);
/* end of critical section */
me->current = el;
me->jobid = el->msg->job_id_s;
*msg = el->msg;
+ server_msg_use(*msg);
}
event_queue_unlock(eq);
/* this is critical section */
event_queue_lock(eq);
- if(el == el->prev) {
- /* last element */
- eq->head = NULL;
- eq->tail_ems = NULL;
- } else {
- el->next->prev = el->prev;
- el->prev->next = el->next;
- if(el == eq->head) {
- eq->head = el->prev;
+ me->current = NULL;
+ me->jobid = NULL;
+
+ if(eq == el->eq) {
+ /* message is still ours, remove from queue */
+ if(el == el->prev) {
+ /* last element */
+ eq->head = NULL;
+ eq->tail_ems = NULL;
+ } else {
+ el->next->prev = el->prev;
+ el->prev->next = el->next;
+ if(el == eq->head) {
+ eq->head = el->prev;
+ }
+ if(el == eq->tail_ems) {
+ eq->tail_ems = el->prev;
+ }
}
- if(el == eq->tail_ems) {
- eq->tail_ems = el->prev;
+
+ if(--eq->cur_len == 0)
+ eq->times_empty++;
+
+ if(eq->cur_len <= queue_size_low) {
+ eq->throttling = 0;
}
+ } else {
+ /* not ours anymore */
+ server_msg_release(el->msg);
+ event_queue_unlock(eq);
+ return 0;
}
#if 0 /* OLD IMPLEMENTATION */
#endif
#endif /* OLD IMPLEMENTATION */
- if(--eq->cur_len == 0)
- eq->times_empty++;
-
- if(eq->cur_len <= queue_size_low) {
- eq->throttling = 0;
- }
- me->current = NULL;
- me->jobid = NULL;
-
event_queue_unlock(eq);
/* end of critical section */
eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
q = p->prev;
/* remove message from the source queue */
- if(p->next == p->prev) {
+ if(p == p->prev) {
/* removing last message */
eq_s->head = NULL;
eq_s->tail_ems = NULL;
if(++eq_d->cur_len > eq_d->max_len) {
eq_d->max_len = eq_d->cur_len;
}
+ p->eq = eq_d;
} else {
/* signal that the message was 'delivered' */
event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s),
me->timeout = 0;
else
me->timeout = TIMEOUT;
+ server_msg_release(msg);
return(0);
}
glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, " error reading server %s reply: %s",
eq->dest_name, error_get_msg());
}
+ server_msg_release(msg);
return(0);
}
}
if(!(ENOENT == code_min)) {
/* non fatal errors (for us) */
me->timeout = TIMEOUT;
+ server_msg_release(msg);
return(0);
}
default: /* LB_PROTO */
/* the event was not accepted by the server */
/* update the event pointer */
- if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0)
- /* failure committing message, this is bad */
- return(-1);
+ if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+ /* failure committing message, this is bad */
+ server_msg_release(msg);
+ return(-1);
+ }
/* if we have just delivered priority message from the queue, send confirmation */
ret = 1;
#if defined(INTERLOGD_EMS)
#endif
msg->expires = src->expires;
msg->generation = src->generation;
+ msg->use_count = 0;
+ pthread_mutex_init(&msg->use_lock, NULL);
return(msg);
}
memset(msg, 0, sizeof(*msg));
+ pthread_mutex_init(&msg->use_lock, NULL);
+ msg->use_count = 0;
#if defined(IL_NOTIFICATIONS)
{
assert(msg != NULL);
- if(msg->msg) free(msg->msg);
- if(msg->job_id_s) free(msg->job_id_s);
+ if(server_msg_release(msg) <= 0) {
+ pthread_mutex_destroy(&msg->use_lock);
+ if(msg->msg) free(msg->msg);
+ if(msg->job_id_s) free(msg->job_id_s);
#if defined(IL_NOTIFICATIONS)
- if(msg->dest_name) free(msg->dest_name);
- if(msg->dest) free(msg->dest);
- if(msg->owner) free(msg->owner);
+ if(msg->dest_name) free(msg->dest_name);
+ if(msg->dest) free(msg->dest);
+ if(msg->owner) free(msg->owner);
#endif
- free(msg);
+ free(msg);
+ }
return 0;
}
+
+
+void
+server_msg_use(struct server_msg *msg)
+{
+ assert(msg != NULL);
+
+ pthread_mutex_lock(&msg->use_lock);
+ (msg->use_count)++;
+ pthread_mutex_unlock(&msg->use_lock);
+}
+
+
+int
+server_msg_release(struct server_msg *msg)
+{
+ int ret;
+
+ assert(msg != NULL);
+
+ pthread_mutex_lock(&msg->use_lock);
+ ret = --(msg->use_count);
+ pthread_mutex_unlock(&msg->use_lock);
+ return ret;
+}