/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/broker/PersistableMessage.h

00001 #ifndef _broker_PersistableMessage_h
00002 #define _broker_PersistableMessage_h
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  * 
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  * 
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 
00025 #include <string>
00026 #include <list>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/weak_ptr.hpp>
00029 #include "Persistable.h"
00030 #include "qpid/framing/amqp_types.h"
00031 #include "qpid/sys/Monitor.h"
00032 #include "PersistableQueue.h"
00033 
00034 namespace qpid {
00035 namespace broker {
00036 
00037 class MessageStore;
00038 
00043 class PersistableMessage : public Persistable
00044 {
00045     sys::Monitor asyncEnqueueLock;
00046     sys::Monitor asyncDequeueLock;
00047     sys::Mutex storeLock;
00048         
00056     int asyncEnqueueCounter;
00057 
00065     int asyncDequeueCounter;
00066 protected:
00067     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
00068     syncList synclist;
00069     MessageStore* store;
00070     bool contentReleased;
00071     
00072     inline void setContentReleased() {contentReleased = true; }
00073 
00074 public:
00075     typedef boost::shared_ptr<PersistableMessage> shared_ptr;
00076 
00080     virtual uint32_t encodedHeaderSize() const = 0;
00081 
00082     virtual ~PersistableMessage() {};
00083 
00084     PersistableMessage(): 
00085         asyncEnqueueCounter(0), 
00086         asyncDequeueCounter(0),
00087         store(0),
00088         contentReleased(false) 
00089         {}
00090 
00091     void flush();
00092     
00093     inline bool isContentReleased()const {return contentReleased; }
00094         
00095     inline void waitForEnqueueComplete() {
00096         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00097         while (asyncEnqueueCounter > 0) {
00098             asyncEnqueueLock.wait();
00099         }
00100     }
00101 
00102     inline bool isEnqueueComplete() {
00103         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00104         return asyncEnqueueCounter == 0;
00105     }
00106 
00107     inline void enqueueComplete() {
00108         bool notify = false;
00109         {
00110             sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00111             if (asyncEnqueueCounter > 0) {
00112                 if (--asyncEnqueueCounter == 0) {
00113                     asyncEnqueueLock.notify();
00114                     notify = true;
00115                 }
00116             }
00117         }
00118         if (notify) {
00119             sys::ScopedLock<sys::Mutex> l(storeLock);
00120             if (store) {
00121                 for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
00122                     PersistableQueue::shared_ptr q(i->lock());
00123                     if (q) q->notifyDurableIOComplete();
00124                 } 
00125                 //synclist.clear();
00126             }            
00127         }
00128     }
00129 
00130     inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
00131         if (_store){
00132             sys::ScopedLock<sys::Mutex> l(storeLock);
00133             store = _store;
00134             boost::weak_ptr<PersistableQueue> q(queue);
00135             synclist.push_back(q);
00136         }
00137         enqueueAsync();
00138     }
00139 
00140     inline void enqueueAsync() { 
00141         sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00142         asyncEnqueueCounter++; 
00143     }
00144 
00145     inline bool isDequeueComplete() { 
00146         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00147         return asyncDequeueCounter == 0;
00148     }
00149     
00150     inline void dequeueComplete() { 
00151 
00152         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00153         if (asyncDequeueCounter > 0) {
00154             if (--asyncDequeueCounter == 0) {
00155                 asyncDequeueLock.notify();
00156             }
00157         }
00158     }
00159 
00160     inline void waitForDequeueComplete() {
00161         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00162         while (asyncDequeueCounter > 0) {
00163             asyncDequeueLock.wait();
00164         }
00165     }
00166 
00167     inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
00168         if (_store){
00169             sys::ScopedLock<sys::Mutex> l(storeLock);
00170             store = _store;
00171             boost::weak_ptr<PersistableQueue> q(queue);
00172             synclist.push_back(q);
00173         }
00174         dequeueAsync();
00175     }
00176 
00177     inline void dequeueAsync() { 
00178         sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00179         asyncDequeueCounter++; 
00180     }
00181 
00182     
00183 };
00184 
00185 }}
00186 
00187 
00188 #endif

Generated on Thu Apr 10 11:08:17 2008 for Qpid by  doxygen 1.4.7