/usr/share/cruisecontrol-bin-2.6.1/projects/qpid-trunk/cpp/src/qpid/sys/Serializer.h

00001 #ifndef SERIALIZER_H
00002 #define SERIALIZER_H
00003 
00004 
00005 /*
00006  *
00007  * Licensed to the Apache Software Foundation (ASF) under one
00008  * or more contributor license agreements.  See the NOTICE file
00009  * distributed with this work for additional information
00010  * regarding copyright ownership.  The ASF licenses this file
00011  * to you under the Apache License, Version 2.0 (the
00012  * "License"); you may not use this file except in compliance
00013  * with the License.  You may obtain a copy of the License at
00014  * 
00015  *   http://www.apache.org/licenses/LICENSE-2.0
00016  * 
00017  * Unless required by applicable law or agreed to in writing,
00018  * software distributed under the License is distributed on an
00019  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00020  * KIND, either express or implied.  See the License for the
00021  * specific language governing permissions and limitations
00022  * under the License.
00023  *
00024  */
00025 
00026 #include "qpid/Exception.h"
00027 #include "qpid/sys/Runnable.h"
00028 #include "qpid/sys/Monitor.h"
00029 #include "qpid/sys/Thread.h"
00030 
00031 #include <boost/function.hpp>
00032 #include <boost/noncopyable.hpp>
00033 
00034 #include <deque>
00035 
00036 namespace qpid {
00037 namespace sys {
00038 
00040 class SerializerBase : private boost::noncopyable, private Runnable
00041 {
00042   public:
00043     typedef boost::function<void()> VoidFn0;
00044     struct ShutdownException : public Exception {};
00045         
00047     SerializerBase(bool immediate=true);
00048 
00049     virtual ~SerializerBase() { shutdown(); }
00050 
00051     virtual void dispatch() = 0;
00052   protected:
00053     enum State {
00054         IDLE, 
00055         EXECUTING, 
00056         DISPATCHING, 
00057         SHUTDOWN 
00058     };
00059 
00060     void shutdown();
00061     void notifyWorker();
00062     void run();
00063     virtual bool empty() = 0;
00064     bool running();
00065     void wait();
00066 
00067     Monitor lock;
00068     State state;
00069     bool immediate;
00070     Thread worker;
00071 };
00072 
00073 
00085 template <class Task>
00086 class Serializer : public SerializerBase {
00087 
00088     std::deque<Task> queue;
00089 
00090     bool empty() { return queue.empty(); }
00091     void dispatch(Task& task);
00092     
00093   public:
00099     Serializer(bool immediate=true)
00100         : SerializerBase(immediate) {}
00101 
00102     ~Serializer() { shutdown(); }
00109     void execute(Task& task);
00110 
00111 
00117     void dispatch();
00118     };
00119 
00120 
00121 template <class Task>
00122 void Serializer<Task>::execute(Task& task) {
00123     Mutex::ScopedLock l(lock);
00124     assert(state != SHUTDOWN);
00125     if (immediate && state == IDLE) {
00126         state = EXECUTING;
00127         dispatch(task);
00128         if (state != SHUTDOWN) {
00129             assert(state == EXECUTING);
00130             state = IDLE;
00131         }
00132     }
00133     else 
00134         queue.push_back(task);
00135     if (!queue.empty() && state == IDLE) {
00136         state = DISPATCHING;
00137         notifyWorker();
00138     }
00139 }
00140 
00141 template <class Task>
00142 void Serializer<Task>::dispatch() {
00143     Mutex::ScopedLock l(lock);
00144     // TODO aconway 2007-07-16: This loop could be unbounded
00145     // if other threads add work while we're in dispatch(Task&).
00146     // If we need to bound it we could dispatch just the elements
00147     // that were enqueued when dispatch() was first called - save
00148     // begin() iterator and pop only up to that.
00149     while (!queue.empty() && state != SHUTDOWN) {
00150         assert(state == DISPATCHING);
00151         dispatch(queue.front());
00152         queue.pop_front();
00153     }
00154     if (state != SHUTDOWN) {
00155         assert(state == DISPATCHING);
00156         state = IDLE;
00157     }
00158 }
00159 
00160 template <class Task>
00161 void Serializer<Task>::dispatch(Task& task) {
00162     // Preconditions: lock is held, state is EXECUTING or DISPATCHING
00163     assert(state != IDLE);
00164     assert(state != SHUTDOWN);
00165     assert(state == EXECUTING || state == DISPATCHING);
00166     Mutex::ScopedUnlock u(lock);
00167     // No exceptions allowed in task.
00168         notifyWorker();
00169     try { task(); } catch (...) { assert(0); }
00170 }
00171 
00172 
00173 
00174 
00175 }} // namespace qpid::sys
00176 
00177 
00178 
00179 
00180 
00181 #endif  

Generated on Thu Apr 10 11:08:18 2008 for Qpid by  doxygen 1.4.7