#include <algorithm>
#if CXX_STD_THREAD
+#include <condition_variable>
#include <system_error>
#include <thread>
#endif
#include "gdbsupport/block-signals.h"
+#include "gdbsupport/thread_pool.h"
namespace gdb
{
extern int max_threads;
+extern thread_pool parallel_for_pool;
+
/* A very simple "parallel for". This splits the range of iterators
into subranges, and then passes each subrange to the callback. The
work may or may not be done in separate threads.
n_threads = max_threads;
if (n_threads > local_max)
n_threads = local_max;
- int n_actual_threads = 0;
+ if (!parallel_for_pool.started ())
+ {
+ /* Ensure that signals used by gdb are blocked in the new
+ threads. */
+ block_signals blocker;
+ parallel_for_pool.start (n_threads);
+ }
+
+ std::mutex mtx;
+ std::condition_variable cv;
+ int num_finished = 0;
- std::thread threads[local_max];
size_t n_elements = last - first;
if (n_threads > 1 && 2 * n_threads <= n_elements)
{
- /* Ensure that signals used by gdb are blocked in the new
- threads. */
- block_signals blocker;
-
size_t elts_per_thread = n_elements / n_threads;
- n_actual_threads = n_threads - 1;
- for (int i = 0; i < n_actual_threads; ++i)
+ for (int i = 0; i < n_threads; ++i)
{
RandomIt end = first + elts_per_thread;
- try
- {
- threads[i] = std::thread (callback, first, end);
- }
- catch (const std::system_error &failure)
- {
- /* If a thread failed to start, ignore it and fall back
- to processing in the main thread. */
- n_actual_threads = i;
- break;
- }
+ parallel_for_pool.post_task ([&, first, end] () {
+ callback (first, end);
+ std::unique_lock<std::mutex> lck (mtx);
+ num_finished++;
+ cv.notify_all ();
+ });
first = end;
}
}
+ else
+ n_threads = 0;
#endif /* CXX_STD_THREAD */
/* Process all the remaining elements in the main thread. */
callback (first, last);
-
-#if CXX_STD_THREAD
- for (int i = 0; i < n_actual_threads; ++i)
- threads[i].join ();
-#endif /* CXX_STD_THREAD */
+ if (n_threads)
+ {
+ for (;;) {
+ std::unique_lock<std::mutex> lck (mtx);
+ if (num_finished == n_threads)
+ break;
+ cv.wait (lck);
+ }
+ }
}
}
--- /dev/null
+#include "thread_pool.h"
+
+namespace gdb {
+
+thread_pool::~thread_pool ()
+{
+ {
+ std::lock_guard<std::mutex> guard (m_tasks_mutex);
+ m_shutdown = true;
+ m_tasks_cv.notify_all ();
+ }
+ for (auto& t : m_threads)
+ t.join();
+}
+
+void
+thread_pool::start (size_t num_threads)
+{
+ for (size_t i = 0; i < num_threads; ++i)
+ {
+ m_threads.emplace_back (&thread_pool::thread_function, this);
+ }
+ m_started = true;
+}
+
+
+void
+thread_pool::thread_function ()
+{
+ while (!m_shutdown)
+ {
+ task t;
+ {
+ std::unique_lock<std::mutex> guard (m_tasks_mutex);
+ if (m_shutdown)
+ break;
+ if (m_tasks.empty ())
+ m_tasks_cv.wait (guard);
+ if (m_shutdown)
+ break;
+ if (m_tasks.empty ())
+ continue;
+ t = m_tasks.front();
+ m_tasks.pop();
+ }
+ t ();
+ }
+}
+
+}
--- /dev/null
+#ifndef PARALLEL_FOR_H
+#define PARALLEL_FOR_H
+
+#include <queue>
+#include <thread>
+#include <vector>
+#include <functional>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+namespace gdb {
+
+class thread_pool {
+ public:
+ thread_pool() : m_started (false), m_shutdown (false) {}
+ ~thread_pool();
+
+ bool started() const { return m_started; }
+
+ void start(size_t num_threads);
+
+ typedef std::function<void ()> task;
+ void post_task(task t) {
+ std::lock_guard<std::mutex> guard (m_tasks_mutex);
+ m_tasks.push (t);
+ m_tasks_cv.notify_one ();
+ }
+
+ private:
+ void thread_function();
+
+ bool m_started;
+ std::vector<std::thread> m_threads;
+ std::atomic<bool> m_shutdown;
+ std::queue<task> m_tasks;
+ std::condition_variable m_tasks_cv;
+ std::mutex m_tasks_mutex;
+};
+
+}
+
+#endif