]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Add dnstap-compatible protobuf support to dnsdist. 6170/head
authorChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Tue, 23 Jan 2018 14:36:45 +0000 (15:36 +0100)
committerChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Thu, 25 Jan 2018 10:52:44 +0000 (11:52 +0100)
Based on a PR by Justin Valentini <jvalentini@dyn.com>.

27 files changed:
build-scripts/travis.sh
m4/pdns_check_fstrm.m4 [new file with mode: 0644]
pdns/dnsdist-console.cc
pdns/dnsdist-lua-actions.cc
pdns/dnsdist-lua-bindings.cc
pdns/dnsdist.cc
pdns/dnsdistdist/.gitignore
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/configure.ac
pdns/dnsdistdist/dnstap.cc [new symlink]
pdns/dnsdistdist/dnstap.hh [new symlink]
pdns/dnsdistdist/dnstap.proto [new symlink]
pdns/dnsdistdist/docs/reference/dnstap.rst [new file with mode: 0644]
pdns/dnsdistdist/docs/reference/index.rst
pdns/dnsdistdist/docs/rules-actions.rst
pdns/dnsdistdist/fstrm_logger.cc [new symlink]
pdns/dnsdistdist/fstrm_logger.hh [new symlink]
pdns/dnsdistdist/m4/pdns_check_fstrm.m4 [new symlink]
pdns/dnstap.cc [new file with mode: 0644]
pdns/dnstap.hh [new file with mode: 0644]
pdns/dnstap.proto [new file with mode: 0644]
pdns/fstrm_logger.cc [new file with mode: 0644]
pdns/fstrm_logger.hh [new file with mode: 0644]
pdns/remote_logger.hh
regression-tests.dnsdist/.gitignore
regression-tests.dnsdist/runtests
regression-tests.dnsdist/test_Dnstap.py [new file with mode: 0644]

index bf5f2636a31ce99e0a416c0966ccb1e26cbbb407..58d909a8c529b189a1e9c1f6ed9d66b76e52f84f 100755 (executable)
@@ -347,9 +347,13 @@ install_recursor() {
 
 install_dnsdist() {
   # test requirements / setup
+  run "sudo add-apt-repository -y ppa:zeha/libfstrm-ppa"
+  run 'curl "http://keyserver.ubuntu.com:11371/pks/lookup?op=get&search=0x396160EF8126A2E2" | sudo apt-key add - '
+  run "sudo apt-get -qq update"
   run "sudo apt-get -qq --no-install-recommends install \
     snmpd \
-    libsnmp-dev"
+    libsnmp-dev \
+    libfstrm-dev"
   run "sudo sed -i \"s/agentxperms 0700 0755 dnsdist/agentxperms 0700 0755 ${USER}/g\" regression-tests.dnsdist/snmpd.conf"
   run "sudo cp -f regression-tests.dnsdist/snmpd.conf /etc/snmp/snmpd.conf"
   run "sudo service snmpd restart"
@@ -408,6 +412,7 @@ build_dnsdist(){
     --enable-libsodium \
     --enable-dnscrypt \
     --enable-dns-over-tls \
+    --enable-fstrm \
     --prefix=$HOME/dnsdist \
     --disable-silent-rules"
   run "make -k -j3"
diff --git a/m4/pdns_check_fstrm.m4 b/m4/pdns_check_fstrm.m4
new file mode 100644 (file)
index 0000000..2eb7936
--- /dev/null
@@ -0,0 +1,30 @@
+AC_DEFUN([PDNS_CHECK_FSTRM], [
+  AC_MSG_CHECKING([whether we will be linking in fstrm])
+  AC_ARG_ENABLE([fstrm],
+    AS_HELP_STRING([--enable-fstrm],[use fstrm @<:@default=auto@:>@]),
+    [enable_fstrm=$enableval],
+    [enable_fstrm=auto],
+  )
+  AC_MSG_RESULT([$enable_fstrm])
+
+  AS_IF([test "x$enable_fstrm" != "xno"], [
+    AS_IF([test "x$enable_fstrm" = "xyes" -o "x$enable_fstrm" = "xauto"], [
+      PKG_CHECK_MODULES([FSTRM], [libfstrm], [
+        AC_DEFINE([HAVE_FSTRM], [1], [Define to 1 if you have libfstrm])
+        save_CFLAGS=$CFLAGS
+        save_LIBS=$LIBS
+        CFLAGS="$FSTRM_CFLAGS $CFLAGS"
+        LIBS="$FSTRM_LIBS $LIBS"
+        AC_CHECK_FUNCS([fstrm_tcp_writer_init])
+        CFLAGS=$save_CFLAGS
+        LIBS=$save_LIBS
+      ], [ : ])
+    ])
+  ])
+  AM_CONDITIONAL([FSTRM], [test "x$FSTRM_LIBS" != "x"])
+  AS_IF([test "x$enable_fstrm" = "xyes"], [
+    AS_IF([test x"$FSTRM_LIBS" = "x"], [
+      AC_MSG_ERROR([fstrm requested but libfstrm was not found])
+    ])
+  ])
+])
index 1f267c0aa9ca4aa9841f8f132b25d561f0272106..7d9d0209eedbb9c934338d71942db6f3a9d2d92a 100644 (file)
@@ -308,6 +308,8 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "DelayResponseAction", true, "milliseconds", "delay the response by the specified amount of milliseconds (UDP-only)" },
   { "delta", true, "", "shows all commands entered that changed the configuration" },
   { "DisableValidationAction", true, "", "set the CD bit in the question, let it go through" },
+  { "DnstapLogAction", true, "identity, FrameStreamLogger [, alterFunction]", "send the contents of this query to a FrameStreamLogger or RemoteLogger as dnstap. `alterFunction` is a callback, receiving a DNSQuestion and a DnstapMessage, that can be used to modify the dnstap message" },
+  { "DnstapLogResponseAction", true, "identity, FrameStreamLogger [, alterFunction]", "send the contents of this response to a remote or FrameStreamLogger or RemoteLogger as dnstap. `alterFunction` is a callback, receiving a DNSResponse and a DnstapMessage, that can be used to modify the dnstap message" },
   { "DropAction", true, "", "drop these packets" },
   { "DropResponseAction", true, "", "drop these packets" },
   { "dumpStats", true, "", "print all statistics we gather" },
@@ -341,6 +343,8 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "mvRule", true, "from, to", "move rule 'from' to a position where it is in front of 'to'. 'to' can be one larger than the largest rule, in which case the rule will be moved to the last position" },
   { "mvSelfAnsweredResponseRule", true, "from, to", "move self-answered response rule 'from' to a position where it is in front of 'to'. 'to' can be one larger than the largest rule" },
   { "newDNSName", true, "name", "make a DNSName based on this .-terminated name" },
+  { "newFrameStreamTcpLogger", true, "addr", "create a FrameStream logger object writing to a TCP address (addr should be ip:port), to use with `DnstapLogAction()` and `DnstapLogResponseAction()`" },
+  { "newFrameStreamUnixLogger", true, "socket", "create a FrameStream logger object writing to a local unix socket, to use with `DnstapLogAction()` and `DnstapLogResponseAction()`" },
   { "newPacketCache", true, "maxEntries[, maxTTL=86400, minTTL=0, temporaryFailureTTL=60, staleTTL=60, dontAge=false, numberOfShards=1, deferrableInsertLock=true]", "return a new Packet Cache" },
   { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" },
   { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" },
index 351d407fb3a9d925082528c1119f4f6effe46e99..1dc577821821eddf150d7b42e1039f9b8d97f2f2 100644 (file)
@@ -25,7 +25,9 @@
 #include "dnsdist-protobuf.hh"
 
 #include "dolog.hh"
+#include "dnstap.hh"
 #include "ednsoptions.hh"
+#include "fstrm_logger.hh"
 #include "remote_logger.hh"
 
 class DropAction : public DNSAction
@@ -599,10 +601,42 @@ public:
   }
 };
 
+class DnstapLogAction : public DNSAction, public boost::noncopyable
+{
+public:
+  DnstapLogAction(const std::string& identity, std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSQuestion&, DnstapMessage*)> > alterFunc): d_identity(identity), d_logger(logger), d_alterFunc(alterFunc)
+  {
+  }
+  DNSAction::Action operator()(DNSQuestion* dq, string* ruleresult) const override
+  {
+#ifdef HAVE_PROTOBUF
+    DnstapMessage message(d_identity, dq->remote, dq->local, dq->tcp, reinterpret_cast<const char*>(dq->dh), dq->len, dq->queryTime, nullptr);
+    {
+      if (d_alterFunc) {
+        std::lock_guard<std::mutex> lock(g_luamutex);
+        (*d_alterFunc)(*dq, &message);
+      }
+    }
+    std::string data;
+    message.serialize(data);
+    d_logger->queueData(data);
+#endif /* HAVE_PROTOBUF */
+    return Action::None;
+  }
+  string toString() const override
+  {
+    return "remote log as dnstap to " + (d_logger ? d_logger->toString() : "");
+  }
+private:
+  std::string d_identity;
+  std::shared_ptr<RemoteLoggerInterface> d_logger;
+  boost::optional<std::function<void(const DNSQuestion&, DnstapMessage*)> > d_alterFunc;
+};
+
 class RemoteLogAction : public DNSAction, public boost::noncopyable
 {
 public:
-  RemoteLogAction(std::shared_ptr<RemoteLogger> logger, boost::optional<std::function<void(const DNSQuestion&, DNSDistProtoBufMessage*)> > alterFunc): d_logger(logger), d_alterFunc(alterFunc)
+  RemoteLogAction(std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSQuestion&, DNSDistProtoBufMessage*)> > alterFunc): d_logger(logger), d_alterFunc(alterFunc)
   {
   }
   DNSAction::Action operator()(DNSQuestion* dq, string* ruleresult) const override
