/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/broker/SemanticState.h

00001 #ifndef QPID_BROKER_SEMANTICSTATE_H
00002 #define QPID_BROKER_SEMANTICSTATE_H
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  *
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  *
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 
00025 #include "Consumer.h"
00026 #include "Deliverable.h"
00027 #include "DeliveryAdapter.h"
00028 #include "DeliveryRecord.h"
00029 #include "DeliveryToken.h"
00030 #include "DtxBuffer.h"
00031 #include "DtxManager.h"
00032 #include "NameGenerator.h"
00033 #include "Prefetch.h"
00034 #include "TxBuffer.h"
00035 
00036 #include "qpid/framing/FrameHandler.h"
00037 #include "qpid/framing/AccumulatedAck.h"
00038 #include "qpid/framing/Uuid.h"
00039 #include "qpid/sys/AggregateOutput.h"
00040 #include "qpid/shared_ptr.h"
00041 
00042 #include <list>
00043 #include <map>
00044 #include <vector>
00045 
00046 #include <boost/intrusive_ptr.hpp>
00047 
00048 namespace qpid {
00049 namespace broker {
00050 
00051 class SessionContext;
00052 
/*
 * SemanticState
 *
 * State kept alongside one SessionContext (see the `session` member). The
 * data visible in this header is: the registered consumers (a
 * boost::ptr_map keyed by std::string), prefetch settings, a list of
 * DeliveryRecords, the current TxBuffer / DtxBuffer, a map of DtxBuffers
 * keyed by string, an AccumulatedAck, a flow flag and an AggregateOutput.
 *
 * The class is a framing::FrameHandler::Chains and a sys::OutputTask, and
 * derives privately from boost::noncopyable, so instances cannot be copied.
 *
 * Only getSession(), setPrefetchSize(), setPrefetchCount(), doOutput() and
 * ConsumerImpl::getQueue() / isBlocked() are defined inline here. Every
 * other member is defined elsewhere, so the notes on those members below
 * are deliberately hedged and should be checked against the implementation.
 *
 * NOTE(review): this header names boost::ptr_map, boost::noncopyable and an
 * unqualified `string`, none of which are introduced by the includes visible
 * at the top of this file -- presumably they arrive transitively; confirm.
 */
00057 class SemanticState : public framing::FrameHandler::Chains,
00058                       public sys::OutputTask,
00059                       private boost::noncopyable
00060 {
          // Private nested consumer type. It is both a Consumer and a
          // sys::OutputTask; instances are stored in `consumers` below.
00061     class ConsumerImpl : public Consumer, public sys::OutputTask
00062     {
              // Fixed at construction: every member down to `acquire` is
              // const (`parent` is a const pointer to a mutable object).
00063         SemanticState* const parent;
00064         const DeliveryToken::shared_ptr token;
00065         const string name;
00066         const Queue::shared_ptr queue;
00067         const bool ackExpected;
00068         const bool nolocal;
00069         const bool acquire;
              // Mutable state. Names suggest flow-control bookkeeping
              // (blocked flag, window vs. credit mode, remaining message
              // and byte credit) -- TODO confirm against the .cpp.
00070         bool blocked;
00071         bool windowing;
00072         uint32_t msgCredit;
00073         uint32_t byteCredit;
00074 
              // Private credit helpers; defined out of line. Both take the
              // message by non-const reference to an intrusive_ptr.
00075         bool checkCredit(boost::intrusive_ptr<Message>& msg);
00076         void allocateCredit(boost::intrusive_ptr<Message>& msg);
00077 
00078       public:
              // The bool `ack` parameter presumably initialises
              // `ackExpected`; the remaining parameters share names with the
              // const members above -- verify in the constructor definition.
00079         ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
00080                      const string& name, Queue::shared_ptr queue,
00081                      bool ack, bool nolocal, bool acquire);
00082         ~ConsumerImpl();
              // Presumably the Consumer interface (Consumer.h is not visible
              // here) -- confirm which of these are overrides.
00083         bool deliver(QueuedMessage& msg);            
00084         bool filter(boost::intrusive_ptr<Message> msg);            
00085         bool accept(boost::intrusive_ptr<Message> msg);            
00086         void notify();
00087 
              // Per-consumer controls. SemanticState declares same-named
              // public methods taking a destination string; presumably
              // those look the consumer up and forward here -- confirm.
00088         void setWindowMode();
00089         void setCreditMode();
00090         void addByteCredit(uint32_t value);
00091         void addMessageCredit(uint32_t value);
00092         void flush();
00093         void stop();
00094         void complete(DeliveryRecord&);    
              // Returns a copy of the queue shared_ptr held by this consumer.
00095         Queue::shared_ptr getQueue() { return queue; }
              // Returns the current value of the `blocked` flag.
00096         bool isBlocked() const { return blocked; }
00097 
              // Presumably the sys::OutputTask entry point for this
              // consumer; defined out of line.
00098         bool doOutput();
00099     };
00100 
          // Consumers keyed by a std::string (presumably the consumer tag /
          // destination name used by find() and cancel() -- confirm).
00101     typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
          // DtxBuffers keyed by a std::string (presumably the xid -- confirm).
00102     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
00103 
          // Both references are supplied through the constructor's
          // parameters, so the referenced objects must outlive this one.
00104     SessionContext& session;
00105     DeliveryAdapter& deliveryAdapter;
00106     Queue::shared_ptr defaultQueue;
00107     ConsumerImplMap consumers;
          // Prefetch limits; written by setPrefetchSize()/setPrefetchCount().
00108     uint32_t prefetchSize;    
00109     uint16_t prefetchCount;    
00110     Prefetch outstanding;
00111     NameGenerator tagGenerator;
          // Name suggests deliveries awaiting acknowledgement -- confirm.
00112     std::list<DeliveryRecord> unacked;
00113     TxBuffer::shared_ptr txBuffer;
00114     DtxBuffer::shared_ptr dtxBuffer;
00115     bool dtxSelected;
00116     DtxBufferMap suspendedXids;
00117     framing::AccumulatedAck accumulatedAck;
00118     bool flowActive;
00119     boost::shared_ptr<Exchange> cacheExchange;
          // doOutput() below delegates to this aggregate.
00120     sys::AggregateOutput outputTasks;
00121     
          // Private helpers; all defined out of line.
00122     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
00123     void record(const DeliveryRecord& delivery);
00124     bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
00125     void checkDtxTimeout();
          // Returns a reference, so a failed lookup cannot be signalled by
          // the return value -- presumably it throws; confirm in the .cpp.
00126     ConsumerImpl& find(const std::string& destination);
00127     void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
00128     void complete(DeliveryRecord&);
00129     AckRange findRange(DeliveryId first, DeliveryId last);
          // Two overloads: no argument, and one specific consumer.
00130     void requestDispatch();
00131     void requestDispatch(ConsumerImpl&);
          // Private overload taking the consumer object; the public
          // cancel() further down takes a tag string instead.
00132     void cancel(ConsumerImpl&);
00133 
00134   public:
          // The two parameters match the types of the `deliveryAdapter` and
          // `session` reference members.
00135     SemanticState(DeliveryAdapter&, SessionContext&);
00136     ~SemanticState();
00137 
          // Returns the SessionContext this object was constructed with.
00138     SessionContext& getSession() { return session; }
00139     
          // Queue lookup by name; const member, defined out of line.
          // NOTE(review): how an unknown or empty name is handled (and what
          // role `defaultQueue` plays) is not visible here -- confirm.
00146     Queue::shared_ptr getQueue(const std::string& name) const;
00147     
          // Each setter stores its argument in the matching member and
          // returns the stored value.
00148     uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
00149     uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
00150 
00151     bool exists(const string& consumerTag);
00152 
          // Registers a consumer. `tagInOut` is a non-const reference and,
          // going by its name, is both read and written (presumably filled
          // in when empty, cf. `tagGenerator`) -- confirm. The trailing
          // FieldTable pointer is optional and defaults to null (0).
00156     void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, 
00157                  bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
00158 
00159     void cancel(const string& tag);
00160 
          // Per-destination counterparts of the ConsumerImpl methods with
          // the same names; defined out of line.
00161     void setWindowMode(const std::string& destination);
00162     void setCreditMode(const std::string& destination);
00163     void addByteCredit(const std::string& destination, uint32_t value);
00164     void addMessageCredit(const std::string& destination, uint32_t value);
00165     void flush(const std::string& destination);
00166     void stop(const std::string& destination);
00167 
00168     bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
          // Local (tx) and distributed (dtx) transaction operations;
          // defined out of line.
00169     void startTx();
00170     void commit(MessageStore* const store, bool completeOnCommit);
00171     void rollback();
00172     void selectDtx();
00173     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
00174     void endDtx(const std::string& xid, bool fail);
00175     void suspendDtx(const std::string& xid);
00176     void resumeDtx(const std::string& xid);
00177     void recover(bool requeue);
00178     void flow(bool active);
00179     DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);            
          // Operations over the delivery-id range [first, last]; acquire()
          // reports its result through the `acquired` out-parameter.
          // NOTE(review): whether `last` is inclusive is not visible here.
00180     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
00181     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
00182     void reject(DeliveryId first, DeliveryId last);
00183     void handle(boost::intrusive_ptr<Message> msg);
          // Delegates to the aggregated output tasks and returns whatever
          // outputTasks.doOutput() returns.
00184     bool doOutput() { return outputTasks.doOutput(); }
00185 
00186     //preview only (completed == ack):
00187     void ackCumulative(DeliveryId deliveryTag);
00188     void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
00189 
00190     //final 0-10 spec (completed and accepted are distinct):
00191     void completed(DeliveryId deliveryTag, DeliveryId endTag);
00192     void accepted(DeliveryId deliveryTag, DeliveryId endTag);
00193 };
00194 
00195 }} // namespace qpid::broker
00196 
00197 
00198 
00199 
00200 #endif  

Generated on Thu Apr 10 11:08:17 2008 for Qpid by  doxygen 1.4.7