]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/fstrm_logger.cc
Merge pull request #7908 from omoerbeek/rec-4.1.14-changelog
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
index 617aa73d5cfd76df42c687f3c7d52b2500c39e0a..62ccd72ebd10a5b3f52c9b5ebd7b3ef89c66f6d9 100644 (file)
@@ -3,13 +3,19 @@
 
 #include "config.h"
 #include "fstrm_logger.hh"
+
+#ifdef RECURSOR
+#include "logger.hh"
+#else
 #include "dolog.hh"
+#endif
 
 #define DNSTAP_CONTENT_TYPE            "protobuf:dnstap.Dnstap"
 
 #ifdef HAVE_FSTRM
 
-FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect): d_family(family), d_address(address)
+FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
+    const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
 {
   fstrm_res res;
 
@@ -78,6 +84,44 @@ FrameStreamLogger::FrameStreamLogger(const int family, const std::string& addres
       throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
     }
 
+    if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
+      res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
+      res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
+      res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
+      res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
+      res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
+      }
+    }
+    if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
+      res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
+      if (res != fstrm_res_success) {
+        throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
+      }
+    }
+
+
     if (connect) {
       d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
       if (!d_iothr) {
@@ -137,7 +181,11 @@ void FrameStreamLogger::queueData(const std::string& data)
   }
   uint8_t *frame = (uint8_t*)malloc(data.length());
   if (!frame) {
+#ifdef RECURSOR
+    g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
+#else
     warnlog("FrameStreamLogger: cannot allocate memory for stream.");
+#endif
     return;
   }
   memcpy(frame, data.c_str(), data.length());
@@ -149,11 +197,19 @@ void FrameStreamLogger::queueData(const std::string& data)
     // Frame successfully queued.
   } else if (res == fstrm_res_again) {
     free(frame);
+#ifdef RECURSOR
+    g_log<<Logger::Warning<<"FrameStreamLogger: queue full, dropping."<<std::endl;
+#else
     warnlog("FrameStreamLogger: queue full, dropping.");
-  } else {
+#endif
+ } else {
     // Permanent failure.
     free(frame);
+#ifdef RECURSOR
+    g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
+#else
     warnlog("FrameStreamLogger: submitting to queue failed.");
+#endif
   }
 }