From: Aleš Křenek Date: Fri, 18 Jun 2004 13:20:25 +0000 (+0000) Subject: initial import X-Git-Tag: import_scm_1~1 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=4602b88a04fd41a9902e8dab4acf6b7e09b5d299;p=jra1mw.git initial import --- diff --git a/org.glite.lb.logger/Makefile b/org.glite.lb.logger/Makefile new file mode 100644 index 0000000..c3c2713 --- /dev/null +++ b/org.glite.lb.logger/Makefile @@ -0,0 +1,66 @@ +include Makefile.inc + +VPATH:=${src} + +# XXX +VERSION=-DVERSION=\"glite-test\" + +SUFFIXES=.no + +DEBUG:=-g -O0 +CFLAGS:=${DEBUG} \ + -I${stageinc} -I${src} \ + -I${repository}/${globus}/include/${globusflavour} \ + -I${repository}/${globus}/include/${globusflavour}/openssl \ + -D_GNU_SOURCE \ + ${VERSION} + +LDFLAGS:=-L${stagelib} +LINK:=libtool --mode=link ${CC} ${LDFLAGS} +INSTALL:=libtool --mode=install install + +GLOBUS_LIBS:= -L${repository}/${globus}/lib \ + -lglobus_common_${globusflavour} \ + -lssl_${globusflavour} + +EXT_LIBS:= ${GLOBUS_LIBS} \ + -L${repository}/${ares}/lib -lares \ + -L${repository}/${expat}/lib -lexpat + +HELPERS:=-lglite_wms_tls_ssl_helpers +PHELPERS:=-lglite_wms_tls_ssl_pthr_helpers + +COMMON_LIB:=-lglite_lb_common + +LOGD_OBJS:= logd_proto.o logd.o + +INTERLOG_OBJS:=il_error.o input_queue_socket.o \ + recover.o send_event.o \ + event_queue.o event_store.o il_master.o interlogd.o \ + queue_mgr.o server_msg.o queue_thread.o + +INTERLOG_NOBJS:=${INTERLOG_OBJS:.o=.no} + +default: all + +compile: glite_lb_logd glite_lb_interlogd glite_lb_notif_interlogd + +stage export all: compile + ${INSTALL} -m 755 glite_lb_logd glite_lb_interlogd glite_lb_notif_interlogd ${stagebin} + +check: + echo No unit tests so far + +glite_lb_logd: ${LOGD_OBJS} + ${LINK} -o $@ ${LOGD_OBJS} ${COMMON_LIB} ${HELPERS} ${EXT_LIBS} + +glite_lb_interlogd: ${INTERLOG_OBJS} + ${LINK} -o $@ ${INTERLOG_OBJS} \ + ${COMMON_LIB} ${HELPERS} ${PHELPERS} ${EXT_LIBS} -lpthread + +glite_lb_notif_interlogd: ${INTERLOG_NOBJS} + ${LINK} -o $@ ${INTERLOG_NOBJS} \ + ${COMMON_LIB} ${HELPERS} ${PHELPERS} ${EXT_LIBS} -lpthread + +%.no: %.c + ${CC} ${CFLAGS} -DIL_NOTIFICATIONS -c $< -o $@ diff --git a/org.glite.lb.logger/build.xml b/org.glite.lb.logger/build.xml new file mode 100755 index 0000000..4aa4105 --- /dev/null +++ b/org.glite.lb.logger/build.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.glite.lb.logger/project/properties.xml b/org.glite.lb.logger/project/properties.xml new file mode 100755 index 0000000..d9d07b8 --- /dev/null +++ b/org.glite.lb.logger/project/properties.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/org.glite.lb.logger/project/taskdefs.xml b/org.glite.lb.logger/project/taskdefs.xml new file mode 100755 index 0000000..9125c37 --- /dev/null +++ b/org.glite.lb.logger/project/taskdefs.xml @@ -0,0 +1,4 @@ + + + + diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c new file mode 100644 index 0000000..f4b749e --- /dev/null +++ b/org.glite.lb.logger/src/event_queue.c @@ -0,0 +1,307 @@ +#ident "$Header$" + +/* + * - general queue handling routines (insert, get) + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "glite/wms/jobid/cjobid.h" + +#include "interlogd.h" + +struct event_queue_msg { + struct server_msg *msg; + struct event_queue_msg *prev; +}; + +struct event_queue * +event_queue_create(char *server_name) +{ + struct event_queue *eq; + char *p; + + p = strchr(server_name, ':'); + + if(p) + *p++ = 0; + + if((eq = malloc(sizeof(*eq))) == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue"); + return(NULL); + } + + memset(eq, 0, sizeof(*eq)); + + eq->dest_name = strdup(server_name); + + if(p) + *(p-1) = ':'; + +#if defined(IL_NOTIFICATIONS) + eq->dest_port = atoi(p); +#else + eq->dest_port = p ? atoi(p)+1 : GLITE_WMSC_JOBID_DEFAULT_PORT+1; +#endif + /* create all necessary locks */ + if(pthread_rwlock_init(&eq->update_lock, NULL)) { + set_error(IL_SYS, errno, "event_queue_create: error creating update lock"); + free(eq); + return(NULL); + } + if(pthread_mutex_init(&eq->cond_lock, NULL)) { + set_error(IL_SYS, errno, "event_queue_create: error creating cond mutex"); + free(eq); + return(NULL); + } + if(pthread_cond_init(&eq->ready_cond, NULL)) { + set_error(IL_SYS, errno, "event_queue_create: error creating cond variable"); + free(eq); + return(NULL); + } + +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + if(pthread_cond_init(&eq->flush_cond, NULL)) { + set_error(IL_SYS, errno, "event_queue_create: error creating cond variable"); + free(eq); + return(NULL); + } +#endif + + return(eq); +} + + +int +event_queue_free(struct event_queue *eq) +{ + assert(eq != NULL); + + if(!event_queue_empty(eq)) + return(-1); + + if(eq->thread_id) + pthread_cancel(eq->thread_id); + + + pthread_rwlock_destroy(&eq->update_lock); + pthread_mutex_destroy(&eq->cond_lock); + pthread_cond_destroy(&eq->ready_cond); +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + pthread_cond_destroy(&eq->flush_cond); +#endif + free(eq); + + return(0); +} + + +int +event_queue_empty(struct event_queue *eq) +{ + int ret; + + assert(eq != NULL); + + event_queue_lock_ro(eq); + ret = eq->head == NULL; + event_queue_unlock(eq); + + return(ret); +} + + +int +event_queue_insert(struct event_queue *eq, struct server_msg *msg) +{ + struct event_queue_msg *el; +#if defined(INTERLOGD_EMS) + struct event_queue_msg *tail; +#endif + + assert(eq != NULL); + + if((el = malloc(sizeof(*el))) == NULL) + return(set_error(IL_NOMEM, ENOMEM, "event_queue_insert: not enough room for queue element")); + + el->msg = server_msg_copy(msg); + if(el->msg == NULL) { + free(el); + return(-1); + }; + + /* this is critical section */ + event_queue_lock(eq); +#if defined(INTERLOGD_EMS) + if(server_msg_is_priority(msg)) { + /* priority messages go first */ + tail = eq->tail_ems; + if(tail) { + el->prev = tail->prev; + tail->prev = el; + if (tail == eq->tail) + eq->tail = el; + } else { + el->prev = eq->head; + eq->head = el; + if(eq->tail == NULL) + eq->tail = el; + } + eq->tail_ems = el; + } else +#endif + { + /* normal messages */ + if(eq->tail) + eq->tail->prev = el; + else + eq->head = el; + eq->tail = el; + el->prev = NULL; + } +#if defined(INTERLOGD_EMS) + /* if we are inserting message between mark_prev and mark_this, + we have to adjust mark_prev accordingly */ + if(eq->mark_this && (el->prev == eq->mark_this)) + eq->mark_prev = el; +#endif + event_queue_unlock(eq); + /* end of critical section */ + + return(0); +} + + +int +event_queue_get(struct event_queue *eq, struct server_msg **msg) +{ + struct event_queue_msg *el; + + assert(eq != NULL); + assert(msg != NULL); + + event_queue_lock_ro(eq); + el = eq->head; +#if defined(INTERLOGD_EMS) + /* this message is marked for removal, it is first on the queue */ + eq->mark_this = el; + eq->mark_prev = NULL; +#endif + event_queue_unlock(eq); + + if(el == NULL) + return(-1); + + *msg = el->msg; + + return(0); +} + + +int +event_queue_remove(struct event_queue *eq) +{ + struct event_queue_msg *el; +#if defined(INTERLOGD_EMS) + struct event_queue_msg *prev; +#endif + + assert(eq != NULL); + + /* this is critical section */ + event_queue_lock(eq); +#if defined(INTERLOGD_EMS) + el = eq->mark_this; + prev = eq->mark_prev; + + if(el == NULL) { + event_queue_unlock(eq); + return(-1); + } + + if(prev == NULL) { + /* removing from head of the queue */ + eq->head = el->prev; + } else { + /* removing from middle of the queue */ + prev->prev = el->prev; + } + if(el == eq->tail) { + /* we are removing the last message */ + eq->tail = NULL; + } + if(el == eq->tail_ems) { + /* we are removing last priority message */ + eq->tail_ems = NULL; + } +#else + el = eq->head; + if(el == NULL) { + event_queue_unlock(eq); + return(-1); + } + eq->head = el->prev; + if(el == eq->tail) { + eq->tail = NULL; + } +#endif + event_queue_unlock(eq); + /* end of critical section */ + + server_msg_free(el->msg); + free(el); + + return(0); +} + +#if defined(IL_NOTIFICATIONS) +int +event_queue_move_events(struct event_queue *eq_s, struct event_queue *eq_d, char *notif_id) +{ + struct event_queue_msg *p, **source_prev, **dest_tail; + + assert(eq_s != NULL); + assert(notif_id != NULL); + + event_queue_lock(eq_s); + if(eq_d) { + event_queue_lock(eq_d); + /* dest tail is set to point to the last (NULL) pointer in the list */ + dest_tail = (eq_d->tail == NULL) ? &(eq_d->tail) : &(eq_d->tail->prev); + } + source_prev = &(eq_s->head); + p = *source_prev; + while(p) { + if(strcmp(p->msg->job_id_s, notif_id) == 0) { + il_log(LOG_DEBUG, " moving event with notif id %s from %s to %s\n", + notif_id, eq_s->dest_name, eq_d ? eq_d->dest_name : "trash"); + /* remove the message from the source list */ + *source_prev = p->prev; + if(eq_d) { + /* append the message at the end of destination list */ + p->prev = NULL; + *dest_tail = p; + dest_tail = &(p->prev); + } else { + /* free the message */ + server_msg_free(p->msg); + free(p); + } + } else { + /* message stays */ + source_prev = &(p->prev); + } + p = *source_prev; + } + if(eq_d) event_queue_unlock(eq_d); + event_queue_unlock(eq_s); + return(0); +} + +#endif diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c new file mode 100644 index 0000000..2e08d17 --- /dev/null +++ b/org.glite.lb.logger/src/event_store.c @@ -0,0 +1,1016 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include + +#include "glite/lb/consumer.h" +#include "glite/lb/events_parse.h" + +#include "interlogd.h" + +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static char *file_prefix = NULL; + + +struct event_store_list { + struct event_store *es; + struct event_store_list *next; +}; + + +static struct event_store_list *store_list; +static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER; + + +/* ---------------- + * helper functions + * ---------------- + */ +static +char * +jobid2eventfile(IL_EVENT_ID_T job_id) +{ + char *buffer; + char *hash; + + if(job_id) { + hash = IL_EVENT_GET_UNIQUE(job_id); + asprintf(&buffer, "%s.%s", file_prefix, hash); + free(hash); + } else + asprintf(&buffer, "%s.default", file_prefix); + + return(buffer); +} + + +static +char * +jobid2controlfile(IL_EVENT_ID_T job_id) +{ + char buffer[256]; + char *hash; + + if(job_id) { + hash = IL_EVENT_GET_UNIQUE(job_id); + snprintf(buffer, 256, "%s.%s.ctl", file_prefix, hash); + free(hash); + } else + snprintf(buffer, 256, "%s.default.ctl", file_prefix); + + return(strdup(buffer)); +} + + +static +char * +read_event_string(FILE *file) +{ + char *buffer, *p, *n; + int len, c; + + buffer=malloc(1024); + if(buffer == NULL) { + set_error(IL_NOMEM, ENOMEM, "read_event_string: no room for event"); + return(NULL); + } + p = buffer; + len = 1024; + + while((c=fgetc(file)) != EOF) { + + /* we have to have free room for one byte */ + /* if(len - (p - buffer) < 1) */ + if(p - buffer >= len) { + n = realloc(buffer, len + 8192); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event_string: no room for event"); + return(NULL); + } + p = p - buffer + n; + buffer = n; + len += 8192; + } + + if(c == EVENT_SEPARATOR) { + *p++ = 0; + break; + } else + *p++ = (char) c; + } + + if(c != EVENT_SEPARATOR) { + free(buffer); + return(NULL); + } + + return(buffer); +} + + + +/* ------------------------------ + * event_store 'member' functions + * ------------------------------ + */ +static +int +event_store_free(struct event_store *es) +{ + assert(es != NULL); + + if(es->job_id_s) free(es->job_id_s); + if(es->event_file_name) free(es->event_file_name); + if(es->control_file_name) free(es->control_file_name); + pthread_rwlock_destroy(&es->use_lock); + pthread_rwlock_destroy(&es->update_lock); + free(es); + + return(0); +} + + +static +struct event_store * +event_store_create(char *job_id_s) +{ + struct event_store *es; + IL_EVENT_ID_T job_id; + + es = malloc(sizeof(*es)); + if(es == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure"); + return(NULL); + } + + memset(es, 0, sizeof(*es)); + + il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s); + + job_id = NULL; + if(strcmp(job_id_s, "default") && IL_EVENT_ID_PARSE(job_id_s, &job_id)) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "event_store_create: error parsing id"); + free(es); + return(NULL); + } + + es->job_id_s = strdup(job_id_s); + es->event_file_name = jobid2eventfile(job_id); + es->control_file_name = jobid2controlfile(job_id); + IL_EVENT_ID_FREE(job_id); + + if(pthread_rwlock_init(&es->update_lock, NULL)) + abort(); + if(pthread_rwlock_init(&es->use_lock, NULL)) + abort(); + + return(es); +} + + +static +int +event_store_lock_ro(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_rdlock(&es->update_lock)) + abort(); + + return(0); +} + + +static +int +event_store_lock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_wrlock(&es->update_lock)) + abort(); + + return(0); +} + + +static +int +event_store_unlock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->update_lock)) + abort(); + return(0); +} + + +static +int +event_store_read_ctl(struct event_store *es) +{ + FILE *ctl_file; + + assert(es != NULL); + + event_store_lock(es); + if((ctl_file = fopen(es->control_file_name, "r")) == NULL) { + /* no control file, new event file */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + } else { + /* read last seen and last committed counts */ + fscanf(ctl_file, "%*s\n%ld\n%ld\n", + &es->last_committed_ls, + &es->last_committed_bs); + fclose(ctl_file); + } + event_store_unlock(es); + + return(0); +} + + +static +int +event_store_write_ctl(struct event_store *es) +{ + FILE *ctl; + + assert(es != NULL); + + ctl = fopen(es->control_file_name, "w"); + if(ctl == NULL) { + set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file"); + return(-1); + } + + if(fprintf(ctl, "%s\n%ld\n%ld\n", + es->job_id_s, + es->last_committed_ls, + es->last_committed_bs) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record"); + return(-1); + } + + if(fclose(ctl) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file"); + return(-1); + } + + return(0); +} + + +/* + * event_store_recover() + * - recover after restart or catch up when events missing in IPC + * - if offset > 0, read everything behind it + * - if offset == 0, read everything behind min(last_committed_bs, last_committed_es) + */ +int +event_store_recover(struct event_store *es) +{ + struct event_queue *eq_l, *eq_b; + struct server_msg *msg; + char *event_s; + int fd, ret; + long last; + FILE *ef; + struct flock efl; + char err_msg[128]; + + assert(es != NULL); + +#if defined(IL_NOTIFICATIONS) + eq_b = queue_list_get(es->dest); +#else + /* find bookkepping server queue */ + eq_b = queue_list_get(es->job_id_s); +#endif + if(eq_b == NULL) + return(-1); + +#if !defined(IL_NOTIFICATIONS) + /* get log server queue */ + eq_l = queue_list_get(NULL); +#endif + + event_store_lock(es); + + il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name); + + /* open event file */ + ef = fopen(es->event_file_name, "r"); + if(ef == NULL) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error opening event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + return(-1); + } + + /* lock the file for reading (we should not read while dglogd is writing) */ + fd = fileno(ef); + efl.l_type = F_RDLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLKW, &efl) < 0) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error locking event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + /* get the position in file to be sought */ + if(es->offset) + last = es->offset; + else { +#if !defined(IL_NOTIFICATIONS) + if(eq_b == eq_l) + last = es->last_committed_ls; + else +#endif + /* last = min(ls, bs) */ + last = (es->last_committed_bs < es->last_committed_ls) ? es->last_committed_bs : es->last_committed_ls; + } + + il_log(LOG_DEBUG, " setting starting file position to %ld\n", last); + il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls); + il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs); + + /* skip all committed or already enqueued events */ + if(fseek(ef, last, SEEK_SET) < 0) { + set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + /* enqueue all remaining events */ + ret = 1; + msg = NULL; + while((event_s=read_event_string(ef)) != NULL) { + + /* last holds the starting position of event_s in file */ + il_log(LOG_DEBUG, " reading event at %ld\n", last); + + /* break from now on means there was some error */ + ret = -1; + + /* create message for server */ + msg = server_msg_create(event_s); + free(event_s); + if(msg == NULL) { + break; + } + msg->es = es; + + /* first enqueue to the LS */ + if(!bs_only && (last >= es->last_committed_ls)) { + + il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last); + +#if !defined(IL_NOTIFICATIONS) + if(enqueue_msg(eq_l, msg) < 0) + break; +#endif + } + + /* now enqueue to the BS, if neccessary */ + if((eq_b != eq_l) && + (last >= es->last_committed_bs)) { + + il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); + + if(enqueue_msg(eq_b, msg) < 0) + break; + } + server_msg_free(msg); + msg = NULL; + + /* now last is also the offset behind the last successfully queued event */ + last = ftell(ef); + + /* ret == 0 means EOF or incomplete event found */ + ret = 0; + + } /* while */ + + /* due to this little assignment we had to lock the event_store for writing */ + es->offset = last; + il_log(LOG_DEBUG, " event store offset set to %ld\n", last); + + if(msg) + server_msg_free(msg); + + fclose(ef); + il_log(LOG_DEBUG, " finished reading events with %d\n", ret); + + event_store_unlock(es); + return(ret); +} + + +/* + * event_store_sync() + * - check the position of event and fill holes from file + * - return 1 if the event is new, + * 0 if it was seen before, + * -1 if there was an error + */ +int +event_store_sync(struct event_store *es, long offset) +{ + int ret; + + assert(es != NULL); + + event_store_lock_ro(es); + if(es->offset == offset) + /* we are up to date */ + ret = 1; + else if(es->offset > offset) + /* we have already seen this event */ + ret = 0; + else { + /* es->offset < offset, i.e. we have missed some events */ + event_store_unlock(es); + ret = event_store_recover(es); + /* XXX possible room for intervention by another thread - is there + * any other thread messing with us? + * 1) After recover() es->offset is set at the end of file. + * 2) es->offset is set only by recover() and next(). + * 3) Additional recover can not do much harm. + * 4) And next() is only called by the same thread as sync(). + * => no one is messing with us right now */ + event_store_lock_ro(es); + if(ret < 0) + ret = -1; + else + /* somehow we suppose that now es->offset >= offset */ + /* in fact it must be es->offset > offset, anything else would be weird */ + ret = (es->offset > offset) ? 0 : 1; + } + event_store_unlock(es); + return(ret); +} + + +int +event_store_next(struct event_store *es, int len) +{ + assert(es != NULL); + + event_store_lock(es); + es->offset += len; + event_store_unlock(es); + + return(0); +} + + +/* + * event_store_commit() + * + */ +int +event_store_commit(struct event_store *es, int len, int ls) +{ + assert(es != NULL); + + event_store_lock(es); + + if(ls) + es->last_committed_ls += len; + else { + es->last_committed_bs += len; + if (bs_only) es->last_committed_ls += len; + } + + if(event_store_write_ctl(es) < 0) { + event_store_unlock(es); + return(-1); + } + + event_store_unlock(es); + + + return(0); +} + + +/* + * event_store_clean() + * - remove the event files (event and ctl), if they are not needed anymore + * - returns 0 if event_store is in use, 1 if it was removed and -1 on error + * + * Q: How do we know that we can safely remove the files? + * A: When all events from file have been committed both by LS and BS. + */ +static +int +event_store_clean(struct event_store *es) +{ + long last; + int fd; + FILE *ef; + struct flock efl; + + assert(es != NULL); + + /* prevent sender threads from updating */ + event_store_lock(es); + + il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s); + il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls); + il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs); + + /* preliminary check to avoid opening event file */ + /* if the positions differ, some events still have to be sent */ + if(es->last_committed_ls != es->last_committed_bs) { + event_store_unlock(es); + il_log(LOG_DEBUG, " not all events sent, cleanup aborted\n"); + return(0); + } + + /* the file can only be removed when all the events were succesfully sent + (ie. committed both by LS and BS */ + /* That also implies that the event queues are 'empty' at the moment. */ + ef = fopen(es->event_file_name, "r+"); + if(ef == NULL) { + /* if we can not open the event store, it is an error and the struct should be removed */ + /* XXX - is it true? */ + event_store_unlock(es); + il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno)); + return(1); + } + + fd = fileno(ef); + + /* prevent local-logger from writing into event file */ + efl.l_type = F_WRLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLK, &efl) < 0) { + il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n"); + fclose(ef); + event_store_unlock(es); + if(errno != EACCES && + errno != EAGAIN) { + set_error(IL_SYS, errno, "event_store_clean: error locking event file"); + return(-1); + } + return(0); + } + + /* now the file should not contain partially written event, so it is safe + to get offset behind last event by seeking the end of file */ + if(fseek(ef, 0, SEEK_END) < 0) { + set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + last = ftell(ef); + il_log(LOG_DEBUG, " total bytes in file: %d\n", last); + + if(es->last_committed_ls < last) { + fclose(ef); + event_store_unlock(es); + il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n"); + return(0); + } else if( es->last_committed_ls > last) { + il_log(LOG_WARNING, " warning: event file seems to shrink!\n"); + } + + /* now we are sure that all events were sent and the event queues are empty */ + il_log(LOG_INFO, " removing event file %s\n", es->event_file_name); + + /* remove the event file */ + unlink(es->event_file_name); + unlink(es->control_file_name); + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + /* unlock the event_store even if it is going to be removed */ + event_store_unlock(es); + + /* close the event file (that unlocks it as well) */ + fclose(ef); + + /* indicate that it is safe to remove this event_store */ + return(1); +} + + + +/* -------------------------------- + * event store management functions + * -------------------------------- + */ +struct event_store * +event_store_find(char *job_id_s) +{ + struct event_store_list *q, *p; + struct event_store *es; + + if(pthread_rwlock_wrlock(&store_list_lock)) { + abort(); + } + + es = NULL; + + q = NULL; + p = store_list; + + while(p) { + if(strcmp(p->es->job_id_s, job_id_s) == 0) { + es = p->es; + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(es); + } + + q = p; + p = p->next; + } + + es = event_store_create(job_id_s); + if(es == NULL) { + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p = malloc(sizeof(*p)); + if(p == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store"); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p->next = store_list; + store_list = p; + + p->es = es; + + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(es); +} + + +int +event_store_release(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->use_lock)) + abort(); + il_log(LOG_DEBUG, " released lock on %s\n", es->job_id_s); + return(0); +} + + +static +int +event_store_from_file(char *filename) +{ + struct event_store *es; + FILE *event_file; + char *event_s, *job_id_s = NULL; + int ret; +#if defined(IL_NOTIFICATIONS) + edg_wll_Event *notif_event; + edg_wll_Context context; + char *dest_name = NULL; + + edg_wll_InitContext(&context); +#endif + + il_log(LOG_INFO, " attaching to event file: %s\n", filename); + + event_file = fopen(filename, "r"); + if(event_file == NULL) { + set_error(IL_SYS, errno, "event_store_from_file: error opening event file"); + return(-1); + } + event_s = read_event_string(event_file); + fclose(event_file); + if(event_s == NULL) + return(0); + +#if defined(IL_NOTIFICATIONS) + if((ret=edg_wll_ParseNotifEvent(context, event_s, ¬if_event))) { + set_error(IL_LBAPI, ret, "event_store_from_file: could not parse event"); + ret = -1; + goto out; + } + if(notif_event->notification.notifId == NULL) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, + "event_store_from_file: parse error - no notif id"); + ret = -1; + goto out; + } + if((job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId)) == NULL) { + set_error(IL_SYS, ENOMEM, "event_store_from_file: could not copy id"); + ret = -1; + goto out; + } + if(notif_event->notification.dest_host && + (strlen(notif_event->notification.dest_host) > 0)) { + asprintf(&dest_name, "%s:%d", notif_event->notification.dest_host, notif_event->notification.dest_port); + } + +#else + job_id_s = edg_wll_GetJobId(event_s); +#endif + il_log(LOG_DEBUG, " event id: '%s'\n", job_id_s); + if(job_id_s == NULL) { + il_log(LOG_NOTICE, " skipping file, could not parse event\n"); + ret = 0; + goto out; + } + + es=event_store_find(job_id_s); + + if(es == NULL) { + ret = -1; + goto out; + } + +#if defined(IL_NOTIFICATIONS) + es->dest = dest_name; +#endif + + if((es->last_committed_ls == 0) && + (es->last_committed_bs == 0) && + (es->offset == 0)) { + ret = event_store_read_ctl(es); + } else + ret = 0; + + event_store_release(es); + +out: +#if defined(IL_NOTIFICATIONS) + if(notif_event) { + edg_wll_FreeEvent(notif_event); + free(notif_event); + } +#endif + if(event_s) free(event_s); + if(job_id_s) free(job_id_s); + return(ret); +} + + +int +event_store_init(char *prefix) +{ + if(file_prefix == NULL) { + file_prefix = strdup(prefix); + store_list = NULL; + } + + /* read directory and get a list of event files */ + { + int len; + + char *p, *dir; + DIR *event_dir; + struct dirent *entry; + + + /* get directory name */ + p = strrchr(file_prefix, '/'); + if(p == NULL) { + dir = strdup("."); + p = ""; + len = 0; + } else { + *p = '\0'; + dir = strdup(file_prefix); + *p++ = '/'; + len = strlen(p); + } + + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* skip all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') + continue; + + s = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(s == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name"); + return(-1); + } + + *s = '\0'; + strcat(s, dir); + strcat(s, "/"); + strcat(s, entry->d_name); + + if(event_store_from_file(s) < 0) { + free(dir); + free(s); + closedir(event_dir); + return(-1); + } + + free(s); + } + closedir(event_dir); + free(dir); + } + + return(0); +} + + +int +event_store_recover_all() +{ + struct event_store_list *sl; + + + if(pthread_rwlock_rdlock(&store_list_lock)) + abort(); + + /* recover all event stores */ + sl = store_list; + while(sl != NULL) { + + /* recover this event store */ + /* no need to lock use_lock in event_store, the store_list_lock is in place */ + if(event_store_recover(sl->es) < 0) { + il_log(LOG_ERR, " error recovering event store %s:\n %s\n", sl->es->event_file_name, error_get_msg()); + clear_error(); + } + sl = sl->next; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + + +#if 0 +int +event_store_remove(struct event_store *es) +{ + struct event_store_list *p, **q; + + assert(es != NULL); + + switch(event_store_clean(es)) { + case 0: + il_log(LOG_DEBUG, " event store not removed, still used\n"); + return(0); + + case 1: + if(pthread_rwlock_wrlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error locking event store list"); + return(-1); + } + + p = store_list; + q = &store_list; + + while(p) { + if(p->es == es) { + (*q) = p->next; + event_store_free(es); + free(p); + break; + } + q = &(p->next); + p = p->next; + } + + if(pthread_rwlock_unlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error unlocking event store list"); + return(-1); + } + return(1); + + default: + return(-1); + } + /* not reached */ + return(0); +} +#endif + +int +event_store_cleanup() +{ + struct event_store_list *sl; + struct event_store_list *slnext; + struct event_store_list **prev; + + /* try to remove event files */ + + if(pthread_rwlock_wrlock(&store_list_lock)) + abort(); + + sl = store_list; + prev = &store_list; + + while(sl != NULL) { + int ret; + + slnext = sl->next; + + /* one event store at time */ + ret = pthread_rwlock_trywrlock(&sl->es->use_lock); + if(ret == EBUSY) { + il_log(LOG_DEBUG, " event_store %s is in use by another thread\n", + sl->es->job_id_s); + sl = slnext; + continue; + } else if (ret < 0) + abort(); + + switch(event_store_clean(sl->es)) { + + case 1: + /* remove this event store */ + (*prev) = slnext; + event_store_free(sl->es); + free(sl); + break; + + case -1: + il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n", + sl->es->job_id_s, sl->es->event_file_name, error_get_msg()); + event_store_release(sl->es); + clear_error(); + /* go on to the next */ + + default: + event_store_release(sl->es); + prev = &(sl->next); + break; + } + + sl = slnext; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + diff --git a/org.glite.lb.logger/src/il_error.c b/org.glite.lb.logger/src/il_error.c new file mode 100644 index 0000000..322d6f1 --- /dev/null +++ b/org.glite.lb.logger/src/il_error.c @@ -0,0 +1,182 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include + +#include // SSL header file + +#include "glite/lb/dgssl.h" + +#include "il_error.h" + + +extern int log_level; + +static pthread_key_t err_key; + +static int IL_ERR_MSG_LEN = 128; + +static +void +error_key_delete(void *err) +{ + free(err); +} + +static +void +error_key_create() +{ + pthread_key_create(&err_key, error_key_delete); +} + +static +struct error_inf * +error_get_err () +{ + struct error_inf *err; + + /* get thread specific error structure */ + err = (struct error_inf *)pthread_getspecific(err_key); + assert(err != NULL); + + return(err); +} + +int +init_errors(int level) +{ + static pthread_once_t error_once = PTHREAD_ONCE_INIT; + struct error_inf *err; + + /* create the key for thread specific error only once */ + pthread_once(&error_once, error_key_create); + + /* there is no thread error yet, try to create one */ + if((err = (struct error_inf *)malloc(sizeof(*err)))) { + /* allocation successfull, make it thread specific data */ + if(pthread_setspecific(err_key, err)) { + free(err); + return(-1); + } + } else + return(-1); + + err->code_maj = 0; + err->code_min = 0; + err->msg = malloc(IL_ERR_MSG_LEN + 1); + if(err->msg == NULL) + return(-1); + + if(level) + log_level = level; + + return(0); +} + +int +set_error(int code, long minor, char *msg) +{ + struct error_inf *err; + + err = error_get_err(); + + err->code_maj = code; + err->code_min = minor; + + switch(code) { + + case IL_SYS: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, strerror(err->code_min)); + break; + + case IL_HOST: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, hstrerror(err->code_min)); + break; + + case IL_AUTH: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, ERR_error_string(err->code_min, NULL)); + break; + + case IL_DGSSL: + switch(err->code_min) { + + case EDG_WLL_SSL_ERROR_SSL: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, ERR_error_string(ERR_get_error(), NULL)); + break; + + case EDG_WLL_SSL_ERROR_TIMEOUT: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: Timeout in SSL connection.", msg); + break; + + case EDG_WLL_SSL_ERROR_EOF: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: Connection lost.", msg); + break; + + case EDG_WLL_SSL_ERROR_ERRNO: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, strerror(errno)); + break; + + case EDG_WLL_SSL_ERROR_HERRNO: + snprintf(err->msg, IL_ERR_MSG_LEN, "%s: %s", msg, hstrerror(errno)); + break; + } + + default: + strncpy(err->msg, msg, IL_ERR_MSG_LEN); + } + + return(code); +} + + +int +clear_error() { + struct error_inf *err; + + err = error_get_err(); + + err->code_maj = IL_OK; + err->code_min = 0; + *(err->msg) = 0; + + return(0); +} + + +int +error_get_maj() +{ + struct error_inf *err; + + err = error_get_err(); + + return(err->code_maj); +} + + +long +error_get_min() +{ + struct error_inf *err; + + err = error_get_err(); + + return(err->code_min); +} + + +char * +error_get_msg() +{ + struct error_inf *err; + + err = error_get_err(); + + return(err->msg); +} diff --git a/org.glite.lb.logger/src/il_error.h b/org.glite.lb.logger/src/il_error.h new file mode 100644 index 0000000..b593bd6 --- /dev/null +++ b/org.glite.lb.logger/src/il_error.h @@ -0,0 +1,34 @@ +#ifndef IL_ERROR_H +#define IL_ERROR_H + +#ident "$Header$" + +#include + +enum err_code_maj { /* minor = */ + IL_OK, /* 0 */ + IL_SYS, /* errno */ + IL_NOMEM, /* ENOMEM */ + IL_AUTH, /* 0 (SSL error) */ + IL_PROTO, /* LB_* */ + IL_LBAPI, /* dgLBErrCode */ + IL_DGSSL, /* EDG_WLL_SSL_* */ + IL_HOST /* h_errno */ +}; + +struct error_inf { + int code_maj; + long code_min; + char *msg; +}; + +int init_errors(int); +int set_error(int, long, char *); +int clear_error(); +int error_get_maj(); +long error_get_min(); +char *error_get_msg(); + +int il_log(int, char *, ...); + +#endif diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c new file mode 100644 index 0000000..d4f77c3 --- /dev/null +++ b/org.glite.lb.logger/src/il_master.c @@ -0,0 +1,417 @@ +#ident "$Header$" + +#include +#include +#include + +#include "glite/wms/jobid/cjobid.h" +#include "glite/lb/context.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/il_string.h" + +#include "interlogd.h" + +int +enqueue_msg(struct event_queue *eq, struct server_msg *msg) +{ +#if defined(IL_NOTIFICATIONS) + struct event_queue *eq_known; + + /* now we have a new event with possibly changed destination, + so check for the already known destination and possibly move + events from the original output queue to a new one */ + eq_known = notifid_map_get_dest(msg->job_id_s); + if(eq != eq_known) { + /* client has changed delivery address for this notification */ + if(notifid_map_set_dest(msg->job_id_s, eq) < 0) + return(-1); + /* move all events with this notif_id from eq_known to eq */ + if(eq_known != NULL) + event_queue_move_events(eq_known, eq, msg->job_id_s); + } + + /* if there are no data to send, do not send anything + (messsage was just to change the delivery address) */ + if(msg->len == 0) + return(0); + +#endif + /* fire thread to take care of this queue */ + if(event_queue_create_thread(eq) < 0) + return(-1); + + /* avoid losing signal to thread */ + event_queue_cond_lock(eq); + + /* insert new event */ + if(event_queue_insert(eq, msg) < 0) + return(-1); + + /* signal thread that we have a new message */ + event_queue_signal(eq); + + /* allow thread to continue */ + event_queue_cond_unlock(eq); + + return(0); +} + + +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) +pthread_mutex_t flush_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; +#endif /* INTERLOGD_FLUSH */ + +#ifdef INTERLOGD_HANDLE_CMD +static +int +parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) +{ + char *token, *r; + int ret; + + if(strstr(event, "DG.TYPE=\"command\"") == NULL) + return(-1); + + *job_id_s = NULL; + *timeout = 0; + *receipt = 0; + ret = 0; + + for(token = strtok(event, " "); token != NULL; token = strtok(NULL, " ")) { + r = index(token, '='); + if(r == NULL) { + ret = -1; + continue; + } + if(strncmp(token, "DG.COMMAND", r - token) == 0) { +#if defined(INTERLOGD_FLUSH) + if(strcmp(++r, "\"flush\"")) { +#endif + il_log(LOG_WARNING, " command %s not implemented\n", r); + ret = -1; + continue; +#if defined(INTERLOGD_FLUSH) + } +#endif + } else if(strncmp(token, "DG.JOBID", r - token) == 0) { + char *p; + + r += 2; /* skip =" */ + p = index(r, '"'); + if(p == NULL) { ret = -1; continue; } + *job_id_s = strndup(r, p-r); + + } else if(strncmp(token, "DG.TIMEOUT", r - token) == 0) { + sscanf(++r, "\"%d\"", timeout); + } else if(strncmp(token, "DG.LLLID", r - token) == 0) { + sscanf(++r, "%ld", receipt); + } + + } + return(0); +} + + +/* return value: + * 0 - not command + * 1 - success + * -1 - failure + */ + +static +int +handle_cmd(char *event, long offset) +{ + char *job_id_s; + struct event_queue *eq; + int num_replies, num_threads = 0; + int timeout, result; + long receipt; + struct timespec endtime; + struct timeval tv; + + /* parse command */ + if(parse_cmd(event, &job_id_s, &receipt, &timeout) < 0) + return(0); + +#if defined(INTERLOGD_FLUSH) + il_log(LOG_DEBUG, " received FLUSH command\n"); + + /* catchup with all neccessary event files */ + if(job_id_s) { + struct event_store *es = event_store_find(job_id_s); + + if(es == NULL) { + goto cmd_error; + } + result = event_store_recover(es); + /* NOTE: if flush had been stored in file, there would have been + no need to lock the event_store at all */ + event_store_release(es); + if(result < 0) { + il_log(LOG_ERR, " error trying to catch up with event file: %s\n", + error_get_msg()); + clear_error(); + } + } else + /* this call does not fail :-) */ + event_store_recover_all(); + + il_log(LOG_DEBUG, " alerting threads to report status\n"); + + /* prevent threads from reporting too early */ + if(pthread_mutex_lock(&flush_lock) < 0) { + /*** this error is considered too serious to allow the program run anymore! + set_error(IL_SYS, errno, "pthread_mutex_lock: error locking flush lock"); + goto cmd_error; + */ + abort(); + } + + /* wake up all threads */ + if(job_id_s) { + /* find appropriate queue */ + eq = queue_list_get(job_id_s); + if(eq == NULL) goto cmd_error; + if(!event_queue_empty(eq) && !queue_list_is_log(eq)) { + num_threads++; + event_queue_cond_lock(eq); + eq->flushing = 1; + event_queue_wakeup(eq); + event_queue_cond_unlock(eq); + } + } else { + /* iterate over event queues */ + for(eq=queue_list_first(); eq != NULL; eq=queue_list_next()) { + if(!event_queue_empty(eq) && !queue_list_is_log(eq)) { + num_threads++; + event_queue_cond_lock(eq); + eq->flushing = 1; + event_queue_wakeup(eq); + event_queue_cond_unlock(eq); + } + } + } + if(!bs_only) { + eq = queue_list_get(NULL); + if(eq == NULL) goto cmd_error; + if(!event_queue_empty(eq)) { + num_threads++; + event_queue_cond_lock(eq); + eq->flushing = 1; + event_queue_wakeup(eq); + event_queue_cond_unlock(eq); + } + } + + /* wait for thread replies */ + num_replies = 0; + result = 1; + gettimeofday(&tv, NULL); + endtime.tv_sec = tv.tv_sec + timeout; + endtime.tv_nsec = 1000 * tv.tv_usec; + while(num_replies < num_threads) { + int ret; + if((ret=pthread_cond_timedwait(&flush_cond, &flush_lock, &endtime)) < 0) { + il_log(LOG_ERR, " error waiting for thread reply: %s\n", strerror(errno)); + result = (ret == ETIMEDOUT) ? 0 : -1; + break; + } + + /* collect results from reporting threads */ + if(job_id_s) { + /* find appropriate queue */ + eq = queue_list_get(job_id_s); + if(eq == NULL) goto cmd_error; + if(!queue_list_is_log(eq)) { + event_queue_cond_lock(eq); + if(eq->flushing == 2) { + eq->flushing = 0; + num_replies++; + result = ((result == 1) || (eq->flush_result < 0)) ? + eq->flush_result : result; + } + event_queue_cond_unlock(eq); + } + } else { + /* iterate over event queues */ + for(eq=queue_list_first(); eq != NULL; eq=queue_list_next()) { + if(!queue_list_is_log(eq)) { + event_queue_cond_lock(eq); + if(eq->flushing == 2) { + eq->flushing = 0; + num_replies++; + il_log(LOG_DEBUG, " thread reply: %d\n", eq->flush_result); + result = ((result == 1) || (eq->flush_result < 0)) ? + eq->flush_result : result; + } + event_queue_cond_unlock(eq); + } + } + } + if(!bs_only) { + eq = queue_list_get(NULL); + if(eq == NULL) goto cmd_error; + event_queue_cond_lock(eq); + if(eq->flushing == 2) { + eq->flushing = 0; + num_replies++; + result = ((result == 1) || (eq->flush_result < 0)) ? + eq->flush_result : result; + } + event_queue_cond_unlock(eq); + } + } + + /* prevent deadlock in next flush */ + if(pthread_mutex_unlock(&flush_lock) < 0) + abort(); + + + /* report back to local logger */ + switch(result) { + case 1: + result = 0; break; + case 0: + result = EDG_WLL_IL_EVENTS_WAITING; break; + default: + result = EDG_WLL_IL_SYS; break; + } + if(job_id_s) free(job_id_s); + result = send_confirmation(receipt, result); + if(result <= 0) + il_log(LOG_ERR, "handle_cmd: error sending status: %s\n", error_get_msg()); + return(1); + + +cmd_error: + if(job_id_s) free(job_id_s); + return(-1); +#else + return(0); +#endif /* INTERLOGD_FLUSH */ +} +#endif /* INTERLOGD_HANDLE_CMD */ + + +static +int +handle_msg(char *event, long offset) +{ + struct server_msg *msg = NULL; +#if !defined(IL_NOTIFICATIONS) + struct event_queue *eq_l; +#endif + struct event_queue *eq_s; + struct event_store *es; + + int ret; + + /* convert event to message for server */ + if((msg = server_msg_create(event)) == NULL) { + il_log(LOG_ERR, " handle_msg: error parsing event '%s':\n %s\n", event, error_get_msg()); + return(0); + } + + /* sync event store with IPC (if neccessary) + * This MUST be called before inserting event into output queue! */ + if((es = event_store_find(msg->job_id_s)) == NULL) + return(-1); + msg->es = es; + + ret = event_store_sync(es, offset); + il_log(LOG_DEBUG, " syncing event store at %d with event at %d, result %d\n", es->offset, offset, ret); + if(ret < 0) { + il_log(LOG_ERR, " handle_msg: error syncing event store:\n %s\n", error_get_msg()); + event_store_release(es); + return(0); + } else if(ret == 0) { + /* we have seen this event already */ + server_msg_free(msg); + event_store_release(es); + return(1); + } + + /* find apropriate queue for this event */ +#if defined(IL_NOTIFICATIONS) + eq_s = queue_list_get(msg->dest); +#else + eq_s = queue_list_get(msg->job_id_s); +#endif + if(eq_s == NULL) { + il_log(LOG_ERR, " handle_msg: apropriate queue not found: %s\n", error_get_msg()); + clear_error(); + } else { + if(enqueue_msg(eq_s, msg) < 0) + goto err; + } + +#if !defined(IL_NOTIFICATIONS) + eq_l = queue_list_get(NULL); + if(!bs_only && eq_l != eq_s) { + /* send to default queue (logging server) as well */ + if(enqueue_msg(eq_l, msg) < 0) + goto err; + } +#endif + + /* if there was no error, set the next expected event offset */ + event_store_next(es, msg->ev_len); + /* allow cleanup thread to check on this event_store */ + event_store_release(es); + + /* free the message */ + server_msg_free(msg); + return(1); + +err: + event_store_release(es); + server_msg_free(msg); + return(-1); +} + + + +int +loop() +{ + /* receive events */ + while(1) { + char *msg; + long offset; + int ret; + + clear_error(); + if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) + { + if(error_get_maj() == IL_PROTO) { + il_log(LOG_DEBUG, " premature EOF while receiving event\n"); + /* problems with socket input, try to catch up from files */ + event_store_recover_all(); + continue; + } else + return(-1); + } + else if(ret == 0) { + continue; + } + +#ifdef INTERLOGD_HANDLE_CMD + ret = handle_cmd(msg, offset); + if(ret == 0) +#endif + ret = handle_msg(msg, offset); + free(msg); + if(ret < 0) + switch (error_get_maj()) { + case IL_SYS: + case IL_NOMEM: + return (ret); + break; + default: + il_log(LOG_ERR, "Error: %s\n", error_get_msg()); + break; + } + } /* while */ +} diff --git a/org.glite.lb.logger/src/input_queue_socket.c b/org.glite.lb.logger/src/input_queue_socket.c new file mode 100644 index 0000000..0587cc8 --- /dev/null +++ b/org.glite.lb.logger/src/input_queue_socket.c @@ -0,0 +1,207 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include + +#include "interlogd.h" + + +static const int SOCK_QUEUE_MAX = 5; +extern char *socket_path; + +static int sock; +static int accepted; + +int +input_queue_attach() +{ + struct sockaddr_un saddr; + + if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error creating socket"); + return(-1); + } + + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, socket_path); + + /* test for the presence of the socket and another instance + of interlogger listening */ + if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) { + if(errno == ECONNREFUSED) { + /* socket present, but no one at the other end; remove it */ + il_log(LOG_WARNING, " removing stale input socket %s\n", socket_path); + unlink(socket_path); + } + /* ignore other errors for now */ + } else { + /* connection was successful, so bail out - there is + another interlogger running */ + set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running"); + return(-1); + } + + if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error binding socket"); + return(-1); + } + + if (listen(sock, SOCK_QUEUE_MAX)) { + set_error(IL_SYS, errno, "input_queue_attach: error listening on socket"); + return -1; + } + + return(0); +} + +void input_queue_detach() +{ + if (sock >= 0) + close(sock); + unlink(socket_path); +} + + +static +char * +read_event(int sock, long *offset) +{ + char *buffer, *p, *n; + int len, alen; + char buf[256]; + + /* receive offset */ + len = recv(sock, offset, sizeof(*offset), MSG_NOSIGNAL); + if(len < sizeof(*offset)) { + set_error(IL_PROTO, errno, "read_event: error reading offset"); + return(NULL); + } + + /* receive event string */ + buffer=malloc(1024); + if(buffer == NULL) { + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(NULL); + } + p = buffer; + + alen = 1024; + while((len=recv(sock, buf, sizeof(buf), MSG_PEEK | MSG_NOSIGNAL)) > 0) { + int i; + + /* we have to be prepared for sizeof(buf) bytes */ + if(alen - (p - buffer) < (int)sizeof(buf)) { + alen += 8192; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(NULL); + } + p = p - buffer + n; + buffer = n; + } + + /* copy all relevant bytes from buffer */ + for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) + *p++ = buf[i]; + + /* remove the data from queue */ + if(i > 0) + if(recv(sock, buf, i, MSG_NOSIGNAL) != i) { + set_error(IL_SYS, errno, "read_event: error reading data"); + free(buffer); + return(NULL); + } + if(i < len) + /* the event is complete */ + break; + } + + /* terminate buffer */ + *p = 0; + + if(len < 0) { + set_error(IL_SYS, errno, "read_event: error reading data"); + free(buffer); + return(NULL); + } + + /* if len == 0, we have not encountered EVENT_SEPARATOR and thus the event is not complete */ + if(len == 0) { + set_error(IL_PROTO, errno, "read_event: error reading data - premature EOF"); + free(buffer); + return(NULL); + } + + /* this is probably not necessary at all: + either len <=0, which was covered before, + or 0 <= i < len => p > buffer; + I would say this condition can not be satisfied. + */ + if(p == buffer) { + set_error(IL_PROTO, errno, "read_event: error reading data - no data received"); + free(buffer); + return(NULL); + } + + return(buffer); +} + + +/* + * Returns: -1 on error, 0 if no message available, message length otherwise + * + */ +int +input_queue_get(char **buffer, long *offset, int timeout) +{ + fd_set fds; + struct timeval tv; + int msg_len; + + assert(buffer != NULL); + + FD_ZERO(&fds); + FD_SET(sock, &fds); + + tv.tv_sec = timeout; + tv.tv_usec = 0; + + msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL); + switch(msg_len) { + + case 0: /* timeout */ + return(0); + + case -1: /* error */ + set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); + return(-1); + + default: + break; + } + + if((accepted=accept(sock, NULL, NULL)) < 0) { + set_error(IL_SYS, errno, "input_queue_get: error accepting connection"); + return(-1); + } + + *buffer = read_event(accepted, offset); + close(accepted); + + if(*buffer == NULL) { + if(error_get_maj() != IL_OK) + return(-1); + else + return(0); + } + + return(strlen(*buffer)); +} diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c new file mode 100644 index 0000000..b91ab45 --- /dev/null +++ b/org.glite.lb.logger/src/interlogd.c @@ -0,0 +1,280 @@ +#ident "$Header$" + +/* + interlogger - collect events from local-logger and send them to logging and bookkeeping servers + +*/ +#include +#include +#include +#include + +#include + +#include "glite/wms/tls/ssl_helpers/ssl_inits.h" +#include "glite/wms/tls/ssl_helpers/ssl_pthreads.h" +#include "interlogd.h" +#include "glite/lb/consumer.h" +#include "glite/lb/dgssl.h" + +#define EXIT_FAILURE 1 +#if defined(IL_NOTIFICATIONS) +#define DEFAULT_PREFIX "/tmp/notif_events" +#define DEFAULT_SOCKET "/tmp/notif_interlogger.sock" +#else +#define DEFAULT_PREFIX "/tmp/dglogd.log" +#define DEFAULT_SOCKET "/tmp/interlogger.sock" +#endif + + +/* The name the program was run with, stripped of any leading path. */ +char *program_name; +static int killflg = 0; + +int TIMEOUT = DEFAULT_TIMEOUT; + +proxy_cred_desc *cred_handle; +pthread_mutex_t cred_handle_lock = PTHREAD_MUTEX_INITIALIZER; + +static void usage (int status) +{ + printf("%s - \n" + " collect events from local-logger and send them to logging and bookkeeping servers\n" + "Usage: %s [OPTION]... [FILE]...\n" + "Options:\n" + " -h, --help display this help and exit\n" + " -V, --version output version information and exit\n" + " -d, --debug do not run as daemon\n" + " -v, --verbose print extensive debug output\n" + " -f, --file-prefix path and prefix for event files\n" + " -c, --cert location of server certificate\n" + " -k, --key location of server private key\n" + " -C, --CAdir directory containing CA certificates\n" + " -b, --book send events to bookkeeping server only\n" + " -l, --log-server specify address of log server\n" + " -s, --socket non-default path of local socket\n" + , program_name, program_name); + exit(status); +} + + +/* Option flags and variables */ +static int debug; +static int verbose = 0; +char *file_prefix = DEFAULT_PREFIX; +int bs_only = 0; + +char *cert_file = NULL; +char *key_file = NULL; +char *CAcert_dir = NULL; +char *log_server = NULL; +char *socket_path = DEFAULT_SOCKET; + +static struct option const long_options[] = +{ + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'V'}, + {"verbose", no_argument, 0, 'v'}, + {"debug", no_argument, 0, 'd'}, + {"file-prefix", required_argument, 0, 'f'}, + {"cert", required_argument, 0, 'c'}, + {"key", required_argument, 0, 'k'}, + {"book", no_argument, 0, 'b'}, + {"CAdir", required_argument, 0, 'C'}, + {"log-server", required_argument, 0, 'l'}, + {"socket", required_argument, 0, 's'}, + {NULL, 0, NULL, 0} +}; + + + +/* Set all the option flags according to the switches specified. + Return the index of the first non-option argument. */ +static int +decode_switches (int argc, char **argv) +{ + int c; + + debug = 0; + + while ((c = getopt_long (argc, argv, + "f:" /* file prefix */ + "h" /* help */ + "V" /* version */ + "v" /* verbose */ + "c:" /* certificate */ + "k:" /* key */ + "C:" /* CA dir */ + "b" /* only bookeeping */ + "l:" /* log server */ + "d" /* debug */ + "s:", /* socket */ + long_options, (int *) 0)) != EOF) + { + switch (c) + { + case 'V': + printf ("interlogger %s\n", VERSION); + exit (0); + + case 'v': + verbose = 1; + break; + + case 'h': + usage (0); + + case 'd': + debug = 1; + break; + + case 'f': + file_prefix = strdup(optarg); + break; + + case 'c': + cert_file = strdup(optarg); + break; + + case 'k': + key_file = strdup(optarg); + break; + + case 'b': + bs_only = 1; + break; + + case 'l': + log_server = strdup(optarg); + break; + + case 'C': + CAcert_dir = strdup(optarg); + break; + + case 's': + socket_path = strdup(optarg); + break; + + default: + usage (EXIT_FAILURE); + } + } + + return optind; +} + + +void handle_signal(int num) { + il_log(LOG_DEBUG, "Received signal %d\n", num); + killflg++; +} + +int +main (int argc, char **argv) +{ + int i; + char *dummy = NULL,*p; + + program_name = argv[0]; + + setlinebuf(stdout); + setlinebuf(stderr); + + i = decode_switches (argc, argv); + + if ((p = getenv("EDG_WL_INTERLOG_TIMEOUT"))) TIMEOUT = atoi(p); + + /* force -b if we do not have log server */ + if(log_server == NULL) { + log_server = strdup(DEFAULT_LOG_SERVER); + bs_only = 1; + } + + if(init_errors(verbose ? LOG_DEBUG : LOG_WARNING)) { + fprintf(stderr, "Failed to initialize error message subsys. Exiting.\n"); + exit(EXIT_FAILURE); + } + + if (signal(SIGPIPE, handle_signal) == SIG_ERR + || signal(SIGABRT, handle_signal) == SIG_ERR + || signal(SIGTERM, handle_signal) == SIG_ERR + || signal(SIGINT, handle_signal) == SIG_ERR) { + perror("signal"); + exit(EXIT_FAILURE); + } + + il_log(LOG_INFO, "Initializing input queue...\n"); + if(input_queue_attach() < 0) { + il_log(LOG_CRIT, "Failed to initialize input queue: %s\n", error_get_msg()); + exit(EXIT_FAILURE); + } + + /* initialize output queues */ + il_log(LOG_INFO, "Initializing event queues...\n"); + if(queue_list_init(log_server) < 0) { + il_log(LOG_CRIT, "Failed to initialize output event queues: %s\n", error_get_msg()); + exit(EXIT_FAILURE); + } + + /* try to get default credential file names from globus */ + if(proxy_get_filenames(NULL,0, &dummy, &CAcert_dir, &dummy, &cert_file, &key_file) < 0) { + il_log(LOG_CRIT, "Failed to acquire credential file names. Exiting.\n"); + exit(EXIT_FAILURE); + } + + il_log(LOG_INFO, "Initializing SSL...\n"); + if(edg_wlc_SSLInitialization() < 0) { + il_log(LOG_CRIT, "Failed to initialize SSL. Exiting.\n"); + exit(EXIT_FAILURE); + } + + cred_handle = edg_wll_ssl_init(SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0, cert_file, key_file, 0, 0); + if(cred_handle == NULL) { + il_log(LOG_CRIT, "Failed to initialize SSL certificates. Exiting.\n"); + exit(EXIT_FAILURE); + } + + if(!debug && + (daemon(0,0) < 0)) { + perror("daemon"); + exit(EXIT_FAILURE); + } + + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + il_log(LOG_CRIT, "Failed to initialize Globus common module\n"); + exit(EXIT_FAILURE); + } + + if (edg_wlc_SSLLockingInit() != 0) { + il_log(LOG_CRIT, "Failed to initialize SSL locking. Exiting.\n"); + exit(EXIT_FAILURE); + } + + /* find all unsent events waiting in files */ + { + pthread_t rid; + + il_log(LOG_INFO, "Starting recovery thread...\n"); + if(pthread_create(&rid, NULL, recover_thread, NULL) < 0) { + il_log(LOG_CRIT, "Failed to start recovery thread: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } + pthread_detach(rid); + } + + il_log(LOG_INFO, "Entering main loop...\n"); + + /* do the work */ + if(loop() < 0) { + il_log(LOG_CRIT, "Fatal error: %s\n", error_get_msg()); + if (killflg) { + input_queue_detach(); + exit(EXIT_FAILURE); + } + } + il_log(LOG_INFO, "Done!\n"); + input_queue_detach(); + + exit (0); +} diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h new file mode 100644 index 0000000..543bb12 --- /dev/null +++ b/org.glite.lb.logger/src/interlogd.h @@ -0,0 +1,205 @@ +#ifndef INTERLOGGER_P_H +#define INTERLOGGER_P_H + +#ident "$Header$" + +#include "il_error.h" +#include "glite/lb/dgssl.h" + +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#ifdef HAVE_DMALLOC_H +#include +#endif + +#define INTERLOGD_HANDLE_CMD +#define INTERLOGD_FLUSH +#define INTERLOGD_EMS + +#define DEFAULT_USER "michal" +#define DEFAULT_LOG_SERVER "localhost" +#define DEFAULT_TIMEOUT 60 + +#if defined(IL_NOTIFICATIONS) + +#include "glite/lb/notifid.h" + +#undef INTERLOGD_HANDLE_CMD +#undef INTERLOGD_FLUSH +#undef INTERLOGD_EMS +#define IL_EVENT_ID_T edg_wll_NotifId +#define IL_EVENT_GET_UNIQUE(a) edg_wll_NotifIdGetUnique((a)) +#define IL_EVENT_ID_FREE(a) edg_wll_NotifIdFree((a)) +#define IL_EVENT_ID_PARSE(a,b) edg_wll_NotifIdParse((a),(b)) + +#else + +#define INTERLOGD_HANDLE_CMD +#define INTERLOGD_FLUSH +#define INTERLOGD_EMS +#define IL_EVENT_ID_T edg_wlc_JobId +#define IL_EVENT_GET_UNIQUE(a) edg_wlc_JobIdGetUnique((a)) +#define IL_EVENT_ID_FREE(a) edg_wlc_JobIdFree((a)) +#define IL_EVENT_ID_PARSE(a,b) edg_wlc_JobIdParse((a),(b)) + +#endif + + +#define EVENT_SEPARATOR '\n' + +// #define TIMEOUT 5 +extern int TIMEOUT; +#define INPUT_TIMEOUT (60) + + +/* extern SSL_CTX *sslContext; */ +extern proxy_cred_desc *cred_handle; +extern pthread_mutex_t cred_handle_lock; +extern char *cert_file; +extern char *key_file; +extern char *CAcert_dir; +extern int bs_only; + + +/* shared data for thread communication */ +#ifdef INTERLOGD_FLUSH +extern pthread_mutex_t flush_lock; +extern pthread_cond_t flush_cond; +#endif + +struct event_store { + char *event_file_name; /* file with events from local logger */ + char *control_file_name; /* file with control information */ + char *job_id_s; /* string form of the job id */ + long last_committed_bs; /* offset behind event that was last committed by BS */ + long last_committed_ls; /* -"- LS */ + long offset; /* expected file position of next event */ + int recovering; /* flag for recovery mode */ + pthread_rwlock_t update_lock; /* lock to prevent simultaneous updates */ + pthread_rwlock_t use_lock; /* lock to prevent struct deallocation */ +#if defined(IL_NOTIFICATIONS) + char *dest; /* host:port destination */ +#endif +}; + + +struct server_msg { + char *job_id_s; /* necessary for commit */ + char *msg; + int len; + int ev_len; + struct event_store *es; /* cache for corresponding event store */ + long receipt_to; /* receiver (long local-logger id - LLLID) of delivery confirmation (for priority messages) */ +#if defined(IL_NOTIFICATIONS) + char *dest_name; + int dest_port; + char *dest; +#endif +}; + + +struct event_queue { + SSL *ssl; /* SSL connection */ + char *dest_name; + int dest_port; + int timeout; /* queue timeout */ + struct event_queue_msg *tail; /* last message in the queue */ + struct event_queue_msg *head; /* first message in the queue */ +#if defined(INTERLOGD_EMS) + struct event_queue_msg *tail_ems; /* last priority message in the queue (or NULL) */ + struct event_queue_msg *mark_this; /* mark message for removal */ + struct event_queue_msg *mark_prev; /* predecessor of the marked message */ +#endif + pthread_t thread_id; /* id of associated thread */ + pthread_rwlock_t update_lock; /* mutex for queue updates */ + pthread_mutex_t cond_lock; /* mutex for condition variable */ + pthread_cond_t ready_cond; /* condition variable for message arrival */ +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + int flushing; + int flush_result; /* result of flush operation */ + pthread_cond_t flush_cond; /* condition variable for flush operation */ +#endif +}; + + +/* server msg methods */ +struct server_msg *server_msg_create(char *); +struct server_msg *server_msg_copy(struct server_msg *); +int server_msg_init(struct server_msg *, char *); +#if defined(INTERLOGD_EMS) +int server_msg_is_priority(struct server_msg *); +#endif +int server_msg_free(struct server_msg *); + +/* general event queue methods */ +struct event_queue *event_queue_create(char *); +int event_queue_free(struct event_queue *); +int event_queue_empty(struct event_queue *); +int event_queue_insert(struct event_queue *, struct server_msg *); +int event_queue_get(struct event_queue *, struct server_msg **); +int event_queue_remove(struct event_queue *); +int event_queue_enqueue(struct event_queue *, char *); +/* helper */ +int enqueue_msg(struct event_queue *, struct server_msg *); + +/* protocol event queue methods */ +int event_queue_connect(struct event_queue *); +int event_queue_send(struct event_queue *); +int event_queue_close(struct event_queue *); +int send_confirmation(long, int); + +/* thread event queue methods */ +int event_queue_create_thread(struct event_queue *); +int event_queue_lock(struct event_queue *); +int event_queue_unlock(struct event_queue *); +int event_queue_lock_ro(struct event_queue *); +int event_queue_signal(struct event_queue *); +int event_queue_wait(struct event_queue *, int); +int event_queue_sleep(struct event_queue *); +int event_queue_wakeup(struct event_queue *); +int event_queue_cond_lock(struct event_queue *); +int event_queue_cond_unlock(struct event_queue *); + +/* input queue */ +int input_queue_attach(); +void input_queue_detach(); +int input_queue_get(char **, long *, int); + +/* queue management functions */ +int queue_list_init(char *); +struct event_queue *queue_list_get(char *); +struct event_queue *queue_list_first(); +struct event_queue *queue_list_next(); +int queue_list_is_log(struct event_queue *); + +#if defined(IL_NOTIFICATIONS) +struct event_queue *notifid_map_get_dest(const char *); +int notifid_map_set_dest(const char *, struct event_queue *); +int event_queue_move_events(struct event_queue *, struct event_queue *, char *); +#endif + +/* event store functions */ +int event_store_init(char *); +int event_store_cleanup(); +int event_store_recover_all(void); +struct event_store *event_store_find(char *); +int event_store_sync(struct event_store *, long); +int event_store_next(struct event_store *, int); +int event_store_commit(struct event_store *, int, int); +int event_store_recover(struct event_store *); +int event_store_release(struct event_store *); +/* int event_store_remove(struct event_store *); */ + +/* master main loop */ +int loop(); + +/* recover thread */ +void *recover_thread(void*); + +#endif diff --git a/org.glite.lb.logger/src/logd.c b/org.glite.lb.logger/src/logd.c new file mode 100644 index 0000000..22af270 --- /dev/null +++ b/org.glite.lb.logger/src/logd.c @@ -0,0 +1,413 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "logd_proto.h" +#include "glite/lb/consumer.h" +#include "glite/wms/tls/ssl_helpers/ssl_inits.h" + +static const char rcsid[] = "@(#)$Id$"; +static int verbose = 0; +static int debug = 0; +static int port = EDG_WLL_LOG_PORT_DEFAULT; +static char *prefix = EDG_WLL_LOG_PREFIX_DEFAULT; +static char *cert_file = NULL; +static char *key_file = NULL; +static char *CAcert_dir = NULL; +static char *gridmap_file = NULL; +static int noAuth = 0; +static int noIPC = 0; +static int noParse = 0; + +#define DEFAULT_SOCKET "/tmp/interlogger.sock" +char *socket_path = DEFAULT_SOCKET; + +extern int confirm_sock; +extern char confirm_sock_name[256]; + +static struct option const long_options[] = { + { "help", no_argument, 0, 'h' }, + { "version", no_argument, 0, 'V' }, + { "verbose", no_argument, 0, 'v' }, + { "debug", no_argument, 0, 'd' }, + { "port", required_argument, 0, 'p' }, + { "file-prefix", required_argument, 0, 'f' }, + { "cert", required_argument, 0, 'c' }, + { "key", required_argument, 0, 'k' }, + { "CAdir", required_argument, 0, 'C' }, + { "gridmap", required_argument, 0, 'g' }, + { "socket",required_argument, 0, 's' }, + { "noAuth", no_argument, 0, 'x' }, + { "noIPC", no_argument, 0, 'y' }, + { "noParse", no_argument, 0, 'z' }, + { NULL, 0, NULL, 0} +}; + +/* + *---------------------------------------------------------------------- + * + * usage - print usage + * + *---------------------------------------------------------------------- + */ + +static void +usage(char *program_name) { + fprintf(stdout,"%s\n" + "- collect events from logging API calls,\n" + "- save them to files and\n" + "- send them to inter-logger\n\n" + "Usage: %s [option]\n" + "-h, --help display this help and exit\n" + "-V, --version output version information and exit\n" + "-d, --debug do not run as daemon\n" + "-v, --verbose print extensive debug output\n" + "-p, --port port to listen\n" + "-f, --file-prefix path and prefix for event files\n" + "-c, --cert location of server certificate\n" + "-k, --key location of server private key\n" + "-C, --CAdir directory containing CA certificates\n" + "-s, --socket socket to send messages (NOT IMPLEMENTED YET)\n" + "--noAuth do not check caller's identity\n" + "--noIPC do not send messages to inter-logger\n" + "--noParse do not parse messages for correctness\n", + program_name,program_name); +} + +static sighandler_t mysignal(int num,sighandler_t handler) +{ + struct sigaction sa,osa; + + memset(&sa,0,sizeof(sa)); + sa.sa_handler = handler; + sa.sa_flags = SA_RESTART; + return sigaction(num,&sa,&osa) ? SIG_ERR : osa.sa_handler; +} + +/* + *---------------------------------------------------------------------- + * + * handle_signal - + * USR1 - increase the verbosity of the program + * USR2 - decrease the verbosity of the program + * + *---------------------------------------------------------------------- + */ +void handle_signal(int num) { + if (num != SIGCHLD) edg_wll_ll_log(LOG_NOTICE,"Received signal %d\n", num); + switch (num) { + case SIGUSR1: + if (edg_wll_ll_log_level < LOG_DEBUG) edg_wll_ll_log_level++; + edg_wll_ll_log(LOG_NOTICE,"Logging level is now %d\n", edg_wll_ll_log_level); + break; + case SIGUSR2: + if (edg_wll_ll_log_level > LOG_EMERG) edg_wll_ll_log_level--; + edg_wll_ll_log(LOG_NOTICE,"Logging level is now %d\n", edg_wll_ll_log_level); + break; + case SIGPIPE: + edg_wll_ll_log(LOG_NOTICE,"Broken pipe, lost communication channel.\n"); + break; + case SIGCHLD: + while (wait3(NULL,WNOHANG,NULL) > 0); + break; + case SIGINT: + case SIGTERM: + case SIGQUIT: + if (confirm_sock) { + edg_wll_ll_log(LOG_NOTICE,"Closing confirmation socket.\n"); + close(confirm_sock); + unlink(confirm_sock_name); + } + exit(1); + break; + default: break; + } +} + +/* + *---------------------------------------------------------------------- + * + * doit - do all the dirty work + * + *---------------------------------------------------------------------- + */ +static int +doit(int socket, void *cred_handle, char *file_name_prefix, int noipc, int noparse) +{ + SSL *ssl = NULL; + X509 *peer = NULL; + + char *subject; + char buf[1024]; + int ret; + struct timeval timeout = {10,0}; + + /* authentication */ + edg_wll_ll_log(LOG_INFO,"Processing authentication:\n"); +// FIXME - put here some meaningfull value of timeout + do somthing if timeouted + ssl = edg_wll_ssl_accept(cred_handle,socket,&timeout); + if (ssl==NULL) { + edg_wll_ll_log(LOG_ERR,"edg_wll_ssl_accept() failed (%s)\n", + ERR_error_string(ERR_get_error(), NULL)); + return(-1); + } + peer = SSL_get_peer_certificate(ssl); + if (peer != NULL) { + X509_NAME *s; + + edg_wll_ll_log(LOG_INFO," User successfully authenticated with the following certificate:\n"); + X509_NAME_oneline(X509_get_issuer_name(peer), buf, sizeof(buf)); + edg_wll_ll_log(LOG_INFO, " Issuer: %s\n", buf); + X509_NAME_oneline(X509_get_subject_name(peer), buf, sizeof(buf)); + edg_wll_ll_log(LOG_INFO, " Subject: %s\n", buf); + s = X509_NAME_dup(X509_get_subject_name(peer)); + proxy_get_base_name(s); + subject=X509_NAME_oneline(s,NULL,0); + X509_NAME_free(s); + } + else { + edg_wll_ll_log(LOG_INFO," User not authenticated, setting as \"%s\". \n",EDG_WLL_LOG_USER_DEFAULT); + subject=strdup(EDG_WLL_LOG_USER_DEFAULT); + } + + ret = edg_wll_log_proto_server(ssl,subject,file_name_prefix,noipc,noparse); + + edg_wll_ssl_close(ssl); + if (subject) free(subject); + return ret; +} + +/* + *---------------------------------------------------------------------- + * + * Main - + * + *---------------------------------------------------------------------- + */ +int main(int argc, char *argv[]) +{ + void *cred_handle = NULL; + + int ret; + int childpid; + int opt; + + int listener_fd; + int client_fd; + struct sockaddr_in client_addr; + int client_addr_len; + + char *my_subject_name = NULL; + + time_t cert_mtime = 0, key_mtime = 0; + + + setlinebuf(stdout); + setlinebuf(stderr); + + /* welcome */ + fprintf(stdout,"\ +This is LocalLogger, part of Workload Management System in EU DataGrid.\ +Copyright (c) 2002 CERN, INFN and CESNET on behalf of the EU DataGrid.\n"); + + /* get arguments */ + while ((opt = getopt_long(argc,argv, + "h" /* help */ + "V" /* version */ + "v" /* verbose */ + "d" /* debug */ + "p:" /* port */ + "f:" /* file prefix */ + "c:" /* certificate */ + "k:" /* key */ + "C:" /* CA dir */ + "g:" /* gridmap */ + "s:" /* socket */ + "x" /* noAuth */ + "y" /* noIPC */ + "z", /* noParse */ + long_options, (int *) 0)) != EOF) { + + switch (opt) { + case 'V': fprintf(stdout,"%s:\t%s\n",argv[0],rcsid); exit(0); + case 'v': verbose = 1; break; + case 'd': debug = 1; break; + case 'p': port = atoi(optarg); break; + case 'f': prefix = optarg; break; + case 'c': cert_file = optarg; break; + case 'k': key_file = optarg; break; + case 'C': CAcert_dir = optarg; break; + case 'g': gridmap_file = optarg; break; + case 's': socket_path = optarg; break; + case 'x': noAuth = 1; break; + case 'y': noIPC = 1; break; + case 'z': noParse = 1; break; + case 'h': + default: + usage(argv[0]); exit(0); + } + } + edg_wll_ll_log_init(verbose ? LOG_DEBUG : LOG_INFO); + edg_wll_ll_log(LOG_INFO,"Initializing...\n"); + + /* check noParse */ + edg_wll_ll_log(LOG_INFO,"Parse messages for correctness..."); + if (noParse) { + edg_wll_ll_log(LOG_INFO,"no.\n"); + } else { + edg_wll_ll_log(LOG_INFO,"yes.\n"); + } + + /* check noIPC */ + edg_wll_ll_log(LOG_INFO,"Send messages also to inter-logger..."); + if (noIPC) { + edg_wll_ll_log(LOG_INFO,"no.\n"); + } else { + edg_wll_ll_log(LOG_INFO,"yes.\n"); + } + + /* check prefix correctness */ +/* XXX: check probably also write permisions */ + edg_wll_ll_log(LOG_INFO,"Store messages with the filename prefix \"%s\"...",prefix); + if (strlen(prefix) > FILENAME_MAX - 34) { + edg_wll_ll_log(LOG_INFO,"no.\n"); + edg_wll_ll_log(LOG_CRIT,"Too long prefix for file names, would not be able to write to log files. Exiting.\n"); + exit(1); + } else { + edg_wll_ll_log(LOG_INFO,"yes.\n"); + } + + /* parse X509 arguments to environment */ + edg_wll_set_environment(cert_file, key_file, NULL, NULL, CAcert_dir, gridmap_file); + if (noAuth) setenv("X509_USER_PROXY","/dev/null",1); + + /* daemonize */ + edg_wll_ll_log(LOG_INFO,"Running as daemon..."); + if (debug) { + edg_wll_ll_log(LOG_NOTICE,"no.\n"); + } + else if (daemon(0,0) < 0) { + edg_wll_ll_log(LOG_CRIT,"Failed to run as daemon. Exiting.\n"); + perror("daemon"); + exit(1); + } + else { + edg_wll_ll_log(LOG_INFO,"yes.\n"); + } + + /* initialize Globus common module */ + edg_wll_ll_log(LOG_INFO,"Initializing Globus common module..."); + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + edg_wll_ll_log(LOG_NOTICE,"no.\n"); + edg_wll_ll_log(LOG_CRIT, "Failed to initialize Globus common module. Exiting.\n"); + exit(1); + } else { + edg_wll_ll_log(LOG_INFO,"yes.\n"); + } + + /* initialize signal handling */ + if (mysignal(SIGUSR1, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGUSR2, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGPIPE, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGHUP, SIG_DFL) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGINT, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGQUIT, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGTERM, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + if (mysignal(SIGCHLD, handle_signal) == SIG_ERR) { perror("signal"); exit(1); } + + /* SSL init */ + edg_wll_ll_log(LOG_INFO,"Initializing SSL:\n"); + if (edg_wlc_SSLInitialization() != 0) { + edg_wll_ll_log(LOG_CRIT,"Failed to initialize SSL. Exiting.\n"); + exit(1); + } + + cred_handle=edg_wll_ssl_init(SSL_VERIFY_PEER, 1,cert_file,key_file,0,0); + if (cred_handle==NULL) { + edg_wll_ll_log(LOG_CRIT,"Failed to initialize SSL certificates. Exiting.\n"); + exit(1); + } + + edg_wll_ssl_get_my_subject(cred_handle, &my_subject_name); + if (my_subject_name!=NULL) { + edg_wll_ll_log(LOG_INFO," server running with certificate: %s\n",my_subject_name); + free(my_subject_name); + } else if (noAuth) { + edg_wll_ll_log(LOG_INFO," running without certificate\n"); + } else { + edg_wll_ll_log(LOG_CRIT,"No server credential found. Exiting.\n"); + exit(1); + } + + /* do listen */ + edg_wll_ll_log(LOG_INFO,"Listening on port %d\n",port); + listener_fd = do_listen(port); + if (listener_fd == -1) { + edg_wll_ll_log(LOG_CRIT,"Failed to listen on port %d\n",port); + edg_wll_ssl_free(cred_handle); + exit(-1); + } + + client_addr_len = sizeof(client_addr); + bzero((char *) &client_addr, client_addr_len); + + /* + * Main loop + */ + while (1) { + edg_wll_ll_log(LOG_INFO,"Accepting incomming connections...\n"); + client_fd = accept(listener_fd, (struct sockaddr *) &client_addr, + &client_addr_len); + if (client_fd < 0) { + close(listener_fd); + edg_wll_ll_log(LOG_CRIT,"Failed to accept incomming connections\n"); + perror("accept"); + edg_wll_ssl_free(cred_handle); + exit(-1); + } + + if (edg_wll_ssl_watch_creds(key_file,cert_file,&key_mtime,&cert_mtime) > 0) { + void * new_cred_handle=edg_wll_ssl_init(SSL_VERIFY_PEER, 1,cert_file,key_file,0,0); + if (new_cred_handle) { + edg_wll_ssl_free(cred_handle); + cred_handle = new_cred_handle; + } + } + /* FORK - change next line if fork() is not needed (for debugging for + * example + */ +#if 1 + if ((childpid = fork()) < 0) { + perror("fork()"); + close(client_fd); + } + if (childpid == 0) { + ret=doit(client_fd,cred_handle,prefix,noIPC,noParse); + close(client_fd); + goto end; + } + if (childpid > 0) { + close(client_fd); + } +#else + ret=doit(client_fd,cred_handle,prefix,msg_sock); + close(client_fd); +#endif + } /* while */ + +end: + close(listener_fd); + edg_wll_ssl_free(cred_handle); + exit(ret); +} diff --git a/org.glite.lb.logger/src/logd_proto.c b/org.glite.lb.logger/src/logd_proto.c new file mode 100644 index 0000000..4ef2f29 --- /dev/null +++ b/org.glite.lb.logger/src/logd_proto.c @@ -0,0 +1,870 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "logd_proto.h" +#include "glite/lb/context-int.h" +#include "glite/lb/escape.h" +#include "glite/lb/events_parse.h" + +static const int one = 1; + +extern char* socket_path; + +int edg_wll_ll_log_level; + +#define tv_sub(a,b) {\ + (a).tv_usec -= (b).tv_usec;\ + (a).tv_sec -= (b).tv_sec;\ + if ((a).tv_usec < 0) {\ + (a).tv_sec--;\ + (a).tv_usec += 1000000;\ + }\ +} + +/* + *---------------------------------------------------------------------- + * + * send_answer_back - + * + *---------------------------------------------------------------------- + */ +static int send_answer_back(SSL *ssl, int answer, struct timeval *timeout) { + int count = 0; + int err = 0; + int ans = answer; + u_int8_t ans_end[4]; + + edg_wll_ll_log(LOG_INFO,"Sending answer \"%d\" back to client...",answer); + ans_end[0] = ans & 0xff; ans >>= 8; + ans_end[1] = ans & 0xff; ans >>= 8; + ans_end[2] = ans & 0xff; ans >>= 8; + ans_end[3] = ans; + if ((err = edg_wll_ssl_write_full(ssl,ans_end,4,timeout,&count)) < 0 ) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + return edg_wll_log_proto_server_failure(err,"Error sending answer"); + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + return 0; + } +} + +/* + *---------------------------------------------------------------------- + * + * wait_for_confirmation - + * + * Args: timeout - number of seconds to wait, 0 => wait indefinitely + * + * Returns: 1 => OK, *code contains error code sent by interlogger + * 0 => timeout expired before anything interesting happened + * -1 => some error (see errno for details) + * + *---------------------------------------------------------------------- + */ +int confirm_sock; +char confirm_sock_name[256]; + +static +int init_confirmation() +{ + struct sockaddr_un saddr; + + /* create socket */ + if((confirm_sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + edg_wll_ll_log(LOG_ERR,"init_confirmation(): error creating socket\n"); + SYSTEM_ERROR("socket"); + return(-1); + } + + /* set the socket parameters */ + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, confirm_sock_name); + + /* bind the socket */ + if(bind(confirm_sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) { + edg_wll_ll_log(LOG_ERR,"init_confirmation(): error binding socket\n"); + SYSTEM_ERROR("bind"); + close(confirm_sock); + unlink(confirm_sock_name); + return(-1); + } + + /* and listen */ + if(listen(confirm_sock, 5) < 0) { + edg_wll_ll_log(LOG_ERR,"init_confirmation(): error listening on socket\n"); + SYSTEM_ERROR("listen"); + close(confirm_sock); + unlink(confirm_sock_name); + return(-1); + } + + return(0); +} + + +int wait_for_confirmation(struct timeval *timeout, int *code) +{ + fd_set fds; + struct timeval to,before,after; + int ret = 0, tmp = 0; + + *code = 0; + + FD_ZERO(&fds); + FD_SET(confirm_sock, &fds); + + /* set timeout */ + if (timeout) { + memcpy(&to,timeout,sizeof to); + gettimeofday(&before,NULL); + } + + /* wait for confirmation at most timeout seconds */ + if ((tmp=select(confirm_sock+1, &fds, NULL, NULL, timeout?&to:NULL)) < 0) { + edg_wll_ll_log(LOG_ERR,"wait_for_confirmation(): error selecting socket\n"); + SYSTEM_ERROR("select"); + ret = -1; + } else { + if (tmp == 0) + ret = 0; + else { + int nsd = accept(confirm_sock, NULL, NULL); + ret = 1; + if(nsd < 0) { + edg_wll_ll_log(LOG_ERR,"wait_for_confirmation(): error accepting a connection on a socket\n"); + SYSTEM_ERROR("accept"); + ret = -1; + } else { + if(recv(nsd, code, sizeof(*code), MSG_NOSIGNAL) < 0) { + edg_wll_ll_log(LOG_ERR,"wait_for_confirmation(): error receiving a message from a socket\n"); + SYSTEM_ERROR("recv"); + ret = -1; + } + close(nsd); + } + } + } + close(confirm_sock); + unlink(confirm_sock_name); + if (timeout) { + gettimeofday(&after,NULL); + tv_sub(after,before); + tv_sub(*timeout,after); + if (timeout->tv_sec < 0) { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + } + } + return ret; +} + +/* + *---------------------------------------------------------------------- + * + * do_listen - listen on given port + * + * Returns: socket handle or -1 if something fails + * + * Calls: socket, bind, listen + * + * Algorithm: + * + *---------------------------------------------------------------------- + */ +int do_listen(int port) +{ + int ret; + int sock; + struct sockaddr_in my_addr; + + memset(&my_addr, 0, sizeof(my_addr)); + my_addr.sin_family = AF_INET; + my_addr.sin_addr.s_addr = INADDR_ANY; + my_addr.sin_port = htons(port); + + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock == -1) { + edg_wll_ll_log(LOG_ERR,"do_listen(): error creating socket\n"); + SYSTEM_ERROR("socket"); + return -1; + } + + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + ret = bind(sock, (struct sockaddr *)&my_addr, sizeof(my_addr)); + if (ret == -1) { + edg_wll_ll_log(LOG_ERR,"do_listen(): error binding socket\n"); + SYSTEM_ERROR("bind"); + return -1; + } + + ret = listen(sock, 5); + if (ret == -1) { + edg_wll_ll_log(LOG_ERR,"do_listen(): error listening on socket\n"); + SYSTEM_ERROR("listen"); + close(sock); + return -1; + } + + return sock; +} + +/*! + *---------------------------------------------------------------------- + * Write to socket + * Needn't write entire buffer. Timeout is applicable only for non-blocking + * connections + * \param sock IN: connection to work with + * \param buf IN: buffer + * \param bufsize IN: max size to write + * \param timeout INOUT: max time allowed for operation, remaining time on return + * \retval bytes written (>0) on success + * \retval -1 on write error + *---------------------------------------------------------------------- + */ +static ssize_t edg_wll_socket_write(int sock,const void *buf,size_t bufsize,struct timeval *timeout) +{ + ssize_t len = 0, ret = 0; + fd_set fds; + struct timeval to,before,after; + + if (timeout) { + memcpy(&to,timeout,sizeof to); + gettimeofday(&before,NULL); + } + len = write(sock,buf,bufsize); + while (len <= 0) { + FD_ZERO(&fds); + FD_SET(sock,&fds); + if ((ret=select(sock+1,&fds,NULL,NULL,timeout?&to:NULL)) < 0) { + edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write(): error selecting socket\n"); + SYSTEM_ERROR("select"); + break; + } + len = write(sock,buf,bufsize); + } + if (timeout) { + gettimeofday(&after,NULL); + tv_sub(after,before); + tv_sub(*timeout,after); + if (timeout->tv_sec < 0) { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + } + } + return len; +} + +/*! + *---------------------------------------------------------------------- + * Write specified amount of data to socket + * Attempts to call edg_wll_socket_write() untill the entire request is satisfied + * (or times out). + * \param sock IN: connection to work with + * \param buf IN: buffer + * \param bufsize IN: max size to write + * \param timeout INOUT: max time allowed for operation, remaining time on return + * \param total OUT: bytes actually written + * \retval bytes written (>0) on success + * \retval -1 on write error + *---------------------------------------------------------------------- + */ +static ssize_t edg_wll_socket_write_full(int sock,void *buf,size_t bufsize,struct timeval *timeout,ssize_t *total) +{ + ssize_t len; + *total = 0; + + while (*total < bufsize) { + len = edg_wll_socket_write(sock,buf+*total,bufsize-*total,timeout); + if (len < 0) return len; + *total += len; + } + return 0; +} + +/* + *---------------------------------------------------------------------- + * + * edg_wll_log_proto_server - handle incoming data + * + * Returns: 0 if done properly or errno + * + * Calls: + * + * Algorithm: + * + *---------------------------------------------------------------------- + */ +int edg_wll_log_proto_server(SSL *ssl, char *name, char *prefix, int noipc, int noparse) +{ + char *buf,*dglllid,*dguser,*jobId,*name_esc; + char header[EDG_WLL_LOG_SOCKET_HEADER_LENGTH+1]; + char outfilename[FILENAME_MAX]; + int count,count_total,size; + u_int8_t size_end[4]; + size_t msg_size,dglllid_size,dguser_size; + int i,answer,answer_sent; + int msg_sock; + char *msg,*msg_begin; + FILE *outfile; + int filedesc,filelock_status,flags; + long filepos; + struct flock filelock; + int priority; + long lllid; + int unique; + struct timeval timeout; + int err; + edg_wll_Context context; + edg_wll_Event *event; + + errno = i = answer = answer_sent = size = msg_size = dglllid_size = dguser_size = count = count_total = msg_sock = filedesc = filelock_status = /* priority */ unique = err = 0; + buf = dglllid = dguser = jobId = name_esc = msg = msg_begin = NULL; + event = NULL; + if (EDG_WLL_LOG_TIMEOUT_MAX > EDG_WLL_LOG_SYNC_TIMEOUT_MAX) timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX; + else timeout.tv_sec = EDG_WLL_LOG_SYNC_TIMEOUT_MAX; + timeout.tv_usec = 0; + if (edg_wll_InitContext(&context) != 0) { + edg_wll_ll_log(LOG_ERR,"edg_wll_InitContex(): error.\n"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } + if (edg_wll_ResetError(context) != 0) { + edg_wll_ll_log(LOG_ERR,"edg_wll_ResetError(): error.\n"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } + + /* look for the unique unused long local-logger id (LLLID) */ + lllid = 1000*getpid(); + for (i=0; (i<1000)&&(!unique); i++) { + lllid += i; + snprintf(confirm_sock_name, sizeof(confirm_sock_name), "/tmp/dglogd_sock_%ld", lllid); + if ((filedesc = open(confirm_sock_name,O_CREAT)) == -1) { + if (errno == EEXIST) { + edg_wll_ll_log(LOG_WARNING,"Warning: LLLID %ld already in use.\n",lllid); + } else { + SYSTEM_ERROR("open"); + } + } else { + unique = 1; + close(filedesc); filedesc = 0; + unlink(confirm_sock_name); + } + } + if (!unique) { + edg_wll_ll_log(LOG_ERR,"Cannot determine the unique long local-logger id (LLLID)!\n",lllid); + return EAGAIN; + } + edg_wll_ll_log(LOG_INFO,"Long local-logger id (LLLID): %ld\n",lllid); + + /* receive socket header */ + edg_wll_ll_log(LOG_INFO,"Reading socket header..."); + memset(header, 0, EDG_WLL_LOG_SOCKET_HEADER_LENGTH+1); + if ((err = edg_wll_ssl_read_full(ssl, header, EDG_WLL_LOG_SOCKET_HEADER_LENGTH, &timeout, &count)) < 0) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + answer = edg_wll_log_proto_server_failure(err,"Error receiving header"); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + } + edg_wll_ll_log(LOG_DEBUG,"Checking socket header..."); + header[EDG_WLL_LOG_SOCKET_HEADER_LENGTH] = '\0'; + if (strncmp(header,EDG_WLL_LOG_SOCKET_HEADER,EDG_WLL_LOG_SOCKET_HEADER_LENGTH)) { + /* not the proper socket header text */ + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): invalid socket header\n"); + edg_wll_ll_log(LOG_DEBUG,"edg_wll_log_proto_server(): read header '%s' instead of '%s'\n", + header,EDG_WLL_LOG_SOCKET_HEADER); + answer = EINVAL; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + +/* + edg_wll_ll_log(LOG_DEBUG,"Reading message priority..."); + count = 0; + if ((err = edg_wll_ssl_read_full(ssl, &priority, sizeof(priority), &timeout, &count)) < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + answer = edg_wll_log_proto_server_failure(err,"Error receiving message priority"); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } +*/ + + edg_wll_ll_log(LOG_DEBUG,"Reading message size..."); + count = 0; + if ((err = edg_wll_ssl_read_full(ssl, size_end, 4, &timeout, &count)) < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + answer = edg_wll_log_proto_server_failure(err,"Error receiving message size"); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + size = size_end[3]; size <<=8; + size |= size_end[2]; size <<=8; + size |= size_end[1]; size <<=8; + size |= size_end[0]; + edg_wll_ll_log(LOG_DEBUG,"Checking message size..."); + if (size <= 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + /* probably wrong size in the header or nothing to read */ + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): invalid size read from socket header\n"); + edg_wll_ll_log(LOG_DEBUG,"Read size '%d'.\n",size); + answer = EINVAL; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + edg_wll_ll_log(LOG_DEBUG,"- Size read from header: %d bytes.\n",size); + } + + /* format the DG.LLLID string */ + if (asprintf(&dglllid,"DG.LLLID=%ld ",lllid) == -1) { + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): nomem for DG.LLLID\n"); + SYSTEM_ERROR("asprintf"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } + dglllid_size = strlen(dglllid); + + /* format the DG.USER string */ + name_esc = edg_wll_LogEscape(name); + if (asprintf(&dguser,"DG.USER=\"%s\" ",name_esc) == -1) { + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): nomem for DG.USER\n"); + SYSTEM_ERROR("asprintf"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } + dguser_size = strlen(dguser); + + /* allocate enough memory for all data */ + msg_size = dglllid_size + dguser_size + size + 1; + if ((msg = malloc(msg_size)) == NULL) { + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): out of memory for allocating message\n"); + SYSTEM_ERROR("malloc"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } + strncpy(msg,dglllid,dglllid_size); + msg_begin = msg + dglllid_size; // this is the "official" beginning of the message + strncpy(msg_begin,dguser,dguser_size); + + /* receive message */ + edg_wll_ll_log(LOG_INFO,"Reading message from socket..."); + buf = msg_begin + dguser_size; + count = 0; + if ((err = edg_wll_ssl_read_full(ssl, buf, size, &timeout, &count)) < 0) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + answer = edg_wll_log_proto_server_failure(err,"Error receiving message"); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + } + + if (buf[count] != '\0') buf[count] = '\0'; + + /* parse message and get jobId and priority from it */ + if (!noparse && strstr(msg, "DG.TYPE=\"command\"") == NULL) { + edg_wll_ll_log(LOG_INFO,"Parsing message for correctness..."); + if (edg_wll_ParseEvent(context,msg_begin,&event) != 0) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + edg_wll_ll_log(LOG_ERR,"edg_wll_log_proto_server(): edg_wll_ParseEvent error\n"); + edg_wll_ll_log(LOG_ERR,"edg_wll_ParseEvent(): %s\n",context->errDesc); + answer = edg_wll_Error(context,NULL,NULL); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + } + edg_wll_ll_log(LOG_DEBUG,"Getting jobId from message..."); + jobId = edg_wlc_JobIdGetUnique(event->any.jobId); + priority = event->any.priority; + edg_wll_FreeEvent(event); + event->any.priority = priority; + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } else { +/* FIXME: what if edg_wll_InitEvent fails? should be checked somehow -> nomem etc. */ + event = edg_wll_InitEvent(EDG_WLL_EVENT_UNDEF); +/* XXX: + event = calloc(1,sizeof(*event)); + if(event == NULL) { + edg_wll_ll_log(LOG_ERR, "out of memory\n"); + answer = ENOMEM; + goto edg_wll_log_proto_server_end; + } +*/ + +/* XXX: obsolete, logd now doesn't need jobId for 'command' messages, + * it will be probably needed for writing 'command' messages to some files + edg_wll_ll_log(LOG_DEBUG,"Getting jobId from message..."); + jobId = edg_wll_GetJobId(msg); + if (!jobId || edg_wlc_JobIdParse(jobId,&j)) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + edg_wll_ll_log(LOG_ERR,"ParseJobId(%s)\n",jobId?jobId:"NULL"); + answer = EINVAL; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + free(jobId); + jobId = edg_wlc_JobIdGetUnique(j); + edg_wlc_JobIdFree(j); +*/ + +/* FIXME: get the priority from message some better way */ + if (strstr(msg, "DG.PRIORITY=1") != NULL) + event->any.priority = 1; + else event->any.priority = 0; + } + + + /* if not command, save message to file */ + if(strstr(msg, "DG.TYPE=\"command\"") == NULL) { + /* compose the name of the log file */ +// edg_wll_ll_log(LOG_DEBUG,"Composing filename from prefix \"%s\" and jobId \"%s\"...",prefix,jobId); + count = strlen(prefix); + strncpy(outfilename,prefix,count); count_total=count; + strncpy(outfilename+count_total,".",1); count_total+=1; count=strlen(jobId); + strncpy(outfilename+count_total,jobId,count); count_total+=count; + outfilename[count_total]='\0'; +// edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + edg_wll_ll_log(LOG_INFO,"Writing message to \"%s\"...",outfilename); + + i = 0; +open_event_file: + /* fopen and properly handle the filelock */ + if ((outfile = fopen(outfilename,"a")) == NULL) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fopen"); + answer = errno; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"."); + } + if ((filedesc = fileno(outfile)) == -1) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fileno"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"."); + } + filelock.l_type = F_WRLCK; + filelock.l_whence = SEEK_SET; + filelock.l_start = 0; + filelock.l_len = 0; + if ((filelock_status=fcntl(filedesc,F_SETLK,&filelock) < 0) && (i < FCNTL_ATTEMPTS)) { + fclose(outfile); + edg_wll_ll_log(LOG_DEBUG,"\nWaiting %d seconds for filelock to open...\n",FCNTL_TIMEOUT); + sleep(FCNTL_TIMEOUT); + i++; + goto open_event_file; + } + if (filelock_status < 0) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fcntl"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"."); + } + if (fseek(outfile, 0, SEEK_END) == -1) { + SYSTEM_ERROR("fseek"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } + if ((filepos=ftell(outfile)) == -1) { + SYSTEM_ERROR("ftell"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } + /* write, flush and sync */ + if (fputs(msg,outfile) == EOF) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fputs"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } + if (fflush(outfile) == EOF) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fflush"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } + if (fsync(filedesc) < 0) { /* synchronize */ + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR("fsync"); + answer = errno; + fclose(outfile); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + } + /* close and unlock */ + fclose(outfile); + } else { + filepos = 0; + } + + + /* if not priority send now the answer back to client */ + if (!event->any.priority) { + if (!send_answer_back(ssl,answer,&timeout)) { + answer_sent = 1; + } + } + + /* send message via IPC (UNIX socket) */ + if (!noipc) { + struct sockaddr_un saddr; + edg_wll_ll_log(LOG_INFO,"The message will be send via IPC (UNIX socket):\n"); + + /* initialize socket */ + edg_wll_ll_log(LOG_DEBUG,"Initializing UNIX socket...\n"); + + edg_wll_ll_log(LOG_DEBUG,"- Getting UNIX socket descriptor..."); + msg_sock = socket(PF_UNIX, SOCK_STREAM, 0); + if(msg_sock < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + SYSTEM_ERROR("socket"); + answer = errno; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + + edg_wll_ll_log(LOG_DEBUG,"- Setting UNIX socket parameters..."); + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, socket_path); + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + + edg_wll_ll_log(LOG_DEBUG,"-- adding O_NONBLOCK to socket parameters..."); + if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 || + fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + SYSTEM_ERROR("fcntl"); + answer = errno; + close(msg_sock); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + + /* for priority messages initialize also another socket for confirmation */ + if (event->any.priority) { + edg_wll_ll_log(LOG_DEBUG,"- Initializing 2nd UNIX socket for priority messages confirmation..."); + if(init_confirmation() < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + answer = errno; + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + } + + edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket..."); + if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + if(errno != EISCONN) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + SYSTEM_ERROR("connect"); + answer = errno; + close(msg_sock); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"warning.\n"); + edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n"); + } + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + + edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos)); + count = 0; + if (edg_wll_socket_write_full(msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n"); + answer = errno; + close(msg_sock); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + + edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message itself (%d bytes)...",msg_size); + if (edg_wll_socket_write_full(msg_sock, msg, msg_size, &timeout, &count) < 0) { + edg_wll_ll_log(LOG_DEBUG,"error.\n"); + edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error."); + answer = errno; + close(msg_sock); + goto edg_wll_log_proto_server_end; + } else { + edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); + } + + close(msg_sock); + + if (event->any.priority) { + edg_wll_ll_log(LOG_INFO,"Waiting for confirmation..."); + if ((count = wait_for_confirmation(&timeout, &answer)) < 0) { + edg_wll_ll_log(LOG_INFO,"error.\n"); + edg_wll_ll_log(LOG_ERR,"wait_for_confirmation(): error.\n"); + answer = errno; + } else { + edg_wll_ll_log(LOG_INFO,"o.k.\n"); + if (count == 0) { + edg_wll_ll_log(LOG_DEBUG,"Waking up, timeout expired.\n"); + answer = EAGAIN; + } else { + edg_wll_ll_log(LOG_DEBUG,"Confirmation received, waking up.\n"); + } + } + } + } else { + edg_wll_ll_log(LOG_NOTICE,"Not sending via IPC.\n"); + } + +edg_wll_log_proto_server_end: + /* if not sent already, send the answer back to client */ + if (!answer_sent) { + answer = send_answer_back(ssl,answer,&timeout); + } + /* clean */ + edg_wll_FreeContext(context); + if (name_esc) free(name_esc); + if (dglllid) free(dglllid); + if (dguser) free(dguser); + if (jobId) free(jobId); + if (msg) free(msg); + if (event) free(event); + + edg_wll_ll_log(LOG_INFO,"Done.\n"); + + return answer; +} + +/* + *---------------------------------------------------------------------- + * + * edg_wll_log_proto_server_failure - handle protocol failures on the server side + * + * Returns: errno + * + *---------------------------------------------------------------------- + */ +int edg_wll_log_proto_server_failure(int code, const char *text) +{ + const char *func = "edg_wll_log_proto_server()"; + int ret = 0; + + if(code>0) { + return(0); + } + switch(code) { + case EDG_WLL_SSL_ERROR_EOF: + edg_wll_ll_log(LOG_ERR,"%s: %s, EOF occured\n", func, text); + ret = EAGAIN; + break; + case EDG_WLL_SSL_ERROR_TIMEOUT: + edg_wll_ll_log(LOG_ERR,"%s: %s, timeout expired\n", func, text); + ret = EAGAIN; + break; + case EDG_WLL_SSL_ERROR_ERRNO: perror("edg_wll_ssl_read()"); break; + edg_wll_ll_log(LOG_ERR,"%s: %s, system error occured\n", func, text); + ret = EAGAIN; + break; + case EDG_WLL_SSL_ERROR_SSL: + edg_wll_ll_log(LOG_ERR,"%s: %s, SSL error occured: %s\n", func, text, + ERR_reason_error_string(ERR_get_error())); + ret = EAGAIN; + break; + default: + edg_wll_ll_log(LOG_ERR,"%s: %s, unknown error occured\n"); + break; + } + return ret; +} + +/* + *---------------------------------------------------------------------- + * + * edg_wll_set_environment - set X509 environment variables form given args + * + * Calls: setenv + * + * Algorithm: + * + *---------------------------------------------------------------------- + */ +void edg_wll_set_environment(char *user_cert, + char *user_key, + char *user_proxy, + char *CAcert_file, + char *CAcert_dir, + char *gridmap) +{ + if (user_cert) setenv("X509_USER_CERT", user_cert, 1); + if (user_key) setenv("X509_USER_KEY", user_key, 1); + if (user_proxy) setenv("X509_USER_PROXY", user_proxy, 1); + if (CAcert_file) setenv("X509_CERT_FILE", CAcert_file, 1); + if (CAcert_dir) setenv("X509_CERT_DIR", CAcert_dir, 1); + if (gridmap) setenv("GRIDMAP", gridmap, 1); + + return; +} + +/* + *---------------------------------------------------------------------- + * + * edg_wll_ll_log_init - initialize the logging level + * + *---------------------------------------------------------------------- + */ +void edg_wll_ll_log_init(int level) { + edg_wll_ll_log_level = level; +} + +/* + *---------------------------------------------------------------------- + * + * edg_wll_ll_log - print to stderr according to logging level + * serious messages are also written to syslog + * + *---------------------------------------------------------------------- + */ +void edg_wll_ll_log(int level, const char *fmt, ...) { + char *err_text; + va_list fmt_args; + + va_start(fmt_args, fmt); + vasprintf(&err_text, fmt, fmt_args); + va_end(fmt_args); + + if(level <= edg_wll_ll_log_level) + fprintf(stderr, err_text); + if(level <= LOG_ERR) { + openlog("edg-wl-logd", LOG_PID | LOG_CONS, LOG_DAEMON); + syslog(level, "%s", err_text); + closelog(); + } + + if(err_text) free(err_text); + + return; +} diff --git a/org.glite.lb.logger/src/logd_proto.h b/org.glite.lb.logger/src/logd_proto.h new file mode 100644 index 0000000..b71bdf5 --- /dev/null +++ b/org.glite.lb.logger/src/logd_proto.h @@ -0,0 +1,59 @@ +#ifndef __EDG_WORKLOAD_LOGGING_LOCALLOGGER_LOGD_PROTO_H__ +#define __EDG_WORKLOAD_LOGGING_LOCALLOGGER_LOGD_PROTO_H__ + +#ident "$Header$" + +/** + * \file edg/workload/logging/locallogger/logd_proto.h + * \brief server part of the logging protocol + * \note private + */ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include + +#include "glite/lb/log_proto.h" +#include "glite/lb/dgssl.h" + +int edg_wll_log_proto_server(SSL *ssl, char *name, char *prefix, int noipc, int noparse); +int edg_wll_log_proto_server_failure(int code, const char *text); + +#define SYSTEM_ERROR(my_err) { \ + if (errno !=0 ) \ + edg_wll_ll_log(LOG_ERR,"%s: %s\n",my_err,strerror(errno)); \ + else \ + edg_wll_ll_log(LOG_ERR,"%s\n",my_err); } + +void edg_wll_set_environment(char *user_cert, + char *user_key, + char *user_proxy, + char *CAcert_file, + char *CAcert_dir, + char *gridmapfile); + +/* locallogger daemon error handling */ + +extern int edg_wll_ll_log_level; +void edg_wll_ll_log_init(int level); +void edg_wll_ll_log(int level, const char *fmt, ...); + +/* fcntl defaults */ + +#define FCNTL_ATTEMPTS 5 +#define FCNTL_TIMEOUT 1 + +/* locallogger daemon listen and connect functions prototypes */ + +int do_listen(int port); +int do_connect(char *hostname, int port); + + +#ifdef __cplusplus +} +#endif + +#endif /* __EDG_WORKLOAD_LOGGING_LOCALLOGGER_LOGD_PROTO_H__ */ diff --git a/org.glite.lb.logger/src/queue_mgr.c b/org.glite.lb.logger/src/queue_mgr.c new file mode 100644 index 0000000..abae1f3 --- /dev/null +++ b/org.glite.lb.logger/src/queue_mgr.c @@ -0,0 +1,244 @@ +#ident "$Header$" + +#include +#include +#include + +#include "glite/lb/consumer.h" + +#include "interlogd.h" + +struct queue_list { + struct event_queue *queue; + char *dest; + struct queue_list *next; +}; + +static struct event_queue *log_queue; +static struct queue_list *queues; + + +static +int +queue_list_create() +{ + queues = NULL; + + return(0); +} + + +static +int +queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev) +{ + struct queue_list *q, *p; + + assert(el != NULL); + + *el = NULL; + if(prev) + *prev = NULL; + + if(ql == NULL) + return(0); + + q = NULL; + p = ql; + + while(p) { + if(strcmp(p->dest, dest) == 0) { + *el = p; + if(prev) + *prev = q; + return(1); + } + + q = p; + p = p->next; + }; + + return(0); +} + + +static +int +queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) +{ + struct queue_list *el; + + assert(dest != NULL); + assert(eq != NULL); + assert(ql != NULL); + + el = malloc(sizeof(*el)); + if(el == NULL) { + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue"); + return(-1); + } + + el->dest = strdup(dest); + if(el->dest == NULL) { + free(el); + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue"); + return(-1); + } + el->queue = eq; + el->next = queues; + *ql = el; + return 0; +} + + +/* +static +int +queue_list_remove(struct queue_list *el, struct queue_list *prev) +{ + assert(el != NULL); + + if(prev) + prev->next = el->next; + else + queues = el->next; + + free(el); + return(1); +} +*/ + + +#if !defined(IL_NOTIFICATIONS) +static +char * +jobid2dest(edg_wlc_JobId jobid) +{ + char *server_name,*out; + unsigned int server_port; + + if (!jobid) { + set_error(IL_PROTO, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "jobid2dest: invalid job id"); + return(NULL); + } + edg_wlc_JobIdGetServerParts(jobid,&server_name,&server_port); + + asprintf(&out,"%s:%d",server_name,server_port); + free(server_name); + if(!out) + set_error(IL_SYS, ENOMEM, "jobid2dest: error creating server name"); + return(out); +} +#endif + +struct event_queue * +queue_list_get(char *job_id_s) +{ + char *dest; + struct queue_list *q; + struct event_queue *eq; +#if !defined(IL_NOTIFICATIONS) + IL_EVENT_ID_T job_id; + + if(job_id_s == NULL || strcmp(job_id_s, "default") == 0) + return(log_queue); + + if(edg_wlc_JobIdParse(job_id_s, &job_id)) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "queue_list_get: invalid job id"); + return(NULL); + } + + dest = jobid2dest(job_id); + edg_wlc_JobIdFree(job_id); +#else + dest = job_id_s; +#endif + + if(dest == NULL) + return(NULL); + + if(queue_list_find(queues, dest, &q, NULL)) { +#if !defined(IL_NOTIFICATIONS) + free(dest); +#endif + return(q->queue); + } else { + eq = event_queue_create(dest); + if(eq) + queue_list_add(&queues, dest, eq); +#if !defined(IL_NOTIFICATIONS) + free(dest); +#endif + return(eq); + } +} + + +int +queue_list_is_log(struct event_queue *eq) +{ + return(eq == queue_list_get(NULL)); +} + + +int +queue_list_init(char *ls) +{ +#if !defined(IL_NOTIFICATIONS) + /* create queue for log server */ + log_queue = event_queue_create(ls); + if(log_queue == NULL) + return(-1); +#endif + + return(queue_list_create()); +} + + +static struct queue_list *current; + + +struct event_queue * +queue_list_first() +{ + current = queues; + return(current ? current->queue : NULL); +} + + +struct event_queue * +queue_list_next() +{ + current = current ? current->next : NULL; + return(current ? current->queue : NULL); +} + +#if defined(IL_NOTIFICATIONS) + +static struct queue_list *notifid_map = NULL; + +struct event_queue * +notifid_map_get_dest(const char * notif_id) +{ + struct queue_list *q = NULL; + + queue_list_find(notifid_map, notif_id, &q, NULL); + return(q ? q->queue : NULL); +} + + +/* returns 1 if mapping was changed, 0 if new one had to be created, -1 on error */ +int +notifid_map_set_dest(const char *notif_id, struct event_queue *eq) +{ + struct queue_list *q; + + if(queue_list_find(notifid_map, notif_id, &q, NULL)) { + q->queue = eq; + return(1); + } else { + return(queue_list_add(¬ifid_map, notif_id, eq)); + } +} + +#endif diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c new file mode 100644 index 0000000..801f9ec --- /dev/null +++ b/org.glite.lb.logger/src/queue_thread.c @@ -0,0 +1,363 @@ +#ident "$Header$" + +#include +#include +#include +#include + +#include "interlogd.h" + +static +void +queue_thread_cleanup(void *q) +{ + struct event_queue *eq = (struct event_queue *)q; + + il_log(LOG_WARNING, "thread %d exits\n", eq->thread_id); + + /* unlock all held locks */ + /* FIXME: check that the thread always exits when holding these locks; + unlock them at appropriate places if this condition is not met + event_queue_unlock(eq); + event_queue_cond_unlock(eq); + */ + + /* clear thread id */ + eq->thread_id = 0; +} + + +static +void * +queue_thread(void *q) +{ + struct event_queue *eq = (struct event_queue *)q; + int ret, exit; + + if(init_errors(0) < 0) { + il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); + pthread_exit(NULL); + } + + pthread_cleanup_push(queue_thread_cleanup, q); + + event_queue_cond_lock(eq); + + exit = 0; + while(!exit) { + + clear_error(); + + /* if there are no events, wait for them */ + ret = 0; + while (event_queue_empty(eq) +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + && !eq->flushing +#endif + ) { + ret = event_queue_wait(eq, 0); + if(ret < 0) { + /* error waiting */ + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + event_queue_cond_unlock(eq); + pthread_exit((void*)-1); + } + } /* END while(empty) */ + + il_log(LOG_DEBUG, " attempting delivery to %s:%d\n", eq->dest_name, eq->dest_port); + + /* allow other threads to signal us, ie. insert new events while + * we are sending or request flush operation + */ + event_queue_cond_unlock(eq); + + /* connect to server */ + if((ret=event_queue_connect(eq)) < 0) { + /* internal error */ + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + /* this allows for collecting status when flushing; + immediate exit would not do */ + exit = 1; + break; + } else if(ret == 0) { + /* not connected */ + if(error_get_maj() != IL_OK) + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + il_log(LOG_INFO, " could not connect to server %s, waiting for retry\n", eq->dest_name); + } else { + /* connected, send events */ + switch(ret=event_queue_send(eq)) { + + case 0: + /* there was an error and we still have events to send */ + if(error_get_maj() != IL_OK) + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + il_log(LOG_DEBUG, " events still waiting\n"); + break; + + case 1: + /* hey, we are done for now */ + il_log(LOG_DEBUG, " all events for %s sent\n", eq->dest_name); + break; + + default: + /* internal error */ + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + exit = 1; + break; + + } /* switch */ + } + + /* we are done for now anyway, so close the queue */ + event_queue_close(eq); + + event_queue_cond_lock(eq); + +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + /* Check if we are flushing and if we are, report status to master */ + if(eq->flushing == 1) { + il_log(LOG_DEBUG, " flushing mode detected, reporting status\n"); + /* 0 - events waiting, 1 - events sent, < 0 - some error */ + eq->flush_result = ret; + eq->flushing = 2; + if(pthread_mutex_lock(&flush_lock) < 0) + abort(); + if(pthread_cond_signal(&flush_cond) < 0) + abort(); + if(pthread_mutex_unlock(&flush_lock) < 0) + abort(); + } +#endif + + /* if there was some error with server, sleep for a while */ + /* iff !event_queue_empty() */ + if(ret == 0) { + if(event_queue_sleep(eq) < 0) { + il_log(LOG_ERR, "queue_thread: %s\n", error_get_msg()); + event_queue_cond_unlock(eq); + pthread_exit((void*)-1); + } + } + + if(exit) { + /* we have to clean up before exiting */ + event_queue_cond_unlock(eq); + } + + } /* while */ + + pthread_cleanup_pop(1); + + return(eq); +} + + +int +event_queue_create_thread(struct event_queue *eq) +{ + assert(eq != NULL); + + /* if there is a thread already, just return */ + if(eq->thread_id > 0) + return(0); + + /* create the thread itself */ + if(pthread_create(&eq->thread_id, NULL, queue_thread, eq) < 0) { + eq->thread_id = 0; + set_error(IL_SYS, errno, "event_queue_create_thread: error creating new thread"); + return(-1); + } + + /* the thread is never going to be joined */ + pthread_detach(eq->thread_id); + + return(1); +} + + + +int +event_queue_lock(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_rwlock_wrlock(&eq->update_lock)) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_lock: error acquiring write lock"); + return(-1); + */ + abort(); + } + + return(0); +} + + +int +event_queue_lock_ro(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_rwlock_rdlock(&eq->update_lock)) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_lock_ro: error acquiring read lock"); + return(-1); + */ + abort(); + } + + return(0); +} + + +int +event_queue_unlock(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_rwlock_unlock(&eq->update_lock)) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_unlock: error releasing lock"); + return(-1); + */ + abort(); + } + + return(0); +} + + +int +event_queue_signal(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_cond_signal(&eq->ready_cond)) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_signal: error signaling queue thread"); + return(-1); + */ + abort(); + } + return(0); +} + + +int +event_queue_wait(struct event_queue *eq, int timeout) +{ + assert(eq != NULL); + + if(timeout) { + struct timespec endtime; + int ret = 0; + + endtime.tv_sec = time(NULL) + timeout; + endtime.tv_nsec = 0; + + if((ret=pthread_cond_timedwait(&eq->ready_cond, &eq->cond_lock, &endtime))) { + if(ret == ETIMEDOUT) + return(1); + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_wait: error waiting on condition variable"); + return(-1); + */ + abort(); + } + } else { + if(pthread_cond_wait(&eq->ready_cond, &eq->cond_lock)) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_wait: error waiting on condition variable"); + return(-1); + */ + abort(); + } + } + return(0); +} + + +int event_queue_sleep(struct event_queue *eq) +{ +#if defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH) + struct timespec ts; + struct timeval tv; + int ret; + + assert(eq != NULL); + + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + eq->timeout; + ts.tv_nsec = 1000 * tv.tv_usec; + if((ret=pthread_cond_timedwait(&eq->flush_cond, &eq->cond_lock, &ts)) < 0) { + if(ret != ETIMEDOUT) { + /*** abort instead, this is too serious + set_error(IL_SYS, errno, "event_queue_sleep: error waiting on condition"); + return(-1); + */ + abort(); + } + } +#else + sleep(eq->timeout); +#endif + return(0); +} + + +#if defined(INTERLOGD_HANDLE_CMD) +int event_queue_wakeup(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_cond_signal(&eq->ready_cond)) { + /** + set_error(IL_SYS, errno, "event_queue_wakeup: error signaling queue thread"); + return(-1); + */ + abort(); + } +#if defined(INTERLOGD_FLUSH) + if(pthread_cond_signal(&eq->flush_cond)) { + /** + set_error(IL_SYS, errno, "event_queue_wakeup: error signaling queue thread"); + return(-1); + */ + abort(); + } +#endif + return(0); +} +#endif + +int event_queue_cond_lock(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_mutex_lock(&eq->cond_lock)) { + /** + set_error(IL_SYS, errno, "event_queue_cond_lock: error locking condition mutex"); + return(-1); + */ + abort(); + } + + return(0); +} + + +int event_queue_cond_unlock(struct event_queue *eq) +{ + assert(eq != NULL); + + if(pthread_mutex_unlock(&eq->cond_lock)) { + /** + set_error(IL_SYS, errno, "event_queue_cond_unlock: error locking condition mutex"); + return(-1); + */ + abort(); + } + + return(0); +} diff --git a/org.glite.lb.logger/src/recover.c b/org.glite.lb.logger/src/recover.c new file mode 100644 index 0000000..18b6d5b --- /dev/null +++ b/org.glite.lb.logger/src/recover.c @@ -0,0 +1,58 @@ +#ident "$Header$" + +#include +#include +#include + +#include "interlogd.h" + +extern char *file_prefix; + +void * +recover_thread(void *q) +{ + time_t cert_mtime = 0, key_mtime = 0; + + if(init_errors(0) < 0) { + il_log(LOG_ERR, "Error initializing thread specific data, exiting!"); + pthread_exit(NULL); + } + + while(1) { + il_log(LOG_INFO, "Looking up event files...\n"); + if(event_store_init(file_prefix) < 0) { + il_log(LOG_ERR, "recover_thread: %s\n", error_get_msg()); + exit(1); + } + if(event_store_recover_all() < 0) { + il_log(LOG_ERR, "recover_thread: %s\n", error_get_msg()); + exit(1); + } + if(event_store_cleanup() < 0) { + il_log(LOG_ERR, "recover_thread: %s\n", error_get_msg()); + exit(1); + } + il_log(LOG_INFO, "Checking for new certificate...\n"); + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + if (edg_wll_ssl_watch_creds(key_file, + cert_file, + &key_mtime, + &cert_mtime) > 0) { + void * new_cred_handle = edg_wll_ssl_init(SSL_VERIFY_FAIL_IF_NO_PEER_CERT, + 0, + cert_file, + key_file, + 0, + 0); + if (new_cred_handle) { + edg_wll_ssl_free(cred_handle); + cred_handle = new_cred_handle; + il_log(LOG_INFO, "New certificate found and deployed.\n"); + } + } + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + sleep(INPUT_TIMEOUT); + } +} diff --git a/org.glite.lb.logger/src/send_event.c b/org.glite.lb.logger/src/send_event.c new file mode 100644 index 0000000..179581b --- /dev/null +++ b/org.glite.lb.logger/src/send_event.c @@ -0,0 +1,302 @@ +#ident "$Header$" + +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include +#include +#include + + +/* + * - L/B server protocol handling routines + */ + +#include "glite/wms/jobid/cjobid.h" +#include "glite/lb/il_string.h" +#include "glite/lb/context.h" + +#include "interlogd.h" + +#if defined(INTERLOGD_EMS) || (defined(INTERLOGD_HANDLE_CMD) && defined(INTERLOGD_FLUSH)) +/* + * Send confirmation to client. + * + */ +int +send_confirmation(long lllid, int code) +{ + struct sockaddr_un saddr; + char sname[256]; + int sock, ret; + + if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + set_error(IL_SYS, errno, "send_confirmation: error creating socket"); + return(-1); + } + + if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + set_error(IL_SYS, errno, "send_confirmation: error setting socket options"); + return(-1); + } + + ret = 0; + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + snprintf(sname, sizeof(sname), "/tmp/dglogd_sock_%ld", lllid); + strcpy(saddr.sun_path, sname); + if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) { + set_error(IL_SYS, errno, "send_confirmation: error connecting socket"); + goto out; + } + + if(send(sock, &code, sizeof(code), MSG_NOSIGNAL) < 0) { + set_error(IL_SYS, errno, "send_confirmation: error sending data"); + goto out; + } + ret = 1; + + il_log(LOG_DEBUG, " sent code %d back to client\n", code); + + out: + close(sock); + return(ret); +} + + +static +int +confirm_msg(struct server_msg *msg, int code, int code_min) +{ + switch(code) { + case LB_OK: + code_min = 0; + break; + case LB_DBERR: + /* code_min already contains apropriate error code */ + break; + case LB_PROTO: + code_min = EDG_WLL_IL_PROTO; + break; + default: + code_min = EDG_WLL_IL_SYS; + break; + } + + return(send_confirmation(msg->receipt_to, code_min)); +} +#endif + + +/* + * Read reply from server. + * Returns: -1 - error reading message, + * code > 0 - error code from server + */ +static +int +get_reply(struct event_queue *eq, char **buf, int *code_min) +{ + char buffer[17]; + char *msg, *p; + int len, code, l; + SSL *ssl; + struct timeval tv; + + ssl = eq->ssl; + + /* get message header */ + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + code = edg_wll_ssl_read_full(ssl, buffer, 17, &tv, &len); + if(code < 0) { + set_error(IL_DGSSL, code, "get_reply (header)"); + return(-1); + } + + buffer[16] = 0; + + sscanf(buffer, "%d", &len); + if(len > MAXLEN) { + set_error(IL_PROTO, LB_NOMEM, "get_reply: error reading reply length"); + return(-1); + } + + /* allocate room for message body */ + if((msg = malloc(len)) == NULL) { + set_error(IL_NOMEM, ENOMEM, "get_reply: no room for message body"); + return(-1); + } + + /* read all the data */ + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + code = edg_wll_ssl_read_full(ssl, msg, len, &tv, &l); + if(code < 0) { + set_error(IL_DGSSL, code, "get_reply (body)"); + return(-1); + } + + p = msg; + p = get_int(p, &code); + if(p == NULL) { + set_error(IL_PROTO, LB_PROTO, "get_reply: error receiving result code"); + free(msg); + return(-1); + } + p = get_int(p, code_min); + if(p == NULL) { + set_error(IL_PROTO, LB_PROTO, "get_reply: error receiving result code minor"); + free(msg); + return(-1); + } + p = get_string(p, buf); + if(p == NULL) { + if(*buf) { + free(*buf); + *buf = NULL; + } + free(msg); + set_error(IL_PROTO, LB_PROTO, "get_reply: error receiving result string"); + return(-1); + } + free(msg); + return(code); +} + + + +/* + * Returns: -1 - internal error, 0 - not connected, timeout set, 1 - OK + */ +int +event_queue_connect(struct event_queue *eq) +{ + int ret; + struct timeval tv; + + assert(eq != NULL); + + if(eq->ssl == NULL) { + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port); + ret = edg_wll_ssl_connect(cred_handle, eq->dest_name, eq->dest_port, &tv, &eq->ssl); + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + if(ret < 0) { + set_error(IL_DGSSL, ret, "event_queue_connect: edg_wll_ssl_connect"); + eq->ssl = NULL; + eq->timeout = TIMEOUT; + return(0); + } + } + + return(1); +} + + +int +event_queue_close(struct event_queue *eq) +{ + assert(eq != NULL); + + if(eq->ssl != NULL) { + edg_wll_ssl_close(eq->ssl); + eq->ssl = NULL; + } + return(0); +} + + +/* + * Send all events from the queue. + * Returns: -1 - system error, 0 - not send, 1 - queue empty + */ +int +event_queue_send(struct event_queue *eq) +{ + assert(eq != NULL); + + if(eq->ssl == NULL) + return(0); + + /* feed the server with events */ + while (!event_queue_empty(eq)) { + struct server_msg *msg; + char *rep; + int ret, code, code_min, bytes_sent; + struct timeval tv; + + clear_error(); + + if(event_queue_get(eq, &msg) < 0) + return(-1); + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + ret = edg_wll_ssl_write_full(eq->ssl, msg->msg, msg->len, &tv, &bytes_sent); + if(ret < 0) { + eq->timeout = TIMEOUT; + return(0); + } + + if((code = get_reply(eq, &rep, &code_min)) < 0) { + /* could not get the reply properly, so try again later */ + il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg()); + eq->timeout = TIMEOUT; + return(0); + } + + il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep); + free(rep); + + /* the reply is back here */ + switch(code) { + + /* NOT USED: case LB_TIME: */ + case LB_NOMEM: + /* NOT USED: case LB_SYS: */ + /* NOT USED: case LB_AUTH: */ + /* non fatal errors (for us) */ + eq->timeout = TIMEOUT; + return(0); + + case LB_OK: + /* event succesfully delivered */ + + default: /* LB_DBERR, LB_PROTO */ + /* the event was not accepted by the server */ + /* update the event pointer */ + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) + /* failure committing message, this is bad */ + return(-1); + /* if we have just delivered priority message from the queue, send confirmation */ + ret = 1; +#if defined(INTERLOGD_EMS) + if(server_msg_is_priority(msg) && + ((ret=confirm_msg(msg, code, code_min)) < 0)) + return(ret); +#endif + + if((ret == 0) && + (error_get_maj() != IL_OK)) + il_log(LOG_ERR, "send_event: %s\n", error_get_msg()); + + event_queue_remove(eq); + break; + + } /* switch */ + } /* while */ + + return(1); + +} /* send_events */ + + diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c new file mode 100644 index 0000000..f230a35 --- /dev/null +++ b/org.glite.lb.logger/src/server_msg.c @@ -0,0 +1,216 @@ +#ident "$Header$" + +#include +#include +#include + +#include "interlogd.h" +#include "glite/lb/il_string.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/consumer.h" +#include "glite/lb/context.h" + +static +int +create_msg(char *event, char **buffer, long *receipt) +{ + char *p; int len; + char *ucs = "michal"; + + *receipt = 0; + +#if defined(INTERLOGD_EMS) + /* find DG.LLLID */ + if(strncmp(event, "DG.LLLID",8) == 0 || + strncmp(event, "DG.LLPID",8) == 0) { /* 8 == strlen("DG.LLLID") */ + + /* skip the key */ + event += 9; /* 9 = strlen("DG.LLLID=") */ + *receipt = atol(event); + p = strchr(event, ' '); + if(!p) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, + "create_msg: error parsing locallogger PID"); + return(-1); + } + /* skip the value */ + event = p + 1; + + /* find DG.PRIORITY */ + p = strstr(event, "DG.PRIORITY"); + if(p) { + int n; + + p += 12; /* skip the key and = */ + if((n = atoi(p)) == 0) { + /* normal asynchronous message */ + *receipt = 0; + } + } else { + /* could not find priority key */ + *receipt = 0; + } + + } else { + /* could not find local logger PID, confirmation can not be sent */ + *receipt = 0; + } +#endif + + /* allocate enough room to hold the message */ + len = 17 + len_string(ucs) + len_string(event); + if((*buffer = malloc(len)) == NULL) { + set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message"); + return(-1); + } + + p = *buffer; + + /* write header */ + sprintf(p, "%16d\n", len - 17); + p += 17; + + /* write rest of the message */ + p = put_string(p, ucs); + p = put_string(p, event); + + return(p - *buffer); + +} + + +struct server_msg * +server_msg_create(char *event) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message"); + return(NULL); + } + + if(server_msg_init(msg, event) < 0) { + server_msg_free(msg); + return(NULL); + } + + return(msg); +} + + +struct server_msg * +server_msg_copy(struct server_msg *src) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message"); + return(NULL); + } + + msg->msg = malloc(src->len); + if(msg->msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message"); + server_msg_free(msg); + return(NULL); + } + msg->len = src->len; + memcpy(msg->msg, src->msg, src->len); + + msg->job_id_s = strdup(src->job_id_s); + msg->ev_len = src->ev_len; + msg->es = src->es; + msg->receipt_to = src->receipt_to; +#if defined(IL_NOTIFICATIONS) + msg->dest_name = strdup(src->dest_name); + msg->dest_port = src->dest_port; + msg->dest = strdup(src->dest); +#endif + return(msg); +} + + +int +server_msg_init(struct server_msg *msg, char *event) +{ +#if defined(IL_NOTIFICATIONS) + edg_wll_Context context; + edg_wll_Event *notif_event; + int ret; +#endif + + assert(msg != NULL); + + memset(msg, 0, sizeof(*msg)); + + +#if defined(IL_NOTIFICATIONS) + edg_wll_InitContext(&context); + + /* parse the notification event */ + if((ret=edg_wll_ParseNotifEvent(context, event, ¬if_event))) { + set_error(IL_LBAPI, ret, "server_msg_init: error parsing notification event"); + return(-1); + } + /* FIXME: check for allocation error */ + if(notif_event->notification.dest_host && + (strlen(notif_event->notification.dest_host) > 0)) { + msg->dest_name = strdup(notif_event->notification.dest_host); + msg->dest_port = notif_event->notification.dest_port; + asprintf(&msg->dest, "%s:%d", msg->dest_name, msg->dest_port); + } + msg->job_id_s = edg_wll_NotifIdUnparse(notif_event->notification.notifId); + if(notif_event->notification.jobstat && + (strlen(notif_event->notification.jobstat) > 0)) { + /* remember to add event separator to the length */ + msg->ev_len = strlen(event) + 1; + msg->len = create_msg(event, &msg->msg, &msg->receipt_to); + } + edg_wll_FreeEvent(notif_event); + free(notif_event); + if(msg->len < 0) { + return(-1); + } +#else + /* remember to add event separator to the length */ + msg->ev_len = strlen(event) + 1; + msg->len = create_msg(event, &msg->msg, &msg->receipt_to); + if(msg->len < 0) { + return(-1); + } + msg->job_id_s = edg_wll_GetJobId(event); +#endif + if(msg->job_id_s == NULL) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id"); + return(-1); + } + + return(0); +} + + +int +server_msg_is_priority(struct server_msg *msg) +{ + assert(msg != NULL); + + return(msg->receipt_to != 0); +} + + +int +server_msg_free(struct server_msg *msg) +{ + assert(msg != NULL); + + if(msg->msg) free(msg->msg); + if(msg->job_id_s) free(msg->job_id_s); +#if defined(IL_NOTIFICATIONS) + if(msg->dest_name) free(msg->dest_name); + if(msg->dest) free(msg->dest); +#endif + free(msg); + return 0; +}