@@ -630,7 +664,7 @@ public:
     return "remote log to " + (d_logger ? d_logger->toString() : "");
   }
 private:
-  std::shared_ptr<RemoteLogger> d_logger;
+  std::shared_ptr<RemoteLoggerInterface> d_logger;
   boost::optional<std::function<void(const DNSQuestion&, DNSDistProtoBufMessage*)> > d_alterFunc;
 };
 
@@ -681,10 +715,44 @@ private:
   std::string d_value;
 };
 
+class DnstapLogResponseAction : public DNSResponseAction, public boost::noncopyable
+{
+public:
+  DnstapLogResponseAction(const std::string& identity, std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSResponse&, DnstapMessage*)> > alterFunc): d_identity(identity), d_logger(logger), d_alterFunc(alterFunc)
+  {
+  }
+  DNSResponseAction::Action operator()(DNSResponse* dr, string* ruleresult) const override
+  {
+#ifdef HAVE_PROTOBUF
+    struct timespec now;
+    gettime(&now, true);
+    DnstapMessage message(d_identity, dr->remote, dr->local, dr->tcp, reinterpret_cast<const char*>(dr->dh), dr->len, dr->queryTime, &now);
+    {
+      if (d_alterFunc) {
+        std::lock_guard<std::mutex> lock(g_luamutex);
+        (*d_alterFunc)(*dr, &message);
+      }
+    }
+    std::string data;
+    message.serialize(data);
+    d_logger->queueData(data);
+#endif /* HAVE_PROTOBUF */
+    return Action::None;
+  }
+  string toString() const override
+  {
+    return "log response as dnstap to " + (d_logger ? d_logger->toString() : "");
+  }
+private:
+  std::string d_identity;
+  std::shared_ptr<RemoteLoggerInterface> d_logger;
+  boost::optional<std::function<void(const DNSResponse&, DnstapMessage*)> > d_alterFunc;
+};
+
 class RemoteLogResponseAction : public DNSResponseAction, public boost::noncopyable
 {
 public:
-  RemoteLogResponseAction(std::shared_ptr<RemoteLogger> logger, boost::optional<std::function<void(const DNSResponse&, DNSDistProtoBufMessage*)> > alterFunc, bool includeCNAME): d_logger(logger), d_alterFunc(alterFunc), d_includeCNAME(includeCNAME)
+  RemoteLogResponseAction(std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSResponse&, DNSDistProtoBufMessage*)> > alterFunc, bool includeCNAME): d_logger(logger), d_alterFunc(alterFunc), d_includeCNAME(includeCNAME)
   {
   }
   DNSResponseAction::Action operator()(DNSResponse* dr, string* ruleresult) const override
@@ -712,7 +780,7 @@ public:
     return "remote log response to " + (d_logger ? d_logger->toString() : "");
   }
 private:
-  std::shared_ptr<RemoteLogger> d_logger;
+  std::shared_ptr<RemoteLoggerInterface> d_logger;
   boost::optional<std::function<void(const DNSResponse&, DNSDistProtoBufMessage*)> > d_alterFunc;
   bool d_includeCNAME;
 };
@@ -991,7 +1059,13 @@ void setupLuaActions()
       return std::shared_ptr<DNSResponseAction>(new LuaResponseAction(func));
     });
 
-  g_lua.writeFunction("RemoteLogAction", [](std::shared_ptr<RemoteLogger> logger, boost::optional<std::function<void(const DNSQuestion&, DNSDistProtoBufMessage*)> > alterFunc) {
+  g_lua.writeFunction("RemoteLogAction", [](std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSQuestion&, DNSDistProtoBufMessage*)> > alterFunc) {
+      // avoids potentially-evaluated-expression warning with clang.
+      RemoteLoggerInterface& rl = *logger.get();
+      if (typeid(rl) != typeid(RemoteLogger)) {
+        // We could let the user do what he wants, but wrapping PowerDNS Protobuf inside a FrameStream tagged as dnstap is logically wrong.
+        throw std::runtime_error(std::string("RemoteLogAction only takes RemoteLogger. For other types, please look at DnstapLogAction."));
+      }
 #ifdef HAVE_PROTOBUF
       return std::shared_ptr<DNSAction>(new RemoteLogAction(logger, alterFunc));
 #else
@@ -999,7 +1073,13 @@ void setupLuaActions()
 #endif
     });
 
-  g_lua.writeFunction("RemoteLogResponseAction", [](std::shared_ptr<RemoteLogger> logger, boost::optional<std::function<void(const DNSResponse&, DNSDistProtoBufMessage*)> > alterFunc, boost::optional<bool> includeCNAME) {
+  g_lua.writeFunction("RemoteLogResponseAction", [](std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSResponse&, DNSDistProtoBufMessage*)> > alterFunc, boost::optional<bool> includeCNAME) {
+      // avoids potentially-evaluated-expression warning with clang.
+      RemoteLoggerInterface& rl = *logger.get();
+      if (typeid(rl) != typeid(RemoteLogger)) {
+        // We could let the user do what he wants, but wrapping PowerDNS Protobuf inside a FrameStream tagged as dnstap is logically wrong.
+        throw std::runtime_error("RemoteLogResponseAction only takes RemoteLogger. For other types, please look at DnstapLogResponseAction.");
+      }
 #ifdef HAVE_PROTOBUF
       return std::shared_ptr<DNSResponseAction>(new RemoteLogResponseAction(logger, alterFunc, includeCNAME ? *includeCNAME : false));
 #else
@@ -1007,6 +1087,22 @@ void setupLuaActions()
 #endif
     });
 
+  g_lua.writeFunction("DnstapLogAction", [](const std::string& identity, std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSQuestion&, DnstapMessage*)> > alterFunc) {
+#ifdef HAVE_PROTOBUF
+      return std::shared_ptr<DNSAction>(new DnstapLogAction(identity, logger, alterFunc));
+#else
+      throw std::runtime_error("Protobuf support is required to use DnstapLogAction");
+#endif
+    });
+
+  g_lua.writeFunction("DnstapLogResponseAction", [](const std::string& identity, std::shared_ptr<RemoteLoggerInterface> logger, boost::optional<std::function<void(const DNSResponse&, DnstapMessage*)> > alterFunc) {
+#ifdef HAVE_PROTOBUF
+      return std::shared_ptr<DNSResponseAction>(new DnstapLogResponseAction(identity, logger, alterFunc));
+#else
+      throw std::runtime_error("Protobuf support is required to use DnstapLogResponseAction");
+#endif
+    });
+
   g_lua.writeFunction("TeeAction", [](const std::string& remote, boost::optional<bool> addECS) {
       return std::shared_ptr<DNSAction>(new TeeAction(ComboAddress(remote, 53), addECS ? *addECS : false));
     });
index 9de16a27939fdf8daf57a2152ccba0640d57f2a7..e8cc99f5a43afc437a2ac2cef537a125be55b116 100644 (file)
@@ -23,7 +23,9 @@
 #include "dnsdist-lua.hh"
 #include "dnsdist-protobuf.hh"
 
+#include "dnstap.hh"
 #include "dolog.hh"
+#include "fstrm_logger.hh"
 #include "remote_logger.hh"
 
 void setupLuaBindings(bool client)
@@ -257,13 +259,40 @@ void setupLuaBindings(bool client)
       message.setResponder(str);
     });
 
