null value handling, other fixes
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 3 Sep 2010 18:10:25 +0000 (18:10 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 3 Sep 2010 18:10:25 +0000 (18:10 +0000)
org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp

index 71885c5..fb3d5e8 100644 (file)
@@ -24,12 +24,14 @@ limitations under the License.
  *      Author: michal
  */
 
+extern "C" {
 #include "glite/lb/interlogd.h"
 #include "glite/lb/events_parse.h"
 #include "glite/lb/context.h"
 #include "glite/lbu/escape.h"
 #include "glite/lb/jobstat.h"
 #include "glite/lb/xml_parse.h"
+}
 
 #include <activemq/library/ActiveMQCPP.h>
 #include <cms/ConnectionFactory.h>
@@ -55,13 +57,13 @@ public:
        OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {};
 
 
-       cms::Message *createMessage(edg_wll_JobStat *state_out);
+       cms::Message *createMessage(edg_wll_JobStat &state_out);
        void         connect(const std::string &topic);
        void         send(cms::Message *msg);
        void         close();
        void         cleanup();
 
-       static void   initialize(std::string brokerURI);
+       static void   initialize(const std::string &brokerURI);
 
        static const char *SCHEME;
 
@@ -112,7 +114,7 @@ void timeval2str(struct timeval *t, char **str) {
 
 
 cms::Message *
-OutputPlugin::createMessage(edg_wll_JobStat *state_out)
+OutputPlugin::createMessage(edg_wll_JobStat &state_out)
 {
        cms::TextMessage *cms_msg = session->createTextMessage();
        char *s;
@@ -120,48 +122,68 @@ OutputPlugin::createMessage(edg_wll_JobStat *state_out)
        std::string val;
 
        /* ownerDn */
-       val.assign(state_out->owner);
-       cms_msg->setStringProperty("ownerDn", val);
+       if(state_out.owner) {
+               val.assign(state_out.owner);
+               cms_msg->setStringProperty("ownerDn", val);
+       }
        /* voname */
-       s = edg_wll_JDLField(state_out,"VirtualOrganisation");
-       val.assign(s);
-       free(s);
-       cms_msg->setStringProperty("voname", val);
+       s = edg_wll_JDLField(&state_out,"VirtualOrganisation");
+       if(s) {
+               val.assign(s);
+               free(s);
+               cms_msg->setStringProperty("voname", val);
+       }
        /* bkHost */
-       glite_jobid_getServerParts(state_out->jobId, &s, &i);
-       val.assign(s);
-       free(s);
-       cms_msg->setStringProperty("bkHost", val);
+       glite_jobid_getServerParts(state_out.jobId, &s, &i);
+       if(s) {
+               val.assign(s);
+               free(s);
+               cms_msg->setStringProperty("bkHost", val);
+       }
        /* networkServer */
        /* TODO: XXX cut out hostname */
-       val.assign(state_out->network_server);
-       cms_msg->setStringProperty("networkHost", val);
-       timeval2str(&state_out->lastUpdateTime, &s);
-       val.assign(s);
-       if(s) free(s);
-       cms_msg->setStringProperty("lastUpdateTime", val);
+       if(state_out.network_server) {
+               val.assign(state_out.network_server);
+               cms_msg->setStringProperty("networkHost", val);
+       }
+       timeval2str(&state_out.lastUpdateTime, &s);
+       if(s) {
+               val.assign(s);
+               free(s);
+               cms_msg->setStringProperty("lastUpdateTime", val);
+       }
        /* stateName */
-       s = edg_wll_StatToString(state_out->state);
-       val.assign(s);
-       if(s) free(s);
-       cms_msg->setStringProperty("stateName", val);
-       timeval2str(&state_out->stateEnterTime, &s);
-       val.assign(s);
-       if(s) free(s);
-       cms_msg->setStringProperty("stateStartTime", val);
+       s = edg_wll_StatToString(state_out.state);
+       if(s) {
+               val.assign(s);
+               free(s);
+               cms_msg->setStringProperty("stateName", val);
+       }
+       timeval2str(&state_out.stateEnterTime, &s);
+       if(s) {
+               val.assign(s);
+               free(s);
+               cms_msg->setStringProperty("stateStartTime", val);
+       }
        /* condorId */
-       val.assign(state_out->condorId);
-       cms_msg->setStringProperty("condorId", val);
+       if(state_out.condorId) {
+               val.assign(state_out.condorId);
+               cms_msg->setStringProperty("condorId", val);
+       }
        /* destSite */
-       val.assign(state_out->destination);
-       cms_msg->setStringProperty("destSite", val);
+       if(state_out.destination) {
+               val.assign(state_out.destination);
+               cms_msg->setStringProperty("destSite", val);
+       }
        /* exitCode */
-       cms_msg->setIntProperty("exitCode", state_out->exit_code);
+       cms_msg->setIntProperty("exitCode", state_out.exit_code);
        /* doneCode */
-       cms_msg->setIntProperty("doneCode", state_out->done_code);
+       cms_msg->setIntProperty("doneCode", state_out.done_code);
        /* statusReason */
-       val.assign(state_out->reason);
-       cms_msg->setStringProperty("statusReason", val);
+       if(state_out.reason) {
+               val.assign(state_out.reason);
+               cms_msg->setStringProperty("statusReason", val);
+       }
        
        return cms_msg;
 }
@@ -172,7 +194,7 @@ OutputPlugin::send(cms::Message *msg)
 {
        try {
                if(producer != NULL) {
-                       producer->send(destination, msg);
+                       producer->send(msg);
                }
        } catch(cms::CMSException &e) {
                cleanup();
@@ -244,7 +266,7 @@ OutputPlugin::releaseConnection()
 
 
 void
-OutputPlugin::initialize(std::string brokerURI) 
+OutputPlugin::initialize(const std::string &brokerURI) 
 {
        pthread_rwlock_init(&connection_lock, NULL);
        try {
@@ -282,6 +304,9 @@ event_queue_connect(struct event_queue *eq)
        assert(output != NULL);
 
        try {
+               glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
+                                "    trying to connect to %s", 
+                                eq->dest_name);
                output->connect(topicName);
        } catch(cms::CMSException &e) {
                set_error(IL_DL, 0, (char*)e.what());
@@ -300,7 +325,7 @@ event_queue_send(struct event_queue *eq)
        OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
        edg_wll_Context context;
        edg_wll_Event *notif_event;
-       edg_wll_JobStat *state_out;
+       edg_wll_JobStat state_out;
        il_octet_string_t event;
        char *jobstat_char;
        int ret;
@@ -317,7 +342,11 @@ event_queue_send(struct event_queue *eq)
                goto err;
            }
 
-           if(decode_il_msg(&event, msg->msg) < 0) {
+           glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
+                            "    trying to deliver event at offset %d for job %s", 
+                            msg->offset, msg->job_id_s);
+
+           if(decode_il_msg(&event, msg->msg + 17) < 0) {
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
                    goto err;
            }
@@ -331,27 +360,30 @@ event_queue_send(struct event_queue *eq)
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
                    goto err;
            }
-           if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
+           if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), &state_out)) {
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
+                   fprintf(stderr, "Status string: %s\n", jobstat_char);
                    goto err;
            }
 
            try {
                    cms_msg = output->createMessage(state_out);
 
-                   free(event.data);
+                   free(event.data); event.data = NULL;
                    edg_wll_FreeEvent(notif_event);
-                   free(notif_event);
-                   edg_wll_FreeStatus(state_out);
-                   free(state_out);
-                   free(jobstat_char);
+                   free(notif_event); notif_event = NULL;
+                   edg_wll_FreeStatus(&state_out);
+                   free(jobstat_char); jobstat_char = NULL;
 
                    output->send(cms_msg); 
                    delete cms_msg;
-                   if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+                   if(event_store_commit(msg->es, msg->ev_len, 0, msg->generation) < 0) {
                            /* failure committing message, this is bad */
                            goto err;
                    }
+                   glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
+                                    "    event sent to %s", eq->dest_name);
+
            } catch(cms::CMSException &e) {
                    if(cms_msg) {
                            delete cms_msg;
@@ -378,10 +410,8 @@ err:
        if(jobstat_char) {
                free(jobstat_char);
        }
-       if(state_out) {
-               edg_wll_FreeStatus(state_out);
-               free(state_out);
-       }
+       edg_wll_FreeStatus(&state_out);
+
        return -1;
 }