From: Michal Voců Date: Mon, 11 Jun 2012 14:05:27 +0000 (+0000) Subject: fix for SB#95013 and maybe couple of others X-Git-Tag: merge_32_head_take_2_src~2 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=7ecffbbd0aa6c80c23df988a46927a185c11b1a6;p=jra1mw.git fix for SB#95013 and maybe couple of others --- diff --git a/org.glite.lb.logger/interface/interlogd.h b/org.glite.lb.logger/interface/interlogd.h index c2ef7d0..1420cea 100644 --- a/org.glite.lb.logger/interface/interlogd.h +++ b/org.glite.lb.logger/interface/interlogd.h @@ -180,6 +180,8 @@ struct server_msg { char *dest; char *owner; time_t expires; /* time (in seconds from epoch) the message expires */ + int use_count; /* number of queue threads trying to deliver this message */ + pthread_mutex_t use_lock; /* protect the use_count */ }; @@ -240,6 +242,8 @@ int server_msg_init(struct server_msg *, il_octet_string_t *); int server_msg_is_priority(struct server_msg *); #endif int server_msg_free(struct server_msg *); +void server_msg_use(struct server_msg *); +int server_msg_release(struct server_msg *); /* general event queue methods */ struct event_queue *event_queue_create(char *, struct il_output_plugin *); diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index f8ec91a..001982d 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -37,6 +37,7 @@ struct event_queue_msg { struct server_msg *msg; struct event_queue_msg *prev; struct event_queue_msg *next; + struct event_queue *eq; /* queue we are currently in */ }; struct event_queue * @@ -257,6 +258,8 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) if(++eq->cur_len > eq->max_len) eq->max_len = eq->cur_len; + el->eq = eq; + event_queue_unlock(eq); /* end of critical section */ @@ -306,6 +309,7 @@ event_queue_get(struct event_queue *eq, struct queue_thread *me, struct server_m me->current = el; me->jobid = el->msg->job_id_s; *msg = el->msg; + server_msg_use(*msg); } event_queue_unlock(eq); @@ -331,19 +335,37 @@ event_queue_remove(struct event_queue *eq, struct queue_thread *me) /* this is critical section */ event_queue_lock(eq); - if(el == el->prev) { - /* last element */ - eq->head = NULL; - eq->tail_ems = NULL; - } else { - el->next->prev = el->prev; - el->prev->next = el->next; - if(el == eq->head) { - eq->head = el->prev; + me->current = NULL; + me->jobid = NULL; + + if(eq == el->eq) { + /* message is still ours, remove from queue */ + if(el == el->prev) { + /* last element */ + eq->head = NULL; + eq->tail_ems = NULL; + } else { + el->next->prev = el->prev; + el->prev->next = el->next; + if(el == eq->head) { + eq->head = el->prev; + } + if(el == eq->tail_ems) { + eq->tail_ems = el->prev; + } } - if(el == eq->tail_ems) { - eq->tail_ems = el->prev; + + if(--eq->cur_len == 0) + eq->times_empty++; + + if(eq->cur_len <= queue_size_low) { + eq->throttling = 0; } + } else { + /* not ours anymore */ + server_msg_release(el->msg); + event_queue_unlock(eq); + return 0; } #if 0 /* OLD IMPLEMENTATION */ @@ -387,16 +409,7 @@ event_queue_remove(struct event_queue *eq, struct queue_thread *me) #endif #endif /* OLD IMPLEMENTATION */ - if(--eq->cur_len == 0) - eq->times_empty++; - - if(eq->cur_len <= queue_size_low) { - eq->throttling = 0; - } - me->current = NULL; - me->jobid = NULL; - event_queue_unlock(eq); /* end of critical section */ @@ -439,7 +452,7 @@ event_queue_move_events(struct event_queue *eq_s, eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); q = p->prev; /* remove message from the source queue */ - if(p->next == p->prev) { + if(p == p->prev) { /* removing last message */ eq_s->head = NULL; eq_s->tail_ems = NULL; @@ -472,6 +485,7 @@ event_queue_move_events(struct event_queue *eq_s, if(++eq_d->cur_len > eq_d->max_len) { eq_d->max_len = eq_d->cur_len; } + p->eq = eq_d; } else { /* signal that the message was 'delivered' */ event_store_commit(p->msg->es, p->msg->ev_len, queue_list_is_log(eq_s), diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c index 8b34cf5..a7847a2 100644 --- a/org.glite.lb.logger/src/send_event.c +++ b/org.glite.lb.logger/src/send_event.c @@ -321,6 +321,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me) me->timeout = 0; else me->timeout = TIMEOUT; + server_msg_release(msg); return(0); } @@ -335,6 +336,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me) glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_WARN, " error reading server %s reply: %s", eq->dest_name, error_get_msg()); } + server_msg_release(msg); return(0); } } @@ -364,6 +366,7 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me) if(!(ENOENT == code_min)) { /* non fatal errors (for us) */ me->timeout = TIMEOUT; + server_msg_release(msg); return(0); } @@ -374,9 +377,11 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me) default: /* LB_PROTO */ /* the event was not accepted by the server */ /* update the event pointer */ - if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) - /* failure committing message, this is bad */ - return(-1); + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) { + /* failure committing message, this is bad */ + server_msg_release(msg); + return(-1); + } /* if we have just delivered priority message from the queue, send confirmation */ ret = 1; #if defined(INTERLOGD_EMS) diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 28ce300..8bea3ab 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -142,6 +142,8 @@ server_msg_copy(struct server_msg *src) #endif msg->expires = src->expires; msg->generation = src->generation; + msg->use_count = 0; + pthread_mutex_init(&msg->use_lock, NULL); return(msg); } @@ -160,6 +162,8 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) memset(msg, 0, sizeof(*msg)); + pthread_mutex_init(&msg->use_lock, NULL); + msg->use_count = 0; #if defined(IL_NOTIFICATIONS) @@ -238,13 +242,41 @@ server_msg_free(struct server_msg *msg) { assert(msg != NULL); - if(msg->msg) free(msg->msg); - if(msg->job_id_s) free(msg->job_id_s); + if(server_msg_release(msg) <= 0) { + pthread_mutex_destroy(&msg->use_lock); + if(msg->msg) free(msg->msg); + if(msg->job_id_s) free(msg->job_id_s); #if defined(IL_NOTIFICATIONS) - if(msg->dest_name) free(msg->dest_name); - if(msg->dest) free(msg->dest); - if(msg->owner) free(msg->owner); + if(msg->dest_name) free(msg->dest_name); + if(msg->dest) free(msg->dest); + if(msg->owner) free(msg->owner); #endif - free(msg); + free(msg); + } return 0; } + + +void +server_msg_use(struct server_msg *msg) +{ + assert(msg != NULL); + + pthread_mutex_lock(&msg->use_lock); + (msg->use_count)++; + pthread_mutex_unlock(&msg->use_lock); +} + + +int +server_msg_release(struct server_msg *msg) +{ + int ret; + + assert(msg != NULL); + + pthread_mutex_lock(&msg->use_lock); + ret = --(msg->use_count); + pthread_mutex_unlock(&msg->use_lock); + return ret; +}