From 5a2693ca602ea746bfc69a32128a29183144a850 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Fri, 25 Jun 2010 12:37:48 +0000 Subject: [PATCH] reworked initialization and error reporting --- .../src/activemq_cpp_plugin.cpp | 379 ++++++++++++--------- 1 file changed, 222 insertions(+), 157 deletions(-) diff --git a/org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp b/org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp index 8559962..71885c5 100644 --- a/org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp +++ b/org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp @@ -48,57 +48,135 @@ limitations under the License. #include #include -class OutputPlugin : public cms::ExceptionListener { +class OutputPlugin { public: OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {}; - virtual void onException(const cms::CMSException &ex); - void connect(const std::string &topic); - void send(cms::Message *msg); - void close(); - void cleanup(); + cms::Message *createMessage(edg_wll_JobStat *state_out); + void connect(const std::string &topic); + void send(cms::Message *msg); + void close(); + void cleanup(); -public: + static void initialize(std::string brokerURI); + + static const char *SCHEME; + +private: + + static cms::Connection *getConnection(); + static void releaseConnection(); cms::Session *session; cms::Topic *destination; cms::MessageProducer *producer; + cms::Connection *current_connection; static cms::Connection *connection; static cms::ConnectionFactory *connectionFactory; - - static const char *SCHEME; + static pthread_rwlock_t connection_lock; }; void -OutputPlugin::onException(const cms::CMSException &ex) +OutputPlugin::connect(const std::string &topic) { - this->cleanup(); + try { + current_connection = getConnection(); + if(session == NULL) { + session = current_connection->createSession(/* TODO: ackMode */); + destination = session->createTopic(topic); + producer = session->createProducer(destination); + } + current_connection->start(); + releaseConnection(); + } catch (cms::CMSException &e) { + releaseConnection(); + cleanup(); + throw e; + } } -void -OutputPlugin::connect(const std::string &topic) +static +void timeval2str(struct timeval *t, char **str) { + struct tm *tm; + + tm = gmtime(&t->tv_sec); + asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1, + tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); +} + + +cms::Message * +OutputPlugin::createMessage(edg_wll_JobStat *state_out) { - if(this->session == NULL) { - this->session = connection->createSession(/* TODO: ackMode */); - this->destination = this->session->createTopic(topic); - this->producer = this->session->createProducer(this->destination); - } - connection->start(); - connection->setExceptionListener(this); + cms::TextMessage *cms_msg = session->createTextMessage(); + char *s; + unsigned int i; + std::string val; + + /* ownerDn */ + val.assign(state_out->owner); + cms_msg->setStringProperty("ownerDn", val); + /* voname */ + s = edg_wll_JDLField(state_out,"VirtualOrganisation"); + val.assign(s); + free(s); + cms_msg->setStringProperty("voname", val); + /* bkHost */ + glite_jobid_getServerParts(state_out->jobId, &s, &i); + val.assign(s); + free(s); + cms_msg->setStringProperty("bkHost", val); + /* networkServer */ + /* TODO: XXX cut out hostname */ + val.assign(state_out->network_server); + cms_msg->setStringProperty("networkHost", val); + timeval2str(&state_out->lastUpdateTime, &s); + val.assign(s); + if(s) free(s); + cms_msg->setStringProperty("lastUpdateTime", val); + /* stateName */ + s = edg_wll_StatToString(state_out->state); + val.assign(s); + if(s) free(s); + cms_msg->setStringProperty("stateName", val); + timeval2str(&state_out->stateEnterTime, &s); + val.assign(s); + if(s) free(s); + cms_msg->setStringProperty("stateStartTime", val); + /* condorId */ + val.assign(state_out->condorId); + cms_msg->setStringProperty("condorId", val); + /* destSite */ + val.assign(state_out->destination); + cms_msg->setStringProperty("destSite", val); + /* exitCode */ + cms_msg->setIntProperty("exitCode", state_out->exit_code); + /* doneCode */ + cms_msg->setIntProperty("doneCode", state_out->done_code); + /* statusReason */ + val.assign(state_out->reason); + cms_msg->setStringProperty("statusReason", val); + + return cms_msg; } void OutputPlugin::send(cms::Message *msg) { - if(this->producer != NULL) { - this->producer->send(this->destination, msg); + try { + if(producer != NULL) { + producer->send(destination, msg); + } + } catch(cms::CMSException &e) { + cleanup(); + throw e; } } @@ -106,46 +184,93 @@ OutputPlugin::send(cms::Message *msg) void OutputPlugin::close() { - this->cleanup(); - connection->stop(); + if(producer != NULL) { + delete producer; + producer = NULL; + } + if(destination != NULL) { + delete destination; + destination = NULL; + } + if(session != NULL) { + session->close(); + delete session; + session = NULL; + } } void OutputPlugin::cleanup() { - if(this->producer != NULL) { - delete this->producer; - this->producer = NULL; + close(); + pthread_rwlock_wrlock(&connection_lock); + if(connection && current_connection == connection) { + connection->close(); + delete connection; + connection = NULL; + current_connection = NULL; } - if(this->destination != NULL) { - delete this->destination; - this->destination = NULL; + pthread_rwlock_unlock(&connection_lock); +} + + +cms::Connection * +OutputPlugin::getConnection() +{ + pthread_rwlock_rdlock(&connection_lock); + if(connection) { + return connection; } - if(this->session != NULL) { - this->session->close(); - delete this->session; - this->session = NULL; + + pthread_rwlock_unlock(&connection_lock); + + pthread_rwlock_wrlock(&connection_lock); + if(!connection) { + connection = connectionFactory->createConnection(); } + pthread_rwlock_unlock(&connection_lock); + + pthread_rwlock_rdlock(&connection_lock); + return connection; } -static -void timeval2str(struct timeval *t, char **str) { - struct tm *tm; +void +OutputPlugin::releaseConnection() +{ + pthread_rwlock_unlock(&connection_lock); +} - tm = gmtime(&t->tv_sec); - asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1, - tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); + +void +OutputPlugin::initialize(std::string brokerURI) +{ + pthread_rwlock_init(&connection_lock, NULL); + try { + activemq::library::ActiveMQCPP::initializeLibrary(); + + connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); + } catch (cms::CMSException &e) { + try { + if(connectionFactory != NULL) { + delete connectionFactory; + connectionFactory = NULL; + } + } catch(cms::CMSException &d) { + } + throw e; + } } + extern "C" int event_queue_connect(struct event_queue *eq) { OutputPlugin *output; - std::string topicName; + std::string topicName(eq->dest_name); if(eq->plugin_data == NULL) { output = new OutputPlugin(); @@ -154,12 +279,14 @@ event_queue_connect(struct event_queue *eq) output = (OutputPlugin*)eq->plugin_data; } + assert(output != NULL); + try { output->connect(topicName); } catch(cms::CMSException &e) { - output->cleanup(); - eq->timeout = TIMEOUT; - return 0; + set_error(IL_DL, 0, (char*)e.what()); + eq->timeout = TIMEOUT; + return 0; } eq->first_event_sent = 0; return 1; @@ -173,9 +300,9 @@ event_queue_send(struct event_queue *eq) OutputPlugin *output = (OutputPlugin*)eq->plugin_data; edg_wll_Context context; edg_wll_Event *notif_event; - edg_wll_JobStat *state_out; - il_octet_string_t event; - char *jobstat_char; + edg_wll_JobStat *state_out; + il_octet_string_t event; + char *jobstat_char; int ret; assert(output != NULL); @@ -184,107 +311,58 @@ event_queue_send(struct event_queue *eq) while(!event_queue_empty(eq)) { struct server_msg *msg; - cms::TextMessage *cms_msg; - char *s; - unsigned int i; - std::string val; + cms::Message *cms_msg; if(event_queue_get(eq, &msg) < 0) { goto err; } - try { - if(decode_il_msg(&event, msg->msg) < 0) { - set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data"); - goto err; - } - ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event); - if(ret) { - set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event"); - goto err; - } - jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat); - if (jobstat_char == NULL) { + if(decode_il_msg(&event, msg->msg) < 0) { + set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data"); + goto err; + } + ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event); + if(ret) { + set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event"); + goto err; + } + jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat); + if (jobstat_char == NULL) { set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status"); goto err; - } - if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) { - set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status"); - goto err; - } - - - cms_msg = output->session->createTextMessage(); - /* ownerDn */ - val.assign(state_out->owner); - cms_msg->setStringProperty("ownerDn", val); - /* voname */ - s = edg_wll_JDLField(state_out,"VirtualOrganisation"); - val.assign(s); - free(s); - cms_msg->setStringProperty("voname", val); - /* bkHost */ - glite_jobid_getServerParts(state_out->jobId, &s, &i); - val.assign(s); - free(s); - cms_msg->setStringProperty("bkHost", val); - /* networkServer */ - /* TODO: XXX cut out hostname */ - val.assign(state_out->network_server); - cms_msg->setStringProperty("networkHost", val); - timeval2str(&state_out->lastUpdateTime, &s); - val.assign(s); - if(s) free(s); - cms_msg->setStringProperty("lastUpdateTime", val); - /* stateName */ - s = edg_wll_StatToString(state_out->state); - val.assign(s); - if(s) free(s); - cms_msg->setStringProperty("stateName", val); - timeval2str(&state_out->stateEnterTime, &s); - val.assign(s); - if(s) free(s); - cms_msg->setStringProperty("stateStartTime", val); - /* condorId */ - val.assign(state_out->condorId); - cms_msg->setStringProperty("condorId", val); - /* destSite */ - val.assign(state_out->destination); - cms_msg->setStringProperty("destSite", val); - /* exitCode */ - cms_msg->setIntProperty("exitCode", state_out->exit_code); - /* doneCode */ - cms_msg->setIntProperty("doneCode", state_out->done_code); - /* statusReason */ - val.assign(state_out->reason); - cms_msg->setStringProperty("statusReason", val); - - free(event.data); - edg_wll_FreeEvent(notif_event); - free(notif_event); - edg_wll_FreeStatus(state_out); - free(state_out); - free(jobstat_char); - } catch(cms::CMSException &e) { - goto err; + } + if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) { + set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status"); + goto err; } try { - output->send(cms_msg); - delete cms_msg; - if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) { - /* failure committing message, this is bad */ - goto err; - } - event_queue_remove(eq); - eq->first_event_sent = 1; + cms_msg = output->createMessage(state_out); + + free(event.data); + edg_wll_FreeEvent(notif_event); + free(notif_event); + edg_wll_FreeStatus(state_out); + free(state_out); + free(jobstat_char); + + output->send(cms_msg); + delete cms_msg; + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) { + /* failure committing message, this is bad */ + goto err; + } } catch(cms::CMSException &e) { - delete cms_msg; - output->cleanup(); - eq->timeout = TIMEOUT; - edg_wll_FreeContext(context); - return 0; + if(cms_msg) { + delete cms_msg; + } + eq->timeout = TIMEOUT; + edg_wll_FreeContext(context); + set_error(IL_DL, 0, (char*)e.what()); + return 0; } + event_queue_remove(eq); + eq->first_event_sent = 1; } edg_wll_FreeContext(context); return 1; @@ -294,8 +372,8 @@ err: free(event.data); } if(notif_event) { - edg_wll_FreeEvent(notif_event); - free(notif_event); + edg_wll_FreeEvent(notif_event); + free(notif_event); } if(jobstat_char) { free(jobstat_char); @@ -316,9 +394,10 @@ event_queue_close(struct event_queue *eq) assert(output != NULL); - try { + try { output->close(); } catch(cms::CMSException &e) { + set_error(IL_DL, 0, (char*)e.what()); return -1; } eq->first_event_sent = 0; @@ -362,23 +441,8 @@ plugin_init(char *config) } try { - activemq::library::ActiveMQCPP::initializeLibrary(); - - OutputPlugin::connectionFactory = - cms::ConnectionFactory::createCMSConnectionFactory(brokerURI); - OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection(); - } catch (cms::CMSException &e) { - try { - if(OutputPlugin::connection != NULL) { - delete OutputPlugin::connection; - OutputPlugin::connection = NULL; - } - if(OutputPlugin::connectionFactory != NULL) { - delete OutputPlugin::connectionFactory; - OutputPlugin::connectionFactory = NULL; - } - } catch(cms::CMSException &e) { - } + OutputPlugin::initialize(brokerURI); + } catch(cms::CMSException &e) { set_error(IL_DL, 0, (char*)e.what()); return -1; } @@ -397,4 +461,5 @@ plugin_supports_scheme(const char *scheme) cms::Connection *OutputPlugin::connection = NULL; cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL; -const char *OutputPlugin::SCHEME = "x-msg:"; +const char *OutputPlugin::SCHEME = "x-msg://"; +pthread_rwlock_t OutputPlugin::connection_lock; -- 1.8.2.3