]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Basic functions to dump and undump record cache content
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 11 Oct 2024 09:42:44 +0000 (11:42 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Thu, 24 Oct 2024 09:23:06 +0000 (11:23 +0200)
pdns/recursordist/lua-recursor4.cc
pdns/recursordist/rec-main.cc
pdns/recursordist/recursor_cache.cc
pdns/recursordist/recursor_cache.hh

index e1b9773a4d24ffc8688cd8652bff0a8eb1e986f2..92f8197ab2b2b12e7dd5370a40ab1e79e962f808 100644 (file)
@@ -493,6 +493,17 @@ void RecursorLua4::postPrepareContext() // NOLINT(readability-function-cognitive
       (*event.discardedPolicies)[policy] = true;
     }
   });
+
+  d_lw->writeFunction("getRecordCacheRecords", [](size_t howmany) {
+    std::string ret;
+    g_recCache->getRecords(howmany, ret);
+    return ret;
+  });
+
+  d_lw->writeFunction("putIntoRecordCache", [](const string& data) {
+    g_recCache->putRecords(data);
+  });
+
   if (!d_include_path.empty()) {
     includePath(d_include_path);
   }
index 8175042ce674ed0d529d41d8fff4fa994c899fc8..9854cfda245b80261639c28de7dcee19da9ac768 100644 (file)
@@ -807,6 +807,16 @@ static void checkSocketDir(Logr::log_t log)
   _exit(1);
 }
 
+static void setupNatsThread(Logr::log_t log)
+{
+  log->info(Logr::Info, "Starting nats thread");
+  std::thread thread([tid = std::this_thread::get_id()]() {
+    auto lua = std::make_shared<RecursorLua4>();
+    lua->loadFile("tmp/natsimpl.lua");
+  });
+  thread.detach();
+}
+
 #ifdef NOD_ENABLED
 static void setupNODThread(Logr::log_t log)
 {
@@ -2376,6 +2386,8 @@ static int serviceMain(Logr::log_t log)
   setupNODThread(log);
 #endif /* NOD_ENABLED */
 
+  setupNatsThread(log);
+  
   return RecThreadInfo::runThreads(log);
 }
 
index 28f8e2261e0ff8942994a410d03cb3a7ac876b5d..55fd685c6f4aacb62b2a854e99ba264148296d42 100644 (file)
@@ -13,6 +13,9 @@
 #include "namespaces.hh"
 #include "cachecleaner.hh"
 #include "rec-taskqueue.hh"
