]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Shard MaxQPSIPRule() for better scalability
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 10 Feb 2023 16:06:06 +0000 (17:06 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 10 Feb 2023 16:21:30 +0000 (17:21 +0100)
pdns/dnsdist-console.cc
pdns/dnsdist-lua-rules.cc
pdns/dnsdistdist/dnsdist-rules.hh
pdns/dnsdistdist/docs/rules-actions.rst
pdns/dnsdistdist/test-dnsdistrules_cc.cc

index 1c132733331ca49040c489adf2b8837a810af1b0..fd71c052b56c8315c4ab41a62d40eae4a880b21a 100644 (file)
@@ -583,7 +583,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
 #endif /* HAVE_IPCIPHER */
   { "makeKey", true, "", "generate a new server access key, emit configuration line ready for pasting" },
   { "makeRule", true, "rule", "Make a NetmaskGroupRule() or a SuffixMatchNodeRule(), depending on how it is called" }  ,
-  { "MaxQPSIPRule", true, "qps, [v4Mask=32 [, v6Mask=64 [, burst=qps [, expiration=300 [, cleanupDelay=60]]]]]", "matches traffic exceeding the qps limit per subnet" },
+  { "MaxQPSIPRule", true, "qps, [v4Mask=32 [, v6Mask=64 [, burst=qps [, expiration=300 [, cleanupDelay=60 [, scanFraction=10 [, shards=10]]]]]]]", "matches traffic exceeding the qps limit per subnet" },
   { "MaxQPSRule", true, "qps", "matches traffic **not** exceeding this qps limit" },
   { "mvCacheHitResponseRule", true, "from, to", "move cache hit response rule 'from' to a position where it is in front of 'to'. 'to' can be one larger than the largest rule" },
   { "mvCacheHitResponseRuleToTop", true, "", "move the last cache hit response rule to the first position" },
index 67740f9e030577fa9c549d3af121d9a3b3a4d4a3..33c667676ebc69ae8ca99a506ce477c4ab84b880 100644 (file)
@@ -386,8 +386,8 @@ void setupLuaRules(LuaContext& luaCtx)
     return rulesToString(getTopRules(*rules, top.get_value_or(10)), vars);
   });
 
