From: Thomas Markwalder Date: Tue, 20 Nov 2018 15:35:56 +0000 (-0500) Subject: [#260,!120] Addressed more review comments X-Git-Tag: 204-move-models-base~4^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a55d229f1dcd6cf0362da1559fca87c89baacb45;p=thirdparty%2Fkea.git [#260,!120] Addressed more review comments Removed RCV_ prefix from WatchedThread::WatchtType members Scrubbed WatchedThread source for lingering packet/receiver comments Adjusted test wait times --- diff --git a/src/lib/dhcp/iface_mgr.cc b/src/lib/dhcp/iface_mgr.cc index a11a3daa50..d2e32d2233 100644 --- a/src/lib/dhcp/iface_mgr.cc +++ b/src/lib/dhcp/iface_mgr.cc @@ -991,10 +991,10 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / } // Add Receiver ready watch socket - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets); // Add Receiver error watch socket - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1037,9 +1037,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) { + if (dhcp_receiver_->isReady(WatchedThread::ERROR)) { string msg = dhcp_receiver_->getLastError(); - dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR); + dhcp_receiver_->clearReady(WatchedThread::ERROR); isc_throw(SocketReadError, msg); } @@ -1065,7 +1065,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // If we're here it should only be because there are DHCP packets waiting. Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket(); if (!pkt) { - dhcp_receiver_->clearReady(WatchedThread::RCV_READY); + dhcp_receiver_->clearReady(WatchedThread::READY); } return (pkt); @@ -1317,10 +1317,10 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ } // Add Receiver ready watch socket - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets); // Add Receiver error watch socket - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1363,9 +1363,9 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) { + if (dhcp_receiver_->isReady(WatchedThread::ERROR)) { string msg = dhcp_receiver_->getLastError(); - dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR); + dhcp_receiver_->clearReady(WatchedThread::ERROR); isc_throw(SocketReadError, msg); } @@ -1391,7 +1391,7 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ // If we're here it should only be because there are DHCP packets waiting. Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket(); if (!pkt) { - dhcp_receiver_->clearReady(WatchedThread::RCV_READY); + dhcp_receiver_->clearReady(WatchedThread::READY); } return (pkt); @@ -1406,7 +1406,7 @@ IfaceMgr::receiveDHCP4Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { @@ -1481,7 +1481,7 @@ IfaceMgr::receiveDHCP6Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { @@ -1572,7 +1572,7 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { if (pkt) { getPacketQueue4()->enqueuePacket(pkt, socket_info); - dhcp_receiver_->markReady(WatchedThread::RCV_READY); + dhcp_receiver_->markReady(WatchedThread::READY); } } @@ -1603,7 +1603,7 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { if (pkt) { getPacketQueue6()->enqueuePacket(pkt, socket_info); - dhcp_receiver_->markReady(WatchedThread::RCV_READY); + dhcp_receiver_->markReady(WatchedThread::READY); } } diff --git a/src/lib/dhcp/packet_queue_mgr.h b/src/lib/dhcp/packet_queue_mgr.h index fcc555d01e..892a1b472b 100644 --- a/src/lib/dhcp/packet_queue_mgr.h +++ b/src/lib/dhcp/packet_queue_mgr.h @@ -84,6 +84,10 @@ public: /// This function is used to remove the factory function for a given type. /// Typically, it would be called when unloading the hook library which /// loaded the type, and thus called by the library's @c unload function. + /// In addition to removing the factory, it will also destroy the current + /// queue if it is of the same queue-type as the factory being removed. + /// This avoids the nastiness that occurs when objecs are left in existence + /// after their library is unloaded. /// /// @param queue_type queue type, e.g. "kea-ring4". /// diff --git a/src/lib/util/threads/tests/watched_thread_unittest.cc b/src/lib/util/threads/tests/watched_thread_unittest.cc index 75310374a0..0cdc8ce9e1 100644 --- a/src/lib/util/threads/tests/watched_thread_unittest.cc +++ b/src/lib/util/threads/tests/watched_thread_unittest.cc @@ -36,7 +36,7 @@ public: /// @brief Sleeps for a given number of event periods sleep /// Each period is 50 ms. void nap(int periods) { - usleep(periods * 500 * 1000); + usleep(periods * 50 * 1000); }; /// @brief Worker function to be used by the WatchedThread's thread @@ -59,13 +59,13 @@ public: // On the second pass, set the event. if (passes_ == 2) { switch (watch_type) { - case WatchedThread::RCV_ERROR: + case WatchedThread::ERROR: wthread_->setError("we have an error"); break; - case WatchedThread::RCV_READY: + case WatchedThread::READY: wthread_->markReady(watch_type); break; - case WatchedThread::RCV_TERMINATE: + case WatchedThread::TERMINATE: default: // Do nothing, we're waiting to be told to stop. break; @@ -80,7 +80,7 @@ public: wthread_->setError("thread expired"); } - /// @brief Current receiver instance. + /// @brief Current WatchedThread instance. WatchedThreadPtr wthread_; /// @brief Counter used to track the number of passes made @@ -93,26 +93,26 @@ const int WatchedThreadTest::WORKER_MAX_PASSES = 5; /// Verifies the basic operation of the WatchedThread class. /// It checks that a WatchedThread can be created, can be stopped, /// and that in set and clear sockets. -TEST_F(WatchedThreadTest, receiverClassBasics) { +TEST_F(WatchedThreadTest, watchedThreadClassBasics) { - /// We'll create a receiver and let it run until it expires. (Note this is more - /// of a test of WatchedThreadTest itself and ensures our tests later for why we - /// exited are sound.) + /// We'll create a WatchedThread and let it run until it expires. (Note this is more + /// of a test of WatchedThreadTest itself and ensures that the assumptions made in + /// our other tests as to why threads have finished are sound. wthread_.reset(new WatchedThread()); ASSERT_FALSE(wthread_->isRunning()); - wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE)); + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE)); ASSERT_TRUE(wthread_->isRunning()); - // Wait long enough for thread to expire. - nap(WORKER_MAX_PASSES + 1); + // Wait more long enough (we hope) for the thread to expire. + nap(WORKER_MAX_PASSES * 4); // It should have done the maximum number of passes. EXPECT_EQ(passes_, WORKER_MAX_PASSES); // Error should be ready and error text should be "thread expired". - ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); EXPECT_EQ("thread expired", wthread_->getLastError()); // Thread is technically still running, so let's stop it. @@ -121,17 +121,17 @@ TEST_F(WatchedThreadTest, receiverClassBasics) { ASSERT_FALSE(wthread_->isRunning()); /// Now we'll test stopping a thread. - /// Start the receiver, let it run a little and then tell it to stop. - wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE)); + /// Start the WatchedThread, let it run a little and then tell it to stop. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE)); ASSERT_TRUE(wthread_->isRunning()); // No watches should be ready. - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); // Wait a little while. - nap(2); + nap(3); // Tell it to stop. wthread_->stop(); @@ -141,27 +141,27 @@ TEST_F(WatchedThreadTest, receiverClassBasics) { EXPECT_LT(passes_, WORKER_MAX_PASSES); // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); EXPECT_EQ("thread stopped", wthread_->getLastError()); // Next we'll test error notification. - // Start the receiver with a thread that sets an error on the second pass. - wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR)); + // Start the WatchedThread with a thread that sets an error on the second pass. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::ERROR)); ASSERT_TRUE(wthread_->isRunning()); // No watches should be ready. - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); // Wait a little while. - nap(2); + nap(3); // It should now indicate an error. - ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR)); EXPECT_EQ("we have an error", wthread_->getLastError()); // Tell it to stop. @@ -172,27 +172,27 @@ TEST_F(WatchedThreadTest, receiverClassBasics) { EXPECT_LT(passes_, WORKER_MAX_PASSES); // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); EXPECT_EQ("thread stopped", wthread_->getLastError()); // Finally, we'll test data ready notification. - // We'll start the receiver with a thread that indicates data ready on its second pass. - wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY)); + // We'll start the WatchedThread with a thread that indicates data ready on its second pass. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::READY)); ASSERT_TRUE(wthread_->isRunning()); // No watches should be ready. - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); // Wait a little while. - nap(2); + nap(3); // It should now indicate data ready. - ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_TRUE(wthread_->isReady(WatchedThread::READY)); // Tell it to stop. wthread_->stop(); @@ -202,9 +202,9 @@ TEST_F(WatchedThreadTest, receiverClassBasics) { EXPECT_LT(passes_, WORKER_MAX_PASSES); // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); - ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE)); EXPECT_EQ("thread stopped", wthread_->getLastError()); } diff --git a/src/lib/util/threads/watched_thread.cc b/src/lib/util/threads/watched_thread.cc index 6925a4ce54..9f6b891a41 100644 --- a/src/lib/util/threads/watched_thread.cc +++ b/src/lib/util/threads/watched_thread.cc @@ -13,9 +13,9 @@ namespace thread { void WatchedThread::start(const boost::function& thread_main) { - clearReady(RCV_ERROR); - clearReady(RCV_READY); - clearReady(RCV_TERMINATE); + clearReady(ERROR); + clearReady(READY); + clearReady(TERMINATE); last_error_ = "no error"; thread_.reset(new Thread(thread_main)); } @@ -42,8 +42,8 @@ WatchedThread::clearReady(WatchType watch_type) { bool WatchedThread::shouldTerminate() { - if (sockets_[RCV_TERMINATE].isReady()) { - clearReady(RCV_TERMINATE); + if (sockets_[TERMINATE].isReady()) { + clearReady(TERMINATE); return (true); } @@ -53,20 +53,20 @@ WatchedThread::shouldTerminate() { void WatchedThread::stop() { if (thread_) { - markReady(RCV_TERMINATE); + markReady(TERMINATE); thread_->wait(); thread_.reset(); } - clearReady(RCV_ERROR); - clearReady(RCV_READY); + clearReady(ERROR); + clearReady(READY); last_error_ = "thread stopped"; } void WatchedThread::setError(const std::string& error_msg) { last_error_ = error_msg; - markReady(RCV_ERROR); + markReady(ERROR); } std::string diff --git a/src/lib/util/threads/watched_thread.h b/src/lib/util/threads/watched_thread.h index c9dd99cbce..d0243be032 100644 --- a/src/lib/util/threads/watched_thread.h +++ b/src/lib/util/threads/watched_thread.h @@ -16,7 +16,7 @@ namespace isc { namespace util { namespace thread { -/// @brief Provides a thread and controls for receiving packets. +/// @brief Provides a thread and controls for monitoring its activities /// /// Given a "worker function", this class creates a thread which /// runs the function and provides the means to monitor the thread @@ -28,9 +28,9 @@ public: /// @brief Enumerates the list of watch sockets used to mark events /// These are used as arguments to watch socket accessor methods. enum WatchType { - RCV_ERROR = 0, - RCV_READY = 1, - RCV_TERMINATE = 2 + ERROR = 0, + READY = 1, + TERMINATE = 2 }; /// @brief Constructor @@ -61,39 +61,35 @@ public: /// @param watch_type indicates which watch socket to clear void clearReady(WatchType watch_type); - /// @brief Checks if the receiver thread should terminate + /// @brief Checks if the thread should terminate /// - /// Performs a "one-shot" check of the receiver's terminate - /// watch socket. If it is ready, return true and then clear - /// it, otherwise return false. + /// Performs a "one-shot" check of the terminate watch socket. + /// If it is ready, return true and then clear it, otherwise + /// return false. /// /// @return true if the terminate watch socket is ready bool shouldTerminate(); /// @brief Creates and runs the thread. /// - /// Creates teh receiver's thread, passing into it the given - /// function to run. + /// Creates the thread, passing into it the given function to run. /// - /// @param thread_main function the receiver's thread should run + /// @param thread_main function the thread should run void start(const boost::function& thread_main); - /// @brief Returns true if the receiver thread is running - /// @todo - this may need additional logic to handle cases where - /// a thread function exits w/o the caller invoking @c - /// WatchedThread::stop(). + /// @brief Returns true if the thread is running bool isRunning() { return (thread_ != 0); } - /// @brief Terminates the receiver thread + /// @brief Terminates the thread /// /// It marks the terminate watch socket ready, and then waits for the - /// thread to stop. At this point, the receiver is defunct. This is + /// thread to stop. At this point, the thread is defunct. This is /// not done in the destructor to avoid race conditions. void stop(); - /// @brief Sets the receiver error state + /// @brief Sets the error state /// /// This records the given error message and sets the error watch /// socket to ready. @@ -101,7 +97,7 @@ public: /// @param error_msg void setError(const std::string& error_msg); - /// @brief Fetches the error message text for the most recent socket error + /// @brief Fetches the error message text for the most recent error /// /// @return string containing the error message std::string getLastError(); @@ -109,17 +105,17 @@ public: /// @brief Error message of the last error encountered std::string last_error_; - /// @brief DHCP watch sockets that are used to communicate with the owning thread + /// @brief WatchSockets that are used to communicate with the owning thread /// There are three: - /// -# RCV_ERROR - packet receive error watch socket. - /// Marked as ready when the DHCP packet receiver experiences an I/O error. - /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet - /// to the packet queue. - /// -# RCV_TERMINATE Packet receiver terminate watch socket. - /// Marked as ready when the DHCP packet receiver thread should terminate. - WatchSocket sockets_[RCV_TERMINATE + 1]; - - /// DHCP packet receiver thread. + /// -# ERROR - Marked as ready by the thread when it experiences an error. + /// -# READY - Marked as ready by the thread when it needs attention for a normal event + /// (e.g. a thread used to receive data would mark READY when it has data available) + /// -# TERMINATE - Marked as ready by WatchedThread owner to instruct the thread to + /// terminate. Worker functions must monitor TERMINATa by periodically calling + /// @c shouldTerminate + WatchSocket sockets_[TERMINATE + 1]; + + /// @brief Current thread instance thread::ThreadPtr thread_ ; };