return (false); // socket not found
}
+Receiver::Receiver(const boost::function<void()>& thread_main) {
+ clearReady(RCV_ERROR);
+ clearReady(RCV_READY);
+ clearReady(RCV_TERMINATE);
+ last_error_ = "no error";
+ thread_.reset(new isc::util::thread::Thread(thread_main));
+}
+
+int
+Receiver::getWatchFd(WatchType watch_type) {
+ return(sockets_[watch_type].getSelectFd());
+}
+
+void
+Receiver::markReady(WatchType watch_type) {
+ sockets_[watch_type].markReady();
+}
+
+bool
+Receiver::isReady(WatchType watch_type) {
+ return (sockets_[watch_type].isReady());
+}
+
+void
+Receiver::clearReady(WatchType watch_type) {
+ sockets_[watch_type].clearReady();
+}
+
+bool
+Receiver::shouldTerminate() {
+ if (sockets_[RCV_TERMINATE].isReady()) {
+ clearReady(RCV_TERMINATE);
+ return (true);
+ }
+
+ return (false);
+}
+
+void
+Receiver::stop() {
+ markReady(RCV_TERMINATE);
+ thread_->wait();
+ thread_.reset();
+ clearReady(RCV_ERROR);
+ clearReady(RCV_READY);
+ last_error_ = "thread stopped";
+}
+
+isc::util::thread::Mutex&
+Receiver::getLock() {
+ return(lock_);
+}
+
+void
+Receiver::setError(const std::string& error_msg) {
+ last_error_ = error_msg;
+ markReady(RCV_ERROR);
+}
+
+std::string
+Receiver::getLastError() {
+ return (last_error_);
+}
+
IfaceMgr::IfaceMgr()
:control_buf_len_(CMSG_SPACE(sizeof(struct in6_pktinfo))),
control_buf_(new char[control_buf_len_]),
packet_filter_(new PktFilterInet()),
packet_filter6_(new PktFilterInet6()),
test_mode_(false),
- allow_loopback_(false),
- receiver_error_("no error") {
+ allow_loopback_(false) {
// Ensure that PQMs have been created to guarantee we have
// default packet queues in place.
void IfaceMgr::stopDHCPReceiver() {
if (isReceiverRunning()) {
- terminate_watch_.markReady();
- receiver_thread_->wait();
- receiver_thread_.reset();
- error_watch_.clearReady();
-
+ receiver_->stop();
+ receiver_.reset();
}
- receiver_error_ = "no error";
if (getPacketQueue4()) {
getPacketQueue4()->clear();
case AF_INET:
// If there's no queue, then has been disabled, simply return.
if(!getPacketQueue4()) {
- return;
+ return;
}
- receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))));
break;
case AF_INET6:
// If there's no queue, then has been disabled, simply return.
return;
}
- receiver_thread_.reset(new Thread(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))));
break;
default:
isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
isc_throw(BadValue, "fractional timeout must be shorter than"
" one million microseconds");
}
+
fd_set sockets;
int maxfd = 0;
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
- FD_SET(s.socket_, &sockets);
- if (maxfd < s.socket_) {
- maxfd = s.socket_;
- }
+ add_fd(s.socket_, maxfd, &sockets);
}
}
- // Add receiver thread watch and error sockets.
- FD_SET(receive_watch_.getSelectFd(), &sockets);
- if (maxfd < receive_watch_.getSelectFd()) {
- maxfd = receive_watch_.getSelectFd();
- }
- FD_SET(error_watch_.getSelectFd(), &sockets);
- if (maxfd < error_watch_.getSelectFd()) {
- maxfd = error_watch_.getSelectFd();
- }
+ // Add Receiver ready watch socket
+ add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+
+ // Add Receiver error watch socket
+ add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
- string msg = receiver_error_;
- error_watch_.clearReady();
+ if (receiver_->isReady(Receiver::RCV_ERROR)) {
+ string msg = receiver_->getLastError();
+ receiver_->clearReady(Receiver::RCV_ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
// Protected packet queue access.
- Mutex::Locker lock(receiver_lock_);
- Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
- if (!pkt) {
- receive_watch_.clearReady();
+ Pkt4Ptr pkt;
+ {
+ Mutex::Locker lock(receiver_->getLock());
+ pkt = getPacketQueue4()->dequeuePacket();
+ if (!pkt) {
+ receiver_->clearReady(Receiver::RCV_READY);
+ }
}
return (pkt);
/// provided set to indicated which sockets have something to read.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
-
// Add this socket to listening set
- FD_SET(s.sockfd_, &sockets);
- if (maxfd < s.sockfd_) {
- maxfd = s.sockfd_;
- }
+ add_fd(s.sockfd_, maxfd, &sockets);
}
}
}
// if there are any callbacks for external sockets registered...
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
- FD_SET(s.socket_, &sockets);
- if (maxfd < s.socket_) {
- maxfd = s.socket_;
- }
+ // Add this socket to listening set
+ add_fd(s.socket_, maxfd, &sockets);
}
}
return (packet_filter_->receive(*iface, *candidate));
}
-Pkt6Ptr IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
+Pkt6Ptr
+IfaceMgr::receive6(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */) {
if (isReceiverRunning()) {
return (receive6Indirect(timeout_sec, timeout_usec));
}
return (receive6Direct(timeout_sec, timeout_usec));
}
-Pkt6Ptr IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
+void
+IfaceMgr::add_fd(int fd, int& maxfd, fd_set* sockets) {
+ if (!sockets) {
+ isc_throw(BadValue, "add_fd: sockets can't be null");
+ }
+
+ FD_SET(fd, sockets);
+ if (maxfd < fd) {
+ maxfd = fd;
+ }
+}
+
+Pkt6Ptr
+IfaceMgr::receive6Direct(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
// Sanity check for microsecond timeout.
if (timeout_usec >= 1000000) {
isc_throw(BadValue, "fractional timeout must be shorter than"
/// and then use its copy for select(). Please note that select() modifies
/// provided set to indicated which sockets have something to read.
BOOST_FOREACH(IfacePtr iface, ifaces_) {
-
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
-
// Add this socket to listening set
- FD_SET(s.sockfd_, &sockets);
- if (maxfd < s.sockfd_) {
- maxfd = s.sockfd_;
- }
+ add_fd(s.sockfd_, maxfd, &sockets);
}
}
}
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
// Add it to the set as well
- FD_SET(s.socket_, &sockets);
- if (maxfd < s.socket_) {
- maxfd = s.socket_;
- }
+ add_fd(s.socket_, maxfd, &sockets);
}
}
return (packet_filter6_->receive(*candidate));
}
-
-Pkt6Ptr IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
+Pkt6Ptr
+IfaceMgr::receive6Indirect(uint32_t timeout_sec, uint32_t timeout_usec /* = 0 */ ) {
// Sanity check for microsecond timeout.
if (timeout_usec >= 1000000) {
isc_throw(BadValue, "fractional timeout must be shorter than"
if (!callbacks_.empty()) {
BOOST_FOREACH(SocketCallbackInfo s, callbacks_) {
// Add it to the set as well
- FD_SET(s.socket_, &sockets);
- if (maxfd < s.socket_) {
- maxfd = s.socket_;
- }
+ add_fd(s.socket_, maxfd, &sockets);
}
}
- // Add receiver thread watch and error sockets.
- FD_SET(receive_watch_.getSelectFd(), &sockets);
- if (maxfd < receive_watch_.getSelectFd()) {
- maxfd = receive_watch_.getSelectFd();
- }
- FD_SET(error_watch_.getSelectFd(), &sockets);
- if (maxfd < error_watch_.getSelectFd()) {
- maxfd = error_watch_.getSelectFd();
- }
+ // Add Receiver ready watch socket
+ add_fd(receiver_->getWatchFd(Receiver::RCV_READY), maxfd, &sockets);
+
+ // Add Receiver error watch socket
+ add_fd(receiver_->getWatchFd(Receiver::RCV_ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (FD_ISSET(error_watch_.getSelectFd(), &sockets)) {
- string msg = receiver_error_;
- error_watch_.clearReady();
+ if (receiver_->isReady(Receiver::RCV_ERROR)) {
+ string msg = receiver_->getLastError();
+ receiver_->clearReady(Receiver::RCV_ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
// Protected packet queue access.
- Mutex::Locker lock(receiver_lock_);
- Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
- if (!pkt) {
- receive_watch_.clearReady();
+ Pkt6Ptr pkt;
+ {
+ Mutex::Locker lock(receiver_->getLock());
+ pkt = getPacketQueue6()->dequeuePacket();
+ if (!pkt) {
+ receiver_->clearReady(Receiver::RCV_READY);
+ }
}
return (pkt);
}
-void IfaceMgr::receiveDHCP4Packets() {
+void
+IfaceMgr::receiveDHCP4Packets() {
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
// Add terminate watch socket.
- FD_SET(terminate_watch_.getSelectFd(), &sockets);
- if (maxfd < terminate_watch_.getSelectFd()) {
- maxfd = terminate_watch_.getSelectFd();
- }
+ add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
// Only deal with IPv4 addresses.
if (s.addr_.isV4()) {
// Add this socket to listening set.
- FD_SET(s.sockfd_, &sockets);
- if (maxfd < s.sockfd_) {
- maxfd = s.sockfd_;
- }
+ add_fd(s.sockfd_, maxfd, &sockets);
}
}
}
for (;;) {
// Check the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive4.
- receiver_error_ = strerror(errno);
- error_watch_.markReady();
+ receiver_->setError(strerror(errno));
// We need to sleep in case of the error condition to
// prevent the thread from tight looping when result
// gets negative.
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP4Packet(*iface, s);
// Can take time so check one more time the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
}
}
-void IfaceMgr::receiveDHCP6Packets() {
+void
+IfaceMgr::receiveDHCP6Packets() {
IfacePtr iface;
fd_set sockets;
int maxfd = 0;
FD_ZERO(&sockets);
// Add terminate watch socket.
- FD_SET(terminate_watch_.getSelectFd(), &sockets);
- if (maxfd < terminate_watch_.getSelectFd()) {
- maxfd = terminate_watch_.getSelectFd();
- }
+ add_fd(receiver_->getWatchFd(Receiver::RCV_TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
BOOST_FOREACH(SocketInfo s, iface->getSockets()) {
-
// Only deal with IPv6 addresses.
if (s.addr_.isV6()) {
-
// Add this socket to listening set.
- FD_SET(s.sockfd_, &sockets);
- if (maxfd < s.sockfd_) {
- maxfd = s.sockfd_;
- }
+ add_fd(s.sockfd_ , maxfd, &sockets);
}
}
}
for (;;) {
// Check the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
int result = select(maxfd + 1, &rd_set, 0, 0, 0);
// Re-check the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
// This thread should not get signals?
if (errno != EINTR) {
// Signal the error to receive6.
- receiver_error_ = strerror(errno);
- error_watch_.markReady();
+ receiver_->setError(strerror(errno));
// We need to sleep in case of the error condition to
// prevent the thread from tight looping when result
// gets negative.
if (FD_ISSET(s.sockfd_, &sockets)) {
receiveDHCP6Packet(s);
// Can take time so check one more time the watch socket.
- if (terminate_watch_.isReady()) {
- terminate_watch_.clearReady();
+ if (receiver_->shouldTerminate()) {
return;
}
}
}
}
-void IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
+void
+IfaceMgr::receiveDHCP4Packet(Iface& iface, const SocketInfo& socket_info) {
int len;
+
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive4.
- receiver_error_ = strerror(errno);
- error_watch_.markReady();
+ receiver_->setError(strerror(errno));
return;
}
if (len == 0) {
try {
pkt = packet_filter_->receive(iface, socket_info);
} catch (const std::exception& ex) {
- receiver_error_ = ex.what();
- error_watch_.markReady();
+ receiver_->setError(strerror(errno));
} catch (...) {
- receiver_error_ = "packet filter receive() failed";
- error_watch_.markReady();
+ receiver_->setError("packet filter receive() failed");
}
if (pkt) {
- Mutex::Locker lock(receiver_lock_);
+ Mutex::Locker lock(receiver_->getLock());
getPacketQueue4()->enqueuePacket(pkt, socket_info);
- receive_watch_.markReady();
+ receiver_->markReady(Receiver::RCV_READY);
}
}
-void IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
+void
+IfaceMgr::receiveDHCP6Packet(const SocketInfo& socket_info) {
int len;
+
int result = ioctl(socket_info.sockfd_, FIONREAD, &len);
if (result < 0) {
// Signal the error to receive6.
- receiver_error_ = strerror(errno);
- error_watch_.markReady();
+ receiver_->setError(strerror(errno));
return;
}
if (len == 0) {
try {
pkt = packet_filter6_->receive(socket_info);
} catch (const std::exception& ex) {
- receiver_error_ = ex.what();
- error_watch_.markReady();
+ receiver_->setError(ex.what());
} catch (...) {
- receiver_error_ = "packet filter receive() failed";
- error_watch_.markReady();
+ receiver_->setError("packet filter receive() failed");
}
if (pkt) {
- Mutex::Locker lock(receiver_lock_);
+ Mutex::Locker lock(receiver_->getLock());
getPacketQueue6()->enqueuePacket(pkt, socket_info);
- receive_watch_.markReady();
+ receiver_->markReady(Receiver::RCV_READY);
}
}
-uint16_t IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
+uint16_t
+IfaceMgr::getSocket(const isc::dhcp::Pkt6& pkt) {
IfacePtr iface = getIface(pkt.getIface());
if (!iface) {
isc_throw(IfaceNotFound, "Tried to find socket for non-existent interface");
typedef boost::shared_ptr<Iface> IfacePtr;
+/// @brief Provides a thread and controls for receiving packets.
+///
+/// Given a "worker function", this class creates a thread which
+/// runs the function and provides the means to monitor the thread
+/// for "error" and "ready" conditions, and finally to stop the thread.
+/// It uses three WatchSockets: one to indicate an error, one to indicate
+/// data is ready, and a third to monitor as a shut-down command.
+class Receiver {
+public:
+ /// @brief Enumerates the list of watch sockets used to mark events
+ /// These are used as arguments to watch socket accessor methods.
+ enum WatchType {
+ RCV_ERROR = 0,
+ RCV_READY = 1,
+ RCV_TERMINATE = 2
+ };
+
+ /// @brief Constructor
+ ///
+ /// It initializes the watch sockets and then instantiates and
+ /// starts the receiver's worker thread.
+ ///
+ /// @param thread_main function the receiver's thread should run
+ Receiver(const boost::function<void()>& thread_main);
+
+ /// @brief Virtual destructor
+ virtual ~Receiver() {}
+
+ /// @brief Fetches the fd of a watch socket
+ ///
+ /// @param watch_type indicates which watch socket
+ /// @return the watch socket's file descriptor
+ int getWatchFd(WatchType watch_type);
+
+ /// @brief Sets a watch socket state to ready
+ ///
+ /// @param watch_type indicates which watch socket to mark
+ void markReady(WatchType watch_type);
+
+ /// @brief Indicates if a watch socket state is ready
+ ///
+ /// @param watch_type indicates which watch socket to mark
+ /// @return true if the watch socket is ready, false otherwise
+ bool isReady(WatchType watch_type);
+
+ /// @brief Sets a watch socket state to not ready
+ ///
+ /// @param watch_type indicates which watch socket to clear
+ void clearReady(WatchType watch_type);
+
+ /// @brief Checks if the receiver thread should terminate
+ ///
+ /// Performs a "one-shot" check of the receiver's terminate
+ /// watch socket. If it is ready, return true and then clear
+ /// it, otherwise return false.
+ ///
+ /// @return true if the terminate watch socket is ready
+ bool shouldTerminate();
+
+ /// @brief Terminates the receiver thread
+ ///
+ /// It marks the terminate watch socket ready, and then waits for the
+ /// thread to stop. At this point, the receiver is defunct. This is
+ /// not done in the destructor to avoid race conditions.
+ void stop();
+
+ /// @brief Fetches the receiver's thread mutex.
+ ///
+ /// This is used for instantation mutex locks to protect code blocks.
+ ///
+ /// @return a reference to the mutex
+ isc::util::thread::Mutex& getLock();
+
+ /// @brief Sets the receiver error state
+ ///
+ /// This records the given error message and sets the error watch
+ /// socket to ready.
+ ///
+ /// @param error_msg
+ void setError(const std::string& error_msg);
+
+ /// @brief Fetches the error message text for the most recent socket error
+ ///
+ /// @return string containing the error message
+ std::string getLastError();
+
+ /// @brief Error message of the last error encountered
+ std::string last_error_;
+
+ /// @brief DHCP watch sockets that are used to communicate with the owning thread
+ /// There are three:
+ /// -# RCV_ERROR - packet receive error watch socket.
+ /// Marked as ready when the DHCP packet receiver experiences an I/O error.
+ /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
+ /// to the packet queue.
+ /// -# RCV_TERMINATE Packet receiver terminate watch socket.
+ /// Marked as ready when the DHCP packet receiver thread should terminate.
+ isc::util::WatchSocket sockets_[RCV_TERMINATE + 1];
+
+ /// DHCP packet thread mutex.
+ isc::util::thread::Mutex lock_;
+
+ /// DHCP packet receiver thread.
+ isc::util::thread::ThreadPtr thread_ ;
+};
+
+/// @brief Defines a pointer to a Receiver
+typedef boost::shared_ptr<Receiver> ReceiverPtr;
+
/// @brief Forward declaration to the @c IfaceMgr.
class IfaceMgr;
/// @brief Returns true if there is a receiver currently running.
bool isReceiverRunning() const {
- return (receiver_thread_ != 0);
+ return (receiver_ != 0);
}
/// @brief Configures DHCP packet queue
bool configureDHCPPacketQueue(const uint16_t family,
data::ConstElementPtr queue_control);
+ /// @brief Convenience method for adding an descriptor to a set
+ ///
+ /// @param fd descriptor to add
+ /// @param[out] maxfd maximum fd value in the set. If the new fd is
+ /// larger than it's current value, it will be updated to new fd value
+ /// @param sockets pointer to the set of sockets
+ /// @throw BadValue if sockets is null
+ static void add_fd(int fd, int& maxfd, fd_set* sockets);
+
// don't use private, we need derived classes in tests
protected:
/// @brief Allows to use loopback
bool allow_loopback_;
- /// @brief Error message of the last DHCP packet receive error.
- std::string receiver_error_;
-
- /// @brief DHCP packet receive error watch socket.
- /// Marked as ready when the DHCP packet receiver experiences
- /// an I/O error.
- isc::util::WatchSocket error_watch_;
-
- /// @brief DHCP packet receive watch socket.
- /// Marked as ready when the DHCP packet receiver adds a packet
- /// to the packet queue.
- isc::util::WatchSocket receive_watch_;
-
- /// @brief Packet receiver terminate watch socket.
- /// Marked as ready when the DHCP packet receiver thread should terminate.
- isc::util::WatchSocket terminate_watch_;
-
- /// DHCP packet receiver mutex.
- isc::util::thread::Mutex receiver_lock_;
-
- /// DHCP packet receiver thread.
- isc::util::thread::ThreadPtr receiver_thread_;
+ /// DHCP packet receiver.
+ ReceiverPtr receiver_;
};
}; // namespace isc::dhcp
// Verify that we can create a queue with default factory.
data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr4::DEFAULT_QUEUE_TYPE4, 2000);
ASSERT_NO_THROW(PacketQueueMgr4::instance().createPacketQueue(config));
- CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \""
+ CHECK_QUEUE_INFO(ifacemgr.getPacketQueue4(), "{ \"capacity\": 2000, \"queue-type\": \""
<< PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
// Verify that fetching the queue via IfaceMgr and PacketQueueMgr
// Verify that we can create a queue with default factory.
data::ConstElementPtr config = makeQueueConfig(PacketQueueMgr6::DEFAULT_QUEUE_TYPE6, 2000);
ASSERT_NO_THROW(PacketQueueMgr6::instance().createPacketQueue(config));
- CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \""
+ CHECK_QUEUE_INFO(ifacemgr.getPacketQueue6(), "{ \"capacity\": 2000, \"queue-type\": \""
<< PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
// Verify that fetching the queue via IfaceMgr and PacketQueueMgr
ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET, queue_control));
ASSERT_TRUE(queue_enabled);
// Verify we have correctly created the queue.
- CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \""
+ CHECK_QUEUE_INFO(ifacemgr->getPacketQueue4(), "{ \"capacity\": 500, \"queue-type\": \""
<< PacketQueueMgr4::DEFAULT_QUEUE_TYPE4 << "\", \"size\": 0 }");
// configureDHCPPacketQueue() should never start the thread.
ASSERT_FALSE(ifacemgr->isReceiverRunning());
ASSERT_NO_THROW(queue_enabled = ifacemgr->configureDHCPPacketQueue(AF_INET6, queue_control));
ASSERT_TRUE(queue_enabled);
// Verify we have correctly created the queue.
- CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \""
+ CHECK_QUEUE_INFO(ifacemgr->getPacketQueue6(), "{ \"capacity\": 500, \"queue-type\": \""
<< PacketQueueMgr6::DEFAULT_QUEUE_TYPE6 << "\", \"size\": 0 }");
// configureDHCPPacketQueue() should never start the thread.
ASSERT_FALSE(ifacemgr->isReceiverRunning());
ASSERT_FALSE(ifacemgr->isReceiverRunning());
}
+/// @brief Test Fixture for testing isc:dhcp::Receiver
+class ReceiverTest : public ::testing::Test {
+public:
+ /// @brief Maximum number of passes allowed in worker event loop
+ static const int WORKER_MAX_PASSES;
+
+ /// @brief Constructor.
+ ReceiverTest() {}
+
+ /// @brief Destructor.
+ ~ReceiverTest() {
+ }
+
+ /// @brief Sleeps for a given number of event periods sleep
+ /// Each period is 50 ms.
+ void nap(int periods) {
+ usleep(periods * 50 * 1000);
+ };
+
+ /// @brief Worker function to be used by the Receiver's thread
+ ///
+ /// The function runs 5 passes through an "event" loop.
+ /// On each pass:
+ /// - check terminate command
+ /// - instigate the desired event (second pass only)
+ /// - naps for 1 period (50ms)
+ ///
+ /// @param watch_type type of event that should occur
+ void worker(Receiver::WatchType watch_type) {
+ for (passes_ = 1; passes_ < WORKER_MAX_PASSES; ++passes_) {
+
+ // Stop if we're told to do it.
+ if (receiver_->shouldTerminate()) {
+ return;
+ }
+
+ // On the second pass, set the event.
+ if (passes_ == 2) {
+ switch (watch_type) {
+ case Receiver::RCV_ERROR:
+ receiver_->setError("we have an error");
+ break;
+ case Receiver::RCV_READY:
+ receiver_->markReady(watch_type);
+ break;
+ case Receiver::RCV_TERMINATE:
+ default:
+ // Do nothing, we're waiting to be told to stop.
+ break;
+ }
+ }
+
+ // Take a nap.
+ nap(1);
+ }
+
+ // Indicate why we stopped.
+ receiver_->setError("thread expired");
+ }
+
+ /// @brief Current receiver instance.
+ ReceiverPtr receiver_;
+
+ /// @brief Counter used to track the number of passes made
+ /// within the thread worker function.
+ int passes_;
+};
+
+const int ReceiverTest::WORKER_MAX_PASSES = 5;
+
+/// Verifies the basic operation of the Receiver class.
+/// It checks that a Receiver can be created, can be stopped,
+/// and that in set and clear sockets.
+TEST_F(ReceiverTest, receiverClassBasics) {
+
+ /// We'll create a receiver and let it run until it expires. (Note this is more
+ /// of a test of ReceiverTest itself and ensures our tests later for why we
+ /// exited are sound.)
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+ Receiver::RCV_TERMINATE))));
+ // Wait long enough for thread to expire.
+ nap(WORKER_MAX_PASSES + 1);
+
+ // It should have done the maximum number of passes.
+ EXPECT_EQ(passes_, WORKER_MAX_PASSES);
+
+ // Error should be ready and error text should be "thread expired".
+ ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+ EXPECT_EQ("thread expired", receiver_->getLastError());
+
+ /// Now we'll test stopping a thread.
+ /// We'll create a Receiver, let it run a little and then tell it to stop.
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+ Receiver::RCV_TERMINATE))));
+ // No watches should be ready.
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // Tell it to stop.
+ receiver_->stop();
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", receiver_->getLastError());
+
+
+ // Next we'll test error notification.
+ // We'll create a receiver that sets an error on the second pass.
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+ Receiver::RCV_ERROR))));
+ // No watches should be ready.
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // It should now indicate an error.
+ ASSERT_TRUE(receiver_->isReady(Receiver::RCV_ERROR));
+ EXPECT_EQ("we have an error", receiver_->getLastError());
+
+ // Tell it to stop.
+ receiver_->stop();
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", receiver_->getLastError());
+
+
+ // Finally, we'll test data ready notification.
+ // We'll create a receiver that indicates data ready on its second pass.
+ receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
+ Receiver::RCV_READY))));
+ // No watches should be ready.
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+
+ // Wait a little while.
+ nap(2);
+
+ // It should now indicate data ready.
+ ASSERT_TRUE(receiver_->isReady(Receiver::RCV_READY));
+
+ // Tell it to stop.
+ receiver_->stop();
+
+ // It should have done less than the maximum number of passes.
+ EXPECT_LT(passes_, WORKER_MAX_PASSES);
+
+ // No watches should be ready. Error text should be "thread stopped".
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
+ ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
+ EXPECT_EQ("thread stopped", receiver_->getLastError());
+}
}