]>
Commit | Line | Data |
---|---|---|
9eb5394a RG |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
d523caf3 | 22 | |
6e069889 RG |
23 | #include "config.h" |
24 | ||
cf25b82b | 25 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
d523caf3 | 26 | #include <nghttp2/nghttp2.h> |
cf25b82b | 27 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
d523caf3 | 28 | |
9eb5394a | 29 | #include "dnsdist-nghttp2.hh" |
7e8a05fa | 30 | #include "dnsdist-nghttp2-in.hh" |
9eb5394a RG |
31 | #include "dnsdist-tcp.hh" |
32 | #include "dnsdist-tcp-downstream.hh" | |
8a2dd7db | 33 | #include "dnsdist-downstream-connection.hh" |
9eb5394a RG |
34 | |
35 | #include "dolog.hh" | |
0a5cb883 | 36 | #include "channel.hh" |
d523caf3 RG |
37 | #include "iputils.hh" |
38 | #include "libssl.hh" | |
39 | #include "noinitvector.hh" | |
40 | #include "tcpiohandler.hh" | |
9eb5394a | 41 | #include "threadname.hh" |
d523caf3 RG |
42 | #include "sstuff.hh" |
43 | ||
9eb5394a RG |
44 | std::atomic<uint64_t> g_dohStatesDumpRequested{0}; |
45 | std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr}; | |
b146a84f | 46 | std::optional<uint16_t> g_outgoingDoHWorkerThreads{std::nullopt}; |
9eb5394a | 47 | |
cf25b82b | 48 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
645a1ca4 | 49 | class DoHConnectionToBackend : public ConnectionToBackend |
9eb5394a RG |
50 | { |
51 | public: | |
9fd8dc1f | 52 | DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload); |
9eb5394a | 53 | |
e82bf80f | 54 | void handleTimeout(const struct timeval& now, bool write) override; |
9eb5394a RG |
55 | void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override; |
56 | ||
57 | std::string toString() const override | |
58 | { | |
59 | ostringstream o; | |
d9fffb37 | 60 | o << "DoH connection to backend " << (d_ds ? d_ds->getName() : "empty") << " over FD " << (d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket") << ", " << getConcurrentStreamsCount() << " streams"; |
9eb5394a RG |
61 | return o.str(); |
62 | } | |
63 | ||
97b165e1 RG |
64 | void setHealthCheck(bool h) |
65 | { | |
66 | d_healthCheckQuery = h; | |
67 | } | |
68 | ||
9fd8dc1f | 69 | void stopIO() override; |
645a1ca4 RG |
70 | bool reachedMaxConcurrentQueries() const override; |
71 | bool reachedMaxStreamID() const override; | |
767f5514 | 72 | bool isIdle() const override; |
4ed2c08c RG |
73 | void release() override |
74 | { | |
75 | } | |
becad613 | 76 | |
9eb5394a RG |
77 | private: |
78 | static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data); | |
79 | static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data); | |
80 | static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data); | |
81 | static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data); | |
82 | static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data); | |
9eb5394a RG |
83 | static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data); |
84 | static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param); | |
85 | static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param); | |
9eb5394a RG |
86 | |
87 | class PendingRequest | |
88 | { | |
89 | public: | |
90 | std::shared_ptr<TCPQuerySender> d_sender{nullptr}; | |
91 | TCPQuery d_query; | |
92 | PacketBuffer d_buffer; | |
f05cd66c | 93 | size_t d_queryPos{0}; |
e82bf80f | 94 | uint16_t d_responseCode{0}; |
9eb5394a RG |
95 | bool d_finished{false}; |
96 | }; | |
ea090af9 | 97 | void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD = false); |
becad613 | 98 | void watchForRemoteHostClosingConnection(); |
9eb5394a | 99 | void handleResponse(PendingRequest&& request); |
e82bf80f | 100 | void handleResponseError(PendingRequest&& request, const struct timeval& now); |
f05cd66c | 101 | void handleIOError(); |
e82bf80f | 102 | uint32_t getConcurrentStreamsCount() const; |
9eb5394a | 103 | |
e82bf80f RG |
104 | size_t getUsageCount() const |
105 | { | |
106 | auto ref = shared_from_this(); | |
107 | return ref.use_count(); | |
108 | } | |
109 | ||
110 | static const std::unordered_map<std::string, std::string> s_constants; | |
9eb5394a | 111 | |
9eb5394a | 112 | std::unordered_map<int32_t, PendingRequest> d_currentStreams; |
73571c03 | 113 | std::string d_proxyProtocolPayload; |
9eb5394a RG |
114 | PacketBuffer d_out; |
115 | PacketBuffer d_in; | |
73571c03 | 116 | std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del}; |
9eb5394a RG |
117 | size_t d_outPos{0}; |
118 | size_t d_inPos{0}; | |
97b165e1 | 119 | bool d_healthCheckQuery{false}; |
eec63896 | 120 | bool d_firstWrite{true}; |
9eb5394a RG |
121 | }; |
122 | ||
9fd8dc1f RG |
123 | using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>; |
124 | thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager; | |
e82bf80f RG |
125 | |
126 | uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const | |
127 | { | |
128 | return d_currentStreams.size(); | |
129 | } | |
9eb5394a RG |
130 | |
131 | void DoHConnectionToBackend::handleResponse(PendingRequest&& request) | |
132 | { | |
35b27ac8 RG |
133 | struct timeval now |
134 | { | |
135 | .tv_sec = 0, .tv_usec = 0 | |
136 | }; | |
137 | ||
9eb5394a | 138 | gettimeofday(&now, nullptr); |
f05cd66c | 139 | try { |
0c51513a | 140 | if (!d_healthCheckQuery) { |
d5d15da1 | 141 | const double udiff = request.d_query.d_idstate.queryRealTime.udiff(); |
3fbea485 | 142 | d_ds->updateTCPLatency(udiff); |
e3ab12d9 RG |
143 | if (request.d_buffer.size() >= sizeof(dnsheader)) { |
144 | dnsheader dh; | |
145 | memcpy(&dh, request.d_buffer.data(), sizeof(dh)); | |
146 | d_ds->reportResponse(dh.rcode); | |
147 | } | |
148 | else { | |
149 | d_ds->reportTimeoutOrError(); | |
150 | } | |
0c51513a RG |
151 | } |
152 | ||
7e8a05fa RG |
153 | TCPResponse response(std::move(request.d_query)); |
154 | response.d_buffer = std::move(request.d_buffer); | |
155 | response.d_connection = shared_from_this(); | |
156 | response.d_ds = d_ds; | |
157 | request.d_sender->handleResponse(now, std::move(response)); | |
f05cd66c RG |
158 | } |
159 | catch (const std::exception& e) { | |
160 | vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what()); | |
161 | } | |
9eb5394a RG |
162 | } |
163 | ||
e82bf80f RG |
164 | void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const struct timeval& now) |
165 | { | |
f05cd66c | 166 | try { |
93b395a5 RG |
167 | if (!d_healthCheckQuery) { |
168 | d_ds->reportTimeoutOrError(); | |
169 | } | |
e3ab12d9 | 170 | |
7e8a05fa RG |
171 | TCPResponse response(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr); |
172 | request.d_sender->notifyIOError(now, std::move(response)); | |
f05cd66c RG |
173 | } |
174 | catch (const std::exception& e) { | |
175 | vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what()); | |
176 | } | |
e82bf80f RG |
177 | } |
178 | ||
f05cd66c | 179 | void DoHConnectionToBackend::handleIOError() |
e82bf80f RG |
180 | { |
181 | d_connectionDied = true; | |
f05cd66c RG |
182 | nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR); |
183 | ||
35b27ac8 RG |
184 | struct timeval now |
185 | { | |
186 | .tv_sec = 0, .tv_usec = 0 | |
187 | }; | |
188 | ||
f05cd66c | 189 | gettimeofday(&now, nullptr); |
e82bf80f RG |
190 | for (auto& request : d_currentStreams) { |
191 | handleResponseError(std::move(request.second), now); | |
192 | } | |
f05cd66c | 193 | |
e82bf80f | 194 | d_currentStreams.clear(); |
f05cd66c RG |
195 | stopIO(); |
196 | } | |
197 | ||
198 | void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write) | |
199 | { | |
eec63896 RG |
200 | if (write) { |
201 | if (d_firstWrite) { | |
202 | ++d_ds->tcpConnectTimeouts; | |
203 | } | |
204 | else { | |
205 | ++d_ds->tcpWriteTimeouts; | |
206 | } | |
207 | } | |
208 | else { | |
209 | ++d_ds->tcpReadTimeouts; | |
210 | } | |
211 | ||
f05cd66c | 212 | handleIOError(); |
e82bf80f RG |
213 | } |
214 | ||
eec63896 RG |
215 | bool DoHConnectionToBackend::reachedMaxStreamID() const |
216 | { | |
217 | const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1; | |
218 | return d_highestStreamID == maximumStreamID; | |
219 | } | |
220 | ||
645a1ca4 | 221 | bool DoHConnectionToBackend::reachedMaxConcurrentQueries() const |
e82bf80f | 222 | { |
f468a7fe | 223 | // cerr<<"Got "<<getConcurrentStreamsCount()<<" concurrent streams, max is "<<nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl; |
e82bf80f | 224 | if (nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) <= getConcurrentStreamsCount()) { |
eec63896 RG |
225 | return true; |
226 | } | |
eec63896 RG |
227 | return false; |
228 | } | |
229 | ||
767f5514 RG |
230 | bool DoHConnectionToBackend::isIdle() const |
231 | { | |
232 | return getConcurrentStreamsCount() == 0; | |
233 | } | |
234 | ||
9eb5394a RG |
235 | void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) |
236 | { | |
9eb5394a | 237 | auto payloadSize = std::to_string(query.d_buffer.size()); |
9eb5394a | 238 | |
8cf75ac2 | 239 | bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders; |
5a96fcd2 RG |
240 | |
241 | /* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE | |
242 | to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */ | |
243 | std::vector<nghttp2_nv> headers; | |
244 | // these need to live until after the request headers have been processed | |
245 | std::string remote; | |
246 | std::string remotePort; | |
247 | headers.reserve(8 + (addXForwarded ? 3 : 0)); | |
248 | ||
249 | /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */ | |
83abf7f6 RG |
250 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE); |
251 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE); | |
252 | NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName); | |
253 | NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath); | |
254 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE); | |
255 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE); | |
256 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE); | |
257 | NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize); | |
5a96fcd2 | 258 | /* no need to add these headers for health-check queries */ |
f05cd66c RG |
259 | if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) { |
260 | remote = query.d_idstate.origRemote.toString(); | |
261 | remotePort = std::to_string(query.d_idstate.origRemote.getPort()); | |
83abf7f6 RG |
262 | NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote); |
263 | NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort); | |
f05cd66c RG |
264 | if (query.d_idstate.cs != nullptr) { |
265 | if (query.d_idstate.cs->isUDP()) { | |
83abf7f6 | 266 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP); |
5a96fcd2 | 267 | } |
f05cd66c RG |
268 | else if (query.d_idstate.cs->isDoH()) { |
269 | if (query.d_idstate.cs->hasTLS()) { | |
83abf7f6 | 270 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS); |
5a96fcd2 RG |
271 | } |
272 | else { | |
83abf7f6 | 273 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP); |
5a96fcd2 RG |
274 | } |
275 | } | |
f05cd66c | 276 | else if (query.d_idstate.cs->hasTLS()) { |
83abf7f6 | 277 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS); |
5a96fcd2 RG |
278 | } |
279 | else { | |
83abf7f6 | 280 | NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP); |
5a96fcd2 RG |
281 | } |
282 | } | |
283 | } | |
284 | ||
f05cd66c RG |
285 | PendingRequest pending; |
286 | pending.d_query = std::move(query); | |
287 | pending.d_sender = std::move(sender); | |
288 | ||
289 | uint32_t streamId = nghttp2_session_get_next_stream_id(d_session.get()); | |
290 | auto insertPair = d_currentStreams.insert({streamId, std::move(pending)}); | |
291 | if (!insertPair.second) { | |
292 | /* there is a stream ID collision, something is very wrong! */ | |
293 | d_connectionDied = true; | |
294 | nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR); | |
295 | throw std::runtime_error("Stream ID collision"); | |
296 | } | |
297 | ||
5a96fcd2 RG |
298 | /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set. |
299 | */ | |
9eb5394a | 300 | nghttp2_data_provider data_provider; |
f05cd66c | 301 | |
9eb5394a | 302 | data_provider.source.ptr = this; |
d9fffb37 | 303 | data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t { |
06140536 | 304 | auto* conn = static_cast<DoHConnectionToBackend*>(user_data); |
f05cd66c | 305 | auto& request = conn->d_currentStreams.at(stream_id); |
e82bf80f | 306 | size_t toCopy = 0; |
f05cd66c RG |
307 | if (request.d_queryPos < request.d_query.d_buffer.size()) { |
308 | size_t remaining = request.d_query.d_buffer.size() - request.d_queryPos; | |
e82bf80f | 309 | toCopy = length > remaining ? remaining : length; |
f05cd66c RG |
310 | memcpy(buf, &request.d_query.d_buffer.at(request.d_queryPos), toCopy); |
311 | request.d_queryPos += toCopy; | |
e82bf80f RG |
312 | } |
313 | ||
f05cd66c | 314 | if (request.d_queryPos >= request.d_query.d_buffer.size()) { |
d9fffb37 | 315 | *data_flags |= NGHTTP2_DATA_FLAG_EOF; |
e82bf80f | 316 | } |
9eb5394a RG |
317 | return toCopy; |
318 | }; | |
319 | ||
f05cd66c RG |
320 | auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this); |
321 | if (newStreamId < 0) { | |
e82bf80f | 322 | d_connectionDied = true; |
eec63896 | 323 | ++d_ds->tcpDiedSendingQuery; |
f05cd66c RG |
324 | d_currentStreams.erase(streamId); |
325 | throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId))); | |
9eb5394a | 326 | } |
e82bf80f | 327 | |
9eb5394a | 328 | auto rv = nghttp2_session_send(d_session.get()); |
9eb5394a | 329 | if (rv != 0) { |
e82bf80f | 330 | d_connectionDied = true; |
eec63896 | 331 | ++d_ds->tcpDiedSendingQuery; |
f05cd66c | 332 | d_currentStreams.erase(streamId); |
9eb5394a RG |
333 | throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv)); |
334 | } | |
e82bf80f | 335 | |
f05cd66c | 336 | d_highestStreamID = newStreamId; |
9eb5394a RG |
337 | } |
338 | ||
339 | class DoHClientThreadData | |
d523caf3 | 340 | { |
9eb5394a | 341 | public: |
0a5cb883 RG |
342 | DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) : |
343 | mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())), | |
344 | d_receiver(std::move(receiver)) | |
9eb5394a RG |
345 | { |
346 | } | |
347 | ||
348 | std::unique_ptr<FDMultiplexer> mplexer{nullptr}; | |
0a5cb883 | 349 | pdns::channel::Receiver<CrossProtocolQuery> d_receiver; |
d523caf3 RG |
350 | }; |
351 | ||
9eb5394a RG |
352 | void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param) |
353 | { | |
9eb5394a RG |
354 | auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param); |
355 | if (fd != conn->getHandle()) { | |
356 | throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle())); | |
357 | } | |
358 | ||
359 | IOStateGuard ioGuard(conn->d_ioState); | |
360 | do { | |
361 | conn->d_inPos = 0; | |
362 | conn->d_in.resize(conn->d_in.size() + 512); | |
f05cd66c | 363 | // cerr<<"trying to read "<<conn->d_in.size()<<endl; |
9eb5394a RG |
364 | try { |
365 | IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true); | |
f05cd66c | 366 | // cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl; |
9eb5394a | 367 | conn->d_in.resize(conn->d_inPos); |
f05cd66c RG |
368 | |
369 | if (conn->d_inPos > 0) { | |
370 | /* we got something */ | |
9eb5394a | 371 | auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos); |
f05cd66c | 372 | // cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl; |
9eb5394a RG |
373 | /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB, |
374 | all data should be consumed before returning */ | |
375 | if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) { | |
f05cd66c | 376 | throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen))); |
9eb5394a | 377 | } |
eec63896 | 378 | |
35b27ac8 RG |
379 | struct timeval now |
380 | { | |
381 | .tv_sec = 0, .tv_usec = 0 | |
382 | }; | |
383 | ||
eec63896 RG |
384 | gettimeofday(&now, nullptr); |
385 | conn->d_lastDataReceivedTime = now; | |
386 | ||
f05cd66c | 387 | // cerr<<"after read send"<<endl; |
e82bf80f | 388 | nghttp2_session_send(conn->d_session.get()); |
f05cd66c RG |
389 | } |
390 | ||
391 | if (newState == IOState::Done) { | |
54a9a226 | 392 | if (conn->isIdle()) { |
89e62bd8 | 393 | conn->stopIO(); |
becad613 | 394 | conn->watchForRemoteHostClosingConnection(); |
89e62bd8 RG |
395 | ioGuard.release(); |
396 | break; | |
397 | } | |
9eb5394a RG |
398 | } |
399 | else { | |
400 | if (newState == IOState::NeedWrite) { | |
f05cd66c | 401 | // cerr<<"need write"<<endl; |
9eb5394a RG |
402 | conn->updateIO(IOState::NeedWrite, handleReadableIOCallback); |
403 | } | |
404 | ioGuard.release(); | |
405 | break; | |
406 | } | |
407 | } | |
408 | catch (const std::exception& e) { | |
f05cd66c | 409 | vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what()); |
eec63896 | 410 | ++conn->d_ds->tcpDiedReadingResponse; |
f05cd66c | 411 | conn->handleIOError(); |
9eb5394a RG |
412 | break; |
413 | } | |
d9fffb37 | 414 | } while (conn->getConcurrentStreamsCount() > 0); |
9eb5394a RG |
415 | } |
416 | ||
417 | void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param) | |
418 | { | |
9eb5394a RG |
419 | auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param); |
420 | if (fd != conn->getHandle()) { | |
421 | throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle())); | |
422 | } | |
423 | IOStateGuard ioGuard(conn->d_ioState); | |
424 | ||
f05cd66c | 425 | // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl; |
9eb5394a RG |
426 | try { |
427 | IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size()); | |
f05cd66c | 428 | // cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl; |
9eb5394a RG |
429 | if (newState == IOState::NeedRead) { |
430 | conn->updateIO(IOState::NeedRead, handleWritableIOCallback); | |
431 | } | |
432 | else if (newState == IOState::Done) { | |
f05cd66c | 433 | // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl; |
eec63896 | 434 | conn->d_firstWrite = false; |
9eb5394a RG |
435 | conn->d_out.clear(); |
436 | conn->d_outPos = 0; | |
437 | conn->stopIO(); | |
54a9a226 | 438 | if (!conn->isIdle()) { |
f05cd66c RG |
439 | conn->updateIO(IOState::NeedRead, handleReadableIOCallback); |
440 | } | |
becad613 RG |
441 | else { |
442 | conn->watchForRemoteHostClosingConnection(); | |
443 | } | |
9eb5394a RG |
444 | } |
445 | ioGuard.release(); | |
446 | } | |
447 | catch (const std::exception& e) { | |
f05cd66c | 448 | vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what()); |
eec63896 | 449 | ++conn->d_ds->tcpDiedSendingQuery; |
f05cd66c | 450 | conn->handleIOError(); |
9eb5394a | 451 | } |
9eb5394a RG |
452 | } |
453 | ||
454 | void DoHConnectionToBackend::stopIO() | |
455 | { | |
456 | d_ioState->reset(); | |
f05cd66c | 457 | |
a34f264a RG |
458 | if (isIdle()) { |
459 | auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this()); | |
460 | if (!willBeReusable(false)) { | |
461 | /* remove ourselves from the connection cache, this might mean that our | |
462 | reference count drops to zero after that, so we need to be careful */ | |
463 | t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared); | |
464 | } | |
465 | else { | |
466 | t_downstreamDoHConnectionsManager.moveToIdle(shared); | |
467 | } | |
54a9a226 | 468 | } |
9eb5394a RG |
469 | } |
470 | ||
becad613 | 471 | void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD) |
9eb5394a | 472 | { |
35b27ac8 RG |
473 | struct timeval now |
474 | { | |
475 | .tv_sec = 0, .tv_usec = 0 | |
476 | }; | |
477 | ||
9eb5394a RG |
478 | gettimeofday(&now, nullptr); |
479 | boost::optional<struct timeval> ttd{boost::none}; | |
becad613 RG |
480 | if (!noTTD) { |
481 | if (d_healthCheckQuery) { | |
482 | ttd = getBackendHealthCheckTTD(now); | |
483 | } | |
484 | else if (newState == IOState::NeedRead) { | |
485 | ttd = getBackendReadTTD(now); | |
486 | } | |
487 | else if (isFresh() && d_firstWrite) { | |
488 | /* first write just after the non-blocking connect */ | |
489 | ttd = getBackendConnectTTD(now); | |
490 | } | |
491 | else { | |
492 | ttd = getBackendWriteTTD(now); | |
493 | } | |
9eb5394a RG |
494 | } |
495 | ||
496 | auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this()); | |
497 | if (shared) { | |
498 | if (newState == IOState::NeedRead) { | |
499 | d_ioState->update(newState, callback, shared, ttd); | |
500 | } | |
501 | else if (newState == IOState::NeedWrite) { | |
502 | d_ioState->update(newState, callback, shared, ttd); | |
503 | } | |
504 | } | |
505 | } | |
506 | ||
becad613 RG |
507 | void DoHConnectionToBackend::watchForRemoteHostClosingConnection() |
508 | { | |
645a1ca4 | 509 | if (willBeReusable(false) && !d_healthCheckQuery) { |
becad613 RG |
510 | updateIO(IOState::NeedRead, handleReadableIOCallback, false); |
511 | } | |
512 | } | |
513 | ||
d9fffb37 RG |
514 | ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data) |
515 | { | |
e82bf80f RG |
516 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
517 | bool bufferWasEmpty = conn->d_out.empty(); | |
0e6892c6 RG |
518 | if (!conn->d_proxyProtocolPayloadSent && !conn->d_proxyProtocolPayload.empty()) { |
519 | conn->d_out.insert(conn->d_out.end(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end()); | |
520 | conn->d_proxyProtocolPayloadSent = true; | |
521 | } | |
522 | ||
e82bf80f | 523 | conn->d_out.insert(conn->d_out.end(), data, data + length); |
9eb5394a RG |
524 | |
525 | if (bufferWasEmpty) { | |
e82bf80f | 526 | try { |
f05cd66c | 527 | // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl; |
e82bf80f | 528 | auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size()); |
f05cd66c | 529 | // cerr<<"got a "<<(int)state<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl; |
e82bf80f | 530 | if (state == IOState::Done) { |
eec63896 | 531 | conn->d_firstWrite = false; |
e82bf80f RG |
532 | conn->d_out.clear(); |
533 | conn->d_outPos = 0; | |
f05cd66c | 534 | conn->stopIO(); |
54a9a226 | 535 | if (!conn->isIdle()) { |
f05cd66c RG |
536 | conn->updateIO(IOState::NeedRead, handleReadableIOCallback); |
537 | } | |
becad613 RG |
538 | else { |
539 | conn->watchForRemoteHostClosingConnection(); | |
540 | } | |
e82bf80f RG |
541 | } |
542 | else { | |
543 | conn->updateIO(state, handleWritableIOCallback); | |
544 | } | |
9eb5394a | 545 | } |
e82bf80f | 546 | catch (const std::exception& e) { |
f05cd66c RG |
547 | vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what()); |
548 | conn->handleIOError(); | |
eec63896 | 549 | ++conn->d_ds->tcpDiedSendingQuery; |
9eb5394a RG |
550 | } |
551 | } | |
552 | ||
d523caf3 RG |
553 | return length; |
554 | } | |
555 | ||
d9fffb37 RG |
556 | int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) |
557 | { | |
9eb5394a | 558 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
f05cd66c | 559 | // cerr<<"Frame type is "<<std::to_string(frame->hd.type)<<endl; |
e82bf80f | 560 | #if 0 |
d523caf3 RG |
561 | switch (frame->hd.type) { |
562 | case NGHTTP2_HEADERS: | |
9eb5394a | 563 | cerr<<"got headers"<<endl; |
d523caf3 RG |
564 | if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { |
565 | cerr<<"All headers received"<<endl; | |
566 | } | |
567 | break; | |
568 | case NGHTTP2_WINDOW_UPDATE: | |
569 | cerr<<"got window update"<<endl; | |
570 | break; | |
571 | case NGHTTP2_SETTINGS: | |
572 | cerr<<"got settings"<<endl; | |
573 | cerr<<frame->settings.niv<<endl; | |
574 | for (size_t idx = 0; idx < frame->settings.niv; idx++) { | |
575 | cerr<<"- "<<frame->settings.iv[idx].settings_id<<" "<<frame->settings.iv[idx].value<<endl; | |
576 | } | |
577 | break; | |
9eb5394a RG |
578 | case NGHTTP2_DATA: |
579 | cerr<<"got data"<<endl; | |
580 | break; | |
9eb5394a | 581 | } |
e82bf80f | 582 | #endif |
9eb5394a | 583 | |
becad613 RG |
584 | if (frame->hd.type == NGHTTP2_GOAWAY) { |
585 | conn->d_connectionDied = true; | |
586 | } | |
587 | ||
9eb5394a | 588 | /* is this the last frame for this stream? */ |
0b2b0041 | 589 | else if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { |
9eb5394a RG |
590 | auto stream = conn->d_currentStreams.find(frame->hd.stream_id); |
591 | if (stream != conn->d_currentStreams.end()) { | |
f05cd66c | 592 | // cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl; |
9eb5394a | 593 | stream->second.d_finished = true; |
eec63896 | 594 | ++conn->d_queries; |
9eb5394a RG |
595 | |
596 | auto request = std::move(stream->second); | |
597 | conn->d_currentStreams.erase(stream->first); | |
e82bf80f RG |
598 | if (request.d_responseCode == 200U) { |
599 | conn->handleResponse(std::move(request)); | |
d9fffb37 RG |
600 | } |
601 | else { | |
e82bf80f | 602 | vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode); |
35b27ac8 RG |
603 | struct timeval now |
604 | { | |
605 | .tv_sec = 0, .tv_usec = 0 | |
606 | }; | |
607 | ||
e82bf80f RG |
608 | gettimeofday(&now, nullptr); |
609 | ||
610 | conn->handleResponseError(std::move(request), now); | |
611 | } | |
becad613 | 612 | |
54a9a226 | 613 | if (conn->isIdle()) { |
e82bf80f | 614 | conn->stopIO(); |
becad613 | 615 | conn->watchForRemoteHostClosingConnection(); |
e82bf80f | 616 | } |
9eb5394a RG |
617 | } |
618 | else { | |
e82bf80f RG |
619 | vinfolog("Stream %d NOT FOUND", frame->hd.stream_id); |
620 | conn->d_connectionDied = true; | |
eec63896 | 621 | ++conn->d_ds->tcpDiedReadingResponse; |
e82bf80f | 622 | return NGHTTP2_ERR_CALLBACK_FAILURE; |
9eb5394a | 623 | } |
d523caf3 RG |
624 | } |
625 | ||
626 | return 0; | |
627 | } | |
628 | ||
d9fffb37 RG |
629 | int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data) |
630 | { | |
9eb5394a | 631 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
f05cd66c | 632 | // cerr<<"Got data of size "<<len<<" for stream "<<stream_id<<endl; |
9eb5394a RG |
633 | auto stream = conn->d_currentStreams.find(stream_id); |
634 | if (stream == conn->d_currentStreams.end()) { | |
e82bf80f RG |
635 | vinfolog("Unable to match the stream ID %d to a known one!", stream_id); |
636 | conn->d_connectionDied = true; | |
eec63896 | 637 | ++conn->d_ds->tcpDiedReadingResponse; |
e82bf80f RG |
638 | return NGHTTP2_ERR_CALLBACK_FAILURE; |
639 | } | |
640 | if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - stream->second.d_buffer.size()) < len) { | |
641 | vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size()); | |
642 | conn->d_connectionDied = true; | |
eec63896 | 643 | ++conn->d_ds->tcpDiedReadingResponse; |
9eb5394a RG |
644 | return NGHTTP2_ERR_CALLBACK_FAILURE; |
645 | } | |
e82bf80f | 646 | |
9eb5394a RG |
647 | stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len); |
648 | if (stream->second.d_finished) { | |
becad613 RG |
649 | // cerr<<"we now have the full response!"<<endl; |
650 | // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl; | |
e82bf80f | 651 | |
9eb5394a RG |
652 | auto request = std::move(stream->second); |
653 | conn->d_currentStreams.erase(stream->first); | |
e82bf80f RG |
654 | if (request.d_responseCode == 200U) { |
655 | conn->handleResponse(std::move(request)); | |
d9fffb37 RG |
656 | } |
657 | else { | |
e82bf80f | 658 | vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode); |
35b27ac8 RG |
659 | struct timeval now |
660 | { | |
661 | .tv_sec = 0, .tv_usec = 0 | |
662 | }; | |
663 | ||
e82bf80f RG |
664 | gettimeofday(&now, nullptr); |
665 | ||
666 | conn->handleResponseError(std::move(request), now); | |
667 | } | |
54a9a226 | 668 | if (conn->isIdle()) { |
e82bf80f | 669 | conn->stopIO(); |
becad613 | 670 | conn->watchForRemoteHostClosingConnection(); |
e82bf80f | 671 | } |
9eb5394a | 672 | } |
9eb5394a | 673 | |
d523caf3 RG |
674 | return 0; |
675 | } | |
676 | ||
d9fffb37 RG |
677 | int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data) |
678 | { | |
e82bf80f RG |
679 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
680 | ||
681 | if (error_code == 0) { | |
682 | return 0; | |
683 | } | |
d523caf3 | 684 | |
f05cd66c | 685 | // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl; |
e82bf80f | 686 | conn->d_connectionDied = true; |
eec63896 | 687 | ++conn->d_ds->tcpDiedReadingResponse; |
e82bf80f RG |
688 | |
689 | auto stream = conn->d_currentStreams.find(stream_id); | |
690 | if (stream == conn->d_currentStreams.end()) { | |
691 | /* we don't care, then */ | |
e82bf80f | 692 | return 0; |
d523caf3 RG |
693 | } |
694 | ||
35b27ac8 RG |
695 | struct timeval now |
696 | { | |
697 | .tv_sec = 0, .tv_usec = 0 | |
698 | }; | |
699 | ||
e82bf80f RG |
700 | gettimeofday(&now, nullptr); |
701 | auto request = std::move(stream->second); | |
702 | conn->d_currentStreams.erase(stream->first); | |
d523caf3 | 703 | |
f05cd66c | 704 | // cerr<<"Query has "<<request.d_query.d_downstreamFailures<<" failures, backend limit is "<<conn->d_ds->d_retries<<endl; |
8cf75ac2 | 705 | if (request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries) { |
f05cd66c RG |
706 | // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl; |
707 | ++request.d_query.d_downstreamFailures; | |
9fd8dc1f | 708 | auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload)); |
f05cd66c RG |
709 | downstream->queueQuery(request.d_sender, std::move(request.d_query)); |
710 | } | |
711 | else { | |
712 | conn->handleResponseError(std::move(request), now); | |
713 | } | |
d523caf3 | 714 | |
f468a7fe | 715 | // cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl; |
54a9a226 | 716 | if (conn->isIdle()) { |
f468a7fe | 717 | // cerr<<"stopping IO"<<endl; |
e82bf80f | 718 | conn->stopIO(); |
becad613 | 719 | conn->watchForRemoteHostClosingConnection(); |
d523caf3 | 720 | } |
e82bf80f | 721 | |
d523caf3 RG |
722 | return 0; |
723 | } | |
724 | ||
d9fffb37 RG |
725 | int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data) |
726 | { | |
e82bf80f | 727 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
d523caf3 | 728 | |
e82bf80f | 729 | const std::string status(":status"); |
5d662a4e | 730 | if (frame->hd.type == NGHTTP2_HEADERS && frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { |
f468a7fe RG |
731 | // cerr<<"got header for "<<frame->hd.stream_id<<":"<<endl; |
732 | // cerr<<"- "<<std::string(reinterpret_cast<const char*>(name), namelen)<<endl; | |
733 | // cerr<<"- "<<std::string(reinterpret_cast<const char*>(value), valuelen)<<endl; | |
5d662a4e RG |
734 | if (namelen == status.size() && memcmp(status.data(), name, status.size()) == 0) { |
735 | auto stream = conn->d_currentStreams.find(frame->hd.stream_id); | |
736 | if (stream == conn->d_currentStreams.end()) { | |
737 | vinfolog("Unable to match the stream ID %d to a known one!", frame->hd.stream_id); | |
738 | conn->d_connectionDied = true; | |
739 | return NGHTTP2_ERR_CALLBACK_FAILURE; | |
740 | } | |
741 | try { | |
a0383aad | 742 | pdns::checked_stoi_into(stream->second.d_responseCode, std::string(reinterpret_cast<const char*>(value), valuelen)); |
5d662a4e RG |
743 | } |
744 | catch (...) { | |
745 | vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id); | |
746 | conn->d_connectionDied = true; | |
eec63896 | 747 | ++conn->d_ds->tcpDiedReadingResponse; |
5d662a4e | 748 | return NGHTTP2_ERR_CALLBACK_FAILURE; |
d523caf3 | 749 | } |
d523caf3 RG |
750 | } |
751 | } | |
e82bf80f | 752 | return 0; |
d523caf3 RG |
753 | } |
754 | ||
d9fffb37 RG |
755 | int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data) |
756 | { | |
e82bf80f | 757 | vinfolog("Error in HTTP/2 connection: %s", std::string(msg, len)); |
d523caf3 | 758 | |
e82bf80f RG |
759 | DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data); |
760 | conn->d_connectionDied = true; | |
eec63896 | 761 | ++conn->d_ds->tcpDiedReadingResponse; |
d523caf3 | 762 | |
e82bf80f | 763 | return 0; |
d523caf3 | 764 | } |
9eb5394a | 765 | |
9fd8dc1f | 766 | DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) : |
645a1ca4 | 767 | ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload)) |
9eb5394a | 768 | { |
645a1ca4 | 769 | // inherit most of the stuff from the ConnectionToBackend() |
9eb5394a RG |
770 | d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor()); |
771 | ||
772 | nghttp2_session_callbacks* cbs = nullptr; | |
773 | if (nghttp2_session_callbacks_new(&cbs) != 0) { | |
e82bf80f | 774 | d_connectionDied = true; |
eec63896 | 775 | ++d_ds->tcpDiedSendingQuery; |
e82bf80f | 776 | vinfolog("Unable to create a callback object for a new HTTP/2 session"); |
9eb5394a RG |
777 | return; |
778 | } | |
d9fffb37 | 779 | std::unique_ptr<nghttp2_session_callbacks, void (*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del); |
9eb5394a RG |
780 | cbs = nullptr; |
781 | ||
782 | nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback); | |
783 | nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback); | |
784 | nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback); | |
785 | nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback); | |
786 | nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback); | |
9eb5394a RG |
787 | nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback); |
788 | ||
789 | nghttp2_session* sess = nullptr; | |
790 | if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) { | |
e82bf80f | 791 | d_connectionDied = true; |
eec63896 | 792 | ++d_ds->tcpDiedSendingQuery; |
e82bf80f | 793 | vinfolog("Coult not allocate a new HTTP/2 session"); |
9eb5394a RG |
794 | return; |
795 | } | |
796 | ||
d9fffb37 | 797 | d_session = std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(sess, nghttp2_session_del); |
9eb5394a RG |
798 | sess = nullptr; |
799 | ||
800 | callbacks.reset(); | |
801 | ||
9eb5394a | 802 | nghttp2_settings_entry iv[] = { |
e82bf80f RG |
803 | /* rfc7540 section-8.2.2: |
804 | "Advertising a SETTINGS_MAX_CONCURRENT_STREAMS value of zero disables | |
805 | server push by preventing the server from creating the necessary | |
806 | streams." | |
807 | */ | |
808 | {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0}, | |
9eb5394a | 809 | {NGHTTP2_SETTINGS_ENABLE_PUSH, 0}, |
e82bf80f | 810 | /* we might want to make the initial window size configurable, but 16M is a large enough default */ |
d9fffb37 RG |
811 | {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16 * 1024 * 1024}}; |
812 | /* client 24 bytes magic string will be sent by nghttp2 library */ | |
813 | int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv)); | |
9eb5394a | 814 | if (rv != 0) { |
e82bf80f | 815 | d_connectionDied = true; |
eec63896 | 816 | ++d_ds->tcpDiedSendingQuery; |
e82bf80f | 817 | vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv)); |
9eb5394a RG |
818 | return; |
819 | } | |
820 | } | |
821 | ||
9eb5394a RG |
822 | static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param) |
823 | { | |
824 | auto threadData = boost::any_cast<DoHClientThreadData*>(param); | |
9eb5394a | 825 | |
0a5cb883 RG |
826 | std::unique_ptr<CrossProtocolQuery> cpq{nullptr}; |
827 | try { | |
828 | auto tmp = threadData->d_receiver.receive(); | |
829 | if (!tmp) { | |
9eb5394a RG |
830 | return; |
831 | } | |
0a5cb883 | 832 | cpq = std::move(*tmp); |
9eb5394a | 833 | } |
0a5cb883 RG |
834 | catch (const std::exception& e) { |
835 | throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what())); | |
9eb5394a RG |
836 | } |
837 | ||
35b27ac8 RG |
838 | struct timeval now |
839 | { | |
840 | .tv_sec = 0, .tv_usec = 0 | |
841 | }; | |
0a5cb883 | 842 | gettimeofday(&now, nullptr); |
9eb5394a | 843 | |
0a5cb883 RG |
844 | std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender(); |
845 | auto query = std::move(cpq->query); | |
846 | auto downstreamServer = std::move(cpq->downstream); | |
847 | cpq.reset(); | |
9eb5394a | 848 | |
0a5cb883 RG |
849 | try { |
850 | auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload)); | |
851 | downstream->queueQuery(tqs, std::move(query)); | |
9eb5394a RG |
852 | } |
853 | catch (...) { | |
7e8a05fa RG |
854 | TCPResponse response(std::move(query)); |
855 | tqs->notifyIOError(now, std::move(response)); | |
9eb5394a RG |
856 | } |
857 | } | |
858 | ||
0a5cb883 | 859 | static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) |
9eb5394a RG |
860 | { |
861 | setThreadName("dnsdist/dohClie"); | |
862 | ||
55a2979f | 863 | try { |
0a5cb883 RG |
864 | DoHClientThreadData data(std::move(receiver)); |
865 | data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data); | |
9eb5394a | 866 | |
35b27ac8 RG |
867 | struct timeval now |
868 | { | |
869 | .tv_sec = 0, .tv_usec = 0 | |
870 | }; | |
871 | ||
55a2979f RG |
872 | gettimeofday(&now, nullptr); |
873 | time_t lastTimeoutScan = now.tv_sec; | |
874 | ||
875 | for (;;) { | |
ab8eb008 | 876 | data.mplexer->run(&now, 1000); |
55a2979f RG |
877 | |
878 | if (now.tv_sec > lastTimeoutScan) { | |
879 | lastTimeoutScan = now.tv_sec; | |
880 | ||
881 | try { | |
9fd8dc1f | 882 | t_downstreamDoHConnectionsManager.cleanupClosedConnections(now); |
55a2979f RG |
883 | handleH2Timeouts(*data.mplexer, now); |
884 | ||
885 | if (g_dohStatesDumpRequested > 0) { | |
886 | /* just to keep things clean in the output, debug only */ | |
887 | static std::mutex s_lock; | |
888 | std::lock_guard<decltype(s_lock)> lck(s_lock); | |
889 | if (g_dohStatesDumpRequested > 0) { | |
890 | /* no race here, we took the lock so it can only be increased in the meantime */ | |
891 | --g_dohStatesDumpRequested; | |
f8e1161f | 892 | infolog("Dumping the DoH client states, as requested:"); |
55a2979f RG |
893 | data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) { |
894 | struct timeval lnow; | |
895 | gettimeofday(&lnow, nullptr); | |
896 | if (ttd.tv_sec > 0) { | |
f8e1161f | 897 | infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec)); |
55a2979f RG |
898 | } |
899 | else { | |
f8e1161f | 900 | infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write")); |
55a2979f RG |
901 | } |
902 | ||
903 | if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) { | |
904 | auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param); | |
f8e1161f | 905 | infolog(" - %s", conn->toString()); |
55a2979f RG |
906 | } |
907 | else if (param.type() == typeid(DoHClientThreadData*)) { | |
f8e1161f | 908 | infolog(" - Worker thread pipe"); |
55a2979f RG |
909 | } |
910 | }); | |
f8e1161f | 911 | infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount()); |
9eb5394a | 912 | } |
55a2979f RG |
913 | } |
914 | } | |
915 | catch (const std::exception& e) { | |
f8e1161f | 916 | warnlog("Error in outgoing DoH thread: %s", e.what()); |
9eb5394a RG |
917 | } |
918 | } | |
919 | } | |
920 | } | |
55a2979f RG |
921 | catch (const std::exception& e) { |
922 | errlog("Fatal error in outgoing DoH thread: %s", e.what()); | |
923 | } | |
9eb5394a RG |
924 | } |
925 | ||
d9fffb37 RG |
926 | static bool select_next_proto_callback(unsigned char** out, unsigned char* outlen, const unsigned char* in, unsigned int inlen) |
927 | { | |
6e069889 RG |
928 | if (nghttp2_select_next_protocol(out, outlen, in, inlen) <= 0) { |
929 | vinfolog("The remote DoH backend did not advertise " NGHTTP2_PROTO_VERSION_ID); | |
930 | return false; | |
931 | } | |
932 | return true; | |
933 | } | |
934 | ||
cf25b82b | 935 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
6e069889 RG |
936 | |
937 | struct DoHClientCollection::DoHWorkerThread | |
938 | { | |
939 | DoHWorkerThread() | |
940 | { | |
941 | } | |
942 | ||
0a5cb883 RG |
943 | DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) : |
944 | d_sender(std::move(sender)) | |
6e069889 RG |
945 | { |
946 | } | |
947 | ||
d9fffb37 | 948 | DoHWorkerThread(DoHWorkerThread&& rhs) : |
0a5cb883 | 949 | d_sender(std::move(rhs.d_sender)) |
6e069889 | 950 | { |
6e069889 RG |
951 | } |
952 | ||
953 | DoHWorkerThread& operator=(DoHWorkerThread&& rhs) | |
954 | { | |
0a5cb883 | 955 | d_sender = std::move(rhs.d_sender); |
6e069889 RG |
956 | return *this; |
957 | } | |
958 | ||
959 | DoHWorkerThread(const DoHWorkerThread& rhs) = delete; | |
960 | DoHWorkerThread& operator=(const DoHWorkerThread&) = delete; | |
961 | ||
0a5cb883 | 962 | pdns::channel::Sender<CrossProtocolQuery> d_sender; |
6e069889 RG |
963 | }; |
964 | ||
168ef594 RG |
965 | DoHClientCollection::DoHClientCollection(size_t numberOfThreads) : |
966 | d_clientThreads(numberOfThreads) | |
6e069889 RG |
967 | { |
968 | } | |
969 | ||
970 | bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq) | |
971 | { | |
972 | if (d_numberOfThreads == 0) { | |
973 | throw std::runtime_error("No DoH worker thread yet"); | |
974 | } | |
975 | ||
976 | uint64_t pos = d_pos++; | |
0a5cb883 | 977 | if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) { |
6ba8d6ca | 978 | ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull; |
6e069889 RG |
979 | return false; |
980 | } | |
981 | ||
982 | return true; | |
983 | } | |
984 | ||
9eb5394a RG |
985 | void DoHClientCollection::addThread() |
986 | { | |
cf25b82b | 987 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
0a5cb883 | 988 | try { |
c1d76521 | 989 | auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize); |
9eb5394a | 990 | |
0a5cb883 | 991 | vinfolog("Adding DoH Client thread"); |
9eb5394a RG |
992 | std::lock_guard<std::mutex> lock(d_mutex); |
993 | ||
994 | if (d_numberOfThreads >= d_clientThreads.size()) { | |
168ef594 | 995 | vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size()); |
9eb5394a RG |
996 | return; |
997 | } | |
998 | ||
0a5cb883 | 999 | DoHWorkerThread worker(std::move(sender)); |
9eb5394a | 1000 | try { |
0a5cb883 | 1001 | std::thread t1(dohClientThread, std::move(receiver)); |
9eb5394a RG |
1002 | t1.detach(); |
1003 | } | |
1004 | catch (const std::runtime_error& e) { | |
0a5cb883 | 1005 | /* the thread creation failed */ |
9eb5394a | 1006 | errlog("Error creating a DoH thread: %s", e.what()); |
9eb5394a RG |
1007 | return; |
1008 | } | |
1009 | ||
1010 | d_clientThreads.at(d_numberOfThreads) = std::move(worker); | |
1011 | ++d_numberOfThreads; | |
1012 | } | |
0a5cb883 RG |
1013 | catch (const std::exception& e) { |
1014 | errlog("Error creating the DoH channel: %s", e.what()); | |
1015 | return; | |
1016 | } | |
cf25b82b | 1017 | #else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
6e069889 | 1018 | throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available"); |
cf25b82b | 1019 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
9eb5394a RG |
1020 | } |
1021 | ||
/* Creates the global collection of outgoing DoH worker threads and starts
   the configured number of them. Returns true when built with DoH / nghttp2
   support, false otherwise. */
