00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _QueuePolicy_
00022 #define _QueuePolicy_
00023
00024 #include <deque>
00025 #include <iostream>
00026 #include <memory>
00027 #include "QueuedMessage.h"
00028 #include "qpid/framing/FieldTable.h"
00029 #include "qpid/sys/AtomicValue.h"
00030 #include "qpid/sys/Mutex.h"
00031
00032 namespace qpid {
00033 namespace broker {
00034
00035 class QueuePolicy
00036 {
00037 static uint64_t defaultMaxSize;
00038
00039 const uint32_t maxCount;
00040 const uint64_t maxSize;
00041 const std::string type;
00042 qpid::sys::AtomicValue<uint32_t> count;
00043 qpid::sys::AtomicValue<uint64_t> size;
00044 bool policyExceeded;
00045
00046 static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
00047 static std::string getType(const qpid::framing::FieldTable& settings);
00048
00049 public:
00050 static const std::string maxCountKey;
00051 static const std::string maxSizeKey;
00052 static const std::string typeKey;
00053 static const std::string REJECT;
00054 static const std::string FLOW_TO_DISK;
00055 static const std::string RING;
00056 static const std::string RING_STRICT;
00057
00058 virtual ~QueuePolicy() {}
00059 void tryEnqueue(const QueuedMessage&);
00060 virtual void dequeued(const QueuedMessage&);
00061 virtual bool isEnqueued(const QueuedMessage&);
00062 virtual bool checkLimit(const QueuedMessage&);
00063 void update(qpid::framing::FieldTable& settings);
00064 uint32_t getMaxCount() const { return maxCount; }
00065 uint64_t getMaxSize() const { return maxSize; }
00066
00067 static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
00068 static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
00069 static void setDefaultMaxSize(uint64_t);
00070 friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
00071 protected:
00072 QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
00073
00074 virtual void enqueued(const QueuedMessage&);
00075 void enqueued(uint64_t size);
00076 void dequeued(uint64_t size);
00077 };
00078
00079
00080 class FlowToDiskPolicy : public QueuePolicy
00081 {
00082 public:
00083 FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
00084 bool checkLimit(const QueuedMessage&);
00085 };
00086
00087 class RingQueuePolicy : public QueuePolicy
00088 {
00089 public:
00090 RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
00091 void enqueued(const QueuedMessage&);
00092 void dequeued(const QueuedMessage&);
00093 bool isEnqueued(const QueuedMessage&);
00094 bool checkLimit(const QueuedMessage&);
00095 private:
00096 typedef std::deque<QueuedMessage> Messages;
00097 qpid::sys::Mutex lock;
00098 Messages queue;
00099 const bool strict;
00100 };
00101
00102 }}
00103
00104
00105 #endif