From a6bd0c68975bd6299af064461ed4b45c69cfef28 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Mon, 1 Feb 2010 17:34:47 +0000 Subject: [PATCH] more meat to the skeleton... --- org.glite.lb.logger/src/activemq_cpp_plugin.cpp | 106 +++++++++++++++++++++--- 1 file changed, 96 insertions(+), 10 deletions(-) diff --git a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp index e6ca060..2fb610f 100644 --- a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp +++ b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp @@ -10,13 +10,18 @@ #include "interlogd.h" #include +#include +#include +#include +#include + #include class OutputPlugin { public: - OutputPlugin() : connection(NULL) {}; + OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {}; virtual void onException(const cms::CMSException &ex); @@ -26,9 +31,10 @@ public: cms::Topic *destination; cms::MessageProducer *producer; - static cms::Connection *connection; + static cms::Connection *connection = NULL; static cms::ConnectionFactory *connectionFactory = NULL; + static const char *SCHEME = "x-msg:"; }; @@ -46,10 +52,30 @@ event_queue_connect(struct event_queue *eq) output = (OutputPlugin*)eq->plugin_data; } - output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */); - output->destination = output->session->createTopic(topicName); - output->producer = output->session->createProducer(output->destination); - OutputPlugin::connection->start(); + try { + output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */); + output->destination = output->session->createTopic(topicName); + output->producer = output->session->createProducer(output->destination); + OutputPlugin::connection->start(); + } catch(cms::CMSException &e) { + try { + if(output->producer != NULL) { + delete output->producer; + output->producer = NULL; + } + if(output->destination != NULL) { + delete output->destination; + output->destination = NULL; + } + if(output->session != NULL) { + output->session->close(); + delete output->session; + output->session = NULL; + } + } catch(cms::CMSException &e) { + return -1; + } + } return 0; } @@ -59,7 +85,46 @@ extern "C" int event_queue_send(struct event_queue *eq) { - return 0; + OutputPlugin *output = (OutputPlugin*)eq->plugin_data; + + assert(output != NULL); + + while(!event_queue_empty(eq)) { + struct server_msg *msg; + std::string val; + + if(event_queue_get(eq, &msg) < 0) + return(-1); + + try { + cms::TextMessage *cms_msg = output->session->createTextMessage(); + cms_msg->setStringProperty('ownerDn', val); + cms_msg->setStringProperty('voname', val); + cms_msg->setStringProperty('bkHost', val); + cms_msg->setStringProperty('networkHost', val); + cms_msg->setStringProperty('lastUpdateTime', val); + cms_msg->setStringProperty('stateName', val); + cms_msg->setStringProperty('stateStartTime', val); + cms_msg->setStringProperty('condorId', val); + cms_msg->setStringProperty('destSite', val); + cms_msg->setStringProperty('exitCode', val); + cms_msg->setStringProperty('doneCode', val); + cms_msg->setStringProperty('statusReason', val); + output->producer->send(cms_msg); + // output->session->commit(); + delete cms_msg; + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) { + /* failure committing message, this is bad */ + return(-1); + } + event_queue_remove(eq); + eq->first_event_sent = 1; + } catch(cms::CMSException &e) { + + } + + } + return 1; } @@ -71,7 +136,16 @@ event_queue_close(struct event_queue *eq) assert(output != NULL); - OutputPlugin::connection->stop(); + try { + if(output->session != NULL) { + output->session->close(); + delete output->session; + output->session = NULL; + } + OutputPlugin::connection->stop(); + } catch(cms::CMSException &e) { + return -1; + } return 0; } @@ -88,7 +162,19 @@ plugin_init() OutputPlugin::connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); OutputPlugin::connection = output->connectionFactory->createConnection(); - } catch (cms::CMSException e) { + } catch (cms::CMSException &e) { + try { + if(OutputPlugin::connection != NULL) { + delete OutputPlugin::connection; + OutputPlugin::connection = NULL; + } + if(OutputPlugin::connectionFactory != NULL) { + delete OutputPlugin::connectionFactory; + OutputPlugin::connectionFactory = NULL; + } + } catch(cms::CMSException &e) { + + } return -1; } @@ -100,5 +186,5 @@ extern "C" int plugin_supports_scheme(const char *scheme) { - + return strncmp(scheme, OutputPlugin::SCHEME, strlen(OutputPlugin::SCHEME)) == 0; } -- 1.8.2.3