reworked initialization and error reporting
authorMichal Voců <michal@ruk.cuni.cz>
Fri, 25 Jun 2010 12:37:48 +0000 (12:37 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Fri, 25 Jun 2010 12:37:48 +0000 (12:37 +0000)
org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp

index 8559962..71885c5 100644 (file)
@@ -48,57 +48,135 @@ limitations under the License.
 #include <errno.h>
 #include <sys/param.h>
 
-class OutputPlugin : public cms::ExceptionListener {
+class OutputPlugin {
 
 public:
 
        OutputPlugin() : session(NULL), destination(NULL), producer(NULL) {};
 
-       virtual void onException(const cms::CMSException &ex);
 
-       void connect(const std::string &topic);
-       void send(cms::Message *msg);
-       void close();
-       void cleanup();
+       cms::Message *createMessage(edg_wll_JobStat *state_out);
+       void         connect(const std::string &topic);
+       void         send(cms::Message *msg);
+       void         close();
+       void         cleanup();
 
-public:
+       static void   initialize(std::string brokerURI);
+
+       static const char *SCHEME;
+
+private:
+
+       static cms::Connection *getConnection();
+       static void releaseConnection();
 
        cms::Session *session;
        cms::Topic   *destination;
        cms::MessageProducer *producer;
+       cms::Connection *current_connection;
 
        static cms::Connection *connection;
        static cms::ConnectionFactory *connectionFactory;
-
-       static const char *SCHEME;
+       static pthread_rwlock_t connection_lock;
 };
 
 
 void
-OutputPlugin::onException(const cms::CMSException &ex)
+OutputPlugin::connect(const std::string &topic)
 {
-       this->cleanup();
+       try {
+               current_connection = getConnection();
+               if(session == NULL) {
+                       session = current_connection->createSession(/* TODO: ackMode */);
+                       destination = session->createTopic(topic);
+                       producer = session->createProducer(destination);
+               }
+               current_connection->start();
+               releaseConnection();
+       } catch (cms::CMSException &e) {
+               releaseConnection();
+               cleanup();
+               throw e;
+       }
 }
 
 
-void
-OutputPlugin::connect(const std::string &topic)
+static
+void timeval2str(struct timeval *t, char **str) {
+        struct tm       *tm;
+
+        tm = gmtime(&t->tv_sec);
+        asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
+                tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+}
+
+
+cms::Message *
+OutputPlugin::createMessage(edg_wll_JobStat *state_out)
 {
-       if(this->session == NULL) {
-               this->session = connection->createSession(/* TODO: ackMode */);
-               this->destination = this->session->createTopic(topic);
-               this->producer = this->session->createProducer(this->destination);
-       }
-       connection->start();
-       connection->setExceptionListener(this);
+       cms::TextMessage *cms_msg = session->createTextMessage();
+       char *s;
+       unsigned int i;
+       std::string val;
+
+       /* ownerDn */
+       val.assign(state_out->owner);
+       cms_msg->setStringProperty("ownerDn", val);
+       /* voname */
+       s = edg_wll_JDLField(state_out,"VirtualOrganisation");
+       val.assign(s);
+       free(s);
+       cms_msg->setStringProperty("voname", val);
+       /* bkHost */
+       glite_jobid_getServerParts(state_out->jobId, &s, &i);
+       val.assign(s);
+       free(s);
+       cms_msg->setStringProperty("bkHost", val);
+       /* networkServer */
+       /* TODO: XXX cut out hostname */
+       val.assign(state_out->network_server);
+       cms_msg->setStringProperty("networkHost", val);
+       timeval2str(&state_out->lastUpdateTime, &s);
+       val.assign(s);
+       if(s) free(s);
+       cms_msg->setStringProperty("lastUpdateTime", val);
+       /* stateName */
+       s = edg_wll_StatToString(state_out->state);
+       val.assign(s);
+       if(s) free(s);
+       cms_msg->setStringProperty("stateName", val);
+       timeval2str(&state_out->stateEnterTime, &s);
+       val.assign(s);
+       if(s) free(s);
+       cms_msg->setStringProperty("stateStartTime", val);
+       /* condorId */
+       val.assign(state_out->condorId);
+       cms_msg->setStringProperty("condorId", val);
+       /* destSite */
+       val.assign(state_out->destination);
+       cms_msg->setStringProperty("destSite", val);
+       /* exitCode */
+       cms_msg->setIntProperty("exitCode", state_out->exit_code);
+       /* doneCode */
+       cms_msg->setIntProperty("doneCode", state_out->done_code);
+       /* statusReason */
+       val.assign(state_out->reason);
+       cms_msg->setStringProperty("statusReason", val);
+       
+       return cms_msg;
 }
 
 
 void
 OutputPlugin::send(cms::Message *msg)
 {
-       if(this->producer != NULL) {
-               this->producer->send(this->destination, msg);
+       try {
+               if(producer != NULL) {
+                       producer->send(destination, msg);
+               }
+       } catch(cms::CMSException &e) {
+               cleanup();
+               throw e;
        }
 }
 
@@ -106,46 +184,93 @@ OutputPlugin::send(cms::Message *msg)
 void
 OutputPlugin::close()
 {
-       this->cleanup();
-       connection->stop();
+       if(producer != NULL) {
+               delete producer;
+               producer = NULL;
+       }
+       if(destination != NULL) {
+               delete destination;
+               destination = NULL;
+       }
+       if(session != NULL) {
+               session->close();
+               delete session;
+               session = NULL;
+       }
 }
 
 
 void
 OutputPlugin::cleanup()
 {
-       if(this->producer != NULL) {
-               delete this->producer;
-               this->producer = NULL;
+       close();
+       pthread_rwlock_wrlock(&connection_lock);
+       if(connection && current_connection == connection) {
+               connection->close();
+               delete connection;
+               connection = NULL;
+               current_connection = NULL;
        }
-       if(this->destination != NULL) {
-               delete this->destination;
-               this->destination = NULL;
+       pthread_rwlock_unlock(&connection_lock);
+}
+
+
+cms::Connection *
+OutputPlugin::getConnection() 
+{
+       pthread_rwlock_rdlock(&connection_lock);
+       if(connection) {
+               return connection;
        }
-       if(this->session != NULL) {
-               this->session->close();
-               delete this->session;
-               this->session = NULL;
+
+       pthread_rwlock_unlock(&connection_lock);
+
+       pthread_rwlock_wrlock(&connection_lock);
+       if(!connection) {
+               connection = connectionFactory->createConnection();
        }
+       pthread_rwlock_unlock(&connection_lock);
+
+       pthread_rwlock_rdlock(&connection_lock);
+       return connection;
 }
 
 
-static
-void timeval2str(struct timeval *t, char **str) {
-        struct tm       *tm;
+void
+OutputPlugin::releaseConnection() 
+{
+       pthread_rwlock_unlock(&connection_lock);
+}
 
-        tm = gmtime(&t->tv_sec);
-        asprintf(str,"%4d-%02d-%02dT%02d:%02d:%02dZ",tm->tm_year+1900,tm->tm_mon+1,
-                tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec);
+
+void
+OutputPlugin::initialize(std::string brokerURI) 
+{
+       pthread_rwlock_init(&connection_lock, NULL);
+       try {
+               activemq::library::ActiveMQCPP::initializeLibrary();
+
+               connectionFactory = cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
+       } catch (cms::CMSException &e) {
+               try {
+                       if(connectionFactory != NULL) {
+                               delete connectionFactory;
+                               connectionFactory = NULL;
+                       }
+               } catch(cms::CMSException &d) {
+               }
+               throw e;
+       }
 }
 
 
+
 extern "C"
 int
 event_queue_connect(struct event_queue *eq)
 {
        OutputPlugin *output;
-       std::string topicName;
+       std::string topicName(eq->dest_name);
 
        if(eq->plugin_data == NULL) {
                output = new OutputPlugin();
@@ -154,12 +279,14 @@ event_queue_connect(struct event_queue *eq)
                output = (OutputPlugin*)eq->plugin_data;
        }
 
+       assert(output != NULL);
+
        try {
                output->connect(topicName);
        } catch(cms::CMSException &e) {
-                       output->cleanup();
-                       eq->timeout = TIMEOUT;
-                       return 0;
+               set_error(IL_DL, 0, (char*)e.what());
+               eq->timeout = TIMEOUT;
+               return 0;
        }
        eq->first_event_sent = 0;
        return 1;
@@ -173,9 +300,9 @@ event_queue_send(struct event_queue *eq)
        OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
        edg_wll_Context context;
        edg_wll_Event *notif_event;
-    edg_wll_JobStat *state_out;
-    il_octet_string_t event;
-    char *jobstat_char;
+       edg_wll_JobStat *state_out;
+       il_octet_string_t event;
+       char *jobstat_char;
        int ret;
 
        assert(output != NULL);
@@ -184,107 +311,58 @@ event_queue_send(struct event_queue *eq)
 
        while(!event_queue_empty(eq)) {
            struct server_msg *msg;
-           cms::TextMessage *cms_msg;
-           char *s;
-           unsigned int i;
-           std::string val;
+           cms::Message *cms_msg;
 
            if(event_queue_get(eq, &msg) < 0) {
                goto err;
            }
 
-           try {
-               if(decode_il_msg(&event, msg->msg) < 0) {
-                       set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
-                       goto err;
-               }
-               ret=edg_wll_ParseNotifEvent(context, event.data, &notif_event);
-               if(ret) {
-                       set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
-                       goto err;
-               }
-               jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
-               if (jobstat_char == NULL) {
+           if(decode_il_msg(&event, msg->msg) < 0) {
+                   set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing notification event data");
+                   goto err;
+           }
+           ret=edg_wll_ParseNotifEvent(context, event.data, &notif_event);
+           if(ret) {
+                   set_error(IL_LBAPI, ret, "event_queue_send: error parsing notification event");
+                   goto err;
+           }
+           jobstat_char = glite_lbu_UnescapeXML((const char *) notif_event->notification.jobstat);
+           if (jobstat_char == NULL) {
                    set_error(IL_LBAPI, EINVAL, "event_queue_send: error unescaping job status");
                    goto err;
-               }
-               if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
-                       set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
-                       goto err;
-               }
-
-
-               cms_msg = output->session->createTextMessage();
-               /* ownerDn */
-               val.assign(state_out->owner);
-               cms_msg->setStringProperty("ownerDn", val);
-               /* voname */
-               s = edg_wll_JDLField(state_out,"VirtualOrganisation");
-               val.assign(s);
-               free(s);
-               cms_msg->setStringProperty("voname", val);
-               /* bkHost */
-               glite_jobid_getServerParts(state_out->jobId, &s, &i);
-               val.assign(s);
-               free(s);
-               cms_msg->setStringProperty("bkHost", val);
-               /* networkServer */
-               /* TODO: XXX cut out hostname */
-               val.assign(state_out->network_server);
-               cms_msg->setStringProperty("networkHost", val);
-               timeval2str(&state_out->lastUpdateTime, &s);
-               val.assign(s);
-               if(s) free(s);
-               cms_msg->setStringProperty("lastUpdateTime", val);
-               /* stateName */
-               s = edg_wll_StatToString(state_out->state);
-               val.assign(s);
-               if(s) free(s);
-               cms_msg->setStringProperty("stateName", val);
-               timeval2str(&state_out->stateEnterTime, &s);
-               val.assign(s);
-               if(s) free(s);
-               cms_msg->setStringProperty("stateStartTime", val);
-               /* condorId */
-               val.assign(state_out->condorId);
-               cms_msg->setStringProperty("condorId", val);
-               /* destSite */
-               val.assign(state_out->destination);
-               cms_msg->setStringProperty("destSite", val);
-               /* exitCode */
-               cms_msg->setIntProperty("exitCode", state_out->exit_code);
-               /* doneCode */
-               cms_msg->setIntProperty("doneCode", state_out->done_code);
-               /* statusReason */
-               val.assign(state_out->reason);
-               cms_msg->setStringProperty("statusReason", val);
-
-               free(event.data);
-               edg_wll_FreeEvent(notif_event);
-               free(notif_event);
-               edg_wll_FreeStatus(state_out);
-               free(state_out);
-               free(jobstat_char);
-           } catch(cms::CMSException &e) {
-               goto err;
+           }
+           if ( edg_wll_ParseJobStat(context, jobstat_char, strlen(jobstat_char), state_out)) {
+                   set_error(IL_LBAPI, EINVAL, "event_queue_send: error parsing job status");
+                   goto err;
            }
 
            try {
-               output->send(cms_msg);
-                   delete cms_msg;
-               if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
-                       /* failure committing message, this is bad */
-                       goto err;
-               }
-               event_queue_remove(eq);
-               eq->first_event_sent = 1;
+                   cms_msg = output->createMessage(state_out);
+
+                   free(event.data);
+                   edg_wll_FreeEvent(notif_event);
+                   free(notif_event);
+                   edg_wll_FreeStatus(state_out);
+                   free(state_out);
+                   free(jobstat_char);
+
+                   output->send(cms_msg); 
+                   delete cms_msg;
+                   if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq), msg->generation) < 0) {
+                           /* failure committing message, this is bad */
+                           goto err;
+                   }
            } catch(cms::CMSException &e) {
-                   delete cms_msg;
-               output->cleanup();
-               eq->timeout = TIMEOUT;
-               edg_wll_FreeContext(context);
-               return 0;
+                   if(cms_msg) {
+                           delete cms_msg;
+                   }
+                   eq->timeout = TIMEOUT;
+                   edg_wll_FreeContext(context);
+                   set_error(IL_DL, 0, (char*)e.what());
+                   return 0;
            }
+           event_queue_remove(eq);
+           eq->first_event_sent = 1;
        }
        edg_wll_FreeContext(context);
        return 1;
@@ -294,8 +372,8 @@ err:
                free(event.data);
        }
        if(notif_event) {
-       edg_wll_FreeEvent(notif_event);
-       free(notif_event);
+               edg_wll_FreeEvent(notif_event);
+               free(notif_event);
        }
        if(jobstat_char) {
                free(jobstat_char);
@@ -316,9 +394,10 @@ event_queue_close(struct event_queue *eq)
 
        assert(output != NULL);
 
-       try {
+       try { 
                output->close();
        } catch(cms::CMSException &e) {
+               set_error(IL_DL, 0, (char*)e.what());
                return -1;
        }
        eq->first_event_sent = 0;
@@ -362,23 +441,8 @@ plugin_init(char *config)
        }
 
        try {
-               activemq::library::ActiveMQCPP::initializeLibrary();
-
-               OutputPlugin::connectionFactory =
-                               cms::ConnectionFactory::createCMSConnectionFactory(brokerURI);
-               OutputPlugin::connection = OutputPlugin::connectionFactory->createConnection();
-       } catch (cms::CMSException &e) {
-               try {
-                       if(OutputPlugin::connection != NULL) {
-                               delete OutputPlugin::connection;
-                               OutputPlugin::connection = NULL;
-                       }
-                       if(OutputPlugin::connectionFactory != NULL) {
-                               delete OutputPlugin::connectionFactory;
-                               OutputPlugin::connectionFactory = NULL;
-                       }
-               } catch(cms::CMSException &e) {
-               }
+               OutputPlugin::initialize(brokerURI);
+       } catch(cms::CMSException &e) {
                set_error(IL_DL, 0, (char*)e.what());
                return -1;
        }
@@ -397,4 +461,5 @@ plugin_supports_scheme(const char *scheme)
 
 cms::Connection *OutputPlugin::connection = NULL;
 cms::ConnectionFactory *OutputPlugin::connectionFactory = NULL;
-const char *OutputPlugin::SCHEME = "x-msg:";
+const char *OutputPlugin::SCHEME = "x-msg://";
+pthread_rwlock_t OutputPlugin::connection_lock;