#include <errno.h>
#include <sys/param.h>
-class OutputPlugin : public cms::ExceptionListener {
+class OutputPlugin {
public:
OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {};
- virtual void onException(const cms::CMSException &ex);
- void connect(const std::string &topic);
- void send(cms::Message *msg);
- void close();
- void cleanup();
+ cms::Message *createMessage(edg_wll_JobStat *state_out);
+ void connect(const std::string &topic);
+ void send(cms::Message *msg);
+ void close();
+ void cleanup();
-public:
+ static void initialize(std::string brokerURI);
+
+ static const char *SCHEME;
+
+private:
+
+ static cms::Connection *getConnection();
+ static void releaseConnection();
cms::Session *session;
cms::Topic *destination;
cms::MessageProducer *producer;
+ cms::Connection *current_connection;
static cms::Connection *connection;
static cms::ConnectionFactory *connectionFactory;
-
- static const char *SCHEME;
+ static pthread_rwlock_t connection_lock;
};
void
-OutputPlugin::onException(const cms::CMSException &ex)
+OutputPlugin::connect(const std::string &topic)
{
- this->cleanup();
+ try {
+ current_connection = getConnection();
+ if(session == NULL) {
+ session = current_connection->createSession(/* TODO: ackMode */);
+ destination = session->createTopic(topic);
+ producer = session->createProducer(destination);
+ }
+ current_connection->start();
+ releaseConnection();
+ } catch (cms::CMSException &e) {
+ releaseConnection();
+ cleanup();
+ throw e;
+ }
}
-void
-OutputPlugin::connect(const std::string &topic)
+static
+void timeval2str(struct timeval *t, char **str) {
+ struct tm *tm;
+
+ tm = gmtime(&t->tv_sec);
+ asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
+ tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+}
+
+
+cms::Message *
+OutputPlugin::createMessage(edg_wll_JobStat *state_out)
{
- if(this->session == NULL) {
- this->session = connection->createSession(/* TODO: ackMode */);
- this->destination = this->session->createTopic(topic);
- this->producer = this->session->createProducer(this->destination);
- }
- connection->start();
- connection->setExceptionListener(this);
+ cms::TextMessage *cms_msg = session->createTextMessage();
+ char *s;
+ unsigned int i;
+ std::string val;
+
+ /* ownerDn */
+ val.assign(state_out->owner);
+ cms_msg->setStringProperty("ownerDn", val);
+ /* voname */
+ s = edg_wll_JDLField(state_out,"VirtualOrganisation");
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("voname", val);
+ /* bkHost */
+ glite_jobid_getServerParts(state_out->jobId, &s, &i);
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("bkHost", val);
+ /* networkServer */
+ /* TODO: XXX cut out hostname */
+ val.assign(state_out->network_server);
+ cms_msg->setStringProperty("networkHost", val);
+ timeval2str(&state_out->lastUpdateTime, &s);
+ val.assign(s);
+ if(s) free(s);
+ cms_msg->setStringProperty("lastUpdateTime", val);
+ /* stateName */
+ s = edg_wll_StatToString(state_out->state);
+ val.assign(s);
+ if(s) free(s);
+ cms_msg->setStringProperty("stateName", val);
+ timeval2str(&state_out->stateEnterTime, &s);
+ val.assign(s);
+ if(s) free(s);
+ cms_msg->setStringProperty("stateStartTime", val);
+ /* condorId */
+ val.assign(state_out->condorId);
+ cms_msg->setStringProperty("condorId", val);
+ /* destSite */
+ val.assign(state_out->destination);
+ cms_msg->setStringProperty("destSite", val);
+ /* exitCode */
+ cms_msg->setIntProperty("exitCode", state_out->exit_code);
+ /* doneCode */
+ cms_msg->setIntProperty("doneCode", state_out->done_code);
+ /* statusReason */
+ val.assign(state_out->reason);
+ cms_msg->setStringProperty("statusReason", val);
+
+ return cms_msg;
}
void
OutputPlugin::send(cms::Message *msg)
{
- if(this->producer != NULL) {
- this->producer->send(this->destination, msg);
+ try {
+ if(producer != NULL) {
+ producer->send(destination, msg);
+ }
+ } catch(cms::CMSException &e) {
+ cleanup();
+ throw e;
}
}
void
OutputPlugin::close()
{
- this->cleanup();
- connection->stop();
+ if(producer != NULL) {
+ delete producer;
+ producer = NULL;
+ }
+ if(destination != NULL) {
+ delete destination;
+ destination = NULL;
+ }
+ if(session != NULL) {
+ session->close();
+ delete session;
+ session = NULL;
+ }
}
void
OutputPlugin::cleanup()
{
- if(this->producer != NULL) {
- delete this->producer;
- this->producer = NULL;
+ close();
+ pthread_rwlock_wrlock(&connection_lock);
+ if(connection && current_connection == connection) {
+ connection->close();
+ delete connection;
+ connection = NULL;
+ current_connection = NULL;
}
- if(this->destination != NULL) {
- delete this->destination;
- this->destination = NULL;
+ pthread_rwlock_unlock(&connection_lock);
+}
+
+
+cms::Connection *
+OutputPlugin::getConnection()
+{
+ pthread_rwlock_rdlock(&connection_lock);
+ if(connection) {
+ return connection;
}
- if(this->session != NULL) {
- this->session->close();
- delete this->session;
- this->session = NULL;
+
+ pthread_rwlock_unlock(&connection_lock);
+
+ pthread_rwlock_wrlock(&connection_lock);
+ if(!connection) {
+ connection = connectionFactory->createConnection();
}
+ pthread_rwlock_unlock(&connection_lock);
+
+ pthread_rwlock_rdlock(&connection_lock);
+ return connection;
}
-static
-void timeval2str(struct timeval *t, char **str) {
- struct tm *tm;
+void
+OutputPlugin::releaseConnection()
+{
+ pthread_rwlock_unlock(&connection_lock);
+}
- tm = gmtime(&t->tv_sec);
- asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
- tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+
+void
+OutputPlugin::initialize(std::string brokerURI)
+{
+ pthread_rwlock_init(&connection_lock, NULL);
+ try {
+ activemq::library::ActiveMQCPP::initializeLibrary();
+
+ connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
+ } catch (cms::CMSException &e) {
+ try {
+ if(connectionFactory != NULL) {
+ delete connectionFactory;
+ connectionFactory = NULL;
+ }
+ } catch(cms::CMSException &d) {
+ }
+ throw e;
+ }
}
+
extern "C"
int
event_queue_connect(struct event_queue *eq)
{
OutputPlugin *output;
- std::string topicName;
+ std::string topicName(eq->dest_name);
if(eq->plugin_data == NULL) {
output = new OutputPlugin();
output = (OutputPlugin*)eq->plugin_data;
}
+ assert(output != NULL);
+
try {
output->connect(topicName);
} catch(cms::CMSException &e) {
- output->cleanup();
- eq->timeout = TIMEOUT;
- return 0;
+ set_error(IL_DL, 0, (char*)e.what());
+ eq->timeout = TIMEOUT;
+ return 0;
}
eq->first_event_sent = 0;
return 1;
OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
edg_wll_Context context;
edg_wll_Event *notif_event;
- edg_wll_JobStat *state_out;
- il_octet_string_t event;
- char *jobstat_char;
+ edg_wll_JobStat *state_out;
+ il_octet_string_t event;
+ char *jobstat_char;
int ret;
assert(output != NULL);
while(!event_queue_empty(eq)) {
struct server_msg *msg;
- cms::TextMessage *cms_msg;
- char *s;
- unsigned int i;
- std::string val;
+ cms::Message *cms_msg;
if(event_queue_get(eq, &msg) < 0) {
goto err;
}
- try {
- if(decode_il_msg(&event, msg->msg) < 0) {
- set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
- goto err;
- }
- ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event);
- if(ret) {
- set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
- goto err;
- }
- jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
- if (jobstat_char == NULL) {
+ if(decode_il_msg(&event, msg->msg) < 0) {
+ set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
+ goto err;
+ }
+ ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event);
+ if(ret) {
+ set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
+ goto err;
+ }
+ jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
+ if (jobstat_char == NULL) {
set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
goto err;
- }
- if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
- set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
- goto err;
- }
-
-
- cms_msg = output->session->createTextMessage();
- /* ownerDn */
- val.assign(state_out->owner);
- cms_msg->setStringProperty("ownerDn", val);
- /* voname */
- s = edg_wll_JDLField(state_out,"VirtualOrganisation");
- val.assign(s);
- free(s);
- cms_msg->setStringProperty("voname", val);
- /* bkHost */
- glite_jobid_getServerParts(state_out->jobId, &s, &i);
- val.assign(s);
- free(s);
- cms_msg->setStringProperty("bkHost", val);
- /* networkServer */
- /* TODO: XXX cut out hostname */
- val.assign(state_out->network_server);
- cms_msg->setStringProperty("networkHost", val);
- timeval2str(&state_out->lastUpdateTime, &s);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("lastUpdateTime", val);
- /* stateName */
- s = edg_wll_StatToString(state_out->state);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("stateName", val);
- timeval2str(&state_out->stateEnterTime, &s);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("stateStartTime", val);
- /* condorId */
- val.assign(state_out->condorId);
- cms_msg->setStringProperty("condorId", val);
- /* destSite */
- val.assign(state_out->destination);
- cms_msg->setStringProperty("destSite", val);
- /* exitCode */
- cms_msg->setIntProperty("exitCode", state_out->exit_code);
- /* doneCode */
- cms_msg->setIntProperty("doneCode", state_out->done_code);
- /* statusReason */
- val.assign(state_out->reason);
- cms_msg->setStringProperty("statusReason", val);
-
- free(event.data);
- edg_wll_FreeEvent(notif_event);
- free(notif_event);
- edg_wll_FreeStatus(state_out);
- free(state_out);
- free(jobstat_char);
- } catch(cms::CMSException &e) {
- goto err;
+ }
+ if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
+ set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
+ goto err;
}
try {
- output->send(cms_msg);
- delete cms_msg;
- if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
- /* failure committing message, this is bad */
- goto err;
- }
- event_queue_remove(eq);
- eq->first_event_sent = 1;
+ cms_msg = output->createMessage(state_out);
+
+ free(event.data);
+ edg_wll_FreeEvent(notif_event);
+ free(notif_event);
+ edg_wll_FreeStatus(state_out);
+ free(state_out);
+ free(jobstat_char);
+
+ output->send(cms_msg);
+ delete cms_msg;
+ if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+ /* failure committing message, this is bad */
+ goto err;
+ }
} catch(cms::CMSException &e) {
- delete cms_msg;
- output->cleanup();
- eq->timeout = TIMEOUT;
- edg_wll_FreeContext(context);
- return 0;
+ if(cms_msg) {
+ delete cms_msg;
+ }
+ eq->timeout = TIMEOUT;
+ edg_wll_FreeContext(context);
+ set_error(IL_DL, 0, (char*)e.what());
+ return 0;
}
+ event_queue_remove(eq);
+ eq->first_event_sent = 1;
}
edg_wll_FreeContext(context);
return 1;
free(event.data);
}
if(notif_event) {
- edg_wll_FreeEvent(notif_event);
- free(notif_event);
+ edg_wll_FreeEvent(notif_event);
+ free(notif_event);
}
if(jobstat_char) {
free(jobstat_char);
assert(output != NULL);
- try {
+ try {
output->close();
} catch(cms::CMSException &e) {
+ set_error(IL_DL, 0, (char*)e.what());
return -1;
}
eq->first_event_sent = 0;
}
try {
- activemq::library::ActiveMQCPP::initializeLibrary();
-
- OutputPlugin::connectionFactory =
- cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
- OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection();
- } catch (cms::CMSException &e) {
- try {
- if(OutputPlugin::connection != NULL) {
- delete OutputPlugin::connection;
- OutputPlugin::connection = NULL;
- }
- if(OutputPlugin::connectionFactory != NULL) {
- delete OutputPlugin::connectionFactory;
- OutputPlugin::connectionFactory = NULL;
- }
- } catch(cms::CMSException &e) {
- }
+ OutputPlugin::initialize(brokerURI);
+ } catch(cms::CMSException &e) {
set_error(IL_DL, 0, (char*)e.what());
return -1;
}
cms::Connection *OutputPlugin::connection = NULL;
cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL;
-const char *OutputPlugin::SCHEME = "x-msg:";
+const char *OutputPlugin::SCHEME = "x-msg://";
+pthread_rwlock_t OutputPlugin::connection_lock;