--- /dev/null
+#ifndef _CONNECTION_H
+#define _CONNECTION_H
+
+
+class Connection {
+public:
+ class Factory {
+ public:
+ virtual Connection *newConnection(int fd) const = 0;
+ virtual Connection *accept(int fd) const = 0;
+ };
+
+ class Endpoint {
+ };
+
+ Connection(int a_fd) : fd(a_fd)
+ {}
+
+ virtual ~Connection()
+ {}
+
+ virtual int getFD() const
+ { return fd; }
+
+ virtual int read(char *buf, unsigned int len) = 0;
+ virtual int write(char *buf, unsigned int len) = 0;
+
+protected:
+ int fd;
+};
+
+#endif
--- /dev/null
+#ifndef _HTTP_TRANSPORT_H
+#define _HTTP_TRANSPORT_H
+
+#include "ThreadPool.H"
+#include "Transport.H"
+
+class HTTPTransport:
+ public Transport
+{
+public:
+ class Factory: public Transport::Factory {
+ public:
+ virtual Transport *newTransport(Connection *conn) const {
+ return(new HTTPTransport(conn));
+ }
+ };
+
+ static Factory theFactory;
+
+ HTTPTransport(Connection *conn)
+ : Transport(conn)
+ {}
+
+ virtual ~HTTPTransport();
+
+ // from ThreadPool::WorkDescription
+ virtual void onReady();
+ virtual void onTimeout();
+ virtual void onError();
+
+};
+
+
+#endif
--- /dev/null
+#include "HTTPTransport.H"
+
+#include <iostream>
+
+
+HTTPTransport::Factory HTTPTransport::theFactory;
+
+
+HTTPTransport::~HTTPTransport()
+{
+}
+
+
+void
+HTTPTransport::onReady()
+{
+ char buffer[256];
+ int len;
+
+ len = conn->read(buffer, sizeof(buffer));
+ if(len < 0) {
+ std::cout << "error on receive - closing connection" << std::endl;
+ } else if ( len > 0) {
+ std::cout.write(buffer, len);
+ std::cout.flush();
+ ThreadPool::theThreadPool.queueWorkRead(this);
+ } else {
+ std::cout << "no more data" << std::endl;
+ }
+}
+
+
+void
+HTTPTransport::onTimeout()
+{
+}
+
+
+void
+HTTPTransport::onError()
+{
+}
--- /dev/null
+
+CXX = c++
+CC = gcc
+
+CFLAGS = -g -Wall
+
+COMPILEPP = $(CXX) $(CXXFLAGS) $(CFLAGS)
+COMPILE = $(CC) $(CFLAGS)
+LINK = c++ $(LDFLAGS)
+
+THREAD_LIB = -lpthread
+
+
+plain: SocketInput.o PlainConnection.o HTTPTransport.o PluginManager.o ThreadPool.o main.o
+ $(LINK) -o $@ $+ $(THREAD_LIB)
+
+%.o: %.cpp
+ $(COMPILEPP) -o $@ -c $<
+
+%.o: %.c
+ $(COMPILE) -o $@ -c $<
\ No newline at end of file
--- /dev/null
+#ifndef _PLAIN_CONNECTION_H
+#define _PLAIN_CONNECTION_H
+
+#include "Connection.H"
+
+
+class PlainConnection:
+ public Connection
+{
+public:
+ class Factory: public Connection::Factory {
+ public:
+ virtual Connection *newConnection(int fd) const {
+ return new PlainConnection(fd);
+ }
+
+ virtual Connection *accept(int fd) const;
+ };
+
+ static Factory theFactory;
+
+ PlainConnection(int a_fd): Connection(a_fd)
+ {}
+
+ virtual ~PlainConnection();
+
+ // from Connection
+ virtual int read(char *buf, unsigned int len);
+ virtual int write(char *buf, unsigned int len);
+
+};
+
+#endif
--- /dev/null
+#include "PlainConnection.H"
+#include "ThreadPool.H"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+PlainConnection::Factory PlainConnection::theFactory;
+
+
+PlainConnection::~PlainConnection()
+{
+}
+
+
+Connection *
+PlainConnection::Factory::accept(int fd) const
+{
+ int nfd;
+
+ nfd = ::accept(fd, NULL, NULL);
+ return newConnection(nfd);
+}
+
+
+int
+PlainConnection::read(char *buf, unsigned int len)
+{
+ int ret;
+
+ ret = ::recv(fd, buf, len, MSG_NOSIGNAL);
+ return ret;
+}
+
+
+int
+PlainConnection::write(char *buf, unsigned int len)
+{
+}
--- /dev/null
+#ifndef _PLUGIN_MANAGER_H
+#define _PLUGIN_MANAGER_H
+
+#include <list>
+
+#include <iostream>
+
+class PluginManager {
+public:
+ // the plugin manager instance
+ static PluginManager thePluginManager;
+
+
+ // base class for plugins
+ class Plugin {
+ public:
+ const char *name;
+
+ Plugin(const char *aname) : name(aname) {
+ PluginManager::thePluginManager.registerPlugin(this, aname);
+ }
+
+ virtual bool initialize() = 0;
+ virtual bool cleanup () = 0;
+ };
+
+ // add plugin with given name to the list of registered plugins
+ void registerPlugin(Plugin *plugin, const char *name) {
+ pluginList.push_front(plugin);
+ }
+
+ // remove plugin from the list
+ void removePlugin();
+
+ // initialize all plugins on list
+ void initialize() {
+ for(std::list<Plugin *>::iterator i = pluginList.begin();
+ i != pluginList.end();
+ i++) {
+ (*i)->initialize();
+ }
+
+ }
+
+ // cleanup all plugins on list
+ void cleanup() {
+ for(std::list<Plugin *>::iterator i = pluginList.begin();
+ i != pluginList.end();
+ i++) {
+ (*i)->cleanup();
+ }
+
+ }
+
+
+private:
+ // list of registered plugins
+ std::list<Plugin *> pluginList;
+
+ // singleton class with private default constructor
+ PluginManager() : pluginList()
+ {};
+};
+
+
+#endif
--- /dev/null
+#include "PluginManager.H"
+
+PluginManager PluginManager::thePluginManager;
+
--- /dev/null
+#ifndef _SOCKET_INPUT_H
+#define _SOCKET_INPUT_H
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+#include "sys/un.h"
+
+
+class SocketInput: public ThreadPool::WorkDescription
+{
+public:
+ SocketInput(const char *path,
+ const Connection::Factory *a_cfactory,
+ const Transport::Factory *a_tfactory);
+ virtual ~SocketInput();
+
+ // from WorkDescription
+ virtual void onReady();
+ virtual void onTimeout();
+ virtual void onError();
+
+private:
+ static const int SOCK_QUEUE_MAX = 5;
+
+ struct sockaddr_un saddr;
+ const Connection::Factory *cFactory;
+ const Transport::Factory *tFactory;
+};
+
+#endif
--- /dev/null
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <errno.h>
+
+#include "ThreadPool.H"
+#include "SocketInput.H"
+
+
+
+// create unix domain socket for input
+SocketInput::SocketInput(const char *path,
+ const Connection::Factory *a_cfactory,
+ const Transport::Factory *a_tfactory)
+ : ThreadPool::WorkDescription(0),
+ cFactory(a_cfactory),
+ tFactory(a_tfactory)
+{
+ memset(&saddr, 0, sizeof(saddr));
+ saddr.sun_family = AF_UNIX;
+ strcpy(saddr.sun_path, path);
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if(connect(fd, (struct sockaddr*)&saddr, sizeof(saddr.sun_path)) < 0) {
+ if(errno == ECONNREFUSED) {
+ unlink(saddr.sun_path);
+ }
+ } else {
+ // another instance running
+ // throw new Exception
+ }
+ bind(fd, (struct sockaddr *)&saddr, sizeof(saddr));
+ listen(fd, SOCK_QUEUE_MAX);
+ ThreadPool::theThreadPool.setWorkAccept(this);
+}
+
+
+// remove the socket
+SocketInput::~SocketInput()
+{
+ if(fd >= 0)
+ close(fd);
+ unlink(saddr.sun_path);
+}
+
+
+void
+SocketInput::onReady()
+{
+ Connection *conn = cFactory->accept(fd);
+ Transport *trans = tFactory->newTransport(conn);
+ ThreadPool::theThreadPool.queueWorkRead(trans);
+}
+
+
+void
+SocketInput::onTimeout()
+{
+ // nothing special, just sit around
+}
+
+
+void
+SocketInput::onError()
+{
+ // should report an error?
+}
--- /dev/null
+#ifndef _THREAD_POOL_H
+#define _THREAD_POOL_H
+
+#include <pthread.h>
+#include <poll.h>
+#include <time.h>
+
+#include <list>
+
+class ThreadPool {
+public:
+ const static int default_timeout = 5;
+
+ class WorkDescription {
+ friend class ThreadPool;
+ public:
+ int fd;
+
+ WorkDescription(int afd)
+ : fd(afd), event(NONE) {}
+
+ virtual void onReady()
+ {}
+
+ virtual void onTimeout()
+ {}
+
+ virtual void onError()
+ {}
+
+ protected:
+ enum Event { NONE, READY, TIMEOUT, ERROR } event;
+ void doWork();
+ };
+
+public:
+ static ThreadPool theThreadPool;
+
+ static ThreadPool *getThreadPool()
+ { return &theThreadPool; }
+
+ void startWorkers(unsigned int n);
+ void stopWorkers();
+
+ void postWork(WorkDescription *work_unit);
+
+ void queueWorkAccept(WorkDescription *work_unit);
+ void queueWorkRead(WorkDescription *work_unit);
+ void queueWorkWrite(WorkDescription *work_unit);
+ void queueWorkTimeout(WorkDescription *work_unit);
+ void queueWorkConnect(WorkDescription *work_unit);
+
+ void setWorkAccept(WorkDescription *work_unit);
+ void setWorkRead(WorkDescription *work_unit);
+ void setWorkWrite(WorkDescription *work_unit);
+ void setWorkTimeout(WorkDescription *work_unit);
+
+ void run();
+ void exit()
+ { f_exit = true; pthread_cond_signal(&wait_queue_cond_ready); }
+
+protected:
+ ThreadPool();
+ ~ThreadPool();
+
+ WorkDescription *getWork();
+
+private:
+ class WaitDesc {
+ public:
+ WorkDescription *wd;
+ short event;
+ bool f_permanent;
+ struct timeval timeout;
+
+ WaitDesc(WorkDescription *w, short e,
+ bool permanent = false, int t = default_timeout)
+ : wd(w), event(e), f_permanent(permanent) {
+ timeout.tv_sec = t;
+ timeout.tv_usec = 0;
+ }
+
+ int get_fd() { return wd->fd; };
+ void adjustTimeout(const struct timeval &delta);
+ int timeoutExpired() { return((timeout.tv_sec < 0) ||
+ (timeout.tv_sec == 0 && timeout.tv_usec == 0)); }
+ };
+
+private:
+ bool f_exit;
+ int num_workers;
+ pthread_t *workers;
+ int work_count;
+ int wait_count;
+ int ufds_size;
+ std::list<WorkDescription *> work_queue;
+ std::list<WaitDesc *> wait_queue;
+ pthread_mutex_t work_queue_mutex;
+ pthread_cond_t work_queue_cond_ready;
+ pthread_cond_t work_queue_cond_full;
+ pthread_mutex_t wait_queue_mutex;
+ pthread_cond_t wait_queue_cond_ready;
+ struct pollfd *ufds;
+ int pd[2];
+ struct timeval min_timeout;
+
+ void prepareDescriptorArray();
+ void removeWaitDesc(std::list<WaitDesc *>::iterator &i);
+ void removeWorkDesc();
+ void queueWork(WaitDesc *);
+
+ static void *threadMain(void *);
+ static void threadCleanup(void *);
+};
+
+#endif
--- /dev/null
+#include <time.h>
+#include <pthread.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <time.h>
+#include <stdlib.h>
+
+#include <iostream>
+
+#include "ThreadPool.H"
+#include "Exception.H"
+
+ThreadPool ThreadPool::theThreadPool;
+
+static inline
+void
+tv_sub(struct timeval &a, const struct timeval &b) {
+ a.tv_usec -= b.tv_usec;
+ a.tv_sec -= b.tv_sec;
+ if (a.tv_usec < 0) {
+ a.tv_sec--;
+ a.tv_usec += 1000000;
+ }
+}
+
+
+static inline
+int
+tv_cmp(const struct timeval &a, const struct timeval &b) {
+ if(a.tv_sec < b.tv_sec) {
+ return -1;
+ } else if(a.tv_sec > b.tv_sec) {
+ return 1;
+ } else {
+ if (a.tv_usec < b.tv_usec) {
+ return -1;
+ } else if(a.tv_usec > b.tv_usec) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+
+inline
+void
+ThreadPool::WorkDescription::doWork() {
+ switch(event) {
+ case READY:
+ onReady();
+ break;
+ case TIMEOUT:
+ onTimeout();
+ break;
+ case ERROR:
+ onError();
+ break;
+ default:
+ break;
+ }
+}
+
+
+inline
+void
+ThreadPool::WaitDesc::adjustTimeout(const struct timeval &delta)
+{
+ tv_sub(timeout, delta);
+}
+
+
+ThreadPool::ThreadPool()
+ : work_count(0), wait_count(0), ufds_size(0), ufds(NULL), f_exit(false)
+{
+ pthread_mutex_init(&wait_queue_mutex, NULL);
+ pthread_mutex_init(&work_queue_mutex, NULL);
+ pthread_cond_init(&work_queue_cond_ready, NULL);
+ pthread_cond_init(&work_queue_cond_full, NULL);
+ pthread_cond_init(&wait_queue_cond_ready, NULL);
+ pipe(pd);
+ ufds = static_cast<struct pollfd *>(malloc(sizeof(struct pollfd)));
+ ufds->fd = pd[0];
+ ufds->events = POLLIN;
+ ufds_size = 1;
+}
+
+
+ThreadPool::~ThreadPool()
+{
+ pthread_cond_destroy(&work_queue_cond_full);
+ pthread_cond_destroy(&work_queue_cond_ready);
+ pthread_cond_destroy(&wait_queue_cond_ready);
+ pthread_mutex_destroy(&work_queue_mutex);
+ pthread_mutex_destroy(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::startWorkers(unsigned int n)
+{
+ workers = new pthread_t[n];
+
+ num_workers = n;
+ for(unsigned int i = 0; i < n; i++) {
+ pthread_create(&workers[i], NULL, ThreadPool::threadMain, NULL);
+ }
+}
+
+
+void
+ThreadPool::stopWorkers()
+{
+ for(int i = 0; i < num_workers; i++) {
+ pthread_cancel(workers[i]);
+ pthread_join(workers[i], NULL);
+ }
+ delete[] workers;
+}
+
+
+void
+ThreadPool::postWork(WorkDescription *work_unit)
+{
+ pthread_mutex_lock(&work_queue_mutex);
+ work_queue.push_back(work_unit);
+ work_count++;
+ pthread_cond_signal(&work_queue_cond_ready);
+ pthread_mutex_unlock(&work_queue_mutex);
+}
+
+
+inline
+void
+ThreadPool::queueWork(WaitDesc *wd)
+{
+ pthread_mutex_lock(&wait_queue_mutex);
+ wait_queue.push_back(wd);
+ wait_count++;
+ pthread_cond_signal(&wait_queue_cond_ready);
+ pthread_mutex_unlock(&wait_queue_mutex);
+ write(pd[1], "1", 1);
+}
+
+
+void
+ThreadPool::queueWorkAccept(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLIN));
+}
+
+
+void
+ThreadPool::queueWorkRead(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLIN));
+}
+
+
+void
+ThreadPool::queueWorkWrite(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLOUT));
+}
+
+
+void
+ThreadPool::queueWorkTimeout(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, 0));
+}
+
+
+void
+ThreadPool::queueWorkConnect(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLIN));
+}
+
+
+void
+ThreadPool::setWorkAccept(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLIN, true));
+}
+
+
+void
+ThreadPool::setWorkRead(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLIN, true));
+}
+
+
+void
+ThreadPool::setWorkWrite(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, POLLOUT, true));
+}
+
+
+void
+ThreadPool::setWorkTimeout(WorkDescription *work_unit)
+{
+ queueWork(new WaitDesc(work_unit, 0, true));
+}
+
+
+ThreadPool::WorkDescription *
+ThreadPool::getWork()
+{
+ WorkDescription *work_unit = NULL;
+ struct timespec timeout;
+
+ pthread_mutex_lock(&work_queue_mutex);
+ if(work_count == 0) {
+ timeout.tv_sec = 1;
+ timeout.tv_nsec = 0;
+// pthread_cond_timedwait(&work_queue_cond_ready, &work_queue_mutex, &timeout);
+ pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex);
+ }
+ if(work_count > 0) {
+ work_count--;
+ work_unit = work_queue.front();
+ work_queue.pop_front();
+ }
+ pthread_mutex_unlock(&work_queue_mutex);
+ return work_unit;
+}
+
+void
+ThreadPool::threadCleanup(void *data)
+{
+ ThreadPool *pool = ThreadPool::getThreadPool();
+
+ pthread_mutex_unlock(&(pool->work_queue_mutex));
+}
+
+
+void *
+ThreadPool::threadMain(void *data)
+{
+ ThreadPool *pool = ThreadPool::getThreadPool();
+ WorkDescription *work_unit;
+
+ pthread_cleanup_push(ThreadPool::threadCleanup, NULL);
+ while(true) {
+
+ work_unit = pool->getWork();
+ if(work_unit) {
+ // something to work on
+ work_unit->doWork();
+ } else {
+ // timed out waiting for work
+ }
+ }
+ pthread_cleanup_pop(1);
+}
+
+
+void
+ThreadPool::removeWaitDesc(std::list<WaitDesc *>::iterator &i)
+{
+ std::list<WaitDesc *>::iterator j = i;
+
+ // actually this is safe even for the first element
+ pthread_mutex_lock(&wait_queue_mutex);
+ j--;
+ wait_queue.erase(i);
+ wait_count--;
+ i = j;
+ pthread_mutex_unlock(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::prepareDescriptorArray()
+{
+ std::list<WaitDesc *>::iterator theIterator;
+ struct pollfd *p;
+
+ pthread_mutex_lock(&wait_queue_mutex);
+ if(wait_count == 0) {
+ pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex);
+ }
+ if(wait_count == 0) {
+ pthread_mutex_unlock(&wait_queue_mutex);
+ return;
+ }
+ if(ufds_size != wait_count + 1) {
+ ufds = static_cast<struct pollfd *>(realloc(ufds, (1 + wait_count) * sizeof(struct pollfd)));
+ if(ufds == NULL) {
+// throw new Exception();
+ }
+ ufds_size = wait_count + 1;
+ }
+ min_timeout.tv_sec = default_timeout;
+ min_timeout.tv_usec = 0;
+ for(theIterator = wait_queue.begin(), p = ufds + 1;
+ theIterator != wait_queue.end();
+ theIterator++, p++) {
+ WaitDesc *w = *theIterator;
+ p->fd = w->get_fd();
+ p->events = w->event;
+ if(tv_cmp(min_timeout, w->timeout) > 0) {
+ min_timeout = w->timeout;
+ }
+ }
+ pthread_mutex_unlock(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::run()
+{
+ f_exit = false;
+ while(!f_exit) {
+ struct pollfd *p;
+ struct timeval before, after;
+ int ret;
+
+ // may block waiting for new work
+ prepareDescriptorArray();
+
+ gettimeofday(&before, NULL);
+ ret = poll(ufds, ufds_size, 1000*min_timeout.tv_sec + min_timeout.tv_usec/1000);
+ gettimeofday(&after, NULL);
+ tv_sub(after, before);
+
+ if((ret >= 0) || // ready or timeout
+ ((ret < 0) && (errno == EINTR))) { // interrupted
+ std::list<WaitDesc *>::iterator i;
+ WaitDesc *w;
+
+ // handle the pipe
+ if(ufds->revents & POLLIN) {
+ char discard[1];
+ read(ufds->fd, discard, 1);
+ }
+
+ // at least we have to adjust timeouts
+ pthread_mutex_lock(&wait_queue_mutex);
+ i = wait_queue.begin();
+ pthread_mutex_unlock(&wait_queue_mutex);
+ // the wait queue mutex is unlocked inside the loop
+ // to allow handlers to add queue new
+ // WorkDescriptions - these are added at the
+ // end of the list so we should be safe
+ for(p = ufds + 1; p - ufds < ufds_size; p++) {
+ enum WorkDescription::Event event = WorkDescription::NONE;
+
+ w = *i;
+ // check for consistency
+ if(p->fd != w->get_fd()) {
+ // mismatch, what shall we do?
+ abort();
+ }
+
+ // subtract the time passed from timeout
+ w->adjustTimeout(after);
+
+ // see what happened
+ if(ret <= 0) {
+ // timeout or interrupted
+ if(w->timeoutExpired()) {
+ event = WorkDescription::TIMEOUT;
+ }
+ } else {
+ // ready or error
+ if(p->revents & POLLERR) {
+ event = WorkDescription::ERROR;
+ } else if(p->revents & w->event) {
+ event = WorkDescription::READY;
+ } else if(w->timeoutExpired()) {
+ event = WorkDescription::TIMEOUT;
+ }
+ }
+ if(event != WorkDescription::NONE) {
+ WorkDescription *wd;
+ wd = w->wd;
+ wd->event = event;
+ if(!w->f_permanent) {
+ postWork(wd);
+ removeWaitDesc(i);
+ delete w;
+ } else {
+ w->wd->doWork();
+ // we have to reset the timeout
+ w->timeout.tv_sec = default_timeout;
+ w->timeout.tv_usec = 0;
+ }
+ }
+ pthread_mutex_lock(&wait_queue_mutex);
+ i++;
+ pthread_mutex_unlock(&wait_queue_mutex);
+ }
+ } else {
+ // some nasty error
+ }
+ }
+}
+
--- /dev/null
+#ifndef _TRANSPORT_H
+#define _TRANSPORT_H
+
+#include "Connection.H"
+
+
+class Transport: public ThreadPool::WorkDescription {
+public:
+ class Factory {
+ public:
+ virtual Transport *newTransport(Connection *conn) const = 0;
+ };
+
+ Transport(Connection *a_conn)
+ : conn(a_conn),
+ ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1)
+ {}
+
+ virtual ~Transport()
+ { if(conn) delete conn; }
+
+protected:
+ Connection *conn;
+
+};
+#endif
--- /dev/null
+#include "PluginManager.H"
+#include "ThreadPool.H"
+#include "SocketInput.H"
+#include "PlainConnection.H"
+#include "HTTPTransport.H"
+
+const int num_threads = 2;
+const char *sock_path = "il_sock";
+
+int main(int argc, char *argv[])
+{
+ SocketInput *input;
+
+ // initialize plugins
+ PluginManager::thePluginManager.initialize();
+
+ // create unix socket with plain IO and HTTP transport
+ input = new SocketInput(sock_path,
+ &PlainConnection::theFactory,
+ &HTTPTransport::theFactory);
+
+ // start worker threads
+ ThreadPool::theThreadPool.startWorkers(num_threads);
+
+ // run the main loop
+ ThreadPool::theThreadPool.run();
+
+ // cleanup & exit
+ delete input;
+ PluginManager::thePluginManager.cleanup();
+
+ return 0;
+}
--- /dev/null
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "PluginManager.H"
+
+class TestPlugin : public PluginManager::Plugin {
+public:
+ bool inited, cleaned;
+
+ virtual bool initialize() {
+ inited = true;
+ }
+
+ virtual bool cleanup() {
+ cleaned = true;
+ }
+
+ static TestPlugin theTestPlugin;
+
+private:
+ TestPlugin() : PluginManager::Plugin("test plugin"),
+ inited(false),
+ cleaned(false)
+ {}
+
+
+
+};
+
+
+class PluginManagerTest : public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE(PluginManagerTest);
+ CPPUNIT_TEST(testInit);
+ CPPUNIT_TEST(testClean);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ void setUp() {
+ }
+
+ void tearDown() {
+ }
+
+ void testInit() {
+ PluginManager::thePluginManager.initialize();
+ CPPUNIT_ASSERT(TestPlugin::theTestPlugin.inited);
+ }
+
+ void testClean() {
+ PluginManager::thePluginManager.cleanup();
+ CPPUNIT_ASSERT(TestPlugin::theTestPlugin.cleaned);
+ }
+};
+
+
+TestPlugin TestPlugin::theTestPlugin;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( PluginManagerTest );
--- /dev/null
+#include <cppunit/extensions/HelperMacros.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <iostream>
+
+#include "ThreadPool.H"
+
+class TestWork : public ThreadPool::WorkDescription {
+public:
+ int done;
+
+ TestWork(int fd) : ThreadPool::WorkDescription(fd), done(0) {};
+
+ virtual void onReady() {
+ done++;
+ }
+
+};
+
+
+class TestConsumer : public ThreadPool::WorkDescription {
+public:
+ char buf[2];
+
+ TestConsumer(int fd) : ThreadPool::WorkDescription(fd) {};
+
+ virtual void onReady() {
+ int r;
+
+ r = read(fd, buf, 1);
+ buf[1] = 0;
+ ThreadPool::getThreadPool()->exit();
+ }
+
+ virtual void onTimeout() {
+ }
+
+};
+
+
+class TestProducer : public ThreadPool::WorkDescription {
+public:
+ TestProducer(int fd) : ThreadPool::WorkDescription(fd) {};
+
+ virtual void onReady() {
+ write(fd, "a", 1);
+ }
+
+ virtual void onTimeout() {
+ }
+
+};
+
+
+class TestSocketRead: public ThreadPool::WorkDescription {
+public:
+ char buffer[10];
+
+ TestSocketRead(int fd) : ThreadPool::WorkDescription(fd) {
+ }
+
+ virtual void onReady() {
+
+ int len = recv(fd, buffer, sizeof(buffer), MSG_NOSIGNAL);
+ ThreadPool::getThreadPool()->exit();
+ }
+
+ virtual void onError() {
+ }
+};
+
+
+class TestSocketWrite: public ThreadPool::WorkDescription {
+public:
+ static char buffer[];
+
+ TestSocketWrite(const char *name)
+ : ThreadPool::WorkDescription(0) {
+ struct sockaddr_un saddr;
+ int ret;
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ memset(&saddr, 0, sizeof(saddr));
+ saddr.sun_family = AF_UNIX;
+ strcpy(saddr.sun_path, name);
+ if((ret = connect(fd, (struct sockaddr *)&saddr, sizeof(saddr))) < 0) {
+ }
+ }
+
+ virtual void onReady() {
+ int ret;
+
+ ret = send(fd, buffer, strlen(buffer)+1, MSG_NOSIGNAL);
+ close(fd);
+ }
+
+};
+
+char TestSocketWrite::buffer[] = "ahoj";
+
+class TestSocketAccept : public ThreadPool::WorkDescription {
+public:
+ TestSocketRead *reader;
+
+ TestSocketAccept(const char *name)
+ : ThreadPool::WorkDescription(0) {
+ struct sockaddr_un saddr;
+
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ memset(&saddr, 0, sizeof(saddr));
+ saddr.sun_family = AF_UNIX;
+ strcpy(saddr.sun_path, name);
+ bind(fd, (struct sockaddr *)&saddr, sizeof(saddr));
+ listen(fd, 1);
+ }
+
+ virtual void onReady() {
+ int nfd;
+
+ nfd = accept(fd, NULL, NULL);
+ if(nfd < 0) {
+ } else {
+ ThreadPool *pool = ThreadPool::getThreadPool();
+
+ reader = new TestSocketRead(nfd);
+ pool->queueWorkRead(reader);
+ }
+ }
+};
+
+
+class ThreadPoolTest: public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE( ThreadPoolTest );
+// CPPUNIT_TEST( testWorkQueue );
+ CPPUNIT_TEST( testPoll );
+ CPPUNIT_TEST( testAccept );
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ void setUp() {
+ pool = ThreadPool::getThreadPool();
+ unlink("/tmp/smazat.sock");
+ pool->startWorkers(2);
+ }
+
+ void tearDown() {
+ pool->stopWorkers();
+ }
+
+ void testWorkQueue() {
+ TestWork *work_unit = new TestWork(0);
+ pool->postWork(work_unit);
+ }
+
+ void testPoll() {
+ int fd[2];
+ TestProducer *p = new TestProducer(0);
+ TestConsumer *c = new TestConsumer(0);
+
+ pipe(fd);
+ c->fd = fd[0];
+ p->fd = fd[1];
+ pool->queueWorkRead(c);
+ pool->queueWorkWrite(p);
+ pool->run();
+ CPPUNIT_ASSERT(c->buf[0] == 'a');
+ CPPUNIT_ASSERT(c->buf[1] == 0);
+ }
+
+ void testAccept() {
+ TestSocketAccept *consumer = new TestSocketAccept("/tmp/smazat.sock");
+ TestSocketWrite *producer;
+
+ pool->queueWorkAccept(consumer);
+ producer = new TestSocketWrite("/tmp/smazat.sock");
+ ThreadPool::getThreadPool()->queueWorkWrite(producer);
+ pool->run();
+ CPPUNIT_ASSERT(consumer->reader != NULL);
+ CPPUNIT_ASSERT(strcmp(consumer->reader->buffer, TestSocketWrite::buffer) == 0);
+ }
+
+private:
+ ThreadPool *pool;
+};
+
+
+CPPUNIT_TEST_SUITE_REGISTRATION( ThreadPoolTest );
--- /dev/null
+#include <assert.h>
+#include <fstream>
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/CompilerOutputter.h>
+//#include <cppunit/XmlOutputter.h>
+#include <cppunit/TestRunner.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+
+int main (int argc,const char *argv[])
+{
+ CppUnit::Test *suite = CppUnit::TestFactoryRegistry::getRegistry().makeTest();
+
+// assert(argc == 2);
+// std::ofstream xml(argv[1]);
+
+ CppUnit::TestResult controller;
+ CppUnit::TestResultCollector result;
+ controller.addListener( &result );
+
+ CppUnit::TestRunner runner;
+ runner.addTest(suite);
+ runner.run(controller);
+
+// CppUnit::XmlOutputter xout( &result, xml );
+ CppUnit::CompilerOutputter tout( &result, std::cout);
+// xout.write();
+ tout.write();
+
+ return result.wasSuccessful() ? 0 : 1 ;
+}