00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef _SessionCore_
00023 #define _SessionCore_
00024
00025 #include "qpid/shared_ptr.h"
00026 #include "qpid/framing/FrameHandler.h"
00027 #include "qpid/framing/ChannelHandler.h"
00028 #include "qpid/framing/SessionState.h"
00029 #include "qpid/framing/SequenceNumber.h"
00030 #include "qpid/framing/AMQP_ClientOperations.h"
00031 #include "qpid/framing/AMQP_ServerProxy.h"
00032 #include "qpid/sys/StateMonitor.h"
00033 #include "ExecutionHandler.h"
00034
00035 #include <boost/optional.hpp>
00036
00037 namespace qpid {
00038 namespace framing {
00039 class FrameSet;
00040 class MethodContent;
00041 class SequenceNumberSet;
00042 }
00043
00044 namespace client {
00045
00046 class Future;
00047 class ConnectionImpl;
00048
00054 class SessionCore : public framing::FrameHandler::InOutHandler,
00055 private framing::AMQP_ClientOperations::SessionHandler
00056 {
00057 public:
00058 SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
00059 ~SessionCore();
00060
00061 framing::FrameSet::shared_ptr get();
00062 const framing::Uuid getId() const;
00063 uint16_t getChannel() const { return channel; }
00064 void assertOpen() const;
00065
00066
00067 void open(uint32_t detachedLifetime);
00068 void close();
00069 void resume(shared_ptr<ConnectionImpl>);
00070 void suspend();
00071 void setChannel(uint16_t channel);
00072
00073 void setSync(bool s);
00074 bool isSync();
00075 ExecutionHandler& getExecution();
00076
00077 Future send(const framing::AMQBody& command);
00078
00079 Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00080
00081 void connectionClosed(uint16_t code, const std::string& text);
00082 void connectionBroke(uint16_t code, const std::string& text);
00083
00084 private:
00085 enum State {
00086 OPENING,
00087 RESUMING,
00088 OPEN,
00089 CLOSING,
00090 SUSPENDING,
00091 SUSPENDED,
00092 CLOSED
00093 };
00094 typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00095 typedef sys::StateMonitor<State, CLOSED> StateMonitor;
00096 typedef StateMonitor::Set States;
00097
00098 inline void invariant() const;
00099 inline void setState(State s);
00100 inline void waitFor(State);
00101 void doClose(int code, const std::string& text);
00102 void doSuspend(int code, const std::string& text);
00103
00105 void check(bool condition, int code, const std::string& text) const;
00107 void check() const;
00108
00109 void handleIn(framing::AMQFrame& frame);
00110 void handleOut(framing::AMQFrame& frame);
00111
00112
00113 void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
00114 void flow(bool active);
00115 void flowOk(bool active);
00116 void detached();
00117 void ack(uint32_t cumulativeSeenMark,
00118 const framing::SequenceNumberSet& seenFrameSet);
00119 void highWaterMark(uint32_t lastSentMark);
00120 void solicitAck();
00121 void closed(uint16_t code, const std::string& text);
00122
00123 void attaching(shared_ptr<ConnectionImpl>);
00124 void detach(int code, const std::string& text);
00125 void checkOpen() const;
00126
00127 int code;
00128 std::string text;
00129 boost::optional<framing::SessionState> session;
00130 shared_ptr<ConnectionImpl> connection;
00131 ExecutionHandler l3;
00132 volatile bool sync;
00133 framing::ChannelHandler channel;
00134 framing::AMQP_ServerProxy::Session proxy;
00135 mutable StateMonitor state;
00136 uint32_t detachedLifetime;
00137 };
00138
00139 }}
00140
00141 #endif