00001 #ifndef QPID_BROKER_SEMANTICSTATE_H
00002 #define QPID_BROKER_SEMANTICSTATE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "Consumer.h"
00026 #include "Deliverable.h"
00027 #include "DeliveryAdapter.h"
00028 #include "DeliveryRecord.h"
00029 #include "DeliveryToken.h"
00030 #include "DtxBuffer.h"
00031 #include "DtxManager.h"
00032 #include "NameGenerator.h"
00033 #include "Prefetch.h"
00034 #include "TxBuffer.h"
00035
00036 #include "qpid/framing/FrameHandler.h"
00037 #include "qpid/framing/AccumulatedAck.h"
00038 #include "qpid/framing/Uuid.h"
00039 #include "qpid/sys/AggregateOutput.h"
00040 #include "qpid/shared_ptr.h"
00041
00042 #include <list>
00043 #include <map>
00044 #include <vector>
00045
00046 #include <boost/intrusive_ptr.hpp>
00047
00048 namespace qpid {
00049 namespace broker {
00050
00051 class SessionContext;
00052
00057 class SemanticState : public framing::FrameHandler::Chains,
00058 public sys::OutputTask,
00059 private boost::noncopyable
00060 {
00061 class ConsumerImpl : public Consumer, public sys::OutputTask
00062 {
00063 SemanticState* const parent;
00064 const DeliveryToken::shared_ptr token;
00065 const string name;
00066 const Queue::shared_ptr queue;
00067 const bool ackExpected;
00068 const bool nolocal;
00069 const bool acquire;
00070 bool blocked;
00071 bool windowing;
00072 uint32_t msgCredit;
00073 uint32_t byteCredit;
00074
00075 bool checkCredit(boost::intrusive_ptr<Message>& msg);
00076 void allocateCredit(boost::intrusive_ptr<Message>& msg);
00077
00078 public:
00079 ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
00080 const string& name, Queue::shared_ptr queue,
00081 bool ack, bool nolocal, bool acquire);
00082 ~ConsumerImpl();
00083 bool deliver(QueuedMessage& msg);
00084 bool filter(boost::intrusive_ptr<Message> msg);
00085 bool accept(boost::intrusive_ptr<Message> msg);
00086 void notify();
00087
00088 void setWindowMode();
00089 void setCreditMode();
00090 void addByteCredit(uint32_t value);
00091 void addMessageCredit(uint32_t value);
00092 void flush();
00093 void stop();
00094 void complete(DeliveryRecord&);
00095 Queue::shared_ptr getQueue() { return queue; }
00096 bool isBlocked() const { return blocked; }
00097
00098 bool doOutput();
00099 };
00100
00101 typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
00102 typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
00103
00104 SessionContext& session;
00105 DeliveryAdapter& deliveryAdapter;
00106 Queue::shared_ptr defaultQueue;
00107 ConsumerImplMap consumers;
00108 uint32_t prefetchSize;
00109 uint16_t prefetchCount;
00110 Prefetch outstanding;
00111 NameGenerator tagGenerator;
00112 std::list<DeliveryRecord> unacked;
00113 TxBuffer::shared_ptr txBuffer;
00114 DtxBuffer::shared_ptr dtxBuffer;
00115 bool dtxSelected;
00116 DtxBufferMap suspendedXids;
00117 framing::AccumulatedAck accumulatedAck;
00118 bool flowActive;
00119 boost::shared_ptr<Exchange> cacheExchange;
00120 sys::AggregateOutput outputTasks;
00121
00122 void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
00123 void record(const DeliveryRecord& delivery);
00124 bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
00125 void checkDtxTimeout();
00126 ConsumerImpl& find(const std::string& destination);
00127 void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
00128 void complete(DeliveryRecord&);
00129 AckRange findRange(DeliveryId first, DeliveryId last);
00130 void requestDispatch();
00131 void requestDispatch(ConsumerImpl&);
00132 void cancel(ConsumerImpl&);
00133
00134 public:
00135 SemanticState(DeliveryAdapter&, SessionContext&);
00136 ~SemanticState();
00137
00138 SessionContext& getSession() { return session; }
00139
00146 Queue::shared_ptr getQueue(const std::string& name) const;
00147
00148 uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
00149 uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
00150
00151 bool exists(const string& consumerTag);
00152
00156 void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
00157 bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
00158
00159 void cancel(const string& tag);
00160
00161 void setWindowMode(const std::string& destination);
00162 void setCreditMode(const std::string& destination);
00163 void addByteCredit(const std::string& destination, uint32_t value);
00164 void addMessageCredit(const std::string& destination, uint32_t value);
00165 void flush(const std::string& destination);
00166 void stop(const std::string& destination);
00167
00168 bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
00169 void startTx();
00170 void commit(MessageStore* const store, bool completeOnCommit);
00171 void rollback();
00172 void selectDtx();
00173 void startDtx(const std::string& xid, DtxManager& mgr, bool join);
00174 void endDtx(const std::string& xid, bool fail);
00175 void suspendDtx(const std::string& xid);
00176 void resumeDtx(const std::string& xid);
00177 void recover(bool requeue);
00178 void flow(bool active);
00179 DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
00180 void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
00181 void release(DeliveryId first, DeliveryId last, bool setRedelivered);
00182 void reject(DeliveryId first, DeliveryId last);
00183 void handle(boost::intrusive_ptr<Message> msg);
00184 bool doOutput() { return outputTasks.doOutput(); }
00185
00186
00187 void ackCumulative(DeliveryId deliveryTag);
00188 void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
00189
00190
00191 void completed(DeliveryId deliveryTag, DeliveryId endTag);
00192 void accepted(DeliveryId deliveryTag, DeliveryId endTag);
00193 };
00194
00195 }}
00196
00197
00198
00199
00200 #endif