From: Michal Voců Date: Mon, 8 Feb 2010 12:24:49 +0000 (+0000) Subject: almost complete, compilation fixes X-Git-Tag: glite-jobid_R_1_0_1_1~21 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=db1813ba2b9d1bf5066b174d40b5a39b7593e4dc;p=jra1mw.git almost complete, compilation fixes --- diff --git a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp index 4efedfb..5d887c3 100644 --- a/org.glite.lb.logger/src/activemq_cpp_plugin.cpp +++ b/org.glite.lb.logger/src/activemq_cpp_plugin.cpp @@ -12,6 +12,7 @@ #include "glite/lb/context.h" #include "glite/lbu/escape.h" #include "glite/lb/jobstat.h" +#include "glite/lb/xml_parse.h" #include #include @@ -25,6 +26,11 @@ #include +#include +#include +#include + + class OutputPlugin : public cms::ExceptionListener { public: @@ -34,7 +40,7 @@ public: virtual void onException(const cms::CMSException &ex); void connect(const std::string &topic); - void send(cms::Message &msg); + void send(cms::Message *msg); void close(); void cleanup(); @@ -44,10 +50,10 @@ public: cms::Topic *destination; cms::MessageProducer *producer; - static cms::Connection *connection = NULL; - static cms::ConnectionFactory *connectionFactory = NULL; + static cms::Connection *connection; + static cms::ConnectionFactory *connectionFactory; - static const char *SCHEME = "x-msg:"; + static const char *SCHEME; }; @@ -62,19 +68,21 @@ void OutputPlugin::connect(const std::string &topic) { if(this->session == NULL) { - this->session = self::connection->createSession(/* TODO: ackMode */); + this->session = connection->createSession(/* TODO: ackMode */); this->destination = this->session->createTopic(topic); this->producer = this->session->createProducer(this->destination); } - self::connection->start(); - self::connection->setExceptionListener(this); + connection->start(); + connection->setExceptionListener(this); } void -OutputPlugin::send(cms::Message &msg) +OutputPlugin::send(cms::Message *msg) { - + if(this->producer != NULL) { + this->producer->send(this->destination, msg); + } } @@ -82,7 +90,7 @@ void OutputPlugin::close() { this->cleanup(); - self::connection->stop(); + connection->stop(); } @@ -105,6 +113,16 @@ OutputPlugin::cleanup() } +static +void timeval2str(struct timeval *t, char **str) { + struct tm *tm; + + tm = gmtime(&t->tv_sec); + asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1, + tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); +} + + extern "C" int event_queue_connect(struct event_queue *eq) @@ -139,6 +157,8 @@ event_queue_send(struct event_queue *eq) edg_wll_Context context; edg_wll_Event *notif_event; edg_wll_JobStat *state_out; + il_octet_string_t event; + char *jobstat_char; int ret; assert(output != NULL); @@ -147,8 +167,9 @@ event_queue_send(struct event_queue *eq) while(!event_queue_empty(eq)) { struct server_msg *msg; + cms::TextMessage *cms_msg; char *s; - int i; + unsigned int i; std::string val; if(event_queue_get(eq, &msg) < 0) { @@ -156,7 +177,11 @@ event_queue_send(struct event_queue *eq) } try { - ret=edg_wll_ParseNotifEvent(context, event->data, ¬if_event); + if(decode_il_msg(&event, msg->msg) < 0) { + set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data"); + goto err; + } + ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event); if(ret) { set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event"); goto err; @@ -172,47 +197,52 @@ event_queue_send(struct event_queue *eq) } - cms::TextMessage *cms_msg = output->session->createTextMessage(); + cms_msg = output->session->createTextMessage(); /* ownerDn */ val.assign(state_out->owner); - cms_msg->setStringProperty('ownerDn', val); + cms_msg->setStringProperty("ownerDn", val); /* voname */ s = edg_wll_JDLField(state_out,"VirtualOrganisation"); val.assign(s); free(s); - cms_msg->setStringProperty('voname', val); + cms_msg->setStringProperty("voname", val); /* bkHost */ glite_jobid_getServerParts(state_out->jobId, &s, &i); val.assign(s); free(s); - cms_msg->setStringProperty('bkHost', val); + cms_msg->setStringProperty("bkHost", val); /* networkServer */ /* TODO: XXX cut out hostname */ val.assign(state_out->network_server); - cms_msg->setStringProperty('networkHost', val); - - cms_msg->setStringProperty('lastUpdateTime', val); + cms_msg->setStringProperty("networkHost", val); + timeval2str(&state_out->lastUpdateTime, &s); + val.assign(s); + if(s) free(s); + cms_msg->setStringProperty("lastUpdateTime", val); /* stateName */ s = edg_wll_StatToString(state_out->state); val.assign(s); - free(s); - cms_msg->setStringProperty('stateName', val); - - cms_msg->setStringProperty('stateStartTime', val); + if(s) free(s); + cms_msg->setStringProperty("stateName", val); + timeval2str(&state_out->stateEnterTime, &s); + val.assign(s); + if(s) free(s); + cms_msg->setStringProperty("stateStartTime", val); /* condorId */ val.assign(state_out->condorId); - cms_msg->setStringProperty('condorId', val); + cms_msg->setStringProperty("condorId", val); /* destSite */ val.assign(state_out->destination); - cms_msg->setStringProperty('destSite', val); + cms_msg->setStringProperty("destSite", val); /* exitCode */ - cms_msg->setIntProperty('exitCode', state_out->exitCode); + cms_msg->setIntProperty("exitCode", state_out->exit_code); /* doneCode */ - cms_msg->setIntProperty('doneCode', state_out->doneCode); + cms_msg->setIntProperty("doneCode", state_out->done_code); /* statusReason */ val.assign(state_out->reason); - cms_msg->setStringProperty('statusReason', val); + cms_msg->setStringProperty("statusReason", val); + free(event.data); edg_wll_FreeEvent(notif_event); free(notif_event); edg_wll_FreeStatus(state_out); @@ -243,6 +273,9 @@ event_queue_send(struct event_queue *eq) return 1; err: + if(event.data) { + free(event.data); + } if(notif_event) { edg_wll_FreeEvent(notif_event); free(notif_event); @@ -278,7 +311,7 @@ event_queue_close(struct event_queue *eq) extern "C" int -plugin_init() +plugin_init(const char *s) { std::string brokerURI; @@ -287,7 +320,7 @@ plugin_init() OutputPlugin::connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); - OutputPlugin::connection = output->connectionFactory->createConnection(); + OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection(); } catch (cms::CMSException &e) { try { if(OutputPlugin::connection != NULL) { @@ -314,3 +347,8 @@ plugin_supports_scheme(const char *scheme) { return strncmp(scheme, OutputPlugin::SCHEME, strlen(OutputPlugin::SCHEME)) == 0; } + + +cms::Connection *OutputPlugin::connection = NULL; +cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL; +const char *OutputPlugin::SCHEME = "x-msg://";