From: Zdeněk Šustr Date: Fri, 1 Apr 2011 09:28:54 +0000 (+0000) Subject: New MSG client example. X-Git-Tag: glite-lb-client_R_5_0_4_3~22 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=afebbff55f77da210776d890128c85d36891ad37;p=jra1mw.git New MSG client example. --- diff --git a/org.glite.lb.logger-msg/Makefile b/org.glite.lb.logger-msg/Makefile index d862716..fe9fea1 100644 --- a/org.glite.lb.logger-msg/Makefile +++ b/org.glite.lb.logger-msg/Makefile @@ -18,7 +18,7 @@ version=${module.version} CC=gcc CXX=gcc -VPATH:=${top_srcdir}/src:${top_srcdir}/interface:${top_srcdir}/test:${top_srcdir}/doc +VPATH:=${top_srcdir}/src:${top_srcdir}/interface:${top_srcdir}/test:${top_srcdir}/doc:${top_srcdir}/examples VERSION=-DVERSION=\"GLite-${version}\" @@ -65,9 +65,11 @@ PLUGIN_OBJS:=activemq_cpp_plugin.o PLUGIN_LOBJS:=${PLUGIN_OBJS:.o=.lo} PLUGIN_LIB:=activemq_cpp_plugin.la +EXAMPLES=glite-lb-cmsclient + default: all -all compile: ${PLUGIN_LIB} +all compile: ${PLUGIN_LIB} ${EXAMPLES} ${PLUGIN_LIB}: ${PLUGIN_LOBJS} ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${EXT_LIB} @@ -77,6 +79,9 @@ ${MAN_GZ}: ${MAN} cp $? . gzip -f $(notdir $?) +glite-lb-cmsclient: cmsclient.o + $(CC) $< ${EXT_LIB} -o $@ + man: ${MAN_GZ} stage: compile @@ -96,6 +101,7 @@ install: -mkdir -p ${DESTDIR}${PREFIX}${prefix}/share/doc/${package}-${version} ${INSTALL} -m 644 ${top_srcdir}/LICENSE ${DESTDIR}${PREFIX}${prefix}/share/doc/${package}-${version} ${INSTALL} -m 755 ${PLUGIN_LIB} ${DESTDIR}${PREFIX}${prefix}/${libdir} + ${INSTALL} -m 755 ${EXAMPLES} ${DESTDIR}${PREFIX}${prefix}/${libdir}/glite-lb/examples ( cd ${top_srcdir}/project && ${INSTALL} -m 644 ChangeLog package.description package.summary ${DESTDIR}${PREFIX}${prefix}/share/doc/${package}-${version} ) # ${INSTALL} -m 644 ${MAN_GZ} ${DESTDIR}${PREFIX}${prefix}/share/man/man8 @@ -104,5 +110,5 @@ install: clean: - rm -rvf .libs/ *.o *.lo ${PLUGIN_LIB} ${MAN_GZ} + rm -rvf .libs/ *.o *.lo ${PLUGIN_LIB} ${MAN_GZ} ${EXAMPLES} rm -rvf log.xml rpmbuild/ RPMS/ tgz/ diff --git a/org.glite.lb.logger-msg/examples/cmsclient.cpp b/org.glite.lb.logger-msg/examples/cmsclient.cpp new file mode 100644 index 0000000..26aea16 --- /dev/null +++ b/org.glite.lb.logger-msg/examples/cmsclient.cpp @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq; +using namespace activemq::core; +using namespace activemq::transport; +using namespace decaf::lang; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace cms; +using namespace std; + +//////////////////////////////////////////////////////////////////////////////// +class SimpleAsyncConsumer : public ExceptionListener, + public MessageListener, + public DefaultTransportListener { +private: + + Connection* connection; + Session* session; + Destination* destination; + MessageConsumer* consumer; + bool useTopic; + bool clientAck; + std::string brokerURI; + std::string destURI; + +public: + + SimpleAsyncConsumer( const std::string& brokerURI, + const std::string& destURI, + bool useTopic = false, + bool clientAck = false ) { + connection = NULL; + session = NULL; + destination = NULL; + consumer = NULL; + this->useTopic = useTopic; + this->brokerURI = brokerURI; + this->destURI = destURI; + this->clientAck = clientAck; + } + + virtual ~SimpleAsyncConsumer(){ + this->cleanup(); + } + + void close() { + this->cleanup(); + } + + void runConsumer() { + + try { + + // Create a ConnectionFactory + ActiveMQConnectionFactory* connectionFactory = + new ActiveMQConnectionFactory( brokerURI ); + + // Create a Connection + connection = connectionFactory->createConnection(); + delete connectionFactory; + + ActiveMQConnection* amqConnection = dynamic_cast( connection ); + if( amqConnection != NULL ) { + amqConnection->addTransportListener( this ); + } + + connection->start(); + + connection->setExceptionListener(this); + + // Create a Session + if( clientAck ) { + session = connection->createSession( Session::CLIENT_ACKNOWLEDGE ); + } else { + session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); + } + + // Create the destination (Topic or Queue) + if( useTopic ) { + destination = session->createTopic( destURI ); + } else { + destination = session->createQueue( destURI ); + } + + // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session->createConsumer( destination ); + consumer->setMessageListener( this ); + + } catch (CMSException& e) { + e.printStackTrace(); + } + } + + // Called from the consumer since this class is a registered MessageListener. + virtual void onMessage( const Message* message ){ + + static int count = 0; + + try + { + count++; + const TextMessage* textMessage = + dynamic_cast< const TextMessage* >( message ); + string text = ""; + + if( textMessage != NULL ) { + text = textMessage->getText(); + } else { + text = "NOT A TEXTMESSAGE!"; + } + + if( clientAck ) { + message->acknowledge(); + } + + printf( "Message #%d Received: %s\n", count, text.c_str() ); + } catch (CMSException& e) { + e.printStackTrace(); + } + } + + // If something bad happens you see it here as this class is also been + // registered as an ExceptionListener with the connection. + virtual void onException( const CMSException& ex AMQCPP_UNUSED ) { + printf("CMS Exception occurred. Shutting down client.\n"); + //exit(1); + } + + virtual void transportInterrupted() { + std::cout << "The Connection's Transport has been Interrupted." << std::endl; + } + + virtual void transportResumed() { + std::cout << "The Connection's Transport has been Restored." << std::endl; + } + +private: + + void cleanup(){ + + //************************************************* + // Always close destination, consumers and producers before + // you destroy their sessions and connection. + //************************************************* + + // Destroy resources. + try{ + if( destination != NULL ) delete destination; + }catch (CMSException& e) {} + destination = NULL; + + try{ + if( consumer != NULL ) delete consumer; + }catch (CMSException& e) {} + consumer = NULL; + + // Close open resources. + try{ + if( session != NULL ) session->close(); + if( connection != NULL ) connection->close(); + }catch (CMSException& e) {} + + // Now Destroy them + try{ + if( session != NULL ) delete session; + }catch (CMSException& e) {} + session = NULL; + + try{ + if( connection != NULL ) delete connection; + }catch (CMSException& e) {} + connection = NULL; + } +}; + +//////////////////////////////////////////////////////////////////////////////// +int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { + if (argc <= 1) { + printf("Usage: %s BROKER\n", argv[0]); + return 1; + } + + activemq::library::ActiveMQCPP::initializeLibrary(); + + std::cout << "=====================================================\n"; + std::cout << "Starting the example:" << std::endl; + std::cout << "-----------------------------------------------------\n"; + + // Set the URI to point to the IPAddress of your broker. + // add any optional params to the url to enable things like + // tightMarshalling or tcp logging etc. See the CMS web site for + // a full list of configuration options. + // + // http://activemq.apache.org/cms/ + // + // Wire Format Options: + // ===================== + // Use either stomp or openwire, the default ports are different for each + // + // Examples: + // tcp://127.0.0.1:61616 default to openwire + // tcp://127.0.0.1:61616?wireFormat=openwire same as above + // tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead + // + std::string brokerURI = "failover:(tcp://"; + brokerURI += argv[1]; + brokerURI += ")"; +// std::string brokerURI = +// "failover:(tcp://harad.ics.muni.cz:61616" +// "?wireFormat=openwire" +// "&connection.useAsyncSend=true" +// "&transport.commandTracingEnabled=true" +// "&transport.tcpTracingEnabled=true" +// "&wireFormat.tightEncodingEnabled=true" +// ")"; + + //============================================================ + // This is the Destination Name and URI options. Use this to + // customize where the consumer listens, to have the consumer + // use a topic or queue set the 'useTopics' flag. + //============================================================ + std::string destURI = "topic"; //?consumer.prefetchSize=1"; + + //============================================================ + // set to true to use topics instead of queues + // Note in the code above that this causes createTopic or + // createQueue to be used in the consumer. + //============================================================ + bool useTopics = true; + + //============================================================ + // set to true if you want the consumer to use client ack mode + // instead of the default auto ack mode. + //============================================================ + bool clientAck = false; + + // Create the consumer + SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck ); + + // Start it up and it will listen forever. + consumer.runConsumer(); + + // Wait to exit. + std::cout << "Press 'q' to quit" << std::endl; + while( std::cin.get() != 'q') {} + + // All CMS resources should be closed before the library is shutdown. + consumer.close(); + + std::cout << "-----------------------------------------------------\n"; + std::cout << "Finished with the example." << std::endl; + std::cout << "=====================================================\n"; + + activemq::library::ActiveMQCPP::shutdownLibrary(); +} + +