Fawkes API  Fawkes Development Version
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
interruptible_barrier.cpp
1 
2 /***************************************************************************
3  * interruptible_barrier.cpp - Interruptible Barrier
4  *
5  * Created: Sat Jan 31 12:30:32 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/threading/interruptible_barrier.h>
25 #include <core/threading/thread_list.h>
26 #include <core/exceptions/system.h>
27 #include <core/macros.h>
28 
29 #include <core/threading/mutex.h>
30 #include <core/threading/wait_condition.h>
31 
32 namespace fawkes {
33 #if 0 /* just to make Emacs auto-indent happy */
34 }
35 #endif
36 
37 
38 /// @cond INTERNALS
39 class InterruptibleBarrierData
40 {
41  public:
42  unsigned int threads_left;
43  Mutex *mutex;
44  WaitCondition *waitcond;
45  bool own_mutex;
46 
47  InterruptibleBarrierData(Mutex *mutex)
48  {
49  if (mutex) {
50  this->mutex = mutex;
51  own_mutex = false;
52  } else {
53  this->mutex = new Mutex();
54  own_mutex = true;
55  }
56  waitcond = new WaitCondition(this->mutex);
57  }
58 
59  ~InterruptibleBarrierData()
60  {
61  if (own_mutex) delete mutex;
62  delete waitcond;
63  }
64 };
65 /// @endcond
66 
67 
68 /** @class InterruptibleBarrier <core/threading/barrier.h>
69  * A barrier is a synchronization tool which blocks until a given number of
70  * threads have reached the barrier. This particular implementations allows for
71  * giving a timeout after which the waiting is aborted.
72  *
73  * For general information when a barrier is useful see the Barrier class.
74  *
75  * Additionally to the general barrier features the InterruptibleBarrier::wait()
76  * can be given a timeout after which the waiting is aborted.
77  * Since the POSIX standard does not provide a timed wait for barriers this
78  * implementation uses a Mutex and WaitCondition internally to achieve the
79  * desired result.
80  *
81  * @see Barrier
82  * @ingroup Threading
83  * @author Tim Niemueller
84  */
85 
86 
87 /** Constructor.
88  * @param count the number of threads to wait for
89  */
90 InterruptibleBarrier::InterruptibleBarrier(unsigned int count)
91  : Barrier(count)
92 {
93  _count = count;
94  if ( _count == 0 ) {
95  throw Exception("Barrier count must be at least 1");
96  }
97  __data = new InterruptibleBarrierData(NULL);
98  __data->threads_left = 0;
99  __passed_threads = RefPtr<ThreadList>(new ThreadList());
100 
101  __interrupted = false;
102  __timeout = false;
103 }
104 
105 
106 /** Constructor with custom mutex.
107  * Use this constructor only if you really know what you are doing. This constructor
108  * allows to pass a mutex that is used internally for the barrier. Note that in
109  * this case it is your duty to lock the mutex before the wait() and unlock
110  * afterwards! It combines features of a barrier and a wait condition.
111  * @param mutex Mutex to use
112  * @param count the number of threads to wait for
113  */
115  : Barrier(count)
116 {
117  _count = count;
118  if ( _count == 0 ) {
119  throw Exception("Barrier count must be at least 1");
120  }
121  __data = new InterruptibleBarrierData(mutex);
122  __data->threads_left = 0;
123  __passed_threads = RefPtr<ThreadList>(new ThreadList());
124 
125  __interrupted = false;
126  __timeout = false;
127 }
128 
129 /** Invalid constructor.
130  * This will throw an exception if called as it is illegal to copy
131  * a barrier.
132  * @param barrier to copy
133  */
135  : Barrier()
136 {
137  throw Exception("Barriers cannot be copied");
138 }
139 
140 
141 /** Invalid constructor.
142  * This will throw an exception if called as it is illegal to copy
143  * a barrier.
144  * @param barrier to copy
145  */
146 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b)
147  : Barrier()
148 {
149  throw Exception("Barriers cannot be copied");
150 }
151 
152 
153 /** Invalid assignment operator.
154  * This will throw an exception if called as it is illegal to assign
155  * a barrier.
156  * @param barrier to copy
157  */
158 InterruptibleBarrier &
159 InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
160 {
161  throw Exception("Barriers cannot be assigned");
162 }
163 
164 /** Invalid assignment operator.
165  * This will throw an exception if called as it is illegal to assign
166  * a barrier.
167  * @param barrier to copy
168  */
169 InterruptibleBarrier &
170 InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
171 {
172  throw Exception("Barriers cannot be assigned");
173 }
174 
175 
176 /** Destructor */
178 {
179  delete __data;
180 }
181 
182 
183 /** Get a list of threads that passed the barrier.
184  * The list contains the threads that passed the barrier. With some book keeping
185  * outside of the barrier you can determine which threads you expected at the
186  * barrier but did not pass it.
187  * @return refptr to list of threads that passed the barrier.
188  */
191 {
192  return __passed_threads;
193 }
194 
195 
196 /** Interrupt the barrier.
197  * This will cause all threads currently waiting on the barrier to
198  * throw an exception and no further thread will wait.
199  * You have to call reset() the before you use this barrier
200  * the next time.
201  */
202 void
204 {
205  if (likely(__data->own_mutex)) __data->mutex->lock();
206  __interrupted = true;
207  __data->waitcond->wake_all();
208  if (likely(__data->own_mutex)) __data->mutex->unlock();
209 }
210 
211 
212 /** Clears the barrier.
213  * Call this method when you want to use the barrier the next time after
214  * an interrupt or timeout occured. Make sure all threads that should have
215  * passed the barrier the last time did pass it.
216  */
217 void
219 {
220  if (likely(__data->own_mutex)) __data->mutex->lock();
221  __interrupted = false;
222  __timeout = false;
223  __data->threads_left = _count;
224  __passed_threads.clear();
225  if (likely(__data->own_mutex)) __data->mutex->unlock();
226 }
227 
228 
229 /** Wait for other threads.
230  * This method will block until as many threads have called wait as you have
231  * given count to the constructor. Note that if the barrier is interrupted or
232  * times out you need to call reset() to get the barrier into a re-usable state.
233  * It is your duty to make sure that all threads using the barrier are in a
234  * cohesive state.
235  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
236  * @param timeout_nanosec timeout in nanoseconds
237  * @return true, if the barrier was properly reached, false if the barrier timeout
238  * was reached and the wait did not finish properly.
239  * @exception InterruptedException thrown if the barrier was forcefully interrupted
240  * by calling interrupt().
241  */
242 bool
243 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
244 {
245  if (likely(__data->own_mutex)) __data->mutex->lock();
246 
247  if ( __data->threads_left == 0 ) {
248  // first to come
249  __timeout = __interrupted = __wait_at_barrier = false;
250  __data->threads_left = _count;
251  __passed_threads->clear();
252  } else {
253  if ( __interrupted || __timeout ) {
254  // interrupted or timed out threads need to be reset if they should be reused
255  if (likely(__data->own_mutex)) __data->mutex->unlock();
256  return true;
257  }
258  }
259 
260  --__data->threads_left;
261  try {
262  __passed_threads->push_back_locked(Thread::current_thread());
263  } catch (Exception &e) {
264  // Cannot do anything more useful :-/
265  // to stay fully compatible with Barrier we do *not* re-throw
266  e.print_trace();
267  }
268 
269  bool local_timeout = false;
270  bool waker = (__data->threads_left == 0);
271 
272  while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) {
273  local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
274  }
275  if (local_timeout) __timeout = true;
276 
277  if ( __interrupted ) {
278  if (likely(__data->own_mutex)) __data->mutex->unlock();
279  throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
280  "%u of %u threads reached the barrier",
281  _count - __data->threads_left, _count);
282  }
283 
284  if (waker || local_timeout) {
285  __wait_at_barrier = waker;
286  __data->waitcond->wake_all();
287  }
288 
289  if (likely(__data->own_mutex)) __data->mutex->unlock();
290 
291  if (__wait_at_barrier) {
292  Barrier::wait();
293  }
294 
295  return ! __timeout;
296 }
297 
298 } // end namespace fawkes
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:181
void interrupt()
Interrupt the barrier.
virtual void wait()
Wait for other threads.
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:157
virtual ~InterruptibleBarrier()
Destructor.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
List of threads.
Definition: thread_list.h:57
Base class for exceptions in Fawkes.
Definition: exception.h:36
InterruptibleBarrier(unsigned int count)
Constructor.
unsigned int _count
Number of threads that are expected to wait for the barrier.
Definition: barrier.h:48
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1295
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:619
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
RefPtr&lt;&gt; is a reference-counting shared smartpointer.
Definition: refptr.h:49
void reset()
Clears the barrier.
Mutex mutual exclusion lock.
Definition: mutex.h:32
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32