5 #include "fstrm_logger.hh"
8 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
12 FrameStreamLogger::FrameStreamLogger(const int family
, const std::string
& address
, bool connect
): d_family(family
), d_address(address
)
17 d_fwopt
= fstrm_writer_options_init();
19 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
22 res
= fstrm_writer_options_add_content_type(d_fwopt
, DNSTAP_CONTENT_TYPE
, sizeof(DNSTAP_CONTENT_TYPE
) - 1);
23 if (res
!= fstrm_res_success
) {
24 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res
));
27 if (d_family
== AF_UNIX
) {
28 struct sockaddr_un local
;
29 if (makeUNsockaddr(d_address
, &local
)) {
30 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "', it is not a valid UNIX socket path.");
33 d_uwopt
= fstrm_unix_writer_options_init();
35 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
38 // void return, no error checking.
39 fstrm_unix_writer_options_set_socket_path(d_uwopt
, d_address
.c_str());
41 d_writer
= fstrm_unix_writer_init(d_uwopt
, d_fwopt
);
43 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
45 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
46 } else if (family
== AF_INET
) {
47 d_twopt
= fstrm_tcp_writer_options_init();
49 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
53 ComboAddress
ca(d_address
);
55 // void return, no error checking.
56 fstrm_tcp_writer_options_set_socket_address(d_twopt
, ca
.toString().c_str());
57 fstrm_tcp_writer_options_set_socket_port(d_twopt
, std::to_string(ca
.getPort()).c_str());
58 } catch (PDNSException
&e
) {
59 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address
+ "': " + e
.reason
);
62 d_writer
= fstrm_tcp_writer_init(d_twopt
, d_fwopt
);
64 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
68 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family
) + " not supported");
71 d_iothropt
= fstrm_iothr_options_init();
73 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
76 res
= fstrm_iothr_options_set_queue_model(d_iothropt
, FSTRM_IOTHR_QUEUE_MODEL_MPSC
);
77 if (res
!= fstrm_res_success
) {
78 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res
));
82 d_iothr
= fstrm_iothr_init(d_iothropt
, &d_writer
);
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
87 d_ioqueue
= fstrm_iothr_get_input_queue(d_iothr
);
89 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
92 } catch (std::runtime_error
&e
) {
98 void FrameStreamLogger::cleanup()
100 if (d_iothr
!= nullptr) {
101 fstrm_iothr_destroy(&d_iothr
);
104 if (d_iothropt
!= nullptr) {
105 fstrm_iothr_options_destroy(&d_iothropt
);
106 d_iothropt
= nullptr;
108 if (d_writer
!= nullptr) {
109 fstrm_writer_destroy(&d_writer
);
112 if (d_uwopt
!= nullptr) {
113 fstrm_unix_writer_options_destroy(&d_uwopt
);
116 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
117 if (d_twopt
!= nullptr) {
118 fstrm_tcp_writer_options_destroy(&d_twopt
);
122 if (d_fwopt
!= nullptr) {
123 fstrm_writer_options_destroy(&d_fwopt
);
128 FrameStreamLogger::~FrameStreamLogger()
133 void FrameStreamLogger::queueData(const std::string
& data
)
135 if (!d_ioqueue
|| !d_iothr
) {
138 uint8_t *frame
= (uint8_t*)malloc(data
.length());
140 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
143 memcpy(frame
, data
.c_str(), data
.length());
146 res
= fstrm_iothr_submit(d_iothr
, d_ioqueue
, frame
, data
.length(), fstrm_free_wrapper
, nullptr);
148 if (res
== fstrm_res_success
) {
149 // Frame successfully queued.
150 } else if (res
== fstrm_res_again
) {
152 warnlog("FrameStreamLogger: queue full, dropping.");
154 // Permanent failure.
156 warnlog("FrameStreamLogger: submitting to queue failed.");
160 #endif /* HAVE_FSTRM */