* import of new interlogger skeleton
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 6 Mar 2007 11:19:30 +0000 (11:19 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 6 Mar 2007 11:19:30 +0000 (11:19 +0000)
17 files changed:
org.glite.lb.logger/src-nt/Connection.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/HTTPTransport.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/HTTPTransport.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/Makefile [new file with mode: 0644]
org.glite.lb.logger/src-nt/PlainConnection.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/PlainConnection.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/PluginManager.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/PluginManager.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/SocketInput.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/SocketInput.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/ThreadPool.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/ThreadPool.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/Transport.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/main.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/test/test_main.cpp [new file with mode: 0644]

diff --git a/org.glite.lb.logger/src-nt/Connection.H b/org.glite.lb.logger/src-nt/Connection.H
new file mode 100644 (file)
index 0000000..692c019
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef _CONNECTION_H
+#define _CONNECTION_H
+
+
+class Connection {
+public:
+       class Factory {
+       public:
+               virtual Connection *newConnection(int fd) const = 0;
+               virtual Connection *accept(int fd) const = 0;
+       };
+
+       class Endpoint {
+       };
+
+       Connection(int a_fd) : fd(a_fd)
+               {}
+
+       virtual ~Connection() 
+               {}
+
+       virtual int getFD() const 
+               { return fd; }
+
+       virtual int read(char *buf, unsigned int len) = 0;
+       virtual int write(char *buf, unsigned int len) = 0;
+
+protected:
+       int fd;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.H b/org.glite.lb.logger/src-nt/HTTPTransport.H
new file mode 100644 (file)
index 0000000..0148254
--- /dev/null
@@ -0,0 +1,34 @@
+#ifndef _HTTP_TRANSPORT_H
+#define _HTTP_TRANSPORT_H
+
+#include "ThreadPool.H"
+#include "Transport.H"
+
+class HTTPTransport: 
+       public Transport
+{
+public:
+       class Factory: public Transport::Factory {
+       public:
+               virtual Transport *newTransport(Connection *conn) const {
+                       return(new HTTPTransport(conn));
+               }
+       };
+
+       static Factory theFactory;
+
+       HTTPTransport(Connection *conn) 
+               : Transport(conn)
+               {}
+
+       virtual ~HTTPTransport();
+
+       // from ThreadPool::WorkDescription
+       virtual void onReady();
+       virtual void onTimeout();
+       virtual void onError();
+
+};
+
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.cpp b/org.glite.lb.logger/src-nt/HTTPTransport.cpp
new file mode 100644 (file)
index 0000000..afd4df1
--- /dev/null
@@ -0,0 +1,42 @@
+#include "HTTPTransport.H"
+
+#include <iostream>
+
+
+HTTPTransport::Factory HTTPTransport::theFactory;
+
+
+HTTPTransport::~HTTPTransport()
+{
+}
+
+
+void
+HTTPTransport::onReady()
+{
+       char buffer[256];
+       int len;
+
+       len = conn->read(buffer, sizeof(buffer));
+       if(len < 0) {
+               std::cout << "error on receive - closing connection" << std::endl;
+       } else if ( len > 0) {
+               std::cout.write(buffer, len);
+               std::cout.flush();
+               ThreadPool::theThreadPool.queueWorkRead(this);
+       } else {
+               std::cout << "no more data" << std::endl;
+       }
+}
+
+
+void 
+HTTPTransport::onTimeout()
+{
+}
+
+
+void 
+HTTPTransport::onError()
+{
+}
diff --git a/org.glite.lb.logger/src-nt/Makefile b/org.glite.lb.logger/src-nt/Makefile
new file mode 100644 (file)
index 0000000..abd2193
--- /dev/null
@@ -0,0 +1,21 @@
+
+CXX = c++
+CC = gcc
+
+CFLAGS = -g -Wall
+
+COMPILEPP = $(CXX) $(CXXFLAGS) $(CFLAGS)
+COMPILE = $(CC) $(CFLAGS)
+LINK = c++ $(LDFLAGS)
+
+THREAD_LIB = -lpthread
+
+
+plain: SocketInput.o PlainConnection.o HTTPTransport.o PluginManager.o ThreadPool.o main.o
+       $(LINK) -o $@ $+ $(THREAD_LIB)
+
+%.o: %.cpp
+       $(COMPILEPP) -o $@ -c $<
+
+%.o: %.c
+       $(COMPILE) -o $@ -c $<
\ No newline at end of file
diff --git a/org.glite.lb.logger/src-nt/PlainConnection.H b/org.glite.lb.logger/src-nt/PlainConnection.H
new file mode 100644 (file)
index 0000000..3551286
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef _PLAIN_CONNECTION_H
+#define _PLAIN_CONNECTION_H
+
+#include "Connection.H"
+
+
+class PlainConnection: 
+       public Connection
+{
+public:
+       class Factory: public Connection::Factory {
+       public:
+               virtual Connection *newConnection(int fd) const {
+                       return new PlainConnection(fd);
+               }
+
+               virtual Connection *accept(int fd) const;
+       };
+
+       static Factory theFactory;
+
+       PlainConnection(int a_fd): Connection(a_fd)
+               {}
+
+       virtual ~PlainConnection();
+
+       // from Connection
+       virtual int read(char *buf, unsigned int len);
+       virtual int write(char *buf, unsigned int len);
+
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/PlainConnection.cpp b/org.glite.lb.logger/src-nt/PlainConnection.cpp
new file mode 100644 (file)
index 0000000..156ec6e
--- /dev/null
@@ -0,0 +1,38 @@
+#include "PlainConnection.H"
+#include "ThreadPool.H"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+PlainConnection::Factory PlainConnection::theFactory;
+
+
+PlainConnection::~PlainConnection()
+{
+}
+
+
+Connection *
+PlainConnection::Factory::accept(int fd) const
+{
+       int nfd;
+
+       nfd = ::accept(fd, NULL, NULL);
+       return newConnection(nfd);
+}
+
+
+int
+PlainConnection::read(char *buf, unsigned int len)
+{
+       int ret;
+
+       ret = ::recv(fd, buf, len, MSG_NOSIGNAL);
+       return ret;
+}
+
+
+int 
+PlainConnection::write(char *buf, unsigned int len)
+{
+}
diff --git a/org.glite.lb.logger/src-nt/PluginManager.H b/org.glite.lb.logger/src-nt/PluginManager.H
new file mode 100644 (file)
index 0000000..29556d5
--- /dev/null
@@ -0,0 +1,66 @@
+#ifndef _PLUGIN_MANAGER_H
+#define _PLUGIN_MANAGER_H
+
+#include <list>
+
+#include <iostream>
+
+class PluginManager {
+public:
+       // the plugin manager instance
+       static PluginManager thePluginManager;
+       
+       
+       // base class for plugins
+       class Plugin {
+       public:
+               const char *name;
+
+               Plugin(const char *aname) : name(aname) { 
+                       PluginManager::thePluginManager.registerPlugin(this, aname); 
+               }
+
+               virtual bool initialize() = 0;
+               virtual bool cleanup () = 0;
+       };
+
+       // add plugin with given name to the list of registered plugins
+       void registerPlugin(Plugin *plugin, const char *name) { 
+               pluginList.push_front(plugin);
+       }
+
+       // remove plugin from the list
+       void removePlugin();
+
+       // initialize all plugins on list
+       void initialize() {
+               for(std::list<Plugin *>::iterator i = pluginList.begin();
+                   i != pluginList.end();
+                   i++) {
+                       (*i)->initialize();
+               }
+               
+       }
+
+       // cleanup all plugins on list
+       void cleanup() {
+               for(std::list<Plugin *>::iterator i = pluginList.begin();
+                   i != pluginList.end();
+                   i++) {
+                       (*i)->cleanup();
+               }
+               
+       }
+               
+
+private:
+       // list of registered plugins
+       std::list<Plugin *> pluginList;
+
+       // singleton class with private default constructor
+       PluginManager() : pluginList()
+               {};
+};
+
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/PluginManager.cpp b/org.glite.lb.logger/src-nt/PluginManager.cpp
new file mode 100644 (file)
index 0000000..de9f981
--- /dev/null
@@ -0,0 +1,4 @@
+#include "PluginManager.H"
+
+PluginManager PluginManager::thePluginManager;
+
diff --git a/org.glite.lb.logger/src-nt/SocketInput.H b/org.glite.lb.logger/src-nt/SocketInput.H
new file mode 100644 (file)
index 0000000..0021816
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef _SOCKET_INPUT_H
+#define _SOCKET_INPUT_H
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+#include "sys/un.h"
+
+
+class SocketInput: public ThreadPool::WorkDescription
+{
+public:
+       SocketInput(const char *path, 
+                   const Connection::Factory *a_cfactory,
+                   const Transport::Factory *a_tfactory);
+       virtual ~SocketInput();
+
+       // from WorkDescription
+       virtual void onReady();
+       virtual void onTimeout();
+       virtual void onError();
+
+private:
+       static const int SOCK_QUEUE_MAX = 5;
+
+       struct sockaddr_un saddr;
+       const Connection::Factory *cFactory;
+       const Transport::Factory *tFactory;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/SocketInput.cpp b/org.glite.lb.logger/src-nt/SocketInput.cpp
new file mode 100644 (file)
index 0000000..420ff22
--- /dev/null
@@ -0,0 +1,67 @@
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <errno.h>
+
+#include "ThreadPool.H"
+#include "SocketInput.H"
+
+
+
+// create unix domain socket for input
+SocketInput::SocketInput(const char *path, 
+                        const Connection::Factory *a_cfactory,
+                        const Transport::Factory *a_tfactory)
+       : ThreadPool::WorkDescription(0),
+         cFactory(a_cfactory),
+         tFactory(a_tfactory)
+{
+       memset(&saddr, 0, sizeof(saddr));
+       saddr.sun_family = AF_UNIX;
+       strcpy(saddr.sun_path, path);
+       fd = socket(PF_UNIX, SOCK_STREAM, 0);
+       if(connect(fd, (struct sockaddr*)&saddr, sizeof(saddr.sun_path)) < 0) {
+               if(errno == ECONNREFUSED) {
+                       unlink(saddr.sun_path);
+               }
+       } else {
+               // another instance running
+               // throw new Exception
+       }
+       bind(fd, (struct sockaddr *)&saddr, sizeof(saddr));
+       listen(fd, SOCK_QUEUE_MAX);
+       ThreadPool::theThreadPool.setWorkAccept(this);
+}
+
+
+// remove the socket
+SocketInput::~SocketInput()
+{
+       if(fd >= 0)
+               close(fd);
+       unlink(saddr.sun_path);
+}
+
+
+void
+SocketInput::onReady()
+{
+       Connection *conn = cFactory->accept(fd);
+       Transport  *trans = tFactory->newTransport(conn);
+       ThreadPool::theThreadPool.queueWorkRead(trans);
+}
+
+
+void
+SocketInput::onTimeout()
+{
+       // nothing special, just sit around
+}
+
+
+void
+SocketInput::onError()
+{
+       // should report an error?
+}
diff --git a/org.glite.lb.logger/src-nt/ThreadPool.H b/org.glite.lb.logger/src-nt/ThreadPool.H
new file mode 100644 (file)
index 0000000..602b86b
--- /dev/null
@@ -0,0 +1,116 @@
+#ifndef _THREAD_POOL_H
+#define _THREAD_POOL_H
+
+#include <pthread.h>
+#include <poll.h>
+#include <time.h>
+
+#include <list>
+
+class ThreadPool {
+public:
+       const static int default_timeout = 5;
+
+       class WorkDescription {
+               friend class ThreadPool;
+       public:
+               int fd;
+               
+               WorkDescription(int afd) 
+                       : fd(afd), event(NONE) {}
+
+               virtual void onReady() 
+                       {}
+
+               virtual void onTimeout() 
+                       {}
+
+               virtual void onError() 
+                       {}
+
+       protected:
+               enum Event { NONE, READY, TIMEOUT, ERROR } event;
+               void doWork();
+       };
+
+public:
+       static ThreadPool theThreadPool;
+
+       static ThreadPool *getThreadPool() 
+               { return &theThreadPool; }
+
+       void startWorkers(unsigned int n);
+       void stopWorkers();
+
+       void postWork(WorkDescription *work_unit);
+
+       void queueWorkAccept(WorkDescription *work_unit);
+       void queueWorkRead(WorkDescription *work_unit);
+       void queueWorkWrite(WorkDescription *work_unit);
+       void queueWorkTimeout(WorkDescription *work_unit);
+       void queueWorkConnect(WorkDescription *work_unit);
+
+       void setWorkAccept(WorkDescription *work_unit);
+       void setWorkRead(WorkDescription *work_unit);
+       void setWorkWrite(WorkDescription *work_unit);
+       void setWorkTimeout(WorkDescription *work_unit);
+
+       void run();
+       void exit() 
+               { f_exit = true; pthread_cond_signal(&wait_queue_cond_ready); }
+
+protected:
+       ThreadPool();
+       ~ThreadPool();
+
+       WorkDescription *getWork();
+
+private:
+       class WaitDesc {
+       public:
+               WorkDescription *wd;
+               short event;
+               bool f_permanent;
+               struct timeval timeout;
+
+               WaitDesc(WorkDescription *w, short e, 
+                        bool permanent = false, int t = default_timeout) 
+                       : wd(w), event(e), f_permanent(permanent) {
+                       timeout.tv_sec = t;
+                       timeout.tv_usec = 0;
+               }
+               
+               int get_fd() { return wd->fd; };
+               void adjustTimeout(const struct timeval &delta);
+               int timeoutExpired() { return((timeout.tv_sec < 0) ||
+                                             (timeout.tv_sec == 0 && timeout.tv_usec == 0)); }
+       };
+
+private:
+       bool f_exit;
+       int num_workers;
+       pthread_t *workers;
+       int work_count;
+       int wait_count;
+       int ufds_size;
+       std::list<WorkDescription *> work_queue;
+       std::list<WaitDesc *> wait_queue;
+       pthread_mutex_t work_queue_mutex;
+       pthread_cond_t work_queue_cond_ready;
+       pthread_cond_t work_queue_cond_full;
+       pthread_mutex_t wait_queue_mutex;
+       pthread_cond_t wait_queue_cond_ready;
+       struct pollfd *ufds;
+       int pd[2];
+       struct timeval min_timeout;
+
+       void prepareDescriptorArray();
+       void removeWaitDesc(std::list<WaitDesc *>::iterator &i);
+       void removeWorkDesc();
+       void queueWork(WaitDesc *);
+
+       static void *threadMain(void *);
+       static void threadCleanup(void *);
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/ThreadPool.cpp b/org.glite.lb.logger/src-nt/ThreadPool.cpp
new file mode 100644 (file)
index 0000000..c57998d
--- /dev/null
@@ -0,0 +1,402 @@
+#include <time.h>
+#include <pthread.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <time.h>
+#include <stdlib.h>
+
+#include <iostream>
+
+#include "ThreadPool.H"
+#include "Exception.H"
+
+ThreadPool ThreadPool::theThreadPool;
+
+static inline
+void
+tv_sub(struct timeval &a, const struct timeval &b) {
+        a.tv_usec -= b.tv_usec;                
+        a.tv_sec -= b.tv_sec;
+        if (a.tv_usec < 0) {
+                a.tv_sec--;
+                a.tv_usec += 1000000;
+        }
+}
+
+
+static inline
+int 
+tv_cmp(const struct timeval &a, const struct timeval &b) {
+       if(a.tv_sec < b.tv_sec) {
+               return -1;
+       } else  if(a.tv_sec > b.tv_sec) {
+               return 1;
+       } else {
+               if (a.tv_usec < b.tv_usec) {
+                       return -1;
+               } else if(a.tv_usec > b.tv_usec) {
+                       return 1;
+               } else {
+                       return 0;
+               }
+       } 
+}
+
+
+inline
+void 
+ThreadPool::WorkDescription::doWork() {
+       switch(event) {
+       case READY:
+               onReady();
+               break;
+       case TIMEOUT:
+               onTimeout();
+               break;
+       case ERROR:
+               onError();
+               break;
+       default:
+               break;
+       }
+}
+
+
+inline
+void
+ThreadPool::WaitDesc::adjustTimeout(const struct timeval &delta)
+{
+       tv_sub(timeout, delta);
+}
+
+
+ThreadPool::ThreadPool() 
+       : work_count(0), wait_count(0), ufds_size(0), ufds(NULL), f_exit(false)
+{
+       pthread_mutex_init(&wait_queue_mutex, NULL);
+       pthread_mutex_init(&work_queue_mutex, NULL);
+       pthread_cond_init(&work_queue_cond_ready, NULL);
+       pthread_cond_init(&work_queue_cond_full, NULL);
+       pthread_cond_init(&wait_queue_cond_ready, NULL);
+       pipe(pd);
+       ufds = static_cast<struct pollfd *>(malloc(sizeof(struct pollfd)));
+       ufds->fd = pd[0];
+       ufds->events = POLLIN;
+       ufds_size = 1;
+}
+
+
+ThreadPool::~ThreadPool()
+{
+       pthread_cond_destroy(&work_queue_cond_full);
+       pthread_cond_destroy(&work_queue_cond_ready);
+       pthread_cond_destroy(&wait_queue_cond_ready);
+       pthread_mutex_destroy(&work_queue_mutex);
+       pthread_mutex_destroy(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::startWorkers(unsigned int n) 
+{
+       workers = new pthread_t[n];
+
+       num_workers = n;
+       for(unsigned int i = 0; i < n; i++) {
+               pthread_create(&workers[i], NULL, ThreadPool::threadMain, NULL);
+       }
+}
+
+
+void 
+ThreadPool::stopWorkers()
+{
+       for(int i = 0; i < num_workers; i++) {
+               pthread_cancel(workers[i]);
+               pthread_join(workers[i], NULL);
+       }
+       delete[] workers;
+}
+
+
+void 
+ThreadPool::postWork(WorkDescription *work_unit)
+{
+       pthread_mutex_lock(&work_queue_mutex);
+       work_queue.push_back(work_unit);
+       work_count++;
+       pthread_cond_signal(&work_queue_cond_ready);
+       pthread_mutex_unlock(&work_queue_mutex);
+}
+
+
+inline
+void 
+ThreadPool::queueWork(WaitDesc *wd)
+{
+       pthread_mutex_lock(&wait_queue_mutex);
+       wait_queue.push_back(wd);
+       wait_count++;
+       pthread_cond_signal(&wait_queue_cond_ready);
+       pthread_mutex_unlock(&wait_queue_mutex);
+       write(pd[1], "1", 1);
+}
+
+
+void 
+ThreadPool::queueWorkAccept(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLIN));
+}
+       
+
+void 
+ThreadPool::queueWorkRead(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLIN));
+}
+
+
+void
+ThreadPool::queueWorkWrite(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLOUT));
+}
+       
+
+void 
+ThreadPool::queueWorkTimeout(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, 0));
+}
+
+
+void 
+ThreadPool::queueWorkConnect(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLIN));
+}
+
+
+void 
+ThreadPool::setWorkAccept(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLIN, true));
+}
+       
+
+void 
+ThreadPool::setWorkRead(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLIN, true));
+}
+
+
+void
+ThreadPool::setWorkWrite(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, POLLOUT, true));
+}
+       
+
+void 
+ThreadPool::setWorkTimeout(WorkDescription *work_unit) 
+{
+       queueWork(new WaitDesc(work_unit, 0, true));
+}
+
+
+ThreadPool::WorkDescription *
+ThreadPool::getWork()
+{
+       WorkDescription *work_unit = NULL;
+       struct timespec timeout;
+
+       pthread_mutex_lock(&work_queue_mutex);
+       if(work_count == 0) {
+               timeout.tv_sec = 1;
+               timeout.tv_nsec = 0;
+//             pthread_cond_timedwait(&work_queue_cond_ready, &work_queue_mutex, &timeout);
+               pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex);
+       }
+       if(work_count > 0) {
+               work_count--;
+               work_unit = work_queue.front();
+               work_queue.pop_front();
+       }
+       pthread_mutex_unlock(&work_queue_mutex);
+       return work_unit;
+}
+
+void
+ThreadPool::threadCleanup(void *data)
+{
+       ThreadPool *pool = ThreadPool::getThreadPool();
+
+       pthread_mutex_unlock(&(pool->work_queue_mutex));
+}
+
+
+void *
+ThreadPool::threadMain(void *data)
+{
+       ThreadPool *pool  = ThreadPool::getThreadPool();
+       WorkDescription *work_unit;
+
+       pthread_cleanup_push(ThreadPool::threadCleanup, NULL); 
+       while(true) {
+
+               work_unit = pool->getWork();
+               if(work_unit) {
+                       // something to work on
+                       work_unit->doWork();
+               } else {
+                       // timed out waiting for work
+               }
+       }
+       pthread_cleanup_pop(1);
+}
+
+
+void 
+ThreadPool::removeWaitDesc(std::list<WaitDesc *>::iterator &i)
+{
+       std::list<WaitDesc *>::iterator j = i;
+       
+       // actually this is safe even for the first element
+       pthread_mutex_lock(&wait_queue_mutex);
+       j--;
+       wait_queue.erase(i);
+       wait_count--;
+       i = j;
+       pthread_mutex_unlock(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::prepareDescriptorArray()
+{
+       std::list<WaitDesc *>::iterator theIterator;
+       struct pollfd *p;
+
+       pthread_mutex_lock(&wait_queue_mutex);
+       if(wait_count == 0) {
+               pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex);
+       }
+       if(wait_count == 0) {
+               pthread_mutex_unlock(&wait_queue_mutex);
+               return;
+       }
+       if(ufds_size != wait_count + 1) {
+               ufds = static_cast<struct pollfd *>(realloc(ufds, (1 + wait_count) * sizeof(struct pollfd)));
+               if(ufds == NULL) {
+//                     throw new Exception();
+               }
+               ufds_size = wait_count + 1;
+       }
+       min_timeout.tv_sec = default_timeout;
+       min_timeout.tv_usec = 0;
+       for(theIterator = wait_queue.begin(), p = ufds + 1;
+           theIterator != wait_queue.end(); 
+           theIterator++, p++) {
+               WaitDesc *w = *theIterator;
+               p->fd = w->get_fd();
+               p->events = w->event;
+               if(tv_cmp(min_timeout, w->timeout) > 0) {
+                       min_timeout = w->timeout;
+               }
+       }
+       pthread_mutex_unlock(&wait_queue_mutex);
+}
+
+
+void
+ThreadPool::run()
+{
+       f_exit = false;
+       while(!f_exit) {
+               struct pollfd *p;
+               struct timeval before, after;
+               int ret;
+
+               // may block waiting for new work
+               prepareDescriptorArray();
+
+               gettimeofday(&before, NULL);
+               ret = poll(ufds, ufds_size, 1000*min_timeout.tv_sec + min_timeout.tv_usec/1000);
+               gettimeofday(&after, NULL);
+               tv_sub(after, before);
+
+               if((ret >= 0) || // ready or timeout
+                  ((ret < 0) && (errno == EINTR))) { // interrupted 
+                       std::list<WaitDesc *>::iterator i;
+                       WaitDesc *w;
+
+                       // handle the pipe
+                       if(ufds->revents & POLLIN) {
+                               char discard[1];
+                               read(ufds->fd, discard, 1);
+                       }
+
+                       // at least we have to adjust timeouts
+                       pthread_mutex_lock(&wait_queue_mutex);
+                       i = wait_queue.begin();
+                       pthread_mutex_unlock(&wait_queue_mutex);
+                       // the wait queue mutex is unlocked inside the loop
+                       // to allow handlers to add queue new
+                       // WorkDescriptions - these are added at the
+                       // end of the list so we should be safe 
+                       for(p = ufds + 1; p - ufds < ufds_size; p++) {
+                               enum WorkDescription::Event event = WorkDescription::NONE;
+                               
+                               w = *i;
+                               // check for consistency
+                               if(p->fd != w->get_fd()) {
+                                       // mismatch, what shall we do?
+                                       abort();
+                               }
+
+                               // subtract the time passed from timeout
+                               w->adjustTimeout(after);
+
+                               // see what happened
+                               if(ret <= 0) {
+                                       // timeout or interrupted
+                                       if(w->timeoutExpired()) {
+                                               event = WorkDescription::TIMEOUT;
+                                       }
+                               } else {
+                                       // ready or error
+                                       if(p->revents & POLLERR) {
+                                               event = WorkDescription::ERROR;
+                                       } else if(p->revents & w->event) {
+                                               event = WorkDescription::READY;
+                                       } else if(w->timeoutExpired()) {
+                                               event = WorkDescription::TIMEOUT;
+                                       }
+                               }
+                               if(event != WorkDescription::NONE) {
+                                       WorkDescription *wd;
+                                       wd = w->wd;
+                                       wd->event = event;
+                                       if(!w->f_permanent) {
+                                               postWork(wd);
+                                               removeWaitDesc(i);
+                                               delete w;
+                                       } else {
+                                               w->wd->doWork();
+                                               // we have to reset the timeout
+                                               w->timeout.tv_sec = default_timeout;
+                                               w->timeout.tv_usec = 0;
+                                       }
+                               }
+                               pthread_mutex_lock(&wait_queue_mutex);
+                               i++;
+                               pthread_mutex_unlock(&wait_queue_mutex);
+                       }
+               } else {
+                       // some nasty error
+               }
+       }
+}
+
diff --git a/org.glite.lb.logger/src-nt/Transport.H b/org.glite.lb.logger/src-nt/Transport.H
new file mode 100644 (file)
index 0000000..974b8ac
--- /dev/null
@@ -0,0 +1,26 @@
+#ifndef _TRANSPORT_H
+#define _TRANSPORT_H
+
+#include "Connection.H"
+
+
+class Transport: public ThreadPool::WorkDescription {
+public:
+       class Factory {
+       public:
+               virtual Transport *newTransport(Connection *conn) const = 0;
+       };
+
+       Transport(Connection *a_conn) 
+               : conn(a_conn),
+                 ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1)
+               {}
+
+       virtual ~Transport() 
+               { if(conn) delete conn; }
+
+protected:
+       Connection *conn;
+
+};
+#endif
diff --git a/org.glite.lb.logger/src-nt/main.cpp b/org.glite.lb.logger/src-nt/main.cpp
new file mode 100644 (file)
index 0000000..fce6c0e
--- /dev/null
@@ -0,0 +1,33 @@
+#include "PluginManager.H"
+#include "ThreadPool.H"
+#include "SocketInput.H"
+#include "PlainConnection.H"
+#include "HTTPTransport.H"
+
+const int num_threads = 2;
+const char *sock_path = "il_sock";
+
+int main(int argc, char *argv[])
+{
+       SocketInput *input;
+       
+       // initialize plugins
+       PluginManager::thePluginManager.initialize();
+
+       // create unix socket with plain IO and HTTP transport
+       input = new SocketInput(sock_path, 
+                               &PlainConnection::theFactory, 
+                               &HTTPTransport::theFactory);
+
+       // start worker threads
+       ThreadPool::theThreadPool.startWorkers(num_threads);
+
+       // run the main loop
+       ThreadPool::theThreadPool.run();
+
+       // cleanup & exit
+       delete input;
+       PluginManager::thePluginManager.cleanup();
+
+       return 0;
+}
diff --git a/org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp b/org.glite.lb.logger/src-nt/test/PluginManagerTest.cpp
new file mode 100644 (file)
index 0000000..e8a7b81
--- /dev/null
@@ -0,0 +1,58 @@
+#include <cppunit/extensions/HelperMacros.h>
+
+#include "PluginManager.H"
+
+class TestPlugin : public PluginManager::Plugin {
+public:
+       bool inited, cleaned;
+
+       virtual bool initialize() {
+               inited = true;
+       }
+
+       virtual bool cleanup() {
+               cleaned = true;
+       }
+
+       static TestPlugin theTestPlugin;
+
+private:
+       TestPlugin() : PluginManager::Plugin("test plugin"),
+                      inited(false),
+                      cleaned(false) 
+               {}
+
+
+
+};
+
+
+class PluginManagerTest : public CppUnit::TestFixture 
+{
+       CPPUNIT_TEST_SUITE(PluginManagerTest);
+       CPPUNIT_TEST(testInit);
+       CPPUNIT_TEST(testClean);
+       CPPUNIT_TEST_SUITE_END();
+       
+public:
+       void setUp() {
+       }
+
+       void tearDown() {
+       }
+
+       void testInit() {
+               PluginManager::thePluginManager.initialize();
+               CPPUNIT_ASSERT(TestPlugin::theTestPlugin.inited);
+       }
+       
+       void testClean() {
+               PluginManager::thePluginManager.cleanup();
+               CPPUNIT_ASSERT(TestPlugin::theTestPlugin.cleaned);
+       }
+};
+
+
+TestPlugin TestPlugin::theTestPlugin;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( PluginManagerTest );
diff --git a/org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp b/org.glite.lb.logger/src-nt/test/ThreadPoolTest.cpp
new file mode 100644 (file)
index 0000000..d61168d
--- /dev/null
@@ -0,0 +1,190 @@
+#include <cppunit/extensions/HelperMacros.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <iostream>
+
+#include "ThreadPool.H"
+
+class TestWork : public ThreadPool::WorkDescription {
+public:
+       int done;
+       
+       TestWork(int fd) : ThreadPool::WorkDescription(fd), done(0) {};
+
+       virtual void onReady() {
+               done++;
+       }
+       
+};
+
+
+class TestConsumer : public ThreadPool::WorkDescription {
+public:
+       char buf[2];
+
+       TestConsumer(int fd) : ThreadPool::WorkDescription(fd) {};
+
+       virtual void onReady() {
+               int r;
+               
+               r = read(fd, buf, 1);
+               buf[1] = 0;
+               ThreadPool::getThreadPool()->exit();
+       }
+
+       virtual void onTimeout() {
+       }
+
+};
+
+
+class TestProducer : public ThreadPool::WorkDescription {
+public:
+       TestProducer(int fd) : ThreadPool::WorkDescription(fd) {};
+
+       virtual void onReady() {
+               write(fd, "a", 1);
+       }
+       
+       virtual void onTimeout() {
+       }
+
+};
+
+
+class TestSocketRead: public ThreadPool::WorkDescription {
+public:
+       char buffer[10];
+
+       TestSocketRead(int fd) : ThreadPool::WorkDescription(fd) {
+       }
+
+       virtual void onReady() {
+               
+               int len = recv(fd, buffer, sizeof(buffer), MSG_NOSIGNAL);
+               ThreadPool::getThreadPool()->exit();
+       }
+
+       virtual void onError() {
+       }
+};
+
+
+class TestSocketWrite: public ThreadPool::WorkDescription {
+public:
+       static char buffer[];
+
+       TestSocketWrite(const char *name) 
+               : ThreadPool::WorkDescription(0) {
+               struct sockaddr_un saddr;
+               int ret;
+               fd = socket(PF_UNIX, SOCK_STREAM, 0);
+               memset(&saddr, 0, sizeof(saddr));
+               saddr.sun_family = AF_UNIX;
+               strcpy(saddr.sun_path, name);
+               if((ret = connect(fd, (struct sockaddr *)&saddr, sizeof(saddr))) < 0) {
+               }
+       }
+
+       virtual void onReady() {
+               int ret;
+
+               ret = send(fd, buffer, strlen(buffer)+1, MSG_NOSIGNAL);
+               close(fd);
+       }
+
+};
+
+char TestSocketWrite::buffer[] = "ahoj";
+
+class TestSocketAccept : public ThreadPool::WorkDescription {
+public:
+       TestSocketRead *reader;
+
+       TestSocketAccept(const char *name) 
+               : ThreadPool::WorkDescription(0) {
+               struct sockaddr_un saddr;
+
+               fd = socket(PF_UNIX, SOCK_STREAM, 0);
+               memset(&saddr, 0, sizeof(saddr));
+               saddr.sun_family = AF_UNIX;
+               strcpy(saddr.sun_path, name);
+               bind(fd, (struct sockaddr *)&saddr, sizeof(saddr));
+               listen(fd, 1);
+       }
+
+       virtual void onReady() {
+               int nfd;
+
+               nfd = accept(fd, NULL, NULL);
+               if(nfd < 0) { 
+               } else {
+                       ThreadPool *pool = ThreadPool::getThreadPool();
+
+                       reader  = new TestSocketRead(nfd);
+                       pool->queueWorkRead(reader);
+               }
+       }
+};
+
+
+class ThreadPoolTest: public CppUnit::TestFixture
+{
+       CPPUNIT_TEST_SUITE( ThreadPoolTest );
+//     CPPUNIT_TEST( testWorkQueue );
+       CPPUNIT_TEST( testPoll );
+       CPPUNIT_TEST( testAccept );
+       CPPUNIT_TEST_SUITE_END();
+
+public:
+       void setUp() {
+               pool = ThreadPool::getThreadPool();
+               unlink("/tmp/smazat.sock");
+               pool->startWorkers(2);
+       }
+
+       void tearDown() {
+               pool->stopWorkers();
+       }
+
+       void testWorkQueue() {
+               TestWork *work_unit = new TestWork(0);
+               pool->postWork(work_unit);
+       }
+
+       void testPoll() {
+               int fd[2];
+               TestProducer *p = new TestProducer(0);
+               TestConsumer *c = new TestConsumer(0);
+
+               pipe(fd);
+               c->fd = fd[0];
+               p->fd = fd[1];
+               pool->queueWorkRead(c);
+               pool->queueWorkWrite(p);
+               pool->run();
+               CPPUNIT_ASSERT(c->buf[0] == 'a');
+               CPPUNIT_ASSERT(c->buf[1] == 0);
+       }
+
+       void testAccept() {
+               TestSocketAccept *consumer = new TestSocketAccept("/tmp/smazat.sock");
+               TestSocketWrite *producer;
+
+               pool->queueWorkAccept(consumer);
+               producer = new TestSocketWrite("/tmp/smazat.sock");
+               ThreadPool::getThreadPool()->queueWorkWrite(producer);
+               pool->run();
+               CPPUNIT_ASSERT(consumer->reader != NULL);
+               CPPUNIT_ASSERT(strcmp(consumer->reader->buffer, TestSocketWrite::buffer) == 0);
+       }
+
+private:
+       ThreadPool *pool;
+};
+
+
+CPPUNIT_TEST_SUITE_REGISTRATION( ThreadPoolTest );
diff --git a/org.glite.lb.logger/src-nt/test/test_main.cpp b/org.glite.lb.logger/src-nt/test/test_main.cpp
new file mode 100644 (file)
index 0000000..1ae6d6a
--- /dev/null
@@ -0,0 +1,32 @@
+#include <assert.h>
+#include <fstream>
+
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <cppunit/CompilerOutputter.h>
+//#include <cppunit/XmlOutputter.h>
+#include <cppunit/TestRunner.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+
+int main (int argc,const char *argv[])
+{
+        CppUnit::Test *suite = CppUnit::TestFactoryRegistry::getRegistry().makeTest();
+
+//        assert(argc == 2);
+//        std::ofstream   xml(argv[1]);
+
+        CppUnit::TestResult controller;
+        CppUnit::TestResultCollector result;
+        controller.addListener( &result );
+
+        CppUnit::TestRunner runner;
+        runner.addTest(suite);
+        runner.run(controller);
+
+//        CppUnit::XmlOutputter xout( &result, xml );
+        CppUnit::CompilerOutputter tout( &result, std::cout);
+//        xout.write();
+        tout.write();
+
+        return result.wasSuccessful() ? 0 : 1 ;
+}