From: Michal Voců Date: Tue, 2 Feb 2010 14:21:32 +0000 (+0000) Subject: more meat... X-Git-Tag: glite-jobid_R_1_0_1_1~24 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=b83f6ff666453ad765f6727afbf3843defeea496;p=jra1mw.git more meat... --- diff --git a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp index 2fb610f..4efedfb 100644 --- a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp +++ b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp @@ -8,16 +8,24 @@ */ #include "interlogd.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/context.h" +#include "glite/lbu/escape.h" +#include "glite/lb/jobstat.h" +#include #include #include #include #include #include +#include +#include +#include #include -class OutputPlugin { +class OutputPlugin : public cms::ExceptionListener { public: @@ -25,6 +33,11 @@ public: virtual void onException(const cms::CMSException &ex); + void connect(const std::string &topic); + void send(cms::Message &msg); + void close(); + void cleanup(); + public: cms::Session *session; @@ -38,6 +51,60 @@ public: }; +void +OutputPlugin::onException(const cms::CMSException &ex) +{ + this->cleanup(); +} + + +void +OutputPlugin::connect(const std::string &topic) +{ + if(this->session == NULL) { + this->session = self::connection->createSession(/* TODO: ackMode */); + this->destination = this->session->createTopic(topic); + this->producer = this->session->createProducer(this->destination); + } + self::connection->start(); + self::connection->setExceptionListener(this); +} + + +void +OutputPlugin::send(cms::Message &msg) +{ + +} + + +void +OutputPlugin::close() +{ + this->cleanup(); + self::connection->stop(); +} + + +void +OutputPlugin::cleanup() +{ + if(this->producer != NULL) { + delete this->producer; + this->producer = NULL; + } + if(this->destination != NULL) { + delete this->destination; + this->destination = NULL; + } + if(this->session != NULL) { + this->session->close(); + delete this->session; + this->session = NULL; + } +} + + extern "C" int event_queue_connect(struct event_queue *eq) @@ -53,31 +120,14 @@ event_queue_connect(struct event_queue *eq) } try { - output->session = OutputPlugin::connection->createSession(/* TODO: ackMode */); - output->destination = output->session->createTopic(topicName); - output->producer = output->session->createProducer(output->destination); - OutputPlugin::connection->start(); + output->connect(topicName); } catch(cms::CMSException &e) { - try { - if(output->producer != NULL) { - delete output->producer; - output->producer = NULL; - } - if(output->destination != NULL) { - delete output->destination; - output->destination = NULL; - } - if(output->session != NULL) { - output->session->close(); - delete output->session; - output->session = NULL; - } - } catch(cms::CMSException &e) { - return -1; - } + output->cleanup(); + eq->timeout = TIMEOUT; + return 0; } - - return 0; + eq->first_event_sent = 0; + return 1; } @@ -86,45 +136,125 @@ int event_queue_send(struct event_queue *eq) { OutputPlugin *output = (OutputPlugin*)eq->plugin_data; + edg_wll_Context context; + edg_wll_Event *notif_event; + edg_wll_JobStat *state_out; + int ret; assert(output != NULL); + edg_wll_InitContext(&context); + while(!event_queue_empty(eq)) { struct server_msg *msg; + char *s; + int i; std::string val; - if(event_queue_get(eq, &msg) < 0) - return(-1); + if(event_queue_get(eq, &msg) < 0) { + goto err; + } try { + ret=edg_wll_ParseNotifEvent(context, event->data, ¬if_event); + if(ret) { + set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event"); + goto err; + } + jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat); + if (jobstat_char == NULL) { + set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status"); + goto err; + } + if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) { + set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status"); + goto err; + } + + cms::TextMessage *cms_msg = output->session->createTextMessage(); + /* ownerDn */ + val.assign(state_out->owner); cms_msg->setStringProperty('ownerDn', val); + /* voname */ + s = edg_wll_JDLField(state_out,"VirtualOrganisation"); + val.assign(s); + free(s); cms_msg->setStringProperty('voname', val); + /* bkHost */ + glite_jobid_getServerParts(state_out->jobId, &s, &i); + val.assign(s); + free(s); cms_msg->setStringProperty('bkHost', val); + /* networkServer */ + /* TODO: XXX cut out hostname */ + val.assign(state_out->network_server); cms_msg->setStringProperty('networkHost', val); + cms_msg->setStringProperty('lastUpdateTime', val); + /* stateName */ + s = edg_wll_StatToString(state_out->state); + val.assign(s); + free(s); cms_msg->setStringProperty('stateName', val); + cms_msg->setStringProperty('stateStartTime', val); + /* condorId */ + val.assign(state_out->condorId); cms_msg->setStringProperty('condorId', val); + /* destSite */ + val.assign(state_out->destination); cms_msg->setStringProperty('destSite', val); - cms_msg->setStringProperty('exitCode', val); - cms_msg->setStringProperty('doneCode', val); + /* exitCode */ + cms_msg->setIntProperty('exitCode', state_out->exitCode); + /* doneCode */ + cms_msg->setIntProperty('doneCode', state_out->doneCode); + /* statusReason */ + val.assign(state_out->reason); cms_msg->setStringProperty('statusReason', val); - output->producer->send(cms_msg); - // output->session->commit(); - delete cms_msg; + + edg_wll_FreeEvent(notif_event); + free(notif_event); + edg_wll_FreeStatus(state_out); + free(state_out); + free(jobstat_char); + } catch(cms::CMSException &e) { + goto err; + } + + try { + output->send(cms_msg); + delete cms_msg; if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) { /* failure committing message, this is bad */ - return(-1); + goto err; } event_queue_remove(eq); eq->first_event_sent = 1; } catch(cms::CMSException &e) { - + delete cms_msg; + output->cleanup(); + eq->timeout = TIMEOUT; + edg_wll_FreeContext(context); + return 0; } - } + edg_wll_FreeContext(context); return 1; + +err: + if(notif_event) { + edg_wll_FreeEvent(notif_event); + free(notif_event); + } + if(jobstat_char) { + free(jobstat_char); + } + if(state_out) { + edg_wll_FreeStatus(state_out); + free(state_out); + } + return -1; } @@ -137,16 +267,11 @@ event_queue_close(struct event_queue *eq) assert(output != NULL); try { - if(output->session != NULL) { - output->session->close(); - delete output->session; - output->session = NULL; - } - OutputPlugin::connection->stop(); + output->close(); } catch(cms::CMSException &e) { return -1; } - + eq->first_event_sent = 0; return 0; } @@ -160,7 +285,8 @@ plugin_init() try { activemq::library::ActiveMQCPP::initializeLibrary(); - OutputPlugin::connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); + OutputPlugin::connectionFactory = + cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); OutputPlugin::connection = output->connectionFactory->createConnection(); } catch (cms::CMSException &e) { try {