+#include "protozero.hh"
+#include <protozero/pbf_builder.hpp>
+#include <protozero/pbf_message.hpp>
 
 /*
  * SERVE-STALE: the general approach
@@ -546,6 +549,27 @@ bool MemRecursorCache::CacheEntry::shouldReplace(time_t now, bool auth, vState s
   return true;
 }
 
+void MemRecursorCache::replace(CacheEntry&& entry)
+{
+  if (!entry.d_netmask.empty()) {
+    // We don't handle that yet
+    return;
+  }
+  auto& shard = getMap(entry.d_qname);
+  auto lockedShard = shard.lock();
+
+  lockedShard->d_cachecachevalid = false;
+
+  auto key = std::tuple(entry.d_qname, entry.d_qtype.getCode(), boost::none, Netmask());
+  cache_t::iterator stored = lockedShard->d_map.find(key);
+  if (stored == lockedShard->d_map.end()) {
+    entry.d_submitted = false;
+    entry.d_servedStale = 0;
+    lockedShard->d_map.insert(entry);
+    shard.incEntriesCount();
+  }
+}
+
 void MemRecursorCache::replace(time_t now, const DNSName& qname, const QType qtype, const vector<DNSRecord>& content, const vector<shared_ptr<const RRSIGRecordContent>>& signatures, const std::vector<std::shared_ptr<DNSRecord>>& authorityRecs, bool auth, const DNSName& authZone, boost::optional<Netmask> ednsmask, const OptTag& routingTag, vState state, boost::optional<ComboAddress> from, bool refresh, time_t ttl_time)
 {
   auto& shard = getMap(qname);
@@ -856,6 +880,277 @@ void MemRecursorCache::doPrune(time_t now, size_t keep)
   pruneMutexCollectionsVector<SequencedTag>(now, d_maps, keep, cacheSize);
 }
 
+  
+enum class PBCacheEntry : protozero::pbf_tag_type
+{
+  repeated_message_record = 1,
+  repeated_message_sig = 2,
+  repeated_message_authRecord = 3,
+  required_bytes_name = 4,
+  required_bytes_authZone = 5,
+  required_bytes_from = 6,
+  optional_bytes_netmask = 7,
+  optional_bytes_rtag = 8,
+  required_uint32_state = 9,
+  required_int64_ttd = 10,
+  required_uint32_orig_ttl = 11,
+  required_uint32_servedStale = 12,
+  required_uint32_qtype = 13,
+  required_bool_auth = 14,
+  required_bool_submitted = 15,
+  required_bool_tooBig = 16,
+};
+
+enum class PBRecord : protozero::pbf_tag_type
+{
+  required_uint32_type = 1,
+  repeated_bytes_content = 2,
+};
+
+enum class PBAuthRecord : protozero::pbf_tag_type
+{
+  required_bytes_name = 1,
+  required_bytes_rdata = 2,
+  required_uint32_type = 3,
+  required_uint32_class = 4,
+  required_uint32_ttl = 5,
+  required_uint32_place = 6,
+  required_uint32_clen = 7,
+};
+
+void MemRecursorCache::getRecords(size_t howmany, std::string& ret)
+{
+  protozero::pbf_writer full(ret);
+  const auto perShard = howmany / d_maps.size();
+  size_t count = 0;
+  for (auto& shard : d_maps) {
+    auto lockedShard = shard.lock();
+    const auto& sidx = lockedShard->d_map.get<SequencedTag>();
+    size_t thisShardCount = 0;
+    string buf;
+    for (const auto& recordSet : sidx) {
+      protozero::pbf_builder<PBCacheEntry> message(full, 1);
+      //cerr << "S " << count << ' ' << recordSet.d_qname << ' ' << QType(recordSet.d_qtype) << ' ' << recordSet.d_auth << ' ' << recordSet.d_records.size() << endl;
+      message.add_bytes(PBCacheEntry::required_bytes_name, recordSet.d_qname.toString());
+      message.add_uint32(PBCacheEntry::required_uint32_qtype, recordSet.d_qtype);
+      for (const auto& record : recordSet.d_records) {
+        protozero::pbf_builder<PBRecord> recordMesg(message, PBCacheEntry::repeated_message_record);
+        recordMesg.add_uint32(PBRecord::required_uint32_type, record->getType());
+        recordMesg.add_bytes(PBRecord::repeated_bytes_content, record->serialize(recordSet.d_qname, true));
+      }
+      for (const auto& record : recordSet.d_signatures) {
+        protozero::pbf_builder<PBRecord> recordMesg(message, PBCacheEntry::repeated_message_sig);
+        recordMesg.add_uint32(PBRecord::required_uint32_type, record->getType());
+        recordMesg.add_bytes(PBRecord::repeated_bytes_content, record->serialize(recordSet.d_qname, true));
+      }
+      for (const auto& authRec : recordSet.d_authorityRecs) {
+        protozero::pbf_builder<PBAuthRecord> auth(message, PBCacheEntry::repeated_message_authRecord);
+        auth.add_bytes(PBAuthRecord::required_bytes_name, authRec->d_name.toString());
+        auth.add_bytes(PBAuthRecord::required_bytes_rdata, authRec->getContent()->serialize(authRec->d_name, true));
+        auth.add_uint32(PBAuthRecord::required_uint32_type, authRec->d_type);
+        auth.add_uint32(PBAuthRecord::required_uint32_class, authRec->d_class);
+        auth.add_uint32(PBAuthRecord::required_uint32_ttl, authRec->d_ttl);
+        auth.add_uint32(PBAuthRecord::required_uint32_place, authRec->d_place);
+        auth.add_uint32(PBAuthRecord::required_uint32_clen, authRec->d_clen);
+      }
+      message.add_bytes(PBCacheEntry::required_bytes_authZone, recordSet.d_authZone.toString());
+      message.add_bytes(PBCacheEntry::required_bytes_from, recordSet.d_from.toString());
+      if (!recordSet.d_netmask.empty()) {
+        message.add_bytes(PBCacheEntry::optional_bytes_netmask, recordSet.d_netmask.toString());
+      }
+      if (recordSet.d_rtag) {
+        message.add_bytes(PBCacheEntry::optional_bytes_rtag, *recordSet.d_rtag);
+      }
+      message.add_uint32(PBCacheEntry::required_uint32_state, static_cast<uint32_t>(recordSet.d_state));
+      message.add_int64(PBCacheEntry::required_int64_ttd, recordSet.d_ttd);
+      message.add_uint32(PBCacheEntry::required_uint32_orig_ttl, recordSet.d_orig_ttl);
+      message.add_uint32(PBCacheEntry::required_uint32_servedStale, recordSet.d_servedStale);
+      message.add_bool(PBCacheEntry::required_bool_auth, recordSet.d_auth);
+      message.add_bool(PBCacheEntry::required_bool_submitted, recordSet.d_submitted);
+      message.add_bool(PBCacheEntry::required_bool_tooBig, recordSet.d_tooBig);
+      ++thisShardCount;
+      ++count;
+      if (thisShardCount > perShard) {
+        break;
+      }
+      if (count > howmany) {
+        break;
+      }
+    }
+    if (count > howmany) {
+      break;
+    }
+  }
+  cerr << "Constructed a string of length " << ret.size() << " for " << count << " record sets (requested " << howmany << ')' << endl;
+}
+
+static void PBReadRecord(protozero::pbf_message<PBCacheEntry>& message, const DNSName& name, vector<std::shared_ptr<const DNSRecordContent>>& records)
+{
+  protozero::pbf_message<PBRecord> recordMesg = message.get_message();
+  shared_ptr<DNSRecordContent> ptr{nullptr};
+  unsigned int type = -1;
+  while (recordMesg.next()) {
+    switch (recordMesg.tag()) {
+    case PBRecord::required_uint32_type:
+      type = recordMesg.get_uint64();
+      break;
+    case PBRecord::repeated_bytes_content:
+      if (type == -1U) {
+      }
+      else {
+        ptr = DNSRecordContent::deserialize(name, type, recordMesg.get_bytes());
+      }
+      break;
+    }
+  }
+  if (ptr == nullptr) {
+    cerr << "NULL PTR!" << endl;
+  }
+  else {
+    records.emplace_back(ptr);
+  }
+}
+
+static void PBReadRecord(protozero::pbf_message<PBCacheEntry>& message, const DNSName& name, vector<std::shared_ptr<const RRSIGRecordContent>>& records)
+{
+  protozero::pbf_message<PBRecord> recordMesg = message.get_message();
+  shared_ptr<const DNSRecordContent> ptr{nullptr};
+  unsigned int type = -1;
+  while (recordMesg.next()) {
+    switch (recordMesg.tag()) {
+    case PBRecord::required_uint32_type:
+      type = recordMesg.get_uint64();
+      break;
+    case PBRecord::repeated_bytes_content:
+      if (type == -1U) {
+        cerr << "Type not seen before content" << endl;
+      } else {
+        ptr = DNSRecordContent::deserialize(name, type, recordMesg.get_bytes());
+      }
+      break;
+    }
+  }
+  if (ptr == nullptr) {
+    cerr << "NULL PTR!" << endl;
+  }
+  else {
+    records.emplace_back(std::dynamic_pointer_cast<const RRSIGRecordContent>(ptr));
+  }
+}
+
+void MemRecursorCache::putRecords(const std::string& pbuf)
+{
+  cerr << "Received a PB string of length " << pbuf.size() << endl;
+  protozero::pbf_reader full(pbuf);
+  size_t count = 0;
+  try {
+    while (full.next(1)) {
+      protozero::pbf_message<PBCacheEntry> message = full.get_message();
+      CacheEntry cacheEntry{{g_rootdnsname, QType::A, boost::none, Netmask()}, false};
+      while (message.next()) {
+        switch (message.tag()) {
+        case PBCacheEntry::repeated_message_record:
+          PBReadRecord(message, cacheEntry.d_qname, cacheEntry.d_records);
+          break;
+        case PBCacheEntry::repeated_message_sig:
+          PBReadRecord(message, cacheEntry.d_qname, cacheEntry.d_signatures);
+          break;
+        case PBCacheEntry::repeated_message_authRecord: {
+          protozero::pbf_message<PBAuthRecord> auth = message.get_message();
+          DNSRecord authRecord;
+          while (auth.next()) {
+            switch (auth.tag()) {
+            case PBAuthRecord::required_bytes_name:
+              authRecord.d_name = DNSName(auth.get_bytes());
+              break;
+            case PBAuthRecord::required_bytes_rdata: {
+              auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, authRecord.d_type, auth.get_bytes());
+              authRecord.setContent(ptr);
+              break;
+            }
+            case PBAuthRecord::required_uint32_class:
+              authRecord.d_class = auth.get_uint32();
+              break;
+            case PBAuthRecord::required_uint32_type:
+              authRecord.d_type = auth.get_uint32();
+              break;
+            case PBAuthRecord::required_uint32_ttl:
+              authRecord.d_ttl = auth.get_uint32();
+              break;
+            case PBAuthRecord::required_uint32_place:
+              authRecord.d_place = static_cast<DNSResourceRecord::Place>(auth.get_uint32());
+              break;
+            case PBAuthRecord::required_uint32_clen:
+              authRecord.d_clen = auth.get_uint32();
+              break;
+            }
+          }
+          cacheEntry.d_authorityRecs.emplace_back(std::make_shared<DNSRecord>(authRecord));
+          break;
+        }
+        case PBCacheEntry::required_bytes_name:
+          cacheEntry.d_qname = DNSName(message.get_bytes());
+          break;
+        case PBCacheEntry::required_bytes_authZone:
+          cacheEntry.d_authZone = DNSName(message.get_bytes());
+          break;
+        case PBCacheEntry::required_bytes_from:
+          cacheEntry.d_from = ComboAddress(message.get_bytes());
+          break;
+        case PBCacheEntry::optional_bytes_netmask: {
+          auto netmask = message.get_bytes();
+          cacheEntry.d_netmask = netmask.empty() ? Netmask() : Netmask(netmask);
+          break;
+        }
+        case PBCacheEntry::optional_bytes_rtag:
+          cacheEntry.d_rtag = message.get_bytes();
+          break;
+        case PBCacheEntry::required_uint32_state:
+          cacheEntry.d_state = static_cast<vState>(message.get_uint32());
+          break;
+        case PBCacheEntry::required_int64_ttd:
+          cacheEntry.d_ttd = message.get_int64();
+          break;
+        case PBCacheEntry::required_uint32_orig_ttl:
+          cacheEntry.d_orig_ttl = message.get_uint32();
+          break;
+        case PBCacheEntry::required_uint32_servedStale:
+          cacheEntry.d_servedStale = message.get_uint32();
+          break;
+        case PBCacheEntry::required_uint32_qtype:
+          cacheEntry.d_qtype = message.get_uint32();
+          break;
+        case PBCacheEntry::required_bool_auth:
+          cacheEntry.d_auth = message.get_bool();
+          break;
+        case PBCacheEntry::required_bool_submitted:
+          cacheEntry.d_submitted = message.get_bool(); // XXX
+          break;
+        case PBCacheEntry::required_bool_tooBig:
+          cacheEntry.d_tooBig = message.get_bool();
+          break;
+        default:
+          message.skip();
+          break;
+        }
+      }
+      //cerr << "I " << count << ' ' << cacheEntry.d_qname << ' ' << QType(cacheEntry.d_qtype) << ' ' << cacheEntry.d_auth << ' ' << cacheEntry.d_records.size() << endl;
+      g_recCache->replace(std::move(cacheEntry));
+      ++count;
+    }
+  }
+  catch (const std::runtime_error& e) {
+    cerr << "runtime: " << e.what() << endl;
+  }
+  catch (const std::exception& e) {
+    cerr << "exception: " << e.what() << endl;
+  }
+  catch (...) {
+    cerr << "other"  << endl;
+  }
+  cerr << "Inserted " << count << " record sets" << endl;
+}
+
 namespace boost
 {
 size_t hash_value(const MemRecursorCache::OptTag& rtag)
index adf65ec6a8ba975faa81ef28f476cb59230d7b84..06dbf9af73e3e729df43f1510512d7af74cef5c5 100644 (file)
@@ -64,6 +64,9 @@ public:
   [[nodiscard]] pair<uint64_t, uint64_t> stats();
   [[nodiscard]] size_t ecsIndexSize();
 
+  void getRecords(size_t howmany, std::string& ret);
+  void putRecords(const std::string& pbuf);
+
   using OptTag = boost::optional<std::string>;
 
   using Flags = uint8_t;
@@ -113,6 +116,7 @@ private:
     {
     }
 
+
     using records_t = vector<std::shared_ptr<const DNSRecordContent>>;
 
     bool isStale(time_t now) const
@@ -150,6 +154,8 @@ private:
     bool d_tooBig{false};
   };
 
+  void replace(CacheEntry&& entry);
+
   /* The ECS Index (d_ecsIndex) keeps track of whether there is any ECS-specific
      entry for a given (qname,qtype) entry in the cache (d_map), and if so
      provides a NetmaskTree of those ECS entries.