/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/cluster/Cpg.h

00001 #ifndef CPG_H
00002 #define CPG_H
00003 
00004 /*
00005  *
00006  * Copyright (c) 2006 The Apache Software Foundation
00007  *
00008  * Licensed under the Apache License, Version 2.0 (the "License");
00009  * you may not use this file except in compliance with the License.
00010  * You may obtain a copy of the License at
00011  *
00012  *    http://www.apache.org/licenses/LICENSE-2.0
00013  *
00014  * Unless required by applicable law or agreed to in writing, software
00015  * distributed under the License is distributed on an "AS IS" BASIS,
00016  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00017  * See the License for the specific language governing permissions and
00018  * limitations under the License.
00019  *
00020  */
00021 
00022 #include "qpid/Exception.h"
00023 #include "qpid/cluster/Dispatchable.h"
00024 
00025 #include <cassert>
00026 #include <string.h>
00027 
00028 extern "C" {
00029 #include <openais/cpg.h>
00030 }
00031 
00032 namespace qpid {
00033 namespace cluster {
00034 
00040 class Cpg : public Dispatchable {
00041   public:
00042     struct Exception : public ::qpid::Exception {
00043         Exception(const std::string& msg) : ::qpid::Exception(msg) {}
00044     };
00045 
00046     struct Name : public cpg_name {
00047         Name(const char* s) { copy(s, strlen(s)); }
00048         Name(const char* s, size_t n) { copy(s,n); }
00049         Name(const std::string& s) { copy(s.data(), s.size()); }
00050         void copy(const char* s, size_t n) {
00051             assert(n < CPG_MAX_NAME_LENGTH);
00052             memcpy(value, s, n);
00053             length=n;
00054         }
00055 
00056         std::string str() const { return std::string(value, length); }
00057     };
00058     
00059     struct Id {
00060         uint64_t id;
00061         Id(uint64_t n=0) : id(n) {}
00062         Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
00063         Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
00064 
00065         operator uint64_t() const { return id; }
00066         uint32_t nodeId() const { return id >> 32; }
00067         pid_t pid() const { return id & 0xFFFF; }
00068     };
00069 
00070     static std::string str(const cpg_name& n) {
00071         return std::string(n.value, n.length);
00072     }
00073 
00074     struct Handler {
00075         virtual ~Handler() {};
00076         virtual void deliver(
00077             cpg_handle_t /*handle*/,
00078             struct cpg_name *group,
00079             uint32_t /*nodeid*/,
00080             uint32_t /*pid*/,
00081             void* /*msg*/,
00082             int /*msg_len*/) = 0;
00083 
00084         virtual void configChange(
00085             cpg_handle_t /*handle*/,
00086             struct cpg_name */*group*/,
00087             struct cpg_address */*members*/, int /*nMembers*/,
00088             struct cpg_address */*left*/, int /*nLeft*/,
00089             struct cpg_address */*joined*/, int /*nJoined*/
00090         ) = 0;
00091     };
00092 
00096     Cpg(Handler&);
00097     
00099     ~Cpg();
00100 
00102     void shutdown();
00103     
00110     void dispatch(cpg_dispatch_t type) {
00111         check(cpg_dispatch(handle,type), "Error in CPG dispatch");
00112     }
00113 
00114     void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
00115     void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
00116     void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
00117 
00118     void join(const Name& group) {
00119         check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
00120     };
00121     
00122     void leave(const Name& group) {
00123         check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
00124     }
00125 
00126     void mcast(const Name& group, const iovec* iov, int iovLen) {
00127         check(cpg_mcast_joined(
00128                   handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen),
00129               cantMcastMsg(group));
00130     }
00131 
00132     cpg_handle_t getHandle() const { return handle; }
00133 
00134   private:
00135     class Handles;
00136     struct ClearHandleOnExit;
00137   friend class Handles;
00138   friend struct ClearHandleOnExit;
00139     
00140     static std::string errorStr(cpg_error_t err, const std::string& msg);
00141     static std::string cantJoinMsg(const Name&);
00142     static std::string cantLeaveMsg(const Name&);
00143     static std::string cantMcastMsg(const Name&);
00144     
00145     static void check(cpg_error_t result, const std::string& msg) {
00146         // TODO aconway 2007-06-01: Logging and exceptions.
00147         if (result != CPG_OK) 
00148             throw Exception(errorStr(result, msg));
00149     }
00150 
00151     static void globalDeliver(
00152         cpg_handle_t /*handle*/,
00153         struct cpg_name *group,
00154         uint32_t /*nodeid*/,
00155         uint32_t /*pid*/,
00156         void* /*msg*/,
00157         int /*msg_len*/);
00158 
00159     static void globalConfigChange(
00160         cpg_handle_t /*handle*/,
00161         struct cpg_name */*group*/,
00162         struct cpg_address */*members*/, int /*nMembers*/,
00163         struct cpg_address */*left*/, int /*nLeft*/,
00164         struct cpg_address */*joined*/, int /*nJoined*/
00165     );
00166 
00167     static Handles handles;
00168     cpg_handle_t handle;
00169     Handler& handler;
00170 };
00171 
00172 std::ostream& operator <<(std::ostream& out, const cpg_name& name);
00173 std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
00174 std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
00175 
00176 inline bool operator==(const cpg_name& a, const cpg_name& b) {
00177     return a.length==b.length &&  strncmp(a.value, b.value, a.length) == 0;
00178 }
00179 inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
00180 
00181 }} // namespace qpid::cluster
00182 
00183 
00184 
00185 #endif  

Generated on Thu Apr 10 11:08:18 2008 for Qpid by  doxygen 1.4.7