]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Merge remote-tracking branch 'origin/master' into rec-dnstap
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 8 Mar 2019 08:45:56 +0000 (09:45 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 8 Mar 2019 08:45:56 +0000 (09:45 +0100)
26 files changed:
pdns/dnstap.cc
pdns/dnstap.hh
pdns/fstrm_logger.cc
pdns/fstrm_logger.hh
pdns/lwres.cc
pdns/lwres.hh
pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/recursordist/.gitignore
pdns/recursordist/Makefile.am
pdns/recursordist/configure.ac
pdns/recursordist/dnstap.cc [new symlink]
pdns/recursordist/dnstap.hh [new symlink]
pdns/recursordist/dnstap.proto [new symlink]
pdns/recursordist/docs/lua-config/protobuf.rst
pdns/recursordist/dolog.hh [new symlink]
pdns/recursordist/fstrm_logger.cc [new symlink]
pdns/recursordist/fstrm_logger.hh [new symlink]
pdns/recursordist/m4/pdns_check_dnstap.m4 [new symlink]
pdns/recursordist/test-syncres_cc.cc
pdns/syncres.cc
pdns/syncres.hh
pdns/version.cc
regression-tests.recursor-dnssec/runtests
regression-tests.recursor-dnssec/test_RecDnstap.py [new file with mode: 0644]

index b98b21672d426b437d18d5261ef0746ca86d60af..488e6216591438ce9bdafffde99a60c7ee32c9b9 100644 (file)
@@ -2,7 +2,7 @@
 #include "gettime.hh"
 #include "dnstap.hh"
 
-DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime)
+DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime, bool recursor)
 {
 #ifdef HAVE_PROTOBUF
   const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(packet);
@@ -13,27 +13,31 @@ DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* re
 
   dnstap::Message* message = proto_message.mutable_message();
 
-  message->set_type(!dh->qr ? dnstap::Message_Type_CLIENT_QUERY : dnstap::Message_Type_CLIENT_RESPONSE);
-
-  message->set_socket_family(requestor->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
-  message->set_socket_protocol(isTCP ? dnstap::TCP : dnstap::UDP);
-
-  if (requestor->sin4.sin_family == AF_INET) {
-    message->set_query_address(&requestor->sin4.sin_addr.s_addr, sizeof(requestor->sin4.sin_addr.s_addr));
-  }
-  else if (requestor->sin4.sin_family == AF_INET6) {
-    message->set_query_address(&requestor->sin6.sin6_addr.s6_addr, sizeof(requestor->sin6.sin6_addr.s6_addr));
+  if (recursor) {
+    message->set_type(!dh->qr ? dnstap::Message_Type_RESOLVER_QUERY : dnstap::Message_Type_RESOLVER_RESPONSE);
+  } else {
+    message->set_type(!dh->qr ? dnstap::Message_Type_CLIENT_QUERY : dnstap::Message_Type_CLIENT_RESPONSE);
   }
-  message->set_query_port(ntohs(requestor->sin4.sin_port));
+  message->set_socket_protocol(isTCP ? dnstap::TCP : dnstap::UDP);
 
-  if (requestor->sin4.sin_family == AF_INET) {
-    message->set_response_address(&responder->sin4.sin_addr.s_addr, sizeof(responder->sin4.sin_addr.s_addr));
+  if (requestor != nullptr) {
+    message->set_socket_family(requestor->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
+    if (requestor->sin4.sin_family == AF_INET) {
+      message->set_query_address(&requestor->sin4.sin_addr.s_addr, sizeof(requestor->sin4.sin_addr.s_addr));
+    } else if (requestor->sin4.sin_family == AF_INET6) {
+      message->set_query_address(&requestor->sin6.sin6_addr.s6_addr, sizeof(requestor->sin6.sin6_addr.s6_addr));
+    }
+    message->set_query_port(ntohs(requestor->sin4.sin_port));
   }
-  else if (requestor->sin4.sin_family == AF_INET6) {
-    message->set_response_address(&responder->sin6.sin6_addr.s6_addr, sizeof(responder->sin6.sin6_addr.s6_addr));
+  if (responder != nullptr) {
+    message->set_socket_family(responder->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
+    if (responder->sin4.sin_family == AF_INET) {
+      message->set_response_address(&responder->sin4.sin_addr.s_addr, sizeof(responder->sin4.sin_addr.s_addr));
+    } else if (responder->sin4.sin_family == AF_INET6) {
+      message->set_response_address(&responder->sin6.sin6_addr.s6_addr, sizeof(responder->sin6.sin6_addr.s6_addr));
+    }
+    message->set_response_port(ntohs(responder->sin4.sin_port));
   }
-  message->set_response_port(ntohs(responder->sin4.sin_port));
-
   if (queryTime != nullptr) {
     message->set_query_time_sec(queryTime->tv_sec);
     message->set_query_time_nsec(queryTime->tv_nsec);
index d00bd0bca81c5c74477a0b66858bab47d68295f3..59dd3c752f2b8d3b5bc0c91e0b58fd44b79dae86 100644 (file)
@@ -37,7 +37,7 @@
 class DnstapMessage
 {
 public:
-  DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime);
+  DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime, bool recursor = false);
   void serialize(std::string& data) const;
   std::string toDebugString() const;
 
index 617aa73d5cfd76df42c687f3c7d52b2500c39e0a..7df889b049209d1eb48d1332bf2a27e13aec62d5 100644 (file)
@@ -9,7 +9,8 @@
 
 #ifdef HAVE_FSTRM
 
-FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect): d_family(family), d_address(address)
+FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
+    const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
 {
   fstrm_res res;
 
@@ -78,6 +79,44 @@ FrameStreamLogger::FrameStreamLogger(const int family, const std::string& addres
       throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
     }
 
+    if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
+      res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
+      res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
+      res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
+      res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
+      res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
+      res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
+      }
+    }
+
+
     if (connect) {
       d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
       if (!d_iothr) {
index 240b3d6adde12ce807df71f0723f2bf1e818da25..ceb1e800905b149403bfd8b74a159d8890a4e999 100644 (file)
@@ -25,6 +25,7 @@
 
 #ifdef HAVE_FSTRM
 
+#include <unordered_map>
 #include <fstrm.h>
 #include <fstrm/iothr.h>
 #include <fstrm/unix_writer.h>
 class FrameStreamLogger : public RemoteLoggerInterface, boost::noncopyable
 {
 public:
-  FrameStreamLogger(int family, const std::string& address, bool connect);
+  FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map<string,unsigned>& options = std::unordered_map<string,unsigned>());
   virtual ~FrameStreamLogger();
   virtual void queueData(const std::string& data) override;
   virtual std::string toString() const override
   {
     return "FrameStreamLogger to " + d_address;
   }
+  bool logQueries(void) const { return d_logQueries; }
+  bool logResponses(void) const { return d_logResponses; }
+  void setLogQueries(bool flag) { d_logQueries = flag; }
+  void setLogResponses(bool flag) { d_logResponses = flag; }
+
 private:
+
   const int d_family;
   const std::string d_address;
   struct fstrm_iothr_queue *d_ioqueue{nullptr};
@@ -56,6 +63,9 @@ private:
   struct fstrm_iothr *d_iothr{nullptr};
 
   void cleanup();
+
+  bool d_logQueries{true};
+  bool d_logResponses{true};
 };
 
 #endif /* HAVE_FSTRM */
index 794838924ae0106d532b81d3e79d31ed6ff45c26..5c5b80256554be4c4fdc6ee587589d3fdeb57f14 100644 (file)
 
 #include "uuid-utils.hh"
 
+#ifdef HAVE_FSTRM
+#include "dnstap.hh"
+#include "fstrm_logger.hh"
+bool g_syslog;
+
+static bool isEnabledForQueries(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers)
+{
+  if (fstreamLoggers == nullptr) {
+    return false;
+  }
+  for (auto& logger : *fstreamLoggers) {
+    // We know this is safe since fstreamLoggers is filled with FrameStreamLogger objects only
+    auto fsl = static_cast<const FrameStreamLogger*>(logger.get());
+    if (fsl->logQueries()) {
+      return true;
+    }
+  }
+  return false;
+}
+
+static void logFstreamQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const struct timeval &queryTime, const ComboAddress& ip, bool doTCP, const vector<uint8_t>& packet)
+{
+  if (fstreamLoggers == nullptr)
+    return;
+
+  struct timespec ts;
+  TIMEVAL_TO_TIMESPEC(&queryTime, &ts);
+  DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, true);
+  std::string str;
+  message.serialize(str);
+
+  for (auto& logger : *fstreamLoggers) {
+    logger->queueData(str);
+  }
+}
+
+static bool isEnabledForResponses(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers)
+{
+  if (fstreamLoggers == nullptr) {
+    return false;
+  }
+  for (auto& logger : *fstreamLoggers) {
+    // We know this is safe since fstreamLoggers is filled with FrameStreamLogger objects only
+    auto fsl = static_cast<const FrameStreamLogger*>(logger.get());
+    if (fsl->logResponses()) {
+      return true;
+    }
+  }
+  return false;
+}
+
+static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const ComboAddress& ip, bool doTCP, const std::string& packet, const struct timeval& queryTime, const struct timeval& replyTime)
+{
+  if (fstreamLoggers == nullptr)
+    return;
+
+  struct timespec ts1, ts2;
+  TIMEVAL_TO_TIMESPEC(&queryTime, &ts1);
+  TIMEVAL_TO_TIMESPEC(&replyTime, &ts2);
+  DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, static_cast<const char*>(&*packet.begin()), packet.size(), &ts1, &ts2, true);
+  std::string str;
+  message.serialize(str);
+
+  for (auto& logger : *fstreamLoggers) {
+    logger->queueData(str);
+  }
+}
+
+#endif // HAVE_FSTRM
+
 static void logOutgoingQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
 {
   if(!outgoingLoggers)
@@ -105,7 +175,7 @@ static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_pt
 /** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
     Never throws! 
  */
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
 {
   size_t len;
   size_t bufsize=g_outgoingEDNSBufsize;
@@ -165,7 +235,12 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
     uuid = getUniqueID();
     logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
   }
-#endif
+#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+  if (isEnabledForQueries(fstrmLoggers)) {
+    logFstreamQuery(fstrmLoggers, queryTime, ip, doTCP, vpacket);
+  }
+#endif /* HAVE_FSTRM */
 
   srcmask = boost::none; // this is also our return value, even if EDNS0Level == 0
 
@@ -239,6 +314,13 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
     return ret;
 
   buf.resize(len);
+
+#ifdef HAVE_FSTRM
+  if (isEnabledForResponses(fstrmLoggers)) {
+    logFstreamResponse(fstrmLoggers, ip, doTCP, buf, queryTime, *now);
+  }
+#endif /* HAVE_FSTRM */
+
   lwr->d_records.clear();
   try {
     lwr->d_tcbit=0;
index 5ddc3af0dab4a2a4b84271e9bb4f1857369ac357..12a08e72e23e08fa06105c4c58d676c34f5a113b 100644 (file)
@@ -43,6 +43,7 @@
 #include "remote_logger.hh"
 #include "resolve-context.hh"
 
+
 int asendto(const char *data, size_t len, int flags, const ComboAddress& ip, uint16_t id,
             const DNSName& domain, uint16_t qtype,  int* fd);
 int arecvfrom(std::string& packet, int flags, const ComboAddress& ip, size_t *d_len, uint16_t id,
@@ -68,5 +69,5 @@ public:
   bool d_haveEDNS{false};
 };
 
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
 #endif // PDNS_LWRES_HH
index dcbbf88579b8b93cda311296be150be2b7e389fe..25ff42300c5710d544bad3506e6ea7de82c66e1c 100644 (file)
 
 #ifdef HAVE_PROTOBUF
 #include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
 
 #include "xpf.hh"
 
@@ -121,6 +121,11 @@ static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>
 static thread_local uint64_t t_outgoingProtobufServersGeneration;
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
 thread_local std::unique_ptr<MemRecursorCache> t_RC;
 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
@@ -955,6 +960,66 @@ static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luacon
 
   return true;
 }
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+  auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLoggerInterface>>>();
+
+  for (const auto& server : config.servers) {
+    try {
+      std::unordered_map<string,unsigned> options;
+      options["bufferHint"] = config.bufferHint;
+      options["flushTimeout"] = config.flushTimeout;
+      options["inputQueueSize"] = config.inputQueueSize;
+      options["outputQueueSize"] = config.outputQueueSize;
+      options["queueNotifyThreshold"] = config.queueNotifyThreshold;
+      options["reopenInterval"] = config.reopenInterval;
+      auto fsl = new FrameStreamLogger(server.sin4.sin_family, server.toStringWithPort(), true, options);
+      fsl->setLogQueries(config.logQueries);
+      fsl->setLogResponses(config.logResponses);
+      result->emplace_back(fsl);
+    }
+    catch(const std::exception& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+    }
+    catch(const PDNSException& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+    }
+  }
+
+  return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+  if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    return false;
+  }
+
+  /* if the server was not running, or if it was running according to a
+     previous configuration */
+  if (!t_frameStreamServers ||
+      t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+    t_frameStreamServersGeneration = luaconfsLocal->generation;
+  }
+
+  return true;
+}
+#endif /* HAVE_FSTRM */
 #endif /* HAVE_PROTOBUF */
 
 #ifdef NOD_ENABLED
