]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Move the ring buffers to LockGuarded
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 16 Apr 2021 13:39:53 +0000 (15:39 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 17 Aug 2021 12:04:45 +0000 (14:04 +0200)
pdns/dnsdist-lua-inspection.cc
pdns/dnsdist-rings.cc
pdns/dnsdist-rings.hh
pdns/dnsdistdist/dnsdist-dynblocks.cc
pdns/dnsdistdist/test-dnsdistrings_cc.cc

index 163df71d0442da3ac2fa8202d209e68dade89044..b1d6a38b33f59ab4ead1a0027e20575931085d21 100644 (file)
@@ -33,9 +33,9 @@ static std::unordered_map<unsigned int, vector<boost::variant<string,double>>> g
   unsigned int total=0;
   {
     for (const auto& shard : g_rings.d_shards) {
-      std::lock_guard<std::mutex> rl(shard->respLock);
-      if(!labels) {
-        for(const auto& a : shard->respRing) {
+      auto rl = shard->respRing.lock();
+      if (!labels) {
+        for(const auto& a : *rl) {
           if(!pred(a))
             continue;
           counts[a.name]++;
@@ -44,7 +44,7 @@ static std::unordered_map<unsigned int, vector<boost::variant<string,double>>> g
       }
       else {
         unsigned int lab = *labels;
-        for(const auto& a : shard->respRing) {
+        for(const auto& a : *rl) {
           if(!pred(a))
             continue;
 
@@ -115,9 +115,9 @@ static void statNodeRespRing(statvisitor_t visitor, unsigned int seconds)
 
   StatNode root;
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
+    auto rl = shard->respRing.lock();
 
-    for(const auto& c : shard->respRing) {
+    for(const auto& c : *rl) {
       if (now < c.when)
         continue;
 
@@ -139,11 +139,11 @@ static vector<pair<unsigned int, std::unordered_map<string,string> > > getRespRi
   vector<pair<unsigned int, entry_t > > ret;
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
+    auto rl = shard->respRing.lock();
 
     entry_t e;
     unsigned int count=1;
-    for(const auto& c : shard->respRing) {
+    for(const auto& c : *rl) {
       if(rcode && (rcode.get() != c.dh.rcode))
         continue;
       e["qname"]=c.name.toString();
@@ -167,8 +167,8 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function<void
   counts.reserve(g_rings.getNumberOfResponseEntries());
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
-    for(const auto& c : shard->respRing) {
+    auto rl = shard->respRing.lock();
+    for(const auto& c : *rl) {
 
       if(seconds && c.when < cutoff)
         continue;
@@ -196,8 +196,8 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function<voi
   counts.reserve(g_rings.getNumberOfQueryEntries());
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& c : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for(const auto& c : *rl) {
       if(seconds && c.when < cutoff)
         continue;
       if(now < c.when)
@@ -239,8 +239,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int total=0;
       {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(const auto& c : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(const auto& c : *rl) {
             counts[c.requestor]++;
             total++;
           }
@@ -272,8 +272,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int total=0;
       if(!labels) {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(const auto& a : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(const auto& a : *rl) {
             counts[a.name]++;
             total++;
           }
@@ -282,8 +282,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       else {
        unsigned int lab = *labels;
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(auto a : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(auto a : *rl) {
             a.name.trimToLabels(lab);
             counts[a.name]++;
             total++;
@@ -330,8 +330,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       rings.reserve(g_rings.getNumberOfShards());
       for (const auto& shard : g_rings.d_shards) {
         {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          rings.push_back(shard->respRing);
+          auto rl = shard->respRing.lock();
+          rings.push_back(*rl);
         }
         totalEntries += rings.back().size();
       }
@@ -426,14 +426,14 @@ void setupLuaInspection(LuaContext& luaCtx)
       rr.reserve(g_rings.getNumberOfResponseEntries());
       for (const auto& shard : g_rings.d_shards) {
         {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for (const auto& entry : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for (const auto& entry : *rl) {
             qr.push_back(entry);
           }
         }
         {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          for (const auto& entry : shard->respRing) {
+          auto rl = shard->respRing.lock();
+          for (const auto& entry : *rl) {
             rr.push_back(entry);
           }
         }
@@ -544,8 +544,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int size=0;
       {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          for(const auto& r : shard->respRing) {
+          auto rl = shard->respRing.lock();
+          for(const auto& r : *rl) {
             /* skip actively discovered timeouts */
             if (r.usec == std::numeric_limits<unsigned int>::max())
               continue;
index 8bb5ecef759bd12fde9a0eecd2aad25ec60812e8..ce24a5f646ae51dae42206ee172029e38eaf7b0d 100644 (file)
@@ -28,8 +28,8 @@ size_t Rings::numDistinctRequestors()
 {
   std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
   for (const auto& shard : d_shards) {
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& q : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for (const auto& q : *rl) {
       s.insert(q.requestor);
     }
   }
@@ -42,16 +42,16 @@ std::unordered_map<int, vector<boost::variant<string,double>>> Rings::getTopBand
   uint64_t total=0;
   for (const auto& shard : d_shards) {
     {
-      std::lock_guard<std::mutex> rl(shard->queryLock);
-      for(const auto& q : shard->queryRing) {
-        counts[q.requestor]+=q.size;
+      auto rl = shard->queryRing.lock();
+      for(const auto& q : *rl) {
+        counts[q.requestor] += q.size;
         total+=q.size;
       }
     }
     {
-      std::lock_guard<std::mutex> rl(shard->respLock);
-      for(const auto& r : shard->respRing) {
-        counts[r.requestor]+=r.size;
+      auto rl = shard->respRing.lock();
+      for(const auto& r : *rl) {
+        counts[r.requestor] += r.size;
         total+=r.size;
       }
     }
index d9878806681fdcdc451ab8079460fb32215a351f..bc424a35c089a95a0a656a03cdc9635133be427c 100644 (file)
@@ -21,7 +21,6 @@
  */
 #pragma once
 
-#include <mutex>
 #include <time.h>
 #include <unordered_map>
 
@@ -30,6 +29,7 @@
 #include "circular_buffer.hh"
 #include "dnsname.hh"
 #include "iputils.hh"
+#include "lock.hh"
 #include "stat_t.hh"
 
 
@@ -57,10 +57,8 @@ struct Rings {
 
   struct Shard
   {
-    boost::circular_buffer<Query> queryRing;
-    boost::circular_buffer<Response> respRing;
-    std::mutex queryLock;
-    std::mutex respLock;
+    LockGuarded<boost::circular_buffer<Query>> queryRing{boost::circular_buffer<Query>()};
+    LockGuarded<boost::circular_buffer<Response>> respRing{boost::circular_buffer<Response>()};
   };
 
   Rings(size_t capacity=10000, size_t numberOfShards=10, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
@@ -83,14 +81,8 @@ struct Rings {
     /* resize all the rings */
     for (auto& shard : d_shards) {
       shard = std::unique_ptr<Shard>(new Shard());
-      {
-        std::lock_guard<std::mutex> wl(shard->queryLock);
-        shard->queryRing.set_capacity(newCapacity / numberOfShards);
-      }
-      {
-        std::lock_guard<std::mutex> wl(shard->respLock);
-        shard->respRing.set_capacity(newCapacity / numberOfShards);
-      }
+      shard->queryRing.lock()->set_capacity(newCapacity / numberOfShards);
+      shard->respRing.lock()->set_capacity(newCapacity / numberOfShards);
     }
 
     /* we just recreated the shards so they are now empty */
@@ -126,9 +118,9 @@ struct Rings {
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
       auto& shard = getOneShard();
-      std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
-      if (wl.owns_lock()) {
-        insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+      auto lock = shard->queryRing.try_lock();
+      if (lock.owns_lock()) {
+        insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
         return;
       }
       if (d_keepLockingStats) {
@@ -141,17 +133,17 @@ struct Rings {
       d_blockingResponseInserts++;
     }
     auto& shard = getOneShard();
-    std::lock_guard<std::mutex> wl(shard->queryLock);
-    insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+    auto lock = shard->queryRing.lock();
+    insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
   }
 
   void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
       auto& shard = getOneShard();
-      std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
-      if (wl.owns_lock()) {
-        insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+      auto lock = shard->respRing.try_lock();
+      if (lock.owns_lock()) {
+        insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
         return;
       }
       if (d_keepLockingStats) {
@@ -164,21 +156,15 @@ struct Rings {
       d_blockingResponseInserts++;
     }
     auto& shard = getOneShard();
-    std::lock_guard<std::mutex> wl(shard->respLock);
-    insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+    auto lock = shard->respRing.lock();
+    insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
   }
 
   void clear()
   {
     for (auto& shard : d_shards) {
-      {
-        std::lock_guard<std::mutex> wl(shard->queryLock);
-        shard->queryRing.clear();
-      }
-      {
-        std::lock_guard<std::mutex> wl(shard->respLock);
-        shard->respRing.clear();
-      }
+      shard->queryRing.lock()->clear();
+      shard->respRing.lock()->clear();
     }
 
     d_nbQueryEntries.store(0);
@@ -211,20 +197,20 @@ private:
     return d_shards[getShardId()];
   }
 
-  void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+  void insertQueryLocked(boost::circular_buffer<Query>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
-    if (!shard->queryRing.full()) {
+    if (!ring.full()) {
       d_nbQueryEntries++;
     }
-    shard->queryRing.push_back({requestor, name, when, dh, size, qtype});
+    ring.push_back({requestor, name, when, dh, size, qtype});
   }
 
-  void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+  void insertResponseLocked(boost::circular_buffer<Response>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
-    if (!shard->respRing.full()) {
+    if (!ring.full()) {
       d_nbResponseEntries++;
     }
-    shard->respRing.push_back({requestor, backend, name, when, dh, usec, size, qtype});
+    ring.push_back({requestor, backend, name, when, dh, usec, size, qtype});
   }
 
   std::atomic<size_t> d_nbQueryEntries;
index 1c03aedff31d775b5533d6db3066c3a7499e7677..ef33e33d983384bb2dae4dfd0d25aa6eb36371bc 100644 (file)
@@ -290,8 +290,8 @@ void DynBlockRulesGroup::processQueryRules(counts_t& counts, const struct timesp
   }
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& c : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for(const auto& c : *rl) {
       if (now < c.when) {
         continue;
       }
@@ -349,8 +349,8 @@ void DynBlockRulesGroup::processResponseRules(counts_t& counts, StatNode& root,
   }
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
-    for(const auto& c : shard->respRing) {
+    auto rl = shard->respRing.lock();
+    for(const auto& c : *rl) {
       if (now < c.when) {
         continue;
       }
index fee1f51dd971729a1b39566ee3c657ed7fd71f4a..b0194bc95d09d864aa07dc0b7805a73d1e17906a 100644 (file)
@@ -40,8 +40,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
-    for (const auto& entry : shard->queryRing) {
+    auto ring = shard->queryRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -57,8 +58,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
-    for (const auto& entry : shard->queryRing) {
+    auto ring = shard->queryRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -77,8 +79,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
-    for (const auto& entry : shard->respRing) {
+    auto ring = shard->respRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -96,8 +99,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
-    for (const auto& entry : shard->respRing) {
+    auto ring = shard->respRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -130,8 +134,8 @@ static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numbe
 
     for (const auto& shard : rings.d_shards) {
       {
-        std::lock_guard<std::mutex> rl(shard->queryLock);
-        for(const auto& c : shard->queryRing) {
+        auto rl = shard->queryRing.lock();
+        for(const auto& c : *rl) {
           numberOfQueries++;
           // BOOST_CHECK* is slow as hell..
           if(c.qtype != qtype) {
@@ -141,8 +145,8 @@ static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numbe
         }
       }
       {
-        std::lock_guard<std::mutex> rl(shard->respLock);
-        for(const auto& c : shard->respRing) {
+        auto rl = shard->respRing.lock();
+        for(const auto& c : *rl) {
           if(c.qtype != qtype) {
             cerr<<"Invalid response QType!"<<endl;
             return;
@@ -233,33 +237,39 @@ BOOST_AUTO_TEST_CASE(test_Rings_Threaded) {
   size_t totalQueries = 0;
   size_t totalResponses = 0;
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_LE(shard->queryRing.size(), entriesPerShard);
-    // verify that the shard is not empty
-    BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
-    // this would be optimal
-    BOOST_WARN_GT(shard->queryRing.size(), entriesPerShard * 0.95);
-    totalQueries += shard->queryRing.size();
-    for (const auto& entry : shard->queryRing) {
-      BOOST_CHECK_EQUAL(entry.name, qname);
-      BOOST_CHECK_EQUAL(entry.qtype, qtype);
-      BOOST_CHECK_EQUAL(entry.size, size);
-      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
-      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+    {
+      auto ring = shard->queryRing.lock();
+      BOOST_CHECK_LE(ring->size(), entriesPerShard);
+      // verify that the shard is not empty
+      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+      // this would be optimal
+      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+      totalQueries += ring->size();
+      for (const auto& entry : *ring) {
+        BOOST_CHECK_EQUAL(entry.name, qname);
+        BOOST_CHECK_EQUAL(entry.qtype, qtype);
+        BOOST_CHECK_EQUAL(entry.size, size);
+        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+      }
     }
-    BOOST_CHECK_LE(shard->respRing.size(), entriesPerShard);
-    // verify that the shard is not empty
-    BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
-    // this would be optimal
-    BOOST_WARN_GT(shard->respRing.size(), entriesPerShard * 0.95);
-    totalResponses += shard->respRing.size();
-    for (const auto& entry : shard->respRing) {
-      BOOST_CHECK_EQUAL(entry.name, qname);
-      BOOST_CHECK_EQUAL(entry.qtype, qtype);
-      BOOST_CHECK_EQUAL(entry.size, size);
-      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
-      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
-      BOOST_CHECK_EQUAL(entry.usec, latency);
-      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    {
+      auto ring = shard->respRing.lock();
+      BOOST_CHECK_LE(ring->size(), entriesPerShard);
+      // verify that the shard is not empty
+      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+      // this would be optimal
+      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+      totalResponses += ring->size();
+      for (const auto& entry : *ring) {
+        BOOST_CHECK_EQUAL(entry.name, qname);
+        BOOST_CHECK_EQUAL(entry.qtype, qtype);
+        BOOST_CHECK_EQUAL(entry.size, size);
+        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+        BOOST_CHECK_EQUAL(entry.usec, latency);
+        BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+      }
     }
   }
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);