]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: add pooling support for RemoteLoggerInterface
authorEnsar Sarajčić <dev@ensarsarajcic.com>
Wed, 5 Feb 2025 14:58:05 +0000 (15:58 +0100)
committerEnsar Sarajčić <dev@ensarsarajcic.com>
Wed, 5 Feb 2025 14:58:05 +0000 (15:58 +0100)
This adds a new kind of `RemoteLoggerInterface`: `RemoteLoggerPool`.
It can take multiple other `RemoteLoggerInterface`s and pass data to
them in round-robin order by default.

This also adds additional option to `newRemoteLogger`, `newFrameStreamTcpLogger`
and `newFrameStreamUnixLogger`: `connectionCount`, which can be used to
generate a pool with multiple connections.

Closes: #14861
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc
pdns/dnsdistdist/remote_logger_pool.cc [new symlink]
pdns/dnsdistdist/remote_logger_pool.hh [new symlink]
pdns/remote_logger_pool.cc [new file with mode: 0644]
pdns/remote_logger_pool.hh [new file with mode: 0644]

index c1507b80b757db6c90fafdb04a22d72fef6ab031..e8225d954b35de3ec78a5ac9c0e1bca5d8ac0a21 100644 (file)
@@ -287,6 +287,7 @@ dnsdist_SOURCES = \
        proxy-protocol.cc proxy-protocol.hh \
        qtype.cc qtype.hh \
        remote_logger.cc remote_logger.hh \
+       remote_logger_pool.cc remote_loggerpool.hh \
        sholder.hh \
        snmp-agent.cc snmp-agent.hh \
        sstuff.hh \
index 8235f48af91c317c461df9a4cf7980c418600908..2d0d1795746342a8df86e9f2cbb93cef4601f80f 100644 (file)
@@ -30,6 +30,7 @@
 #include "fstrm_logger.hh"
 #include "ipcipher.hh"
 #include "remote_logger.hh"
+#include "remote_logger_pool.hh"
 
 #ifdef HAVE_FSTRM
 static void parseFSTRMOptions(boost::optional<LuaAssociativeTable<unsigned int>>& params, LuaAssociativeTable<unsigned int>& options)
@@ -38,7 +39,7 @@ static void parseFSTRMOptions(boost::optional<LuaAssociativeTable<unsigned int>>
     return;
   }
 
-  static std::vector<std::string> const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval" };
+  static std::vector<std::string> const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval", "connectionCount" };
 
   for (const auto& potentialOption : potentialOptions) {
     getOptionalValue<unsigned int>(params, potentialOption, options[potentialOption]);
@@ -62,14 +63,14 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck)
 #endif /* HAVE_IPCIPHER */
 
   /* ProtobufMessage */
 luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(std::string)>("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) {
-      message.addTag(strValue);
-    });
-  luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(LuaArray<std::string>)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray<std::string>& tags) {
-      for (const auto& tag : tags) {
-        message.addTag(tag.second);
-      }
-    });
pick luaCtx.registerFunction<void (DNSDistProtoBufMessage::*)(std::string)>("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) {
+    message.addTag(strValue);
+  });
+  luaCtx.registerFunction<void (DNSDistProtoBufMessage::*)(LuaArray<std::string>)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray<std::string>& tags) {
+    for (const auto& tag : tags) {
+      message.addTag(tag.second);
+    }
+  });
 
   luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(boost::optional <time_t> sec, boost::optional <uint32_t> uSec)>("setProtobufResponseType",
                                         [](DNSDistProtoBufMessage& message, boost::optional <time_t> sec, boost::optional <uint32_t> uSec) {
@@ -121,12 +122,22 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck)
     });
 
   /* RemoteLogger */
-  luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime) {
+  luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime, boost::optional<uint64_t> connectionCount) {
       if (client || configCheck) {
         return std::shared_ptr<RemoteLoggerInterface>(nullptr);
       }
-      return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries*100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
-    });
+      auto count = connectionCount ? *connectionCount : 1;
+      if (count > 1) {
+        std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+        for (uint64_t i = 0; i < count; i++) {
+          loggers.emplace_back(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+        }
+        return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+      }
+      else {
+        return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+      }
+  });
 
   luaCtx.writeFunction("newFrameStreamUnixLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional<LuaAssociativeTable<unsigned int>> params) {
 #ifdef HAVE_FSTRM
@@ -137,11 +148,23 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck)
       LuaAssociativeTable<unsigned int> options;
       parseFSTRMOptions(params, options);
       checkAllParametersConsumed("newRemoteLogger", params);
-      return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_UNIX, address, !client, options));
+      auto connectionCount = options.find("connectionCount");
+      auto count = connectionCount == options.end() ? 1 : connectionCount->second;
+      options.erase(connectionCount);
+      if (count > 1) {
+        std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+        for (uint64_t i = 0; i < count; i++) {
+          loggers.emplace_back(new FrameStreamLogger(AF_UNIX, address, !client, options));
+        }
+        return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+      }
+      else {
+        return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_UNIX, address, !client, options));
+      }
 #else
