added output plugins infrastructure
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 20 Jan 2010 08:11:22 +0000 (08:11 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 20 Jan 2010 08:11:22 +0000 (08:11 +0000)
org.glite.lb.logger/src/event_queue.c
org.glite.lb.logger/src/interlogd.h
org.glite.lb.logger/src/plugin_mgr.c [new file with mode: 0644]
org.glite.lb.logger/src/queue_mgr.c
org.glite.lb.logger/src/queue_thread.c
org.glite.lb.logger/src/server_msg.c

index 8e1588d..33f026e 100644 (file)
@@ -22,10 +22,10 @@ struct event_queue_msg {
 };
 
 struct event_queue *
-event_queue_create(char *server_name)
+event_queue_create(char *server_name, struct il_output_plugin *output)
 {
   struct event_queue *eq;
-  char *p;
+  char *p,*s, c;
 
   if((eq = malloc(sizeof(*eq))) == NULL) {
     set_error(IL_NOMEM, ENOMEM, "event_queue_create: error allocating event queue");
@@ -36,18 +36,48 @@ event_queue_create(char *server_name)
 
   eq->dest = strdup(server_name);
 
-  p = strchr(server_name, ':');
-  if(p)
+  s = strstr(server_name, "://");
+  if(s == NULL) {
+         s = server_name;
+  } else {
+         s = s + 3;
+  }
+  p = strchr(s, ':');
+  if(p) {
     *p++ = 0;
+    c = ':';
+  } else {
+         p = strchr(s, '/');
+         if(p) {
+                 *p++ = 0;
+                 c = '/';
+         }
+  }
   eq->dest_name = strdup(server_name);
   if(p)
-    *(p-1) = ':';
+    *(p-1) = c;
 
 #if defined(IL_NOTIFICATIONS) || defined(IL_WS)
-  eq->dest_port = atoi(p);
+  if(p && c == ':') {
+         eq->dest_port = atoi(p);
+  } else {
+         eq->dest_port = 0; // use whatever default there is for given url scheme
+  }
 #else
-  eq->dest_port = p ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1;
+         eq->dest_port = p && c == ':' ? atoi(p)+1 : GLITE_JOBID_DEFAULT_PORT+1;
 #endif
+
+  /* setup output functions */
+  if(output != NULL) {
+    eq->event_queue_connect = output->event_queue_connect;
+    eq->event_queue_send = output->event_queue_send;
+    eq->event_queue_close = output->event_queue_close;
+  } else {
+    eq->event_queue_connect = event_queue_connect;
+    eq->event_queue_send = event_queue_send;
+    eq->event_queue_close = event_queue_close;
+  }
+
   /* create all necessary locks */
   if(pthread_rwlock_init(&eq->update_lock, NULL)) {
     set_error(IL_SYS, errno, "event_queue_create: error creating update lock");
index 4c5763a..cd905c3 100644 (file)
@@ -191,8 +191,19 @@ struct event_queue {
        int                     cur_len;        /* current length */
        int                     throttling;     /* event insertion suspend flag */
        int                     first_event_sent; /* connection can be preempted by server */
+       /* delivery methods */
+       int             (*event_queue_connect)(struct event_queue *);
+       int             (*event_queue_send)(struct event_queue *);
+       int             (*event_queue_close)(struct event_queue *);
 };
 
+struct il_output_plugin {
+       int     (*event_queue_connect)(struct event_queue *);
+       int     (*event_queue_send)(struct event_queue *);
+       int     (*event_queue_close)(struct event_queue *);
+       int             (*plugin_init)();
+       int             (*plugin_supports_scheme)(const char *);
+};
 
 /* credential destructor */
 void cred_handle_destroy(void *);
@@ -207,7 +218,7 @@ int server_msg_is_priority(struct server_msg *);
 int server_msg_free(struct server_msg *);
 
 /* general event queue methods */
-struct event_queue *event_queue_create(char *);
+struct event_queue *event_queue_create(char *, struct il_output_plugin *);
 int event_queue_free(struct event_queue *);
 int event_queue_empty(struct event_queue *);
 int event_queue_insert(struct event_queue *, struct server_msg *);
@@ -273,6 +284,10 @@ int parse_header(const char *, il_http_message_t *);
 int receive_http(void *, int (*)(void *, char *, const int), il_http_message_t *);
 #endif
 
+/* plugin functions */
+int plugin_init(const char *);
+struct il_output_plugin *plugin_get(const char *);
+
 /* master main loop */
 int loop();
 
diff --git a/org.glite.lb.logger/src/plugin_mgr.c b/org.glite.lb.logger/src/plugin_mgr.c
new file mode 100644 (file)
index 0000000..0826dcc
--- /dev/null
@@ -0,0 +1,70 @@
+#ident "$Header$"
+
+#include "interlogd.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <dlfcn.h>
+
+struct plugin_list {
+       struct il_output_plugin plugin_def;
+       struct plugin_list *next;
+};
+
+static struct plugin_list *plugins = NULL;
+
+#define DL_RESOLVESYM(var, handle, name, type) \
+       dlerror(); \
+       var = (type) dlsym(handle, name); \
+       if(var == NULL) { \
+               snprintf(err, sizeof(err), "plugin_init: error resolving %s: %s", name, dlerror()); \
+               set_error(IL_SYS, errno, err); \
+               return -1; \
+       }
+
+int plugin_init(const char *plugin_name)
+{
+       char err[256];
+       void *dl_handle;
+       struct plugin_list *plugin;
+
+       dlerror();
+       dl_handle = dlopen(plugin_name, RTLD_LAZY);
+       if(dl_handle == NULL) {
+               snprintf(err, sizeof(err), "plugin_init: error opening dynamic library: %s", dlerror());
+               set_error(IL_SYS, errno, err);
+               return -1;
+       }
+       dlerror();
+
+       plugin = malloc(sizeof(*plugin));
+       if(plugin == NULL) {
+               set_error(IL_NOMEM, ENOMEM, "plugin_init: error allocating plugin description");
+               return -1;
+       }
+
+       plugin->next = plugins;
+       DL_RESOLVESYM(plugin->plugin_def.plugin_init, dl_handle, "plugin_init", int(*)());
+       DL_RESOLVESYM(plugin->plugin_def.plugin_supports_scheme, dl_handle,  "plugin_supports_scheme", int(*)(const char *));
+       DL_RESOLVESYM(plugin->plugin_def.event_queue_connect, dl_handle, "event_queue_connect", int (*)(struct event_queue*));
+       DL_RESOLVESYM(plugin->plugin_def.event_queue_send, dl_handle, "event_queue_send", int (*)(struct event_queue *));
+       DL_RESOLVESYM(plugin->plugin_def.event_queue_close, dl_handle, "event_queue_close", int (*)(struct event_queue *));
+
+       return (*plugin->plugin_def.plugin_init)();
+}
+
+
+struct il_output_plugin *
+plugin_get(const char *scheme)
+{
+       struct plugin_list *outp;
+
+       for(outp = plugins; outp != NULL; outp = outp->next) {
+               if((outp->plugin_def.plugin_supports_scheme)(scheme)) {
+                       return &outp->plugin_def;
+               }
+       }
+
+       return NULL;
+}
index 2879e14..3fb6014 100644 (file)
@@ -123,6 +123,8 @@ queue_list_get(char *job_id_s)
   char *dest;
   struct queue_list *q;
   struct event_queue *eq;
+  struct il_output_plugin *outp;
+
 #if !defined(IL_NOTIFICATIONS)
   IL_EVENT_ID_T job_id;
 
@@ -136,8 +138,10 @@ queue_list_get(char *job_id_s)
 
   dest = jobid2dest(job_id);
   edg_wlc_JobIdFree(job_id);
+  outp = NULL;
 #else
   dest = job_id_s;
+  outp = plugin_get(dest);
 #endif
 
   if(dest == NULL) 
@@ -149,7 +153,7 @@ queue_list_get(char *job_id_s)
 #endif
     return(q->queue);
   } else {
-    eq = event_queue_create(dest);
+    eq = event_queue_create(dest, outp);
     if(eq)
       queue_list_add(&queues, dest, eq);
 #if !defined(IL_NOTIFICATIONS)
@@ -172,7 +176,7 @@ queue_list_init(char *ls)
 {
 #if !defined(IL_NOTIFICATIONS)
   /* create queue for log server */
-  log_queue = event_queue_create(ls);
+  log_queue = event_queue_create(ls, NULL);
   if(log_queue == NULL)
     return(-1);
 #endif
index ea596a8..72f2964 100644 (file)
@@ -54,8 +54,8 @@ queue_thread(void *q)
        }
   
        glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, 
-                        "  started new thread for delivery to %s:%d", 
-                        eq->dest_name, eq->dest_port);
+                        "  started new thread for delivery to %s",
+                        eq->dest);
 
        pthread_cleanup_push(queue_thread_cleanup, q); 
 
@@ -77,10 +77,10 @@ queue_thread(void *q)
                        if(lazy_close && close_timeout) {
                                ret = event_queue_wait(eq, close_timeout);
                                if(ret == 1) {/* timeout? */
-                                       event_queue_close(eq);
+                                       (*eq->event_queue_close)(eq);
                                        glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, 
-                                                        "  connection to %s:%d closed",
-                                                        eq->dest_name, eq->dest_port);
+                                                        "  connection to %s closed",
+                                                        eq->dest);
                                }
                                close_timeout = 0;
                        } else {
@@ -89,7 +89,7 @@ queue_thread(void *q)
                                        glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, 
                                                         "  thread idle for more than %d seconds, exiting", 
                                                         exit_timeout);
