};
struct event_queue *
-event_queue_create(char *server_name)
+event_queue_create(char *server_name, struct il_output_plugin *output)
{
struct event_queue *eq;
- char *p;
+ char *p,*s, c;
if((eq = malloc(sizeof(*eq))) == NULL) {
set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue");
eq->dest = strdup(server_name);
- p = strchr(server_name, ':');
- if(p)
+ s = strstr(server_name, "://");
+ if(s == NULL) {
+ s = server_name;
+ } else {
+ s = s + 3;
+ }
+ p = strchr(s, ':');
+ if(p) {
*p++ = 0;
+ c = ':';
+ } else {
+ p = strchr(s, '/');
+ if(p) {
+ *p++ = 0;
+ c = '/';
+ }
+ }
eq->dest_name = strdup(server_name);
if(p)
- *(p-1) = ':';
+ *(p-1) = c;
#if defined(IL_NOTIFICATIONS) || defined(IL_WS)
- eq->dest_port = atoi(p);
+ if(p && c == ':') {
+ eq->dest_port = atoi(p);
+ } else {
+ eq->dest_port = 0; // use whatever default there is for given url scheme
+ }
#else
- eq->dest_port = p ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1;
+ eq->dest_port = p && c == ':' ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1;
#endif
+
+ /* setup output functions */
+ if(output != NULL) {
+ eq->event_queue_connect = output->event_queue_connect;
+ eq->event_queue_send = output->event_queue_send;
+ eq->event_queue_close = output->event_queue_close;
+ } else {
+ eq->event_queue_connect = event_queue_connect;
+ eq->event_queue_send = event_queue_send;
+ eq->event_queue_close = event_queue_close;
+ }
+
/* create all necessary locks */
if(pthread_rwlock_init(&eq->update_lock, NULL)) {
set_error(IL_SYS, errno, "event_queue_create: error creating update lock");
int cur_len; /* current length */
int throttling; /* event insertion suspend flag */
int first_event_sent; /* connection can be preempted by server */
+ /* delivery methods */
+ int (*event_queue_connect)(struct event_queue *);
+ int (*event_queue_send)(struct event_queue *);
+ int (*event_queue_close)(struct event_queue *);
};
+struct il_output_plugin {
+ int (*event_queue_connect)(struct event_queue *);
+ int (*event_queue_send)(struct event_queue *);
+ int (*event_queue_close)(struct event_queue *);
+ int (*plugin_init)();
+ int (*plugin_supports_scheme)(const char *);
+};
/* credential destructor */
void cred_handle_destroy(void *);
int server_msg_free(struct server_msg *);
/* general event queue methods */
-struct event_queue *event_queue_create(char *);
+struct event_queue *event_queue_create(char *, struct il_output_plugin *);
int event_queue_free(struct event_queue *);
int event_queue_empty(struct event_queue *);
int event_queue_insert(struct event_queue *, struct server_msg *);
int receive_http(void *, int (*)(void *, char *, const int), il_http_message_t *);
#endif
+/* plugin functions */
+int plugin_init(const char *);
+struct il_output_plugin *plugin_get(const char *);
+
/* master main loop */
int loop();
--- /dev/null
+#ident "$Header$"
+
+#include "interlogd.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <dlfcn.h>
+
+struct plugin_list {
+ struct il_output_plugin plugin_def;
+ struct plugin_list *next;
+};
+
+static struct plugin_list *plugins = NULL;
+
+#define DL_RESOLVESYM(var, handle, name, type) \
+ dlerror(); \
+ var = (type) dlsym(handle, name); \
+ if(var == NULL) { \
+ snprintf(err, sizeof(err), "plugin_init: error resolving %s: %s", name, dlerror()); \
+ set_error(IL_SYS, errno, err); \
+ return -1; \
+ }
+
+int plugin_init(const char *plugin_name)
+{
+ char err[256];
+ void *dl_handle;
+ struct plugin_list *plugin;
+
+ dlerror();
+ dl_handle = dlopen(plugin_name, RTLD_LAZY);
+ if(dl_handle == NULL) {
+ snprintf(err, sizeof(err), "plugin_init: error opening dynamic library: %s", dlerror());
+ set_error(IL_SYS, errno, err);
+ return -1;
+ }
+ dlerror();
+
+ plugin = malloc(sizeof(*plugin));
+ if(plugin == NULL) {
+ set_error(IL_NOMEM, ENOMEM, "plugin_init: error allocating plugin description");
+ return -1;
+ }
+
+ plugin->next = plugins;
+ DL_RESOLVESYM(plugin->plugin_def.plugin_init, dl_handle, "plugin_init", int(*)());
+ DL_RESOLVESYM(plugin->plugin_def.plugin_supports_scheme, dl_handle, "plugin_supports_scheme", int(*)(const char *));
+ DL_RESOLVESYM(plugin->plugin_def.event_queue_connect, dl_handle, "event_queue_connect", int (*)(struct event_queue*));
+ DL_RESOLVESYM(plugin->plugin_def.event_queue_send, dl_handle, "event_queue_send", int (*)(struct event_queue *));
+ DL_RESOLVESYM(plugin->plugin_def.event_queue_close, dl_handle, "event_queue_close", int (*)(struct event_queue *));
+
+ return (*plugin->plugin_def.plugin_init)();
+}
+
+
+struct il_output_plugin *
+plugin_get(const char *scheme)
+{
+ struct plugin_list *outp;
+
+ for(outp = plugins; outp != NULL; outp = outp->next) {
+ if((outp->plugin_def.plugin_supports_scheme)(scheme)) {
+ return &outp->plugin_def;
+ }
+ }
+
+ return NULL;
+}
char *dest;
struct queue_list *q;
struct event_queue *eq;
+ struct il_output_plugin *outp;
+
#if !defined(IL_NOTIFICATIONS)
IL_EVENT_ID_T job_id;
dest = jobid2dest(job_id);
edg_wlc_JobIdFree(job_id);
+ outp = NULL;
#else
dest = job_id_s;
+ outp = plugin_get(dest);
#endif
if(dest == NULL)
#endif
return(q->queue);
} else {
- eq = event_queue_create(dest);
+ eq = event_queue_create(dest, outp);
if(eq)
queue_list_add(&queues, dest, eq);
#if !defined(IL_NOTIFICATIONS)
{
#if !defined(IL_NOTIFICATIONS)
/* create queue for log server */
- log_queue = event_queue_create(ls);
+ log_queue = event_queue_create(ls, NULL);
if(log_queue == NULL)
return(-1);
#endif
}
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO,
- " started new thread for delivery to %s:%d",
- eq->dest_name, eq->dest_port);
+ " started new thread for delivery to %s",
+ eq->dest);
pthread_cleanup_push(queue_thread_cleanup, q);
if(lazy_close && close_timeout) {
ret = event_queue_wait(eq, close_timeout);
if(ret == 1) {/* timeout? */
- event_queue_close(eq);
+ (*eq->event_queue_close)(eq);
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG,
- " connection to %s:%d closed",
- eq->dest_name, eq->dest_port);
+ " connection to %s closed",
+ eq->dest);
}
close_timeout = 0;
} else {
glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO,
" thread idle for more than %d seconds, exiting",
exit_timeout);
- event_queue_close(eq);
+ (*eq->event_queue_close)(eq);
event_queue_cond_unlock(eq);
pthread_exit((void*)0);
}
/* deliver pending events */
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG,
- " attempting delivery to %s:%d",
- eq->dest_name, eq->dest_port);
+ " attempting delivery to %s",
+ eq->dest);
/* connect to server */
- if((ret=event_queue_connect(eq)) == 0) {
+ if((ret=(*eq->event_queue_connect)(eq)) == 0) {
/* not connected */
if(error_get_maj() != IL_OK)
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_WARN,
#if defined(IL_NOTIFICATIONS)
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO,
" could not connect to client %s, waiting for retry",
- eq->dest_name);
+ eq->dest);
#else
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO,
" could not connect to bookkeeping server %s, waiting for retry",
- eq->dest_name);
+ eq->dest);
#endif
retrycnt++;
} else {
retrycnt = 0;
/* connected, send events */
- switch(ret=event_queue_send(eq)) {
+ switch(ret=(*eq->event_queue_send)(eq)) {
case 0:
/* there was an error and we still have events to send */
/* hey, we are done for now */
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG,
" all events for %s sent",
- eq->dest_name);
+ eq->dest);
break;
default:
if((ret == 1) && lazy_close)
close_timeout = default_close_timeout;
else {
- event_queue_close(eq);
+ (*eq->event_queue_close)(eq);
glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG,
- " connection to %s:%d closed",
- eq->dest_name, eq->dest_port);
+ " connection to %sclosed",
+ eq->dest);
}
}
}
}
/* FIXME: check for allocation error */
- if(notif_event->notification.dest_host &&
+ if(notif_event->notification.dest_url &&
+ (strlen(notif_event->notification.dest_url) > 0)) {
+ /* destination URL */
+ msg->dest = strdup(notif_event->notification.dest_url);
+ msg->dest_name = NULL;
+ msg->dest_port = 0;
+ } else if(notif_event->notification.dest_host &&
(strlen(notif_event->notification.dest_host) > 0)) {
+ /* destination host and port */
msg->dest_name = strdup(notif_event->notification.dest_host);
msg->dest_port = notif_event->notification.dest_port;
asprintf(&msg->dest, "%s:%d", msg->dest_name, msg->dest_port);