]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Cleanup of PB encoding, including logging and some meta info
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Mon, 14 Oct 2024 09:29:17 +0000 (11:29 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Thu, 24 Oct 2024 09:23:06 +0000 (11:23 +0200)
pdns/recursordist/Makefile.am
pdns/recursordist/lua-recursor4.cc
pdns/recursordist/recursor_cache.cc
pdns/recursordist/recursor_cache.hh

index 0d525767a66ef5032c11de0b51985c1530bf41c2..fe71369ee729e4c4fb1a9ccec6001c1dd581bde3 100644 (file)
@@ -401,6 +401,7 @@ testrunner_SOURCES = \
        unix_utility.cc \
        validate-recursor.cc validate-recursor.hh \
        validate.cc validate.hh \
+       version.cc version.hh \
        zonemd.cc zonemd.hh \
        zoneparser-tng.cc zoneparser-tng.hh
 
index 92f8197ab2b2b12e7dd5370a40ab1e79e962f808..7f6d8bb87e27377ef0e0b95613801bee6a747a71 100644 (file)
@@ -494,9 +494,9 @@ void RecursorLua4::postPrepareContext() // NOLINT(readability-function-cognitive
     }
   });
 
-  d_lw->writeFunction("getRecordCacheRecords", [](size_t howmany) {
+  d_lw->writeFunction("getRecordCacheRecords", [](size_t howmany, size_t maxsize) {
     std::string ret;
-    g_recCache->getRecords(howmany, ret);
+    g_recCache->getRecords(howmany, maxsize, ret);
     return ret;
   });
 
index 9b249600e1a7c010bca1cdffdd1a93d19783a5ff..b0b8dfd2742d629362c17d80e73fd82c4078e2cd 100644 (file)
@@ -12,6 +12,7 @@
 #include "namespaces.hh"
 #include "cachecleaner.hh"
 #include "rec-taskqueue.hh"
+#include "version.hh"
 #include <protozero/pbf_builder.hpp>
 #include <protozero/pbf_message.hpp>
 
@@ -547,25 +548,22 @@ bool MemRecursorCache::CacheEntry::shouldReplace(time_t now, bool auth, vState s
   return true;
 }
 
-void MemRecursorCache::replace(CacheEntry&& entry)
+bool MemRecursorCache::replace(CacheEntry&& entry)
 {
   if (!entry.d_netmask.empty()) {
     // We don't handle that yet
-    return;
+    return false;
   }
   auto& shard = getMap(entry.d_qname);
   auto lockedShard = shard.lock();
 
   lockedShard->d_cachecachevalid = false;
-
-  auto key = std::tuple(entry.d_qname, entry.d_qtype.getCode(), boost::none, Netmask());
-  cache_t::iterator stored = lockedShard->d_map.find(key);
-  if (stored == lockedShard->d_map.end()) {
-    entry.d_submitted = false;
-    entry.d_servedStale = 0;
-    lockedShard->d_map.insert(entry);
+  entry.d_submitted = false;
+  if (lockedShard->d_map.insert(entry).second) {
     shard.incEntriesCount();
+    return true;
   }
+  return false;
 }
 
 void MemRecursorCache::replace(time_t now, const DNSName& qname, const QType qtype, const vector<DNSRecord>& content, const vector<shared_ptr<const RRSIGRecordContent>>& signatures, const std::vector<std::shared_ptr<DNSRecord>>& authorityRecs, bool auth, const DNSName& authZone, boost::optional<Netmask> ednsmask, const OptTag& routingTag, vState state, boost::optional<ComboAddress> from, bool refresh, time_t ttl_time)
@@ -878,7 +876,15 @@ void MemRecursorCache::doPrune(time_t now, size_t keep)
   pruneMutexCollectionsVector<SequencedTag>(now, d_maps, keep, cacheSize);
 }
 
-  
+enum class PBCacheDump : protozero::pbf_tag_type
+{
+  required_string_version = 1,
+  required_string_identity = 2,
+  required_uint64_protocolVersion = 3,
+  required_int64_time = 4,
+  repeated_message_cacheEntry = 5,
+};
+
 enum class PBCacheEntry : protozero::pbf_tag_type
 {
   repeated_bytes_record = 1,
@@ -886,7 +892,7 @@ enum class PBCacheEntry : protozero::pbf_tag_type
   repeated_message_authRecord = 3,
   required_bytes_name = 4,
   required_bytes_authZone = 5,
-  required_bytes_from = 6,
+  required_message_from = 6,
   optional_bytes_netmask = 7,
   optional_bytes_rtag = 8,
   required_uint32_state = 9,
@@ -899,6 +905,12 @@ enum class PBCacheEntry : protozero::pbf_tag_type
   required_bool_tooBig = 16,
 };
 
+enum class PBComboAddress : protozero::pbf_tag_type
+{
+  required_uint32_port = 1,
+  required_bytes_address = 2, // family implicit
+};
+
 enum class PBAuthRecord : protozero::pbf_tag_type
 {
   required_bytes_name = 1,
@@ -913,22 +925,33 @@ enum class PBAuthRecord : protozero::pbf_tag_type
 template <typename T>
 static void encodeComboAddress(protozero::pbf_builder<T>& writer, T type, const ComboAddress& address)
 {
+  protozero::pbf_builder<PBComboAddress> message(writer, type);
+
+  // Skip all parts except address and port
+  message.add_uint32(PBComboAddress::required_uint32_port, address.getPort());
   if (address.sin4.sin_family == AF_INET) {
-    writer.add_bytes(type, reinterpret_cast<const char*>(&address.sin4.sin_addr.s_addr), sizeof(address.sin4.sin_addr.s_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API
+    message.add_bytes(PBComboAddress::required_bytes_address, reinterpret_cast<const char*>(&address.sin4.sin_addr.s_addr), sizeof(address.sin4.sin_addr.s_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API
   }
   else if (address.sin4.sin_family == AF_INET6) {
-    writer.add_bytes(type, reinterpret_cast<const char*>(&address.sin6.sin6_addr.s6_addr), sizeof(address.sin6.sin6_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API
+    message.add_bytes(PBComboAddress::required_bytes_address, reinterpret_cast<const char*>(&address.sin6.sin6_addr.s6_addr), sizeof(address.sin6.sin6_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API
   }
 }
 
 template <typename T>
-static void decodeComboAddress(protozero::pbf_message<T>& message, ComboAddress& address)
+static void decodeComboAddress(protozero::pbf_message<T>& reader, ComboAddress& address)
 {
-  // Skip port and other parts
-  auto data = message.get_bytes();
   address.reset();
-  address.sin4.sin_family = data.size() == 4 ? AF_INET : AF_INET6;
-  memcpy(&address.sin4.sin_addr, data.data(), data.size());
+  protozero::pbf_message<PBComboAddress> message(reader.get_message());
+
+  // Skip all parts except address and port
+  if (message.next(PBComboAddress::required_uint32_port)) {
+    address.setPort(message.get_uint32());
+  }
+  if (message.next(PBComboAddress::required_bytes_address)) {
+    auto data = message.get_bytes();
+    address.sin4.sin_family = data.size() == 4 ? AF_INET : AF_INET6;
+    memcpy(&address.sin4.sin_addr, data.data(), data.size());
+  }
 }
 
 template <typename T>
@@ -946,19 +969,29 @@ static void decodeNetmask(protozero::pbf_message<T>& message, Netmask& subnet)
   memcpy(&subnet, data.data(), data.size());
 }
 
-void MemRecursorCache::getRecords(size_t howmany, std::string& ret)
+void MemRecursorCache::getRecords(size_t howmany, size_t maxSize, std::string& ret)
 {
-  protozero::pbf_writer full(ret);
-  const auto perShard = howmany / d_maps.size();
+  auto log = g_slog->withName("recordcache")->withValues("howmany", Logging::Loggable(howmany), "maxSize", Logging::Loggable(maxSize));
+  log->info(Logr::Info, "Producing cache dump");
+
+  protozero::pbf_builder<PBCacheDump> full(ret);
+  full.add_string(PBCacheDump::required_string_version, getPDNSVersion());
+  full.add_string(PBCacheDump::required_string_identity, SyncRes::s_serverID);
+  full.add_uint64(PBCacheDump::required_uint64_protocolVersion, 1);
+  full.add_int64(PBCacheDump::required_int64_time, time(nullptr));
+
+  const auto perShard = (howmany + d_maps.size() - 1) / d_maps.size(); // at least 1 per shard
   size_t count = 0;
+  ret.reserve(maxSize + 4096); // We may overshoot (will be rolled back)
+
   for (auto& shard : d_maps) {
     auto lockedShard = shard.lock();
     const auto& sidx = lockedShard->d_map.get<SequencedTag>();
     size_t thisShardCount = 0;
     string buf;
     for (const auto& recordSet : sidx) {
-      protozero::pbf_builder<PBCacheEntry> message(full, 1);
-      //cerr << "S " << count << ' ' << recordSet.d_qname << ' ' << QType(recordSet.d_qtype) << ' ' << recordSet.d_auth << ' ' << recordSet.d_records.size() << endl;
+      protozero::pbf_builder<PBCacheEntry> message(full, PBCacheDump::repeated_message_cacheEntry);
+      cerr << "S " << count << ' ' << recordSet.d_qname << ' ' << QType(recordSet.d_qtype) << ' ' << recordSet.d_auth << ' ' << recordSet.d_records.size() << endl;
       // Two fields below must come before the other fields
       message.add_bytes(PBCacheEntry::required_bytes_name, recordSet.d_qname.toString());
       message.add_uint32(PBCacheEntry::required_uint32_qtype, recordSet.d_qtype);
@@ -979,7 +1012,7 @@ void MemRecursorCache::getRecords(size_t howmany, std::string& ret)
         auth.add_uint32(PBAuthRecord::required_uint32_clen, authRec->d_clen);
       }
       message.add_bytes(PBCacheEntry::required_bytes_authZone, recordSet.d_authZone.toString());
-      encodeComboAddress(message, PBCacheEntry::required_bytes_from, recordSet.d_from);
+      encodeComboAddress(message, PBCacheEntry::required_message_from, recordSet.d_from);
       encodeNetmask(message, PBCacheEntry::optional_bytes_netmask, recordSet.d_netmask);
       if (recordSet.d_rtag) {
         message.add_bytes(PBCacheEntry::optional_bytes_rtag, *recordSet.d_rtag);
@@ -991,136 +1024,180 @@ void MemRecursorCache::getRecords(size_t howmany, std::string& ret)
       message.add_bool(PBCacheEntry::required_bool_auth, recordSet.d_auth);
       message.add_bool(PBCacheEntry::required_bool_submitted, recordSet.d_submitted);
       message.add_bool(PBCacheEntry::required_bool_tooBig, recordSet.d_tooBig);
-      ++thisShardCount;
+      if (ret.size() > maxSize) {
+        message.rollback();
+        log->info(Logr::Info, "Produced cache dump (max size reached)", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count));
+        return;
+      }
       ++count;
-      if (thisShardCount > perShard) {
-        break;
+      if (count >= howmany) {
+        message.commit();
+        log->info(Logr::Info, "Produced cache dump (howmany reached)", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count));
+        return;
       }
-      if (count > howmany) {
+      ++thisShardCount;
+      if (thisShardCount >= perShard) {
         break;
       }
     }
-    if (count > howmany) {
-      break;
-    }
   }
-  cerr << "Constructed a string of length " << ret.size() << " for " << count << " record sets (requested " << howmany << ')' << endl;
+log->info(Logr::Info, "Produced cache dump", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count));
 }
 
-void MemRecursorCache::putRecords(const std::string& pbuf)
+
+template <typename T>
+bool MemRecursorCache::putRecord(T& message)
 {
-  cerr << "Received a PB string of length " << pbuf.size() << endl;
-  protozero::pbf_reader full(pbuf);
-  size_t count = 0;
-  try {
-    while (full.next(1)) {
-      protozero::pbf_message<PBCacheEntry> message = full.get_message();
-      CacheEntry cacheEntry{{g_rootdnsname, QType::A, boost::none, Netmask()}, false};
-      while (message.next()) {
-        switch (message.tag()) {
-        case PBCacheEntry::repeated_bytes_record: {
-          auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, cacheEntry.d_qtype, message.get_bytes());
-          cacheEntry.d_records.emplace_back(ptr);
+  CacheEntry cacheEntry{{g_rootdnsname, QType::A, boost::none, Netmask()}, false};
+  while (message.next()) {
+    switch (message.tag()) {
+    case PBCacheEntry::repeated_bytes_record: {
+      auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, cacheEntry.d_qtype, message.get_bytes());
+      cacheEntry.d_records.emplace_back(ptr);
+      break;
+    }
+    case PBCacheEntry::repeated_bytes_sig: {
+      auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, QType::RRSIG, message.get_bytes());
+      cacheEntry.d_signatures.emplace_back(std::dynamic_pointer_cast<RRSIGRecordContent>(ptr));
+      break;
+    }
+    case PBCacheEntry::repeated_message_authRecord: {
+      protozero::pbf_message<PBAuthRecord> auth = message.get_message();
+      DNSRecord authRecord;
+      while (auth.next()) {
+        switch (auth.tag()) {
+        case PBAuthRecord::required_bytes_name:
+          authRecord.d_name = DNSName(auth.get_bytes());
           break;
-        }
-        case PBCacheEntry::repeated_bytes_sig: {
-          auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, QType::RRSIG, message.get_bytes());
-          cacheEntry.d_signatures.emplace_back(std::dynamic_pointer_cast<RRSIGRecordContent>(ptr));
+        case PBAuthRecord::required_bytes_rdata: {
+          auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, authRecord.d_type, auth.get_bytes());
+          authRecord.setContent(ptr);
           break;
         }
-        case PBCacheEntry::repeated_message_authRecord: {
-          protozero::pbf_message<PBAuthRecord> auth = message.get_message();
-          DNSRecord authRecord;
-          while (auth.next()) {
-            switch (auth.tag()) {
-            case PBAuthRecord::required_bytes_name:
-              authRecord.d_name = DNSName(auth.get_bytes());
-              break;
-            case PBAuthRecord::required_bytes_rdata: {
-              auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, authRecord.d_type, auth.get_bytes());
-              authRecord.setContent(ptr);
-              break;
-            }
-            case PBAuthRecord::required_uint32_class:
-              authRecord.d_class = auth.get_uint32();
-              break;
-            case PBAuthRecord::required_uint32_type:
-              authRecord.d_type = auth.get_uint32();
-              break;
-            case PBAuthRecord::required_uint32_ttl:
-              authRecord.d_ttl = auth.get_uint32();
-              break;
-            case PBAuthRecord::required_uint32_place:
-              authRecord.d_place = static_cast<DNSResourceRecord::Place>(auth.get_uint32());
-              break;
-            case PBAuthRecord::required_uint32_clen:
-              authRecord.d_clen = auth.get_uint32();
-              break;
-            }
-          }
-          cacheEntry.d_authorityRecs.emplace_back(std::make_shared<DNSRecord>(authRecord));
+        case PBAuthRecord::required_uint32_class:
+          authRecord.d_class = auth.get_uint32();
           break;
-        }
-        case PBCacheEntry::required_bytes_name:
-          cacheEntry.d_qname = DNSName(message.get_bytes());
+        case PBAuthRecord::required_uint32_type:
+          authRecord.d_type = auth.get_uint32();
           break;
-        case PBCacheEntry::required_bytes_authZone:
-          cacheEntry.d_authZone = DNSName(message.get_bytes());
+        case PBAuthRecord::required_uint32_ttl:
+          authRecord.d_ttl = auth.get_uint32();
           break;
-        case PBCacheEntry::required_bytes_from:
-          decodeComboAddress(message, cacheEntry.d_from);
+        case PBAuthRecord::required_uint32_place:
+          authRecord.d_place = static_cast<DNSResourceRecord::Place>(auth.get_uint32());
           break;
-        case PBCacheEntry::optional_bytes_netmask: {
-          decodeNetmask(message, cacheEntry.d_netmask);
+        case PBAuthRecord::required_uint32_clen:
+          authRecord.d_clen = auth.get_uint32();
           break;
         }
-        case PBCacheEntry::optional_bytes_rtag:
-          cacheEntry.d_rtag = message.get_bytes();
-          break;
-        case PBCacheEntry::required_uint32_state:
-          cacheEntry.d_state = static_cast<vState>(message.get_uint32());
-          break;
-        case PBCacheEntry::required_int64_ttd:
-          cacheEntry.d_ttd = message.get_int64();
-          break;
-        case PBCacheEntry::required_uint32_orig_ttl:
-          cacheEntry.d_orig_ttl = message.get_uint32();
-          break;
-        case PBCacheEntry::required_uint32_servedStale:
-          cacheEntry.d_servedStale = message.get_uint32();
-          break;
-        case PBCacheEntry::required_uint32_qtype:
-          cacheEntry.d_qtype = message.get_uint32();
-          break;
-        case PBCacheEntry::required_bool_auth:
-          cacheEntry.d_auth = message.get_bool();
-          break;
-        case PBCacheEntry::required_bool_submitted:
-          cacheEntry.d_submitted = message.get_bool(); // XXX
-          break;
-        case PBCacheEntry::required_bool_tooBig:
-          cacheEntry.d_tooBig = message.get_bool();
-          break;
-        default:
-          message.skip();
-          break;
+      }
+      cacheEntry.d_authorityRecs.emplace_back(std::make_shared<DNSRecord>(authRecord));
+      break;
+    }
+    case PBCacheEntry::required_bytes_name:
+      cacheEntry.d_qname = DNSName(message.get_bytes());
+      break;
+    case PBCacheEntry::required_bytes_authZone:
+      cacheEntry.d_authZone = DNSName(message.get_bytes());
+      break;
+    case PBCacheEntry::required_message_from:
+      decodeComboAddress(message, cacheEntry.d_from);
+      break;
+    case PBCacheEntry::optional_bytes_netmask: {
+      decodeNetmask(message, cacheEntry.d_netmask);
+      break;
+    }
+    case PBCacheEntry::optional_bytes_rtag:
+      cacheEntry.d_rtag = message.get_bytes();
+      break;
+    case PBCacheEntry::required_uint32_state:
+      cacheEntry.d_state = static_cast<vState>(message.get_uint32());
+      break;
+    case PBCacheEntry::required_int64_ttd:
+      cacheEntry.d_ttd = message.get_int64();
+      break;
+    case PBCacheEntry::required_uint32_orig_ttl:
+      cacheEntry.d_orig_ttl = message.get_uint32();
+      break;
+    case PBCacheEntry::required_uint32_servedStale:
+      cacheEntry.d_servedStale = message.get_uint32();
+      break;
+    case PBCacheEntry::required_uint32_qtype:
+      cacheEntry.d_qtype = message.get_uint32();
+      break;
+    case PBCacheEntry::required_bool_auth:
+      cacheEntry.d_auth = message.get_bool();
+      break;
+    case PBCacheEntry::required_bool_submitted:
+      cacheEntry.d_submitted = message.get_bool();
+      cacheEntry.d_submitted = false; // actually not
+      break;
+    case PBCacheEntry::required_bool_tooBig:
+      cacheEntry.d_tooBig = message.get_bool();
+      break;
+    default:
+      message.skip();
+      break;
+    }
+  }
+  return g_recCache->replace(std::move(cacheEntry));
+}
+
+void MemRecursorCache::putRecords(const std::string& pbuf)
+{
+  auto log = g_slog->withName("recordcache")->withValues("size", Logging::Loggable(pbuf.size()));
+  log->info(Logr::Debug, "Processing cache dump");
+
+  protozero::pbf_message<PBCacheDump> full(pbuf);
+  size_t count = 0;
+  size_t inserted = 0;
+  try {
+    while (full.next()) {
+      switch (full.tag()) {
+      case PBCacheDump::required_string_version: {
+        auto version = full.get_string();
+        log = log->withValues("version", Logging::Loggable(version));
+        break;
+      }
+      case PBCacheDump::required_string_identity: {
+        auto identity = full.get_string();
+        log = log->withValues("identity", Logging::Loggable(identity));
+        break;
+      }
+      case PBCacheDump::required_uint64_protocolVersion: {
+        auto protocolVersion = full.get_uint64();
+        log = log->withValues("protocolVersion", Logging::Loggable(protocolVersion));
+        if (protocolVersion != 1) {
+          throw std::runtime_error("Protocol version mismatch");
         }
+        break;
+      }
+      case PBCacheDump::required_int64_time: {
+        auto time = full.get_int64();
+        log = log->withValues("time", Logging::Loggable(time));
+        break;
+      }
+      case PBCacheDump::repeated_message_cacheEntry: {
+        protozero::pbf_message<PBCacheEntry> message = full.get_message();
+        if (putRecord(message)) {
+          ++inserted;
+        }
+        ++count;
+        break;
+      }
       }
-      //cerr << "I " << count << ' ' << cacheEntry.d_qname << ' ' << QType(cacheEntry.d_qtype) << ' ' << cacheEntry.d_auth << ' ' << cacheEntry.d_records.size() << endl;
-      g_recCache->replace(std::move(cacheEntry));
-      ++count;
     }
+    log->info(Logr::Info, "Processed cache dump", "processed", Logging::Loggable(count), "inserted", Logging::Loggable(inserted));
   }
   catch (const std::runtime_error& e) {
-    cerr << "runtime: " << e.what() << endl;
+    log->error(Logr::Error, e.what(), "Runtime exception processing cache dump");
   }
   catch (const std::exception& e) {
-    cerr << "exception: " << e.what() << endl;
+    log->error(Logr::Error, e.what(), "Exception processing cache dump");
   }
   catch (...) {
-    cerr << "other"  << endl;
+    log->error(Logr::Error, "Other exception processing cache dump");
   }
-  cerr << "Inserted " << count << " record sets" << endl;
 }
 
 namespace boost
index 06dbf9af73e3e729df43f1510512d7af74cef5c5..9a07c73b679ba3efddd5ccc22c71b299f7be9977 100644 (file)
@@ -64,7 +64,7 @@ public:
   [[nodiscard]] pair<uint64_t, uint64_t> stats();
   [[nodiscard]] size_t ecsIndexSize();
 
-  void getRecords(size_t howmany, std::string& ret);
+  void getRecords(size_t howmany, size_t maxsize, std::string& ret);
   void putRecords(const std::string& pbuf);
 
   using OptTag = boost::optional<std::string>;
@@ -154,7 +154,8 @@ private:
     bool d_tooBig{false};
   };
 
-  void replace(CacheEntry&& entry);
+  bool replace(CacheEntry&& entry);
+  template <typename T> bool putRecord(T&);
 
   /* The ECS Index (d_ecsIndex) keeps track of whether there is any ECS-specific
      entry for a given (qname,qtype) entry in the cache (d_map), and if so