]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#260,!120] Addressed more review comments
authorThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 11:31:38 +0000 (06:31 -0500)
committerThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 11:31:38 +0000 (06:31 -0500)
doc/guide/congestion-handling.xml
    Added subsections to Congestion Handling chapter

src/lib/dhcp/iface_mgr.*
    Removed Receiver class, replaced with new class lib::util::thread::WatchedThread
    Renamed IfaceMgr::receiver_ to dhcp_receiver
    Renamed add_fd to addFDtoSet

src/lib/dhcp/packet_queue_mgr.h
    PacketQueueMgr::unregisterFactory() - destroys queue instance
    if it matches type being unregistered

New files:
src/lib/util/threads/
    watched_thread.h
    watched_thread.cc

src/lib/util/threads/tests
    watched_thread_unittest.cc

12 files changed:
doc/guide/congestion-handling.xml
src/lib/dhcp/iface_mgr.cc
src/lib/dhcp/iface_mgr.h
src/lib/dhcp/packet_queue_mgr.h
src/lib/dhcp/tests/iface_mgr_unittest.cc
src/lib/dhcp/tests/packet_queue_mgr4_unittest.cc
src/lib/dhcp/tests/packet_queue_mgr6_unittest.cc
src/lib/util/threads/Makefile.am
src/lib/util/threads/tests/Makefile.am
src/lib/util/threads/tests/watched_thread_unittest.cc [new file with mode: 0644]
src/lib/util/threads/watched_thread.cc [new file with mode: 0644]
src/lib/util/threads/watched_thread.h [new file with mode: 0644]

index 89c5c4ac635c321ecf7383b5c8d4d7ca6d385af1..a8ea39af152879e5b964d6d9c2f3afdf2cc09e93 100644 (file)
@@ -9,6 +9,8 @@
 <!-- Converted by db4-upgrade version 1.1 -->
 <chapter xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="congestion-handling">
   <title>Congestion Handling in DHCPv4 and DHCPv6</title>
+  <section xml:id="congeston-handling-background">
+  <title>What is Congestion?</title>
   <para>Congestion occurs when servers are subjected to client queries
   faster than they can be fulfilled.  Subsequently, the servers begin
   accumulating a backlog of pending queries.  The longer the high rate of
@@ -42,7 +44,9 @@
   relevant, or worse are redundant. In other words, the packets waiting in
   the FIFO socket buffers become increasingly stale.
   </para>
-
+  </section>
+  <section xml:id="congeston-handling-solution">
+  <title>Configuring Congestion Handling</title>
   <para>Kea 1.5 introduces a new feature referred to as Congestion Handling.
   Congestion handling offers the ability to configure the server to use a
   separate thread to read packets from the interface socket buffers. As the
   <para>
   The number of parameters and plug-ins is expected to grow over time.
   </para>
+</section>
 </chapter>
index 91af356b016de541ae0defd89bd8132939ef1263..a11a3daa5094642fbaa74ff7c42a296b10175196 100644 (file)
@@ -179,69 +179,6 @@ bool Iface::delSocket(const uint16_t sockfd) {
     return (false); // socket not found
 }
 
-void
-Receiver::start(const boost::function<void()>& thread_main) {
-    clearReady(RCV_ERROR);
-    clearReady(RCV_READY);
-    clearReady(RCV_TERMINATE);
-    last_error_ = "no error";
-    thread_.reset(new isc::util::thread::Thread(thread_main));
-}
-
-int
-Receiver::getWatchFd(WatchType watch_type) {
-    return(sockets_[watch_type].getSelectFd());
-}
-
-void
-Receiver::markReady(WatchType watch_type)  {
-    sockets_[watch_type].markReady();
-}
-
-bool
-Receiver::isReady(WatchType watch_type) {
-    return (sockets_[watch_type].isReady());
-}
-
-void
-Receiver::clearReady(WatchType watch_type) {
-    sockets_[watch_type].clearReady();
-}
-
-bool
-Receiver::shouldTerminate() {
-    if (sockets_[RCV_TERMINATE].isReady()) {
-        clearReady(RCV_TERMINATE);
-        return (true);
-    }
-
-    return (false);
-}
-
-void
-Receiver::stop() {
-    if (thread_) {
-        markReady(RCV_TERMINATE);
-        thread_->wait();
-        thread_.reset();
-    }
-
-    clearReady(RCV_ERROR);
-    clearReady(RCV_READY);
-    last_error_ = "thread stopped";
-}
-
-void
-Receiver::setError(const std::string& error_msg) {
-    last_error_ = error_msg;
-    markReady(RCV_ERROR);
-}
-
-std::string
-Receiver::getLastError() {
-    return (last_error_);
-}
-
 IfaceMgr::IfaceMgr()
     :control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))),
      control_buf_(new char[control_buf_len_]),
