00001 #ifndef _ManagementBroker_
00002 #define _ManagementBroker_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "qpid/Options.h"
00025 #include "qpid/broker/Exchange.h"
00026 #include "qpid/broker/Timer.h"
00027 #include "qpid/framing/Uuid.h"
00028 #include "qpid/sys/Mutex.h"
00029 #include "qpid/broker/ConnectionToken.h"
00030 #include "qpid/agent/ManagementAgent.h"
00031 #include "ManagementObject.h"
00032 #include "Manageable.h"
00033 #include "qmf/org/apache/qpid/broker/Agent.h"
00034 #include <qpid/framing/AMQFrame.h>
00035
00036 namespace qpid {
00037 namespace management {
00038
00039 class ManagementBroker : public ManagementAgent
00040 {
00041 private:
00042
00043 int threadPoolSize;
00044
00045 public:
00046
00047 ManagementBroker ();
00048 virtual ~ManagementBroker ();
00049
00050 void configure (std::string dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize);
00051 void setInterval (uint16_t _interval) { interval = _interval; }
00052 void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
00053 qpid::broker::Exchange::shared_ptr directExchange);
00054 int getMaxThreads () { return threadPoolSize; }
00055 void registerClass (std::string& packageName,
00056 std::string& className,
00057 uint8_t* md5Sum,
00058 ManagementObject::writeSchemaCall_t schemaCall);
00059 void registerEvent (std::string& packageName,
00060 std::string& eventName,
00061 uint8_t* md5Sum,
00062 ManagementObject::writeSchemaCall_t schemaCall);
00063 ObjectId addObject (ManagementObject* object,
00064 uint64_t persistId = 0);
00065 void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT);
00066 void clientAdded ();
00067 bool dispatchCommand (qpid::broker::Deliverable& msg,
00068 const std::string& routingKey,
00069 const framing::FieldTable* args);
00070
00071
00072 void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
00073 uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
00074 int getSignalFd () { assert(0); return -1; }
00075
00076 private:
00077 friend class ManagementAgent;
00078
00079 struct Periodic : public qpid::broker::TimerTask
00080 {
00081 ManagementBroker& broker;
00082
00083 Periodic (ManagementBroker& broker, uint32_t seconds);
00084 virtual ~Periodic ();
00085 void fire ();
00086 };
00087
00088
00089
00090
00091 struct RemoteAgent : public Manageable
00092 {
00093 uint32_t objIdBank;
00094 std::string routingKey;
00095 ObjectId connectionRef;
00096 qmf::org::apache::qpid::broker::Agent* mgmtObject;
00097 ManagementObject* GetManagementObject (void) const { return mgmtObject; }
00098 virtual ~RemoteAgent ();
00099 };
00100
00101
00102
00103
00104 typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap;
00105 typedef std::vector<std::string> ReplyToVector;
00106
00107
00108
00109
00110
00111
00112
00113 struct SchemaClassKey
00114 {
00115 std::string name;
00116 uint8_t hash[16];
00117 };
00118
00119 struct SchemaClassKeyComp
00120 {
00121 bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
00122 {
00123 if (lhs.name != rhs.name)
00124 return lhs.name < rhs.name;
00125 else
00126 for (int i = 0; i < 16; i++)
00127 if (lhs.hash[i] != rhs.hash[i])
00128 return lhs.hash[i] < rhs.hash[i];
00129 return false;
00130 }
00131 };
00132
00133 struct SchemaClass
00134 {
00135 uint8_t kind;
00136 ManagementObject::writeSchemaCall_t writeSchemaCall;
00137 uint32_t pendingSequence;
00138 size_t bufferLen;
00139 uint8_t* buffer;
00140
00141 SchemaClass(uint8_t _kind, uint32_t seq) :
00142 kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
00143 SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
00144 kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
00145 bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
00146 void appendSchema (framing::Buffer& buf);
00147 };
00148
00149 typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
00150 typedef std::map<std::string, ClassMap> PackageMap;
00151
00152 RemoteAgentMap remoteAgents;
00153 PackageMap packages;
00154 ManagementObjectMap managementObjects;
00155 ManagementObjectMap newManagementObjects;
00156
00157 static ManagementAgent* agent;
00158 static bool enabled;
00159
00160 framing::Uuid uuid;
00161 sys::Mutex addLock;
00162 sys::Mutex userLock;
00163 qpid::broker::Timer timer;
00164 qpid::broker::Exchange::shared_ptr mExchange;
00165 qpid::broker::Exchange::shared_ptr dExchange;
00166 std::string dataDir;
00167 uint16_t interval;
00168 qpid::broker::Broker* broker;
00169 uint16_t bootSequence;
00170 uint32_t nextObjectId;
00171 uint32_t brokerBank;
00172 uint32_t nextRemoteBank;
00173 uint32_t nextRequestSequence;
00174 bool clientWasAdded;
00175
00176 # define MA_BUFFER_SIZE 65536
00177 char inputBuffer[MA_BUFFER_SIZE];
00178 char outputBuffer[MA_BUFFER_SIZE];
00179 char eventBuffer[MA_BUFFER_SIZE];
00180
00181 void writeData ();
00182 void periodicProcessing (void);
00183 void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
00184 bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
00185 void sendBuffer (framing::Buffer& buf,
00186 uint32_t length,
00187 qpid::broker::Exchange::shared_ptr exchange,
00188 std::string routingKey);
00189 void moveNewObjectsLH();
00190
00191 bool authorizeAgentMessageLH(qpid::broker::Message& msg);
00192 void dispatchAgentCommandLH(qpid::broker::Message& msg);
00193
00194 PackageMap::iterator findOrAddPackageLH(std::string name);
00195 void addClassLH(uint8_t kind,
00196 PackageMap::iterator pIter,
00197 std::string& className,
00198 uint8_t* md5Sum,
00199 ManagementObject::writeSchemaCall_t schemaCall);
00200 void encodePackageIndication (framing::Buffer& buf,
00201 PackageMap::iterator pIter);
00202 void encodeClassIndication (framing::Buffer& buf,
00203 PackageMap::iterator pIter,
00204 ClassMap::iterator cIter);
00205 bool bankInUse (uint32_t bank);
00206 uint32_t allocateNewBank ();
00207 uint32_t assignBankLH (uint32_t requestedPrefix);
00208 void deleteOrphanedAgentsLH();
00209 void sendCommandComplete (std::string replyToKey, uint32_t sequence,
00210 uint32_t code = 0, std::string text = std::string("OK"));
00211 void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00212 void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00213 void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00214 void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00215 void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00216 void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00217 void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00218 void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
00219 void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00220 void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
00221
00222 size_t validateSchema(framing::Buffer&, uint8_t kind);
00223 size_t validateTableSchema(framing::Buffer&);
00224 size_t validateEventSchema(framing::Buffer&);
00225 };
00226
00227 }}
00228
00229 #endif