]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/fstrm_logger.cc
Merge pull request #9134 from omoerbeek/secpoll-cleanup
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
1 #include <unistd.h>
2 #include <sys/un.h>
3
4 #include "config.h"
5 #include "fstrm_logger.hh"
6
7 #ifdef RECURSOR
8 #include "logger.hh"
9 #else
10 #include "dolog.hh"
11 #endif
12
13 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
14
15 #ifdef HAVE_FSTRM
16
17 FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
18 const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
19 {
20 fstrm_res res;
21
22 try {
23 d_fwopt = fstrm_writer_options_init();
24 if (!d_fwopt) {
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
26 }
27
28 res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
29 if (res != fstrm_res_success) {
30 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
31 }
32
33 if (d_family == AF_UNIX) {
34 struct sockaddr_un local;
35 if (makeUNsockaddr(d_address, &local)) {
36 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
37 }
38
39 d_uwopt = fstrm_unix_writer_options_init();
40 if (!d_uwopt) {
41 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
42 }
43
44 // void return, no error checking.
45 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
46
47 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
48 if (!d_writer) {
49 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
50 }
51 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
52 } else if (family == AF_INET) {
53 d_twopt = fstrm_tcp_writer_options_init();
54 if (!d_twopt) {
55 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
56 }
57
58 try {
59 ComboAddress ca(d_address);
60
61 // void return, no error checking.
62 fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
63 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
64 } catch (PDNSException &e) {
65 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
66 }
67
68 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
69 if (!d_writer) {
70 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
71 }
72 #endif
73 } else {
74 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
75 }
76
77 d_iothropt = fstrm_iothr_options_init();
78 if (!d_iothropt) {
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
80 }
81
82 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
83 if (res != fstrm_res_success) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
85 }
86
87 if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
88 res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
89 if (res != fstrm_res_success) {
90 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
91 }
92 }
93 if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
94 res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
95 if (res != fstrm_res_success) {
96 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
97 }
98 }
99 if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
100 res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
101 if (res != fstrm_res_success) {
102 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
103 }
104 }
105 if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
106 res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
107 if (res != fstrm_res_success) {
108 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
109 }
110 }
111 if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
112 res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
113 if (res != fstrm_res_success) {
114 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
115 }
116 }
117 if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
118 res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
119 if (res != fstrm_res_success) {
120 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
121 }
122 }
123
124
125 if (connect) {
126 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
127 if (!d_iothr) {
128 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
129 }
130
131 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
132 if (!d_ioqueue) {
133 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
134 }
135 }
136 } catch (std::runtime_error &e) {
137 this->cleanup();
138 throw;
139 }
140 }
141
142 void FrameStreamLogger::cleanup()
143 {
144 if (d_iothr != nullptr) {
145 fstrm_iothr_destroy(&d_iothr);
146 d_iothr = nullptr;
147 }
148 if (d_iothropt != nullptr) {
149 fstrm_iothr_options_destroy(&d_iothropt);
150 d_iothropt = nullptr;
151 }
152 if (d_writer != nullptr) {
153 fstrm_writer_destroy(&d_writer);
154 d_writer = nullptr;
155 }
156 if (d_uwopt != nullptr) {
157 fstrm_unix_writer_options_destroy(&d_uwopt);
158 d_uwopt = nullptr;
159 }
160 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
161 if (d_twopt != nullptr) {
162 fstrm_tcp_writer_options_destroy(&d_twopt);
163 d_twopt = nullptr;
164 }
165 #endif
166 if (d_fwopt != nullptr) {
167 fstrm_writer_options_destroy(&d_fwopt);
168 d_fwopt = nullptr;
169 }
170 }
171
172 FrameStreamLogger::~FrameStreamLogger()
173 {
174 this->cleanup();
175 }
176
177 void FrameStreamLogger::queueData(const std::string& data)
178 {
179 if (!d_ioqueue || !d_iothr) {
180 return;
181 }
182 uint8_t *frame = (uint8_t*)malloc(data.length());
183 if (!frame) {
184 #ifdef RECURSOR
185 g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
186 #else
187 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
188 #endif
189 return;
190 }
191 memcpy(frame, data.c_str(), data.length());
192
193 fstrm_res res;
194 res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
195
196 if (res == fstrm_res_success) {
197 // Frame successfully queued.
198 ++d_framesSent;
199 } else if (res == fstrm_res_again) {
200 free(frame);
201 #ifdef RECURSOR
202 g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl;
203 #else
204 vinfolog("FrameStreamLogger: queue full, dropping.");
205 #endif
206 ++d_queueFullDrops;
207 } else {
208 // Permanent failure.
209 free(frame);
210 #ifdef RECURSOR
211 g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
212 #else
213 warnlog("FrameStreamLogger: submitting to queue failed.");
214 #endif
215 ++d_permanentFailures;
216 }
217 }
218
219 #endif /* HAVE_FSTRM */