]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/fstrm_logger.cc
Merge pull request #9715 from pieterlexis/4.1-docs
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
1 #include <unistd.h>
2 #include <sys/un.h>
3
4 #include "config.h"
5 #include "fstrm_logger.hh"
6
7 #ifdef RECURSOR
8 #include "logger.hh"
9 #else
10 #include "dolog.hh"
11 #endif
12
13 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
14
15 #ifdef HAVE_FSTRM
16
17 FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
18 const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
19 {
20 fstrm_res res;
21
22 try {
23 d_fwopt = fstrm_writer_options_init();
24 if (!d_fwopt) {
25 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
26 }
27
28 res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
29 if (res != fstrm_res_success) {
30 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
31 }
32
33 if (d_family == AF_UNIX) {
34 struct sockaddr_un local;
35 if (makeUNsockaddr(d_address, &local)) {
36 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
37 }
38
39 d_uwopt = fstrm_unix_writer_options_init();
40 if (!d_uwopt) {
41 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
42 }
43
44 // void return, no error checking.
45 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
46
47 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
48 if (!d_writer) {
49 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
50 }
51 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
52 } else if (family == AF_INET) {
53 d_twopt = fstrm_tcp_writer_options_init();
54 if (!d_twopt) {
55 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
56 }
57
58 try {
59 ComboAddress ca(d_address);
60
61 // void return, no error checking.
62 fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
63 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
64 } catch (PDNSException &e) {
65 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
66 }
67
68 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
69 if (!d_writer) {
70 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
71 }
72 #endif
73 } else {
74 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
75 }
76
77 d_iothropt = fstrm_iothr_options_init();
78 if (!d_iothropt) {
79 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
80 }
81
82 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
83 if (res != fstrm_res_success) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
85 }
86
87 const struct {
88 const std::string name;
89 fstrm_res (*function)(struct fstrm_iothr_options *, const unsigned int);
90 } list[] = {
91 { "bufferHint", fstrm_iothr_options_set_buffer_hint },
92 { "flushTimeout", fstrm_iothr_options_set_flush_timeout },
93 { "inputQueueSize", fstrm_iothr_options_set_input_queue_size },
94 { "outputQueueSize", fstrm_iothr_options_set_output_queue_size },
95 { "queueNotifyThreshold", fstrm_iothr_options_set_queue_notify_threshold },
96 { "setReopenInterval", fstrm_iothr_options_set_reopen_interval }
97 };
98
99 for (const auto& i : list) {
100 if (options.find(i.name) != options.end() && options.at(i.name)) {
101 fstrm_res r = i.function(d_iothropt, options.at(i.name));
102 if (r != fstrm_res_success) {
103 throw std::runtime_error("FrameStreamLogger: setting " + string(i.name) + " failed: " + std::to_string(r));
104 }
105 }
106 }
107
108 if (connect) {
109 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
110 if (!d_iothr) {
111 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
112 }
113
114 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
115 if (!d_ioqueue) {
116 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
117 }
118 }
119 } catch (std::runtime_error &e) {
120 this->cleanup();
121 throw;
122 }
123 }
124
125 void FrameStreamLogger::cleanup()
126 {
127 if (d_iothr != nullptr) {
128 fstrm_iothr_destroy(&d_iothr);
129 d_iothr = nullptr;
130 }
131 if (d_iothropt != nullptr) {
132 fstrm_iothr_options_destroy(&d_iothropt);
133 d_iothropt = nullptr;
134 }
135 if (d_writer != nullptr) {
136 fstrm_writer_destroy(&d_writer);
137 d_writer = nullptr;
138 }
139 if (d_uwopt != nullptr) {
140 fstrm_unix_writer_options_destroy(&d_uwopt);
141 d_uwopt = nullptr;
142 }
143 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
144 if (d_twopt != nullptr) {
145 fstrm_tcp_writer_options_destroy(&d_twopt);
146 d_twopt = nullptr;
147 }
148 #endif
149 if (d_fwopt != nullptr) {
150 fstrm_writer_options_destroy(&d_fwopt);
151 d_fwopt = nullptr;
152 }
153 }
154
155 FrameStreamLogger::~FrameStreamLogger()
156 {
157 this->cleanup();
158 }
159
160 void FrameStreamLogger::queueData(const std::string& data)
161 {
162 if (!d_ioqueue || !d_iothr) {
163 return;
164 }
165 uint8_t *frame = (uint8_t*)malloc(data.length());
166 if (!frame) {
167 #ifdef RECURSOR
168 g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
169 #else
170 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
171 #endif
172 return;
173 }
174 memcpy(frame, data.c_str(), data.length());
175
176 fstrm_res res;
177 res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
178
179 if (res == fstrm_res_success) {
180 // Frame successfully queued.
181 ++d_framesSent;
182 } else if (res == fstrm_res_again) {
183 free(frame);
184 #ifdef RECURSOR
185 g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl;
186 #else
187 vinfolog("FrameStreamLogger: queue full, dropping.");
188 #endif
189 ++d_queueFullDrops;
190 } else {
191 // Permanent failure.
192 free(frame);
193 #ifdef RECURSOR
194 g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
195 #else
196 warnlog("FrameStreamLogger: submitting to queue failed.");
197 #endif
198 ++d_permanentFailures;
199 }
200 }
201
202 #endif /* HAVE_FSTRM */