almost complete, compilation fixes
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 8 Feb 2010 12:24:49 +0000 (12:24 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 8 Feb 2010 12:24:49 +0000 (12:24 +0000)
org.glite.lb.logger/src/activemq_cpp_plugin.cpp

index 4efedfb..5d887c3 100644 (file)
@@ -12,6 +12,7 @@
 #include "glite/lb/context.h"
 #include "glite/lbu/escape.h"
 #include "glite/lb/jobstat.h"
+#include "glite/lb/xml_parse.h"
 
 #include <activemq/library/ActiveMQCPP.h>
 #include <cms/ConnectionFactory.h>
 
 #include <string>
 
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+
+
 class OutputPlugin : public cms::ExceptionListener {
 
 public:
@@ -34,7 +40,7 @@ public:
        virtual void onException(const cms::CMSException &ex);
 
        void connect(const std::string &topic);
-       void send(cms::Message &msg);
+       void send(cms::Message *msg);
        void close();
        void cleanup();
 
@@ -44,10 +50,10 @@ public:
        cms::Topic   *destination;
        cms::MessageProducer *producer;
 
-       static cms::Connection *connection = NULL;
-       static cms::ConnectionFactory *connectionFactory = NULL;
+       static cms::Connection *connection;
+       static cms::ConnectionFactory *connectionFactory;
 
-       static const char *SCHEME = "x-msg:";
+       static const char *SCHEME;
 };
 
 
@@ -62,19 +68,21 @@ void
 OutputPlugin::connect(const std::string &topic)
 {
        if(this->session == NULL) {
-               this->session = self::connection->createSession(/* TODO: ackMode */);
+               this->session = connection->createSession(/* TODO: ackMode */);
                this->destination = this->session->createTopic(topic);
                this->producer = this->session->createProducer(this->destination);
        }
-       self::connection->start();
-       self::connection->setExceptionListener(this);
+       connection->start();
+       connection->setExceptionListener(this);
 }
 
 
 void
-OutputPlugin::send(cms::Message &msg)
+OutputPlugin::send(cms::Message *msg)
 {
-
+       if(this->producer != NULL) {
+               this->producer->send(this->destination, msg);
+       }
 }
 
 
@@ -82,7 +90,7 @@ void
 OutputPlugin::close()
 {
        this->cleanup();
-       self::connection->stop();
+       connection->stop();
 }
 
 
@@ -105,6 +113,16 @@ OutputPlugin::cleanup()
 }
 
 
+static
+void timeval2str(struct timeval *t, char **str) {
+        struct tm       *tm;
+
+        tm = gmtime(&t->tv_sec);
+        asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
+                tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+}
+
+
 extern "C"
 int
 event_queue_connect(struct event_queue *eq)
@@ -139,6 +157,8 @@ event_queue_send(struct event_queue *eq)
        edg_wll_Context context;
        edg_wll_Event *notif_event;
     edg_wll_JobStat *state_out;
+    il_octet_string_t event;
+    char *jobstat_char;
        int ret;
 
        assert(output != NULL);
