* Author: michal
*/
+extern "C" {
#include "glite/lb/interlogd.h"
#include "glite/lb/events_parse.h"
#include "glite/lb/context.h"
#include "glite/lbu/escape.h"
#include "glite/lb/jobstat.h"
#include "glite/lb/xml_parse.h"
+}
#include <activemq/library/ActiveMQCPP.h>
#include <cms/ConnectionFactory.h>
OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {};
- cms::Message *createMessage(edg_wll_JobStat *state_out);
+ cms::Message *createMessage(edg_wll_JobStat &state_out);
void connect(const std::string &topic);
void send(cms::Message *msg);
void close();
void cleanup();
- static void initialize(std::string brokerURI);
+ static void initialize(const std::string &brokerURI);
static const char *SCHEME;
cms::Message *
-OutputPlugin::createMessage(edg_wll_JobStat *state_out)
+OutputPlugin::createMessage(edg_wll_JobStat &state_out)
{
cms::TextMessage *cms_msg = session->createTextMessage();
char *s;
std::string val;
/* ownerDn */
- val.assign(state_out->owner);
- cms_msg->setStringProperty("ownerDn", val);
+ if(state_out.owner) {
+ val.assign(state_out.owner);
+ cms_msg->setStringProperty("ownerDn", val);
+ }
/* voname */
- s = edg_wll_JDLField(state_out,"VirtualOrganisation");
- val.assign(s);
- free(s);
- cms_msg->setStringProperty("voname", val);
+ s = edg_wll_JDLField(&state_out,"VirtualOrganisation");
+ if(s) {
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("voname", val);
+ }
/* bkHost */
- glite_jobid_getServerParts(state_out->jobId, &s, &i);
- val.assign(s);
- free(s);
- cms_msg->setStringProperty("bkHost", val);
+ glite_jobid_getServerParts(state_out.jobId, &s, &i);
+ if(s) {
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("bkHost", val);
+ }
/* networkServer */
/* TODO: XXX cut out hostname */
- val.assign(state_out->network_server);
- cms_msg->setStringProperty("networkHost", val);
- timeval2str(&state_out->lastUpdateTime, &s);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("lastUpdateTime", val);
+ if(state_out.network_server) {
+ val.assign(state_out.network_server);
+ cms_msg->setStringProperty("networkHost", val);
+ }
+ timeval2str(&state_out.lastUpdateTime, &s);
+ if(s) {
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("lastUpdateTime", val);
+ }
/* stateName */
- s = edg_wll_StatToString(state_out->state);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("stateName", val);
- timeval2str(&state_out->stateEnterTime, &s);
- val.assign(s);
- if(s) free(s);
- cms_msg->setStringProperty("stateStartTime", val);
+ s = edg_wll_StatToString(state_out.state);
+ if(s) {
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("stateName", val);
+ }
+ timeval2str(&state_out.stateEnterTime, &s);
+ if(s) {
+ val.assign(s);
+ free(s);
+ cms_msg->setStringProperty("stateStartTime", val);
+ }
/* condorId */
- val.assign(state_out->condorId);
- cms_msg->setStringProperty("condorId", val);
+ if(state_out.condorId) {
+ val.assign(state_out.condorId);
+ cms_msg->setStringProperty("condorId", val);
+ }
/* destSite */
- val.assign(state_out->destination);
- cms_msg->setStringProperty("destSite", val);
+ if(state_out.destination) {
+ val.assign(state_out.destination);
+ cms_msg->setStringProperty("destSite", val);
+ }
/* exitCode */
- cms_msg->setIntProperty("exitCode", state_out->exit_code);
+ cms_msg->setIntProperty("exitCode", state_out.exit_code);
/* doneCode */
- cms_msg->setIntProperty("doneCode", state_out->done_code);
+ cms_msg->setIntProperty("doneCode", state_out.done_code);
/* statusReason */
- val.assign(state_out->reason);
- cms_msg->setStringProperty("statusReason", val);
+ if(state_out.reason) {
+ val.assign(state_out.reason);
+ cms_msg->setStringProperty("statusReason", val);
+ }
return cms_msg;
}
{
try {
if(producer != NULL) {
- producer->send(destination, msg);
+ producer->send(msg);
}
} catch(cms::CMSException &e) {
cleanup();
void
-OutputPlugin::initialize(std::string brokerURI)
+OutputPlugin::initialize(const std::string &brokerURI)
{
pthread_rwlock_init(&connection_lock, NULL);
try {
assert(output != NULL);
try {
+ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
+ " trying to connect to %s",
+ eq->dest_name);
output->connect(topicName);
} catch(cms::CMSException &e) {
set_error(IL_DL, 0, (char*)e.what());
OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
edg_wll_Context context;
edg_wll_Event *notif_event;
- edg_wll_JobStat *state_out;
+ edg_wll_JobStat state_out;
il_octet_string_t event;
char *jobstat_char;
int ret;
goto err;
}
- if(decode_il_msg(&event, msg->msg) < 0) {
+ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
+ " trying to deliver event at offset %d for job %s",
+ msg->offset, msg->job_id_s);
+
+ if(decode_il_msg(&event, msg->msg + 17) < 0) {
set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
goto err;
}
set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
goto err;
}
- if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
+ if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), &state_out)) {
set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
+ fprintf(stderr, "Status string: %s\n", jobstat_char);
goto err;
}
try {
cms_msg = output->createMessage(state_out);
- free(event.data);
+ free(event.data); event.data = NULL;
edg_wll_FreeEvent(notif_event);
- free(notif_event);
- edg_wll_FreeStatus(state_out);
- free(state_out);
- free(jobstat_char);
+ free(notif_event); notif_event = NULL;
+ edg_wll_FreeStatus(&state_out);
+ free(jobstat_char); jobstat_char = NULL;
output->send(cms_msg);
delete cms_msg;
- if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+ if(event_store_commit(msg->es, msg->ev_len, 0, msg->generation) < 0) {
/* failure committing message, this is bad */
goto err;
}
+ glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
+ " event sent to %s", eq->dest_name);
+
} catch(cms::CMSException &e) {
if(cms_msg) {
delete cms_msg;
if(jobstat_char) {
free(jobstat_char);
}
- if(state_out) {
- edg_wll_FreeStatus(state_out);
- free(state_out);
- }
+ edg_wll_FreeStatus(&state_out);
+
return -1;
}