#include "fstrm_logger.hh"
#include "ipcipher.hh"
#include "remote_logger.hh"
+#include "remote_logger_pool.hh"
#ifdef HAVE_FSTRM
static void parseFSTRMOptions(boost::optional<LuaAssociativeTable<unsigned int>>& params, LuaAssociativeTable<unsigned int>& options)
return;
}
- static std::vector<std::string> const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval" };
+ static std::vector<std::string> const potentialOptions = { "bufferHint", "flushTimeout", "inputQueueSize", "outputQueueSize", "queueNotifyThreshold", "reopenInterval", "connectionCount" };
for (const auto& potentialOption : potentialOptions) {
getOptionalValue<unsigned int>(params, potentialOption, options[potentialOption]);
#endif /* HAVE_IPCIPHER */
/* ProtobufMessage */
- luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(std::string)>("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) {
- message.addTag(strValue);
- });
- luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(LuaArray<std::string>)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray<std::string>& tags) {
- for (const auto& tag : tags) {
- message.addTag(tag.second);
- }
- });
+ pick luaCtx.registerFunction<void (DNSDistProtoBufMessage::*)(std::string)>("setTag", [](DNSDistProtoBufMessage& message, const std::string& strValue) {
+ message.addTag(strValue);
+ });
+ luaCtx.registerFunction<void (DNSDistProtoBufMessage::*)(LuaArray<std::string>)>("setTagArray", [](DNSDistProtoBufMessage& message, const LuaArray<std::string>& tags) {
+ for (const auto& tag : tags) {
+ message.addTag(tag.second);
+ }
+ });
luaCtx.registerFunction<void(DNSDistProtoBufMessage::*)(boost::optional <time_t> sec, boost::optional <uint32_t> uSec)>("setProtobufResponseType",
[](DNSDistProtoBufMessage& message, boost::optional <time_t> sec, boost::optional <uint32_t> uSec) {
});
/* RemoteLogger */
- luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime) {
+ luaCtx.writeFunction("newRemoteLogger", [client,configCheck](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime, boost::optional<uint64_t> connectionCount) {
if (client || configCheck) {
return std::shared_ptr<RemoteLoggerInterface>(nullptr);
}
- return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries*100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
- });
+ auto count = connectionCount ? *connectionCount : 1;
+ if (count > 1) {
+ std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+ for (uint64_t i = 0; i < count; i++) {
+ loggers.emplace_back(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+ }
+ return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+ }
+ else {
+ return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries * 100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+ }
+ });
luaCtx.writeFunction("newFrameStreamUnixLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional<LuaAssociativeTable<unsigned int>> params) {
#ifdef HAVE_FSTRM
LuaAssociativeTable<unsigned int> options;
parseFSTRMOptions(params, options);
checkAllParametersConsumed("newRemoteLogger", params);
- return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_UNIX, address, !client, options));
+ auto connectionCount = options.find("connectionCount");
+ auto count = connectionCount == options.end() ? 1 : connectionCount->second;
+ options.erase(connectionCount);
+ if (count > 1) {
+ std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+ for (uint64_t i = 0; i < count; i++) {
+ loggers.emplace_back(new FrameStreamLogger(AF_UNIX, address, !client, options));
+ }
+ return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+ }
+ else {
+ return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_UNIX, address, !client, options));
+ }
#else
- throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger");
+ throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger");
#endif /* HAVE_FSTRM */
- });
+ });
luaCtx.writeFunction("newFrameStreamTcpLogger", [client,configCheck]([[maybe_unused]] const std::string& address, [[maybe_unused]] boost::optional<LuaAssociativeTable<unsigned int>> params) {
#if defined(HAVE_FSTRM) && defined(HAVE_FSTRM_TCP_WRITER_INIT)
LuaAssociativeTable<unsigned int> options;
parseFSTRMOptions(params, options);
checkAllParametersConsumed("newFrameStreamTcpLogger", params);
- return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_INET, address, !client, options));
+ auto connectionCount = options.find("connectionCount");
+ auto count = connectionCount == options.end() ? 1 : connectionCount->second;
+ options.erase(connectionCount);
+ if (count > 1) {
+ std::vector<std::shared_ptr<RemoteLoggerInterface>> loggers;
+ for (uint64_t i = 0; i < count; i++) {
+ loggers.emplace_back(new FrameStreamLogger(AF_INET, address, !client, options));
+ }
+ return std::shared_ptr<RemoteLoggerInterface>(new RemoteLoggerPool(std::move(loggers)));
+ }
+ else {
+ return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_INET, address, !client, options));
+ }
#else
throw std::runtime_error("fstrm with TCP support is required to build an AF_INET FrameStreamLogger");
#endif /* HAVE_FSTRM */
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+#include "config.h"
+#include "remote_logger.hh"
+#include <memory>
+#include <vector>
+
+class RemoteLoggerPool : public RemoteLoggerInterface
+{
+public:
+ RemoteLoggerPool(std::vector<std::shared_ptr<RemoteLoggerInterface>>&& pool);
+ RemoteLoggerPool(const RemoteLoggerPool&) = delete;
+ RemoteLoggerPool(RemoteLoggerPool&&) = delete;
+ RemoteLoggerPool& operator=(const RemoteLoggerPool&) = delete;
+ RemoteLoggerPool& operator=(RemoteLoggerPool&&) = delete;
+ [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override;
+
+ [[nodiscard]] std::string address() const override
+ {
+ return "";
+ }
+
+ [[nodiscard]] std::string name() const override
+ {
+ return "";
+ }
+
+ [[nodiscard]] std::string toString() override;
+
+ [[nodiscard]] RemoteLoggerInterface::Stats getStats() override
+ {
+ Stats total_stats;
+ for (auto& logger : d_pool) {
+ total_stats += logger->getStats();
+ }
+ return total_stats;
+ }
+
+private:
+ std::vector<std::shared_ptr<RemoteLoggerInterface>> d_pool;
+ std::vector<std::shared_ptr<RemoteLoggerInterface>>::iterator d_pool_it;
+};