#include "gettime.hh"
#include "dnstap.hh"
-DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime)
+DnstapMessage::DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime, bool recursor)
{
#ifdef HAVE_PROTOBUF
const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(packet);
dnstap::Message* message = proto_message.mutable_message();
- message->set_type(!dh->qr ? dnstap::Message_Type_CLIENT_QUERY : dnstap::Message_Type_CLIENT_RESPONSE);
-
- message->set_socket_family(requestor->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
- message->set_socket_protocol(isTCP ? dnstap::TCP : dnstap::UDP);
-
- if (requestor->sin4.sin_family == AF_INET) {
- message->set_query_address(&requestor->sin4.sin_addr.s_addr, sizeof(requestor->sin4.sin_addr.s_addr));
- }
- else if (requestor->sin4.sin_family == AF_INET6) {
- message->set_query_address(&requestor->sin6.sin6_addr.s6_addr, sizeof(requestor->sin6.sin6_addr.s6_addr));
+ if (recursor) {
+ message->set_type(!dh->qr ? dnstap::Message_Type_RESOLVER_QUERY : dnstap::Message_Type_RESOLVER_RESPONSE);
+ } else {
+ message->set_type(!dh->qr ? dnstap::Message_Type_CLIENT_QUERY : dnstap::Message_Type_CLIENT_RESPONSE);
}
- message->set_query_port(ntohs(requestor->sin4.sin_port));
+ message->set_socket_protocol(isTCP ? dnstap::TCP : dnstap::UDP);
- if (requestor->sin4.sin_family == AF_INET) {
- message->set_response_address(&responder->sin4.sin_addr.s_addr, sizeof(responder->sin4.sin_addr.s_addr));
+ if (requestor != nullptr) {
+ message->set_socket_family(requestor->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
+ if (requestor->sin4.sin_family == AF_INET) {
+ message->set_query_address(&requestor->sin4.sin_addr.s_addr, sizeof(requestor->sin4.sin_addr.s_addr));
+ } else if (requestor->sin4.sin_family == AF_INET6) {
+ message->set_query_address(&requestor->sin6.sin6_addr.s6_addr, sizeof(requestor->sin6.sin6_addr.s6_addr));
+ }
+ message->set_query_port(ntohs(requestor->sin4.sin_port));
}
- else if (requestor->sin4.sin_family == AF_INET6) {
- message->set_response_address(&responder->sin6.sin6_addr.s6_addr, sizeof(responder->sin6.sin6_addr.s6_addr));
+ if (responder != nullptr) {
+ message->set_socket_family(responder->sin4.sin_family == AF_INET ? dnstap::INET : dnstap::INET6);
+ if (responder->sin4.sin_family == AF_INET) {
+ message->set_response_address(&responder->sin4.sin_addr.s_addr, sizeof(responder->sin4.sin_addr.s_addr));
+ } else if (responder->sin4.sin_family == AF_INET6) {
+ message->set_response_address(&responder->sin6.sin6_addr.s6_addr, sizeof(responder->sin6.sin6_addr.s6_addr));
+ }
+ message->set_response_port(ntohs(responder->sin4.sin_port));
}
- message->set_response_port(ntohs(responder->sin4.sin_port));
-
if (queryTime != nullptr) {
message->set_query_time_sec(queryTime->tv_sec);
message->set_query_time_nsec(queryTime->tv_nsec);
class DnstapMessage
{
public:
- DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime);
+ DnstapMessage(const std::string& identity, const ComboAddress* requestor, const ComboAddress* responder, bool isTCP, const char* packet, const size_t len, const struct timespec* queryTime, const struct timespec* responseTime, bool recursor = false);
void serialize(std::string& data) const;
std::string toDebugString() const;
#ifdef HAVE_FSTRM
-FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect): d_family(family), d_address(address)
+FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
+ const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
{
fstrm_res res;
throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
}
+ if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
+ res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
+ res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
+ res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
+ res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
+ res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
+ res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
+ }
+ }
+
+
if (connect) {
d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
if (!d_iothr) {
#ifdef HAVE_FSTRM
+#include <unordered_map>
#include <fstrm.h>
#include <fstrm/iothr.h>
#include <fstrm/unix_writer.h>
class FrameStreamLogger : public RemoteLoggerInterface, boost::noncopyable
{
public:
- FrameStreamLogger(int family, const std::string& address, bool connect);
+ FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map<string,unsigned>& options = std::unordered_map<string,unsigned>());
virtual ~FrameStreamLogger();
virtual void queueData(const std::string& data) override;
virtual std::string toString() const override
{
return "FrameStreamLogger to " + d_address;
}
+ bool logQueries(void) const { return d_logQueries; }
+ bool logResponses(void) const { return d_logResponses; }
+ void setLogQueries(bool flag) { d_logQueries = flag; }
+ void setLogResponses(bool flag) { d_logResponses = flag; }
+
private:
+
const int d_family;
const std::string d_address;
struct fstrm_iothr_queue *d_ioqueue{nullptr};
struct fstrm_iothr *d_iothr{nullptr};
void cleanup();
+
+ bool d_logQueries{true};
+ bool d_logResponses{true};
};
#endif /* HAVE_FSTRM */
#include "uuid-utils.hh"
+#ifdef HAVE_FSTRM
+#include "dnstap.hh"
+#include "fstrm_logger.hh"
+bool g_syslog;
+
+static bool isEnabledForQueries(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers)
+{
+ if (fstreamLoggers == nullptr) {
+ return false;
+ }
+ for (auto& logger : *fstreamLoggers) {
+ // We know this is safe since fstreamLoggers is filled with FrameStreamLogger objects only
+ auto fsl = static_cast<const FrameStreamLogger*>(logger.get());
+ if (fsl->logQueries()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static void logFstreamQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const struct timeval &queryTime, const ComboAddress& ip, bool doTCP, const vector<uint8_t>& packet)
+{
+ if (fstreamLoggers == nullptr)
+ return;
+
+ struct timespec ts;
+ TIMEVAL_TO_TIMESPEC(&queryTime, &ts);
+ DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, true);
+ std::string str;
+ message.serialize(str);
+
+ for (auto& logger : *fstreamLoggers) {
+ logger->queueData(str);
+ }
+}
+
+static bool isEnabledForResponses(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers)
+{
+ if (fstreamLoggers == nullptr) {
+ return false;
+ }
+ for (auto& logger : *fstreamLoggers) {
+ // We know this is safe since fstreamLoggers is filled with FrameStreamLogger objects only
+ auto fsl = static_cast<const FrameStreamLogger*>(logger.get());
+ if (fsl->logResponses()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const ComboAddress& ip, bool doTCP, const std::string& packet, const struct timeval& queryTime, const struct timeval& replyTime)
+{
+ if (fstreamLoggers == nullptr)
+ return;
+
+ struct timespec ts1, ts2;
+ TIMEVAL_TO_TIMESPEC(&queryTime, &ts1);
+ TIMEVAL_TO_TIMESPEC(&replyTime, &ts2);
+ DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, static_cast<const char*>(&*packet.begin()), packet.size(), &ts1, &ts2, true);
+ std::string str;
+ message.serialize(str);
+
+ for (auto& logger : *fstreamLoggers) {
+ logger->queueData(str);
+ }
+}
+
+#endif // HAVE_FSTRM
+
static void logOutgoingQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
{
if(!outgoingLoggers)
/** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
Never throws!
*/
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
{
size_t len;
size_t bufsize=g_outgoingEDNSBufsize;
uuid = getUniqueID();
logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
}
-#endif
+#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ if (isEnabledForQueries(fstrmLoggers)) {
+ logFstreamQuery(fstrmLoggers, queryTime, ip, doTCP, vpacket);
+ }
+#endif /* HAVE_FSTRM */
srcmask = boost::none; // this is also our return value, even if EDNS0Level == 0
return ret;
buf.resize(len);
+
+#ifdef HAVE_FSTRM
+ if (isEnabledForResponses(fstrmLoggers)) {
+ logFstreamResponse(fstrmLoggers, ip, doTCP, buf, queryTime, *now);
+ }
+#endif /* HAVE_FSTRM */
+
lwr->d_records.clear();
try {
lwr->d_tcbit=0;
#include "remote_logger.hh"
#include "resolve-context.hh"
+
int asendto(const char *data, size_t len, int flags, const ComboAddress& ip, uint16_t id,
const DNSName& domain, uint16_t qtype, int* fd);
int arecvfrom(std::string& packet, int flags, const ComboAddress& ip, size_t *d_len, uint16_t id,
bool d_haveEDNS{false};
};
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
#endif // PDNS_LWRES_HH
#ifdef HAVE_PROTOBUF
#include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
#include "xpf.hh"
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
thread_local std::unique_ptr<MT_t> MT; // the big MTasker
thread_local std::unique_ptr<MemRecursorCache> t_RC;
thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
return true;
}
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+ auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLoggerInterface>>>();
+
+ for (const auto& server : config.servers) {
+ try {
+ std::unordered_map<string,unsigned> options;
+ options["bufferHint"] = config.bufferHint;
+ options["flushTimeout"] = config.flushTimeout;
+ options["inputQueueSize"] = config.inputQueueSize;
+ options["outputQueueSize"] = config.outputQueueSize;
+ options["queueNotifyThreshold"] = config.queueNotifyThreshold;
+ options["reopenInterval"] = config.reopenInterval;
+ auto fsl = new FrameStreamLogger(server.sin4.sin_family, server.toStringWithPort(), true, options);
+ fsl->setLogQueries(config.logQueries);
+ fsl->setLogResponses(config.logResponses);
+ result->emplace_back(fsl);
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+ }
+ catch(const PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+ }
+ }
+
+ return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+ if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ return false;
+ }
+
+ /* if the server was not running, or if it was running according to a
+ previous configuration */
+ if (!t_frameStreamServers ||
+ t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+ t_frameStreamServersGeneration = luaconfsLocal->generation;
+ }
+
+ return true;
+}
+#endif /* HAVE_FSTRM */
#endif /* HAVE_PROTOBUF */
#ifdef NOD_ENABLED
}
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
+
DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
pw.getHeader()->aa=0;
sr.setInitialRequestId(dc->d_uuid);
sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
#endif
-
+#ifdef HAVE_FSTRM
+ sr.setFrameStreamServers(t_frameStreamServers);
+#endif
sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
bool tracedQuery=false; // we could consider letting Lua know about this too
needECS = true;
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
checkProtobufExport(luaconfsLocal);
checkOutgoingProtobufExport(luaconfsLocal);
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
PacketID pident;
}
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+typedef std::unordered_map<std::string, boost::variant<bool, uint64_t, std::string, std::vector<std::pair<int,std::string> > > > frameStreamOptions_t;
+
+static void parseFrameStreamOptions(boost::optional<frameStreamOptions_t> vars, FrameStreamExportConfig& config)
+{
+ if (!vars) {
+ return;
+ }
+
+ if (vars->count("logQueries")) {
+ config.logQueries = boost::get<bool>((*vars)["logQueries"]);
+ }
+ if (vars->count("logResponses")) {
+ config.logResponses = boost::get<bool>((*vars)["logResponses"]);
+ }
+
+ if (vars->count("bufferHint")) {
+ config.bufferHint = boost::get<uint64_t>((*vars)["bufferHint"]);
+ }
+ if (vars->count("flushTimeout")) {
+ config.flushTimeout = boost::get<uint64_t>((*vars)["flushTimeout"]);
+ }
+ if (vars->count("inputQueueSize")) {
+ config.inputQueueSize = boost::get<uint64_t>((*vars)["inputQueueSize"]);
+ }
+ if (vars->count("outputQueueSize")) {
+ config.outputQueueSize = boost::get<uint64_t>((*vars)["outputQueueSize"]);
+ }
+ if (vars->count("queueNotifyThreshold")) {
+ config.queueNotifyThreshold = boost::get<uint64_t>((*vars)["queueNotifyThreshold"]);
+ }
+ if (vars->count("reopenInterval")) {
+ config.reopenInterval = boost::get<uint64_t>((*vars)["reopenInterval"]);
+ }
+}
+#endif /* HAVE_FSTRM */
+
void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& delayedThreads)
{
LuaConfigItems lci;
});
#endif
+#ifdef HAVE_FSTRM
+ Lua.writeFunction("dnstapFrameStreamServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<frameStreamOptions_t> vars) {
+ if (!lci.frameStreamExportConfig.enabled) {
+
+ lci.frameStreamExportConfig.enabled = true;
+
+ try {
+ if (servers.type() == typeid(std::string)) {
+ auto server = boost::get<const std::string>(servers);
+
+ lci.frameStreamExportConfig.servers.emplace_back(server);
+ }
+ else {
+ auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+ for (const auto& serverPair : serversMap) {
+ lci.frameStreamExportConfig.servers.emplace_back(serverPair.second);
+ }
+ }
+
+ parseFrameStreamOptions(vars, lci.frameStreamExportConfig);
+ }
+ catch(std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger: "<<e.what()<<endl;
+ }
+ catch(PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger: "<<e.reason<<endl;
+ }
+ }
+ else {
+ g_log<<Logger::Error<<"Only one dnstapFrameStreamServer() directive can be configured, we already have "<<lci.frameStreamExportConfig.servers.at(0).toString()<<endl;
+ }
+ });
+#endif /* HAVE_FSTRM */
+
try {
Lua.executeCode(ifs);
g_luaconfs.setState(lci);
bool taggedOnly{false};
};
+struct FrameStreamExportConfig
+{
+ std::vector<ComboAddress> servers;
+ bool enabled{false};
+ bool logQueries{true};
+ bool logResponses{true};
+ unsigned bufferHint{0};
+ unsigned flushTimeout{0};
+ unsigned inputQueueSize{0};
+ unsigned outputQueueSize{0};
+ unsigned queueNotifyThreshold{0};
+ unsigned reopenInterval{0};
+};
+
struct TrustAnchorFileInfo {
uint32_t interval{24};
std::string fname;
map<DNSName,std::string> negAnchors;
ProtobufExportConfig protobufExportConfig;
ProtobufExportConfig outgoingProtobufExportConfig;
+ FrameStreamExportConfig frameStreamExportConfig;
+
/* we need to increment this every time the configuration
is reloaded, so we know if we need to reload the protobuf
remote loggers */
doctrees
latex
PowerDNS-Recursor.pdf
+/*.pb.cc
+/*.pb.h
-DNODCACHEDIR=\"$(nodcachedir)\"
endif
+if FSTRM
+AM_CPPFLAGS += \
+ $(FSTRM_CFLAGS)
+endif
+
AM_LDFLAGS = \
$(PROGRAM_LDFLAGS) \
$(THREADFLAGS)
dnsmessage.pb.cc: dnsmessage.proto
$(AM_V_GEN)$(PROTOC) --cpp_out=./ $<
+if FSTRM
+dnstap.pb.cc: dnstap.proto
+ $(AM_V_GEN)$(PROTOC) -I$(srcdir) --cpp_out=./ $<
+endif
+
+
BUILT_SOURCES += dnsmessage.pb.cc
pdns_recursor_LDADD += $(PROTOBUF_LIBS)
nodist_pdns_recursor_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+nodist_testrunner_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+
+if FSTRM
+BUILT_SOURCES += dnstap.pb.cc
+pdns_recursor.$(OBJEXT): dnstap.pb.cc dnsmessage.pb.cc
+testrunner$(OBJEXT): dnstap.pb.cc dnsmessage.pb.cc
+nodist_pdns_recursor_SOURCES += dnstap.pb.cc dnstap.pb.h
+nodist_testrunner_SOURCES += dnstap.pb.cc dnstap.pb.h
+else
pdns_recursor.$(OBJEXT): dnsmessage.pb.cc
+testrunner$(OBJEXT): dnsmessage.pb.cc
+endif
-nodist_testrunner_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
testrunner_LDADD += $(PROTOBUF_LIBS)
-testrunner$(OBJEXT): dnsmessage.pb.cc
endif
endif
+if FSTRM
+pdns_recursor_SOURCES += \
+ dnstap.cc dnstap.hh fstrm_logger.cc fstrm_logger.hh dolog.hh
+
+pdns_recursor_LDADD += \
+ $(FSTRM_LIBS)
+endif
+
rec_control_SOURCES = \
arguments.cc arguments.hh \
dnsname.hh dnsname.cc \
[nodcachedir="$withval"]
)
+PDNS_CHECK_DNSTAP
+
AC_MSG_CHECKING([whether we will enable compiler security checks])
AC_ARG_ENABLE([hardening],
[AS_HELP_STRING([--disable-hardening], [disable compiler security checks @<:@default=no@:>@])],
[AC_MSG_NOTICE([nod: yes])],
[AC_MSG_NOTICE([nod: no])]
)
+AM_COND_IF([FSTRM],
+ [AC_MSG_NOTICE([dnstap: yes])],
+ [AC_MSG_NOTICE([dnstap: no])]
+)
AC_MSG_NOTICE([Context library: $pdns_context_library])
AC_MSG_NOTICE([])
--- /dev/null
+../dnstap.cc
\ No newline at end of file
--- /dev/null
+../dnstap.hh
\ No newline at end of file
--- /dev/null
+../dnstap.proto
\ No newline at end of file
The protocol buffers message types can be found in the `dnsmessage.proto <https://github.com/PowerDNS/pdns/blob/master/pdns/dnsmessage.proto>`_ file and is included here:
.. literalinclude:: ../../../dnsmessage.proto
+
+Logging in ``dnstap`` format using framestreams
+-----------------------------------------------
+Define the following function to enable logging of outgoing queries and/or responses in ``dnstap`` format.
+The recursor must have been built with configure ``--enable-dnstap`` to make this feature available.
+
+.. function:: dnstapFrameStreamServer(servers, [, options])
+
+ .. versionadded:: 4.X.0
+
+ Send dnstap formatted message to one or more framestream servers for outgoing queries and/or incoming responses.
+
+ :param servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
+ :type servers: string or list of strings
+ :param table options: A table with ``key=value`` pairs with options.
+
+ Options:
+
+ * ``logQueries=true``: bool - log oputgoing queries
+ * ``logResponses=true``: bool - log incoming responses
+
+ The follwing options apply to the settings of the framestream library. Refer to the documentation of that
+ library for the default values, exact description and allowable values for these options.
+ For all these options, absence or a zero value has the effect of using the library-provided default value.
+
+ * ``bufferHint=0``: unsigned
+ * ``flushTimeout=0``: unsigned
+ * ``inputQueueSize=0``: unsigned
+ * ``outputQueueSize=0``: unsigned
+ * ``queueNotifyThreshold=0``: unsigned
+ * ``reopenInterval=0``: unsigned
+
--- /dev/null
+../dolog.hh
\ No newline at end of file
--- /dev/null
+../fstrm_logger.cc
\ No newline at end of file
--- /dev/null
+../fstrm_logger.hh
\ No newline at end of file
--- /dev/null
+../../../m4/pdns_check_dnstap.m4
\ No newline at end of file
return false;
}
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers,const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
{
return 0;
}
ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, res, chained);
}
else {
- ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
+ ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, d_frameStreamServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
}
if(ret < 0) {
return ret; // transport error, nothing to learn here
#ifdef HAVE_PROTOBUF
#include <boost/uuid/uuid.hpp>
+#ifdef HAVE_FSTRM
+#include "fstrm_logger.hh"
+#endif /* HAVE_FSTRM */
#endif
class RecursorLua4;
}
#endif
+#ifdef HAVE_FSTRM
+ void setFrameStreamServers(std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& servers)
+ {
+ d_frameStreamServers = servers;
+ }
+#endif /* HAVE_FSTRM */
+
void setAsyncCallback(asyncresolve_t func)
{
d_asyncResolve = func;
shared_ptr<RecursorLua4> d_pdl;
boost::optional<Netmask> d_outgoingECSNetwork;
std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> d_outgoingProtobufServers{nullptr};
+ std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> d_frameStreamServers{nullptr};
#ifdef HAVE_PROTOBUF
boost::optional<const boost::uuids::uuid&> d_initialRequestId;
#endif
"PKCS#11 " <<
#endif
#ifdef HAVE_PROTOBUF
- "protobuf " <<
+"protobuf " <<
+#endif
+#ifdef HAVE_FSTRM
+"dnstap-framestream " <<
#endif
#ifdef REMOTEBACKEND_ZEROMQ
"remotebackend-zeromq " <<
pip install -r requirements.txt
protoc -I=../pdns/ --python_out=. ../pdns/dnsmessage.proto
+protoc -I=../pdns/ --python_out=. ../pdns/dnstap.proto
+
mkdir -p configs
LIBFAKETIME_DEFAULT=/usr/local/lib/faketime/libfaketime.1.dylib
LIBAUTHBIND_DEFAULT=""
fi
+if [ $(uname -s) = "OpenBSD" ]; then
+ # OpenBSD is not /really/ supported here; it works for some tests, and then you might need sudo.
+ LIBFAKETIME_DEFAULT=""
+ LIBAUTHBIND_DEFAULT=""
+fi
export LIBFAKETIME=${LIBFAKETIME:-$LIBFAKETIME_DEFAULT}
export LIBAUTHBIND=${LIBAUTHBIND:-$LIBAUTHBIND_DEFAULT}
if [ "${PDNS_DEBUG}" = "YES" ]; then
set -x
fi
+
+if ! "$PDNSRECURSOR" --version 2>&1 | grep Features | grep -q dnstap-framestream; then
+ export NODNSTAPTESTS=1
+fi
+
+if [ "${LIBAUTHBIND}" != "" -o "${LIBFAKETIME}" != "" ]; then
LD_PRELOAD="${LIBASAN} ${LIBAUTHBIND} ${LIBFAKETIME}" nosetests -I test_WellKnown.py --with-xunit $@
+else
+nosetests -I test_WellKnown.py --with-xunit $@
+fi
--- /dev/null
+import dns
+import dnsmessage_pb2
+import os
+import socket
+import struct
+import sys
+import threading
+import time
+
+import dns
+import dnstap_pb2
+
+FSTRM_CONTROL_ACCEPT = 0x01
+FSTRM_CONTROL_START = 0x02
+FSTRM_CONTROL_STOP = 0x03
+FSTRM_CONTROL_READY = 0x04
+FSTRM_CONTROL_FINISH = 0x05
+
+# Python2/3 compatibility hacks
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+try:
+ range = xrange
+except NameError:
+ pass
+
+from nose import SkipTest
+from recursortests import RecursorTest
+
+def checkDnstapBase(testinstance, dnstap, protocol, initiator):
+ testinstance.assertTrue(dnstap)
+ testinstance.assertTrue(dnstap.HasField('identity'))
+ #testinstance.assertEqual(dnstap.identity, b'a.server')
+ testinstance.assertTrue(dnstap.HasField('version'))
+ #testinstance.assertIn(b'dnsdist ', dnstap.version)
+ testinstance.assertTrue(dnstap.HasField('type'))
+ testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
+ testinstance.assertTrue(dnstap.HasField('message'))
+ testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
+ testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
+ testinstance.assertTrue(dnstap.message.HasField('socket_family'))
+ testinstance.assertEquals(dnstap.message.socket_family, dnstap_pb2.INET)
+ #testinstance.assertTrue(dnstap.message.HasField('query_address'))
+ #testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
+ testinstance.assertTrue(dnstap.message.HasField('response_address'))
+ testinstance.assertEquals(socket.inet_ntop(socket.AF_INET, dnstap.message.response_address), initiator)
+ testinstance.assertTrue(dnstap.message.HasField('response_port'))
+ testinstance.assertEquals(dnstap.message.response_port, 53)
+
+
+def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
+ testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.RESOLVER_QUERY)
+ checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+ testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+ testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+ testinstance.assertTrue(dnstap.message.HasField('query_message'))
+ wire_message = dns.message.from_wire(dnstap.message.query_message)
+ #testinstance.assertEqual(wire_message, query)
+
+
+def checkDnstapExtra(testinstance, dnstap, expected):
+ testinstance.assertTrue(dnstap.HasField('extra'))
+ testinstance.assertEqual(dnstap.extra, expected)
+
+
+def checkDnstapNoExtra(testinstance, dnstap):
+ testinstance.assertFalse(dnstap.HasField('extra'))
+
+
+def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
+ testinstance.assertEquals(dnstap.message.type, dnstap_pb2.Message.RESOLVER_RESPONSE)
+ checkDnstapBase(testinstance, dnstap, protocol, initiator)
+
+ testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
+ testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))
+
+ testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
+ testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))
+
+ testinstance.assertTrue(dnstap.message.response_time_sec > dnstap.message.query_time_sec or \
+ dnstap.message.response_time_nsec > dnstap.message.query_time_nsec)
+
+ testinstance.assertTrue(dnstap.message.HasField('response_message'))
+ wire_message = dns.message.from_wire(dnstap.message.response_message)
+ testinstance.assertEqual(wire_message, response)
+
+def fstrm_get_control_frame_type(data):
+ (t,) = struct.unpack("!L", data[0:4])
+ return t
+
+
+def fstrm_make_control_frame_reply(cft, data):
+ if cft == FSTRM_CONTROL_READY:
+ # Reply with ACCEPT frame and content-type
+ contenttype = b'protobuf:dnstap.Dnstap'
+ frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
+ len(contenttype)) + contenttype
+ buf = struct.pack("!LL", 0, len(frame)) + frame
+ return buf
+ elif cft == FSTRM_CONTROL_START:
+ return None
+ else:
+ raise Exception('unhandled control frame ' + cft)
+
+
+def fstrm_read_and_dispatch_control_frame(conn):
+ data = conn.recv(4)
+ if not data:
+ raise Exception('length of control frame payload could not be read')
+ (datalen,) = struct.unpack("!L", data)
+ data = conn.recv(datalen)
+ cft = fstrm_get_control_frame_type(data)
+ reply = fstrm_make_control_frame_reply(cft, data)
+ if reply:
+ conn.send(reply)
+ return cft
+
+
+def fstrm_handle_bidir_connection(conn, on_data):
+ data = None
+ while True:
+ data = conn.recv(4)
+ if not data:
+ break
+ (datalen,) = struct.unpack("!L", data)
+ if datalen == 0:
+ # control frame length follows
+ cft = fstrm_read_and_dispatch_control_frame(conn)
+ if cft == FSTRM_CONTROL_STOP:
+ break
+ else:
+ # data frame
+ data = conn.recv(datalen)
+ if not data:
+ break
+
+ on_data(data)
+
+
+
+class DNSTapServerParams:
+ def __init__(self, port):
+ self.queue = Queue()
+ self.port = port
+
+
+DNSTapServerParameters = DNSTapServerParams(4243)
+DNSTapListeners = []
+
+class TestRecursorDNSTap(RecursorTest):
+ @classmethod
+ def FrameStreamUnixListener(cls, conn, param):
+ while True:
+ try:
+ fstrm_handle_bidir_connection(conn, lambda data: \
+ param.queue.put(data, True, timeout=2.0))
+ except socket.error as e:
+ if e.errno == 9:
+ break
+ printf("Unexpected socket error %d", e)
+ sys.exit(1)
+ conn.close()
+
+ @classmethod
+ def FrameStreamUnixListenerMain(cls, param):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ try:
+ sock.bind(("127.0.0.1", param.port))
+ except socket.error as e:
+ print("Error binding in the framestream listener: %s" % str(e))
+ sys.exit(1)
+ DNSTapListeners.append(sock)
+ sock.listen(100)
+ while True:
+ (conn, _) = sock.accept()
+ print("Accepting connection")
+ listener = threading.Thread(name='DNSTap Worker', target=cls.FrameStreamUnixListener, args=[conn, param])
+ listener.setDaemon(True)
+ listener.start()
+
+ sock.close()
+
+ @classmethod
+ def setUpClass(cls):
+
+ if os.environ.get("NODNSTAPTESTS") == "1":
+ raise SkipTest("Not Yet Supported")
+
+ cls.setUpSockets()
+
+ cls.startResponders()
+
+ listener = threading.Thread(name='DNSTap Listener', target=cls.FrameStreamUnixListenerMain, args=[DNSTapServerParameters])
+ listener.setDaemon(True)
+ listener.start()
+
+
+ confdir = os.path.join('configs', cls._confdir)
+ cls.createConfigDir(confdir)
+
+ cls.generateRecursorConfig(confdir)
+ cls.startRecursor(confdir, cls._recursorPort)
+
+ def setUp(self):
+ # Make sure the queue is empty, in case
+ # a previous test failed
+ while not DNSTapServerParameters.queue.empty():
+ DNSTapServerParameters.queue.get(False)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ authzonepath = os.path.join(confdir, 'example.zone')
+ with open(authzonepath, 'w') as authzone:
+ authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+a 3600 IN A 192.0.2.42
+tagged 3600 IN A 192.0.2.84
+query-selected 3600 IN A 192.0.2.84
+answer-selected 3600 IN A 192.0.2.84
+types 3600 IN A 192.0.2.84
+types 3600 IN AAAA 2001:DB8::1
+types 3600 IN TXT "Lorem ipsum dolor sit amet"
+types 3600 IN MX 10 a.example.
+types 3600 IN SPF "v=spf1 -all"
+types 3600 IN SRV 10 20 443 a.example.
+cname 3600 IN CNAME a.example.
+
+""".format(soa=cls._SOA))
+ super(TestRecursorDNSTap, cls).generateRecursorConfig(confdir)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tearDownRecursor()
+ for listerner in DNSTapListeners:
+ listerner.close()
+
+class DNSTapDefaultTest(TestRecursorDNSTap):
+ """
+ This test makes sure that we correctly export outgoing queries over DNSTap.
+ It must be improved and setup env so we can check for incoming responses, but makes sure for now
+ that the recursor at least connects to the DNSTap server.
+ """
+
+ _confdir = 'DNSTapDefault'
+ _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+ _lua_config_file = """
+ dnstapFrameStreamServer({"127.0.0.1:%d"})
+ """ % (DNSTapServerParameters.port)
+
+ def getFirstDnstap(self):
+ data = DNSTapServerParameters.queue.get(True, timeout=2.0)
+ self.assertTrue(data)
+ dnstap = dnstap_pb2.Dnstap()
+ dnstap.ParseFromString(data)
+ return dnstap
+
+ def testA(self):
+
+ name = 'www.example.org.'
+ query = dns.message.make_query(name, 'A', want_dnssec=True)
+ query.flags |= dns.flags.RD
+ res = self.sendUDPQuery(query)
+
+ # check the DNSTap messages corresponding to the UDP query and answer
+ # check the dnstap message corresponding to the UDP query
+ dnstap = self.getFirstDnstap()
+
+ checkDnstapQuery(self, dnstap, dnstap_pb2.UDP, query, '127.0.0.8')
+ # We don't expect a response
+ checkDnstapNoExtra(self, dnstap)
+
+class DNSTapLogNoQueriesTest(TestRecursorDNSTap):
+ """
+ This test makes sure that we correctly export outgoing queries over DNSTap.
+ It must be improved and setup env so we can check for incoming responses, but makes sure for now
+ that the recursor at least connects to the DNSTap server.
+ """
+
+ _confdir = 'DNSTapLogNoQueries'
+ _config_template = """
+auth-zones=example=configs/%s/example.zone""" % _confdir
+ _lua_config_file = """
+ dnstapFrameStreamServer({"127.0.0.1:%d"}, {logQueries=false})
+ """ % (DNSTapServerParameters.port)
+
+ def testA(self):
+ name = 'www.example.org.'
+ query = dns.message.make_query(name, 'A', want_dnssec=True)
+ query.flags |= dns.flags.RD
+ res = self.sendUDPQuery(query)
+
+ # We don't expect anything
+ self.assertTrue(DNSTapServerParameters.queue.empty())
+