more meat...
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 2 Feb 2010 14:21:32 +0000 (14:21 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 2 Feb 2010 14:21:32 +0000 (14:21 +0000)
org.glite.lb.logger/src/activemq_cpp_plugin.cpp

index 2fb610f..4efedfb 100644 (file)
@@ -8,16 +8,24 @@
  */
 
 #include "interlogd.h"
+#include "glite/lb/events_parse.h"
+#include "glite/lb/context.h"
+#include "glite/lbu/escape.h"
+#include "glite/lb/jobstat.h"
 
+#include <activemq/library/ActiveMQCPP.h>
 #include <cms/ConnectionFactory.h>
 #include <cms/Connection.h>
 #include <cms/Session.h>
 #include <cms/Topic.h>
 #include <cms/MessageProducer.h>
+#include <cms/ExceptionListener.h>
+#include <cms/TextMessage.h>
+#include <cms/Message.h>
 
 #include <string>
 
-class OutputPlugin {
+class OutputPlugin : public cms::ExceptionListener {
 
 public:
 
@@ -25,6 +33,11 @@ public:
 
        virtual void onException(const cms::CMSException &ex);
 
+       void connect(const std::string &topic);
+       void send(cms::Message &msg);
+       void close();
+       void cleanup();
+
 public:
 
        cms::Session *session;
@@ -38,6 +51,60 @@ public:
 };
 
 
+void
+OutputPlugin::onException(const cms::CMSException &ex)
+{
+       this->cleanup();
+}
+
+
+void
+OutputPlugin::connect(const std::string &topic)
+{
+       if(this->session == NULL) {
+               this->session = self::connection->createSession(/* TODO: ackMode */);
+               this->destination = this->session->createTopic(topic);
+               this->producer = this->session->createProducer(this->destination);
+       }
+       self::connection->start();
+       self::connection->setExceptionListener(this);
+}
+
+
+void
+OutputPlugin::send(cms::Message &msg)
+{
+
+}
+
+
+void
+OutputPlugin::close()
+{
+       this->cleanup();
+       self::connection->stop();
+}
+
+
+void
+OutputPlugin::cleanup()
+{
+       if(this->producer != NULL) {
+               delete this->producer;
+               this->producer = NULL;
+       }
+       if(this->destination != NULL) {
+               delete this->destination;
+               this->destination = NULL;
+       }
+       if(this->session != NULL) {
+               this->session->close();
+               delete this->session;
+               this->session = NULL;
+       }
+}
+
+
 extern "C"
 int
 event_queue_connect(struct event_queue *eq)
@@ -53,31 +120,14 @@ event_queue_connect(struct event_queue *eq)
        }
 
        try {
-               output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */);
-               output->destination = output->session->createTopic(topicName);
-               output->producer = output->session->createProducer(output->destination);
-               OutputPlugin::connection->start();
+               output->connect(topicName);
        } catch(cms::CMSException &e) {
-               try {
-                       if(output->producer != NULL) {
-                               delete output->producer;
-                               output->producer = NULL;
-                       }
-                       if(output->destination != NULL) {
-                               delete output->destination;
-                               output->destination = NULL;
-                       }
-                       if(output->session != NULL) {
-                               output->session->close();
-                               delete output->session;
-                               output->session = NULL;
-                       }
-               } catch(cms::CMSException &e) {
-                       return -1;
-               }
+                       output->cleanup();
+                       eq->timeout = TIMEOUT;
+                       return 0;
        }
-
-       return 0;
+       eq->first_event_sent = 0;
+       return 1;
 }
 
 
