/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/broker/SessionAdapter.h

00001 #ifndef _broker_SessionAdapter_h
00002 #define _broker_SessionAdapter_h
00003 
00004 /*
00005  *
00006  * Copyright (c) 2006 The Apache Software Foundation
00007  *
00008  * Licensed under the Apache License, Version 2.0 (the "License");
00009  * you may not use this file except in compliance with the License.
00010  * You may obtain a copy of the License at
00011  *
00012  *    http://www.apache.org/licenses/LICENSE-2.0
00013  *
00014  * Unless required by applicable law or agreed to in writing, software
00015  * distributed under the License is distributed on an "AS IS" BASIS,
00016  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00017  * See the License for the specific language governing permissions and
00018  * limitations under the License.
00019  *
00020  */
00021 
00022 #include "HandlerImpl.h"
00023 
00024 #include "ConnectionToken.h"
00025 #include "OwnershipToken.h"
00026 #include "qpid/Exception.h"
00027 #include "qpid/framing/AMQP_ServerOperations.h"
00028 #include "qpid/framing/reply_exceptions.h"
00029 #include "qpid/framing/SequenceSet.h"
00030 
00031 #include <vector>
00032 #include <boost/function.hpp>
00033 #include <boost/shared_ptr.hpp>
00034 
00035 namespace qpid {
00036 namespace broker {
00037 
00038 class Channel;
00039 class Connection;
00040 class Broker;
00041 class Queue;
00042 
00052  class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
00053 {
00054   public:
00055     SessionAdapter(SemanticState& session);
00056 
00057 
00058     framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
00059 
00060     Message010Handler* getMessage010Handler(){ return &messageImpl;  }
00061     Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
00062     Queue010Handler* getQueue010Handler(){ return &queueImpl;  }
00063     Execution010Handler* getExecution010Handler(){ return &executionImpl; }
00064     Tx010Handler* getTx010Handler(){ return &txImpl; }
00065     Dtx010Handler* getDtx010Handler(){ return &dtxImpl; }
00066 
00067     BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented");  }
00068     ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00069     BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00070     QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00071     TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00072     MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00073     DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00074     DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented");  }
00075     AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented");  }
00076     FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented");  }
00077     StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented");  }
00078     TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); }
00079     ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); }
00080     ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); }
00081     SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); }
00082     Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00083     Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00084 
00085   private:
00086     //common base for utility methods etc that are specific to this adapter
00087     struct HandlerHelper : public HandlerImpl 
00088     {
00089         HandlerHelper(SemanticState& s) : HandlerImpl(s) {}
00090 
00091         Queue::shared_ptr getQueue(const string& name) const;
00092     };
00093 
00094 
00095     class ExchangeHandlerImpl :
00096         public Exchange010Handler,
00097         public HandlerHelper
00098     {
00099       public:
00100         ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
00101         
00102         void declare(const std::string& exchange, const std::string& type,
00103                      const std::string& alternateExchange, 
00104                      bool passive, bool durable, bool autoDelete, 
00105                      const qpid::framing::FieldTable& arguments); 
00106         void delete_(const std::string& exchange, bool ifUnused); 
00107         framing::Exchange010QueryResult query(const std::string& name);
00108         void bind(const std::string& queue, 
00109                   const std::string& exchange, const std::string& routingKey,
00110                   const qpid::framing::FieldTable& arguments); 
00111         void unbind(const std::string& queue,
00112                     const std::string& exchange,
00113                     const std::string& routingKey);
00114         framing::Exchange010BoundResult bound(const std::string& exchange,
00115                                            const std::string& queue,
00116                                            const std::string& routingKey,
00117                                            const framing::FieldTable& arguments);
00118       private:
00119         void checkType(shared_ptr<Exchange> exchange, const std::string& type);
00120 
00121         void checkAlternate(shared_ptr<Exchange> exchange,
00122                             shared_ptr<Exchange> alternate);
00123     };
00124 
00125     class QueueHandlerImpl : public Queue010Handler,
00126             public HandlerHelper, public OwnershipToken
00127     {
00128         Broker& broker;
00129         std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
00130 
00131       public:
00132         QueueHandlerImpl(SemanticState& session);
00133         ~QueueHandlerImpl();
00134         
00135         void declare(const std::string& queue,
00136                      const std::string& alternateExchange, 
00137                      bool passive, bool durable, bool exclusive, 
00138                      bool autoDelete,
00139                      const qpid::framing::FieldTable& arguments); 
00140         void delete_(const std::string& queue,
00141                      bool ifUnused, bool ifEmpty);
00142         void purge(const std::string& queue); 
00143         framing::Queue010QueryResult query(const std::string& queue);
00144         bool isLocal(const ConnectionToken* t) const; 
00145     };
00146 
00147     class MessageHandlerImpl :
00148         public Message010Handler,
00149         public HandlerHelper
00150     {
00151         typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;    
00152         RangedOperation releaseRedeliveredOp;
00153         RangedOperation releaseOp;
00154         RangedOperation rejectOp;
00155         RangedOperation acceptOp;
00156 
00157       public:
00158         MessageHandlerImpl(SemanticState& session);
00159         void transfer(const string& destination,
00160                       uint8_t acceptMode,
00161                       uint8_t acquireMode);
00162         
00163         void accept(const framing::SequenceSet& commands);
00164         
00165         void reject(const framing::SequenceSet& commands,
00166                     uint16_t code,
00167                     const string& text);
00168         
00169         void release(const framing::SequenceSet& commands,
00170                      bool setRedelivered);
00171         
00172         framing::Message010AcquireResult acquire(const framing::SequenceSet&);
00173 
00174         void subscribe(const string& queue,
00175                        const string& destination,
00176                        uint8_t acceptMode,
00177                        uint8_t acquireMode,
00178                        bool exclusive,
00179                        const string& resumeId,
00180                        uint64_t resumeTtl,
00181                        const framing::FieldTable& arguments);
00182         
00183         void cancel(const string& destination);
00184         
00185         void setFlowMode(const string& destination,
00186                          uint8_t flowMode);
00187         
00188         void flow(const string& destination,
00189                   uint8_t unit,
00190                   uint32_t value);
00191         
00192         void flush(const string& destination);
00193         
00194         void stop(const string& destination);
00195     
00196     };
00197 
00198     class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper
00199     {
00200     public:
00201         ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
00202 
00203         void sync();            
00204         void result(uint32_t commandId, const string& value);        
00205         void exception(uint16_t errorCode,
00206                        uint32_t commandId,
00207                        uint8_t classCode,
00208                        uint8_t commandCode,
00209                        uint8_t fieldIndex,
00210                        const std::string& description,
00211                        const framing::FieldTable& errorInfo);
00212 
00213     };
00214 
00215     class TxHandlerImpl : public Tx010Handler, public HandlerHelper
00216     {
00217       public:
00218         TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
00219         
00220         void select();
00221         void commit();
00222         void rollback();
00223     };
00224 
00225     class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper
00226     {
00227         std::string convert(const framing::Xid010& xid);
00228 
00229       public:
00230         DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
00231 
00232         void select();
00233             
00234         framing::Dtx010StartResult start(const framing::Xid010& xid,
00235                                          bool join,
00236                                          bool resume);
00237         
00238         framing::Dtx010EndResult end(const framing::Xid010& xid,
00239                                      bool fail,
00240                                      bool suspend);
00241         
00242         framing::Dtx010CommitResult commit(const framing::Xid010& xid,
00243                                            bool onePhase);
00244         
00245         void forget(const framing::Xid010& xid);
00246         
00247         framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid);
00248         
00249         framing::Dtx010PrepareResult prepare(const framing::Xid010& xid);
00250         
00251         framing::Dtx010RecoverResult recover();
00252         
00253         framing::Dtx010RollbackResult rollback(const framing::Xid010& xid);
00254         
00255         void setTimeout(const framing::Xid010& xid, uint32_t timeout);        
00256     };
00257 
00258     ExchangeHandlerImpl exchangeImpl;
00259     QueueHandlerImpl queueImpl;
00260     MessageHandlerImpl messageImpl;
00261     ExecutionHandlerImpl executionImpl;
00262     TxHandlerImpl txImpl;
00263     DtxHandlerImpl dtxImpl;
00264 };
00265 }} // namespace qpid::broker
00266 
00267 
00268 
00269 #endif  

Generated on Thu Apr 10 11:08:17 2008 for Qpid by  doxygen 1.4.7