@@ -354,11 +291,12 @@ void IfaceMgr::closeSockets() {
 }
 
 void IfaceMgr::stopDHCPReceiver() {
-    if (isReceiverRunning()) {
-        receiver_->stop();
-        receiver_.reset();
+    if (isDHCPReceiverRunning()) {
+        dhcp_receiver_->stop();
     }
 
+    dhcp_receiver_.reset();
+
     if (getPacketQueue4()) {
         getPacketQueue4()->clear();
     }
@@ -743,29 +681,31 @@ IfaceMgr::openSockets6(const uint16_t port,
 
 void
 IfaceMgr::startDHCPReceiver(const uint16_t family) {
-    if (isReceiverRunning()) {
+    if (isDHCPReceiverRunning()) {
         isc_throw(InvalidOperation, "a receiver thread already exists");
     }
 
     switch (family) {
     case AF_INET:
-        // If there's no queue, then has been disabled, simply return.
+        // If the queue doesn't exist, packet queing has been configured
+        // as disabled. If there is no queue, we do not create a reciever.
         if(!getPacketQueue4()) {
-                return;
+            return;
         }
 
-        receiver_.reset(new Receiver());
-        receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+        dhcp_receiver_.reset(new WatchedThread());
+        dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP4Packets, this));
 
         break;
     case AF_INET6:
-        // If there's no queue, then has been disabled, simply return.
+        // If the queue doesn't exist, packet queing has been configured
+        // as disabled. If there is no queue, we do not create a reciever.
         if(!getPacketQueue6()) {
             return;
         }
 
-        receiver_.reset(new Receiver());
-        receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
+        dhcp_receiver_.reset(new WatchedThread());
+        dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP6Packets, this));
         break;
     default:
         isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
@@ -1024,7 +964,7 @@ IfaceMgr::send(const Pkt4Ptr& pkt) {
 }
 
 Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
-    if (isReceiverRunning()) {
+    if (isDHCPReceiverRunning()) {
         return (receive4Indirect(timeout_sec, timeout_usec));
     }
 
@@ -1046,15 +986,15 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // if there are any callbacks for external sockets registered...
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
-            add_fd(s.socket_, maxfd, &sockets);
+            addFDtoSet(s.socket_, maxfd, &sockets);
         }
     }
 
     // Add Receiver ready watch socket
-    add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
 
     // Add Receiver error watch socket
-    add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1097,9 +1037,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (receiver_->isReady(Receiver::RCV_ERROR)) {
-            string msg = receiver_->getLastError();
-            receiver_->clearReady(Receiver::RCV_ERROR);
+        if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+            string msg = dhcp_receiver_->getLastError();
+            dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1125,7 +1065,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // If we're here it should only be because there are DHCP packets waiting.
     Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
     if (!pkt) {
-        receiver_->clearReady(Receiver::RCV_READY);
+        dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
     }
 
     return (pkt);
@@ -1152,7 +1092,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
             // Only deal with IPv4 addresses.
             if (s.addr_.isV4()) {
                 // Add this socket to listening set
-                add_fd(s.sockfd_, maxfd, &sockets);
+                addFDtoSet(s.sockfd_, maxfd, &sockets);
             }
         }
     }
@@ -1161,7 +1101,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
             // Add this socket to listening set
-            add_fd(s.socket_, maxfd, &sockets);
+            addFDtoSet(s.socket_, maxfd, &sockets);
         }
     }
 
@@ -1235,7 +1175,7 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
 
 Pkt6Ptr
 IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
-    if (isReceiverRunning()) {
+    if (isDHCPReceiverRunning()) {
         return (receive6Indirect(timeout_sec, timeout_usec));
     }
 
@@ -1243,9 +1183,9 @@ IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
 }
 
 void
-IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) {
+IfaceMgr::addFDtoSet(int fd, int& maxfd, fd_set* sockets) {
     if (!sockets) {
-        isc_throw(BadValue, "add_fd: sockets can't be null");
+        isc_throw(BadValue, "addFDtoSet: sockets can't be null");
     }
 
     FD_SET(fd, sockets);
@@ -1276,7 +1216,7 @@ IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ )
             // Only deal with IPv6 addresses.
             if (s.addr_.isV6()) {
                 // Add this socket to listening set
-                add_fd(s.sockfd_, maxfd, &sockets);
+                addFDtoSet(s.sockfd_, maxfd, &sockets);
             }
         }
     }
@@ -1285,7 +1225,7 @@ IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ )
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
             // Add it to the set as well
-            add_fd(s.socket_, maxfd, &sockets);
+            addFDtoSet(s.socket_, maxfd, &sockets);
         }
     }
 
@@ -1372,15 +1312,15 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
             // Add it to the set as well
-            add_fd(s.socket_, maxfd, &sockets);
+            addFDtoSet(s.socket_, maxfd, &sockets);
         }
     }
 
     // Add Receiver ready watch socket
-    add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
 
     // Add Receiver error watch socket