@@ -1083,6 +1148,10 @@ static void startDoResolve(void *p)
     }
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+    checkFrameStreamExport(luaconfsLocal);
+#endif
+
     DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
 
     pw.getHeader()->aa=0;
@@ -1138,7 +1207,9 @@ static void startDoResolve(void *p)
     sr.setInitialRequestId(dc->d_uuid);
     sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
 #endif
-
+#ifdef HAVE_FSTRM
+    sr.setFrameStreamServers(t_frameStreamServers);
+#endif
     sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
 
     bool tracedQuery=false; // we could consider letting Lua know about this too
@@ -1902,6 +1973,10 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         needECS = true;
       }
       logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+      checkFrameStreamExport(luaconfsLocal);
 #endif
 
       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
@@ -2094,6 +2169,9 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   }
   logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
   bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
 #endif
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
@@ -4017,6 +4095,9 @@ try
   checkProtobufExport(luaconfsLocal);
   checkOutgoingProtobufExport(luaconfsLocal);
 #endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
+#endif
 
   PacketID pident;
 
index ea1d56ed6c9d42934aa71b5df2d93fe682c1580e..bd077250cbe8c4d3d5ef77e649f4f8e25b08604a 100644 (file)
@@ -145,6 +145,43 @@ static void parseProtobufOptions(boost::optional<protobufOptions_t> vars, Protob
 }
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+typedef std::unordered_map<std::string, boost::variant<bool, uint64_t, std::string, std::vector<std::pair<int,std::string> > > > frameStreamOptions_t;
+
+static void parseFrameStreamOptions(boost::optional<frameStreamOptions_t> vars, FrameStreamExportConfig& config)
+{
+  if (!vars) {
+    return;
+  }
+
+  if (vars->count("logQueries")) {
+    config.logQueries = boost::get<bool>((*vars)["logQueries"]);
+  }
+  if (vars->count("logResponses")) {
+    config.logResponses = boost::get<bool>((*vars)["logResponses"]);
+  }
+
+  if (vars->count("bufferHint")) {
+    config.bufferHint = boost::get<uint64_t>((*vars)["bufferHint"]);
+  }
+  if (vars->count("flushTimeout")) {
+    config.flushTimeout = boost::get<uint64_t>((*vars)["flushTimeout"]);
+  }
+  if (vars->count("inputQueueSize")) {
+    config.inputQueueSize = boost::get<uint64_t>((*vars)["inputQueueSize"]);
+  }
+  if (vars->count("outputQueueSize")) {
+    config.outputQueueSize = boost::get<uint64_t>((*vars)["outputQueueSize"]);
+  }
+  if (vars->count("queueNotifyThreshold")) {
+    config.queueNotifyThreshold = boost::get<uint64_t>((*vars)["queueNotifyThreshold"]);
+  }
+  if (vars->count("reopenInterval")) {
+    config.reopenInterval = boost::get<uint64_t>((*vars)["reopenInterval"]);
+  }
+}
+#endif /* HAVE_FSTRM */
+
 void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& delayedThreads)
 {
   LuaConfigItems lci;
@@ -495,6 +532,40 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
     });
 #endif
 
