class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
+
private:
Connection* connection;
bool clientAck;
std::string brokerURI;
std::string destURI;
+ FILE * outputfile;
public:
message->acknowledge();
}
- printf( "Message #%d Received: %s\n", count, text.c_str() );
+ fprintf(outputfile, "Message #%d Received: %s\n", count, text.c_str() );
+ fflush(outputfile);
} catch (CMSException& e) {
e.printStackTrace();
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
- printf("CMS Exception occurred. Shutting down client.\n");
+ fprintf(stderr, "CMS Exception occurred. Shutting down client.\n");
//exit(1);
}
virtual void transportInterrupted() {
- std::cout << "The Connection's Transport has been Interrupted." << std::endl;
+ fprintf(stderr, "The Connection's Transiort has been Interrupted.\n");
}
virtual void transportResumed() {
- std::cout << "The Connection's Transport has been Restored." << std::endl;
+ fprintf(stderr, "The Connection's Transport has been Restored.\n");
+ }
+
+ virtual void setOutfile(FILE * setout) {
+ outputfile=setout;
}
private:
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
- if (argc <= 2) {
- printf("Usage: %s <broker> <topic>\n", argv[0]);
+ char *argbrokerURI = NULL;
+ char *argdestURI = NULL;
+ char *outputfile = NULL;
+ FILE * outfile;
+ int i;
+
+
+ for(i=1; i<argc; i++)
+ if((!strcmp(argv[i],"-o")) && (i<argc)) outputfile = argv[++i];
+ else if(!argbrokerURI) argbrokerURI=argv[i];
+ else argdestURI=argv[i];
+ if (!(outfile = fopen (outputfile,"w"))) outfile = stdout;
+
+ if (!argdestURI) {
+ printf("Usage: %s [-o <output_file>] <broker> <topic>\n", argv[0]);
return 1;
}
activemq::library::ActiveMQCPP::initializeLibrary();
- std::cout << "=====================================================\n";
- std::cout << "Starting the example:" << std::endl;
- std::cout << "-----------------------------------------------------\n";
+ fprintf(outfile, "=====================================================\n");
+ fprintf(outfile, "Starting the example:\n");
+ fprintf(outfile, "-----------------------------------------------------\n");
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
//
std::string brokerURI = "failover:(tcp://";
- brokerURI += argv[1];
+ brokerURI += argbrokerURI;
brokerURI += ")";
// std::string brokerURI =
// customize where the consumer listens, to have the consumer
// use a topic or queue set the 'useTopics' flag.
//============================================================
- std::string destURI = argv[2]; //?consumer.prefetchSize=1";
+ std::string destURI = argdestURI; //?consumer.prefetchSize=1";
//============================================================
// set to true to use topics instead of queues
// Create the consumer
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
+ consumer.setOutfile(outfile);
+
// Start it up and it will listen forever.
consumer.runConsumer();
// Wait to exit.
- std::cout << "Press 'q' to quit" << std::endl;
+ fprintf(stdout, "Press 'q' to quit\n");
while( std::cin.get() != 'q') {}
// All CMS resources should be closed before the library is shutdown.
consumer.close();
- std::cout << "-----------------------------------------------------\n";
- std::cout << "Finished with the example." << std::endl;
- std::cout << "=====================================================\n";
+ fprintf(outfile, "-----------------------------------------------------\n");
+ fprintf(outfile, "Finished with the example.\n");
+ fprintf(outfile, "=====================================================\n");
+
+ if (outputfile) fclose(outfile);
activemq::library::ActiveMQCPP::shutdownLibrary();
}