From: Thomas Markwalder Date: Tue, 20 Nov 2018 11:31:38 +0000 (-0500) Subject: [#260,!120] Addressed more review comments X-Git-Tag: 204-move-models-base~4^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=37b2d32135fe57b1cd950fbb01e236b04c6ac622;p=thirdparty%2Fkea.git [#260,!120] Addressed more review comments doc/guide/congestion-handling.xml Added subsections to Congestion Handling chapter src/lib/dhcp/iface_mgr.* Removed Receiver class, replaced with new class lib::util::thread::WatchedThread Renamed IfaceMgr::receiver_ to dhcp_receiver Renamed add_fd to addFDtoSet src/lib/dhcp/packet_queue_mgr.h PacketQueueMgr::unregisterFactory() - destroys queue instance if it matches type being unregistered New files: src/lib/util/threads/ watched_thread.h watched_thread.cc src/lib/util/threads/tests watched_thread_unittest.cc --- diff --git a/doc/guide/congestion-handling.xml b/doc/guide/congestion-handling.xml index 89c5c4ac63..a8ea39af15 100644 --- a/doc/guide/congestion-handling.xml +++ b/doc/guide/congestion-handling.xml @@ -9,6 +9,8 @@ Congestion Handling in DHCPv4 and DHCPv6 +
+ What is Congestion? Congestion occurs when servers are subjected to client queries faster than they can be fulfilled. Subsequently, the servers begin accumulating a backlog of pending queries. The longer the high rate of @@ -42,7 +44,9 @@ relevant, or worse are redundant. In other words, the packets waiting in the FIFO socket buffers become increasingly stale. - +
+
+ Configuring Congestion Handling Kea 1.5 introduces a new feature referred to as Congestion Handling. Congestion handling offers the ability to configure the server to use a separate thread to read packets from the interface socket buffers. As the @@ -145,4 +149,5 @@ The number of parameters and plug-ins is expected to grow over time. +
diff --git a/src/lib/dhcp/iface_mgr.cc b/src/lib/dhcp/iface_mgr.cc index 91af356b01..a11a3daa50 100644 --- a/src/lib/dhcp/iface_mgr.cc +++ b/src/lib/dhcp/iface_mgr.cc @@ -179,69 +179,6 @@ bool Iface::delSocket(const uint16_t sockfd) { return (false); // socket not found } -void -Receiver::start(const boost::function& thread_main) { - clearReady(RCV_ERROR); - clearReady(RCV_READY); - clearReady(RCV_TERMINATE); - last_error_ = "no error"; - thread_.reset(new isc::util::thread::Thread(thread_main)); -} - -int -Receiver::getWatchFd(WatchType watch_type) { - return(sockets_[watch_type].getSelectFd()); -} - -void -Receiver::markReady(WatchType watch_type) { - sockets_[watch_type].markReady(); -} - -bool -Receiver::isReady(WatchType watch_type) { - return (sockets_[watch_type].isReady()); -} - -void -Receiver::clearReady(WatchType watch_type) { - sockets_[watch_type].clearReady(); -} - -bool -Receiver::shouldTerminate() { - if (sockets_[RCV_TERMINATE].isReady()) { - clearReady(RCV_TERMINATE); - return (true); - } - - return (false); -} - -void -Receiver::stop() { - if (thread_) { - markReady(RCV_TERMINATE); - thread_->wait(); - thread_.reset(); - } - - clearReady(RCV_ERROR); - clearReady(RCV_READY); - last_error_ = "thread stopped"; -} - -void -Receiver::setError(const std::string& error_msg) { - last_error_ = error_msg; - markReady(RCV_ERROR); -} - -std::string -Receiver::getLastError() { - return (last_error_); -} - IfaceMgr::IfaceMgr() :control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))), control_buf_(new char[control_buf_len_]), @@ -354,11 +291,12 @@ void IfaceMgr::closeSockets() { } void IfaceMgr::stopDHCPReceiver() { - if (isReceiverRunning()) { - receiver_->stop(); - receiver_.reset(); + if (isDHCPReceiverRunning()) { + dhcp_receiver_->stop(); } + dhcp_receiver_.reset(); + if (getPacketQueue4()) { getPacketQueue4()->clear(); } @@ -743,29 +681,31 @@ IfaceMgr::openSockets6(const uint16_t port, void IfaceMgr::startDHCPReceiver(const uint16_t family) { - if (isReceiverRunning()) { + if (isDHCPReceiverRunning()) { isc_throw(InvalidOperation, "a receiver thread already exists"); } switch (family) { case AF_INET: - // If there's no queue, then has been disabled, simply return. + // If the queue doesn't exist, packet queing has been configured + // as disabled. If there is no queue, we do not create a reciever. if(!getPacketQueue4()) { - return; + return; } - receiver_.reset(new Receiver()); - receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))); + dhcp_receiver_.reset(new WatchedThread()); + dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)); break; case AF_INET6: - // If there's no queue, then has been disabled, simply return. + // If the queue doesn't exist, packet queing has been configured + // as disabled. If there is no queue, we do not create a reciever. if(!getPacketQueue6()) { return; } - receiver_.reset(new Receiver()); - receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))); + dhcp_receiver_.reset(new WatchedThread()); + dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)); break; default: isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family); @@ -1024,7 +964,7 @@ IfaceMgr::send(const Pkt4Ptr& pkt) { } Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) { - if (isReceiverRunning()) { + if (isDHCPReceiverRunning()) { return (receive4Indirect(timeout_sec, timeout_usec)); } @@ -1046,15 +986,15 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // if there are any callbacks for external sockets registered... if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { - add_fd(s.socket_, maxfd, &sockets); + addFDtoSet(s.socket_, maxfd, &sockets); } } // Add Receiver ready watch socket - add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets); // Add Receiver error watch socket - add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1097,9 +1037,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (receiver_->isReady(Receiver::RCV_ERROR)) { - string msg = receiver_->getLastError(); - receiver_->clearReady(Receiver::RCV_ERROR); + if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) { + string msg = dhcp_receiver_->getLastError(); + dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR); isc_throw(SocketReadError, msg); } @@ -1125,7 +1065,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // If we're here it should only be because there are DHCP packets waiting. Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket(); if (!pkt) { - receiver_->clearReady(Receiver::RCV_READY); + dhcp_receiver_->clearReady(WatchedThread::RCV_READY); } return (pkt); @@ -1152,7 +1092,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* // Only deal with IPv4 addresses. if (s.addr_.isV4()) { // Add this socket to listening set - add_fd(s.sockfd_, maxfd, &sockets); + addFDtoSet(s.sockfd_, maxfd, &sockets); } } } @@ -1161,7 +1101,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { // Add this socket to listening set - add_fd(s.socket_, maxfd, &sockets); + addFDtoSet(s.socket_, maxfd, &sockets); } } @@ -1235,7 +1175,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) { - if (isReceiverRunning()) { + if (isDHCPReceiverRunning()) { return (receive6Indirect(timeout_sec, timeout_usec)); } @@ -1243,9 +1183,9 @@ IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) { } void -IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) { +IfaceMgr::addFDtoSet(int fd, int& maxfd, fd_set* sockets) { if (!sockets) { - isc_throw(BadValue, "add_fd: sockets can't be null"); + isc_throw(BadValue, "addFDtoSet: sockets can't be null"); } FD_SET(fd, sockets); @@ -1276,7 +1216,7 @@ IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) // Only deal with IPv6 addresses. if (s.addr_.isV6()) { // Add this socket to listening set - add_fd(s.sockfd_, maxfd, &sockets); + addFDtoSet(s.sockfd_, maxfd, &sockets); } } } @@ -1285,7 +1225,7 @@ IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { // Add it to the set as well - add_fd(s.socket_, maxfd, &sockets); + addFDtoSet(s.socket_, maxfd, &sockets); } } @@ -1372,15 +1312,15 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { // Add it to the set as well - add_fd(s.socket_, maxfd, &sockets); + addFDtoSet(s.socket_, maxfd, &sockets); } } // Add Receiver ready watch socket - add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets); // Add Receiver error watch socket - add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1423,9 +1363,9 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (receiver_->isReady(Receiver::RCV_ERROR)) { - string msg = receiver_->getLastError(); - receiver_->clearReady(Receiver::RCV_ERROR); + if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) { + string msg = dhcp_receiver_->getLastError(); + dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR); isc_throw(SocketReadError, msg); } @@ -1451,7 +1391,7 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ // If we're here it should only be because there are DHCP packets waiting. Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket(); if (!pkt) { - receiver_->clearReady(Receiver::RCV_READY); + dhcp_receiver_->clearReady(WatchedThread::RCV_READY); } return (pkt); @@ -1466,7 +1406,7 @@ IfaceMgr::receiveDHCP4Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { @@ -1474,14 +1414,14 @@ IfaceMgr::receiveDHCP4Packets() { // Only deal with IPv4 addresses. if (s.addr_.isV4()) { // Add this socket to listening set. - add_fd(s.sockfd_, maxfd, &sockets); + addFDtoSet(s.sockfd_, maxfd, &sockets); } } } for (;;) { // Check the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } @@ -1495,7 +1435,7 @@ IfaceMgr::receiveDHCP4Packets() { int result = select(maxfd + 1, &rd_set, 0, 0, 0); // Re-check the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } @@ -1507,7 +1447,7 @@ IfaceMgr::receiveDHCP4Packets() { // This thread should not get signals? if (errno != EINTR) { // Signal the error to receive4. - receiver_->setError(strerror(errno)); + dhcp_receiver_->setError(strerror(errno)); // We need to sleep in case of the error condition to // prevent the thread from tight looping when result // gets negative. @@ -1522,7 +1462,7 @@ IfaceMgr::receiveDHCP4Packets() { if (FD_ISSET(s.sockfd_, &sockets)) { receiveDHCP4Packet(*iface, s); // Can take time so check one more time the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } } @@ -1541,7 +1481,7 @@ IfaceMgr::receiveDHCP6Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets); + addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { @@ -1549,14 +1489,14 @@ IfaceMgr::receiveDHCP6Packets() { // Only deal with IPv6 addresses. if (s.addr_.isV6()) { // Add this socket to listening set. - add_fd(s.sockfd_ , maxfd, &sockets); + addFDtoSet(s.sockfd_ , maxfd, &sockets); } } } for (;;) { // Check the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } @@ -1570,7 +1510,7 @@ IfaceMgr::receiveDHCP6Packets() { int result = select(maxfd + 1, &rd_set, 0, 0, 0); // Re-check the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } @@ -1581,7 +1521,7 @@ IfaceMgr::receiveDHCP6Packets() { // This thread should not get signals? if (errno != EINTR) { // Signal the error to receive6. - receiver_->setError(strerror(errno)); + dhcp_receiver_->setError(strerror(errno)); // We need to sleep in case of the error condition to // prevent the thread from tight looping when result // gets negative. @@ -1596,7 +1536,7 @@ IfaceMgr::receiveDHCP6Packets() { if (FD_ISSET(s.sockfd_, &sockets)) { receiveDHCP6Packet(s); // Can take time so check one more time the watch socket. - if (receiver_->shouldTerminate()) { + if (dhcp_receiver_->shouldTerminate()) { return; } } @@ -1612,7 +1552,7 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { int result = ioctl(socket_info.sockfd_, FIONREAD, &len); if (result < 0) { // Signal the error to receive4. - receiver_->setError(strerror(errno)); + dhcp_receiver_->setError(strerror(errno)); return; } if (len == 0) { @@ -1625,14 +1565,14 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { try { pkt = packet_filter_->receive(iface, socket_info); } catch (const std::exception& ex) { - receiver_->setError(strerror(errno)); + dhcp_receiver_->setError(strerror(errno)); } catch (...) { - receiver_->setError("packet filter receive() failed"); + dhcp_receiver_->setError("packet filter receive() failed"); } if (pkt) { getPacketQueue4()->enqueuePacket(pkt, socket_info); - receiver_->markReady(Receiver::RCV_READY); + dhcp_receiver_->markReady(WatchedThread::RCV_READY); } } @@ -1643,7 +1583,7 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { int result = ioctl(socket_info.sockfd_, FIONREAD, &len); if (result < 0) { // Signal the error to receive6. - receiver_->setError(strerror(errno)); + dhcp_receiver_->setError(strerror(errno)); return; } if (len == 0) { @@ -1656,14 +1596,14 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { try { pkt = packet_filter6_->receive(socket_info); } catch (const std::exception& ex) { - receiver_->setError(ex.what()); + dhcp_receiver_->setError(ex.what()); } catch (...) { - receiver_->setError("packet filter receive() failed"); + dhcp_receiver_->setError("packet filter receive() failed"); } if (pkt) { getPacketQueue6()->enqueuePacket(pkt, socket_info); - receiver_->markReady(Receiver::RCV_READY); + dhcp_receiver_->markReady(WatchedThread::RCV_READY); } } @@ -1759,9 +1699,9 @@ IfaceMgr::getSocket(isc::dhcp::Pkt4 const& pkt) { bool IfaceMgr::configureDHCPPacketQueue(uint16_t family, data::ConstElementPtr queue_control) { - if (isReceiverRunning()) { + if (isDHCPReceiverRunning()) { isc_throw(InvalidOperation, "Cannot reconfigure queueing" - " while receiver thread is running"); + " while DHCP receiver thread is running"); } bool enable_queue = false; diff --git a/src/lib/dhcp/iface_mgr.h b/src/lib/dhcp/iface_mgr.h index af9d4cfce3..43fc908815 100644 --- a/src/lib/dhcp/iface_mgr.h +++ b/src/lib/dhcp/iface_mgr.h @@ -18,13 +18,12 @@ #include #include #include -#include +#include #include #include #include #include -#include #include #include @@ -457,116 +456,6 @@ private: typedef boost::shared_ptr IfacePtr; -/// @brief Provides a thread and controls for receiving packets. -/// -/// Given a "worker function", this class creates a thread which -/// runs the function and provides the means to monitor the thread -/// for "error" and "ready" conditions, and finally to stop the thread. -/// It uses three WatchSockets: one to indicate an error, one to indicate -/// data is ready, and a third to monitor as a shut-down command. -class Receiver { -public: - /// @brief Enumerates the list of watch sockets used to mark events - /// These are used as arguments to watch socket accessor methods. - enum WatchType { - RCV_ERROR = 0, - RCV_READY = 1, - RCV_TERMINATE = 2 - }; - - /// @brief Constructor - Receiver(){}; - - /// @brief Virtual destructor - virtual ~Receiver(){} - - /// @brief Fetches the fd of a watch socket - /// - /// @param watch_type indicates which watch socket - /// @return the watch socket's file descriptor - int getWatchFd(WatchType watch_type); - - /// @brief Sets a watch socket state to ready - /// - /// @param watch_type indicates which watch socket to mark - void markReady(WatchType watch_type); - - /// @brief Indicates if a watch socket state is ready - /// - /// @param watch_type indicates which watch socket to mark - /// @return true if the watch socket is ready, false otherwise - bool isReady(WatchType watch_type); - - /// @brief Sets a watch socket state to not ready - /// - /// @param watch_type indicates which watch socket to clear - void clearReady(WatchType watch_type); - - /// @brief Checks if the receiver thread should terminate - /// - /// Performs a "one-shot" check of the receiver's terminate - /// watch socket. If it is ready, return true and then clear - /// it, otherwise return false. - /// - /// @return true if the terminate watch socket is ready - bool shouldTerminate(); - - /// @brief Creates and runs the thread. - /// - /// Creates teh receiver's thread, passing into it the given - /// function to run. - /// - /// @param thread_main function the receiver's thread should run - void start(const boost::function& thread_main); - - /// @brief Returns true if the receiver thread is running - /// @todo - this may need additional logic to handle cases where - /// a thread function exits w/o the caller invoking @c - /// Receiver::stop(). - bool isRunning() { - return (thread_ != 0); - } - - /// @brief Terminates the receiver thread - /// - /// It marks the terminate watch socket ready, and then waits for the - /// thread to stop. At this point, the receiver is defunct. This is - /// not done in the destructor to avoid race conditions. - void stop(); - - /// @brief Sets the receiver error state - /// - /// This records the given error message and sets the error watch - /// socket to ready. - /// - /// @param error_msg - void setError(const std::string& error_msg); - - /// @brief Fetches the error message text for the most recent socket error - /// - /// @return string containing the error message - std::string getLastError(); - - /// @brief Error message of the last error encountered - std::string last_error_; - - /// @brief DHCP watch sockets that are used to communicate with the owning thread - /// There are three: - /// -# RCV_ERROR - packet receive error watch socket. - /// Marked as ready when the DHCP packet receiver experiences an I/O error. - /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet - /// to the packet queue. - /// -# RCV_TERMINATE Packet receiver terminate watch socket. - /// Marked as ready when the DHCP packet receiver thread should terminate. - isc::util::WatchSocket sockets_[RCV_TERMINATE + 1]; - - /// DHCP packet receiver thread. - isc::util::thread::ThreadPtr thread_ ; -}; - -/// @brief Defines a pointer to a Receiver -typedef boost::shared_ptr ReceiverPtr; - /// @brief Forward declaration to the @c IfaceMgr. class IfaceMgr; @@ -1181,8 +1070,8 @@ public: /// @brief Returns true if there is a receiver exists and its /// thread is currently running. - bool isReceiverRunning() const { - return (receiver_ != 0 && receiver_->isRunning()); + bool isDHCPReceiverRunning() const { + return (dhcp_receiver_ != 0 && dhcp_receiver_->isRunning()); } /// @brief Configures DHCP packet queue @@ -1208,7 +1097,7 @@ public: /// larger than it's current value, it will be updated to new fd value /// @param sockets pointer to the set of sockets /// @throw BadValue if sockets is null - static void add_fd(int fd, int& maxfd, fd_set* sockets); + static void addFDtoSet(int fd, int& maxfd, fd_set* sockets); // don't use private, we need derived classes in tests protected: @@ -1491,7 +1380,7 @@ private: PacketQueueMgr6Ptr packet_queue_mgr6_; /// DHCP packet receiver. - ReceiverPtr receiver_; + isc::util::thread::WatchedThreadPtr dhcp_receiver_; }; }; // namespace isc::dhcp diff --git a/src/lib/dhcp/packet_queue_mgr.h b/src/lib/dhcp/packet_queue_mgr.h index 6eaf6c4b21..fcc555d01e 100644 --- a/src/lib/dhcp/packet_queue_mgr.h +++ b/src/lib/dhcp/packet_queue_mgr.h @@ -93,14 +93,21 @@ public: // Look for it. auto index = factories_.find(queue_type); - // If it's there remove it - if (index != factories_.end()) { - factories_.erase(index); - return (true); + // Not there so nothing to do. + if (index == factories_.end()) { + return (false); + } + // If the queue is of the type being unregistered, then remove it. We don't + // a queue instance outliving its library. + if ((packet_queue_) && (packet_queue_->getQueueType() == queue_type)) { + packet_queue_.reset(); } - return (false); + // Remove the factory. + factories_.erase(index); + + return (true); } /// @brief Create an instance of a packet queue. diff --git a/src/lib/dhcp/tests/iface_mgr_unittest.cc b/src/lib/dhcp/tests/iface_mgr_unittest.cc index 9f240a9b0b..6478e17349 100644 --- a/src/lib/dhcp/tests/iface_mgr_unittest.cc +++ b/src/lib/dhcp/tests/iface_mgr_unittest.cc @@ -484,13 +484,13 @@ public: // Thread should only start when there is a packet queue. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6)); - ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning()); + ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning()); // If the thread is already running, trying to start it again should fail. if (queue_enabled) { ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation); // Should still have one running. - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); } // Let's build our DHCPv6 packet. @@ -531,7 +531,7 @@ public: // Stop the thread. This should be no harm/no foul if we're not // queueuing. Either way, we should not have a thread afterwards. ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); } @@ -570,13 +570,13 @@ public: // Thread should only start when there is a packet queue. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET)); - ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning()); + ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning()); // If the thread is already running, trying to start it again should fail. if (queue_enabled) { ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation); // Should still have one running. - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); } // Let's construct the packet to send. @@ -660,7 +660,7 @@ public: // Stop the thread. This should be no harm/no foul if we're not // queueuing. Either way, we should not have a thread afterwards. ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); } /// Holds the invocation counter for ifaceMgrErrorHandler. @@ -3047,7 +3047,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { // First let's make sure there is no queue and no thread. ASSERT_FALSE(ifacemgr->getPacketQueue4()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); bool queue_enabled = false; // Given an empty pointer, we should default to no queue. @@ -3056,11 +3056,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { EXPECT_FALSE(queue_enabled); EXPECT_FALSE(ifacemgr->getPacketQueue4()); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Verify that calling startDHCPReceiver with no queue, does NOT start the thread. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET)); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Now let's try with a populated queue control, but with enable-queue = false. queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, false); @@ -3068,7 +3068,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { EXPECT_FALSE(queue_enabled); EXPECT_FALSE(ifacemgr->getPacketQueue4()); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Now let's enable the queue. queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, true); @@ -3078,11 +3078,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \"" << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }"); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Calling startDHCPReceiver with a queue, should start the thread. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET)); - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); // Verify that calling startDHCPReceiver when the thread is running, throws. ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation); @@ -3096,18 +3096,18 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { // We should still have our queue and the thread should still be running. EXPECT_TRUE(ifacemgr->getPacketQueue4()); - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); // Now let's stop stop the thread. ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Stopping the thread should not destroy the queue. ASSERT_TRUE(ifacemgr->getPacketQueue4()); // Reconfigure with the queue turned off. We should have neither queue nor thread. ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control)); EXPECT_FALSE(ifacemgr->getPacketQueue4()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); } // Verifies DHCPv6 behavior of configureDHCPPacketQueue() @@ -3116,7 +3116,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { // First let's make sure there is no queue and no thread. ASSERT_FALSE(ifacemgr->getPacketQueue6()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); bool queue_enabled = false; // Given an empty pointer, we should default to no queue. @@ -3125,11 +3125,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { EXPECT_FALSE(queue_enabled); EXPECT_FALSE(ifacemgr->getPacketQueue6()); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Verify that calling startDHCPReceiver with no queue, does NOT start the thread. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET)); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Now let's try with a populated queue control, but with enable-queue = false. queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, false); @@ -3137,7 +3137,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { EXPECT_FALSE(queue_enabled); EXPECT_FALSE(ifacemgr->getPacketQueue6()); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Now let's enable the queue. queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, true); @@ -3147,11 +3147,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \"" << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }"); // configureDHCPPacketQueue() should never start the thread. - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Calling startDHCPReceiver with a queue, should start the thread. ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6)); - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); // Verify that calling startDHCPReceiver when the thread is running, throws. ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation); @@ -3165,206 +3165,18 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { // We should still have our queue and the thread should still be running. EXPECT_TRUE(ifacemgr->getPacketQueue6()); - ASSERT_TRUE(ifacemgr->isReceiverRunning()); + ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning()); // Now let's stop stop the thread. ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); // Stopping the thread should not destroy the queue. ASSERT_TRUE(ifacemgr->getPacketQueue6()); // Reconfigure with the queue turned off. We should have neither queue nor thread. ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control)); EXPECT_FALSE(ifacemgr->getPacketQueue6()); - ASSERT_FALSE(ifacemgr->isReceiverRunning()); -} - -/// @brief Test Fixture for testing isc:dhcp::Receiver -class ReceiverTest : public ::testing::Test { -public: - /// @brief Maximum number of passes allowed in worker event loop - static const int WORKER_MAX_PASSES; - - /// @brief Constructor. - ReceiverTest() {} - - /// @brief Destructor. - ~ReceiverTest() { - } - - /// @brief Sleeps for a given number of event periods sleep - /// Each period is 50 ms. - void nap(int periods) { - usleep(periods * 50 * 1000); - }; - - /// @brief Worker function to be used by the Receiver's thread - /// - /// The function runs 5 passes through an "event" loop. - /// On each pass: - /// - check terminate command - /// - instigate the desired event (second pass only) - /// - naps for 1 period (50ms) - /// - /// @param watch_type type of event that should occur - void worker(Receiver::WatchType watch_type) { - for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) { - - // Stop if we're told to do it. - if (receiver_->shouldTerminate()) { - return; - } - - // On the second pass, set the event. - if (passes_ == 2) { - switch (watch_type) { - case Receiver::RCV_ERROR: - receiver_->setError("we have an error"); - break; - case Receiver::RCV_READY: - receiver_->markReady(watch_type); - break; - case Receiver::RCV_TERMINATE: - default: - // Do nothing, we're waiting to be told to stop. - break; - } - } - - // Take a nap. - nap(1); - } - - // Indicate why we stopped. - receiver_->setError("thread expired"); - } - - /// @brief Current receiver instance. - ReceiverPtr receiver_; - - /// @brief Counter used to track the number of passes made - /// within the thread worker function. - int passes_; -}; - -const int ReceiverTest::WORKER_MAX_PASSES = 5; - -/// Verifies the basic operation of the Receiver class. -/// It checks that a Receiver can be created, can be stopped, -/// and that in set and clear sockets. -TEST_F(ReceiverTest, receiverClassBasics) { - - /// We'll create a receiver and let it run until it expires. (Note this is more - /// of a test of ReceiverTest itself and ensures our tests later for why we - /// exited are sound.) - receiver_.reset(new Receiver()); - ASSERT_FALSE(receiver_->isRunning()); - receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE)); - ASSERT_TRUE(receiver_->isRunning()); - - // Wait long enough for thread to expire. - nap(WORKER_MAX_PASSES + 1); - - // It should have done the maximum number of passes. - EXPECT_EQ(passes_, WORKER_MAX_PASSES); - - // Error should be ready and error text should be "thread expired". - ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - EXPECT_EQ("thread expired", receiver_->getLastError()); - - // Thread is technically still running, so let's stop it. - EXPECT_TRUE(receiver_->isRunning()); - ASSERT_NO_THROW(receiver_->stop()); - ASSERT_FALSE(receiver_->isRunning()); - - /// Now we'll test stopping a thread. - /// Start the receiver, let it run a little and then tell it to stop. - receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE)); - ASSERT_TRUE(receiver_->isRunning()); - - // No watches should be ready. - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - - // Wait a little while. - nap(2); - - // Tell it to stop. - receiver_->stop(); - ASSERT_FALSE(receiver_->isRunning()); - - // It should have done less than the maximum number of passes. - EXPECT_LT(passes_, WORKER_MAX_PASSES); - - // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - EXPECT_EQ("thread stopped", receiver_->getLastError()); - - - // Next we'll test error notification. - // Start the receiver with a thread that sets an error on the second pass. - receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_ERROR)); - ASSERT_TRUE(receiver_->isRunning()); - - // No watches should be ready. - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - - // Wait a little while. - nap(2); - - // It should now indicate an error. - ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR)); - EXPECT_EQ("we have an error", receiver_->getLastError()); - - // Tell it to stop. - receiver_->stop(); - ASSERT_FALSE(receiver_->isRunning()); - - // It should have done less than the maximum number of passes. - EXPECT_LT(passes_, WORKER_MAX_PASSES); - - // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - EXPECT_EQ("thread stopped", receiver_->getLastError()); - - - // Finally, we'll test data ready notification. - // We'll start the receiver with a thread that indicates data ready on its second pass. - receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_READY)); - ASSERT_TRUE(receiver_->isRunning()); - - // No watches should be ready. - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - - // Wait a little while. - nap(2); - - // It should now indicate data ready. - ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY)); - - // Tell it to stop. - receiver_->stop(); - ASSERT_FALSE(receiver_->isRunning()); - - // It should have done less than the maximum number of passes. - EXPECT_LT(passes_, WORKER_MAX_PASSES); - - // No watches should be ready. Error text should be "thread stopped". - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); - ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); - EXPECT_EQ("thread stopped", receiver_->getLastError()); + ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning()); } } diff --git a/src/lib/dhcp/tests/packet_queue_mgr4_unittest.cc b/src/lib/dhcp/tests/packet_queue_mgr4_unittest.cc index 0de5557f2e..d746d68465 100644 --- a/src/lib/dhcp/tests/packet_queue_mgr4_unittest.cc +++ b/src/lib/dhcp/tests/packet_queue_mgr4_unittest.cc @@ -127,9 +127,8 @@ TEST_F(PacketQueueMgr4Test, customQueueType) { // Now unregister the factory. ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue")); - - // Verify we did not lose the queue. - checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }"); + // Queue should be gone too. + ASSERT_FALSE(mgr().getPacketQueue()); // Try and recreate the custom queue, type should be invalid. ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType); diff --git a/src/lib/dhcp/tests/packet_queue_mgr6_unittest.cc b/src/lib/dhcp/tests/packet_queue_mgr6_unittest.cc index 89cd49a48a..38e7643766 100644 --- a/src/lib/dhcp/tests/packet_queue_mgr6_unittest.cc +++ b/src/lib/dhcp/tests/packet_queue_mgr6_unittest.cc @@ -116,9 +116,8 @@ TEST_F(PacketQueueMgr6Test, customQueueType) { // Now unregister the factory. ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue")); - - // Verify we did not lose the queue. - checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }"); + // Queue should be gone too. + ASSERT_FALSE(mgr().getPacketQueue()); // Try and recreate the custom queue, type should be invalid. ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType); diff --git a/src/lib/util/threads/Makefile.am b/src/lib/util/threads/Makefile.am index 0a24b1345b..b3e6c20e47 100644 --- a/src/lib/util/threads/Makefile.am +++ b/src/lib/util/threads/Makefile.am @@ -7,6 +7,7 @@ AM_CPPFLAGS += $(BOOST_INCLUDES) lib_LTLIBRARIES = libkea-threads.la libkea_threads_la_SOURCES = sync.h sync.cc libkea_threads_la_SOURCES += thread.h thread.cc +libkea_threads_la_SOURCES += watched_thread.h watched_thread.cc libkea_threads_la_LIBADD = $(top_builddir)/src/lib/exceptions/libkea-exceptions.la libkea_threads_la_LDFLAGS = -no-undefined -version-info 1:0:0 diff --git a/src/lib/util/threads/tests/Makefile.am b/src/lib/util/threads/tests/Makefile.am index 81c5e5ef08..2975ab6764 100644 --- a/src/lib/util/threads/tests/Makefile.am +++ b/src/lib/util/threads/tests/Makefile.am @@ -24,6 +24,7 @@ run_unittests_SOURCES = run_unittests.cc run_unittests_SOURCES += thread_unittest.cc run_unittests_SOURCES += lock_unittest.cc run_unittests_SOURCES += condvar_unittest.cc +run_unittests_SOURCES += watched_thread_unittest.cc run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS) diff --git a/src/lib/util/threads/tests/watched_thread_unittest.cc b/src/lib/util/threads/tests/watched_thread_unittest.cc new file mode 100644 index 0000000000..75310374a0 --- /dev/null +++ b/src/lib/util/threads/tests/watched_thread_unittest.cc @@ -0,0 +1,211 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include + +#include + +#include +#include + +#include + +using namespace std; +using namespace isc; +using namespace isc::util; +using namespace isc::util::thread; + +namespace { + +/// @brief Test Fixture for testing isc:util::thread::WatchedThread +class WatchedThreadTest : public ::testing::Test { +public: + /// @brief Maximum number of passes allowed in worker event loop + static const int WORKER_MAX_PASSES; + + /// @brief Constructor. + WatchedThreadTest() {} + + /// @brief Destructor. + ~WatchedThreadTest() { + } + + /// @brief Sleeps for a given number of event periods sleep + /// Each period is 50 ms. + void nap(int periods) { + usleep(periods * 500 * 1000); + }; + + /// @brief Worker function to be used by the WatchedThread's thread + /// + /// The function runs 5 passes through an "event" loop. + /// On each pass: + /// - check terminate command + /// - instigate the desired event (second pass only) + /// - naps for 1 period (50ms) + /// + /// @param watch_type type of event that should occur + void worker(WatchedThread::WatchType watch_type) { + for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) { + + // Stop if we're told to do it. + if (wthread_->shouldTerminate()) { + return; + } + + // On the second pass, set the event. + if (passes_ == 2) { + switch (watch_type) { + case WatchedThread::RCV_ERROR: + wthread_->setError("we have an error"); + break; + case WatchedThread::RCV_READY: + wthread_->markReady(watch_type); + break; + case WatchedThread::RCV_TERMINATE: + default: + // Do nothing, we're waiting to be told to stop. + break; + } + } + + // Take a nap. + nap(1); + } + + // Indicate why we stopped. + wthread_->setError("thread expired"); + } + + /// @brief Current receiver instance. + WatchedThreadPtr wthread_; + + /// @brief Counter used to track the number of passes made + /// within the thread worker function. + int passes_; +}; + +const int WatchedThreadTest::WORKER_MAX_PASSES = 5; + +/// Verifies the basic operation of the WatchedThread class. +/// It checks that a WatchedThread can be created, can be stopped, +/// and that in set and clear sockets. +TEST_F(WatchedThreadTest, receiverClassBasics) { + + /// We'll create a receiver and let it run until it expires. (Note this is more + /// of a test of WatchedThreadTest itself and ensures our tests later for why we + /// exited are sound.) + wthread_.reset(new WatchedThread()); + ASSERT_FALSE(wthread_->isRunning()); + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE)); + ASSERT_TRUE(wthread_->isRunning()); + + // Wait long enough for thread to expire. + nap(WORKER_MAX_PASSES + 1); + + // It should have done the maximum number of passes. + EXPECT_EQ(passes_, WORKER_MAX_PASSES); + + // Error should be ready and error text should be "thread expired". + ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + EXPECT_EQ("thread expired", wthread_->getLastError()); + + // Thread is technically still running, so let's stop it. + EXPECT_TRUE(wthread_->isRunning()); + ASSERT_NO_THROW(wthread_->stop()); + ASSERT_FALSE(wthread_->isRunning()); + + /// Now we'll test stopping a thread. + /// Start the receiver, let it run a little and then tell it to stop. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE)); + ASSERT_TRUE(wthread_->isRunning()); + + // No watches should be ready. + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // Tell it to stop. + wthread_->stop(); + ASSERT_FALSE(wthread_->isRunning()); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", wthread_->getLastError()); + + + // Next we'll test error notification. + // Start the receiver with a thread that sets an error on the second pass. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR)); + ASSERT_TRUE(wthread_->isRunning()); + + // No watches should be ready. + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // It should now indicate an error. + ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR)); + EXPECT_EQ("we have an error", wthread_->getLastError()); + + // Tell it to stop. + wthread_->stop(); + ASSERT_FALSE(wthread_->isRunning()); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", wthread_->getLastError()); + + + // Finally, we'll test data ready notification. + // We'll start the receiver with a thread that indicates data ready on its second pass. + wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY)); + ASSERT_TRUE(wthread_->isRunning()); + + // No watches should be ready. + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // It should now indicate data ready. + ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY)); + + // Tell it to stop. + wthread_->stop(); + ASSERT_FALSE(wthread_->isRunning()); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY)); + ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", wthread_->getLastError()); +} + +} diff --git a/src/lib/util/threads/watched_thread.cc b/src/lib/util/threads/watched_thread.cc new file mode 100644 index 0000000000..6925a4ce54 --- /dev/null +++ b/src/lib/util/threads/watched_thread.cc @@ -0,0 +1,78 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include +#include + +namespace isc { +namespace util { +namespace thread { + +void +WatchedThread::start(const boost::function& thread_main) { + clearReady(RCV_ERROR); + clearReady(RCV_READY); + clearReady(RCV_TERMINATE); + last_error_ = "no error"; + thread_.reset(new Thread(thread_main)); +} + +int +WatchedThread::getWatchFd(WatchType watch_type) { + return(sockets_[watch_type].getSelectFd()); +} + +void +WatchedThread::markReady(WatchType watch_type) { + sockets_[watch_type].markReady(); +} + +bool +WatchedThread::isReady(WatchType watch_type) { + return (sockets_[watch_type].isReady()); +} + +void +WatchedThread::clearReady(WatchType watch_type) { + sockets_[watch_type].clearReady(); +} + +bool +WatchedThread::shouldTerminate() { + if (sockets_[RCV_TERMINATE].isReady()) { + clearReady(RCV_TERMINATE); + return (true); + } + + return (false); +} + +void +WatchedThread::stop() { + if (thread_) { + markReady(RCV_TERMINATE); + thread_->wait(); + thread_.reset(); + } + + clearReady(RCV_ERROR); + clearReady(RCV_READY); + last_error_ = "thread stopped"; +} + +void +WatchedThread::setError(const std::string& error_msg) { + last_error_ = error_msg; + markReady(RCV_ERROR); +} + +std::string +WatchedThread::getLastError() { + return (last_error_); +} +} // end of namespace isc::util::thread +} // end of namespace isc::util +} // end of namespace isc diff --git a/src/lib/util/threads/watched_thread.h b/src/lib/util/threads/watched_thread.h new file mode 100644 index 0000000000..c9dd99cbce --- /dev/null +++ b/src/lib/util/threads/watched_thread.h @@ -0,0 +1,133 @@ +// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#ifndef WATCHED_THREAD_H +#define WATCHED_THREAD_H + +#include +#include + +#include + +namespace isc { +namespace util { +namespace thread { + +/// @brief Provides a thread and controls for receiving packets. +/// +/// Given a "worker function", this class creates a thread which +/// runs the function and provides the means to monitor the thread +/// for "error" and "ready" conditions, and finally to stop the thread. +/// It uses three WatchSockets: one to indicate an error, one to indicate +/// data is ready, and a third to monitor as a shut-down command. +class WatchedThread { +public: + /// @brief Enumerates the list of watch sockets used to mark events + /// These are used as arguments to watch socket accessor methods. + enum WatchType { + RCV_ERROR = 0, + RCV_READY = 1, + RCV_TERMINATE = 2 + }; + + /// @brief Constructor + WatchedThread(){}; + + /// @brief Virtual destructor + virtual ~WatchedThread(){} + + /// @brief Fetches the fd of a watch socket + /// + /// @param watch_type indicates which watch socket + /// @return the watch socket's file descriptor + int getWatchFd(WatchType watch_type); + + /// @brief Sets a watch socket state to ready + /// + /// @param watch_type indicates which watch socket to mark + void markReady(WatchType watch_type); + + /// @brief Indicates if a watch socket state is ready + /// + /// @param watch_type indicates which watch socket to mark + /// @return true if the watch socket is ready, false otherwise + bool isReady(WatchType watch_type); + + /// @brief Sets a watch socket state to not ready + /// + /// @param watch_type indicates which watch socket to clear + void clearReady(WatchType watch_type); + + /// @brief Checks if the receiver thread should terminate + /// + /// Performs a "one-shot" check of the receiver's terminate + /// watch socket. If it is ready, return true and then clear + /// it, otherwise return false. + /// + /// @return true if the terminate watch socket is ready + bool shouldTerminate(); + + /// @brief Creates and runs the thread. + /// + /// Creates teh receiver's thread, passing into it the given + /// function to run. + /// + /// @param thread_main function the receiver's thread should run + void start(const boost::function& thread_main); + + /// @brief Returns true if the receiver thread is running + /// @todo - this may need additional logic to handle cases where + /// a thread function exits w/o the caller invoking @c + /// WatchedThread::stop(). + bool isRunning() { + return (thread_ != 0); + } + + /// @brief Terminates the receiver thread + /// + /// It marks the terminate watch socket ready, and then waits for the + /// thread to stop. At this point, the receiver is defunct. This is + /// not done in the destructor to avoid race conditions. + void stop(); + + /// @brief Sets the receiver error state + /// + /// This records the given error message and sets the error watch + /// socket to ready. + /// + /// @param error_msg + void setError(const std::string& error_msg); + + /// @brief Fetches the error message text for the most recent socket error + /// + /// @return string containing the error message + std::string getLastError(); + + /// @brief Error message of the last error encountered + std::string last_error_; + + /// @brief DHCP watch sockets that are used to communicate with the owning thread + /// There are three: + /// -# RCV_ERROR - packet receive error watch socket. + /// Marked as ready when the DHCP packet receiver experiences an I/O error. + /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet + /// to the packet queue. + /// -# RCV_TERMINATE Packet receiver terminate watch socket. + /// Marked as ready when the DHCP packet receiver thread should terminate. + WatchSocket sockets_[RCV_TERMINATE + 1]; + + /// DHCP packet receiver thread. + thread::ThreadPtr thread_ ; +}; + +/// @brief Defines a pointer to a WatchedThread +typedef boost::shared_ptr WatchedThreadPtr; + +}; // namespace isc::util::thread +}; // namespace isc::util +}; // namespace isc + +#endif // WATCHED_THREAD_H