+#ifdef HAVE_FSTRM
+  Lua.writeFunction("dnstapFrameStreamServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<frameStreamOptions_t> vars) {
+      if (!lci.frameStreamExportConfig.enabled) {
+
+        lci.frameStreamExportConfig.enabled = true;
+
+          try {
+            if (servers.type() == typeid(std::string)) {
+              auto server = boost::get<const std::string>(servers);
+
+              lci.frameStreamExportConfig.servers.emplace_back(server);
+            }
+            else {
+              auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+              for (const auto& serverPair : serversMap) {
+                lci.frameStreamExportConfig.servers.emplace_back(serverPair.second);
+              }
+            }
+
+            parseFrameStreamOptions(vars, lci.frameStreamExportConfig);
+          }
+          catch(std::exception& e) {
+            g_log<<Logger::Error<<"Error while starting dnstap framestream logger: "<<e.what()<<endl;
+          }
+          catch(PDNSException& e) {
+            g_log<<Logger::Error<<"Error while starting dnstap framestream logger: "<<e.reason<<endl;
+          }
+      }
+      else {
+        g_log<<Logger::Error<<"Only one dnstapFrameStreamServer() directive can be configured, we already have "<<lci.frameStreamExportConfig.servers.at(0).toString()<<endl;
+      }
+    });
+#endif /* HAVE_FSTRM */
+
   try {
     Lua.executeCode(ifs);
     g_luaconfs.setState(lci);
index 4323bd0c6cc9b5bbde4750bba9a2bb54b3856ff2..8060864c9a12307df650aa472c5a5943f65bea88 100644 (file)
@@ -41,6 +41,20 @@ struct ProtobufExportConfig
   bool taggedOnly{false};
 };
 
