From: Remi Gacogne Date: Mon, 22 Oct 2018 07:53:27 +0000 (+0200) Subject: rec: Add support for more than one protobuf server X-Git-Tag: auth-4.2.0-alpha1~50^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b773359c6a950e0dd9b07082471e972f4e8b61fc;p=thirdparty%2Fpdns.git rec: Add support for more than one protobuf server --- diff --git a/pdns/lwres.cc b/pdns/lwres.cc index dd561f19e4..540823abdf 100644 --- a/pdns/lwres.cc +++ b/pdns/lwres.cc @@ -50,9 +50,9 @@ #ifdef HAVE_PROTOBUF -static void logOutgoingQuery(std::shared_ptr outgoingLogger, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional& srcmask) +static void logOutgoingQuery(const std::shared_ptr>>& outgoingLoggers, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional& srcmask) { - if(!outgoingLogger) + if(!outgoingLoggers) return; RecProtoBufMessage message(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes); @@ -69,12 +69,15 @@ static void logOutgoingQuery(std::shared_ptr outgoingLogger, boost // cerr <queueData(str); + + for (auto& logger : *outgoingLoggers) { + logger->queueData(str); + } } -static void logIncomingResponse(std::shared_ptr outgoingLogger, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector& records, const struct timeval& queryTime, const std::set& exportTypes) +static void logIncomingResponse(const std::shared_ptr>>& outgoingLoggers, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector& records, const struct timeval& queryTime, const std::set& exportTypes) { - if(!outgoingLogger) + if(!outgoingLoggers) return; RecProtoBufMessage message(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes); @@ -89,7 +92,10 @@ static void logIncomingResponse(std::shared_ptr outgoingLogger, bo // cerr <queueData(str); + + for (auto& logger : *outgoingLoggers) { + logger->queueData(str); + } } #endif /* HAVE_PROTOBUF */ @@ -97,7 +103,7 @@ static void logIncomingResponse(std::shared_ptr outgoingLogger, bo /** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors Never throws! */ -int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr& outgoingLogger, const std::set& exportTypes, LWResult *lwr, bool* chained) +int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::set& exportTypes, LWResult *lwr, bool* chained) { size_t len; size_t bufsize=g_outgoingEDNSBufsize; @@ -153,9 +159,9 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d boost::uuids::uuid uuid; const struct timeval queryTime = *now; - if (outgoingLogger) { + if (outgoingLoggers) { uuid = (*t_uuidGenerator)(); - logOutgoingQuery(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask); + logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask); } #endif @@ -241,8 +247,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d if(mdp.d_header.rcode == RCode::FormErr && mdp.d_qname.empty() && mdp.d_qtype == 0 && mdp.d_qclass == 0) { #ifdef HAVE_PROTOBUF - if(outgoingLogger) { - logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + if(outgoingLoggers) { + logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=true; @@ -287,8 +293,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d } #ifdef HAVE_PROTOBUF - if(outgoingLogger) { - logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + if(outgoingLoggers) { + logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=true; @@ -300,8 +306,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d lwr->d_rcode = RCode::FormErr; g_stats.serverParseError++; #ifdef HAVE_PROTOBUF - if(outgoingLogger) { - logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + if(outgoingLoggers) { + logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=false; diff --git a/pdns/lwres.hh b/pdns/lwres.hh index c9dab346c1..a548206bf9 100644 --- a/pdns/lwres.hh +++ b/pdns/lwres.hh @@ -68,5 +68,5 @@ public: bool d_haveEDNS{false}; }; -int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr& outgoingLogger, const std::set& exportTypes, LWResult* res, bool* chained); +int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::set& exportTypes, LWResult* res, bool* chained); #endif // PDNS_LWRES_HH diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 6360a6ba02..17f7051b6b 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -109,8 +109,10 @@ static thread_local unsigned int t_id = 0; static thread_local std::shared_ptr t_traceRegex; static thread_local std::unique_ptr t_tcpClientCounts; #ifdef HAVE_PROTOBUF -static thread_local std::shared_ptr t_protobufServer{nullptr}; -static thread_local std::shared_ptr t_outgoingProtobufServer{nullptr}; +static thread_local std::shared_ptr>> t_protobufServers{nullptr}; +static thread_local uint64_t t_protobufServersGeneration; +static thread_local std::shared_ptr>> t_outgoingProtobufServers{nullptr}; +static thread_local uint64_t t_outgoingProtobufServersGeneration; #endif /* HAVE_PROTOBUF */ thread_local std::unique_ptr MT; // the big MTasker @@ -774,8 +776,12 @@ catch(...) } #ifdef HAVE_PROTOBUF -static void protobufLogQuery(const std::shared_ptr& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector& policyTags, const std::string& requestorId, const std::string& deviceId) +static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector& policyTags, const std::string& requestorId, const std::string& deviceId) { + if (!t_protobufServers) { + return; + } + Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len); @@ -791,15 +797,25 @@ static void protobufLogQuery(const std::shared_ptr& logger, uint8_ // cerr <queueData(str); + + for (auto& server : *t_protobufServers) { + server->queueData(str); + } } -static void protobufLogResponse(const std::shared_ptr& logger, const RecProtoBufMessage& message) +static void protobufLogResponse(const RecProtoBufMessage& message) { + if (!t_protobufServers) { + return; + } + // cerr <queueData(str); + + for (auto& server : *t_protobufServers) { + server->queueData(str); + } } #endif @@ -850,18 +866,20 @@ static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_ } #ifdef HAVE_PROTOBUF -static std::shared_ptr startProtobufServer(const ProtobufExportConfig& config, uint64_t generation) +static std::shared_ptr>> startProtobufServers(const ProtobufExportConfig& config) { - std::shared_ptr result = nullptr; - try { - result = std::make_shared(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect); - result->setGeneration(generation); - } - catch(const std::exception& e) { - g_log<>>(); + + for (const auto& server : config.servers) { + try { + result->push_back(std::make_shared(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect)); + } + catch(const std::exception& e) { + g_log< startProtobufServer(const ProtobufExportCon static bool checkProtobufExport(LocalStateHolder& luaconfsLocal) { if (!luaconfsLocal->protobufExportConfig.enabled) { - if (t_protobufServer != nullptr) { - t_protobufServer->stop(); - t_protobufServer = nullptr; + if (t_protobufServers) { + for (auto& server : *t_protobufServers) { + server->stop(); + } + t_protobufServers.reset(); } return false; @@ -880,14 +900,18 @@ static bool checkProtobufExport(LocalStateHolder& luaconfsLocal) /* if the server was not running, or if it was running according to a previous configuration */ - if (t_protobufServer == nullptr || - t_protobufServer->getGeneration() < luaconfsLocal->generation) { + if (!t_protobufServers || + t_protobufServersGeneration < luaconfsLocal->generation) { - if (t_protobufServer) { - t_protobufServer->stop(); + if (t_protobufServers) { + for (auto& server : *t_protobufServers) { + server->stop(); + } } + t_protobufServers.reset(); - t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation); + t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig); + t_protobufServersGeneration = luaconfsLocal->generation; } return true; @@ -896,24 +920,30 @@ static bool checkProtobufExport(LocalStateHolder& luaconfsLocal) static bool checkOutgoingProtobufExport(LocalStateHolder& luaconfsLocal) { if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) { - if (t_outgoingProtobufServer != nullptr) { - t_outgoingProtobufServer->stop(); - t_outgoingProtobufServer = nullptr; + if (t_outgoingProtobufServers) { + for (auto& server : *t_outgoingProtobufServers) { + server->stop(); + } } + t_outgoingProtobufServers.reset(); return false; } /* if the server was not running, or if it was running according to a previous configuration */ - if (t_outgoingProtobufServer == nullptr || - t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) { + if (!t_outgoingProtobufServers || + t_outgoingProtobufServersGeneration < luaconfsLocal->generation) { - if (t_outgoingProtobufServer) { - t_outgoingProtobufServer->stop(); + if (t_outgoingProtobufServers) { + for (auto& server : *t_outgoingProtobufServers) { + server->stop(); + } } + t_outgoingProtobufServers.reset(); - t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation); + t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig); + t_outgoingProtobufServersGeneration = luaconfsLocal->generation; } return true; @@ -1038,7 +1068,7 @@ static void startDoResolve(void *p) bool logResponse = false; #ifdef HAVE_PROTOBUF if (checkProtobufExport(luaconfsLocal)) { - logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses; + logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses; Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0); @@ -1085,7 +1115,7 @@ static void startDoResolve(void *p) #ifdef HAVE_PROTOBUF sr.setInitialRequestId(dc->d_uuid); - sr.setOutgoingProtobufServer(t_outgoingProtobufServer); + sr.setOutgoingProtobufServers(t_outgoingProtobufServers); #endif sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional(dc->d_ednssubnet) : boost::none); @@ -1422,7 +1452,7 @@ static void startDoResolve(void *p) #endif /* NOD ENABLED */ #ifdef HAVE_PROTOBUF - if(t_protobufServer) { + if (t_protobufServers) { #ifdef NOD_ENABLED pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr); #else @@ -1457,7 +1487,7 @@ static void startDoResolve(void *p) } #endif /* NOD_ENABLED */ #ifdef HAVE_PROTOBUF - if (t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) { + if (t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) { pbMessage->setBytes(packet.size()); pbMessage->setResponseCode(pw.getHeader()->rcode); if (appliedPolicy.d_name) { @@ -1479,7 +1509,7 @@ static void startDoResolve(void *p) } } #endif /* NOD_ENABLED */ - protobufLogResponse(t_protobufServer, *pbMessage); + protobufLogResponse(*pbMessage); #ifdef NOD_ENABLED if (g_nodEnabled) { pbMessage->setNOD(false); @@ -1827,7 +1857,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if (checkProtobufExport(luaconfsLocal)) { needECS = true; } - logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries; + logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries; #endif if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) { @@ -1866,17 +1896,17 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) const struct dnsheader* dh = reinterpret_cast(&conn->data[0]); #ifdef HAVE_PROTOBUF - if(t_protobufServer || t_outgoingProtobufServer) { + if(t_protobufServers || t_outgoingProtobufServers) { dc->d_requestorId = requestorId; dc->d_deviceId = deviceId; dc->d_uuid = (*t_uuidGenerator)(); } - if(t_protobufServer) { + if(t_protobufServers) { try { if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) { - protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId); + protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId); } } catch(std::exception& e) { @@ -2009,8 +2039,8 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr } else if (checkOutgoingProtobufExport(luaconfsLocal)) { uniqueId = (*t_uuidGenerator)(); } - logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries; - bool logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses; + logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries; + bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses; #endif EDNSSubnetOpts ednssubnet; bool ecsFound = false; @@ -2074,11 +2104,11 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr bool cacheHit = false; boost::optional pbMessage(boost::none); #ifdef HAVE_PROTOBUF - if(t_protobufServer) { + if (t_protobufServers) { pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response); pbMessage->setServerIdentity(SyncRes::s_serverID); if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) { - protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId); + protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId); } } #endif /* HAVE_PROTOBUF */ @@ -2103,7 +2133,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr } #ifdef HAVE_PROTOBUF - if(t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) { + if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) { Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage->update(uniqueId, &requestor, &destination, false, dh->id); @@ -2111,7 +2141,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec); pbMessage->setRequestorId(requestorId); pbMessage->setDeviceId(deviceId); - protobufLogResponse(t_protobufServer, *pbMessage); + protobufLogResponse(*pbMessage); } #endif /* HAVE_PROTOBUF */ if(!g_quiet) @@ -2179,7 +2209,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr dc->d_ttlCap = ttlCap; dc->d_variable = variable; #ifdef HAVE_PROTOBUF - if (t_protobufServer || t_outgoingProtobufServer) { + if (t_protobufServers || t_outgoingProtobufServers) { dc->d_uuid = std::move(uniqueId); } dc->d_requestorId = requestorId; diff --git a/pdns/rec-lua-conf.cc b/pdns/rec-lua-conf.cc index dfa539841e..5abfa4d1a5 100644 --- a/pdns/rec-lua-conf.cc +++ b/pdns/rec-lua-conf.cc @@ -425,41 +425,67 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de lci.protobufMaskV6 = maskV6; }); - Lua.writeFunction("protobufServer", [&lci](const string& server_, boost::optional vars) { - try { + Lua.writeFunction("protobufServer", [&lci](boost::variant> servers, boost::optional vars) { if (!lci.protobufExportConfig.enabled) { + lci.protobufExportConfig.enabled = true; - lci.protobufExportConfig.server = ComboAddress(server_); - parseProtobufOptions(vars, lci.protobufExportConfig); + + try { + if (servers.type() == typeid(std::string)) { + auto server = boost::get(servers); + + lci.protobufExportConfig.servers.emplace_back(server); + } + else { + auto serversMap = boost::get>(servers); + for (const auto& serverPair : serversMap) { + lci.protobufExportConfig.servers.emplace_back(serverPair.second); + } + } + + parseProtobufOptions(vars, lci.protobufExportConfig); + } + catch(std::exception& e) { + g_log< vars) { - try { - if (!lci.outgoingProtobufExportConfig.enabled) { - lci.outgoingProtobufExportConfig.enabled = true; - lci.outgoingProtobufExportConfig.server = ComboAddress(server_); - parseProtobufOptions(vars, lci.outgoingProtobufExportConfig); - } - else { - g_log<> servers, boost::optional vars) { + if (!lci.outgoingProtobufExportConfig.enabled) { + + lci.outgoingProtobufExportConfig.enabled = true; + + try { + if (servers.type() == typeid(std::string)) { + auto server = boost::get(servers); + + lci.outgoingProtobufExportConfig.servers.emplace_back(server); + } + else { + auto serversMap = boost::get>(servers); + for (const auto& serverPair : serversMap) { + lci.outgoingProtobufExportConfig.servers.emplace_back(serverPair.second); + } + } + + parseProtobufOptions(vars, lci.outgoingProtobufExportConfig); + } + catch(std::exception& e) { + g_log< exportTypes = { QType::A, QType::AAAA, QType::CNAME }; - ComboAddress server; + std::vector servers; uint64_t maxQueuedEntries{100}; uint16_t timeout{2}; uint16_t reconnectWaitTime{1}; diff --git a/pdns/recursordist/docs/lua-config/protobuf.rst b/pdns/recursordist/docs/lua-config/protobuf.rst index 76d482da15..0f5c381303 100644 --- a/pdns/recursordist/docs/lua-config/protobuf.rst +++ b/pdns/recursordist/docs/lua-config/protobuf.rst @@ -12,13 +12,13 @@ Configuring Protocol Buffer logs -------------------------------- Protobuf export to a server is enabled using the ``protobufServer()`` directive: -.. function:: protobufServer(server [, options])) +.. function:: protobufServer(servers [, options])) .. versionadded:: 4.2.0 - Send protocol buffer messages to a server for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes. + Send protocol buffer messages to one or more servers for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes. - :param string server: The IP and port to connect to + :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server. :param table options: A table with key: value pairs with options. Options: @@ -57,13 +57,13 @@ Logging outgoing queries and responses While :func:`protobufServer` only exports the queries sent to the recursor from clients, with the corresponding responses, ``outgoingProtobufServer()`` can be used to export outgoing queries sent by the recursor to authoritative servers, along with the corresponding responses. -.. function:: outgoingProtobufServer(server [, options]) +.. function:: outgoingProtobufServer(servers [, options]) .. versionadded:: 4.2.0 - Send protocol buffer messages to a server for outgoing queries and/or incoming responses. + Send protocol buffer messages to one or more servers for outgoing queries and/or incoming responses. - :param string server: The IP and port to connect to + :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server. :param table options: A table with key: value pairs with options. Options: diff --git a/pdns/recursordist/test-syncres_cc.cc b/pdns/recursordist/test-syncres_cc.cc index a7280c0267..0c99ef035b 100644 --- a/pdns/recursordist/test-syncres_cc.cc +++ b/pdns/recursordist/test-syncres_cc.cc @@ -39,7 +39,7 @@ bool RecursorLua4::preoutquery(const ComboAddress& ns, const ComboAddress& reque return false; } -int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr& outgoingLogger, const std::set& exportTypes, LWResult* res, bool* chained) +int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional& srcmask, boost::optional context, const std::shared_ptr>>& outgoingLoggers, const std::set& exportTypes, LWResult* res, bool* chained) { return 0; } diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh index 6037e484c5..3eb2aa5757 100644 --- a/pdns/remote_logger.hh +++ b/pdns/remote_logger.hh @@ -53,14 +53,6 @@ public: { d_exiting = true; } - uint64_t getGeneration() const - { - return d_generation; - } - void setGeneration(uint64_t newGeneration) - { - d_generation = newGeneration; - } private: void busyReconnectLoop(); bool reconnect(); @@ -71,7 +63,6 @@ private: std::condition_variable d_queueCond; ComboAddress d_remote; uint64_t d_maxQueuedEntries; - uint64_t d_generation{0}; int d_socket{-1}; uint16_t d_timeout; uint8_t d_reconnectWaitTime; diff --git a/pdns/syncres.cc b/pdns/syncres.cc index 16c7c73dd1..a99667a3bc 100644 --- a/pdns/syncres.cc +++ b/pdns/syncres.cc @@ -491,7 +491,7 @@ int SyncRes::asyncresolveWrapper(const ComboAddress& ip, bool ednsMANDATORY, con ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, res, chained); } else { - ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained); + ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained); } if(ret < 0) { return ret; // transport error, nothing to learn here diff --git a/pdns/syncres.hh b/pdns/syncres.hh index 6bc8bb752d..32fbdb9ea8 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -659,9 +659,9 @@ public: d_initialRequestId = initialRequestId; } - void setOutgoingProtobufServer(std::shared_ptr& server) + void setOutgoingProtobufServers(std::shared_ptr>>& servers) { - d_outgoingProtobufServer = server; + d_outgoingProtobufServers = servers; } #endif @@ -804,7 +804,7 @@ private: ostringstream d_trace; shared_ptr d_pdl; boost::optional d_outgoingECSNetwork; - std::shared_ptr d_outgoingProtobufServer{nullptr}; + std::shared_ptr>> d_outgoingProtobufServers{nullptr}; #ifdef HAVE_PROTOBUF boost::optional d_initialRequestId; #endif diff --git a/regression-tests.recursor-dnssec/test_Protobuf.py b/regression-tests.recursor-dnssec/test_Protobuf.py index 0c36bfcbfd..813087be02 100644 --- a/regression-tests.recursor-dnssec/test_Protobuf.py +++ b/regression-tests.recursor-dnssec/test_Protobuf.py @@ -17,9 +17,6 @@ else: from recursortests import RecursorTest -protobufQueue = Queue() -protobufServerPort = 4243 - def ProtobufConnectionHandler(queue, conn): data = None while True: @@ -35,8 +32,7 @@ def ProtobufConnectionHandler(queue, conn): conn.close() -def ProtobufListener(port): - global protobufQueue +def ProtobufListener(queue, port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) try: @@ -51,7 +47,7 @@ def ProtobufListener(port): (conn, _) = sock.accept() thread = threading.Thread(name='Connection Handler', target=ProtobufConnectionHandler, - args=[protobufQueue, conn]) + args=[queue, conn]) thread.setDaemon(True) thread.start() @@ -61,39 +57,58 @@ def ProtobufListener(port): sock.close() -protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort]) -protobufListener.setDaemon(True) -protobufListener.start() +class ProtobufServerParams: + def __init__(self, port): + self.queue = Queue() + self.port = port + +protobufServersParameters = [ProtobufServerParams(4243), ProtobufServerParams(4244)] +protobufListeners = [] +for param in protobufServersParameters: + listener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[param.queue, param.port]) + listener.setDaemon(True) + listener.start() + protobufListeners.append(listener) class TestRecursorProtobuf(RecursorTest): - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d") - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) def getFirstProtobufMessage(self, retries=1, waitTime=1): - global protobufQueue - failed = 0 - - while protobufQueue.empty: - if failed >= retries: - break - - failed = failed + 1 - time.sleep(waitTime) + msg = None + + print("in getFirstProtobufMessage") + for param in protobufServersParameters: + print(param.port) + failed = 0 + + while param.queue.empty: + print(failed) + print(retries) + if failed >= retries: + break + + failed = failed + 1 + print("waiting") + time.sleep(waitTime) + + self.assertFalse(param.queue.empty()) + data = param.queue.get(False) + self.assertTrue(data) + oldmsg = msg + msg = dnsmessage_pb2.PBDNSMessage() + msg.ParseFromString(data) + if oldmsg is not None: + self.assertEquals(msg, oldmsg) - self.assertFalse(protobufQueue.empty()) - data = protobufQueue.get(False) - self.assertTrue(data) - msg = dnsmessage_pb2.PBDNSMessage() - msg.ParseFromString(data) return msg def checkNoRemainingMessage(self): - global protobufQueue - self.assertTrue(protobufQueue.empty()) + for param in protobufServersParameters: + self.assertTrue(param.queue.empty()) def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, expectedECS=None, receivedSize=None): self.assertTrue(msg) @@ -206,14 +221,6 @@ class TestRecursorProtobuf(RecursorTest): @classmethod def setUpClass(cls): - global protobufListener - global protobufServerPort - global ProtobufListener - if protobufListener is None or not protobufListener.isAlive(): - protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort]) - protobufListener.setDaemon(True) - protobufListener.start() - cls.setUpSockets() cls.startResponders() @@ -227,9 +234,9 @@ class TestRecursorProtobuf(RecursorTest): def setUp(self): # Make sure the queue is empty, in case # a previous test failed - global protobufQueue - while not protobufQueue.empty(): - protobufQueue.get(False) + for param in protobufServersParameters: + while not param.queue.empty(): + param.queue.get(False) @classmethod def generateRecursorConfig(cls, confdir): @@ -327,8 +334,8 @@ class OutgoingProtobufDefaultTest(TestRecursorProtobuf): _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir _lua_config_file = """ - outgoingProtobufServer("127.0.0.1:%d") - """ % (protobufServerPort) + outgoingProtobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) def testA(self): name = 'www.example.org.' @@ -353,13 +360,12 @@ class ProtobufMasksTest(TestRecursorProtobuf): _confdir = 'ProtobufMasks' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _protobufMaskV4 = 4 _protobufMaskV6 = 128 _lua_config_file = """ - protobufServer("127.0.0.1:%d") + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}) setProtobufMasks(%d, %d) - """ % (protobufServerPort, _protobufMaskV4, _protobufMaskV6) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port, _protobufMaskV4, _protobufMaskV6) def testA(self): name = 'a.example.' @@ -391,10 +397,9 @@ class ProtobufQueriesOnlyTest(TestRecursorProtobuf): _confdir = 'ProtobufQueriesOnly' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } ) - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=false } ) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) def testA(self): name = 'a.example.' @@ -418,10 +423,9 @@ class ProtobufResponsesOnlyTest(TestRecursorProtobuf): _confdir = 'ProtobufResponsesOnly' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } ) - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=true } ) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) def testA(self): name = 'a.example.' @@ -450,10 +454,9 @@ class ProtobufTaggedOnlyTest(TestRecursorProtobuf): _confdir = 'ProtobufTaggedOnly' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } ) - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=true, taggedOnly=true } ) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) _tags = ['tag1', 'tag2'] _tag_from_gettag = 'tag-from-gettag' _lua_dns_script_file = """ @@ -517,10 +520,9 @@ class ProtobufSelectedFromLuaTest(TestRecursorProtobuf): _confdir = 'ProtobufSelectedFromLua' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } ) - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=false } ) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) _lua_dns_script_file = """ local ffi = require("ffi") @@ -599,10 +601,9 @@ class ProtobufExportTypesTest(TestRecursorProtobuf): _confdir = 'ProtobufExportTypes' _config_template = """ auth-zones=example=configs/%s/example.zone""" % _confdir - global protobufServerPort _lua_config_file = """ - protobufServer("127.0.0.1:%d", { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } ) - """ % (protobufServerPort) + protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } ) + """ % (protobufServersParameters[0].port, protobufServersParameters[1].port) def testA(self): name = 'types.example.' diff --git a/regression-tests.recursor-dnssec/test_RPZ.py b/regression-tests.recursor-dnssec/test_RPZ.py index f6c75c0b42..60a5274339 100644 --- a/regression-tests.recursor-dnssec/test_RPZ.py +++ b/regression-tests.recursor-dnssec/test_RPZ.py @@ -159,7 +159,7 @@ class RPZServer(object): print('Error in RPZ socket: %s' % str(e)) sock.close() -rpzServerPort = 4244 +rpzServerPort = 4250 rpzServer = RPZServer(rpzServerPort) class RPZRecursorTest(RecursorTest):