more meat to the skeleton...
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 1 Feb 2010 17:34:47 +0000 (17:34 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 1 Feb 2010 17:34:47 +0000 (17:34 +0000)
org.glite.lb.logger/src/activemq_cpp_plugin.cpp

index e6ca060..2fb610f 100644 (file)
 #include "interlogd.h"
 
 #include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/Topic.h>
+#include <cms/MessageProducer.h>
+
 #include <string>
 
 class OutputPlugin {
 
 public:
 
-       OutputPlugin() : connection(NULL) {};
+       OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {};
 
        virtual void onException(const cms::CMSException &ex);
 
@@ -26,9 +31,10 @@ public:
        cms::Topic   *destination;
        cms::MessageProducer *producer;
 
-       static cms::Connection *connection;
+       static cms::Connection *connection = NULL;
        static cms::ConnectionFactory *connectionFactory = NULL;
 
+       static const char *SCHEME = "x-msg:";
 };
 
 
@@ -46,10 +52,30 @@ event_queue_connect(struct event_queue *eq)
                output = (OutputPlugin*)eq->plugin_data;
        }
 
-       output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */);
-       output->destination = output->session->createTopic(topicName);
-       output->producer = output->session->createProducer(output->destination);
-       OutputPlugin::connection->start();
+       try {
+               output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */);
+               output->destination = output->session->createTopic(topicName);
+               output->producer = output->session->createProducer(output->destination);
+               OutputPlugin::connection->start();
+       } catch(cms::CMSException &e) {
+               try {
+                       if(output->producer != NULL) {
+                               delete output->producer;
+                               output->producer = NULL;
+                       }
+                       if(output->destination != NULL) {
+                               delete output->destination;
+                               output->destination = NULL;
+                       }
+                       if(output->session != NULL) {
+                               output->session->close();
+                               delete output->session;
+                               output->session = NULL;
+                       }
+               } catch(cms::CMSException &e) {
+                       return -1;
+               }
+       }
 
        return 0;
 }
@@ -59,7 +85,46 @@ extern "C"
 int
 event_queue_send(struct event_queue *eq)
 {
-       return 0;
+       OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
+
+       assert(output != NULL);
+
+       while(!event_queue_empty(eq)) {
+           struct server_msg *msg;
+           std::string val;
+
+           if(event_queue_get(eq, &msg) < 0)
+             return(-1);
+
+           try {
+               cms::TextMessage *cms_msg = output->session->createTextMessage();
+               cms_msg->setStringProperty('ownerDn', val);
+               cms_msg->setStringProperty('voname', val);
+               cms_msg->setStringProperty('bkHost', val);
+               cms_msg->setStringProperty('networkHost', val);
+               cms_msg->setStringProperty('lastUpdateTime', val);
+               cms_msg->setStringProperty('stateName', val);
+               cms_msg->setStringProperty('stateStartTime', val);
+               cms_msg->setStringProperty('condorId', val);
+               cms_msg->setStringProperty('destSite', val);
+               cms_msg->setStringProperty('exitCode', val);
+               cms_msg->setStringProperty('doneCode', val);
+               cms_msg->setStringProperty('statusReason', val);
+               output->producer->send(cms_msg);
+               // output->session->commit();
+               delete cms_msg;
+               if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+                       /* failure committing message, this is bad */
+                       return(-1);
+               }
+               event_queue_remove(eq);
+               eq->first_event_sent = 1;
+           } catch(cms::CMSException &e) {
+
+           }
+
+       }
+       return 1;
 }
 
 
@@ -71,7 +136,16 @@ event_queue_close(struct event_queue *eq)
 
        assert(output != NULL);
 
-       OutputPlugin::connection->stop();
+       try {
+               if(output->session != NULL) {
+                       output->session->close();
+                       delete output->session;
+                       output->session = NULL;
+               }
+               OutputPlugin::connection->stop();
+       } catch(cms::CMSException &e) {
+               return -1;
+       }
 
        return 0;
 }
@@ -88,7 +162,19 @@ plugin_init()
 
                OutputPlugin::connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
                OutputPlugin::connection = output->connectionFactory->createConnection();
-       } catch (cms::CMSException e) {
+       } catch (cms::CMSException &e) {
+               try {
+                       if(OutputPlugin::connection != NULL) {
+                               delete OutputPlugin::connection;
+                               OutputPlugin::connection = NULL;
+                       }
+                       if(OutputPlugin::connectionFactory != NULL) {
+                               delete OutputPlugin::connectionFactory;
+                               OutputPlugin::connectionFactory = NULL;
+                       }
+               } catch(cms::CMSException &e) {
+
+               }
                return -1;
        }
 
@@ -100,5 +186,5 @@ extern "C"
 int
 plugin_supports_scheme(const char *scheme)
 {
-
+       return strncmp(scheme, OutputPlugin::SCHEME, strlen(OutputPlugin::SCHEME)) == 0;
 }