]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/fstrm_logger.cc
Merge remote-tracking branch 'origin/master' into rec-dnstap
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
CommitLineData
82a91ddf
CH
1#include <unistd.h>
2#include <sys/un.h>
3
4#include "config.h"
5#include "fstrm_logger.hh"
6#include "dolog.hh"
7
8#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
9
10#ifdef HAVE_FSTRM
11
573f4ff0
OM
12FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
13 const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
82a91ddf
CH
14{
15 fstrm_res res;
16
17 try {
18 d_fwopt = fstrm_writer_options_init();
19 if (!d_fwopt) {
20 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
21 }
22
23 res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
24 if (res != fstrm_res_success) {
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
26 }
27
28 if (d_family == AF_UNIX) {
29 struct sockaddr_un local;
30 if (makeUNsockaddr(d_address, &local)) {
31 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
32 }
33
34 d_uwopt = fstrm_unix_writer_options_init();
35 if (!d_uwopt) {
36 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
37 }
38
39 // void return, no error checking.
40 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
41
42 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
43 if (!d_writer) {
44 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
45 }
46 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
47 } else if (family == AF_INET) {
48 d_twopt = fstrm_tcp_writer_options_init();
49 if (!d_twopt) {
50 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
51 }
52
53 try {
54 ComboAddress ca(d_address);
55
56 // void return, no error checking.
57 fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
58 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
59 } catch (PDNSException &e) {
60 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
61 }
62
63 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
64 if (!d_writer) {
65 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
66 }
67 #endif
68 } else {
69 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
70 }
71
72 d_iothropt = fstrm_iothr_options_init();
73 if (!d_iothropt) {
74 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
75 }
76
77 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
78 if (res != fstrm_res_success) {
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
80 }
81
573f4ff0
OM
82 if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
83 res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
84 if (res != fstrm_res_success) {
85 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
86 }
87 }
88 if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
89 res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
90 if (res != fstrm_res_success) {
91 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
92 }
93 }
94 if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
95 res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
96 if (res != fstrm_res_success) {
97 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
98 }
99 }
100 if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
101 res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
102 if (res != fstrm_res_success) {
103 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
104 }
105 }
106 if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
107 res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
108 if (res != fstrm_res_success) {
109 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
110 }
111 }
112 if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
113 res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
114 if (res != fstrm_res_success) {
115 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
116 }
117 }
118
119
6b44773a
CH
120 if (connect) {
121 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
122 if (!d_iothr) {
123 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
124 }
82a91ddf 125
6b44773a
CH
126 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
127 if (!d_ioqueue) {
128 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
129 }
82a91ddf
CH
130 }
131 } catch (std::runtime_error &e) {
132 this->cleanup();
133 throw;
134 }
135}
136
137void FrameStreamLogger::cleanup()
138{
139 if (d_iothr != nullptr) {
140 fstrm_iothr_destroy(&d_iothr);
141 d_iothr = nullptr;
142 }
143 if (d_iothropt != nullptr) {
144 fstrm_iothr_options_destroy(&d_iothropt);
145 d_iothropt = nullptr;
146 }
147 if (d_writer != nullptr) {
148 fstrm_writer_destroy(&d_writer);
149 d_writer = nullptr;
150 }
151 if (d_uwopt != nullptr) {
152 fstrm_unix_writer_options_destroy(&d_uwopt);
153 d_uwopt = nullptr;
154 }
155#ifdef HAVE_FSTRM_TCP_WRITER_INIT
156 if (d_twopt != nullptr) {
157 fstrm_tcp_writer_options_destroy(&d_twopt);
158 d_twopt = nullptr;
159 }
160#endif
161 if (d_fwopt != nullptr) {
162 fstrm_writer_options_destroy(&d_fwopt);
163 d_fwopt = nullptr;
164 }
165}
166
167FrameStreamLogger::~FrameStreamLogger()
168{
169 this->cleanup();
170}
171
172void FrameStreamLogger::queueData(const std::string& data)
173{
6b44773a
CH
174 if (!d_ioqueue || !d_iothr) {
175 return;
176 }
82a91ddf
CH
177 uint8_t *frame = (uint8_t*)malloc(data.length());
178 if (!frame) {
179 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
180 return;
181 }
182 memcpy(frame, data.c_str(), data.length());
183
184 fstrm_res res;
185 res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
186
187 if (res == fstrm_res_success) {
188 // Frame successfully queued.
189 } else if (res == fstrm_res_again) {
190 free(frame);
191 warnlog("FrameStreamLogger: queue full, dropping.");
192 } else {
193 // Permanent failure.
194 free(frame);
195 warnlog("FrameStreamLogger: submitting to queue failed.");
196 }
197}
198
199#endif /* HAVE_FSTRM */