]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/fstrm_logger.cc
Merge pull request #14032 from rgacogne/ddist-192-changelog-secpoll
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
CommitLineData
82a91ddf
CH
1#include <unistd.h>
2#include <sys/un.h>
3
4#include "config.h"
5#include "fstrm_logger.hh"
d0f0ed4d
OM
6
7#ifdef RECURSOR
8#include "logger.hh"
62b191dc 9#include "logging.hh"
d0f0ed4d 10#else
82a91ddf 11#include "dolog.hh"
d0f0ed4d 12#endif
82a91ddf 13
82a91ddf
CH
14#ifdef HAVE_FSTRM
15
de7c2e59
OM
16static const std::string DNSTAP_CONTENT_TYPE = "protobuf:dnstap.Dnstap";
17
5c9334e7
OM
18FrameStreamLogger::FrameStreamLogger(const int family, std::string address, bool connect, const std::unordered_map<string, unsigned>& options) :
19 d_family(family), d_address(std::move(address))
82a91ddf 20{
82a91ddf
CH
21 try {
22 d_fwopt = fstrm_writer_options_init();
5c9334e7 23 if (d_fwopt == nullptr) {
82a91ddf
CH
24 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
25 }
26
de7c2e59 27 auto res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE.c_str(), DNSTAP_CONTENT_TYPE.size());
82a91ddf
CH
28 if (res != fstrm_res_success) {
29 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
30 }
31
32 if (d_family == AF_UNIX) {
5c9334e7
OM
33 struct sockaddr_un local
34 {
35 };
36 if (makeUNsockaddr(d_address, &local) != 0) {
82a91ddf
CH
37 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
38 }
39
40 d_uwopt = fstrm_unix_writer_options_init();
5c9334e7 41 if (d_uwopt == nullptr) {
82a91ddf
CH
42 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
43 }
44
45 // void return, no error checking.
46 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
47
48 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
5c9334e7 49 if (d_writer == nullptr) {
82a91ddf
CH
50 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
51 }
5c9334e7
OM
52#ifdef HAVE_FSTRM_TCP_WRITER_INIT
53 }
54 else if (family == AF_INET || family == AF_INET6) {
82a91ddf 55 d_twopt = fstrm_tcp_writer_options_init();
5c9334e7 56 if (d_twopt == nullptr) {
82a91ddf
CH
57 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
58 }
59
60 try {
5c9334e7 61 ComboAddress inetAddress(d_address);
82a91ddf
CH
62
63 // void return, no error checking.
5c9334e7
OM
64 fstrm_tcp_writer_options_set_socket_address(d_twopt, inetAddress.toString().c_str());
65 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(inetAddress.getPort()).c_str());
66 }
67 catch (PDNSException& e) {
82a91ddf
CH
68 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
69 }
70
71 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
5c9334e7 72 if (d_writer == nullptr) {
82a91ddf
CH
73 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
74 }
5c9334e7
OM
75#endif
76 }
77 else {
82a91ddf
CH
78 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
79 }
80
81 d_iothropt = fstrm_iothr_options_init();
5c9334e7 82 if (d_iothropt == nullptr) {
82a91ddf
CH
83 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
84 }
85
86 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
87 if (res != fstrm_res_success) {
88 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
89 }
90
5c9334e7
OM
91 struct setters
92 {
627df2af 93 const std::string name;
5c9334e7 94 fstrm_res (*function)(struct fstrm_iothr_options*, const unsigned int);
7964dcda 95 };
5c9334e7
OM
96 const std::array<struct setters, 6> list = {{{"bufferHint", fstrm_iothr_options_set_buffer_hint},
97 {"flushTimeout", fstrm_iothr_options_set_flush_timeout},
98 {"inputQueueSize", fstrm_iothr_options_set_input_queue_size},
99 {"outputQueueSize", fstrm_iothr_options_set_output_queue_size},
100 {"queueNotifyThreshold", fstrm_iothr_options_set_queue_notify_threshold},
101 {"setReopenInterval", fstrm_iothr_options_set_reopen_interval}}};
102
103 for (const auto& entry : list) {
104 if (auto option = options.find(entry.name); option != options.end() && option->second != 0) {
105 auto result = entry.function(d_iothropt, option->second);
106 if (result != fstrm_res_success) {
107 throw std::runtime_error("FrameStreamLogger: setting " + string(entry.name) + " failed: " + std::to_string(result));
7964dcda 108 }
573f4ff0
OM
109 }
110 }
573f4ff0 111
6b44773a
CH
112 if (connect) {
113 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
5c9334e7 114 if (d_iothr == nullptr) {
6b44773a
CH
115 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
116 }
82a91ddf 117
6b44773a 118 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
5c9334e7 119 if (d_ioqueue == nullptr) {
6b44773a
CH
120 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
121 }
82a91ddf 122 }
5c9334e7
OM
123 }
124 catch (std::runtime_error& e) {
82a91ddf
CH
125 this->cleanup();
126 throw;
127 }
128}
129
130void FrameStreamLogger::cleanup()
131{
132 if (d_iothr != nullptr) {
133 fstrm_iothr_destroy(&d_iothr);
134 d_iothr = nullptr;
135 }
136 if (d_iothropt != nullptr) {
137 fstrm_iothr_options_destroy(&d_iothropt);
138 d_iothropt = nullptr;
139 }
140 if (d_writer != nullptr) {
141 fstrm_writer_destroy(&d_writer);
142 d_writer = nullptr;
143 }
144 if (d_uwopt != nullptr) {
145 fstrm_unix_writer_options_destroy(&d_uwopt);
146 d_uwopt = nullptr;
147 }
148#ifdef HAVE_FSTRM_TCP_WRITER_INIT
149 if (d_twopt != nullptr) {
150 fstrm_tcp_writer_options_destroy(&d_twopt);
151 d_twopt = nullptr;
152 }
153#endif
154 if (d_fwopt != nullptr) {
155 fstrm_writer_options_destroy(&d_fwopt);
156 d_fwopt = nullptr;
157 }
158}
159
160FrameStreamLogger::~FrameStreamLogger()
161{
162 this->cleanup();
163}
164
4d7db3d7 165RemoteLoggerInterface::Result FrameStreamLogger::queueData(const std::string& data)
82a91ddf 166{
5c9334e7 167 if ((d_ioqueue == nullptr) || d_iothr == nullptr) {
2b7a8236 168 ++d_permanentFailures;
4d7db3d7 169 return Result::OtherError;
6b44773a 170 }
5c9334e7
OM
171 uint8_t* frame = (uint8_t*)malloc(data.length()); // NOLINT: it's the API
172 if (frame == nullptr) {
2b7a8236 173 ++d_queueFullDrops; // XXX separate count?
4d7db3d7 174 return Result::TooLarge;
82a91ddf
CH
175 }
176 memcpy(frame, data.c_str(), data.length());
177
5c9334e7 178 auto res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
82a91ddf
CH
179
180 if (res == fstrm_res_success) {
181 // Frame successfully queued.
414a03e1 182 ++d_framesSent;
5c9334e7 183 // do not call free here
4d7db3d7 184 return Result::Queued;
5c9334e7
OM
185 }
186 if (res == fstrm_res_again) {
187 free(frame); // NOLINT: it's the API
414a03e1 188 ++d_queueFullDrops;
4d7db3d7 189 return Result::PipeFull;
82a91ddf 190 }
5c9334e7
OM
191 // Permanent failure.
192 free(frame); // NOLINT: it's the API
193 ++d_permanentFailures;
194 return Result::OtherError;
82a91ddf
CH
195}
196
197#endif /* HAVE_FSTRM */