/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/management/ManagementAgent.h

00001 #ifndef _ManagementAgent_
00002 #define _ManagementAgent_
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  * 
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  * 
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 
00025 #include "qpid/Options.h"
00026 #include "qpid/broker/Exchange.h"
00027 #include "qpid/broker/Timer.h"
00028 #include "qpid/framing/Uuid.h"
00029 #include "qpid/sys/Mutex.h"
00030 #include "ManagementObject.h"
00031 #include <qpid/framing/AMQFrame.h>
00032 #include <boost/shared_ptr.hpp>
00033 
00034 namespace qpid {
00035 namespace management {
00036 
00037 class ManagementAgent
00038 {
      //  Management agent declared in namespace qpid::management.  This listing
      //  holds declarations only: apart from setInterval() and
      //  SchemaClassKeyComp::operator(), every member function is defined
      //  elsewhere.
      //
      //  NOTE(review): std::string, std::map and std::vector are used below, but
      //  none of this header's #include lines names <string>, <map> or <vector>
      //  directly; they presumably arrive via the other headers -- confirm.
00039   private:
00040 
          //  Private constructor: code outside the class cannot construct an
          //  agent directly.  NOTE(review): presumably enableManagement() creates
          //  the instance held in the static 'agent' member -- confirm in the
          //  implementation file.
00041     ManagementAgent (std::string dataDir, uint16_t interval);
00042 
00043   public:
00044 
00045     virtual ~ManagementAgent ();
00046 
00047     typedef boost::shared_ptr<ManagementAgent> shared_ptr;
00048 
          //  Static entry points.  enableManagement() takes the same
          //  (dataDir, interval) pair as the private constructor; getAgent()
          //  hands back a shared_ptr to an agent.  NOTE(review): presumably
          //  getAgent() returns the static 'agent' member, which may be empty
          //  before enableManagement() or after shutdown() -- confirm.
00049     static void       enableManagement (std::string dataDir, uint16_t interval);
00050     static shared_ptr getAgent (void);
00051     static void       shutdown (void);
00052 
          //  Overwrites the 'interval' member; no lock is taken here.
          //  NOTE(review): units are presumably seconds (Periodic's constructor
          //  takes 'seconds') -- confirm.
00053     void setInterval     (uint16_t _interval) { interval = _interval; }
          //  NOTE(review): the two exchanges are presumably kept in the
          //  mExchange / dExchange members declared below -- confirm.
00054     void setExchange     (broker::Exchange::shared_ptr mgmtExchange,
00055                           broker::Exchange::shared_ptr directExchange);
          //  Registers a schema class under a package.  NOTE(review): md5Sum
          //  presumably points at 16 bytes, matching SchemaClassKey::hash[16]
          //  -- confirm against the implementation.
00056     void RegisterClass   (std::string packageName,
00057                           std::string className,
00058                           uint8_t*    md5Sum,
00059                           ManagementObject::writeSchemaCall_t schemaCall);
          //  When omitted, persistenceId defaults to 0 and idOffset to 10.
00060     void addObject       (ManagementObject::shared_ptr object,
00061                           uint64_t                     persistenceId = 0,
00062                           uint64_t                     idOffset      = 10);
00063     void clientAdded     (void);
00064     void dispatchCommand (broker::Deliverable&             msg,
00065                           const std::string&               routingKey,
00066                           const qpid::framing::FieldTable* args);
00067     
00068   private:
00069 
          //  Timer task that holds a reference to one ManagementAgent.
          //  NOTE(review): because 'agent' is a reference, the agent has to
          //  outlive the task; fire() presumably drives PeriodicProcessing()
          //  -- confirm in the implementation file.
00070     struct Periodic : public broker::TimerTask
00071     {
00072         ManagementAgent& agent;
00073 
00074         Periodic (ManagementAgent& agent, uint32_t seconds);
00075         virtual ~Periodic ();
00076         void fire ();
00077     };
00078 
00079     //  Storage for tracking remote management agents, attached via the client
00080     //  management agent API.
00081     //
00082     struct RemoteAgent
00083     {
00084         std::string name;
00085         uint64_t    objIdBase;
00086     };
00087 
00088     // TODO: Eventually replace string with entire reply-to structure.  reply-to
00089     //       currently assumes that the exchange is "amq.direct" even though it could
00090     //       in theory be specified differently.
00091     typedef std::map<std::string, RemoteAgent> RemoteAgentMap;
00092     typedef std::vector<std::string>           ReplyToVector;
00093 
00094     //  Storage for known schema classes:
00095     //
00096     //  SchemaClassKey     -- Key elements for map lookups
00097     //  SchemaClassKeyComp -- Comparison class for SchemaClassKey
00098     //  SchemaClass        -- Non-key elements for classes
00099     //
00100     struct SchemaClassKey
00101     {
00102         std::string name;
00103         uint8_t     hash[16];
00104     };
00105 
          //  Ordering used by ClassMap.  Keys are compared by 'name' first; when
          //  the names are equal, the 16 hash bytes are compared in index order
          //  and the first differing byte decides.  Equal keys compare as
          //  "not less" (false).
00106     struct SchemaClassKeyComp
00107     {
00108         bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
00109         {
00110             if (lhs.name != rhs.name)
00111                 return lhs.name < rhs.name;
00112             else
00113                 for (int i = 0; i < 16; i++)
00114                     if (lhs.hash[i] != rhs.hash[i])
00115                         return lhs.hash[i] < rhs.hash[i];
                  //  Reached only when the names and all 16 hash bytes are equal.
00116             return false;
00117         }
00118     };
00119 
          //  Per-class data stored against a SchemaClassKey.  The constructor
          //  sets writeSchemaCall to 0; remoteAgents is default-constructed
          //  (an empty vector of strings).
00120     struct SchemaClass
00121     {
00122         ManagementObject::writeSchemaCall_t writeSchemaCall;
00123         ReplyToVector                       remoteAgents;
00124 
00125         SchemaClass () : writeSchemaCall(0) {}
00126     };
00127 
          //  ClassMap:   SchemaClassKey -> SchemaClass, ordered by SchemaClassKeyComp.
          //  PackageMap: string -> ClassMap.  NOTE(review): the string key is
          //  presumably the package name (see FindOrAddPackage) -- confirm.
00128     typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
00129     typedef std::map<std::string, ClassMap> PackageMap;
00130 
00131     RemoteAgentMap               remoteAgents;
00132     PackageMap                   packages;
00133     ManagementObjectMap          managementObjects;
00134 
          //  Class-wide (static) state.
00135     static shared_ptr            agent;
00136     static bool                  enabled;
00137 
          //  Per-instance state.  Members are constructed in the order declared
          //  here, so reordering them changes construction order and layout.
00138     qpid::framing::Uuid          uuid;
00139     qpid::sys::Mutex             userLock;
00140     broker::Timer                timer;
00141     broker::Exchange::shared_ptr mExchange;
00142     broker::Exchange::shared_ptr dExchange;
00143     std::string                  dataDir;
00144     uint16_t                     interval;
00145     uint64_t                     nextObjectId;
00146     uint32_t                     nextRemotePrefix;
00147 
          //  MA_BUFFER_SIZE is a preprocessor macro: although written inside the
          //  class, it is not scoped to it and stays defined for any code that
          //  follows this header.  Each agent instance embeds both 65536-byte
          //  arrays below (128 KiB in total).
00148 #   define MA_BUFFER_SIZE 65536
00149     char inputBuffer[MA_BUFFER_SIZE];
00150     char outputBuffer[MA_BUFFER_SIZE];
00151 
          //  Internal helpers.  EncodeHeader's 'seq' defaults to 0.
          //  NOTE(review): CheckHeader's 'opcode' and 'seq' pointers are
          //  presumably out-parameters filled from the buffer -- confirm.
00152     void PeriodicProcessing (void);
00153     void EncodeHeader       (qpid::framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
00154     bool CheckHeader        (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
00155     void SendBuffer         (qpid::framing::Buffer&       buf,
00156                              uint32_t                     length,
00157                              broker::Exchange::shared_ptr exchange,
00158                              std::string                  routingKey);
00159 
00160     void dispatchMethod (broker::Message&   msg,
00161                          const std::string& routingKey,
00162                          size_t             first);
00163     void dispatchAgentCommand (broker::Message& msg);
00164 
00165     PackageMap::iterator FindOrAddPackage (std::string name);
00166     void AddClassLocal (PackageMap::iterator         pIter,
00167                         std::string                  className,
00168                         uint8_t*                     md5Sum,
00169                         ManagementObject::writeSchemaCall_t schemaCall);
00170     void EncodePackageIndication (qpid::framing::Buffer& buf,
00171                                   PackageMap::iterator   pIter);
00172     void EncodeClassIndication (qpid::framing::Buffer& buf,
00173                                 PackageMap::iterator   pIter,
00174                                 ClassMap::iterator     cIter);
00175     uint32_t assignPrefix (uint32_t requestedPrefix);
          //  When omitted, 'code' defaults to 0 and 'text' to "OK".
00176     void sendCommandComplete (std::string replyToKey, uint32_t sequence,
00177                               uint32_t code = 0, std::string text = std::string("OK"));
          //  Request handlers; all share the signature
          //  (input buffer, reply-to key, sequence number).  NOTE(review):
          //  presumably invoked from dispatchAgentCommand(), one per request
          //  type -- confirm in the implementation file.
00178     void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00179     void handlePackageQuery  (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00180     void handlePackageInd    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00181     void handleClassQuery    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00182     void handleSchemaQuery   (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00183     void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00184     void handleGetRequest    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00185 };
00186 
00187 }}
00188             
00189 #endif  

Generated on Thu Apr 10 11:08:18 2008 for Qpid by  doxygen 1.4.7