@@ -86,45 +136,125 @@ int
 event_queue_send(struct event_queue *eq)
 {
        OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
+       edg_wll_Context context;
+       edg_wll_Event *notif_event;
+    edg_wll_JobStat *state_out;
+       int ret;
 
        assert(output != NULL);
 
+       edg_wll_InitContext(&context);
+
        while(!event_queue_empty(eq)) {
            struct server_msg *msg;
+           char *s;
+           int i;
            std::string val;
 
-           if(event_queue_get(eq, &msg) < 0)
-             return(-1);
+           if(event_queue_get(eq, &msg) < 0) {
+               goto err;
+           }
 
            try {
+               ret=edg_wll_ParseNotifEvent(context, event->data, &notif_event);
+               if(ret) {
+                       set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
+                       goto err;
+               }
+               jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
+               if (jobstat_char == NULL) {
+                   set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
+                   goto err;
+               }
+               if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
+                       set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
+                       goto err;
+               }
+
+
                cms::TextMessage *cms_msg = output->session->createTextMessage();
+               /* ownerDn */
+               val.assign(state_out->owner);
                cms_msg->setStringProperty('ownerDn', val);
+               /* voname */
+               s = edg_wll_JDLField(state_out,"VirtualOrganisation");
+               val.assign(s);
+               free(s);
                cms_msg->setStringProperty('voname', val);
+               /* bkHost */
+               glite_jobid_getServerParts(state_out->jobId, &s, &i);
+               val.assign(s);
+               free(s);
                cms_msg->setStringProperty('bkHost', val);
+               /* networkServer */
+               /* TODO: XXX cut out hostname */
+               val.assign(state_out->network_server);
                cms_msg->setStringProperty('networkHost', val);
+
                cms_msg->setStringProperty('lastUpdateTime', val);
+               /* stateName */
+               s = edg_wll_StatToString(state_out->state);
+               val.assign(s);
+               free(s);
                cms_msg->setStringProperty('stateName', val);
+
                cms_msg->setStringProperty('stateStartTime', val);
+               /* condorId */
+               val.assign(state_out->condorId);
                cms_msg->setStringProperty('condorId', val);
+               /* destSite */
+               val.assign(state_out->destination);
                cms_msg->setStringProperty('destSite', val);
-               cms_msg->setStringProperty('exitCode', val);
-               cms_msg->setStringProperty('doneCode', val);
+               /* exitCode */
+               cms_msg->setIntProperty('exitCode', state_out->exitCode);
+               /* doneCode */
+               cms_msg->setIntProperty('doneCode', state_out->doneCode);
+               /* statusReason */
+               val.assign(state_out->reason);
                cms_msg->setStringProperty('statusReason', val);
-               output->producer->send(cms_msg);
-               // output->session->commit();
-               delete cms_msg;
+
+               edg_wll_FreeEvent(notif_event);
+               free(notif_event);
+               edg_wll_FreeStatus(state_out);
+               free(state_out);
+               free(jobstat_char);
+           } catch(cms::CMSException &e) {
+               goto err;
+           }
+
+           try {
+               output->send(cms_msg);
+                   delete cms_msg;
                if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
                        /* failure committing message, this is bad */
-                       return(-1);
+                       goto err;
                }
                event_queue_remove(eq);
                eq->first_event_sent = 1;
            } catch(cms::CMSException &e) {
-
+                   delete cms_msg;
+               output->cleanup();
+               eq->timeout = TIMEOUT;
+               edg_wll_FreeContext(context);
+               return 0;
            }
-
        }
+       edg_wll_FreeContext(context);
        return 1;
+
+err:
+       if(notif_event) {
+       edg_wll_FreeEvent(notif_event);
+       free(notif_event);
+       }
+       if(jobstat_char) {
+               free(jobstat_char);
+       }
+       if(state_out) {
+               edg_wll_FreeStatus(state_out);
+               free(state_out);
+       }
+       return -1;
 }
 
 
@@ -137,16 +267,11 @@ event_queue_close(struct event_queue *eq)
        assert(output != NULL);
 
        try {
-               if(output->session != NULL) {
-                       output->session->close();
-                       delete output->session;
-                       output->session = NULL;
-               }
-               OutputPlugin::connection->stop();
+               output->close();
        } catch(cms::CMSException &e) {
                return -1;
        }
-
+       eq->first_event_sent = 0;
        return 0;
 }
 
@@ -160,7 +285,8 @@ plugin_init()
        try {
                activemq::library::ActiveMQCPP::initializeLibrary();
 
-               OutputPlugin::connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
+               OutputPlugin::connectionFactory =
+                               cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
                OutputPlugin::connection = output->connectionFactory->createConnection();
        } catch (cms::CMSException &e) {
                try {