#include "glite/lb/context.h"
#include "glite/lbu/escape.h"
#include "glite/lb/jobstat.h"
+#include "glite/lb/xml_parse.h"
#include <activemq/library/ActiveMQCPP.h>
#include <cms/ConnectionFactory.h>
#include <string>
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+
+
class OutputPlugin : public cms::ExceptionListener {
public:
virtual void onException(const cms::CMSException &ex);
void connect(const std::string &topic);
- void send(cms::Message &msg);
+ void send(cms::Message *msg);
void close();
void cleanup();
cms::Topic *destination;
cms::MessageProducer *producer;
- static cms::Connection *connection = NULL;
- static cms::ConnectionFactory *connectionFactory = NULL;
+ static cms::Connection *connection;
+ static cms::ConnectionFactory *connectionFactory;
- static const char *SCHEME = "x-msg:";
+ static const char *SCHEME;
};
OutputPlugin::connect(const std::string &topic)
{
if(this->session == NULL) {
- this->session = self::connection->createSession(/* TODO: ackMode */);
+ this->session = connection->createSession(/* TODO: ackMode */);
this->destination = this->session->createTopic(topic);
this->producer = this->session->createProducer(this->destination);
}
- self::connection->start();
- self::connection->setExceptionListener(this);
+ connection->start();
+ connection->setExceptionListener(this);
}
void
-OutputPlugin::send(cms::Message &msg)
+OutputPlugin::send(cms::Message *msg)
{
-
+ if(this->producer != NULL) {
+ this->producer->send(this->destination, msg);
+ }
}
OutputPlugin::close()
{
this->cleanup();
- self::connection->stop();
+ connection->stop();
}
}
+static
+void timeval2str(struct timeval *t, char **str) {
+ struct tm *tm;
+
+ tm = gmtime(&t->tv_sec);
+ asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
+ tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+}
+
+
extern "C"
int
event_queue_connect(struct event_queue *eq)
edg_wll_Context context;
edg_wll_Event *notif_event;
edg_wll_JobStat *state_out;
+ il_octet_string_t event;
+ char *jobstat_char;
int ret;
assert(output != NULL);
while(!event_queue_empty(eq)) {
struct server_msg *msg;
+ cms::TextMessage *cms_msg;
char *s;
- int i;
+ unsigned int i;
std::string val;
if(event_queue_get(eq, &msg) < 0) {
}
try {
- ret=edg_wll_ParseNotifEvent(context, event->data, ¬if_event);
+ if(decode_il_msg(&event, msg->msg) < 0) {
+ set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
+ goto err;
+ }
+ ret=edg_wll_ParseNotifEvent(context, event.data, ¬if_event);
if(ret) {
set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
goto err;
}
- cms::TextMessage *cms_msg = output->session->createTextMessage();
+ cms_msg = output->session->createTextMessage();
/* ownerDn */
val.assign(state_out->owner);
- cms_msg->setStringProperty('ownerDn', val);
+ cms_msg->setStringProperty("ownerDn", val);
/* voname */
s = edg_wll_JDLField(state_out,"VirtualOrganisation");
val.assign(s);
free(s);
- cms_msg->setStringProperty('voname', val);
+ cms_msg->setStringProperty("voname", val);
/* bkHost */
glite_jobid_getServerParts(state_out->jobId, &s, &i);
val.assign(s);
free(s);
- cms_msg->setStringProperty('bkHost', val);
+ cms_msg->setStringProperty("bkHost", val);
/* networkServer */
/* TODO: XXX cut out hostname */
val.assign(state_out->network_server);
- cms_msg->setStringProperty('networkHost', val);
-
- cms_msg->setStringProperty('lastUpdateTime', val);
+ cms_msg->setStringProperty("networkHost", val);
+ timeval2str(&state_out->lastUpdateTime, &s);
+ val.assign(s);
+ if(s) free(s);
+ cms_msg->setStringProperty("lastUpdateTime", val);
/* stateName */
s = edg_wll_StatToString(state_out->state);
val.assign(s);
- free(s);
- cms_msg->setStringProperty('stateName', val);
-
- cms_msg->setStringProperty('stateStartTime', val);
+ if(s) free(s);
+ cms_msg->setStringProperty("stateName", val);
+ timeval2str(&state_out->stateEnterTime, &s);
+ val.assign(s);
+ if(s) free(s);
+ cms_msg->setStringProperty("stateStartTime", val);
/* condorId */
val.assign(state_out->condorId);
- cms_msg->setStringProperty('condorId', val);
+ cms_msg->setStringProperty("condorId", val);
/* destSite */
val.assign(state_out->destination);
- cms_msg->setStringProperty('destSite', val);
+ cms_msg->setStringProperty("destSite", val);
/* exitCode */
- cms_msg->setIntProperty('exitCode', state_out->exitCode);
+ cms_msg->setIntProperty("exitCode", state_out->exit_code);
/* doneCode */
- cms_msg->setIntProperty('doneCode', state_out->doneCode);
+ cms_msg->setIntProperty("doneCode", state_out->done_code);
/* statusReason */
val.assign(state_out->reason);
- cms_msg->setStringProperty('statusReason', val);
+ cms_msg->setStringProperty("statusReason", val);
+ free(event.data);
edg_wll_FreeEvent(notif_event);
free(notif_event);
edg_wll_FreeStatus(state_out);
return 1;
err:
+ if(event.data) {
+ free(event.data);
+ }
if(notif_event) {
edg_wll_FreeEvent(notif_event);
free(notif_event);
extern "C"
int
-plugin_init()
+plugin_init(const char *s)
{
std::string brokerURI;
OutputPlugin::connectionFactory =
cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
- OutputPlugin::connection = output->connectionFactory->createConnection();
+ OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection();
} catch (cms::CMSException &e) {
try {
if(OutputPlugin::connection != NULL) {
{
return strncmp(scheme, OutputPlugin::SCHEME, strlen(OutputPlugin::SCHEME)) == 0;
}
+
+
+cms::Connection *OutputPlugin::connection = NULL;
+cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL;
+const char *OutputPlugin::SCHEME = "x-msg://";