<!-- Converted by db4-upgrade version 1.1 -->
<chapter xmlns="http://docbook.org/ns/docbook" version="5.0" xml:id="congestion-handling">
<title>Congestion Handling in DHCPv4 and DHCPv6</title>
+ <section xml:id="congeston-handling-background">
+ <title>What is Congestion?</title>
<para>Congestion occurs when servers are subjected to client queries
faster than they can be fulfilled. Subsequently, the servers begin
accumulating a backlog of pending queries. The longer the high rate of
relevant, or worse are redundant. In other words, the packets waiting in
the FIFO socket buffers become increasingly stale.
</para>
-
+ </section>
+ <section xml:id="congeston-handling-solution">
+ <title>Configuring Congestion Handling</title>
<para>Kea 1.5 introduces a new feature referred to as Congestion Handling.
Congestion handling offers the ability to configure the server to use a
separate thread to read packets from the interface socket buffers. As the
<para>
The number of parameters and plug-ins is expected to grow over time.
</para>
+</section>
</chapter>
return (false); // socket not found
}
-void
-Receiver::start(const boost::function<void()>& thread_main) {
- clearReady(RCV_ERROR);
- clearReady(RCV_READY);
- clearReady(RCV_TERMINATE);
- last_error_ = "no error";
- thread_.reset(new isc::util::thread::Thread(thread_main));
-}
-
-int
-Receiver::getWatchFd(WatchType watch_type) {
- return(sockets_[watch_type].getSelectFd());
-}
-
-void
-Receiver::markReady(WatchType watch_type) {
- sockets_[watch_type].markReady();
-}
-
-bool
-Receiver::isReady(WatchType watch_type) {
- return (sockets_[watch_type].isReady());
-}
-
-void
-Receiver::clearReady(WatchType watch_type) {
- sockets_[watch_type].clearReady();
-}
-
-bool
-Receiver::shouldTerminate() {
- if (sockets_[RCV_TERMINATE].isReady()) {
- clearReady(RCV_TERMINATE);
- return (true);
- }
-
- return (false);
-}
-
-void
-Receiver::stop() {
- if (thread_) {
- markReady(RCV_TERMINATE);
- thread_->wait();
- thread_.reset();
- }
-
- clearReady(RCV_ERROR);
- clearReady(RCV_READY);
- last_error_ = "thread stopped";
-}
-
-void
-Receiver::setError(const std::string& error_msg) {
- last_error_ = error_msg;
- markReady(RCV_ERROR);
-}
-
-std::string
-Receiver::getLastError() {
- return (last_error_);
-}
-
IfaceMgr::IfaceMgr()
:control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))),
control_buf_(new char[control_buf_len_]),
}
void IfaceMgr::stopDHCPReceiver() {
- if (isReceiverRunning()) {
- receiver_->stop();
- receiver_.reset();
+ if (isDHCPReceiverRunning()) {
+ dhcp_receiver_->stop();
}
+ dhcp_receiver_.reset();
+
if (getPacketQueue4()) {
getPacketQueue4()->clear();
}
void
IfaceMgr::startDHCPReceiver(const uint16_t family) {
- if (isReceiverRunning()) {
+ if (isDHCPReceiverRunning()) {
isc_throw(InvalidOperation, "a receiver thread already exists");
}
switch (family) {
case AF_INET:
- // If there's no queue, then has been disabled, simply return.
+ // If the queue doesn't exist, packet queing has been configured
+ // as disabled. If there is no queue, we do not create a reciever.
if(!getPacketQueue4()) {
- return;
+ return;
}
- receiver_.reset(new Receiver());
- receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+ dhcp_receiver_.reset(new WatchedThread());
+ dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP4Packets, this));
break;
case AF_INET6:
- // If there's no queue, then has been disabled, simply return.
+ // If the queue doesn't exist, packet queing has been configured
+ // as disabled. If there is no queue, we do not create a reciever.
if(!getPacketQueue6()) {
return;
}
- receiver_.reset(new Receiver());
- receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
+ dhcp_receiver_.reset(new WatchedThread());
+ dhcp_receiver_->start(boost::bind(&IfaceMgr::receiveDHCP6Packets, this));
break;
default:
isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
}
Pkt4Ptr IfaceMgr::receive4(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
- if (isReceiverRunning()) {
+ if (isDHCPReceiverRunning()) {
return (receive4Indirect(timeout_sec, timeout_usec));
}
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
- add_fd(s.socket_, maxfd, &sockets);
+ addFDtoSet(s.socket_, maxfd, &sockets);
}
}
// Add Receiver ready watch socket
- add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
// Add Receiver error watch socket
- add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (receiver_->isReady(Receiver::RCV_ERROR)) {
- string msg = receiver_->getLastError();
- receiver_->clearReady(Receiver::RCV_ERROR);
+ if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+ string msg = dhcp_receiver_->getLastError();
+ dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
if (!pkt) {
- receiver_->clearReady(Receiver::RCV_READY);
+ dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
}
return (pkt);
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set
- add_fd(s.sockfd_, maxfd, &sockets);
+ addFDtoSet(s.sockfd_, maxfd, &sockets);
}
}
}
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
// Add this socket to listening set
- add_fd(s.socket_, maxfd, &sockets);
+ addFDtoSet(s.socket_, maxfd, &sockets);
}
}
Pkt6Ptr
IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
- if (isReceiverRunning()) {
+ if (isDHCPReceiverRunning()) {
return (receive6Indirect(timeout_sec, timeout_usec));
}
}
void
-IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) {
+IfaceMgr::addFDtoSet(int fd, int& maxfd, fd_set* sockets) {
if (!sockets) {
- isc_throw(BadValue, "add_fd: sockets can't be null");
+ isc_throw(BadValue, "addFDtoSet: sockets can't be null");
}
FD_SET(fd, sockets);
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
// Add this socket to listening set
- add_fd(s.sockfd_, maxfd, &sockets);
+ addFDtoSet(s.sockfd_, maxfd, &sockets);
}
}
}
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
// Add it to the set as well
- add_fd(s.socket_, maxfd, &sockets);
+ addFDtoSet(s.socket_, maxfd, &sockets);
}
}
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
// Add it to the set as well
- add_fd(s.socket_, maxfd, &sockets);
+ addFDtoSet(s.socket_, maxfd, &sockets);
}
}
// Add Receiver ready watch socket
- add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
// Add Receiver error watch socket
- add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (receiver_->isReady(Receiver::RCV_ERROR)) {
- string msg = receiver_->getLastError();
- receiver_->clearReady(Receiver::RCV_ERROR);
+ if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+ string msg = dhcp_receiver_->getLastError();
+ dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
if (!pkt) {
- receiver_->clearReady(Receiver::RCV_READY);
+ dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
}
return (pkt);
FD_ZERO(&sockets);
// Add terminate watch socket.
- add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set.
- add_fd(s.sockfd_, maxfd, &sockets);
+ addFDtoSet(s.sockfd_, maxfd, &sockets);
}
}
}
for (;;) {
// Check the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive4.
- receiver_->setError(strerror(errno));
+ dhcp_receiver_->setError(strerror(errno));
// We need to sleep in case of the error condition to
// prevent the thread from tight looping when result
// gets negative.
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP4Packet(*iface, s);
// Can take time so check one more time the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
}
FD_ZERO(&sockets);
// Add terminate watch socket.
- add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
// Add this socket to listening set.
- add_fd(s.sockfd_ , maxfd, &sockets);
+ addFDtoSet(s.sockfd_ , maxfd, &sockets);
}
}
}
for (;;) {
// Check the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive6.
- receiver_->setError(strerror(errno));
+ dhcp_receiver_->setError(strerror(errno));
// We need to sleep in case of the error condition to
// prevent the thread from tight looping when result
// gets negative.
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP6Packet(s);
// Can take time so check one more time the watch socket.
- if (receiver_->shouldTerminate()) {
+ if (dhcp_receiver_->shouldTerminate()) {
return;
}
}
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive4.
- receiver_->setError(strerror(errno));
+ dhcp_receiver_->setError(strerror(errno));
return;
}
if (len == 0) {
try {
pkt = packet_filter_->receive(iface, socket_info);
} catch (const std::exception& ex) {
- receiver_->setError(strerror(errno));
+ dhcp_receiver_->setError(strerror(errno));
} catch (...) {
- receiver_->setError("packet filter receive() failed");
+ dhcp_receiver_->setError("packet filter receive() failed");
}
if (pkt) {
getPacketQueue4()->enqueuePacket(pkt, socket_info);
- receiver_->markReady(Receiver::RCV_READY);
+ dhcp_receiver_->markReady(WatchedThread::RCV_READY);
}
}
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive6.
- receiver_->setError(strerror(errno));
+ dhcp_receiver_->setError(strerror(errno));
return;
}
if (len == 0) {
try {
pkt = packet_filter6_->receive(socket_info);
} catch (const std::exception& ex) {
- receiver_->setError(ex.what());
+ dhcp_receiver_->setError(ex.what());
} catch (...) {
- receiver_->setError("packet filter receive() failed");
+ dhcp_receiver_->setError("packet filter receive() failed");
}
if (pkt) {
getPacketQueue6()->enqueuePacket(pkt, socket_info);
- receiver_->markReady(Receiver::RCV_READY);
+ dhcp_receiver_->markReady(WatchedThread::RCV_READY);
}
}
bool
IfaceMgr::configureDHCPPacketQueue(uint16_t family, data::ConstElementPtr queue_control) {
- if (isReceiverRunning()) {
+ if (isDHCPReceiverRunning()) {
isc_throw(InvalidOperation, "Cannot reconfigure queueing"
- " while receiver thread is running");
+ " while DHCP receiver thread is running");
}
bool enable_queue = false;
#include <dhcp/pkt_filter6.h>
#include <util/optional_value.h>
#include <util/watch_socket.h>
-#include <util/threads/thread.h>
+#include <util/threads/watched_thread.h>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
-#include <boost/circular_buffer.hpp>
#include <list>
#include <vector>
typedef boost::shared_ptr<Iface> IfacePtr;
-/// @brief Provides a thread and controls for receiving packets.
-///
-/// Given a "worker function", this class creates a thread which
-/// runs the function and provides the means to monitor the thread
-/// for "error" and "ready" conditions, and finally to stop the thread.
-/// It uses three WatchSockets: one to indicate an error, one to indicate
-/// data is ready, and a third to monitor as a shut-down command.
-class Receiver {
-public:
- /// @brief Enumerates the list of watch sockets used to mark events
- /// These are used as arguments to watch socket accessor methods.
- enum WatchType {
- RCV_ERROR = 0,
- RCV_READY = 1,
- RCV_TERMINATE = 2
- };
-
- /// @brief Constructor
- Receiver(){};
-
- /// @brief Virtual destructor
- virtual ~Receiver(){}
-
- /// @brief Fetches the fd of a watch socket
- ///
- /// @param watch_type indicates which watch socket
- /// @return the watch socket's file descriptor
- int getWatchFd(WatchType watch_type);
-
- /// @brief Sets a watch socket state to ready
- ///
- /// @param watch_type indicates which watch socket to mark
- void markReady(WatchType watch_type);
-
- /// @brief Indicates if a watch socket state is ready
- ///
- /// @param watch_type indicates which watch socket to mark
- /// @return true if the watch socket is ready, false otherwise
- bool isReady(WatchType watch_type);
-
- /// @brief Sets a watch socket state to not ready
- ///
- /// @param watch_type indicates which watch socket to clear
- void clearReady(WatchType watch_type);
-
- /// @brief Checks if the receiver thread should terminate
- ///
- /// Performs a "one-shot" check of the receiver's terminate
- /// watch socket. If it is ready, return true and then clear
- /// it, otherwise return false.
- ///
- /// @return true if the terminate watch socket is ready
- bool shouldTerminate();
-
- /// @brief Creates and runs the thread.
- ///
- /// Creates teh receiver's thread, passing into it the given
- /// function to run.
- ///
- /// @param thread_main function the receiver's thread should run
- void start(const boost::function<void()>& thread_main);
-
- /// @brief Returns true if the receiver thread is running
- /// @todo - this may need additional logic to handle cases where
- /// a thread function exits w/o the caller invoking @c
- /// Receiver::stop().
- bool isRunning() {
- return (thread_ != 0);
- }
-
- /// @brief Terminates the receiver thread
- ///
- /// It marks the terminate watch socket ready, and then waits for the
- /// thread to stop. At this point, the receiver is defunct. This is
- /// not done in the destructor to avoid race conditions.
- void stop();
-
- /// @brief Sets the receiver error state
- ///
- /// This records the given error message and sets the error watch
- /// socket to ready.
- ///
- /// @param error_msg
- void setError(const std::string& error_msg);
-
- /// @brief Fetches the error message text for the most recent socket error
- ///
- /// @return string containing the error message
- std::string getLastError();
-
- /// @brief Error message of the last error encountered
- std::string last_error_;
-
- /// @brief DHCP watch sockets that are used to communicate with the owning thread
- /// There are three:
- /// -# RCV_ERROR - packet receive error watch socket.
- /// Marked as ready when the DHCP packet receiver experiences an I/O error.
- /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
- /// to the packet queue.
- /// -# RCV_TERMINATE Packet receiver terminate watch socket.
- /// Marked as ready when the DHCP packet receiver thread should terminate.
- isc::util::WatchSocket sockets_[RCV_TERMINATE + 1];
-
- /// DHCP packet receiver thread.
- isc::util::thread::ThreadPtr thread_ ;
-};
-
-/// @brief Defines a pointer to a Receiver
-typedef boost::shared_ptr<Receiver> ReceiverPtr;
-
/// @brief Forward declaration to the @c IfaceMgr.
class IfaceMgr;
/// @brief Returns true if there is a receiver exists and its
/// thread is currently running.
- bool isReceiverRunning() const {
- return (receiver_ != 0 && receiver_->isRunning());
+ bool isDHCPReceiverRunning() const {
+ return (dhcp_receiver_ != 0 && dhcp_receiver_->isRunning());
}
/// @brief Configures DHCP packet queue
/// larger than it's current value, it will be updated to new fd value
/// @param sockets pointer to the set of sockets
/// @throw BadValue if sockets is null
- static void add_fd(int fd, int& maxfd, fd_set* sockets);
+ static void addFDtoSet(int fd, int& maxfd, fd_set* sockets);
// don't use private, we need derived classes in tests
protected:
PacketQueueMgr6Ptr packet_queue_mgr6_;
/// DHCP packet receiver.
- ReceiverPtr receiver_;
+ isc::util::thread::WatchedThreadPtr dhcp_receiver_;
};
}; // namespace isc::dhcp
// Look for it.
auto index = factories_.find(queue_type);
- // If it's there remove it
- if (index != factories_.end()) {
- factories_.erase(index);
- return (true);
+ // Not there so nothing to do.
+ if (index == factories_.end()) {
+ return (false);
+ }
+ // If the queue is of the type being unregistered, then remove it. We don't
+ // a queue instance outliving its library.
+ if ((packet_queue_) && (packet_queue_->getQueueType() == queue_type)) {
+ packet_queue_.reset();
}
- return (false);
+ // Remove the factory.
+ factories_.erase(index);
+
+ return (true);
}
/// @brief Create an instance of a packet queue.
// Thread should only start when there is a packet queue.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6));
- ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning());
// If the thread is already running, trying to start it again should fail.
if (queue_enabled) {
ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation);
// Should still have one running.
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
}
// Let's build our DHCPv6 packet.
// Stop the thread. This should be no harm/no foul if we're not
// queueuing. Either way, we should not have a thread afterwards.
ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
}
// Thread should only start when there is a packet queue.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
- ASSERT_TRUE(queue_enabled == ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(queue_enabled == ifacemgr->isDHCPReceiverRunning());
// If the thread is already running, trying to start it again should fail.
if (queue_enabled) {
ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation);
// Should still have one running.
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
}
// Let's construct the packet to send.
// Stop the thread. This should be no harm/no foul if we're not
// queueuing. Either way, we should not have a thread afterwards.
ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
}
/// Holds the invocation counter for ifaceMgrErrorHandler.
// First let's make sure there is no queue and no thread.
ASSERT_FALSE(ifacemgr->getPacketQueue4());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
bool queue_enabled = false;
// Given an empty pointer, we should default to no queue.
EXPECT_FALSE(queue_enabled);
EXPECT_FALSE(ifacemgr->getPacketQueue4());
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Verify that calling startDHCPReceiver with no queue, does NOT start the thread.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Now let's try with a populated queue control, but with enable-queue = false.
queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, false);
EXPECT_FALSE(queue_enabled);
EXPECT_FALSE(ifacemgr->getPacketQueue4());
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Now let's enable the queue.
queue_control = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 500, true);
CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \""
<< PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Calling startDHCPReceiver with a queue, should start the thread.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
// Verify that calling startDHCPReceiver when the thread is running, throws.
ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET), InvalidOperation);
// We should still have our queue and the thread should still be running.
EXPECT_TRUE(ifacemgr->getPacketQueue4());
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
// Now let's stop stop the thread.
ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Stopping the thread should not destroy the queue.
ASSERT_TRUE(ifacemgr->getPacketQueue4());
// Reconfigure with the queue turned off. We should have neither queue nor thread.
ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control));
EXPECT_FALSE(ifacemgr->getPacketQueue4());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
}
// Verifies DHCPv6 behavior of configureDHCPPacketQueue()
// First let's make sure there is no queue and no thread.
ASSERT_FALSE(ifacemgr->getPacketQueue6());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
bool queue_enabled = false;
// Given an empty pointer, we should default to no queue.
EXPECT_FALSE(queue_enabled);
EXPECT_FALSE(ifacemgr->getPacketQueue6());
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Verify that calling startDHCPReceiver with no queue, does NOT start the thread.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET));
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Now let's try with a populated queue control, but with enable-queue = false.
queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, false);
EXPECT_FALSE(queue_enabled);
EXPECT_FALSE(ifacemgr->getPacketQueue6());
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Now let's enable the queue.
queue_control = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 500, true);
CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \""
<< PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
// configureDHCPPacketQueue() should never start the thread.
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Calling startDHCPReceiver with a queue, should start the thread.
ASSERT_NO_THROW(ifacemgr->startDHCPReceiver(AF_INET6));
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
// Verify that calling startDHCPReceiver when the thread is running, throws.
ASSERT_THROW(ifacemgr->startDHCPReceiver(AF_INET6), InvalidOperation);
// We should still have our queue and the thread should still be running.
EXPECT_TRUE(ifacemgr->getPacketQueue6());
- ASSERT_TRUE(ifacemgr->isReceiverRunning());
+ ASSERT_TRUE(ifacemgr->isDHCPReceiverRunning());
// Now let's stop stop the thread.
ASSERT_NO_THROW(ifacemgr->stopDHCPReceiver());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
// Stopping the thread should not destroy the queue.
ASSERT_TRUE(ifacemgr->getPacketQueue6());
// Reconfigure with the queue turned off. We should have neither queue nor thread.
ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control));
EXPECT_FALSE(ifacemgr->getPacketQueue6());
- ASSERT_FALSE(ifacemgr->isReceiverRunning());
-}
-
-/// @brief Test Fixture for testing isc:dhcp::Receiver
-class ReceiverTest : public ::testing::Test {
-public:
- /// @brief Maximum number of passes allowed in worker event loop
- static const int WORKER_MAX_PASSES;
-
- /// @brief Constructor.
- ReceiverTest() {}
-
- /// @brief Destructor.
- ~ReceiverTest() {
- }
-
- /// @brief Sleeps for a given number of event periods sleep
- /// Each period is 50 ms.
- void nap(int periods) {
- usleep(periods * 50 * 1000);
- };
-
- /// @brief Worker function to be used by the Receiver's thread
- ///
- /// The function runs 5 passes through an "event" loop.
- /// On each pass:
- /// - check terminate command
- /// - instigate the desired event (second pass only)
- /// - naps for 1 period (50ms)
- ///
- /// @param watch_type type of event that should occur
- void worker(Receiver::WatchType watch_type) {
- for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
-
- // Stop if we're told to do it.
- if (receiver_->shouldTerminate()) {
- return;
- }
-
- // On the second pass, set the event.
- if (passes_ == 2) {
- switch (watch_type) {
- case Receiver::RCV_ERROR:
- receiver_->setError("we have an error");
- break;
- case Receiver::RCV_READY:
- receiver_->markReady(watch_type);
- break;
- case Receiver::RCV_TERMINATE:
- default:
- // Do nothing, we're waiting to be told to stop.
- break;
- }
- }
-
- // Take a nap.
- nap(1);
- }
-
- // Indicate why we stopped.
- receiver_->setError("thread expired");
- }
-
- /// @brief Current receiver instance.
- ReceiverPtr receiver_;
-
- /// @brief Counter used to track the number of passes made
- /// within the thread worker function.
- int passes_;
-};
-
-const int ReceiverTest::WORKER_MAX_PASSES = 5;
-
-/// Verifies the basic operation of the Receiver class.
-/// It checks that a Receiver can be created, can be stopped,
-/// and that in set and clear sockets.
-TEST_F(ReceiverTest, receiverClassBasics) {
-
- /// We'll create a receiver and let it run until it expires. (Note this is more
- /// of a test of ReceiverTest itself and ensures our tests later for why we
- /// exited are sound.)
- receiver_.reset(new Receiver());
- ASSERT_FALSE(receiver_->isRunning());
- receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
- ASSERT_TRUE(receiver_->isRunning());
-
- // Wait long enough for thread to expire.
- nap(WORKER_MAX_PASSES + 1);
-
- // It should have done the maximum number of passes.
- EXPECT_EQ(passes_, WORKER_MAX_PASSES);
-
- // Error should be ready and error text should be "thread expired".
- ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
- EXPECT_EQ("thread expired", receiver_->getLastError());
-
- // Thread is technically still running, so let's stop it.
- EXPECT_TRUE(receiver_->isRunning());
- ASSERT_NO_THROW(receiver_->stop());
- ASSERT_FALSE(receiver_->isRunning());
-
- /// Now we'll test stopping a thread.
- /// Start the receiver, let it run a little and then tell it to stop.
- receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
- ASSERT_TRUE(receiver_->isRunning());
-
- // No watches should be ready.
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
- // Wait a little while.
- nap(2);
-
- // Tell it to stop.
- receiver_->stop();
- ASSERT_FALSE(receiver_->isRunning());
-
- // It should have done less than the maximum number of passes.
- EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
- // No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
- EXPECT_EQ("thread stopped", receiver_->getLastError());
-
-
- // Next we'll test error notification.
- // Start the receiver with a thread that sets an error on the second pass.
- receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_ERROR));
- ASSERT_TRUE(receiver_->isRunning());
-
- // No watches should be ready.
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
- // Wait a little while.
- nap(2);
-
- // It should now indicate an error.
- ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
- EXPECT_EQ("we have an error", receiver_->getLastError());
-
- // Tell it to stop.
- receiver_->stop();
- ASSERT_FALSE(receiver_->isRunning());
-
- // It should have done less than the maximum number of passes.
- EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
- // No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
- EXPECT_EQ("thread stopped", receiver_->getLastError());
-
-
- // Finally, we'll test data ready notification.
- // We'll start the receiver with a thread that indicates data ready on its second pass.
- receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_READY));
- ASSERT_TRUE(receiver_->isRunning());
-
- // No watches should be ready.
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
-
- // Wait a little while.
- nap(2);
-
- // It should now indicate data ready.
- ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY));
-
- // Tell it to stop.
- receiver_->stop();
- ASSERT_FALSE(receiver_->isRunning());
-
- // It should have done less than the maximum number of passes.
- EXPECT_LT(passes_, WORKER_MAX_PASSES);
-
- // No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
- ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
- EXPECT_EQ("thread stopped", receiver_->getLastError());
+ ASSERT_FALSE(ifacemgr->isDHCPReceiverRunning());
}
}
// Now unregister the factory.
ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue"));
-
- // Verify we did not lose the queue.
- checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }");
+ // Queue should be gone too.
+ ASSERT_FALSE(mgr().getPacketQueue());
// Try and recreate the custom queue, type should be invalid.
ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType);
// Now unregister the factory.
ASSERT_NO_THROW(mgr().unregisterPacketQueueFactory("custom-queue"));
-
- // Verify we did not lose the queue.
- checkMyInfo("{ \"capacity\": 2000, \"queue-type\": \"custom-queue\", \"size\": 0 }");
+ // Queue should be gone too.
+ ASSERT_FALSE(mgr().getPacketQueue());
// Try and recreate the custom queue, type should be invalid.
ASSERT_THROW(mgr().createPacketQueue(config), InvalidQueueType);
lib_LTLIBRARIES = libkea-threads.la
libkea_threads_la_SOURCES = sync.h sync.cc
libkea_threads_la_SOURCES += thread.h thread.cc
+libkea_threads_la_SOURCES += watched_thread.h watched_thread.cc
libkea_threads_la_LIBADD = $(top_builddir)/src/lib/exceptions/libkea-exceptions.la
libkea_threads_la_LDFLAGS = -no-undefined -version-info 1:0:0
run_unittests_SOURCES += thread_unittest.cc
run_unittests_SOURCES += lock_unittest.cc
run_unittests_SOURCES += condvar_unittest.cc
+run_unittests_SOURCES += watched_thread_unittest.cc
run_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES)
run_unittests_LDFLAGS = $(AM_LDFLAGS) $(GTEST_LDFLAGS)
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <util/threads/watched_thread.h>
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
+#include <unistd.h>
+
+using namespace std;
+using namespace isc;
+using namespace isc::util;
+using namespace isc::util::thread;
+
+namespace {
+
+/// @brief Test Fixture for testing isc:util::thread::WatchedThread
+class WatchedThreadTest : public ::testing::Test {
+public:
+ /// @brief Maximum number of passes allowed in worker event loop
+ static const int WORKER_MAX_PASSES;
+
+ /// @brief Constructor.
+ WatchedThreadTest() {}
+
+ /// @brief Destructor.
+ ~WatchedThreadTest() {
+ }
+
+ /// @brief Sleeps for a given number of event periods sleep
+ /// Each period is 50 ms.
+ void nap(int periods) {
+ usleep(periods * 500 * 1000);
+ };
+
+ /// @brief Worker function to be used by the WatchedThread's thread
+ ///
+ /// The function runs 5 passes through an "event" loop.
+ /// On each pass:
+ /// - check terminate command
+ /// - instigate the desired event (second pass only)
+ /// - naps for 1 period (50ms)
+ ///
+ /// @param watch_type type of event that should occur
+ void worker(WatchedThread::WatchType watch_type) {
+ for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
+
+ // Stop if we're told to do it.
+ if (wthread_->shouldTerminate()) {
+ return;
+ }
+
+ // On the second pass, set the event.
+ if (passes_ == 2) {
+ switch (watch_type) {
+ case WatchedThread::RCV_ERROR:
+ wthread_->setError("we have an error");
+ break;
+ case WatchedThread::RCV_READY:
+ wthread_->markReady(watch_type);
+ break;
+ case WatchedThread::RCV_TERMINATE:
+ default:
+ // Do nothing, we're waiting to be told to stop.
+ break;
+ }
+ }
+
+ // Take a nap.
+ nap(1);
+ }
+
+ // Indicate why we stopped.
+ wthread_->setError("thread expired");
+ }
+
+ /// @brief Current receiver instance.
+ WatchedThreadPtr wthread_;
+
+ /// @brief Counter used to track the number of passes made
+ /// within the thread worker function.
+ int passes_;
+};
+
+const int WatchedThreadTest::WORKER_MAX_PASSES = 5;
+
+/// Verifies the basic operation of the WatchedThread class.
+/// It checks that a WatchedThread can be created, can be stopped,
+/// and that in set and clear sockets.
+TEST_F(WatchedThreadTest, receiverClassBasics) {
+
+ /// We'll create a receiver and let it run until it expires. (Note this is more
+ /// of a test of WatchedThreadTest itself and ensures our tests later for why we
+ /// exited are sound.)
+ wthread_.reset(new WatchedThread());
+ ASSERT_FALSE(wthread_->isRunning());
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+ ASSERT_TRUE(wthread_->isRunning());
+
+ // Wait long enough for thread to expire.
+ nap(WORKER_MAX_PASSES + 1);
+
+ // It should have done the maximum number of passes.
+ EXPECT_EQ(passes_, WORKER_MAX_PASSES);
+
+ // Error should be ready and error text should be "thread expired".
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ EXPECT_EQ("thread expired", wthread_->getLastError());
+
+ // Thread is technically still running, so let's stop it.
+ EXPECT_TRUE(wthread_->isRunning());
+ ASSERT_NO_THROW(wthread_->stop());
+ ASSERT_FALSE(wthread_->isRunning());
+
+ /// Now we'll test stopping a thread.
+ /// Start the receiver, let it run a little and then tell it to stop.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+ ASSERT_TRUE(wthread_->isRunning());
+
+ // No watches should be ready.
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // Tell it to stop.
+ wthread_->stop();
+ ASSERT_FALSE(wthread_->isRunning());
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", wthread_->getLastError());
+
+
+ // Next we'll test error notification.
+ // Start the receiver with a thread that sets an error on the second pass.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR));
+ ASSERT_TRUE(wthread_->isRunning());
+
+ // No watches should be ready.
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // It should now indicate an error.
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ EXPECT_EQ("we have an error", wthread_->getLastError());
+
+ // Tell it to stop.
+ wthread_->stop();
+ ASSERT_FALSE(wthread_->isRunning());
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", wthread_->getLastError());
+
+
+ // Finally, we'll test data ready notification.
+ // We'll start the receiver with a thread that indicates data ready on its second pass.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY));
+ ASSERT_TRUE(wthread_->isRunning());
+
+ // No watches should be ready.
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // It should now indicate data ready.
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY));
+
+ // Tell it to stop.
+ wthread_->stop();
+ ASSERT_FALSE(wthread_->isRunning());
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", wthread_->getLastError());
+}
+
+}
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <util/threads/watched_thread.h>
+
+namespace isc {
+namespace util {
+namespace thread {
+
+void
+WatchedThread::start(const boost::function<void()>& thread_main) {
+ clearReady(RCV_ERROR);
+ clearReady(RCV_READY);
+ clearReady(RCV_TERMINATE);
+ last_error_ = "no error";
+ thread_.reset(new Thread(thread_main));
+}
+
+int
+WatchedThread::getWatchFd(WatchType watch_type) {
+ return(sockets_[watch_type].getSelectFd());
+}
+
+void
+WatchedThread::markReady(WatchType watch_type) {
+ sockets_[watch_type].markReady();
+}
+
+bool
+WatchedThread::isReady(WatchType watch_type) {
+ return (sockets_[watch_type].isReady());
+}
+
+void
+WatchedThread::clearReady(WatchType watch_type) {
+ sockets_[watch_type].clearReady();
+}
+
+bool
+WatchedThread::shouldTerminate() {
+ if (sockets_[RCV_TERMINATE].isReady()) {
+ clearReady(RCV_TERMINATE);
+ return (true);
+ }
+
+ return (false);
+}
+
+void
+WatchedThread::stop() {
+ if (thread_) {
+ markReady(RCV_TERMINATE);
+ thread_->wait();
+ thread_.reset();
+ }
+
+ clearReady(RCV_ERROR);
+ clearReady(RCV_READY);
+ last_error_ = "thread stopped";
+}
+
+void
+WatchedThread::setError(const std::string& error_msg) {
+ last_error_ = error_msg;
+ markReady(RCV_ERROR);
+}
+
+std::string
+WatchedThread::getLastError() {
+ return (last_error_);
+}
+} // end of namespace isc::util::thread
+} // end of namespace isc::util
+} // end of namespace isc
--- /dev/null
+// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef WATCHED_THREAD_H
+#define WATCHED_THREAD_H
+
+#include <util/watch_socket.h>
+#include <util/threads/thread.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace isc {
+namespace util {
+namespace thread {
+
+/// @brief Provides a thread and controls for receiving packets.
+///
+/// Given a "worker function", this class creates a thread which
+/// runs the function and provides the means to monitor the thread
+/// for "error" and "ready" conditions, and finally to stop the thread.
+/// It uses three WatchSockets: one to indicate an error, one to indicate
+/// data is ready, and a third to monitor as a shut-down command.
+class WatchedThread {
+public:
+ /// @brief Enumerates the list of watch sockets used to mark events
+ /// These are used as arguments to watch socket accessor methods.
+ enum WatchType {
+ RCV_ERROR = 0,
+ RCV_READY = 1,
+ RCV_TERMINATE = 2
+ };
+
+ /// @brief Constructor
+ WatchedThread(){};
+
+ /// @brief Virtual destructor
+ virtual ~WatchedThread(){}
+
+ /// @brief Fetches the fd of a watch socket
+ ///
+ /// @param watch_type indicates which watch socket
+ /// @return the watch socket's file descriptor
+ int getWatchFd(WatchType watch_type);
+
+ /// @brief Sets a watch socket state to ready
+ ///
+ /// @param watch_type indicates which watch socket to mark
+ void markReady(WatchType watch_type);
+
+ /// @brief Indicates if a watch socket state is ready
+ ///
+ /// @param watch_type indicates which watch socket to mark
+ /// @return true if the watch socket is ready, false otherwise
+ bool isReady(WatchType watch_type);
+
+ /// @brief Sets a watch socket state to not ready
+ ///
+ /// @param watch_type indicates which watch socket to clear
+ void clearReady(WatchType watch_type);
+
+ /// @brief Checks if the receiver thread should terminate
+ ///
+ /// Performs a "one-shot" check of the receiver's terminate
+ /// watch socket. If it is ready, return true and then clear
+ /// it, otherwise return false.
+ ///
+ /// @return true if the terminate watch socket is ready
+ bool shouldTerminate();
+
+ /// @brief Creates and runs the thread.
+ ///
+ /// Creates teh receiver's thread, passing into it the given
+ /// function to run.
+ ///
+ /// @param thread_main function the receiver's thread should run
+ void start(const boost::function<void()>& thread_main);
+
+ /// @brief Returns true if the receiver thread is running
+ /// @todo - this may need additional logic to handle cases where
+ /// a thread function exits w/o the caller invoking @c
+ /// WatchedThread::stop().
+ bool isRunning() {
+ return (thread_ != 0);
+ }
+
+ /// @brief Terminates the receiver thread
+ ///
+ /// It marks the terminate watch socket ready, and then waits for the
+ /// thread to stop. At this point, the receiver is defunct. This is
+ /// not done in the destructor to avoid race conditions.
+ void stop();
+
+ /// @brief Sets the receiver error state
+ ///
+ /// This records the given error message and sets the error watch
+ /// socket to ready.
+ ///
+ /// @param error_msg
+ void setError(const std::string& error_msg);
+
+ /// @brief Fetches the error message text for the most recent socket error
+ ///
+ /// @return string containing the error message
+ std::string getLastError();
+
+ /// @brief Error message of the last error encountered
+ std::string last_error_;
+
+ /// @brief DHCP watch sockets that are used to communicate with the owning thread
+ /// There are three:
+ /// -# RCV_ERROR - packet receive error watch socket.
+ /// Marked as ready when the DHCP packet receiver experiences an I/O error.
+ /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
+ /// to the packet queue.
+ /// -# RCV_TERMINATE Packet receiver terminate watch socket.
+ /// Marked as ready when the DHCP packet receiver thread should terminate.
+ WatchSocket sockets_[RCV_TERMINATE + 1];
+
+ /// DHCP packet receiver thread.
+ thread::ThreadPtr thread_ ;
+};
+
+/// @brief Defines a pointer to a WatchedThread
+typedef boost::shared_ptr<WatchedThread> WatchedThreadPtr;
+
+}; // namespace isc::util::thread
+}; // namespace isc::util
+}; // namespace isc
+
+#endif // WATCHED_THREAD_H