From: Ensar Sarajčić Date: Wed, 5 Feb 2025 14:58:05 +0000 (+0100) Subject: dnsdist: add pooling support for RemoteLoggerInterface X-Git-Tag: dnsdist-2.0.0-alpha1~111^2~24 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=965f0bb3be062cc04cc9c922e0407a91944dc789;p=thirdparty%2Fpdns.git dnsdist: add pooling support for RemoteLoggerInterface This adds a new kind of `RemoteLoggerInterface`: `RemoteLoggerPool`. It can take multiple other `RemoteLoggerInterface`s and pass data to them in round-robin order by default. This also adds additional option to `newRemoteLogger`, `newFrameStreamTcpLogger` and `newFrameStreamUnixLogger`: `connectionCount`, which can be used to generate a pool with multiple connections. Closes: #14861 --- diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index c1507b80b7..e8225d954b 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -287,6 +287,7 @@ dnsdist_SOURCES = \ proxy-protocol.cc proxy-protocol.hh \ qtype.cc qtype.hh \ remote_logger.cc remote_logger.hh \ + remote_logger_pool.cc remote_loggerpool.hh \ sholder.hh \ snmp-agent.cc snmp-agent.hh \ sstuff.hh \ diff --git a/pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc b/pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc index 8235f48af9..2d0d179574 100644 --- a/pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc +++ b/pdns/dnsdistdist/dnsdist-lua-bindings-protobuf.cc @@ -30,6 +30,7 @@ #include "fstrm_logger.hh" #include "ipcipher.hh" #include "remote_logger.hh" +#include "remote_logger_pool.hh" #ifdef HAVE_FSTRM static void parseFSTRMOptions(boost::optional>& params, LuaAssociativeTable& options) @@ -38,7 +39,7 @@ static void parseFSTRMOptions(boost::optional> return; } - static std::vector const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval" }; + static std::vector const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval", "connectionCount" }; for (const auto& potentialOption : potentialOptions) { getOptionalValue(params, potentialOption, options[potentialOption]); @@ -62,14 +63,14 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck) #endif /* HAVE_IPCIPHER */ /* ProtobufMessage */ - luaCtx.registerFunction("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) { - message.addTag(strValue); - }); - luaCtx.registerFunction)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray& tags) { - for (const auto& tag : tags) { - message.addTag(tag.second); - } - }); + pick luaCtx.registerFunction("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) { + message.addTag(strValue); + }); + luaCtx.registerFunction)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray& tags) { + for (const auto& tag : tags) { + message.addTag(tag.second); + } + }); luaCtx.registerFunction sec, boost::optional uSec)>("setProtobufResponseType", [](DNSDistProtoBufMessage& message, boost::optional sec, boost::optional uSec) { @@ -121,12 +122,22 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck) }); /* RemoteLogger */ - luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional timeout, boost::optional maxQueuedEntries, boost::optional reconnectWaitTime) { + luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional timeout, boost::optional maxQueuedEntries, boost::optional reconnectWaitTime, boost::optional connectionCount) { if (client || configCheck) { return std::shared_ptr(nullptr); } - return std::shared_ptr(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries*100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client)); - }); + auto count = connectionCount ? *connectionCount : 1; + if (count > 1) { + std::vector> loggers; + for (uint64_t i = 0; i < count; i++) { + loggers.emplace_back(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client)); + } + return std::shared_ptr(new RemoteLoggerPool(std::move(loggers))); + } + else { + return std::shared_ptr(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client)); + } + }); luaCtx.writeFunction("newFrameStreamUnixLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional> params) { #ifdef HAVE_FSTRM @@ -137,11 +148,23 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck) LuaAssociativeTable options; parseFSTRMOptions(params, options); checkAllParametersConsumed("newRemoteLogger", params); - return std::shared_ptr(new FrameStreamLogger(AF_UNIX, address, !client, options)); + auto connectionCount = options.find("connectionCount"); + auto count = connectionCount == options.end() ? 1 : connectionCount->second; + options.erase(connectionCount); + if (count > 1) { + std::vector> loggers; + for (uint64_t i = 0; i < count; i++) { + loggers.emplace_back(new FrameStreamLogger(AF_UNIX, address, !client, options)); + } + return std::shared_ptr(new RemoteLoggerPool(std::move(loggers))); + } + else { + return std::shared_ptr(new FrameStreamLogger(AF_UNIX, address, !client, options)); + } #else - throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger"); + throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger"); #endif /* HAVE_FSTRM */ - }); + }); luaCtx.writeFunction("newFrameStreamTcpLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional> params) { #if defined(HAVE_FSTRM) && defined(HAVE_FSTRM_TCP_WRITER_INIT) @@ -152,7 +175,19 @@ void setupLuaBindingsProtoBuf(LuaContext& luaCtx, bool client, bool configCheck) LuaAssociativeTable options; parseFSTRMOptions(params, options); checkAllParametersConsumed("newFrameStreamTcpLogger", params); - return std::shared_ptr(new FrameStreamLogger(AF_INET, address, !client, options)); + auto connectionCount = options.find("connectionCount"); + auto count = connectionCount == options.end() ? 1 : connectionCount->second; + options.erase(connectionCount); + if (count > 1) { + std::vector> loggers; + for (uint64_t i = 0; i < count; i++) { + loggers.emplace_back(new FrameStreamLogger(AF_INET, address, !client, options)); + } + return std::shared_ptr(new RemoteLoggerPool(std::move(loggers))); + } + else { + return std::shared_ptr(new FrameStreamLogger(AF_INET, address, !client, options)); + } #else throw std::runtime_error("fstrm with TCP support is required to build an AF_INET FrameStreamLogger"); #endif /* HAVE_FSTRM */ diff --git a/pdns/dnsdistdist/remote_logger_pool.cc b/pdns/dnsdistdist/remote_logger_pool.cc new file mode 120000 index 0000000000..b92d4c52da --- /dev/null +++ b/pdns/dnsdistdist/remote_logger_pool.cc @@ -0,0 +1 @@ +../remote_logger_pool.cc \ No newline at end of file diff --git a/pdns/dnsdistdist/remote_logger_pool.hh b/pdns/dnsdistdist/remote_logger_pool.hh new file mode 120000 index 0000000000..037a11a7f9 --- /dev/null +++ b/pdns/dnsdistdist/remote_logger_pool.hh @@ -0,0 +1 @@ +../remote_logger_pool.hh \ No newline at end of file diff --git a/pdns/remote_logger_pool.cc b/pdns/remote_logger_pool.cc new file mode 100644 index 0000000000..a730b125cc --- /dev/null +++ b/pdns/remote_logger_pool.cc @@ -0,0 +1,37 @@ +#include +#include +#include +#include +#include + +#include "config.h" +#include "remote_logger_pool.hh" + +RemoteLoggerPool::RemoteLoggerPool(std::vector>&& pool) : + d_pool(std::move(pool)) +{ + d_pool_it = d_pool.begin(); +} + +[[nodiscard]] std::string RemoteLoggerPool::toString() +{ + auto stats = this->getStats(); + std::string loggersDesc; + for (size_t i = 0; i < this->d_pool.size(); i++) { + if (i > 0) { + loggersDesc += ", "; + } + loggersDesc += d_pool[i]->toString(); + } + return "RemoteLoggerPool of " + std::to_string(d_pool.size()) + " loggers (" + std::to_string(stats.d_queued) + " processed, " + std::to_string(stats.d_pipeFull + stats.d_tooLarge + stats.d_otherError) + " dropped)[ " + loggersDesc + "]"; +} + +RemoteLoggerInterface::Result RemoteLoggerPool::queueData(const std::string& data) +{ + auto result = (*d_pool_it)->queueData(data); + d_pool_it++; + if (d_pool_it == d_pool.end()) { + d_pool_it = d_pool.begin(); + } + return result; +} diff --git a/pdns/remote_logger_pool.hh b/pdns/remote_logger_pool.hh new file mode 100644 index 0000000000..fdfe2f4908 --- /dev/null +++ b/pdns/remote_logger_pool.hh @@ -0,0 +1,62 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once +#include "config.h" +#include "remote_logger.hh" +#include +#include + +class RemoteLoggerPool : public RemoteLoggerInterface +{ +public: + RemoteLoggerPool(std::vector>&& pool); + RemoteLoggerPool(const RemoteLoggerPool&) = delete; + RemoteLoggerPool(RemoteLoggerPool&&) = delete; + RemoteLoggerPool& operator=(const RemoteLoggerPool&) = delete; + RemoteLoggerPool& operator=(RemoteLoggerPool&&) = delete; + [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override; + + [[nodiscard]] std::string address() const override + { + return ""; + } + + [[nodiscard]] std::string name() const override + { + return ""; + } + + [[nodiscard]] std::string toString() override; + + [[nodiscard]] RemoteLoggerInterface::Stats getStats() override + { + Stats total_stats; + for (auto& logger : d_pool) { + total_stats += logger->getStats(); + } + return total_stats; + } + +private: + std::vector> d_pool; + std::vector>::iterator d_pool_it; +};