00001 #ifndef _broker_BrokerAdapter_h
00002 #define _broker_BrokerAdapter_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "DtxHandlerImpl.h"
00022 #include "MessageHandlerImpl.h"
00023
00024 #include "qpid/Exception.h"
00025 #include "qpid/framing/AMQP_ServerOperations.h"
00026 #include "qpid/framing/reply_exceptions.h"
00027
00028 namespace qpid {
00029 namespace broker {
00030
// Forward declarations only: every name below is introduced as an incomplete
// class type in this namespace; none of them is defined at this point.

// Handler classes.
class AccessHandler;
class BasicHandler;
class ConnectionHandler;
class DtxHandler;
class ExchangeHandler;
class FileHandler;
class MessageHandler;
class MessageHandlerImpl;
class QueueHandler;
class StreamHandler;
class TunnelHandler;
class TxHandler;

// Other broker classes.
class Broker;
class Channel;
class Connection;
class Exchange;
00047
00057 class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
00058 {
00059 public:
00060 BrokerAdapter(SemanticState& session);
00061
00062 BasicHandler* getBasicHandler() { return &basicHandler; }
00063 ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
00064 BindingHandler* getBindingHandler() { return &bindingHandler; }
00065 QueueHandler* getQueueHandler() { return &queueHandler; }
00066 TxHandler* getTxHandler() { return &txHandler; }
00067 MessageHandler* getMessageHandler() { return &messageHandler; }
00068 DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
00069 DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
00070
00071 framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
00072
00073
00074 AccessHandler* getAccessHandler() {
00075 throw framing::NotImplementedException("Access class not implemented"); }
00076 FileHandler* getFileHandler() {
00077 throw framing::NotImplementedException("File class not implemented"); }
00078 StreamHandler* getStreamHandler() {
00079 throw framing::NotImplementedException("Stream class not implemented"); }
00080 TunnelHandler* getTunnelHandler() {
00081 throw framing::NotImplementedException("Tunnel class not implemented"); }
00082
00083 Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00084 Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00085 Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00086 Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00087 Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00088 Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
00089
00090
00091 #define BADHANDLER() assert(0); throw framing::NotImplementedException("")
00092 ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
00093 ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
00094 SessionHandler* getSessionHandler() { BADHANDLER(); }
00095 Connection010Handler* getConnection010Handler() { BADHANDLER(); }
00096 Session010Handler* getSession010Handler() { BADHANDLER(); }
00097 #undef BADHANDLER
00098
00099 private:
00100 class ExchangeHandlerImpl :
00101 public ExchangeHandler,
00102 public HandlerImpl
00103 {
00104 public:
00105 ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
00106
00107 void declare(uint16_t ticket,
00108 const std::string& exchange, const std::string& type,
00109 const std::string& alternateExchange,
00110 bool passive, bool durable, bool autoDelete,
00111 const qpid::framing::FieldTable& arguments);
00112 void delete_(uint16_t ticket,
00113 const std::string& exchange, bool ifUnused);
00114 framing::ExchangeQueryResult query(u_int16_t ticket,
00115 const std::string& name);
00116 private:
00117 void checkType(shared_ptr<Exchange> exchange, const std::string& type);
00118
00119 void checkAlternate(shared_ptr<Exchange> exchange,
00120 shared_ptr<Exchange> alternate);
00121 };
00122
00123 class BindingHandlerImpl :
00124 public BindingHandler,
00125 public HandlerImpl
00126 {
00127 public:
00128 BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
00129
00130 framing::BindingQueryResult query(u_int16_t ticket,
00131 const std::string& exchange,
00132 const std::string& queue,
00133 const std::string& routingKey,
00134 const framing::FieldTable& arguments);
00135 };
00136
00137 class QueueHandlerImpl :
00138 public QueueHandler,
00139 public HandlerImpl
00140 {
00141 public:
00142 QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
00143
00144 void declare(uint16_t ticket, const std::string& queue,
00145 const std::string& alternateExchange,
00146 bool passive, bool durable, bool exclusive,
00147 bool autoDelete,
00148 const qpid::framing::FieldTable& arguments);
00149 void bind(uint16_t ticket, const std::string& queue,
00150 const std::string& exchange, const std::string& routingKey,
00151 const qpid::framing::FieldTable& arguments);
00152 void unbind(uint16_t ticket,
00153 const std::string& queue,
00154 const std::string& exchange,
00155 const std::string& routingKey,
00156 const qpid::framing::FieldTable& arguments );
00157 framing::QueueQueryResult query(const std::string& queue);
00158 void purge(uint16_t ticket, const std::string& queue);
00159 void delete_(uint16_t ticket, const std::string& queue,
00160 bool ifUnused, bool ifEmpty);
00161 };
00162
00163 class BasicHandlerImpl :
00164 public BasicHandler,
00165 public HandlerImpl
00166 {
00167 NameGenerator tagGenerator;
00168 public:
00169 BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
00170
00171 void qos(uint32_t prefetchSize,
00172 uint16_t prefetchCount, bool global);
00173 void consume(uint16_t ticket, const std::string& queue,
00174 const std::string& consumerTag,
00175 bool noLocal, bool noAck, bool exclusive, bool nowait,
00176 const qpid::framing::FieldTable& fields);
00177 void cancel(const std::string& consumerTag);
00178 void get(uint16_t ticket, const std::string& queue, bool noAck);
00179 void ack(uint64_t deliveryTag, bool multiple);
00180 void reject(uint64_t deliveryTag, bool requeue);
00181 void recover(bool requeue);
00182 };
00183
00184 class TxHandlerImpl :
00185 public TxHandler,
00186 public HandlerImpl
00187 {
00188 public:
00189 TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
00190
00191 void select();
00192 void commit();
00193 void rollback();
00194 };
00195
00196 BasicHandlerImpl basicHandler;
00197 ExchangeHandlerImpl exchangeHandler;
00198 BindingHandlerImpl bindingHandler;
00199 MessageHandlerImpl messageHandler;
00200 QueueHandlerImpl queueHandler;
00201 TxHandlerImpl txHandler;
00202 DtxHandlerImpl dtxHandler;
00203 };
00204 }}
00205
00206
00207
00208 #endif