]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#260,!120] Extracted IfaceMgr thread bits into new class, isc::dhcp::Receiver
authorThomas Markwalder <tmark@isc.org>
Thu, 15 Nov 2018 20:38:57 +0000 (15:38 -0500)
committerThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 18:25:03 +0000 (13:25 -0500)
src/lib/dhcp/iface_mgr.*
    Extracted the thread, watch sockets, mutex, and error message members
    from IfaceMgr and wrapped them in a new class, isc:dhcp::Receiver.

    IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) - new
    convenience method for adding descriptors to fd_set(s)

src/lib/dhcp/tests/iface_mgr_unittest.cc
    class ReceiverTest - new test fixture
    TEST_F(ReceiverTest, receiverClassBasics)  - new unit test

src/lib/dhcp/iface_mgr.cc
src/lib/dhcp/iface_mgr.h
src/lib/dhcp/tests/iface_mgr_unittest.cc

index 2618f3a59e3f4a4e4e305e06e9b2a84fe8f7cbdc..cdcbf5caccbc9e93665a030d03b99501d357c340 100644 (file)
@@ -179,14 +179,77 @@ bool Iface::delSocket(const uint16_t sockfd) {
     return (false); // socket not found
 }
 
+Receiver::Receiver(const boost::function<void()>& thread_main) {
+    clearReady(RCV_ERROR);
+    clearReady(RCV_READY);
+    clearReady(RCV_TERMINATE);
+    last_error_ = "no error";
+    thread_.reset(new isc::util::thread::Thread(thread_main));
+}
+
+int
+Receiver::getWatchFd(WatchType watch_type) {
+    return(sockets_[watch_type].getSelectFd());
+}
+
+void
+Receiver::markReady(WatchType watch_type)  {
+    sockets_[watch_type].markReady();
+}
+
+bool
+Receiver::isReady(WatchType watch_type) {
+    return (sockets_[watch_type].isReady());
+}
+
+void
+Receiver::clearReady(WatchType watch_type) {
+    sockets_[watch_type].clearReady();
+}
+
+bool
+Receiver::shouldTerminate() {
+    if (sockets_[RCV_TERMINATE].isReady()) {
+        clearReady(RCV_TERMINATE);
+        return (true);
+    }
+
+    return (false);
+}
+
+void
+Receiver::stop() {
+    markReady(RCV_TERMINATE);
+    thread_->wait();
+    thread_.reset();
+    clearReady(RCV_ERROR);
+    clearReady(RCV_READY);
+    last_error_ = "thread stopped";
+}
+
+isc::util::thread::Mutex&
+Receiver::getLock() {
+    return(lock_);
+}
+
+void
+Receiver::setError(const std::string& error_msg) {
+    last_error_ = error_msg;
+    markReady(RCV_ERROR);
+}
+
+std::string
+Receiver::getLastError() {
+    return (last_error_);
+}
+
 IfaceMgr::IfaceMgr()
     :control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))),
      control_buf_(new char[control_buf_len_]),
      packet_filter_(new PktFilterInet()),
      packet_filter6_(new PktFilterInet6()),
      test_mode_(false),
-     allow_loopback_(false),
-    receiver_error_("no error") {
+     allow_loopback_(false) {
 
     // Ensure that PQMs have been created to guarantee we have
     // default packet queues in place.
@@ -293,13 +356,9 @@ void IfaceMgr::closeSockets() {
 
 void IfaceMgr::stopDHCPReceiver() {
     if (isReceiverRunning()) {
-        terminate_watch_.markReady();
-        receiver_thread_->wait();
-        receiver_thread_.reset();
-        error_watch_.clearReady();
-
+        receiver_->stop();
+        receiver_.reset();
     }
-    receiver_error_ = "no error";
 
     if (getPacketQueue4()) {
         getPacketQueue4()->clear();
@@ -697,10 +756,10 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
     case AF_INET:
         // If there's no queue, then has been disabled, simply return.
         if(!getPacketQueue4()) {
-            return;
+                return;
         }
 
-        receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+        receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))));
         break;
     case AF_INET6:
         // If there's no queue, then has been disabled, simply return.
@@ -708,7 +767,7 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
             return;
         }
 
-        receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
+        receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))));
         break;
     default:
         isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
@@ -980,6 +1039,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
         isc_throw(BadValue, "fractional timeout must be shorter than"
                   " one million microseconds");
     }
