#define _GLIBCXX_JOB_VOLATILE volatile
- /** @brief One job for a certain thread. */
- template<typename _DifferenceTp>
+/** @brief One job for a certain thread. */
+template<typename _DifferenceTp>
struct Job
{
// Signed index/count type for the job-range bookkeeping below.
typedef _DifferenceTp difference_type;
// Number of elements still unprocessed in this thread's job range,
// kept consistent with the range bounds by the work-stealing loop
// (load = last - first + 1).  Declared _GLIBCXX_JOB_VOLATILE
// (volatile, see the #define above) because other threads poll it
// when looking for a victim to steal from.
// NOTE(review): volatile alone is not a synchronization primitive;
// the algorithm presumably relies on its fetch_and_add / OpenMP
// atomic+flush operations for actual ordering — confirm.  The
// surrounding code also reads my_job.first / my_job.last, so this
// struct likely declares those members too and this diff view has
// elided them — verify against the full file.
_GLIBCXX_JOB_VOLATILE difference_type load;
};
- /** @brief Work stealing algorithm for random access iterators.
- *
- * Uses O(1) additional memory. Synchronization at job lists is
- * done with atomic operations.
- * @param begin Begin iterator of element sequence.
- * @param end End iterator of element sequence.
- * @param op User-supplied functor (comparator, predicate, adding
- * functor, ...).
- * @param f Functor to "process" an element with op (depends on
- * desired functionality, e. g. for std::for_each(), ...).
- * @param r Functor to "add" a single result to the already
- * processed elements (depends on functionality).
- * @param base Base value for reduction.
- * @param output Pointer to position where final result is written to
- * @param bound Maximum number of elements processed (e. g. for
- * std::count_n()).
- * @return User-supplied functor (that may contain a part of the result).
- */
- template<typename RandomAccessIterator, typename Op, typename Fu, typename Red, typename Result>
+/** @brief Work stealing algorithm for random access iterators.
+ *
+ * Uses O(1) additional memory. Synchronization at job lists is
+ * done with atomic operations.
+ * @param begin Begin iterator of element sequence.
+ * @param end End iterator of element sequence.
+ * @param op User-supplied functor (comparator, predicate, adding
+ * functor, ...).
+ * @param f Functor to "process" an element with op (depends on
+ * desired functionality, e. g. for std::for_each(), ...).
+ * @param r Functor to "add" a single result to the already
+ * processed elements (depends on functionality).
+ * @param base Base value for reduction.
+ * @param output Pointer to position where final result is written to
+ * @param bound Maximum number of elements processed (e. g. for
+ * std::count_n()).
+ * @return User-supplied functor (that may contain a part of the result).
+ */
+template<
+ typename RandomAccessIterator,
+ typename Op,
+ typename Fu,
+ typename Red,
+ typename Result>
Op
- for_each_template_random_access_workstealing(RandomAccessIterator begin,
- RandomAccessIterator end,
- Op op, Fu& f, Red r,
- Result base, Result& output,
- typename std::iterator_traits<RandomAccessIterator>::difference_type bound)
+ for_each_template_random_access_workstealing(
+ RandomAccessIterator begin,
+ RandomAccessIterator end,
+ Op op, Fu& f, Red r,
+ Result base, Result& output,
+ typename std::iterator_traits<RandomAccessIterator>::difference_type
+ bound)
{
_GLIBCXX_CALL(end - begin)
typedef typename traits_type::difference_type difference_type;
- difference_type chunk_size = static_cast<difference_type>(Settings::workstealing_chunk_size);
+ difference_type chunk_size =
+ static_cast<difference_type>(Settings::workstealing_chunk_size);
// How many jobs?
difference_type length = (bound < 0) ? (end - begin) : bound;
// To avoid false sharing in a cache line.
- const int stride = Settings::cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
+ const int stride =
+ Settings::cache_line_size * 10 / sizeof(Job<difference_type>) + 1;
// Total number of threads currently working.
thread_index_t busy = 0;
- thread_index_t num_threads = get_max_threads();
- difference_type num_threads_min = num_threads < end - begin ? num_threads : end - begin;
+
+ Job<difference_type> *job;
omp_lock_t output_lock;
omp_init_lock(&output_lock);
- // No more threads than jobs, at least one thread.
- difference_type num_threads_max = num_threads_min > 1 ? num_threads_min : 1;
- num_threads = static_cast<thread_index_t>(num_threads_max);
-
- // Create job description array.
- Job<difference_type> *job = new Job<difference_type>[num_threads * stride];
-
// Write base value to output.
output = base;
-#pragma omp parallel shared(busy) num_threads(num_threads)
- {
- // Initialization phase.
-
- // Flags for every thread if it is doing productive work.
- bool iam_working = false;
-
- // Thread id.
- thread_index_t iam = omp_get_thread_num();
-
- // This job.
- Job<difference_type>& my_job = job[iam * stride];
-
- // Random number (for work stealing).
- thread_index_t victim;
-
- // Local value for reduction.
- Result result = Result();
-
- // Number of elements to steal in one attempt.
- difference_type steal;
-
- // Every thread has its own random number generator (modulo num_threads).
- random_number rand_gen(iam, num_threads);
-
-#pragma omp atomic
- // This thread is currently working.
- busy++;
-
- iam_working = true;
-
- // How many jobs per thread? last thread gets the rest.
- my_job.first = static_cast<difference_type>(iam * (length / num_threads));
-
- my_job.last = (iam == (num_threads - 1)) ? (length - 1) : ((iam + 1) * (length / num_threads) - 1);
- my_job.load = my_job.last - my_job.first + 1;
-
- // Init result with first value (to have a base value for reduction).
- if (my_job.first <= my_job.last)
- {
- // Cannot use volatile variable directly.
- difference_type my_first = my_job.first;
- result = f(op, begin + my_first);
- my_job.first++;
- my_job.load--;
- }
-
- RandomAccessIterator current;
-
-#pragma omp barrier
-
- // Actual work phase
- // Work on own or stolen start
- while (busy > 0)
- {
- // Work until no productive thread left.
-#pragma omp flush(busy)
-
- // Thread has own work to do
- while (my_job.first <= my_job.last)
- {
- // fetch-and-add call
- // Reserve current job block (size chunk_size) in my queue.
- difference_type current_job = fetch_and_add<difference_type>(&(my_job.first), chunk_size);
-
- // Update load, to make the three values consistent,
- // first might have been changed in the meantime
- my_job.load = my_job.last - my_job.first + 1;
- for (difference_type job_counter = 0; job_counter < chunk_size && current_job <= my_job.last; job_counter++)
- {
- // Yes: process it!
- current = begin + current_job;
- current_job++;
-
- // Do actual work.
- result = r(result, f(op, current));
- }
-
-#pragma omp flush(busy)
-
- }
-
- // After reaching this point, a thread's job list is empty.
- if (iam_working)
- {
-#pragma omp atomic
- // This thread no longer has work.
- busy--;
-
- iam_working = false;
- }
-
- difference_type supposed_first, supposed_last, supposed_load;
- do
- {
- // Find random nonempty deque (not own) and do consistency check.
- yield();
-#pragma omp flush(busy)
- victim = rand_gen();
- supposed_first = job[victim * stride].first;
- supposed_last = job[victim * stride].last;
- supposed_load = job[victim * stride].load;
- }
- while (busy > 0
- && ((supposed_load <= 0) || ((supposed_first + supposed_load - 1) != supposed_last)));
-
- if (busy == 0)
- break;
-
- if (supposed_load > 0)
- {
- // Has work and work to do.
- // Number of elements to steal (at least one).
- steal = (supposed_load < 2) ? 1 : supposed_load / 2;
-
- // Protects against stealing threads
- // omp_set_lock(&(job[victim * stride].lock));
-
- // Push victim's start forward.
- difference_type stolen_first = fetch_and_add<difference_type>(&(job[victim * stride].first), steal);
- difference_type stolen_try = stolen_first + steal - difference_type(1);
-
- // Protects against working thread
- // omp_unset_lock(&(job[victim * stride].lock));
-
- my_job.first = stolen_first;
-
- // Avoid std::min dependencies.
- my_job.last = stolen_try < supposed_last ? stolen_try : supposed_last;
-
- my_job.load = my_job.last - my_job.first + 1;
-
- //omp_unset_lock(&(my_job.lock));
-
-#pragma omp atomic
- // Has potential work again.
- busy++;
- iam_working = true;
-
-#pragma omp flush(busy)
- }
-#pragma omp flush(busy)
- } // end while busy > 0
- // Add accumulated result to output.
- omp_set_lock(&output_lock);
- output = r(output, result);
- omp_unset_lock(&output_lock);
-
- //omp_destroy_lock(&(my_job.lock));
- }
+ // No more threads than jobs, at least one thread.
+ thread_index_t num_threads =
+ __gnu_parallel::max<thread_index_t>(1,
+ __gnu_parallel::min<difference_type>(length, get_max_threads()));
+
+# pragma omp parallel shared(busy) num_threads(num_threads)
+ {
+
+# pragma omp single
+ {
+ num_threads = omp_get_num_threads();
+
+ // Create job description array.
+ job = new Job<difference_type>[num_threads * stride];
+ }
+
+ // Initialization phase.
+
+ // Flags for every thread if it is doing productive work.
+ bool iam_working = false;
+
+ // Thread id.
+ thread_index_t iam = omp_get_thread_num();
+
+ // This job.
+ Job<difference_type>& my_job = job[iam * stride];
+
+ // Random number (for work stealing).
+ thread_index_t victim;
+
+ // Local value for reduction.
+ Result result = Result();
+
+ // Number of elements to steal in one attempt.
+ difference_type steal;
+
+ // Every thread has its own random number generator
+ // (modulo num_threads).
+ random_number rand_gen(iam, num_threads);
+
+ // This thread is currently working.
+# pragma omp atomic
+ busy++;
+
+ iam_working = true;
+
+ // How many jobs per thread? last thread gets the rest.
+ my_job.first =
+ static_cast<difference_type>(iam * (length / num_threads));
+
+ my_job.last = (iam == (num_threads - 1)) ?
+ (length - 1) : ((iam + 1) * (length / num_threads) - 1);
+ my_job.load = my_job.last - my_job.first + 1;
+
+ // Init result with first value (to have a base value for reduction).
+ if (my_job.first <= my_job.last)
+ {
+ // Cannot use volatile variable directly.
+ difference_type my_first = my_job.first;
+ result = f(op, begin + my_first);
+ my_job.first++;
+ my_job.load--;
+ }
+
+ RandomAccessIterator current;
+
+# pragma omp barrier
+
+ // Actual work phase
+ // Work on own or stolen start
+ while (busy > 0)
+ {
+ // Work until no productive thread left.
+# pragma omp flush(busy)
+
+ // Thread has own work to do
+ while (my_job.first <= my_job.last)
+ {
+ // fetch-and-add call
+ // Reserve current job block (size chunk_size) in my queue.
+ difference_type current_job =
+ fetch_and_add<difference_type>(&(my_job.first), chunk_size);
+
+ // Update load, to make the three values consistent,
+ // first might have been changed in the meantime
+ my_job.load = my_job.last - my_job.first + 1;
+ for (difference_type job_counter = 0;
+ job_counter < chunk_size && current_job <= my_job.last;
+ job_counter++)
+ {
+ // Yes: process it!
+ current = begin + current_job;
+ current_job++;
+
+ // Do actual work.
+ result = r(result, f(op, current));
+ }
+
+# pragma omp flush(busy)
+ }
+
+ // After reaching this point, a thread's job list is empty.
+ if (iam_working)
+ {
+ // This thread no longer has work.
+# pragma omp atomic
+ busy--;
+
+ iam_working = false;
+ }
+
+ difference_type supposed_first, supposed_last, supposed_load;
+ do
+ {
+ // Find random nonempty deque (not own), do consistency check.
+ yield();
+# pragma omp flush(busy)
+ victim = rand_gen();
+ supposed_first = job[victim * stride].first;
+ supposed_last = job[victim * stride].last;
+ supposed_load = job[victim * stride].load;
+ }
+ while (busy > 0
+ && ((supposed_load <= 0)
+ || ((supposed_first + supposed_load - 1) != supposed_last)));
+
+ if (busy == 0)
+ break;
+
+ if (supposed_load > 0)
+ {
+ // Has work and work to do.
+ // Number of elements to steal (at least one).
+ steal = (supposed_load < 2) ? 1 : supposed_load / 2;
+
+ // Push victim's start forward.
+ difference_type stolen_first =
+ fetch_and_add<difference_type>(
+ &(job[victim * stride].first), steal);
+ difference_type stolen_try =
+ stolen_first + steal - difference_type(1);
+
+ my_job.first = stolen_first;
+ my_job.last = __gnu_parallel::min(stolen_try, supposed_last);
+ my_job.load = my_job.last - my_job.first + 1;
+
+ // Has potential work again.
+# pragma omp atomic
+ busy++;
+ iam_working = true;
+
+# pragma omp flush(busy)
+ }
+# pragma omp flush(busy)
+ } // end while busy > 0
+ // Add accumulated result to output.
+ omp_set_lock(&output_lock);
+ output = r(output, result);
+ omp_unset_lock(&output_lock);
+ }
delete[] job;