]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[283-perfdhcp-sending-thread] Moved the mutex to a new sent packet queue 283-perfdhcp-sending-thread
authorFrancis Dupont <fdupont@isc.org>
Mon, 24 Dec 2018 16:13:48 +0000 (17:13 +0100)
committerFrancis Dupont <fdupont@isc.org>
Mon, 24 Dec 2018 16:13:48 +0000 (17:13 +0100)
src/bin/perfdhcp/stats_mgr.h
src/bin/perfdhcp/test_control.cc
src/bin/perfdhcp/test_control.h

index fe3b790a2b58e41ed64e04c26a3a0ac09bd827fe..4ce4cf347310774dbef3aed55e251df5f7f1127a 100644 (file)
@@ -9,7 +9,6 @@
 
 #include <dhcp/pkt4.h>
 #include <dhcp/pkt6.h>
-#include <util/threads/sync.h>
 #include <exceptions/exceptions.h>
 
 #include <boost/noncopyable.hpp>
@@ -307,7 +306,6 @@ public:
             if (!packet) {
                 isc_throw(BadValue, "Packet is null");
             }
-            isc::util::thread::Mutex::Locker lock(mutex_);
             ++sent_packets_num_;
             sent_packets_.template get<0>().push_back(packet);
         }
@@ -322,7 +320,6 @@ public:
             if (!packet) {
                 isc_throw(BadValue, "Packet is null");
             }
-            isc::util::thread::Mutex::Locker lock(mutex_);
             rcvd_packets_.push_back(packet);
         }
 
@@ -470,7 +467,6 @@ public:
                 ++ordered_lookups_;
                 packet_found = true;
             } else {
-                isc::util::thread::Mutex::Locker lock(mutex_);
                 // If we are here, it means that we were unable to match the
                 // next incoming packet with next sent packet so we need to
                 // take a little more expensive approach to look packets using
@@ -579,7 +575,6 @@ public:
             // If packet was found, we assume it will be never searched
             // again. We want to delete this packet from the list to
             // improve performance of future searches.
-            isc::util::thread::Mutex::Locker lock(mutex_);
             next_sent_ = eraseSent(next_sent_);
             return(sent_packet);
         }
@@ -919,24 +914,23 @@ public:
         /// \brief Erase packet from the list of sent packets.
         ///
         /// Method erases packet from the list of sent packets.
-        /// Lock must be held by caller.
         ///
         /// \param it iterator pointing to packet to be erased.
         /// \return iterator pointing to packet following erased
         /// packet or sent_packets_.end() if packet not found.
         PktListIterator eraseSent(const PktListIterator it) {
-            if (archive_enabled_) {
-                // We don't want to keep list of all sent packets
-                // because it will affect packet lookup performance.
-                // If packet is matched with received packet we
-                // move it to list of archived packets. List of
-                // archived packets may be used for diagnostics
-                // when test is completed.
-                archived_packets_.push_back(*it);
-            }
-            // get<0>() template returns sequential index to
-            // container.
-            return(sent_packets_.template get<0>().erase(it));
+             if (archive_enabled_) {
+                 // We don't want to keep list of all sent packets
+                 // because it will affect packet lookup performance.
+                 // If packet is matched with received packet we
+                 // move it to list of archived packets. List of
+                 // archived packets may be used for diagnostics
+                 // when test is completed.
+                 archived_packets_.push_back(*it);
+             }
+             // get<0>() template returns sequential index to
+             // container.
+             return(sent_packets_.template get<0>().erase(it));
         }
 
         ExchangeType xchg_type_;             ///< Packet exchange type.
@@ -1004,7 +998,6 @@ public:
         uint64_t sent_packets_num_;    ///< Total number of sent packets.
         uint64_t rcvd_packets_num_;    ///< Total number of received packets.
         boost::posix_time::ptime boot_time_; ///< Time when test is started.
-        isc::util::thread::Mutex mutex_; ///< Mutex for concurrent access.
     };
 
     /// Pointer to ExchangeStats.
