ASSERT_EQ(thread_pool.count(), 0);
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
+
+ /// statistics
+ std::cout << "stat10: " << thread_pool.getQueueStat(10) << std::endl;
+ std::cout << "stat100: " << thread_pool.getQueueStat(100) << std::endl;
+ std::cout << "stat1000: " << thread_pool.getQueueStat(1000) << std::endl;
}
/// @brief test ThreadPool max queue size
EXPECT_EQ(thread_pool.count(), items_count);
}
+/// @brief test ThreadPool get queue statistics.
+TEST_F(ThreadPoolTest, getQueueStat) {
+ ThreadPool<CallBack> thread_pool;
+ EXPECT_THROW(thread_pool.getQueueStat(0), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(1), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(-10), InvalidParameter);
+ EXPECT_THROW(thread_pool.getQueueStat(10000), InvalidParameter);
+ EXPECT_NO_THROW(thread_pool.getQueueStat(10));
+ EXPECT_NO_THROW(thread_pool.getQueueStat(100));
+ EXPECT_NO_THROW(thread_pool.getQueueStat(1000));
+}
+
} // namespace
#include <boost/shared_ptr.hpp>
#include <atomic>
+#include <cmath>
#include <condition_variable>
#include <list>
#include <mutex>
/// @tparam Container a 'queue like' container
template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
struct ThreadPool {
+ /// @brief Rounding value for 10 packet statistic.
+ static const double CEXP10;
+
+ /// @brief Rounding value for 100 packet statistic.
+ static const double CEXP100;
+
+ /// @brief Rounding value for 1000 packet statistic.
+ static const double CEXP1000;
+
+ /// @brief Type of shared pointers to work items.
typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
/// @brief Constructor
return (threads_.size());
}
+ /// @brief get queue length statistic
+ ///
+ /// @param which select the statistic (10, 100 or 1000)
+ /// @return the queue length statistic
+ /// @throw InvalidParameter if which is not 10 and 100 and 1000.
+ double getQueueStat(size_t which) {
+ return (queue_.getQueueStat(which));
+ }
+
private:
/// @brief start all the threads
///
/// @brief Constructor
///
/// Creates the thread pool queue in 'disabled' state
- ThreadPoolQueue() : enabled_(false), max_queue_size_(0) {
+ ThreadPoolQueue()
+ : enabled_(false), max_queue_size_(0),
+ stat10(0.), stat100(0.), stat1000(0.) {
}
/// @brief Destructor
/// If the queue is 'enabled', this function returns the first element in
/// the queue or blocks the calling thread if there are no work items
/// available.
+ /// Before a work item is returned statistics are updated.
///
/// @return the first work item from the queue or an empty element.
Item pop() {
if (!enabled_) {
return (Item());
}
+ size_t length = queue_.size();
+ stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
+ stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
+ stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
Item item = queue_.front();
queue_.pop_front();
return (item);
return (queue_.size());
}
+ /// @brief get queue length statistic
+ ///
+ /// @param which select the statistic (10, 100 or 1000)
+ /// @return the queue length statistic
+ /// @throw InvalidParameter if which is not 10 and 100 and 1000.
+ double getQueueStat(size_t which) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ switch (which) {
+ case 10:
+ return (stat10);
+ case 100:
+ return (stat100);
+ case 1000:
+ return (stat1000);
+ default:
+ isc_throw(InvalidParameter, "supported statistic for "
+ << "10/100/1000 only, not " << which);
+ }
+ }
+
/// @brief clear remove all work items
///
/// Removes all queued work items
/// @brief maximum number of work items in the queue
/// (0 means unlimited)
size_t max_queue_size_;
+
+ /// @brief queue length statistic for 10 packets
+ double stat10;
+
+ /// @brief queue length statistic for 100 packets
+ double stat100;
+
+ /// @brief queue length statistic for 1000 packets
+ double stat1000;
};
/// @brief run function of each thread
ThreadPoolQueue<WorkItemPtr, Container> queue_;
};
+/// Initialize the 10 packet rounding to exp(-.1)
+template <typename W, typename C>
+ const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
+
+/// Initialize the 100 packet rounding to exp(-.01)
+template <typename W, typename C>
+const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
+
+/// Initialize the 1000 packet rounding to exp(-.1)
+template <typename W, typename C>
+const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
+
} // namespace util
} // namespace isc