From 35140340e201f2b449b96c97cff7105d696cc9bf Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Wed, 20 Jan 2010 08:11:22 +0000 Subject: [PATCH] added output plugins infrastructure --- org.glite.lb.logger/src/event_queue.c | 44 +++++++++++++++++---- org.glite.lb.logger/src/interlogd.h | 17 ++++++++- org.glite.lb.logger/src/plugin_mgr.c | 70 ++++++++++++++++++++++++++++++++++ org.glite.lb.logger/src/queue_mgr.c | 8 +++- org.glite.lb.logger/src/queue_thread.c | 32 ++++++++-------- org.glite.lb.logger/src/server_msg.c | 9 ++++- 6 files changed, 153 insertions(+), 27 deletions(-) create mode 100644 org.glite.lb.logger/src/plugin_mgr.c diff --git a/org.glite.lb.logger/src/event_queue.c b/org.glite.lb.logger/src/event_queue.c index 8e1588d..33f026e 100644 --- a/org.glite.lb.logger/src/event_queue.c +++ b/org.glite.lb.logger/src/event_queue.c @@ -22,10 +22,10 @@ struct event_queue_msg { }; struct event_queue * -event_queue_create(char *server_name) +event_queue_create(char *server_name, struct il_output_plugin *output) { struct event_queue *eq; - char *p; + char *p,*s, c; if((eq = malloc(sizeof(*eq))) == NULL) { set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue"); @@ -36,18 +36,48 @@ event_queue_create(char *server_name) eq->dest = strdup(server_name); - p = strchr(server_name, ':'); - if(p) + s = strstr(server_name, "://"); + if(s == NULL) { + s = server_name; + } else { + s = s + 3; + } + p = strchr(s, ':'); + if(p) { *p++ = 0; + c = ':'; + } else { + p = strchr(s, '/'); + if(p) { + *p++ = 0; + c = '/'; + } + } eq->dest_name = strdup(server_name); if(p) - *(p-1) = ':'; + *(p-1) = c; #if defined(IL_NOTIFICATIONS) || defined(IL_WS) - eq->dest_port = atoi(p); + if(p && c == ':') { + eq->dest_port = atoi(p); + } else { + eq->dest_port = 0; // use whatever default there is for given url scheme + } #else - eq->dest_port = p ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1; + eq->dest_port = p && c == ':' ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1; #endif + + /* setup output functions */ + if(output != NULL) { + eq->event_queue_connect = output->event_queue_connect; + eq->event_queue_send = output->event_queue_send; + eq->event_queue_close = output->event_queue_close; + } else { + eq->event_queue_connect = event_queue_connect; + eq->event_queue_send = event_queue_send; + eq->event_queue_close = event_queue_close; + } + /* create all necessary locks */ if(pthread_rwlock_init(&eq->update_lock, NULL)) { set_error(IL_SYS, errno, "event_queue_create: error creating update lock"); diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 4c5763a..cd905c3 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -191,8 +191,19 @@ struct event_queue { int cur_len; /* current length */ int throttling; /* event insertion suspend flag */ int first_event_sent; /* connection can be preempted by server */ + /* delivery methods */ + int (*event_queue_connect)(struct event_queue *); + int (*event_queue_send)(struct event_queue *); + int (*event_queue_close)(struct event_queue *); }; +struct il_output_plugin { + int (*event_queue_connect)(struct event_queue *); + int (*event_queue_send)(struct event_queue *); + int (*event_queue_close)(struct event_queue *); + int (*plugin_init)(); + int (*plugin_supports_scheme)(const char *); +}; /* credential destructor */ void cred_handle_destroy(void *); @@ -207,7 +218,7 @@ int server_msg_is_priority(struct server_msg *); int server_msg_free(struct server_msg *); /* general event queue methods */ -struct event_queue *event_queue_create(char *); +struct event_queue *event_queue_create(char *, struct il_output_plugin *); int event_queue_free(struct event_queue *); int event_queue_empty(struct event_queue *); int event_queue_insert(struct event_queue *, struct server_msg *); @@ -273,6 +284,10 @@ int parse_header(const char *, il_http_message_t *); int receive_http(void *, int (*)(void *, char *, const int), il_http_message_t *); #endif +/* plugin functions */ +int plugin_init(const char *); +struct il_output_plugin *plugin_get(const char *); + /* master main loop */ int loop(); diff --git a/org.glite.lb.logger/src/plugin_mgr.c b/org.glite.lb.logger/src/plugin_mgr.c new file mode 100644 index 0000000..0826dcc --- /dev/null +++ b/org.glite.lb.logger/src/plugin_mgr.c @@ -0,0 +1,70 @@ +#ident "$Header$" + +#include "interlogd.h" + +#include +#include +#include +#include + +struct plugin_list { + struct il_output_plugin plugin_def; + struct plugin_list *next; +}; + +static struct plugin_list *plugins = NULL; + +#define DL_RESOLVESYM(var, handle, name, type) \ + dlerror(); \ + var = (type) dlsym(handle, name); \ + if(var == NULL) { \ + snprintf(err, sizeof(err), "plugin_init: error resolving %s: %s", name, dlerror()); \ + set_error(IL_SYS, errno, err); \ + return -1; \ + } + +int plugin_init(const char *plugin_name) +{ + char err[256]; + void *dl_handle; + struct plugin_list *plugin; + + dlerror(); + dl_handle = dlopen(plugin_name, RTLD_LAZY); + if(dl_handle == NULL) { + snprintf(err, sizeof(err), "plugin_init: error opening dynamic library: %s", dlerror()); + set_error(IL_SYS, errno, err); + return -1; + } + dlerror(); + + plugin = malloc(sizeof(*plugin)); + if(plugin == NULL) { + set_error(IL_NOMEM, ENOMEM, "plugin_init: error allocating plugin description"); + return -1; + } + + plugin->next = plugins; + DL_RESOLVESYM(plugin->plugin_def.plugin_init, dl_handle, "plugin_init", int(*)()); + DL_RESOLVESYM(plugin->plugin_def.plugin_supports_scheme, dl_handle, "plugin_supports_scheme", int(*)(const char *)); + DL_RESOLVESYM(plugin->plugin_def.event_queue_connect, dl_handle, "event_queue_connect", int (*)(struct event_queue*)); + DL_RESOLVESYM(plugin->plugin_def.event_queue_send, dl_handle, "event_queue_send", int (*)(struct event_queue *)); + DL_RESOLVESYM(plugin->plugin_def.event_queue_close, dl_handle, "event_queue_close", int (*)(struct event_queue *)); + + return (*plugin->plugin_def.plugin_init)(); +} + + +struct il_output_plugin * +plugin_get(const char *scheme) +{ + struct plugin_list *outp; + + for(outp = plugins; outp != NULL; outp = outp->next) { + if((outp->plugin_def.plugin_supports_scheme)(scheme)) { + return &outp->plugin_def; + } + } + + return NULL; +} diff --git a/org.glite.lb.logger/src/queue_mgr.c b/org.glite.lb.logger/src/queue_mgr.c index 2879e14..3fb6014 100644 --- a/org.glite.lb.logger/src/queue_mgr.c +++ b/org.glite.lb.logger/src/queue_mgr.c @@ -123,6 +123,8 @@ queue_list_get(char *job_id_s) char *dest; struct queue_list *q; struct event_queue *eq; + struct il_output_plugin *outp; + #if !defined(IL_NOTIFICATIONS) IL_EVENT_ID_T job_id; @@ -136,8 +138,10 @@ queue_list_get(char *job_id_s) dest = jobid2dest(job_id); edg_wlc_JobIdFree(job_id); + outp = NULL; #else dest = job_id_s; + outp = plugin_get(dest); #endif if(dest == NULL) @@ -149,7 +153,7 @@ queue_list_get(char *job_id_s) #endif return(q->queue); } else { - eq = event_queue_create(dest); + eq = event_queue_create(dest, outp); if(eq) queue_list_add(&queues, dest, eq); #if !defined(IL_NOTIFICATIONS) @@ -172,7 +176,7 @@ queue_list_init(char *ls) { #if !defined(IL_NOTIFICATIONS) /* create queue for log server */ - log_queue = event_queue_create(ls); + log_queue = event_queue_create(ls, NULL); if(log_queue == NULL) return(-1); #endif diff --git a/org.glite.lb.logger/src/queue_thread.c b/org.glite.lb.logger/src/queue_thread.c index ea596a8..72f2964 100644 --- a/org.glite.lb.logger/src/queue_thread.c +++ b/org.glite.lb.logger/src/queue_thread.c @@ -54,8 +54,8 @@ queue_thread(void *q) } glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, - " started new thread for delivery to %s:%d", - eq->dest_name, eq->dest_port); + " started new thread for delivery to %s", + eq->dest); pthread_cleanup_push(queue_thread_cleanup, q); @@ -77,10 +77,10 @@ queue_thread(void *q) if(lazy_close && close_timeout) { ret = event_queue_wait(eq, close_timeout); if(ret == 1) {/* timeout? */ - event_queue_close(eq); + (*eq->event_queue_close)(eq); glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, - " connection to %s:%d closed", - eq->dest_name, eq->dest_port); + " connection to %s closed", + eq->dest); } close_timeout = 0; } else { @@ -89,7 +89,7 @@ queue_thread(void *q) glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, " thread idle for more than %d seconds, exiting", exit_timeout); - event_queue_close(eq); + (*eq->event_queue_close)(eq); event_queue_cond_unlock(eq); pthread_exit((void*)0); } @@ -118,10 +118,10 @@ queue_thread(void *q) /* deliver pending events */ glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, - " attempting delivery to %s:%d", - eq->dest_name, eq->dest_port); + " attempting delivery to %s", + eq->dest); /* connect to server */ - if((ret=event_queue_connect(eq)) == 0) { + if((ret=(*eq->event_queue_connect)(eq)) == 0) { /* not connected */ if(error_get_maj() != IL_OK) glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_WARN, @@ -129,17 +129,17 @@ queue_thread(void *q) #if defined(IL_NOTIFICATIONS) glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO, " could not connect to client %s, waiting for retry", - eq->dest_name); + eq->dest); #else glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO, " could not connect to bookkeeping server %s, waiting for retry", - eq->dest_name); + eq->dest); #endif retrycnt++; } else { retrycnt = 0; /* connected, send events */ - switch(ret=event_queue_send(eq)) { + switch(ret=(*eq->event_queue_send)(eq)) { case 0: /* there was an error and we still have events to send */ @@ -155,7 +155,7 @@ queue_thread(void *q) /* hey, we are done for now */ glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, " all events for %s sent", - eq->dest_name); + eq->dest); break; default: @@ -172,10 +172,10 @@ queue_thread(void *q) if((ret == 1) && lazy_close) close_timeout = default_close_timeout; else { - event_queue_close(eq); + (*eq->event_queue_close)(eq); glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, - " connection to %s:%d closed", - eq->dest_name, eq->dest_port); + " connection to %sclosed", + eq->dest); } } } diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 84ae833..4bcc587 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -156,8 +156,15 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) } /* FIXME: check for allocation error */ - if(notif_event->notification.dest_host && + if(notif_event->notification.dest_url && + (strlen(notif_event->notification.dest_url) > 0)) { + /* destination URL */ + msg->dest = strdup(notif_event->notification.dest_url); + msg->dest_name = NULL; + msg->dest_port = 0; + } else if(notif_event->notification.dest_host && (strlen(notif_event->notification.dest_host) > 0)) { + /* destination host and port */ msg->dest_name = strdup(notif_event->notification.dest_host); msg->dest_port = notif_event->notification.dest_port; asprintf(&msg->dest, "%s:%d", msg->dest_name, msg->dest_port); -- 1.8.2.3