#ifdef HAVE_PROTOBUF
-static void logOutgoingQuery(std::shared_ptr<RemoteLogger> outgoingLogger, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
+static void logOutgoingQuery(const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
{
- if(!outgoingLogger)
+ if(!outgoingLoggers)
return;
RecProtoBufMessage message(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
// cerr <<message.toDebugString()<<endl;
std::string str;
message.serialize(str);
- outgoingLogger->queueData(str);
+
+ for (auto& logger : *outgoingLoggers) {
+ logger->queueData(str);
+ }
}
-static void logIncomingResponse(std::shared_ptr<RemoteLogger> outgoingLogger, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const std::set<uint16_t>& exportTypes)
+static void logIncomingResponse(const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const std::set<uint16_t>& exportTypes)
{
- if(!outgoingLogger)
+ if(!outgoingLoggers)
return;
RecProtoBufMessage message(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
// cerr <<message.toDebugString()<<endl;
std::string str;
message.serialize(str);
- outgoingLogger->queueData(str);
+
+ for (auto& logger : *outgoingLoggers) {
+ logger->queueData(str);
+ }
}
#endif /* HAVE_PROTOBUF */
/** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
Never throws!
*/
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
{
size_t len;
size_t bufsize=g_outgoingEDNSBufsize;
boost::uuids::uuid uuid;
const struct timeval queryTime = *now;
- if (outgoingLogger) {
+ if (outgoingLoggers) {
uuid = (*t_uuidGenerator)();
- logOutgoingQuery(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
+ logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
}
#endif
if(mdp.d_header.rcode == RCode::FormErr && mdp.d_qname.empty() && mdp.d_qtype == 0 && mdp.d_qclass == 0) {
#ifdef HAVE_PROTOBUF
- if(outgoingLogger) {
- logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+ if(outgoingLoggers) {
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=true;
}
#ifdef HAVE_PROTOBUF
- if(outgoingLogger) {
- logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+ if(outgoingLoggers) {
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=true;
lwr->d_rcode = RCode::FormErr;
g_stats.serverParseError++;
#ifdef HAVE_PROTOBUF
- if(outgoingLogger) {
- logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+ if(outgoingLoggers) {
+ logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
}
#endif
lwr->d_validpacket=false;
bool d_haveEDNS{false};
};
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
#endif // PDNS_LWRES_HH
static thread_local std::shared_ptr<Regex> t_traceRegex;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
#ifdef HAVE_PROTOBUF
-static thread_local std::shared_ptr<RemoteLogger> t_protobufServer{nullptr};
-static thread_local std::shared_ptr<RemoteLogger> t_outgoingProtobufServer{nullptr};
+static thread_local std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> t_protobufServers{nullptr};
+static thread_local uint64_t t_protobufServersGeneration;
+static thread_local std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
+static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */
thread_local std::unique_ptr<MT_t> MT; // the big MTasker
}
#ifdef HAVE_PROTOBUF
-static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
+static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
{
+ if (!t_protobufServers) {
+ return;
+ }
+
Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
// cerr <<message.toDebugString()<<endl;
std::string str;
message.serialize(str);
- logger->queueData(str);
+
+ for (auto& server : *t_protobufServers) {
+ server->queueData(str);
+ }
}
-static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const RecProtoBufMessage& message)
+static void protobufLogResponse(const RecProtoBufMessage& message)
{
+ if (!t_protobufServers) {
+ return;
+ }
+
// cerr <<message.toDebugString()<<endl;
std::string str;
message.serialize(str);
- logger->queueData(str);
+
+ for (auto& server : *t_protobufServers) {
+ server->queueData(str);
+ }
}
#endif
}
#ifdef HAVE_PROTOBUF
-static std::shared_ptr<RemoteLogger> startProtobufServer(const ProtobufExportConfig& config, uint64_t generation)
+static std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> startProtobufServers(const ProtobufExportConfig& config)
{
- std::shared_ptr<RemoteLogger> result = nullptr;
- try {
- result = std::make_shared<RemoteLogger>(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
- result->setGeneration(generation);
- }
- catch(const std::exception& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.what()<<endl;
- }
- catch(const PDNSException& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.reason<<endl;
+ auto result = std::make_shared<std::vector<std::shared_ptr<RemoteLogger>>>();
+
+ for (const auto& server : config.servers) {
+ try {
+ result->push_back(std::make_shared<RemoteLogger>(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
+ }
+ catch(const PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.reason<<endl;
+ }
}
return result;
static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
{
if (!luaconfsLocal->protobufExportConfig.enabled) {
- if (t_protobufServer != nullptr) {
- t_protobufServer->stop();
- t_protobufServer = nullptr;
+ if (t_protobufServers) {
+ for (auto& server : *t_protobufServers) {
+ server->stop();
+ }
+ t_protobufServers.reset();
}
return false;
/* if the server was not running, or if it was running according to a
previous configuration */
- if (t_protobufServer == nullptr ||
- t_protobufServer->getGeneration() < luaconfsLocal->generation) {
+ if (!t_protobufServers ||
+ t_protobufServersGeneration < luaconfsLocal->generation) {
- if (t_protobufServer) {
- t_protobufServer->stop();
+ if (t_protobufServers) {
+ for (auto& server : *t_protobufServers) {
+ server->stop();
+ }
}
+ t_protobufServers.reset();
- t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation);
+ t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig);
+ t_protobufServersGeneration = luaconfsLocal->generation;
}
return true;
static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
{
if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
- if (t_outgoingProtobufServer != nullptr) {
- t_outgoingProtobufServer->stop();
- t_outgoingProtobufServer = nullptr;
+ if (t_outgoingProtobufServers) {
+ for (auto& server : *t_outgoingProtobufServers) {
+ server->stop();
+ }
}
+ t_outgoingProtobufServers.reset();
return false;
}
/* if the server was not running, or if it was running according to a
previous configuration */
- if (t_outgoingProtobufServer == nullptr ||
- t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) {
+ if (!t_outgoingProtobufServers ||
+ t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
- if (t_outgoingProtobufServer) {
- t_outgoingProtobufServer->stop();
+ if (t_outgoingProtobufServers) {
+ for (auto& server : *t_outgoingProtobufServers) {
+ server->stop();
+ }
}
+ t_outgoingProtobufServers.reset();
- t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation);
+ t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig);
+ t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
}
return true;
bool logResponse = false;
#ifdef HAVE_PROTOBUF
if (checkProtobufExport(luaconfsLocal)) {
- logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
+ logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0);
#ifdef HAVE_PROTOBUF
sr.setInitialRequestId(dc->d_uuid);
- sr.setOutgoingProtobufServer(t_outgoingProtobufServer);
+ sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
#endif
sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
#endif /* NOD ENABLED */
#ifdef HAVE_PROTOBUF
- if(t_protobufServer) {
+ if (t_protobufServers) {
#ifdef NOD_ENABLED
pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr);
#else
}
#endif /* NOD_ENABLED */
#ifdef HAVE_PROTOBUF
- if (t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
+ if (t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
pbMessage->setBytes(packet.size());
pbMessage->setResponseCode(pw.getHeader()->rcode);
if (appliedPolicy.d_name) {
}
}
#endif /* NOD_ENABLED */
- protobufLogResponse(t_protobufServer, *pbMessage);
+ protobufLogResponse(*pbMessage);
#ifdef NOD_ENABLED
if (g_nodEnabled) {
pbMessage->setNOD(false);
if (checkProtobufExport(luaconfsLocal)) {
needECS = true;
}
- logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
+ logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
#endif
if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
#ifdef HAVE_PROTOBUF
- if(t_protobufServer || t_outgoingProtobufServer) {
+ if(t_protobufServers || t_outgoingProtobufServers) {
dc->d_requestorId = requestorId;
dc->d_deviceId = deviceId;
dc->d_uuid = (*t_uuidGenerator)();
}
- if(t_protobufServer) {
+ if(t_protobufServers) {
try {
if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
- protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
+ protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
}
}
catch(std::exception& e) {
} else if (checkOutgoingProtobufExport(luaconfsLocal)) {
uniqueId = (*t_uuidGenerator)();
}
- logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
- bool logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
+ logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+ bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
#endif
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
bool cacheHit = false;
boost::optional<RecProtoBufMessage> pbMessage(boost::none);
#ifdef HAVE_PROTOBUF
- if(t_protobufServer) {
+ if (t_protobufServers) {
pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
pbMessage->setServerIdentity(SyncRes::s_serverID);
if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
- protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
+ protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
}
}
#endif /* HAVE_PROTOBUF */
}
#ifdef HAVE_PROTOBUF
- if(t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
+ if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
pbMessage->setRequestorId(requestorId);
pbMessage->setDeviceId(deviceId);
- protobufLogResponse(t_protobufServer, *pbMessage);
+ protobufLogResponse(*pbMessage);
}
#endif /* HAVE_PROTOBUF */
if(!g_quiet)
dc->d_ttlCap = ttlCap;
dc->d_variable = variable;
#ifdef HAVE_PROTOBUF
- if (t_protobufServer || t_outgoingProtobufServer) {
+ if (t_protobufServers || t_outgoingProtobufServers) {
dc->d_uuid = std::move(uniqueId);
}
dc->d_requestorId = requestorId;
lci.protobufMaskV6 = maskV6;
});
- Lua.writeFunction("protobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
- try {
+ Lua.writeFunction("protobufServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<protobufOptions_t> vars) {
if (!lci.protobufExportConfig.enabled) {
+
lci.protobufExportConfig.enabled = true;
- lci.protobufExportConfig.server = ComboAddress(server_);
- parseProtobufOptions(vars, lci.protobufExportConfig);
+
+ try {
+ if (servers.type() == typeid(std::string)) {
+ auto server = boost::get<const std::string>(servers);
+
+ lci.protobufExportConfig.servers.emplace_back(server);
+ }
+ else {
+ auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+ for (const auto& serverPair : serversMap) {
+ lci.protobufExportConfig.servers.emplace_back(serverPair.second);
+ }
+ }
+
+ parseProtobufOptions(vars, lci.protobufExportConfig);
+ }
+ catch(std::exception& e) {
+ g_log<<Logger::Error<<"Error while adding protobuf logger: "<<e.what()<<endl;
+ }
+ catch(PDNSException& e) {
+ g_log<<Logger::Error<<"Error while adding protobuf logger: "<<e.reason<<endl;
+ }
}
else {
- g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufExportConfig.server.toString()<<endl;
+ g_log<<Logger::Error<<"Only one protobufServer() directive can be configured, we already have "<<lci.protobufExportConfig.servers.at(0).toString()<<endl;
}
- }
- catch(std::exception& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.what()<<endl;
- }
- catch(PDNSException& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.reason<<endl;
- }
});
- Lua.writeFunction("outgoingProtobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
- try {
- if (!lci.outgoingProtobufExportConfig.enabled) {
- lci.outgoingProtobufExportConfig.enabled = true;
- lci.outgoingProtobufExportConfig.server = ComboAddress(server_);
- parseProtobufOptions(vars, lci.outgoingProtobufExportConfig);
- }
- else {
- g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.outgoingProtobufExportConfig.server.toString()<<endl;
- }
- }
- catch(std::exception& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.what()<<endl;
+ Lua.writeFunction("outgoingProtobufServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<protobufOptions_t> vars) {
+ if (!lci.outgoingProtobufExportConfig.enabled) {
+
+ lci.outgoingProtobufExportConfig.enabled = true;
+
+ try {
+ if (servers.type() == typeid(std::string)) {
+ auto server = boost::get<const std::string>(servers);
+
+ lci.outgoingProtobufExportConfig.servers.emplace_back(server);
+ }
+ else {
+ auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+ for (const auto& serverPair : serversMap) {
+ lci.outgoingProtobufExportConfig.servers.emplace_back(serverPair.second);
+ }
+ }
+
+ parseProtobufOptions(vars, lci.outgoingProtobufExportConfig);
+ }
+ catch(std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting outgoing protobuf logger: "<<e.what()<<endl;
+ }
+ catch(PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting outgoing protobuf logger: "<<e.reason<<endl;
+ }
}
- catch(PDNSException& e) {
- g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.reason<<endl;
+ else {
+ g_log<<Logger::Error<<"Only one outgoingProtobufServer() directive can be configured, we already have "<<lci.outgoingProtobufExportConfig.servers.at(0).toString()<<endl;
}
});
#endif
struct ProtobufExportConfig
{
std::set<uint16_t> exportTypes = { QType::A, QType::AAAA, QType::CNAME };
- ComboAddress server;
+ std::vector<ComboAddress> servers;
uint64_t maxQueuedEntries{100};
uint16_t timeout{2};
uint16_t reconnectWaitTime{1};
--------------------------------
Protobuf export to a server is enabled using the ``protobufServer()`` directive:
-.. function:: protobufServer(server [, options]))
+.. function:: protobufServer(servers [, options]))
.. versionadded:: 4.2.0
- Send protocol buffer messages to a server for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes.
+ Send protocol buffer messages to one or more servers for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes.
- :param string server: The IP and port to connect to
+ :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
:param table options: A table with key: value pairs with options.
Options:
While :func:`protobufServer` only exports the queries sent to the recursor from clients, with the corresponding responses, ``outgoingProtobufServer()`` can be used to export outgoing queries sent by the recursor to authoritative servers, along with the corresponding responses.
-.. function:: outgoingProtobufServer(server [, options])
+.. function:: outgoingProtobufServer(servers [, options])
.. versionadded:: 4.2.0
- Send protocol buffer messages to a server for outgoing queries and/or incoming responses.
+ Send protocol buffer messages to one or more servers for outgoing queries and/or incoming responses.
- :param string server: The IP and port to connect to
+ :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
:param table options: A table with key: value pairs with options.
Options:
return false;
}
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
{
return 0;
}
{
d_exiting = true;
}
- uint64_t getGeneration() const
- {
- return d_generation;
- }
- void setGeneration(uint64_t newGeneration)
- {
- d_generation = newGeneration;
- }
private:
void busyReconnectLoop();
bool reconnect();
std::condition_variable d_queueCond;
ComboAddress d_remote;
uint64_t d_maxQueuedEntries;
- uint64_t d_generation{0};
int d_socket{-1};
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, res, chained);
}
else {
- ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
+ ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
}
if(ret < 0) {
return ret; // transport error, nothing to learn here
d_initialRequestId = initialRequestId;
}
- void setOutgoingProtobufServer(std::shared_ptr<RemoteLogger>& server)
+ void setOutgoingProtobufServers(std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& servers)
{
- d_outgoingProtobufServer = server;
+ d_outgoingProtobufServers = servers;
}
#endif
ostringstream d_trace;
shared_ptr<RecursorLua4> d_pdl;
boost::optional<Netmask> d_outgoingECSNetwork;
- std::shared_ptr<RemoteLogger> d_outgoingProtobufServer{nullptr};
+ std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> d_outgoingProtobufServers{nullptr};
#ifdef HAVE_PROTOBUF
boost::optional<const boost::uuids::uuid&> d_initialRequestId;
#endif
from recursortests import RecursorTest
-protobufQueue = Queue()
-protobufServerPort = 4243
-
def ProtobufConnectionHandler(queue, conn):
data = None
while True:
conn.close()
-def ProtobufListener(port):
- global protobufQueue
+def ProtobufListener(queue, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
(conn, _) = sock.accept()
thread = threading.Thread(name='Connection Handler',
target=ProtobufConnectionHandler,
- args=[protobufQueue, conn])
+ args=[queue, conn])
thread.setDaemon(True)
thread.start()
sock.close()
-protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
-protobufListener.setDaemon(True)
-protobufListener.start()
+class ProtobufServerParams:
+ def __init__(self, port):
+ self.queue = Queue()
+ self.port = port
+
+protobufServersParameters = [ProtobufServerParams(4243), ProtobufServerParams(4244)]
+protobufListeners = []
+for param in protobufServersParameters:
+ listener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[param.queue, param.port])
+ listener.setDaemon(True)
+ listener.start()
+ protobufListeners.append(listener)
class TestRecursorProtobuf(RecursorTest):
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d")
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
def getFirstProtobufMessage(self, retries=1, waitTime=1):
- global protobufQueue
- failed = 0
-
- while protobufQueue.empty:
- if failed >= retries:
- break
-
- failed = failed + 1
- time.sleep(waitTime)
+ msg = None
+
+ print("in getFirstProtobufMessage")
+ for param in protobufServersParameters:
+ print(param.port)
+ failed = 0
+
+ while param.queue.empty:
+ print(failed)
+ print(retries)
+ if failed >= retries:
+ break
+
+ failed = failed + 1
+ print("waiting")
+ time.sleep(waitTime)
+
+ self.assertFalse(param.queue.empty())
+ data = param.queue.get(False)
+ self.assertTrue(data)
+ oldmsg = msg
+ msg = dnsmessage_pb2.PBDNSMessage()
+ msg.ParseFromString(data)
+ if oldmsg is not None:
+ self.assertEquals(msg, oldmsg)
- self.assertFalse(protobufQueue.empty())
- data = protobufQueue.get(False)
- self.assertTrue(data)
- msg = dnsmessage_pb2.PBDNSMessage()
- msg.ParseFromString(data)
return msg
def checkNoRemainingMessage(self):
- global protobufQueue
- self.assertTrue(protobufQueue.empty())
+ for param in protobufServersParameters:
+ self.assertTrue(param.queue.empty())
def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, expectedECS=None, receivedSize=None):
self.assertTrue(msg)
@classmethod
def setUpClass(cls):
- global protobufListener
- global protobufServerPort
- global ProtobufListener
- if protobufListener is None or not protobufListener.isAlive():
- protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
- protobufListener.setDaemon(True)
- protobufListener.start()
-
cls.setUpSockets()
cls.startResponders()
def setUp(self):
# Make sure the queue is empty, in case
# a previous test failed
- global protobufQueue
- while not protobufQueue.empty():
- protobufQueue.get(False)
+ for param in protobufServersParameters:
+ while not param.queue.empty():
+ param.queue.get(False)
@classmethod
def generateRecursorConfig(cls, confdir):
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
_lua_config_file = """
- outgoingProtobufServer("127.0.0.1:%d")
- """ % (protobufServerPort)
+ outgoingProtobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
def testA(self):
name = 'www.example.org.'
_confdir = 'ProtobufMasks'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_protobufMaskV4 = 4
_protobufMaskV6 = 128
_lua_config_file = """
- protobufServer("127.0.0.1:%d")
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
setProtobufMasks(%d, %d)
- """ % (protobufServerPort, _protobufMaskV4, _protobufMaskV6)
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port, _protobufMaskV4, _protobufMaskV6)
def testA(self):
name = 'a.example.'
_confdir = 'ProtobufQueriesOnly'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } )
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=false } )
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
def testA(self):
name = 'a.example.'
_confdir = 'ProtobufResponsesOnly'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } )
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=true } )
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
def testA(self):
name = 'a.example.'
_confdir = 'ProtobufTaggedOnly'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } )
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=true, taggedOnly=true } )
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
_tags = ['tag1', 'tag2']
_tag_from_gettag = 'tag-from-gettag'
_lua_dns_script_file = """
_confdir = 'ProtobufSelectedFromLua'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } )
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=false } )
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
_lua_dns_script_file = """
local ffi = require("ffi")
_confdir = 'ProtobufExportTypes'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
- global protobufServerPort
_lua_config_file = """
- protobufServer("127.0.0.1:%d", { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } )
- """ % (protobufServerPort)
+ protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } )
+ """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
def testA(self):
name = 'types.example.'
print('Error in RPZ socket: %s' % str(e))
sock.close()
-rpzServerPort = 4244
+rpzServerPort = 4250
rpzServer = RPZServer(rpzServerPort)
class RPZRecursorTest(RecursorTest):