bool initDoHWorkers()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  if (!g_outgoingDoHWorkerThreads) {
    /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
       is added at a later time. */
    g_outgoingDoHWorkerThreads = 1;
  }

  /* the value is always set at this point, possibly to an explicit 0 */
  const auto wantedThreads = *g_outgoingDoHWorkerThreads;
  if (wantedThreads > 0) {
    g_dohClientThreads = std::make_unique<DoHClientCollection>(wantedThreads);
    for (size_t started = 0; started < wantedThreads; ++started) {
      g_dohClientThreads->addThread();
    }
  }
  return true;
#else
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1042 | ||
1043 | bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx) | |
1044 | { | |
1045 | if (ctx == nullptr) { | |
1046 | return false; | |
1047 | } | |
cf25b82b | 1048 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
e82bf80f RG |
1049 | /* we want to set the ALPN to h2, if only to mitigate the ALPACA attack */ |
1050 | const std::vector<std::vector<uint8_t>> h2Alpns = {{'h', '2'}}; | |
1051 | ctx->setALPNProtos(h2Alpns); | |
1052 | ctx->setNextProtocolSelectCallback(select_next_proto_callback); | |
1053 | return true; | |
cf25b82b | 1054 | #else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
6e069889 | 1055 | return false; |
cf25b82b | 1056 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
e82bf80f | 1057 | } |
8229d4f0 | 1058 | |
ae3b96d9 | 1059 | bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query, bool healthCheck) |
8229d4f0 | 1060 | { |
cf25b82b | 1061 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
35b27ac8 RG |
1062 | struct timeval now |
1063 | { | |
1064 | .tv_sec = 0, .tv_usec = 0 | |
1065 | }; | |
8229d4f0 RG |
1066 | gettimeofday(&now, nullptr); |
1067 | ||
f05cd66c RG |
1068 | if (healthCheck) { |
1069 | /* always do health-checks over a new connection */ | |
0e6892c6 | 1070 | auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(query.d_proxyProtocolPayload)); |
f05cd66c RG |
1071 | newConnection->setHealthCheck(healthCheck); |
1072 | newConnection->queueQuery(sender, std::move(query)); | |
1073 | } | |
1074 | else { | |
9fd8dc1f | 1075 | auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload)); |
f05cd66c RG |
1076 | connection->queueQuery(sender, std::move(query)); |
1077 | } | |
1078 | ||
8229d4f0 | 1079 | return true; |
cf25b82b | 1080 | #else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
6e069889 | 1081 | return false; |
cf25b82b | 1082 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
8229d4f0 | 1083 | } |
f05cd66c RG |
1084 | |
/* Clears the connections held by this thread's outgoing DoH connections
   manager and returns the value reported by its clear() method, or 0 when
   built without DoH / nghttp2 support. */
