]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Make packet cache sharded
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 22 Feb 2023 15:03:41 +0000 (16:03 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Mon, 3 Apr 2023 11:29:49 +0000 (13:29 +0200)
Unit test run fine, but recursor itself has not been adapted yet

pdns/recursordist/recpacketcache.cc
pdns/recursordist/recpacketcache.hh

index 205faf50b7f08d0e312a8769b8c4a2d4f212cfea..054efc9714620d71fcf40bcc439cb5080e84f88a 100644 (file)
 
 unsigned int RecursorPacketCache::s_refresh_ttlperc{0};
 
-int RecursorPacketCache::doWipePacketCache(const DNSName& name, uint16_t qtype, bool subtree)
+uint64_t RecursorPacketCache::doWipePacketCache(const DNSName& name, uint16_t qtype, bool subtree)
 {
-  int count = 0;
-  auto& idx = d_packetCache.get<NameTag>();
-  for (auto iter = idx.lower_bound(name); iter != idx.end();) {
-    if (subtree) {
-      if (!iter->d_name.isPartOf(name)) { // this is case insensitive
-        break;
+  uint64_t count = 0;
+  for (auto& map : d_maps) {
+    auto shard = map.lock();
+    auto& idx = shard->d_map.get<NameTag>();
+    for (auto iter = idx.lower_bound(name); iter != idx.end();) {
+      if (subtree) {
+        if (!iter->d_name.isPartOf(name)) { // this is case insensitive
+          break;
+        }
       }
-    }
-    else {
-      if (iter->d_name != name) {
-        break;
+      else {
+        if (iter->d_name != name) {
+          break;
+        }
+      }
+      if (qtype == 0xffff || iter->d_type == qtype) {
+        iter = idx.erase(iter);
+        map.d_entriesCount--;
+        count++;
+      }
+      else {
+        ++iter;
       }
-    }
-
-    if (qtype == 0xffff || iter->d_type == qtype) {
-      iter = idx.erase(iter);
-      count++;
-    }
-    else {
-      ++iter;
     }
   }
   return count;
@@ -50,7 +53,7 @@ bool RecursorPacketCache::qrMatch(const packetCache_t::index<HashTag>::type::ite
   return queryMatches(iter->d_query, queryPacket, qname, optionsToSkip);
 }
 
-bool RecursorPacketCache::checkResponseMatches(std::pair<packetCache_t::index<HashTag>::type::iterator, packetCache_t::index<HashTag>::type::iterator> range, const std::string& queryPacket, const DNSName& qname, uint16_t qtype, uint16_t qclass, time_t now, std::string* responsePacket, uint32_t* age, vState* valState, OptPBData* pbdata)
+bool RecursorPacketCache::checkResponseMatches(packetCache_t& shard, std::pair<packetCache_t::index<HashTag>::type::iterator, packetCache_t::index<HashTag>::type::iterator> range, const std::string& queryPacket, const DNSName& qname, uint16_t qtype, uint16_t qclass, time_t now, std::string* responsePacket, uint32_t* age, vState* valState, OptPBData* pbdata)
 {
   for (auto iter = range.first; iter != range.second; ++iter) {
     // the possibility is VERY real that we get hits that are not right - birthday paradox
@@ -80,7 +83,7 @@ bool RecursorPacketCache::checkResponseMatches(std::pair<packetCache_t::index<Ha
       }
 
       d_hits++;
-      moveCacheItemToBack<SequencedTag>(d_packetCache, iter);
+      moveCacheItemToBack<SequencedTag>(shard, iter);
 
       if (pbdata != nullptr) {
         if (iter->d_pbdata) {
@@ -108,7 +111,8 @@ bool RecursorPacketCache::getResponsePacket(unsigned int tag, const std::string&
                                             std::string* responsePacket, uint32_t* age, vState* valState, uint32_t* qhash, OptPBData* pbdata, bool tcp)
 {
   *qhash = canHashPacket(queryPacket, s_skipOptions);
-  const auto& idx = d_packetCache.get<HashTag>();
+  auto shard = getMap(tag, *qhash, tcp).lock();
+  const auto& idx = shard->d_map.get<HashTag>();
   auto range = idx.equal_range(std::tie(tag, *qhash, tcp));
 
   if (range.first == range.second) {
@@ -116,14 +120,15 @@ bool RecursorPacketCache::getResponsePacket(unsigned int tag, const std::string&
     return false;
   }
 
-  return checkResponseMatches(range, queryPacket, qname, qtype, qclass, now, responsePacket, age, valState, pbdata);
+  return checkResponseMatches(shard->d_map, range, queryPacket, qname, qtype, qclass, now, responsePacket, age, valState, pbdata);
 }
 
 bool RecursorPacketCache::getResponsePacket(unsigned int tag, const std::string& queryPacket, DNSName& qname, uint16_t* qtype, uint16_t* qclass, time_t now,
                                             std::string* responsePacket, uint32_t* age, vState* valState, uint32_t* qhash, OptPBData* pbdata, bool tcp)
 {
   *qhash = canHashPacket(queryPacket, s_skipOptions);
-  const auto& idx = d_packetCache.get<HashTag>();
+  auto shard = getMap(tag, *qhash, tcp).lock();
+  const auto& idx = shard->d_map.get<HashTag>();
   auto range = idx.equal_range(std::tie(tag, *qhash, tcp));
 
   if (range.first == range.second) {
@@ -133,12 +138,14 @@ bool RecursorPacketCache::getResponsePacket(unsigned int tag, const std::string&
 
   qname = DNSName(queryPacket.c_str(), static_cast<int>(queryPacket.length()), sizeof(dnsheader), false, qtype, qclass);
 
-  return checkResponseMatches(range, queryPacket, qname, *qtype, *qclass, now, responsePacket, age, valState, pbdata);
+  return checkResponseMatches(shard->d_map, range, queryPacket, qname, *qtype, *qclass, now, responsePacket, age, valState, pbdata);
 }
 
 void RecursorPacketCache::insertResponsePacket(unsigned int tag, uint32_t qhash, std::string&& query, const DNSName& qname, uint16_t qtype, uint16_t qclass, std::string&& responsePacket, time_t now, uint32_t ttl, const vState& valState, OptPBData&& pbdata, bool tcp)
 {
-  auto& idx = d_packetCache.get<HashTag>();
+  auto& map = getMap(tag, qhash, tcp);
+  auto shard = map.lock();
+  auto& idx = shard->d_map.get<HashTag>();
   auto range = idx.equal_range(std::tie(tag, qhash, tcp));
   auto iter = range.first;
 
@@ -147,7 +154,7 @@ void RecursorPacketCache::insertResponsePacket(unsigned int tag, uint32_t qhash,
       continue;
     }
 
-    moveCacheItemToBack<SequencedTag>(d_packetCache, iter);
+    moveCacheItemToBack<SequencedTag>(shard->d_map, iter);
     iter->d_packet = std::move(responsePacket);
     iter->d_query = std::move(query);
     iter->d_ttd = now + ttl;
@@ -166,26 +173,33 @@ void RecursorPacketCache::insertResponsePacket(unsigned int tag, uint32_t qhash,
     entry.d_pbdata = std::move(*pbdata);
   }
 
-  d_packetCache.insert(entry);
+  shard->d_map.insert(entry);
+  map.d_entriesCount++;
 
-  if (d_packetCache.size() > d_maxSize) {
-    auto& seq_idx = d_packetCache.get<SequencedTag>();
+  if (shard->d_map.size() > d_maxSize / d_maps.size()) {
+    auto& seq_idx = shard->d_map.get<SequencedTag>();
     seq_idx.erase(seq_idx.begin());
+    map.d_entriesCount--;
   }
+  assert(map.d_entriesCount == shard->d_map.size()); // XXX
 }
 
-uint64_t RecursorPacketCache::bytes() const
+uint64_t RecursorPacketCache::bytes()
 {
   uint64_t sum = 0;
-  for (const auto& entry : d_packetCache) {
-    sum += sizeof(entry) + entry.d_packet.length() + 4;
+  for (auto& shard : d_maps) {
+    auto lock = shard.lock();
+    for (const auto& entry : lock->d_map) {
+      sum += sizeof(entry) + entry.d_packet.length() + 4;
+    }
   }
   return sum;
 }
 
 void RecursorPacketCache::doPruneTo(size_t maxSize)
 {
-  pruneCollection<SequencedTag>(d_packetCache, maxSize);
+  size_t cacheSize = size();
+  pruneMutexCollectionsVector<SequencedTag>(*this, d_maps, maxSize, cacheSize);
 }
 
 uint64_t RecursorPacketCache::doDump(int file)
@@ -200,20 +214,31 @@ uint64_t RecursorPacketCache::doDump(int file)
     return 0;
   }
 
-  fprintf(filePtr.get(), "; main packet cache dump from thread follows\n;\n");
-
-  const auto& sidx = d_packetCache.get<SequencedTag>();
   uint64_t count = 0;
   time_t now = time(nullptr);
 
+  size_t shardNum = 0;
+  size_t min = std::numeric_limits<size_t>::max();
+  size_t max = 0;
+
+  for (auto& shard : d_maps) {
+    auto lock = shard.lock();
+    const auto& sidx = lock->d_map.get<SequencedTag>();
+    const auto shardSize = lock->d_map.size();
+    fprintf(filePtr.get(), "; packetcache shard %zu; size %zu\n", shardNum, shardSize);
+    min = std::min(min, shardSize);
+    max = std::max(max, shardSize);
+    shardNum++;
   for (const auto& entry : sidx) {
-    count++;
-    try {
-      fprintf(filePtr.get(), "%s %" PRId64 " %s  ; tag %d %s\n", entry.d_name.toString().c_str(), static_cast<int64_t>(entry.d_ttd - now), DNSRecordContent::NumberToType(entry.d_type).c_str(), entry.d_tag, entry.d_tcp ? "tcp" : "udp");
-    }
-    catch (...) {
-      fprintf(filePtr.get(), "; error printing '%s'\n", entry.d_name.empty() ? "EMPTY" : entry.d_name.toString().c_str());
+      count++;
+      try {
+        fprintf(filePtr.get(), "%s %" PRId64 " %s  ; tag %d %s\n", entry.d_name.toString().c_str(), static_cast<int64_t>(entry.d_ttd - now), DNSRecordContent::NumberToType(entry.d_type).c_str(), entry.d_tag, entry.d_tcp ? "tcp" : "udp");
+      }
+      catch (...) {
+        fprintf(filePtr.get(), "; error printing '%s'\n", entry.d_name.empty() ? "EMPTY" : entry.d_name.toString().c_str());
+      }
     }
   }
+  fprintf(filePtr.get(), "; packetcache size: %" PRIu64 "/%zu shards: %zu min/max shard size: %zu/%zu\n", size(), d_maxSize, d_maps.size(), min, max);
   return count;
 }
index b0a344db9833780783e3f91147dd8b2b8275b79d..72f58d526ac62660d105bf28297fed14ddbf9ba8 100644 (file)
@@ -34,6 +34,8 @@
 
 #include "packetcache.hh"
 #include "validate.hh"
+#include "lock.hh"
+#include "stat_t.hh"
 
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 
 using namespace ::boost::multi_index;
 
-//! Stores whole packets, ready for lobbing back at the client. Not threadsafe.
-/* Note: we store answers as value AND KEY, and with careful work, we make sure that
-   you can use a query as a key too. But query and answer must compare as identical!
-
-   This precludes doing anything smart with EDNS directly from the packet */
 class RecursorPacketCache : public PacketCache
 {
 public:
@@ -59,9 +56,13 @@ public:
   };
   using OptPBData = boost::optional<PBData>;
 
-  RecursorPacketCache(size_t maxsize) :
+  RecursorPacketCache(size_t maxsize, size_t shards = 1024) :
+    d_maps(shards),
     d_maxSize(maxsize)
   {
+    if (d_maxSize / d_maps.size() == 0) {
+      d_maxSize = d_maps.size();
+    }
   }
 
   bool getResponsePacket(unsigned int tag, const std::string& queryPacket, time_t now,
@@ -87,7 +88,7 @@ public:
   void insertResponsePacket(unsigned int tag, uint32_t qhash, std::string&& query, const DNSName& qname, uint16_t qtype, uint16_t qclass, std::string&& responsePacket, time_t now, uint32_t ttl, const vState& valState, OptPBData&& pbdata, bool tcp);
   void doPruneTo(size_t maxSize = 250000);
   uint64_t doDump(int file);
-  int doWipePacketCache(const DNSName& name, uint16_t qtype = 0xffff, bool subtree = false);
+  uint64_t doWipePacketCache(const DNSName& name, uint16_t qtype = 0xffff, bool subtree = false);
 
   void setMaxSize(size_t size)
   {
@@ -96,9 +97,13 @@ public:
 
   [[nodiscard]] uint64_t size() const
   {
-    return d_packetCache.size();
+    uint64_t count = 0;
+    for (const auto& map : d_maps) {
+      count += map.d_entriesCount;
+    }
+    return count;
   }
-  [[nodiscard]] uint64_t bytes() const;
+  [[nodiscard]] uint64_t bytes();
 
   uint64_t d_hits{0};
   uint64_t d_misses{0};
@@ -156,9 +161,61 @@ private:
                                                          sequenced<tag<SequencedTag>>,
                                                          ordered_non_unique<tag<NameTag>, member<Entry, DNSName, &Entry::d_name>, CanonDNSNameCompare>>>;
 
-  packetCache_t d_packetCache;
+  struct MapCombo
+  {
+    MapCombo() {}
+    MapCombo(const MapCombo&) = delete;
+    MapCombo& operator=(const MapCombo&) = delete;
+    struct LockedContent
+    {
+      packetCache_t d_map;
+      uint64_t d_contended_count{0};
+      uint64_t d_acquired_count{0};
+      void invalidate() {}
+    };
+    pdns::stat_t d_entriesCount{0};
+
+    LockGuardedTryHolder<MapCombo::LockedContent> lock()
+    {
+      auto locked = d_content.try_lock();
+      if (!locked.owns_lock()) {
+        locked.lock();
+        ++locked->d_contended_count;
+      }
+      ++locked->d_acquired_count;
+      return locked;
+    }
+
+  private:
+    LockGuarded<LockedContent> d_content;
+  };
+
+  vector<MapCombo> d_maps;
+
+  size_t combine(unsigned int tag, uint32_t hash, bool tcp) const
+  {
+    size_t ret = 0;
+    boost::hash_combine(ret, tag);
+    boost::hash_combine(ret, hash);
+    boost::hash_combine(ret, tcp);
+    return ret;
+  }
+
+  MapCombo& getMap(unsigned int tag, uint32_t hash, bool tcp)
+  {
+    return d_maps.at(combine(tag, hash, tcp) % d_maps.size());
+  }
+  // const MapCombo& getMap(unsigned int tag, uint32_t hash, bool tcp) const
+  // {
+  //   return d_maps.at(combine(hash, hash, tcp) % d_maps.size());
+  // }
+
   size_t d_maxSize;
 
   static bool qrMatch(const packetCache_t::index<HashTag>::type::iterator& iter, const std::string& queryPacket, const DNSName& qname, uint16_t qtype, uint16_t qclass);
-  bool checkResponseMatches(std::pair<packetCache_t::index<HashTag>::type::iterator, packetCache_t::index<HashTag>::type::iterator> range, const std::string& queryPacket, const DNSName& qname, uint16_t qtype, uint16_t qclass, time_t now, std::string* responsePacket, uint32_t* age, vState* valState, OptPBData* pbdata);
+  bool checkResponseMatches(packetCache_t& shard, std::pair<packetCache_t::index<HashTag>::type::iterator, packetCache_t::index<HashTag>::type::iterator> range, const std::string& queryPacket, const DNSName& qname, uint16_t qtype, uint16_t qclass, time_t now, std::string* responsePacket, uint32_t* age, vState* valState, OptPBData* pbdata);
+public:
+  void preRemoval(MapCombo::LockedContent& map, const Entry& entry)
+  {
+  }
 };