return (queue_.count());
}
+ /// @brief wait for current items to be processed
+ ///
+ /// Used to block the calling thread until all items in the queue have
+ /// been processed
+ void wait() {
+ auto id = std::this_thread::get_id();
+ if (checkThreadId(id)) {
+ isc_throw(InvalidOperation, "thread pool stop called by owned thread");
+ }
+ queue_.wait();
+ }
+
/// @brief set maximum number of work items in the queue
///
/// @param max_queue_size the maximum size (0 means unlimited)
/// @param thread_count specifies the number of threads to be created and
/// started
void startInternal(uint32_t thread_count) {
- queue_.enable();
+ queue_.enable(thread_count);
for (uint32_t i = 0; i < thread_count; ++i) {
threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
}
///
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue()
- : enabled_(false), max_queue_size_(0),
+ : enabled_(false), max_queue_size_(0), working_(0),
stat10(0.), stat100(0.), stat1000(0.) {
}
/// @return the first work item from the queue or an empty element.
Item pop() {
std::unique_lock<std::mutex> lock(mutex_);
+ --working_;
// Wait for push or disable functions.
+ if (working_ == 0 && queue_.empty()) {
+ wait_cv_.notify_all();
+ }
cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
if (!enabled_) {
return (Item());
}
+ ++working_;
size_t length = queue_.size();
stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
return (queue_.size());
}
- /// @brief get queue length statistic
+ /// @brief wait for current items to be processed
+ ///
+ /// Used to block the calling thread until all items in the queue have
+ /// been processed
+ void wait() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ // Wait for any item or for working threads to finish.
+ wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
+ }
+
+ /// @brief get queue length statistic
///
/// @param which select the statistic (10, 100 or 1000)
/// @return the queue length statistic
/// @brief enable the queue
///
/// Sets the queue state to 'enabled'
- void enable() {
+ ///
+ /// @param number of working threads
+ void enable(uint32_t thread_count) {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = true;
+ working_ = thread_count;
}
/// @brief disable the queue
/// @brief condition variable used to signal waiting threads
std::condition_variable cv_;
+ /// @brief condition variable used to wait for all items to be processed
+ std::condition_variable wait_cv_;
+
/// @brief the sate of the queue
/// The 'enabled' state corresponds to true value
/// The 'disabled' state corresponds to false value
/// (0 means unlimited)
size_t max_queue_size_;
+ /// @brief number of threads currently doing work
+ uint32_t working_;
+
/// @brief queue length statistic for 10 packets
double stat10;