@@ -147,8 +167,9 @@ event_queue_send(struct event_queue *eq)
 
        while(!event_queue_empty(eq)) {
            struct server_msg *msg;
+           cms::TextMessage *cms_msg;
            char *s;
-           int i;
+           unsigned int i;
            std::string val;
 
            if(event_queue_get(eq, &msg) < 0) {
@@ -156,7 +177,11 @@ event_queue_send(struct event_queue *eq)
            }
 
            try {
-               ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event);
+               if(decode_il_msg(&event, msg->msg) < 0) {
+                       set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
+                       goto err;
+               }
+               ret=edg_wll_ParseNotifEvent(context, event.data, &notif_event);
                if(ret) {
                        set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
                        goto err;
@@ -172,47 +197,52 @@ event_queue_send(struct event_queue *eq)
                }
 
 
-               cms::TextMessage *cms_msg = output->session->createTextMessage();
+               cms_msg = output->session->createTextMessage();
                /* ownerDn */
                val.assign(state_out->owner);
-               cms_msg->setStringProperty('ownerDn', val);
+               cms_msg->setStringProperty("ownerDn", val);
                /* voname */
                s = edg_wll_JDLField(state_out,"VirtualOrganisation");
                val.assign(s);
                free(s);
-               cms_msg->setStringProperty('voname', val);
+               cms_msg->setStringProperty("voname", val);
                /* bkHost */
                glite_jobid_getServerParts(state_out->jobId, &s, &i);
                val.assign(s);
                free(s);
-               cms_msg->setStringProperty('bkHost', val);
+               cms_msg->setStringProperty("bkHost", val);
                /* networkServer */
                /* TODO: XXX cut out hostname */
                val.assign(state_out->network_server);
-               cms_msg->setStringProperty('networkHost', val);
-
-               cms_msg->setStringProperty('lastUpdateTime', val);
+               cms_msg->setStringProperty("networkHost", val);
+               timeval2str(&state_out->lastUpdateTime, &s);
+               val.assign(s);
+               if(s) free(s);
+               cms_msg->setStringProperty("lastUpdateTime", val);
                /* stateName */
                s = edg_wll_StatToString(state_out->state);
                val.assign(s);
-               free(s);
-               cms_msg->setStringProperty('stateName', val);
-
-               cms_msg->setStringProperty('stateStartTime', val);
+               if(s) free(s);
+               cms_msg->setStringProperty("stateName", val);
+               timeval2str(&state_out->stateEnterTime, &s);
+               val.assign(s);
+               if(s) free(s);
+               cms_msg->setStringProperty("stateStartTime", val);
                /* condorId */
                val.assign(state_out->condorId);
-               cms_msg->setStringProperty('condorId', val);
+               cms_msg->setStringProperty("condorId", val);
                /* destSite */
                val.assign(state_out->destination);
-               cms_msg->setStringProperty('destSite', val);
+               cms_msg->setStringProperty("destSite", val);
                /* exitCode */
-               cms_msg->setIntProperty('exitCode', state_out->exitCode);
+               cms_msg->setIntProperty("exitCode", state_out->exit_code);
                /* doneCode */
-               cms_msg->setIntProperty('doneCode', state_out->doneCode);
+               cms_msg->setIntProperty("doneCode", state_out->done_code);
                /* statusReason */
                val.assign(state_out->reason);
-               cms_msg->setStringProperty('statusReason', val);
+               cms_msg->setStringProperty("statusReason", val);
 
+               free(event.data);
                edg_wll_FreeEvent(notif_event);
                free(notif_event);
                edg_wll_FreeStatus(state_out);
@@ -243,6 +273,9 @@ event_queue_send(struct event_queue *eq)
        return 1;
 
 err:
+       if(event.data) {
+               free(event.data);
+       }
        if(notif_event) {
        edg_wll_FreeEvent(notif_event);
        free(notif_event);
@@ -278,7 +311,7 @@ event_queue_close(struct event_queue *eq)
 
 extern "C"
 int
-plugin_init()
+plugin_init(const char *s)
 {
        std::string brokerURI;
 
@@ -287,7 +320,7 @@ plugin_init()
 
                OutputPlugin::connectionFactory =
                                cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
-               OutputPlugin::connection = output->connectionFactory->createConnection();
+               OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection();
        } catch (cms::CMSException &e) {
                try {
                        if(OutputPlugin::connection != NULL) {
@@ -314,3 +347,8 @@ plugin_supports_scheme(const char *scheme)
 {
        return strncmp(scheme, OutputPlugin::SCHEME, strlen(OutputPlugin::SCHEME)) == 0;
 }
+
+
+cms::Connection *OutputPlugin::connection = NULL;
+cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL;
+const char *OutputPlugin::SCHEME = "x-msg://";