]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Refactor Protobuf options, add query/response selection
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 31 May 2018 13:32:14 +0000 (15:32 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 3 Aug 2018 15:35:19 +0000 (17:35 +0200)
- Turn the protobuf options into a table because there are already
too many of them
- Split the masks applied to the initiator to a separate
`setProtobufMasks` directive
- Add the possibility to log only queries, or only responses
- Add the possibility to select queries (FFI only) and responses
for export from the Lua hooks
- Add regression tests for the protobuf features

12 files changed:
pdns/lua-recursor4-ffi.hh
pdns/lua-recursor4.cc
pdns/lua-recursor4.hh
pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/recursordist/docs/lua-config/protobuf.rst
pdns/recursordist/docs/lua-scripting/dq.rst
regression-tests.recursor-dnssec/.gitignore
regression-tests.recursor-dnssec/requirements.txt
regression-tests.recursor-dnssec/runtests
regression-tests.recursor-dnssec/test_Protobuf.py [new file with mode: 0644]

index cf8edf7bd048f6e9832d83ba9c802045cfd2ac97..fb9f63d04f557a5f761f39db6c988e6b20e80ba1 100644 (file)
@@ -53,4 +53,5 @@ extern "C" {
   void pdns_ffi_param_set_deviceid(pdns_ffi_param_t* ref, size_t len, const void* name) __attribute__ ((visibility ("default")));
   void pdns_ffi_param_set_variable(pdns_ffi_param_t* ref, bool variable) __attribute__ ((visibility ("default")));
   void pdns_ffi_param_set_ttl_cap(pdns_ffi_param_t* ref, uint32_t ttl) __attribute__ ((visibility ("default")));
+  void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery) __attribute__ ((visibility ("default")));
 }
index beb62ed36b2e51b86e58b41c8f9b892d817a9396..d0f1af10a26c06c7b71186734326444c6470ef6a 100644 (file)
@@ -226,6 +226,7 @@ void RecursorLua4::postPrepareContext()
 
   d_lw->registerMember<bool (DNSQuestion::*)>("variable", [](const DNSQuestion& dq) -> bool { return dq.variable; }, [](DNSQuestion& dq, bool newVariable) { dq.variable = newVariable; });
   d_lw->registerMember<bool (DNSQuestion::*)>("wantsRPZ", [](const DNSQuestion& dq) -> bool { return dq.wantsRPZ; }, [](DNSQuestion& dq, bool newWantsRPZ) { dq.wantsRPZ = newWantsRPZ; });
+  d_lw->registerMember<bool (DNSQuestion::*)>("logResponse", [](const DNSQuestion& dq) -> bool { return dq.logResponse; }, [](DNSQuestion& dq, bool newLogResponse) { dq.logResponse = newLogResponse; });
 
   d_lw->registerMember("rcode", &DNSQuestion::rcode);
   d_lw->registerMember("tag", &DNSQuestion::tag);
