/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/sys/BlockingQueue.h

00001 #ifndef QPID_SYS_BLOCKINGQUEUE_H
00002 #define QPID_SYS_BLOCKINGQUEUE_H
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  * 
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  * 
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 
00025 #include "Waitable.h"
00026 
00027 #include <queue>
00028 
00029 namespace qpid {
00030 namespace sys {
00031 
00035 template <class T>
00036 class BlockingQueue
00037 {
00038     mutable sys::Waitable lock;
00039     std::queue<T> queue;
00040     bool closed;
00041 
00042 public:
00043     BlockingQueue() : closed(false) {}
00044     ~BlockingQueue() { close(); }
00045 
00047     T pop()
00048     {
00049         Waitable::ScopedLock l(lock);
00050         if (!queueWait()) throw ClosedException();
00051         return popInternal();
00052     }
00053 
00057     bool tryPop(T& outValue) {
00058         Waitable::ScopedLock l(lock);
00059         if (queue.empty()) return false;
00060         outValue = popInternal();
00061         return true;
00062     }
00063 
00067     T tryPop(const T& valueIfEmpty=T()) {
00068         T result=valueIfEmpty;
00069         tryPop(result);
00070         return result;
00071     }
00072 
00074     void push(const T& t)
00075     {
00076         Waitable::ScopedLock l(lock);
00077         queue.push(t);
00078         queueNotify(0);
00079     }
00080 
00085     void close()
00086     {
00087         Waitable::ScopedLock l(lock);
00088         if (!closed) {
00089             closed = true;
00090             lock.notifyAll();
00091             lock.waitWaiters(); // Ensure no threads are still waiting.
00092         }
00093     }
00094 
00096     void open() {
00097         Waitable::ScopedLock l(lock);
00098         closed=false;
00099     }
00100 
00101     bool isClosed() const { 
00102         Waitable::ScopedLock l(lock);
00103         return closed;
00104     }
00105 
00106     bool empty() const {
00107         Waitable::ScopedLock l(lock);
00108         return queue.empty();
00109     }    
00110     size_t size() const {
00111         Waitable::ScopedLock l(lock);
00112         return queue.size();
00113     }    
00114 
00115   private:
00116 
00117     void queueNotify(size_t ignore) {
00118         if (!queue.empty() && lock.hasWaiters()>ignore)
00119             lock.notify();      // Notify another waiter.
00120     }
00121 
00122     bool queueWait() {
00123         Waitable::ScopedWait w(lock);
00124         while (!closed && queue.empty())
00125             lock.wait();
00126         return !queue.empty();
00127     }
00128 
00129     T popInternal() {
00130         T t=queue.front();
00131         queue.pop();
00132         queueNotify(1);
00133         return t;
00134     }
00135     
00136 };
00137 
00138 }}
00139 
00140 
00141 
00142 #endif  

Generated on Thu Apr 10 11:08:18 2008 for Qpid by  doxygen 1.4.7