#include <dhcp6/dhcp6_log.h>
#include <exceptions/exceptions.h>
#include <stats/stats_mgr.h>
+#include <util/multi_threading_mgr.h>
using namespace std;
+using namespace isc::util;
namespace isc {
namespace dhcp {
}
// Assume erase will never fail so not checking its result.
clients_.erase(locked_->getDuid());
+ if (!client_ || !client_->cont_) {
+ return;
+ }
+ // Try to process next query. As the caller holds the mutex of
+ // the handler class the continuation will be resumed after.
+ MultiThreadingMgr& mt_mgr = MultiThreadingMgr::instance();
+ if (mt_mgr.getMode()) {
+ if (!mt_mgr.getThreadPool().addFront(client_->cont_)) {
+ LOG_DEBUG(dhcp6_logger, DBG_DHCP6_BASIC, DHCP6_PACKET_QUEUE_FULL);
+ }
+ }
+ client_->cont_.reset();
}
bool
-ClientHandler::tryLock(Pkt6Ptr query) {
+ClientHandler::tryLock(Pkt6Ptr query, ContinuationPtr cont) {
if (!query) {
isc_throw(InvalidParameter, "null query in ClientHandler::tryLock");
}
return (false);
}
}
- // This query is a duplicate: currently it is simply dropped.
+ // This query can be a duplicate so put the continuation.
+ if (cont) {
+ Pkt6Ptr next_query = holder->next_query_;
+ holder->next_query_ = query;
+ holder->cont_ = cont;
+ if (next_query) {
+ // Logging a warning as it is supposed to be a rare event
+ // with well behaving clients...
+ LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)
+ .arg(next_query->toText())
+ .arg(this_thread::get_id())
+ .arg(query->toText())
+ .arg(this_thread::get_id());
+ stats::StatsMgr::instance().addValue("pkt6-receive-drop",
+ static_cast<int64_t>(1));
+ }
+ return (false);
+ }
+
// Logging a warning as it is supposed to be a rare event
// with well behaving clients...
LOG_WARN(bad_packet6_logger, DHCP6_PACKET_DROP_DUPLICATE)
#include <dhcp6/client_handler.h>
#include <dhcp6/tests/dhcp6_test_utils.h>
#include <stats/stats_mgr.h>
+#include <util/multi_threading_mgr.h>
+#include <unistd.h>
using namespace isc;
using namespace isc::dhcp;
/// @brief Constructor.
///
/// Creates the pkt6-receive-drop statistic.
- ClientHandleTest() {
+ ClientHandleTest() : called1_(false), called2_(false), called3_(false) {
+ MultiThreadingMgr::instance().apply(false, 0, 0);
StatsMgr::instance().setValue("pkt6-receive-drop", static_cast<int64_t>(0));
}
///
/// Removes statistics.
~ClientHandleTest() {
+ MultiThreadingMgr::instance().apply(false, 0, 0);
StatsMgr::instance().removeAll();
}
EXPECT_EQ(0, obs->getInteger().first);
}
}
+
+ /// @brief Waits for pending continuations.
+ void waitForThreads() {
+ while (MultiThreadingMgr::instance().getThreadPool().count() > 0) {
+ usleep(100);
+ }
+ }
+
+ /// @brief Set called1_ to true.
+ void setCalled1() {
+ called1_ = true;
+ }
+
+ /// @brief Set called2_ to true.
+ void setCalled2() {
+ called2_ = true;
+ }
+
+ /// @brief Set called3_ to true.
+ void setCalled3() {
+ called3_ = true;
+ }
+
+ /// @brief The called flag number 1.
+ bool called1_;
+
+ /// @brief The called flag number 2.
+ bool called2_;
+
+ /// @brief The called flag number 3.
+ bool called3_;
+
};
// Verifies behavior with empty block.
} catch (const std::exception& ex) {
ADD_FAILURE() << "unexpected exception: " << ex.what();
}
+ checkStat(false);
}
// Verifies behavior with one query.
// Cannot verifies that empty client ID fails because getClientId() handles
// this condition and replaces it by no client ID.
+// Verifies behavior with two queries for the same client and multi-threading.
+TEST_F(ClientHandleTest, serializeTwoQueries) {
+ // Get two queries.
+ Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+ Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+ OptionPtr client_id = generateClientId();
+ // Same client ID: same client.
+ sol->addOption(client_id);
+ req->addOption(client_id);
+
+ // Start multi-threading.
+ EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+ try {
+ // Get a client handler.
+ ClientHandler client_handler;
+
+ // Create a continuation.
+ ClientHandler::ContinuationPtr cont1 =
+ ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
+
+ // Try to lock it with the solicit.
+ bool duplicate = false;
+ EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
+
+ // Should return false (no duplicate).
+ EXPECT_FALSE(duplicate);
+
+ // Get a second client handler.
+ ClientHandler client_handler2;
+
+ // Create a continuation.
+ ClientHandler::ContinuationPtr cont2 =
+ ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
+
+ // Try to lock it with a request.
+ EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
+
+ // Should return false (multi-threading enforces serialization).
+ EXPECT_FALSE(duplicate);
+ } catch (const std::exception& ex) {
+ ADD_FAILURE() << "unexpected exception: " << ex.what();
+ }
+
+ // Give the second continuation a chance.
+ waitForThreads();
+
+ // Force multi-threading to stop;
+ MultiThreadingCriticalSection cs;
+
+ checkStat(false);
+ EXPECT_FALSE(called1_);
+ EXPECT_TRUE(called2_);
+}
+
+// Verifies behavior with two queries for the same client and multi-threading.
+// Continuations are required for serialization.
+TEST_F(ClientHandleTest, serializeNoCont) {
+ // Get two queries.
+ Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+ Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+ OptionPtr client_id = generateClientId();
+ // Same client ID: same client.
+ sol->addOption(client_id);
+ req->addOption(client_id);
+
+ // Start multi-threading.
+ EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+ try {
+ // Get a client handler.
+ ClientHandler client_handler;
+
+ // Try to lock it with the solicit.
+ bool duplicate = false;
+ EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol));
+
+ // Should return false (no duplicate).
+ EXPECT_FALSE(duplicate);
+
+ // Get a second client handler.
+ ClientHandler client_handler2;
+
+ // Try to lock it with a request.
+ EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req));
+
+ // Should return true (duplicate without continuation).
+ EXPECT_TRUE(duplicate);
+ } catch (const std::exception& ex) {
+ ADD_FAILURE() << "unexpected exception: " << ex.what();
+ }
+
+ // Give the second continuation a chance even there is none...
+ waitForThreads();
+
+ // Force multi-threading to stop;
+ MultiThreadingCriticalSection cs;
+
+ checkStat(true);
+}
+
+// Verifies behavior with three queries for the same client and
+// multi-threading: currently we accept only two queries,
+// a third one replaces second so we get the first (oldest) query and
+// the last (newest) query when the client is busy.
+TEST_F(ClientHandleTest, serializeThreeQueries) {
+ // Get two queries.
+ Pkt6Ptr sol(new Pkt6(DHCPV6_SOLICIT, 1234));
+ Pkt6Ptr req(new Pkt6(DHCPV6_REQUEST, 2345));
+ Pkt6Ptr ren(new Pkt6(DHCPV6_RENEW, 3456));
+ OptionPtr client_id = generateClientId();
+ // Same client ID: same client.
+ sol->addOption(client_id);
+ req->addOption(client_id);
+ ren->addOption(client_id);
+
+ // Start multi-threading.
+ EXPECT_NO_THROW(MultiThreadingMgr::instance().apply(true, 1, 0));
+
+ try {
+ // Get a client handler.
+ ClientHandler client_handler;
+
+ // Create a continuation.
+ ClientHandler::ContinuationPtr cont1 =
+ ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled1, this));
+
+ // Try to lock it with the solicit.
+ bool duplicate = false;
+ EXPECT_NO_THROW(duplicate = client_handler.tryLock(sol, cont1));
+
+ // Should return false (no duplicate).
+ EXPECT_FALSE(duplicate);
+
+ // Get a second client handler.
+ ClientHandler client_handler2;
+
+ // Create a continuation.
+ ClientHandler::ContinuationPtr cont2 =
+ ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled2, this));
+
+ // Try to lock it with a request.
+ EXPECT_NO_THROW(duplicate = client_handler2.tryLock(req, cont2));
+
+ // Should return false (multi-threading enforces serialization).
+ EXPECT_FALSE(duplicate);
+
+ // Get a third client handler.
+ ClientHandler client_handler3;
+
+ // Create a continuation.
+ ClientHandler::ContinuationPtr cont3 =
+ ClientHandler::makeContinuation(std::bind(&ClientHandleTest::setCalled3, this));
+
+ // Try to lock it with a renew.
+ EXPECT_NO_THROW(duplicate = client_handler3.tryLock(ren, cont3));
+
+ // Should return false (multi-threading enforces serialization).
+ EXPECT_FALSE(duplicate);
+ } catch (const std::exception& ex) {
+ ADD_FAILURE() << "unexpected exception: " << ex.what();
+ }
+
+ // Give the second continuation a chance.
+ waitForThreads();
+
+ // Force multi-threading to stop;
+ MultiThreadingCriticalSection cs;
+
+ checkStat(true);
+ EXPECT_FALSE(called1_);
+ EXPECT_FALSE(called2_);
+ EXPECT_TRUE(called3_);
+}
+
} // end of anonymous namespace
};
/// @brief test ThreadPool add and count
-TEST_F(ThreadPoolTest, testAddAndCount) {
+TEST_F(ThreadPoolTest, addAndCount) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
}
/// @brief test ThreadPool start and stop
-TEST_F(ThreadPoolTest, testStartAndStop) {
+TEST_F(ThreadPoolTest, startAndStop) {
uint32_t items_count;
uint32_t thread_count;
CallBack call_back;
}
/// @brief test ThreadPool max queue size
-TEST_F(ThreadPoolTest, testMaxQueueSize) {
+TEST_F(ThreadPoolTest, maxQueueSize) {
uint32_t items_count;
CallBack call_back;
ThreadPool<CallBack> thread_pool;
EXPECT_EQ(thread_pool.count(), max_queue_size);
}
+/// @brief test ThreadPool add front.
+TEST_F(ThreadPoolTest, addFront) {
+ uint32_t items_count;
+ CallBack call_back;
+ ThreadPool<CallBack> thread_pool;
+ // the item count should be 0
+ ASSERT_EQ(thread_pool.count(), 0);
+ // the thread count should be 0
+ ASSERT_EQ(thread_pool.size(), 0);
+
+ items_count = 20;
+
+ call_back = std::bind(&ThreadPoolTest::run, this);
+
+ // add items to stopped thread pool
+ bool ret = true;
+ for (uint32_t i = 0; i < items_count; ++i) {
+ EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
+ EXPECT_TRUE(ret);
+ }
+
+ // the item count should match
+ ASSERT_EQ(thread_pool.count(), items_count);
+
+ // change the max count
+ ASSERT_EQ(thread_pool.getMaxQueueSize(), 0);
+ size_t max_queue_size = 10;
+ thread_pool.setMaxQueueSize(max_queue_size);
+ EXPECT_EQ(thread_pool.getMaxQueueSize(), max_queue_size);
+
+ // adding an item at front should change nothing queue
+ EXPECT_EQ(thread_pool.count(), items_count);
+ EXPECT_NO_THROW(ret = thread_pool.addFront(boost::make_shared<CallBack>(call_back)));
+ EXPECT_FALSE(ret);
+ EXPECT_EQ(thread_pool.count(), items_count);
+}
+
} // namespace
///
/// @tparam WorkItem a functor
/// @tparam Container a 'queue like' container
-template <typename WorkItem, typename Container = std::queue<boost::shared_ptr<WorkItem>>>
+template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
struct ThreadPool {
typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
/// @return false if the queue was full and oldest item(s) was dropped,
/// true otherwise.
bool add(const WorkItemPtr& item) {
- return (queue_.push(item));
+ return (queue_.push_back(item));
+ }
+
+ /// @brief add a work item to the thread pool at front
+ ///
+ /// @param item the 'functor' object to be added to the queue
+ /// @return false if the queue was full, true otherwise.
+ bool addFront(const WorkItemPtr& item) {
+ return (queue_.push_front(item));
}
/// @brief count number of work items in the queue
/// @param item the new item to be added to the queue
/// @return false if the queue was full and oldest item(s) dropped,
/// true otherwise
- bool push(const Item& item) {
+ bool push_back(const Item& item) {
bool ret = true;
if (!item) {
return (ret);
std::lock_guard<std::mutex> lock(mutex_);
if (max_queue_size_ != 0) {
while (queue_.size() >= max_queue_size_) {
- queue_.pop();
+ queue_.pop_front();
ret = false;
}
}
- queue_.push(item);
+ queue_.push_back(item);
}
// Notify pop function so that it can effectively remove a work item.
cv_.notify_one();
return (ret);
}
+ /// @brief push work item to the queue at front.
+ ///
+ /// Used to add work items to the queue at front.
+ /// When the queue is full the item is not added.
+ ///
+ /// @param item the new item to be added to the queue
+ /// @return false if the queue was full, true otherwise
+ bool push_front(const Item& item) {
+ if (!item) {
+ return (true);
+ }
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if ((max_queue_size_ != 0) &&
+ (queue_.size() >= max_queue_size_)) {
+ return (false);
+ }
+ queue_.push_front(item);
+ }
+ // Notify pop function so that it can effectively remove a work item.
+ cv_.notify_one();
+ return (true);
+ }
+
/// @brief pop work item from the queue or block waiting
///
/// Used to retrieve and remove a work item from the queue
return (Item());
}
Item item = queue_.front();
- queue_.pop();
+ queue_.pop_front();
return (item);
}