-  luaCtx.writeFunction("MaxQPSIPRule", [](unsigned int qps, boost::optional<unsigned int> ipv4trunc, boost::optional<unsigned int> ipv6trunc, boost::optional<unsigned int> burst, boost::optional<unsigned int> expiration, boost::optional<unsigned int> cleanupDelay, boost::optional<unsigned int> scanFraction) {
-      return std::shared_ptr<DNSRule>(new MaxQPSIPRule(qps, burst.get_value_or(qps), ipv4trunc.get_value_or(32), ipv6trunc.get_value_or(64), expiration.get_value_or(300), cleanupDelay.get_value_or(60), scanFraction.get_value_or(10)));
+  luaCtx.writeFunction("MaxQPSIPRule", [](unsigned int qps, boost::optional<unsigned int> ipv4trunc, boost::optional<unsigned int> ipv6trunc, boost::optional<unsigned int> burst, boost::optional<unsigned int> expiration, boost::optional<unsigned int> cleanupDelay, boost::optional<unsigned int> scanFraction, boost::optional<unsigned int> shards) {
+      return std::shared_ptr<DNSRule>(new MaxQPSIPRule(qps, burst.get_value_or(qps), ipv4trunc.get_value_or(32), ipv6trunc.get_value_or(64), expiration.get_value_or(300), cleanupDelay.get_value_or(60), scanFraction.get_value_or(10), shards.get_value_or(10)));
     });
 
   luaCtx.writeFunction("MaxQPSRule", [](unsigned int qps, boost::optional<unsigned int> burst) {
index b89d9df3ca072fcf14ce88ca5e802bfbe68bc22f..8684d7ffd3a3e8bba98c740024808ba1eeabef49 100644 (file)
@@ -37,8 +37,8 @@
 class MaxQPSIPRule : public DNSRule
 {
 public:
-  MaxQPSIPRule(unsigned int qps, unsigned int burst, unsigned int ipv4trunc=32, unsigned int ipv6trunc=64, unsigned int expiration=300, unsigned int cleanupDelay=60, unsigned int scanFraction=10):
-    d_qps(qps), d_burst(burst), d_ipv4trunc(ipv4trunc), d_ipv6trunc(ipv6trunc), d_cleanupDelay(cleanupDelay), d_expiration(expiration), d_scanFraction(scanFraction)
+  MaxQPSIPRule(unsigned int qps, unsigned int burst, unsigned int ipv4trunc=32, unsigned int ipv6trunc=64, unsigned int expiration=300, unsigned int cleanupDelay=60, unsigned int scanFraction=10, size_t shardsCount=10):
+    d_shards(shardsCount), d_qps(qps), d_burst(burst), d_ipv4trunc(ipv4trunc), d_ipv6trunc(ipv6trunc), d_cleanupDelay(cleanupDelay), d_expiration(expiration), d_scanFraction(scanFraction)
   {
     d_cleaningUp.clear();
     gettime(&d_lastCleanup, true);
@@ -46,32 +46,40 @@ public:
 
   void clear()
   {
-    d_limits.lock()->clear();
+    for (auto& shard : d_shards) {
+      shard.lock()->clear();
+    }
   }
 
   size_t cleanup(const struct timespec& cutOff, size_t* scannedCount=nullptr) const
   {
-    auto limits = d_limits.lock();
-    size_t toLook = limits->size() / d_scanFraction + 1;
-    size_t lookedAt = 0;
-
     size_t removed = 0;
-    auto& sequence = limits->get<SequencedTag>();
-    for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
-      if (entry->d_limiter.seenSince(cutOff)) {
-        /* entries are ordered from least recently seen to more recently
-           seen, as soon as we see one that has not expired yet, we are
-           done */
-        lookedAt++;
-        break;
-      }
-
-      entry = sequence.erase(entry);
-      removed++;
+    if (scannedCount != nullptr) {
+      *scannedCount = 0;
     }
 
-    if (scannedCount != nullptr) {
-      *scannedCount = lookedAt;
+    for (auto& shard : d_shards) {
+      auto limits = shard.lock();
+      const size_t toLook = std::round((1.0 * limits->size()) / d_scanFraction)+ 1;
+      size_t lookedAt = 0;
+
+      auto& sequence = limits->get<SequencedTag>();
+      for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
+        if (entry->d_limiter.seenSince(cutOff)) {
+          /* entries are ordered from least recently seen to more recently
+             seen, as soon as we see one that has not expired yet, we are
+             done */
+          lookedAt++;
+          break;
+        }
+
+        entry = sequence.erase(entry);
+        removed++;
+      }
+
+      if (scannedCount != nullptr) {
+        *scannedCount += lookedAt;
+      }
     }
 
     return removed;
@@ -112,8 +120,10 @@ public:
     ComboAddress zeroport(dq->ids.origRemote);
     zeroport.sin4.sin_port=0;
     zeroport.truncate(zeroport.sin4.sin_family == AF_INET ? d_ipv4trunc : d_ipv6trunc);
+    auto hash = ComboAddress::addressOnlyHash()(zeroport);
+    auto& shard = d_shards[hash % d_shards.size()];
     {
-      auto limits = d_limits.lock();
+      auto limits = shard.lock();
       auto iter = limits->find(zeroport);
       if (iter == limits->end()) {
         Entry e(zeroport, QPSLimiter(d_qps, d_burst));
@@ -132,7 +142,16 @@ public:
 
   size_t getEntriesCount() const
   {
-    return d_limits.lock()->size();
+    size_t count = 0;
+    for (auto& shard : d_shards) {
+      count += shard.lock()->size();
+    }
+    return count;
+  }
+
+  size_t getNumberOfShards() const
+  {
+    return d_shards.size();
   }
 
 private:
@@ -155,10 +174,10 @@ private:
       >
   > qpsContainer_t;
 
-  mutable LockGuarded<qpsContainer_t> d_limits;
+  mutable std::vector<LockGuarded<qpsContainer_t>> d_shards;
   mutable struct timespec d_lastCleanup;
-  unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
-  unsigned int d_scanFraction{10};
+  const unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
+  const unsigned int d_scanFraction{10};
   mutable std::atomic_flag d_cleaningUp;
 };
 
index 38bf7b564bf24e1171cfb1f221413753c91b1cbd..9f461cd6ded1ea13a3f2ed6b2a4a2f77b80d946a 100644 (file)
@@ -607,7 +607,10 @@ These ``DNSRule``\ s be one of the following items:
 
   :param string function: the name of a Lua function
 
-.. function:: MaxQPSIPRule(qps[, v4Mask[, v6Mask[, burst[, expiration[, cleanupDelay[, scanFraction]]]]]])
+.. function:: MaxQPSIPRule(qps[, v4Mask[, v6Mask[, burst[, expiration[, cleanupDelay[, scanFraction [, shards]]]]]]])
+
+  .. versionchanged:: 1.8.0
+    ``shards`` parameter added
 
   Matches traffic for a subnet specified by ``v4Mask`` or ``v6Mask`` exceeding ``qps`` queries per second up to ``burst`` allowed.
   This rule keeps track of QPS by netmask or source IP. This state is cleaned up regularly if  ``cleanupDelay`` is greater than zero,
@@ -620,6 +623,7 @@ These ``DNSRule``\ s be one of the following items:
   :param int expiration: How long to keep netmask or IP addresses after they have last been seen, in seconds. Default is 300
   :param int cleanupDelay: The number of seconds between two cleanups. Default is 60
   :param int scanFraction: The maximum fraction of the store to scan for expired entries, for example 5 would scan at most 20% of it. Default is 10 so 10%
+  :param int shards: How many shards to use, to decrease lock contention between threads. Default is 10 and is a safe default unless a very high number of threads are used to process incoming queries
 
 .. function:: MaxQPSRule(qps)
 
index b8adc5916d16fd3b61a52fd25af9a5a139cdc01a..b636ea922fb381fdd3ddde98599846dea4de2194 100644 (file)
@@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(test_MaxQPSIPRule) {
   auto removed = rule.cleanup(notExpiredTime, &scanned);
   BOOST_CHECK_EQUAL(removed, 0U);
   /* the first entry should still have been valid, we should not have scanned more */
-  BOOST_CHECK_EQUAL(scanned, 1U);
+  BOOST_CHECK_EQUAL(scanned, rule.getNumberOfShards());
   BOOST_CHECK_EQUAL(rule.getEntriesCount(), total);
 
   /* make sure all entries are _not_ valid anymore */
@@ -114,7 +114,7 @@ BOOST_AUTO_TEST_CASE(test_MaxQPSIPRule) {
   expiredTime.tv_sec += 1;
 
   removed = rule.cleanup(expiredTime, &scanned);
-  BOOST_CHECK_EQUAL(removed, (total / scanFraction) + 1);
+  BOOST_CHECK_EQUAL(removed, (total / scanFraction) + 1 + rule.getNumberOfShards());
   /* we should not have scanned more than scanFraction */
   BOOST_CHECK_EQUAL(scanned, removed);
   BOOST_CHECK_EQUAL(rule.getEntriesCount(), total - removed);