5 #include "fstrm_logger.hh"
13 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
17 FrameStreamLogger::FrameStreamLogger(const int family
, const std::string
& address
, bool connect
,
18 const std::unordered_map
<string
,unsigned>& options
): d_family(family
), d_address(address
)
23 d_fwopt
= fstrm_writer_options_init();
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
28 res
= fstrm_writer_options_add_content_type(d_fwopt
, DNSTAP_CONTENT_TYPE
, sizeof(DNSTAP_CONTENT_TYPE
) - 1);
29 if (res
!= fstrm_res_success
) {
30 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res
));
33 if (d_family
== AF_UNIX
) {
34 struct sockaddr_un local
;
35 if (makeUNsockaddr(d_address
, &local
)) {
36 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "', it is not a valid UNIX socket path.");
39 d_uwopt
= fstrm_unix_writer_options_init();
41 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
44 // void return, no error checking.
45 fstrm_unix_writer_options_set_socket_path(d_uwopt
, d_address
.c_str());
47 d_writer
= fstrm_unix_writer_init(d_uwopt
, d_fwopt
);
49 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
51 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
52 } else if (family
== AF_INET
) {
53 d_twopt
= fstrm_tcp_writer_options_init();
55 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
59 ComboAddress
ca(d_address
);
61 // void return, no error checking.
62 fstrm_tcp_writer_options_set_socket_address(d_twopt
, ca
.toString().c_str());
63 fstrm_tcp_writer_options_set_socket_port(d_twopt
, std::to_string(ca
.getPort()).c_str());
64 } catch (PDNSException
&e
) {
65 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "': " + e
.reason
);
68 d_writer
= fstrm_tcp_writer_init(d_twopt
, d_fwopt
);
70 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
74 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family
) + " not supported");
77 d_iothropt
= fstrm_iothr_options_init();
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
82 res
= fstrm_iothr_options_set_queue_model(d_iothropt
, FSTRM_IOTHR_QUEUE_MODEL_MPSC
);
83 if (res
!= fstrm_res_success
) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res
));
88 const std::string name
;
89 fstrm_res (*function
)(struct fstrm_iothr_options
*, const unsigned int);
91 { "bufferHint", fstrm_iothr_options_set_buffer_hint
},
92 { "flushTimeout", fstrm_iothr_options_set_flush_timeout
},
93 { "inputQueueSize", fstrm_iothr_options_set_input_queue_size
},
94 { "outputQueueSize", fstrm_iothr_options_set_output_queue_size
},
95 { "queueNotifyThreshold", fstrm_iothr_options_set_queue_notify_threshold
},
96 { "setReopenInterval", fstrm_iothr_options_set_reopen_interval
}
99 for (const auto& i
: list
) {
100 if (options
.find(i
.name
) != options
.end() && options
.at(i
.name
)) {
101 fstrm_res r
= i
.function(d_iothropt
, options
.at(i
.name
));
102 if (r
!= fstrm_res_success
) {
103 throw std::runtime_error("FrameStreamLogger: setting " + string(i
.name
) + " failed: " + std::to_string(r
));
109 d_iothr
= fstrm_iothr_init(d_iothropt
, &d_writer
);
111 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
114 d_ioqueue
= fstrm_iothr_get_input_queue(d_iothr
);
116 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
119 } catch (std::runtime_error
&e
) {
125 void FrameStreamLogger::cleanup()
127 if (d_iothr
!= nullptr) {
128 fstrm_iothr_destroy(&d_iothr
);
131 if (d_iothropt
!= nullptr) {
132 fstrm_iothr_options_destroy(&d_iothropt
);
133 d_iothropt
= nullptr;
135 if (d_writer
!= nullptr) {
136 fstrm_writer_destroy(&d_writer
);
139 if (d_uwopt
!= nullptr) {
140 fstrm_unix_writer_options_destroy(&d_uwopt
);
143 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
144 if (d_twopt
!= nullptr) {
145 fstrm_tcp_writer_options_destroy(&d_twopt
);
149 if (d_fwopt
!= nullptr) {
150 fstrm_writer_options_destroy(&d_fwopt
);
155 FrameStreamLogger::~FrameStreamLogger()
160 void FrameStreamLogger::queueData(const std::string
& data
)
162 if (!d_ioqueue
|| !d_iothr
) {
165 uint8_t *frame
= (uint8_t*)malloc(data
.length());
168 g_log
<<Logger::Warning
<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl
;
170 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
174 memcpy(frame
, data
.c_str(), data
.length());
177 res
= fstrm_iothr_submit(d_iothr
, d_ioqueue
, frame
, data
.length(), fstrm_free_wrapper
, nullptr);
179 if (res
== fstrm_res_success
) {
180 // Frame successfully queued.
182 } else if (res
== fstrm_res_again
) {
185 g_log
<<Logger::Debug
<<"FrameStreamLogger: queue full, dropping."<<std::endl
;
187 vinfolog("FrameStreamLogger: queue full, dropping.");
191 // Permanent failure.
194 g_log
<<Logger::Warning
<<"FrameStreamLogger: submitting to queue failed."<<std::endl
;
196 warnlog("FrameStreamLogger: submitting to queue failed.");
198 ++d_permanentFailures
;
202 #endif /* HAVE_FSTRM */