#ident "$Header$"
-/*
+/*
* - general queue handling routines (insert, get)
*/
struct event_queue *eq;
char *p;
- p = strchr(server_name, ':');
-
- if(p)
- *p++ = 0;
-
if((eq = malloc(sizeof(*eq))) == NULL) {
set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue");
return(NULL);
memset(eq, 0, sizeof(*eq));
- eq->dest_name = strdup(server_name);
+ eq->dest = strdup(server_name);
+ p = strchr(server_name, ':');
+ if(p)
+ *p++ = 0;
+ eq->dest_name = strdup(server_name);
if(p)
*(p-1) = ':';
-
+
#if defined(IL_NOTIFICATIONS) || defined(IL_WS)
eq->dest_port = atoi(p);
#else
#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
pthread_cond_destroy(&eq->flush_cond);
#endif
+
+ if(eq->dest_name)
+ free(eq->dest_name);
+ if(eq->dest)
+ free(eq->dest);
free(eq);
return(0);
#if defined(INTERLOGD_EMS)
struct event_queue_msg *tail;
#endif
-
+
assert(eq != NULL);
- if((el = malloc(sizeof(*el))) == NULL)
+ if((el = malloc(sizeof(*el))) == NULL)
return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element"));
el->msg = server_msg_copy(msg);
eq->tail = el;
}
eq->tail_ems = el;
- } else
+ } else
#endif
- {
+ {
/* normal messages */
if(eq->tail)
eq->tail->prev = el;
#if defined(INTERLOGD_EMS)
/* if we are inserting message between mark_prev and mark_this,
we have to adjust mark_prev accordingly */
- if(eq->mark_this && (el->prev == eq->mark_this))
+ if(eq->mark_this && (el->prev == eq->mark_this))
eq->mark_prev = el;
#endif
assert(eq != NULL);
assert(msg != NULL);
-
+
event_queue_lock(eq);
el = eq->head;
#if defined(INTERLOGD_EMS)
}
-int
+int
event_queue_remove(struct event_queue *eq)
{
struct event_queue_msg *el;
eq->tail = NULL;
}
#endif
- if(--eq->cur_len == 0)
+ if(--eq->cur_len == 0)
eq->times_empty++;
event_queue_unlock(eq);
/* end of critical section */
-
+
server_msg_free(el->msg);
free(el);
}
int
-event_queue_move_events(struct event_queue *eq_s,
- struct event_queue *eq_d,
- int (*cmp_func)(struct server_msg *, void *),
+event_queue_move_events(struct event_queue *eq_s,
+ struct event_queue *eq_d,
+ int (*cmp_func)(struct server_msg *, void *),
void *data)
{
struct event_queue_msg *p, **source_prev, **dest_tail;
while(p) {
if((*cmp_func)(p->msg, data)) {
il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
- p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+ p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
/* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
/* remove the message from the source list */
return 0;
}
+static
+int
+cmp_jobid(struct server_msg *msg, void *data)
+{
+ char *job_id_s = (char*)data;
+ return strcmp(msg->job_id_s, job_id_s) == 0;
+}
+
+static
+int
+cmp_jobid_set_exp(struct server_msg *msg, void *data)
+{
+ struct server_msg *m = (struct server_msg *)data;
+
+ if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
+ msg->expires = m->expires;
+ }
+ return 0;
+}
/*
* event_store_recover()
struct flock efl;
char err_msg[128];
struct stat stbuf;
+#if defined(IL_NOTIFICATIONS)
+ char *last_dest = NULL;
+ time_t last_exp = 0;
+#endif
assert(es != NULL);
#if defined(IL_NOTIFICATIONS)
- /* destination queue has to be found for each message separately */
+ /* destination queue has to be found for each message separately, */
+ /* this is current known destination for our notification id (may be NULL!) */
+ eq_b = notifid_map_get_dest(es->job_id_s);
#else
+ /* get log server queue */
+ eq_l = queue_list_get(NULL);
/* find bookkeeping server queue */
eq_b = queue_list_get(es->job_id_s);
if(eq_b == NULL)
return(-1);
#endif
-#if !defined(IL_NOTIFICATIONS)
- /* get log server queue */
- eq_l = queue_list_get(NULL);
-#endif
-
/* lock the event_store and offset locks */
event_store_lock(es);
if(pthread_rwlock_wrlock(&es->offset_lock))
msg->es = es;
msg->generation = es->generation;
- /* first enqueue to the LS */
- if(!bs_only && (last >= last_ls)) {
-
- il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last);
-
-#if !defined(IL_NOTIFICATIONS)
- if(enqueue_msg(eq_l, msg) < 0)
- break;
-#endif
- }
-
#ifdef IL_NOTIFICATIONS
- eq_b = queue_list_get(msg->dest);
- /* if the message does not have destination itself, use destination cached for notification id */
- if(eq_b == NULL) {
- eq_b = notifid_map_get_dest(msg->job_id_s);
+ /* check message destination */
+ if(msg->dest == NULL) {
+ /* the message does not have destination itself, use destination cached for notification id */
if(eq_b == NULL) {
- /* message has no destination and no destination is known for notification id,
- * commit it immediately
- */
+ /* no destination is known for notification id, commit it immediately */
il_log(LOG_DEBUG, " message has no known destination, will not be sent\n");
event_store_commit(es, msg->ev_len, 0, msg->generation);
- /* if the expiration changed, set new one now, message will be discarded soon */
- if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
- notifid_map_set_expiration(msg->job_id_s, msg->expires);
- }
}
+ } else {
+ /* check if we know destination for notification id */
+ if(eq_b == NULL) {
+ eq_b = queue_list_get(msg->dest);
+ } else {
+ if(last_dest == NULL || strcmp(msg->dest, last_dest) != 0) {
+ /* destination changed */
+ last_dest = msg->dest;
+ }
+ }
+ }
+
+ /* check message expiration */
+ if(last_exp == 0 || last_exp != msg->expires) {
+ last_exp = msg->expires;
+ }
+ #else
+ /* first enqueue to the LS */
+ if(!bs_only && (last >= last_ls)) {
+
+ il_log(LOG_DEBUG, " queuing event at %ld to logging server\n", last);
+
+ if(enqueue_msg(eq_l, msg) < 0) {
+ break;
+ }
}
#endif
- /* now enqueue to the BS, if neccessary */
+ /* now enqueue to the BS, if necessary */
if((eq_b != eq_l) &&
(last >= last_bs)) {
- il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
+ il_log(LOG_DEBUG, " queuing event at %ld to bookkeeping server\n", last);
- if(enqueue_msg(eq_b, msg) < 0)
+ if(enqueue_msg(eq_b, msg) < 0) {
break;
+ }
}
server_msg_free(msg);
msg = NULL;
} /* while */
+#if defined(IL_NOTIFICATIONS)
+ /* check if we have to move events to new destination */
+ if(last_dest && strcmp(last_dest, eq_b->dest)) {
+ struct event_queue *eq_dest = queue_list_get(last_dest);
+
+ /* set new destination */
+ if(notifid_map_set_dest(es->job_id_s, eq) < 0) {
+ ret = -1; break;
+ }
+
+ /* move all events with this notif_id from eq_b to eq_dest */
+ event_queue_move_events(eq_b, eq_dest, cmp_jobid, es->job_id_s);
+ eq_b = eq_dest;
+
+ /* XXX - we should kill the old queue too */
+ }
+
+ /* if the expiration changed, set new one */
+ if(last_exp != notifid_map_get_expiration(es->job_id_s)) {
+ notifid_map_set_expiration(es->job_id_s, last_exp);
+ /* set expiration for all events with this notif id */
+ event_queue_move_events(eq_b, NULL, cmp_jobid_set_exp, msg);
+ }
+#endif
+
es->offset = last;
es->last_modified = stbuf.st_mtime;
il_log(LOG_DEBUG, " event store offset set to %ld\n", last);
#include "glite/lb/lb_perftest.h"
#endif
-static
int
-cmp_jobid(struct server_msg *msg, void *data)
-{
- char *job_id_s = (char*)data;
- return strcmp(msg->job_id_s, job_id_s) == 0;
-}
-
-static
-int
-cmp_jobid_set_exp(struct server_msg *msg, void *data)
-{
- struct server_msg *m = (struct server_msg *)data;
-
- if(strcmp(msg->job_id_s, m->job_id_s) == 0) {
- msg->expires = m->expires;
- }
- return 0;
-}
-
-
-int
enqueue_msg(struct event_queue *eq, struct server_msg *msg)
{
-#if defined(IL_NOTIFICATIONS)
- struct event_queue *eq_known;
-
- /* now we have a new event with possibly changed destination,
- so check for the already known destination and possibly move
- events from the original output queue to a new one */
- eq_known = notifid_map_get_dest(msg->job_id_s);
- if(eq != eq_known) {
- /* client has changed delivery address for this notification */
- if(notifid_map_set_dest(msg->job_id_s, eq) < 0)
- return(-1);
- /* move all events with this notif_id from eq_known to eq */
- if(eq_known != NULL) {
- event_queue_move_events(eq_known, eq, cmp_jobid, msg->job_id_s);
- /* XXX - we should kill the old queue too */
- }
- }
-
- /* if the expiration changed, set new one */
- if(msg->expires != notifid_map_get_expiration(msg->job_id_s)) {
- notifid_map_set_expiration(msg->job_id_s, msg->expires);
- /* set expiration for all events with this notif id */
- event_queue_move_events(eq, NULL, cmp_jobid_set_exp, msg);
- }
-#endif
-
/* fire thread to take care of this queue */
- if(event_queue_create_thread(eq) < 0)
+ if(event_queue_create_thread(eq) < 0)
return(-1);
-
+
#if defined(IL_NOTIFICATIONS)
- /* if there are no data to send, do not send anything
+ /* if there are no data to send, do not send anything
(messsage was just to change the delivery address) */
/* CORRECTION - let the message pass through the output queue
to commit it properly and keep event_store in sync */
- /* if(msg->len == 0)
+ /* if(msg->len == 0)
return(0);
*/
#endif
event_queue_cond_unlock(eq);
return(-1);
}
-
+
/* signal thread that we have a new message */
event_queue_signal(eq);
#endif /* INTERLOGD_FLUSH */
#ifdef INTERLOGD_HANDLE_CMD
-static
+static
int
parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout)
{
continue;
}
if(strncmp(token, "DG.COMMAND", r - token) == 0) {
-#if defined(INTERLOGD_FLUSH)
+#if defined(INTERLOGD_FLUSH)
if(strcmp(++r, "\"flush\"")) {
#endif
il_log(LOG_WARNING, " command %s not implemented\n", r);
#endif
} else if(strncmp(token, "DG.JOBID", r - token) == 0) {
char *p;
-
+
r += 2; /* skip =" */
p = index(r, '"');
if(p == NULL) { ret = -1; continue; }
} else if(strncmp(token, "DG.LLLID", r - token) == 0) {
sscanf(++r, "%ld", receipt);
}
-
+
}
return(0);
}
* -1 - failure
*/
-static
-int
+static
+int
handle_cmd(il_octet_string_t *event, long offset)
{
char *job_id_s;
struct timeval tv;
/* parse command */
- if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0)
+ if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0)
return(0);
#if defined(INTERLOGD_FLUSH)
error_get_msg());
clear_error();
}
- } else
+ } else
/* this call does not fail :-) */
event_store_recover_all();
result = (ret == ETIMEDOUT) ? 0 : -1;
break;
}
-
+
/* collect results from reporting threads */
if(job_id_s) {
/* find appropriate queue */
if(eq->flushing == 2) {
eq->flushing = 0;
num_replies++;
- result = ((result == 1) || (eq->flush_result < 0)) ?
+ result = ((result == 1) || (eq->flush_result < 0)) ?
eq->flush_result : result;
}
event_queue_cond_unlock(eq);
eq->flushing = 0;
num_replies++;
il_log(LOG_DEBUG, " thread reply: %d\n", eq->flush_result);
- result = ((result == 1) || (eq->flush_result < 0)) ?
+ result = ((result == 1) || (eq->flush_result < 0)) ?
eq->flush_result : result;
}
event_queue_cond_unlock(eq);
if(eq->flushing == 2) {
eq->flushing = 0;
num_replies++;
- result = ((result == 1) || (eq->flush_result < 0)) ?
+ result = ((result == 1) || (eq->flush_result < 0)) ?
eq->flush_result : result;
}
event_queue_cond_unlock(eq);
}
/* prevent deadlock in next flush */
- if(pthread_mutex_unlock(&flush_lock) < 0)
+ if(pthread_mutex_unlock(&flush_lock) < 0)
abort();
}
if(job_id_s) free(job_id_s);
result = send_confirmation(receipt, result);
- if(result <= 0)
+ if(result <= 0)
il_log(LOG_ERR, "handle_cmd: error sending status: %s\n", error_get_msg());
return(1);
#endif /* INTERLOGD_HANDLE_CMD */
-static
+static
int
handle_msg(il_octet_string_t *event, long offset)
-{
+{
struct server_msg *msg = NULL;
#if !defined(IL_NOTIFICATIONS)
struct event_queue *eq_l;
il_log(LOG_ERR, " handle_msg: error parsing event '%s':\n %s\n", event, error_get_msg());
return(0);
}
-
+
/* sync event store with IPC (if neccessary)
* This MUST be called before inserting event into output queue! */
- if((es = event_store_find(msg->job_id_s, NULL)) == NULL)
+ if((es = event_store_find(msg->job_id_s, NULL)) == NULL)
return(-1);
msg->es = es;
-
+
#ifdef LB_PERF
- if(nosync)
+ if(nosync)
ret = 1;
- else
+ else
#endif
ret = event_store_sync(es, offset);
/* no longer informative:
#else
eq_s = queue_list_get(msg->job_id_s);
#endif
- if(eq_s == NULL) {
+ if(eq_s == NULL) {
il_log(LOG_ERR, " handle_msg: apropriate queue not found: %s\n", error_get_msg());
clear_error();
} else {
-int
+int
loop()
{
/* receive events */
il_octet_string_t *msg;
long offset;
int ret;
-
+
if(killflg)
exit(0);
clear_error();
- if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0)
+ if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0)
{
if(error_get_maj() == IL_PROTO) {
il_log(LOG_DEBUG, " premature EOF while receiving event\n");
event_store_recover_all();
#endif
continue;
- } else
+ } else
return(-1);
}
else if(ret == 0) {
continue;
#endif
-#ifdef INTERLOGD_HANDLE_CMD
+#ifdef INTERLOGD_HANDLE_CMD
ret = handle_cmd(msg, offset);
if(ret == 0)
#endif
case IL_NOMEM:
return (ret);
break;
- default:
+ default:
il_log(LOG_ERR, "Error: %s\n", error_get_msg());
break;
}
edg_wll_GssConnection gss; /* GSS connection */
char *dest_name;
int dest_port;
+ char *dest;
int timeout; /* queue timeout */
struct event_queue_msg *tail; /* last message in the queue */
struct event_queue_msg *head; /* first message in the queue */