stage: compile
$(MAKE) install PREFIX=${stagedir}
-check: check.ll check.il
+check:
+# do nothing until test/ is really added to CVS
+# check.ll check.il
#check.ll: logd_proto_test.o ll_test.o
# ${LINKXX} -o $@ ${COMMON_LIB}_${nothrflavour} ${EXT_LIBS} ${GLOBUS_LIBS} ${TEST_LIBS} $+
#Thu Oct 07 13:57:49 CEST 2004
-module.version=0.1.0
+module.version=0.2.0
module.build=34
module.age=0
}
#if defined(IL_NOTIFICATIONS)
+
int
event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id)
{
if(eq_d) {
event_queue_lock(eq_d);
/* dest tail is set to point to the last (NULL) pointer in the list */
- dest_tail = (eq_d->tail == NULL) ? &(eq_d->tail) : &(eq_d->tail->prev);
+ dest_tail = (eq_d->head == NULL) ? &(eq_d->head) : &(eq_d->tail->prev);
}
source_prev = &(eq_s->head);
p = *source_prev;
- eq_s = NULL;
+ eq_s->tail = NULL;
while(p) {
if(strcmp(p->msg->job_id_s, notif_id) == 0) {
- il_log(LOG_DEBUG, " moving event with notif id %s from %s to %s\n",
- notif_id, eq_s->dest_name, eq_d ? eq_d->dest_name : "trash");
+ il_log(LOG_DEBUG, " moving event with notif id %s from %s:%d to %s:%d\n",
+ notif_id, eq_s->dest_name,eq_s->dest_port, eq_d ? eq_d->dest_name : "trash",eq_d ? eq_d->dest_port : -1);
/* remove the message from the source list */
*source_prev = p->prev;
if(eq_d) {
int
event_store_recover(struct event_store *es)
{
- struct event_queue *eq_l, *eq_b;
+ struct event_queue *eq_l = NULL, *eq_b, *eq_b_new;
struct server_msg *msg;
char *event_s;
int fd, ret;
ret = -1;
/* create message for server */
- msg = server_msg_create(event_s);
+ msg = server_msg_create(event_s, last);
free(event_s);
if(msg == NULL) {
break;
#endif
}
+#ifdef IL_NOTIFICATIONS
+ eq_b_new = queue_list_get(msg->dest);
+ if (eq_b_new != eq_b) {
+ free(es->dest);
+ es->dest = strdup(msg->dest);
+ eq_b = eq_b_new;
+ }
+#endif
+
/* now enqueue to the BS, if neccessary */
if((eq_b != eq_l) &&
(last >= es->last_committed_bs)) {
if(eq_known != NULL)
event_queue_move_events(eq_known, eq, msg->job_id_s);
}
+#endif
+ /* fire thread to take care of this queue */
+ if(event_queue_create_thread(eq) < 0)
+ return(-1);
+
+#if defined(IL_NOTIFICATIONS)
/* if there are no data to send, do not send anything
(messsage was just to change the delivery address) */
if(msg->len == 0)
return(0);
#endif
- /* fire thread to take care of this queue */
- if(event_queue_create_thread(eq) < 0)
- return(-1);
-
/* avoid losing signal to thread */
event_queue_cond_lock(eq);
int ret;
/* convert event to message for server */
- if((msg = server_msg_create(event)) == NULL) {
+ if((msg = server_msg_create(event, offset)) == NULL) {
il_log(LOG_ERR, " handle_msg: error parsing event '%s':\n %s\n", event, error_get_msg());
return(0);
}
#include "interlogd.h"
-static const int SOCK_QUEUE_MAX = 5;
+static const int SOCK_QUEUE_MAX = 50;
extern char *socket_path;
static int sock;
#include <globus_common.h>
-#include "glite/wmsutils/tls/ssl_helpers/ssl_inits.h"
-#include "glite/wmsutils/tls/ssl_helpers/ssl_pthreads.h"
#include "interlogd.h"
#include "glite/lb/consumer.h"
#include "glite/lb/lb_gss.h"
gss_cred_id_t cred_handle = GSS_C_NO_CREDENTIAL;
pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER;
+time_t key_mtime = 0, cert_mtime = 0;
+
static void usage (int status)
{
printf("%s - \n"
if (CAcert_dir)
setenv("X509_CERT_DIR", CAcert_dir, 1);
+ edg_wll_gss_watch_creds(cert_file,&cert_mtime);
ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred_handle, NULL, &gss_stat);
if (ret) {
char *gss_err = NULL;
struct server_msg {
char *job_id_s; /* necessary for commit */
+ long offset; /* just for printing more information to debug */
char *msg;
int len;
int ev_len;
/* server msg methods */
-struct server_msg *server_msg_create(char *);
+struct server_msg *server_msg_create(char *, long);
struct server_msg *server_msg_copy(struct server_msg *);
int server_msg_init(struct server_msg *, char *);
#if defined(INTERLOGD_EMS)
if (mysignal(SIGTERM, handle_signal) == SIG_ERR) { perror("signal"); exit(1); }
if (mysignal(SIGCHLD, handle_signal) == SIG_ERR) { perror("signal"); exit(1); }
+ edg_wll_gss_watch_creds(cert_file,&cert_mtime);
/* XXX DK: support noAuth */
ret = edg_wll_gss_acquire_cred_gsi(cert_file, &cred, &my_subject_name,
&gss_stat);
}
edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket...");
- if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
- if(errno != EISCONN) {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- SYSTEM_ERROR("connect");
- answer = errno;
- close(msg_sock);
- goto edg_wll_log_proto_server_end;
- } else {
+ for (i=0; i < CONNECT_ATTEMPTS; i++) {
+ if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+ if ((errno == EAGAIN) || (errno == ETIMEDOUT)) {
+ edg_wll_ll_log(LOG_DEBUG,".");
+ sleep(CONNECT_TIMEOUT);
+ continue;
+ } else if (errno == EISCONN) {
edg_wll_ll_log(LOG_DEBUG,"warning.\n");
- edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
+ edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
+ break;
+ } else {
+ edg_wll_ll_log(LOG_DEBUG,"error.\n");
+ SYSTEM_ERROR("connect");
+ answer = errno;
+ close(msg_sock);
+ goto edg_wll_log_proto_server_end_1;
+ }
+ } else {
+ edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
+ break;
}
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
}
edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos));
edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n");
answer = errno;
close(msg_sock);
- goto edg_wll_log_proto_server_end;
+ goto edg_wll_log_proto_server_end_1;
} else {
edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
}
edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error.");
answer = errno;
close(msg_sock);
- goto edg_wll_log_proto_server_end;
+ goto edg_wll_log_proto_server_end_1;
} else {
edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
}
edg_wll_ll_log(LOG_INFO,"Done.\n");
return answer;
+
+edg_wll_log_proto_server_end_1:
+ if (event->any.priority) {
+ close(confirm_sock);
+ unlink(confirm_sock_name);
+ }
+ goto edg_wll_log_proto_server_end;
}
/*
va_end(fmt_args);
if(level <= edg_wll_ll_log_level)
- fprintf(stderr, err_text);
+ fprintf(stderr, "[%d] %s", (int) getpid(), err_text);
if(level <= LOG_ERR) {
openlog("edg-wl-logd", LOG_PID | LOG_CONS, LOG_DAEMON);
syslog(level, "%s", err_text);
void edg_wll_ll_log_init(int level);
void edg_wll_ll_log(int level, const char *fmt, ...);
+
/* fcntl defaults */
#define FCNTL_ATTEMPTS 5
#define FCNTL_TIMEOUT 1
+
+/* connect defaults */
+
+#define CONNECT_ATTEMPTS 50
+#define CONNECT_TIMEOUT 10
+
+
/* locallogger daemon listen and connect functions prototypes */
int do_listen(int port);
queue_thread(void *q)
{
struct event_queue *eq = (struct event_queue *)q;
- int ret, exit;
+ int ret, exit, flushing;
if(init_errors(0) < 0) {
il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
ret = 0;
while (event_queue_empty(eq)
#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
- && !eq->flushing
+ && ((flushing=eq->flushing) != 1)
#endif
) {
ret = event_queue_wait(eq, 0);
* we are sending or request flush operation
*/
event_queue_cond_unlock(eq);
-
+
/* connect to server */
- if((ret=event_queue_connect(eq)) < 0) {
- /* internal error */
- il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
- /* this allows for collecting status when flushing;
- immediate exit would not do */
- exit = 1;
- break;
- } else if(ret == 0) {
+ if((ret=event_queue_connect(eq)) == 0) {
/* not connected */
if(error_get_maj() != IL_OK)
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
- il_log(LOG_INFO, " could not connect to server %s, waiting for retry\n", eq->dest_name);
+#if defined(IL_NOTIFICATIONS)
+ il_log(LOG_INFO, " could not connect to client %s, waiting for retry\n", eq->dest_name);
+#else
+ il_log(LOG_INFO, " could not connect to bookkeeping server %s, waiting for retry\n", eq->dest_name);
+#endif
} else {
/* connected, send events */
switch(ret=event_queue_send(eq)) {
-
+
case 0:
/* there was an error and we still have events to send */
if(error_get_maj() != IL_OK)
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
il_log(LOG_DEBUG, " events still waiting\n");
break;
-
+
case 1:
/* hey, we are done for now */
il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name);
break;
-
+
default:
/* internal error */
il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
exit = 1;
break;
-
+
} /* switch */
+
+ /* we are done for now anyway, so close the queue */
+ event_queue_close(eq);
}
- /* we are done for now anyway, so close the queue */
- event_queue_close(eq);
-
+#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
+ if(pthread_mutex_lock(&flush_lock) < 0)
+ abort();
event_queue_cond_lock(eq);
-#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)
/* Check if we are flushing and if we are, report status to master */
- if(eq->flushing == 1) {
+ if(flushing == 1) {
il_log(LOG_DEBUG, " flushing mode detected, reporting status\n");
/* 0 - events waiting, 1 - events sent, < 0 - some error */
eq->flush_result = ret;
eq->flushing = 2;
- if(pthread_mutex_lock(&flush_lock) < 0)
- abort();
if(pthread_cond_signal(&flush_cond) < 0)
abort();
- if(pthread_mutex_unlock(&flush_lock) < 0)
- abort();
}
+ if(pthread_mutex_unlock(&flush_lock) < 0)
+ abort();
+#else
+ event_queue_cond_lock(eq);
#endif
/* if there was some error with server, sleep for a while */
/* iff !event_queue_empty() */
- if(ret == 0) {
- if(event_queue_sleep(eq) < 0) {
- il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg());
- event_queue_cond_unlock(eq);
- pthread_exit((void*)-1);
- }
- }
+ if(ret == 0)
+ event_queue_sleep(eq);
if(exit) {
/* we have to clean up before exiting */
extern char *file_prefix;
+extern time_t cert_mtime, key_mtime;
+
void *
recover_thread(void *q)
{
- time_t cert_mtime = 0, key_mtime = 0;
-
if(init_errors(0) < 0) {
il_log(LOG_ERR, "Error initializing thread specific data, exiting!");
pthread_exit(NULL);
/*
- * Returns: -1 - internal error, 0 - not connected, timeout set, 1 - OK
+ * Returns: 0 - not connected, timeout set, 1 - OK
*/
int
event_queue_connect(struct event_queue *eq)
if(event_queue_get(eq, &msg) < 0)
return(-1);
+ il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s);
+
tv.tv_sec = TIMEOUT;
tv.tv_usec = 0;
ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat);
struct server_msg *
-server_msg_create(char *event)
+server_msg_create(char *event, long offset)
{
struct server_msg *msg;
server_msg_free(msg);
return(NULL);
}
+ msg->offset = offset;
return(msg);
}
msg->ev_len = src->ev_len;
msg->es = src->es;
msg->receipt_to = src->receipt_to;
+ msg->offset = src->offset;
#if defined(IL_NOTIFICATIONS)
msg->dest_name = strdup(src->dest_name);
msg->dest_port = src->dest_port;