-    add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1423,9 +1363,9 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (receiver_->isReady(Receiver::RCV_ERROR)) {
-            string msg = receiver_->getLastError();
-            receiver_->clearReady(Receiver::RCV_ERROR);
+        if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+            string msg = dhcp_receiver_->getLastError();
+            dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1451,7 +1391,7 @@ IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     // If we're here it should only be because there are DHCP packets waiting.
     Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
     if (!pkt) {
-        receiver_->clearReady(Receiver::RCV_READY);
+        dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
     }
 
     return (pkt);
@@ -1466,7 +1406,7 @@ IfaceMgr::receiveDHCP4Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
@@ -1474,14 +1414,14 @@ IfaceMgr::receiveDHCP4Packets() {
             // Only deal with IPv4 addresses.
             if (s.addr_.isV4()) {
                 // Add this socket to listening set.
-                add_fd(s.sockfd_, maxfd, &sockets);
+                addFDtoSet(s.sockfd_, maxfd, &sockets);
             }
         }
     }
 
     for (;;) {
         // Check the watch socket.
-        if (receiver_->shouldTerminate()) {
+        if (dhcp_receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1495,7 +1435,7 @@ IfaceMgr::receiveDHCP4Packets() {
         int result = select(maxfd + 1, &rd_set, 0, 0, 0);
 
         // Re-check the watch socket.
-        if (receiver_->shouldTerminate()) {
+        if (dhcp_receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1507,7 +1447,7 @@ IfaceMgr::receiveDHCP4Packets() {
             // This thread should not get signals?
             if (errno != EINTR) {
                 // Signal the error to receive4.
-                receiver_->setError(strerror(errno));
+                dhcp_receiver_->setError(strerror(errno));
                 // We need to sleep in case of the error condition to
                 // prevent the thread from tight looping when result
                 // gets negative.
@@ -1522,7 +1462,7 @@ IfaceMgr::receiveDHCP4Packets() {
                 if (FD_ISSET(s.sockfd_, &sockets)) {
                     receiveDHCP4Packet(*iface, s);
                     // Can take time so check one more time the watch socket.
-                    if (receiver_->shouldTerminate()) {
+                    if (dhcp_receiver_->shouldTerminate()) {
                         return;
                     }
                 }
@@ -1541,7 +1481,7 @@ IfaceMgr::receiveDHCP6Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
+    addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
@@ -1549,14 +1489,14 @@ IfaceMgr::receiveDHCP6Packets() {
             // Only deal with IPv6 addresses.
             if (s.addr_.isV6()) {
                 // Add this socket to listening set.
-                add_fd(s.sockfd_ , maxfd, &sockets);
+                addFDtoSet(s.sockfd_ , maxfd, &sockets);
             }
         }
     }
 
     for (;;) {
         // Check the watch socket.
-        if (receiver_->shouldTerminate()) {
+        if (dhcp_receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1570,7 +1510,7 @@ IfaceMgr::receiveDHCP6Packets() {
         int result = select(maxfd + 1, &rd_set, 0, 0, 0);
 
         // Re-check the watch socket.
-        if (receiver_->shouldTerminate()) {
+        if (dhcp_receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1581,7 +1521,7 @@ IfaceMgr::receiveDHCP6Packets() {
             // This thread should not get signals?
             if (errno != EINTR) {
                 // Signal the error to receive6.
-                receiver_->setError(strerror(errno));
+                dhcp_receiver_->setError(strerror(errno));
                 // We need to sleep in case of the error condition to
                 // prevent the thread from tight looping when result
                 // gets negative.
@@ -1596,7 +1536,7 @@ IfaceMgr::receiveDHCP6Packets() {
                 if (FD_ISSET(s.sockfd_, &sockets)) {
                     receiveDHCP6Packet(s);
                     // Can take time so check one more time the watch socket.
-                    if (receiver_->shouldTerminate()) {
+                    if (dhcp_receiver_->shouldTerminate()) {
                         return;
                     }
                 }
@@ -1612,7 +1552,7 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
     int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
     if (result < 0) {
         // Signal the error to receive4.
-        receiver_->setError(strerror(errno));
+        dhcp_receiver_->setError(strerror(errno));
         return;
     }
     if (len == 0) {
@@ -1625,14 +1565,14 @@ IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
     try {
         pkt = packet_filter_->receive(iface, socket_info);
     } catch (const std::exception& ex) {
-        receiver_->setError(strerror(errno));
+        dhcp_receiver_->setError(strerror(errno));
     } catch (...) {
-        receiver_->setError("packet filter receive() failed");
+        dhcp_receiver_->setError("packet filter receive() failed");
     }
 
     if (pkt) {
         getPacketQueue4()->enqueuePacket(pkt, socket_info);
-        receiver_->markReady(Receiver::RCV_READY);
+        dhcp_receiver_->markReady(WatchedThread::RCV_READY);
     }
 }
 
@@ -1643,7 +1583,7 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
     int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
     if (result < 0) {
         // Signal the error to receive6.
-        receiver_->setError(strerror(errno));
+        dhcp_receiver_->setError(strerror(errno));
         return;
     }
     if (len == 0) {
@@ -1656,14 +1596,14 @@ IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
     try {
         pkt = packet_filter6_->receive(socket_info);
     } catch (const std::exception& ex) {
-        receiver_->setError(ex.what());
+        dhcp_receiver_->setError(ex.what());
     } catch (...) {
-        receiver_->setError("packet filter receive() failed");
+        dhcp_receiver_->setError("packet filter receive() failed");
     }
 
     if (pkt) {
         getPacketQueue6()->enqueuePacket(pkt, socket_info);
-        receiver_->markReady(Receiver::RCV_READY);
+        dhcp_receiver_->markReady(WatchedThread::RCV_READY);
     }
 }
 
@@ -1759,9 +1699,9 @@ IfaceMgr::getSocket(isc::dhcp::Pkt4 const& pkt) {
 
 bool
 IfaceMgr::configureDHCPPacketQueue(uint16_t family, data::ConstElementPtr queue_control) {
-    if (isReceiverRunning()) {
+    if (isDHCPReceiverRunning()) {
         isc_throw(InvalidOperation, "Cannot reconfigure queueing"
-                                    " while receiver thread is running");
+                                    " while DHCP receiver thread is running");
     }
 
     bool enable_queue = false;
index af9d4cfce3dc001c9d1abd1e02ed73d0b27ec796..43fc90881545ccdf63b7df7f1995cf3276e7fddf 100644 (file)
 #include <dhcp/pkt_filter6.h>
 #include <util/optional_value.h>
 #include <util/watch_socket.h>
-#include <util/threads/thread.h>
+#include <util/threads/watched_thread.h>
 
 #include <boost/function.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_array.hpp>
 #include <boost/shared_ptr.hpp>
-#include <boost/circular_buffer.hpp>
 
 #include <list>
 #include <vector>
@@ -457,116 +456,6 @@ private:
 
 typedef boost::shared_ptr<Iface> IfacePtr;
 
-/// @brief Provides a thread and controls for receiving packets.
-///
-/// Given a "worker function", this class creates a thread which
-/// runs the function and provides the means to monitor the thread
-/// for "error" and "ready" conditions, and finally to stop the thread.
-/// It uses three WatchSockets: one to indicate an error, one to indicate
-/// data is ready, and a third to monitor as a shut-down command.
-class Receiver {
-public:
-    /// @brief Enumerates the list of watch sockets used to mark events
-    /// These are used as arguments to watch socket accessor methods.
-    enum WatchType {
-        RCV_ERROR = 0,
-        RCV_READY = 1,
-        RCV_TERMINATE = 2
-    };
-
-    /// @brief Constructor
-    Receiver(){};
-
-    /// @brief Virtual destructor
-    virtual ~Receiver(){}
-
-    /// @brief Fetches the fd of a watch socket
-    ///
-    /// @param watch_type indicates which watch socket
-    /// @return the watch socket's file descriptor
-    int getWatchFd(WatchType watch_type);
-
-    /// @brief Sets a watch socket state to ready
-    ///
-    /// @param watch_type indicates which watch socket to mark
-    void markReady(WatchType watch_type);
-
-    /// @brief Indicates if a watch socket state is ready
-    ///
-    /// @param watch_type indicates which watch socket to mark
-    /// @return true if the watch socket is ready, false otherwise
-    bool isReady(WatchType watch_type);
-
-    /// @brief Sets a watch socket state to not ready
-    ///
-    /// @param watch_type indicates which watch socket to clear
-    void clearReady(WatchType watch_type);
-
-    /// @brief Checks if the receiver thread should terminate
-    ///
-    /// Performs a "one-shot" check of the receiver's terminate
-    /// watch socket.  If it is ready, return true and then clear
-    /// it, otherwise return false.
-    ///
-    /// @return true if the terminate watch socket is ready
-    bool shouldTerminate();
-
-    /// @brief Creates and runs the thread.
-    ///
-    /// Creates teh receiver's thread, passing into it the given
-    /// function to run.
-    ///
-    /// @param thread_main function the receiver's thread should run
-    void start(const boost::function<void()>& thread_main);
-
-    /// @brief Returns true if the receiver thread is running
-    /// @todo - this may need additional logic to handle cases where
-    /// a thread function exits w/o the caller invoking @c
-    /// Receiver::stop().
-    bool isRunning() {
-        return (thread_ != 0);
-    }
-
-    /// @brief Terminates the receiver thread
-    ///
-    /// It marks the terminate watch socket ready, and then waits for the
-    /// thread to stop.  At this point, the receiver is defunct.  This is
-    /// not done in the destructor to avoid race conditions.
-    void stop();
-
-    /// @brief Sets the receiver error state
-    ///
-    /// This records the given error message and sets the error watch
-    /// socket to ready.
-    ///
-    /// @param error_msg
-    void setError(const std::string& error_msg);
-
-    /// @brief Fetches the error message text for the most recent socket error
-    ///
-    /// @return string containing the error message
-    std::string getLastError();
-
-    /// @brief Error message of the last error encountered
-    std::string last_error_;
-
-    /// @brief DHCP watch sockets that are used to communicate with the owning thread
-    /// There are three:
-    /// -# RCV_ERROR - packet receive error watch socket.
-    /// Marked as ready when the DHCP packet receiver experiences an I/O error.
-    /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
-    /// to the packet queue.
-    /// -# RCV_TERMINATE Packet receiver terminate watch socket.
-    /// Marked as ready when the DHCP packet receiver thread should terminate.
-    isc::util::WatchSocket sockets_[RCV_TERMINATE + 1];
-
-    /// DHCP packet receiver thread.
-    isc::util::thread::ThreadPtr thread_ ;
-};
-
-/// @brief Defines a pointer to a Receiver
-typedef boost::shared_ptr<Receiver> ReceiverPtr;
-
 /// @brief Forward declaration to the @c IfaceMgr.
 class IfaceMgr;
 
@@ -1181,8 +1070,8 @@ public:
 
     /// @brief Returns true if there is a receiver exists and its
     /// thread is currently running.
-    bool isReceiverRunning() const {
-        return (receiver_ != 0 && receiver_->isRunning());
+    bool isDHCPReceiverRunning() const {
+        return (dhcp_receiver_ != 0 && dhcp_receiver_->isRunning());
     }
 
     /// @brief Configures DHCP packet queue
@@ -1208,7 +1097,7 @@ public:
     /// larger than it's current value, it will be updated to new fd value
     /// @param sockets pointer to the set of sockets
     /// @throw BadValue if sockets is null
-    static void add_fd(int fd, int& maxfd, fd_set* sockets);
+    static void addFDtoSet(int fd, int& maxfd, fd_set* sockets);
 
     // don't use private, we need derived classes in tests
 protected:
@@ -1491,7 +1380,7 @@ private:
     PacketQueueMgr6Ptr packet_queue_mgr6_;
 
     /// DHCP packet receiver.
-    ReceiverPtr receiver_;
+    isc::util::thread::WatchedThreadPtr dhcp_receiver_;
 };
 
 }; // namespace isc::dhcp
index 6eaf6c4b218e9c7ac97caaa0b89a88fe53a68e9e..fcc555d01ed406a73e41dcad5e9b2d246c789af4 100644 (file)
@@ -93,14 +93,21 @@ public:
         // Look for it.
         auto index = factories_.find(queue_type);
 
-        // If it's there remove it
-        if (index != factories_.end()) {
-            factories_.erase(index);
-            return (true);
+        // Not there so nothing to do.
+        if (index == factories_.end()) {
+            return (false);
+        }
 
+        // If the queue is of the type being unregistered, then remove it. We don't
+        // a queue instance outliving its library.
+        if ((packet_queue_) && (packet_queue_->getQueueType() == queue_type)) {
+            packet_queue_.reset();
         }
 
-        return (false);
+        // Remove the factory.
+        factories_.erase(index);
+
+        return (true);
     }
 
     /// @brief Create an instance of a packet queue.
index 9f240a9b0b90b6bad02468f796b18738c6ced537..6478e17349c7e55c84b7e38f90600fb1fd978748 100644 (file)
@@ -484,13 +484,13 @@ public:
 
         // Thread should only start when there is a packet queue.
         ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6));
-        ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning());
+        ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning());
 
         // If the thread is already running, trying to start it again should fail.
         if (queue_enabled) {
             ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation);
             // Should still have one running.
-            ASSERT_TRUE(ifacemgr->isReceiverRunning());
+            ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
         }
 
         // Let's build our DHCPv6 packet.
@@ -531,7 +531,7 @@ public:
         // Stop the thread.  This should be no harm/no foul if we're not
         // queueuing.  Either way, we should not have a thread afterwards.
         ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
-        ASSERT_FALSE(ifacemgr->isReceiverRunning());
+        ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
     }
 
 
@@ -570,13 +570,13 @@ public:
 
         // Thread should only start when there is a packet queue.
         ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
-        ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning());
+        ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning());
 
         // If the thread is already running, trying to start it again should fail.
         if (queue_enabled) {
             ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation);
             // Should still have one running.
-            ASSERT_TRUE(ifacemgr->isReceiverRunning());
+            ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
         }
 
         // Let's construct the packet to send.
@@ -660,7 +660,7 @@ public:
         // Stop the thread.  This should be no harm/no foul if we're not
         // queueuing.  Either way, we should not have a thread afterwards.
         ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
-        ASSERT_FALSE(ifacemgr->isReceiverRunning());
+        ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
     }
 
     /// Holds the invocation counter for ifaceMgrErrorHandler.
@@ -3047,7 +3047,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
 
     // First let's make sure there is no queue and no thread.
     ASSERT_FALSE(ifacemgr->getPacketQueue4());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     bool queue_enabled = false;
     // Given an empty pointer, we should default to no queue.
@@ -3056,11 +3056,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
     EXPECT_FALSE(queue_enabled);
     EXPECT_FALSE(ifacemgr->getPacketQueue4());
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Verify that calling startDHCPReceiver with no queue, does NOT start the thread.
     ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's try with a populated queue control, but with enable-queue = false.
     queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, false);
@@ -3068,7 +3068,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
     EXPECT_FALSE(queue_enabled);
     EXPECT_FALSE(ifacemgr->getPacketQueue4());
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's enable the queue.
     queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, true);
@@ -3078,11 +3078,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
     CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \""
                      << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Calling startDHCPReceiver with a queue, should start the thread.
     ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
-    ASSERT_TRUE(ifacemgr->isReceiverRunning());
+    ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
 
     // Verify that calling startDHCPReceiver when the thread is running, throws.
     ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation);
@@ -3096,18 +3096,18 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
 
     // We should still have our queue and the thread should still be running.
     EXPECT_TRUE(ifacemgr->getPacketQueue4());
-    ASSERT_TRUE(ifacemgr->isReceiverRunning());
+    ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's stop stop the thread.
     ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
     // Stopping the thread should not destroy the queue.
     ASSERT_TRUE(ifacemgr->getPacketQueue4());
 
     // Reconfigure with the queue turned off.  We should have neither queue nor thread.
     ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control));
     EXPECT_FALSE(ifacemgr->getPacketQueue4());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 }
 
 // Verifies DHCPv6 behavior of configureDHCPPacketQueue()
@@ -3116,7 +3116,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
 
     // First let's make sure there is no queue and no thread.
     ASSERT_FALSE(ifacemgr->getPacketQueue6());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     bool queue_enabled = false;
     // Given an empty pointer, we should default to no queue.
@@ -3125,11 +3125,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
     EXPECT_FALSE(queue_enabled);
     EXPECT_FALSE(ifacemgr->getPacketQueue6());
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Verify that calling startDHCPReceiver with no queue, does NOT start the thread.
     ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's try with a populated queue control, but with enable-queue = false.
     queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, false);
@@ -3137,7 +3137,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
     EXPECT_FALSE(queue_enabled);
     EXPECT_FALSE(ifacemgr->getPacketQueue6());
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's enable the queue.
     queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, true);
@@ -3147,11 +3147,11 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
     CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \""
                      << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
     // configureDHCPPacketQueue() should never start the thread.
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 
     // Calling startDHCPReceiver with a queue, should start the thread.
     ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6));