size_t clearH2Connections()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  const size_t cleared = t_downstreamDoHConnectionsManager.clear();
  return cleared;
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return 0;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1093 | ||
1094 | size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now) | |
1095 | { | |
1096 | size_t got = 0; | |
cf25b82b | 1097 | #if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2) |
f05cd66c RG |
1098 | auto expiredReadConns = mplexer.getTimeouts(now, false); |
1099 | for (const auto& cbData : expiredReadConns) { | |
1100 | if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) { | |
1101 | auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second); | |
1102 | vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName()); | |
1103 | conn->handleTimeout(now, false); | |
1104 | ++got; | |
1105 | } | |
1106 | } | |
1107 | ||
1108 | auto expiredWriteConns = mplexer.getTimeouts(now, true); | |
1109 | for (const auto& cbData : expiredWriteConns) { | |
1110 | if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) { | |
1111 | auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second); | |
1112 | vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName()); | |
1113 | conn->handleTimeout(now, true); | |
1114 | ++got; | |
1115 | } | |
1116 | } | |
cf25b82b | 1117 | #endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */ |
f05cd66c RG |
1118 | return got; |
1119 | } | |
767f5514 RG |
1120 | |
/* Forwards the cleanup interval to the outgoing DoH connections manager.
   Does nothing when built without DoH / nghttp2 support. */
void setDoHDownstreamCleanupInterval(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setCleanupInterval(max);
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  (void)max;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1127 | ||
/* Forwards the maximum idle time to the outgoing DoH connections manager.
   Does nothing when built without DoH / nghttp2 support. */
void setDoHDownstreamMaxIdleTime(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleTime(max);
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  (void)max;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1134 | ||
/* Forwards the maximum number of idle connections per backend to the
   outgoing DoH connections manager. Does nothing when built without
   DoH / nghttp2 support. */
void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  (void)max;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}