some fixes, json messages
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 19 Jan 2011 13:44:24 +0000 (13:44 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 19 Jan 2011 13:44:24 +0000 (13:44 +0000)
org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp

index 5daa51f..67fd77e 100644 (file)
@@ -44,6 +44,7 @@ extern "C" {
 #include <cms/Message.h>
 
 #include <string>
+#include <strstream>
 
 #include <assert.h>
 #include <string.h>
@@ -119,72 +120,68 @@ OutputPlugin::createMessage(edg_wll_JobStat &state_out)
        cms::TextMessage *cms_msg = session->createTextMessage();
        char *s;
        unsigned int i;
-       std::string val;
+       std::ostrstream body;
 
+       body << "{";
        /* ownerDn */
        if(state_out.owner) {
-               val.assign(state_out.owner);
-               cms_msg->setStringProperty("ownerDn", val);
+               body << "ownerDn: \"" << state_out.owner << "\", ";
+               // cms_msg->setStringProperty("ownerDn", val);
        }
        /* voname */
        s = edg_wll_JDLField(&state_out,"VirtualOrganisation");
        if(s) {
-               val.assign(s);
+               body << "VirtualOrganisation: \"" << s << "\", ";
                free(s);
-               cms_msg->setStringProperty("voname", val);
        }
        /* bkHost */
        glite_jobid_getServerParts(state_out.jobId, &s, &i);
        if(s) {
-               val.assign(s);
+               body << "bkHost: \"" << s << "\", ";
                free(s);
-               cms_msg->setStringProperty("bkHost", val);
        }
        /* networkServer */
        /* TODO: XXX cut out hostname */
        if(state_out.network_server) {
-               val.assign(state_out.network_server);
-               cms_msg->setStringProperty("networkHost", val);
+               body << "networkHost: \"" << state_out.network_server << "\", ";
        }
        timeval2str(&state_out.lastUpdateTime, &s);
        if(s) {
-               val.assign(s);
+               body << "lastUpdateTime: \"" << s << "\", ";
                free(s);
-               cms_msg->setStringProperty("lastUpdateTime", val);
        }
        /* stateName */
        s = edg_wll_StatToString(state_out.state);
        if(s) {
-               val.assign(s);
+               body << "stateName: \"" << s << "\", ";
                free(s);
-               cms_msg->setStringProperty("stateName", val);
        }
        timeval2str(&state_out.stateEnterTime, &s);
        if(s) {
-               val.assign(s);
+               body << "stateStartTime: \"" << s << "\", ";
                free(s);
-               cms_msg->setStringProperty("stateStartTime", val);
        }
        /* condorId */
        if(state_out.condorId) {
-               val.assign(state_out.condorId);
-               cms_msg->setStringProperty("condorId", val);
+               body << "condorId: \"" << state_out.condorId << "\", ";
        }
        /* destSite */
        if(state_out.destination) {
-               val.assign(state_out.destination);
-               cms_msg->setStringProperty("destSite", val);
+               body << "destSite: \"" << state_out.destination << "\", ";
        }
        /* exitCode */
-       cms_msg->setIntProperty("exitCode", state_out.exit_code);
+       body << "exitCode: " << state_out.exit_code <<  ", ";
        /* doneCode */
-       cms_msg->setIntProperty("doneCode", state_out.done_code);
+       body << "doneCode: " << state_out.done_code << ", ";
        /* statusReason */
        if(state_out.reason) {
-               val.assign(state_out.reason);
-               cms_msg->setStringProperty("statusReason", val);
+               body << "statusReason: \"" << state_out.reason << "\", ";
        }
-       
+       body << "}";
+
+       cms_msg->setText(body.str());
+       cms_msg->setStringProperty("Content-type", "text/javascript");
+
        return cms_msg;
 }
 
@@ -342,10 +339,22 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
                    break;
            }
 
+           if(0 == msg->len) {
+                   glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG,
+                                    "    not sending empty message at offset %d for job %s",
+                                    msg->offset, msg->job_id_s);
+                   if(event_store_commit(msg->es, msg->ev_len, 0, msg->generation) < 0) {
+                           /* failure committing message, this is bad */
+                           goto err;
+                   }
+                   event_queue_remove(eq, me);
+                   continue;
+           }
+
            glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
                             "    trying to deliver event at offset %d for job %s", 
                             msg->offset, msg->job_id_s);
-
+           
            if(decode_il_msg(&event, msg->msg + 17) < 0) {
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
                    goto err;
@@ -357,33 +366,33 @@ event_queue_send(struct event_queue *eq, struct queue_thread *me)
            }
            jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
            if (jobstat_char == NULL) {
-                   set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
-                   goto err;
+                   set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
+                   goto err;
            }
            if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), &state_out)) {
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
                    fprintf(stderr, "Status string: %s\n", jobstat_char);
                    goto err;
            }
-
+           
            try {
                    cms_msg = output->createMessage(state_out);
-
+                   
                    free(event.data); event.data = NULL;
                    edg_wll_FreeEvent(notif_event);
                    free(notif_event); notif_event = NULL;
                    edg_wll_FreeStatus(&state_out);
                    free(jobstat_char); jobstat_char = NULL;
-
+                   
                    output->send(cms_msg); 
-                   delete cms_msg;
+                   delete cms_msg;
                    if(event_store_commit(msg->es, msg->ev_len, 0, msg->generation) < 0) {
                            /* failure committing message, this is bad */
                            goto err;
                    }
                    glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
                                     "    event sent to %s", eq->dest_name);
-
+                   
            } catch(cms::CMSException &e) {
                    if(cms_msg) {
                            delete cms_msg;