]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Also do the compare for protobuf logger config objects. 12146/head
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 31 Aug 2022 14:58:30 +0000 (16:58 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 1 Nov 2022 07:54:41 +0000 (08:54 +0100)
I'm not doing the async part now, as tsan reports there would be a race.
The case occuring the most: no changes is now handled correctly and quickly and that is the main thing.

(cherry picked from commit babe943035818b7a97e59cfd70921ba06bbf31cf)

pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/rec_channel_rec.cc
pdns/recursordist/docs/manpages/rec_control.1.rst
pdns/recursordist/rec-main.cc
pdns/recursordist/rec-main.hh
pdns/recursordist/rec-tcp.cc

index 7148778082382a994caf4a58b67831ea533fe24a..986da266ab52bdf5129e469588950e2265160e6a 100644 (file)
@@ -43,8 +43,8 @@
 
 thread_local std::shared_ptr<RecursorLua4> t_pdl;
 thread_local std::shared_ptr<Regex> t_traceRegex;
-thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
-thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
+thread_local ProtobufServersInfo t_protobufServers;
+thread_local ProtobufServersInfo t_outgoingProtobufServers;
 
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
 std::unique_ptr<MemRecursorCache> g_recCache;
@@ -917,7 +917,7 @@ void startDoResolve(void* p)
 
       // RRSets added below
     }
-
+    checkOutgoingProtobufExport(luaconfsLocal); // to pick up changed configs
 #ifdef HAVE_FSTRM
     checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
     checkFrameStreamExport(luaconfsLocal, luaconfsLocal->nodFrameStreamExportConfig, t_nodFrameStreamServersInfo);
@@ -977,7 +977,7 @@ void startDoResolve(void* p)
     sr.setDNSSECValidationRequested(g_dnssecmode == DNSSECMode::ValidateAll || g_dnssecmode == DNSSECMode::ValidateForLog || ((dc->d_mdp.d_header.ad || DNSSECOK) && g_dnssecmode == DNSSECMode::Process));
 
     sr.setInitialRequestId(dc->d_uuid);
-    sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
+    sr.setOutgoingProtobufServers(t_outgoingProtobufServers.servers);
 #ifdef HAVE_FSTRM
     sr.setFrameStreamServers(t_frameStreamServersInfo.servers);
 #endif
@@ -1391,7 +1391,7 @@ void startDoResolve(void* p)
         }
 #endif /* NOD ENABLED */
 