@@ -468,7 +469,8 @@ bool RecursorLua4::preoutquery(const ComboAddress& ns, const ComboAddress& reque
 {
   bool variableAnswer = false;
   bool wantsRPZ = false;
-  RecursorLua4::DNSQuestion dq(ns, requestor, query, qtype.getCode(), isTcp, variableAnswer, wantsRPZ);
+  bool logQuery = false;
+  RecursorLua4::DNSQuestion dq(ns, requestor, query, qtype.getCode(), isTcp, variableAnswer, wantsRPZ, logQuery);
   dq.currentRecords = &res;
 
   return genhook(d_preoutquery, dq, ret);
@@ -514,7 +516,7 @@ unsigned int RecursorLua4::gettag(const ComboAddress& remote, const Netmask& edn
 struct pdns_ffi_param
 {
 public:
-  pdns_ffi_param(const DNSName& qname_, uint16_t qtype_, const ComboAddress& local_, const ComboAddress& remote_, const Netmask& ednssubnet_, std::vector<std::string>& policyTags_, const EDNSOptionViewMap& ednsOptions_, std::string& requestorId_, std::string& deviceId_, uint32_t& ttlCap_, bool& variable_, bool tcp_): qname(qname_), local(local_), remote(remote_), ednssubnet(ednssubnet_), policyTags(policyTags_), ednsOptions(ednsOptions_), requestorId(requestorId_), deviceId(deviceId_), ttlCap(ttlCap_), variable(variable_), qtype(qtype_), tcp(tcp_)
+  pdns_ffi_param(const DNSName& qname_, uint16_t qtype_, const ComboAddress& local_, const ComboAddress& remote_, const Netmask& ednssubnet_, std::vector<std::string>& policyTags_, const EDNSOptionViewMap& ednsOptions_, std::string& requestorId_, std::string& deviceId_, uint32_t& ttlCap_, bool& variable_, bool tcp_, bool& logQuery_): qname(qname_), local(local_), remote(remote_), ednssubnet(ednssubnet_), policyTags(policyTags_), ednsOptions(ednsOptions_), requestorId(requestorId_), deviceId(deviceId_), ttlCap(ttlCap_), variable(variable_), logQuery(logQuery_), qtype(qtype_), tcp(tcp_)
   {
   }
 
@@ -534,16 +536,17 @@ public:
   std::string& deviceId;
   uint32_t& ttlCap;
   bool& variable;
+  bool& logQuery;
 
   unsigned int tag{0};
   uint16_t qtype;
   bool tcp;
 };
 
-unsigned int RecursorLua4::gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector<std::string>* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap& ednsOptions, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable) const
+unsigned int RecursorLua4::gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector<std::string>* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap& ednsOptions, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable, bool& logQuery) const
 {
   if (d_gettag_ffi) {
-    pdns_ffi_param_t param(qname, qtype, local, remote, ednssubnet, *policyTags, ednsOptions, requestorId, deviceId, ttlCap, variable, tcp);
+    pdns_ffi_param_t param(qname, qtype, local, remote, ednssubnet, *policyTags, ednsOptions, requestorId, deviceId, ttlCap, variable, tcp, logQuery);
 
     auto ret = d_gettag_ffi(&param);
     if (ret) {
@@ -807,3 +810,8 @@ void pdns_ffi_param_set_ttl_cap(pdns_ffi_param_t* ref, uint32_t ttl)
 {
   ref->ttlCap = ttl;
 }
+
+void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery)
+{
+  ref->logQuery = logQuery;
+}
index 6c0bf7ce5f070b782dc6e7a412bb3086e50a0109..55b5646759d06858d56173293c473e3b74c48013 100644 (file)
@@ -60,7 +60,7 @@ public:
 
   struct DNSQuestion
   {
-    DNSQuestion(const ComboAddress& rem, const ComboAddress& loc, const DNSName& query, uint16_t type, bool tcp, bool& variable_, bool& wantsRPZ_): qname(query), qtype(type), local(loc), remote(rem), isTcp(tcp), variable(variable_), wantsRPZ(wantsRPZ_)
+    DNSQuestion(const ComboAddress& rem, const ComboAddress& loc, const DNSName& query, uint16_t type, bool tcp, bool& variable_, bool& wantsRPZ_, bool& logResponse_): qname(query), qtype(type), local(loc), remote(rem), isTcp(tcp), variable(variable_), wantsRPZ(wantsRPZ_), logResponse(logResponse_)
     {
     }
     const DNSName& qname;
@@ -80,6 +80,7 @@ public:
     vState validationState{Indeterminate};
     bool& variable;
     bool& wantsRPZ;
+    bool& logResponse;
     unsigned int tag{0};
 
     void addAnswer(uint16_t type, const std::string& content, boost::optional<int> ttl, boost::optional<string> name);
@@ -110,7 +111,7 @@ public:
   };
 
   unsigned int gettag(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector<std::string>* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId) const;
-  unsigned int gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector<std::string>* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable) const;
+  unsigned int gettag_ffi(const ComboAddress& remote, const Netmask& ednssubnet, const ComboAddress& local, const DNSName& qname, uint16_t qtype, std::vector<std::string>* policyTags, LuaContext::LuaObject& data, const EDNSOptionViewMap&, bool tcp, std::string& requestorId, std::string& deviceId, uint32_t& ttlCap, bool& variable, bool& logQuery) const;
 
   void maintenance() const;
   bool prerpz(DNSQuestion& dq, int& ret) const;
index ee81153579e5d79e9ea6b60761a2c01dee82f75c..081223e775c6a8d7baebe548919aa12c0efa2ac2 100644 (file)
@@ -963,8 +963,10 @@ static void startDoResolve(void *p)
     // Used to tell syncres later on if we should apply NSDNAME and NSIP RPZ triggers for this query
     bool wantsRPZ(true);
     boost::optional<RecProtoBufMessage> pbMessage(boost::none);
+    bool logResponse = false;
 #ifdef HAVE_PROTOBUF
     if (checkProtobufExport(luaconfsLocal)) {
+      logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
       Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
       const ComboAddress& requestor = requestorNM.getMaskedNetwork();
       pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response);
@@ -1024,7 +1026,7 @@ static void startDoResolve(void *p)
     int res = RCode::NoError;
     DNSFilterEngine::Policy appliedPolicy;
     DNSRecord spoofed;
-    RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ);
+    RecursorLua4::DNSQuestion dq(dc->d_source, dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_tcp, variableAnswer, wantsRPZ, logResponse);
     dq.ednsFlags = &edo.d_extFlags;
     dq.ednsOptions = &ednsOpts;
     dq.tag = dc->d_tag;
@@ -1372,7 +1374,7 @@ static void startDoResolve(void *p)
     g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
     updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
 #ifdef HAVE_PROTOBUF
-    if (t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
+    if (t_protobufServer && logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
       pbMessage->setBytes(packet.size());
       pbMessage->setResponseCode(pw.getHeader()->rcode);
       if (appliedPolicy.d_name) {
@@ -1731,6 +1733,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       if (checkProtobufExport(luaconfsLocal)) {
         needECS = true;
       }
+      bool logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
 #endif
 
       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
@@ -1747,7 +1750,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
           if(t_pdl) {
             try {
               if (t_pdl->d_gettag_ffi) {
-                dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable);
+                dc->d_tag = t_pdl->gettag_ffi(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, dc->d_ttlCap, dc->d_variable, logQuery);
               }
               else if (t_pdl->d_gettag) {
                 dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId);
@@ -1776,7 +1779,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         try {
           const struct dnsheader* dh = (const struct dnsheader*) conn->data;
 
-          if (!luaconfsLocal->protobufTaggedOnly) {
+          if (logQuery && (!luaconfsLocal->protobufExportConfig.taggedOnly || !dc->d_policyTags.empty())) {
             protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
           }
         }
@@ -1900,6 +1903,8 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
     uniqueId = (*t_uuidGenerator)();
   }
+  bool logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
+  bool logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
 #endif
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
@@ -1941,7 +1946,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
         if(t_pdl) {
           try {
             if (t_pdl->d_gettag_ffi) {
-              ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable);
+              ctag = t_pdl->gettag_ffi(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId, ttlCap, variable, logQuery);
             }
             else if (t_pdl->d_gettag) {
               ctag = t_pdl->gettag(source, ednssubnet.source, destination, qname, qtype, &policyTags, data, ednsOptions, false, requestorId, deviceId);
@@ -1965,7 +1970,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
 #ifdef HAVE_PROTOBUF
     if(t_protobufServer) {
       pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
-      if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
+      if (logQuery && (!luaconfsLocal->protobufExportConfig.taggedOnly || !policyTags.empty())) {
         protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
       }
     }
@@ -1991,7 +1996,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
       }
 
 #ifdef HAVE_PROTOBUF
-      if(t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
+      if(t_protobufServer && logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
         Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
         const ComboAddress& requestor = requestorNM.getMaskedNetwork();
         pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
index cac3cc71e3347b595b6cbbeed167697afec18e16..d8ca13de2e9488b2147812b9edc9caf44b4e3696 100644 (file)
@@ -81,6 +81,43 @@ static void parseRPZParameters(const std::unordered_map<string,boost::variant<ui
   }
 }
 
+typedef std::unordered_map<std::string, boost::variant<bool, uint64_t, std::string> > protobufOptions_t;
+
+static void parseProtobufOptions(boost::optional<protobufOptions_t> vars, ProtobufExportConfig& config)
+{
+  if (!vars) {
+    return;
+  }
+
+  if (vars->count("timeout")) {
+    config.timeout = boost::get<uint64_t>((*vars)["timeout"]);
+  }
+
+  if (vars->count("maxQueuedEntries")) {
+    config.maxQueuedEntries = boost::get<uint64_t>((*vars)["maxQueuedEntries"]);
+  }
+
+  if (vars->count("reconnectWaitTime")) {
+    config.reconnectWaitTime = boost::get<uint64_t>((*vars)["reconnectWaitTime"]);
+  }
+
+  if (vars->count("asyncConnect")) {
+    config.asyncConnect = boost::get<bool>((*vars)["asyncConnect"]);
+  }
+
+  if (vars->count("taggedOnly")) {
+    config.taggedOnly = boost::get<bool>((*vars)["taggedOnly"]);
+  }
+
+  if (vars->count("logQueries")) {
+    config.logQueries = boost::get<bool>((*vars)["logQueries"]);
+  }
+
+  if (vars->count("logResponses")) {
+    config.logResponses = boost::get<bool>((*vars)["logResponses"]);
+  }
+}
+
 void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& delayedThreads)
 {
   LuaConfigItems lci;
@@ -314,40 +351,17 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
     });
 
 #if HAVE_PROTOBUF
-  Lua.writeFunction("protobufServer", [&lci](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, const boost::optional<uint8_t> maskV4, boost::optional<uint8_t> maskV6, boost::optional<bool> asyncConnect, boost::optional<bool> taggedOnly) {
+  Lua.writeFunction("setProtobufMasks", [&lci](const uint8_t maskV4, uint8_t maskV6) {
+      lci.protobufMaskV4 = maskV4;
+      lci.protobufMaskV6 = maskV6;
+    });
+
+  Lua.writeFunction("protobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
       try {
-       ComboAddress server(server_);
         if (!lci.protobufExportConfig.enabled) {
-
           lci.protobufExportConfig.enabled = true;
-
-          lci.protobufExportConfig.server = server;
-
-          if (timeout) {
-            lci.protobufExportConfig.timeout = *timeout;
-          }
-
-          if (maxQueuedEntries) {
-            lci.protobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
-          }
-
-          if (reconnectWaitTime) {
-            lci.protobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
-          }
-
-          if (asyncConnect) {
-            lci.protobufExportConfig.asyncConnect = *asyncConnect;
-          }
-
-          if (maskV4) {
-            lci.protobufMaskV4 = *maskV4;
-          }
-          if (maskV6) {
-            lci.protobufMaskV6 = *maskV6;
-          }
-          if (taggedOnly) {
-            lci.protobufTaggedOnly = *taggedOnly;
-          }
+          lci.protobufExportConfig.server = ComboAddress(server_);
+          parseProtobufOptions(vars, lci.protobufExportConfig);
         }
         else {
           g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufExportConfig.server.toString()<<endl;
@@ -361,30 +375,12 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
       }
     });
 
-  Lua.writeFunction("outgoingProtobufServer", [&lci](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, boost::optional<bool> asyncConnect) {
+  Lua.writeFunction("outgoingProtobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
       try {
-       ComboAddress server(server_);
         if (!lci.outgoingProtobufExportConfig.enabled) {
-
           lci.outgoingProtobufExportConfig.enabled = true;
-
-          lci.outgoingProtobufExportConfig.server = server;
-
-          if (timeout) {
-            lci.outgoingProtobufExportConfig.timeout = *timeout;
-          }
-
-          if (maxQueuedEntries) {
-            lci.outgoingProtobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
-          }
-
-          if (reconnectWaitTime) {
-            lci.outgoingProtobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
-          }
-
-          if (asyncConnect) {
-            lci.outgoingProtobufExportConfig.asyncConnect = *asyncConnect;
-          }
+          lci.protobufExportConfig.server = ComboAddress(server_);
+          parseProtobufOptions(vars, lci.protobufExportConfig);
         }
         else {
           g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.outgoingProtobufExportConfig.server.toString()<<endl;
index a9ba373e24356e3da2f6a2e67b44863158725d1a..4af94ecc2f1989ea67c4919f45c1da3b2e41e95a 100644 (file)
@@ -33,6 +33,9 @@ struct ProtobufExportConfig
   uint16_t reconnectWaitTime{1};
   bool asyncConnect{false};
   bool enabled{false};
+  bool logQueries{true};
+  bool logResponses{true};
+  bool taggedOnly{false};
 };
 
 class LuaConfigItems 
@@ -51,7 +54,6 @@ public:
   uint64_t generation{0};
   uint8_t protobufMaskV4{32};
   uint8_t protobufMaskV6{128};
-  bool protobufTaggedOnly{false};
 };
 
 extern GlobalStateHolder<LuaConfigItems> g_luaconfs;
index b7ac703cf66a43dc7189ef3aaa6c835fb15469f3..40d614bcc8c2eca86ced69d9f4e8454e51227131 100644 (file)
@@ -12,8 +12,27 @@ Configuring Protocol Buffer logs
 --------------------------------
 Protobuf export to a server is enabled using the ``protobufServer()`` directive:
 
+.. function:: protobufServer(server [, options]))
+
+  .. versionadded:: 4.2.0
+
+  :param string server: The IP and port to connect to
+  :param table options: A table with key: value pairs with options.
+
+  Options:
+
+  * ``timeout=2``: int - Time in seconds to wait when sending a message
+  * ``maxQueuedEntries=100``: int - How many entries will be kept in memory if the server becomes unreachable
+  * ``reconnectWaitTime=1``: int - How long to wait, in seconds, between two reconnection attempts
+  * ``taggedOnly=false``: bool - Only entries with a policy or a policy tag set will be sent
+  * ``asyncConnect``: bool - When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued
+  * ``logQueries=true``: bool - Whether to export queries
+  * ``logResponses=true``: bool - Whether to export responses
+
 .. function:: protobufServer(server [[[[[[[, timeout=2], maxQueuedEntries=100], reconnectWaitTime=1], maskV4=32], maskV6=128], asyncConnect=false], taggedOnly=false])
 
+  .. deprecated:: 4.2.0
+
   :param string server: The IP and port to connect to
   :param int timeout: Time in seconds to wait when sending a message
   :param int maxQueuedEntries: How many entries will be kept in memory if the server becomes unreachable
@@ -23,13 +42,39 @@ Protobuf export to a server is enabled using the ``protobufServer()`` directive:
   :param bool taggedOnly: Only entries with a policy or a policy tag set will be sent.
   :param bool asyncConnect: When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued..
 
+.. function:: setProtobufMasks(maskv4, maskV6)
+
+  .. versionadded:: 4.2.0
+
+  :param int maskV4: network mask to apply to the client IPv4 addresses, for anonymization purposes. The default of 32 means no anonymization.
+  :param int maskV6: Same as maskV4, but for IPv6. Defaults to 128.
+
 Logging outgoing queries and responses
 --------------------------------------
 
 While :func:`protobufServer` only exports the queries sent to the recursor from clients, with the corresponding responses, ``outgoingProtobufServer()`` can be used to export outgoing queries sent by the recursor to authoritative servers, along with the corresponding responses.
 
+.. function:: outgoingProtobufServer(server [, options])
+
+  .. versionadded:: 4.2.0
+
+  :param string server: The IP and port to connect to
+  :param table options: A table with key: value pairs with options.
+
+  Options:
+
+  * ``timeout=2``: int - Time in seconds to wait when sending a message
+  * ``maxQueuedEntries=100``: int - How many entries will be kept in memory if the server becomes unreachable
+  * ``reconnectWaitTime=1``: int - How long to wait, in seconds, between two reconnection attempts
+  * ``taggedOnly=false``: bool - Only entries with a policy or a policy tag set will be sent
+  * ``asyncConnect``: bool - When set to false (default) the first connection to the server during startup will block up to ``timeout`` seconds, otherwise the connection is done in a separate thread, after the first message has been queued
+  * ``logQueries=true``: bool - Whether to export queries
+  * ``logResponses=true``: bool - Whether to export responses
+
 .. function:: outgoingProtobufServer(server [[[[, timeout=2], maxQueuedEntries=100], reconnectWaitTime=1], asyncConnect=false])
 
+  .. deprecated:: 4.2.0
+
   :param string server: The IP and port to connect to
   :param int timeout: Time in seconds to wait when sending a message
   :param int maxQueuedEntries: How many entries will be kept in memory if the server becomes unreachable
index 9f6f78ec0a49426607e9b94fb28b3c25a8a2cf9f..fc3cc847d2ca3ff91050c2feb5c91b772245a165 100644 (file)
@@ -133,6 +133,12 @@ The DNSQuestion object contains at least the following fields:
       Possible states are ``pdns.validationstates.Indeterminate``, ``pdns.validationstates.Bogus``, ``pdns.validationstates.Insecure`` and ``pdns.validationstates.Secure``.
       The result will always be ``pdns.validationstates.Indeterminate`` is validation is disabled or was not requested.
 
+  .. attribute:: DNSQuestion.logResponse
+
+      .. versionadded:: 4.2.0
+
+      Whether the response to this query will be exported to a remote protobuf logger, if one has been configured.
+
   It also supports the following methods:
 
   .. method:: DNSQuestion:addAnswer(type, content, [ttl, name])
index 7103d74161e8a3311a96385d19e45f7e4453da6e..4b621b2f5e0b04401450e1495161132d0adffe2c 100644 (file)
@@ -3,3 +3,4 @@
 /.venv
 /configs
 /vars
+/*_pb2.py
index 18aa37e592302caca42b29184d237293587c62dc..4f5afe9d85890b50c8196ee090d7e3889d5c7f06 100644 (file)
@@ -1,5 +1,7 @@
 dnspython>=1.11
 nose>=1.3.7
+protobuf>=2.5; sys_platform != 'darwin'
+protobuf>=3.0; sys_platform == 'darwin'
 pysnmp>=4.3.4
 requests>=2.1.0
 Twisted>0.15.0
index 9682f92eb63e31bb7fa610bf117f16a5b90ee141..2878142216a480ec5e6f6496d6bb7d9b5b77cc00 100755 (executable)
@@ -9,6 +9,8 @@ fi
 python -V
 pip install -r requirements.txt
 
+protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto
+
 mkdir -p configs
 
 [ -f ./vars ] && . ./vars
@@ -42,4 +44,4 @@ set -e
 if [ "${PDNS_DEBUG}" = "YES" ]; then
   set -x
 fi
-LD_PRELOAD="${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@
+LD_PRELOAD="${LIBASAN} ${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@
diff --git a/regression-tests.recursor-dnssec/test_Protobuf.py b/regression-tests.recursor-dnssec/test_Protobuf.py
new file mode 100644 (file)
index 0000000..469c680
--- /dev/null
@@ -0,0 +1,484 @@
+import dns
+import dnsmessage_pb2
+import os
+import socket
+import struct
+import sys
+import threading
+import time
+
+# Python2/3 compatibility hacks
+if sys.version_info[0] == 2:
+  from Queue import Queue
+  range = xrange
+else:
+  from queue import Queue
+  range = range  # allow re-export of the builtin name
+
+from recursortests import RecursorTest
+
+protobufQueue = Queue()
+protobufServerPort = 4243
+
+def ProtobufConnectionHandler(queue, conn):
+    data = None
+    while True:
+        data = conn.recv(2)
+        if not data:
+            break
+        (datalen,) = struct.unpack("!H", data)
+        data = conn.recv(datalen)
+        if not data:
+            break
+
+        queue.put(data, True, timeout=2.0)
+
+    conn.close()
+
+def ProtobufListener(port):
+    global protobufQueue
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+    try:
+        sock.bind(("127.0.0.1", port))
+    except socket.error as e:
+        print("Error binding in the protobuf listener: %s" % str(e))
+        sys.exit(1)
+
+    sock.listen(100)
+    while True:
+        try:
+            (conn, _) = sock.accept()
+            thread = threading.Thread(name='Connection Handler',
+                                      target=ProtobufConnectionHandler,
+                                      args=[protobufQueue, conn])
+            thread.setDaemon(True)
+            thread.start()
+
+        except socket.error as e:
+            print('Error in protobuf socket: %s' % str(e))
+
+    sock.close()
+
+
+protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
+protobufListener.setDaemon(True)
+protobufListener.start()
+
+class TestRecursorProtobuf(RecursorTest):
+
+    global protobufServerPort
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d")
+    """ % (protobufServerPort)
+
+
+    def getFirstProtobufMessage(self, retries=1, waitTime=1):
+        global protobufQueue
+        failed = 0
+
+        while protobufQueue.empty:
+          if failed >= retries:
+            break
+
+          failed = failed + 1
+          time.sleep(waitTime)
+
+        self.assertFalse(protobufQueue.empty())
+        data = protobufQueue.get(False)
+        self.assertTrue(data)
+        msg = dnsmessage_pb2.PBDNSMessage()
+        msg.ParseFromString(data)
+        return msg
+
+    def checkNoRemainingMessage(self):
+        global protobufQueue
+        self.assertTrue(protobufQueue.empty())
+
+    def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, expectedECS=None):
+        self.assertTrue(msg)
+        self.assertTrue(msg.HasField('timeSec'))
+        self.assertTrue(msg.HasField('socketFamily'))
+        self.assertEquals(msg.socketFamily, dnsmessage_pb2.PBDNSMessage.INET)
+        self.assertTrue(msg.HasField('from'))
+        fromvalue = getattr(msg, 'from')
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, fromvalue), initiator)
+        self.assertTrue(msg.HasField('socketProtocol'))
+        self.assertEquals(msg.socketProtocol, protocol)
+        self.assertTrue(msg.HasField('messageId'))
+        self.assertTrue(msg.HasField('id'))
+        self.assertEquals(msg.id, query.id)
+        self.assertTrue(msg.HasField('inBytes'))
+        if normalQueryResponse:
+            # compare inBytes with length of query/response
+            self.assertEquals(msg.inBytes, len(query.to_wire()))
+        if expectedECS is not None:
+            self.assertTrue(msg.HasField('originalRequestorSubnet'))
+            # v4 only for now
+            self.assertEquals(len(msg.originalRequestorSubnet), 4)
+            self.assertEquals(socket.inet_ntop(socket.AF_INET, msg.originalRequestorSubnet), '127.0.0.1')
+
+    def checkProtobufQuery(self, msg, protocol, query, qclass, qtype, qname, initiator='127.0.0.1'):
+        self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSQueryType)
+        self.checkProtobufBase(msg, protocol, query, initiator)
+        # dnsdist doesn't fill the responder field for responses
+        # because it doesn't keep the information around.
+        self.assertTrue(msg.HasField('to'))
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, msg.to), '127.0.0.1')
+        self.assertTrue(msg.HasField('question'))
+        self.assertTrue(msg.question.HasField('qClass'))
+        self.assertEquals(msg.question.qClass, qclass)
+        self.assertTrue(msg.question.HasField('qType'))
+        self.assertEquals(msg.question.qClass, qtype)
+        self.assertTrue(msg.question.HasField('qName'))
+        self.assertEquals(msg.question.qName, qname)
+
+    def checkProtobufResponse(self, msg, protocol, response, initiator='127.0.0.1'):
+        self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType)
+        self.checkProtobufBase(msg, protocol, response, initiator)
+        self.assertTrue(msg.HasField('response'))
+        self.assertTrue(msg.response.HasField('queryTimeSec'))
+
+    def checkProtobufResponseRecord(self, record, rclass, rtype, rname, rttl):
+        self.assertTrue(record.HasField('class'))
+        self.assertEquals(getattr(record, 'class'), rclass)
+        self.assertTrue(record.HasField('type'))
+        self.assertEquals(record.type, rtype)
+        self.assertTrue(record.HasField('name'))
+        self.assertEquals(record.name, rname)
+        self.assertTrue(record.HasField('ttl'))
+        self.assertEquals(record.ttl, rttl)
+        self.assertTrue(record.HasField('rdata'))
+
+    def checkProtobufPolicy(self, msg, policyType, reason):
+        self.assertEquals(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType)
+        self.assertTrue(msg.response.HasField('appliedPolicyType'))
+        self.assertTrue(msg.response.HasField('appliedPolicy'))
+        self.assertEquals(msg.response.appliedPolicy, reason)
+        self.assertEquals(msg.response.appliedPolicyType, policyType)
+
+    def checkProtobufTags(self, msg, tags):
+        self.assertEquals(len(msg.response.tags), len(tags))
+        for tag in msg.response.tags:
+            self.assertTrue(tag in tags)
+
+    @classmethod
+    def setUpClass(cls):
+
+        global protobufListener
+        global protobufServerPort
+        global ProtobufListener
+        if protobufListener is None or not protobufListener.isAlive():
+            protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
+            protobufListener.setDaemon(True)
+            protobufListener.start()
+
+        cls.setUpSockets()
+
+        cls.startResponders()
+
+        confdir = os.path.join('configs', cls._confdir)
+        cls.createConfigDir(confdir)
+
+        cls.generateRecursorConfig(confdir)
+        cls.startRecursor(confdir, cls._recursorPort)
+
+    def setUp(self):
+      # Make sure the queue is empty, in case
+      # a previous test failed
+      global protobufQueue
+      while not protobufQueue.empty():
+        protobufQueue.get(False)
+
+    @classmethod
+    def generateRecursorConfig(cls, confdir):
+        authzonepath = os.path.join(confdir, 'example.zone')
+        with open(authzonepath, 'w') as authzone:
+            authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+tagged 3600 IN A 192.0.2.84
+query-selected 3600 IN A 192.0.2.84
+answer-selected 3600 IN A 192.0.2.84
+""".format(soa=cls._SOA))
+        super(TestRecursorProtobuf, cls).generateRecursorConfig(confdir)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.tearDownRecursor()
+
+class ProtobufDefaultTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export queries and response over protobuf.
+    """
+
+    _confdir = 'ProtobufDefault'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query and answer
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+        # then the response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+        self.checkNoRemainingMessage()
+
+class ProtobufMasksTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export queries and response over protobuf, respecting the configured initiator masking.
+    """
+
+    _confdir = 'ProtobufMasks'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    global protobufServerPort
+    _protobufMaskV4 = 4
+    _protobufMaskV6 = 128
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d")
+    setProtobufMasks(%d, %d)
+    """ % (protobufServerPort, _protobufMaskV4, _protobufMaskV6)
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query and answer
+        # but first let the protobuf messages the time to get there
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '112.0.0.0')
+        # then the response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '112.0.0.0')
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+        self.checkNoRemainingMessage()
+
+class ProtobufQueriesOnlyTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export queries but not responses over protobuf.
+    """
+
+    _confdir = 'ProtobufQueriesOnly'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    global protobufServerPort
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } )
+    """ % (protobufServerPort)
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf message corresponding to the UDP query
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+        # no response
+        self.checkNoRemainingMessage()
+
+class ProtobufResponsesOnlyTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export responses but not queries over protobuf.
+    """
+
+    _confdir = 'ProtobufResponsesOnly'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    global protobufServerPort
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } )
+    """ % (protobufServerPort)
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf message corresponding to the UDP response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+        # nothing else in the queue
+        self.checkNoRemainingMessage()
+
+class ProtobufTaggedOnlyTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export queries and responses but only if they have been tagged.
+    """
+
+    _confdir = 'ProtobufTaggedOnly'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    global protobufServerPort
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } )
+    """ % (protobufServerPort)
+    _tags = ['tag1', 'tag2']
+    _tag_from_gettag = 'tag-from-gettag'
+    _lua_dns_script_file = """
+    function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp)
+      if qname:equal('tagged.example.') then
+        return 0, { '%s' }
+      end
+      return 0
+    end
+    function preresolve(dq)
+      if dq.qname:equal('tagged.example.') then
+        dq:addPolicyTag('%s')
+        dq:addPolicyTag('%s')
+      end
+      return false
+    end
+    """ % (_tag_from_gettag, _tags[0], _tags[1])
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf message corresponding to the UDP response
+        # the first query and answer are not tagged, so there is nothing in the queue
+        time.sleep(1)
+        self.checkNoRemainingMessage()
+
+    def testTagged(self):
+        name = 'tagged.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query and answer
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+        self.checkProtobufTags(msg, [self._tag_from_gettag])
+        # then the response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
+        tags = [self._tag_from_gettag] + self._tags
+        self.checkProtobufTags(msg, tags)
+        self.checkNoRemainingMessage()
+
+class ProtobufSelectedFromLuaTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export queries and responses but only if they have been selected from Lua.
+    """
+
+    _confdir = 'ProtobufSelectedFromLua'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    global protobufServerPort
+    _lua_config_file = """
+    protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } )
+    """ % (protobufServerPort)
+    _lua_dns_script_file = """
+    local ffi = require("ffi")
+
+    ffi.cdef[[
+      typedef struct pdns_ffi_param pdns_ffi_param_t;
+
+      const char* pdns_ffi_param_get_qname(pdns_ffi_param_t* ref);
+      void pdns_ffi_param_set_log_query(pdns_ffi_param_t* ref, bool logQuery);
+    ]]
+
+    function gettag_ffi(obj)
+      qname = ffi.string(ffi.C.pdns_ffi_param_get_qname(obj))
+      if qname == 'query-selected.example' then
+        ffi.C.pdns_ffi_param_set_log_query(obj, true)
+      end
+      return 0
+    end
+
+    function preresolve(dq)
+      if dq.qname:equal('answer-selected.example.') then
+        dq.logResponse = true
+      end
+      return false
+    end
+    """
+
+    def testA(self):
+        name = 'a.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf message corresponding to the UDP response
+        # the first query and answer are not selected, so there is nothing in the queue
+        self.checkNoRemainingMessage()
+
+    def testQuerySelected(self):
+        name = 'query-selected.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+        # there should be no response
+        self.checkNoRemainingMessage()
+
+    def testResponseSelected(self):
+        name = 'answer-selected.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
+        self.checkNoRemainingMessage()