(work)
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 12 Sep 2007 13:51:26 +0000 (13:51 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 12 Sep 2007 13:51:26 +0000 (13:51 +0000)
12 files changed:
org.glite.lb.logger/src-nt/Connection.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/HTTPTransport.H
org.glite.lb.logger/src-nt/HTTPTransport.cpp
org.glite.lb.logger/src-nt/InputChannel.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/InputChannel.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/Makefile
org.glite.lb.logger/src-nt/Message.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/Properties.H [new file with mode: 0644]
org.glite.lb.logger/src-nt/SocketInput.cpp
org.glite.lb.logger/src-nt/Transport.H
org.glite.lb.logger/src-nt/Transport.cpp [new file with mode: 0644]
org.glite.lb.logger/src-nt/main.cpp

diff --git a/org.glite.lb.logger/src-nt/Connection.cpp b/org.glite.lb.logger/src-nt/Connection.cpp
new file mode 100644 (file)
index 0000000..7cac943
--- /dev/null
@@ -0,0 +1,4 @@
+#include "Connection.H"
+
+Connection::Factory::~Factory() {
+}
index 37c91d7..05e928b 100644 (file)
@@ -11,19 +11,18 @@ class HTTPTransport:
        public Transport
 {
 public:
+
+       // factory class
        class Factory: public Transport::Factory, 
                       public Singleton<HTTPTransport::Factory>  {
        public:
-               virtual Transport *newTransport(Connection *conn) const {
-                       if(conn) 
-                               return(new HTTPTransport(conn));
-                       else 
-                               return NULL;
+               virtual Transport *newTransport() const {
+                       return(new HTTPTransport());
                }
        };
 
-       HTTPTransport(Connection *conn
-               : Transport(conn),
+       HTTPTransport() 
+               : Transport(),
                  state(NONE),
                  request(), headers(), body(NULL), pos(NULL),
                  content_length(0)
@@ -31,12 +30,9 @@ public:
 
        virtual ~HTTPTransport();
 
-
-protected:
-       // from ThreadPool::WorkDescription
-       virtual void onReady();
-       virtual void onTimeout();
-       virtual void onError();
+       virtual int receive(Connection *conn, Message* &msg);
+       virtual int send(Connection *conn, Message* msg);
+       virtual void reset();
 
 private:
        enum { NONE, 
@@ -51,6 +47,7 @@ private:
        unsigned int content_length;
 
        int parseHeader(const char *s, unsigned int len);
+       void serializeHeaders(Message *msg);
 };
 
 
index 9dbe316..c77606f 100644 (file)
@@ -12,8 +12,11 @@ HTTPTransport::~HTTPTransport()
 }
 
 
-void
-HTTPTransport::onReady()
+// read what is available and parse what can be parsed
+// returns the result of read operation of the underlying connection,
+// ie. the number of bytes read or error code
+int
+HTTPTransport::receive(Connection *conn, Message* &msg)
 {
        int len;
 
@@ -27,12 +30,12 @@ HTTPTransport::onReady()
                len = conn->read(pos, sizeof(buffer) - (pos - buffer));
                if(len < 0) {
                        // error during request
-                       // XXX - handle this
-                       state = NONE;
+                       // state = NONE;
+                       return len;
                } else if(len == 0) {
                        // other side closed connection
-                       // XXX - handle this
-                       state = NONE;
+                       // state = NONE;
+                       return len;
                } else {
                        char *cr = NULL, *p = buffer, *s = buffer;
                        bool crlf_seen = false;
@@ -132,10 +135,12 @@ HTTPTransport::onReady()
                len = conn->read(pos, content_length - (pos - body));
                if(len < 0) {
                        // error reading
-                       state = NONE;
+                       // state = NONE;
+                       return len;
                } else if(len == 0) {
                        // no more data
-                       state = NONE;
+                       // state = NONE;
+                       return len;
                } else {
                        pos += len;
                        if(pos == content_length + body) {
@@ -147,28 +152,15 @@ HTTPTransport::onReady()
        }
 
        if(state != NONE) 
-               ThreadPool::instance()->queueWorkRead(this);
+               msg = NULL;
        else {
                // we have a new message
                // XXX - or we have an error, must handle it
                std::cout << request << std::endl << headers << std::endl;
                std::cout.write(body, content_length);
                std::cout.flush();
-               // res = EventManager::instance()->postEvent(new NewMessageEvent(conn, headers, body));
        }
-
-}
-
-
-void 
-HTTPTransport::onTimeout()
-{
-}
-
-
-void 
-HTTPTransport::onError()
-{
+       return len;
 }
 
 
@@ -184,3 +176,81 @@ HTTPTransport::parseHeader(const char *s, unsigned int len)
        }
        return(0);
 }
+
+
+int
+HTTPTransport::send(Connection *conn, Message* msg)
+{
+       int len;
+       switch(state) {
+       case NONE:
+               state = IN_REQUEST;
+               request = "POST " + msg->path() + "HTTP/1.1\r\n";
+               pos = request.c_str();
+               content_length = msg->getContent(body);
+
+       case IN_REQUEST:
+               len = conn->send(pos, request.length() - pos + request.c_str());
+               if(len < 0) {
+                       return len;
+               }
+               pos += len;
+               if(request.c_str() + request.length() == pos) {
+                       state = IN_HEADERS;
+                       prepareHeaders(msg);
+                       pos = headers.c_str();
+               } else {
+                       break;
+               }
+
+       case IN_HEADERS:
+               len = conn->send(pos, headers.length() - pos + headers.c_str());
+               if(len < 0) {
+                       return len;
+               }
+               pos += len;
+               if(headers.c_str() + headers.length() == pos) {
+                       state = IN_BODY;
+                       pos = body;
+               } else {
+                       break;
+               }
+
+       case IN_BODY:
+               len = conn->send(pos, body, content_length - pos + body);
+               if(len < 0) {
+                       return len;
+               }
+               pos += len;
+               if(body + content_length == pos) {
+                       state = NONE;
+                       return 0;
+               }
+               break;
+               
+       default:
+       }
+       return len;
+}
+
+
+void 
+HTTPTransport::reset()
+{
+       state = NONE;
+       request.clear();
+       headers.clear();
+       if(body) {
+               free(body);
+               body = NULL;
+       }
+       content_length = 0;
+       pos = buffer;
+}
+       
+
+void
+HTTPTransport::serializeHeaders(Message *msg);
+{
+       for(Properties::iterator i = msg->
+}
diff --git a/org.glite.lb.logger/src-nt/InputChannel.H b/org.glite.lb.logger/src-nt/InputChannel.H
new file mode 100644 (file)
index 0000000..2bac262
--- /dev/null
@@ -0,0 +1,29 @@
+#ifndef _INPUT_CHANNEL_H_
+#define _INPUT_CHANNEL_H_
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+class InputChannel 
+       : public ThreadPool::WorkDescription {
+public:
+       
+       InputChannel(Connection *conn, Transport *trans)
+               :  ThreadPool::WorkDescription(conn->getFD()),
+                  m_connection(conn), m_transport(trans)
+               {}
+
+       void start();
+
+protected:
+       virtual void onReady();
+       virtual void onTimeout();
+       virtual void onError();
+
+private:
+       Connection *m_connection;
+       Transport  *m_transport;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/InputChannel.cpp b/org.glite.lb.logger/src-nt/InputChannel.cpp
new file mode 100644 (file)
index 0000000..aad201d
--- /dev/null
@@ -0,0 +1,37 @@
+#include "InputChannel.H"
+#include "ThreadPool.H"
+#include "EventManager.H"
+
+extern EventManager theEventManager;
+
+void
+InputChannel::start()
+{
+       ThreadPool::instance()->queueWorkRead(this);
+}
+
+void
+InputChannel::onReady()
+{
+       Transport::Message *msg = NULL;
+       int ret = m_transport->receive(m_connection, msg);
+       if(ret <= 0) {
+               // no new data read
+       } else if(msg) {
+               // we have a new message
+               //theEventManager.postEvent(new NewMessageEvent());
+       } else {
+               // still need more data
+               ThreadPool::instance()->queueWorkRead(this);
+       }
+}
+
+void
+InputChannel::onTimeout()
+{
+}
+
+void
+InputChannel::onError()
+{
+}
index 7263354..8ebe707 100644 (file)
@@ -29,7 +29,8 @@ OBJS = \
        Transport.o \
        HTTPTransport.o \
        ThreadPool.o \
-       EventManager.o
+       EventManager.o \
+       InputChannel.cpp 
 
 plain: main.o $(OBJS)
        $(LINK) -o $@ $+ $(THREAD_LIB)
diff --git a/org.glite.lb.logger/src-nt/Message.H b/org.glite.lb.logger/src-nt/Message.H
new file mode 100644 (file)
index 0000000..4303f14
--- /dev/null
@@ -0,0 +1,23 @@
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H
+
+class Message {
+public:
+       class ID {
+       public:
+       private:
+               unsigned char m_uuid[16];
+       };
+
+       Message(void * data, unsigned int length) 
+               : m_length(length), 
+                 m_data(data)
+               {}
+
+private:
+       ID                      m_id;
+       unsigned int            m_length;
+       void *                  m_data;
+};
+
+#endif
diff --git a/org.glite.lb.logger/src-nt/Properties.H b/org.glite.lb.logger/src-nt/Properties.H
new file mode 100644 (file)
index 0000000..a3a18eb
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef _PROPERTIES_H_
+#define _PROPERTIES_H_
+
+#include <map>
+
+class Properties {
+public:
+
+       // default constructor
+       Properties() 
+               : properties()
+               {}
+
+       // accessors
+       string& getProperty(string &key) 
+               { return properties[key]; }
+
+       void setProperty(string &key, string &val) 
+               { properties[key] = val; }
+
+       // iterators
+       typedef std::map<string,string>::iterator  iterator;
+
+       iterator begin() 
+               { return properties.begin(); }
+
+       iterator end()
+               { return properties.end(); }
+
+       
+private:
+       std::map<string,string> properties;
+};
+
+#endif
index 259e843..e553ec8 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "ThreadPool.H"
 #include "SocketInput.H"
+#include "InputChannel.H"
 #include "Exception.H"
 
 
@@ -50,8 +51,9 @@ void
 SocketInput::onReady()
 {
        Connection *conn = cFactory->accept(fd);
-       Transport  *trans = tFactory->newTransport(conn);
-       ThreadPool::instance()->queueWorkRead(trans);
+       Transport  *trans = tFactory->newTransport();
+       InputChannel *channel = new InputChannel(conn, trans);
+       channel->start();
 }
 
 
index 115ddaf..67ac1ee 100644 (file)
@@ -2,27 +2,49 @@
 #define _TRANSPORT_H
 
 #include "Connection.H"
-#include "ThreadPool.H"
+#include "Properties.H"
 
-class Transport: public ThreadPool::WorkDescription {
+// Transport implements transport protocol 
+//  - reads/writes messages using Connection interface
+//  - 
+class Transport {
 public:
+
+       // 
        class Factory {
        public:
-               virtual Transport *newTransport(Connection *conn) const = 0;
+               virtual Transport *newTransport() const = 0;
 
-               virtual ~Factory();
+               virtual ~Factory() 
+                       {}
        };
 
-       Transport(Connection *a_conn) 
-               : ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1),
-                 conn(a_conn)
-               {}
 
-       virtual ~Transport() 
-               { if(conn) delete conn; }
+       // 
+       class Message {
+       public:
+
+               Message();
+
+               int getContent(char* &data) const;
+               int getContentLength() const;
+               int getHeader(const char* name, char* &val) const;
+               Properties& getProperties();
+               char *path() const;
+
+       private:
+               
+       };
+
+       //
+       Transport() 
+               {}
 
-protected:
-       Connection *conn;
+       virtual ~Transport();
 
+       // 
+       virtual int receive(Connection *conn, Message* &msg) = 0;
+       virtual int send(Connection *conn, Message* msg) = 0;
+       virtual void reset() = 0;
 };
 #endif
diff --git a/org.glite.lb.logger/src-nt/Transport.cpp b/org.glite.lb.logger/src-nt/Transport.cpp
new file mode 100644 (file)
index 0000000..2544997
--- /dev/null
@@ -0,0 +1,5 @@
+#include "Transport.H"
+
+Transport::~Transport()
+{
+}
index 1967015..b2fed11 100644 (file)
@@ -3,10 +3,13 @@
 #include "SocketInput.H"
 #include "PlainConnection.H"
 #include "HTTPTransport.H"
+#include "EventManager.H"
 
 const int num_threads = 2;
 const char *sock_path = "/tmp/il_sock";
 
+EventManager theEventManager();
+
 int main(int argc, char *argv[])
 {
        SocketInput *input;