/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/sys/AsynchIO.h

00001 #ifndef _sys_AsynchIO
00002 #define _sys_AsynchIO
00003 /*
00004  *
00005  * Licensed to the Apache Software Foundation (ASF) under one
00006  * or more contributor license agreements.  See the NOTICE file
00007  * distributed with this work for additional information
00008  * regarding copyright ownership.  The ASF licenses this file
00009  * to you under the Apache License, Version 2.0 (the
00010  * "License"); you may not use this file except in compliance
00011  * with the License.  You may obtain a copy of the License at
00012  * 
00013  *   http://www.apache.org/licenses/LICENSE-2.0
00014  * 
00015  * Unless required by applicable law or agreed to in writing,
00016  * software distributed under the License is distributed on an
00017  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00018  * KIND, either express or implied.  See the License for the
00019  * specific language governing permissions and limitations
00020  * under the License.
00021  *
00022  */
00023 
00024 #include "Dispatcher.h"
00025 
00026 #include <boost/function.hpp>
00027 #include <deque>
00028 
00029 namespace qpid {
00030 namespace sys {
00031 
00032 /*
00033  * Asynchronous acceptor: accepts connections then does a callback with the
00034  * accepted fd
00035  */
00036 class AsynchAcceptor {
00037 public:
00038     typedef boost::function1<void, const Socket&> Callback;
00039 
00040 private:
00041     Callback acceptedCallback;
00042     DispatchHandle handle;
00043 
00044 public:
00045     AsynchAcceptor(const Socket& s, Callback callback);
00046     void start(Poller::shared_ptr poller);
00047 
00048 private:
00049     void readable(DispatchHandle& handle);
00050 };
00051 
00052 /*
00053  * Asycnchronous reader/writer: 
00054  * Reader accepts buffers to read into; reads into the provided buffers
00055  * and then does a callback with the buffer and amount read. Optionally it can callback
00056  * when there is something to read but no buffer to read it into.
00057  * 
00058  * Writer accepts a buffer and queues it for writing; can also be given
00059  * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
00060  * 
00061  * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
00062  * the contained DispatchHandle
00063  */
00064 class AsynchIO : private DispatchHandle {
00065 public:
00066     struct BufferBase {
00067         char* const bytes;
00068         const int32_t byteCount;
00069         int32_t dataStart;
00070         int32_t dataCount;
00071         
00072         BufferBase(char* const b, const int32_t s) :
00073             bytes(b),
00074             byteCount(s),
00075             dataStart(0),
00076             dataCount(0)
00077         {}
00078         
00079         virtual ~BufferBase()
00080         {}
00081     };
00082 
00083     typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
00084     typedef boost::function1<void, AsynchIO&> EofCallback;
00085     typedef boost::function1<void, AsynchIO&> DisconnectCallback;
00086     typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
00087     typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
00088     typedef boost::function1<void, AsynchIO&> IdleCallback;
00089 
00090 private:
00091     ReadCallback readCallback;
00092     EofCallback eofCallback;
00093     DisconnectCallback disCallback;
00094     ClosedCallback closedCallback;
00095     BuffersEmptyCallback emptyCallback;
00096     IdleCallback idleCallback;
00097     std::deque<BufferBase*> bufferQueue;
00098     std::deque<BufferBase*> writeQueue;
00099     bool queuedClose;
00106     volatile bool writePending;
00107 
00108 public:
00109     AsynchIO(const Socket& s,
00110         ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
00111         ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
00112     void queueForDeletion();
00113 
00114     void start(Poller::shared_ptr poller);
00115     void queueReadBuffer(BufferBase* buff);
00116     void unread(BufferBase* buff);
00117     void queueWrite(BufferBase* buff);
00118     void notifyPendingWrite();
00119     void queueWriteClose();
00120     bool writeQueueEmpty() { return writeQueue.empty(); }
00121     BufferBase* getQueuedBuffer();
00122     const Socket& getSocket() const { return DispatchHandle::getSocket(); }
00123 
00124 private:
00125     ~AsynchIO();
00126     void readable(DispatchHandle& handle);
00127     void writeable(DispatchHandle& handle);
00128     void disconnected(DispatchHandle& handle);
00129     void close(DispatchHandle& handle);
00130 };
00131 
00132 }}
00133 
00134 #endif // _sys_AsynchIO

Generated on Thu Apr 10 11:08:18 2008 for Qpid by  doxygen 1.4.7