From: Thomas Markwalder Date: Thu, 15 Nov 2018 20:38:57 +0000 (-0500) Subject: [#260,!120] Extracted IfaceMgr thread bits into new class, isc::dhcp::Receiver X-Git-Tag: 204-move-models-base~4^2^2~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=88b5f25a62be2d45bc2f2626abf00b3ceb54591f;p=thirdparty%2Fkea.git [#260,!120] Extracted IfaceMgr thread bits into new class, isc::dhcp::Receiver src/lib/dhcp/iface_mgr.* Extracted the thread, watch sockets, mutex, and error message members from IfaceMgr and wrapped them in a new class, isc:dhcp::Receiver. IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) - new convenience method for adding descriptors to fd_set(s) src/lib/dhcp/tests/iface_mgr_unittest.cc class ReceiverTest - new test fixture TEST_F(ReceiverTest, receiverClassBasics) - new unit test --- diff --git a/src/lib/dhcp/iface_mgr.cc b/src/lib/dhcp/iface_mgr.cc index 2618f3a59e..cdcbf5cacc 100644 --- a/src/lib/dhcp/iface_mgr.cc +++ b/src/lib/dhcp/iface_mgr.cc @@ -179,14 +179,77 @@ bool Iface::delSocket(const uint16_t sockfd) { return (false); // socket not found } +Receiver::Receiver(const boost::function& thread_main) { + clearReady(RCV_ERROR); + clearReady(RCV_READY); + clearReady(RCV_TERMINATE); + last_error_ = "no error"; + thread_.reset(new isc::util::thread::Thread(thread_main)); +} + +int +Receiver::getWatchFd(WatchType watch_type) { + return(sockets_[watch_type].getSelectFd()); +} + +void +Receiver::markReady(WatchType watch_type) { + sockets_[watch_type].markReady(); +} + +bool +Receiver::isReady(WatchType watch_type) { + return (sockets_[watch_type].isReady()); +} + +void +Receiver::clearReady(WatchType watch_type) { + sockets_[watch_type].clearReady(); +} + +bool +Receiver::shouldTerminate() { + if (sockets_[RCV_TERMINATE].isReady()) { + clearReady(RCV_TERMINATE); + return (true); + } + + return (false); +} + +void +Receiver::stop() { + markReady(RCV_TERMINATE); + thread_->wait(); + thread_.reset(); + clearReady(RCV_ERROR); + clearReady(RCV_READY); + last_error_ = "thread stopped"; +} + +isc::util::thread::Mutex& +Receiver::getLock() { + return(lock_); +} + +void +Receiver::setError(const std::string& error_msg) { + last_error_ = error_msg; + markReady(RCV_ERROR); +} + +std::string +Receiver::getLastError() { + return (last_error_); +} + IfaceMgr::IfaceMgr() :control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))), control_buf_(new char[control_buf_len_]), packet_filter_(new PktFilterInet()), packet_filter6_(new PktFilterInet6()), test_mode_(false), - allow_loopback_(false), - receiver_error_("no error") { + allow_loopback_(false) { // Ensure that PQMs have been created to guarantee we have // default packet queues in place. @@ -293,13 +356,9 @@ void IfaceMgr::closeSockets() { void IfaceMgr::stopDHCPReceiver() { if (isReceiverRunning()) { - terminate_watch_.markReady(); - receiver_thread_->wait(); - receiver_thread_.reset(); - error_watch_.clearReady(); - + receiver_->stop(); + receiver_.reset(); } - receiver_error_ = "no error"; if (getPacketQueue4()) { getPacketQueue4()->clear(); @@ -697,10 +756,10 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) { case AF_INET: // If there's no queue, then has been disabled, simply return. if(!getPacketQueue4()) { - return; + return; } - receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))); + receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)))); break; case AF_INET6: // If there's no queue, then has been disabled, simply return. @@ -708,7 +767,7 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) { return; } - receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))); + receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)))); break; default: isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family); @@ -980,6 +1039,7 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / isc_throw(BadValue, "fractional timeout must be shorter than" " one million microseconds"); } + fd_set sockets; int maxfd = 0; @@ -988,22 +1048,15 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // if there are any callbacks for external sockets registered... if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { - FD_SET(s.socket_, &sockets); - if (maxfd < s.socket_) { - maxfd = s.socket_; - } + add_fd(s.socket_, maxfd, &sockets); } } - // Add receiver thread watch and error sockets. - FD_SET(receive_watch_.getSelectFd(), &sockets); - if (maxfd < receive_watch_.getSelectFd()) { - maxfd = receive_watch_.getSelectFd(); - } - FD_SET(error_watch_.getSelectFd(), &sockets); - if (maxfd < error_watch_.getSelectFd()) { - maxfd = error_watch_.getSelectFd(); - } + // Add Receiver ready watch socket + add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets); + + // Add Receiver error watch socket + add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1046,9 +1099,9 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) { - string msg = receiver_error_; - error_watch_.clearReady(); + if (receiver_->isReady(Receiver::RCV_ERROR)) { + string msg = receiver_->getLastError(); + receiver_->clearReady(Receiver::RCV_ERROR); isc_throw(SocketReadError, msg); } @@ -1073,10 +1126,13 @@ Pkt4Ptr IfaceMgr::receive4Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // If we're here it should only be because there are DHCP packets waiting. // Protected packet queue access. - Mutex::Locker lock(receiver_lock_); - Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket(); - if (!pkt) { - receive_watch_.clearReady(); + Pkt4Ptr pkt; + { + Mutex::Locker lock(receiver_->getLock()); + pkt = getPacketQueue4()->dequeuePacket(); + if (!pkt) { + receiver_->clearReady(Receiver::RCV_READY); + } } return (pkt); @@ -1100,15 +1156,10 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* /// provided set to indicated which sockets have something to read. BOOST_FOREACH(iface, ifaces_) { BOOST_FOREACH(SocketInfo s, iface->getSockets()) { - // Only deal with IPv4 addresses. if (s.addr_.isV4()) { - // Add this socket to listening set - FD_SET(s.sockfd_, &sockets); - if (maxfd < s.sockfd_) { - maxfd = s.sockfd_; - } + add_fd(s.sockfd_, maxfd, &sockets); } } } @@ -1116,10 +1167,8 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* // if there are any callbacks for external sockets registered... if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { - FD_SET(s.socket_, &sockets); - if (maxfd < s.socket_) { - maxfd = s.socket_; - } + // Add this socket to listening set + add_fd(s.socket_, maxfd, &sockets); } } @@ -1191,7 +1240,8 @@ Pkt4Ptr IfaceMgr::receive4Direct(uint32_t timeout_sec, uint32_t timeout_usec /* return (packet_filter_->receive(*iface, *candidate)); } -Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) { +Pkt6Ptr +IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) { if (isReceiverRunning()) { return (receive6Indirect(timeout_sec, timeout_usec)); } @@ -1199,7 +1249,20 @@ Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ return (receive6Direct(timeout_sec, timeout_usec)); } -Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) { +void +IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) { + if (!sockets) { + isc_throw(BadValue, "add_fd: sockets can't be null"); + } + + FD_SET(fd, sockets); + if (maxfd < fd) { + maxfd = fd; + } +} + +Pkt6Ptr +IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) { // Sanity check for microsecond timeout. if (timeout_usec >= 1000000) { isc_throw(BadValue, "fractional timeout must be shorter than" @@ -1216,16 +1279,11 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* /// and then use its copy for select(). Please note that select() modifies /// provided set to indicated which sockets have something to read. BOOST_FOREACH(IfacePtr iface, ifaces_) { - BOOST_FOREACH(SocketInfo s, iface->getSockets()) { // Only deal with IPv6 addresses. if (s.addr_.isV6()) { - // Add this socket to listening set - FD_SET(s.sockfd_, &sockets); - if (maxfd < s.sockfd_) { - maxfd = s.sockfd_; - } + add_fd(s.sockfd_, maxfd, &sockets); } } } @@ -1234,10 +1292,7 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { // Add it to the set as well - FD_SET(s.socket_, &sockets); - if (maxfd < s.socket_) { - maxfd = s.socket_; - } + add_fd(s.socket_, maxfd, &sockets); } } @@ -1307,8 +1362,8 @@ Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* return (packet_filter6_->receive(*candidate)); } - -Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) { +Pkt6Ptr +IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) { // Sanity check for microsecond timeout. if (timeout_usec >= 1000000) { isc_throw(BadValue, "fractional timeout must be shorter than" @@ -1324,22 +1379,15 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec / if (!callbacks_.empty()) { BOOST_FOREACH(SocketCallbackInfo s, callbacks_) { // Add it to the set as well - FD_SET(s.socket_, &sockets); - if (maxfd < s.socket_) { - maxfd = s.socket_; - } + add_fd(s.socket_, maxfd, &sockets); } } - // Add receiver thread watch and error sockets. - FD_SET(receive_watch_.getSelectFd(), &sockets); - if (maxfd < receive_watch_.getSelectFd()) { - maxfd = receive_watch_.getSelectFd(); - } - FD_SET(error_watch_.getSelectFd(), &sockets); - if (maxfd < error_watch_.getSelectFd()) { - maxfd = error_watch_.getSelectFd(); - } + // Add Receiver ready watch socket + add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets); + + // Add Receiver error watch socket + add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets); // Set timeout for our next select() call. If there are // no DHCP packets to read, then we'll wait for a finite @@ -1382,9 +1430,9 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // We only check external sockets if select detected an event. if (result > 0) { // Check for receiver thread read errors. - if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) { - string msg = receiver_error_; - error_watch_.clearReady(); + if (receiver_->isReady(Receiver::RCV_ERROR)) { + string msg = receiver_->getLastError(); + receiver_->clearReady(Receiver::RCV_ERROR); isc_throw(SocketReadError, msg); } @@ -1409,16 +1457,20 @@ Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec / // If we're here it should only be because there are DHCP packets waiting. // Protected packet queue access. - Mutex::Locker lock(receiver_lock_); - Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket(); - if (!pkt) { - receive_watch_.clearReady(); + Pkt6Ptr pkt; + { + Mutex::Locker lock(receiver_->getLock()); + pkt = getPacketQueue6()->dequeuePacket(); + if (!pkt) { + receiver_->clearReady(Receiver::RCV_READY); + } } return (pkt); } -void IfaceMgr::receiveDHCP4Packets() { +void +IfaceMgr::receiveDHCP4Packets() { IfacePtr iface; fd_set sockets; int maxfd = 0; @@ -1426,30 +1478,22 @@ void IfaceMgr::receiveDHCP4Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - FD_SET(terminate_watch_.getSelectFd(), &sockets); - if (maxfd < terminate_watch_.getSelectFd()) { - maxfd = terminate_watch_.getSelectFd(); - } + add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { BOOST_FOREACH(SocketInfo s, iface->getSockets()) { - // Only deal with IPv4 addresses. if (s.addr_.isV4()) { // Add this socket to listening set. - FD_SET(s.sockfd_, &sockets); - if (maxfd < s.sockfd_) { - maxfd = s.sockfd_; - } + add_fd(s.sockfd_, maxfd, &sockets); } } } for (;;) { // Check the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } @@ -1463,8 +1507,7 @@ void IfaceMgr::receiveDHCP4Packets() { int result = select(maxfd + 1, &rd_set, 0, 0, 0); // Re-check the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } @@ -1476,8 +1519,7 @@ void IfaceMgr::receiveDHCP4Packets() { // This thread should not get signals? if (errno != EINTR) { // Signal the error to receive4. - receiver_error_ = strerror(errno); - error_watch_.markReady(); + receiver_->setError(strerror(errno)); // We need to sleep in case of the error condition to // prevent the thread from tight looping when result // gets negative. @@ -1492,8 +1534,7 @@ void IfaceMgr::receiveDHCP4Packets() { if (FD_ISSET(s.sockfd_, &sockets)) { receiveDHCP4Packet(*iface, s); // Can take time so check one more time the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } } @@ -1503,7 +1544,8 @@ void IfaceMgr::receiveDHCP4Packets() { } -void IfaceMgr::receiveDHCP6Packets() { +void +IfaceMgr::receiveDHCP6Packets() { IfacePtr iface; fd_set sockets; int maxfd = 0; @@ -1511,31 +1553,22 @@ void IfaceMgr::receiveDHCP6Packets() { FD_ZERO(&sockets); // Add terminate watch socket. - FD_SET(terminate_watch_.getSelectFd(), &sockets); - if (maxfd < terminate_watch_.getSelectFd()) { - maxfd = terminate_watch_.getSelectFd(); - } + add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets); // Add Interface sockets. BOOST_FOREACH(iface, ifaces_) { BOOST_FOREACH(SocketInfo s, iface->getSockets()) { - // Only deal with IPv6 addresses. if (s.addr_.isV6()) { - // Add this socket to listening set. - FD_SET(s.sockfd_, &sockets); - if (maxfd < s.sockfd_) { - maxfd = s.sockfd_; - } + add_fd(s.sockfd_ , maxfd, &sockets); } } } for (;;) { // Check the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } @@ -1549,8 +1582,7 @@ void IfaceMgr::receiveDHCP6Packets() { int result = select(maxfd + 1, &rd_set, 0, 0, 0); // Re-check the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } @@ -1561,8 +1593,7 @@ void IfaceMgr::receiveDHCP6Packets() { // This thread should not get signals? if (errno != EINTR) { // Signal the error to receive6. - receiver_error_ = strerror(errno); - error_watch_.markReady(); + receiver_->setError(strerror(errno)); // We need to sleep in case of the error condition to // prevent the thread from tight looping when result // gets negative. @@ -1577,8 +1608,7 @@ void IfaceMgr::receiveDHCP6Packets() { if (FD_ISSET(s.sockfd_, &sockets)) { receiveDHCP6Packet(s); // Can take time so check one more time the watch socket. - if (terminate_watch_.isReady()) { - terminate_watch_.clearReady(); + if (receiver_->shouldTerminate()) { return; } } @@ -1587,13 +1617,14 @@ void IfaceMgr::receiveDHCP6Packets() { } } -void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { +void +IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { int len; + int result = ioctl(socket_info.sockfd_, FIONREAD, &len); if (result < 0) { // Signal the error to receive4. - receiver_error_ = strerror(errno); - error_watch_.markReady(); + receiver_->setError(strerror(errno)); return; } if (len == 0) { @@ -1606,27 +1637,26 @@ void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) { try { pkt = packet_filter_->receive(iface, socket_info); } catch (const std::exception& ex) { - receiver_error_ = ex.what(); - error_watch_.markReady(); + receiver_->setError(strerror(errno)); } catch (...) { - receiver_error_ = "packet filter receive() failed"; - error_watch_.markReady(); + receiver_->setError("packet filter receive() failed"); } if (pkt) { - Mutex::Locker lock(receiver_lock_); + Mutex::Locker lock(receiver_->getLock()); getPacketQueue4()->enqueuePacket(pkt, socket_info); - receive_watch_.markReady(); + receiver_->markReady(Receiver::RCV_READY); } } -void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { +void +IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { int len; + int result = ioctl(socket_info.sockfd_, FIONREAD, &len); if (result < 0) { // Signal the error to receive6. - receiver_error_ = strerror(errno); - error_watch_.markReady(); + receiver_->setError(strerror(errno)); return; } if (len == 0) { @@ -1639,21 +1669,20 @@ void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) { try { pkt = packet_filter6_->receive(socket_info); } catch (const std::exception& ex) { - receiver_error_ = ex.what(); - error_watch_.markReady(); + receiver_->setError(ex.what()); } catch (...) { - receiver_error_ = "packet filter receive() failed"; - error_watch_.markReady(); + receiver_->setError("packet filter receive() failed"); } if (pkt) { - Mutex::Locker lock(receiver_lock_); + Mutex::Locker lock(receiver_->getLock()); getPacketQueue6()->enqueuePacket(pkt, socket_info); - receive_watch_.markReady(); + receiver_->markReady(Receiver::RCV_READY); } } -uint16_t IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) { +uint16_t +IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) { IfacePtr iface = getIface(pkt.getIface()); if (!iface) { isc_throw(IfaceNotFound, "Tried to find socket for non-existent interface"); diff --git a/src/lib/dhcp/iface_mgr.h b/src/lib/dhcp/iface_mgr.h index 310a52f227..dc950e9847 100644 --- a/src/lib/dhcp/iface_mgr.h +++ b/src/lib/dhcp/iface_mgr.h @@ -458,6 +458,115 @@ private: typedef boost::shared_ptr IfacePtr; +/// @brief Provides a thread and controls for receiving packets. +/// +/// Given a "worker function", this class creates a thread which +/// runs the function and provides the means to monitor the thread +/// for "error" and "ready" conditions, and finally to stop the thread. +/// It uses three WatchSockets: one to indicate an error, one to indicate +/// data is ready, and a third to monitor as a shut-down command. +class Receiver { +public: + /// @brief Enumerates the list of watch sockets used to mark events + /// These are used as arguments to watch socket accessor methods. + enum WatchType { + RCV_ERROR = 0, + RCV_READY = 1, + RCV_TERMINATE = 2 + }; + + /// @brief Constructor + /// + /// It initializes the watch sockets and then instantiates and + /// starts the receiver's worker thread. + /// + /// @param thread_main function the receiver's thread should run + Receiver(const boost::function& thread_main); + + /// @brief Virtual destructor + virtual ~Receiver() {} + + /// @brief Fetches the fd of a watch socket + /// + /// @param watch_type indicates which watch socket + /// @return the watch socket's file descriptor + int getWatchFd(WatchType watch_type); + + /// @brief Sets a watch socket state to ready + /// + /// @param watch_type indicates which watch socket to mark + void markReady(WatchType watch_type); + + /// @brief Indicates if a watch socket state is ready + /// + /// @param watch_type indicates which watch socket to mark + /// @return true if the watch socket is ready, false otherwise + bool isReady(WatchType watch_type); + + /// @brief Sets a watch socket state to not ready + /// + /// @param watch_type indicates which watch socket to clear + void clearReady(WatchType watch_type); + + /// @brief Checks if the receiver thread should terminate + /// + /// Performs a "one-shot" check of the receiver's terminate + /// watch socket. If it is ready, return true and then clear + /// it, otherwise return false. + /// + /// @return true if the terminate watch socket is ready + bool shouldTerminate(); + + /// @brief Terminates the receiver thread + /// + /// It marks the terminate watch socket ready, and then waits for the + /// thread to stop. At this point, the receiver is defunct. This is + /// not done in the destructor to avoid race conditions. + void stop(); + + /// @brief Fetches the receiver's thread mutex. + /// + /// This is used for instantation mutex locks to protect code blocks. + /// + /// @return a reference to the mutex + isc::util::thread::Mutex& getLock(); + + /// @brief Sets the receiver error state + /// + /// This records the given error message and sets the error watch + /// socket to ready. + /// + /// @param error_msg + void setError(const std::string& error_msg); + + /// @brief Fetches the error message text for the most recent socket error + /// + /// @return string containing the error message + std::string getLastError(); + + /// @brief Error message of the last error encountered + std::string last_error_; + + /// @brief DHCP watch sockets that are used to communicate with the owning thread + /// There are three: + /// -# RCV_ERROR - packet receive error watch socket. + /// Marked as ready when the DHCP packet receiver experiences an I/O error. + /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet + /// to the packet queue. + /// -# RCV_TERMINATE Packet receiver terminate watch socket. + /// Marked as ready when the DHCP packet receiver thread should terminate. + isc::util::WatchSocket sockets_[RCV_TERMINATE + 1]; + + /// DHCP packet thread mutex. + isc::util::thread::Mutex lock_; + + /// DHCP packet receiver thread. + isc::util::thread::ThreadPtr thread_ ; +}; + +/// @brief Defines a pointer to a Receiver +typedef boost::shared_ptr ReceiverPtr; + /// @brief Forward declaration to the @c IfaceMgr. class IfaceMgr; @@ -1056,7 +1165,7 @@ public: /// @brief Returns true if there is a receiver currently running. bool isReceiverRunning() const { - return (receiver_thread_ != 0); + return (receiver_ != 0); } /// @brief Configures DHCP packet queue @@ -1075,6 +1184,15 @@ public: bool configureDHCPPacketQueue(const uint16_t family, data::ConstElementPtr queue_control); + /// @brief Convenience method for adding an descriptor to a set + /// + /// @param fd descriptor to add + /// @param[out] maxfd maximum fd value in the set. If the new fd is + /// larger than it's current value, it will be updated to new fd value + /// @param sockets pointer to the set of sockets + /// @throw BadValue if sockets is null + static void add_fd(int fd, int& maxfd, fd_set* sockets); + // don't use private, we need derived classes in tests protected: @@ -1349,28 +1467,8 @@ private: /// @brief Allows to use loopback bool allow_loopback_; - /// @brief Error message of the last DHCP packet receive error. - std::string receiver_error_; - - /// @brief DHCP packet receive error watch socket. - /// Marked as ready when the DHCP packet receiver experiences - /// an I/O error. - isc::util::WatchSocket error_watch_; - - /// @brief DHCP packet receive watch socket. - /// Marked as ready when the DHCP packet receiver adds a packet - /// to the packet queue. - isc::util::WatchSocket receive_watch_; - - /// @brief Packet receiver terminate watch socket. - /// Marked as ready when the DHCP packet receiver thread should terminate. - isc::util::WatchSocket terminate_watch_; - - /// DHCP packet receiver mutex. - isc::util::thread::Mutex receiver_lock_; - - /// DHCP packet receiver thread. - isc::util::thread::ThreadPtr receiver_thread_; + /// DHCP packet receiver. + ReceiverPtr receiver_; }; }; // namespace isc::dhcp diff --git a/src/lib/dhcp/tests/iface_mgr_unittest.cc b/src/lib/dhcp/tests/iface_mgr_unittest.cc index edad39271f..94cbe8d71d 100644 --- a/src/lib/dhcp/tests/iface_mgr_unittest.cc +++ b/src/lib/dhcp/tests/iface_mgr_unittest.cc @@ -867,7 +867,7 @@ TEST_F(IfaceMgrTest, packetQueue4) { // Verify that we can create a queue with default factory. data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 2000); ASSERT_NO_THROW(PacketQueueMgr4::instance().createPacketQueue(config)); - CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \"" + CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \"" << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }"); // Verify that fetching the queue via IfaceMgr and PacketQueueMgr @@ -885,7 +885,7 @@ TEST_F(IfaceMgrTest, packetQueue6) { // Verify that we can create a queue with default factory. data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 2000); ASSERT_NO_THROW(PacketQueueMgr6::instance().createPacketQueue(config)); - CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \"" + CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \"" << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }"); // Verify that fetching the queue via IfaceMgr and PacketQueueMgr @@ -3083,7 +3083,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest4) { ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control)); ASSERT_TRUE(queue_enabled); // Verify we have correctly created the queue. - CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \"" + CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \"" << PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }"); // configureDHCPPacketQueue() should never start the thread. ASSERT_FALSE(ifacemgr->isReceiverRunning()); @@ -3152,7 +3152,7 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control)); ASSERT_TRUE(queue_enabled); // Verify we have correctly created the queue. - CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \"" + CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \"" << PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }"); // configureDHCPPacketQueue() should never start the thread. ASSERT_FALSE(ifacemgr->isReceiverRunning()); @@ -3187,5 +3187,178 @@ TEST_F(IfaceMgrTest, configureDHCPPacketQueueTest6) { ASSERT_FALSE(ifacemgr->isReceiverRunning()); } +/// @brief Test Fixture for testing isc:dhcp::Receiver +class ReceiverTest : public ::testing::Test { +public: + /// @brief Maximum number of passes allowed in worker event loop + static const int WORKER_MAX_PASSES; + + /// @brief Constructor. + ReceiverTest() {} + + /// @brief Destructor. + ~ReceiverTest() { + } + + /// @brief Sleeps for a given number of event periods sleep + /// Each period is 50 ms. + void nap(int periods) { + usleep(periods * 50 * 1000); + }; + + /// @brief Worker function to be used by the Receiver's thread + /// + /// The function runs 5 passes through an "event" loop. + /// On each pass: + /// - check terminate command + /// - instigate the desired event (second pass only) + /// - naps for 1 period (50ms) + /// + /// @param watch_type type of event that should occur + void worker(Receiver::WatchType watch_type) { + for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) { + + // Stop if we're told to do it. + if (receiver_->shouldTerminate()) { + return; + } + + // On the second pass, set the event. + if (passes_ == 2) { + switch (watch_type) { + case Receiver::RCV_ERROR: + receiver_->setError("we have an error"); + break; + case Receiver::RCV_READY: + receiver_->markReady(watch_type); + break; + case Receiver::RCV_TERMINATE: + default: + // Do nothing, we're waiting to be told to stop. + break; + } + } + + // Take a nap. + nap(1); + } + + // Indicate why we stopped. + receiver_->setError("thread expired"); + } + + /// @brief Current receiver instance. + ReceiverPtr receiver_; + + /// @brief Counter used to track the number of passes made + /// within the thread worker function. + int passes_; +}; + +const int ReceiverTest::WORKER_MAX_PASSES = 5; + +/// Verifies the basic operation of the Receiver class. +/// It checks that a Receiver can be created, can be stopped, +/// and that in set and clear sockets. +TEST_F(ReceiverTest, receiverClassBasics) { + + /// We'll create a receiver and let it run until it expires. (Note this is more + /// of a test of ReceiverTest itself and ensures our tests later for why we + /// exited are sound.) + receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this, + Receiver::RCV_TERMINATE)))); + // Wait long enough for thread to expire. + nap(WORKER_MAX_PASSES + 1); + + // It should have done the maximum number of passes. + EXPECT_EQ(passes_, WORKER_MAX_PASSES); + + // Error should be ready and error text should be "thread expired". + ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + EXPECT_EQ("thread expired", receiver_->getLastError()); + + /// Now we'll test stopping a thread. + /// We'll create a Receiver, let it run a little and then tell it to stop. + receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this, + Receiver::RCV_TERMINATE)))); + // No watches should be ready. + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // Tell it to stop. + receiver_->stop(); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", receiver_->getLastError()); + + + // Next we'll test error notification. + // We'll create a receiver that sets an error on the second pass. + receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this, + Receiver::RCV_ERROR)))); + // No watches should be ready. + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // It should now indicate an error. + ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR)); + EXPECT_EQ("we have an error", receiver_->getLastError()); + + // Tell it to stop. + receiver_->stop(); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", receiver_->getLastError()); + + + // Finally, we'll test data ready notification. + // We'll create a receiver that indicates data ready on its second pass. + receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this, + Receiver::RCV_READY)))); + // No watches should be ready. + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + + // Wait a little while. + nap(2); + + // It should now indicate data ready. + ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY)); + + // Tell it to stop. + receiver_->stop(); + + // It should have done less than the maximum number of passes. + EXPECT_LT(passes_, WORKER_MAX_PASSES); + + // No watches should be ready. Error text should be "thread stopped". + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY)); + ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE)); + EXPECT_EQ("thread stopped", receiver_->getLastError()); +} }