+struct FrameStreamExportConfig
+{
+  std::vector<ComboAddress> servers;
+  bool enabled{false};
+  bool logQueries{true};
+  bool logResponses{true};
+  unsigned bufferHint{0};
+  unsigned flushTimeout{0};
+  unsigned inputQueueSize{0};
+  unsigned outputQueueSize{0};
+  unsigned queueNotifyThreshold{0};
+  unsigned reopenInterval{0};
+};
+
 struct TrustAnchorFileInfo {
   uint32_t interval{24};
   std::string fname;
@@ -57,6 +71,8 @@ public:
   map<DNSName,std::string> negAnchors;
   ProtobufExportConfig protobufExportConfig;
   ProtobufExportConfig outgoingProtobufExportConfig;
+  FrameStreamExportConfig frameStreamExportConfig;
+
   /* we need to increment this every time the configuration
      is reloaded, so we know if we need to reload the protobuf
      remote loggers */
index b7636f11b6c8379277ff20e97b344a8a97004713..4f3f8a2199f2db6ec07d1071eab4b489fcfc6512 100644 (file)
@@ -52,3 +52,5 @@ html-docs
 doctrees
 latex
 PowerDNS-Recursor.pdf
+/*.pb.cc
+/*.pb.h
index 92c8ae5a27ed0b0eb3ae11fd64c662c64771f1e7..9ebce5994feff952d0bf783b427db8e2d6f9f1c8 100644 (file)
@@ -18,6 +18,11 @@ AM_CXXFLAGS += \
        -DNODCACHEDIR=\"$(nodcachedir)\"
 endif
 
+if FSTRM
+AM_CPPFLAGS += \
+       $(FSTRM_CFLAGS)
+endif
+
 AM_LDFLAGS = \
        $(PROGRAM_LDFLAGS) \
        $(THREADFLAGS)
@@ -352,14 +357,29 @@ if HAVE_PROTOC
 dnsmessage.pb.cc: dnsmessage.proto
        $(AM_V_GEN)$(PROTOC) --cpp_out=./ $<
 
+if FSTRM
+dnstap.pb.cc: dnstap.proto
+       $(AM_V_GEN)$(PROTOC) -I$(srcdir) --cpp_out=./ $<
+endif
+
+
 BUILT_SOURCES += dnsmessage.pb.cc
 pdns_recursor_LDADD += $(PROTOBUF_LIBS)
 nodist_pdns_recursor_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+nodist_testrunner_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+
+if FSTRM
+BUILT_SOURCES += dnstap.pb.cc
+pdns_recursor.$(OBJEXT): dnstap.pb.cc  dnsmessage.pb.cc
+testrunner$(OBJEXT): dnstap.pb.cc dnsmessage.pb.cc
+nodist_pdns_recursor_SOURCES += dnstap.pb.cc dnstap.pb.h
+nodist_testrunner_SOURCES += dnstap.pb.cc dnstap.pb.h
+else
 pdns_recursor.$(OBJEXT): dnsmessage.pb.cc
+testrunner$(OBJEXT): dnsmessage.pb.cc
+endif
 
-nodist_testrunner_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
 testrunner_LDADD += $(PROTOBUF_LIBS)
-testrunner$(OBJEXT): dnsmessage.pb.cc
 
 endif
 
@@ -368,6 +388,14 @@ pdns_recursor_SOURCES += \
 
 endif
 
+if FSTRM
+pdns_recursor_SOURCES += \
+       dnstap.cc dnstap.hh fstrm_logger.cc fstrm_logger.hh dolog.hh
+
+pdns_recursor_LDADD += \
+       $(FSTRM_LIBS)
+endif
+
 rec_control_SOURCES = \
        arguments.cc arguments.hh \
        dnsname.hh dnsname.cc \
index ce066fb7a31c9e139f60106c3ae3ec472e5b34b8..2e75672bb0d927d6761af32f866c09eb1ae277b6 100644 (file)
@@ -155,6 +155,8 @@ AC_ARG_WITH([nod-cache-dir],
   [nodcachedir="$withval"]
 )
 
+PDNS_CHECK_DNSTAP
+
 AC_MSG_CHECKING([whether we will enable compiler security checks])
 AC_ARG_ENABLE([hardening],
   [AS_HELP_STRING([--disable-hardening], [disable compiler security checks @<:@default=no@:>@])],
@@ -251,5 +253,9 @@ AM_COND_IF([NOD_ENABLED],
   [AC_MSG_NOTICE([nod: yes])],
   [AC_MSG_NOTICE([nod: no])]
 )
+AM_COND_IF([FSTRM],
+  [AC_MSG_NOTICE([dnstap: yes])],
+  [AC_MSG_NOTICE([dnstap: no])]
+)
 AC_MSG_NOTICE([Context library: $pdns_context_library])
 AC_MSG_NOTICE([])
diff --git a/pdns/recursordist/dnstap.cc b/pdns/recursordist/dnstap.cc
new file mode 120000 (symlink)
index 0000000..06c8e37
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.cc
\ No newline at end of file
diff --git a/pdns/recursordist/dnstap.hh b/pdns/recursordist/dnstap.hh
new file mode 120000 (symlink)
index 0000000..9fd70f0
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.hh
\ No newline at end of file
diff --git a/pdns/recursordist/dnstap.proto b/pdns/recursordist/dnstap.proto
new file mode 120000 (symlink)
index 0000000..6b6dfbd
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.proto
\ No newline at end of file
index 629f63047e69cef42890c66c577e00581f5317b2..c3889c3bd0bf48353a9283ee0a306e21349b4fe9 100644 (file)
@@ -95,3 +95,35 @@ Protobol Buffers Definition
 The protocol buffers message types can be found in the `dnsmessage.proto <https://github.com/PowerDNS/pdns/blob/master/pdns/dnsmessage.proto>`_ file and is included here:
 
 .. literalinclude:: ../../../dnsmessage.proto
+
+Logging in ``dnstap`` format using framestreams
+-----------------------------------------------
+Define the following function to enable logging of outgoing queries and/or responses in ``dnstap`` format.
+The recursor must have been built with configure ``--enable-dnstap`` to make this feature available.
+
+.. function:: dnstapFrameStreamServer(servers, [, options])
+
+  .. versionadded:: 4.X.0
+
+  Send dnstap formatted message to one or more framestream servers for outgoing queries and/or incoming responses.
+
+  :param servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
+  :type servers: string or list of strings
+  :param table options: A table with ``key=value`` pairs with options.
+
+  Options:
+
+  * ``logQueries=true``: bool - log oputgoing queries
+  * ``logResponses=true``: bool - log incoming responses
+  The follwing options apply to the settings of the framestream library. Refer to the documentation of that
+  library for the default values, exact description and allowable values for these options.
+  For all these options, absence or a zero value has the effect of using the library-provided default value.
+
+  * ``bufferHint=0``: unsigned
+  * ``flushTimeout=0``: unsigned
+  * ``inputQueueSize=0``: unsigned
+  * ``outputQueueSize=0``: unsigned
+  * ``queueNotifyThreshold=0``: unsigned
+  * ``reopenInterval=0``: unsigned
+
diff --git a/pdns/recursordist/dolog.hh b/pdns/recursordist/dolog.hh
new file mode 120000 (symlink)
index 0000000..e458b07
--- /dev/null
@@ -0,0 +1 @@
+../dolog.hh
\ No newline at end of file
diff --git a/pdns/recursordist/fstrm_logger.cc b/pdns/recursordist/fstrm_logger.cc
new file mode 120000 (symlink)
index 0000000..e66c9cc
--- /dev/null
@@ -0,0 +1 @@
+../fstrm_logger.cc
\ No newline at end of file
diff --git a/pdns/recursordist/fstrm_logger.hh b/pdns/recursordist/fstrm_logger.hh
new file mode 120000 (symlink)
index 0000000..b4898e9
--- /dev/null
@@ -0,0 +1 @@
+../fstrm_logger.hh
\ No newline at end of file
diff --git a/pdns/recursordist/m4/pdns_check_dnstap.m4 b/pdns/recursordist/m4/pdns_check_dnstap.m4
new file mode 120000 (symlink)
index 0000000..ed71845
--- /dev/null
@@ -0,0 +1 @@
+../../../m4/pdns_check_dnstap.m4
\ No newline at end of file
index 8736f32aee43dac397c1f50c1e6487d43c2a8dbb..23e90995946f9beac2e2d7762f00b3a33b4eaf68 100644 (file)
@@ -39,7 +39,7 @@ bool RecursorLua4::preoutquery(const ComboAddress& ns, const ComboAddress& reque
   return false;
 }
 
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers,const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
 {
   return 0;
 }
index 3cf44f16a37127c762b864f709fe2e8894691d76..f7ee8bff0ce73a7a5c56df9bcdfe7cf41bcda8f9 100644 (file)
@@ -493,7 +493,7 @@ int SyncRes::asyncresolveWrapper(const ComboAddress& ip, bool ednsMANDATORY, con
       ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, res, chained);
     }
     else {
-      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
+      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, d_frameStreamServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
     }
     if(ret < 0) {
       return ret; // transport error, nothing to learn here
index a19f21e54e6d9ead23d52b2238e582f1690e5f08..885a55828e07d87f1998a7a978b504f8b36dd3fe 100644 (file)
@@ -56,6 +56,9 @@
 
 #ifdef HAVE_PROTOBUF
 #include <boost/uuid/uuid.hpp>
+#ifdef HAVE_FSTRM
+#include "fstrm_logger.hh"
+#endif /* HAVE_FSTRM */
 #endif
 
 class RecursorLua4;
@@ -661,6 +664,13 @@ public:
   }
 #endif
 