+
     fd_set sockets;
     int maxfd = 0;
 
@@ -988,22 +1048,15 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // if there are any callbacks for external sockets registered...
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
-            FD_SET(s.socket_, &sockets);
-            if (maxfd < s.socket_) {
-                maxfd = s.socket_;
-            }
+            add_fd(s.socket_, maxfd, &sockets);
         }
     }
 
-    // Add receiver thread watch and error sockets.
-    FD_SET(receive_watch_.getSelectFd(), &sockets);
-    if (maxfd < receive_watch_.getSelectFd()) {
-        maxfd = receive_watch_.getSelectFd();
-    }
-    FD_SET(error_watch_.getSelectFd(), &sockets);
-    if (maxfd < error_watch_.getSelectFd()) {
-        maxfd = error_watch_.getSelectFd();
-    }
+    // Add Receiver ready watch socket
+    add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+
+    // Add Receiver error watch socket
+    add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1046,9 +1099,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
-            string msg = receiver_error_;
-            error_watch_.clearReady();
+        if (receiver_->isReady(Receiver::RCV_ERROR)) {
+            string msg = receiver_->getLastError();
+            receiver_->clearReady(Receiver::RCV_ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1073,10 +1126,13 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
 
     // If we're here it should only be because there are DHCP packets waiting.
     // Protected packet queue access.
-    Mutex::Locker lock(receiver_lock_);
-    Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
-    if (!pkt) {
-        receive_watch_.clearReady();
+    Pkt4Ptr pkt;
+    {
+        Mutex::Locker lock(receiver_->getLock());
+        pkt = getPacketQueue4()->dequeuePacket();
+        if (!pkt) {
+            receiver_->clearReady(Receiver::RCV_READY);
+        }
     }
 
     return (pkt);
@@ -1100,15 +1156,10 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     /// provided set to indicated which sockets have something to read.
     BOOST_FOREACH(iface, ifaces_) {
         BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
             // Only deal with IPv4 addresses.
             if (s.addr_.isV4()) {
-
                 // Add this socket to listening set
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
+                add_fd(s.sockfd_, maxfd, &sockets);
             }
         }
     }
@@ -1116,10 +1167,8 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     // if there are any callbacks for external sockets registered...
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
-            FD_SET(s.socket_, &sockets);
-            if (maxfd < s.socket_) {
-                maxfd = s.socket_;
-            }
+            // Add this socket to listening set
+            add_fd(s.socket_, maxfd, &sockets);
         }
     }
 
@@ -1191,7 +1240,8 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     return (packet_filter_->receive(*iface, *candidate));
 }
 
-Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
+Pkt6Ptr
+IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
     if (isReceiverRunning()) {
         return (receive6Indirect(timeout_sec, timeout_usec));
     }
@@ -1199,7 +1249,20 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */
     return (receive6Direct(timeout_sec, timeout_usec));
 }
 
-Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
+void
+IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) {
+    if (!sockets) {
+        isc_throw(BadValue, "add_fd: sockets can't be null");
+    }
+
+    FD_SET(fd, sockets);
+    if (maxfd < fd) {
+        maxfd = fd;
+    }
+}
+
+Pkt6Ptr
+IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
     // Sanity check for microsecond timeout.
     if (timeout_usec >= 1000000) {
         isc_throw(BadValue, "fractional timeout must be shorter than"
@@ -1216,16 +1279,11 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     /// and then use its copy for select(). Please note that select() modifies
     /// provided set to indicated which sockets have something to read.
     BOOST_FOREACH(IfacePtr iface, ifaces_) {
-
         BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
             // Only deal with IPv6 addresses.
             if (s.addr_.isV6()) {
-
                 // Add this socket to listening set
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
+                add_fd(s.sockfd_, maxfd, &sockets);
             }
         }
     }
@@ -1234,10 +1292,7 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
             // Add it to the set as well
-            FD_SET(s.socket_, &sockets);
-            if (maxfd < s.socket_) {
-                maxfd = s.socket_;
-            }
+            add_fd(s.socket_, maxfd, &sockets);
         }
     }
 
