From: Otto Moerbeek Date: Wed, 24 Aug 2022 07:56:26 +0000 (+0200) Subject: Reorganize queueData() with respect to logging. X-Git-Tag: rec-4.8.0-alpha1~47^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4d7db3d7b5dae1ede8be1b595174ee893600b864;p=thirdparty%2Fpdns.git Reorganize queueData() with respect to logging. Let queueData() return a status and log that via a program supplied helper. This way, the program specific (recursor,dnsdist) logging isn't polluting the common code. There are a few other cases that need to be dealt with some day. dnsdist log levels should be reviewed (I copied the existing), they might be too verbose. --- diff --git a/pdns/dnsdist-lua-actions.cc b/pdns/dnsdist-lua-actions.cc index 45deb30646..5004d3f1db 100644 --- a/pdns/dnsdist-lua-actions.cc +++ b/pdns/dnsdist-lua-actions.cc @@ -1433,6 +1433,26 @@ static DnstapMessage::ProtocolType ProtocolToDNSTap(dnsdist::Protocol protocol) throw std::runtime_error("Unhandled protocol for dnstap: " + protocol.toPrettyString()); } +void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data) +{ + auto ret = r.queueData(data); + + switch (ret) { + case RemoteLoggerInterface::Result::Queued: + break; + case RemoteLoggerInterface::Result::PipeFull: { + vinfolog("%s: queue full, dropping.", r.name().c_str()); + break; + } + case RemoteLoggerInterface::Result::TooLarge: { + warnlog("%s: Not sending too large protobuf message", r.name().c_str()); + break; + } + case RemoteLoggerInterface::Result::OtherError: + warnlog("%s: submitting to queue failed", r.name().c_str()); + } +} + class DnstapLogAction : public DNSAction, public boost::noncopyable { public: @@ -1454,7 +1474,7 @@ public: } } - d_logger->queueData(data); + remoteLoggerQueueData(*d_logger, data); return Action::None; } @@ -1501,7 +1521,7 @@ public: static thread_local std::string data; data.clear(); message.serialize(data); - d_logger->queueData(data); + remoteLoggerQueueData(*d_logger, data); return Action::None; } @@ -1587,7 +1607,7 @@ public: } } - d_logger->queueData(data); + remoteLoggerQueueData(*d_logger, data); return Action::None; } diff --git a/pdns/fstrm_logger.cc b/pdns/fstrm_logger.cc index c5339d645f..1261953a80 100644 --- a/pdns/fstrm_logger.cc +++ b/pdns/fstrm_logger.cc @@ -158,20 +158,14 @@ FrameStreamLogger::~FrameStreamLogger() this->cleanup(); } -void FrameStreamLogger::queueData(const std::string& data) +RemoteLoggerInterface::Result FrameStreamLogger::queueData(const std::string& data) { if (!d_ioqueue || !d_iothr) { - return; + return Result::OtherError; } uint8_t *frame = (uint8_t*)malloc(data.length()); if (!frame) { -#ifdef RECURSOR - SLOG(g_log<withName("dnstap")->info(Logr::Warning, "cannot allocate memory for stream")); -#else - warnlog("FrameStreamLogger: cannot allocate memory for stream."); -#endif - return; + return Result::TooLarge; } memcpy(frame, data.c_str(), data.length()); @@ -181,25 +175,16 @@ void FrameStreamLogger::queueData(const std::string& data) if (res == fstrm_res_success) { // Frame successfully queued. ++d_framesSent; + return Result::Queued; } else if (res == fstrm_res_again) { free(frame); -#ifdef RECURSOR - SLOG(g_log<withName("dnstap")->info(Logr::Debug, "queue full, dropping")); -#else - vinfolog("FrameStreamLogger: queue full, dropping."); -#endif ++d_queueFullDrops; + return Result::PipeFull; } else { // Permanent failure. free(frame); -#ifdef RECURSOR - SLOG(g_log<withName("dnstap")->info(Logr::Warning, "submitting to queue failed")); -#else - warnlog("FrameStreamLogger: submitting to queue failed."); -#endif ++d_permanentFailures; + return Result::OtherError; } } diff --git a/pdns/fstrm_logger.hh b/pdns/fstrm_logger.hh index a59e194b9d..624f67c7eb 100644 --- a/pdns/fstrm_logger.hh +++ b/pdns/fstrm_logger.hh @@ -38,7 +38,11 @@ class FrameStreamLogger : public RemoteLoggerInterface, boost::noncopyable public: FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map& options = std::unordered_map()); ~FrameStreamLogger(); - void queueData(const std::string& data) override; + [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override; + std::string name() const override + { + return "framestream"; + } std::string toString() const override { return "FrameStreamLogger to " + d_address + " (" + std::to_string(d_framesSent) + " frames sent, " + std::to_string(d_queueFullDrops) + " dropped, " + std::to_string(d_permanentFailures) + " permanent failures)"; diff --git a/pdns/lwres.cc b/pdns/lwres.cc index c1241e43c8..d9ae39f14f 100644 --- a/pdns/lwres.cc +++ b/pdns/lwres.cc @@ -64,6 +64,37 @@ std::shared_ptr g_slogout; bool g_syslog; +void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data) +{ + auto ret = r.queueData(data); + + switch (ret) { + case RemoteLoggerInterface::Result::Queued: + break; + case RemoteLoggerInterface::Result::PipeFull: { + const auto name = r.name(); + const auto msg = "queue full, dropping"; + SLOG(g_log << Logger::Debug << name << ": " << msg <withName(name)->info(Logr::Debug, msg)); + break; + } + case RemoteLoggerInterface::Result::TooLarge: { + const auto name = r.name(); + const auto msg = "Not sending too large protobuf message"; + SLOG(g_log << Logger::Notice << name << ": " << msg <withName(name)->info(Logr::Debug, msg)); + break; + } + case RemoteLoggerInterface::Result::OtherError: { + const auto name = r.name(); + const auto msg = "submitting to queue failed"; + SLOG(g_log << Logger::Warning << name << ": " << msg << std::endl, + g_slog->withName(name)->info(Logr::Warning, msg)); + break; + } + } +} + static bool isEnabledForQueries(const std::shared_ptr>>& fstreamLoggers) { if (fstreamLoggers == nullptr) { @@ -88,7 +119,7 @@ static void logFstreamQuery(const std::shared_ptr(&*packet.begin()), packet.size(), &ts, nullptr, auth); for (auto& logger : *fstreamLoggers) { - logger->queueData(str); + remoteLoggerQueueData(*logger, str); } } @@ -117,7 +148,7 @@ static void logFstreamResponse(const std::shared_ptr(packet.data()), packet.size(), &ts1, &ts2, auth); for (auto& logger : *fstreamLoggers) { - logger->queueData(str); + remoteLoggerQueueData(*logger, str); } } @@ -175,7 +206,7 @@ static void logOutgoingQuery(const std::shared_ptrlogQueries()) { - logger->queueData(buffer); + remoteLoggerQueueData(*logger, buffer); } } } @@ -245,7 +276,7 @@ static void logIncomingResponse(const std::shared_ptrlogResponses()) { - logger->queueData(buffer); + remoteLoggerQueueData(*logger, buffer); } } } diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc index 4b37c1e969..6e031277d5 100644 --- a/pdns/recursordist/rec-main.cc +++ b/pdns/recursordist/rec-main.cc @@ -506,7 +506,7 @@ void protobufLogQuery(LocalStateHolder& luaconfsLocal, const boo std::string msg(m.finishAndMoveBuf()); for (auto& server : *t_protobufServers) { - server->queueData(msg); + remoteLoggerQueueData(*server, msg); } } @@ -518,7 +518,7 @@ void protobufLogResponse(pdns::ProtoZero::RecMessage& message) std::string msg(message.finishAndMoveBuf()); for (auto& server : *t_protobufServers) { - server->queueData(msg); + remoteLoggerQueueData(*server, msg); } } diff --git a/pdns/remote_logger.cc b/pdns/remote_logger.cc index 77eee2d446..0d3c37625c 100644 --- a/pdns/remote_logger.cc +++ b/pdns/remote_logger.cc @@ -131,17 +131,10 @@ bool RemoteLogger::reconnect() return true; } -void RemoteLogger::queueData(const std::string& data) +RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data) { if (data.size() > std::numeric_limits::max()) { - const auto msg = "Not sending too large protobuf message"; -#ifdef WE_ARE_RECURSOR - SLOG(g_log<withName("protobuf")->info(Logr::Info, msg)); -#else - warnlog(msg); -#endif - return; + return Result::TooLarge; } auto runtime = d_runtime.lock(); @@ -150,33 +143,34 @@ void RemoteLogger::queueData(const std::string& data) /* not connected, queue is full, just drop */ if (!runtime->d_socket) { ++d_drops; - return; + return Result::PipeFull; } try { /* we try to flush some data */ if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) { /* but failed, let's just drop */ ++d_drops; - return; + return Result::PipeFull; } /* see if we freed enough data */ if (!runtime->d_writer.hasRoomFor(data)) { /* we didn't */ ++d_drops; - return; + return Result::PipeFull; } } catch(const std::exception& e) { // cout << "Got exception writing: "<d_socket.reset(); - return; + return Result::PipeFull; } } runtime->d_writer.write(data); ++d_processed; + return Result::Queued; } void RemoteLogger::maintenanceThread() diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh index 04db3b53ad..7c59626343 100644 --- a/pdns/remote_logger.hh +++ b/pdns/remote_logger.hh @@ -60,10 +60,12 @@ private: class RemoteLoggerInterface { public: + enum class Result : uint8_t { Queued, PipeFull, TooLarge, OtherError }; + virtual ~RemoteLoggerInterface() {}; - virtual void queueData(const std::string& data) = 0; + virtual Result queueData(const std::string& data) = 0; virtual std::string toString() const = 0; - + virtual std::string name() const = 0; bool logQueries(void) const { return d_logQueries; } bool logResponses(void) const { return d_logResponses; } void setLogQueries(bool flag) { d_logQueries = flag; } @@ -87,7 +89,12 @@ public: uint8_t reconnectWaitTime=1, bool asyncConnect=false); ~RemoteLogger(); - void queueData(const std::string& data) override; + + [[nodiscard]] Result queueData(const std::string& data) override; + std::string name() const override + { + return "protobuf"; + } std::string toString() const override { return d_remote.toStringWithPort() + " (" + std::to_string(d_processed) + " processed, " + std::to_string(d_drops) + " dropped)"; @@ -118,3 +125,6 @@ private: LockGuarded d_runtime; std::thread d_thread; }; + +// Helper to be defined by main program: queue data and log based on return value of queueData() +void remoteLoggerQueueData(RemoteLoggerInterface&, const std::string&);