]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Allow 4 byte framesize and use it if configured
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 11 Mar 2026 12:57:11 +0000 (13:57 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 20 Mar 2026 11:24:26 +0000 (12:24 +0100)
Signed-off-by: Otto Moerbeek <otto.moerbeek@open-xchange.com>
15 files changed:
contrib/ProtobufLogger.py
pdns/dnsdistdist/dnsdist-configuration-yaml.cc
pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc
pdns/recursordist/lwres.cc
pdns/recursordist/pdns_recursor.cc
pdns/recursordist/rec-lua-conf.cc
pdns/recursordist/rec-lua-conf.hh
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-protozero.cc
pdns/recursordist/rec-protozero.hh
pdns/recursordist/rec-rust-lib/cxxsupport.cc
pdns/recursordist/rec-rust-lib/rust-bridge-in.rs
pdns/recursordist/rec-rust-lib/rust/src/bridge.rs
pdns/remote_logger.cc
pdns/remote_logger.hh

index 2b29ad776ff00935e86efdd04b661eb8baecbfc0..3222032c954ef0b11975cca9914b261872416c88 100644 (file)
@@ -33,10 +33,12 @@ except Exception:
 
 
 class PDNSPBConnHandler(object):
-    def __init__(self, conn, oturl, printjson):
+
+    def __init__(self, conn, oturl, printjson, frame4):
         self._conn = conn
         self._oturl = oturl
         self._printjson = printjson
+        self._frame4 = frame4
 
     messageTypeToStringMap = {
         dnsmessage_pb2.PBDNSMessage.UNKNOWN: "Unknown",
@@ -49,11 +51,17 @@ class PDNSPBConnHandler(object):
 
     def run(self):
         while True:
-            data = self._conn.recv(2)
-            if not data or len(data) < 2:
-                break
+            if self._frame4:
+                data = self._conn.recv(4)
+                if not data or len(data) < 4:
+                    break
+                (datalen,) = struct.unpack("!L", data)
+            else:
+                data = self._conn.recv(2)
+                if not data or len(data) < 2:
+                    break
+                (datalen,) = struct.unpack("!H", data)
 
-            (datalen,) = struct.unpack("!H", data)
             data = b""
             remaining = datalen
 
@@ -153,10 +161,8 @@ class PDNSPBConnHandler(object):
                     self.convertIDs(values)
                     json_string = json.dumps(values, indent=True)
                     print("- openTelemetry: " + json_string)
-                else:
-                    print(
-                        "- openTelemetry decoding not available, see the comments in ProtoBuffer.py to make it available."
-                    )
+            else:
+                print("- openTelemetry decoding not available, see the comments in ProtoBuffer.py to make it available.")
 
     @staticmethod
     def getAppliedPolicyTypeAsString(polType):
@@ -236,7 +242,7 @@ class PDNSPBConnHandler(object):
                 if (rrclass == 1 or rrclass == 255) and rr.HasField("rdata"):
                     if rrtype == 1:
                         rdatastr = socket.inet_ntop(socket.AF_INET, rr.rdata)
-                    elif rrtype in (5, 35, 64, 65):
+                    elif rrtype in (5, 16, 35, 64, 65):
                         rdatastr = rr.rdata
                     elif rrtype == 28:
                         rdatastr = socket.inet_ntop(socket.AF_INET6, rr.rdata)
@@ -388,11 +394,13 @@ class PDNSPBConnHandler(object):
         return requestorstr
 
 
-class PDNSPBListener(object):
-    def __init__(self, addr, port, oturl, printjson):
+    def __init__(self, addr, port, oturl, printjson, frame4):
         self._oturl = oturl
         self._printjson = printjson
-        res = socket.getaddrinfo(addr, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+        self._frame4 = frame4
+        res = socket.getaddrinfo(addr, port, socket.AF_UNSPEC,
+                                 socket.SOCK_STREAM, 0,
+                                 socket.AI_PASSIVE)
         if len(res) != 1:
             print("Error parsing the supplied address")
             sys.exit(1)
@@ -411,8 +419,10 @@ class PDNSPBListener(object):
         while True:
             (conn, _) = self._sock.accept()
 
-            handler = PDNSPBConnHandler(conn, self._oturl, self._printjson)
-            thread = threading.Thread(name="Connection Handler", target=PDNSPBConnHandler.run, args=[handler])
+            handler = PDNSPBConnHandler(conn, self._oturl, self._printjson, self._frame4)
+            thread = threading.Thread(name='Connection Handler',
+                                      target=PDNSPBConnHandler.run,
+                                      args=[handler])
             thread.daemon = True
             thread.start()
 
@@ -424,10 +434,11 @@ if __name__ == "__main__":
         epilog="URL is an optional url of a OpenTelemetry Trace collector endpoint",
     )
 
-    parser.add_argument("-json", action="store_true")
-    parser.add_argument("address")
-    parser.add_argument("port")
-    parser.add_argument("-url")
-    args = parser.parse_args()
-    PDNSPBListener(args.address, args.port, args.url, args.json).run()
+    parser.add_argument('-json', action='store_true')
+    parser.add_argument('address')
+    parser.add_argument('port')
+    parser.add_argument('-url')
+    parser.add_argument('-frame4', action='store_true')
+    args = parser.parse_args();
+    PDNSPBListener(args.address, args.port, args.url, args.json, args.frame4).run()
     sys.exit(0)
index 8c44eb32d62ea3a85e280e2d9acb907995629644..48eff1d21670d84dae0a931676f1c5c838d5d65e 100644 (file)
@@ -1876,12 +1876,12 @@ void registerProtobufLogger(const ProtobufLoggerConfiguration& config)
     std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
     loggers.reserve(config.connection_count);
     for (uint64_t i = 0; i < config.connection_count; i++) {
-      loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode));
+      loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode, RemoteLogger::FrameSize::Two));
     }
     object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLoggerPool>(std::move(loggers)));
   }
   else {
-    object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode));
+    object = std::shared_ptr<RemoteLoggerInterface>(std::make_shared<RemoteLogger>(ComboAddress(std::string(config.address)), config.timeout, config.max_queued_entries * 100, config.reconnect_wait_time, dnsdist::configuration::yaml::s_inClientMode, RemoteLogger::FrameSize::Two));
   }
   dnsdist::configuration::yaml::registerType<RemoteLoggerInterface>(object, config.name);
 #endif
