From: Otto Moerbeek Date: Mon, 14 Oct 2024 09:29:17 +0000 (+0200) Subject: Cleanup of PB encoding, including logging and some meta info X-Git-Tag: rec-5.2.0-alpha1~12^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7b8758e271db5f9bab66f9242a0d14ec33960985;p=thirdparty%2Fpdns.git Cleanup of PB encoding, including logging and some meta info --- diff --git a/pdns/recursordist/Makefile.am b/pdns/recursordist/Makefile.am index 0d525767a6..fe71369ee7 100644 --- a/pdns/recursordist/Makefile.am +++ b/pdns/recursordist/Makefile.am @@ -401,6 +401,7 @@ testrunner_SOURCES = \ unix_utility.cc \ validate-recursor.cc validate-recursor.hh \ validate.cc validate.hh \ + version.cc version.hh \ zonemd.cc zonemd.hh \ zoneparser-tng.cc zoneparser-tng.hh diff --git a/pdns/recursordist/lua-recursor4.cc b/pdns/recursordist/lua-recursor4.cc index 92f8197ab2..7f6d8bb87e 100644 --- a/pdns/recursordist/lua-recursor4.cc +++ b/pdns/recursordist/lua-recursor4.cc @@ -494,9 +494,9 @@ void RecursorLua4::postPrepareContext() // NOLINT(readability-function-cognitive } }); - d_lw->writeFunction("getRecordCacheRecords", [](size_t howmany) { + d_lw->writeFunction("getRecordCacheRecords", [](size_t howmany, size_t maxsize) { std::string ret; - g_recCache->getRecords(howmany, ret); + g_recCache->getRecords(howmany, maxsize, ret); return ret; }); diff --git a/pdns/recursordist/recursor_cache.cc b/pdns/recursordist/recursor_cache.cc index 9b249600e1..b0b8dfd274 100644 --- a/pdns/recursordist/recursor_cache.cc +++ b/pdns/recursordist/recursor_cache.cc @@ -12,6 +12,7 @@ #include "namespaces.hh" #include "cachecleaner.hh" #include "rec-taskqueue.hh" +#include "version.hh" #include #include @@ -547,25 +548,22 @@ bool MemRecursorCache::CacheEntry::shouldReplace(time_t now, bool auth, vState s return true; } -void MemRecursorCache::replace(CacheEntry&& entry) +bool MemRecursorCache::replace(CacheEntry&& entry) { if (!entry.d_netmask.empty()) { // We don't handle that yet - return; + return false; } auto& shard = getMap(entry.d_qname); auto lockedShard = shard.lock(); lockedShard->d_cachecachevalid = false; - - auto key = std::tuple(entry.d_qname, entry.d_qtype.getCode(), boost::none, Netmask()); - cache_t::iterator stored = lockedShard->d_map.find(key); - if (stored == lockedShard->d_map.end()) { - entry.d_submitted = false; - entry.d_servedStale = 0; - lockedShard->d_map.insert(entry); + entry.d_submitted = false; + if (lockedShard->d_map.insert(entry).second) { shard.incEntriesCount(); + return true; } + return false; } void MemRecursorCache::replace(time_t now, const DNSName& qname, const QType qtype, const vector& content, const vector>& signatures, const std::vector>& authorityRecs, bool auth, const DNSName& authZone, boost::optional ednsmask, const OptTag& routingTag, vState state, boost::optional from, bool refresh, time_t ttl_time) @@ -878,7 +876,15 @@ void MemRecursorCache::doPrune(time_t now, size_t keep) pruneMutexCollectionsVector(now, d_maps, keep, cacheSize); } - +enum class PBCacheDump : protozero::pbf_tag_type +{ + required_string_version = 1, + required_string_identity = 2, + required_uint64_protocolVersion = 3, + required_int64_time = 4, + repeated_message_cacheEntry = 5, +}; + enum class PBCacheEntry : protozero::pbf_tag_type { repeated_bytes_record = 1, @@ -886,7 +892,7 @@ enum class PBCacheEntry : protozero::pbf_tag_type repeated_message_authRecord = 3, required_bytes_name = 4, required_bytes_authZone = 5, - required_bytes_from = 6, + required_message_from = 6, optional_bytes_netmask = 7, optional_bytes_rtag = 8, required_uint32_state = 9, @@ -899,6 +905,12 @@ enum class PBCacheEntry : protozero::pbf_tag_type required_bool_tooBig = 16, }; +enum class PBComboAddress : protozero::pbf_tag_type +{ + required_uint32_port = 1, + required_bytes_address = 2, // family implicit +}; + enum class PBAuthRecord : protozero::pbf_tag_type { required_bytes_name = 1, @@ -913,22 +925,33 @@ enum class PBAuthRecord : protozero::pbf_tag_type template static void encodeComboAddress(protozero::pbf_builder& writer, T type, const ComboAddress& address) { + protozero::pbf_builder message(writer, type); + + // Skip all parts except address and port + message.add_uint32(PBComboAddress::required_uint32_port, address.getPort()); if (address.sin4.sin_family == AF_INET) { - writer.add_bytes(type, reinterpret_cast(&address.sin4.sin_addr.s_addr), sizeof(address.sin4.sin_addr.s_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API + message.add_bytes(PBComboAddress::required_bytes_address, reinterpret_cast(&address.sin4.sin_addr.s_addr), sizeof(address.sin4.sin_addr.s_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API } else if (address.sin4.sin_family == AF_INET6) { - writer.add_bytes(type, reinterpret_cast(&address.sin6.sin6_addr.s6_addr), sizeof(address.sin6.sin6_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API + message.add_bytes(PBComboAddress::required_bytes_address, reinterpret_cast(&address.sin6.sin6_addr.s6_addr), sizeof(address.sin6.sin6_addr)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast): it's the API } } template -static void decodeComboAddress(protozero::pbf_message& message, ComboAddress& address) +static void decodeComboAddress(protozero::pbf_message& reader, ComboAddress& address) { - // Skip port and other parts - auto data = message.get_bytes(); address.reset(); - address.sin4.sin_family = data.size() == 4 ? AF_INET : AF_INET6; - memcpy(&address.sin4.sin_addr, data.data(), data.size()); + protozero::pbf_message message(reader.get_message()); + + // Skip all parts except address and port + if (message.next(PBComboAddress::required_uint32_port)) { + address.setPort(message.get_uint32()); + } + if (message.next(PBComboAddress::required_bytes_address)) { + auto data = message.get_bytes(); + address.sin4.sin_family = data.size() == 4 ? AF_INET : AF_INET6; + memcpy(&address.sin4.sin_addr, data.data(), data.size()); + } } template @@ -946,19 +969,29 @@ static void decodeNetmask(protozero::pbf_message& message, Netmask& subnet) memcpy(&subnet, data.data(), data.size()); } -void MemRecursorCache::getRecords(size_t howmany, std::string& ret) +void MemRecursorCache::getRecords(size_t howmany, size_t maxSize, std::string& ret) { - protozero::pbf_writer full(ret); - const auto perShard = howmany / d_maps.size(); + auto log = g_slog->withName("recordcache")->withValues("howmany", Logging::Loggable(howmany), "maxSize", Logging::Loggable(maxSize)); + log->info(Logr::Info, "Producing cache dump"); + + protozero::pbf_builder full(ret); + full.add_string(PBCacheDump::required_string_version, getPDNSVersion()); + full.add_string(PBCacheDump::required_string_identity, SyncRes::s_serverID); + full.add_uint64(PBCacheDump::required_uint64_protocolVersion, 1); + full.add_int64(PBCacheDump::required_int64_time, time(nullptr)); + + const auto perShard = (howmany + d_maps.size() - 1) / d_maps.size(); // at least 1 per shard size_t count = 0; + ret.reserve(maxSize + 4096); // We may overshoot (will be rolled back) + for (auto& shard : d_maps) { auto lockedShard = shard.lock(); const auto& sidx = lockedShard->d_map.get(); size_t thisShardCount = 0; string buf; for (const auto& recordSet : sidx) { - protozero::pbf_builder message(full, 1); - //cerr << "S " << count << ' ' << recordSet.d_qname << ' ' << QType(recordSet.d_qtype) << ' ' << recordSet.d_auth << ' ' << recordSet.d_records.size() << endl; + protozero::pbf_builder message(full, PBCacheDump::repeated_message_cacheEntry); + cerr << "S " << count << ' ' << recordSet.d_qname << ' ' << QType(recordSet.d_qtype) << ' ' << recordSet.d_auth << ' ' << recordSet.d_records.size() << endl; // Two fields below must come before the other fields message.add_bytes(PBCacheEntry::required_bytes_name, recordSet.d_qname.toString()); message.add_uint32(PBCacheEntry::required_uint32_qtype, recordSet.d_qtype); @@ -979,7 +1012,7 @@ void MemRecursorCache::getRecords(size_t howmany, std::string& ret) auth.add_uint32(PBAuthRecord::required_uint32_clen, authRec->d_clen); } message.add_bytes(PBCacheEntry::required_bytes_authZone, recordSet.d_authZone.toString()); - encodeComboAddress(message, PBCacheEntry::required_bytes_from, recordSet.d_from); + encodeComboAddress(message, PBCacheEntry::required_message_from, recordSet.d_from); encodeNetmask(message, PBCacheEntry::optional_bytes_netmask, recordSet.d_netmask); if (recordSet.d_rtag) { message.add_bytes(PBCacheEntry::optional_bytes_rtag, *recordSet.d_rtag); @@ -991,136 +1024,180 @@ void MemRecursorCache::getRecords(size_t howmany, std::string& ret) message.add_bool(PBCacheEntry::required_bool_auth, recordSet.d_auth); message.add_bool(PBCacheEntry::required_bool_submitted, recordSet.d_submitted); message.add_bool(PBCacheEntry::required_bool_tooBig, recordSet.d_tooBig); - ++thisShardCount; + if (ret.size() > maxSize) { + message.rollback(); + log->info(Logr::Info, "Produced cache dump (max size reached)", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count)); + return; + } ++count; - if (thisShardCount > perShard) { - break; + if (count >= howmany) { + message.commit(); + log->info(Logr::Info, "Produced cache dump (howmany reached)", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count)); + return; } - if (count > howmany) { + ++thisShardCount; + if (thisShardCount >= perShard) { break; } } - if (count > howmany) { - break; - } } - cerr << "Constructed a string of length " << ret.size() << " for " << count << " record sets (requested " << howmany << ')' << endl; +log->info(Logr::Info, "Produced cache dump", "size", Logging::Loggable(ret.size()), "count", Logging::Loggable(count)); } -void MemRecursorCache::putRecords(const std::string& pbuf) + +template +bool MemRecursorCache::putRecord(T& message) { - cerr << "Received a PB string of length " << pbuf.size() << endl; - protozero::pbf_reader full(pbuf); - size_t count = 0; - try { - while (full.next(1)) { - protozero::pbf_message message = full.get_message(); - CacheEntry cacheEntry{{g_rootdnsname, QType::A, boost::none, Netmask()}, false}; - while (message.next()) { - switch (message.tag()) { - case PBCacheEntry::repeated_bytes_record: { - auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, cacheEntry.d_qtype, message.get_bytes()); - cacheEntry.d_records.emplace_back(ptr); + CacheEntry cacheEntry{{g_rootdnsname, QType::A, boost::none, Netmask()}, false}; + while (message.next()) { + switch (message.tag()) { + case PBCacheEntry::repeated_bytes_record: { + auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, cacheEntry.d_qtype, message.get_bytes()); + cacheEntry.d_records.emplace_back(ptr); + break; + } + case PBCacheEntry::repeated_bytes_sig: { + auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, QType::RRSIG, message.get_bytes()); + cacheEntry.d_signatures.emplace_back(std::dynamic_pointer_cast(ptr)); + break; + } + case PBCacheEntry::repeated_message_authRecord: { + protozero::pbf_message auth = message.get_message(); + DNSRecord authRecord; + while (auth.next()) { + switch (auth.tag()) { + case PBAuthRecord::required_bytes_name: + authRecord.d_name = DNSName(auth.get_bytes()); break; - } - case PBCacheEntry::repeated_bytes_sig: { - auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, QType::RRSIG, message.get_bytes()); - cacheEntry.d_signatures.emplace_back(std::dynamic_pointer_cast(ptr)); + case PBAuthRecord::required_bytes_rdata: { + auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, authRecord.d_type, auth.get_bytes()); + authRecord.setContent(ptr); break; } - case PBCacheEntry::repeated_message_authRecord: { - protozero::pbf_message auth = message.get_message(); - DNSRecord authRecord; - while (auth.next()) { - switch (auth.tag()) { - case PBAuthRecord::required_bytes_name: - authRecord.d_name = DNSName(auth.get_bytes()); - break; - case PBAuthRecord::required_bytes_rdata: { - auto ptr = DNSRecordContent::deserialize(cacheEntry.d_qname, authRecord.d_type, auth.get_bytes()); - authRecord.setContent(ptr); - break; - } - case PBAuthRecord::required_uint32_class: - authRecord.d_class = auth.get_uint32(); - break; - case PBAuthRecord::required_uint32_type: - authRecord.d_type = auth.get_uint32(); - break; - case PBAuthRecord::required_uint32_ttl: - authRecord.d_ttl = auth.get_uint32(); - break; - case PBAuthRecord::required_uint32_place: - authRecord.d_place = static_cast(auth.get_uint32()); - break; - case PBAuthRecord::required_uint32_clen: - authRecord.d_clen = auth.get_uint32(); - break; - } - } - cacheEntry.d_authorityRecs.emplace_back(std::make_shared(authRecord)); + case PBAuthRecord::required_uint32_class: + authRecord.d_class = auth.get_uint32(); break; - } - case PBCacheEntry::required_bytes_name: - cacheEntry.d_qname = DNSName(message.get_bytes()); + case PBAuthRecord::required_uint32_type: + authRecord.d_type = auth.get_uint32(); break; - case PBCacheEntry::required_bytes_authZone: - cacheEntry.d_authZone = DNSName(message.get_bytes()); + case PBAuthRecord::required_uint32_ttl: + authRecord.d_ttl = auth.get_uint32(); break; - case PBCacheEntry::required_bytes_from: - decodeComboAddress(message, cacheEntry.d_from); + case PBAuthRecord::required_uint32_place: + authRecord.d_place = static_cast(auth.get_uint32()); break; - case PBCacheEntry::optional_bytes_netmask: { - decodeNetmask(message, cacheEntry.d_netmask); + case PBAuthRecord::required_uint32_clen: + authRecord.d_clen = auth.get_uint32(); break; } - case PBCacheEntry::optional_bytes_rtag: - cacheEntry.d_rtag = message.get_bytes(); - break; - case PBCacheEntry::required_uint32_state: - cacheEntry.d_state = static_cast(message.get_uint32()); - break; - case PBCacheEntry::required_int64_ttd: - cacheEntry.d_ttd = message.get_int64(); - break; - case PBCacheEntry::required_uint32_orig_ttl: - cacheEntry.d_orig_ttl = message.get_uint32(); - break; - case PBCacheEntry::required_uint32_servedStale: - cacheEntry.d_servedStale = message.get_uint32(); - break; - case PBCacheEntry::required_uint32_qtype: - cacheEntry.d_qtype = message.get_uint32(); - break; - case PBCacheEntry::required_bool_auth: - cacheEntry.d_auth = message.get_bool(); - break; - case PBCacheEntry::required_bool_submitted: - cacheEntry.d_submitted = message.get_bool(); // XXX - break; - case PBCacheEntry::required_bool_tooBig: - cacheEntry.d_tooBig = message.get_bool(); - break; - default: - message.skip(); - break; + } + cacheEntry.d_authorityRecs.emplace_back(std::make_shared(authRecord)); + break; + } + case PBCacheEntry::required_bytes_name: + cacheEntry.d_qname = DNSName(message.get_bytes()); + break; + case PBCacheEntry::required_bytes_authZone: + cacheEntry.d_authZone = DNSName(message.get_bytes()); + break; + case PBCacheEntry::required_message_from: + decodeComboAddress(message, cacheEntry.d_from); + break; + case PBCacheEntry::optional_bytes_netmask: { + decodeNetmask(message, cacheEntry.d_netmask); + break; + } + case PBCacheEntry::optional_bytes_rtag: + cacheEntry.d_rtag = message.get_bytes(); + break; + case PBCacheEntry::required_uint32_state: + cacheEntry.d_state = static_cast(message.get_uint32()); + break; + case PBCacheEntry::required_int64_ttd: + cacheEntry.d_ttd = message.get_int64(); + break; + case PBCacheEntry::required_uint32_orig_ttl: + cacheEntry.d_orig_ttl = message.get_uint32(); + break; + case PBCacheEntry::required_uint32_servedStale: + cacheEntry.d_servedStale = message.get_uint32(); + break; + case PBCacheEntry::required_uint32_qtype: + cacheEntry.d_qtype = message.get_uint32(); + break; + case PBCacheEntry::required_bool_auth: + cacheEntry.d_auth = message.get_bool(); + break; + case PBCacheEntry::required_bool_submitted: + cacheEntry.d_submitted = message.get_bool(); + cacheEntry.d_submitted = false; // actually not + break; + case PBCacheEntry::required_bool_tooBig: + cacheEntry.d_tooBig = message.get_bool(); + break; + default: + message.skip(); + break; + } + } + return g_recCache->replace(std::move(cacheEntry)); +} + +void MemRecursorCache::putRecords(const std::string& pbuf) +{ + auto log = g_slog->withName("recordcache")->withValues("size", Logging::Loggable(pbuf.size())); + log->info(Logr::Debug, "Processing cache dump"); + + protozero::pbf_message full(pbuf); + size_t count = 0; + size_t inserted = 0; + try { + while (full.next()) { + switch (full.tag()) { + case PBCacheDump::required_string_version: { + auto version = full.get_string(); + log = log->withValues("version", Logging::Loggable(version)); + break; + } + case PBCacheDump::required_string_identity: { + auto identity = full.get_string(); + log = log->withValues("identity", Logging::Loggable(identity)); + break; + } + case PBCacheDump::required_uint64_protocolVersion: { + auto protocolVersion = full.get_uint64(); + log = log->withValues("protocolVersion", Logging::Loggable(protocolVersion)); + if (protocolVersion != 1) { + throw std::runtime_error("Protocol version mismatch"); } + break; + } + case PBCacheDump::required_int64_time: { + auto time = full.get_int64(); + log = log->withValues("time", Logging::Loggable(time)); + break; + } + case PBCacheDump::repeated_message_cacheEntry: { + protozero::pbf_message message = full.get_message(); + if (putRecord(message)) { + ++inserted; + } + ++count; + break; + } } - //cerr << "I " << count << ' ' << cacheEntry.d_qname << ' ' << QType(cacheEntry.d_qtype) << ' ' << cacheEntry.d_auth << ' ' << cacheEntry.d_records.size() << endl; - g_recCache->replace(std::move(cacheEntry)); - ++count; } + log->info(Logr::Info, "Processed cache dump", "processed", Logging::Loggable(count), "inserted", Logging::Loggable(inserted)); } catch (const std::runtime_error& e) { - cerr << "runtime: " << e.what() << endl; + log->error(Logr::Error, e.what(), "Runtime exception processing cache dump"); } catch (const std::exception& e) { - cerr << "exception: " << e.what() << endl; + log->error(Logr::Error, e.what(), "Exception processing cache dump"); } catch (...) { - cerr << "other" << endl; + log->error(Logr::Error, "Other exception processing cache dump"); } - cerr << "Inserted " << count << " record sets" << endl; } namespace boost diff --git a/pdns/recursordist/recursor_cache.hh b/pdns/recursordist/recursor_cache.hh index 06dbf9af73..9a07c73b67 100644 --- a/pdns/recursordist/recursor_cache.hh +++ b/pdns/recursordist/recursor_cache.hh @@ -64,7 +64,7 @@ public: [[nodiscard]] pair stats(); [[nodiscard]] size_t ecsIndexSize(); - void getRecords(size_t howmany, std::string& ret); + void getRecords(size_t howmany, size_t maxsize, std::string& ret); void putRecords(const std::string& pbuf); using OptTag = boost::optional; @@ -154,7 +154,8 @@ private: bool d_tooBig{false}; }; - void replace(CacheEntry&& entry); + bool replace(CacheEntry&& entry); + template bool putRecord(T&); /* The ECS Index (d_ecsIndex) keeps track of whether there is any ECS-specific entry for a given (qname,qtype) entry in the cache (d_map), and if so