00001 #ifndef QPID_SESSIONSTATE_H
00002 #define QPID_SESSIONSTATE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <qpid/SessionId.h>
00026 #include <qpid/framing/SequenceNumber.h>
00027 #include <qpid/framing/SequenceSet.h>
00028 #include <qpid/framing/AMQFrame.h>
00029 #include <qpid/framing/FrameHandler.h>
00030 #include <boost/operators.hpp>
00031 #include <boost/range/iterator_range.hpp>
00032 #include <vector>
00033 #include <iosfwd>
00034
00035 namespace qpid {
00036 using framing::SequenceNumber;
00037 using framing::SequenceSet;
00038
/// A point in a session, held as a command sequence number plus a 64-bit
/// offset.
/// NOTE(review): the offset is presumably a position inside that command;
/// the definition of advance() is not in this header — confirm there.
///
/// Deriving from boost::totally_ordered1 supplies operator!=, operator>,
/// operator<= and operator>= in terms of the operator< and operator==
/// declared below.
00040 struct SessionPoint : boost::totally_ordered1<SessionPoint> {
/// Both parameters default to 0, so this is also the default constructor.
/// It is not explicit, so a SequenceNumber converts implicitly to a
/// SessionPoint with offset 0.
00041 SessionPoint(SequenceNumber command = 0, uint64_t offset = 0);
00042
/// Command sequence number of this point.
00043 SequenceNumber command;
/// Offset component of this point (unit not stated in this header).
00044 uint64_t offset;
00045
/// Moves this point forward according to frame f. Defined out of line.
/// NOTE(review): how command and offset change per frame is not visible
/// here — confirm in the implementation.
00047 void advance(const framing::AMQFrame& f);
00048
/// Ordering and equality; defined out of line.
/// NOTE(review): presumably compares command first and then offset —
/// confirm in the implementation.
00049 bool operator<(const SessionPoint&) const;
00050 bool operator==(const SessionPoint&) const;
00051 };
00052
/// Stream-insertion operator for SessionPoint. It is only declared here, so
/// the output format is not visible in this header. This header includes
/// <iosfwd> rather than <ostream>, so code that actually streams a
/// SessionPoint needs the full stream header from elsewhere.
00053 std::ostream& operator<<(std::ostream&, const SessionPoint&);
00054
/// Per-session state, split into a sender half (the private `sender` member)
/// and a receiver half (the private `receiver` member), together with the
/// session id, a timeout and a replay Configuration.
///
/// Apart from a few inline accessors, every member function is only declared
/// here and defined elsewhere. The notes below therefore describe the
/// signatures; anything inferred from a name is marked as unconfirmed.
///
/// The destructor, the sender*/receiver* operations and setState() are
/// virtual, so the class can serve as a polymorphic base.
00073 class SessionState {
/// Container type of SendState::replayList.
00074 typedef std::vector<framing::AMQFrame> ReplayList;
00075
00076 public:
00077
/// Pair of ReplayList iterators; the return type of senderExpected().
00078 typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
00079
/// Replay limits passed to the SessionState constructor.
00080 struct Configuration {
/// Defaults: flush = 1024*1024, hard = 0. Defined out of line.
/// NOTE(review): `flush` and `hard` presumably initialise
/// replayFlushLimit and replayHardLimit, the unit is presumably bytes and
/// hard == 0 presumably means "no hard limit" — confirm in the definition.
00081 Configuration(size_t flush=1024*1024, size_t hard=0);
00082 size_t replayFlushLimit;
00083 size_t replayHardLimit;
00084 };
00085
/// Both parameters are defaulted, so this is also the default constructor.
/// It is not explicit, so a SessionId converts implicitly to a SessionState.
00086 SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
00087
/// Virtual so that objects of derived classes can be destroyed through a
/// SessionState pointer.
00088 virtual ~SessionState();
00089
/// Non-virtual query, defined out of line.
/// NOTE(review): presumably reports the private `stateful` flag — confirm.
00090 bool hasState() const;
00091
/// Returns the session id by const reference (refers to the `id` member).
00092 const SessionId& getId() const { return id; }
00093
/// Timeout accessors. The setter's parameter name indicates the value is in
/// seconds; the setter stores it without any validation.
00094 uint32_t getTimeout() const { return timeout; }
00095 void setTimeout(uint32_t seconds) { timeout = seconds; }
00096
/// Equality is decided by the session id alone: the first overload compares
/// this->id with a SessionId, the second compares the ids of two states.
00097 bool operator==(const SessionId& other) const { return id == other; }
00098 bool operator==(const SessionState& other) const { return id == other.id; }
00099
00100
00101
// ---- Sender-side operations (all virtual, defined out of line) ----

/// Takes a frame by const reference.
/// NOTE(review): presumably records the outgoing frame in
/// sender.replayList — confirm.
00103 virtual void senderRecord(const framing::AMQFrame& f);
00104
/// Const query.
/// NOTE(review): presumably relates sender.unflushedSize to
/// config.replayFlushLimit — confirm.
00106 virtual bool senderNeedFlush() const;
00107
/// NOTE(review): presumably notes that a flush has been issued (see
/// sender.flushPoint / sender.unflushedSize) — confirm.
00109 virtual void senderRecordFlush();
00110
/// Const query.
/// NOTE(review): presumably based on sender.bytesSinceKnownCompleted —
/// confirm.
00112 virtual bool senderNeedKnownCompleted() const;
00113
/// NOTE(review): presumably notes that a known-completed control has been
/// sent and resets sender.bytesSinceKnownCompleted — confirm.
00115 virtual void senderRecordKnownCompleted();
00116
/// Takes the point the peer has confirmed.
/// NOTE(review): presumably allows replay data before `confirmed` to be
/// discarded — confirm.
00118 virtual void senderConfirmed(const SessionPoint& confirmed);
00119
/// Takes a set of command sequence numbers.
/// NOTE(review): presumably removes them from sender.incomplete — confirm.
00121 virtual void senderCompleted(const SequenceSet& commands);
00122
/// Returns a SessionPoint by value. Unlike the other sender getters this
/// one is not const.
00124 virtual SessionPoint senderGetCommandPoint();
00125
/// Returns a SequenceSet by value.
/// NOTE(review): presumably a copy of sender.incomplete — confirm.
00127 virtual SequenceSet senderGetIncomplete() const;
00128
/// Returns a SessionPoint by value.
/// NOTE(review): presumably sender.replayPoint — confirm.
00130 virtual SessionPoint senderGetReplayPoint() const;
00131
/// Returns a ReplayRange, i.e. a pair of iterators into a ReplayList.
/// NOTE(review): presumably the part of sender.replayList starting at
/// `expected`; if so the range is invalidated whenever that vector is
/// modified — confirm. Behaviour when `expected` cannot be satisfied is not
/// visible in this header.
00135 virtual ReplayRange senderExpected(const SessionPoint& expected);
00136
00137
00138
// ---- Receiver-side operations (all virtual, defined out of line) ----

/// Takes the command point announced for incoming commands.
/// NOTE(review): presumably updates receiver.expected / receiver.received —
/// confirm.
00140 virtual void receiverSetCommandPoint(const SessionPoint& point);
00141
/// Takes an incoming frame and returns a bool.
/// NOTE(review): the meaning of the result is not visible here; presumably
/// false marks a frame that was already received — confirm.
00143 virtual bool receiverRecord(const framing::AMQFrame& f);
00144
/// Marks `command` as completed; `cumulative` defaults to false.
/// The default argument sits on a virtual function, and C++ selects default
/// arguments from the static type of the call, so overriders should repeat
/// the same default.
/// NOTE(review): cumulative presumably means "this command and everything
/// before it" — confirm.
00146 virtual void receiverCompleted(SequenceNumber command, bool cumulative=false);
00147
/// Takes a set of command sequence numbers.
/// NOTE(review): presumably removes them from receiver.unknownCompleted —
/// confirm.
00149 virtual void receiverKnownCompleted(const SequenceSet& commands);
00150
/// Const query.
/// NOTE(review): presumably based on receiver.bytesSinceKnownCompleted —
/// confirm.
00154 virtual bool receiverNeedKnownCompleted() const;
00155
/// The next four getters return const references rather than copies.
/// NOTE(review): each presumably refers to the like-named field of the
/// private `receiver` member, in which case the reference is valid only
/// while this object exists — confirm.
00157 virtual const SessionPoint& receiverGetExpected() const;
00158
00160 virtual const SessionPoint& receiverGetReceived() const;
00161
00163 virtual const SequenceSet& receiverGetUnknownComplete() const;
00164
00166 virtual const SequenceSet& receiverGetIncomplete() const;
00167
/// Returns a SequenceNumber by value.
/// NOTE(review): which command counts as "current" is not visible here —
/// confirm.
00169 virtual SequenceNumber receiverGetCurrent() const;
00170
/// Supplies seven pieces of sender and receiver state in a single call.
/// The four positions are passed as SequenceNumbers, not SessionPoints, so
/// no offsets are supplied.
/// NOTE(review): the mapping of each argument onto the SendState and
/// ReceiveState fields below is presumed from the names — confirm.
00174 virtual void setState(
00175 const SequenceNumber& replayStart,
00176 const SequenceNumber& sendCommandPoint,
00177 const SequenceSet& sentIncomplete,
00178 const SequenceNumber& expected,
00179 const SequenceNumber& received,
00180 const SequenceSet& unknownCompleted,
00181 const SequenceSet& receivedIncomplete
00182 );
00183
00184 private:
00185
/// State for the sending direction; the single instance is `sender`.
/// NOTE(review): the field descriptions are inferred from the names —
/// confirm against the out-of-line member definitions.
00186 struct SendState {
00187 SendState();
00188
// Presumably the point from which replayList can be replayed.
00189 SessionPoint replayPoint;
// Presumably the point reached at the last flush.
00190 SessionPoint flushPoint;
// Presumably the point of the next frame to be sent.
00191 SessionPoint sendPoint;
// Frames retained for replay.
00192 ReplayList replayList;
// Presumably the amount sent since the last flush.
00193 size_t unflushedSize;
// Presumably the total size of the frames held in replayList.
00194 size_t replaySize;
// Presumably the commands sent but not yet completed.
00195 SequenceSet incomplete;
// Presumably the amount sent since the last known-completed.
00196 size_t bytesSinceKnownCompleted;
00197 } sender;
00198
/// State for the receiving direction; the single instance is `receiver`.
/// NOTE(review): the field descriptions are inferred from the names —
/// confirm against the out-of-line member definitions.
00199 struct ReceiveState {
00200 ReceiveState();
// Presumably the point at which the next frame is expected.
00201 SessionPoint expected;
// Presumably the point up to which frames have been received.
00202 SessionPoint received;
// Presumably completed commands the peer has not yet acknowledged as known.
00203 SequenceSet unknownCompleted;
// Presumably the commands received but not yet completed.
00204 SequenceSet incomplete;
// Presumably the amount received since the last known-completed.
00205 size_t bytesSinceKnownCompleted;
00206 } receiver;
00207
// Session identity; the only member compared by the operator== overloads.
00208 SessionId id;
// Value stored by setTimeout(), whose parameter is named `seconds`.
00209 uint32_t timeout;
// Replay limits; type is the nested Configuration struct.
00210 Configuration config;
// NOTE(review): presumably the value hasState() reports — confirm.
00211 bool stateful;
00212 };
00213
/// Mirror of SessionState::operator==(const SessionId&): lets the SessionId
/// appear on the left-hand side. The result is the comparison of the
/// state's own id with `id`.
00214 inline bool operator==(const SessionId& id, const SessionState& s) { return s.getId() == id; }
00215
00216 }
00217
00218
00219 #endif