-    ASSERT_TRUE(ifacemgr->isReceiverRunning());
+    ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
 
     // Verify that calling startDHCPReceiver when the thread is running, throws.
     ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation);
@@ -3165,206 +3165,18 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
 
     // We should still have our queue and the thread should still be running.
     EXPECT_TRUE(ifacemgr->getPacketQueue6());
-    ASSERT_TRUE(ifacemgr->isReceiverRunning());
+    ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
 
     // Now let's stop stop the thread.
     ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
     // Stopping the thread should not destroy the queue.
     ASSERT_TRUE(ifacemgr->getPacketQueue6());
 
     // Reconfigure with the queue turned off.  We should have neither queue nor thread.
     ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control));
     EXPECT_FALSE(ifacemgr->getPacketQueue6());
-    ASSERT_FALSE(ifacemgr->isReceiverRunning());
-}
-
-/// @brief Test Fixture for testing isc:dhcp::Receiver
-class ReceiverTest : public ::testing::Test {
-public:
-    /// @brief Maximum number of passes allowed in worker event loop
-    static const int WORKER_MAX_PASSES;
-
-    /// @brief Constructor.
-    ReceiverTest() {}
-
-    /// @brief Destructor.
-    ~ReceiverTest() {
-    }
-
-    /// @brief Sleeps for a given number of event periods sleep
-    /// Each period is 50 ms.
-    void nap(int periods) {
-        usleep(periods * 50 * 1000);
-    };
-
-    /// @brief Worker function to be used by the Receiver's thread
-    ///
-    /// The function runs 5 passes through an "event" loop.
-    /// On each pass:
-    /// - check terminate command
-    /// - instigate the desired event (second pass only)
-    /// - naps for 1 period (50ms)
-    ///
-    /// @param watch_type type of event that should occur
-    void worker(Receiver::WatchType watch_type) {
-        for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
-
-            // Stop if we're told to do it.
-            if (receiver_->shouldTerminate()) {
-                return;
-            }
-
-            // On the second pass, set the event.
-            if (passes_ == 2) {
-                switch (watch_type) {
-                case Receiver::RCV_ERROR:
-                    receiver_->setError("we have an error");
-                    break;
-                case Receiver::RCV_READY:
-                    receiver_->markReady(watch_type);
-                    break;
-                case Receiver::RCV_TERMINATE:
-                default:
-                    // Do nothing, we're waiting to be told to stop.
-                    break;
-                }
-            }
-
-            // Take a nap.
-            nap(1);
-        }
-
-        // Indicate why we stopped.
-        receiver_->setError("thread expired");
-    }
-
-    /// @brief Current receiver instance.
-    ReceiverPtr receiver_;
-
-    /// @brief Counter used to track the number of passes made
-    /// within the thread worker function.
-    int passes_;
-};
-
-const int ReceiverTest::WORKER_MAX_PASSES = 5;
-
-/// Verifies the basic operation of the Receiver class.
-/// It checks that a Receiver can be created, can be stopped,
-/// and that in set and clear sockets.
-TEST_F(ReceiverTest, receiverClassBasics) {
-
-    /// We'll create a receiver and let it run until it expires.  (Note this is more
-    /// of a test of ReceiverTest itself and ensures our tests later for why we
-    /// exited are sound.)
-    receiver_.reset(new Receiver());
-    ASSERT_FALSE(receiver_->isRunning());
-    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
-    ASSERT_TRUE(receiver_->isRunning());
-
-    // Wait long enough for thread to expire.
-    nap(WORKER_MAX_PASSES + 1);
-
-    // It should have done the maximum number of passes.
-    EXPECT_EQ(passes_, WORKER_MAX_PASSES);
-
-    // Error should be ready and error text should be "thread expired".
-    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-    EXPECT_EQ("thread expired", receiver_->getLastError());
-
-    // Thread is technically still running, so let's stop it.
-    EXPECT_TRUE(receiver_->isRunning());
-    ASSERT_NO_THROW(receiver_->stop());
-    ASSERT_FALSE(receiver_->isRunning());
-
-    /// Now we'll test stopping a thread.
-    /// Start the receiver, let it run a little and then tell it to stop.
-    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
-    ASSERT_TRUE(receiver_->isRunning());
-
-    // No watches should be ready.
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
-    // Wait a little while.
-    nap(2);
-
-    // Tell it to stop.
-    receiver_->stop();
-    ASSERT_FALSE(receiver_->isRunning());
-
-    // It should have done less than the maximum number of passes.
-    EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
-    // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-    EXPECT_EQ("thread stopped", receiver_->getLastError());
-
-
-    // Next we'll test error notification.
-    // Start the receiver with a thread that sets an error on the second pass.
-    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_ERROR));
-    ASSERT_TRUE(receiver_->isRunning());
-
-    // No watches should be ready.
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
-    // Wait a little while.
-    nap(2);
-
-    // It should now indicate an error.
-    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
-    EXPECT_EQ("we have an error", receiver_->getLastError());
-
-    // Tell it to stop.
-    receiver_->stop();
-    ASSERT_FALSE(receiver_->isRunning());
-
-    // It should have done less than the maximum number of passes.
-    EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
-    // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-    EXPECT_EQ("thread stopped", receiver_->getLastError());
-
-
-    // Finally, we'll test data ready notification.
-    // We'll start the receiver with a thread that indicates data ready on its second pass.
-    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_READY));
-    ASSERT_TRUE(receiver_->isRunning());
-
-    // No watches should be ready.
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
-    // Wait a little while.
-    nap(2);
-
-    // It should now indicate data ready.
-    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY));
-
-    // Tell it to stop.
-    receiver_->stop();
-    ASSERT_FALSE(receiver_->isRunning());
-
-    // It should have done less than the maximum number of passes.
-    EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
-    // No watches should be ready.  Error text should be "thread stopped".
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
-    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-    EXPECT_EQ("thread stopped", receiver_->getLastError());
+    ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
 }
 
 }