+  g_lua.registerFunction<std::string(DnstapMessage::*)()>("toDebugString", [](const DnstapMessage& message) { return message.toDebugString(); });
+  g_lua.registerFunction<void(DnstapMessage::*)(const std::string&)>("setExtra", [](DnstapMessage& message, const std::string& str) {
+      message.setExtra(str);
+    });
+
   /* RemoteLogger */
   g_lua.writeFunction("newRemoteLogger", [client](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime) {
       if (client) {
-        return std::shared_ptr<RemoteLogger>();
+        return std::shared_ptr<RemoteLoggerInterface>();
       }
-      return std::make_shared<RemoteLogger>(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1);
-      });
+      return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1));
+    });
+
+  g_lua.writeFunction("newFrameStreamUnixLogger", [client](const std::string& address) {
+      if (client) {
+        return std::shared_ptr<RemoteLoggerInterface>();
+      }
+#ifdef HAVE_FSTRM
+      return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_UNIX, address));
+#else
+      throw std::runtime_error("fstrm support is required to build an AF_UNIX FrameStreamLogger");
+#endif /* HAVE_FSTRM */
+    });
+
+  g_lua.writeFunction("newFrameStreamTcpLogger", [client](const std::string& address) {
+      if (client) {
+        return std::shared_ptr<RemoteLoggerInterface>();
+      }
+#if defined(HAVE_FSTRM) && defined(HAVE_FSTRM_TCP_WRITER_INIT)
+      return std::shared_ptr<RemoteLoggerInterface>(new FrameStreamLogger(AF_INET, address));
+#else
+      throw std::runtime_error("fstrm with TCP support is required to build an AF_INET FrameStreamLogger");
+#endif /* HAVE_FSTRM */
+    });
 
 #ifdef HAVE_DNSCRYPT
   /* DnsCryptContext bindings */
index 8a36a0db6c807af502752950166f14fa3c1d0a0f..6ee5fb3564dbe3539fba30a77cf694d43517b3ec 100644 (file)
@@ -2163,6 +2163,9 @@ try
 #ifdef HAVE_LIBSODIUM
       cout<<"libsodium ";
 #endif
+#ifdef HAVE_FSTRM
+      cout<<"fstrm ";
+#endif
 #ifdef HAVE_PROTOBUF
       cout<<"protobuf ";
 #endif
index 6878668c80a49c4f1f80aa619b5c60483c9e4bd3..ea53c9fffefc9e1bc90079d9eda6de4c5e69628a 100644 (file)
@@ -30,8 +30,8 @@
 /missing
 /testrunner
 /dnsdist
