From: Francis Dupont Date: Mon, 24 Dec 2018 16:13:48 +0000 (+0100) Subject: [283-perfdhcp-sending-thread] Moved the mutex to a new sent packet queue X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=683f0ef393179fd1a50027a84330a1e1fe097dba;p=thirdparty%2Fkea.git [283-perfdhcp-sending-thread] Moved the mutex to a new sent packet queue --- diff --git a/src/bin/perfdhcp/stats_mgr.h b/src/bin/perfdhcp/stats_mgr.h index fe3b790a2b..4ce4cf3473 100644 --- a/src/bin/perfdhcp/stats_mgr.h +++ b/src/bin/perfdhcp/stats_mgr.h @@ -9,7 +9,6 @@ #include #include -#include #include #include @@ -307,7 +306,6 @@ public: if (!packet) { isc_throw(BadValue, "Packet is null"); } - isc::util::thread::Mutex::Locker lock(mutex_); ++sent_packets_num_; sent_packets_.template get<0>().push_back(packet); } @@ -322,7 +320,6 @@ public: if (!packet) { isc_throw(BadValue, "Packet is null"); } - isc::util::thread::Mutex::Locker lock(mutex_); rcvd_packets_.push_back(packet); } @@ -470,7 +467,6 @@ public: ++ordered_lookups_; packet_found = true; } else { - isc::util::thread::Mutex::Locker lock(mutex_); // If we are here, it means that we were unable to match the // next incoming packet with next sent packet so we need to // take a little more expensive approach to look packets using @@ -579,7 +575,6 @@ public: // If packet was found, we assume it will be never searched // again. We want to delete this packet from the list to // improve performance of future searches. - isc::util::thread::Mutex::Locker lock(mutex_); next_sent_ = eraseSent(next_sent_); return(sent_packet); } @@ -919,24 +914,23 @@ public: /// \brief Erase packet from the list of sent packets. /// /// Method erases packet from the list of sent packets. - /// Lock must be held by caller. /// /// \param it iterator pointing to packet to be erased. /// \return iterator pointing to packet following erased /// packet or sent_packets_.end() if packet not found. PktListIterator eraseSent(const PktListIterator it) { - if (archive_enabled_) { - // We don't want to keep list of all sent packets - // because it will affect packet lookup performance. - // If packet is matched with received packet we - // move it to list of archived packets. List of - // archived packets may be used for diagnostics - // when test is completed. - archived_packets_.push_back(*it); - } - // get<0>() template returns sequential index to - // container. - return(sent_packets_.template get<0>().erase(it)); + if (archive_enabled_) { + // We don't want to keep list of all sent packets + // because it will affect packet lookup performance. + // If packet is matched with received packet we + // move it to list of archived packets. List of + // archived packets may be used for diagnostics + // when test is completed. + archived_packets_.push_back(*it); + } + // get<0>() template returns sequential index to + // container. + return(sent_packets_.template get<0>().erase(it)); } ExchangeType xchg_type_; ///< Packet exchange type. @@ -1004,7 +998,6 @@ public: uint64_t sent_packets_num_; ///< Total number of sent packets. uint64_t rcvd_packets_num_; ///< Total number of received packets. boost::posix_time::ptime boot_time_; ///< Time when test is started. - isc::util::thread::Mutex mutex_; ///< Mutex for concurrent access. }; /// Pointer to ExchangeStats. diff --git a/src/bin/perfdhcp/test_control.cc b/src/bin/perfdhcp/test_control.cc index 6acf235a4d..0afb9600ad 100644 --- a/src/bin/perfdhcp/test_control.cc +++ b/src/bin/perfdhcp/test_control.cc @@ -1237,6 +1237,9 @@ TestControl::readPacketTemplate(const std::string& file_name) { void TestControl::processReceivedPacket4(const Pkt4Ptr& pkt4) { if (pkt4->getType() == DHCPOFFER) { + if (sender_thread_) { + consumeSent(); + } Pkt4Ptr discover_pkt4(stats_mgr4_->passRcvdPacket(StatsMgr4::XCHG_DO, pkt4)); CommandOptions::ExchangeMode xchg_mode = @@ -1280,6 +1283,9 @@ void TestControl::processReceivedPacket6(const Pkt6Ptr& pkt6) { uint8_t packet_type = pkt6->getType(); if (packet_type == DHCPV6_ADVERTISE) { + if (sender_thread_) { + consumeSent(); + } Pkt6Ptr solicit_pkt6(stats_mgr6_->passRcvdPacket(StatsMgr6::XCHG_SA, pkt6)); CommandOptions::ExchangeMode xchg_mode = @@ -1557,7 +1563,9 @@ TestControl::run() { // Calculate number of packets to be sent to stay // catch up with rate. uint64_t packets_due = 0; - if (!sender_thread_) { + if (sender_thread_) { + consumeSent(); + } else { packets_due = basic_rate_control_.getOutboundMessageCount(); if (packets_due > 0) { checkLateMessages(basic_rate_control_); @@ -1581,6 +1589,7 @@ TestControl::run() { if (checkExitConditions()) { if (sender_thread_ && sender_thread_->isRunning()) { sender_thread_->stop(); + consumeSent(); } break; } @@ -1622,6 +1631,9 @@ TestControl::run() { // Report delay means that user requested printing number // of sent/received/dropped packets repeatedly. if (options.getReportDelay() > 0) { + if (sender_thread_) { + consumeSent(); + } printIntermediateStats(); } @@ -1697,6 +1709,15 @@ TestControl::runSender() { cerr << "sendPackets failed with " << ex.what() << endl; } continue; + } else { + if (testDiags('i')) { + CommandOptions& options = CommandOptions::instance(); + if (options.getIpVersion() == 4) { + stats_mgr4_->incrementCounter("shortwait"); + } else if (options.getIpVersion() == 6) { + stats_mgr6_->incrementCounter("shortwait"); + } + } } // Wait for next due or terminate. @@ -1797,7 +1818,11 @@ TestControl::sendDiscover4(const bool preload /*= false*/) { isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 " "hasn't been initialized"); } - stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4); + if (sender_thread_) { + sent_queue4_.push(pkt4); + } else { + stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4); + } stats_mgr4_->passSentTimes(StatsMgr4::XCHG_DO, basic_rate_control_.getDue(), basic_rate_control_.getLast()); @@ -1834,31 +1859,35 @@ TestControl::sendDiscover4(const std::vector& template_buf, if (rand_offset + HW_ETHER_LEN > in_buf.size()) { isc_throw(OutOfRange, "randomization offset is out of bounds"); } - PerfPkt4Ptr pkt4(new PerfPkt4(&in_buf[0], in_buf.size(), - transid_offset, - transid)); + PerfPkt4Ptr ppkt4(new PerfPkt4(&in_buf[0], in_buf.size(), + transid_offset, + transid)); // Replace MAC address in the template with actual MAC address. - pkt4->writeAt(rand_offset, mac_address.begin(), mac_address.end()); + ppkt4->writeAt(rand_offset, mac_address.begin(), mac_address.end()); // Create a packet from the temporary buffer. - setDefaults4(boost::static_pointer_cast(pkt4)); + setDefaults4(boost::static_pointer_cast(ppkt4)); // Pack the input packet buffer to output buffer so as it can // be sent to server. - pkt4->rawPack(); - IfaceMgr::instance().send(boost::static_pointer_cast(pkt4)); + ppkt4->rawPack(); + IfaceMgr::instance().send(boost::static_pointer_cast(ppkt4)); if (!preload) { if (!stats_mgr4_) { isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 " "hasn't been initialized"); } // Update packet stats. - stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, - boost::static_pointer_cast(pkt4)); + Pkt4Ptr pkt4 = boost::static_pointer_cast(pkt4); + if (sender_thread_) { + sent_queue4_.push(pkt4); + } else { + stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4); + } stats_mgr4_->passSentTimes(StatsMgr4::XCHG_DO, basic_rate_control_.getDue(), basic_rate_control_.getLast()); } - saveFirstPacket(pkt4); + saveFirstPacket(ppkt4); } bool @@ -2341,7 +2370,11 @@ TestControl::sendSolicit6(const bool preload /*= false*/) { isc_throw(InvalidOperation, "Statistics Manager for DHCPv6 " "hasn't been initialized"); } - stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6); + if (sender_thread_) { + sent_queue6_.push(pkt6); + } else { + stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6); + } stats_mgr6_->passSentTimes(StatsMgr6::XCHG_SA, basic_rate_control_.getDue(), basic_rate_control_.getLast()); @@ -2395,7 +2428,11 @@ TestControl::sendSolicit6(const std::vector& template_buf, "hasn't been initialized"); } // Update packet stats. - stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6); + if (sender_thread_) { + sent_queue6_.push(pkt6); + } else { + stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6); + } stats_mgr6_->passSentTimes(StatsMgr6::XCHG_SA, basic_rate_control_.getDue(), basic_rate_control_.getLast()); @@ -2497,5 +2534,15 @@ TestControl::testDiags(const char diag) const { return (false); } +void +TestControl::consumeSent() { + CommandOptions& options = CommandOptions::instance(); + if (options.getIpVersion() == 4) { + consumeSent4(); + } else { + consumeSent6(); + } +} + } // namespace perfdhcp } // namespace isc diff --git a/src/bin/perfdhcp/test_control.h b/src/bin/perfdhcp/test_control.h index 507428da58..349eac7aaa 100644 --- a/src/bin/perfdhcp/test_control.h +++ b/src/bin/perfdhcp/test_control.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,7 @@ #include #include +#include #include #include @@ -1144,6 +1146,83 @@ protected: boost::shared_ptr socket_; ///< Socket object. util::thread::WatchedThreadPtr sender_thread_; ///< Sender watched thread. + + /// \brief Sent Packet Queue. + /// \tparam T class representing DHCPv4 or DHCPv6 packet. + template + struct SentQueue { + /// \brief Type of shared pointer to packets. + typedef boost::shared_ptr TPtr; + + /// \brief Push a packet. + void push(TPtr pkt) { + isc::util::thread::Mutex::Locker lock(mutex_); + pkt_queue_.push(pkt); + } + + /// \brief Pop a packet. + TPtr pop() { + if (pkt_queue_.empty()) { + return (TPtr()); + } + isc::util::thread::Mutex::Locker lock(mutex_); + auto pkt = pkt_queue_.front(); + pkt_queue_.pop(); + return pkt; + } + + /// \brief Consume sent packets. + /// \param pass_sent_packet passSentPacket lambda, + void consumeSent(const boost::function& pass_sent_packet) { + TPtr pkt; + while ((pkt = pop())) { + pass_sent_packet(pkt); + } + } + + /// \brief Packet queue. + std::queue pkt_queue_; + + /// \brief Mutex for concurrent access. + isc::util::thread::Mutex mutex_; + }; + + /// Type of sent Packet Queue for DHCPv4. + typedef SentQueue SentQueue4; + + /// Type of sent Packet Queue for DHCPv6. + typedef SentQueue SentQueue6; + + /// Sent Packet Queue for DHCPv4. + SentQueue4 sent_queue4_; + + /// Sent Packet Queue for DHCPv6. + SentQueue6 sent_queue6_; + + /// Consume sent packets for DHCPv4. + void consumeSent4() { + if (!stats_mgr4_) { + isc_throw(InvalidOperation, "Statistics Manager for DHCPv4 " + "hasn't been initialized"); + } + sent_queue4_.consumeSent([this](dhcp::Pkt4Ptr pkt4) { + stats_mgr4_->passSentPacket(StatsMgr4::XCHG_DO, pkt4); + }); + } + + /// Consume sent packets for DHCPv6. + void consumeSent6() { + if (!stats_mgr6_) { + isc_throw(InvalidOperation, "Statistics Manager for DHCPv6 " + "hasn't been initialized"); + } + sent_queue6_.consumeSent([this](dhcp::Pkt6Ptr pkt6) { + stats_mgr6_->passSentPacket(StatsMgr6::XCHG_SA, pkt6); + }); + } + + /// Consume sent packets. + void consumeSent(); }; } // namespace perfdhcp