Let queueData() return a status and log that via a program supplied helper.
This way, the program specific (recursor,dnsdist) logging isn't polluting the common code.
There are a few other cases that need to be dealt with some day.
dnsdist log levels should be reviewed (I copied the existing), they might be too verbose.
throw std::runtime_error("Unhandled protocol for dnstap: " + protocol.toPrettyString());
}
+void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data)
+{
+ auto ret = r.queueData(data);
+
+ switch (ret) {
+ case RemoteLoggerInterface::Result::Queued:
+ break;
+ case RemoteLoggerInterface::Result::PipeFull: {
+ vinfolog("%s: queue full, dropping.", r.name().c_str());
+ break;
+ }
+ case RemoteLoggerInterface::Result::TooLarge: {
+ warnlog("%s: Not sending too large protobuf message", r.name().c_str());
+ break;
+ }
+ case RemoteLoggerInterface::Result::OtherError:
+ warnlog("%s: submitting to queue failed", r.name().c_str());
+ }
+}
+
class DnstapLogAction : public DNSAction, public boost::noncopyable
{
public:
}
}
- d_logger->queueData(data);
+ remoteLoggerQueueData(*d_logger, data);
return Action::None;
}
static thread_local std::string data;
data.clear();
message.serialize(data);
- d_logger->queueData(data);
+ remoteLoggerQueueData(*d_logger, data);
return Action::None;
}
}
}
- d_logger->queueData(data);
+ remoteLoggerQueueData(*d_logger, data);
return Action::None;
}
this->cleanup();
}
-void FrameStreamLogger::queueData(const std::string& data)
+RemoteLoggerInterface::Result FrameStreamLogger::queueData(const std::string& data)
{
if (!d_ioqueue || !d_iothr) {
- return;
+ return Result::OtherError;
}
uint8_t *frame = (uint8_t*)malloc(data.length());
if (!frame) {
-#ifdef RECURSOR
- SLOG(g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl,
- g_slog->withName("dnstap")->info(Logr::Warning, "cannot allocate memory for stream"));
-#else
- warnlog("FrameStreamLogger: cannot allocate memory for stream.");
-#endif
- return;
+ return Result::TooLarge;
}
memcpy(frame, data.c_str(), data.length());
if (res == fstrm_res_success) {
// Frame successfully queued.
++d_framesSent;
+ return Result::Queued;
} else if (res == fstrm_res_again) {
free(frame);
-#ifdef RECURSOR
- SLOG(g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl,
- g_slog->withName("dnstap")->info(Logr::Debug, "queue full, dropping"));
-#else
- vinfolog("FrameStreamLogger: queue full, dropping.");
-#endif
++d_queueFullDrops;
+ return Result::PipeFull;
} else {
// Permanent failure.
free(frame);
-#ifdef RECURSOR
- SLOG(g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl,
- g_slog->withName("dnstap")->info(Logr::Warning, "submitting to queue failed"));
-#else
- warnlog("FrameStreamLogger: submitting to queue failed.");
-#endif
++d_permanentFailures;
+ return Result::OtherError;
}
}
public:
FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map<string,unsigned>& options = std::unordered_map<string,unsigned>());
~FrameStreamLogger();
- void queueData(const std::string& data) override;
+ [[nodiscard]] RemoteLoggerInterface::Result queueData(const std::string& data) override;
+ std::string name() const override
+ {
+ return "framestream";
+ }
std::string toString() const override
{
return "FrameStreamLogger to " + d_address + " (" + std::to_string(d_framesSent) + " frames sent, " + std::to_string(d_queueFullDrops) + " dropped, " + std::to_string(d_permanentFailures) + " permanent failures)";
bool g_syslog;
+void remoteLoggerQueueData(RemoteLoggerInterface& r, const std::string& data)
+{
+ auto ret = r.queueData(data);
+
+ switch (ret) {
+ case RemoteLoggerInterface::Result::Queued:
+ break;
+ case RemoteLoggerInterface::Result::PipeFull: {
+ const auto name = r.name();
+ const auto msg = "queue full, dropping";
+ SLOG(g_log << Logger::Debug << name << ": " << msg <<std::endl,
+ g_slog->withName(name)->info(Logr::Debug, msg));
+ break;
+ }
+ case RemoteLoggerInterface::Result::TooLarge: {
+ const auto name = r.name();
+ const auto msg = "Not sending too large protobuf message";
+ SLOG(g_log << Logger::Notice << name << ": " << msg <<endl,
+ g_slog->withName(name)->info(Logr::Debug, msg));
+ break;
+ }
+ case RemoteLoggerInterface::Result::OtherError: {
+ const auto name = r.name();
+ const auto msg = "submitting to queue failed";
+ SLOG(g_log << Logger::Warning << name << ": " << msg << std::endl,
+ g_slog->withName(name)->info(Logr::Warning, msg));
+ break;
+ }
+ }
+}
+
static bool isEnabledForQueries(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers)
{
if (fstreamLoggers == nullptr) {
DnstapMessage message(str, DnstapMessage::MessageType::resolver_query, SyncRes::s_serverID, &localip, &ip, protocol, reinterpret_cast<const char*>(&*packet.begin()), packet.size(), &ts, nullptr, auth);
for (auto& logger : *fstreamLoggers) {
- logger->queueData(str);
+ remoteLoggerQueueData(*logger, str);
}
}
DnstapMessage message(str, DnstapMessage::MessageType::resolver_response, SyncRes::s_serverID, &localip, &ip, protocol, reinterpret_cast<const char*>(packet.data()), packet.size(), &ts1, &ts2, auth);
for (auto& logger : *fstreamLoggers) {
- logger->queueData(str);
+ remoteLoggerQueueData(*logger, str);
}
}
for (auto& logger : *outgoingLoggers) {
if (logger->logQueries()) {
- logger->queueData(buffer);
+ remoteLoggerQueueData(*logger, buffer);
}
}
}
for (auto& logger : *outgoingLoggers) {
if (logger->logResponses()) {
- logger->queueData(buffer);
+ remoteLoggerQueueData(*logger, buffer);
}
}
}
std::string msg(m.finishAndMoveBuf());
for (auto& server : *t_protobufServers) {
- server->queueData(msg);
+ remoteLoggerQueueData(*server, msg);
}
}
std::string msg(message.finishAndMoveBuf());
for (auto& server : *t_protobufServers) {
- server->queueData(msg);
+ remoteLoggerQueueData(*server, msg);
}
}
return true;
}
-void RemoteLogger::queueData(const std::string& data)
+RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
{
if (data.size() > std::numeric_limits<uint16_t>::max()) {
- const auto msg = "Not sending too large protobuf message";
-#ifdef WE_ARE_RECURSOR
- SLOG(g_log<<Logger::Info<<msg<<endl,
- g_slog->withName("protobuf")->info(Logr::Info, msg));
-#else
- warnlog(msg);
-#endif
- return;
+ return Result::TooLarge;
}
auto runtime = d_runtime.lock();
/* not connected, queue is full, just drop */
if (!runtime->d_socket) {
++d_drops;
- return;
+ return Result::PipeFull;
}
try {
/* we try to flush some data */
if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
/* but failed, let's just drop */
++d_drops;
- return;
+ return Result::PipeFull;
}
/* see if we freed enough data */
if (!runtime->d_writer.hasRoomFor(data)) {
/* we didn't */
++d_drops;
- return;
+ return Result::PipeFull;
}
}
catch(const std::exception& e) {
// cout << "Got exception writing: "<<e.what()<<endl;
++d_drops;
runtime->d_socket.reset();
- return;
+ return Result::PipeFull;
}
}
runtime->d_writer.write(data);
++d_processed;
+ return Result::Queued;
}
void RemoteLogger::maintenanceThread()
class RemoteLoggerInterface
{
public:
+ enum class Result : uint8_t { Queued, PipeFull, TooLarge, OtherError };
+
virtual ~RemoteLoggerInterface() {};
- virtual void queueData(const std::string& data) = 0;
+ virtual Result queueData(const std::string& data) = 0;
virtual std::string toString() const = 0;
-
+ virtual std::string name() const = 0;
bool logQueries(void) const { return d_logQueries; }
bool logResponses(void) const { return d_logResponses; }
void setLogQueries(bool flag) { d_logQueries = flag; }
uint8_t reconnectWaitTime=1,
bool asyncConnect=false);
~RemoteLogger();
- void queueData(const std::string& data) override;
+
+ [[nodiscard]] Result queueData(const std::string& data) override;
+ std::string name() const override
+ {
+ return "protobuf";
+ }
std::string toString() const override
{
return d_remote.toStringWithPort() + " (" + std::to_string(d_processed) + " processed, " + std::to_string(d_drops) + " dropped)";
LockGuarded<RuntimeData> d_runtime;
std::thread d_thread;
};
+
+// Helper to be defined by main program: queue data and log based on return value of queueData()
+void remoteLoggerQueueData(RemoteLoggerInterface&, const std::string&);