/* Parallel for loops

   Copyright (C) 2019-2023 Free Software Foundation, Inc.

   This file is part of GDB.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef GDBSUPPORT_PARALLEL_FOR_H
#define GDBSUPPORT_PARALLEL_FOR_H

#include <algorithm>
#include <type_traits>
#include "gdbsupport/thread-pool.h"
#include "gdbsupport/function-view.h"

namespace gdb
{

namespace detail
{

/* This is a helper class that is used to accumulate results for
   parallel_for.  There is a specialization for 'void', below.  */
template<typename T>
struct par_for_accumulator
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* The result type that is accumulated.  */
  typedef std::vector<T> result_type;

  /* Post the Ith task to a background thread, and store a future for
     later.  */
  void post (size_t i, std::function<T ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  /* Invoke TASK in the current thread, then compute all the results
     from all background tasks and put them into a result vector,
     which is returned.  */
  result_type finish (gdb::function_view<T ()> task)
  {
    result_type result (m_futures.size () + 1);

    result.back () = task ();

    for (size_t i = 0; i < m_futures.size (); ++i)
      result[i] = m_futures[i].get ();

    return result;
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  /* A vector of futures coming from the tasks run in the
     background.  */
  std::vector<gdb::future<T>> m_futures;
};

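/* The expected usage pattern, shown as an illustrative sketch (the
   values here are hypothetical): create the accumulator with the
   number of background tasks, 'post' each of those tasks, then call
   'finish' with one final task, which runs in the calling thread:

     gdb::detail::par_for_accumulator<int> accum (2);
     accum.post (0, [] () { return 1; });
     accum.post (1, [] () { return 2; });
     std::vector<int> all = accum.finish ([] () { return 3; });

   Here 'all' ends up holding {1, 2, 3} -- the result of the task
   passed to 'finish' is stored last.  */
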
/* See the generic template.  */
template<>
struct par_for_accumulator<void>
{
public:

  explicit par_for_accumulator (size_t n_threads)
    : m_futures (n_threads)
  {
  }

  /* This specialization does not compute results.  */
  typedef void result_type;

  void post (size_t i, std::function<void ()> task)
  {
    m_futures[i]
      = gdb::thread_pool::g_thread_pool->post_task (std::move (task));
  }

  result_type finish (gdb::function_view<void ()> task)
  {
    task ();

    for (auto &future : m_futures)
      {
	/* Use 'get' and not 'wait', to propagate any exception.  */
	future.get ();
      }
  }

  /* Resize the results to N.  */
  void resize (size_t n)
  {
    m_futures.resize (n);
  }

private:

  std::vector<gdb::future<void>> m_futures;
};

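/* Because 'finish' calls 'get' rather than 'wait' on each future, an
   exception thrown by a background task is re-thrown in the thread
   that calls 'finish'.  An illustrative sketch (std::runtime_error,
   from <stdexcept>, stands in for whatever the task might throw):

     gdb::detail::par_for_accumulator<void> accum (1);
     accum.post (0, [] () { throw std::runtime_error ("failed"); });
     try
       {
	 accum.finish ([] () {});
       }
     catch (const std::runtime_error &e)
       {
	 ... the background task's exception surfaces here ...
       }
*/
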
}

/* A very simple "parallel for".  This splits the range of iterators
   into subranges, and then passes each subrange to the callback.  The
   work may or may not be done in separate threads.

   This approach was chosen over having the callback work on single
   items because it makes it simple for the caller to do
   once-per-subrange initialization and destruction.

   The parameter N says how batching ought to be done -- there will be
   at least N elements processed per thread.  Setting N to 0 is not
   allowed.

   If TASK_SIZE is non-null, it is called once per element to compute
   that element's relative size, and the range is split so that each
   thread receives roughly the same total size, rather than the same
   number of elements.  Each reported size must be non-zero, and N
   must be 1 in this mode.

   If the function returns a non-void type, then a vector of the
   results is returned.  The size of the resulting vector depends on
   the number of threads that were used.  */

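/* For instance, summing a vector in parallel might look like this --
   an illustrative sketch only, not code used by GDB itself
   (std::accumulate comes from <numeric>):

     std::vector<int> data (10000, 1);
     std::vector<int> partial
       = gdb::parallel_for_each (1, data.begin (), data.end (),
				 [] (std::vector<int>::iterator start,
				     std::vector<int>::iterator end)
				 { return std::accumulate (start, end, 0); });
     int total = std::accumulate (partial.begin (), partial.end (), 0);

   Each task computes one partial sum, and the result vector has one
   entry per subrange, so the partial sums still have to be combined
   at the end.  */
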
template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
parallel_for_each (unsigned n, RandomIt first, RandomIt last,
		   RangeFunction callback,
		   gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type
    = typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  /* If enabled, print debug info about how the work is distributed across
     the threads.  */
  const bool parallel_for_each_debug = false;

  size_t n_worker_threads = thread_pool::g_thread_pool->thread_count ();
  size_t n_threads = n_worker_threads;
  size_t n_elements = last - first;
  size_t elts_per_thread = 0;
  size_t elts_left_over = 0;
  size_t total_size = 0;
  size_t size_per_thread = 0;
  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;

  if (n_threads > 1)
    {
      if (task_size != nullptr)
	{
	  gdb_assert (n == 1);
	  for (RandomIt i = first; i != last; ++i)
	    {
	      size_t element_size = task_size (i);
	      gdb_assert (element_size > 0);
	      if (element_size > max_element_size)
		/* We could start scaling here, but that doesn't seem to be
		   worth the effort.  */
		element_size = max_element_size;
	      size_t prev_total_size = total_size;
	      total_size += element_size;
	      /* Check for overflow.  */
	      gdb_assert (prev_total_size < total_size);
	    }
	  size_per_thread = total_size / n_threads;
	}
      else
	{
	  /* Require that there should be at least N elements in a
	     thread.  */
	  gdb_assert (n > 0);
	  if (n_elements / n_threads < n)
	    n_threads = std::max (n_elements / n, (size_t) 1);
	  elts_per_thread = n_elements / n_threads;
	  elts_left_over = n_elements % n_threads;
	  /* n_elements == n_threads * elts_per_thread + elts_left_over.  */
	}
    }

  size_t count = n_threads == 0 ? 0 : n_threads - 1;
  gdb::detail::par_for_accumulator<result_type> results (count);

  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
      if (task_size != nullptr)
	{
	  debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
	  debug_printf (_("Parallel for: size_per_thread: %zu\n"), size_per_thread);
	}
      else
	{
	  debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
	  debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
	}
    }

  size_t remaining_size = total_size;
  for (int i = 0; i < count; ++i)
    {
      RandomIt end;
      size_t chunk_size = 0;
      if (task_size == nullptr)
	{
	  end = first + elts_per_thread;
	  if (i < elts_left_over)
	    /* Distribute the leftovers over the worker threads, to avoid having
	       to handle all of them in a single thread.  */
	    end++;
	}
      else
	{
	  RandomIt j;
	  for (j = first; j < last && chunk_size < size_per_thread; ++j)
	    {
	      size_t element_size = task_size (j);
	      if (element_size > max_element_size)
		element_size = max_element_size;
	      chunk_size += element_size;
	    }
	  end = j;
	  remaining_size -= chunk_size;
	}

      /* This case means we don't have enough elements to really
	 distribute them.  Rather than ever submit a task that does
	 nothing, we short-circuit here.  */
      if (first == end)
	end = last;

      if (end == last)
	{
	  /* We're about to dispatch the last batch of elements, which
	     we normally process in the main thread.  So just truncate
	     the result list here.  This avoids submitting empty tasks
	     to the thread pool.  */
	  count = i;
	  results.resize (count);
	  break;
	}

      if (parallel_for_each_debug)
	{
	  debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
			i, (size_t)(end - first));
	  if (task_size != nullptr)
	    debug_printf (_("\t(size: %zu)"), chunk_size);
	  debug_printf (_("\n"));
	}
      results.post (i, [=] ()
	{ return callback (first, end); });
      first = end;
    }

  for (int i = count; i < n_worker_threads; ++i)
    if (parallel_for_each_debug)
      {
	debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
	if (task_size != nullptr)
	  debug_printf (_("\t(size: 0)"));
	debug_printf (_("\n"));
      }

  /* Process all the remaining elements in the main thread.  */
  if (parallel_for_each_debug)
    {
      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
		    (size_t)(last - first));
      if (task_size != nullptr)
	debug_printf (_("\t(size: %zu)"), remaining_size);
      debug_printf (_("\n"));
    }
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}

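/* An illustrative sketch of the TASK_SIZE parameter ('work_item' and
   'process_range' are hypothetical, standing in for a real element
   type and callback): when elements have very different costs, a size
   callback lets the split follow total cost rather than element
   count.  Note that N must be 1 and every reported size must be
   non-zero when a size callback is supplied:

     struct work_item { size_t weight; };
     std::vector<work_item> items = ...;
     gdb::parallel_for_each (1, items.begin (), items.end (),
			     process_range,
			     [] (std::vector<work_item>::iterator it)
			     { return it->weight; });
*/
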
/* A sequential drop-in replacement for parallel_for_each.  This can
   be useful when debugging multi-threading behaviour, and you want to
   limit multi-threading in a fine-grained way.  The N and TASK_SIZE
   parameters are accepted for signature compatibility, but are
   ignored.  */

template<class RandomIt, class RangeFunction>
typename gdb::detail::par_for_accumulator<
    typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type
  >::result_type
sequential_for_each (unsigned n, RandomIt first, RandomIt last,
		     RangeFunction callback,
		     gdb::function_view<size_t(RandomIt)> task_size = nullptr)
{
  using result_type
    = typename std::invoke_result<RangeFunction, RandomIt, RandomIt>::type;

  gdb::detail::par_for_accumulator<result_type> results (0);

  /* Process all the elements in the main thread.  */
  return results.finish ([=] ()
    {
      return callback (first, last);
    });
}

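/* For example, to serialize one call site while leaving the rest of
   GDB multi-threaded, the call can be switched over by hand (an
   illustrative sketch; 'first', 'last' and 'callback' stand for the
   arguments already in use at that call site):

     // gdb::parallel_for_each (1, first, last, callback);
     gdb::sequential_for_each (1, first, last, callback);

   The signatures match, so no other changes are needed.  */
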
}

#endif /* GDBSUPPORT_PARALLEL_FOR_H */