checkState(thread_pool, 0, 0);
items_count = 16;
- thread_count = 16;
+ thread_count = 256;
// prepare setup
reset(thread_count);
// calling stop should clear all threads and should keep queued items
EXPECT_NO_THROW(thread_pool.stop());
- // the thread count should be 0
- ASSERT_EQ(thread_pool.size(), 0);
+ checkState(thread_pool, 0, 0);
+
// wait for all items to be processed
ASSERT_TRUE(thread_pool.wait(1));
checkState(thread_pool, 0, 0);
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue()
: enabled_(false), paused_(false), max_queue_size_(0), working_(0),
- stat10(0.), stat100(0.), stat1000(0.) {
+ unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
}
/// @brief Destructor
clear();
}
+ /// @brief register thread so that it can be taken into account
+ void registerThread() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ ++working_;
+ --unavailable_;
+ }
+
+ /// @brief unregister thread so that it can be ignored
+ void unregisterThread() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ --working_;
+ ++unavailable_;
+ }
+
/// @brief set maximum number of work items in the queue
///
/// @return the maximum size (0 means unlimited)
std::unique_lock<std::mutex> lock(mutex_);
--working_;
// Signal thread waiting for threads to pause.
- if (working_ == 0 && paused_) {
+ if (paused_ && working_ == 0 && unavailable_ == 0) {
wait_threads_cv_.notify_all();
}
// Signal thread waiting for tasks to finish.
}
// Wait for push or disable functions.
cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
+ ++working_;
if (!enabled_) {
return (Item());
}
- ++working_;
size_t length = queue_.size();
stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
paused_ = true;
if (wait) {
// Wait for working threads to finish.
- wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);});
+ wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);});
}
}
void enable(uint32_t thread_count) {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = true;
- working_ = thread_count;
+ unavailable_ = thread_count;
}
/// @brief disable the queue
/// @brief number of threads currently doing work
uint32_t working_;
+ /// @brief number of threads not running
+ uint32_t unavailable_;
+
/// @brief queue length statistic for 10 packets
double stat10;
/// @brief run function of each thread
void run() {
+ queue_.registerThread();
for (bool work = true; work; work = queue_.enabled()) {
WorkItemPtr item = queue_.pop();
if (item) {
}
}
}
+ queue_.unregisterThread();
}
/// @brief list of worker threads