/// @param enabled Flag which indicates if thread pool is started and running.
/// @param paused Flag which indicates if thread pool is started and paused.
void checkState(bool mode, size_t size, size_t count, size_t running,
- bool in_cs = false, bool enabled = true, bool paused = false) {
+ bool in_cs = false, bool enabled = false, bool paused = false) {
EXPECT_EQ(MultiThreadingMgr::instance().getMode(), mode);
EXPECT_EQ(MultiThreadingMgr::instance().getThreadPoolSize(), size);
EXPECT_EQ(MultiThreadingMgr::instance().getPacketQueueSize(), count);
checkState(false, 0, 0, 0);
// enable MT with 16 threads and queue size 256
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 16, 256));
- checkState(true, 16, 256, 16);
+ checkState(true, 16, 256, 16, false, true);
// disable MT
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(false, 16, 256));
checkState(false, 0, 0, 0);
// enable MT with auto scaling
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 0, 0));
- checkState(true, MultiThreadingMgr::detectThreadCount(), 0, MultiThreadingMgr::detectThreadCount());
+ checkState(true, MultiThreadingMgr::detectThreadCount(), 0, MultiThreadingMgr::detectThreadCount(), false, true);
// disable MT
EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(false, 0, 0));
checkState(false, 0, 0, 0);
checkState(false, 0, 0, 0);
// apply multi-threading configuration with 16 threads and queue size 256
MultiThreadingMgr::instance().apply(true, 16, 256);
- checkState(true, 16, 256, 16);
+ checkState(true, 16, 256, 16, false, true);
// use scope to test constructor and destructor
{
MultiThreadingCriticalSection cs;
- checkState(true, 16, 256, 0, true);
+ checkState(true, 16, 256, 16, true, true, true);
// use scope to test constructor and destructor
{
MultiThreadingCriticalSection inner_cs;
- checkState(true, 16, 256, 0, true);
+ checkState(true, 16, 256, 16, true, true, true);
}
- checkState(true, 16, 256, 0, true);
+ checkState(true, 16, 256, 16, true, true, true);
}
- checkState(true, 16, 256, 16);
+ checkState(true, 16, 256, 16, false, true);
// use scope to test constructor and destructor
{
MultiThreadingCriticalSection cs;
- checkState(true, 16, 256, 0, true);
+ checkState(true, 16, 256, 16, true, true, true);
// apply multi-threading configuration with 64 threads and queue size 4
MultiThreadingMgr::instance().apply(true, 64, 4);
checkState(true, 64, 4, 0, true);
}
- checkState(true, 64, 4, 64);
+ checkState(true, 64, 4, 64, false, true);
// use scope to test constructor and destructor
{
MultiThreadingCriticalSection cs;
- checkState(true, 64, 4, 0, true);
+ checkState(true, 64, 4, 64, true, true, true);
// apply multi-threading configuration with 0 threads
MultiThreadingMgr::instance().apply(false, 64, 256);
checkState(false, 0, 0, 0, true);
MultiThreadingMgr::instance().apply(true, 64, 256);
checkState(true, 64, 256, 0, true);
}
- checkState(true, 64, 256, 64);
+ checkState(true, 64, 256, 64, false, true);
// apply multi-threading configuration with 0 threads
MultiThreadingMgr::instance().apply(false, 0, 0);
checkState(false, 0, 0, 0);
sigaddset(&sset, SIGHUP);
sigaddset(&sset, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sset, &osset);
- queue_.enable();
+ queue_.enable(thread_count);
try {
for (uint32_t i = 0; i < thread_count; ++i) {
threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
clear();
}
- /// @brief register thread so that it can be taken into account
- void registerThread() {
- std::lock_guard<std::mutex> lock(mutex_);
- ++working_;
- }
-
- /// @brief unregister thread so that it can be ignored
- void unregisterThread() {
- std::lock_guard<std::mutex> lock(mutex_);
- --working_;
- }
-
/// @brief set maximum number of work items in the queue
///
/// @return the maximum size (0 means unlimited)
}
// Wait for push or disable functions.
cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
- ++working_;
if (!enabled_) {
return (Item());
}
+ ++working_;
size_t length = queue_.size();
stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
/// @brief enable the queue
///
/// Sets the queue state to 'enabled'
- void enable() {
+ ///
+ /// @param thread_count number of working threads
+ void enable(uint32_t thread_count) {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = true;
+ working_ = thread_count;
}
/// @brief disable the queue
/// @brief run function of each thread
void run() {
- bool work = queue_.enabled();
- if (!work) {
- return;
- }
- queue_.registerThread();
- for (; work; work = queue_.enabled()) {
+ for (bool work = true; work; work = queue_.enabled()) {
WorkItemPtr item = queue_.pop();
if (item) {
try {
}
}
}
- queue_.unregisterThread();
}
/// @brief list of worker threads