From cbd2f03a2bae1e1a54bf282f7f1790f73cb565a0 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Fri, 17 Apr 2009 16:04:54 +0000 Subject: [PATCH] defer moving events between queues until final destination was found --- org.glite.lb.logger/src/event_queue.c | 46 +++++++------ org.glite.lb.logger/src/event_store.c | 117 +++++++++++++++++++++++++--------- org.glite.lb.logger/src/il_master.c | 113 ++++++++++---------------------- org.glite.lb.logger/src/interlogd.h | 1 + 4 files changed, 145 insertions(+), 132 deletions(-) diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 9ce47f6..18bdcdc 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -1,6 +1,6 @@ #ident "$Header$" -/* +/* * - general queue handling routines (insert, get) */ @@ -27,11 +27,6 @@ event_queue_create(char *server_name) struct event_queue *eq; char *p; - p = strchr(server_name, ':'); - - if(p) - *p++ = 0; - if((eq = malloc(sizeof(*eq))) == NULL) { set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue"); return(NULL); @@ -39,11 +34,15 @@ event_queue_create(char *server_name) memset(eq, 0, sizeof(*eq)); - eq->dest_name = strdup(server_name); + eq->dest = strdup(server_name); + p = strchr(server_name, ':'); + if(p) + *p++ = 0; + eq->dest_name = strdup(server_name); if(p) *(p-1) = ':'; - + #if defined(IL_NOTIFICATIONS) || defined(IL_WS) eq->dest_port = atoi(p); #else @@ -96,6 +95,11 @@ event_queue_free(struct event_queue *eq) #if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) pthread_cond_destroy(&eq->flush_cond); #endif + + if(eq->dest_name) + free(eq->dest_name); + if(eq->dest) + free(eq->dest); free(eq); return(0); @@ -124,10 +128,10 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) #if defined(INTERLOGD_EMS) struct event_queue_msg *tail; #endif - + assert(eq != NULL); - if((el = malloc(sizeof(*el))) == NULL) + if((el = malloc(sizeof(*el))) == NULL) return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element")); el->msg = server_msg_copy(msg); @@ -154,9 +158,9 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) eq->tail = el; } eq->tail_ems = el; - } else + } else #endif - { + { /* normal messages */ if(eq->tail) eq->tail->prev = el; @@ -168,7 +172,7 @@ event_queue_insert(struct event_queue *eq, struct server_msg *msg) #if defined(INTERLOGD_EMS) /* if we are inserting message between mark_prev and mark_this, we have to adjust mark_prev accordingly */ - if(eq->mark_this && (el->prev == eq->mark_this)) + if(eq->mark_this && (el->prev == eq->mark_this)) eq->mark_prev = el; #endif @@ -189,7 +193,7 @@ event_queue_get(struct event_queue *eq, struct server_msg **msg) assert(eq != NULL); assert(msg != NULL); - + event_queue_lock(eq); el = eq->head; #if defined(INTERLOGD_EMS) @@ -208,7 +212,7 @@ event_queue_get(struct event_queue *eq, struct server_msg **msg) } -int +int event_queue_remove(struct event_queue *eq) { struct event_queue_msg *el; @@ -258,12 +262,12 @@ event_queue_remove(struct event_queue *eq) eq->tail = NULL; } #endif - if(--eq->cur_len == 0) + if(--eq->cur_len == 0) eq->times_empty++; event_queue_unlock(eq); /* end of critical section */ - + server_msg_free(el->msg); free(el); @@ -271,9 +275,9 @@ event_queue_remove(struct event_queue *eq) } int -event_queue_move_events(struct event_queue *eq_s, - struct event_queue *eq_d, - int (*cmp_func)(struct server_msg *, void *), +event_queue_move_events(struct event_queue *eq_s, + struct event_queue *eq_d, + int (*cmp_func)(struct server_msg *, void *), void *data) { struct event_queue_msg *p, **source_prev, **dest_tail; @@ -292,7 +296,7 @@ event_queue_move_events(struct event_queue *eq_s, while(p) { if((*cmp_func)(p->msg, data)) { il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n", - p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, + p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port, eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1); /* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */ /* remove the message from the source list */ diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index e584e89..78cc5e4 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -481,6 +481,25 @@ event_store_recover_jobid(struct event_store *es) return 0; } +static +int +cmp_jobid(struct server_msg *msg, void *data) +{ + char *job_id_s = (char*)data; + return strcmp(msg->job_id_s, job_id_s) == 0; +} + +static +int +cmp_jobid_set_exp(struct server_msg *msg, void *data) +{ + struct server_msg *m = (struct server_msg *)data; + + if(strcmp(msg->job_id_s, m->job_id_s) == 0) { + msg->expires = m->expires; + } + return 0; +} /* * event_store_recover() @@ -500,23 +519,26 @@ event_store_recover(struct event_store *es) struct flock efl; char err_msg[128]; struct stat stbuf; +#if defined(IL_NOTIFICATIONS) + char *last_dest = NULL; + time_t last_exp = 0; +#endif assert(es != NULL); #if defined(IL_NOTIFICATIONS) - /* destination queue has to be found for each message separately */ + /* destination queue has to be found for each message separately, */ + /* this is current known destination for our notification id (may be NULL!) */ + eq_b = notifid_map_get_dest(es->job_id_s); #else + /* get log server queue */ + eq_l = queue_list_get(NULL); /* find bookkeeping server queue */ eq_b = queue_list_get(es->job_id_s); if(eq_b == NULL) return(-1); #endif -#if !defined(IL_NOTIFICATIONS) - /* get log server queue */ - eq_l = queue_list_get(NULL); -#endif - /* lock the event_store and offset locks */ event_store_lock(es); if(pthread_rwlock_wrlock(&es->offset_lock)) @@ -700,44 +722,52 @@ event_store_recover(struct event_store *es) msg->es = es; msg->generation = es->generation; - /* first enqueue to the LS */ - if(!bs_only && (last >= last_ls)) { - - il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last); - -#if !defined(IL_NOTIFICATIONS) - if(enqueue_msg(eq_l, msg) < 0) - break; -#endif - } - #ifdef IL_NOTIFICATIONS - eq_b = queue_list_get(msg->dest); - /* if the message does not have destination itself, use destination cached for notification id */ - if(eq_b == NULL) { - eq_b = notifid_map_get_dest(msg->job_id_s); + /* check message destination */ + if(msg->dest == NULL) { + /* the message does not have destination itself, use destination cached for notification id */ if(eq_b == NULL) { - /* message has no destination and no destination is known for notification id, - * commit it immediately - */ + /* no destination is known for notification id, commit it immediately */ il_log(LOG_DEBUG, " message has no known destination, will not be sent\n"); event_store_commit(es, msg->ev_len, 0, msg->generation); - /* if the expiration changed, set new one now, message will be discarded soon */ - if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) { - notifid_map_set_expiration(msg->job_id_s, msg->expires); - } } + } else { + /* check if we know destination for notification id */ + if(eq_b == NULL) { + eq_b = queue_list_get(msg->dest); + } else { + if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) { + /* destination changed */ + last_dest = msg->dest; + } + } + } + + /* check message expiration */ + if(last_exp == 0 || last_exp != msg->expires) { + last_exp = msg->expires; + } + #else + /* first enqueue to the LS */ + if(!bs_only && (last >= last_ls)) { + + il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last); + + if(enqueue_msg(eq_l, msg) < 0) { + break; + } } #endif - /* now enqueue to the BS, if neccessary */ + /* now enqueue to the BS, if necessary */ if((eq_b != eq_l) && (last >= last_bs)) { - il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); + il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last); - if(enqueue_msg(eq_b, msg) < 0) + if(enqueue_msg(eq_b, msg) < 0) { break; + } } server_msg_free(msg); msg = NULL; @@ -750,6 +780,31 @@ event_store_recover(struct event_store *es) } /* while */ +#if defined(IL_NOTIFICATIONS) + /* check if we have to move events to new destination */ + if(last_dest && strcmp(last_dest, eq_b->dest)) { + struct event_queue *eq_dest = queue_list_get(last_dest); + + /* set new destination */ + if(notifid_map_set_dest(es->job_id_s, eq) < 0) { + ret = -1; break; + } + + /* move all events with this notif_id from eq_b to eq_dest */ + event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s); + eq_b = eq_dest; + + /* XXX - we should kill the old queue too */ + } + + /* if the expiration changed, set new one */ + if(last_exp != notifid_map_get_expiration(es->job_id_s)) { + notifid_map_set_expiration(es->job_id_s, last_exp); + /* set expiration for all events with this notif id */ + event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg); + } +#endif + es->offset = last; es->last_modified = stbuf.st_mtime; il_log(LOG_DEBUG, " event store offset set to %ld\n", last); diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index 384c89a..3299541 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -15,66 +15,19 @@ #include "glite/lb/lb_perftest.h" #endif -static int -cmp_jobid(struct server_msg *msg, void *data) -{ - char *job_id_s = (char*)data; - return strcmp(msg->job_id_s, job_id_s) == 0; -} - -static -int -cmp_jobid_set_exp(struct server_msg *msg, void *data) -{ - struct server_msg *m = (struct server_msg *)data; - - if(strcmp(msg->job_id_s, m->job_id_s) == 0) { - msg->expires = m->expires; - } - return 0; -} - - -int enqueue_msg(struct event_queue *eq, struct server_msg *msg) { -#if defined(IL_NOTIFICATIONS) - struct event_queue *eq_known; - - /* now we have a new event with possibly changed destination, - so check for the already known destination and possibly move - events from the original output queue to a new one */ - eq_known = notifid_map_get_dest(msg->job_id_s); - if(eq != eq_known) { - /* client has changed delivery address for this notification */ - if(notifid_map_set_dest(msg->job_id_s, eq) < 0) - return(-1); - /* move all events with this notif_id from eq_known to eq */ - if(eq_known != NULL) { - event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s); - /* XXX - we should kill the old queue too */ - } - } - - /* if the expiration changed, set new one */ - if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) { - notifid_map_set_expiration(msg->job_id_s, msg->expires); - /* set expiration for all events with this notif id */ - event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg); - } -#endif - /* fire thread to take care of this queue */ - if(event_queue_create_thread(eq) < 0) + if(event_queue_create_thread(eq) < 0) return(-1); - + #if defined(IL_NOTIFICATIONS) - /* if there are no data to send, do not send anything + /* if there are no data to send, do not send anything (messsage was just to change the delivery address) */ /* CORRECTION - let the message pass through the output queue to commit it properly and keep event_store in sync */ - /* if(msg->len == 0) + /* if(msg->len == 0) return(0); */ #endif @@ -86,7 +39,7 @@ enqueue_msg(struct event_queue *eq, struct server_msg *msg) event_queue_cond_unlock(eq); return(-1); } - + /* signal thread that we have a new message */ event_queue_signal(eq); @@ -103,7 +56,7 @@ pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; #endif /* INTERLOGD_FLUSH */ #ifdef INTERLOGD_HANDLE_CMD -static +static int parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) { @@ -125,7 +78,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) continue; } if(strncmp(token, "DG.COMMAND", r - token) == 0) { -#if defined(INTERLOGD_FLUSH) +#if defined(INTERLOGD_FLUSH) if(strcmp(++r, "\"flush\"")) { #endif il_log(LOG_WARNING, " command %s not implemented\n", r); @@ -136,7 +89,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) #endif } else if(strncmp(token, "DG.JOBID", r - token) == 0) { char *p; - + r += 2; /* skip =" */ p = index(r, '"'); if(p == NULL) { ret = -1; continue; } @@ -147,7 +100,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) } else if(strncmp(token, "DG.LLLID", r - token) == 0) { sscanf(++r, "%ld", receipt); } - + } return(0); } @@ -159,8 +112,8 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) * -1 - failure */ -static -int +static +int handle_cmd(il_octet_string_t *event, long offset) { char *job_id_s; @@ -172,7 +125,7 @@ handle_cmd(il_octet_string_t *event, long offset) struct timeval tv; /* parse command */ - if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0) + if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0) return(0); #if defined(INTERLOGD_FLUSH) @@ -194,7 +147,7 @@ handle_cmd(il_octet_string_t *event, long offset) error_get_msg()); clear_error(); } - } else + } else /* this call does not fail :-) */ event_store_recover_all(); @@ -258,7 +211,7 @@ handle_cmd(il_octet_string_t *event, long offset) result = (ret == ETIMEDOUT) ? 0 : -1; break; } - + /* collect results from reporting threads */ if(job_id_s) { /* find appropriate queue */ @@ -269,7 +222,7 @@ handle_cmd(il_octet_string_t *event, long offset) if(eq->flushing == 2) { eq->flushing = 0; num_replies++; - result = ((result == 1) || (eq->flush_result < 0)) ? + result = ((result == 1) || (eq->flush_result < 0)) ? eq->flush_result : result; } event_queue_cond_unlock(eq); @@ -283,7 +236,7 @@ handle_cmd(il_octet_string_t *event, long offset) eq->flushing = 0; num_replies++; il_log(LOG_DEBUG, " thread reply: %d\n", eq->flush_result); - result = ((result == 1) || (eq->flush_result < 0)) ? + result = ((result == 1) || (eq->flush_result < 0)) ? eq->flush_result : result; } event_queue_cond_unlock(eq); @@ -297,7 +250,7 @@ handle_cmd(il_octet_string_t *event, long offset) if(eq->flushing == 2) { eq->flushing = 0; num_replies++; - result = ((result == 1) || (eq->flush_result < 0)) ? + result = ((result == 1) || (eq->flush_result < 0)) ? eq->flush_result : result; } event_queue_cond_unlock(eq); @@ -305,7 +258,7 @@ handle_cmd(il_octet_string_t *event, long offset) } /* prevent deadlock in next flush */ - if(pthread_mutex_unlock(&flush_lock) < 0) + if(pthread_mutex_unlock(&flush_lock) < 0) abort(); @@ -320,7 +273,7 @@ handle_cmd(il_octet_string_t *event, long offset) } if(job_id_s) free(job_id_s); result = send_confirmation(receipt, result); - if(result <= 0) + if(result <= 0) il_log(LOG_ERR, "handle_cmd: error sending status: %s\n", error_get_msg()); return(1); @@ -335,10 +288,10 @@ cmd_error: #endif /* INTERLOGD_HANDLE_CMD */ -static +static int handle_msg(il_octet_string_t *event, long offset) -{ +{ struct server_msg *msg = NULL; #if !defined(IL_NOTIFICATIONS) struct event_queue *eq_l; @@ -353,17 +306,17 @@ handle_msg(il_octet_string_t *event, long offset) il_log(LOG_ERR, " handle_msg: error parsing event '%s':\n %s\n", event, error_get_msg()); return(0); } - + /* sync event store with IPC (if neccessary) * This MUST be called before inserting event into output queue! */ - if((es = event_store_find(msg->job_id_s, NULL)) == NULL) + if((es = event_store_find(msg->job_id_s, NULL)) == NULL) return(-1); msg->es = es; - + #ifdef LB_PERF - if(nosync) + if(nosync) ret = 1; - else + else #endif ret = event_store_sync(es, offset); /* no longer informative: @@ -393,7 +346,7 @@ handle_msg(il_octet_string_t *event, long offset) #else eq_s = queue_list_get(msg->job_id_s); #endif - if(eq_s == NULL) { + if(eq_s == NULL) { il_log(LOG_ERR, " handle_msg: apropriate queue not found: %s\n", error_get_msg()); clear_error(); } else { @@ -428,7 +381,7 @@ err: -int +int loop() { /* receive events */ @@ -436,12 +389,12 @@ loop() il_octet_string_t *msg; long offset; int ret; - + if(killflg) exit(0); clear_error(); - if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) + if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) { if(error_get_maj() == IL_PROTO) { il_log(LOG_DEBUG, " premature EOF while receiving event\n"); @@ -450,7 +403,7 @@ loop() event_store_recover_all(); #endif continue; - } else + } else return(-1); } else if(ret == 0) { @@ -463,7 +416,7 @@ loop() continue; #endif -#ifdef INTERLOGD_HANDLE_CMD +#ifdef INTERLOGD_HANDLE_CMD ret = handle_cmd(msg, offset); if(ret == 0) #endif @@ -475,7 +428,7 @@ loop() case IL_NOMEM: return (ret); break; - default: + default: il_log(LOG_ERR, "Error: %s\n", error_get_msg()); break; } diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index b2fdde4..edbbad0 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -158,6 +158,7 @@ struct event_queue { edg_wll_GssConnection gss; /* GSS connection */ char *dest_name; int dest_port; + char *dest; int timeout; /* queue timeout */ struct event_queue_msg *tail; /* last message in the queue */ struct event_queue_msg *head; /* first message in the queue */ -- 1.8.2.3