]>
Commit | Line | Data |
---|---|---|
82a91ddf CH |
1 | #include <unistd.h> |
2 | #include <sys/un.h> | |
3 | ||
4 | #include "config.h" | |
5 | #include "fstrm_logger.hh" | |
6 | #include "dolog.hh" | |
7 | ||
8 | #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap" | |
9 | ||
10 | #ifdef HAVE_FSTRM | |
11 | ||
573f4ff0 OM |
12 | FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect, |
13 | const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address) | |
82a91ddf CH |
14 | { |
15 | fstrm_res res; | |
16 | ||
17 | try { | |
18 | d_fwopt = fstrm_writer_options_init(); | |
19 | if (!d_fwopt) { | |
20 | throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed."); | |
21 | } | |
22 | ||
23 | res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1); | |
24 | if (res != fstrm_res_success) { | |
25 | throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res)); | |
26 | } | |
27 | ||
28 | if (d_family == AF_UNIX) { | |
29 | struct sockaddr_un local; | |
30 | if (makeUNsockaddr(d_address, &local)) { | |
31 | throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path."); | |
32 | } | |
33 | ||
34 | d_uwopt = fstrm_unix_writer_options_init(); | |
35 | if (!d_uwopt) { | |
36 | throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed."); | |
37 | } | |
38 | ||
39 | // void return, no error checking. | |
40 | fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str()); | |
41 | ||
42 | d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt); | |
43 | if (!d_writer) { | |
44 | throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed."); | |
45 | } | |
46 | #ifdef HAVE_FSTRM_TCP_WRITER_INIT | |
47 | } else if (family == AF_INET) { | |
48 | d_twopt = fstrm_tcp_writer_options_init(); | |
49 | if (!d_twopt) { | |
50 | throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed."); | |
51 | } | |
52 | ||
53 | try { | |
54 | ComboAddress ca(d_address); | |
55 | ||
56 | // void return, no error checking. | |
57 | fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str()); | |
58 | fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str()); | |
59 | } catch (PDNSException &e) { | |
60 | throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason); | |
61 | } | |
62 | ||
63 | d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt); | |
64 | if (!d_writer) { | |
65 | throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed."); | |
66 | } | |
67 | #endif | |
68 | } else { | |
69 | throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported"); | |
70 | } | |
71 | ||
72 | d_iothropt = fstrm_iothr_options_init(); | |
73 | if (!d_iothropt) { | |
74 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed."); | |
75 | } | |
76 | ||
77 | res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC); | |
78 | if (res != fstrm_res_success) { | |
79 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res)); | |
80 | } | |
81 | ||
573f4ff0 OM |
82 | if (options.find("bufferHint") != options.end() && options.at("bufferHint")) { |
83 | res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint")); | |
84 | if (res != fstrm_res_success) { | |
85 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res)); | |
86 | } | |
87 | } | |
88 | if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) { | |
89 | res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout")); | |
90 | if (res != fstrm_res_success) { | |
91 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res)); | |
92 | } | |
93 | } | |
94 | if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) { | |
95 | res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize")); | |
96 | if (res != fstrm_res_success) { | |
97 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res)); | |
98 | } | |
99 | } | |
100 | if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) { | |
101 | res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize")); | |
102 | if (res != fstrm_res_success) { | |
103 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res)); | |
104 | } | |
105 | } | |
106 | if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) { | |
107 | res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold")); | |
108 | if (res != fstrm_res_success) { | |
109 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res)); | |
110 | } | |
111 | } | |
112 | if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) { | |
113 | res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval")); | |
114 | if (res != fstrm_res_success) { | |
115 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res)); | |
116 | } | |
117 | } | |
118 | ||
119 | ||
6b44773a CH |
120 | if (connect) { |
121 | d_iothr = fstrm_iothr_init(d_iothropt, &d_writer); | |
122 | if (!d_iothr) { | |
123 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed."); | |
124 | } | |
82a91ddf | 125 | |
6b44773a CH |
126 | d_ioqueue = fstrm_iothr_get_input_queue(d_iothr); |
127 | if (!d_ioqueue) { | |
128 | throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed."); | |
129 | } | |
82a91ddf CH |
130 | } |
131 | } catch (std::runtime_error &e) { | |
132 | this->cleanup(); | |
133 | throw; | |
134 | } | |
135 | } | |
136 | ||
137 | void FrameStreamLogger::cleanup() | |
138 | { | |
139 | if (d_iothr != nullptr) { | |
140 | fstrm_iothr_destroy(&d_iothr); | |
141 | d_iothr = nullptr; | |
142 | } | |
143 | if (d_iothropt != nullptr) { | |
144 | fstrm_iothr_options_destroy(&d_iothropt); | |
145 | d_iothropt = nullptr; | |
146 | } | |
147 | if (d_writer != nullptr) { | |
148 | fstrm_writer_destroy(&d_writer); | |
149 | d_writer = nullptr; | |
150 | } | |
151 | if (d_uwopt != nullptr) { | |
152 | fstrm_unix_writer_options_destroy(&d_uwopt); | |
153 | d_uwopt = nullptr; | |
154 | } | |
155 | #ifdef HAVE_FSTRM_TCP_WRITER_INIT | |
156 | if (d_twopt != nullptr) { | |
157 | fstrm_tcp_writer_options_destroy(&d_twopt); | |
158 | d_twopt = nullptr; | |
159 | } | |
160 | #endif | |
161 | if (d_fwopt != nullptr) { | |
162 | fstrm_writer_options_destroy(&d_fwopt); | |
163 | d_fwopt = nullptr; | |
164 | } | |
165 | } | |
166 | ||
167 | FrameStreamLogger::~FrameStreamLogger() | |
168 | { | |
169 | this->cleanup(); | |
170 | } | |
171 | ||
172 | void FrameStreamLogger::queueData(const std::string& data) | |
173 | { | |
6b44773a CH |
174 | if (!d_ioqueue || !d_iothr) { |
175 | return; | |
176 | } | |
82a91ddf CH |
177 | uint8_t *frame = (uint8_t*)malloc(data.length()); |
178 | if (!frame) { | |
179 | warnlog("FrameStreamLogger: cannot allocate memory for stream."); | |
180 | return; | |
181 | } | |
182 | memcpy(frame, data.c_str(), data.length()); | |
183 | ||
184 | fstrm_res res; | |
185 | res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr); | |
186 | ||
187 | if (res == fstrm_res_success) { | |
188 | // Frame successfully queued. | |
189 | } else if (res == fstrm_res_again) { | |
190 | free(frame); | |
191 | warnlog("FrameStreamLogger: queue full, dropping."); | |
192 | } else { | |
193 | // Permanent failure. | |
194 | free(frame); | |
195 | warnlog("FrameStreamLogger: submitting to queue failed."); | |
196 | } | |
197 | } | |
198 | ||
199 | #endif /* HAVE_FSTRM */ |