]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Reorganize queueData() with respect to logging.
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 24 Aug 2022 07:56:26 +0000 (09:56 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 24 Aug 2022 07:56:26 +0000 (09:56 +0200)
Let queueData() return a status and log that via a program supplied helper.
This way, the program specific (recursor,dnsdist) logging isn't polluting the common code.

There are a few other cases that need to be dealt with some day.

dnsdist log levels should be reviewed (I copied the existing), they might be too verbose.

pdns/dnsdist-lua-actions.cc
pdns/fstrm_logger.cc
pdns/fstrm_logger.hh
pdns/lwres.cc
pdns/recursordist/rec-main.cc
pdns/remote_logger.cc
pdns/remote_logger.hh

index 45deb30646f6c9f7c79b5c078321afee45f774db..5004d3f1dbd01d1232175a4f67d12a19a95a4423 100644 (file)
@@ -1433,6 +1433,26 @@ static DnstapMessage::ProtocolType ProtocolToDNSTap(dnsdist::Protocol protocol)
   throw std::runtime_error("Unhandled protocol for dnstap: " + protocol.toPrettyString());
 }
 
+void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data)
+{
+  auto ret = r.queueData(data);
+
+  switch (ret) {
+  case RemoteLoggerInterface::Result::Queued:
+    break;
+  case RemoteLoggerInterface::Result::PipeFull: {
+    vinfolog("%s: queue full, dropping.", r.name().c_str());
+    break;
+  }
+  case RemoteLoggerInterface::Result::TooLarge: {
+    warnlog("%s: Not sending too large protobuf message", r.name().c_str());
+    break;
+  }
+  case RemoteLoggerInterface::Result::OtherError:
+    warnlog("%s: submitting to queue failed", r.name().c_str());
+  }
+}
+
 class DnstapLogAction : public DNSAction, public boost::noncopyable
 {
 public:
@@ -1454,7 +1474,7 @@ public:
       }
     }
 
-    d_logger->queueData(data);
+    remoteLoggerQueueData(*d_logger, data);
 
     return Action::None;
   }
@@ -1501,7 +1521,7 @@ public:
     static thread_local std::string data;
     data.clear();
     message.serialize(data);
-    d_logger->queueData(data);
+    remoteLoggerQueueData(*d_logger, data);
 
     return Action::None;
   }
@@ -1587,7 +1607,7 @@ public:
       }
     }
 
-    d_logger->queueData(data);
+    remoteLoggerQueueData(*d_logger, data);
 
     return Action::None;
   }
index c5339d645f9e29ff6cf7e40dfd147ff15aa7bcb4..1261953a80d9047914fa6260ee805dc56a238854 100644 (file)
@@ -158,20 +158,14 @@ FrameStreamLogger::~FrameStreamLogger()
   this->cleanup();
 }
 
-void FrameStreamLogger::queueData(const std::string& data)
+RemoteLoggerInterface::Result FrameStreamLogger::queueData(const std::string& data)
 {
   if (!d_ioqueue || !d_iothr) {
-    return;
+    return Result::OtherError;
   }
   uint8_t *frame = (uint8_t*)malloc(data.length());
   if (!frame) {
-#ifdef RECURSOR
-    SLOG(g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl,
-         g_slog->withName("dnstap")->info(Logr::Warning, "cannot allocate memory for stream"));
-#else
-    warnlog("FrameStreamLogger: cannot allocate memory for stream.");
-#endif
-    return;
+    return Result::TooLarge;
   }
   memcpy(frame, data.c_str(), data.length());
 
@@ -181,25 +175,16 @@ void FrameStreamLogger::queueData(const std::string& data)
   if (res == fstrm_res_success) {
     // Frame successfully queued.
     ++d_framesSent;
+    return Result::Queued;
   } else if (res == fstrm_res_again) {
     free(frame);
-#ifdef RECURSOR
-    SLOG(g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl,
-         g_slog->withName("dnstap")->info(Logr::Debug, "queue full, dropping"));
-#else
-    vinfolog("FrameStreamLogger: queue full, dropping.");
-#endif
     ++d_queueFullDrops;
+    return Result::PipeFull;
  } else {
     // Permanent failure.
     free(frame);
-#ifdef RECURSOR
-    SLOG(g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl,
-         g_slog->withName("dnstap")->info(Logr::Warning, "submitting to queue failed"));
-#else
-    warnlog("FrameStreamLogger: submitting to queue failed.");
-#endif
     ++d_permanentFailures;
+    return Result::OtherError;
   }
 }
 