-/dnsmessage.pb.cc
-/dnsmessage.pb.h
+/*.pb.cc
+/*.pb.h
 /dnsdist.service
 /dnsdist@.service
 /lua.hpp
index 30b4eddab5feebe3aa85f6c2e4b4386f4131b9af..c884135f0b7f31ec2704849db96af9c5e271f18d 100644 (file)
@@ -1,10 +1,10 @@
-AM_CPPFLAGS += $(SYSTEMD_CFLAGS) $(LUA_CFLAGS) $(LIBEDIT_CFLAGS) $(LIBSODIUM_CFLAGS) $(YAHTTP_CFLAGS) $(SANITIZER_FLAGS) $(NET_SNMP_CFLAGS) -DSYSCONFDIR=\"${sysconfdir}\"
+AM_CPPFLAGS += $(SYSTEMD_CFLAGS) $(LUA_CFLAGS) $(LIBEDIT_CFLAGS) $(LIBSODIUM_CFLAGS) $(FSTRM_CFLAGS) $(YAHTTP_CFLAGS) $(SANITIZER_FLAGS) $(NET_SNMP_CFLAGS) -DSYSCONFDIR=\"${sysconfdir}\"
 
 ACLOCAL_AMFLAGS = -I m4
 
 SUBDIRS=ext/yahttp
 
-CLEANFILES = dnsmessage.pb.cc dnsmessage.pb.h htmlfiles.h.tmp htmlfiles.h
+CLEANFILES = dnsmessage.pb.cc dnsmessage.pb.h htmlfiles.h.tmp htmlfiles.h dnstap.pb.cc dnstap.pb.h
 
 dnslabeltext.cc: dnslabeltext.rl
        $(AM_V_GEN)$(RAGEL) $< -o dnslabeltext.cc
@@ -41,6 +41,7 @@ endif
 EXTRA_DIST=dnslabeltext.rl \
           dnsdistconf.lua \
           dnsmessage.proto \
+          dnstap.proto \
           README.md \
           delaypipe.cc delaypipe.hh \
           html \
@@ -108,6 +109,7 @@ dnsdist_SOURCES = \
        ednsoptions.cc ednsoptions.hh \
        ednscookies.cc ednscookies.hh \
        ednssubnet.cc ednssubnet.hh \
+       fstrm_logger.cc fstrm_logger.hh \
        gettime.cc gettime.hh \
        htmlfiles.h \
        iputils.cc iputils.hh \
@@ -117,6 +119,7 @@ dnsdist_SOURCES = \
        namespaces.hh \
        pdnsexception.hh \
        protobuf.cc protobuf.hh \
+       dnstap.cc dnstap.hh \
        qtype.cc qtype.hh \
        remote_logger.cc remote_logger.hh \
        selectmplexer.cc \
@@ -144,6 +147,7 @@ dnsdist_LDADD = \
        $(RT_LIBS) \
        $(YAHTTP_LIBS) \
        $(LIBSODIUM_LIBS) \
+       $(FSTRM_LIBS) \
        $(SANITIZER_FLAGS) \
        $(SYSTEMD_LIBS) \
        $(NET_SNMP_LIBS)
@@ -173,12 +177,15 @@ if HAVE_PROTOC
 dnsmessage.pb.cc: dnsmessage.proto
        $(AM_V_GEN)$(PROTOC) -I$(srcdir) --cpp_out=./ $<
 
-BUILT_SOURCES += dnsmessage.pb.cc
+dnstap.pb.cc: dnstap.proto
+       $(AM_V_GEN)$(PROTOC) -I$(srcdir) --cpp_out=./ $<
+
+BUILT_SOURCES += dnsmessage.pb.cc dnstap.pb.cc
 
-nodist_dnsdist_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+nodist_dnsdist_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h dnstap.pb.cc dnstap.pb.h
 dnsdist_LDADD += $(PROTOBUF_LIBS)
 
-dnsdist.$(OBJEXT): dnsmessage.pb.cc
+dnsdist.$(OBJEXT): dnsmessage.pb.cc dnstap.pb.cc
 endif
 endif
 
@@ -237,6 +244,7 @@ testrunner_LDFLAGS = \
 testrunner_LDADD = \
        $(BOOST_UNIT_TEST_FRAMEWORK_LIBS) \
        $(LIBSODIUM_LIBS) \
+       $(FSTRM_LIBS) \
        $(RT_LIBS) \
        $(SANITIZER_FLAGS)
 
index 5354fea36dc46281d92b874721d174e1a96fcc6e..6098778eb401b2a046f80834f3597953cb88bfaf 100644 (file)
@@ -14,6 +14,7 @@ LT_PREREQ([2.2.2])
 LT_INIT([disable-static])
 
 PDNS_CHECK_LIBSODIUM
+PDNS_CHECK_FSTRM
 PDNS_CHECK_RAGEL([dnslabeltext.cc], [www.dnsdist.org])
 PDNS_CHECK_LIBEDIT
 PDNS_CHECK_CLOCK_GETTIME
@@ -145,6 +146,10 @@ AS_IF([test "x$enable_dnscrypt" != "xno"],
   [AC_MSG_NOTICE([DNSCrypt: yes])],
   [AC_MSG_NOTICE([DNSCrypt: no])]
 )
+AS_IF([test "x$FSTRM_LIBS" != "x"],
+  [AC_MSG_NOTICE([fstrm: yes])],
+  [AC_MSG_NOTICE([fstrm: no])]
+)
 AS_IF([test "x$RE2_LIBS" != "x"],
   [AC_MSG_NOTICE([re2: yes])],
   [AC_MSG_NOTICE([re2: no])]
diff --git a/pdns/dnsdistdist/dnstap.cc b/pdns/dnsdistdist/dnstap.cc
new file mode 120000 (symlink)
index 0000000..06c8e37
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/dnstap.hh b/pdns/dnsdistdist/dnstap.hh
new file mode 120000 (symlink)
index 0000000..9fd70f0
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.hh
\ No newline at end of file
diff --git a/pdns/dnsdistdist/dnstap.proto b/pdns/dnsdistdist/dnstap.proto
new file mode 120000 (symlink)
index 0000000..6b6dfbd
--- /dev/null
@@ -0,0 +1 @@
+../dnstap.proto
\ No newline at end of file
diff --git a/pdns/dnsdistdist/docs/reference/dnstap.rst b/pdns/dnsdistdist/docs/reference/dnstap.rst
new file mode 100644 (file)
index 0000000..b486262
--- /dev/null
@@ -0,0 +1,38 @@
+dnstap Logging Reference
+========================
+
+http://dnstap.info is a flexible, structured binary log format for DNS software.
+Reader implementations in various languages exist.
+
+Canonically, dnstap is sent over a FrameStream socket, either a local AF_UNIX (see :func:`newFrameStreamUnixLogger`) or a TCP/IP socket (see :func:`newFrameStreamTcpLogger`).
+As an extension, :program:`dnsdist` can send raw dnstap protobuf messages over a :func:`newRemoteLogger`.
+
+To use FrameStream transport, :program:`dnsdist` must have been built with `libfstrm`.
+
+.. function:: newFrameStreamUnixLogger(path)
+
+  Create a Frame Stream Logger object, to use with :func:`DnstapLogAction` and :func:`DnstapLogResponseAction`.
+  This version will log to a local AF_UNIX socket.
+
+  :param string path: A local AF_UNIX socket path. Note that most platforms have a rather short limit on the length.
+
+.. function:: newFrameStreamTcpLogger(address)
+
+  Create a Frame Stream Logger object, to use with :func:`DnstapLogAction` and :func:`DnstapLogResponseAction`.
+  This version will log to a local AF_UNIX socket.
+
+  :param string address: An IP:PORT combination where the logger will connect to. Needs tcp_writer support in libfstrm.
+
+.. class:: DnstapMessage
+
+  This object represents a single dnstap message as emitted by :program:`dnsdist`.
+
+.. classmethod:: DnstapMessage:setExtra(extraData)
+
+  Set the time at which the query or response has been received.
+
+  :param string extraData: Extra data stuffed into the dnstap "extra" field.
+
+.. classmethod:: DnstapMessage:toDebugString() -> string
+
+  Return a string containing the content of the message
index 3b9ffa1491b27d2907ee0355d2d7cebc1eb21a93..1df3ce56b13b1add475b2e0ef92045dd23726602 100644 (file)
@@ -15,6 +15,7 @@ These chapters contain extensive information on all functions and object availab
   ebpf
   dnscrypt
   protobuf
+  dnstap
   carbon
   snmp
   tuning
index e704162a5a970378610a54ea8e7dbb970979313d..d603db49bf0bb9357d36c33ff2972162ea838458 100644 (file)
@@ -727,6 +727,24 @@ The following actions exist.
 
   Set the CD bit in the query and let it go through.
 
+.. function:: DnstapLogAction(identity, logger[, alterFunction])
+
+  Send the the current query to a remote logger as a dnstap message.
+  ``alterFunction`` is a callback, receiving a :class:`DNSQuestion` and a :class:`DnstapMessage`, that can be used to modify the message.
+
+  :param string identity: Server identity to store in the dnstap message
+  :param logger: The :func:`FrameStreamLogger <newFrameStreamUnixLogger>` or :func:`RemoteLogger <newRemoteLogger>` object to write to
+  :param alterFunction: A Lua function to alter the message before sending
+
+.. function:: DnstapLogResponseAction(identity, logger[, alterFunction])
+
+  Send the the current response to a remote logger as a dnstap message.
+  ``alterFunction`` is a callback, receiving a :class:`DNSQuestion` and a :class:`DnstapMessage`, that can be used to modify the message.
+
+  :param string identity: Server identity to store in the dnstap message
+  :param logger: The :func:`FrameStreamLogger <newFrameStreamUnixLogger>` or :func:`RemoteLogger <newRemoteLogger>` object to write to
+  :param alterFunction: A Lua function to alter the message before sending
+
 .. function:: DropAction()
 
   Drop the packet.
diff --git a/pdns/dnsdistdist/fstrm_logger.cc b/pdns/dnsdistdist/fstrm_logger.cc
new file mode 120000 (symlink)
index 0000000..e66c9cc
--- /dev/null
@@ -0,0 +1 @@
+../fstrm_logger.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/fstrm_logger.hh b/pdns/dnsdistdist/fstrm_logger.hh
new file mode 120000 (symlink)
index 0000000..b4898e9
--- /dev/null
@@ -0,0 +1 @@
+../fstrm_logger.hh
\ No newline at end of file
diff --git a/pdns/dnsdistdist/m4/pdns_check_fstrm.m4 b/pdns/dnsdistdist/m4/pdns_check_fstrm.m4
new file mode 120000 (symlink)
index 0000000..f6852de
--- /dev/null
@@ -0,0 +1 @@
+../../../m4/pdns_check_fstrm.m4
\ No newline at end of file
diff --git a/pdns/dnstap.cc b/pdns/dnstap.cc
new file mode 100644 (file)
index 0000000..65d9673
--- /dev/null
@@ -0,0 +1,76 @@
+#include "config.h"
+#include "gettime.hh"
+#include "dnstap.hh"
+
+DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime)
+{
+#ifdef HAVE_PROTOBUF
+  const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(packet);
+
+  proto_message.set_identity(identity);
+  proto_message.set_version(PACKAGE_STRING);
+  proto_message.set_type(dnstap::Dnstap::MESSAGE);
+
+  dnstap::Message* message = proto_message.mutable_message();
+
+  message->set_type(!dh->qr ? dnstap::Message_Type_CLIENT_QUERY : dnstap::Message_Type_CLIENT_RESPONSE);
+
+  message->set_socket_family(requestor->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
+  message->set_socket_protocol(isTCP ? dnstap::TCP : dnstap::UDP);
+
+  if (requestor->sin4.sin_family == AF_INET) {
+    message->set_query_address(&requestor->sin4.sin_addr.s_addr, sizeof(requestor->sin4.sin_addr.s_addr));
+  }
+  else if (requestor->sin4.sin_family == AF_INET6) {
+    message->set_query_address(&requestor->sin6.sin6_addr.s6_addr, sizeof(requestor->sin6.sin6_addr.s6_addr));
+  }
+  message->set_query_port(ntohs(requestor->sin4.sin_port));
+
+  if (requestor->sin4.sin_family == AF_INET) {
+    message->set_response_address(&responder->sin4.sin_addr.s_addr, sizeof(responder->sin4.sin_addr.s_addr));
+  }
+  else if (requestor->sin4.sin_family == AF_INET6) {
+    message->set_response_address(&responder->sin6.sin6_addr.s6_addr, sizeof(responder->sin6.sin6_addr.s6_addr));
+  }
+  message->set_response_port(ntohs(responder->sin4.sin_port));
+
+  if (queryTime != nullptr) {
+    message->set_query_time_sec(queryTime->tv_sec);
+    message->set_query_time_nsec(queryTime->tv_nsec / 1000);
+  }
+  if (responseTime != nullptr) {
+    message->set_response_time_sec(responseTime->tv_sec);
+    message->set_response_time_nsec(responseTime->tv_nsec / 1000);
+  }
+
+  if (!dh->qr) {
+    message->set_query_message(packet, len);
+  } else {
+    message->set_response_message(packet, len);
+  }
+#endif /* HAVE_PROTOBUF */
+}
+
+void DnstapMessage::serialize(std::string& data) const
+{
+#ifdef HAVE_PROTOBUF
+  proto_message.SerializeToString(&data);
+#endif /* HAVE_PROTOBUF */
+}
+
+std::string DnstapMessage::toDebugString() const
+{
+  return
+#ifdef HAVE_PROTOBUF
+    proto_message.DebugString();
+#else
+    "";
+#endif /* HAVE_PROTOBUF */
+}
+
+void DnstapMessage::setExtra(const std::string& extra)
+{
+#ifdef HAVE_PROTOBUF
+  proto_message.set_extra(extra);
+#endif /* HAVE_PROTOBUF */
+}
diff --git a/pdns/dnstap.hh b/pdns/dnstap.hh
new file mode 100644 (file)
index 0000000..e25df4f
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "config.h"
+
+#include "dnsname.hh"
+#include "iputils.hh"
+
+#ifdef HAVE_PROTOBUF
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include "dnstap.pb.h"
+#endif /* HAVE_PROTOBUF */
+
+class DnstapMessage
+{
+public:
+  DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime);
+  void serialize(std::string& data) const;
+  std::string toDebugString() const;
+
+  void setExtra(const std::string& extra);
+
+#ifdef HAVE_PROTOBUF
+protected:
+  dnstap::Dnstap proto_message;
+#endif /* HAVE_PROTOBUF */
+};
\ No newline at end of file
diff --git a/pdns/dnstap.proto b/pdns/dnstap.proto
new file mode 100644 (file)
index 0000000..1ed1bb0
--- /dev/null
@@ -0,0 +1,268 @@
+// dnstap: flexible, structured event replication format for DNS software
+//
+// This file contains the protobuf schemas for the "dnstap" structured event
+// replication format for DNS software.
+
+// Written in 2013-2014 by Farsight Security, Inc.
+//
+// To the extent possible under law, the author(s) have dedicated all
+// copyright and related and neighboring rights to this file to the public
+// domain worldwide. This file is distributed without any warranty.
+//
+// You should have received a copy of the CC0 Public Domain Dedication along
+// with this file. If not, see:
+//
+// <http://creativecommons.org/publicdomain/zero/1.0/>.
+
+package dnstap;
+
+// "Dnstap": this is the top-level dnstap type, which is a "union" type that
+// contains other kinds of dnstap payloads, although currently only one type
+// of dnstap payload is defined.
+// See: https://developers.google.com/protocol-buffers/docs/techniques#union
+message Dnstap {
+    // DNS server identity.
+    // If enabled, this is the identity string of the DNS server which generated
+    // this message. Typically this would be the same string as returned by an
+    // "NSID" (RFC 5001) query.
+    optional bytes      identity = 1;
+
+    // DNS server version.
+    // If enabled, this is the version string of the DNS server which generated
+    // this message. Typically this would be the same string as returned by a
+    // "version.bind" query.
+    optional bytes      version = 2;
+
+    // Extra data for this payload.
+    // This field can be used for adding an arbitrary byte-string annotation to
+    // the payload. No encoding or interpretation is applied or enforced.
+    optional bytes      extra = 3;
+
+    // Identifies which field below is filled in.
+    enum Type {
+        MESSAGE = 1;
+    }
+    required Type       type = 15;
+
+    // One of the following will be filled in.
+    optional Message    message = 14;
+}
+
+// SocketFamily: the network protocol family of a socket. This specifies how
+// to interpret "network address" fields.
+enum SocketFamily {
+    INET = 1;   // IPv4 (RFC 791)
+    INET6 = 2;  // IPv6 (RFC 2460)
+}
+
+// SocketProtocol: the transport protocol of a socket. This specifies how to
+// interpret "transport port" fields.
+enum SocketProtocol {
+    UDP = 1;    // User Datagram Protocol (RFC 768)
+    TCP = 2;    // Transmission Control Protocol (RFC 793)
+}
+
+// Message: a wire-format (RFC 1035 section 4) DNS message and associated
+// metadata. Applications generating "Message" payloads should follow
+// certain requirements based on the MessageType, see below.
+message Message {
+
+    // There are eight types of "Message" defined that correspond to the
+    // four arrows in the following diagram, slightly modified from RFC 1035
+    // section 2:
+
+    //    +---------+               +----------+           +--------+
+    //    |         |     query     |          |   query   |        |
+    //    | Stub    |-SQ--------CQ->| Recursive|-RQ----AQ->| Auth.  |
+    //    | Resolver|               | Server   |           | Name   |
+    //    |         |<-SR--------CR-|          |<-RR----AR-| Server |
+    //    +---------+    response   |          |  response |        |
+    //                              +----------+           +--------+
+
+    // Each arrow has two Type values each, one for each "end" of each arrow,
+    // because these are considered to be distinct events. Each end of each
+    // arrow on the diagram above has been marked with a two-letter Type
+    // mnemonic. Clockwise from upper left, these mnemonic values are:
+    //
+    //   SQ:        STUB_QUERY
+    //   CQ:      CLIENT_QUERY
+    //   RQ:    RESOLVER_QUERY
+    //   AQ:        AUTH_QUERY
+    //   AR:        AUTH_RESPONSE
+    //   RR:    RESOLVER_RESPONSE
+    //   CR:      CLIENT_RESPONSE
+    //   SR:        STUB_RESPONSE
+
+    // Two additional types of "Message" have been defined for the
+    // "forwarding" case where an upstream DNS server is responsible for
+    // further recursion. These are not shown on the diagram above, but have
+    // the following mnemonic values:
+
+    //   FQ:   FORWARDER_QUERY
+    //   FR:   FORWARDER_RESPONSE
+
+    // The "Message" Type values are defined below.
+
+    enum Type {
+        // AUTH_QUERY is a DNS query message received from a resolver by an
+        // authoritative name server, from the perspective of the authorative
+        // name server.
+        AUTH_QUERY = 1;
+
+        // AUTH_RESPONSE is a DNS response message sent from an authoritative
+        // name server to a resolver, from the perspective of the authoritative
+        // name server.
+        AUTH_RESPONSE = 2;
+
+        // RESOLVER_QUERY is a DNS query message sent from a resolver to an
+        // authoritative name server, from the perspective of the resolver.
+        // Resolvers typically clear the RD (recursion desired) bit when
+        // sending queries.
+        RESOLVER_QUERY = 3;
+
+        // RESOLVER_RESPONSE is a DNS response message received from an
+        // authoritative name server by a resolver, from the perspective of
+        // the resolver.
+        RESOLVER_RESPONSE = 4;
+
+        // CLIENT_QUERY is a DNS query message sent from a client to a DNS
+        // server which is expected to perform further recursion, from the
+        // perspective of the DNS server. The client may be a stub resolver or
+        // forwarder or some other type of software which typically sets the RD
+        // (recursion desired) bit when querying the DNS server. The DNS server
+        // may be a simple forwarding proxy or it may be a full recursive
+        // resolver.
+        CLIENT_QUERY = 5;
+
+        // CLIENT_RESPONSE is a DNS response message sent from a DNS server to
+        // a client, from the perspective of the DNS server. The DNS server
+        // typically sets the RA (recursion available) bit when responding.
+        CLIENT_RESPONSE = 6;
+
+        // FORWARDER_QUERY is a DNS query message sent from a downstream DNS
+        // server to an upstream DNS server which is expected to perform
+        // further recursion, from the perspective of the downstream DNS
+        // server.
+        FORWARDER_QUERY = 7;
+
+        // FORWARDER_RESPONSE is a DNS response message sent from an upstream
+        // DNS server performing recursion to a downstream DNS server, from the
+        // perspective of the downstream DNS server.
+        FORWARDER_RESPONSE = 8;
+
+        // STUB_QUERY is a DNS query message sent from a stub resolver to a DNS
+        // server, from the perspective of the stub resolver.
+        STUB_QUERY = 9;
+
+        // STUB_RESPONSE is a DNS response message sent from a DNS server to a
+        // stub resolver, from the perspective of the stub resolver.
+        STUB_RESPONSE = 10;
+
+        // TOOL_QUERY is a DNS query message sent from a DNS software tool to a
+        // DNS server, from the perspective of the tool.
+        TOOL_QUERY = 11;
+
+        // TOOL_RESPONSE is a DNS response message received by a DNS software
+        // tool from a DNS server, from the perspective of the tool.
+        TOOL_RESPONSE = 12;
+    }
+
+    // One of the Type values described above.
+    required Type               type = 1;
+
+    // One of the SocketFamily values described above.
+    optional SocketFamily       socket_family = 2;
+
+    // One of the SocketProtocol values described above.
+    optional SocketProtocol     socket_protocol = 3;
+
+    // The network address of the message initiator.
+    // For SocketFamily INET, this field is 4 octets (IPv4 address).
+    // For SocketFamily INET6, this field is 16 octets (IPv6 address).
+    optional bytes              query_address = 4;
+
+    // The network address of the message responder.
+    // For SocketFamily INET, this field is 4 octets (IPv4 address).
+    // For SocketFamily INET6, this field is 16 octets (IPv6 address).
+    optional bytes              response_address = 5;
+
+    // The transport port of the message initiator.
+    // This is a 16-bit UDP or TCP port number, depending on SocketProtocol.
+    optional uint32             query_port = 6;
+
+    // The transport port of the message responder.
+    // This is a 16-bit UDP or TCP port number, depending on SocketProtocol.
+    optional uint32             response_port = 7;
+
+    // The time at which the DNS query message was sent or received, depending
+    // on whether this is an AUTH_QUERY, RESOLVER_QUERY, or CLIENT_QUERY.
+    // This is the number of seconds since the UNIX epoch.
+    optional uint64             query_time_sec = 8;
+
+    // The time at which the DNS query message was sent or received.
+    // This is the seconds fraction, expressed as a count of nanoseconds.
+    optional fixed32            query_time_nsec = 9;
+
+    // The initiator's original wire-format DNS query message, verbatim.
+    optional bytes              query_message = 10;
+
+    // The "zone" or "bailiwick" pertaining to the DNS query message.
+    // This is a wire-format DNS domain name.
+    optional bytes              query_zone = 11;
+
+    // The time at which the DNS response message was sent or received,
+    // depending on whether this is an AUTH_RESPONSE, RESOLVER_RESPONSE, or
+    // CLIENT_RESPONSE.
+    // This is the number of seconds since the UNIX epoch.
+    optional uint64             response_time_sec = 12;
+
+    // The time at which the DNS response message was sent or received.
+    // This is the seconds fraction, expressed as a count of nanoseconds.
+    optional fixed32            response_time_nsec = 13;
+
+    // The responder's original wire-format DNS response message, verbatim.
+    optional bytes              response_message = 14;
+}
+
+// All fields except for 'type' in the Message schema are optional.
+// It is recommended that at least the following fields be filled in for
+// particular types of Messages.
+
+// AUTH_QUERY:
+//      socket_family, socket_protocol
+//      query_address, query_port
+//      query_message
+//      query_time_sec, query_time_nsec
+
+// AUTH_RESPONSE:
+//      socket_family, socket_protocol
+//      query_address, query_port
+//      query_time_sec, query_time_nsec
+//      response_message
+//      response_time_sec, response_time_nsec
+
+// RESOLVER_QUERY:
+//      socket_family, socket_protocol
+//      query_message
+//      query_time_sec, query_time_nsec
+//      query_zone
+//      response_address, response_port
+
+// RESOLVER_RESPONSE:
+//      socket_family, socket_protocol
+//      query_time_sec, query_time_nsec
+//      query_zone
+//      response_address, response_port
+//      response_message
+//      response_time_sec, response_time_nsec
+
+// CLIENT_QUERY:
+//      socket_family, socket_protocol
+//      query_message
+//      query_time_sec, query_time_nsec
+
+// CLIENT_RESPONSE:
+//      socket_family, socket_protocol
+//      query_time_sec, query_time_nsec
+//      response_message
+//      response_time_sec, response_time_nsec
diff --git a/pdns/fstrm_logger.cc b/pdns/fstrm_logger.cc
new file mode 100644 (file)
index 0000000..fb232b3
--- /dev/null
@@ -0,0 +1,155 @@
+#include <unistd.h>
+#include <sys/un.h>
+
+#include "config.h"
+#include "fstrm_logger.hh"
+#include "dolog.hh"
+
+#define DNSTAP_CONTENT_TYPE            "protobuf:dnstap.Dnstap"
+
+#ifdef HAVE_FSTRM
+
+FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address): d_family(family), d_address(address)
+{
+  fstrm_res res;
+
+  try {
+    d_fwopt = fstrm_writer_options_init();
+    if (!d_fwopt) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
+    }
+
+    res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
+    if (res != fstrm_res_success) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
+    }
+
+    if (d_family == AF_UNIX) {
+      struct sockaddr_un local;
+      if (makeUNsockaddr(d_address, &local)) {
+        throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
+      }
+
+      d_uwopt = fstrm_unix_writer_options_init();
+      if (!d_uwopt) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
+      }
+
+      // void return, no error checking.
+      fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
+
+      d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
+      if (!d_writer) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
+      }
+  #ifdef HAVE_FSTRM_TCP_WRITER_INIT
+    } else if (family == AF_INET) {
+      d_twopt = fstrm_tcp_writer_options_init();
+      if (!d_twopt) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
+      }
+
+      try {
+        ComboAddress ca(d_address);
+
+        // void return, no error checking.
+        fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
+        fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
+      } catch (PDNSException &e) {
+        throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
+      }
+
+      d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
+      if (!d_writer) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
+      }
+  #endif
+    } else {
+      throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
+    }
+
+    d_iothropt = fstrm_iothr_options_init();
+    if (!d_iothropt) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
+    }
+
+    res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
+    if (res != fstrm_res_success) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
+    }
+
+    d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
+    if (!d_iothr) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
+    }
+
+    d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
+    if (!d_ioqueue) {
+      throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
+    }
+  } catch (std::runtime_error &e) {
+    this->cleanup();
+    throw;
+  }
+}
+
+void FrameStreamLogger::cleanup()
+{
+  if (d_iothr != nullptr) {
+    fstrm_iothr_destroy(&d_iothr);
+    d_iothr = nullptr;
+  }
+  if (d_iothropt != nullptr) {
+    fstrm_iothr_options_destroy(&d_iothropt);
+    d_iothropt = nullptr;
+  }
+  if (d_writer != nullptr) {
+    fstrm_writer_destroy(&d_writer);
+    d_writer = nullptr;
+  }
+  if (d_uwopt != nullptr) {
+    fstrm_unix_writer_options_destroy(&d_uwopt);
+    d_uwopt = nullptr;
+  }
+#ifdef HAVE_FSTRM_TCP_WRITER_INIT
+  if (d_twopt != nullptr) {
+    fstrm_tcp_writer_options_destroy(&d_twopt);
+    d_twopt = nullptr;
+  }
+#endif
+  if (d_fwopt != nullptr) {
+    fstrm_writer_options_destroy(&d_fwopt);
+    d_fwopt = nullptr;
+  }
+}
+
+FrameStreamLogger::~FrameStreamLogger()
+{
+  this->cleanup();
+}
+
+void FrameStreamLogger::queueData(const std::string& data)
+{
+  uint8_t *frame = (uint8_t*)malloc(data.length());
+  if (!frame) {
+    warnlog("FrameStreamLogger: cannot allocate memory for stream.");
+    return;
+  }
+  memcpy(frame, data.c_str(), data.length());
+
+  fstrm_res res;
+  res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
+
+  if (res == fstrm_res_success) {
+    // Frame successfully queued.
+  } else if (res == fstrm_res_again) {
+    free(frame);
+    warnlog("FrameStreamLogger: queue full, dropping.");
+  } else {
+    // Permanent failure.
+    free(frame);
+    warnlog("FrameStreamLogger: submitting to queue failed.");
+  }
+}
+
+#endif /* HAVE_FSTRM */
diff --git a/pdns/fstrm_logger.hh b/pdns/fstrm_logger.hh
new file mode 100644 (file)
index 0000000..f661e71
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+#include "config.h"
+#include "remote_logger.hh"
+
+#ifdef HAVE_FSTRM
+
+#include <fstrm.h>
+#include <fstrm/iothr.h>
+#include <fstrm/unix_writer.h>
+#ifdef HAVE_FSTRM_TCP_WRITER_INIT
+#include <fstrm/tcp_writer.h>
+#endif
+
+class FrameStreamLogger : public RemoteLoggerInterface, boost::noncopyable
+{
+public:
+  FrameStreamLogger(int family, const std::string& address);
+  virtual ~FrameStreamLogger();
+  virtual void queueData(const std::string& data) override;
+  virtual std::string toString() override
+  {
+    return "FrameStreamLogger to " + d_address;
+  }
+private:
+  const int d_family;
+  const std::string d_address;
+  struct fstrm_iothr_queue *d_ioqueue{nullptr};
+  struct fstrm_writer_options *d_fwopt{nullptr};
+  struct fstrm_unix_writer_options *d_uwopt{nullptr};
+#ifdef HAVE_FSTRM_TCP_WRITER_INIT
+  struct fstrm_tcp_writer_options *d_twopt{nullptr};
+#endif
+  struct fstrm_writer *d_writer{nullptr};
+  struct fstrm_iothr_options *d_iothropt{nullptr};
+  struct fstrm_iothr *d_iothr{nullptr};
+
+  void cleanup();
+};
+
+#endif /* HAVE_FSTRM */
index e4d420243a059832678d808d7db0e1b8cb358a8a..6f72840c610f343b3d41840fad43aac0a916a501 100644 (file)
 
 #include "iputils.hh"
 
