/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/client/Connector.h

00001 /*
00002  *
00003  * Licensed to the Apache Software Foundation (ASF) under one
00004  * or more contributor license agreements.  See the NOTICE file
00005  * distributed with this work for additional information
00006  * regarding copyright ownership.  The ASF licenses this file
00007  * to you under the Apache License, Version 2.0 (the
00008  * "License"); you may not use this file except in compliance
00009  * with the License.  You may obtain a copy of the License at
00010  * 
00011  *   http://www.apache.org/licenses/LICENSE-2.0
00012  * 
00013  * Unless required by applicable law or agreed to in writing,
00014  * software distributed under the License is distributed on an
00015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00016  * KIND, either express or implied.  See the License for the
00017  * specific language governing permissions and limitations
00018  * under the License.
00019  *
00020  */
00021 #ifndef _Connector_
00022 #define _Connector_
00023 
00024 
00025 #include "qpid/framing/InputHandler.h"
00026 #include "qpid/framing/OutputHandler.h"
00027 #include "qpid/framing/InitiationHandler.h"
00028 #include "qpid/framing/ProtocolInitiation.h"
00029 #include "qpid/framing/ProtocolVersion.h"
00030 #include "qpid/sys/ShutdownHandler.h"
00031 #include "qpid/sys/TimeoutHandler.h"
00032 #include "qpid/sys/Thread.h"
00033 #include "qpid/sys/Runnable.h"
00034 #include "qpid/sys/Monitor.h"
00035 #include "qpid/sys/Socket.h"
00036 #include "qpid/sys/Time.h"
00037 #include "qpid/sys/AsynchIO.h"
00038 
00039 #include <queue>
00040 
00041 namespace qpid {
00042         
00043 namespace client {
00044 
00045 class Connector : public framing::OutputHandler, 
00046                   private sys::Runnable
00047 {
          // Client-side connector declaration. It is publicly a
          // framing::OutputHandler and privately a sys::Runnable, and exposes
          // connect()/init()/close(), handler setters, send() and timeout
          // setters. Only getShutdownHandler() and getIdentifier() are defined
          // inline; everything else is implemented outside this header.
          //
          // Data members are initialised in declaration order, so reordering
          // them can change how the out-of-line constructor behaves.
          //
          // NOTE(review): std::vector, std::string, uint16_t/uint32_t and
          // ssize_t are used below, but this header only includes <queue>
          // directly; they presumably arrive through the qpid headers - confirm.

          // Forward declaration only; no use of Buff is visible in this header.
00048     struct Buff;
00049 
          // Nested frame handler used for the outgoing side (held by value as
          // the `writer` member below). Derives from framing::FrameHandler.
00051     class Writer : public framing::FrameHandler {
00052         typedef sys::AsynchIO::BufferBase BufferBase;
00053         typedef std::vector<framing::AMQFrame> Frames;
00054 
              // NOTE(review): presumably `lock` guards the members below -
              // confirm against the implementation.
00055         sys::Mutex lock;
00056         sys::AsynchIO* aio;       // raw pointer, assigned via setAio(); ownership not shown here
00057         BufferBase* buffer;       // raw pointer; ownership not shown here
00058         Frames frames;            // vector of framing::AMQFrame
00059         size_t lastEof; // Position after last EOF in frames
00060         framing::Buffer encode;
00061         size_t framesEncoded;
00062         std::string identifier;   // distinct from Connector::identifier declared further down
00063         
              // Private helpers taking a ScopedLock reference.
              // NOTE(review): presumably the parameter documents that the
              // caller must already hold `lock` - confirm.
00064         void writeOne(const sys::Mutex::ScopedLock&);
00065         void newBuffer(const sys::Mutex::ScopedLock&);
00066 
00067       public:
00068         
00069         Writer();
00070         ~Writer();
              // Supplies the AsynchIO pointer (see the `aio` member above).
00071         void setAio(sys::AsynchIO*);
              // Takes a frame by non-const reference.
00072         void handle(framing::AMQFrame&);
              // NOTE(review): signature matches Connector::writebuff(AsynchIO&);
              // presumably invoked from there - confirm.
00073         void write(sys::AsynchIO&);
00074     };
00075     
          // Configuration that cannot change after construction (all const).
          // NOTE(review): presumably set from the constructor's `debug` and
          // `buffer_size` arguments - confirm in the implementation.
00076     const bool debug;
00077     const int receive_buffer_size;
00078     const int send_buffer_size;
00079     framing::ProtocolVersion version;
00080 
          // Close/join state together with a mutex.
          // NOTE(review): presumably closedLock protects `closed` (and maybe
          // `joined`) - confirm.
00081     sys::Mutex closedLock;
00082     bool closed;
00083     bool joined;
00084 
          // Timing state: two absolute timestamps and three durations.
          // Units and exact meaning are not stated in this header.
00085     sys::AbsTime lastIn;
00086     sys::AbsTime lastOut;
00087     sys::Duration timeout;
00088     sys::Duration idleIn;
00089     sys::Duration idleOut;
00090 
          // Collaborator pointers. All are raw pointers; this header does not
          // show who owns the pointees or whether they may be null.
00091     sys::TimeoutHandler* timeoutHandler;
00092     sys::ShutdownHandler* shutdownHandler;
00093     framing::InputHandler* input;
00094     framing::InitiationHandler* initialiser;
00095     framing::OutputHandler* output;
00096 
00097     Writer writer;
00098     
00099     sys::Thread receiver;
00100 
00101     sys::Socket socket;
00102 
00103     sys::AsynchIO* aio;       // raw pointer; a separate member from Writer::aio
00104     sys::Poller::shared_ptr poller;
00105 
00106     void checkIdle(ssize_t status);
00107     void setSocketTimeout();
00108 
          // NOTE(review): presumably the sys::Runnable entry point (the class
          // inherits sys::Runnable privately) - confirm.
00109     void run();
00110     void handleClosed();
00111     bool closeInternal();
00112     
          // Private I/O helpers taking an AsynchIO reference.
          // NOTE(review): these look like AsynchIO callbacks for read, write
          // and end-of-stream - confirm where they are registered.
00113     void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
00114     void writebuff(qpid::sys::AsynchIO&);
00115     void writeDataBlock(const framing::AMQDataBlock& data);
00116     void eof(qpid::sys::AsynchIO&);
00117 
          // Value returned (by const reference) from getIdentifier().
00118     std::string identifier;
00119     
        // Channel is granted access to the private members declared above.
00120   friend class Channel;
00121 
00122   public:
          // pVersion: protocol version. debug defaults to false and
          // buffer_size defaults to 1024 (units not stated here).
          // The constructor is not marked explicit.
00123     Connector(framing::ProtocolVersion pVersion,
00124               bool debug = false, uint32_t buffer_size = 1024);
00125     virtual ~Connector();
00126     virtual void connect(const std::string& host, int port);
00127     virtual void init();
00128     virtual void close();
          // Handler setters take raw pointers; ownership is not shown in this header.
00129     virtual void setInputHandler(framing::InputHandler* handler);
00130     virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
00131     virtual void setShutdownHandler(sys::ShutdownHandler* handler);
          // Returns the stored shutdownHandler pointer unchanged.
00132     virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
00133     virtual framing::OutputHandler* getOutputHandler();
00134     virtual void send(framing::AMQFrame& frame);
          // Timeout setters take a 16-bit unsigned value; units are not stated here.
00135     virtual void setReadTimeout(uint16_t timeout);
00136     virtual void setWriteTimeout(uint16_t timeout);
          // Returns a const reference to the `identifier` member; the reference
          // is only valid while this Connector is alive.
00137     const std::string& getIdentifier() const { return identifier; }
00138 };
00139 
00140 }}
00141 
00142 
00143 #endif

Generated on Thu Apr 10 11:08:17 2008 for Qpid by  doxygen 1.4.7