00001 #ifndef _sys_AsynchIO
00002 #define _sys_AsynchIO
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "Dispatcher.h"
00025
00026 #include <boost/function.hpp>
00027 #include <deque>
00028
00029 namespace qpid {
00030 namespace sys {
00031
00032
00033
00034
00035
00036 class AsynchAcceptor {
00037 public:
00038 typedef boost::function1<void, const Socket&> Callback;
00039
00040 private:
00041 Callback acceptedCallback;
00042 DispatchHandle handle;
00043
00044 public:
00045 AsynchAcceptor(const Socket& s, Callback callback);
00046 void start(Poller::shared_ptr poller);
00047
00048 private:
00049 void readable(DispatchHandle& handle);
00050 };
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 class AsynchIO : private DispatchHandle {
00065 public:
00066 struct BufferBase {
00067 char* const bytes;
00068 const int32_t byteCount;
00069 int32_t dataStart;
00070 int32_t dataCount;
00071
00072 BufferBase(char* const b, const int32_t s) :
00073 bytes(b),
00074 byteCount(s),
00075 dataStart(0),
00076 dataCount(0)
00077 {}
00078
00079 virtual ~BufferBase()
00080 {}
00081 };
00082
00083 typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
00084 typedef boost::function1<void, AsynchIO&> EofCallback;
00085 typedef boost::function1<void, AsynchIO&> DisconnectCallback;
00086 typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
00087 typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
00088 typedef boost::function1<void, AsynchIO&> IdleCallback;
00089
00090 private:
00091 ReadCallback readCallback;
00092 EofCallback eofCallback;
00093 DisconnectCallback disCallback;
00094 ClosedCallback closedCallback;
00095 BuffersEmptyCallback emptyCallback;
00096 IdleCallback idleCallback;
00097 std::deque<BufferBase*> bufferQueue;
00098 std::deque<BufferBase*> writeQueue;
00099 bool queuedClose;
00106 volatile bool writePending;
00107
00108 public:
00109 AsynchIO(const Socket& s,
00110 ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
00111 ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
00112 void queueForDeletion();
00113
00114 void start(Poller::shared_ptr poller);
00115 void queueReadBuffer(BufferBase* buff);
00116 void unread(BufferBase* buff);
00117 void queueWrite(BufferBase* buff);
00118 void notifyPendingWrite();
00119 void queueWriteClose();
00120 bool writeQueueEmpty() { return writeQueue.empty(); }
00121 BufferBase* getQueuedBuffer();
00122 const Socket& getSocket() const { return DispatchHandle::getSocket(); }
00123
00124 private:
00125 ~AsynchIO();
00126 void readable(DispatchHandle& handle);
00127 void writeable(DispatchHandle& handle);
00128 void disconnected(DispatchHandle& handle);
00129 void close(DispatchHandle& handle);
00130 };
00131
00132 }}
00133
00134 #endif // _sys_AsynchIO