-class RemoteLogger
+class RemoteLoggerInterface
+{
+public:
+  virtual ~RemoteLoggerInterface() {};
+  virtual void queueData(const std::string& data) = 0;
+  virtual std::string toString() = 0;
+};
+
+class RemoteLogger : public RemoteLoggerInterface
 {
 public:
   RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
-  ~RemoteLogger();
-  void queueData(const std::string& data);
-  std::string toString()
+  virtual ~RemoteLogger();
+  virtual void queueData(const std::string& data) override;
+  virtual std::string toString() override
   {
-    return d_remote.toStringWithPort();
+    return "RemoteLogger to " + d_remote.toStringWithPort();
   }
 private:
   void busyReconnectLoop();
index b0f70b87bd5b82fd446a1174a62d3f13d0f6a548..b58aec4ec401633e31f491d2787bf405dc421106 100644 (file)
@@ -7,4 +7,4 @@ DNSCryptResolver*
 .dnsdist_history
 .history
 dnsdist.log
-/dnsmessage_pb2.py
\ No newline at end of file
+/*_pb2.py
index f6803132a32e6789c75a1308cea9bc8bd41aec23..fdb8dcb47bec81b5124a446ce4b04d6499fb68cf 100755 (executable)
@@ -10,6 +10,7 @@ fi
 python -V
 pip install -r requirements.txt
 protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto
+protoc -I=../pdns/ --python_out=. ../pdns/dnstap.proto
 
 if [ -z "${DNSDISTBIN}" ]; then
     DNSDISTBIN=$(ls ../pdns/dnsdistdist/dnsdist-*/dnsdist)
