00001 #ifndef _ManagementAgent_
00002 #define _ManagementAgent_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "qpid/Options.h"
00026 #include "qpid/broker/Exchange.h"
00027 #include "qpid/broker/Timer.h"
00028 #include "qpid/framing/Uuid.h"
00029 #include "qpid/sys/Mutex.h"
00030 #include "ManagementObject.h"
00031 #include <qpid/framing/AMQFrame.h>
00032 #include <boost/shared_ptr.hpp>
00033
00034 namespace qpid {
00035 namespace management {
00036
00037 class ManagementAgent
00038 {
00039 private:
00040
00041 ManagementAgent (std::string dataDir, uint16_t interval);
00042
00043 public:
00044
00045 virtual ~ManagementAgent ();
00046
00047 typedef boost::shared_ptr<ManagementAgent> shared_ptr;
00048
00049 static void enableManagement (std::string dataDir, uint16_t interval);
00050 static shared_ptr getAgent (void);
00051 static void shutdown (void);
00052
00053 void setInterval (uint16_t _interval) { interval = _interval; }
00054 void setExchange (broker::Exchange::shared_ptr mgmtExchange,
00055 broker::Exchange::shared_ptr directExchange);
00056 void RegisterClass (std::string packageName,
00057 std::string className,
00058 uint8_t* md5Sum,
00059 ManagementObject::writeSchemaCall_t schemaCall);
00060 void addObject (ManagementObject::shared_ptr object,
00061 uint64_t persistenceId = 0,
00062 uint64_t idOffset = 10);
00063 void clientAdded (void);
00064 void dispatchCommand (broker::Deliverable& msg,
00065 const std::string& routingKey,
00066 const qpid::framing::FieldTable* args);
00067
00068 private:
00069
00070 struct Periodic : public broker::TimerTask
00071 {
00072 ManagementAgent& agent;
00073
00074 Periodic (ManagementAgent& agent, uint32_t seconds);
00075 virtual ~Periodic ();
00076 void fire ();
00077 };
00078
00079
00080
00081
00082 struct RemoteAgent
00083 {
00084 std::string name;
00085 uint64_t objIdBase;
00086 };
00087
00088
00089
00090
00091 typedef std::map<std::string, RemoteAgent> RemoteAgentMap;
00092 typedef std::vector<std::string> ReplyToVector;
00093
00094
00095
00096
00097
00098
00099
00100 struct SchemaClassKey
00101 {
00102 std::string name;
00103 uint8_t hash[16];
00104 };
00105
00106 struct SchemaClassKeyComp
00107 {
00108 bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
00109 {
00110 if (lhs.name != rhs.name)
00111 return lhs.name < rhs.name;
00112 else
00113 for (int i = 0; i < 16; i++)
00114 if (lhs.hash[i] != rhs.hash[i])
00115 return lhs.hash[i] < rhs.hash[i];
00116 return false;
00117 }
00118 };
00119
00120 struct SchemaClass
00121 {
00122 ManagementObject::writeSchemaCall_t writeSchemaCall;
00123 ReplyToVector remoteAgents;
00124
00125 SchemaClass () : writeSchemaCall(0) {}
00126 };
00127
00128 typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
00129 typedef std::map<std::string, ClassMap> PackageMap;
00130
00131 RemoteAgentMap remoteAgents;
00132 PackageMap packages;
00133 ManagementObjectMap managementObjects;
00134
00135 static shared_ptr agent;
00136 static bool enabled;
00137
00138 qpid::framing::Uuid uuid;
00139 qpid::sys::Mutex userLock;
00140 broker::Timer timer;
00141 broker::Exchange::shared_ptr mExchange;
00142 broker::Exchange::shared_ptr dExchange;
00143 std::string dataDir;
00144 uint16_t interval;
00145 uint64_t nextObjectId;
00146 uint32_t nextRemotePrefix;
00147
00148 # define MA_BUFFER_SIZE 65536
00149 char inputBuffer[MA_BUFFER_SIZE];
00150 char outputBuffer[MA_BUFFER_SIZE];
00151
00152 void PeriodicProcessing (void);
00153 void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
00154 bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
00155 void SendBuffer (qpid::framing::Buffer& buf,
00156 uint32_t length,
00157 broker::Exchange::shared_ptr exchange,
00158 std::string routingKey);
00159
00160 void dispatchMethod (broker::Message& msg,
00161 const std::string& routingKey,
00162 size_t first);
00163 void dispatchAgentCommand (broker::Message& msg);
00164
00165 PackageMap::iterator FindOrAddPackage (std::string name);
00166 void AddClassLocal (PackageMap::iterator pIter,
00167 std::string className,
00168 uint8_t* md5Sum,
00169 ManagementObject::writeSchemaCall_t schemaCall);
00170 void EncodePackageIndication (qpid::framing::Buffer& buf,
00171 PackageMap::iterator pIter);
00172 void EncodeClassIndication (qpid::framing::Buffer& buf,
00173 PackageMap::iterator pIter,
00174 ClassMap::iterator cIter);
00175 uint32_t assignPrefix (uint32_t requestedPrefix);
00176 void sendCommandComplete (std::string replyToKey, uint32_t sequence,
00177 uint32_t code = 0, std::string text = std::string("OK"));
00178 void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00179 void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00180 void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00181 void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00182 void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00183 void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00184 void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00185 };
00186
00187 }}
00188
00189 #endif