Alexandria  2.18
Please provide a description of the project.
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012-2021 Euclid Science Ground Segment
3  *
4  * This library is free software; you can redistribute it and/or modify it under
5  * the terms of the GNU Lesser General Public License as published by the Free
6  * Software Foundation; either version 3.0 of the License, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
27 #include <algorithm>
28 #include <numeric>
29 
30 namespace Euclid {
31 
32 namespace {
33 
34 class Worker {
35 
36 public:
37  Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue, std::atomic<bool>& run_flag,
38  std::atomic<bool>& sleeping_flag, std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39  std::exception_ptr& exception_ptr)
40  : m_queue_mutex(queue_mutex)
41  , m_queue(queue)
42  , m_run_flag(run_flag)
43  , m_sleeping_flag(sleeping_flag)
44  , m_done_flag(done_flag)
45  , m_empty_queue_wait_time(empty_queue_wait_time)
46  , m_exception_ptr(exception_ptr) {}
47 
48  void operator()() {
49  while (m_run_flag.get() && m_exception_ptr == nullptr) {
50  // Check if there is anything it the queue to be done and get it
51  std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
53  if (!m_queue.get().empty()) {
54  task_ptr = Euclid::make_unique<ThreadPool::Task>(m_queue.get().front());
55  m_queue.get().pop_front();
56  }
57  lock.unlock();
58 
59  // If we have some work to do, do it. Otherwise sleep for some time.
60  if (task_ptr) {
61  try {
62  (*task_ptr)();
63  } catch (...) {
65  }
66  } else {
67  m_sleeping_flag.get() = true;
69  m_sleeping_flag.get() = false;
70  }
71  }
72  // Indicate that the worker is done
73  m_sleeping_flag.get() = true;
74  m_done_flag.get() = true;
75  }
76 
77 private:
85 };
86 
87 } // end of anonymous namespace
88 
89 ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
90  : m_worker_run_flags(thread_count)
91  , m_worker_sleeping_flags(thread_count)
92  , m_worker_done_flags(thread_count)
93  , m_empty_queue_wait_time(empty_queue_wait_time) {
94  for (unsigned int i = 0; i < thread_count; ++i) {
95  m_worker_run_flags.at(i) = true;
96  m_worker_sleeping_flags.at(i) = false;
97  m_worker_done_flags.at(i) = false;
100  }
101 }
102 
103 namespace {
104 
105 void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
106  // Now wait until all the workers have finish any current tasks
107  for (auto& flag : worker_flags) {
108  while (!flag) {
110  }
111  }
112 }
113 
114 } // namespace
115 
116 bool ThreadPool::checkForException(bool rethrow) {
117  if (m_exception_ptr) {
118  if (rethrow) {
120  } else {
121  return true;
122  }
123  }
124  return false;
125 }
126 
127 size_t ThreadPool::queued() const {
129  return m_queue.size();
130 }
131 
132 size_t ThreadPool::running() const {
135  return m_worker_sleeping_flags.size() - sleeping;
136 }
137 
139  // Wait for the queue to be empty
140  bool queue_is_empty = false;
141  while (!queue_is_empty && m_exception_ptr == nullptr) {
143  queue_is_empty = m_queue.empty();
144  lock.unlock();
145  if (!queue_is_empty) {
147  }
148  }
149  // Wait for the workers to finish the currently executing tasks
151  // Check if any worker finished with an exception
152  checkForException(true);
153 }
154 
156  // Stop all the workers. They will stop right after they finish the task
157  // they already run.
159  // Now wait until all the workers have finish any current tasks
161  for (auto& worker : m_workers) {
162  worker.join();
163  }
164 }
165 
168  if (m_worker_run_flags.empty()) {
169  task();
170  } else {
171  m_queue.emplace_back(std::move(task));
172  }
173 }
174 
175 } // namespace Euclid
std::reference_wrapper< std::mutex > m_queue_mutex
Definition: ThreadPool.cpp:78
std::reference_wrapper< std::atomic< bool > > m_run_flag
Definition: ThreadPool.cpp:80
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
Definition: ThreadPool.cpp:81
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
Definition: ThreadPool.cpp:79
std::reference_wrapper< std::atomic< bool > > m_done_flag
Definition: ThreadPool.cpp:82
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.cpp:83
std::reference_wrapper< std::exception_ptr > m_exception_ptr
Definition: ThreadPool.cpp:84
T accumulate(T... args)
T at(T... args)
T begin(T... args)
void submit(Task task)
Submit a task to be executed.
Definition: ThreadPool.cpp:166
std::deque< Task > m_queue
Definition: ThreadPool.h:110
size_t running() const
Return the number of running tasks.
Definition: ThreadPool.cpp:132
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition: ThreadPool.h:107
size_t queued() const
Return the number of queued tasks.
Definition: ThreadPool.cpp:127
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.h:111
std::vector< std::thread > m_workers
Definition: ThreadPool.h:109
std::mutex m_queue_mutex
Definition: ThreadPool.h:105
std::vector< std::atomic< bool > > m_worker_run_flags
Definition: ThreadPool.h:106
std::vector< std::atomic< bool > > m_worker_done_flags
Definition: ThreadPool.h:108
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
Definition: ThreadPool.cpp:116
virtual ~ThreadPool()
Definition: ThreadPool.cpp:155
std::exception_ptr m_exception_ptr
Definition: ThreadPool.h:112
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
Definition: ThreadPool.cpp:89
T current_exception(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T fill(T... args)
T lock(T... args)
T move(T... args)
T rethrow_exception(T... args)
T size(T... args)
T sleep_for(T... args)