]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#260,!120] Moved thread creation from Receiver ctor to new start function
authorThomas Markwalder <tmark@isc.org>
Mon, 19 Nov 2018 13:09:55 +0000 (08:09 -0500)
committerThomas Markwalder <tmark@isc.org>
Tue, 20 Nov 2018 18:25:03 +0000 (13:25 -0500)
    dhcp::Receiver - moved creating receiver's thread out of the constructor
    to new Receier::start() method.  This alleviates issues with thread worker
    functions referring to Receiver members before Receiver fully exists.

src/lib/dhcp/iface_mgr.cc
src/lib/dhcp/iface_mgr.h
src/lib/dhcp/packet_queue_mgr.h
src/lib/dhcp/tests/iface_mgr_unittest.cc
src/lib/dhcpsrv/parsers/ifaces_config_parser.cc

index 4a0dd9cfc27b26de0518acc858bc61ba2bcec3b1..4620bfb3e0aa43b35528c5b9d06fc7687cd2706a 100644 (file)
@@ -179,7 +179,8 @@ bool Iface::delSocket(const uint16_t sockfd) {
     return (false); // socket not found
 }
 
-Receiver::Receiver(const boost::function<void()>& thread_main) {
+void
+Receiver::start(const boost::function<void()>& thread_main) {
     clearReady(RCV_ERROR);
     clearReady(RCV_READY);
     clearReady(RCV_TERMINATE);
@@ -219,9 +220,12 @@ Receiver::shouldTerminate() {
 
 void
 Receiver::stop() {
-    markReady(RCV_TERMINATE);
-    thread_->wait();
-    thread_.reset();
+    if (thread_) {
+        markReady(RCV_TERMINATE);
+        thread_->wait();
+        thread_.reset();
+    }
+
     clearReady(RCV_ERROR);
     clearReady(RCV_READY);
     last_error_ = "thread stopped";
@@ -353,14 +357,14 @@ void IfaceMgr::stopDHCPReceiver() {
     if (isReceiverRunning()) {
         receiver_->stop();
         receiver_.reset();
-    }
 
-    if (getPacketQueue4()) {
-        getPacketQueue4()->clear();
-    }
+        if (getPacketQueue4()) {
+            getPacketQueue4()->clear();
+        }
 
-    if (getPacketQueue6()) {
-        getPacketQueue6()->clear();
+        if (getPacketQueue6()) {
+            getPacketQueue6()->clear();
+        }
     }
 }
 
@@ -369,7 +373,6 @@ IfaceMgr::~IfaceMgr() {
     control_buf_len_ = 0;
 
     closeSockets();
-
     // Explicitly delete PQM singletons.
     PacketQueueMgr4::destroy();
     PacketQueueMgr6::destroy();
@@ -754,7 +757,9 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
                 return;
         }
 
-        receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this))));
+        receiver_.reset(new Receiver());
+        receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP4Packets, this)));
+
         break;
     case AF_INET6:
         // If there's no queue, then has been disabled, simply return.
@@ -762,7 +767,8 @@ IfaceMgr::startDHCPReceiver(const uint16_t family) {
             return;
         }
 
-        receiver_.reset(new Receiver(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this))));
+        receiver_.reset(new Receiver());
+        receiver_->start(boost::bind(boost::bind(&IfaceMgr::receiveDHCP6Packets, this)));
         break;
     default:
         isc_throw (BadValue, "startDHCPReceiver: invalid family: " << family);
index 428523ea8265d5c3e4ed0d4cfa83557c6e6e76d5..a32912b83d55799230add7c2784cbff80dd7426c 100644 (file)
@@ -475,15 +475,10 @@ public:
     };
 
     /// @brief Constructor
-    ///
-    /// It initializes the watch sockets and then instantiates and
-    /// starts the receiver's worker thread.
-    ///
-    /// @param thread_main function the receiver's thread should run
-    Receiver(const boost::function<void()>& thread_main);
+    Receiver(){};
 
     /// @brief Virtual destructor
-    virtual ~Receiver() {}
+    virtual ~Receiver(){}
 
     /// @brief Fetches the fd of a watch socket
     ///
@@ -516,6 +511,22 @@ public:
     /// @return true if the terminate watch socket is ready
     bool shouldTerminate();
 
+    /// @brief Creates and runs the thread.
+    ///
+    /// Creates teh receiver's thread, passing into it the given
+    /// function to run.
+    ///
+    /// @param thread_main function the receiver's thread should run
+    void start(const boost::function<void()>& thread_main);
+
+    /// @brief Returns true if the receiver thread is running
+    /// @todo - this may need additional logic to handle cases where
+    /// a thread function exits w/o the caller invoking @c
+    /// Receiver::stop().
+    bool isRunning() {
+        return (thread_ != 0);
+    }
+
     /// @brief Terminates the receiver thread
     ///
     /// It marks the terminate watch socket ready, and then waits for the
@@ -1152,9 +1163,10 @@ public:
     /// the packet queue is flushed.
     void stopDHCPReceiver();
 
