const std::vector<do_foreach_t> for_each_functions
{
+ /* Test gdb::parallel_for_each. */
[] (int *start, int *end, foreach_callback_t callback)
- { gdb::parallel_for_each<1, int *, test_worker> (start, end, callback,
- 0); },
+ { gdb::parallel_for_each<1, int *, test_worker> (start, end, callback, 0); },
+
+ /* Test gdb::parallel_for_each_async. */
+ [] (int *start, int *end, foreach_callback_t callback)
+ {
+ bool done_flag = false;
+ std::condition_variable cv;
+ std::mutex mtx;
+
+ gdb::parallel_for_each_async<1, int *, test_worker> (start, end,
+ [&mtx, &done_flag, &cv] ()
+ {
+ std::lock_guard<std::mutex> lock (mtx);
+ done_flag = true;
+ cv.notify_one();
+ }, callback, 0);
+
+ /* Wait for the async parallel-for to complete. */
+ std::unique_lock<std::mutex> lock (mtx);
+ cv.wait (lock, [&done_flag] () { return done_flag; });
+ },
+
+ /* Test gdb::sequential_for_each. */
[] (int *start, int *end, foreach_callback_t callback)
- { gdb::sequential_for_each<int *, test_worker> (start, end, callback,
- 0); },
+ { gdb::sequential_for_each<int *, test_worker> (start, end, callback, 0); },
};
int default_thread_count = gdb::thread_pool::g_thread_pool->thread_count ();
namespace gdb
{
+/* If enabled, print debug info about the inner workings of the parallel for
+ each functions. */
+constexpr bool parallel_for_each_debug = false;
+
/* A "parallel-for" implementation using a shared work queue. Work items get
popped in batches of size up to BATCH_SIZE from the queue and handed out to
worker threads.
thread state.
Worker threads call Worker::operator() repeatedly until the queue is
- empty. */
+ empty.
+
+ This function is synchronous, meaning that it blocks and returns once the
+ processing is complete. */
template<std::size_t batch_size, class RandomIt, class Worker,
class... WorkerArgs>
parallel_for_each (const RandomIt first, const RandomIt last,
WorkerArgs &&...worker_args)
{
- /* If enabled, print debug info about how the work is distributed across
- the threads. */
- const bool parallel_for_each_debug = false;
-
gdb_assert (first <= last);
if (parallel_for_each_debug)
Worker (std::forward<WorkerArgs> (worker_args)...) ({ first, last });
}
+namespace detail
+{
+
+/* Type to hold the state shared between threads of
+ gdb::parallel_for_each_async. */
+
+template<std::size_t min_batch_size, typename RandomIt, typename... WorkerArgs>
+struct pfea_state
+{
+ pfea_state (RandomIt first, RandomIt last, std::function<void ()> &&done,
+ WorkerArgs &&...worker_args)
+ : first (first),
+ last (last),
+ worker_args_tuple (std::forward_as_tuple
+ (std::forward<WorkerArgs> (worker_args)...)),
+ queue (first, last),
+ m_done (std::move (done))
+ {}
+
+ DISABLE_COPY_AND_ASSIGN (pfea_state);
+
+ /* This gets called by the last worker thread that drops its reference on
+ the shared state, thus when the processing is complete. */
+ ~pfea_state ()
+ {
+ if (m_done)
+ m_done ();
+ }
+
+ /* The interval to process. */
+ const RandomIt first, last;
+
+ /* Tuple of arguments to pass when constructing the user's worker object.
+
+ Use std::decay_t to avoid storing references to the caller's local
+ variables. If we didn't use it and the caller passed an lvalue `foo *`,
+ we would store it as a reference to `foo *`, thus storing a reference to
+ the caller's local variable.
+
+ The downside is that it's not possible to pass arguments by reference,
+ callers need to pass pointers or std::reference_wrappers. */
+ std::tuple<std::decay_t<WorkerArgs>...> worker_args_tuple;
+
+ /* Work queue that worker threads pull work items from. */
+ work_queue<RandomIt, min_batch_size> queue;
+
+private:
+ /* Callable called when the parallel-for is done. */
+ std::function<void ()> m_done;
+};
+
+} /* namespace detail */
+
+/* A "parallel-for" implementation using a shared work queue. Work items get
+ popped in batches from the queue and handed out to worker threads.
+
+ Batch sizes are proportional to the number of remaining items in the queue,
+ but always greater or equal to MIN_BATCH_SIZE.
+
+ The DONE callback is invoked when processing is done.
+
+ Each worker thread instantiates an object of type Worker, forwarding ARGS to
+ its constructor. The Worker object can be used to keep some per-worker
+ thread state. This version does not support passing references as arguments
+ to the worker. Use std::reference_wrapper or pointers instead.
+
+ Worker threads call Worker::operator() repeatedly until the queue is
+ empty.
+
+ This function is asynchronous. An arbitrary worker thread will call the DONE
+ callback when processing is done. */
+
+template<std::size_t min_batch_size, class RandomIt, class Worker,
+ class... WorkerArgs>
+void
+parallel_for_each_async (const RandomIt first, const RandomIt last,
+ std::function<void ()> &&done,
+ WorkerArgs &&...worker_args)
+{
+ gdb_assert (first <= last);
+
+ if (parallel_for_each_debug)
+ {
+ debug_printf ("Parallel for: n elements: %zu\n",
+ static_cast<std::size_t> (last - first));
+ debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size);
+ }
+
+ const size_t n_worker_threads
+ = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
+
+ /* The state shared between all worker threads. All worker threads get a
+ reference on the shared pointer through the lambda below. The last worker
+ thread to drop its reference will cause this object to be destroyed, which
+ will call the DONE callback. */
+ using state_t = detail::pfea_state<min_batch_size, RandomIt, WorkerArgs...>;
+ auto state
+ = std::make_shared<state_t> (first, last, std::move (done),
+ std::forward<WorkerArgs> (worker_args)...);
+
+ /* The worker thread task. */
+ auto task = [state] ()
+ {
+ /* Instantiate the user-defined worker. */
+ auto worker = std::make_from_tuple<Worker> (state->worker_args_tuple);
+
+ for (;;)
+ {
+ const auto batch = state->queue.pop_batch ();
+
+ if (batch.empty ())
+ break;
+
+ if (parallel_for_each_debug)
+ debug_printf ("Processing %zu items, range [%zu, %zu[\n",
+ batch.size (),
+ batch.begin () - state->first,
+ batch.end () - state->first);
+
+ worker (batch);
+ }
+ };
+
+ /* Start N_WORKER_THREADS tasks. */
+ for (int i = 0; i < n_worker_threads; ++i)
+ gdb::thread_pool::g_thread_pool->post_task (task);
+}
+
}
#endif /* GDBSUPPORT_PARALLEL_FOR_H */