From 61ee2bd5e4421f7ed280ea4824947f2b2875fc24 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Tue, 6 Mar 2007 11:19:30 +0000 Subject: [PATCH] * import of new interlogger skeleton --- org.glite.lb.logger/src-nt/Connection.H | 32 ++ org.glite.lb.logger/src-nt/HTTPTransport.H | 34 ++ org.glite.lb.logger/src-nt/HTTPTransport.cpp | 42 +++ org.glite.lb.logger/src-nt/Makefile | 21 ++ org.glite.lb.logger/src-nt/PlainConnection.H | 33 ++ org.glite.lb.logger/src-nt/PlainConnection.cpp | 38 ++ org.glite.lb.logger/src-nt/PluginManager.H | 66 ++++ org.glite.lb.logger/src-nt/PluginManager.cpp | 4 + org.glite.lb.logger/src-nt/SocketInput.H | 32 ++ org.glite.lb.logger/src-nt/SocketInput.cpp | 67 ++++ org.glite.lb.logger/src-nt/ThreadPool.H | 116 ++++++ org.glite.lb.logger/src-nt/ThreadPool.cpp | 402 +++++++++++++++++++++ org.glite.lb.logger/src-nt/Transport.H | 26 ++ org.glite.lb.logger/src-nt/main.cpp | 33 ++ .../src-nt/test/PluginManagerTest.cpp | 58 +++ org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp | 190 ++++++++++ org.glite.lb.logger/src-nt/test/test_main.cpp | 32 ++ 17 files changed, 1226 insertions(+) create mode 100644 org.glite.lb.logger/src-nt/Connection.H create mode 100644 org.glite.lb.logger/src-nt/HTTPTransport.H create mode 100644 org.glite.lb.logger/src-nt/HTTPTransport.cpp create mode 100644 org.glite.lb.logger/src-nt/Makefile create mode 100644 org.glite.lb.logger/src-nt/PlainConnection.H create mode 100644 org.glite.lb.logger/src-nt/PlainConnection.cpp create mode 100644 org.glite.lb.logger/src-nt/PluginManager.H create mode 100644 org.glite.lb.logger/src-nt/PluginManager.cpp create mode 100644 org.glite.lb.logger/src-nt/SocketInput.H create mode 100644 org.glite.lb.logger/src-nt/SocketInput.cpp create mode 100644 org.glite.lb.logger/src-nt/ThreadPool.H create mode 100644 org.glite.lb.logger/src-nt/ThreadPool.cpp create mode 100644 org.glite.lb.logger/src-nt/Transport.H create mode 100644 org.glite.lb.logger/src-nt/main.cpp create mode 100644 org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp create mode 100644 org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp create mode 100644 org.glite.lb.logger/src-nt/test/test_main.cpp diff --git a/org.glite.lb.logger/src-nt/Connection.H b/org.glite.lb.logger/src-nt/Connection.H new file mode 100644 index 0000000..692c019 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Connection.H @@ -0,0 +1,32 @@ +#ifndef _CONNECTION_H +#define _CONNECTION_H + + +class Connection { +public: + class Factory { + public: + virtual Connection *newConnection(int fd) const = 0; + virtual Connection *accept(int fd) const = 0; + }; + + class Endpoint { + }; + + Connection(int a_fd) : fd(a_fd) + {} + + virtual ~Connection() + {} + + virtual int getFD() const + { return fd; } + + virtual int read(char *buf, unsigned int len) = 0; + virtual int write(char *buf, unsigned int len) = 0; + +protected: + int fd; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.H b/org.glite.lb.logger/src-nt/HTTPTransport.H new file mode 100644 index 0000000..0148254 --- /dev/null +++ b/org.glite.lb.logger/src-nt/HTTPTransport.H @@ -0,0 +1,34 @@ +#ifndef _HTTP_TRANSPORT_H +#define _HTTP_TRANSPORT_H + +#include "ThreadPool.H" +#include "Transport.H" + +class HTTPTransport: + public Transport +{ +public: + class Factory: public Transport::Factory { + public: + virtual Transport *newTransport(Connection *conn) const { + return(new HTTPTransport(conn)); + } + }; + + static Factory theFactory; + + HTTPTransport(Connection *conn) + : Transport(conn) + {} + + virtual ~HTTPTransport(); + + // from ThreadPool::WorkDescription + virtual void onReady(); + virtual void onTimeout(); + virtual void onError(); + +}; + + +#endif diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.cpp b/org.glite.lb.logger/src-nt/HTTPTransport.cpp new file mode 100644 index 0000000..afd4df1 --- /dev/null +++ b/org.glite.lb.logger/src-nt/HTTPTransport.cpp @@ -0,0 +1,42 @@ +#include "HTTPTransport.H" + +#include + + +HTTPTransport::Factory HTTPTransport::theFactory; + + +HTTPTransport::~HTTPTransport() +{ +} + + +void +HTTPTransport::onReady() +{ + char buffer[256]; + int len; + + len = conn->read(buffer, sizeof(buffer)); + if(len < 0) { + std::cout << "error on receive - closing connection" << std::endl; + } else if ( len > 0) { + std::cout.write(buffer, len); + std::cout.flush(); + ThreadPool::theThreadPool.queueWorkRead(this); + } else { + std::cout << "no more data" << std::endl; + } +} + + +void +HTTPTransport::onTimeout() +{ +} + + +void +HTTPTransport::onError() +{ +} diff --git a/org.glite.lb.logger/src-nt/Makefile b/org.glite.lb.logger/src-nt/Makefile new file mode 100644 index 0000000..abd2193 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Makefile @@ -0,0 +1,21 @@ + +CXX = c++ +CC = gcc + +CFLAGS = -g -Wall + +COMPILEPP = $(CXX) $(CXXFLAGS) $(CFLAGS) +COMPILE = $(CC) $(CFLAGS) +LINK = c++ $(LDFLAGS) + +THREAD_LIB = -lpthread + + +plain: SocketInput.o PlainConnection.o HTTPTransport.o PluginManager.o ThreadPool.o main.o + $(LINK) -o $@ $+ $(THREAD_LIB) + +%.o: %.cpp + $(COMPILEPP) -o $@ -c $< + +%.o: %.c + $(COMPILE) -o $@ -c $< \ No newline at end of file diff --git a/org.glite.lb.logger/src-nt/PlainConnection.H b/org.glite.lb.logger/src-nt/PlainConnection.H new file mode 100644 index 0000000..3551286 --- /dev/null +++ b/org.glite.lb.logger/src-nt/PlainConnection.H @@ -0,0 +1,33 @@ +#ifndef _PLAIN_CONNECTION_H +#define _PLAIN_CONNECTION_H + +#include "Connection.H" + + +class PlainConnection: + public Connection +{ +public: + class Factory: public Connection::Factory { + public: + virtual Connection *newConnection(int fd) const { + return new PlainConnection(fd); + } + + virtual Connection *accept(int fd) const; + }; + + static Factory theFactory; + + PlainConnection(int a_fd): Connection(a_fd) + {} + + virtual ~PlainConnection(); + + // from Connection + virtual int read(char *buf, unsigned int len); + virtual int write(char *buf, unsigned int len); + +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/PlainConnection.cpp b/org.glite.lb.logger/src-nt/PlainConnection.cpp new file mode 100644 index 0000000..156ec6e --- /dev/null +++ b/org.glite.lb.logger/src-nt/PlainConnection.cpp @@ -0,0 +1,38 @@ +#include "PlainConnection.H" +#include "ThreadPool.H" + +#include +#include + +PlainConnection::Factory PlainConnection::theFactory; + + +PlainConnection::~PlainConnection() +{ +} + + +Connection * +PlainConnection::Factory::accept(int fd) const +{ + int nfd; + + nfd = ::accept(fd, NULL, NULL); + return newConnection(nfd); +} + + +int +PlainConnection::read(char *buf, unsigned int len) +{ + int ret; + + ret = ::recv(fd, buf, len, MSG_NOSIGNAL); + return ret; +} + + +int +PlainConnection::write(char *buf, unsigned int len) +{ +} diff --git a/org.glite.lb.logger/src-nt/PluginManager.H b/org.glite.lb.logger/src-nt/PluginManager.H new file mode 100644 index 0000000..29556d5 --- /dev/null +++ b/org.glite.lb.logger/src-nt/PluginManager.H @@ -0,0 +1,66 @@ +#ifndef _PLUGIN_MANAGER_H +#define _PLUGIN_MANAGER_H + +#include + +#include + +class PluginManager { +public: + // the plugin manager instance + static PluginManager thePluginManager; + + + // base class for plugins + class Plugin { + public: + const char *name; + + Plugin(const char *aname) : name(aname) { + PluginManager::thePluginManager.registerPlugin(this, aname); + } + + virtual bool initialize() = 0; + virtual bool cleanup () = 0; + }; + + // add plugin with given name to the list of registered plugins + void registerPlugin(Plugin *plugin, const char *name) { + pluginList.push_front(plugin); + } + + // remove plugin from the list + void removePlugin(); + + // initialize all plugins on list + void initialize() { + for(std::list::iterator i = pluginList.begin(); + i != pluginList.end(); + i++) { + (*i)->initialize(); + } + + } + + // cleanup all plugins on list + void cleanup() { + for(std::list::iterator i = pluginList.begin(); + i != pluginList.end(); + i++) { + (*i)->cleanup(); + } + + } + + +private: + // list of registered plugins + std::list pluginList; + + // singleton class with private default constructor + PluginManager() : pluginList() + {}; +}; + + +#endif diff --git a/org.glite.lb.logger/src-nt/PluginManager.cpp b/org.glite.lb.logger/src-nt/PluginManager.cpp new file mode 100644 index 0000000..de9f981 --- /dev/null +++ b/org.glite.lb.logger/src-nt/PluginManager.cpp @@ -0,0 +1,4 @@ +#include "PluginManager.H" + +PluginManager PluginManager::thePluginManager; + diff --git a/org.glite.lb.logger/src-nt/SocketInput.H b/org.glite.lb.logger/src-nt/SocketInput.H new file mode 100644 index 0000000..0021816 --- /dev/null +++ b/org.glite.lb.logger/src-nt/SocketInput.H @@ -0,0 +1,32 @@ +#ifndef _SOCKET_INPUT_H +#define _SOCKET_INPUT_H + +#include "ThreadPool.H" +#include "Connection.H" +#include "Transport.H" + +#include "sys/un.h" + + +class SocketInput: public ThreadPool::WorkDescription +{ +public: + SocketInput(const char *path, + const Connection::Factory *a_cfactory, + const Transport::Factory *a_tfactory); + virtual ~SocketInput(); + + // from WorkDescription + virtual void onReady(); + virtual void onTimeout(); + virtual void onError(); + +private: + static const int SOCK_QUEUE_MAX = 5; + + struct sockaddr_un saddr; + const Connection::Factory *cFactory; + const Transport::Factory *tFactory; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/SocketInput.cpp b/org.glite.lb.logger/src-nt/SocketInput.cpp new file mode 100644 index 0000000..420ff22 --- /dev/null +++ b/org.glite.lb.logger/src-nt/SocketInput.cpp @@ -0,0 +1,67 @@ +#include +#include +#include +#include +#include + +#include "ThreadPool.H" +#include "SocketInput.H" + + + +// create unix domain socket for input +SocketInput::SocketInput(const char *path, + const Connection::Factory *a_cfactory, + const Transport::Factory *a_tfactory) + : ThreadPool::WorkDescription(0), + cFactory(a_cfactory), + tFactory(a_tfactory) +{ + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, path); + fd = socket(PF_UNIX, SOCK_STREAM, 0); + if(connect(fd, (struct sockaddr*)&saddr, sizeof(saddr.sun_path)) < 0) { + if(errno == ECONNREFUSED) { + unlink(saddr.sun_path); + } + } else { + // another instance running + // throw new Exception + } + bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)); + listen(fd, SOCK_QUEUE_MAX); + ThreadPool::theThreadPool.setWorkAccept(this); +} + + +// remove the socket +SocketInput::~SocketInput() +{ + if(fd >= 0) + close(fd); + unlink(saddr.sun_path); +} + + +void +SocketInput::onReady() +{ + Connection *conn = cFactory->accept(fd); + Transport *trans = tFactory->newTransport(conn); + ThreadPool::theThreadPool.queueWorkRead(trans); +} + + +void +SocketInput::onTimeout() +{ + // nothing special, just sit around +} + + +void +SocketInput::onError() +{ + // should report an error? +} diff --git a/org.glite.lb.logger/src-nt/ThreadPool.H b/org.glite.lb.logger/src-nt/ThreadPool.H new file mode 100644 index 0000000..602b86b --- /dev/null +++ b/org.glite.lb.logger/src-nt/ThreadPool.H @@ -0,0 +1,116 @@ +#ifndef _THREAD_POOL_H +#define _THREAD_POOL_H + +#include +#include +#include + +#include + +class ThreadPool { +public: + const static int default_timeout = 5; + + class WorkDescription { + friend class ThreadPool; + public: + int fd; + + WorkDescription(int afd) + : fd(afd), event(NONE) {} + + virtual void onReady() + {} + + virtual void onTimeout() + {} + + virtual void onError() + {} + + protected: + enum Event { NONE, READY, TIMEOUT, ERROR } event; + void doWork(); + }; + +public: + static ThreadPool theThreadPool; + + static ThreadPool *getThreadPool() + { return &theThreadPool; } + + void startWorkers(unsigned int n); + void stopWorkers(); + + void postWork(WorkDescription *work_unit); + + void queueWorkAccept(WorkDescription *work_unit); + void queueWorkRead(WorkDescription *work_unit); + void queueWorkWrite(WorkDescription *work_unit); + void queueWorkTimeout(WorkDescription *work_unit); + void queueWorkConnect(WorkDescription *work_unit); + + void setWorkAccept(WorkDescription *work_unit); + void setWorkRead(WorkDescription *work_unit); + void setWorkWrite(WorkDescription *work_unit); + void setWorkTimeout(WorkDescription *work_unit); + + void run(); + void exit() + { f_exit = true; pthread_cond_signal(&wait_queue_cond_ready); } + +protected: + ThreadPool(); + ~ThreadPool(); + + WorkDescription *getWork(); + +private: + class WaitDesc { + public: + WorkDescription *wd; + short event; + bool f_permanent; + struct timeval timeout; + + WaitDesc(WorkDescription *w, short e, + bool permanent = false, int t = default_timeout) + : wd(w), event(e), f_permanent(permanent) { + timeout.tv_sec = t; + timeout.tv_usec = 0; + } + + int get_fd() { return wd->fd; }; + void adjustTimeout(const struct timeval &delta); + int timeoutExpired() { return((timeout.tv_sec < 0) || + (timeout.tv_sec == 0 && timeout.tv_usec == 0)); } + }; + +private: + bool f_exit; + int num_workers; + pthread_t *workers; + int work_count; + int wait_count; + int ufds_size; + std::list work_queue; + std::list wait_queue; + pthread_mutex_t work_queue_mutex; + pthread_cond_t work_queue_cond_ready; + pthread_cond_t work_queue_cond_full; + pthread_mutex_t wait_queue_mutex; + pthread_cond_t wait_queue_cond_ready; + struct pollfd *ufds; + int pd[2]; + struct timeval min_timeout; + + void prepareDescriptorArray(); + void removeWaitDesc(std::list::iterator &i); + void removeWorkDesc(); + void queueWork(WaitDesc *); + + static void *threadMain(void *); + static void threadCleanup(void *); +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/ThreadPool.cpp b/org.glite.lb.logger/src-nt/ThreadPool.cpp new file mode 100644 index 0000000..c57998d --- /dev/null +++ b/org.glite.lb.logger/src-nt/ThreadPool.cpp @@ -0,0 +1,402 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include "ThreadPool.H" +#include "Exception.H" + +ThreadPool ThreadPool::theThreadPool; + +static inline +void +tv_sub(struct timeval &a, const struct timeval &b) { + a.tv_usec -= b.tv_usec; + a.tv_sec -= b.tv_sec; + if (a.tv_usec < 0) { + a.tv_sec--; + a.tv_usec += 1000000; + } +} + + +static inline +int +tv_cmp(const struct timeval &a, const struct timeval &b) { + if(a.tv_sec < b.tv_sec) { + return -1; + } else if(a.tv_sec > b.tv_sec) { + return 1; + } else { + if (a.tv_usec < b.tv_usec) { + return -1; + } else if(a.tv_usec > b.tv_usec) { + return 1; + } else { + return 0; + } + } +} + + +inline +void +ThreadPool::WorkDescription::doWork() { + switch(event) { + case READY: + onReady(); + break; + case TIMEOUT: + onTimeout(); + break; + case ERROR: + onError(); + break; + default: + break; + } +} + + +inline +void +ThreadPool::WaitDesc::adjustTimeout(const struct timeval &delta) +{ + tv_sub(timeout, delta); +} + + +ThreadPool::ThreadPool() + : work_count(0), wait_count(0), ufds_size(0), ufds(NULL), f_exit(false) +{ + pthread_mutex_init(&wait_queue_mutex, NULL); + pthread_mutex_init(&work_queue_mutex, NULL); + pthread_cond_init(&work_queue_cond_ready, NULL); + pthread_cond_init(&work_queue_cond_full, NULL); + pthread_cond_init(&wait_queue_cond_ready, NULL); + pipe(pd); + ufds = static_cast(malloc(sizeof(struct pollfd))); + ufds->fd = pd[0]; + ufds->events = POLLIN; + ufds_size = 1; +} + + +ThreadPool::~ThreadPool() +{ + pthread_cond_destroy(&work_queue_cond_full); + pthread_cond_destroy(&work_queue_cond_ready); + pthread_cond_destroy(&wait_queue_cond_ready); + pthread_mutex_destroy(&work_queue_mutex); + pthread_mutex_destroy(&wait_queue_mutex); +} + + +void +ThreadPool::startWorkers(unsigned int n) +{ + workers = new pthread_t[n]; + + num_workers = n; + for(unsigned int i = 0; i < n; i++) { + pthread_create(&workers[i], NULL, ThreadPool::threadMain, NULL); + } +} + + +void +ThreadPool::stopWorkers() +{ + for(int i = 0; i < num_workers; i++) { + pthread_cancel(workers[i]); + pthread_join(workers[i], NULL); + } + delete[] workers; +} + + +void +ThreadPool::postWork(WorkDescription *work_unit) +{ + pthread_mutex_lock(&work_queue_mutex); + work_queue.push_back(work_unit); + work_count++; + pthread_cond_signal(&work_queue_cond_ready); + pthread_mutex_unlock(&work_queue_mutex); +} + + +inline +void +ThreadPool::queueWork(WaitDesc *wd) +{ + pthread_mutex_lock(&wait_queue_mutex); + wait_queue.push_back(wd); + wait_count++; + pthread_cond_signal(&wait_queue_cond_ready); + pthread_mutex_unlock(&wait_queue_mutex); + write(pd[1], "1", 1); +} + + +void +ThreadPool::queueWorkAccept(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLIN)); +} + + +void +ThreadPool::queueWorkRead(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLIN)); +} + + +void +ThreadPool::queueWorkWrite(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLOUT)); +} + + +void +ThreadPool::queueWorkTimeout(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, 0)); +} + + +void +ThreadPool::queueWorkConnect(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLIN)); +} + + +void +ThreadPool::setWorkAccept(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLIN, true)); +} + + +void +ThreadPool::setWorkRead(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLIN, true)); +} + + +void +ThreadPool::setWorkWrite(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, POLLOUT, true)); +} + + +void +ThreadPool::setWorkTimeout(WorkDescription *work_unit) +{ + queueWork(new WaitDesc(work_unit, 0, true)); +} + + +ThreadPool::WorkDescription * +ThreadPool::getWork() +{ + WorkDescription *work_unit = NULL; + struct timespec timeout; + + pthread_mutex_lock(&work_queue_mutex); + if(work_count == 0) { + timeout.tv_sec = 1; + timeout.tv_nsec = 0; +// pthread_cond_timedwait(&work_queue_cond_ready, &work_queue_mutex, &timeout); + pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex); + } + if(work_count > 0) { + work_count--; + work_unit = work_queue.front(); + work_queue.pop_front(); + } + pthread_mutex_unlock(&work_queue_mutex); + return work_unit; +} + +void +ThreadPool::threadCleanup(void *data) +{ + ThreadPool *pool = ThreadPool::getThreadPool(); + + pthread_mutex_unlock(&(pool->work_queue_mutex)); +} + + +void * +ThreadPool::threadMain(void *data) +{ + ThreadPool *pool = ThreadPool::getThreadPool(); + WorkDescription *work_unit; + + pthread_cleanup_push(ThreadPool::threadCleanup, NULL); + while(true) { + + work_unit = pool->getWork(); + if(work_unit) { + // something to work on + work_unit->doWork(); + } else { + // timed out waiting for work + } + } + pthread_cleanup_pop(1); +} + + +void +ThreadPool::removeWaitDesc(std::list::iterator &i) +{ + std::list::iterator j = i; + + // actually this is safe even for the first element + pthread_mutex_lock(&wait_queue_mutex); + j--; + wait_queue.erase(i); + wait_count--; + i = j; + pthread_mutex_unlock(&wait_queue_mutex); +} + + +void +ThreadPool::prepareDescriptorArray() +{ + std::list::iterator theIterator; + struct pollfd *p; + + pthread_mutex_lock(&wait_queue_mutex); + if(wait_count == 0) { + pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex); + } + if(wait_count == 0) { + pthread_mutex_unlock(&wait_queue_mutex); + return; + } + if(ufds_size != wait_count + 1) { + ufds = static_cast(realloc(ufds, (1 + wait_count) * sizeof(struct pollfd))); + if(ufds == NULL) { +// throw new Exception(); + } + ufds_size = wait_count + 1; + } + min_timeout.tv_sec = default_timeout; + min_timeout.tv_usec = 0; + for(theIterator = wait_queue.begin(), p = ufds + 1; + theIterator != wait_queue.end(); + theIterator++, p++) { + WaitDesc *w = *theIterator; + p->fd = w->get_fd(); + p->events = w->event; + if(tv_cmp(min_timeout, w->timeout) > 0) { + min_timeout = w->timeout; + } + } + pthread_mutex_unlock(&wait_queue_mutex); +} + + +void +ThreadPool::run() +{ + f_exit = false; + while(!f_exit) { + struct pollfd *p; + struct timeval before, after; + int ret; + + // may block waiting for new work + prepareDescriptorArray(); + + gettimeofday(&before, NULL); + ret = poll(ufds, ufds_size, 1000*min_timeout.tv_sec + min_timeout.tv_usec/1000); + gettimeofday(&after, NULL); + tv_sub(after, before); + + if((ret >= 0) || // ready or timeout + ((ret < 0) && (errno == EINTR))) { // interrupted + std::list::iterator i; + WaitDesc *w; + + // handle the pipe + if(ufds->revents & POLLIN) { + char discard[1]; + read(ufds->fd, discard, 1); + } + + // at least we have to adjust timeouts + pthread_mutex_lock(&wait_queue_mutex); + i = wait_queue.begin(); + pthread_mutex_unlock(&wait_queue_mutex); + // the wait queue mutex is unlocked inside the loop + // to allow handlers to add queue new + // WorkDescriptions - these are added at the + // end of the list so we should be safe + for(p = ufds + 1; p - ufds < ufds_size; p++) { + enum WorkDescription::Event event = WorkDescription::NONE; + + w = *i; + // check for consistency + if(p->fd != w->get_fd()) { + // mismatch, what shall we do? + abort(); + } + + // subtract the time passed from timeout + w->adjustTimeout(after); + + // see what happened + if(ret <= 0) { + // timeout or interrupted + if(w->timeoutExpired()) { + event = WorkDescription::TIMEOUT; + } + } else { + // ready or error + if(p->revents & POLLERR) { + event = WorkDescription::ERROR; + } else if(p->revents & w->event) { + event = WorkDescription::READY; + } else if(w->timeoutExpired()) { + event = WorkDescription::TIMEOUT; + } + } + if(event != WorkDescription::NONE) { + WorkDescription *wd; + wd = w->wd; + wd->event = event; + if(!w->f_permanent) { + postWork(wd); + removeWaitDesc(i); + delete w; + } else { + w->wd->doWork(); + // we have to reset the timeout + w->timeout.tv_sec = default_timeout; + w->timeout.tv_usec = 0; + } + } + pthread_mutex_lock(&wait_queue_mutex); + i++; + pthread_mutex_unlock(&wait_queue_mutex); + } + } else { + // some nasty error + } + } +} + diff --git a/org.glite.lb.logger/src-nt/Transport.H b/org.glite.lb.logger/src-nt/Transport.H new file mode 100644 index 0000000..974b8ac --- /dev/null +++ b/org.glite.lb.logger/src-nt/Transport.H @@ -0,0 +1,26 @@ +#ifndef _TRANSPORT_H +#define _TRANSPORT_H + +#include "Connection.H" + + +class Transport: public ThreadPool::WorkDescription { +public: + class Factory { + public: + virtual Transport *newTransport(Connection *conn) const = 0; + }; + + Transport(Connection *a_conn) + : conn(a_conn), + ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1) + {} + + virtual ~Transport() + { if(conn) delete conn; } + +protected: + Connection *conn; + +}; +#endif diff --git a/org.glite.lb.logger/src-nt/main.cpp b/org.glite.lb.logger/src-nt/main.cpp new file mode 100644 index 0000000..fce6c0e --- /dev/null +++ b/org.glite.lb.logger/src-nt/main.cpp @@ -0,0 +1,33 @@ +#include "PluginManager.H" +#include "ThreadPool.H" +#include "SocketInput.H" +#include "PlainConnection.H" +#include "HTTPTransport.H" + +const int num_threads = 2; +const char *sock_path = "il_sock"; + +int main(int argc, char *argv[]) +{ + SocketInput *input; + + // initialize plugins + PluginManager::thePluginManager.initialize(); + + // create unix socket with plain IO and HTTP transport + input = new SocketInput(sock_path, + &PlainConnection::theFactory, + &HTTPTransport::theFactory); + + // start worker threads + ThreadPool::theThreadPool.startWorkers(num_threads); + + // run the main loop + ThreadPool::theThreadPool.run(); + + // cleanup & exit + delete input; + PluginManager::thePluginManager.cleanup(); + + return 0; +} diff --git a/org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp b/org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp new file mode 100644 index 0000000..e8a7b81 --- /dev/null +++ b/org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp @@ -0,0 +1,58 @@ +#include + +#include "PluginManager.H" + +class TestPlugin : public PluginManager::Plugin { +public: + bool inited, cleaned; + + virtual bool initialize() { + inited = true; + } + + virtual bool cleanup() { + cleaned = true; + } + + static TestPlugin theTestPlugin; + +private: + TestPlugin() : PluginManager::Plugin("test plugin"), + inited(false), + cleaned(false) + {} + + + +}; + + +class PluginManagerTest : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(PluginManagerTest); + CPPUNIT_TEST(testInit); + CPPUNIT_TEST(testClean); + CPPUNIT_TEST_SUITE_END(); + +public: + void setUp() { + } + + void tearDown() { + } + + void testInit() { + PluginManager::thePluginManager.initialize(); + CPPUNIT_ASSERT(TestPlugin::theTestPlugin.inited); + } + + void testClean() { + PluginManager::thePluginManager.cleanup(); + CPPUNIT_ASSERT(TestPlugin::theTestPlugin.cleaned); + } +}; + + +TestPlugin TestPlugin::theTestPlugin; + +CPPUNIT_TEST_SUITE_REGISTRATION( PluginManagerTest ); diff --git a/org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp b/org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp new file mode 100644 index 0000000..d61168d --- /dev/null +++ b/org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp @@ -0,0 +1,190 @@ +#include +#include +#include +#include +#include + +#include + +#include "ThreadPool.H" + +class TestWork : public ThreadPool::WorkDescription { +public: + int done; + + TestWork(int fd) : ThreadPool::WorkDescription(fd), done(0) {}; + + virtual void onReady() { + done++; + } + +}; + + +class TestConsumer : public ThreadPool::WorkDescription { +public: + char buf[2]; + + TestConsumer(int fd) : ThreadPool::WorkDescription(fd) {}; + + virtual void onReady() { + int r; + + r = read(fd, buf, 1); + buf[1] = 0; + ThreadPool::getThreadPool()->exit(); + } + + virtual void onTimeout() { + } + +}; + + +class TestProducer : public ThreadPool::WorkDescription { +public: + TestProducer(int fd) : ThreadPool::WorkDescription(fd) {}; + + virtual void onReady() { + write(fd, "a", 1); + } + + virtual void onTimeout() { + } + +}; + + +class TestSocketRead: public ThreadPool::WorkDescription { +public: + char buffer[10]; + + TestSocketRead(int fd) : ThreadPool::WorkDescription(fd) { + } + + virtual void onReady() { + + int len = recv(fd, buffer, sizeof(buffer), MSG_NOSIGNAL); + ThreadPool::getThreadPool()->exit(); + } + + virtual void onError() { + } +}; + + +class TestSocketWrite: public ThreadPool::WorkDescription { +public: + static char buffer[]; + + TestSocketWrite(const char *name) + : ThreadPool::WorkDescription(0) { + struct sockaddr_un saddr; + int ret; + fd = socket(PF_UNIX, SOCK_STREAM, 0); + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, name); + if((ret = connect(fd, (struct sockaddr *)&saddr, sizeof(saddr))) < 0) { + } + } + + virtual void onReady() { + int ret; + + ret = send(fd, buffer, strlen(buffer)+1, MSG_NOSIGNAL); + close(fd); + } + +}; + +char TestSocketWrite::buffer[] = "ahoj"; + +class TestSocketAccept : public ThreadPool::WorkDescription { +public: + TestSocketRead *reader; + + TestSocketAccept(const char *name) + : ThreadPool::WorkDescription(0) { + struct sockaddr_un saddr; + + fd = socket(PF_UNIX, SOCK_STREAM, 0); + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, name); + bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)); + listen(fd, 1); + } + + virtual void onReady() { + int nfd; + + nfd = accept(fd, NULL, NULL); + if(nfd < 0) { + } else { + ThreadPool *pool = ThreadPool::getThreadPool(); + + reader = new TestSocketRead(nfd); + pool->queueWorkRead(reader); + } + } +}; + + +class ThreadPoolTest: public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE( ThreadPoolTest ); +// CPPUNIT_TEST( testWorkQueue ); + CPPUNIT_TEST( testPoll ); + CPPUNIT_TEST( testAccept ); + CPPUNIT_TEST_SUITE_END(); + +public: + void setUp() { + pool = ThreadPool::getThreadPool(); + unlink("/tmp/smazat.sock"); + pool->startWorkers(2); + } + + void tearDown() { + pool->stopWorkers(); + } + + void testWorkQueue() { + TestWork *work_unit = new TestWork(0); + pool->postWork(work_unit); + } + + void testPoll() { + int fd[2]; + TestProducer *p = new TestProducer(0); + TestConsumer *c = new TestConsumer(0); + + pipe(fd); + c->fd = fd[0]; + p->fd = fd[1]; + pool->queueWorkRead(c); + pool->queueWorkWrite(p); + pool->run(); + CPPUNIT_ASSERT(c->buf[0] == 'a'); + CPPUNIT_ASSERT(c->buf[1] == 0); + } + + void testAccept() { + TestSocketAccept *consumer = new TestSocketAccept("/tmp/smazat.sock"); + TestSocketWrite *producer; + + pool->queueWorkAccept(consumer); + producer = new TestSocketWrite("/tmp/smazat.sock"); + ThreadPool::getThreadPool()->queueWorkWrite(producer); + pool->run(); + CPPUNIT_ASSERT(consumer->reader != NULL); + CPPUNIT_ASSERT(strcmp(consumer->reader->buffer, TestSocketWrite::buffer) == 0); + } + +private: + ThreadPool *pool; +}; + + +CPPUNIT_TEST_SUITE_REGISTRATION( ThreadPoolTest ); diff --git a/org.glite.lb.logger/src-nt/test/test_main.cpp b/org.glite.lb.logger/src-nt/test/test_main.cpp new file mode 100644 index 0000000..1ae6d6a --- /dev/null +++ b/org.glite.lb.logger/src-nt/test/test_main.cpp @@ -0,0 +1,32 @@ +#include +#include + +#include +#include +//#include +#include +#include +#include + +int main (int argc,const char *argv[]) +{ + CppUnit::Test *suite = CppUnit::TestFactoryRegistry::getRegistry().makeTest(); + +// assert(argc == 2); +// std::ofstream xml(argv[1]); + + CppUnit::TestResult controller; + CppUnit::TestResultCollector result; + controller.addListener( &result ); + + CppUnit::TestRunner runner; + runner.addTest(suite); + runner.run(controller); + +// CppUnit::XmlOutputter xout( &result, xml ); + CppUnit::CompilerOutputter tout( &result, std::cout); +// xout.write(); + tout.write(); + + return result.wasSuccessful() ? 0 : 1 ; +} -- 1.8.2.3