00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _Connector_
00022 #define _Connector_
00023
00024
00025 #include "qpid/framing/InputHandler.h"
00026 #include "qpid/framing/OutputHandler.h"
00027 #include "qpid/framing/InitiationHandler.h"
00028 #include "qpid/framing/ProtocolInitiation.h"
00029 #include "qpid/framing/ProtocolVersion.h"
00030 #include "qpid/sys/ShutdownHandler.h"
00031 #include "qpid/sys/TimeoutHandler.h"
00032 #include "qpid/sys/Thread.h"
00033 #include "qpid/sys/Runnable.h"
00034 #include "qpid/sys/Monitor.h"
00035 #include "qpid/sys/Socket.h"
00036 #include "qpid/sys/Time.h"
00037 #include "qpid/sys/AsynchIO.h"
00038
00039 #include <queue>
00040
00041 namespace qpid {
00042
00043 namespace client {
00044
00045 class Connector : public framing::OutputHandler,
00046 private sys::Runnable
00047 {
00048 struct Buff;
00049
00051 class Writer : public framing::FrameHandler {
00052 typedef sys::AsynchIO::BufferBase BufferBase;
00053 typedef std::vector<framing::AMQFrame> Frames;
00054
00055 sys::Mutex lock;
00056 sys::AsynchIO* aio;
00057 BufferBase* buffer;
00058 Frames frames;
00059 size_t lastEof;
00060 framing::Buffer encode;
00061 size_t framesEncoded;
00062 std::string identifier;
00063
00064 void writeOne(const sys::Mutex::ScopedLock&);
00065 void newBuffer(const sys::Mutex::ScopedLock&);
00066
00067 public:
00068
00069 Writer();
00070 ~Writer();
00071 void setAio(sys::AsynchIO*);
00072 void handle(framing::AMQFrame&);
00073 void write(sys::AsynchIO&);
00074 };
00075
00076 const bool debug;
00077 const int receive_buffer_size;
00078 const int send_buffer_size;
00079 framing::ProtocolVersion version;
00080
00081 sys::Mutex closedLock;
00082 bool closed;
00083 bool joined;
00084
00085 sys::AbsTime lastIn;
00086 sys::AbsTime lastOut;
00087 sys::Duration timeout;
00088 sys::Duration idleIn;
00089 sys::Duration idleOut;
00090
00091 sys::TimeoutHandler* timeoutHandler;
00092 sys::ShutdownHandler* shutdownHandler;
00093 framing::InputHandler* input;
00094 framing::InitiationHandler* initialiser;
00095 framing::OutputHandler* output;
00096
00097 Writer writer;
00098
00099 sys::Thread receiver;
00100
00101 sys::Socket socket;
00102
00103 sys::AsynchIO* aio;
00104 sys::Poller::shared_ptr poller;
00105
00106 void checkIdle(ssize_t status);
00107 void setSocketTimeout();
00108
00109 void run();
00110 void handleClosed();
00111 bool closeInternal();
00112
00113 void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
00114 void writebuff(qpid::sys::AsynchIO&);
00115 void writeDataBlock(const framing::AMQDataBlock& data);
00116 void eof(qpid::sys::AsynchIO&);
00117
00118 std::string identifier;
00119
00120 friend class Channel;
00121
00122 public:
00123 Connector(framing::ProtocolVersion pVersion,
00124 bool debug = false, uint32_t buffer_size = 1024);
00125 virtual ~Connector();
00126 virtual void connect(const std::string& host, int port);
00127 virtual void init();
00128 virtual void close();
00129 virtual void setInputHandler(framing::InputHandler* handler);
00130 virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
00131 virtual void setShutdownHandler(sys::ShutdownHandler* handler);
00132 virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
00133 virtual framing::OutputHandler* getOutputHandler();
00134 virtual void send(framing::AMQFrame& frame);
00135 virtual void setReadTimeout(uint16_t timeout);
00136 virtual void setWriteTimeout(uint16_t timeout);
00137 const std::string& getIdentifier() const { return identifier; }
00138 };
00139
00140 }}
00141
00142
00143 #endif