}
// Add Receiver ready watch socket
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets);
// Add Receiver error watch socket
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+ if (dhcp_receiver_->isReady(WatchedThread::ERROR)) {
string msg = dhcp_receiver_->getLastError();
- dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
+ dhcp_receiver_->clearReady(WatchedThread::ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
Pkt4Ptr pkt = getPacketQueue4()->dequeuePacket();
if (!pkt) {
- dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
+ dhcp_receiver_->clearReady(WatchedThread::READY);
}
return (pkt);
}
// Add Receiver ready watch socket
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_READY), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::READY), maxfd, &sockets);
// Add Receiver error watch socket
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_ERROR), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::ERROR), maxfd, &sockets);
// Set timeout for our next select() call. If there are
// no DHCP packets to read, then we'll wait for a finite
// We only check external sockets if select detected an event.
if (result > 0) {
// Check for receiver thread read errors.
- if (dhcp_receiver_->isReady(WatchedThread::RCV_ERROR)) {
+ if (dhcp_receiver_->isReady(WatchedThread::ERROR)) {
string msg = dhcp_receiver_->getLastError();
- dhcp_receiver_->clearReady(WatchedThread::RCV_ERROR);
+ dhcp_receiver_->clearReady(WatchedThread::ERROR);
isc_throw(SocketReadError, msg);
}
// If we're here it should only be because there are DHCP packets waiting.
Pkt6Ptr pkt = getPacketQueue6()->dequeuePacket();
if (!pkt) {
- dhcp_receiver_->clearReady(WatchedThread::RCV_READY);
+ dhcp_receiver_->clearReady(WatchedThread::READY);
}
return (pkt);
FD_ZERO(&sockets);
// Add terminate watch socket.
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
FD_ZERO(&sockets);
// Add terminate watch socket.
- addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::RCV_TERMINATE), maxfd, &sockets);
+ addFDtoSet(dhcp_receiver_->getWatchFd(WatchedThread::TERMINATE), maxfd, &sockets);
// Add Interface sockets.
BOOST_FOREACH(iface, ifaces_) {
if (pkt) {
getPacketQueue4()->enqueuePacket(pkt, socket_info);
- dhcp_receiver_->markReady(WatchedThread::RCV_READY);
+ dhcp_receiver_->markReady(WatchedThread::READY);
}
}
if (pkt) {
getPacketQueue6()->enqueuePacket(pkt, socket_info);
- dhcp_receiver_->markReady(WatchedThread::RCV_READY);
+ dhcp_receiver_->markReady(WatchedThread::READY);
}
}
/// This function is used to remove the factory function for a given type.
/// Typically, it would be called when unloading the hook library which
/// loaded the type, and thus called by the library's @c unload function.
+ /// In addition to removing the factory, it will also destroy the current
+ /// queue if it is of the same queue-type as the factory being removed.
+ /// This avoids the nastiness that occurs when objecs are left in existence
+ /// after their library is unloaded.
///
/// @param queue_type queue type, e.g. "kea-ring4".
///
/// @brief Sleeps for a given number of event periods sleep
/// Each period is 50 ms.
void nap(int periods) {
- usleep(periods * 500 * 1000);
+ usleep(periods * 50 * 1000);
};
/// @brief Worker function to be used by the WatchedThread's thread
// On the second pass, set the event.
if (passes_ == 2) {
switch (watch_type) {
- case WatchedThread::RCV_ERROR:
+ case WatchedThread::ERROR:
wthread_->setError("we have an error");
break;
- case WatchedThread::RCV_READY:
+ case WatchedThread::READY:
wthread_->markReady(watch_type);
break;
- case WatchedThread::RCV_TERMINATE:
+ case WatchedThread::TERMINATE:
default:
// Do nothing, we're waiting to be told to stop.
break;
wthread_->setError("thread expired");
}
- /// @brief Current receiver instance.
+ /// @brief Current WatchedThread instance.
WatchedThreadPtr wthread_;
/// @brief Counter used to track the number of passes made
/// Verifies the basic operation of the WatchedThread class.
/// It checks that a WatchedThread can be created, can be stopped,
/// and that in set and clear sockets.
-TEST_F(WatchedThreadTest, receiverClassBasics) {
+TEST_F(WatchedThreadTest, watchedThreadClassBasics) {
- /// We'll create a receiver and let it run until it expires. (Note this is more
- /// of a test of WatchedThreadTest itself and ensures our tests later for why we
- /// exited are sound.)
+ /// We'll create a WatchedThread and let it run until it expires. (Note this is more
+ /// of a test of WatchedThreadTest itself and ensures that the assumptions made in
+ /// our other tests as to why threads have finished are sound.
wthread_.reset(new WatchedThread());
ASSERT_FALSE(wthread_->isRunning());
- wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE));
ASSERT_TRUE(wthread_->isRunning());
- // Wait long enough for thread to expire.
- nap(WORKER_MAX_PASSES + 1);
+ // Wait more long enough (we hope) for the thread to expire.
+ nap(WORKER_MAX_PASSES * 4);
// It should have done the maximum number of passes.
EXPECT_EQ(passes_, WORKER_MAX_PASSES);
// Error should be ready and error text should be "thread expired".
- ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
EXPECT_EQ("thread expired", wthread_->getLastError());
// Thread is technically still running, so let's stop it.
ASSERT_FALSE(wthread_->isRunning());
/// Now we'll test stopping a thread.
- /// Start the receiver, let it run a little and then tell it to stop.
- wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_TERMINATE));
+ /// Start the WatchedThread, let it run a little and then tell it to stop.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::TERMINATE));
ASSERT_TRUE(wthread_->isRunning());
// No watches should be ready.
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
// Wait a little while.
- nap(2);
+ nap(3);
// Tell it to stop.
wthread_->stop();
EXPECT_LT(passes_, WORKER_MAX_PASSES);
// No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
EXPECT_EQ("thread stopped", wthread_->getLastError());
// Next we'll test error notification.
- // Start the receiver with a thread that sets an error on the second pass.
- wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_ERROR));
+ // Start the WatchedThread with a thread that sets an error on the second pass.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::ERROR));
ASSERT_TRUE(wthread_->isRunning());
// No watches should be ready.
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
// Wait a little while.
- nap(2);
+ nap(3);
// It should now indicate an error.
- ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_ERROR));
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::ERROR));
EXPECT_EQ("we have an error", wthread_->getLastError());
// Tell it to stop.
EXPECT_LT(passes_, WORKER_MAX_PASSES);
// No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
EXPECT_EQ("thread stopped", wthread_->getLastError());
// Finally, we'll test data ready notification.
- // We'll start the receiver with a thread that indicates data ready on its second pass.
- wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::RCV_READY));
+ // We'll start the WatchedThread with a thread that indicates data ready on its second pass.
+ wthread_->start(boost::bind(&WatchedThreadTest::worker, this, WatchedThread::READY));
ASSERT_TRUE(wthread_->isRunning());
// No watches should be ready.
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
// Wait a little while.
- nap(2);
+ nap(3);
// It should now indicate data ready.
- ASSERT_TRUE(wthread_->isReady(WatchedThread::RCV_READY));
+ ASSERT_TRUE(wthread_->isReady(WatchedThread::READY));
// Tell it to stop.
wthread_->stop();
EXPECT_LT(passes_, WORKER_MAX_PASSES);
// No watches should be ready. Error text should be "thread stopped".
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_ERROR));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_READY));
- ASSERT_FALSE(wthread_->isReady(WatchedThread::RCV_TERMINATE));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::ERROR));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::READY));
+ ASSERT_FALSE(wthread_->isReady(WatchedThread::TERMINATE));
EXPECT_EQ("thread stopped", wthread_->getLastError());
}
void
WatchedThread::start(const boost::function<void()>& thread_main) {
- clearReady(RCV_ERROR);
- clearReady(RCV_READY);
- clearReady(RCV_TERMINATE);
+ clearReady(ERROR);
+ clearReady(READY);
+ clearReady(TERMINATE);
last_error_ = "no error";
thread_.reset(new Thread(thread_main));
}
bool
WatchedThread::shouldTerminate() {
- if (sockets_[RCV_TERMINATE].isReady()) {
- clearReady(RCV_TERMINATE);
+ if (sockets_[TERMINATE].isReady()) {
+ clearReady(TERMINATE);
return (true);
}
void
WatchedThread::stop() {
if (thread_) {
- markReady(RCV_TERMINATE);
+ markReady(TERMINATE);
thread_->wait();
thread_.reset();
}
- clearReady(RCV_ERROR);
- clearReady(RCV_READY);
+ clearReady(ERROR);
+ clearReady(READY);
last_error_ = "thread stopped";
}
void
WatchedThread::setError(const std::string& error_msg) {
last_error_ = error_msg;
- markReady(RCV_ERROR);
+ markReady(ERROR);
}
std::string
namespace util {
namespace thread {
-/// @brief Provides a thread and controls for receiving packets.
+/// @brief Provides a thread and controls for monitoring its activities
///
/// Given a "worker function", this class creates a thread which
/// runs the function and provides the means to monitor the thread
/// @brief Enumerates the list of watch sockets used to mark events
/// These are used as arguments to watch socket accessor methods.
enum WatchType {
- RCV_ERROR = 0,
- RCV_READY = 1,
- RCV_TERMINATE = 2
+ ERROR = 0,
+ READY = 1,
+ TERMINATE = 2
};
/// @brief Constructor
/// @param watch_type indicates which watch socket to clear
void clearReady(WatchType watch_type);
- /// @brief Checks if the receiver thread should terminate
+ /// @brief Checks if the thread should terminate
///
- /// Performs a "one-shot" check of the receiver's terminate
- /// watch socket. If it is ready, return true and then clear
- /// it, otherwise return false.
+ /// Performs a "one-shot" check of the terminate watch socket.
+ /// If it is ready, return true and then clear it, otherwise
+ /// return false.
///
/// @return true if the terminate watch socket is ready
bool shouldTerminate();
/// @brief Creates and runs the thread.
///
- /// Creates teh receiver's thread, passing into it the given
- /// function to run.
+ /// Creates the thread, passing into it the given function to run.
///
- /// @param thread_main function the receiver's thread should run
+ /// @param thread_main function the thread should run
void start(const boost::function<void()>& thread_main);
- /// @brief Returns true if the receiver thread is running
- /// @todo - this may need additional logic to handle cases where
- /// a thread function exits w/o the caller invoking @c
- /// WatchedThread::stop().
+ /// @brief Returns true if the thread is running
bool isRunning() {
return (thread_ != 0);
}
- /// @brief Terminates the receiver thread
+ /// @brief Terminates the thread
///
/// It marks the terminate watch socket ready, and then waits for the
- /// thread to stop. At this point, the receiver is defunct. This is
+ /// thread to stop. At this point, the thread is defunct. This is
/// not done in the destructor to avoid race conditions.
void stop();
- /// @brief Sets the receiver error state
+ /// @brief Sets the error state
///
/// This records the given error message and sets the error watch
/// socket to ready.
/// @param error_msg
void setError(const std::string& error_msg);
- /// @brief Fetches the error message text for the most recent socket error
+ /// @brief Fetches the error message text for the most recent error
///
/// @return string containing the error message
std::string getLastError();
/// @brief Error message of the last error encountered
std::string last_error_;
- /// @brief DHCP watch sockets that are used to communicate with the owning thread
+ /// @brief WatchSockets that are used to communicate with the owning thread
/// There are three:
- /// -# RCV_ERROR - packet receive error watch socket.
- /// Marked as ready when the DHCP packet receiver experiences an I/O error.
- /// -# RCV_READY - Marked as ready when the DHCP packet receiver adds a packet
- /// to the packet queue.
- /// -# RCV_TERMINATE Packet receiver terminate watch socket.
- /// Marked as ready when the DHCP packet receiver thread should terminate.
- WatchSocket sockets_[RCV_TERMINATE + 1];
-
- /// DHCP packet receiver thread.
+ /// -# ERROR - Marked as ready by the thread when it experiences an error.
+ /// -# READY - Marked as ready by the thread when it needs attention for a normal event
+ /// (e.g. a thread used to receive data would mark READY when it has data available)
+ /// -# TERMINATE - Marked as ready by WatchedThread owner to instruct the thread to
+ /// terminate. Worker functions must monitor TERMINATa by periodically calling
+ /// @c shouldTerminate
+ WatchSocket sockets_[TERMINATE + 1];
+
+ /// @brief Current thread instance
thread::ThreadPtr thread_ ;
};