libkvikio  24.02.00
thread_pool.hpp
Go to the documentation of this file.
1 #pragma once
2 
28 #define THREAD_POOL_VERSION "v2.0.0 (2021-08-14)"
29 
30 #include <atomic> // std::atomic
31 #include <chrono> // std::chrono
32 #include <cstdint> // std::int_fast64_t, std::uint_fast32_t
33 #include <functional> // std::function
34 #include <future> // std::future, std::promise
35 #include <iostream> // std::cout, std::ostream
36 #include <memory> // std::shared_ptr, std::unique_ptr
37 #include <mutex> // std::mutex, std::scoped_lock
38 #include <queue> // std::queue
39 #include <thread> // std::this_thread, std::thread
40 #include <type_traits> // std::common_type_t, std::decay_t, std::enable_if_t, std::is_void_v, std::invoke_result_t
41 #include <utility> // std::move
42 
43 // ============================================================================================= //
44 // Begin class thread_pool //
45 
46 namespace kvikio::third_party {
47 
54 class thread_pool {
55  typedef std::uint_fast32_t ui32;
56  typedef std::uint_fast64_t ui64;
57 
58  public:
59  // ============================
60  // Constructors and destructors
61  // ============================
62 
71  thread_pool(const ui32& _thread_count = std::thread::hardware_concurrency())
72  : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()),
73  threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()])
74  {
75  create_threads();
76  }
77 
84  {
86  running = false;
87  destroy_threads();
88  }
89 
90  // =======================
91  // Public member functions
92  // =======================
93 
99  ui64 get_tasks_queued() const
100  {
101  const std::scoped_lock lock(queue_mutex);
102  return tasks.size();
103  }
104 
110  ui32 get_tasks_running() const { return tasks_total - (ui32)get_tasks_queued(); }
111 
118  ui32 get_tasks_total() const { return tasks_total; }
119 
125  ui32 get_thread_count() const { return thread_count; }
126 
147  template <typename T1, typename T2, typename F>
148  void parallelize_loop(const T1& first_index,
149  const T2& index_after_last,
150  const F& loop,
151  ui32 num_blocks = 0)
152  {
153  typedef std::common_type_t<T1, T2> T;
154  T the_first_index = (T)first_index;
155  T last_index = (T)index_after_last;
156  if (the_first_index == last_index) return;
157  if (last_index < the_first_index) {
158  T temp = last_index;
159  last_index = the_first_index;
160  the_first_index = temp;
161  }
162  last_index--;
163  if (num_blocks == 0) num_blocks = thread_count;
164  ui64 total_size = (ui64)(last_index - the_first_index + 1);
165  ui64 block_size = (ui64)(total_size / num_blocks);
166  if (block_size == 0) {
167  block_size = 1;
168  num_blocks = (ui32)total_size > 1 ? (ui32)total_size : 1;
169  }
170  std::atomic<ui32> blocks_running = 0;
171  for (ui32 t = 0; t < num_blocks; t++) {
172  T start = ((T)(t * block_size) + the_first_index);
173  T end =
174  (t == num_blocks - 1) ? last_index + 1 : ((T)((t + 1) * block_size) + the_first_index);
175  blocks_running++;
176  push_task([start, end, &loop, &blocks_running] {
177  loop(start, end);
178  blocks_running--;
179  });
180  }
181  while (blocks_running != 0) {
182  sleep_or_yield();
183  }
184  }
185 
192  template <typename F>
193  void push_task(const F& task)
194  {
195  tasks_total++;
196  {
197  const std::scoped_lock lock(queue_mutex);
198  tasks.push(std::function<void()>(task));
199  }
200  }
201 
214  template <typename F, typename... A>
215  void push_task(const F& task, const A&... args)
216  {
217  push_task([task, args...] { task(args...); });
218  }
219 
232  void reset(const ui32& _thread_count = std::thread::hardware_concurrency())
233  {
234  bool was_paused = paused;
235  paused = true;
236  wait_for_tasks();
237  running = false;
238  destroy_threads();
239  thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency();
240  threads.reset(new std::thread[thread_count]);
241  paused = was_paused;
242  running = true;
243  create_threads();
244  }
245 
256  template <typename F,
257  typename... A,
258  typename = std::enable_if_t<
259  std::is_void_v<std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>>>
260  std::future<bool> submit(const F& task, const A&... args)
261  {
262  std::shared_ptr<std::promise<bool>> task_promise(new std::promise<bool>);
263  std::future<bool> future = task_promise->get_future();
264  push_task([task, args..., task_promise] {
265  try {
266  task(args...);
267  task_promise->set_value(true);
268  } catch (...) {
269  try {
270  task_promise->set_exception(std::current_exception());
271  } catch (...) {
272  }
273  }
274  });
275  return future;
276  }
277 
290  template <typename F,
291  typename... A,
292  typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>,
293  typename = std::enable_if_t<!std::is_void_v<R>>>
294  std::future<R> submit(const F& task, const A&... args)
295  {
296  std::shared_ptr<std::promise<R>> task_promise(new std::promise<R>);
297  std::future<R> future = task_promise->get_future();
298  push_task([task, args..., task_promise] {
299  try {
300  task_promise->set_value(task(args...));
301  } catch (...) {
302  try {
303  task_promise->set_exception(std::current_exception());
304  } catch (...) {
305  }
306  }
307  });
308  return future;
309  }
310 
319  {
320  while (true) {
321  if (!paused) {
322  if (tasks_total == 0) break;
323  } else {
324  if (get_tasks_running() == 0) break;
325  }
326  sleep_or_yield();
327  }
328  }
329 
330  // ===========
331  // Public data
332  // ===========
333 
339  std::atomic<bool> paused = false;
340 
347  ui32 sleep_duration = 1000;
348 
349  private:
350  // ========================
351  // Private member functions
352  // ========================
353 
357  void create_threads()
358  {
359  for (ui32 i = 0; i < thread_count; i++) {
360  threads[i] = std::thread(&thread_pool::worker, this);
361  }
362  }
363 
367  void destroy_threads()
368  {
369  for (ui32 i = 0; i < thread_count; i++) {
370  threads[i].join();
371  }
372  }
373 
381  bool pop_task(std::function<void()>& task)
382  {
383  const std::scoped_lock lock(queue_mutex);
384  if (tasks.empty())
385  return false;
386  else {
387  task = std::move(tasks.front());
388  tasks.pop();
389  return true;
390  }
391  }
392 
397  void sleep_or_yield()
398  {
399  if (sleep_duration)
400  std::this_thread::sleep_for(std::chrono::microseconds(sleep_duration));
401  else
402  std::this_thread::yield();
403  }
404 
409  void worker()
410  {
411  while (running) {
412  std::function<void()> task;
413  if (!paused && pop_task(task)) {
414  task();
415  tasks_total--;
416  } else {
417  sleep_or_yield();
418  }
419  }
420  }
421 
422  // ============
423  // Private data
424  // ============
425 
429  mutable std::mutex queue_mutex = {};
430 
435  std::atomic<bool> running = true;
436 
440  std::queue<std::function<void()>> tasks = {};
441 
445  ui32 thread_count;
446 
450  std::unique_ptr<std::thread[]> threads;
451 
456  std::atomic<ui32> tasks_total = 0;
457 };
458 } // namespace kvikio::third_party
459 
460 // End class thread_pool //
461 // ============================================================================================= //
A C++17 thread pool class. The user submits tasks to be executed into a queue. Whenever a thread beco...
Definition: thread_pool.hpp:54
~thread_pool()
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
Definition: thread_pool.hpp:83
ui32 get_tasks_running() const
Get the number of tasks currently being executed by the threads.
std::future< bool > submit(const F &task, const A &... args)
Submit a function with zero or more arguments and no return value into the task queue,...
thread_pool(const ui32 &_thread_count=std::thread::hardware_concurrency())
Construct a new thread pool.
Definition: thread_pool.hpp:71
std::atomic< bool > paused
An atomic variable indicating to the workers to pause. When set to true, the workers temporarily stop...
void push_task(const F &task, const A &... args)
Push a function with arguments, but no return value, into the task queue.
void reset(const ui32 &_thread_count=std::thread::hardware_concurrency())
Reset the number of threads in the pool. Waits for all currently running tasks to be completed,...
void wait_for_tasks()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
ui64 get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition: thread_pool.hpp:99
void push_task(const F &task)
Push a function with no arguments or return value into the task queue.
std::future< R > submit(const F &task, const A &... args)
Submit a function with zero or more arguments and a return value into the task queue,...
ui32 get_thread_count() const
Get the number of threads in the pool.
ui32 get_tasks_total() const
Get the total number of unfinished tasks - either still in the queue, or running in a thread.
void parallelize_loop(const T1 &first_index, const T2 &index_after_last, const F &loop, ui32 num_blocks=0)
Parallelize a loop by splitting it into blocks, submitting each block separately to the thread pool,...