00001 #ifndef _Broker_
00002 #define _Broker_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "ConnectionFactory.h"
00026 #include "ConnectionToken.h"
00027 #include "DirectExchange.h"
00028 #include "DtxManager.h"
00029 #include "ExchangeRegistry.h"
00030 #include "MessageStore.h"
00031 #include "QueueRegistry.h"
00032 #include "SessionManager.h"
00033 #include "PreviewSessionManager.h"
00034 #include "Vhost.h"
00035 #include "System.h"
00036 #include "qpid/management/Manageable.h"
00037 #include "qpid/management/ManagementAgent.h"
00038 #include "qpid/management/Broker.h"
00039 #include "qpid/management/ArgsBrokerConnect.h"
00040 #include "qpid/Options.h"
00041 #include "qpid/Plugin.h"
00042 #include "qpid/DataDir.h"
00043 #include "qpid/framing/FrameHandler.h"
00044 #include "qpid/framing/OutputHandler.h"
00045 #include "qpid/framing/ProtocolInitiation.h"
00046 #include "qpid/sys/Acceptor.h"
00047 #include "qpid/sys/Runnable.h"
00048
00049 #include <vector>
00050
00051 namespace qpid {
00052
// Forward declaration only: within this header Url is used solely as a
// `const Url&` parameter (Broker::connect), so the complete type is not needed.
00053 class Url;
00054
00055 namespace broker {
00056
// Port used by Broker::create() when the caller does not supply one.
// (5672 is the port registered with IANA for AMQP.)
// Declared `static const` at namespace scope, so it has internal linkage and
// every translation unit that includes this header gets its own copy.
00057 static const uint16_t DEFAULT_PORT=5672;
00058
/**
 * Broker declaration.
 *
 * A Broker is a sys::Runnable, a Plugin::Target and a management::Manageable.
 * It holds, by value, the queue and exchange registries, the connection
 * factory, the DTX manager and both session managers, and exposes them to
 * the rest of the broker code through the reference-returning accessors
 * below. All member functions declared without a body are defined outside
 * this header.
 */
00062 class Broker : public sys::Runnable, public Plugin::Target,
00063 public management::Manageable
00064 {
00065 public:
00066
/**
 * Configuration for a Broker, built on top of qpid::Options.
 *
 * The constructor takes the option-group name (default "Broker Options").
 * This header shows only the field names and types; what each field
 * controls, and its default value, is set elsewhere.
 *
 * Grounded in this header:
 *  - port             has the same type (uint16_t) as Broker::getPort().
 *  - stagingThreshold is what Broker::getStagingThreshold() returns.
 *
 * NOTE(review): the remaining fields presumably mean what their names
 * suggest (data directory path / switch, worker thread count, connection
 * limit, listen backlog, management enable flag and publish interval, ack
 * setting) -- confirm against the Options constructor before relying on it.
 */
00067 struct Options : public qpid::Options {
00068 Options(const std::string& name="Broker Options");
00069
00070 bool noDataDir;
00071 std::string dataDir;
00072 uint16_t port;
00073 int workerThreads;
00074 int maxConnections;
00075 int connectionBacklog;
00076 uint64_t stagingThreshold;
00077 bool enableMgmt;
00078 uint16_t mgmtPubInterval;
00079 uint32_t ack;
00080 };
00081
// Virtual destructor: Broker has virtual member functions and base classes.
00082 virtual ~Broker();
00083
/**
 * Construct a broker from the given configuration.
 *
 * NOTE(review): this single-argument constructor is not `explicit`, so an
 * Options object converts implicitly to a Broker -- confirm that is intended.
 */
00084 Broker(const Options& configuration);
/**
 * Factory functions returning a shared_ptr<Broker>.
 *
 * `shared_ptr` is used unqualified here; presumably it is brought into
 * namespace qpid by one of the included headers -- TODO confirm.
 *
 * NOTE(review): the port overload takes a *signed* int16_t, whereas
 * Options::port and getPort() are uint16_t. DEFAULT_PORT (5672) fits, but a
 * port above 32767 cannot be represented in int16_t. Changing the parameter
 * type would also require changing the out-of-line definition.
 */
00085 static shared_ptr<Broker> create(const Options& configuration);
00086 static shared_ptr<Broker> create(int16_t port = DEFAULT_PORT);
00087
/** Return the broker's port as a uint16_t. Defined outside this header. */
00094 virtual uint16_t getPort() const;
00095
/**
 * Run the broker.
 * Presumably the implementation of sys::Runnable::run() -- TODO confirm.
 */
00100 virtual void run();
00101
/** Shut the broker down. Defined outside this header. */
00103 virtual void shutdown();
00104
/**
 * Install the message store. Takes a raw pointer; whether the broker takes
 * ownership of it is not visible in this header -- TODO confirm.
 */
00105 void setStore (MessageStore*);
// Accessors. Each returns a reference to (or, for getStagingThreshold, a copy
// of) a member; none is const-qualified.
//
// NOTE(review): getStore() dereferences the raw `store` pointer with no null
// check, so it must not be called while `store` is null. Whether the
// constructor guarantees a non-null store is not visible here.
00106 MessageStore& getStore() { return *store; }
00107 QueueRegistry& getQueues() { return queues; }
00108 ExchangeRegistry& getExchanges() { return exchanges; }
00109 uint64_t getStagingThreshold() { return config.stagingThreshold; }
00110 DtxManager& getDtxManager() { return dtxManager; }
00111 DataDir& getDataDir() { return dataDir; }
00112
00113 SessionManager& getSessionManager() { return sessionManager; }
00114 PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
00115
// Management hooks. Presumably these implement the management::Manageable
// interface this class derives from -- TODO confirm against Manageable.h.
// ManagementMethod receives a numeric method id plus an Args object and
// returns a Manageable::status_t.
00116 management::ManagementObject::shared_ptr GetManagementObject (void) const;
00117 management::Manageable* GetVhostObject (void) const;
00118 management::Manageable::status_t
00119 ManagementMethod (uint32_t methodId, management::Args& args);
00120
/**
 * Connect to a peer identified by host name and port, or by Url.
 * The trailing, unnamed ConnectionCodec::Factory pointer is optional and
 * defaults to null (0); what is used in that case is decided in the
 * definition, which is not visible here.
 */
00122 void connect(const std::string& host, uint16_t port,
00123 sys::ConnectionCodec::Factory* =0);
00125 void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
00126
00127 private:
// Returns a reference to the acceptor. Const-qualified although `acceptor`
// is held through a shared_ptr member.
00128 sys::Acceptor& getAcceptor() const;
00129
// Data members. C++ constructs non-static members in declaration order, so
// do not reorder these without checking the constructor's initializer list.
00130 Options config;
00131 sys::Acceptor::shared_ptr acceptor;
// Raw, possibly-null pointer; see setStore() / getStore().
00132 MessageStore* store;
00133 DataDir dataDir;
00134
00135 QueueRegistry queues;
00136 ExchangeRegistry exchanges;
00137 ConnectionFactory factory;
00138 DtxManager dtxManager;
00139 SessionManager sessionManager;
00140 PreviewSessionManager previewSessionManager;
// Management-related objects, all held through shared_ptr typedefs.
00141 management::ManagementAgent::shared_ptr managementAgent;
00142 management::Broker::shared_ptr mgmtObject;
00143 Vhost::shared_ptr vhostObject;
00144 System::shared_ptr systemObject;
00145
// Private helper taking an exchange name and an exchange type string.
// Presumably declares that exchange in `exchanges` -- TODO confirm in the
// definition.
00146 void declareStandardExchange(const std::string& name, const std::string& type);
00147 };
00148
00149 }}
00150
00151
00152
00153 #endif