From d21e9338e1bae781cb9c7de030e2b5cc2d92d457 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Wed, 12 Sep 2007 13:51:26 +0000 Subject: [PATCH] (work) --- org.glite.lb.logger/src-nt/Connection.cpp | 4 + org.glite.lb.logger/src-nt/HTTPTransport.H | 23 +++--- org.glite.lb.logger/src-nt/HTTPTransport.cpp | 116 +++++++++++++++++++++------ org.glite.lb.logger/src-nt/InputChannel.H | 29 +++++++ org.glite.lb.logger/src-nt/InputChannel.cpp | 37 +++++++++ org.glite.lb.logger/src-nt/Makefile | 3 +- org.glite.lb.logger/src-nt/Message.H | 23 ++++++ org.glite.lb.logger/src-nt/Properties.H | 35 ++++++++ org.glite.lb.logger/src-nt/SocketInput.cpp | 6 +- org.glite.lb.logger/src-nt/Transport.H | 46 ++++++++--- org.glite.lb.logger/src-nt/Transport.cpp | 5 ++ org.glite.lb.logger/src-nt/main.cpp | 3 + 12 files changed, 279 insertions(+), 51 deletions(-) create mode 100644 org.glite.lb.logger/src-nt/Connection.cpp create mode 100644 org.glite.lb.logger/src-nt/InputChannel.H create mode 100644 org.glite.lb.logger/src-nt/InputChannel.cpp create mode 100644 org.glite.lb.logger/src-nt/Message.H create mode 100644 org.glite.lb.logger/src-nt/Properties.H create mode 100644 org.glite.lb.logger/src-nt/Transport.cpp diff --git a/org.glite.lb.logger/src-nt/Connection.cpp b/org.glite.lb.logger/src-nt/Connection.cpp new file mode 100644 index 0000000..7cac943 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Connection.cpp @@ -0,0 +1,4 @@ +#include "Connection.H" + +Connection::Factory::~Factory() { +} diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.H b/org.glite.lb.logger/src-nt/HTTPTransport.H index 37c91d7..05e928b 100644 --- a/org.glite.lb.logger/src-nt/HTTPTransport.H +++ b/org.glite.lb.logger/src-nt/HTTPTransport.H @@ -11,19 +11,18 @@ class HTTPTransport: public Transport { public: + + // factory class class Factory: public Transport::Factory, public Singleton { public: - virtual Transport *newTransport(Connection *conn) const { - if(conn) - return(new HTTPTransport(conn)); - else - return NULL; + virtual Transport *newTransport() const { + return(new HTTPTransport()); } }; - HTTPTransport(Connection *conn) - : Transport(conn), + HTTPTransport() + : Transport(), state(NONE), request(), headers(), body(NULL), pos(NULL), content_length(0) @@ -31,12 +30,9 @@ public: virtual ~HTTPTransport(); - -protected: - // from ThreadPool::WorkDescription - virtual void onReady(); - virtual void onTimeout(); - virtual void onError(); + virtual int receive(Connection *conn, Message* &msg); + virtual int send(Connection *conn, Message* msg); + virtual void reset(); private: enum { NONE, @@ -51,6 +47,7 @@ private: unsigned int content_length; int parseHeader(const char *s, unsigned int len); + void serializeHeaders(Message *msg); }; diff --git a/org.glite.lb.logger/src-nt/HTTPTransport.cpp b/org.glite.lb.logger/src-nt/HTTPTransport.cpp index 9dbe316..c77606f 100644 --- a/org.glite.lb.logger/src-nt/HTTPTransport.cpp +++ b/org.glite.lb.logger/src-nt/HTTPTransport.cpp @@ -12,8 +12,11 @@ HTTPTransport::~HTTPTransport() } -void -HTTPTransport::onReady() +// read what is available and parse what can be parsed +// returns the result of read operation of the underlying connection, +// ie. the number of bytes read or error code +int +HTTPTransport::receive(Connection *conn, Message* &msg) { int len; @@ -27,12 +30,12 @@ HTTPTransport::onReady() len = conn->read(pos, sizeof(buffer) - (pos - buffer)); if(len < 0) { // error during request - // XXX - handle this - state = NONE; + // state = NONE; + return len; } else if(len == 0) { // other side closed connection - // XXX - handle this - state = NONE; + // state = NONE; + return len; } else { char *cr = NULL, *p = buffer, *s = buffer; bool crlf_seen = false; @@ -132,10 +135,12 @@ HTTPTransport::onReady() len = conn->read(pos, content_length - (pos - body)); if(len < 0) { // error reading - state = NONE; + // state = NONE; + return len; } else if(len == 0) { // no more data - state = NONE; + // state = NONE; + return len; } else { pos += len; if(pos == content_length + body) { @@ -147,28 +152,15 @@ HTTPTransport::onReady() } if(state != NONE) - ThreadPool::instance()->queueWorkRead(this); + msg = NULL; else { // we have a new message // XXX - or we have an error, must handle it std::cout << request << std::endl << headers << std::endl; std::cout.write(body, content_length); std::cout.flush(); - // res = EventManager::instance()->postEvent(new NewMessageEvent(conn, headers, body)); } - -} - - -void -HTTPTransport::onTimeout() -{ -} - - -void -HTTPTransport::onError() -{ + return len; } @@ -184,3 +176,81 @@ HTTPTransport::parseHeader(const char *s, unsigned int len) } return(0); } + + +int +HTTPTransport::send(Connection *conn, Message* msg) +{ + int len; + switch(state) { + case NONE: + state = IN_REQUEST; + request = "POST " + msg->path() + "HTTP/1.1\r\n"; + pos = request.c_str(); + content_length = msg->getContent(body); + + case IN_REQUEST: + len = conn->send(pos, request.length() - pos + request.c_str()); + if(len < 0) { + return len; + } + pos += len; + if(request.c_str() + request.length() == pos) { + state = IN_HEADERS; + prepareHeaders(msg); + pos = headers.c_str(); + } else { + break; + } + + case IN_HEADERS: + len = conn->send(pos, headers.length() - pos + headers.c_str()); + if(len < 0) { + return len; + } + pos += len; + if(headers.c_str() + headers.length() == pos) { + state = IN_BODY; + pos = body; + } else { + break; + } + + case IN_BODY: + len = conn->send(pos, body, content_length - pos + body); + if(len < 0) { + return len; + } + pos += len; + if(body + content_length == pos) { + state = NONE; + return 0; + } + break; + + default: + } + return len; +} + + +void +HTTPTransport::reset() +{ + state = NONE; + request.clear(); + headers.clear(); + if(body) { + free(body); + body = NULL; + } + content_length = 0; + pos = buffer; +} + + +void +HTTPTransport::serializeHeaders(Message *msg); +{ + for(Properties::iterator i = msg-> +} diff --git a/org.glite.lb.logger/src-nt/InputChannel.H b/org.glite.lb.logger/src-nt/InputChannel.H new file mode 100644 index 0000000..2bac262 --- /dev/null +++ b/org.glite.lb.logger/src-nt/InputChannel.H @@ -0,0 +1,29 @@ +#ifndef _INPUT_CHANNEL_H_ +#define _INPUT_CHANNEL_H_ + +#include "ThreadPool.H" +#include "Connection.H" +#include "Transport.H" + +class InputChannel + : public ThreadPool::WorkDescription { +public: + + InputChannel(Connection *conn, Transport *trans) + : ThreadPool::WorkDescription(conn->getFD()), + m_connection(conn), m_transport(trans) + {} + + void start(); + +protected: + virtual void onReady(); + virtual void onTimeout(); + virtual void onError(); + +private: + Connection *m_connection; + Transport *m_transport; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/InputChannel.cpp b/org.glite.lb.logger/src-nt/InputChannel.cpp new file mode 100644 index 0000000..aad201d --- /dev/null +++ b/org.glite.lb.logger/src-nt/InputChannel.cpp @@ -0,0 +1,37 @@ +#include "InputChannel.H" +#include "ThreadPool.H" +#include "EventManager.H" + +extern EventManager theEventManager; + +void +InputChannel::start() +{ + ThreadPool::instance()->queueWorkRead(this); +} + +void +InputChannel::onReady() +{ + Transport::Message *msg = NULL; + int ret = m_transport->receive(m_connection, msg); + if(ret <= 0) { + // no new data read + } else if(msg) { + // we have a new message + //theEventManager.postEvent(new NewMessageEvent()); + } else { + // still need more data + ThreadPool::instance()->queueWorkRead(this); + } +} + +void +InputChannel::onTimeout() +{ +} + +void +InputChannel::onError() +{ +} diff --git a/org.glite.lb.logger/src-nt/Makefile b/org.glite.lb.logger/src-nt/Makefile index 7263354..8ebe707 100644 --- a/org.glite.lb.logger/src-nt/Makefile +++ b/org.glite.lb.logger/src-nt/Makefile @@ -29,7 +29,8 @@ OBJS = \ Transport.o \ HTTPTransport.o \ ThreadPool.o \ - EventManager.o + EventManager.o \ + InputChannel.cpp plain: main.o $(OBJS) $(LINK) -o $@ $+ $(THREAD_LIB) diff --git a/org.glite.lb.logger/src-nt/Message.H b/org.glite.lb.logger/src-nt/Message.H new file mode 100644 index 0000000..4303f14 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Message.H @@ -0,0 +1,23 @@ +#ifndef _MESSAGE_H_ +#define _MESSAGE_H + +class Message { +public: + class ID { + public: + private: + unsigned char m_uuid[16]; + }; + + Message(void * data, unsigned int length) + : m_length(length), + m_data(data) + {} + +private: + ID m_id; + unsigned int m_length; + void * m_data; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/Properties.H b/org.glite.lb.logger/src-nt/Properties.H new file mode 100644 index 0000000..a3a18eb --- /dev/null +++ b/org.glite.lb.logger/src-nt/Properties.H @@ -0,0 +1,35 @@ +#ifndef _PROPERTIES_H_ +#define _PROPERTIES_H_ + +#include + +class Properties { +public: + + // default constructor + Properties() + : properties() + {} + + // accessors + string& getProperty(string &key) + { return properties[key]; } + + void setProperty(string &key, string &val) + { properties[key] = val; } + + // iterators + typedef std::map::iterator iterator; + + iterator begin() + { return properties.begin(); } + + iterator end() + { return properties.end(); } + + +private: + std::map properties; +}; + +#endif diff --git a/org.glite.lb.logger/src-nt/SocketInput.cpp b/org.glite.lb.logger/src-nt/SocketInput.cpp index 259e843..e553ec8 100644 --- a/org.glite.lb.logger/src-nt/SocketInput.cpp +++ b/org.glite.lb.logger/src-nt/SocketInput.cpp @@ -6,6 +6,7 @@ #include "ThreadPool.H" #include "SocketInput.H" +#include "InputChannel.H" #include "Exception.H" @@ -50,8 +51,9 @@ void SocketInput::onReady() { Connection *conn = cFactory->accept(fd); - Transport *trans = tFactory->newTransport(conn); - ThreadPool::instance()->queueWorkRead(trans); + Transport *trans = tFactory->newTransport(); + InputChannel *channel = new InputChannel(conn, trans); + channel->start(); } diff --git a/org.glite.lb.logger/src-nt/Transport.H b/org.glite.lb.logger/src-nt/Transport.H index 115ddaf..67ac1ee 100644 --- a/org.glite.lb.logger/src-nt/Transport.H +++ b/org.glite.lb.logger/src-nt/Transport.H @@ -2,27 +2,49 @@ #define _TRANSPORT_H #include "Connection.H" -#include "ThreadPool.H" +#include "Properties.H" -class Transport: public ThreadPool::WorkDescription { +// Transport implements transport protocol +// - reads/writes messages using Connection interface +// - +class Transport { public: + + // class Factory { public: - virtual Transport *newTransport(Connection *conn) const = 0; + virtual Transport *newTransport() const = 0; - virtual ~Factory(); + virtual ~Factory() + {} }; - Transport(Connection *a_conn) - : ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1), - conn(a_conn) - {} - virtual ~Transport() - { if(conn) delete conn; } + // + class Message { + public: + + Message(); + + int getContent(char* &data) const; + int getContentLength() const; + int getHeader(const char* name, char* &val) const; + Properties& getProperties(); + char *path() const; + + private: + + }; + + // + Transport() + {} -protected: - Connection *conn; + virtual ~Transport(); + // + virtual int receive(Connection *conn, Message* &msg) = 0; + virtual int send(Connection *conn, Message* msg) = 0; + virtual void reset() = 0; }; #endif diff --git a/org.glite.lb.logger/src-nt/Transport.cpp b/org.glite.lb.logger/src-nt/Transport.cpp new file mode 100644 index 0000000..2544997 --- /dev/null +++ b/org.glite.lb.logger/src-nt/Transport.cpp @@ -0,0 +1,5 @@ +#include "Transport.H" + +Transport::~Transport() +{ +} diff --git a/org.glite.lb.logger/src-nt/main.cpp b/org.glite.lb.logger/src-nt/main.cpp index 1967015..b2fed11 100644 --- a/org.glite.lb.logger/src-nt/main.cpp +++ b/org.glite.lb.logger/src-nt/main.cpp @@ -3,10 +3,13 @@ #include "SocketInput.H" #include "PlainConnection.H" #include "HTTPTransport.H" +#include "EventManager.H" const int num_threads = 2; const char *sock_path = "/tmp/il_sock"; +EventManager theEventManager(); + int main(int argc, char *argv[]) { SocketInput *input; -- 1.8.2.3