]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/fstrm_logger.cc
Merge pull request #8223 from PowerDNS/omoerbeek-patch-1
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
CommitLineData
82a91ddf
CH
1#include <unistd.h>
2#include <sys/un.h>
3
4#include "config.h"
5#include "fstrm_logger.hh"
d0f0ed4d
OM
6
7#ifdef RECURSOR
8#include "logger.hh"
9#else
82a91ddf 10#include "dolog.hh"
d0f0ed4d 11#endif
82a91ddf
CH
12
13#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
14
15#ifdef HAVE_FSTRM
16
573f4ff0
OM
17FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
18 const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
82a91ddf
CH
19{
20 fstrm_res res;
21
22 try {
23 d_fwopt = fstrm_writer_options_init();
24 if (!d_fwopt) {
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
26 }
27
28 res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
29 if (res != fstrm_res_success) {
30 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
31 }
32
33 if (d_family == AF_UNIX) {
34 struct sockaddr_un local;
35 if (makeUNsockaddr(d_address, &local)) {
36 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
37 }
38
39 d_uwopt = fstrm_unix_writer_options_init();
40 if (!d_uwopt) {
41 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
42 }
43
44 // void return, no error checking.
45 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
46
47 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
48 if (!d_writer) {
49 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
50 }
51 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
52 } else if (family == AF_INET) {
53 d_twopt = fstrm_tcp_writer_options_init();
54 if (!d_twopt) {
55 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
56 }
57
58 try {
59 ComboAddress ca(d_address);
60
61 // void return, no error checking.
62 fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
63 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
64 } catch (PDNSException &e) {
65 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
66 }
67
68 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
69 if (!d_writer) {
70 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
71 }
72 #endif
73 } else {
74 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
75 }
76
77 d_iothropt = fstrm_iothr_options_init();
78 if (!d_iothropt) {
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
80 }
81
82 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
83 if (res != fstrm_res_success) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
85 }
86
573f4ff0
OM
87 if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
88 res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
89 if (res != fstrm_res_success) {
90 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
91 }
92 }
93 if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
94 res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
95 if (res != fstrm_res_success) {
96 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
97 }
98 }
99 if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
100 res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
101 if (res != fstrm_res_success) {
102 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
103 }
104 }
105 if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
106 res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
107 if (res != fstrm_res_success) {
108 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
109 }
110 }
111 if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
112 res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
113 if (res != fstrm_res_success) {
114 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
115 }
116 }
117 if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
118 res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
119 if (res != fstrm_res_success) {
120 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
121 }
122 }
123
124
6b44773a
CH
125 if (connect) {
126 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
127 if (!d_iothr) {
128 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
129 }
82a91ddf 130
6b44773a
CH
131 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
132 if (!d_ioqueue) {
133 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
134 }
82a91ddf
CH
135 }
136 } catch (std::runtime_error &e) {
137 this->cleanup();
138 throw;
139 }
140}
141
142void FrameStreamLogger::cleanup()
143{
144 if (d_iothr != nullptr) {
145 fstrm_iothr_destroy(&d_iothr);
146 d_iothr = nullptr;
147 }
148 if (d_iothropt != nullptr) {
149 fstrm_iothr_options_destroy(&d_iothropt);
150 d_iothropt = nullptr;
151 }
152 if (d_writer != nullptr) {
153 fstrm_writer_destroy(&d_writer);
154 d_writer = nullptr;
155 }
156 if (d_uwopt != nullptr) {
157 fstrm_unix_writer_options_destroy(&d_uwopt);
158 d_uwopt = nullptr;
159 }
160#ifdef HAVE_FSTRM_TCP_WRITER_INIT
161 if (d_twopt != nullptr) {
162 fstrm_tcp_writer_options_destroy(&d_twopt);
163 d_twopt = nullptr;
164 }
165#endif
166 if (d_fwopt != nullptr) {
167 fstrm_writer_options_destroy(&d_fwopt);
168 d_fwopt = nullptr;
169 }
170}
171
172FrameStreamLogger::~FrameStreamLogger()
173{
174 this->cleanup();
175}
176
177void FrameStreamLogger::queueData(const std::string& data)
178{
6b44773a
CH
179 if (!d_ioqueue || !d_iothr) {
180 return;
181 }
82a91ddf
CH
182 uint8_t *frame = (uint8_t*)malloc(data.length());
183 if (!frame) {
d0f0ed4d
OM
184#ifdef RECURSOR
185 g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
186#else
82a91ddf 187 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
d0f0ed4d 188#endif
82a91ddf
CH
189 return;
190 }
191 memcpy(frame, data.c_str(), data.length());
192
193 fstrm_res res;
194 res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
195
196 if (res == fstrm_res_success) {
197 // Frame successfully queued.
198 } else if (res == fstrm_res_again) {
199 free(frame);
d0f0ed4d
OM
200#ifdef RECURSOR
201 g_log<<Logger::Warning<<"FrameStreamLogger: queue full, dropping."<<std::endl;
202#else
82a91ddf 203 warnlog("FrameStreamLogger: queue full, dropping.");
d0f0ed4d
OM
204#endif
205 } else {
82a91ddf
CH
206 // Permanent failure.
207 free(frame);
d0f0ed4d
OM
208#ifdef RECURSOR
209 g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
210#else
82a91ddf 211 warnlog("FrameStreamLogger: submitting to queue failed.");
d0f0ed4d 212#endif
82a91ddf
CH
213 }
214}
215
216#endif /* HAVE_FSTRM */