diff --git a/regression-tests.dnsdist/test_Dnstap.py b/regression-tests.dnsdist/test_Dnstap.py
new file mode 100644 (file)
index 0000000..874579a
--- /dev/null
@@ -0,0 +1,515 @@
+#!/usr/bin/env python
+import Queue
+import threading
+import os
+import socket
+import struct
+import sys
+import time
+from dnsdisttests import DNSDistTest
+
+import dns
+import dnstap_pb2
+
+FSTRM_CONTROL_ACCEPT = 0x01
+FSTRM_CONTROL_START = 0x02
+FSTRM_CONTROL_STOP = 0x03
+FSTRM_CONTROL_READY = 0x04
+FSTRM_CONTROL_FINISH = 0x05
+
+
+def checkDnstapBase(testinstance, dnstap, protocol, initiator):
+    testinstance.assertTrue(dnstap)
+    testinstance.assertTrue(dnstap.HasField('identity'))
+    testinstance.assertEqual(dnstap.identity, 'a.server')
+    testinstance.assertTrue(dnstap.HasField('version'))
+    testinstance.assertIn('dnsdist ', dnstap.version)
+    testinstance.assertTrue(dnstap.HasField('type'))
+    testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
+    testinstance.assertTrue(dnstap.HasField('message'))
+    testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
+    testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
+    testinstance.assertTrue(dnstap.message.HasField('socket_family'))
+    testinstance.assertEquals(dnstap.message.socket_family, dnstap_pb2.INET)
+    testinstance.assertTrue(dnstap.message.HasField('query_address'))
+    testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
+    testinstance.assertTrue(dnstap.message.HasField('response_address'))
+    testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
+    testinstance.assertTrue(dnstap.message.HasField('response_port'))
+    testinstance.assertEquals(dnstap.message.response_port, testinstance._dnsDistPort)
+  
+
+def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
+    testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_QUERY)
+    checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.HasField('query_message'))
+    wire_message = dns.message.from_wire(dnstap.message.query_message)
+    testinstance.assertEqual(wire_message, query)
+
+
+def checkDnstapExtra(testinstance, dnstap, expected):
+    testinstance.assertTrue(dnstap.HasField('extra'))
+    testinstance.assertEqual(dnstap.extra, expected)
+
+
+def checkDnstapNoExtra(testinstance, dnstap):
+    testinstance.assertFalse(dnstap.HasField('extra'))
+
+
+def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
+    testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.CLIENT_RESPONSE)
+    checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
+    testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))
+
+    testinstance.assertTrue(dnstap.message.response_time_sec > dnstap.message.query_time_sec or \
+        dnstap.message.response_time_nsec > dnstap.message.query_time_nsec)
+
+    testinstance.assertTrue(dnstap.message.HasField('response_message'))
+    wire_message = dns.message.from_wire(dnstap.message.response_message)
+    testinstance.assertEqual(wire_message, response)
+
+
+class TestDnstapOverRemoteLogger(DNSDistTest):
+    _remoteLoggerServerPort = 4242
+    _remoteLoggerQueue = Queue.Queue()
+    _remoteLoggerCounter = 0
+    _config_params = ['_testServerPort', '_remoteLoggerServerPort']
+    _config_template = """
+    extrasmn = newSuffixMatchNode()
+    extrasmn:add(newDNSName('extra.dnstap.tests.powerdns.com.'))
+
+    luatarget = 'lua.dnstap.tests.powerdns.com.'
+
+    function alterDnstapQuery(dq, tap)
+      if extrasmn:check(dq.qname) then
+        tap:setExtra("Type,Query")
+      end
+    end
+
+    function alterDnstapResponse(dq, tap)
+      if extrasmn:check(dq.qname) then
+        tap:setExtra("Type,Response")
+      end
+    end
+
+    function luaFunc(dq)
+      dq.dh:setQR(true)
+      dq.dh:setRCode(dnsdist.NXDOMAIN)
+      return DNSAction.None, ""
+    end
+
+    newServer{address="127.0.0.1:%s", useClientSubnet=true}
+    rl = newRemoteLogger('127.0.0.1:%s')
+
+    addAction(AllRule(), DnstapLogAction("a.server", rl, alterDnstapQuery))                            -- Send dnstap message before lookup
+
+    addAction(luatarget, LuaAction(luaFunc))                           -- Send dnstap message before lookup
+
+    addResponseAction(AllRule(), DnstapLogResponseAction("a.server", rl, alterDnstapResponse)) -- Send dnstap message after lookup
+
+    addAction('spoof.dnstap.tests.powerdns.com.', SpoofAction("192.0.2.1"))
+    """
+
+    @classmethod
+    def RemoteLoggerListener(cls, port):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        try:
+            sock.bind(("127.0.0.1", port))
+        except socket.error as e:
+            print("Error binding in the protbuf listener: %s" % str(e))
+            sys.exit(1)
+
+        sock.listen(100)
+        while True:
+            (conn, _) = sock.accept()
+            data = None
+            while True:
+                data = conn.recv(2)
+                if not data:
+                    break
+                (datalen,) = struct.unpack("!H", data)
+                data = conn.recv(datalen)
+                if not data:
+                    break
+
+                cls._remoteLoggerQueue.put(data, True, timeout=2.0)
+
+            conn.close()
+        sock.close()
+
+    @classmethod
+    def startResponders(cls):
+        DNSDistTest.startResponders()
+
+        cls._remoteLoggerListener = threading.Thread(name='RemoteLogger Listener', target=cls.RemoteLoggerListener, args=[cls._remoteLoggerServerPort])
+        cls._remoteLoggerListener.setDaemon(True)
+        cls._remoteLoggerListener.start()
+
+    def getFirstDnstap(self):
+        self.assertFalse(self._remoteLoggerQueue.empty())
+        data = self._remoteLoggerQueue.get(False)
+        self.assertTrue(data)
+        dnstap = dnstap_pb2.Dnstap()
+        dnstap.ParseFromString(data)
+        return dnstap
+
+    def testDnstap(self):
+        """
+        Dnstap: Send query and responses packed in dnstap to a remotelogger server
+        """
+        name = 'query.dnstap.tests.powerdns.com.'
+
+        target = 'target.dnstap.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    target)
+        response.answer.append(rrset)
+
+        rrset = dns.rrset.from_text(target,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # give the dnstap messages time to get here
+        time.sleep(1)
+
+        # check the dnstap message corresponding to the UDP query
+        dnstap = self.getFirstDnstap()
+
+        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
+        checkDnstapNoExtra(self, dnstap)
+
+        # check the dnstap message corresponding to the UDP response
+        dnstap = self.getFirstDnstap()
+        checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
+        checkDnstapNoExtra(self, dnstap)
+
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # give the dnstap messages time to get here
+        time.sleep(1)
+
+        # check the dnstap message corresponding to the TCP query
+        dnstap = self.getFirstDnstap()
+
+        checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
+        checkDnstapNoExtra(self, dnstap)
+
+        # check the dnstap message corresponding to the TCP response
+        dnstap = self.getFirstDnstap()
+        checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
+        checkDnstapNoExtra(self, dnstap)
+
+    def testDnstapExtra(self):
+        """
+        DnstapExtra: Send query and responses packed in dnstap to a remotelogger server. Extra data is filled out.
+        """
+        name = 'extra.dnstap.tests.powerdns.com.'
+
+        target = 'target.dnstap.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    target)
+        response.answer.append(rrset)
+
+        rrset = dns.rrset.from_text(target,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # give the dnstap messages time to get here
+        time.sleep(1)
+
+        # check the dnstap message corresponding to the UDP query
+        dnstap = self.getFirstDnstap()
+        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
+        checkDnstapExtra(self, dnstap, "Type,Query")
+
+        # check the dnstap message corresponding to the UDP response
+        dnstap = self.getFirstDnstap()
+        checkDnstapResponse(self, dnstap, dnstap_pb2.UDP, response)
+        checkDnstapExtra(self, dnstap, "Type,Response")
+
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # give the dnstap messages time to get here
+        time.sleep(1)
+
+        # check the dnstap message corresponding to the TCP query
+        dnstap = self.getFirstDnstap()
+        checkDnstapQuery(self, dnstap, dnstap_pb2.TCP, query)
+        checkDnstapExtra(self, dnstap, "Type,Query")
+
+        # check the dnstap message corresponding to the TCP response
+        dnstap = self.getFirstDnstap()
+        checkDnstapResponse(self, dnstap, dnstap_pb2.TCP, response)
+        checkDnstapExtra(self, dnstap, "Type,Response")
+
+
+def fstrm_get_control_frame_type(data):
+    (t,) = struct.unpack("!L", data[0:4])
+    return t
+
+
+def fstrm_make_control_frame_reply(cft, data):
+    if cft == FSTRM_CONTROL_READY:
+        # Reply with ACCEPT frame and content-type
+        contenttype = 'protobuf:dnstap.Dnstap'
+        frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
+                            len(contenttype)) + contenttype
+        buf = struct.pack("!LL", 0, len(frame)) + frame
+        return buf
+    elif cft == FSTRM_CONTROL_START:
+        return None
+    else:
+        raise Exception('unhandled control frame ' + cft)
+
+
+def fstrm_read_and_dispatch_control_frame(conn):
+    data = conn.recv(4)
+    if not data:
+        raise Exception('length of control frame payload could not be read')
+    (datalen,) = struct.unpack("!L", data)
+    data = conn.recv(datalen)
+    cft = fstrm_get_control_frame_type(data)
+    reply = fstrm_make_control_frame_reply(cft, data)
+    if reply:
+        conn.send(reply)
+    return cft
+
+
+def fstrm_handle_bidir_connection(conn, on_data):
+    data = None
+    while True:
+        data = conn.recv(4)
+        if not data:
+            break
+        (datalen,) = struct.unpack("!L", data)
+        if datalen == 0:
+            # control frame length follows
+            cft = fstrm_read_and_dispatch_control_frame(conn)
+            if cft == FSTRM_CONTROL_STOP:
+                break
+        else:
+            # data frame
+            data = conn.recv(datalen)
+            if not data:
+                break
+
+            on_data(data)
+
+
+class TestDnstapOverFrameStreamUnixLogger(DNSDistTest):
+    _fstrmLoggerAddress = '/tmp/fslutest.sock'
+    _fstrmLoggerQueue = Queue.Queue()
+    _fstrmLoggerCounter = 0
+    _config_params = ['_testServerPort', '_fstrmLoggerAddress']
+    _config_template = """
+    newServer{address="127.0.0.1:%s", useClientSubnet=true}
+    fslu = newFrameStreamUnixLogger('%s')
+
+    addAction(AllRule(), DnstapLogAction("a.server", fslu))
+    """
+
+    @classmethod
+    def FrameStreamUnixListener(cls, path):
+        try:
+            os.unlink(path)
+        except OSError:
+            pass  # Assume file not found
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            sock.bind(path)
+        except socket.error as e:
+            print("Error binding in the framestream listener: %s" % str(e))
+            sys.exit(1)
+
+        sock.listen(100)
+        while True:
+            (conn, _) = sock.accept()
+            fstrm_handle_bidir_connection(conn, lambda data: \
+                cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
+            conn.close()
+        sock.close()
+
+    @classmethod
+    def startResponders(cls):
+        DNSDistTest.startResponders()
+
+        cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerAddress])
+        cls._fstrmLoggerListener.setDaemon(True)
+        cls._fstrmLoggerListener.start()
+
+    def getFirstDnstap(self):
+        data = self._fstrmLoggerQueue.get(True, timeout=2.0)
+        self.assertTrue(data)
+        dnstap = dnstap_pb2.Dnstap()
+        dnstap.ParseFromString(data)
+        return dnstap
+
+    def testDnstapOverFrameStreamUnix(self):
+        """
+        Dnstap: Send query packed in dnstap to a unix socket fstrmlogger server
+        """
+        name = 'query.dnstap.tests.powerdns.com.'
+
+        target = 'target.dnstap.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    target)
+        response.answer.append(rrset)
+
+        rrset = dns.rrset.from_text(target,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # check the dnstap message corresponding to the UDP query
+        dnstap = self.getFirstDnstap()
+
+        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
+        checkDnstapNoExtra(self, dnstap)
+
+
+class TestDnstapOverFrameStreamTcpLogger(DNSDistTest):
+    _fstrmLoggerPort = 4000
+    _fstrmLoggerQueue = Queue.Queue()
+    _fstrmLoggerCounter = 0
+    _config_params = ['_testServerPort', '_fstrmLoggerPort']
+    _config_template = """
+    newServer{address="127.0.0.1:%s", useClientSubnet=true}
+    fslu = newFrameStreamTcpLogger('127.0.0.1:%s')
+
+    addAction(AllRule(), DnstapLogAction("a.server", fslu))
+    """
+
+    @classmethod
+    def FrameStreamUnixListener(cls, port):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            sock.bind(("127.0.0.1", port))
+        except socket.error as e:
+            print("Error binding in the framestream listener: %s" % str(e))
+            sys.exit(1)
+
+        sock.listen(100)
+        while True:
+            (conn, _) = sock.accept()
+            fstrm_handle_bidir_connection(conn, lambda data: \
+                cls._fstrmLoggerQueue.put(data, True, timeout=2.0))
+            conn.close()
+        sock.close()
+
+    @classmethod
+    def startResponders(cls):
+        DNSDistTest.startResponders()
+
+        cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerPort])
+        cls._fstrmLoggerListener.setDaemon(True)
+        cls._fstrmLoggerListener.start()
+
+    def getFirstDnstap(self):
+        data = self._fstrmLoggerQueue.get(True, timeout=2.0)
+        self.assertTrue(data)
+        dnstap = dnstap_pb2.Dnstap()
+        dnstap.ParseFromString(data)
+        return dnstap
+
+    def testDnstapOverFrameStreamTcp(self):
+        """
+        Dnstap: Send query packed in dnstap to a tcp socket fstrmlogger server
+        """
+        name = 'query.dnstap.tests.powerdns.com.'
+
+        target = 'target.dnstap.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    target)
+        response.answer.append(rrset)
+
+        rrset = dns.rrset.from_text(target,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # check the dnstap message corresponding to the UDP query
+        dnstap = self.getFirstDnstap()
+
+        checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query)
+        checkDnstapNoExtra(self, dnstap)