]>
Commit | Line | Data |
---|---|---|
82a91ddf CH |
1 | #include <unistd.h> |
2 | #include <sys/un.h> | |
3 | ||
4 | #include "config.h" | |
5 | #include "fstrm_logger.hh" | |
d0f0ed4d OM |
6 | |
7 | #ifdef RECURSOR | |
8 | #include "logger.hh" | |
9 | #else | |
82a91ddf | 10 | #include "dolog.hh" |
d0f0ed4d | 11 | #endif |
82a91ddf CH |
12 | |
13 | #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap" | |
14 | ||
15 | #ifdef HAVE_FSTRM | |
16 | ||
573f4ff0 OM |
17 | FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect, |
18 | const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address) | |
82a91ddf CH |
19 | { |
20 | fstrm_res res; | |
21 | ||
22 | try { | |
23 | d_fwopt = fstrm_writer_options_init(); | |
24 | if (!d_fwopt) { | |
25 | throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed."); | |
26 | } | |
27 | ||
28 | res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1); | |
29 | if (res != fstrm_res_success) { | |
30 | throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res)); | |
31 | } | |
32 | ||
33 | if (d_family == AF_UNIX) { | |
34 | struct sockaddr_un local; | |
35 | if (makeUNsockaddr(d_address, &local)) { | |
36 | throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path."); | |
37 | } | |
38 | ||
39 | d_uwopt = fstrm_unix_writer_options_init(); | |
40 | if (!d_uwopt) { | |
41 | throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed."); | |
42 | } | |
43 | ||
44 | // void return, no error checking. | |
45 | fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str()); | |
46 | ||
47 | d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt); | |
48 | if (!d_writer) { | |
49 | throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed."); | |
50 | } | |
51 | #ifdef HAVE_FSTRM_TCP_WRITER_INIT | |
52 | } else if (family == AF_INET) { | |
53 | d_twopt = fstrm_tcp_writer_options_init(); | |
54 | if (!d_twopt) { | |
55 | throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed."); | |
56 | } | |
57 | ||
58 | try { | |
59 | ComboAddress ca(d_address); | |
60 | ||
61 | // void return, no error checking. | |
62 | fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str()); | |
63 | fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str()); | |
64 | } catch (PDNSException &e) { | |
65 | throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason); | |
66 | } | |
67 | ||
68 | d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt); | |
69 | if (!d_writer) { | |
70 | throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed."); | |
71 | } | |
72 | #endif | |
73 | } else { | |
74 | throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported"); | |
75 | } | |
76 | ||
77 | d_iothropt = fstrm_iothr_options_init(); | |
78 | if (!d_iothropt) { | |
79 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed."); | |
80 | } | |
81 | ||
82 | res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC); | |
83 | if (res != fstrm_res_success) { | |
84 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res)); | |
85 | } | |
86 | ||
573f4ff0 OM |
87 | if (options.find("bufferHint") != options.end() && options.at("bufferHint")) { |
88 | res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint")); | |
89 | if (res != fstrm_res_success) { | |
90 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res)); | |
91 | } | |
92 | } | |
93 | if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) { | |
94 | res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout")); | |
95 | if (res != fstrm_res_success) { | |
96 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res)); | |
97 | } | |
98 | } | |
99 | if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) { | |
100 | res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize")); | |
101 | if (res != fstrm_res_success) { | |
102 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res)); | |
103 | } | |
104 | } | |
105 | if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) { | |
106 | res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize")); | |
107 | if (res != fstrm_res_success) { | |
108 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res)); | |
109 | } | |
110 | } | |
111 | if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) { | |
112 | res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold")); | |
113 | if (res != fstrm_res_success) { | |
114 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res)); | |
115 | } | |
116 | } | |
117 | if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) { | |
118 | res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval")); | |
119 | if (res != fstrm_res_success) { | |
120 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res)); | |
121 | } | |
122 | } | |
123 | ||
124 | ||
6b44773a CH |
125 | if (connect) { |
126 | d_iothr = fstrm_iothr_init(d_iothropt, &d_writer); | |
127 | if (!d_iothr) { | |
128 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed."); | |
129 | } | |
82a91ddf | 130 | |
6b44773a CH |
131 | d_ioqueue = fstrm_iothr_get_input_queue(d_iothr); |
132 | if (!d_ioqueue) { | |
133 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed."); | |
134 | } | |
82a91ddf CH |
135 | } |
136 | } catch (std::runtime_error &e) { | |
137 | this->cleanup(); | |
138 | throw; | |
139 | } | |
140 | } | |
141 | ||
142 | void FrameStreamLogger::cleanup() | |
143 | { | |
144 | if (d_iothr != nullptr) { | |
145 | fstrm_iothr_destroy(&d_iothr); | |
146 | d_iothr = nullptr; | |
147 | } | |
148 | if (d_iothropt != nullptr) { | |
149 | fstrm_iothr_options_destroy(&d_iothropt); | |
150 | d_iothropt = nullptr; | |
151 | } | |
152 | if (d_writer != nullptr) { | |
153 | fstrm_writer_destroy(&d_writer); | |
154 | d_writer = nullptr; | |
155 | } | |
156 | if (d_uwopt != nullptr) { | |
157 | fstrm_unix_writer_options_destroy(&d_uwopt); | |
158 | d_uwopt = nullptr; | |
159 | } | |
160 | #ifdef HAVE_FSTRM_TCP_WRITER_INIT | |
161 | if (d_twopt != nullptr) { | |
162 | fstrm_tcp_writer_options_destroy(&d_twopt); | |
163 | d_twopt = nullptr; | |
164 | } | |
165 | #endif | |
166 | if (d_fwopt != nullptr) { | |
167 | fstrm_writer_options_destroy(&d_fwopt); | |
168 | d_fwopt = nullptr; | |
169 | } | |
170 | } | |
171 | ||
172 | FrameStreamLogger::~FrameStreamLogger() | |
173 | { | |
174 | this->cleanup(); | |
175 | } | |
176 | ||
177 | void FrameStreamLogger::queueData(const std::string& data) | |
178 | { | |
6b44773a CH |
179 | if (!d_ioqueue || !d_iothr) { |
180 | return; | |
181 | } | |
82a91ddf CH |
182 | uint8_t *frame = (uint8_t*)malloc(data.length()); |
183 | if (!frame) { | |
d0f0ed4d OM |
184 | #ifdef RECURSOR |
185 | g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl; | |
186 | #else | |
82a91ddf | 187 | warnlog("FrameStreamLogger: cannot allocate memory for stream."); |
d0f0ed4d | 188 | #endif |
82a91ddf CH |
189 | return; |
190 | } | |
191 | memcpy(frame, data.c_str(), data.length()); | |
192 | ||
193 | fstrm_res res; | |
194 | res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr); | |
195 | ||
196 | if (res == fstrm_res_success) { | |
197 | // Frame successfully queued. | |
198 | } else if (res == fstrm_res_again) { | |
199 | free(frame); | |
d0f0ed4d OM |
200 | #ifdef RECURSOR |
201 | g_log<<Logger::Warning<<"FrameStreamLogger: queue full, dropping."<<std::endl; | |
202 | #else | |
82a91ddf | 203 | warnlog("FrameStreamLogger: queue full, dropping."); |
d0f0ed4d OM |
204 | #endif |
205 | } else { | |
82a91ddf CH |
206 | // Permanent failure. |
207 | free(frame); | |
d0f0ed4d OM |
208 | #ifdef RECURSOR |
209 | g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl; | |
210 | #else | |
82a91ddf | 211 | warnlog("FrameStreamLogger: submitting to queue failed."); |
d0f0ed4d | 212 | #endif |
82a91ddf CH |
213 | } |
214 | } | |
215 | ||
216 | #endif /* HAVE_FSTRM */ |