index 26a31e768257be2dee64d68ab082e93745da27cc..7ce6e6dcb995128d9fb7a995f6767062f96a2c83 100644 (file)
@@ -130,12 +130,12 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck)
       std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
       loggers.reserve(count);
       for (uint64_t i = 0; i < count; i++) {
-        loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+        loggers.push_back(std::make_shared<RemoteLogger>(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client, RemoteLogger::FrameSize::Two));
       }
       return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
     }
 
-    return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+    return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client, RemoteLogger::FrameSize::Two));
   });
 
   luaCtx.writeFunction("newFrameStreamUnixLogger", [client, configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] std::optional<LuaAssociativeTable<unsigned int>> params) {
index a9a7a39b51fdf245c5a35b78c1a2612b5dfedc86..fb1ab21f15da2541a861d3b1e69a1a9a8a376b23 100644 (file)
@@ -339,8 +339,9 @@ static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_pt
     m.setResponseCode(rcode);
   }
 
+  const auto limit = (outgoingLoggers->size() > 0 ? outgoingLoggers->at(0)->maxSize() : std::numeric_limits<uint16_t>::max()) / 2;
   for (const auto& record : records) {
-    m.addRR(record, exportTypes, std::nullopt);
+    m.addRR(record, exportTypes, std::nullopt, limit);
   }
   m.commitResponse();
 
