]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Add support for more than one protobuf server
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 22 Oct 2018 07:53:27 +0000 (09:53 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 31 Oct 2018 09:38:15 +0000 (10:38 +0100)
12 files changed:
pdns/lwres.cc
pdns/lwres.hh
pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/recursordist/docs/lua-config/protobuf.rst
pdns/recursordist/test-syncres_cc.cc
pdns/remote_logger.hh
pdns/syncres.cc
pdns/syncres.hh
regression-tests.recursor-dnssec/test_Protobuf.py
regression-tests.recursor-dnssec/test_RPZ.py

index dd561f19e4fc5715bb7de65467f02a220dbec270..540823abdfd6c29bc705b0fa6a5d3a10c09ca9f2 100644 (file)
@@ -50,9 +50,9 @@
 
 #ifdef HAVE_PROTOBUF
 
-static void logOutgoingQuery(std::shared_ptr<RemoteLogger> outgoingLogger, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
+static void logOutgoingQuery(const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional<Netmask>& srcmask)
 {
-  if(!outgoingLogger)
+  if(!outgoingLoggers)
     return;
 
   RecProtoBufMessage message(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
@@ -69,12 +69,15 @@ static void logOutgoingQuery(std::shared_ptr<RemoteLogger> outgoingLogger, boost
 //  cerr <<message.toDebugString()<<endl;
   std::string str;
   message.serialize(str);
-  outgoingLogger->queueData(str);
+
+  for (auto& logger : *outgoingLoggers) {
+    logger->queueData(str);
+  }
 }
 
-static void logIncomingResponse(std::shared_ptr<RemoteLogger> outgoingLogger, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const std::set<uint16_t>& exportTypes)
+static void logIncomingResponse(const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, boost::optional<const boost::uuids::uuid&> initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, int rcode, const std::vector<DNSRecord>& records, const struct timeval& queryTime, const std::set<uint16_t>& exportTypes)
 {
-  if(!outgoingLogger)
+  if(!outgoingLoggers)
     return;
 
   RecProtoBufMessage message(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes);
@@ -89,7 +92,10 @@ static void logIncomingResponse(std::shared_ptr<RemoteLogger> outgoingLogger, bo
 //  cerr <<message.toDebugString()<<endl;
   std::string str;
   message.serialize(str);
-  outgoingLogger->queueData(str);
+
+  for (auto& logger : *outgoingLoggers) {
+    logger->queueData(str);
+  }
 }
 #endif /* HAVE_PROTOBUF */
 
@@ -97,7 +103,7 @@ static void logIncomingResponse(std::shared_ptr<RemoteLogger> outgoingLogger, bo
 /** lwr is only filled out in case 1 was returned, and even when returning 1 for 'success', lwr might contain DNS errors
     Never throws! 
  */
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult *lwr, bool* chained)
 {
   size_t len;
   size_t bufsize=g_outgoingEDNSBufsize;
@@ -153,9 +159,9 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
   boost::uuids::uuid uuid;
   const struct timeval queryTime = *now;
 
-  if (outgoingLogger) {
+  if (outgoingLoggers) {
     uuid = (*t_uuidGenerator)();
-    logOutgoingQuery(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
+    logOutgoingQuery(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, vpacket.size(), srcmask);
   }
 #endif
 
@@ -241,8 +247,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
     
     if(mdp.d_header.rcode == RCode::FormErr && mdp.d_qname.empty() && mdp.d_qtype == 0 && mdp.d_qclass == 0) {
 #ifdef HAVE_PROTOBUF
-      if(outgoingLogger) {
-        logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+      if(outgoingLoggers) {
+        logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
       }
 #endif
       lwr->d_validpacket=true;
@@ -287,8 +293,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
     }
         
 #ifdef HAVE_PROTOBUF
-    if(outgoingLogger) {
-      logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+    if(outgoingLoggers) {
+      logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
     }
 #endif
     lwr->d_validpacket=true;
@@ -300,8 +306,8 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d
     lwr->d_rcode = RCode::FormErr;
     g_stats.serverParseError++;
 #ifdef HAVE_PROTOBUF
-    if(outgoingLogger) {
-      logIncomingResponse(outgoingLogger, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
+    if(outgoingLoggers) {
+      logIncomingResponse(outgoingLoggers, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes);
     }
 #endif
     lwr->d_validpacket=false;
index c9dab346c1938d2d63d8aeef0220d9b3f9f199f9..a548206bf93b2b7964205cc8349fb5d234bb9952 100644 (file)
@@ -68,5 +68,5 @@ public:
   bool d_haveEDNS{false};
 };
 
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
 #endif // PDNS_LWRES_HH
index 6360a6ba028e63be47dd2c621000f285f119a2cb..17f7051b6bcf62ccfa3b62be92ae0df5e510b491 100644 (file)
@@ -109,8 +109,10 @@ static thread_local unsigned int t_id = 0;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
 #ifdef HAVE_PROTOBUF
-static thread_local std::shared_ptr<RemoteLogger> t_protobufServer{nullptr};
-static thread_local std::shared_ptr<RemoteLogger> t_outgoingProtobufServer{nullptr};
+static thread_local std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> t_protobufServers{nullptr};
+static thread_local uint64_t t_protobufServersGeneration;
+static thread_local std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
+static thread_local uint64_t t_outgoingProtobufServersGeneration;
 #endif /* HAVE_PROTOBUF */
 
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
@@ -774,8 +776,12 @@ catch(...)
 }
 
 #ifdef HAVE_PROTOBUF
-static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
+static void protobufLogQuery(uint8_t maskV4, uint8_t maskV6, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::vector<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId)
 {
+  if (!t_protobufServers) {
+    return;
+  }
+
   Netmask requestorNM(remote, remote.sin4.sin_family == AF_INET ? maskV4 : maskV6);
   const ComboAddress& requestor = requestorNM.getMaskedNetwork();
   RecProtoBufMessage message(DNSProtoBufMessage::Query, uniqueId, &requestor, &local, qname, qtype, qclass, id, tcp, len);
@@ -791,15 +797,25 @@ static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, uint8_
 //  cerr <<message.toDebugString()<<endl;
   std::string str;
   message.serialize(str);
-  logger->queueData(str);
+
+  for (auto& server : *t_protobufServers) {
+    server->queueData(str);
+  }
 }
 
-static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const RecProtoBufMessage& message)
+static void protobufLogResponse(const RecProtoBufMessage& message)
 {
+  if (!t_protobufServers) {
+    return;
+  }
+
 //  cerr <<message.toDebugString()<<endl;
   std::string str;
   message.serialize(str);
-  logger->queueData(str);
+
+  for (auto& server : *t_protobufServers) {
+    server->queueData(str);
+  }
 }
 #endif
 
@@ -850,18 +866,20 @@ static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_
 }
 
 #ifdef HAVE_PROTOBUF
-static std::shared_ptr<RemoteLogger> startProtobufServer(const ProtobufExportConfig& config, uint64_t generation)
+static std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> startProtobufServers(const ProtobufExportConfig& config)
 {
-  std::shared_ptr<RemoteLogger> result = nullptr;
-  try {
-    result = std::make_shared<RemoteLogger>(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
-    result->setGeneration(generation);
-  }
-  catch(const std::exception& e) {
-    g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.what()<<endl;
-  }
-  catch(const PDNSException& e) {
-    g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.reason<<endl;
+  auto result = std::make_shared<std::vector<std::shared_ptr<RemoteLogger>>>();
+
+  for (const auto& server : config.servers) {
+    try {
+      result->push_back(std::make_shared<RemoteLogger>(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
+    }
+    catch(const std::exception& e) {
+      g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
+    }
+    catch(const PDNSException& e) {
+      g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.reason<<endl;
+    }
   }
 
   return result;
@@ -870,9 +888,11 @@ static std::shared_ptr<RemoteLogger> startProtobufServer(const ProtobufExportCon
 static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->protobufExportConfig.enabled) {
-    if (t_protobufServer != nullptr) {
-      t_protobufServer->stop();
-      t_protobufServer = nullptr;
+    if (t_protobufServers) {
+      for (auto& server : *t_protobufServers) {
+        server->stop();
+      }
+      t_protobufServers.reset();
     }
 
     return false;
@@ -880,14 +900,18 @@ static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 
   /* if the server was not running, or if it was running according to a
      previous configuration */
-  if (t_protobufServer == nullptr ||
-      t_protobufServer->getGeneration() < luaconfsLocal->generation) {
+  if (!t_protobufServers ||
+      t_protobufServersGeneration < luaconfsLocal->generation) {
 
-    if (t_protobufServer) {
-      t_protobufServer->stop();
+    if (t_protobufServers) {
+      for (auto& server : *t_protobufServers) {
+        server->stop();
+      }
     }
+    t_protobufServers.reset();
 
-    t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation);
+    t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig);
+    t_protobufServersGeneration = luaconfsLocal->generation;
   }
 
   return true;
@@ -896,24 +920,30 @@ static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
-    if (t_outgoingProtobufServer != nullptr) {
-      t_outgoingProtobufServer->stop();
-      t_outgoingProtobufServer = nullptr;
+    if (t_outgoingProtobufServers) {
+      for (auto& server : *t_outgoingProtobufServers) {
+        server->stop();
+      }
     }
+    t_outgoingProtobufServers.reset();
 
     return false;
   }
 
   /* if the server was not running, or if it was running according to a
      previous configuration */
-  if (t_outgoingProtobufServer == nullptr ||
-      t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) {
+  if (!t_outgoingProtobufServers ||
+      t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
 
-    if (t_outgoingProtobufServer) {
-      t_outgoingProtobufServer->stop();
+    if (t_outgoingProtobufServers) {
+      for (auto& server : *t_outgoingProtobufServers) {
+        server->stop();
+      }
     }
+    t_outgoingProtobufServers.reset();
 
-    t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation);
+    t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig);
+    t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
   }
 
   return true;
@@ -1038,7 +1068,7 @@ static void startDoResolve(void *p)
     bool logResponse = false;
 #ifdef HAVE_PROTOBUF
     if (checkProtobufExport(luaconfsLocal)) {
-      logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
+      logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
       Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
       const ComboAddress& requestor = requestorNM.getMaskedNetwork();
       pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response, dc->d_uuid, &requestor, &dc->d_destination, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass, dc->d_mdp.d_header.id, dc->d_tcp, 0);
@@ -1085,7 +1115,7 @@ static void startDoResolve(void *p)
 
 #ifdef HAVE_PROTOBUF
     sr.setInitialRequestId(dc->d_uuid);
-    sr.setOutgoingProtobufServer(t_outgoingProtobufServer);
+    sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
 #endif
 
     sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
@@ -1422,7 +1452,7 @@ static void startDoResolve(void *p)
 #endif /* NOD ENABLED */    
 
 #ifdef HAVE_PROTOBUF
-        if(t_protobufServer) {
+        if (t_protobufServers) {
 #ifdef NOD_ENABLED
           pbMessage->addRR(*i, luaconfsLocal->protobufExportConfig.exportTypes, udr);
 #else
@@ -1457,7 +1487,7 @@ static void startDoResolve(void *p)
     }
 #endif /* NOD_ENABLED */
 #ifdef HAVE_PROTOBUF
-    if (t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
+    if (t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && (!appliedPolicy.d_name || appliedPolicy.d_name->empty()) && dc->d_policyTags.empty())) {
       pbMessage->setBytes(packet.size());
       pbMessage->setResponseCode(pw.getHeader()->rcode);
       if (appliedPolicy.d_name) {
@@ -1479,7 +1509,7 @@ static void startDoResolve(void *p)
         }
       }
 #endif /* NOD_ENABLED */
-      protobufLogResponse(t_protobufServer, *pbMessage);
+      protobufLogResponse(*pbMessage);
 #ifdef NOD_ENABLED
       if (g_nodEnabled) {
         pbMessage->setNOD(false);
@@ -1827,7 +1857,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       if (checkProtobufExport(luaconfsLocal)) {
         needECS = true;
       }
-      logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
+      logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
 #endif
 
       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
@@ -1866,17 +1896,17 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
 
 #ifdef HAVE_PROTOBUF
-      if(t_protobufServer || t_outgoingProtobufServer) {
+      if(t_protobufServers || t_outgoingProtobufServers) {
         dc->d_requestorId = requestorId;
         dc->d_deviceId = deviceId;
         dc->d_uuid = (*t_uuidGenerator)();
       }
 
-      if(t_protobufServer) {
+      if(t_protobufServers) {
         try {
 
           if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
-            protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
+              protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
           }
         }
         catch(std::exception& e) {
@@ -2009,8 +2039,8 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
     uniqueId = (*t_uuidGenerator)();
   }
-  logQuery = t_protobufServer && luaconfsLocal->protobufExportConfig.logQueries;
-  bool logResponse = t_protobufServer && luaconfsLocal->protobufExportConfig.logResponses;
+  logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+  bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
 #endif
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
@@ -2074,11 +2104,11 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     bool cacheHit = false;
     boost::optional<RecProtoBufMessage> pbMessage(boost::none);
 #ifdef HAVE_PROTOBUF
-    if(t_protobufServer) {
+    if (t_protobufServers) {
       pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
       pbMessage->setServerIdentity(SyncRes::s_serverID);
       if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
-        protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
+        protobufLogQuery(luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
       }
     }
 #endif /* HAVE_PROTOBUF */
@@ -2103,7 +2133,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
       }
 
 #ifdef HAVE_PROTOBUF
-      if(t_protobufServer && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
+      if(t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbMessage->getAppliedPolicy().empty() && pbMessage->getPolicyTags().empty())) {
         Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
         const ComboAddress& requestor = requestorNM.getMaskedNetwork();
         pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
@@ -2111,7 +2141,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
         pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
         pbMessage->setRequestorId(requestorId);
         pbMessage->setDeviceId(deviceId);
-        protobufLogResponse(t_protobufServer, *pbMessage);
+        protobufLogResponse(*pbMessage);
       }
 #endif /* HAVE_PROTOBUF */
       if(!g_quiet)
@@ -2179,7 +2209,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   dc->d_ttlCap = ttlCap;
   dc->d_variable = variable;
 #ifdef HAVE_PROTOBUF
-  if (t_protobufServer || t_outgoingProtobufServer) {
+  if (t_protobufServers || t_outgoingProtobufServers) {
     dc->d_uuid = std::move(uniqueId);
   }
   dc->d_requestorId = requestorId;
index dfa539841e2dbc1df3911a06f33d6f4b7e8c3286..5abfa4d1a5400065d03b5a6eb45beb957f4e8651 100644 (file)
@@ -425,41 +425,67 @@ void loadRecursorLuaConfig(const std::string& fname, luaConfigDelayedThreads& de
       lci.protobufMaskV6 = maskV6;
     });
 
-  Lua.writeFunction("protobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
-      try {
+  Lua.writeFunction("protobufServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<protobufOptions_t> vars) {
         if (!lci.protobufExportConfig.enabled) {
+
           lci.protobufExportConfig.enabled = true;
-          lci.protobufExportConfig.server = ComboAddress(server_);
-          parseProtobufOptions(vars, lci.protobufExportConfig);
+
+          try {
+            if (servers.type() == typeid(std::string)) {
+              auto server = boost::get<const std::string>(servers);
+
+              lci.protobufExportConfig.servers.emplace_back(server);
+            }
+            else {
+              auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+              for (const auto& serverPair : serversMap) {
+                lci.protobufExportConfig.servers.emplace_back(serverPair.second);
+              }
+            }
+
+            parseProtobufOptions(vars, lci.protobufExportConfig);
+          }
+          catch(std::exception& e) {
+            g_log<<Logger::Error<<"Error while adding protobuf logger: "<<e.what()<<endl;
+          }
+          catch(PDNSException& e) {
+            g_log<<Logger::Error<<"Error while adding protobuf logger: "<<e.reason<<endl;
+          }
         }
         else {
-          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufExportConfig.server.toString()<<endl;
+          g_log<<Logger::Error<<"Only one protobufServer() directive can be configured, we already have "<<lci.protobufExportConfig.servers.at(0).toString()<<endl;
         }
-      }
-      catch(std::exception& e) {
-       g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.what()<<endl;
-      }
-      catch(PDNSException& e) {
-        g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.reason<<endl;
-      }
     });
 
-  Lua.writeFunction("outgoingProtobufServer", [&lci](const string& server_, boost::optional<protobufOptions_t> vars) {
-      try {
-        if (!lci.outgoingProtobufExportConfig.enabled) {
-          lci.outgoingProtobufExportConfig.enabled = true;
-          lci.outgoingProtobufExportConfig.server = ComboAddress(server_);
-          parseProtobufOptions(vars, lci.outgoingProtobufExportConfig);
-        }
-        else {
-          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.outgoingProtobufExportConfig.server.toString()<<endl;
-        }
-      }
-      catch(std::exception& e) {
-       g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.what()<<endl;
+  Lua.writeFunction("outgoingProtobufServer", [&lci](boost::variant<const std::string, const std::unordered_map<int, std::string>> servers, boost::optional<protobufOptions_t> vars) {
+      if (!lci.outgoingProtobufExportConfig.enabled) {
+
+        lci.outgoingProtobufExportConfig.enabled = true;
+
+          try {
+            if (servers.type() == typeid(std::string)) {
+              auto server = boost::get<const std::string>(servers);
+
+              lci.outgoingProtobufExportConfig.servers.emplace_back(server);
+            }
+            else {
+              auto serversMap = boost::get<const std::unordered_map<int,std::string>>(servers);
+              for (const auto& serverPair : serversMap) {
+                lci.outgoingProtobufExportConfig.servers.emplace_back(serverPair.second);
+              }
+            }
+
+            parseProtobufOptions(vars, lci.outgoingProtobufExportConfig);
+          }
+          catch(std::exception& e) {
+            g_log<<Logger::Error<<"Error while starting outgoing protobuf logger: "<<e.what()<<endl;
+          }
+          catch(PDNSException& e) {
+            g_log<<Logger::Error<<"Error while starting outgoing protobuf logger: "<<e.reason<<endl;
+          }
       }
-      catch(PDNSException& e) {
-        g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.reason<<endl;
+      else {
+        g_log<<Logger::Error<<"Only one outgoingProtobufServer() directive can be configured, we already have "<<lci.outgoingProtobufExportConfig.servers.at(0).toString()<<endl;
       }
     });
 #endif
index a47077a2804df4e6c0fdd6cd1fc2114555d4b872..6ce143076820e1fc2ea1c554327eb0fce07d0005 100644 (file)
@@ -30,7 +30,7 @@
 struct ProtobufExportConfig
 {
   std::set<uint16_t> exportTypes = { QType::A, QType::AAAA, QType::CNAME };
-  ComboAddress server;
+  std::vector<ComboAddress> servers;
   uint64_t maxQueuedEntries{100};
   uint16_t timeout{2};
   uint16_t reconnectWaitTime{1};
index 76d482da154ab2b90edae60004391e044706b871..0f5c3813035be113e7ef33b81fe4bab90be3094c 100644 (file)
@@ -12,13 +12,13 @@ Configuring Protocol Buffer logs
 --------------------------------
 Protobuf export to a server is enabled using the ``protobufServer()`` directive:
 
-.. function:: protobufServer(server [, options]))
+.. function:: protobufServer(servers [, options]))
 
   .. versionadded:: 4.2.0
 
-  Send protocol buffer messages to a server for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes.
+  Send protocol buffer messages to one or more servers for incoming queries and/or outgoing responses. The client address may be masked using :func:`setProtobufMasks`, for anonymization purposes.
 
-  :param string server: The IP and port to connect to
+  :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
   :param table options: A table with key: value pairs with options.
 
   Options:
@@ -57,13 +57,13 @@ Logging outgoing queries and responses
 
 While :func:`protobufServer` only exports the queries sent to the recursor from clients, with the corresponding responses, ``outgoingProtobufServer()`` can be used to export outgoing queries sent by the recursor to authoritative servers, along with the corresponding responses.
 
-.. function:: outgoingProtobufServer(server [, options])
+.. function:: outgoingProtobufServer(servers [, options])
 
   .. versionadded:: 4.2.0
 
-  Send protocol buffer messages to a server for outgoing queries and/or incoming responses.
+  Send protocol buffer messages to one or more servers for outgoing queries and/or incoming responses.
 
-  :param string server: The IP and port to connect to
+  :param string or list of strings servers: The IP and port to connect to, or a list of those. If more than one server is configured, all messages are sent to every server.
   :param table options: A table with key: value pairs with options.
 
   Options:
index a7280c026783a681cc9ca4c55575796bccde1074..0c99ef035b9db5df8cb9afb8fbdddc4617be8360 100644 (file)
@@ -39,7 +39,7 @@ bool RecursorLua4::preoutquery(const ComboAddress& ns, const ComboAddress& reque
   return false;
 }
 
-int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<RemoteLogger>& outgoingLogger, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
+int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& outgoingLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained)
 {
   return 0;
 }
index 6037e484c5c713adb9c26a96279cb936e52d52aa..3eb2aa575711f0f23fa4ef645ce706a0171f8378 100644 (file)
@@ -53,14 +53,6 @@ public:
   {
     d_exiting = true;
   }
-  uint64_t getGeneration() const
-  {
-    return d_generation;
-  }
-  void setGeneration(uint64_t newGeneration)
-  {
-    d_generation = newGeneration;
-  }
 private:
   void busyReconnectLoop();
   bool reconnect();
@@ -71,7 +63,6 @@ private:
   std::condition_variable d_queueCond;
   ComboAddress d_remote;
   uint64_t d_maxQueuedEntries;
-  uint64_t d_generation{0};
   int d_socket{-1};
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
index 16c7c73dd145a017704a5d6780c5caccd1d263d9..a99667a3bcdf9915f1522331ea4fc506b31219a2 100644 (file)
@@ -491,7 +491,7 @@ int SyncRes::asyncresolveWrapper(const ComboAddress& ip, bool ednsMANDATORY, con
       ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, res, chained);
     }
     else {
-      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
+      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServers, luaconfsLocal->outgoingProtobufExportConfig.exportTypes, res, chained);
     }
     if(ret < 0) {
       return ret; // transport error, nothing to learn here
index 6bc8bb752d24677c18957480b8ff6b300aa394ca..32fbdb9ea82b896eb676c2543f60e97d6cfc5252 100644 (file)
@@ -659,9 +659,9 @@ public:
     d_initialRequestId = initialRequestId;
   }
 
-  void setOutgoingProtobufServer(std::shared_ptr<RemoteLogger>& server)
+  void setOutgoingProtobufServers(std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>>& servers)
   {
-    d_outgoingProtobufServer = server;
+    d_outgoingProtobufServers = servers;
   }
 #endif
 
@@ -804,7 +804,7 @@ private:
   ostringstream d_trace;
   shared_ptr<RecursorLua4> d_pdl;
   boost::optional<Netmask> d_outgoingECSNetwork;
-  std::shared_ptr<RemoteLogger> d_outgoingProtobufServer{nullptr};
+  std::shared_ptr<std::vector<std::shared_ptr<RemoteLogger>>> d_outgoingProtobufServers{nullptr};
 #ifdef HAVE_PROTOBUF
   boost::optional<const boost::uuids::uuid&> d_initialRequestId;
 #endif
index 0c36bfcbfd394ac0f47efa4ae3fa60630700f459..813087be02764d9b78715352ea5241203629ca13 100644 (file)
@@ -17,9 +17,6 @@ else:
 
 from recursortests import RecursorTest
 
-protobufQueue = Queue()
-protobufServerPort = 4243
-
 def ProtobufConnectionHandler(queue, conn):
     data = None
     while True:
@@ -35,8 +32,7 @@ def ProtobufConnectionHandler(queue, conn):
 
     conn.close()
 
-def ProtobufListener(port):
-    global protobufQueue
+def ProtobufListener(queue, port):
     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
     try:
@@ -51,7 +47,7 @@ def ProtobufListener(port):
             (conn, _) = sock.accept()
             thread = threading.Thread(name='Connection Handler',
                                       target=ProtobufConnectionHandler,
-                                      args=[protobufQueue, conn])
+                                      args=[queue, conn])
             thread.setDaemon(True)
             thread.start()
 
@@ -61,39 +57,58 @@ def ProtobufListener(port):
     sock.close()
 
 
-protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
-protobufListener.setDaemon(True)
-protobufListener.start()
+class ProtobufServerParams:
+  def __init__(self, port):
+    self.queue = Queue()
+    self.port = port
+
+protobufServersParameters = [ProtobufServerParams(4243), ProtobufServerParams(4244)]
+protobufListeners = []
+for param in protobufServersParameters:
+  listener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[param.queue, param.port])
+  listener.setDaemon(True)
+  listener.start()
+  protobufListeners.append(listener)
 
 class TestRecursorProtobuf(RecursorTest):
 
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d")
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
 
 
     def getFirstProtobufMessage(self, retries=1, waitTime=1):
-        global protobufQueue
-        failed = 0
-
-        while protobufQueue.empty:
-          if failed >= retries:
-            break
-
-          failed = failed + 1
-          time.sleep(waitTime)
+        msg = None
+
+        print("in getFirstProtobufMessage")
+        for param in protobufServersParameters:
+          print(param.port)
+          failed = 0
+
+          while param.queue.empty:
+            print(failed)
+            print(retries)
+            if failed >= retries:
+              break
+
+            failed = failed + 1
+            print("waiting")
+            time.sleep(waitTime)
+
+          self.assertFalse(param.queue.empty())
+          data = param.queue.get(False)
+          self.assertTrue(data)
+          oldmsg = msg
+          msg = dnsmessage_pb2.PBDNSMessage()
+          msg.ParseFromString(data)
+          if oldmsg is not None:
+            self.assertEquals(msg, oldmsg)
 
-        self.assertFalse(protobufQueue.empty())
-        data = protobufQueue.get(False)
-        self.assertTrue(data)
-        msg = dnsmessage_pb2.PBDNSMessage()
-        msg.ParseFromString(data)
         return msg
 
     def checkNoRemainingMessage(self):
-        global protobufQueue
-        self.assertTrue(protobufQueue.empty())
+        for param in protobufServersParameters:
+          self.assertTrue(param.queue.empty())
 
     def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, expectedECS=None, receivedSize=None):
         self.assertTrue(msg)
@@ -206,14 +221,6 @@ class TestRecursorProtobuf(RecursorTest):
     @classmethod
     def setUpClass(cls):
 
-        global protobufListener
-        global protobufServerPort
-        global ProtobufListener
-        if protobufListener is None or not protobufListener.isAlive():
-            protobufListener = threading.Thread(name='Protobuf Listener', target=ProtobufListener, args=[protobufServerPort])
-            protobufListener.setDaemon(True)
-            protobufListener.start()
-
         cls.setUpSockets()
 
         cls.startResponders()
@@ -227,9 +234,9 @@ class TestRecursorProtobuf(RecursorTest):
     def setUp(self):
       # Make sure the queue is empty, in case
       # a previous test failed
-      global protobufQueue
-      while not protobufQueue.empty():
-        protobufQueue.get(False)
+      for param in protobufServersParameters:
+        while not param.queue.empty():
+          param.queue.get(False)
 
     @classmethod
     def generateRecursorConfig(cls, confdir):
@@ -327,8 +334,8 @@ class OutgoingProtobufDefaultTest(TestRecursorProtobuf):
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
     _lua_config_file = """
-    outgoingProtobufServer("127.0.0.1:%d")
-    """ % (protobufServerPort)
+    outgoingProtobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
 
     def testA(self):
         name = 'www.example.org.'
@@ -353,13 +360,12 @@ class ProtobufMasksTest(TestRecursorProtobuf):
     _confdir = 'ProtobufMasks'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _protobufMaskV4 = 4
     _protobufMaskV6 = 128
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d")
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"})
     setProtobufMasks(%d, %d)
-    """ % (protobufServerPort, _protobufMaskV4, _protobufMaskV6)
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port, _protobufMaskV4, _protobufMaskV6)
 
     def testA(self):
         name = 'a.example.'
@@ -391,10 +397,9 @@ class ProtobufQueriesOnlyTest(TestRecursorProtobuf):
     _confdir = 'ProtobufQueriesOnly'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=false } )
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=false } )
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
 
     def testA(self):
         name = 'a.example.'
@@ -418,10 +423,9 @@ class ProtobufResponsesOnlyTest(TestRecursorProtobuf):
     _confdir = 'ProtobufResponsesOnly'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=true } )
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=true } )
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
 
     def testA(self):
         name = 'a.example.'
@@ -450,10 +454,9 @@ class ProtobufTaggedOnlyTest(TestRecursorProtobuf):
     _confdir = 'ProtobufTaggedOnly'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d", { logQueries=true, logResponses=true, taggedOnly=true } )
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=true, taggedOnly=true } )
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
     _tags = ['tag1', 'tag2']
     _tag_from_gettag = 'tag-from-gettag'
     _lua_dns_script_file = """
@@ -517,10 +520,9 @@ class ProtobufSelectedFromLuaTest(TestRecursorProtobuf):
     _confdir = 'ProtobufSelectedFromLua'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d", { logQueries=false, logResponses=false } )
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=false } )
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
     _lua_dns_script_file = """
     local ffi = require("ffi")
 
@@ -599,10 +601,9 @@ class ProtobufExportTypesTest(TestRecursorProtobuf):
     _confdir = 'ProtobufExportTypes'
     _config_template = """
 auth-zones=example=configs/%s/example.zone""" % _confdir
-    global protobufServerPort
     _lua_config_file = """
-    protobufServer("127.0.0.1:%d", { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } )
-    """ % (protobufServerPort)
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { exportTypes={"AAAA", "MX", "SPF", "SRV", "TXT"} } )
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port)
 
     def testA(self):
         name = 'types.example.'
index f6c75c0b42cc9b1c05f1a59d9755618c5b054da0..60a52743395c84bf9b46c56d61c1af9530a9b902 100644 (file)
@@ -159,7 +159,7 @@ class RPZServer(object):
                 print('Error in RPZ socket: %s' % str(e))
                 sock.close()
 
-rpzServerPort = 4244
+rpzServerPort = 4250
 rpzServer = RPZServer(rpzServerPort)
 
 class RPZRecursorTest(RecursorTest):