]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#260,!120] Addressed more review comments
authorThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 15:35:56 +0000 (10:35 -0500)
committerThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 15:35:56 +0000 (10:35 -0500)
    Removed RCV_ prefix from WatchedThread::WatchtType members
    Scrubbed WatchedThread source for lingering packet/receiver comments
    Adjusted test wait times

src/lib/dhcp/iface_mgr.cc
src/lib/dhcp/packet_queue_mgr.h
src/lib/util/threads/tests/watched_thread_unittest.cc
src/lib/util/threads/watched_thread.cc
src/lib/util/threads/watched_thread.h

index a11a3daa5094642fbaa74ff7c42a296b10175196..d2e32d2233fdc16e1ce7cdde1d3f818447a4f5f4 100644 (file)
@@ -991,10 +991,10 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     }
 
     // Add Receiver ready watch socket
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets);
 
     // Add Receiver error watch socket
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1037,9 +1037,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+        if (dhcp_receiver_->isReady(WatchedThread::ERROR)) {
             string msg = dhcp_receiver_->getLastError();
-            dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
+            dhcp_receiver_->clearReady(WatchedThread::ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1065,7 +1065,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // If we're here it should only be because there are DHCP packets waiting.
     Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
     if (!pkt) {
-        dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
+        dhcp_receiver_->clearReady(WatchedThread::READY);
     }
 
     return (pkt);
@@ -1317,10 +1317,10 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     }
 
     // Add Receiver ready watch socket
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets);
 
     // Add Receiver error watch socket
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1363,9 +1363,9 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+        if (dhcp_receiver_->isReady(WatchedThread::ERROR)) {
             string msg = dhcp_receiver_->getLastError();
-            dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
+            dhcp_receiver_->clearReady(WatchedThread::ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1391,7 +1391,7 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     // If we're here it should only be because there are DHCP packets waiting.
     Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
     if (!pkt) {
-        dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
+        dhcp_receiver_->clearReady(WatchedThread::READY);
     }
 
     return (pkt);
@@ -1406,7 +1406,7 @@ IfaceMgr::receiveDHCP4Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
@@ -1481,7 +1481,7 @@ IfaceMgr::receiveDHCP6Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
@@ -1572,7 +1572,7 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
 
     if (pkt) {
         getPacketQueue4()->enqueuePacket(pkt, socket_info);
-        dhcp_receiver_->markReady(WatchedThread::RCV_READY);
+        dhcp_receiver_->markReady(WatchedThread::READY);
     }
 }
 
@@ -1603,7 +1603,7 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
 
     if (pkt) {
         getPacketQueue6()->enqueuePacket(pkt, socket_info);
-        dhcp_receiver_->markReady(WatchedThread::RCV_READY);
+        dhcp_receiver_->markReady(WatchedThread::READY);
     }
 }
 
index fcc555d01ed406a73e41dcad5e9b2d246c789af4..892a1b472ba97943b38fcebe3901dc064cd0ba9a 100644 (file)
@@ -84,6 +84,10 @@ public:
     /// This function is used to remove the factory function for a given type.
     /// Typically, it would be called when unloading the hook library which
     /// loaded the type, and thus called by the library's @c unload function.
+    /// In addition to removing the factory, it will also destroy the current
+    /// queue if it is of the same queue-type as the factory being removed.
+    /// This avoids the nastiness that occurs when objecs are left in existence
+    /// after their library is unloaded.
     ///
     /// @param queue_type queue type, e.g. "kea-ring4".
     ///
index 75310374a0478ed5af7c4e9a4b16bfaecedd3ac6..0cdc8ce9e186c7ac6c41623e709d520767a71366 100644 (file)
@@ -36,7 +36,7 @@ public:
     /// @brief Sleeps for a given number of event periods sleep
     /// Each period is 50 ms.
     void nap(int periods) {
-        usleep(periods * 500 * 1000);
+        usleep(periods * 50 * 1000);
     };
 
     /// @brief Worker function to be used by the WatchedThread's thread