index 7e22bb1f5083970ee0e0a5b07d389a2ae653fc33..d797cf87bf34e8e8fbc1265b4687b39bc83b599f 100644 (file)
@@ -1537,11 +1537,12 @@ void startDoResolve(void* arg) // NOLINT(readability-function-cognitive-complexi
 #endif /* NOD ENABLED */
 
         if (t_protobufServers.servers) {
-          // Max size is 64k, but we're conservative here, as other fields are added after the answers have been added
-          // If a single answer causes a too big protobuf message, it will be dropped by queueData()
-          // But note addRR has code to prevent that
-          if (pbMessage.size() < std::numeric_limits<uint16_t>::max() / 2) {
-            pbMessage.addRR(record, luaconfsLocal->protobufExportConfig.exportTypes, udr);
+          // Max size is 64k for 2 bytes frames, but we're conservative here, as other fields are
+          // added after the answers have been added. If a single answer causes a too big protobuf
+          // message, it will be dropped by queueData(), but note addRR has code to prevent that.
+          const auto limit = (t_protobufServers.servers->size() > 0 ? t_protobufServers.servers->at(0)->maxSize() : std::numeric_limits<uint16_t>::max()) / 2;
+          if (pbMessage.size() < limit) {
+            pbMessage.addRR(record, luaconfsLocal->protobufExportConfig.exportTypes, udr, limit);
           }
         }
       }
index 7c1ca4fd9ed37f970f0956210c8e7d0838276549..4fe55899190e0efd23cf4aeac5e7411225737b93 100644 (file)
@@ -75,7 +75,8 @@ bool operator==(const ProtobufExportConfig& configA, const ProtobufExportConfig&
          configA.logQueries           == configB.logQueries        &&
          configA.logResponses         == configB.logResponses      &&
          configA.taggedOnly           == configB.taggedOnly        &&
-         configA.logMappedFrom        == configB.logMappedFrom;
+         configA.logMappedFrom        == configB.logMappedFrom     &&
+         configA.frame4               == configB.frame4;
   // clang-format on
 }
 
@@ -214,6 +215,10 @@ static void parseProtobufOptions(const std::optional<protobufOptions_t>& vars, P
     config.logMappedFrom = boost::get<bool>(have.at("logMappedFrom"));
   }
 
+  if (have.count("frame4") != 0) {
+    config.frame4 = boost::get<bool>(have.at("frame4"));
+  }
+
   if (have.count("exportTypes") != 0) {
     config.exportTypes.clear();
 
index de657fcb94e6bfbf8d76bd6c8d3dea8ddbc33e2e..767fb9bfc2e4d138d9336384a5638fa94bac095c 100644 (file)
@@ -44,6 +44,7 @@ struct ProtobufExportConfig
   bool logResponses{true};
   bool taggedOnly{false};
   bool logMappedFrom{false};
+  bool frame4{false};
 };
 
 bool operator==(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB);