-    /// @brief Returns true if there is a receiver currently running.
+    /// @brief Returns true if there is a receiver exists and its
+    /// thread is currently running.
     bool isReceiverRunning() const {
-        return (receiver_ != 0);
+        return (receiver_ != 0 && receiver_->isRunning());
     }
 
     /// @brief Configures DHCP packet queue
index 5b1d4da7796edccc865b44d6dbb854ac9c0eaa7e..6eaf6c4b218e9c7ac97caaa0b89a88fe53a68e9e 100644 (file)
@@ -68,7 +68,7 @@ public:
     /// @return true if the queue type has been successfully registered, false
     /// if the type already exists.
     bool registerPacketQueueFactory(const std::string& queue_type,
-                                    const Factory& factory) {
+                                    Factory factory) {
         // Check if this backend has been already registered.
         if (factories_.count(queue_type)) {
             return (false);
index 94cbe8d71dc402de940dab0cf5c0672e22dfaeb6..8559139fe53def8d293c29ad181bf40dcf37314f 100644 (file)
@@ -3265,8 +3265,11 @@ TEST_F(ReceiverTest, receiverClassBasics) {
     /// We'll create a receiver and let it run until it expires.  (Note this is more
     /// of a test of ReceiverTest itself and ensures our tests later for why we
     /// exited are sound.)
-    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
-                                                         Receiver::RCV_TERMINATE))));
+    receiver_.reset(new Receiver());
+    ASSERT_FALSE(receiver_->isRunning());
+    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
+    ASSERT_TRUE(receiver_->isRunning());
+
     // Wait long enough for thread to expire.
     nap(WORKER_MAX_PASSES + 1);
 
@@ -3279,10 +3282,18 @@ TEST_F(ReceiverTest, receiverClassBasics) {
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_TERMINATE));
     EXPECT_EQ("thread expired", receiver_->getLastError());
 
+    // This one is a little wonky, as a thread function expiring needs to be
+    // supported in Receiver.  There needs to be something in Receiver, so it
+    // nows the thread exited.  Thread exists but I think it's underlying
+    // impl does not.
+    EXPECT_TRUE(receiver_->isRunning());
+    ASSERT_NO_THROW(receiver_->stop());
+
     /// Now we'll test stopping a thread.
-    /// We'll create a Receiver, let it run a little and then tell it to stop.
-    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
-                                                         Receiver::RCV_TERMINATE))));
+    /// Start the receiver, let it run a little and then tell it to stop.
+    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_TERMINATE));
+    ASSERT_TRUE(receiver_->isRunning());
+
     // No watches should be ready.
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
@@ -3293,6 +3304,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
 
     // Tell it to stop.
     receiver_->stop();
+    ASSERT_FALSE(receiver_->isRunning());
 
     // It should have done less than the maximum number of passes.
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
@@ -3305,9 +3317,10 @@ TEST_F(ReceiverTest, receiverClassBasics) {
 
 
     // Next we'll test error notification.
-    // We'll create a receiver that sets an error on the second pass.
-    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
-                                                         Receiver::RCV_ERROR))));
+    // Start the receiver with a thread that sets an error on the second pass.
+    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_ERROR));
+    ASSERT_TRUE(receiver_->isRunning());
+
     // No watches should be ready.
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
@@ -3322,6 +3335,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
 
     // Tell it to stop.
     receiver_->stop();
+    ASSERT_FALSE(receiver_->isRunning());
 
     // It should have done less than the maximum number of passes.
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
@@ -3334,9 +3348,10 @@ TEST_F(ReceiverTest, receiverClassBasics) {
 
 
     // Finally, we'll test data ready notification.
-    // We'll create a receiver that indicates data ready on its second pass.
-    receiver_.reset(new Receiver(boost::bind(boost::bind(&ReceiverTest::worker, this,
-                                                         Receiver::RCV_READY))));
+    // We'll start the receiver with a thread that indicates data ready on its second pass.
+    receiver_->start(boost::bind(&ReceiverTest::worker, this, Receiver::RCV_READY));
+    ASSERT_TRUE(receiver_->isRunning());
+
     // No watches should be ready.
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_ERROR));
     ASSERT_FALSE(receiver_->isReady(Receiver::RCV_READY));
@@ -3350,6 +3365,7 @@ TEST_F(ReceiverTest, receiverClassBasics) {
 
     // Tell it to stop.
     receiver_->stop();
+    ASSERT_FALSE(receiver_->isRunning());
 
     // It should have done less than the maximum number of passes.
     EXPECT_LT(passes_, WORKER_MAX_PASSES);
index f8dc97111444c899fbf2a5b7e08d9751d1fab087..ed90f64898e8c91336c10e5834e4a711688db7e4 100644 (file)
@@ -47,7 +47,6 @@ IfacesConfigParser::parse(const CfgIfacePtr& cfg,
     if (re_detect) {
         // Interface clear will drop opened socket information
         // so close them if the caller did not.
-        IfaceMgr::instance().stopDHCPReceiver();
         IfaceMgr::instance().closeSockets();
         IfaceMgr::instance().clearIfaces();
         IfaceMgr::instance().detectIfaces();