00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _SemanticHandler_
00022 #define _SemanticHandler_
00023
00024 #include <memory>
00025 #include "BrokerAdapter.h"
00026 #include "DeliveryAdapter.h"
00027 #include "MessageBuilder.h"
00028 #include "IncomingExecutionContext.h"
00029 #include "HandlerImpl.h"
00030
00031 #include "qpid/framing/amqp_types.h"
00032 #include "qpid/framing/AMQP_ServerOperations.h"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/SequenceNumber.h"
00035
00036 #include <boost/function.hpp>
00037
00038 namespace qpid {
00039
namespace framing {
// Forward declarations of framing body classes. Each name is declared once;
// AMQMethodBody is used by pointer in SemanticHandler's private handlers.
class AMQMethodBody;
class AMQHeaderBody;
class AMQContentBody;
}
00046
00047 namespace broker {
00048
00049 class SessionContext;
00050
00051 class SemanticHandler : public DeliveryAdapter,
00052 public framing::FrameHandler,
00053 public framing::AMQP_ServerOperations::ExecutionHandler
00054
00055 {
00056 typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
00057
00058 SemanticState state;
00059 SessionContext& session;
00060
00061
00062 IncomingExecutionContext incoming;
00063 framing::Window outgoing;
00064 MessageBuilder msgBuilder;
00065 RangedOperation ackOp;
00066
00067 enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
00068 TrackId getTrack(const framing::AMQFrame& frame);
00069
00070 void handleL3(framing::AMQMethodBody* method);
00071 void handleCommand(framing::AMQMethodBody* method);
00072 void handleContent(framing::AMQFrame& frame);
00073
00074 void sendCompletion();
00075
00076
00077 DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
00078
00079 framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
00080
00081 Broker& getBroker() { return session.getConnection().getBroker(); }
00082
00083 public:
00084 SemanticHandler(SessionContext& session);
00085
00086
00087 void handle(framing::AMQFrame& frame);
00088
00089
00090 void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
00091 void flush();
00092 void noop();
00093 void result(uint32_t command, const std::string& data);
00094 void sync();
00095
00096
00097 SemanticState& getSemanticState() { return state; }
00098 };
00099
00100 }}
00101
00102 #endif