index e59e30e82199ddd684a4d3594a9ce9a39585bfbd..f2441658067950ac0605f986132e83356ed8a981 100644 (file)
@@ -127,9 +127,8 @@ TEST_F(PacketQueueMgr4Test, customQueueType) {
 
     // Now unregister the factory.
     ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue"));
-
-    // Verify we did not lose the queue.
-    checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }");
+    // Queue should be gone too.
+    ASSERT_FALSE(mgr().getPacketQueue());
 
     // Try and recreate the custom queue, type should be invalid.
     ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType);
index 00a762447c8f8d4061f70757208cf8d603c7d04e..233d540e820a98fde80ad7b917ad4132afce92d4 100644 (file)
@@ -116,9 +116,8 @@ TEST_F(PacketQueueMgr6Test, customQueueType) {
 
     // Now unregister the factory.
     ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue"));
-
-    // Verify we did not lose the queue.
-    checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }");
+    // Queue should be gone too.
+    ASSERT_FALSE(mgr().getPacketQueue());
 
     // Try and recreate the custom queue, type should be invalid.
     ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType);
index 0a24b1345bcd66ecbde6854c8b22f1a8f84400c7..b3e6c20e4756f917ebee17c2d0444b9529254be5 100644 (file)
@@ -7,6 +7,7 @@ AM_CPPFLAGS += $(BOOST_INCLUDES)
 lib_LTLIBRARIES = libkea-threads.la
 libkea_threads_la_SOURCES  = sync.h sync.cc
 libkea_threads_la_SOURCES += thread.h thread.cc
