#include "fstrm_logger.hh"
bool g_syslog;
-static FrameStreamLogger *mylogger = new FrameStreamLogger(AF_INET, "127.0.0.1:9999", true);
-#endif // HAVE_FSTRM
-
-static void logOutgoingQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const struct timeval &queryTime, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, const vector<uint8_t>& packet, boost::optional<Netmask>& srcmask)
+static void logFstreamQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const struct timeval &queryTime, const ComboAddress& ip, bool doTCP, const vector<uint8_t>& packet)
{
- if(!outgoingLoggers)
+ if (fstreamLoggers == nullptr)
return;
- std::string str;
- if (0) {
- RecProtoBufMessage message(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid,
- doTCP, packet.size());
- message.setServerIdentity(SyncRes::s_serverID);
+ struct timespec ts;
+ TIMEVAL_TO_TIMESPEC(&queryTime, &ts);
+ DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, true);
+ std::string str;
+ message.serialize(str);
- if (initialRequestId) {
- message.setInitialRequestID(*initialRequestId);
- }
+ for (auto& logger : *fstreamLoggers) {
+ logger->queueData(str);
+ }
+}
- if (srcmask) {
- message.setEDNSSubnet(*srcmask);
- }
+static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstreamLoggers, const ComboAddress& ip, bool doTCP, const std::string& packet, const struct timeval& queryTime, const struct timeval& replyTime)
+{
+ if (fstreamLoggers == nullptr)
+ return;
-// cerr <<message.toDebugString()<<endl;
- message.serialize(str);
+ struct timespec ts1, ts2;
+ TIMEVAL_TO_TIMESPEC(&queryTime, &ts1);
+ TIMEVAL_TO_TIMESPEC(&replyTime, &ts2);
+ DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, static_cast<const char*>(&*packet.begin()), packet.size(), &ts1, &ts2, true);
+ std::string str;
+ message.serialize(str);
- for (auto& logger : *outgoingLoggers) {
- logger->queueData(str);
- }
- }
-#ifdef HAVE_FSTRM
- else {
- struct timespec ts;
- TIMEVAL_TO_TIMESPEC(&queryTime, &ts);
- DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, (const char*)&*packet.begin(), packet.size(), &ts, nullptr, true);
- message.serialize(str);
- mylogger->queueData(str);
+ for (auto& logger : *fstreamLoggers) {
+ logger->queueData(str);
}
-#endif // HAVE_FSTRM
}
+#endif // HAVE_FSTRM
-static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, const std::string& packet, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const struct timeval& replyTime, const std::set<uint16_t>& exportTypes)
+static void logOutgoingQuery(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
{
if(!outgoingLoggers)
return;
- std::string str;
+ RecProtoBufMessage message(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
+ message.setServerIdentity(SyncRes::s_serverID);
- if (0) {
- RecProtoBufMessage message(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid,
- doTCP, packet.size());
- message.setServerIdentity(SyncRes::s_serverID);
- if (initialRequestId) {
- message.setInitialRequestID(*initialRequestId);
- }
- message.setQueryTime(queryTime.tv_sec, queryTime.tv_usec);
- message.setResponseCode(rcode);
- message.addRRs(records, exportTypes);
+ if (initialRequestId) {
+ message.setInitialRequestID(*initialRequestId);
+ }
+
+ if (srcmask) {
+ message.setEDNSSubnet(*srcmask);
+ }
// cerr <<message.toDebugString()<<endl;
- message.serialize(str);
- for (auto& logger : *outgoingLoggers) {
- logger->queueData(str);
- }
+ std::string str;
+ message.serialize(str);
+
+ for (auto& logger : *outgoingLoggers) {
+ logger->queueData(str);
}
-#ifdef HAVE_FSTRM
- else {
- struct timespec ts1, ts2;
- TIMEVAL_TO_TIMESPEC(&queryTime, &ts1);
- TIMEVAL_TO_TIMESPEC(&replyTime, &ts2);
- DnstapMessage message(SyncRes::s_serverID, nullptr, &ip, doTCP, (const char*)&*packet.begin(), packet.size(), &ts1, &ts2, true);
- message.serialize(str);
- mylogger->queueData(str);
+}
+
+static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const std::set<uint16_t>& exportTypes)
+{
+ if(!outgoingLoggers)
+ return;
+
+ RecProtoBufMessage message(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
+ message.setServerIdentity(SyncRes::s_serverID);
+ if (initialRequestId) {
+ message.setInitialRequestID(*initialRequestId);
}
-#endif // HAVE_FSTRM
+ message.setQueryTime(queryTime.tv_sec, queryTime.tv_usec);
+ message.setResponseCode(rcode);
+ message.addRRs(records, exportTypes);
+// cerr <<message.toDebugString()<<endl;
+ std::string str;
+ message.serialize(str);
+
+ for (auto& logger : *outgoingLoggers) {
+ logger->queueData(str);
+ }
}
#endif /* HAVE_PROTOBUF */
/** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
Never throws!
*/
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
{
size_t len;
size_t bufsize=g_outgoingEDNSBufsize;
if (outgoingLoggers) {
uuid = getUniqueID();
- logOutgoingQuery(outgoingLoggers, queryTime, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket, srcmask);
+ logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
}
-#endif
+#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ if (fstrmLoggers) {
+ logFstreamQuery(fstrmLoggers, queryTime, ip, doTCP, vpacket);
+ }
+#endif /* HAVE_FSTRM */
srcmask = boost::none; // this is also our return value, even if EDNS0Level == 0
return ret;
buf.resize(len);
+
+#ifdef HAVE_FSTRM
+ if (fstrmLoggers) {
+ logFstreamResponse(fstrmLoggers, ip, doTCP, buf, queryTime, *now);
+ }
+#endif /* HAVE_FSTRM */
+
lwr->d_records.clear();
try {
lwr->d_tcbit=0;
if(mdp.d_header.rcode == RCode::FormErr && mdp.d_qname.empty() && mdp.d_qtype == 0 && mdp.d_qclass == 0) {
#ifdef HAVE_PROTOBUF
if(outgoingLoggers) {
- logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, buf, lwr->d_rcode, lwr->d_records, queryTime, *now, exportTypes);
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=true;
#ifdef HAVE_PROTOBUF
if(outgoingLoggers) {
- logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, buf, lwr->d_rcode, lwr->d_records, queryTime, *now, exportTypes);
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=true;
g_stats.serverParseError++;
#ifdef HAVE_PROTOBUF
if(outgoingLoggers) {
- logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, buf, lwr->d_rcode, lwr->d_records, queryTime, *now, exportTypes);
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=false;
#ifdef HAVE_PROTOBUF
#include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
#include "xpf.hh"
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
thread_local std::unique_ptr<MT_t> MT; // the big MTasker
thread_local std::unique_ptr<MemRecursorCache> t_RC;
thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
return true;
}
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<RemoteLoggerInterface>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+ auto result = std::make_shared<std::vector<std::unique_ptr<RemoteLoggerInterface>>>();
+
+ for (const auto& server : config.servers) {
+ try {
+ result->emplace_back(new FrameStreamLogger(server.sin4.sin_family, server.toStringWithPort(), true));
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+ }
+ catch(const PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+ }
+ }
+
+ return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+ if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ return false;
+ }
+
+ /* if the server was not running, or if it was running according to a
+ previous configuration */
+ if (!t_frameStreamServers ||
+ t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+ t_frameStreamServersGeneration = luaconfsLocal->generation;
+ }
+
+ return true;
+}
+#endif /* HAVE_FSTRM */
#endif /* HAVE_PROTOBUF */
#ifdef NOD_ENABLED
}
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
+
DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
pw.getHeader()->aa=0;
sr.setInitialRequestId(dc->d_uuid);
sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
#endif
-
+#ifdef HAVE_FSTRM
+ sr.setFrameStreamServers(t_frameStreamServers);
+#endif
sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
bool tracedQuery=false; // we could consider letting Lua know about this too
needECS = true;
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
checkProtobufExport(luaconfsLocal);
checkOutgoingProtobufExport(luaconfsLocal);
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
PacketID pident;