#ident "$Header$"
-/*
+/*
* - general queue handling routines (insert, get)
*/
char *p;
p = strchr(server_name, ':');
-
- if(p)
+
+ if(p)
*p++ = 0;
if((eq = malloc(sizeof(*eq))) == NULL) {
if(p)
*(p-1) = ':';
-
+
#if defined(IL_NOTIFICATIONS) || defined(IL_WS)
eq->dest_port = atoi(p);
#else
#if defined(INTERLOGD_EMS)
struct event_queue_msg *tail;
#endif
-
+
assert(eq != NULL);
- if((el = malloc(sizeof(*el))) == NULL)
+ if((el = malloc(sizeof(*el))) == NULL)
return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element"));
el->msg = server_msg_copy(msg);
eq->tail = el;
}
eq->tail_ems = el;
- } else
+ } else
#endif
- {
+ {
/* normal messages */
if(eq->tail)
eq->tail->prev = el;
#if defined(INTERLOGD_EMS)
/* if we are inserting message between mark_prev and mark_this,
we have to adjust mark_prev accordingly */
- if(eq->mark_this && (el->prev == eq->mark_this))
+ if(eq->mark_this && (el->prev == eq->mark_this))
eq->mark_prev = el;
#endif
assert(eq != NULL);
assert(msg != NULL);
-
+
event_queue_lock(eq);
el = eq->head;
#if defined(INTERLOGD_EMS)
}
-int
+int
event_queue_remove(struct event_queue *eq)
{
struct event_queue_msg *el;
eq->tail = NULL;
}
#endif
- if(--eq->cur_len == 0)
+ if(--eq->cur_len == 0)
eq->times_empty++;
event_queue_unlock(eq);
/* end of critical section */
-
+
server_msg_free(el->msg);
free(el);
}
int
-event_queue_move_events(struct event_queue *eq_s,
- struct event_queue *eq_d,
- int (*cmp_func)(struct server_msg *, void *),
+event_queue_move_events(struct event_queue *eq_s,
+ struct event_queue *eq_d,
+ int (*cmp_func)(struct server_msg *, void *),
void *data)
{
struct event_queue_msg *p, **source_prev, **dest_tail;
while(p) {
if((*cmp_func)(p->msg, data)) {
il_log(LOG_DEBUG, " moving event at offset %d(%d) from %s:%d to %s:%d\n",
- p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
+ p->msg->offset, p->msg->generation, eq_s->dest_name, eq_s->dest_port,
eq_d ? eq_d->dest_name : "trash", eq_d ? eq_d->dest_port : -1);
/* il_log(LOG_DEBUG, " current: %x, next: %x\n", p, p->prev); */
/* remove the message from the source list */
/*
- * - L/B server protocol handling routines
+ * - L/B server protocol handling routines
*/
#include "glite/jobid/cjobid.h"
#include "interlogd.h"
#if defined(INTERLOGD_EMS) || (defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH))
-/*
+/*
* Send confirmation to client.
*
*/
}
-static
+static
int
confirm_msg(struct server_msg *msg, int code, int code_min)
{
code_min = EDG_WLL_IL_SYS;
break;
}
-
+
return(send_confirmation(msg->receipt_to, code_min));
}
#endif
edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err);
set_error(IL_DGGSS, ret, gss_err);
free(gss_err);
- } else
+ } else
set_error(IL_DGGSS, ret, "get_reply");
}
return(ret);
/*
* Read reply from server.
- * Returns: -1 - error reading message,
+ * Returns: -1 - error reading message,
* code > 0 - error code from server
*/
static
-int
+int
get_reply(struct event_queue *eq, char **buf, int *code_min)
{
char *msg=NULL;
/*
* Returns: 0 - not connected, timeout set, 1 - OK
*/
-int
+int
event_queue_connect(struct event_queue *eq)
{
int ret;
local_cred_handle->counter++;
if(pthread_mutex_unlock(&cred_handle_lock) < 0)
abort();
-
+
il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port);
ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat);
if(pthread_mutex_lock(&cred_handle_lock) < 0)
free(local_cred_handle);
il_log(LOG_DEBUG, " freed credentials, not used anymore\n");
}
- if(pthread_mutex_unlock(&cred_handle_lock) < 0)
+ if(pthread_mutex_unlock(&cred_handle_lock) < 0)
abort();
if(ret < 0) {
}
-/*
+/*
* Send all events from the queue.
* Returns: -1 - system error, 0 - not send, 1 - queue empty
*/
-int
+int
event_queue_send(struct event_queue *eq)
{
int events_sent = 0;
clear_error();
- if(event_queue_get(eq, &msg) < 0)
+ if(event_queue_get(eq, &msg) < 0)
return(-1);
il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
eq->timeout = TIMEOUT;
return(0);
}
-
+
if((code = get_reply(eq, &rep, &code_min)) < 0) {
/* could not get the reply properly, so try again later */
if (events_sent>0) {
rep = strdup("OK");
}
#endif
-
+
il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep);
free(rep);
/* the reply is back here */
switch(code) {
-
+
/* NOT USED: case LB_TIME: */
case LB_NOMEM:
/* NOT USED: case LB_SYS: */
/* non fatal errors (for us) */
eq->timeout = TIMEOUT;
return(0);
-
+
case LB_OK:
/* event succesfully delivered */
-
+
default: /* LB_DBERR, LB_PROTO */
/* the event was not accepted by the server */
/* update the event pointer */
if((ret == 0) &&
(error_get_maj() != IL_OK))
il_log(LOG_ERR, "send_event: %s\n", error_get_msg());
-
+
event_queue_remove(eq);
events_sent++;
break;
-
+
} /* switch */
} /* while */