@@ -1307,8 +1362,8 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /*
     return (packet_filter6_->receive(*candidate));
 }
 
-
-Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
+Pkt6Ptr
+IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
     // Sanity check for microsecond timeout.
     if (timeout_usec >= 1000000) {
         isc_throw(BadValue, "fractional timeout must be shorter than"
@@ -1324,22 +1379,15 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     if (!callbacks_.empty()) {
         BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
             // Add it to the set as well
-            FD_SET(s.socket_, &sockets);
-            if (maxfd < s.socket_) {
-                maxfd = s.socket_;
-            }
+            add_fd(s.socket_, maxfd, &sockets);
         }
     }
 
-    // Add receiver thread watch and error sockets.
-    FD_SET(receive_watch_.getSelectFd(), &sockets);
-    if (maxfd < receive_watch_.getSelectFd()) {
-        maxfd = receive_watch_.getSelectFd();
-    }
-    FD_SET(error_watch_.getSelectFd(), &sockets);
-    if (maxfd < error_watch_.getSelectFd()) {
-        maxfd = error_watch_.getSelectFd();
-    }
+    // Add Receiver ready watch socket
+    add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+
+    // Add Receiver error watch socket
+    add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
 
     // Set timeout for our next select() call.  If there are
     // no DHCP packets to read, then we'll wait for a finite
@@ -1382,9 +1430,9 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
     // We only check external sockets if select detected an event.
     if (result > 0) {
         // Check for receiver thread read errors.
-        if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
-            string msg = receiver_error_;
-            error_watch_.clearReady();
+        if (receiver_->isReady(Receiver::RCV_ERROR)) {
+            string msg = receiver_->getLastError();
+            receiver_->clearReady(Receiver::RCV_ERROR);
             isc_throw(SocketReadError, msg);
         }
 
@@ -1409,16 +1457,20 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /
 
     // If we're here it should only be because there are DHCP packets waiting.
     // Protected packet queue access.
-    Mutex::Locker lock(receiver_lock_);
-    Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
-    if (!pkt) {
-        receive_watch_.clearReady();
+    Pkt6Ptr pkt;
+    {
+        Mutex::Locker lock(receiver_->getLock());
+        pkt = getPacketQueue6()->dequeuePacket();
+        if (!pkt) {
+            receiver_->clearReady(Receiver::RCV_READY);
+        }
     }
 
     return (pkt);
 }
 
-void IfaceMgr::receiveDHCP4Packets() {
+void
+IfaceMgr::receiveDHCP4Packets() {
     IfacePtr iface;
     fd_set sockets;
     int maxfd = 0;
@@ -1426,30 +1478,22 @@ void IfaceMgr::receiveDHCP4Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    FD_SET(terminate_watch_.getSelectFd(), &sockets);
-    if (maxfd < terminate_watch_.getSelectFd()) {
-        maxfd = terminate_watch_.getSelectFd();
-    }
+    add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
         BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
             // Only deal with IPv4 addresses.
             if (s.addr_.isV4()) {
                 // Add this socket to listening set.
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
+                add_fd(s.sockfd_, maxfd, &sockets);
             }
         }
     }
 
     for (;;) {
         // Check the watch socket.
-        if (terminate_watch_.isReady()) {
-            terminate_watch_.clearReady();
+        if (receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1463,8 +1507,7 @@ void IfaceMgr::receiveDHCP4Packets() {
         int result = select(maxfd + 1, &rd_set, 0, 0, 0);
 
         // Re-check the watch socket.
-        if (terminate_watch_.isReady()) {
-            terminate_watch_.clearReady();
+        if (receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1476,8 +1519,7 @@ void IfaceMgr::receiveDHCP4Packets() {
             // This thread should not get signals?
             if (errno != EINTR) {
                 // Signal the error to receive4.
-                receiver_error_ = strerror(errno);
-                error_watch_.markReady();
+                receiver_->setError(strerror(errno));
                 // We need to sleep in case of the error condition to
                 // prevent the thread from tight looping when result
                 // gets negative.
@@ -1492,8 +1534,7 @@ void IfaceMgr::receiveDHCP4Packets() {
                 if (FD_ISSET(s.sockfd_, &sockets)) {
                     receiveDHCP4Packet(*iface, s);
                     // Can take time so check one more time the watch socket.
-                    if (terminate_watch_.isReady()) {
-                        terminate_watch_.clearReady();
+                    if (receiver_->shouldTerminate()) {
                         return;
                     }
                 }
@@ -1503,7 +1544,8 @@ void IfaceMgr::receiveDHCP4Packets() {
 
 }
 
-void IfaceMgr::receiveDHCP6Packets() {
+void
+IfaceMgr::receiveDHCP6Packets() {
     IfacePtr iface;
     fd_set sockets;
     int maxfd = 0;
@@ -1511,31 +1553,22 @@ void IfaceMgr::receiveDHCP6Packets() {
     FD_ZERO(&sockets);
 
     // Add terminate watch socket.
-    FD_SET(terminate_watch_.getSelectFd(), &sockets);
-    if (maxfd < terminate_watch_.getSelectFd()) {
-        maxfd = terminate_watch_.getSelectFd();
-    }
+    add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
 
     // Add Interface sockets.
     BOOST_FOREACH(iface, ifaces_) {
         BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
             // Only deal with IPv6 addresses.
             if (s.addr_.isV6()) {
-
                 // Add this socket to listening set.
-                FD_SET(s.sockfd_, &sockets);
-                if (maxfd < s.sockfd_) {
-                    maxfd = s.sockfd_;
-                }
+                add_fd(s.sockfd_ , maxfd, &sockets);
             }
         }
     }
 
     for (;;) {
         // Check the watch socket.
-        if (terminate_watch_.isReady()) {
-            terminate_watch_.clearReady();
+        if (receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1549,8 +1582,7 @@ void IfaceMgr::receiveDHCP6Packets() {
         int result = select(maxfd + 1, &rd_set, 0, 0, 0);
 
         // Re-check the watch socket.
-        if (terminate_watch_.isReady()) {
-            terminate_watch_.clearReady();
+        if (receiver_->shouldTerminate()) {
             return;
         }
 
@@ -1561,8 +1593,7 @@ void IfaceMgr::receiveDHCP6Packets() {
             // This thread should not get signals?
             if (errno != EINTR) {
                 // Signal the error to receive6.
-                receiver_error_ = strerror(errno);
-                error_watch_.markReady();
+                receiver_->setError(strerror(errno));
                 // We need to sleep in case of the error condition to
                 // prevent the thread from tight looping when result
                 // gets negative.
@@ -1577,8 +1608,7 @@ void IfaceMgr::receiveDHCP6Packets() {
                 if (FD_ISSET(s.sockfd_, &sockets)) {
                     receiveDHCP6Packet(s);
                     // Can take time so check one more time the watch socket.
-                    if (terminate_watch_.isReady()) {
-                        terminate_watch_.clearReady();
+                    if (receiver_->shouldTerminate()) {
                         return;
                     }
                 }
@@ -1587,13 +1617,14 @@ void IfaceMgr::receiveDHCP6Packets() {
     }
 }
 
-void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
+void
+IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
     int len;
+
     int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
     if (result < 0) {
         // Signal the error to receive4.
-        receiver_error_ = strerror(errno);
-        error_watch_.markReady();
+        receiver_->setError(strerror(errno));
         return;
     }
     if (len == 0) {
@@ -1606,27 +1637,26 @@ void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
     try {
         pkt = packet_filter_->receive(iface, socket_info);
     } catch (const std::exception& ex) {
-        receiver_error_ = ex.what();
-        error_watch_.markReady();
+        receiver_->setError(strerror(errno));
     } catch (...) {
-        receiver_error_ = "packet filter receive() failed";
-        error_watch_.markReady();
+        receiver_->setError("packet filter receive() failed");
     }
 
     if (pkt) {
-        Mutex::Locker lock(receiver_lock_);
+        Mutex::Locker lock(receiver_->getLock());
         getPacketQueue4()->enqueuePacket(pkt, socket_info);
-        receive_watch_.markReady();
+        receiver_->markReady(Receiver::RCV_READY);
     }
 }
 
-void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
+void
+IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
     int len;
+
     int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
     if (result < 0) {
         // Signal the error to receive6.
-        receiver_error_ = strerror(errno);
-        error_watch_.markReady();
+        receiver_->setError(strerror(errno));
         return;
     }
     if (len == 0) {
@@ -1639,21 +1669,20 @@ void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
     try {
         pkt = packet_filter6_->receive(socket_info);
     } catch (const std::exception& ex) {
-        receiver_error_ = ex.what();
-        error_watch_.markReady();
+        receiver_->setError(ex.what());
     } catch (...) {
-        receiver_error_ = "packet filter receive() failed";
-        error_watch_.markReady();
+        receiver_->setError("packet filter receive() failed");
     }
 
     if (pkt) {
-        Mutex::Locker lock(receiver_lock_);
+        Mutex::Locker lock(receiver_->getLock());
         getPacketQueue6()->enqueuePacket(pkt, socket_info);
-        receive_watch_.markReady();
+        receiver_->markReady(Receiver::RCV_READY);
     }
 }
 
-uint16_t IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
+uint16_t
+IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
     IfacePtr iface = getIface(pkt.getIface());
     if (!iface) {
         isc_throw(IfaceNotFound, "Tried to find socket for non-existent interface");
index 310a52f227fe4800ece6db1eb70e0b9d22e8689f..dc950e9847771f3551e571c850b4787bc4724654 100644 (file)
@@ -458,6 +458,115 @@ private:
 
 typedef boost::shared_ptr<Iface> IfacePtr;
 
+/// @brief Provides a thread and controls for receiving packets.
+///
+/// Given a "worker function", this class creates a thread which
+/// runs the function and provides the means to monitor the thread
+/// for "error" and "ready" conditions, and finally to stop the thread.
+/// It uses three WatchSockets: one to indicate an error, one to indicate
+/// data is ready, and a third to monitor as a shut-down command.
+class Receiver {
+public:
+    /// @brief Enumerates the list of watch sockets used to mark events
+    /// These are used as arguments to watch socket accessor methods.
+    enum WatchType {
+        RCV_ERROR = 0,
+        RCV_READY = 1,
+        RCV_TERMINATE = 2
+    };
+
+    /// @brief Constructor
+    ///
+    /// It initializes the watch sockets and then instantiates and
+    /// starts the receiver's worker thread.
+    ///
+    /// @param thread_main function the receiver's thread should run
+    Receiver(const boost::function<void()>& thread_main);
+
+    /// @brief Virtual destructor
+    virtual ~Receiver() {}
+
+    /// @brief Fetches the fd of a watch socket
+    ///
+    /// @param watch_type indicates which watch socket
+    /// @return the watch socket's file descriptor
+    int getWatchFd(WatchType watch_type);
+
+    /// @brief Sets a watch socket state to ready
+    ///
+    /// @param watch_type indicates which watch socket to mark
+    void markReady(WatchType watch_type);
+
+    /// @brief Indicates if a watch socket state is ready
+    ///
+    /// @param watch_type indicates which watch socket to mark
+    /// @return true if the watch socket is ready, false otherwise
+    bool isReady(WatchType watch_type);
+
+    /// @brief Sets a watch socket state to not ready
+    ///
+    /// @param watch_type indicates which watch socket to clear
+    void clearReady(WatchType watch_type);
+
+    /// @brief Checks if the receiver thread should terminate
+    ///
+    /// Performs a "one-shot" check of the receiver's terminate
+    /// watch socket.  If it is ready, return true and then clear
+    /// it, otherwise return false.
+    ///
+    /// @return true if the terminate watch socket is ready
+    bool shouldTerminate();
+
+    /// @brief Terminates the receiver thread
+    ///
+    /// It marks the terminate watch socket ready, and then waits for the
+    /// thread to stop.  At this point, the receiver is defunct.  This is
+    /// not done in the destructor to avoid race conditions.
+    void stop();
+
+    /// @brief Fetches the receiver's thread mutex.
+    ///
+    /// This is used for instantation mutex locks to protect code blocks.
+    ///
+    /// @return a reference to the mutex
+    isc::util::thread::Mutex& getLock();
+
+    /// @brief Sets the receiver error state
+    ///
+    /// This records the given error message and sets the error watch
+    /// socket to ready.
+    ///
+    /// @param error_msg
+    void setError(const std::string& error_msg);
+
+    /// @brief Fetches the error message text for the most recent socket error
+    ///
+    /// @return string containing the error message
+    std::string getLastError();
+
+    /// @brief Error message of the last error encountered
+    std::string last_error_;
+
+    /// @brief DHCP watch sockets that are used to communicate with the owning thread
+    /// There are three:
+    /// -# RCV_ERROR - packet receive error watch socket.
+    /// Marked as ready when the DHCP packet receiver experiences an I/O error.
+    /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
+    /// to the packet queue.
+    /// -# RCV_TERMINATE Packet receiver terminate watch socket.
+    /// Marked as ready when the DHCP packet receiver thread should terminate.
+    isc::util::WatchSocket sockets_[RCV_TERMINATE + 1];
+
+    /// DHCP packet thread mutex.
+    isc::util::thread::Mutex lock_;
+
+    /// DHCP packet receiver thread.
+    isc::util::thread::ThreadPtr thread_ ;
+};
+
+/// @brief Defines a pointer to a Receiver
+typedef boost::shared_ptr<Receiver> ReceiverPtr;
+
 /// @brief Forward declaration to the @c IfaceMgr.
 class IfaceMgr;
 
@@ -1056,7 +1165,7 @@ public:
 
     /// @brief Returns true if there is a receiver currently running.
     bool isReceiverRunning() const {
-        return (receiver_thread_ != 0);
+        return (receiver_ != 0);
     }
 
     /// @brief Configures DHCP packet queue
@@ -1075,6 +1184,15 @@ public:
     bool configureDHCPPacketQueue(const uint16_t family,
                                   data::ConstElementPtr queue_control);
 
+    /// @brief Convenience method for adding an descriptor to a set
+    ///
+    /// @param fd descriptor to add
+    /// @param[out] maxfd maximum fd value in the set.  If the new fd is
+    /// larger than it's current value, it will be updated to new fd value
+    /// @param sockets pointer to the set of sockets
+    /// @throw BadValue if sockets is null
+    static void add_fd(int fd, int& maxfd, fd_set* sockets);
+
     // don't use private, we need derived classes in tests
 protected:
 
@@ -1349,28 +1467,8 @@ private:
     /// @brief Allows to use loopback
     bool allow_loopback_;
 
-    /// @brief Error message of the last DHCP packet receive error.
-    std::string receiver_error_;
-
-    /// @brief DHCP packet receive error watch socket.
-    /// Marked as ready when the DHCP packet receiver experiences
-    /// an I/O error.
-    isc::util::WatchSocket error_watch_;
-
-    /// @brief DHCP packet receive watch socket.
-    /// Marked as ready when the DHCP packet receiver adds a packet
-    /// to the packet queue.
-    isc::util::WatchSocket receive_watch_;
-
-    /// @brief Packet receiver terminate watch socket.
-    /// Marked as ready when the DHCP packet receiver thread should terminate.
-    isc::util::WatchSocket terminate_watch_;
-
-    /// DHCP packet receiver mutex.
-    isc::util::thread::Mutex receiver_lock_;
-
-    /// DHCP packet receiver thread.
-    isc::util::thread::ThreadPtr receiver_thread_;
+    /// DHCP packet receiver.
+    ReceiverPtr receiver_;
 };
 
 }; // namespace isc::dhcp
index edad39271f2f811cb133c1fcaa1aa48f239efc38..94cbe8d71dc402de940dab0cf5c0672e22dfaeb6 100644 (file)
@@ -867,7 +867,7 @@ TEST_F(IfaceMgrTest, packetQueue4) {
     // Verify that we can create a queue with default factory.
     data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 2000);
     ASSERT_NO_THROW(PacketQueueMgr4::instance().createPacketQueue(config));
-    CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \"" 
+    CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \""
                      << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
 
     // Verify that fetching the queue via IfaceMgr and PacketQueueMgr
@@ -885,7 +885,7 @@ TEST_F(IfaceMgrTest, packetQueue6) {
     // Verify that we can create a queue with default factory.
     data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 2000);
     ASSERT_NO_THROW(PacketQueueMgr6::instance().createPacketQueue(config));
-    CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \"" 
+    CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \""
                      << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
 
     // Verify that fetching the queue via IfaceMgr and PacketQueueMgr
@@ -3083,7 +3083,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) {
     ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control));
     ASSERT_TRUE(queue_enabled);
     // Verify we have correctly created the queue.
-    CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \"" 
+    CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \""
                      << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
     // configureDHCPPacketQueue() should never start the thread.
     ASSERT_FALSE(ifacemgr->isReceiverRunning());
@@ -3152,7 +3152,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
     ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control));
     ASSERT_TRUE(queue_enabled);
     // Verify we have correctly created the queue.
-    CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \"" 
+    CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \""
                      << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
     // configureDHCPPacketQueue() should never start the thread.
     ASSERT_FALSE(ifacemgr->isReceiverRunning());
@@ -3187,5 +3187,178 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) {
     ASSERT_FALSE(ifacemgr->isReceiverRunning());
 }
 
+/// @brief Test Fixture for testing isc:dhcp::Receiver
+class ReceiverTest : public ::testing::Test {
+public:
+    /// @brief Maximum number of passes allowed in worker event loop
+    static const int WORKER_MAX_PASSES;
+
+    /// @brief Constructor.
+    ReceiverTest() {}
+
+    /// @brief Destructor.
+    ~ReceiverTest() {
+    }
+
+    /// @brief Sleeps for a given number of event periods sleep
+    /// Each period is 50 ms.
+    void nap(int periods) {
+        usleep(periods * 50 * 1000);
+    };
+
+    /// @brief Worker function to be used by the Receiver's thread
+    ///
+    /// The function runs 5 passes through an "event" loop.
+    /// On each pass:
+    /// - check terminate command
+    /// - instigate the desired event (second pass only)
+    /// - naps for 1 period (50ms)
+    ///
+    /// @param watch_type type of event that should occur
+    void worker(Receiver::WatchType watch_type) {
+        for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
+
+            // Stop if we're told to do it.
+            if (receiver_->shouldTerminate()) {
+                return;
+            }
+
+            // On the second pass, set the event.
+            if (passes_ == 2) {
+                switch (watch_type) {
+                case Receiver::RCV_ERROR:
+                    receiver_->setError("we have an error");
+                    break;
+                case Receiver::RCV_READY:
+                    receiver_->markReady(watch_type);
+                    break;
+                case Receiver::RCV_TERMINATE:
+                default:
+                    // Do nothing, we're waiting to be told to stop.
+                    break;
+                }
+            }
+
+            // Take a nap.
+            nap(1);
+        }
+
+        // Indicate why we stopped.
+        receiver_->setError("thread expired");
+    }
+
+    /// @brief Current receiver instance.
+    ReceiverPtr receiver_;
+
+    /// @brief Counter used to track the number of passes made
+    /// within the thread worker function.
+    int passes_;
+};
+
+const int ReceiverTest::WORKER_MAX_PASSES = 5;
+
+/// Verifies the basic operation of the Receiver class.
+/// It checks that a Receiver can be created, can be stopped,
+/// and that in set and clear sockets.
+TEST_F(ReceiverTest, receiverClassBasics) {
+
+    /// We'll create a receiver and let it run until it expires.  (Note this is more
+    /// of a test of ReceiverTest itself and ensures our tests later for why we
+    /// exited are sound.)
+    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+                                                         Receiver::RCV_TERMINATE))));
+    // Wait long enough for thread to expire.
+    nap(WORKER_MAX_PASSES + 1);
+
+    // It should have done the maximum number of passes.
+    EXPECT_EQ(passes_, WORKER_MAX_PASSES);
+
+    // Error should be ready and error text should be "thread expired".
+    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+    EXPECT_EQ("thread expired", receiver_->getLastError());
+
+    /// Now we'll test stopping a thread.
+    /// We'll create a Receiver, let it run a little and then tell it to stop.
+    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+                                                         Receiver::RCV_TERMINATE))));
+    // No watches should be ready.
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // Tell it to stop.
+    receiver_->stop();
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", receiver_->getLastError());
+
+
+    // Next we'll test error notification.
+    // We'll create a receiver that sets an error on the second pass.
+    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+                                                         Receiver::RCV_ERROR))));
+    // No watches should be ready.
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // It should now indicate an error.
+    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
+    EXPECT_EQ("we have an error", receiver_->getLastError());
+
+    // Tell it to stop.
+    receiver_->stop();
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", receiver_->getLastError());
+
+
+    // Finally, we'll test data ready notification.
+    // We'll create a receiver that indicates data ready on its second pass.
+    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+                                                         Receiver::RCV_READY))));
+    // No watches should be ready.
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+    // Wait a little while.
+    nap(2);
+
+    // It should now indicate data ready.
+    ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY));
+
+    // Tell it to stop.
+    receiver_->stop();
+
+    // It should have done less than the maximum number of passes.
+    EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+    // No watches should be ready.  Error text should be "thread stopped".
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+    ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+    EXPECT_EQ("thread stopped", receiver_->getLastError());
+}
 
 }