--- /dev/null
+#include "Connection.H"
+
+Connection::Factory::~Factory() {
+}
public Transport
{
public:
+
+ // factory class
class Factory: public Transport::Factory,
public Singleton<HTTPTransport::Factory> {
public:
- virtual Transport *newTransport(Connection *conn) const {
- if(conn)
- return(new HTTPTransport(conn));
- else
- return NULL;
+ virtual Transport *newTransport() const {
+ return(new HTTPTransport());
}
};
- HTTPTransport(Connection *conn)
- : Transport(conn),
+ HTTPTransport()
+ : Transport(),
state(NONE),
request(), headers(), body(NULL), pos(NULL),
content_length(0)
virtual ~HTTPTransport();
-
-protected:
- // from ThreadPool::WorkDescription
- virtual void onReady();
- virtual void onTimeout();
- virtual void onError();
+ virtual int receive(Connection *conn, Message* &msg);
+ virtual int send(Connection *conn, Message* msg);
+ virtual void reset();
private:
enum { NONE,
unsigned int content_length;
int parseHeader(const char *s, unsigned int len);
+ void serializeHeaders(Message *msg);
};
}
-void
-HTTPTransport::onReady()
+// read what is available and parse what can be parsed
+// returns the result of read operation of the underlying connection,
+// ie. the number of bytes read or error code
+int
+HTTPTransport::receive(Connection *conn, Message* &msg)
{
int len;
len = conn->read(pos, sizeof(buffer) - (pos - buffer));
if(len < 0) {
// error during request
- // XXX - handle this
- state = NONE;
+ // state = NONE;
+ return len;
} else if(len == 0) {
// other side closed connection
- // XXX - handle this
- state = NONE;
+ // state = NONE;
+ return len;
} else {
char *cr = NULL, *p = buffer, *s = buffer;
bool crlf_seen = false;
len = conn->read(pos, content_length - (pos - body));
if(len < 0) {
// error reading
- state = NONE;
+ // state = NONE;
+ return len;
} else if(len == 0) {
// no more data
- state = NONE;
+ // state = NONE;
+ return len;
} else {
pos += len;
if(pos == content_length + body) {
}
if(state != NONE)
- ThreadPool::instance()->queueWorkRead(this);
+ msg = NULL;
else {
// we have a new message
// XXX - or we have an error, must handle it
std::cout << request << std::endl << headers << std::endl;
std::cout.write(body, content_length);
std::cout.flush();
- // res = EventManager::instance()->postEvent(new NewMessageEvent(conn, headers, body));
}
-
-}
-
-
-void
-HTTPTransport::onTimeout()
-{
-}
-
-
-void
-HTTPTransport::onError()
-{
+ return len;
}
}
return(0);
}
+
+
+int
+HTTPTransport::send(Connection *conn, Message* msg)
+{
+ int len;
+ switch(state) {
+ case NONE:
+ state = IN_REQUEST;
+ request = "POST " + msg->path() + "HTTP/1.1\r\n";
+ pos = request.c_str();
+ content_length = msg->getContent(body);
+
+ case IN_REQUEST:
+ len = conn->send(pos, request.length() - pos + request.c_str());
+ if(len < 0) {
+ return len;
+ }
+ pos += len;
+ if(request.c_str() + request.length() == pos) {
+ state = IN_HEADERS;
+ prepareHeaders(msg);
+ pos = headers.c_str();
+ } else {
+ break;
+ }
+
+ case IN_HEADERS:
+ len = conn->send(pos, headers.length() - pos + headers.c_str());
+ if(len < 0) {
+ return len;
+ }
+ pos += len;
+ if(headers.c_str() + headers.length() == pos) {
+ state = IN_BODY;
+ pos = body;
+ } else {
+ break;
+ }
+
+ case IN_BODY:
+ len = conn->send(pos, body, content_length - pos + body);
+ if(len < 0) {
+ return len;
+ }
+ pos += len;
+ if(body + content_length == pos) {
+ state = NONE;
+ return 0;
+ }
+ break;
+
+ default:
+ }
+ return len;
+}
+
+
+void
+HTTPTransport::reset()
+{
+ state = NONE;
+ request.clear();
+ headers.clear();
+ if(body) {
+ free(body);
+ body = NULL;
+ }
+ content_length = 0;
+ pos = buffer;
+}
+
+
+void
+HTTPTransport::serializeHeaders(Message *msg);
+{
+ for(Properties::iterator i = msg->
+}
--- /dev/null
+#ifndef _INPUT_CHANNEL_H_
+#define _INPUT_CHANNEL_H_
+
+#include "ThreadPool.H"
+#include "Connection.H"
+#include "Transport.H"
+
+class InputChannel
+ : public ThreadPool::WorkDescription {
+public:
+
+ InputChannel(Connection *conn, Transport *trans)
+ : ThreadPool::WorkDescription(conn->getFD()),
+ m_connection(conn), m_transport(trans)
+ {}
+
+ void start();
+
+protected:
+ virtual void onReady();
+ virtual void onTimeout();
+ virtual void onError();
+
+private:
+ Connection *m_connection;
+ Transport *m_transport;
+};
+
+#endif
--- /dev/null
+#include "InputChannel.H"
+#include "ThreadPool.H"
+#include "EventManager.H"
+
+extern EventManager theEventManager;
+
+void
+InputChannel::start()
+{
+ ThreadPool::instance()->queueWorkRead(this);
+}
+
+void
+InputChannel::onReady()
+{
+ Transport::Message *msg = NULL;
+ int ret = m_transport->receive(m_connection, msg);
+ if(ret <= 0) {
+ // no new data read
+ } else if(msg) {
+ // we have a new message
+ //theEventManager.postEvent(new NewMessageEvent());
+ } else {
+ // still need more data
+ ThreadPool::instance()->queueWorkRead(this);
+ }
+}
+
+void
+InputChannel::onTimeout()
+{
+}
+
+void
+InputChannel::onError()
+{
+}
Transport.o \
HTTPTransport.o \
ThreadPool.o \
- EventManager.o
+ EventManager.o \
+ InputChannel.cpp
plain: main.o $(OBJS)
$(LINK) -o $@ $+ $(THREAD_LIB)
--- /dev/null
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H
+
+class Message {
+public:
+ class ID {
+ public:
+ private:
+ unsigned char m_uuid[16];
+ };
+
+ Message(void * data, unsigned int length)
+ : m_length(length),
+ m_data(data)
+ {}
+
+private:
+ ID m_id;
+ unsigned int m_length;
+ void * m_data;
+};
+
+#endif
--- /dev/null
+#ifndef _PROPERTIES_H_
+#define _PROPERTIES_H_
+
+#include <map>
+
+class Properties {
+public:
+
+ // default constructor
+ Properties()
+ : properties()
+ {}
+
+ // accessors
+ string& getProperty(string &key)
+ { return properties[key]; }
+
+ void setProperty(string &key, string &val)
+ { properties[key] = val; }
+
+ // iterators
+ typedef std::map<string,string>::iterator iterator;
+
+ iterator begin()
+ { return properties.begin(); }
+
+ iterator end()
+ { return properties.end(); }
+
+
+private:
+ std::map<string,string> properties;
+};
+
+#endif
#include "ThreadPool.H"
#include "SocketInput.H"
+#include "InputChannel.H"
#include "Exception.H"
SocketInput::onReady()
{
Connection *conn = cFactory->accept(fd);
- Transport *trans = tFactory->newTransport(conn);
- ThreadPool::instance()->queueWorkRead(trans);
+ Transport *trans = tFactory->newTransport();
+ InputChannel *channel = new InputChannel(conn, trans);
+ channel->start();
}
#define _TRANSPORT_H
#include "Connection.H"
-#include "ThreadPool.H"
+#include "Properties.H"
-class Transport: public ThreadPool::WorkDescription {
+// Transport implements transport protocol
+// - reads/writes messages using Connection interface
+// -
+class Transport {
public:
+
+ //
class Factory {
public:
- virtual Transport *newTransport(Connection *conn) const = 0;
+ virtual Transport *newTransport() const = 0;
- virtual ~Factory();
+ virtual ~Factory()
+ {}
};
- Transport(Connection *a_conn)
- : ThreadPool::WorkDescription(a_conn ? a_conn->getFD() : -1),
- conn(a_conn)
- {}
- virtual ~Transport()
- { if(conn) delete conn; }
+ //
+ class Message {
+ public:
+
+ Message();
+
+ int getContent(char* &data) const;
+ int getContentLength() const;
+ int getHeader(const char* name, char* &val) const;
+ Properties& getProperties();
+ char *path() const;
+
+ private:
+
+ };
+
+ //
+ Transport()
+ {}
-protected:
- Connection *conn;
+ virtual ~Transport();
+ //
+ virtual int receive(Connection *conn, Message* &msg) = 0;
+ virtual int send(Connection *conn, Message* msg) = 0;
+ virtual void reset() = 0;
};
#endif
--- /dev/null
+#include "Transport.H"
+
+Transport::~Transport()
+{
+}
#include "SocketInput.H"
#include "PlainConnection.H"
#include "HTTPTransport.H"
+#include "EventManager.H"
const int num_threads = 2;
const char *sock_path = "/tmp/il_sock";
+EventManager theEventManager();
+
int main(int argc, char *argv[])
{
SocketInput *input;