+libkea_threads_la_SOURCES += watched_thread.h watched_thread.cc
 libkea_threads_la_LIBADD  = $(top_builddir)/src/lib/exceptions/libkea-exceptions.la
 
 libkea_threads_la_LDFLAGS  = -no-undefined -version-info 1:0:0
index 81c5e5ef08def908cf02bf37e7225b286612e8d1..2975ab676454d01376878d64633570517aeb3919 100644 (file)
@@ -24,6 +24,7 @@ run_unittests_SOURCES  = run_unittests.cc
 run_unittests_SOURCES += thread_unittest.cc
 run_unittests_SOURCES += lock_unittest.cc
 run_unittests_SOURCES += condvar_unittest.cc
+run_unittests_SOURCES += watched_thread_unittest.cc
 
 run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
 run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS)
diff --git a/src/lib/util/threads/tests/watched_thread_unittest.cc b/src/lib/util/threads/tests/watched_thread_unittest.cc
new file mode 100644 (file)
index 0000000..7531037
--- /dev/null
@@ -0,0 +1,211 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <util/threads/watched_thread.h>
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
+#include <unistd.h>
+
+using namespace std;
+using namespace isc;
+using namespace isc::util;
+using namespace isc::util::thread;
+
+namespace {
+
+/// @brief Test Fixture for testing isc:util::thread::WatchedThread
+class WatchedThreadTest : public ::testing::Test {
+public:
+    /// @brief Maximum number of passes allowed in worker event loop
+    static const int WORKER_MAX_PASSES;
+
+    /// @brief Constructor.
+    WatchedThreadTest() {}
+
+    /// @brief Destructor.
+    ~WatchedThreadTest() {
+    }
+
+    /// @brief Sleeps for a given number of event periods sleep
+    /// Each period is 50 ms.
+    void nap(int periods) {
+        usleep(periods * 500 * 1000);
+    };
+
+    /// @brief Worker function to be used by the WatchedThread's thread
+    ///
+    /// The function runs 5 passes through an "event" loop.
+    /// On each pass:
+    /// - check terminate command
+    /// - instigate the desired event (second pass only)
+    /// - naps for 1 period (50ms)
+    ///
+    /// @param watch_type type of event that should occur
+    void worker(WatchedThread::WatchType watch_type) {
+        for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
+
+            // Stop if we're told to do it.
+            if (wthread_->shouldTerminate()) {
+                return;
+            }
+
+            // On the second pass, set the event.
+            if (passes_ == 2) {
+                switch (watch_type) {
+                case WatchedThread::RCV_ERROR:
+                    wthread_->setError("we have an error");
+                    break;
+                case WatchedThread::RCV_READY:
+                    wthread_->markReady(watch_type);
+                    break;
+                case WatchedThread::RCV_TERMINATE:
+                default:
+                    // Do nothing, we're waiting to be told to stop.
+                    break;
+                }
+            }
+
+            // Take a nap.
+            nap(1);
+        }
+
+        // Indicate why we stopped.
+        wthread_->setError("thread expired");
+    }
+
+    /// @brief Current receiver instance.
+    WatchedThreadPtr wthread_;
+
+    /// @brief Counter used to track the number of passes made
+    /// within the thread worker function.
+    int passes_;
+};
+
+const int WatchedThreadTest::WORKER_MAX_PASSES = 5;
+
+/// Verifies the basic operation of the WatchedThread class.
+/// It checks that a WatchedThread can be created, can be stopped,
+/// and that in set and clear sockets.
+TEST_F(WatchedThreadTest, receiverClassBasics) {
+
+    /// We'll create a receiver and let it run until it expires.  (Note this is more
+    /// of a test of WatchedThreadTest itself and ensures our tests later for why we
+    /// exited are sound.)
+    wthread_.reset(new WatchedThread());
+    ASSERT_FALSE(wthread_->isRunning());
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+    ASSERT_TRUE(wthread_->isRunning());
+
+    // Wait long enough for thread to expire.
+    nap(WORKER_MAX_PASSES + 1);
+
+    // It should have done the maximum number of passes.
+    EXPECT_EQ(passes_, WORKER_MAX_PASSES);
+
+    // Error should be ready and error text should be "thread expired".
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    EXPECT_EQ("thread expired", wthread_->getLastError());
+
+    // Thread is technically still running, so let's stop it.
+    EXPECT_TRUE(wthread_->isRunning());
+    ASSERT_NO_THROW(wthread_->stop());
+    ASSERT_FALSE(wthread_->isRunning());
+
+    /// Now we'll test stopping a thread.
+    /// Start the receiver, let it run a little and then tell it to stop.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+    ASSERT_TRUE(wthread_->isRunning());
+
+    // No watches should be ready.
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // Tell it to stop.
+    wthread_->stop();
+    ASSERT_FALSE(wthread_->isRunning());
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", wthread_->getLastError());
+
+
+    // Next we'll test error notification.
+    // Start the receiver with a thread that sets an error on the second pass.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR));
+    ASSERT_TRUE(wthread_->isRunning());
+
+    // No watches should be ready.
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // It should now indicate an error.
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    EXPECT_EQ("we have an error", wthread_->getLastError());
+
+    // Tell it to stop.
+    wthread_->stop();
+    ASSERT_FALSE(wthread_->isRunning());
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", wthread_->getLastError());
+
+
+    // Finally, we'll test data ready notification.
+    // We'll start the receiver with a thread that indicates data ready on its second pass.
+    wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY));
+    ASSERT_TRUE(wthread_->isRunning());
+
+    // No watches should be ready.
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // It should now indicate data ready.
+    ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY));
+
+    // Tell it to stop.
+    wthread_->stop();
+    ASSERT_FALSE(wthread_->isRunning());
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+    ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", wthread_->getLastError());
+}
+
+}
diff --git a/src/lib/util/threads/watched_thread.cc b/src/lib/util/threads/watched_thread.cc
new file mode 100644 (file)
index 0000000..6925a4c
--- /dev/null
@@ -0,0 +1,78 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <util/threads/watched_thread.h>
+
+namespace isc {
+namespace util {
+namespace thread {
+
+void
+WatchedThread::start(const boost::function<void()>& thread_main) {
+    clearReady(RCV_ERROR);
+    clearReady(RCV_READY);
+    clearReady(RCV_TERMINATE);
+    last_error_ = "no error";
+    thread_.reset(new Thread(thread_main));
+}
+
+int
+WatchedThread::getWatchFd(WatchType watch_type) {
+    return(sockets_[watch_type].getSelectFd());
+}
+
+void
+WatchedThread::markReady(WatchType watch_type)  {
+    sockets_[watch_type].markReady();
+}
+
+bool
+WatchedThread::isReady(WatchType watch_type) {
+    return (sockets_[watch_type].isReady());
+}
+
+void
+WatchedThread::clearReady(WatchType watch_type) {
+    sockets_[watch_type].clearReady();
+}
+
+bool
+WatchedThread::shouldTerminate() {
+    if (sockets_[RCV_TERMINATE].isReady()) {
+        clearReady(RCV_TERMINATE);
+        return (true);
+    }
+
+    return (false);
+}
+
+void
+WatchedThread::stop() {
+    if (thread_) {
+        markReady(RCV_TERMINATE);
+        thread_->wait();
+        thread_.reset();
+    }
+
+    clearReady(RCV_ERROR);
+    clearReady(RCV_READY);
+    last_error_ = "thread stopped";
+}
+
+void
+WatchedThread::setError(const std::string& error_msg) {
+    last_error_ = error_msg;
+    markReady(RCV_ERROR);
+}
+
+std::string
+WatchedThread::getLastError() {
+    return (last_error_);
+}
+} // end of namespace isc::util::thread
+} // end of namespace isc::util
+} // end of namespace isc
diff --git a/src/lib/util/threads/watched_thread.h b/src/lib/util/threads/watched_thread.h
new file mode 100644 (file)
index 0000000..c9dd99c
--- /dev/null
@@ -0,0 +1,133 @@
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef WATCHED_THREAD_H
+#define WATCHED_THREAD_H
+
+#include <util/watch_socket.h>
+#include <util/threads/thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace isc {
+namespace util {
+namespace thread {
+
+/// @brief Provides a thread and controls for receiving packets.
+///
+/// Given a "worker function", this class creates a thread which
+/// runs the function and provides the means to monitor the thread
+/// for "error" and "ready" conditions, and finally to stop the thread.
+/// It uses three WatchSockets: one to indicate an error, one to indicate
+/// data is ready, and a third to monitor as a shut-down command.
+class WatchedThread {
+public:
+    /// @brief Enumerates the list of watch sockets used to mark events
+    /// These are used as arguments to watch socket accessor methods.
+    enum WatchType {
+        RCV_ERROR = 0,
+        RCV_READY = 1,
+        RCV_TERMINATE = 2
+    };
+
+    /// @brief Constructor
+    WatchedThread(){};
+
+    /// @brief Virtual destructor
+    virtual ~WatchedThread(){}
+
+    /// @brief Fetches the fd of a watch socket
+    ///
+    /// @param watch_type indicates which watch socket
+    /// @return the watch socket's file descriptor
+    int getWatchFd(WatchType watch_type);
+
+    /// @brief Sets a watch socket state to ready
+    ///
+    /// @param watch_type indicates which watch socket to mark
+    void markReady(WatchType watch_type);
+
+    /// @brief Indicates if a watch socket state is ready
+    ///
+    /// @param watch_type indicates which watch socket to mark
+    /// @return true if the watch socket is ready, false otherwise
+    bool isReady(WatchType watch_type);
+
+    /// @brief Sets a watch socket state to not ready
+    ///
+    /// @param watch_type indicates which watch socket to clear
+    void clearReady(WatchType watch_type);
+
+    /// @brief Checks if the receiver thread should terminate
+    ///
+    /// Performs a "one-shot" check of the receiver's terminate
+    /// watch socket.  If it is ready, return true and then clear
+    /// it, otherwise return false.
+    ///
+    /// @return true if the terminate watch socket is ready
+    bool shouldTerminate();
+
+    /// @brief Creates and runs the thread.
+    ///
+    /// Creates teh receiver's thread, passing into it the given
+    /// function to run.
+    ///
+    /// @param thread_main function the receiver's thread should run
+    void start(const boost::function<void()>& thread_main);
+
+    /// @brief Returns true if the receiver thread is running
+    /// @todo - this may need additional logic to handle cases where
+    /// a thread function exits w/o the caller invoking @c
+    /// WatchedThread::stop().
+    bool isRunning() {
+        return (thread_ != 0);
+    }
+
+    /// @brief Terminates the receiver thread
+    ///
+    /// It marks the terminate watch socket ready, and then waits for the
+    /// thread to stop.  At this point, the receiver is defunct.  This is
+    /// not done in the destructor to avoid race conditions.
+    void stop();
+
+    /// @brief Sets the receiver error state
+    ///
+    /// This records the given error message and sets the error watch
+    /// socket to ready.
+    ///
+    /// @param error_msg
+    void setError(const std::string& error_msg);
+
+    /// @brief Fetches the error message text for the most recent socket error
+    ///
+    /// @return string containing the error message
+    std::string getLastError();
+
+    /// @brief Error message of the last error encountered
+    std::string last_error_;
+
+    /// @brief DHCP watch sockets that are used to communicate with the owning thread
+    /// There are three:
+    /// -# RCV_ERROR - packet receive error watch socket.
+    /// Marked as ready when the DHCP packet receiver experiences an I/O error.
+    /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
+    /// to the packet queue.
+    /// -# RCV_TERMINATE Packet receiver terminate watch socket.
+    /// Marked as ready when the DHCP packet receiver thread should terminate.
+    WatchSocket sockets_[RCV_TERMINATE + 1];
+
+    /// DHCP packet receiver thread.
+    thread::ThreadPtr thread_ ;
+};
+
+/// @brief Defines a pointer to a WatchedThread
+typedef boost::shared_ptr<WatchedThread> WatchedThreadPtr;
+
+}; // namespace isc::util::thread
+}; // namespace isc::util
+}; // namespace isc
+
+#endif // WATCHED_THREAD_H