-      throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger");
+    throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger");
 #endif /* HAVE_FSTRM */
-    });
+  });
 
   luaCtx.writeFunction("newFrameStreamTcpLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional<LuaAssociativeTable<unsigned int>> params) {
 #if defined(HAVE_FSTRM) && defined(HAVE_FSTRM_TCP_WRITER_INIT)
@@ -152,7 +175,19 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck)
       LuaAssociativeTable<unsigned int> options;
       parseFSTRMOptions(params, options);
       checkAllParametersConsumed("newFrameStreamTcpLogger", params);
-      return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_INET, address, !client, options));
+      auto connectionCount = options.find("connectionCount");
+      auto count = connectionCount == options.end() ? 1 : connectionCount->second;
+      options.erase(connectionCount);
+      if (count > 1) {
+        std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+        for (uint64_t i = 0; i < count; i++) {
+          loggers.emplace_back(new FrameStreamLogger(AF_INET, address, !client, options));
+        }
+        return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+      }
+      else {
+        return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_INET, address, !client, options));
+      }
 #else
       throw std::runtime_error("fstrm with TCP support is required to build an AF_INET FrameStreamLogger");
 #endif /* HAVE_FSTRM */
diff --git a/pdns/dnsdistdist/remote_logger_pool.cc b/pdns/dnsdistdist/remote_logger_pool.cc
new file mode 120000 (symlink)
index 0000000..b92d4c5
--- /dev/null
@@ -0,0 +1 @@
+../remote_logger_pool.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/remote_logger_pool.hh b/pdns/dnsdistdist/remote_logger_pool.hh
new file mode 120000 (symlink)
index 0000000..037a11a
--- /dev/null
@@ -0,0 +1 @@
+../remote_logger_pool.hh
\ No newline at end of file
diff --git a/pdns/remote_logger_pool.cc b/pdns/remote_logger_pool.cc
new file mode 100644 (file)
index 0000000..a730b12
--- /dev/null
@@ -0,0 +1,37 @@
+#include <memory>
+#include <numeric>
+#include <string>
+#include <unistd.h>
+#include <sys/un.h>
+
+#include "config.h"
+#include "remote_logger_pool.hh"
+
+RemoteLoggerPool::RemoteLoggerPool(std::vector<std::shared_ptr<RemoteLoggerInterface>>&& pool) :
+  d_pool(std::move(pool))
+{
+  d_pool_it = d_pool.begin();
+}
+
+[[nodiscard]] std::string RemoteLoggerPool::toString()
+{
+  auto stats = this->getStats();
+  std::string loggersDesc;
+  for (size_t i = 0; i < this->d_pool.size(); i++) {
+    if (i > 0) {
+      loggersDesc += ", ";
+    }
+    loggersDesc += d_pool[i]->toString();
+  }
+  return "RemoteLoggerPool of " + std::to_string(d_pool.size()) + " loggers (" + std::to_string(stats.d_queued) + " processed, " + std::to_string(stats.d_pipeFull + stats.d_tooLarge + stats.d_otherError) + " dropped)[ " + loggersDesc + "]";
+}
+
+RemoteLoggerInterface::Result RemoteLoggerPool::queueData(const std::string& data)
+{
+  auto result = (*d_pool_it)->queueData(data);
+  d_pool_it++;
+  if (d_pool_it == d_pool.end()) {
+    d_pool_it = d_pool.begin();
+  }
+  return result;
+}
diff --git a/pdns/remote_logger_pool.hh b/pdns/remote_logger_pool.hh
new file mode 100644 (file)
index 0000000..fdfe2f4
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+#include "config.h"
+#include "remote_logger.hh"
+#include <memory>
+#include <vector>
+
+class RemoteLoggerPool : public RemoteLoggerInterface
+{
+public:
+  RemoteLoggerPool(std::vector<std::shared_ptr<RemoteLoggerInterface>>&& pool);
+  RemoteLoggerPool(const RemoteLoggerPool&) = delete;
+  RemoteLoggerPool(RemoteLoggerPool&&) = delete;
+  RemoteLoggerPool& operator=(const RemoteLoggerPool&) = delete;
+  RemoteLoggerPool& operator=(RemoteLoggerPool&&) = delete;
+  [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override;
+
+  [[nodiscard]] std::string address() const override
+  {
+    return "";
+  }
+
+  [[nodiscard]] std::string name() const override
+  {
+    return "";
+  }
+
+  [[nodiscard]] std::string toString() override;
+
+  [[nodiscard]] RemoteLoggerInterface::Stats getStats() override
+  {
+    Stats total_stats;
+    for (auto& logger : d_pool) {
+      total_stats += logger->getStats();
+    }
+    return total_stats;
+  }
+
+private:
+  std::vector<std::shared_ptr<RemoteLoggerInterface>> d_pool;
+  std::vector<std::shared_ptr<RemoteLoggerInterface>>::iterator d_pool_it;
+};