5 #include "fstrm_logger.hh"
13 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
17 FrameStreamLogger::FrameStreamLogger(const int family
, const std::string
& address
, bool connect
,
18 const std::unordered_map
<string
,unsigned>& options
): d_family(family
), d_address(address
)
23 d_fwopt
= fstrm_writer_options_init();
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
28 res
= fstrm_writer_options_add_content_type(d_fwopt
, DNSTAP_CONTENT_TYPE
, sizeof(DNSTAP_CONTENT_TYPE
) - 1);
29 if (res
!= fstrm_res_success
) {
30 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res
));
33 if (d_family
== AF_UNIX
) {
34 struct sockaddr_un local
;
35 if (makeUNsockaddr(d_address
, &local
)) {
36 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "', it is not a valid UNIX socket path.");
39 d_uwopt
= fstrm_unix_writer_options_init();
41 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
44 // void return, no error checking.
45 fstrm_unix_writer_options_set_socket_path(d_uwopt
, d_address
.c_str());
47 d_writer
= fstrm_unix_writer_init(d_uwopt
, d_fwopt
);
49 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
51 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
52 } else if (family
== AF_INET
) {
53 d_twopt
= fstrm_tcp_writer_options_init();
55 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
59 ComboAddress
ca(d_address
);
61 // void return, no error checking.
62 fstrm_tcp_writer_options_set_socket_address(d_twopt
, ca
.toString().c_str());
63 fstrm_tcp_writer_options_set_socket_port(d_twopt
, std::to_string(ca
.getPort()).c_str());
64 } catch (PDNSException
&e
) {
65 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "': " + e
.reason
);
68 d_writer
= fstrm_tcp_writer_init(d_twopt
, d_fwopt
);
70 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
74 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family
) + " not supported");
77 d_iothropt
= fstrm_iothr_options_init();
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
82 res
= fstrm_iothr_options_set_queue_model(d_iothropt
, FSTRM_IOTHR_QUEUE_MODEL_MPSC
);
83 if (res
!= fstrm_res_success
) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res
));
87 if (options
.find("bufferHint") != options
.end() && options
.at("bufferHint")) {
88 res
= fstrm_iothr_options_set_buffer_hint(d_iothropt
, options
.at("bufferHint"));
89 if (res
!= fstrm_res_success
) {
90 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res
));
93 if (options
.find("flushTimeout") != options
.end() && options
.at("flushTimeout")) {
94 res
= fstrm_iothr_options_set_flush_timeout(d_iothropt
, options
.at("flushTimeout"));
95 if (res
!= fstrm_res_success
) {
96 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res
));
99 if (options
.find("inputQueueSize") != options
.end() && options
.at("inputQueueSize")) {
100 res
= fstrm_iothr_options_set_input_queue_size(d_iothropt
, options
.at("inputQueueSize"));
101 if (res
!= fstrm_res_success
) {
102 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res
));
105 if (options
.find("outputQueueSize") != options
.end() && options
.at("outputQueueSize")) {
106 res
= fstrm_iothr_options_set_output_queue_size(d_iothropt
, options
.at("outputQueueSize"));
107 if (res
!= fstrm_res_success
) {
108 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res
));
111 if (options
.find("queueNotifyThreshold") != options
.end() && options
.at("queueNotifyThreshold")) {
112 res
= fstrm_iothr_options_set_queue_notify_threshold(d_iothropt
, options
.at("queueNotifyThreshold"));
113 if (res
!= fstrm_res_success
) {
114 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res
));
117 if (options
.find("setReopenInterval") != options
.end() && options
.at("setReopenInterval")) {
118 res
= fstrm_iothr_options_set_reopen_interval(d_iothropt
, options
.at("setReopenInterval"));
119 if (res
!= fstrm_res_success
) {
120 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res
));
126 d_iothr
= fstrm_iothr_init(d_iothropt
, &d_writer
);
128 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
131 d_ioqueue
= fstrm_iothr_get_input_queue(d_iothr
);
133 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
136 } catch (std::runtime_error
&e
) {
142 void FrameStreamLogger::cleanup()
144 if (d_iothr
!= nullptr) {
145 fstrm_iothr_destroy(&d_iothr
);
148 if (d_iothropt
!= nullptr) {
149 fstrm_iothr_options_destroy(&d_iothropt
);
150 d_iothropt
= nullptr;
152 if (d_writer
!= nullptr) {
153 fstrm_writer_destroy(&d_writer
);
156 if (d_uwopt
!= nullptr) {
157 fstrm_unix_writer_options_destroy(&d_uwopt
);
160 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
161 if (d_twopt
!= nullptr) {
162 fstrm_tcp_writer_options_destroy(&d_twopt
);
166 if (d_fwopt
!= nullptr) {
167 fstrm_writer_options_destroy(&d_fwopt
);
172 FrameStreamLogger::~FrameStreamLogger()
177 void FrameStreamLogger::queueData(const std::string
& data
)
179 if (!d_ioqueue
|| !d_iothr
) {
182 uint8_t *frame
= (uint8_t*)malloc(data
.length());
185 g_log
<<Logger::Warning
<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl
;
187 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
191 memcpy(frame
, data
.c_str(), data
.length());
194 res
= fstrm_iothr_submit(d_iothr
, d_ioqueue
, frame
, data
.length(), fstrm_free_wrapper
, nullptr);
196 if (res
== fstrm_res_success
) {
197 // Frame successfully queued.
198 } else if (res
== fstrm_res_again
) {
201 g_log
<<Logger::Warning
<<"FrameStreamLogger: queue full, dropping."<<std::endl
;
203 warnlog("FrameStreamLogger: queue full, dropping.");
206 // Permanent failure.
209 g_log
<<Logger::Warning
<<"FrameStreamLogger: submitting to queue failed."<<std::endl
;
211 warnlog("FrameStreamLogger: submitting to queue failed.");
216 #endif /* HAVE_FSTRM */