From 52300df201ed1a35eb15b2bc5eb441a1b7eec393 Mon Sep 17 00:00:00 2001 From: Christian Biesinger Date: Tue, 1 Oct 2019 18:38:35 -0500 Subject: [PATCH] Implement a thread pool --- gdb/Makefile.in | 1 + gdb/gdbsupport/parallel-for.c | 1 + gdb/gdbsupport/parallel-for.h | 57 ++++++++++++++++++++--------------- gdb/gdbsupport/thread_pool.c | 50 ++++++++++++++++++++++++++++++ gdb/gdbsupport/thread_pool.h | 43 ++++++++++++++++++++++++++ gdb/minsyms.c | 1 + 6 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 gdb/gdbsupport/thread_pool.c create mode 100644 gdb/gdbsupport/thread_pool.h diff --git a/gdb/Makefile.in b/gdb/Makefile.in index c7b9efdbbc6..8265eb8a246 100644 --- a/gdb/Makefile.in +++ b/gdb/Makefile.in @@ -980,6 +980,7 @@ COMMON_SFILES = \ gdbsupport/signals.c \ gdbsupport/signals-state-save-restore.c \ gdbsupport/tdesc.c \ + gdbsupport/thread_pool.c \ gdbsupport/vec.c \ gdbsupport/xml-utils.c \ complaints.c \ diff --git a/gdb/gdbsupport/parallel-for.c b/gdb/gdbsupport/parallel-for.c index 0024278bd7f..7d15ae120d5 100644 --- a/gdb/gdbsupport/parallel-for.c +++ b/gdb/gdbsupport/parallel-for.c @@ -24,4 +24,5 @@ namespace gdb { /* See parallel-for.h. */ int max_threads = -1; +thread_pool parallel_for_pool; } diff --git a/gdb/gdbsupport/parallel-for.h b/gdb/gdbsupport/parallel-for.h index 60b9472e1fe..6f4235da363 100644 --- a/gdb/gdbsupport/parallel-for.h +++ b/gdb/gdbsupport/parallel-for.h @@ -22,11 +22,13 @@ #include #if CXX_STD_THREAD +#include #include #include #endif #include "gdbsupport/block-signals.h" +#include "gdbsupport/thread_pool.h" namespace gdb { @@ -35,6 +37,8 @@ namespace gdb extern int max_threads; +extern thread_pool parallel_for_pool; + /* A very simple "parallel for". This splits the range of iterators into subranges, and then passes each subrange to the callback. The work may or may not be done in separate threads. @@ -56,44 +60,49 @@ parallel_for_each (RandomIt first, RandomIt last, RangeFunction callback) n_threads = max_threads; if (n_threads > local_max) n_threads = local_max; - int n_actual_threads = 0; + if (!parallel_for_pool.started ()) + { + /* Ensure that signals used by gdb are blocked in the new + threads. */ + block_signals blocker; + parallel_for_pool.start (n_threads); + } + + std::mutex mtx; + std::condition_variable cv; + int num_finished = 0; - std::thread threads[local_max]; size_t n_elements = last - first; if (n_threads > 1 && 2 * n_threads <= n_elements) { - /* Ensure that signals used by gdb are blocked in the new - threads. */ - block_signals blocker; - size_t elts_per_thread = n_elements / n_threads; - n_actual_threads = n_threads - 1; - for (int i = 0; i < n_actual_threads; ++i) + for (int i = 0; i < n_threads; ++i) { RandomIt end = first + elts_per_thread; - try - { - threads[i] = std::thread (callback, first, end); - } - catch (const std::system_error &failure) - { - /* If a thread failed to start, ignore it and fall back - to processing in the main thread. */ - n_actual_threads = i; - break; - } + parallel_for_pool.post_task ([&, first, end] () { + callback (first, end); + std::unique_lock lck (mtx); + num_finished++; + cv.notify_all (); + }); first = end; } } + else + n_threads = 0; #endif /* CXX_STD_THREAD */ /* Process all the remaining elements in the main thread. */ callback (first, last); - -#if CXX_STD_THREAD - for (int i = 0; i < n_actual_threads; ++i) - threads[i].join (); -#endif /* CXX_STD_THREAD */ + if (n_threads) + { + for (;;) { + std::unique_lock lck (mtx); + if (num_finished == n_threads) + break; + cv.wait (lck); + } + } } } diff --git a/gdb/gdbsupport/thread_pool.c b/gdb/gdbsupport/thread_pool.c new file mode 100644 index 00000000000..03fef956df7 --- /dev/null +++ b/gdb/gdbsupport/thread_pool.c @@ -0,0 +1,50 @@ +#include "thread_pool.h" + +namespace gdb { + +thread_pool::~thread_pool () +{ + { + std::lock_guard guard (m_tasks_mutex); + m_shutdown = true; + m_tasks_cv.notify_all (); + } + for (auto& t : m_threads) + t.join(); +} + +void +thread_pool::start (size_t num_threads) +{ + for (size_t i = 0; i < num_threads; ++i) + { + m_threads.emplace_back (&thread_pool::thread_function, this); + } + m_started = true; +} + + +void +thread_pool::thread_function () +{ + while (!m_shutdown) + { + task t; + { + std::unique_lock guard (m_tasks_mutex); + if (m_shutdown) + break; + if (m_tasks.empty ()) + m_tasks_cv.wait (guard); + if (m_shutdown) + break; + if (m_tasks.empty ()) + continue; + t = m_tasks.front(); + m_tasks.pop(); + } + t (); + } +} + +} diff --git a/gdb/gdbsupport/thread_pool.h b/gdb/gdbsupport/thread_pool.h new file mode 100644 index 00000000000..77760a95042 --- /dev/null +++ b/gdb/gdbsupport/thread_pool.h @@ -0,0 +1,43 @@ +#ifndef PARALLEL_FOR_H +#define PARALLEL_FOR_H + +#include +#include +#include +#include +#include +#include +#include + +namespace gdb { + +class thread_pool { + public: + thread_pool() : m_started (false), m_shutdown (false) {} + ~thread_pool(); + + bool started() const { return m_started; } + + void start(size_t num_threads); + + typedef std::function task; + void post_task(task t) { + std::lock_guard guard (m_tasks_mutex); + m_tasks.push (t); + m_tasks_cv.notify_one (); + } + + private: + void thread_function(); + + bool m_started; + std::vector m_threads; + std::atomic m_shutdown; + std::queue m_tasks; + std::condition_variable m_tasks_cv; + std::mutex m_tasks_mutex; +}; + +} + +#endif diff --git a/gdb/minsyms.c b/gdb/minsyms.c index 45ba61a2a87..32d25f03e38 100644 --- a/gdb/minsyms.c +++ b/gdb/minsyms.c @@ -57,6 +57,7 @@ #include "gdbsupport/parallel-for.h" #if CXX_STD_THREAD +#include #include #endif -- 2.47.2