index 72a70c1e4eed4d355f296a903cd57678c9bcfbc1..c5c4d97dade16e51089b5766617613047156aa27 100644 (file)
@@ -416,7 +416,7 @@ static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobuf
 
   for (const auto& server : config.servers) {
     try {
-      auto logger = make_unique<RemoteLogger>(server, config.timeout, 100 * config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
+      auto logger = make_unique<RemoteLogger>(server, config.timeout, 100 * config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect, config.frame4 ? RemoteLogger::FrameSize::Four : RemoteLogger::FrameSize::Two);
       logger->setLogQueries(config.logQueries);
       logger->setLogResponses(config.logResponses);
       result->emplace_back(std::move(logger));
index fd8106f2dad97455c4a3c6f9ef29957a05ea3e2b..346e9920b0bbeffed01b50e6e24ff895dd1ab3f2 100644 (file)
@@ -24,7 +24,7 @@
 #include "rec-protozero.hh"
 #include <variant>
 
-void pdns::ProtoZero::RecMessage::addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, [[maybe_unused]] std::optional<bool> udr)
+void pdns::ProtoZero::RecMessage::addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, [[maybe_unused]] std::optional<bool> udr, size_t limit)
 {
   if (record.d_place != DNSResourceRecord::ANSWER || record.d_class != QClass::IN) {
     return;
@@ -42,7 +42,7 @@ void pdns::ProtoZero::RecMessage::addRR(const DNSRecord& record, const std::set<
   pbf_rr.add_uint32(static_cast<protozero::pbf_tag_type>(pdns::ProtoZero::Message::RRField::ttl), record.d_ttl);
 
   auto add = [&](const std::string& str) {
-    if (size() + str.length() < std::numeric_limits<uint16_t>::max() / 2) {
+    if (size() + str.length() < limit) {
       pbf_rr.add_string(static_cast<protozero::pbf_tag_type>(pdns::ProtoZero::Message::RRField::rdata), str);
     }
   };
index 1bed1722d54f9e7ef9c15f801f4e8532a31246b4..dd103131c0d6630fdeccdad65941f65303329759 100644 (file)
@@ -109,7 +109,7 @@ namespace ProtoZero
 
     // DNSResponse related fields below
 
-    void addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, std::optional<bool> udr);
+    void addRR(const DNSRecord& record, const std::set<uint16_t>& exportTypes, std::optional<bool> udr, size_t limit);
 
     void setAppliedPolicyType(const DNSFilterEngine::PolicyType type)
     {
index ff672514991c0d85da7376fd1c804531880dde66..76b975700df6c1d59430ad90e4b586a2603d776d 100644 (file)
@@ -789,6 +789,7 @@ void fromLuaToRust(const ProtobufExportConfig& pbConfig, pdns::rust::settings::r
     pbServer.exportTypes.emplace_back(QType(num).toString());
   }
   pbServer.logMappedFrom = pbConfig.logMappedFrom;
+  pbServer.frame4 = pbConfig.frame4;
 }
 
 void fromLuaToRust(const FrameStreamExportConfig& fsc, pdns::rust::settings::rec::DNSTapFrameStreamServer& dnstap)
@@ -1142,6 +1143,7 @@ void fromRustToLuaConfig(const pdns::rust::settings::rec::ProtobufServer& pbServ
   exp.logResponses = pbServer.logResponses;
   exp.taggedOnly = pbServer.taggedOnly;
   exp.logMappedFrom = pbServer.logMappedFrom;
+  exp.frame4 = pbServer.frame4;
 }
 
 void fromRustToLuaConfig(const pdns::rust::settings::rec::DNSTapFrameStreamServer& dnstap, FrameStreamExportConfig& exp)
index b31b3316f3a3e88f76ac28fa7ac8079c7a85522b..11880cd8956417336e51595713b5875bb7ba33f5 100644 (file)
@@ -71,6 +71,9 @@ pub struct ProtobufServer {
     exportTypes: Vec<String>,
     #[serde(default, skip_serializing_if = "crate::is_default", alias = "log_mapped_from")]
     logMappedFrom: bool,
+    // Added in 5.5.0
+    #[serde(default, skip_serializing_if = "crate::is_default")]
+    frame4: bool,
 }
 
 // A dnstap logging server
index 036aa93156a689293b8cc1d5ca3c67591bd082d5..5c8cfcaec3d0a283bfcf2146e9d7243fb8382947 100644 (file)
@@ -438,6 +438,7 @@ impl ProtobufServer {
         }
         insertseq(&mut map, "exportTypes", &seq2);
         insertb(&mut map, "logMappedFrom", self.logMappedFrom);
+        insertb(&mut map, "frame4", self.frame4);
         serde_yaml::Value::Mapping(map)
     }
 }
index 941e9b214aa28b2730a72a6006a1597d1b9a1ef0..acb398490b5ff68e0caf089ea170a14b3cb08c66 100644 (file)
 
 bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
 {
-  return d_buffer.size() + 2 + str.size() <= d_buffer.capacity();
+  return d_buffer.size() + d_framesize + str.size() <= d_buffer.capacity();
+}
+
+bool CircularWriteBuffer::tooBig(const std::string& str) const
+{
+  return str.size() > (d_framesize == 2 ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max());
 }
 
 bool CircularWriteBuffer::write(const std::string& str)
 {
-  if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
+  if (tooBig(str) || !hasRoomFor(str)) {
     return false;
   }
 
-  uint16_t len = htons(str.size());
-  const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
-  d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+  if (d_framesize == 2) {
+    uint16_t len = htons(str.size());
+    const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
+    d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+  }
+  else {
+    uint32_t len = htonl(str.size());
+    const char* ptr = reinterpret_cast<const char*>(&len); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
+    d_buffer.insert(d_buffer.end(), ptr, ptr + sizeof(len)); // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+  }
   d_buffer.insert(d_buffer.end(), str.begin(), str.end());
-
   return true;
 }
 
@@ -125,7 +136,7 @@ const std::string& RemoteLoggerInterface::toErrorString(Result result)
   return str.at(std::min(tmp, 4U));
 }
 
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect) : d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect, RemoteLogger::FrameSize frame) : d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes, frame == FrameSize::Two ? 2 : 4), nullptr}), d_framesize(frame)
 {
   if (!d_asyncConnect) {
     reconnect();
@@ -171,7 +182,7 @@ RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
 {
   auto runtime = d_runtime.lock();
 
-  if (data.size() > std::numeric_limits<uint16_t>::max()) {
+  if (runtime->d_writer.tooBig(data)) {
     ++runtime->d_stats.d_tooLarge;
     return Result::TooLarge;
   }
index 5817bdafbbb1e7afd2033635a203f5f0caeea0ca..43292f6aa048f7e33ce98f2b44c0125ab4c7a00e 100644 (file)
 class CircularWriteBuffer
 {
 public:
-  explicit CircularWriteBuffer(size_t size) : d_buffer(size)
+  explicit CircularWriteBuffer(size_t size, uint8_t frame) : d_buffer(size), d_framesize(frame)
   {
   }
 
   [[nodiscard]] bool hasRoomFor(const std::string& str) const;
+  [[nodiscard]] bool tooBig(const std::string& str) const;
   bool write(const std::string& str);
   bool flush(int fileDesc);
 
 private:
   boost::circular_buffer<char> d_buffer;
+  uint8_t d_framesize;
 };
 
 class RemoteLoggerInterface
@@ -122,6 +124,11 @@ private:
 class RemoteLogger : public RemoteLoggerInterface
 {
 public:
+  enum class FrameSize : uint8_t
+  {
+    Two,
+    Four,
+  };
   RemoteLogger(const RemoteLogger&) = delete;
   RemoteLogger(RemoteLogger&&) = delete;
   RemoteLogger& operator=(const RemoteLogger&) = delete;
@@ -129,7 +136,8 @@ public:
   RemoteLogger(const ComboAddress& remote, uint16_t timeout = 2,
                uint64_t maxQueuedBytes = 100000,
                uint8_t reconnectWaitTime = 1,
-               bool asyncConnect = false);
+               bool asyncConnect = false,
+               FrameSize frame = FrameSize::Two);
   ~RemoteLogger() override;
 
   std::string address() const override
@@ -138,6 +146,10 @@ public:
   }
 
   [[nodiscard]] Result queueData(const std::string& data) override;
+  [[nodiscard]] size_t maxSize() const
+  {
+    return d_framesize == FrameSize::Two ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max();
+  }
   [[nodiscard]] std::string name() const override
   {
     return "protobuf";
@@ -177,4 +189,5 @@ private:
 
   LockGuarded<RuntimeData> d_runtime;
   std::thread d_thread;
+  FrameSize d_framesize;
 };