-        if (t_protobufServers) {
+        if (t_protobufServers.servers) {
           // Max size is 64k, but we're conservative here, as other fields are added after the answers have been added
           // If a single answer causes a too big protobuf message, it wil be dropped by queueData()
           // But note addRR has code to prevent that
@@ -1587,7 +1587,7 @@ void startDoResolve(void* p)
       g_stats.variableResponses++;
     }
 
-    if (t_protobufServers && !(luaconfsLocal->protobufExportConfig.taggedOnly && appliedPolicy.getName().empty() && dc->d_policyTags.empty())) {
+    if (t_protobufServers.servers && !(luaconfsLocal->protobufExportConfig.taggedOnly && appliedPolicy.getName().empty() && dc->d_policyTags.empty())) {
       // Start constructing embedded DNSResponse object
       pbMessage.setResponseCode(pw.getHeader()->rcode);
       if (!appliedPolicy.getName().empty()) {
@@ -1652,7 +1652,7 @@ void startDoResolve(void* p)
     sr.d_eventTrace.add(RecEventTrace::AnswerSent);
 
     // Now do the per query changing part ot the protobuf message
-    if (t_protobufServers && !(luaconfsLocal->protobufExportConfig.taggedOnly && appliedPolicy.getName().empty() && dc->d_policyTags.empty())) {
+    if (t_protobufServers.servers && !(luaconfsLocal->protobufExportConfig.taggedOnly && appliedPolicy.getName().empty() && dc->d_policyTags.empty())) {
       // Below are the fields that are not stored in the packet cache and will be appended here and on a cache hit
       if (g_useKernelTimestamp && dc->d_kernelTimestamp.tv_sec) {
         pbMessage.setQueryTime(dc->d_kernelTimestamp.tv_sec, dc->d_kernelTimestamp.tv_usec);
@@ -2003,15 +2003,16 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   bool logResponse = false;
   boost::uuids::uuid uniqueId;
   auto luaconfsLocal = g_luaconfs.getLocal();
-  if (checkProtobufExport(luaconfsLocal)) {
-    uniqueId = getUniqueID();
-    needECS = true;
-  }
-  else if (checkOutgoingProtobufExport(luaconfsLocal)) {
+  const auto pbExport = checkProtobufExport(luaconfsLocal);
+  const auto outgoingbExport = checkOutgoingProtobufExport(luaconfsLocal);
+  if (pbExport || outgoingbExport) {
+    if (pbExport) {
+      needECS = true;
+    }
     uniqueId = getUniqueID();
   }
-  logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
-  logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+  logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
+  logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
 #ifdef HAVE_FSTRM
   checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
 #endif
@@ -2088,7 +2089,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     }
 
     RecursorPacketCache::OptPBData pbData{boost::none};
-    if (t_protobufServers) {
+    if (t_protobufServers.servers) {
       if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && policyTags.empty())) {
         protobufLogQuery(luaconfsLocal, uniqueId, source, destination, mappedSource, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId, deviceName, meta);
       }
@@ -2124,7 +2125,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
         int sendErr = sendOnNBSocket(fd, &msgh);
         eventTrace.add(RecEventTrace::AnswerSent);
 
-        if (t_protobufServers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
+        if (t_protobufServers.servers && logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
           protobufLogResponse(dh, luaconfsLocal, pbData, tv, false, source, destination, mappedSource, ednssubnet, uniqueId, requestorId, deviceId, deviceName, meta, eventTrace);
         }
 
@@ -2229,7 +2230,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   dc->d_followCNAMERecords = followCNAMEs;
   dc->d_rcode = rcode;
   dc->d_logResponse = logResponse;
-  if (t_protobufServers || t_outgoingProtobufServers) {
+  if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
     dc->d_uuid = std::move(uniqueId);
   }
   dc->d_requestorId = requestorId;
index 29d7c49b8f4617cc489203df8cb07b691bef03a0..9ad994b697c8ca268396c16421ad29f9b8cb611b 100644 (file)
@@ -43,6 +43,28 @@ LuaConfigItems::LuaConfigItems()
 
 /* DID YOU READ THE STORY ABOVE? */
 
+bool operator==(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB)
+{
+  // clang-format off
+  return configA.exportTypes          == configB.exportTypes       &&
+         configA.servers              == configB.servers           &&
+         configA.maxQueuedEntries     == configB.maxQueuedEntries  &&
+         configA.timeout              == configB.timeout           &&
+         configA.reconnectWaitTime    == configB.reconnectWaitTime &&
+         configA.asyncConnect         == configB.asyncConnect      &&
+         configA.enabled              == configB.enabled           &&
+         configA.logQueries           == configB.logQueries        &&
+         configA.logResponses         == configB.logResponses      &&
+         configA.taggedOnly           == configB.taggedOnly        &&
+         configA.logMappedFrom        == configB.logMappedFrom;
+  // clang-format on
+}
+
+bool operator!=(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB)
+{
+  return !(configA == configB);
+}
+
 bool operator==(const FrameStreamExportConfig& configA, const FrameStreamExportConfig& configB)
 {
   // clang-format off
index 6b101332a1eb7106ccb9fcc76b335331b6fea9e3..a357de5eb5f16330a0d44f6b34f5c1f19558304e 100644 (file)
@@ -45,6 +45,9 @@ struct ProtobufExportConfig
   bool logMappedFrom{false};
 };
 
+bool operator==(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB);
+bool operator!=(const ProtobufExportConfig& configA, const ProtobufExportConfig& configB);
+
 struct FrameStreamExportConfig
 {
   std::vector<string> servers;
index bd89c6d6f15118b4e3208b89fdba8eae0e94b403..52c93bf4189f1a21fd667b241644464edf2c4c3b 100644 (file)
@@ -943,8 +943,8 @@ static RemoteLoggerStats_t* pleaseGetRemoteLoggerStats()
 {
   auto ret = make_unique<RemoteLoggerStats_t>();
 
-  if (t_protobufServers) {
-    for (const auto& server : *t_protobufServers) {
+  if (t_protobufServers.servers) {
+    for (const auto& server : *t_protobufServers.servers) {
       ret->emplace(std::make_pair(server->address(), server->getStats()));
     }
   }
@@ -966,8 +966,8 @@ static RemoteLoggerStats_t* pleaseGetOutgoingRemoteLoggerStats()
 {
   auto ret = make_unique<RemoteLoggerStats_t>();
 
-  if (t_outgoingProtobufServers) {
-    for (const auto& server : *t_outgoingProtobufServers) {
+  if (t_outgoingProtobufServers.servers) {
+    for (const auto& server : *t_outgoingProtobufServers.servers) {
       ret->emplace(std::make_pair(server->address(), server->getStats()));
     }
   }
index 1e701b48878f34ea00064063587d964909d9f93a..7b464ff21739e008b57292c2e8190379d12a92da 100644 (file)
@@ -204,8 +204,9 @@ reload-lua-config [*FILENAME*]
     (Re)loads Lua configuration *FILENAME*. If *FILENAME* is empty, attempt
     to reload the currently loaded file. Note that *FILENAME* will be fully
     executed, any settings changed at runtime that are not modified in this
-    file, will still be active. Reloading RPZ, especially by AXFR, can take
-    some time; during which the recursor will not answer questions.
+    file, will still be active. The effects of reloading do not always take
+    place immediately, as some subsystems reload and replace configuration
+    in an asynchronous way.
 
 reload-zones
     Reload authoritative and forward zones. Retains current configuration in
index b03584db45c4c7b0b46586470b6bea6d7fabf2cb..f22d84e72415690e86e15ebaa75cc310b3954747 100644 (file)
@@ -54,9 +54,6 @@
 #include <systemd/sd-journal.h>
 #endif
 
-static thread_local uint64_t t_protobufServersGeneration;
-static thread_local uint64_t t_outgoingProtobufServersGeneration;
-
 #ifdef HAVE_FSTRM
 thread_local FrameStreamServersInfo t_frameStreamServersInfo;
 thread_local FrameStreamServersInfo t_nodFrameStreamServersInfo;
@@ -409,11 +406,9 @@ static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobuf
 bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->protobufExportConfig.enabled) {
-    if (t_protobufServers) {
-      for (auto& server : *t_protobufServers) {
-        server->stop();
-      }
-      t_protobufServers.reset();
+    if (t_protobufServers.servers) {
+      t_protobufServers.servers.reset();
+      t_protobufServers.config = luaconfsLocal->protobufExportConfig;
     }
 
     return false;
@@ -421,18 +416,15 @@ bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 
   /* if the server was not running, or if it was running according to a
      previous configuration */
-  if (!t_protobufServers || t_protobufServersGeneration < luaconfsLocal->generation) {
+  if (t_protobufServers.generation < luaconfsLocal->generation && t_protobufServers.config != luaconfsLocal->protobufExportConfig) {
 
-    if (t_protobufServers) {
-      for (auto& server : *t_protobufServers) {
-        server->stop();
-      }
+    if (t_protobufServers.servers) {
+      t_protobufServers.servers.reset();
     }
-    t_protobufServers.reset();
-
     auto log = g_slog->withName("protobuf");
-    t_protobufServers = startProtobufServers(luaconfsLocal->protobufExportConfig, log);
-    t_protobufServersGeneration = luaconfsLocal->generation;
+    t_protobufServers.servers = startProtobufServers(luaconfsLocal->protobufExportConfig, log);
+    t_protobufServers.config = luaconfsLocal->protobufExportConfig;
+    t_protobufServers.generation = luaconfsLocal->generation;
   }
 
   return true;
@@ -441,30 +433,25 @@ bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
 {
   if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
-    if (t_outgoingProtobufServers) {
-      for (auto& server : *t_outgoingProtobufServers) {
-        server->stop();
-      }
+    if (t_outgoingProtobufServers.servers) {
+      t_outgoingProtobufServers.servers.reset();
+      t_outgoingProtobufServers.config = luaconfsLocal->outgoingProtobufExportConfig;
     }
-    t_outgoingProtobufServers.reset();
 
     return false;
   }
 
   /* if the server was not running, or if it was running according to a
      previous configuration */
-  if (!t_outgoingProtobufServers || t_outgoingProtobufServersGeneration < luaconfsLocal->generation) {
+  if (t_outgoingProtobufServers.generation < luaconfsLocal->generation && t_outgoingProtobufServers.config != luaconfsLocal->outgoingProtobufExportConfig) {
 
-    if (t_outgoingProtobufServers) {
-      for (auto& server : *t_outgoingProtobufServers) {
-        server->stop();
-      }
+    if (t_outgoingProtobufServers.servers) {
+      t_outgoingProtobufServers.servers.reset();
     }
-    t_outgoingProtobufServers.reset();
-
     auto log = g_slog->withName("protobuf");
-    t_outgoingProtobufServers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig, log);
-    t_outgoingProtobufServersGeneration = luaconfsLocal->generation;
+    t_outgoingProtobufServers.servers = startProtobufServers(luaconfsLocal->outgoingProtobufExportConfig, log);
+    t_outgoingProtobufServers.config = luaconfsLocal->outgoingProtobufExportConfig;
+    t_outgoingProtobufServers.generation = luaconfsLocal->generation;
   }
 
   return true;
@@ -472,7 +459,9 @@ bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal
 
 void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const ComboAddress& mappedRemote, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta)
 {
-  if (!t_protobufServers) {
+  auto log = g_slog->withName("pblq");
+
+  if (!t_protobufServers.servers) {
     return;
   }
 
@@ -505,19 +494,19 @@ void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boo
   }
 
   std::string msg(m.finishAndMoveBuf());
-  for (auto& server : *t_protobufServers) {
+  for (auto& server : *t_protobufServers.servers) {
     remoteLoggerQueueData(*server, msg);
   }
 }
 
 void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
 {
-  if (!t_protobufServers) {
+  if (!t_protobufServers.servers) {
     return;
   }
 
   std::string msg(message.finishAndMoveBuf());
-  for (auto& server : *t_protobufServers) {
+  for (auto& server : *t_protobufServers.servers) {
     remoteLoggerQueueData(*server, msg);
   }
 }
@@ -2149,6 +2138,19 @@ static void houseKeeping(void*)
 
     const auto& info = RecThreadInfo::self();
 
+    // Threads handling packets process config changes in the input path, but not all threads process input packets
+    // distr threads only process TCP, so that may not happenn very often. So do all periodically.
+    static thread_local PeriodicTask exportConfigTask{"exportConfigTask", 30};
+    auto luaconfsLocal = g_luaconfs.getLocal();
+    exportConfigTask.runIfDue(now, [&luaconfsLocal]() {
+      checkProtobufExport(luaconfsLocal);
+      checkOutgoingProtobufExport(luaconfsLocal);
+#ifdef HAVE_FSTRM
+      checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
+      checkFrameStreamExport(luaconfsLocal, luaconfsLocal->nodFrameStreamExportConfig, t_nodFrameStreamServersInfo);
+#endif
+    });
+
     // Below are the thread specific tasks for the handler and the taskThread
     // Likley a few handler tasks could be moved to the taskThread
     if (info.isTaskThread()) {
@@ -2157,7 +2159,6 @@ static void houseKeeping(void*)
 
       static PeriodicTask ztcTask{"ZTC", 60};
       static map<DNSName, RecZoneToCache::State> ztcStates;
-      auto luaconfsLocal = g_luaconfs.getLocal();
       ztcTask.runIfDue(now, [&luaconfsLocal]() {
         RecZoneToCache::maintainStates(luaconfsLocal->ztcConfigs, ztcStates, luaconfsLocal->generation);
         for (auto& ztc : luaconfsLocal->ztcConfigs) {
@@ -2289,7 +2290,6 @@ static void houseKeeping(void*)
         }
       });
 
-      auto luaconfsLocal = g_luaconfs.getLocal();
       static PeriodicTask trustAnchorTask{"trustAnchorTask", std::max(1U, luaconfsLocal->trustAnchorFileInfo.interval) * 3600};
       if (!trustAnchorTask.hasRun()) {
         // Loading the Lua config file already "refreshed" the TAs
index 548cc2c0c81fce6a0ddefab9c788284112d928d6..6c231f7fbe783661b37fe02df2a069e457952c1c 100644 (file)
@@ -193,13 +193,11 @@ using RemoteLoggerStats_t = std::unordered_map<std::string, RemoteLoggerInterfac
 extern bool g_logCommonErrors;
 extern size_t g_proxyProtocolMaximumSize;
 extern std::atomic<bool> g_quiet;
-extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers;
 extern thread_local std::shared_ptr<RecursorLua4> t_pdl;
 extern bool g_gettagNeedsEDNSOptions;
 extern NetmaskGroup g_paddingFrom;
 extern unsigned int g_paddingTag;
 extern PaddingMode g_paddingMode;
-extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers;
 extern unsigned int g_maxMThreads;
 extern bool g_reusePort;
 extern bool g_anyToTcp;
@@ -249,6 +247,15 @@ extern thread_local std::shared_ptr<nod::NODDB> t_nodDBp;
 extern thread_local std::shared_ptr<nod::UniqueResponseDB> t_udrDBp;
 #endif
 
+struct ProtobufServersInfo
+{
+  std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> servers;
+  uint64_t generation;
+  ProtobufExportConfig config;
+};
+extern thread_local ProtobufServersInfo t_protobufServers;
+extern thread_local ProtobufServersInfo t_outgoingProtobufServers;
+
 #ifdef HAVE_FSTRM
 struct FrameStreamServersInfo
 {
index 59fe99ddf884d40e55d3c354542163828f993233..b6deec10b50fb549409c3e31c71d9e5cf47401aa 100644 (file)
@@ -400,12 +400,8 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       if (checkProtobufExport(luaconfsLocal)) {
         needECS = true;
       }
-      logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
-      dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
-
-#ifdef HAVE_FSTRM
-      checkFrameStreamExport(luaconfsLocal, luaconfsLocal->frameStreamExportConfig, t_frameStreamServersInfo);
-#endif
+      logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
+      dc->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
 
       if (needECS || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
 
@@ -454,14 +450,14 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       const dnsheader_aligned headerdata(conn->data.data());
       const struct dnsheader* dh = headerdata.get();
 
-      if (t_protobufServers || t_outgoingProtobufServers) {
+      if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
         dc->d_requestorId = requestorId;
         dc->d_deviceId = deviceId;
         dc->d_deviceName = deviceName;
         dc->d_uuid = getUniqueID();
       }
 
-      if (t_protobufServers) {
+      if (t_protobufServers.servers) {
         try {
 
           if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
@@ -570,7 +566,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
             g_stats.cumulativeAnswers(spentUsec);
             dc->d_eventTrace.add(RecEventTrace::AnswerSent);
 
-            if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
+            if (t_protobufServers.servers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
               struct timeval tv
               {
                 0, 0