#include "config.h"
#include "fstrm_logger.hh"
+
+#ifdef RECURSOR
+#include "logger.hh"
+#else
#include "dolog.hh"
+#endif
#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
#ifdef HAVE_FSTRM
-FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect): d_family(family), d_address(address)
+FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
+ const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
{
fstrm_res res;
throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
}
+ if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
+ res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
+ res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
+ res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
+ res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
+ res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
+ }
+ }
+ if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
+ res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
+ if (res != fstrm_res_success) {
+ throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
+ }
+ }
+
+
if (connect) {
d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
if (!d_iothr) {
}
uint8_t *frame = (uint8_t*)malloc(data.length());
if (!frame) {
+#ifdef RECURSOR
+ g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
+#else
warnlog("FrameStreamLogger: cannot allocate memory for stream.");
+#endif
return;
}
memcpy(frame, data.c_str(), data.length());
// Frame successfully queued.
} else if (res == fstrm_res_again) {
free(frame);
+#ifdef RECURSOR
+ g_log<<Logger::Warning<<"FrameStreamLogger: queue full, dropping."<<std::endl;
+#else
warnlog("FrameStreamLogger: queue full, dropping.");
- } else {
+#endif
+ } else {
// Permanent failure.
free(frame);
+#ifdef RECURSOR
+ g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
+#else
warnlog("FrameStreamLogger: submitting to queue failed.");
+#endif
}
}