index 6acf235a4dc793081e2b8cc802d67e3caedf6bd4..0afb9600ad732ff10baf17f3f391744c14e80256 100644 (file)
@@ -1237,6 +1237,9 @@ TestControl::readPacketTemplate(const std::string& file_name) {
 void
 TestControl::processReceivedPacket4(const Pkt4Ptr& pkt4) {
     if (pkt4->getType() == DHCPOFFER) {
+        if (sender_thread_) {
+            consumeSent();
+        }
         Pkt4Ptr discover_pkt4(stats_mgr4_->passRcvdPacket(StatsMgr4::XCHG_DO,
                                                           pkt4));
         CommandOptions::ExchangeMode xchg_mode =
@@ -1280,6 +1283,9 @@ void
 TestControl::processReceivedPacket6(const Pkt6Ptr& pkt6) {
     uint8_t packet_type = pkt6->getType();
     if (packet_type == DHCPV6_ADVERTISE) {
+        if (sender_thread_) {
+            consumeSent();
+        }
         Pkt6Ptr solicit_pkt6(stats_mgr6_->passRcvdPacket(StatsMgr6::XCHG_SA,
                                                          pkt6));
         CommandOptions::ExchangeMode xchg_mode =
@@ -1557,7 +1563,9 @@ TestControl::run() {
         // Calculate number of packets to be sent to stay
         // catch up with rate.
         uint64_t packets_due = 0;
-        if (!sender_thread_) {
+        if (sender_thread_) {
+            consumeSent();
+        } else {
             packets_due = basic_rate_control_.getOutboundMessageCount();
             if (packets_due > 0) {
                 checkLateMessages(basic_rate_control_);
@@ -1581,6 +1589,7 @@ TestControl::run() {
         if (checkExitConditions()) {
             if (sender_thread_ && sender_thread_->isRunning()) {
                 sender_thread_->stop();
+                consumeSent();
             }
             break;
         }
@@ -1622,6 +1631,9 @@ TestControl::run() {
         // Report delay means that user requested printing number
         // of sent/received/dropped packets repeatedly.
         if (options.getReportDelay() > 0) {
+            if (sender_thread_) {
+                consumeSent();
+            }
             printIntermediateStats();
         }
 
@@ -1697,6 +1709,15 @@ TestControl::runSender() {
                 cerr << "sendPackets failed with " << ex.what() << endl;
             }
             continue;
+        } else {
+            if (testDiags('i')) {
+                CommandOptions& options = CommandOptions::instance();
+                if (options.getIpVersion() == 4) {
+                    stats_mgr4_->incrementCounter("shortwait");
+                } else if (options.getIpVersion() == 6) {
+                    stats_mgr6_->incrementCounter("shortwait");
+                }
+            }
         }
 
         // Wait for next due or terminate.
@@ -1797,7 +1818,11 @@ TestControl::sendDiscover4(const bool preload /*= false*/) {
             isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 "
                       "hasn't been initialized");
         }
-        stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4);
+        if (sender_thread_) {
+            sent_queue4_.push(pkt4);
+        } else {
+            stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4);
+        }
         stats_mgr4_->passSentTimes(StatsMgr4::XCHG_DO,
                                    basic_rate_control_.getDue(),
                                    basic_rate_control_.getLast());
@@ -1834,31 +1859,35 @@ TestControl::sendDiscover4(const std::vector<uint8_t>& template_buf,
     if (rand_offset + HW_ETHER_LEN > in_buf.size()) {
         isc_throw(OutOfRange, "randomization offset is out of bounds");
     }
-    PerfPkt4Ptr pkt4(new PerfPkt4(&in_buf[0], in_buf.size(),
-                                  transid_offset,
-                                  transid));
+    PerfPkt4Ptr ppkt4(new PerfPkt4(&in_buf[0], in_buf.size(),
+                                   transid_offset,
+                                   transid));
 
     // Replace MAC address in the template with actual MAC address.
-    pkt4->writeAt(rand_offset, mac_address.begin(), mac_address.end());
+    ppkt4->writeAt(rand_offset, mac_address.begin(), mac_address.end());
     // Create a packet from the temporary buffer.
-    setDefaults4(boost::static_pointer_cast<Pkt4>(pkt4));
+    setDefaults4(boost::static_pointer_cast<Pkt4>(ppkt4));
     // Pack the input packet buffer to output buffer so as it can
     // be sent to server.
-    pkt4->rawPack();
-    IfaceMgr::instance().send(boost::static_pointer_cast<Pkt4>(pkt4));
+    ppkt4->rawPack();
+    IfaceMgr::instance().send(boost::static_pointer_cast<Pkt4>(ppkt4));
     if (!preload) {
         if (!stats_mgr4_) {
             isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 "
                       "hasn't been initialized");
         }
         // Update packet stats.
-        stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO,
-                                    boost::static_pointer_cast<Pkt4>(pkt4));
+        Pkt4Ptr pkt4 = boost::static_pointer_cast<Pkt4>(pkt4);
+        if (sender_thread_) {
+            sent_queue4_.push(pkt4);
+        } else {
+            stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4);
+        }
         stats_mgr4_->passSentTimes(StatsMgr4::XCHG_DO,
                                    basic_rate_control_.getDue(),
                                    basic_rate_control_.getLast());
     }
-    saveFirstPacket(pkt4);
+    saveFirstPacket(ppkt4);
 }
 
 bool
@@ -2341,7 +2370,11 @@ TestControl::sendSolicit6(const bool preload /*= false*/) {
             isc_throw(InvalidOperation, "Statistics Manager for DHCPv6 "
                       "hasn't been initialized");
         }
-        stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6);
+        if (sender_thread_) {
+            sent_queue6_.push(pkt6);
+        } else {
+            stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6);
+        }
         stats_mgr6_->passSentTimes(StatsMgr6::XCHG_SA,
                                    basic_rate_control_.getDue(),
                                    basic_rate_control_.getLast());
@@ -2395,7 +2428,11 @@ TestControl::sendSolicit6(const std::vector<uint8_t>& template_buf,
                       "hasn't been initialized");
         }
         // Update packet stats.
-        stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6);
+        if (sender_thread_) {
+            sent_queue6_.push(pkt6);
+        } else {
+            stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6);
+        }
         stats_mgr6_->passSentTimes(StatsMgr6::XCHG_SA,
                                    basic_rate_control_.getDue(),
                                    basic_rate_control_.getLast());