index a59e194b9deb6d25f0fd5502f2a06bce72ad9a77..624f67c7eb31c9f46385fd0ad02688eddaa40aad 100644 (file)
@@ -38,7 +38,11 @@ class FrameStreamLogger : public RemoteLoggerInterface, boost::noncopyable
 public:
   FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map<string,unsigned>& options = std::unordered_map<string,unsigned>());
   ~FrameStreamLogger();
-  void queueData(const std::string& data) override;
+  [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override;
+  std::string name() const override
+  {
+    return "framestream";
+  }
   std::string toString() const override
   {
     return "FrameStreamLogger to " + d_address + " (" + std::to_string(d_framesSent) + " frames sent, " + std::to_string(d_queueFullDrops) + " dropped, " + std::to_string(d_permanentFailures) + " permanent failures)";
index c1241e43c874df0efd7408b7ec624ec62d167283..d9ae39f14f20c9d4a707a4416cd84b18286fb472 100644 (file)
@@ -64,6 +64,37 @@ std::shared_ptr<Logr::Logger> g_slogout;
 
 bool g_syslog;
 
+void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data)
+{
+  auto ret = r.queueData(data);
+
+  switch (ret) {
+  case RemoteLoggerInterface::Result::Queued:
+    break;
+  case RemoteLoggerInterface::Result::PipeFull: {
+    const auto name = r.name();
+    const auto msg = "queue full, dropping";
+    SLOG(g_log << Logger::Debug << name << ": " << msg <<std::endl,
+         g_slog->withName(name)->info(Logr::Debug, msg));
+    break;
+  }
+  case RemoteLoggerInterface::Result::TooLarge: {
+    const auto name = r.name();
+    const auto msg = "Not sending too large protobuf message";
+    SLOG(g_log << Logger::Notice << name << ": " << msg <<endl,
+         g_slog->withName(name)->info(Logr::Debug, msg));
+    break;
+  }
+  case RemoteLoggerInterface::Result::OtherError: {
+    const auto name = r.name();
+    const auto msg = "submitting to queue failed";
+    SLOG(g_log << Logger::Warning << name << ": " << msg << std::endl,
+         g_slog->withName(name)->info(Logr::Warning, msg));
+    break;
+  }
+  }
+}
+
 static bool isEnabledForQueries(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers)
 {
   if (fstreamLoggers == nullptr) {
@@ -88,7 +119,7 @@ static void logFstreamQuery(const std::shared_ptr<std::vector<std::unique_ptr<Fr
   DnstapMessage message(str, DnstapMessage::MessageType::resolver_query, SyncRes::s_serverID, &localip, &ip, protocol, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, auth);
 
   for (auto& logger : *fstreamLoggers) {
-    logger->queueData(str);
+    remoteLoggerQueueData(*logger, str);
   }
 }
 
@@ -117,7 +148,7 @@ static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr
   DnstapMessage message(str, DnstapMessage::MessageType::resolver_response, SyncRes::s_serverID, &localip, &ip, protocol, reinterpret_cast<const char*>(packet.data()), packet.size(), &ts1, &ts2, auth);
 
   for (auto& logger : *fstreamLoggers) {
-    logger->queueData(str);
+    remoteLoggerQueueData(*logger, str);
   }
 }
 
@@ -175,7 +206,7 @@ static void logOutgoingQuery(const std::shared_ptr<std::vector<std::unique_ptr<R
 
   for (auto& logger : *outgoingLoggers) {
     if (logger->logQueries()) {
-      logger->queueData(buffer);
+      remoteLoggerQueueData(*logger, buffer);
     }
   }
 }
@@ -245,7 +276,7 @@ static void logIncomingResponse(const std::shared_ptr<std::vector<std::unique_pt
 
   for (auto& logger : *outgoingLoggers) {
     if (logger->logResponses()) {
-      logger->queueData(buffer);
+      remoteLoggerQueueData(*logger, buffer);
     }
   }
 }
index 4b37c1e969885e4816282d8342a5903f4421b569..6e031277d54628c24804dd116c8b03ffb9841516 100644 (file)
@@ -506,7 +506,7 @@ void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boo
 
   std::string msg(m.finishAndMoveBuf());
   for (auto& server : *t_protobufServers) {
-    server->queueData(msg);
+    remoteLoggerQueueData(*server, msg);
   }
 }
 
@@ -518,7 +518,7 @@ void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
 
   std::string msg(message.finishAndMoveBuf());
   for (auto& server : *t_protobufServers) {
-    server->queueData(msg);
+    remoteLoggerQueueData(*server, msg);
   }
 }
 
