00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _ExecutionHandler_
00022 #define _ExecutionHandler_
00023
00024 #include <queue>
00025 #include "qpid/framing/AccumulatedAck.h"
00026 #include "qpid/framing/AMQP_ServerOperations.h"
00027 #include "qpid/framing/FrameSet.h"
00028 #include "qpid/framing/MethodContent.h"
00029 #include "qpid/framing/SequenceNumber.h"
00030 #include "qpid/sys/Mutex.h"
00031 #include "ChainableFrameHandler.h"
00032 #include "CompletionTracker.h"
00033 #include "Correlator.h"
00034 #include "Demux.h"
00035 #include "Execution.h"
00036
00037 namespace qpid {
00038 namespace client {
00039
00040 class ExecutionHandler : // Client-side handler that declares command sending, completion queries and sync/flush requests for a session's execution layer.
00041 public framing::AMQP_ServerOperations::ExecutionHandler, // NOTE(review): presumably declares the complete/flush/noop/result/sync virtuals implemented below -- confirm against that header
00042 public framing::FrameHandler, // NOTE(review): presumably the base whose handle(AMQFrame&) is overridden below -- confirm
00043 public Execution // presumably declared in Execution.h; likely the client-facing interface behind the public methods below -- confirm
00044 { // everything up to the `public:` label is private (default access of `class`)
00045 framing::SequenceNumber incomingCounter; // NOTE(review): by name, id of the most recent incoming command -- confirm in the .cpp
00046 framing::AccumulatedAck incomingCompletionStatus; // NOTE(review): by name, record of which incoming commands are completed -- confirm
00047 framing::SequenceNumber outgoingCounter; // NOTE(review): by name, id of the most recent command sent; likely what lastSent() reports -- confirm
00048 framing::AccumulatedAck outgoingCompletionStatus; // NOTE(review): by name, record of which sent commands the peer has completed -- confirm
00049 framing::FrameSet::shared_ptr arriving; // NOTE(review): presumably the frame set currently being assembled by handle() -- confirm
00050 Correlator correlation; // handed out by reference through getCorrelator()
00051 CompletionTracker completion; // handed out by reference through getCompletionTracker()
00052 Demux demux; // handed out by reference through getDemux()
00053 sys::Mutex lock; // NOTE(review): this header does not show which members the mutex guards -- check the .cpp
00054 framing::ProtocolVersion version;
00055 uint64_t maxFrameSize; // overwritten by setMaxFrameSize(); presumably initialised from the constructor argument -- confirm
00056 boost::function<void()> completionListener; // presumably stored by setCompletionListener(); NOTE(review): no boost header is included directly by this file
00057 // The five operations below are private in this class; presumably they are reached only through the AMQP_ServerOperations::ExecutionHandler base -- confirm.
00058 void complete(uint32_t mark, const framing::SequenceNumberSet& range); // NOTE(review): presumably a cumulative mark plus additional completed ranges -- confirm
00059 void flush();
00060 void noop();
00061 void result(uint32_t command, const std::string& data); // NOTE(review): presumably delivers result data for the command with the given id -- confirm
00062 void sync();
00063 // Private helpers follow; their definitions are not part of this header.
00064 void sendCompletion();
00065 // The three-argument send() below overloads the two public send() declarations; it differs by the extra bool parameter.
00066 framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent);
00067 void sendContent(const framing::MethodContent&);
00068
00069 public:
00070 typedef CompletionTracker::ResultListener ResultListener; // public alias for CompletionTracker::ResultListener, used in the send() signatures below
00071
00072 // `out` is a public data member, so code outside this class can access it directly.
00073 framing::FrameHandler::Chain out; // NOTE(review): presumably the next handler that outgoing frames are passed to -- confirm
00074 // NOTE(review): the constructor is not `explicit`; with its default argument it serves as the default constructor and also permits implicit conversion from uint64_t.
00075 ExecutionHandler(uint64_t maxFrameSize = 65535); // maximum frame size defaults to 65535
00076
00077 // Takes a single frame by non-const reference.
00078 void handle(framing::AMQFrame& frame);
00079 // Both send() overloads return a framing::SequenceNumber; the listener argument defaults to a default-constructed ResultListener.
00080 framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
00081 framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, // overload that additionally takes MethodContent
00082 ResultListener=ResultListener());
00083 framing::SequenceNumber lastSent() const; // the only const member function declared in this class
00084 void sendSyncRequest();
00085 void sendFlushRequest();
00086 void completed(const framing::SequenceNumber& id, bool cumulative, bool send); // NOTE(review): presumably marks an incoming command complete and optionally notifies the peer -- confirm
00087 void syncTo(const framing::SequenceNumber& point);
00088 void flushTo(const framing::SequenceNumber& point);
00089 void syncWait(const framing::SequenceNumber& id); // NOTE(review): by name this may block the caller -- confirm in the .cpp
00090 // NOTE(review): the two query-style methods below are declared non-const; presumably because they take `lock` -- confirm.
00091 bool isComplete(const framing::SequenceNumber& id);
00092 bool isCompleteUpTo(const framing::SequenceNumber& id);
00093 // Inline accessors: the setter assigns maxFrameSize; each getter returns a non-const reference to the corresponding private member.
00094 void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
00095 Correlator& getCorrelator() { return correlation; }
00096 CompletionTracker& getCompletionTracker() { return completion; }
00097 Demux& getDemux() { return demux; }
00098
00099 void setCompletionListener(boost::function<void()>); // takes the callback by value
00100 };
00101
00102 }}
00103
00104 #endif