@@ -59,13 +59,13 @@ public:
             // On the second pass, set the event.
             if (passes_ == 2) {
                 switch (watch_type) {
-                case WatchedThread::RCV_ERROR:
+                case WatchedThread::ERROR:
                     wthread_->setError("we have an error");
                     break;
-                case WatchedThread::RCV_READY:
+                case WatchedThread::READY:
                     wthread_->markReady(watch_type);
                     break;
-                case WatchedThread::RCV_TERMINATE:
+                case WatchedThread::TERMINATE:
                 default:
                     // Do nothing, we're waiting to be told to stop.
                     break;
@@ -80,7 +80,7 @@ public:
         wthread_->setError("thread expired");
     }
 
-    /// @brief Current receiver instance.
+    /// @brief Current WatchedThread instance.
     WatchedThreadPtr wthread_;
 
     /// @brief Counter used to track the number of passes made
@@ -93,26 +93,26 @@ const int WatchedThreadTest::WORKER_MAX_PASSES = 5;
 /// Verifies the basic operation of the WatchedThread class.
 /// It checks that a WatchedThread can be created, can be stopped,
 /// and that in set and clear sockets.
-TEST_F(WatchedThreadTest, receiverClassBasics) {
+TEST_F(WatchedThreadTest, watchedThreadClassBasics) {
 
-    /// We'll create a receiver and let it run until it expires.  (Note this is more
-    /// of a test of WatchedThreadTest itself and ensures our tests later for why we
-    /// exited are sound.)
+    /// We'll create a WatchedThread and let it run until it expires.  (Note this is more
+    /// of a test of WatchedThreadTest itself and ensures that the assumptions made in
+    /// our other tests as to why threads have finished are sound.
     wthread_.reset(new WatchedThread());
     ASSERT_FALSE(wthread_->isRunning());
-    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE));
     ASSERT_TRUE(wthread_->isRunning());
 
-    // Wait long enough for thread to expire.
-    nap(WORKER_MAX_PASSES + 1);
+    // Wait more long enough (we hope) for the thread to expire.
+    nap(WORKER_MAX_PASSES * 4);
 
     // It should have done the maximum number of passes.
     EXPECT_EQ(passes_, WORKER_MAX_PASSES);
 
     // Error should be ready and error text should be "thread expired".
-    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
     EXPECT_EQ("thread expired", wthread_->getLastError());
 
     // Thread is technically still running, so let's stop it.
@@ -121,17 +121,17 @@ TEST_F(WatchedThreadTest, receiverClassBasics) {
     ASSERT_FALSE(wthread_->isRunning());
 
     /// Now we'll test stopping a thread.
-    /// Start the receiver, let it run a little and then tell it to stop.
-    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+    /// Start the WatchedThread, let it run a little and then tell it to stop.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE));
     ASSERT_TRUE(wthread_->isRunning());
 
     // No watches should be ready.
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
 
     // Wait a little while.
-    nap(2);
+    nap(3);
 
     // Tell it to stop.
     wthread_->stop();
@@ -141,27 +141,27 @@ TEST_F(WatchedThreadTest, receiverClassBasics) {
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
 
     // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
     EXPECT_EQ("thread stopped", wthread_->getLastError());
 
 
     // Next we'll test error notification.
-    // Start the receiver with a thread that sets an error on the second pass.
-    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR));
+    // Start the WatchedThread with a thread that sets an error on the second pass.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::ERROR));
     ASSERT_TRUE(wthread_->isRunning());
 
     // No watches should be ready.
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
 
     // Wait a little while.
-    nap(2);
+    nap(3);
 
     // It should now indicate an error.
-    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR));
     EXPECT_EQ("we have an error", wthread_->getLastError());
 
     // Tell it to stop.
@@ -172,27 +172,27 @@ TEST_F(WatchedThreadTest, receiverClassBasics) {
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
 
     // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
     EXPECT_EQ("thread stopped", wthread_->getLastError());
 
 
     // Finally, we'll test data ready notification.
-    // We'll start the receiver with a thread that indicates data ready on its second pass.
-    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY));
+    // We'll start the WatchedThread with a thread that indicates data ready on its second pass.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::READY));
     ASSERT_TRUE(wthread_->isRunning());
 
     // No watches should be ready.
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
 
     // Wait a little while.
-    nap(2);
+    nap(3);
 
     // It should now indicate data ready.
-    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::READY));
 
     // Tell it to stop.
     wthread_->stop();
@@ -202,9 +202,9 @@ TEST_F(WatchedThreadTest, receiverClassBasics) {
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
 
     // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
-    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
     EXPECT_EQ("thread stopped", wthread_->getLastError());
 }
 