@@ -2497,5 +2534,15 @@ TestControl::testDiags(const char diag) const {
     return (false);
 }
 
+void
+TestControl::consumeSent() {
+    CommandOptions& options = CommandOptions::instance();
+    if (options.getIpVersion() == 4) {
+        consumeSent4();
+    } else {
+        consumeSent6();
+    }
+}
+
 }  // namespace perfdhcp
 }  // namespace isc
index 507428da583e4e45798b91a9924e1ea85a6a1030..349eac7aaae8ebb113d44843553e3b3d223fcc94 100644 (file)
@@ -16,6 +16,7 @@
 #include <dhcp/pkt4.h>
 #include <dhcp/pkt6.h>
 #include <util/random/random_number_generator.h>
+#include <util/threads/sync.h>
 #include <util/threads/watched_thread.h>
 
 #include <boost/noncopyable.hpp>
@@ -23,6 +24,7 @@
 #include <boost/function.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 
+#include <queue>
 #include <string>
 #include <vector>
 
@@ -1144,6 +1146,83 @@ protected:
     boost::shared_ptr<TestControlSocket> socket_; ///< Socket object.
 
     util::thread::WatchedThreadPtr sender_thread_; ///< Sender watched thread.
+
+    /// \brief Sent Packet Queue.
+    /// \tparam T class representing DHCPv4 or DHCPv6 packet.
+    template <class T = dhcp::Pkt4>
+    struct SentQueue {
+        /// \brief Type of shared pointer to packets.
+        typedef boost::shared_ptr<T> TPtr;
+
+        /// \brief Push a packet.
+        void push(TPtr pkt) {
+            isc::util::thread::Mutex::Locker lock(mutex_);
+            pkt_queue_.push(pkt);
+        }
+
+        /// \brief Pop a packet.
+        TPtr pop() {
+            if (pkt_queue_.empty()) {
+                return (TPtr());
+            }
+            isc::util::thread::Mutex::Locker lock(mutex_);
+            auto pkt = pkt_queue_.front();
+            pkt_queue_.pop();
+            return pkt;
+        }
+
+        /// \brief Consume sent packets.
+        /// \param pass_sent_packet passSentPacket lambda,
+        void consumeSent(const boost::function<void(TPtr)>& pass_sent_packet) {
+            TPtr pkt;
+            while ((pkt = pop())) {
+                pass_sent_packet(pkt);
+            }
+        }
+
+        /// \brief Packet queue.
+        std::queue<TPtr> pkt_queue_;
+
+        /// \brief Mutex for concurrent access.
+        isc::util::thread::Mutex mutex_;
+    };
+
+    /// Type of sent Packet Queue for DHCPv4.
+    typedef SentQueue<dhcp::Pkt4> SentQueue4;
+
+    /// Type of sent Packet Queue for DHCPv6.
+    typedef SentQueue<dhcp::Pkt6> SentQueue6;
+
+    /// Sent Packet Queue for DHCPv4.
+    SentQueue4 sent_queue4_;
+
+    /// Sent Packet Queue for DHCPv6.
+    SentQueue6 sent_queue6_;
+
+    /// Consume sent packets for DHCPv4.
+    void consumeSent4() {
+        if (!stats_mgr4_) {
+            isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 "
+                      "hasn't been initialized");
+        }
+        sent_queue4_.consumeSent([this](dhcp::Pkt4Ptr pkt4) {
+                stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4);
+            });
+    }
+    
+    /// Consume sent packets for DHCPv6.
+    void consumeSent6() {
+        if (!stats_mgr6_) {
+            isc_throw(InvalidOperation, "Statistics Manager for DHCPv6 "
+                      "hasn't been initialized");
+        }
+        sent_queue6_.consumeSent([this](dhcp::Pkt6Ptr pkt6) {
+                stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6);
+            });
+    }
+
+    /// Consume sent packets.
+    void consumeSent();
 };
 
 }  // namespace perfdhcp