-                                       event_queue_close(eq);
+                                       (*eq->event_queue_close)(eq);
                                        event_queue_cond_unlock(eq);
                                        pthread_exit((void*)0);
                                }
@@ -118,10 +118,10 @@ queue_thread(void *q)
 
                        /* deliver pending events */
                        glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, 
-                                        "  attempting delivery to %s:%d", 
-                                        eq->dest_name, eq->dest_port);
+                                        "  attempting delivery to %s",
+                                        eq->dest);
                        /* connect to server */
-                       if((ret=event_queue_connect(eq)) == 0) {
+                       if((ret=(*eq->event_queue_connect)(eq)) == 0) {
                                /* not connected */
                                if(error_get_maj() != IL_OK)
                                        glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_WARN, 
@@ -129,17 +129,17 @@ queue_thread(void *q)
 #if defined(IL_NOTIFICATIONS)
                                glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO, 
                                                 "    could not connect to client %s, waiting for retry", 
-                                                eq->dest_name);
+                                                eq->dest);
 #else
                                glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_INFO, 
                                                 "    could not connect to bookkeeping server %s, waiting for retry", 
-                                                eq->dest_name);
+                                                eq->dest);
 #endif
                                retrycnt++;
                        } else {
                                retrycnt = 0;
                                /* connected, send events */
-                               switch(ret=event_queue_send(eq)) {
+                               switch(ret=(*eq->event_queue_send)(eq)) {
                                        
                                case 0:
                                        /* there was an error and we still have events to send */
@@ -155,7 +155,7 @@ queue_thread(void *q)
                                        /* hey, we are done for now */
                                        glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG, 
                                                         "  all events for %s sent", 
-                                                        eq->dest_name);
+                                                        eq->dest);
                                        break;
                                        
                                default:
@@ -172,10 +172,10 @@ queue_thread(void *q)
                                if((ret == 1) && lazy_close)
                                        close_timeout = default_close_timeout;
                                else {
-                                       event_queue_close(eq);
+                                       (*eq->event_queue_close)(eq);
                                        glite_common_log(LOG_CATEGORY_LB_IL, LOG_PRIORITY_DEBUG,
-                                                        "  connection to %s:%d closed",
-                                                        eq->dest_name, eq->dest_port);
+                                                        "  connection to %sclosed",
+                                                        eq->dest);
                                }
                        }
                } 
index 84ae833..4bcc587 100644 (file)
@@ -156,8 +156,15 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event)
        }
 
        /* FIXME: check for allocation error */
-       if(notif_event->notification.dest_host && 
+       if(notif_event->notification.dest_url &&
+               (strlen(notif_event->notification.dest_url) > 0)) {
+               /* destination URL */
+               msg->dest = strdup(notif_event->notification.dest_url);
+               msg->dest_name = NULL;
+               msg->dest_port = 0;
+       } else if(notif_event->notification.dest_host &&
           (strlen(notif_event->notification.dest_host) > 0)) {
+               /* destination host and port */
                msg->dest_name = strdup(notif_event->notification.dest_host);
                msg->dest_port = notif_event->notification.dest_port;
                asprintf(&msg->dest, "%s:%d", msg->dest_name, msg->dest_port);