index 77eee2d446bd0d6286e9b020391fc1b5107e7c7e..0d3c37625c1a0bd7562e76c1a9ebcb50beb5b4b8 100644 (file)
@@ -131,17 +131,10 @@ bool RemoteLogger::reconnect()
   return true;
 }
 
-void RemoteLogger::queueData(const std::string& data)
+RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
 {
   if (data.size() > std::numeric_limits<uint16_t>::max()) {
-    const auto msg = "Not sending too large protobuf message";
-#ifdef WE_ARE_RECURSOR
-    SLOG(g_log<<Logger::Info<<msg<<endl,
-         g_slog->withName("protobuf")->info(Logr::Info, msg));
-#else
-    warnlog(msg);
-#endif
-    return;
+    return Result::TooLarge;
   }
 
   auto runtime = d_runtime.lock();
@@ -150,33 +143,34 @@ void RemoteLogger::queueData(const std::string& data)
     /* not connected, queue is full, just drop */
     if (!runtime->d_socket) {
       ++d_drops;
-      return;
+      return Result::PipeFull;
     }
     try {
       /* we try to flush some data */
       if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
         /* but failed, let's just drop */
         ++d_drops;
-        return;
+        return Result::PipeFull;
       }
 
       /* see if we freed enough data */
       if (!runtime->d_writer.hasRoomFor(data)) {
         /* we didn't */
         ++d_drops;
-        return;
+        return Result::PipeFull;
       }
     }
     catch(const std::exception& e) {
       //      cout << "Got exception writing: "<<e.what()<<endl;
       ++d_drops;
       runtime->d_socket.reset();
-      return;
+      return Result::PipeFull;
     }
   }
 
   runtime->d_writer.write(data);
   ++d_processed;
+  return Result::Queued;
 }
 
 void RemoteLogger::maintenanceThread() 
index 04db3b53ad865d5e980fa099b514ba62e2e9f764..7c59626343bb60ccf13fdb39bb1db074363d14e4 100644 (file)
@@ -60,10 +60,12 @@ private:
 class RemoteLoggerInterface
 {
 public:
+  enum class Result : uint8_t { Queued, PipeFull, TooLarge, OtherError };
+
   virtual ~RemoteLoggerInterface() {};
-  virtual void queueData(const std::string& data) = 0;
+  virtual Result queueData(const std::string& data) = 0;
   virtual std::string toString() const = 0;
-
+  virtual std::string name() const = 0;
   bool logQueries(void) const { return d_logQueries; }
   bool logResponses(void) const { return d_logResponses; }
   void setLogQueries(bool flag) { d_logQueries = flag; }
@@ -87,7 +89,12 @@ public:
                uint8_t reconnectWaitTime=1,
                bool asyncConnect=false);
   ~RemoteLogger();
-  void queueData(const std::string& data) override;
+
+  [[nodiscard]] Result queueData(const std::string& data) override;
+  std::string name() const override
+  {
+    return "protobuf";
+  }
   std::string toString() const override
   {
     return d_remote.toStringWithPort() + " (" + std::to_string(d_processed) + " processed, " + std::to_string(d_drops) + " dropped)";
@@ -118,3 +125,6 @@ private:
   LockGuarded<RuntimeData> d_runtime;
   std::thread d_thread;
 };
+
+// Helper to be defined by main program: queue data and log based on return value of queueData()
+void remoteLoggerQueueData(RemoteLoggerInterface&, const std::string&);