00001
00002 #ifndef _MANAGEMENT_CONSUMER_
00003 #define _MANAGEMENT_CONSUMER_
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "qpid/management/ManagementObject.h"
00028 #include "qpid/framing/Uuid.h"
00029
00030 namespace qpid {
00031 namespace management {
00032
00033 class Consumer : public ManagementObject
00034 {
00035 private:
00036
00037 static std::string packageName;
00038 static std::string className;
00039 static uint8_t md5Sum[16];
00040
00041
00042 uint64_t destinationRef;
00043 uint64_t queueRef;
00044
00045
00046 uint64_t msgsConsumed;
00047 uint64_t bytesConsumed;
00048 uint32_t unackedMessages;
00049 uint32_t unackedMessagesHigh;
00050 uint32_t unackedMessagesLow;
00051
00052
00053 static void writeSchema (qpid::framing::Buffer& buf);
00054 void writeConfig (qpid::framing::Buffer& buf);
00055 void writeInstrumentation (qpid::framing::Buffer& buf,
00056 bool skipHeaders = false);
00057 void doMethod (std::string methodName,
00058 qpid::framing::Buffer& inBuf,
00059 qpid::framing::Buffer& outBuf);
00060 writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
00061
00062
00063 public:
00064
00065 friend class PackageQpid;
00066 typedef boost::shared_ptr<Consumer> shared_ptr;
00067
00068 Consumer (Manageable* coreObject, uint64_t _destinationRef, uint64_t _queueRef);
00069 ~Consumer (void);
00070
00071 std::string getPackageName (void) { return packageName; }
00072 std::string getClassName (void) { return className; }
00073 uint8_t* getMd5Sum (void) { return md5Sum; }
00074
00075
00076 static const uint32_t METHOD_CLOSE = 1;
00077
00078
00079 inline void inc_msgsConsumed (uint64_t by = 1){
00080 sys::RWlock::ScopedWlock writeLock (accessLock);
00081 msgsConsumed += by;
00082 instChanged = true;
00083 }
00084 inline void dec_msgsConsumed (uint64_t by = 1){
00085 sys::RWlock::ScopedWlock writeLock (accessLock);
00086 msgsConsumed -= by;
00087 instChanged = true;
00088 }
00089 inline void inc_bytesConsumed (uint64_t by = 1){
00090 sys::RWlock::ScopedWlock writeLock (accessLock);
00091 bytesConsumed += by;
00092 instChanged = true;
00093 }
00094 inline void dec_bytesConsumed (uint64_t by = 1){
00095 sys::RWlock::ScopedWlock writeLock (accessLock);
00096 bytesConsumed -= by;
00097 instChanged = true;
00098 }
00099 inline void inc_unackedMessages (uint32_t by = 1){
00100 sys::RWlock::ScopedWlock writeLock (accessLock);
00101 unackedMessages += by;
00102 if (unackedMessagesHigh < unackedMessages)
00103 unackedMessagesHigh = unackedMessages;
00104 instChanged = true;
00105 }
00106 inline void dec_unackedMessages (uint32_t by = 1){
00107 sys::RWlock::ScopedWlock writeLock (accessLock);
00108 unackedMessages -= by;
00109 if (unackedMessagesLow > unackedMessages)
00110 unackedMessagesLow = unackedMessages;
00111 instChanged = true;
00112 }
00113
00114 };
00115
00116 }}
00117
00118
00119 #endif