+#ifdef HAVE_FSTRM
+  void setFrameStreamServers(std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& servers)
+  {
+    d_frameStreamServers = servers;
+  }
+#endif /* HAVE_FSTRM */
+
   void setAsyncCallback(asyncresolve_t func)
   {
     d_asyncResolve = func;
@@ -805,6 +815,7 @@ private:
   shared_ptr<RecursorLua4> d_pdl;
   boost::optional<Netmask> d_outgoingECSNetwork;
   std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> d_outgoingProtobufServers{nullptr};
+  std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> d_frameStreamServers{nullptr};
 #ifdef HAVE_PROTOBUF
   boost::optional<const boost::uuids::uuid&> d_initialRequestId;
 #endif
index 9ecfc34998edc13b8540203b9450ef17157230e1..ce2f3963ce8079367b94c21433cfc275ef48f816 100644 (file)
@@ -117,7 +117,10 @@ void showBuildConfiguration()
     "PKCS#11 " <<
 #endif
 #ifdef HAVE_PROTOBUF
-    "protobuf " <<
+"protobuf " <<
+#endif
+#ifdef HAVE_FSTRM
+"dnstap-framestream " <<
 #endif
 #ifdef REMOTEBACKEND_ZEROMQ
     "remotebackend-zeromq " <<
index 50923a84ce3bb00e2e52da391a53f871fcd98890..4f1a674a3cea9cb6211e35e3eb8a3df7f665db4f 100755 (executable)
@@ -11,6 +11,8 @@ pip install -U pip
 pip install -r requirements.txt
 
 protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto
+protoc -I=../pdns/ --python_out=. ../pdns/dnstap.proto
+
 
 mkdir -p configs
 
@@ -28,6 +30,11 @@ if [ $(uname -s) = "Darwin" ]; then
   LIBFAKETIME_DEFAULT=/usr/local/lib/faketime/libfaketime.1.dylib
   LIBAUTHBIND_DEFAULT=""
 fi
+if [ $(uname -s) = "OpenBSD" ]; then
+  # OpenBSD is not /really/ supported here; it works for some tests, and then you might need sudo.
+  LIBFAKETIME_DEFAULT=""
+  LIBAUTHBIND_DEFAULT=""
+fi
 
 export LIBFAKETIME=${LIBFAKETIME:-$LIBFAKETIME_DEFAULT}
 export LIBAUTHBIND=${LIBAUTHBIND:-$LIBAUTHBIND_DEFAULT}
@@ -45,4 +52,13 @@ set -e
 if [ "${PDNS_DEBUG}" = "YES" ]; then
   set -x
 fi
+
+if ! "$PDNSRECURSOR" --version 2>&1 | grep Features | grep -q dnstap-framestream; then
+  export NODNSTAPTESTS=1
+fi
+
+if [ "${LIBAUTHBIND}" != "" -o "${LIBFAKETIME}" != "" ]; then
 LD_PRELOAD="${LIBASAN} ${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@
+else
+nosetests -I test_WellKnown.py --with-xunit $@
+fi
diff --git a/regression-tests.recursor-dnssec/test_RecDnstap.py b/regression-tests.recursor-dnssec/test_RecDnstap.py
new file mode 100644 (file)
index 0000000..0669fbc
--- /dev/null
@@ -0,0 +1,301 @@
+import dns
+import dnsmessage_pb2
+import os
+import socket
+import struct
+import sys
+import threading
+import time
+
+import dns
+import dnstap_pb2
+
+FSTRM_CONTROL_ACCEPT = 0x01
+FSTRM_CONTROL_START = 0x02
+FSTRM_CONTROL_STOP = 0x03
+FSTRM_CONTROL_READY = 0x04
+FSTRM_CONTROL_FINISH = 0x05
+
+# Python2/3 compatibility hacks
+try:
+  from queue import Queue
+except ImportError:
+  from Queue import Queue
+
+try:
+  range = xrange
+except NameError:
+  pass
+
+from nose import SkipTest
+from recursortests import RecursorTest
+
+def checkDnstapBase(testinstance, dnstap, protocol, initiator):
+    testinstance.assertTrue(dnstap)
+    testinstance.assertTrue(dnstap.HasField('identity'))
+    #testinstance.assertEqual(dnstap.identity, b'a.server')
+    testinstance.assertTrue(dnstap.HasField('version'))
+    #testinstance.assertIn(b'dnsdist ', dnstap.version)
+    testinstance.assertTrue(dnstap.HasField('type'))
+    testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
+    testinstance.assertTrue(dnstap.HasField('message'))
+    testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
+    testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
+    testinstance.assertTrue(dnstap.message.HasField('socket_family'))
+    testinstance.assertEquals(dnstap.message.socket_family, dnstap_pb2.INET)
+    #testinstance.assertTrue(dnstap.message.HasField('query_address'))
+    #testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
+    testinstance.assertTrue(dnstap.message.HasField('response_address'))
+    testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
+    testinstance.assertTrue(dnstap.message.HasField('response_port'))
+    testinstance.assertEquals(dnstap.message.response_port, 53)
+
+
+def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
+    testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.RESOLVER_QUERY)
+    checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.HasField('query_message'))
+    wire_message = dns.message.from_wire(dnstap.message.query_message)
+    #testinstance.assertEqual(wire_message, query)
+
+
+def checkDnstapExtra(testinstance, dnstap, expected):
+    testinstance.assertTrue(dnstap.HasField('extra'))
+    testinstance.assertEqual(dnstap.extra, expected)
+
+
+def checkDnstapNoExtra(testinstance, dnstap):
+    testinstance.assertFalse(dnstap.HasField('extra'))
+
+
+def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
+    testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.RESOLVER_RESPONSE)
+    checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.response_time_sec > dnstap.message.query_time_sec or \
+                            dnstap.message.response_time_nsec > dnstap.message.query_time_nsec)
+
+    testinstance.assertTrue(dnstap.message.HasField('response_message'))
+    wire_message = dns.message.from_wire(dnstap.message.response_message)
+    testinstance.assertEqual(wire_message, response)
+
+def fstrm_get_control_frame_type(data):
+    (t,) = struct.unpack("!L", data[0:4])
+    return t
+
+
+def fstrm_make_control_frame_reply(cft, data):
+    if cft == FSTRM_CONTROL_READY:
+        # Reply with ACCEPT frame and content-type
+        contenttype = b'protobuf:dnstap.Dnstap'
+        frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
+                            len(contenttype)) + contenttype
+        buf = struct.pack("!LL", 0, len(frame)) + frame
+        return buf
+    elif cft == FSTRM_CONTROL_START:
+        return None
+    else:
+        raise Exception('unhandled control frame ' + cft)
+
+
+def fstrm_read_and_dispatch_control_frame(conn):
+    data = conn.recv(4)
+    if not data:
+        raise Exception('length of control frame payload could not be read')
+    (datalen,) = struct.unpack("!L", data)
+    data = conn.recv(datalen)
+    cft = fstrm_get_control_frame_type(data)
+    reply = fstrm_make_control_frame_reply(cft, data)
+    if reply:
+        conn.send(reply)
+    return cft
+
+
+def fstrm_handle_bidir_connection(conn, on_data):
+    data = None
+    while True:
+        data = conn.recv(4)
+        if not data:
+            break
+        (datalen,) = struct.unpack("!L", data)
+        if datalen == 0:
+            # control frame length follows
+            cft = fstrm_read_and_dispatch_control_frame(conn)
+            if cft == FSTRM_CONTROL_STOP:
+                break
+        else:
+            # data frame
+            data = conn.recv(datalen)
+            if not data:
+                break
+
+            on_data(data)
+
+
+
+class DNSTapServerParams:
+  def __init__(self, port):
+    self.queue = Queue()
+    self.port = port
+
+
+DNSTapServerParameters = DNSTapServerParams(4243)
+DNSTapListeners = []
+
+class TestRecursorDNSTap(RecursorTest):
+    @classmethod
+    def FrameStreamUnixListener(cls, conn, param):
+        while True:
+            try:
+                fstrm_handle_bidir_connection(conn, lambda data: \
+                param.queue.put(data, True, timeout=2.0))
+            except socket.error as e:
+                if e.errno == 9:
+                    break
+                printf("Unexpected socket error %d", e)
+                sys.exit(1)
+            conn.close()
+
+    @classmethod
+    def FrameStreamUnixListenerMain(cls, param):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        try:
+            sock.bind(("127.0.0.1", param.port))
+        except socket.error as e:
+            print("Error binding in the framestream listener: %s" % str(e))
+            sys.exit(1)
+        DNSTapListeners.append(sock)
+        sock.listen(100)
+        while True:
+            (conn, _) = sock.accept()
+            print("Accepting connection")
+            listener = threading.Thread(name='DNSTap Worker', target=cls.FrameStreamUnixListener, args=[conn, param])
+            listener.setDaemon(True)
+            listener.start()
+
+        sock.close()
+
+    @classmethod
+    def setUpClass(cls):
+       
+       if os.environ.get("NODNSTAPTESTS") == "1":
+               raise SkipTest("Not Yet Supported")
+
+        cls.setUpSockets()
+
+        cls.startResponders()
+
+        listener = threading.Thread(name='DNSTap Listener', target=cls.FrameStreamUnixListenerMain, args=[DNSTapServerParameters])
+        listener.setDaemon(True)
+        listener.start()
+
+
+        confdir = os.path.join('configs', cls._confdir)
+        cls.createConfigDir(confdir)
+
+        cls.generateRecursorConfig(confdir)
+        cls.startRecursor(confdir, cls._recursorPort)
+
+    def setUp(self):
+        # Make sure the queue is empty, in case
+        # a previous test failed
+        while not DNSTapServerParameters.queue.empty():
+            DNSTapServerParameters.queue.get(False)
+
+    @classmethod
+    def generateRecursorConfig(cls, confdir):
+        authzonepath = os.path.join(confdir, 'example.zone')
+        with open(authzonepath, 'w') as authzone:
+            authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+tagged 3600 IN A 192.0.2.84
+query-selected 3600 IN A 192.0.2.84
+answer-selected 3600 IN A 192.0.2.84
+types 3600 IN A 192.0.2.84
+types 3600 IN AAAA 2001:DB8::1
+types 3600 IN TXT "Lorem ipsum dolor sit amet"
+types 3600 IN MX 10 a.example.
+types 3600 IN SPF "v=spf1 -all"
+types 3600 IN SRV 10 20 443 a.example.
+cname 3600 IN CNAME a.example.
+
+""".format(soa=cls._SOA))
+        super(TestRecursorDNSTap, cls).generateRecursorConfig(confdir)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.tearDownRecursor()
+        for listerner in DNSTapListeners:
+            listerner.close()
+
+class DNSTapDefaultTest(TestRecursorDNSTap):
+    """
+    This test makes sure that we correctly export outgoing queries over DNSTap.
+    It must be improved and setup env so we can check for incoming responses, but makes sure for now
+    that the recursor at least connects to the DNSTap server.
+    """
+
+    _confdir = 'DNSTapDefault'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    _lua_config_file = """
+    dnstapFrameStreamServer({"127.0.0.1:%d"})
+    """ % (DNSTapServerParameters.port)
+
+    def getFirstDnstap(self):
+        data = DNSTapServerParameters.queue.get(True, timeout=2.0)
+        self.assertTrue(data)
+        dnstap = dnstap_pb2.Dnstap()
+        dnstap.ParseFromString(data)
+        return dnstap
+
+    def testA(self):
+
+        name = 'www.example.org.'
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.RD
+        res = self.sendUDPQuery(query)
+
+        # check the DNSTap messages corresponding to the UDP query and answer
+        # check the dnstap message corresponding to the UDP query
+        dnstap = self.getFirstDnstap()
+
+        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query, '127.0.0.8')
+        # We don't expect a response
+        checkDnstapNoExtra(self, dnstap)
+
+class DNSTapLogNoQueriesTest(TestRecursorDNSTap):
+    """
+    This test makes sure that we correctly export outgoing queries over DNSTap.
+    It must be improved and setup env so we can check for incoming responses, but makes sure for now
+    that the recursor at least connects to the DNSTap server.
+    """
+
+    _confdir = 'DNSTapLogNoQueries'
+    _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+    _lua_config_file = """
+    dnstapFrameStreamServer({"127.0.0.1:%d"}, {logQueries=false})
+    """ % (DNSTapServerParameters.port)
+
+    def testA(self):
+        name = 'www.example.org.'
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.RD
+        res = self.sendUDPQuery(query)
+
+        # We don't expect anything
+        self.assertTrue(DNSTapServerParameters.queue.empty())
+