00001 #ifndef _broker_Message_h
00002 #define _broker_Message_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <string>
00026 #include <vector>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/variant.hpp>
00029 #include "PersistableMessage.h"
00030 #include "MessageAdapter.h"
00031 #include "qpid/framing/amqp_types.h"
00032 #include "qpid/sys/Mutex.h"
00033 #include "qpid/sys/Time.h"
00034
00035 namespace qpid {
00036
00037 namespace framing {
00038 class FieldTable;
00039 class SequenceNumber;
00040 }
00041
00042 namespace broker {
00043 class ConnectionToken;
00044 class Exchange;
00045 class ExchangeRegistry;
00046 class MessageStore;
00047 class Queue;
00048
00049 class Message : public PersistableMessage {
00050 public:
00051 Message(const framing::SequenceNumber& id = framing::SequenceNumber());
00052 ~Message();
00053
00054 uint64_t getPersistenceId() const { return persistenceId; }
00055 void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
00056
00057 bool getRedelivered() const { return redelivered; }
00058 void redeliver() { redelivered = true; }
00059
00060 const ConnectionToken* getPublisher() const { return publisher; }
00061 void setPublisher(ConnectionToken* p) { publisher = p; }
00062
00063 const framing::SequenceNumber& getCommandId() { return frames.getId(); }
00064
00065 uint64_t contentSize() const;
00066
00067 std::string getRoutingKey() const;
00068 const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
00069 std::string getExchangeName() const;
00070 bool isImmediate() const;
00071 const framing::FieldTable* getApplicationHeaders() const;
00072 bool isPersistent();
00073 bool requiresAccept();
00074 void setTimestamp();
00075 bool hasExpired() const;
00076
00077 framing::FrameSet& getFrames() { return frames; }
00078 const framing::FrameSet& getFrames() const { return frames; }
00079
00080 template <class T> T* getProperties() {
00081 qpid::framing::AMQHeaderBody* p = frames.getHeaders();
00082 return p->get<T>(true);
00083 }
00084
00085 template <class T> const T* getProperties() const {
00086 qpid::framing::AMQHeaderBody* p = frames.getHeaders();
00087 return p->get<T>(true);
00088 }
00089
00090 template <class T> const T* hasProperties() const {
00091 const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
00092 return p->get<T>();
00093 }
00094
00095 template <class T> const T* getMethod() const {
00096 return frames.as<T>();
00097 }
00098
00099 template <class T> bool isA() const {
00100 return frames.isA<T>();
00101 }
00102
00103 uint32_t getRequiredCredit() const;
00104
00105 void encode(framing::Buffer& buffer) const;
00106 void encodeContent(framing::Buffer& buffer) const;
00107
00112 uint32_t encodedSize() const;
00118 uint32_t encodedHeaderSize() const;
00119 uint32_t encodedContentSize() const;
00120
00121 void decodeHeader(framing::Buffer& buffer);
00122 void decodeContent(framing::Buffer& buffer);
00123
00129 void releaseContent(MessageStore* store);
00130 void destroy();
00131
00132 void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
00133 void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
00134
00135 bool isContentLoaded() const;
00136
00137 bool isExcluded(const std::vector<std::string>& excludes) const;
00138 void addTraceId(const std::string& id);
00139
00140 void forcePersistent();
00141
00142 private:
00143 mutable sys::Mutex lock;
00144 framing::FrameSet frames;
00145 mutable boost::shared_ptr<Exchange> exchange;
00146 mutable uint64_t persistenceId;
00147 bool redelivered;
00148 bool loaded;
00149 bool staged;
00150 bool forcePersistentPolicy;
00151 ConnectionToken* publisher;
00152 mutable MessageAdapter* adapter;
00153 qpid::sys::AbsTime expiration;
00154
00155 static TransferAdapter TRANSFER;
00156
00157 MessageAdapter& getAdapter() const;
00158 };
00159
00160 }}
00161
00162
00163 #endif