index 6925a4ce54706084e3f7fec06e6b2a4bea04c979..9f6b891a414400f6795e2664db937613da22dc4d 100644 (file)
@@ -13,9 +13,9 @@ namespace thread {
 
 void
 WatchedThread::start(const boost::function<void()>& thread_main) {
-    clearReady(RCV_ERROR);
-    clearReady(RCV_READY);
-    clearReady(RCV_TERMINATE);
+    clearReady(ERROR);
+    clearReady(READY);
+    clearReady(TERMINATE);
     last_error_ = "no error";
     thread_.reset(new Thread(thread_main));
 }
@@ -42,8 +42,8 @@ WatchedThread::clearReady(WatchType watch_type) {
 
 bool
 WatchedThread::shouldTerminate() {
-    if (sockets_[RCV_TERMINATE].isReady()) {
-        clearReady(RCV_TERMINATE);
+    if (sockets_[TERMINATE].isReady()) {
+        clearReady(TERMINATE);
         return (true);
     }
 
@@ -53,20 +53,20 @@ WatchedThread::shouldTerminate() {
 void
 WatchedThread::stop() {
     if (thread_) {
-        markReady(RCV_TERMINATE);
+        markReady(TERMINATE);
         thread_->wait();
         thread_.reset();
     }
 
-    clearReady(RCV_ERROR);
-    clearReady(RCV_READY);
+    clearReady(ERROR);
+    clearReady(READY);
     last_error_ = "thread stopped";
 }
 
 void
 WatchedThread::setError(const std::string& error_msg) {
     last_error_ = error_msg;
-    markReady(RCV_ERROR);
+    markReady(ERROR);
 }
 
 std::string
index c9dd99cbcec553fed99e90907d4f12d321f2a146..d0243be032f74bd989228891fb9eb546188ddf60 100644 (file)
@@ -16,7 +16,7 @@ namespace isc {
 namespace util {
 namespace thread {
 
-/// @brief Provides a thread and controls for receiving packets.
+/// @brief Provides a thread and controls for monitoring its activities
 ///
 /// Given a "worker function", this class creates a thread which
 /// runs the function and provides the means to monitor the thread
@@ -28,9 +28,9 @@ public:
     /// @brief Enumerates the list of watch sockets used to mark events
     /// These are used as arguments to watch socket accessor methods.
     enum WatchType {
-        RCV_ERROR = 0,
-        RCV_READY = 1,
-        RCV_TERMINATE = 2
+        ERROR = 0,
+        READY = 1,
+        TERMINATE = 2
     };
 
     /// @brief Constructor
@@ -61,39 +61,35 @@ public:
     /// @param watch_type indicates which watch socket to clear
     void clearReady(WatchType watch_type);
 
-    /// @brief Checks if the receiver thread should terminate
+    /// @brief Checks if the thread should terminate
     ///
-    /// Performs a "one-shot" check of the receiver's terminate
-    /// watch socket.  If it is ready, return true and then clear
-    /// it, otherwise return false.
+    /// Performs a "one-shot" check of the terminate watch socket.
+    /// If it is ready, return true and then clear it, otherwise
+    /// return false.
     ///
     /// @return true if the terminate watch socket is ready
     bool shouldTerminate();
 
     /// @brief Creates and runs the thread.
     ///
-    /// Creates teh receiver's thread, passing into it the given
-    /// function to run.
+    /// Creates the thread, passing into it the given function to run.
     ///
-    /// @param thread_main function the receiver's thread should run
+    /// @param thread_main function the thread should run
     void start(const boost::function<void()>& thread_main);
 
-    /// @brief Returns true if the receiver thread is running
-    /// @todo - this may need additional logic to handle cases where
-    /// a thread function exits w/o the caller invoking @c
-    /// WatchedThread::stop().
+    /// @brief Returns true if the thread is running
     bool isRunning() {
         return (thread_ != 0);
     }
 
-    /// @brief Terminates the receiver thread
+    /// @brief Terminates the thread
     ///
     /// It marks the terminate watch socket ready, and then waits for the
-    /// thread to stop.  At this point, the receiver is defunct.  This is
+    /// thread to stop.  At this point, the thread is defunct.  This is
     /// not done in the destructor to avoid race conditions.
     void stop();
 
-    /// @brief Sets the receiver error state
+    /// @brief Sets the error state
     ///
     /// This records the given error message and sets the error watch
     /// socket to ready.
@@ -101,7 +97,7 @@ public:
     /// @param error_msg
     void setError(const std::string& error_msg);
 
-    /// @brief Fetches the error message text for the most recent socket error
+    /// @brief Fetches the error message text for the most recent error
     ///
     /// @return string containing the error message
     std::string getLastError();
@@ -109,17 +105,17 @@ public:
     /// @brief Error message of the last error encountered
     std::string last_error_;
 
-    /// @brief DHCP watch sockets that are used to communicate with the owning thread
+    /// @brief WatchSockets that are used to communicate with the owning thread
     /// There are three:
-    /// -# RCV_ERROR - packet receive error watch socket.
-    /// Marked as ready when the DHCP packet receiver experiences an I/O error.
-    /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
-    /// to the packet queue.
-    /// -# RCV_TERMINATE Packet receiver terminate watch socket.
-    /// Marked as ready when the DHCP packet receiver thread should terminate.
-    WatchSocket sockets_[RCV_TERMINATE + 1];
-
-    /// DHCP packet receiver thread.
+    /// -# ERROR - Marked as ready by the thread when it experiences an error.
+    /// -# READY - Marked as ready by the thread when it needs attention for a normal event
+    /// (e.g. a thread used to receive data would mark READY when it has data available)
+    /// -# TERMINATE - Marked as ready by WatchedThread owner to instruct the thread to
+    /// terminate.  Worker functions must monitor TERMINATa by periodically calling
+    /// @c shouldTerminate
+    WatchSocket sockets_[TERMINATE + 1];
+
+    /// @brief Current thread instance
     thread::ThreadPtr thread_ ;
 };