From: Remi Gacogne Date: Thu, 31 May 2018 13:32:14 +0000 (+0200) Subject: rec: Refactor Protobuf options, add query/response selection X-Git-Tag: dnsdist-1.3.3~174^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f1c7929abb7a05c273b54b02a488707dc9b818f6;p=thirdparty%2Fpdns.git rec: Refactor Protobuf options, add query/response selection - Turn the protobuf options into a table because there are already too many of them - Split the masks applied to the initiator to a separate `setProtobufMasks` directive - Add the possibility to log only queries, or only responses - Add the possibility to select queries (FFI only) and responses for export from the Lua hooks - Add regression tests for the protobuf features --- diff --git a/pdns/lua-recursor4-ffi.hh b/pdns/lua-recursor4-ffi.hh index cf8edf7bd0..fb9f63d04f 100644 --- a/pdns/lua-recursor4-ffi.hh +++ b/pdns/lua-recursor4-ffi.hh @@ -53,4 +53,5 @@ extern "C" { void pdns_ffi_param_set_deviceid(pdns_ffi_param_t* ref, size_t len, const void* name) __attribute__ ((visibility ("default"))); void pdns_ffi_param_set_variable(pdns_ffi_param_t* ref, bool variable) __attribute__ ((visibility ("default"))); void pdns_ffi_param_set_ttl_cap(pdns_ffi_param_t* ref, uint32_t ttl) __attribute__ ((visibility ("default"))); + void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery) __attribute__ ((visibility ("default"))); } diff --git a/pdns/lua-recursor4.cc b/pdns/lua-recursor4.cc index beb62ed36b..d0f1af10a2 100644 --- a/pdns/lua-recursor4.cc +++ b/pdns/lua-recursor4.cc @@ -226,6 +226,7 @@ void RecursorLua4::postPrepareContext() d_lw->registerMember("variable", [](const DNSQuestion& dq) -> bool { return dq.variable; }, [](DNSQuestion& dq, bool newVariable) { dq.variable = newVariable; }); d_lw->registerMember("wantsRPZ", [](const DNSQuestion& dq) -> bool { return dq.wantsRPZ; }, [](DNSQuestion& dq, bool newWantsRPZ) { dq.wantsRPZ = newWantsRPZ; }); + d_lw->registerMember("logResponse", [](const DNSQuestion& dq) -> bool { return dq.logResponse; }, [](DNSQuestion& dq, bool newLogResponse) { dq.logResponse = newLogResponse; }); d_lw->registerMember("rcode", &DNSQuestion::rcode); d_lw->registerMember("tag", &DNSQuestion::tag); @@ -468,7 +469,8 @@ bool RecursorLua4::preoutquery(const ComboAddress& ns, const ComboAddress& reque { bool variableAnswer = false; bool wantsRPZ = false; - RecursorLua4::DNSQuestion dq(ns, requestor, query, qtype.getCode(), isTcp, variableAnswer, wantsRPZ); + bool logQuery = false; + RecursorLua4::DNSQuestion dq(ns, requestor, query, qtype.getCode(), isTcp, variableAnswer, wantsRPZ, logQuery); dq.currentRecords = &res; return genhook(d_preoutquery, dq, ret); @@ -514,7 +516,7 @@ unsigned int RecursorLua4::gettag(const ComboAddress& remote, const Netmask& edn struct pdns_ffi_param { public: - pdns_ffi_param(const DNSName& qname_, uint16_t qtype_, const ComboAddress& local_, const ComboAddress& remote_, const Netmask& ednssubnet_, std::vector& policyTags_, const EDNSOptionViewMap& ednsOptions_, std::string& requestorId_, std::string& deviceId_, uint32_t& ttlCap_, bool& variable_, bool tcp_): qname(qname_), local(local_), remote(remote_), ednssubnet(ednssubnet_), policyTags(policyTags_), ednsOptions(ednsOptions_), requestorId(requestorId_), deviceId(deviceId_), ttlCap(ttlCap_), variable(variable_), qtype(qtype_), tcp(tcp_) + pdns_ffi_param(const DNSName& qname_, uint16_t qtype_, const ComboAddress& local_, const ComboAddress& remote_, const Netmask& ednssubnet_, std::vector& policyTags_, const EDNSOptionViewMap& ednsOptions_, std::string& requestorId_, std::string& deviceId_, uint32_t& ttlCap_, bool& variable_, bool tcp_, bool& logQuery_): qname(qname_), local(local_), remote(remote_), ednssubnet(ednssubnet_), policyTags(policyTags_), ednsOptions(ednsOptions_), requestorId(requestorId_), deviceId(deviceId_), ttlCap(ttlCap_), variable(variable_), logQuery(logQuery_), qtype(qtype_), tcp(tcp_) { } @@ -534,16 +536,17 @@ public: std::string& deviceId; uint32_t& ttlCap; bool& variable; + bool& logQuery; unsigned int tag{0}; uint16_t qtype; bool tcp; }; -unsigned int RecursorLua4::gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap& ednsOptions, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable) const +unsigned int RecursorLua4::gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap& ednsOptions, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable, bool& logQuery) const { if (d_gettag_ffi) { - pdns_ffi_param_t param(qname, qtype, local, remote, ednssubnet, *policyTags, ednsOptions, requestorId, deviceId, ttlCap, variable, tcp); + pdns_ffi_param_t param(qname, qtype, local, remote, ednssubnet, *policyTags, ednsOptions, requestorId, deviceId, ttlCap, variable, tcp, logQuery); auto ret = d_gettag_ffi(¶m); if (ret) { @@ -807,3 +810,8 @@ void pdns_ffi_param_set_ttl_cap(pdns_ffi_param_t* ref, uint32_t ttl) { ref->ttlCap = ttl; } + +void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery) +{ + ref->logQuery = logQuery; +} diff --git a/pdns/lua-recursor4.hh b/pdns/lua-recursor4.hh index 6c0bf7ce5f..55b5646759 100644 --- a/pdns/lua-recursor4.hh +++ b/pdns/lua-recursor4.hh @@ -60,7 +60,7 @@ public: struct DNSQuestion { - DNSQuestion(const ComboAddress& rem, const ComboAddress& loc, const DNSName& query, uint16_t type, bool tcp, bool& variable_, bool& wantsRPZ_): qname(query), qtype(type), local(loc), remote(rem), isTcp(tcp), variable(variable_), wantsRPZ(wantsRPZ_) + DNSQuestion(const ComboAddress& rem, const ComboAddress& loc, const DNSName& query, uint16_t type, bool tcp, bool& variable_, bool& wantsRPZ_, bool& logResponse_): qname(query), qtype(type), local(loc), remote(rem), isTcp(tcp), variable(variable_), wantsRPZ(wantsRPZ_), logResponse(logResponse_) { } const DNSName& qname; @@ -80,6 +80,7 @@ public: vState validationState{Indeterminate}; bool& variable; bool& wantsRPZ; + bool& logResponse; unsigned int tag{0}; void addAnswer(uint16_t type, const std::string& content, boost::optional ttl, boost::optional name); @@ -110,7 +111,7 @@ public: }; unsigned int gettag(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId) const; - unsigned int gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable) const; + unsigned int gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable, bool& logQuery) const; void maintenance() const; bool prerpz(DNSQuestion& dq, int& ret) const; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index ee81153579..081223e775 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -963,8 +963,10 @@ static void startDoResolve(void *p) // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query bool wantsRPZ(true); boost::optional pbMessage(boost::none); + bool logResponse = false; #ifdef HAVE_PROTOBUF if (checkProtobufExport(luaconfsLocal)) { + logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses; Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response); @@ -1024,7 +1026,7 @@ static void startDoResolve(void *p) int res = RCode::NoError; DNSFilterEngine::Policy appliedPolicy; DNSRecord spoofed; - RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ); + RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ, logResponse); dq.ednsFlags = &edo.d_extFlags; dq.ednsOptions = &ednsOpts; dq.tag = dc->d_tag; @@ -1372,7 +1374,7 @@ static void startDoResolve(void *p) g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp); updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype); #ifdef HAVE_PROTOBUF - if (t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) { + if (t_protobufServer && logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) { pbMessage->setBytes(packet.size()); pbMessage->setResponseCode(pw.getHeader()->rcode); if (appliedPolicy.d_name) { @@ -1731,6 +1733,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if (checkProtobufExport(luaconfsLocal)) { needECS = true; } + bool logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries; #endif if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) { @@ -1747,7 +1750,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if(t_pdl) { try { if (t_pdl->d_gettag_ffi) { - dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable); + dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable, logQuery); } else if (t_pdl->d_gettag) { dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId); @@ -1776,7 +1779,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) try { const struct dnsheader* dh = (const struct dnsheader*) conn->data; - if (!luaconfsLocal->protobufTaggedOnly) { + if (logQuery && (!luaconfsLocal->protobufExportConfig.taggedOnly || !dc->d_policyTags.empty())) { protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId); } } @@ -1900,6 +1903,8 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr } else if (checkOutgoingProtobufExport(luaconfsLocal)) { uniqueId = (*t_uuidGenerator)(); } + bool logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries; + bool logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses; #endif EDNSSubnetOpts ednssubnet; bool ecsFound = false; @@ -1941,7 +1946,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr if(t_pdl) { try { if (t_pdl->d_gettag_ffi) { - ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable); + ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable, logQuery); } else if (t_pdl->d_gettag) { ctag = t_pdl->gettag(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId); @@ -1965,7 +1970,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr #ifdef HAVE_PROTOBUF if(t_protobufServer) { pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response); - if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) { + if (logQuery && (!luaconfsLocal->protobufExportConfig.taggedOnly || !policyTags.empty())) { protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId); } } @@ -1991,7 +1996,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr } #ifdef HAVE_PROTOBUF - if(t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) { + if(t_protobufServer && logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) { Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage->update(uniqueId, &requestor, &destination, false, dh->id); diff --git a/pdns/rec-lua-conf.cc b/pdns/rec-lua-conf.cc index cac3cc71e3..d8ca13de2e 100644 --- a/pdns/rec-lua-conf.cc +++ b/pdns/rec-lua-conf.cc @@ -81,6 +81,43 @@ static void parseRPZParameters(const std::unordered_map > protobufOptions_t; + +static void parseProtobufOptions(boost::optional vars, ProtobufExportConfig& config) +{ + if (!vars) { + return; + } + + if (vars->count("timeout")) { + config.timeout = boost::get((*vars)["timeout"]); + } + + if (vars->count("maxQueuedEntries")) { + config.maxQueuedEntries = boost::get((*vars)["maxQueuedEntries"]); + } + + if (vars->count("reconnectWaitTime")) { + config.reconnectWaitTime = boost::get((*vars)["reconnectWaitTime"]); + } + + if (vars->count("asyncConnect")) { + config.asyncConnect = boost::get((*vars)["asyncConnect"]); + } + + if (vars->count("taggedOnly")) { + config.taggedOnly = boost::get((*vars)["taggedOnly"]); + } + + if (vars->count("logQueries")) { + config.logQueries = boost::get((*vars)["logQueries"]); + } + + if (vars->count("logResponses")) { + config.logResponses = boost::get((*vars)["logResponses"]); + } +} + void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& delayedThreads) { LuaConfigItems lci; @@ -314,40 +351,17 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de }); #if HAVE_PROTOBUF - Lua.writeFunction("protobufServer", [&lci](const string& server_, const boost::optional timeout, const boost::optional maxQueuedEntries, const boost::optional reconnectWaitTime, const boost::optional maskV4, boost::optional maskV6, boost::optional asyncConnect, boost::optional taggedOnly) { + Lua.writeFunction("setProtobufMasks", [&lci](const uint8_t maskV4, uint8_t maskV6) { + lci.protobufMaskV4 = maskV4; + lci.protobufMaskV6 = maskV6; + }); + + Lua.writeFunction("protobufServer", [&lci](const string& server_, boost::optional vars) { try { - ComboAddress server(server_); if (!lci.protobufExportConfig.enabled) { - lci.protobufExportConfig.enabled = true; - - lci.protobufExportConfig.server = server; - - if (timeout) { - lci.protobufExportConfig.timeout = *timeout; - } - - if (maxQueuedEntries) { - lci.protobufExportConfig.maxQueuedEntries = *maxQueuedEntries; - } - - if (reconnectWaitTime) { - lci.protobufExportConfig.reconnectWaitTime = *reconnectWaitTime; - } - - if (asyncConnect) { - lci.protobufExportConfig.asyncConnect = *asyncConnect; - } - - if (maskV4) { - lci.protobufMaskV4 = *maskV4; - } - if (maskV6) { - lci.protobufMaskV6 = *maskV6; - } - if (taggedOnly) { - lci.protobufTaggedOnly = *taggedOnly; - } + lci.protobufExportConfig.server = ComboAddress(server_); + parseProtobufOptions(vars, lci.protobufExportConfig); } else { g_log< timeout, const boost::optional maxQueuedEntries, const boost::optional reconnectWaitTime, boost::optional asyncConnect) { + Lua.writeFunction("outgoingProtobufServer", [&lci](const string& server_, boost::optional vars) { try { - ComboAddress server(server_); if (!lci.outgoingProtobufExportConfig.enabled) { - lci.outgoingProtobufExportConfig.enabled = true; - - lci.outgoingProtobufExportConfig.server = server; - - if (timeout) { - lci.outgoingProtobufExportConfig.timeout = *timeout; - } - - if (maxQueuedEntries) { - lci.outgoingProtobufExportConfig.maxQueuedEntries = *maxQueuedEntries; - } - - if (reconnectWaitTime) { - lci.outgoingProtobufExportConfig.reconnectWaitTime = *reconnectWaitTime; - } - - if (asyncConnect) { - lci.outgoingProtobufExportConfig.asyncConnect = *asyncConnect; - } + lci.protobufExportConfig.server = ComboAddress(server_); + parseProtobufOptions(vars, lci.protobufExportConfig); } else { g_log< g_luaconfs; diff --git a/pdns/recursordist/docs/lua-config/protobuf.rst b/pdns/recursordist/docs/lua-config/protobuf.rst index b7ac703cf6..40d614bcc8 100644 --- a/pdns/recursordist/docs/lua-config/protobuf.rst +++ b/pdns/recursordist/docs/lua-config/protobuf.rst @@ -12,8 +12,27 @@ Configuring Protocol Buffer logs -------------------------------- Protobuf export to a server is enabled using the ``protobufServer()`` directive: +.. function:: protobufServer(server [, options])) + + .. versionadded:: 4.2.0 + + :param string server: The IP and port to connect to + :param table options: A table with key: value pairs with options. + + Options: + + * ``timeout=2``: int - Time in seconds to wait when sending a message + * ``maxQueuedEntries=100``: int - How many entries will be kept in memory if the server becomes unreachable + * ``reconnectWaitTime=1``: int - How long to wait, in seconds, between two reconnection attempts + * ``taggedOnly=false``: bool - Only entries with a policy or a policy tag set will be sent + * ``asyncConnect``: bool - When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued + * ``logQueries=true``: bool - Whether to export queries + * ``logResponses=true``: bool - Whether to export responses + .. function:: protobufServer(server [[[[[[[, timeout=2], maxQueuedEntries=100], reconnectWaitTime=1], maskV4=32], maskV6=128], asyncConnect=false], taggedOnly=false]) + .. deprecated:: 4.2.0 + :param string server: The IP and port to connect to :param int timeout: Time in seconds to wait when sending a message :param int maxQueuedEntries: How many entries will be kept in memory if the server becomes unreachable @@ -23,13 +42,39 @@ Protobuf export to a server is enabled using the ``protobufServer()`` directive: :param bool taggedOnly: Only entries with a policy or a policy tag set will be sent. :param bool asyncConnect: When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued.. +.. function:: setProtobufMasks(maskv4, maskV6) + + .. versionadded:: 4.2.0 + + :param int maskV4: network mask to apply to the client IPv4 addresses, for anonymization purposes. The default of 32 means no anonymization. + :param int maskV6: Same as maskV4, but for IPv6. Defaults to 128. + Logging outgoing queries and responses -------------------------------------- While :func:`protobufServer` only exports the queries sent to the recursor from clients, with the corresponding responses, ``outgoingProtobufServer()`` can be used to export outgoing queries sent by the recursor to authoritative servers, along with the corresponding responses. +.. function:: outgoingProtobufServer(server [, options]) + + .. versionadded:: 4.2.0 + + :param string server: The IP and port to connect to + :param table options: A table with key: value pairs with options. + + Options: + + * ``timeout=2``: int - Time in seconds to wait when sending a message + * ``maxQueuedEntries=100``: int - How many entries will be kept in memory if the server becomes unreachable + * ``reconnectWaitTime=1``: int - How long to wait, in seconds, between two reconnection attempts + * ``taggedOnly=false``: bool - Only entries with a policy or a policy tag set will be sent + * ``asyncConnect``: bool - When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued + * ``logQueries=true``: bool - Whether to export queries + * ``logResponses=true``: bool - Whether to export responses + .. function:: outgoingProtobufServer(server [[[[, timeout=2], maxQueuedEntries=100], reconnectWaitTime=1], asyncConnect=false]) + .. deprecated:: 4.2.0 + :param string server: The IP and port to connect to :param int timeout: Time in seconds to wait when sending a message :param int maxQueuedEntries: How many entries will be kept in memory if the server becomes unreachable diff --git a/pdns/recursordist/docs/lua-scripting/dq.rst b/pdns/recursordist/docs/lua-scripting/dq.rst index 9f6f78ec0a..fc3cc847d2 100644 --- a/pdns/recursordist/docs/lua-scripting/dq.rst +++ b/pdns/recursordist/docs/lua-scripting/dq.rst @@ -133,6 +133,12 @@ The DNSQuestion object contains at least the following fields: Possible states are ``pdns.validationstates.Indeterminate``, ``pdns.validationstates.Bogus``, ``pdns.validationstates.Insecure`` and ``pdns.validationstates.Secure``. The result will always be ``pdns.validationstates.Indeterminate`` is validation is disabled or was not requested. + .. attribute:: DNSQuestion.logResponse + + .. versionadded:: 4.2.0 + + Whether the response to this query will be exported to a remote protobuf logger, if one has been configured. + It also supports the following methods: .. method:: DNSQuestion:addAnswer(type, content, [ttl, name]) diff --git a/regression-tests.recursor-dnssec/.gitignore b/regression-tests.recursor-dnssec/.gitignore index 7103d74161..4b621b2f5e 100644 --- a/regression-tests.recursor-dnssec/.gitignore +++ b/regression-tests.recursor-dnssec/.gitignore @@ -3,3 +3,4 @@ /.venv /configs /vars +/*_pb2.py diff --git a/regression-tests.recursor-dnssec/requirements.txt b/regression-tests.recursor-dnssec/requirements.txt index 18aa37e592..4f5afe9d85 100644 --- a/regression-tests.recursor-dnssec/requirements.txt +++ b/regression-tests.recursor-dnssec/requirements.txt @@ -1,5 +1,7 @@ dnspython>=1.11 nose>=1.3.7 +protobuf>=2.5; sys_platform != 'darwin' +protobuf>=3.0; sys_platform == 'darwin' pysnmp>=4.3.4 requests>=2.1.0 Twisted>0.15.0 diff --git a/regression-tests.recursor-dnssec/runtests b/regression-tests.recursor-dnssec/runtests index 9682f92eb6..2878142216 100755 --- a/regression-tests.recursor-dnssec/runtests +++ b/regression-tests.recursor-dnssec/runtests @@ -9,6 +9,8 @@ fi python -V pip install -r requirements.txt +protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto + mkdir -p configs [ -f ./vars ] && . ./vars @@ -42,4 +44,4 @@ set -e if [ "${PDNS_DEBUG}" = "YES" ]; then set -x fi -LD_PRELOAD="${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@ +LD_PRELOAD="${LIBASAN} ${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@ diff --git a/regression-tests.recursor-dnssec/test_Protobuf.py b/regression-tests.recursor-dnssec/test_Protobuf.py new file mode 100644 index 0000000000..469c680259 --- /dev/null +++ b/regression-tests.recursor-dnssec/test_Protobuf.py @@ -0,0 +1,484 @@ +import dns +import dnsmessage_pb2 +import os +import socket +import struct +import sys +import threading +import time + +# Python2/3 compatibility hacks +if sys.version_info[0] == 2: + from Queue import Queue + range = xrange +else: + from queue import Queue + range = range # allow re-export of the builtin name + +from recursortests import RecursorTest + +protobufQueue = Queue() +protobufServerPort = 4243 + +def ProtobufConnectionHandler(queue, conn): + data = None + while True: + data = conn.recv(2) + if not data: + break + (datalen,) = struct.unpack("!H", data) + data = conn.recv(datalen) + if not data: + break + + queue.put(data, True, timeout=2.0) + + conn.close() + +def ProtobufListener(port): + global protobufQueue + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + try: + sock.bind(("127.0.0.1", port)) + except socket.error as e: + print("Error binding in the protobuf listener: %s" % str(e)) + sys.exit(1) + + sock.listen(100) + while True: + try: + (conn, _) = sock.accept() + thread = threading.Thread(name='Connection Handler', + target=ProtobufConnectionHandler, + args=[protobufQueue, conn]) + thread.setDaemon(True) + thread.start() + + except socket.error as e: + print('Error in protobuf socket: %s' % str(e)) + + sock.close() + + +protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort]) +protobufListener.setDaemon(True) +protobufListener.start() + +class TestRecursorProtobuf(RecursorTest): + + global protobufServerPort + _lua_config_file = """ + protobufServer("127.0.0.1:%d") + """ % (protobufServerPort) + + + def getFirstProtobufMessage(self, retries=1, waitTime=1): + global protobufQueue + failed = 0 + + while protobufQueue.empty: + if failed >= retries: + break + + failed = failed + 1 + time.sleep(waitTime) + + self.assertFalse(protobufQueue.empty()) + data = protobufQueue.get(False) + self.assertTrue(data) + msg = dnsmessage_pb2.PBDNSMessage() + msg.ParseFromString(data) + return msg + + def checkNoRemainingMessage(self): + global protobufQueue + self.assertTrue(protobufQueue.empty()) + + def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, expectedECS=None): + self.assertTrue(msg) + self.assertTrue(msg.HasField('timeSec')) + self.assertTrue(msg.HasField('socketFamily')) + self.assertEquals(msg.socketFamily, dnsmessage_pb2.PBDNSMessage.INET) + self.assertTrue(msg.HasField('from')) + fromvalue = getattr(msg, 'from') + self.assertEquals(socket.inet_ntop(socket.AF_INET, fromvalue), initiator) + self.assertTrue(msg.HasField('socketProtocol')) + self.assertEquals(msg.socketProtocol, protocol) + self.assertTrue(msg.HasField('messageId')) + self.assertTrue(msg.HasField('id')) + self.assertEquals(msg.id, query.id) + self.assertTrue(msg.HasField('inBytes')) + if normalQueryResponse: + # compare inBytes with length of query/response + self.assertEquals(msg.inBytes, len(query.to_wire())) + if expectedECS is not None: + self.assertTrue(msg.HasField('originalRequestorSubnet')) + # v4 only for now + self.assertEquals(len(msg.originalRequestorSubnet), 4) + self.assertEquals(socket.inet_ntop(socket.AF_INET, msg.originalRequestorSubnet), '127.0.0.1') + + def checkProtobufQuery(self, msg, protocol, query, qclass, qtype, qname, initiator='127.0.0.1'): + self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSQueryType) + self.checkProtobufBase(msg, protocol, query, initiator) + # dnsdist doesn't fill the responder field for responses + # because it doesn't keep the information around. + self.assertTrue(msg.HasField('to')) + self.assertEquals(socket.inet_ntop(socket.AF_INET, msg.to), '127.0.0.1') + self.assertTrue(msg.HasField('question')) + self.assertTrue(msg.question.HasField('qClass')) + self.assertEquals(msg.question.qClass, qclass) + self.assertTrue(msg.question.HasField('qType')) + self.assertEquals(msg.question.qClass, qtype) + self.assertTrue(msg.question.HasField('qName')) + self.assertEquals(msg.question.qName, qname) + + def checkProtobufResponse(self, msg, protocol, response, initiator='127.0.0.1'): + self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType) + self.checkProtobufBase(msg, protocol, response, initiator) + self.assertTrue(msg.HasField('response')) + self.assertTrue(msg.response.HasField('queryTimeSec')) + + def checkProtobufResponseRecord(self, record, rclass, rtype, rname, rttl): + self.assertTrue(record.HasField('class')) + self.assertEquals(getattr(record, 'class'), rclass) + self.assertTrue(record.HasField('type')) + self.assertEquals(record.type, rtype) + self.assertTrue(record.HasField('name')) + self.assertEquals(record.name, rname) + self.assertTrue(record.HasField('ttl')) + self.assertEquals(record.ttl, rttl) + self.assertTrue(record.HasField('rdata')) + + def checkProtobufPolicy(self, msg, policyType, reason): + self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType) + self.assertTrue(msg.response.HasField('appliedPolicyType')) + self.assertTrue(msg.response.HasField('appliedPolicy')) + self.assertEquals(msg.response.appliedPolicy, reason) + self.assertEquals(msg.response.appliedPolicyType, policyType) + + def checkProtobufTags(self, msg, tags): + self.assertEquals(len(msg.response.tags), len(tags)) + for tag in msg.response.tags: + self.assertTrue(tag in tags) + + @classmethod + def setUpClass(cls): + + global protobufListener + global protobufServerPort + global ProtobufListener + if protobufListener is None or not protobufListener.isAlive(): + protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort]) + protobufListener.setDaemon(True) + protobufListener.start() + + cls.setUpSockets() + + cls.startResponders() + + confdir = os.path.join('configs', cls._confdir) + cls.createConfigDir(confdir) + + cls.generateRecursorConfig(confdir) + cls.startRecursor(confdir, cls._recursorPort) + + def setUp(self): + # Make sure the queue is empty, in case + # a previous test failed + global protobufQueue + while not protobufQueue.empty(): + protobufQueue.get(False) + + @classmethod + def generateRecursorConfig(cls, confdir): + authzonepath = os.path.join(confdir, 'example.zone') + with open(authzonepath, 'w') as authzone: + authzone.write("""$ORIGIN example. +@ 3600 IN SOA {soa} +a 3600 IN A 192.0.2.42 +tagged 3600 IN A 192.0.2.84 +query-selected 3600 IN A 192.0.2.84 +answer-selected 3600 IN A 192.0.2.84 +""".format(soa=cls._SOA)) + super(TestRecursorProtobuf, cls).generateRecursorConfig(confdir) + + @classmethod + def tearDownClass(cls): + cls.tearDownRecursor() + +class ProtobufDefaultTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export queries and response over protobuf. + """ + + _confdir = 'ProtobufDefault' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf messages corresponding to the UDP query and answer + msg = self.getFirstProtobufMessage() + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) + # then the response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res) + self.assertEquals(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') + self.checkNoRemainingMessage() + +class ProtobufMasksTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export queries and response over protobuf, respecting the configured initiator masking. + """ + + _confdir = 'ProtobufMasks' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + global protobufServerPort + _protobufMaskV4 = 4 + _protobufMaskV6 = 128 + _lua_config_file = """ + protobufServer("127.0.0.1:%d") + setProtobufMasks(%d, %d) + """ % (protobufServerPort, _protobufMaskV4, _protobufMaskV6) + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf messages corresponding to the UDP query and answer + # but first let the protobuf messages the time to get there + msg = self.getFirstProtobufMessage() + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '112.0.0.0') + # then the response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '112.0.0.0') + self.assertEquals(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') + self.checkNoRemainingMessage() + +class ProtobufQueriesOnlyTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export queries but not responses over protobuf. + """ + + _confdir = 'ProtobufQueriesOnly' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + global protobufServerPort + _lua_config_file = """ + protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } ) + """ % (protobufServerPort) + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf message corresponding to the UDP query + msg = self.getFirstProtobufMessage() + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) + # no response + self.checkNoRemainingMessage() + +class ProtobufResponsesOnlyTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export responses but not queries over protobuf. + """ + + _confdir = 'ProtobufResponsesOnly' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + global protobufServerPort + _lua_config_file = """ + protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } ) + """ % (protobufServerPort) + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf message corresponding to the UDP response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res) + self.assertEquals(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') + # nothing else in the queue + self.checkNoRemainingMessage() + +class ProtobufTaggedOnlyTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export queries and responses but only if they have been tagged. + """ + + _confdir = 'ProtobufTaggedOnly' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + global protobufServerPort + _lua_config_file = """ + protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } ) + """ % (protobufServerPort) + _tags = ['tag1', 'tag2'] + _tag_from_gettag = 'tag-from-gettag' + _lua_dns_script_file = """ + function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp) + if qname:equal('tagged.example.') then + return 0, { '%s' } + end + return 0 + end + function preresolve(dq) + if dq.qname:equal('tagged.example.') then + dq:addPolicyTag('%s') + dq:addPolicyTag('%s') + end + return false + end + """ % (_tag_from_gettag, _tags[0], _tags[1]) + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf message corresponding to the UDP response + # the first query and answer are not tagged, so there is nothing in the queue + time.sleep(1) + self.checkNoRemainingMessage() + + def testTagged(self): + name = 'tagged.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf messages corresponding to the UDP query and answer + msg = self.getFirstProtobufMessage() + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) + self.checkProtobufTags(msg, [self._tag_from_gettag]) + # then the response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res) + self.assertEquals(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84') + tags = [self._tag_from_gettag] + self._tags + self.checkProtobufTags(msg, tags) + self.checkNoRemainingMessage() + +class ProtobufSelectedFromLuaTest(TestRecursorProtobuf): + """ + This test makes sure that we correctly export queries and responses but only if they have been selected from Lua. + """ + + _confdir = 'ProtobufSelectedFromLua' + _config_template = """ +auth-zones=example=configs/%s/example.zone""" % _confdir + global protobufServerPort + _lua_config_file = """ + protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } ) + """ % (protobufServerPort) + _lua_dns_script_file = """ + local ffi = require("ffi") + + ffi.cdef[[ + typedef struct pdns_ffi_param pdns_ffi_param_t; + + const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref); + void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery); + ]] + + function gettag_ffi(obj) + qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj)) + if qname == 'query-selected.example' then + ffi.C.pdns_ffi_param_set_log_query(obj, true) + end + return 0 + end + + function preresolve(dq) + if dq.qname:equal('answer-selected.example.') then + dq.logResponse = true + end + return false + end + """ + + def testA(self): + name = 'a.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf message corresponding to the UDP response + # the first query and answer are not selected, so there is nothing in the queue + self.checkNoRemainingMessage() + + def testQuerySelected(self): + name = 'query-selected.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf messages corresponding to the UDP query + msg = self.getFirstProtobufMessage() + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) + # there should be no response + self.checkNoRemainingMessage() + + def testResponseSelected(self): + name = 'answer-selected.example.' + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84') + query = dns.message.make_query(name, 'A', want_dnssec=True) + query.flags |= dns.flags.CD + res = self.sendUDPQuery(query) + self.assertRRsetInAnswer(res, expected) + + # check the protobuf messages corresponding to the UDP response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res) + self.assertEquals(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84') + self.checkNoRemainingMessage()