]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdistdist/dnsdist-nghttp2.cc
Merge pull request #13387 from omoerbeek/rec-b-root-servers
[thirdparty/pdns.git] / pdns / dnsdistdist / dnsdist-nghttp2.cc
CommitLineData
9eb5394a
RG
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
d523caf3 22
6e069889
RG
23#include "config.h"
24
cf25b82b 25#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
d523caf3 26#include <nghttp2/nghttp2.h>
cf25b82b 27#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
d523caf3 28
9eb5394a 29#include "dnsdist-nghttp2.hh"
7e8a05fa 30#include "dnsdist-nghttp2-in.hh"
9eb5394a
RG
31#include "dnsdist-tcp.hh"
32#include "dnsdist-tcp-downstream.hh"
8a2dd7db 33#include "dnsdist-downstream-connection.hh"
9eb5394a
RG
34
35#include "dolog.hh"
0a5cb883 36#include "channel.hh"
d523caf3
RG
37#include "iputils.hh"
38#include "libssl.hh"
39#include "noinitvector.hh"
40#include "tcpiohandler.hh"
9eb5394a 41#include "threadname.hh"
d523caf3
RG
42#include "sstuff.hh"
43
9eb5394a
RG
44std::atomic<uint64_t> g_dohStatesDumpRequested{0};
45std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
b146a84f 46std::optional<uint16_t> g_outgoingDoHWorkerThreads{std::nullopt};
9eb5394a 47
cf25b82b 48#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
645a1ca4 49class DoHConnectionToBackend : public ConnectionToBackend
9eb5394a
RG
50{
51public:
9fd8dc1f 52 DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
9eb5394a 53
e82bf80f 54 void handleTimeout(const struct timeval& now, bool write) override;
9eb5394a
RG
55 void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
56
57 std::string toString() const override
58 {
59 ostringstream o;
d9fffb37 60 o << "DoH connection to backend " << (d_ds ? d_ds->getName() : "empty") << " over FD " << (d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket") << ", " << getConcurrentStreamsCount() << " streams";
9eb5394a
RG
61 return o.str();
62 }
63
97b165e1
RG
64 void setHealthCheck(bool h)
65 {
66 d_healthCheckQuery = h;
67 }
68
9fd8dc1f 69 void stopIO() override;
645a1ca4
RG
70 bool reachedMaxConcurrentQueries() const override;
71 bool reachedMaxStreamID() const override;
767f5514 72 bool isIdle() const override;
4ed2c08c
RG
73 void release() override
74 {
75 }
becad613 76
9eb5394a
RG
77private:
78 static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
79 static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
80 static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data);
81 static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data);
82 static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
9eb5394a
RG
83 static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
84 static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
85 static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
9eb5394a
RG
86
87 class PendingRequest
88 {
89 public:
90 std::shared_ptr<TCPQuerySender> d_sender{nullptr};
91 TCPQuery d_query;
92 PacketBuffer d_buffer;
f05cd66c 93 size_t d_queryPos{0};
e82bf80f 94 uint16_t d_responseCode{0};
9eb5394a
RG
95 bool d_finished{false};
96 };
ea090af9 97 void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD = false);
becad613 98 void watchForRemoteHostClosingConnection();
9eb5394a 99 void handleResponse(PendingRequest&& request);
e82bf80f 100 void handleResponseError(PendingRequest&& request, const struct timeval& now);
f05cd66c 101 void handleIOError();
e82bf80f 102 uint32_t getConcurrentStreamsCount() const;
9eb5394a 103
e82bf80f
RG
104 size_t getUsageCount() const
105 {
106 auto ref = shared_from_this();
107 return ref.use_count();
108 }
109
110 static const std::unordered_map<std::string, std::string> s_constants;
9eb5394a 111
9eb5394a 112 std::unordered_map<int32_t, PendingRequest> d_currentStreams;
73571c03 113 std::string d_proxyProtocolPayload;
9eb5394a
RG
114 PacketBuffer d_out;
115 PacketBuffer d_in;
73571c03 116 std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del};
9eb5394a
RG
117 size_t d_outPos{0};
118 size_t d_inPos{0};
97b165e1 119 bool d_healthCheckQuery{false};
eec63896 120 bool d_firstWrite{true};
9eb5394a
RG
121};
122
9fd8dc1f
RG
123using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
124thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
e82bf80f
RG
125
126uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
127{
128 return d_currentStreams.size();
129}
9eb5394a
RG
130
131void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
132{
35b27ac8
RG
133 struct timeval now
134 {
135 .tv_sec = 0, .tv_usec = 0
136 };
137
9eb5394a 138 gettimeofday(&now, nullptr);
f05cd66c 139 try {
0c51513a 140 if (!d_healthCheckQuery) {
d5d15da1 141 const double udiff = request.d_query.d_idstate.queryRealTime.udiff();
3fbea485 142 d_ds->updateTCPLatency(udiff);
e3ab12d9
RG
143 if (request.d_buffer.size() >= sizeof(dnsheader)) {
144 dnsheader dh;
145 memcpy(&dh, request.d_buffer.data(), sizeof(dh));
146 d_ds->reportResponse(dh.rcode);
147 }
148 else {
149 d_ds->reportTimeoutOrError();
150 }
0c51513a
RG
151 }
152
7e8a05fa
RG
153 TCPResponse response(std::move(request.d_query));
154 response.d_buffer = std::move(request.d_buffer);
155 response.d_connection = shared_from_this();
156 response.d_ds = d_ds;
157 request.d_sender->handleResponse(now, std::move(response));
f05cd66c
RG
158 }
159 catch (const std::exception& e) {
160 vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
161 }
9eb5394a
RG
162}
163
e82bf80f
RG
164void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const struct timeval& now)
165{
f05cd66c 166 try {
93b395a5
RG
167 if (!d_healthCheckQuery) {
168 d_ds->reportTimeoutOrError();
169 }
e3ab12d9 170
7e8a05fa
RG
171 TCPResponse response(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr);
172 request.d_sender->notifyIOError(now, std::move(response));
f05cd66c
RG
173 }
174 catch (const std::exception& e) {
175 vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
176 }
e82bf80f
RG
177}
178
f05cd66c 179void DoHConnectionToBackend::handleIOError()
e82bf80f
RG
180{
181 d_connectionDied = true;
f05cd66c
RG
182 nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR);
183
35b27ac8
RG
184 struct timeval now
185 {
186 .tv_sec = 0, .tv_usec = 0
187 };
188
f05cd66c 189 gettimeofday(&now, nullptr);
e82bf80f
RG
190 for (auto& request : d_currentStreams) {
191 handleResponseError(std::move(request.second), now);
192 }
f05cd66c 193
e82bf80f 194 d_currentStreams.clear();
f05cd66c
RG
195 stopIO();
196}
197
198void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
199{
eec63896
RG
200 if (write) {
201 if (d_firstWrite) {
202 ++d_ds->tcpConnectTimeouts;
203 }
204 else {
205 ++d_ds->tcpWriteTimeouts;
206 }
207 }
208 else {
209 ++d_ds->tcpReadTimeouts;
210 }
211
f05cd66c 212 handleIOError();
e82bf80f
RG
213}
214
eec63896
RG
215bool DoHConnectionToBackend::reachedMaxStreamID() const
216{
217 const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
218 return d_highestStreamID == maximumStreamID;
219}
220
645a1ca4 221bool DoHConnectionToBackend::reachedMaxConcurrentQueries() const
e82bf80f 222{
f468a7fe 223 // cerr<<"Got "<<getConcurrentStreamsCount()<<" concurrent streams, max is "<<nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
e82bf80f 224 if (nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) <= getConcurrentStreamsCount()) {
eec63896
RG
225 return true;
226 }
eec63896
RG
227 return false;
228}
229
767f5514
RG
230bool DoHConnectionToBackend::isIdle() const
231{
232 return getConcurrentStreamsCount() == 0;
233}
234
9eb5394a
RG
235void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
236{
9eb5394a 237 auto payloadSize = std::to_string(query.d_buffer.size());
9eb5394a 238
8cf75ac2 239 bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders;
5a96fcd2
RG
240
241 /* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
242 to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */
243 std::vector<nghttp2_nv> headers;
244 // these need to live until after the request headers have been processed
245 std::string remote;
246 std::string remotePort;
247 headers.reserve(8 + (addXForwarded ? 3 : 0));
248
249 /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
83abf7f6
RG
250 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE);
251 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE);
252 NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName);
253 NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath);
254 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE);
255 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE);
256 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE);
257 NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize);
5a96fcd2 258 /* no need to add these headers for health-check queries */
f05cd66c
RG
259 if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) {
260 remote = query.d_idstate.origRemote.toString();
261 remotePort = std::to_string(query.d_idstate.origRemote.getPort());
83abf7f6
RG
262 NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote);
263 NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort);
f05cd66c
RG
264 if (query.d_idstate.cs != nullptr) {
265 if (query.d_idstate.cs->isUDP()) {
83abf7f6 266 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP);
5a96fcd2 267 }
f05cd66c
RG
268 else if (query.d_idstate.cs->isDoH()) {
269 if (query.d_idstate.cs->hasTLS()) {
83abf7f6 270 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS);
5a96fcd2
RG
271 }
272 else {
83abf7f6 273 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP);
5a96fcd2
RG
274 }
275 }
f05cd66c 276 else if (query.d_idstate.cs->hasTLS()) {
83abf7f6 277 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS);
5a96fcd2
RG
278 }
279 else {
83abf7f6 280 NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP);
5a96fcd2
RG
281 }
282 }
283 }
284
f05cd66c
RG
285 PendingRequest pending;
286 pending.d_query = std::move(query);
287 pending.d_sender = std::move(sender);
288
289 uint32_t streamId = nghttp2_session_get_next_stream_id(d_session.get());
290 auto insertPair = d_currentStreams.insert({streamId, std::move(pending)});
291 if (!insertPair.second) {
292 /* there is a stream ID collision, something is very wrong! */
293 d_connectionDied = true;
294 nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
295 throw std::runtime_error("Stream ID collision");
296 }
297
5a96fcd2
RG
298 /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set.
299 */
9eb5394a 300 nghttp2_data_provider data_provider;
f05cd66c 301
9eb5394a 302 data_provider.source.ptr = this;
d9fffb37 303 data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
06140536 304 auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
f05cd66c 305 auto& request = conn->d_currentStreams.at(stream_id);
e82bf80f 306 size_t toCopy = 0;
f05cd66c
RG
307 if (request.d_queryPos < request.d_query.d_buffer.size()) {
308 size_t remaining = request.d_query.d_buffer.size() - request.d_queryPos;
e82bf80f 309 toCopy = length > remaining ? remaining : length;
f05cd66c
RG
310 memcpy(buf, &request.d_query.d_buffer.at(request.d_queryPos), toCopy);
311 request.d_queryPos += toCopy;
e82bf80f
RG
312 }
313
f05cd66c 314 if (request.d_queryPos >= request.d_query.d_buffer.size()) {
d9fffb37 315 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
e82bf80f 316 }
9eb5394a
RG
317 return toCopy;
318 };
319
f05cd66c
RG
320 auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
321 if (newStreamId < 0) {
e82bf80f 322 d_connectionDied = true;
eec63896 323 ++d_ds->tcpDiedSendingQuery;
f05cd66c
RG
324 d_currentStreams.erase(streamId);
325 throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
9eb5394a 326 }
e82bf80f 327
9eb5394a 328 auto rv = nghttp2_session_send(d_session.get());
9eb5394a 329 if (rv != 0) {
e82bf80f 330 d_connectionDied = true;
eec63896 331 ++d_ds->tcpDiedSendingQuery;
f05cd66c 332 d_currentStreams.erase(streamId);
9eb5394a
RG
333 throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
334 }
e82bf80f 335
f05cd66c 336 d_highestStreamID = newStreamId;
9eb5394a
RG
337}
338
339class DoHClientThreadData
d523caf3 340{
9eb5394a 341public:
0a5cb883
RG
342 DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) :
343 mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())),
344 d_receiver(std::move(receiver))
9eb5394a
RG
345 {
346 }
347
348 std::unique_ptr<FDMultiplexer> mplexer{nullptr};
0a5cb883 349 pdns::channel::Receiver<CrossProtocolQuery> d_receiver;
d523caf3
RG
350};
351
9eb5394a
RG
352void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
353{
9eb5394a
RG
354 auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
355 if (fd != conn->getHandle()) {
356 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
357 }
358
359 IOStateGuard ioGuard(conn->d_ioState);
360 do {
361 conn->d_inPos = 0;
362 conn->d_in.resize(conn->d_in.size() + 512);
f05cd66c 363 // cerr<<"trying to read "<<conn->d_in.size()<<endl;
9eb5394a
RG
364 try {
365 IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
f05cd66c 366 // cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
9eb5394a 367 conn->d_in.resize(conn->d_inPos);
f05cd66c
RG
368
369 if (conn->d_inPos > 0) {
370 /* we got something */
9eb5394a 371 auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos);
f05cd66c 372 // cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
9eb5394a
RG
373 /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB,
374 all data should be consumed before returning */
375 if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
f05cd66c 376 throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
9eb5394a 377 }
eec63896 378
35b27ac8
RG
379 struct timeval now
380 {
381 .tv_sec = 0, .tv_usec = 0
382 };
383
eec63896
RG
384 gettimeofday(&now, nullptr);
385 conn->d_lastDataReceivedTime = now;
386
f05cd66c 387 // cerr<<"after read send"<<endl;
e82bf80f 388 nghttp2_session_send(conn->d_session.get());
f05cd66c
RG
389 }
390
391 if (newState == IOState::Done) {
54a9a226 392 if (conn->isIdle()) {
89e62bd8 393 conn->stopIO();
becad613 394 conn->watchForRemoteHostClosingConnection();
89e62bd8
RG
395 ioGuard.release();
396 break;
397 }
9eb5394a
RG
398 }
399 else {
400 if (newState == IOState::NeedWrite) {
f05cd66c 401 // cerr<<"need write"<<endl;
9eb5394a
RG
402 conn->updateIO(IOState::NeedWrite, handleReadableIOCallback);
403 }
404 ioGuard.release();
405 break;
406 }
407 }
408 catch (const std::exception& e) {
f05cd66c 409 vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
eec63896 410 ++conn->d_ds->tcpDiedReadingResponse;
f05cd66c 411 conn->handleIOError();
9eb5394a
RG
412 break;
413 }
d9fffb37 414 } while (conn->getConcurrentStreamsCount() > 0);
9eb5394a
RG
415}
416
417void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
418{
9eb5394a
RG
419 auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
420 if (fd != conn->getHandle()) {
421 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
422 }
423 IOStateGuard ioGuard(conn->d_ioState);
424
f05cd66c 425 // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
9eb5394a
RG
426 try {
427 IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
f05cd66c 428 // cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
9eb5394a
RG
429 if (newState == IOState::NeedRead) {
430 conn->updateIO(IOState::NeedRead, handleWritableIOCallback);
431 }
432 else if (newState == IOState::Done) {
f05cd66c 433 // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
eec63896 434 conn->d_firstWrite = false;
9eb5394a
RG
435 conn->d_out.clear();
436 conn->d_outPos = 0;
437 conn->stopIO();
54a9a226 438 if (!conn->isIdle()) {
f05cd66c
RG
439 conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
440 }
becad613
RG
441 else {
442 conn->watchForRemoteHostClosingConnection();
443 }
9eb5394a
RG
444 }
445 ioGuard.release();
446 }
447 catch (const std::exception& e) {
f05cd66c 448 vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
eec63896 449 ++conn->d_ds->tcpDiedSendingQuery;
f05cd66c 450 conn->handleIOError();
9eb5394a 451 }
9eb5394a
RG
452}
453
454void DoHConnectionToBackend::stopIO()
455{
456 d_ioState->reset();
f05cd66c 457
a34f264a
RG
458 if (isIdle()) {
459 auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
460 if (!willBeReusable(false)) {
461 /* remove ourselves from the connection cache, this might mean that our
462 reference count drops to zero after that, so we need to be careful */
463 t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
464 }
465 else {
466 t_downstreamDoHConnectionsManager.moveToIdle(shared);
467 }
54a9a226 468 }
9eb5394a
RG
469}
470
becad613 471void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback, bool noTTD)
9eb5394a 472{
35b27ac8
RG
473 struct timeval now
474 {
475 .tv_sec = 0, .tv_usec = 0
476 };
477
9eb5394a
RG
478 gettimeofday(&now, nullptr);
479 boost::optional<struct timeval> ttd{boost::none};
becad613
RG
480 if (!noTTD) {
481 if (d_healthCheckQuery) {
482 ttd = getBackendHealthCheckTTD(now);
483 }
484 else if (newState == IOState::NeedRead) {
485 ttd = getBackendReadTTD(now);
486 }
487 else if (isFresh() && d_firstWrite) {
488 /* first write just after the non-blocking connect */
489 ttd = getBackendConnectTTD(now);
490 }
491 else {
492 ttd = getBackendWriteTTD(now);
493 }
9eb5394a
RG
494 }
495
496 auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
497 if (shared) {
498 if (newState == IOState::NeedRead) {
499 d_ioState->update(newState, callback, shared, ttd);
500 }
501 else if (newState == IOState::NeedWrite) {
502 d_ioState->update(newState, callback, shared, ttd);
503 }
504 }
505}
506
becad613
RG
507void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
508{
645a1ca4 509 if (willBeReusable(false) && !d_healthCheckQuery) {
becad613
RG
510 updateIO(IOState::NeedRead, handleReadableIOCallback, false);
511 }
512}
513
d9fffb37
RG
514ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
515{
e82bf80f
RG
516 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
517 bool bufferWasEmpty = conn->d_out.empty();
0e6892c6
RG
518 if (!conn->d_proxyProtocolPayloadSent && !conn->d_proxyProtocolPayload.empty()) {
519 conn->d_out.insert(conn->d_out.end(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
520 conn->d_proxyProtocolPayloadSent = true;
521 }
522
e82bf80f 523 conn->d_out.insert(conn->d_out.end(), data, data + length);
9eb5394a
RG
524
525 if (bufferWasEmpty) {
e82bf80f 526 try {
f05cd66c 527 // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
e82bf80f 528 auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
f05cd66c 529 // cerr<<"got a "<<(int)state<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
e82bf80f 530 if (state == IOState::Done) {
eec63896 531 conn->d_firstWrite = false;
e82bf80f
RG
532 conn->d_out.clear();
533 conn->d_outPos = 0;
f05cd66c 534 conn->stopIO();
54a9a226 535 if (!conn->isIdle()) {
f05cd66c
RG
536 conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
537 }
becad613
RG
538 else {
539 conn->watchForRemoteHostClosingConnection();
540 }
e82bf80f
RG
541 }
542 else {
543 conn->updateIO(state, handleWritableIOCallback);
544 }
9eb5394a 545 }
e82bf80f 546 catch (const std::exception& e) {
f05cd66c
RG
547 vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what());
548 conn->handleIOError();
eec63896 549 ++conn->d_ds->tcpDiedSendingQuery;
9eb5394a
RG
550 }
551 }
552
d523caf3
RG
553 return length;
554}
555
d9fffb37
RG
556int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
557{
9eb5394a 558 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
f05cd66c 559 // cerr<<"Frame type is "<<std::to_string(frame->hd.type)<<endl;
e82bf80f 560#if 0
d523caf3
RG
561 switch (frame->hd.type) {
562 case NGHTTP2_HEADERS:
9eb5394a 563 cerr<<"got headers"<<endl;
d523caf3
RG
564 if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
565 cerr<<"All headers received"<<endl;
566 }
567 break;
568 case NGHTTP2_WINDOW_UPDATE:
569 cerr<<"got window update"<<endl;
570 break;
571 case NGHTTP2_SETTINGS:
572 cerr<<"got settings"<<endl;
573 cerr<<frame->settings.niv<<endl;
574 for (size_t idx = 0; idx < frame->settings.niv; idx++) {
575 cerr<<"- "<<frame->settings.iv[idx].settings_id<<" "<<frame->settings.iv[idx].value<<endl;
576 }
577 break;
9eb5394a
RG
578 case NGHTTP2_DATA:
579 cerr<<"got data"<<endl;
580 break;
9eb5394a 581 }
e82bf80f 582#endif
9eb5394a 583
becad613
RG
584 if (frame->hd.type == NGHTTP2_GOAWAY) {
585 conn->d_connectionDied = true;
586 }
587
9eb5394a 588 /* is this the last frame for this stream? */
0b2b0041 589 else if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
9eb5394a
RG
590 auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
591 if (stream != conn->d_currentStreams.end()) {
f05cd66c 592 // cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
9eb5394a 593 stream->second.d_finished = true;
eec63896 594 ++conn->d_queries;
9eb5394a
RG
595
596 auto request = std::move(stream->second);
597 conn->d_currentStreams.erase(stream->first);
e82bf80f
RG
598 if (request.d_responseCode == 200U) {
599 conn->handleResponse(std::move(request));
d9fffb37
RG
600 }
601 else {
e82bf80f 602 vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
35b27ac8
RG
603 struct timeval now
604 {
605 .tv_sec = 0, .tv_usec = 0
606 };
607
e82bf80f
RG
608 gettimeofday(&now, nullptr);
609
610 conn->handleResponseError(std::move(request), now);
611 }
becad613 612
54a9a226 613 if (conn->isIdle()) {
e82bf80f 614 conn->stopIO();
becad613 615 conn->watchForRemoteHostClosingConnection();
e82bf80f 616 }
9eb5394a
RG
617 }
618 else {
e82bf80f
RG
619 vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
620 conn->d_connectionDied = true;
eec63896 621 ++conn->d_ds->tcpDiedReadingResponse;
e82bf80f 622 return NGHTTP2_ERR_CALLBACK_FAILURE;
9eb5394a 623 }
d523caf3
RG
624 }
625
626 return 0;
627}
628
d9fffb37
RG
629int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data)
630{
9eb5394a 631 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
f05cd66c 632 // cerr<<"Got data of size "<<len<<" for stream "<<stream_id<<endl;
9eb5394a
RG
633 auto stream = conn->d_currentStreams.find(stream_id);
634 if (stream == conn->d_currentStreams.end()) {
e82bf80f
RG
635 vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
636 conn->d_connectionDied = true;
eec63896 637 ++conn->d_ds->tcpDiedReadingResponse;
e82bf80f
RG
638 return NGHTTP2_ERR_CALLBACK_FAILURE;
639 }
640 if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - stream->second.d_buffer.size()) < len) {
641 vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size());
642 conn->d_connectionDied = true;
eec63896 643 ++conn->d_ds->tcpDiedReadingResponse;
9eb5394a
RG
644 return NGHTTP2_ERR_CALLBACK_FAILURE;
645 }
e82bf80f 646
9eb5394a
RG
647 stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len);
648 if (stream->second.d_finished) {
becad613
RG
649 // cerr<<"we now have the full response!"<<endl;
650 // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
e82bf80f 651
9eb5394a
RG
652 auto request = std::move(stream->second);
653 conn->d_currentStreams.erase(stream->first);
e82bf80f
RG
654 if (request.d_responseCode == 200U) {
655 conn->handleResponse(std::move(request));
d9fffb37
RG
656 }
657 else {
e82bf80f 658 vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
35b27ac8
RG
659 struct timeval now
660 {
661 .tv_sec = 0, .tv_usec = 0
662 };
663
e82bf80f
RG
664 gettimeofday(&now, nullptr);
665
666 conn->handleResponseError(std::move(request), now);
667 }
54a9a226 668 if (conn->isIdle()) {
e82bf80f 669 conn->stopIO();
becad613 670 conn->watchForRemoteHostClosingConnection();
e82bf80f 671 }
9eb5394a 672 }
9eb5394a 673
d523caf3
RG
674 return 0;
675}
676
d9fffb37
RG
677int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
678{
e82bf80f
RG
679 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
680
681 if (error_code == 0) {
682 return 0;
683 }
d523caf3 684
f05cd66c 685 // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl;
e82bf80f 686 conn->d_connectionDied = true;
eec63896 687 ++conn->d_ds->tcpDiedReadingResponse;
e82bf80f
RG
688
689 auto stream = conn->d_currentStreams.find(stream_id);
690 if (stream == conn->d_currentStreams.end()) {
691 /* we don't care, then */
e82bf80f 692 return 0;
d523caf3
RG
693 }
694
35b27ac8
RG
695 struct timeval now
696 {
697 .tv_sec = 0, .tv_usec = 0
698 };
699
e82bf80f
RG
700 gettimeofday(&now, nullptr);
701 auto request = std::move(stream->second);
702 conn->d_currentStreams.erase(stream->first);
d523caf3 703
f05cd66c 704 // cerr<<"Query has "<<request.d_query.d_downstreamFailures<<" failures, backend limit is "<<conn->d_ds->d_retries<<endl;
8cf75ac2 705 if (request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries) {
f05cd66c
RG
706 // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
707 ++request.d_query.d_downstreamFailures;
9fd8dc1f 708 auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
f05cd66c
RG
709 downstream->queueQuery(request.d_sender, std::move(request.d_query));
710 }
711 else {
712 conn->handleResponseError(std::move(request), now);
713 }
d523caf3 714
f468a7fe 715 // cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl;
54a9a226 716 if (conn->isIdle()) {
f468a7fe 717 // cerr<<"stopping IO"<<endl;
e82bf80f 718 conn->stopIO();
becad613 719 conn->watchForRemoteHostClosingConnection();
d523caf3 720 }
e82bf80f 721
d523caf3
RG
722 return 0;
723}
724
d9fffb37
RG
725int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data)
726{
e82bf80f 727 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
d523caf3 728
e82bf80f 729 const std::string status(":status");
5d662a4e 730 if (frame->hd.type == NGHTTP2_HEADERS && frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
f468a7fe
RG
731 // cerr<<"got header for "<<frame->hd.stream_id<<":"<<endl;
732 // cerr<<"- "<<std::string(reinterpret_cast<const char*>(name), namelen)<<endl;
733 // cerr<<"- "<<std::string(reinterpret_cast<const char*>(value), valuelen)<<endl;
5d662a4e
RG
734 if (namelen == status.size() && memcmp(status.data(), name, status.size()) == 0) {
735 auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
736 if (stream == conn->d_currentStreams.end()) {
737 vinfolog("Unable to match the stream ID %d to a known one!", frame->hd.stream_id);
738 conn->d_connectionDied = true;
739 return NGHTTP2_ERR_CALLBACK_FAILURE;
740 }
741 try {
a0383aad 742 pdns::checked_stoi_into(stream->second.d_responseCode, std::string(reinterpret_cast<const char*>(value), valuelen));
5d662a4e
RG
743 }
744 catch (...) {
745 vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
746 conn->d_connectionDied = true;
eec63896 747 ++conn->d_ds->tcpDiedReadingResponse;
5d662a4e 748 return NGHTTP2_ERR_CALLBACK_FAILURE;
d523caf3 749 }
d523caf3
RG
750 }
751 }
e82bf80f 752 return 0;
d523caf3
RG
753}
754
d9fffb37
RG
755int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data)
756{
e82bf80f 757 vinfolog("Error in HTTP/2 connection: %s", std::string(msg, len));
d523caf3 758
e82bf80f
RG
759 DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
760 conn->d_connectionDied = true;
eec63896 761 ++conn->d_ds->tcpDiedReadingResponse;
d523caf3 762
e82bf80f 763 return 0;
d523caf3 764}
9eb5394a 765
9fd8dc1f 766DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
645a1ca4 767 ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
9eb5394a 768{
645a1ca4 769 // inherit most of the stuff from the ConnectionToBackend()
9eb5394a
RG
770 d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
771
772 nghttp2_session_callbacks* cbs = nullptr;
773 if (nghttp2_session_callbacks_new(&cbs) != 0) {
e82bf80f 774 d_connectionDied = true;
eec63896 775 ++d_ds->tcpDiedSendingQuery;
e82bf80f 776 vinfolog("Unable to create a callback object for a new HTTP/2 session");
9eb5394a
RG
777 return;
778 }
d9fffb37 779 std::unique_ptr<nghttp2_session_callbacks, void (*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del);
9eb5394a
RG
780 cbs = nullptr;
781
782 nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback);
783 nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback);
784 nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback);
785 nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback);
786 nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback);
9eb5394a
RG
787 nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback);
788
789 nghttp2_session* sess = nullptr;
790 if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
e82bf80f 791 d_connectionDied = true;
eec63896 792 ++d_ds->tcpDiedSendingQuery;
e82bf80f 793 vinfolog("Coult not allocate a new HTTP/2 session");
9eb5394a
RG
794 return;
795 }
796
d9fffb37 797 d_session = std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(sess, nghttp2_session_del);
9eb5394a
RG
798 sess = nullptr;
799
800 callbacks.reset();
801
9eb5394a 802 nghttp2_settings_entry iv[] = {
e82bf80f
RG
803 /* rfc7540 section-8.2.2:
804 "Advertising a SETTINGS_MAX_CONCURRENT_STREAMS value of zero disables
805 server push by preventing the server from creating the necessary
806 streams."
807 */
808 {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0},
9eb5394a 809 {NGHTTP2_SETTINGS_ENABLE_PUSH, 0},
e82bf80f 810 /* we might want to make the initial window size configurable, but 16M is a large enough default */
d9fffb37
RG
811 {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16 * 1024 * 1024}};
812 /* client 24 bytes magic string will be sent by nghttp2 library */
813 int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
9eb5394a 814 if (rv != 0) {
e82bf80f 815 d_connectionDied = true;
eec63896 816 ++d_ds->tcpDiedSendingQuery;
e82bf80f 817 vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
9eb5394a
RG
818 return;
819 }
820}
821
9eb5394a
RG
822static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
823{
824 auto threadData = boost::any_cast<DoHClientThreadData*>(param);
9eb5394a 825
0a5cb883
RG
826 std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
827 try {
828 auto tmp = threadData->d_receiver.receive();
829 if (!tmp) {
9eb5394a
RG
830 return;
831 }
0a5cb883 832 cpq = std::move(*tmp);
9eb5394a 833 }
0a5cb883
RG
834 catch (const std::exception& e) {
835 throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what()));
9eb5394a
RG
836 }
837
35b27ac8
RG
838 struct timeval now
839 {
840 .tv_sec = 0, .tv_usec = 0
841 };
0a5cb883 842 gettimeofday(&now, nullptr);
9eb5394a 843
0a5cb883
RG
844 std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
845 auto query = std::move(cpq->query);
846 auto downstreamServer = std::move(cpq->downstream);
847 cpq.reset();
9eb5394a 848
0a5cb883
RG
849 try {
850 auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
851 downstream->queueQuery(tqs, std::move(query));
9eb5394a
RG
852 }
853 catch (...) {
7e8a05fa
RG
854 TCPResponse response(std::move(query));
855 tqs->notifyIOError(now, std::move(response));
9eb5394a
RG
856 }
857}
858
0a5cb883 859static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver)
9eb5394a
RG
860{
861 setThreadName("dnsdist/dohClie");
862
55a2979f 863 try {
0a5cb883
RG
864 DoHClientThreadData data(std::move(receiver));
865 data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data);
9eb5394a 866
35b27ac8
RG
867 struct timeval now
868 {
869 .tv_sec = 0, .tv_usec = 0
870 };
871
55a2979f
RG
872 gettimeofday(&now, nullptr);
873 time_t lastTimeoutScan = now.tv_sec;
874
875 for (;;) {
ab8eb008 876 data.mplexer->run(&now, 1000);
55a2979f
RG
877
878 if (now.tv_sec > lastTimeoutScan) {
879 lastTimeoutScan = now.tv_sec;
880
881 try {
9fd8dc1f 882 t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
55a2979f
RG
883 handleH2Timeouts(*data.mplexer, now);
884
885 if (g_dohStatesDumpRequested > 0) {
886 /* just to keep things clean in the output, debug only */
887 static std::mutex s_lock;
888 std::lock_guard<decltype(s_lock)> lck(s_lock);
889 if (g_dohStatesDumpRequested > 0) {
890 /* no race here, we took the lock so it can only be increased in the meantime */
891 --g_dohStatesDumpRequested;
f8e1161f 892 infolog("Dumping the DoH client states, as requested:");
55a2979f
RG
893 data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
894 struct timeval lnow;
895 gettimeofday(&lnow, nullptr);
896 if (ttd.tv_sec > 0) {
f8e1161f 897 infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
55a2979f
RG
898 }
899 else {
f8e1161f 900 infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
55a2979f
RG
901 }
902
903 if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
904 auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
f8e1161f 905 infolog(" - %s", conn->toString());
55a2979f
RG
906 }
907 else if (param.type() == typeid(DoHClientThreadData*)) {
f8e1161f 908 infolog(" - Worker thread pipe");
55a2979f
RG
909 }
910 });
f8e1161f 911 infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
9eb5394a 912 }
55a2979f
RG
913 }
914 }
915 catch (const std::exception& e) {
f8e1161f 916 warnlog("Error in outgoing DoH thread: %s", e.what());
9eb5394a
RG
917 }
918 }
919 }
920 }
55a2979f
RG
921 catch (const std::exception& e) {
922 errlog("Fatal error in outgoing DoH thread: %s", e.what());
923 }
9eb5394a
RG
924}
925
d9fffb37
RG
926static bool select_next_proto_callback(unsigned char** out, unsigned char* outlen, const unsigned char* in, unsigned int inlen)
927{
6e069889
RG
928 if (nghttp2_select_next_protocol(out, outlen, in, inlen) <= 0) {
929 vinfolog("The remote DoH backend did not advertise " NGHTTP2_PROTO_VERSION_ID);
930 return false;
931 }
932 return true;
933}
934
cf25b82b 935#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
6e069889
RG
936
937struct DoHClientCollection::DoHWorkerThread
938{
939 DoHWorkerThread()
940 {
941 }
942
0a5cb883
RG
943 DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) :
944 d_sender(std::move(sender))
6e069889
RG
945 {
946 }
947
d9fffb37 948 DoHWorkerThread(DoHWorkerThread&& rhs) :
0a5cb883 949 d_sender(std::move(rhs.d_sender))
6e069889 950 {
6e069889
RG
951 }
952
953 DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
954 {
0a5cb883 955 d_sender = std::move(rhs.d_sender);
6e069889
RG
956 return *this;
957 }
958
959 DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
960 DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
961
0a5cb883 962 pdns::channel::Sender<CrossProtocolQuery> d_sender;
6e069889
RG
963};
964
168ef594
RG
965DoHClientCollection::DoHClientCollection(size_t numberOfThreads) :
966 d_clientThreads(numberOfThreads)
6e069889
RG
967{
968}
969
970bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq)
971{
972 if (d_numberOfThreads == 0) {
973 throw std::runtime_error("No DoH worker thread yet");
974 }
975
976 uint64_t pos = d_pos++;
0a5cb883 977 if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) {
6ba8d6ca 978 ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull;
6e069889
RG
979 return false;
980 }
981
982 return true;
983}
984
9eb5394a
RG
985void DoHClientCollection::addThread()
986{
cf25b82b 987#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
0a5cb883 988 try {
c1d76521 989 auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);
9eb5394a 990
0a5cb883 991 vinfolog("Adding DoH Client thread");
9eb5394a
RG
992 std::lock_guard<std::mutex> lock(d_mutex);
993
994 if (d_numberOfThreads >= d_clientThreads.size()) {
168ef594 995 vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size());
9eb5394a
RG
996 return;
997 }
998
0a5cb883 999 DoHWorkerThread worker(std::move(sender));
9eb5394a 1000 try {
0a5cb883 1001 std::thread t1(dohClientThread, std::move(receiver));
9eb5394a
RG
1002 t1.detach();
1003 }
1004 catch (const std::runtime_error& e) {
0a5cb883 1005 /* the thread creation failed */
9eb5394a 1006 errlog("Error creating a DoH thread: %s", e.what());
9eb5394a
RG
1007 return;
1008 }
1009
1010 d_clientThreads.at(d_numberOfThreads) = std::move(worker);
1011 ++d_numberOfThreads;
1012 }
0a5cb883
RG
1013 catch (const std::exception& e) {
1014 errlog("Error creating the DoH channel: %s", e.what());
1015 return;
1016 }
cf25b82b 1017#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
6e069889 1018 throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available");
cf25b82b 1019#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
9eb5394a
RG
1020}
1021
/* Creates the global collection of outgoing DoH worker threads and starts the
   configured number of threads (one if the setting was left unset, none if it was
   explicitly set to 0).
   Returns true when built with DoH / nghttp2 support, false otherwise. */
bool initDoHWorkers()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  if (!g_outgoingDoHWorkerThreads) {
    /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
       is added at a later time. */
    g_outgoingDoHWorkerThreads = 1;
  }

  const size_t wantedThreads = *g_outgoingDoHWorkerThreads;
  if (wantedThreads > 0) {
    g_dohClientThreads = std::make_unique<DoHClientCollection>(wantedThreads);
    for (size_t started = 0; started < wantedThreads; ++started) {
      g_dohClientThreads->addThread();
    }
  }
  return true;
#else
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1042
1043bool setupDoHClientProtocolNegotiation(std::shared_ptr<TLSCtx>& ctx)
1044{
1045 if (ctx == nullptr) {
1046 return false;
1047 }
cf25b82b 1048#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
e82bf80f
RG
1049 /* we want to set the ALPN to h2, if only to mitigate the ALPACA attack */
1050 const std::vector<std::vector<uint8_t>> h2Alpns = {{'h', '2'}};
1051 ctx->setALPNProtos(h2Alpns);
1052 ctx->setNextProtocolSelectCallback(select_next_proto_callback);
1053 return true;
cf25b82b 1054#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
6e069889 1055 return false;
cf25b82b 1056#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
e82bf80f 1057}
8229d4f0 1058
ae3b96d9 1059bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query, bool healthCheck)
8229d4f0 1060{
cf25b82b 1061#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
35b27ac8
RG
1062 struct timeval now
1063 {
1064 .tv_sec = 0, .tv_usec = 0
1065 };
8229d4f0
RG
1066 gettimeofday(&now, nullptr);
1067
f05cd66c
RG
1068 if (healthCheck) {
1069 /* always do health-checks over a new connection */
0e6892c6 1070 auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(query.d_proxyProtocolPayload));
f05cd66c
RG
1071 newConnection->setHealthCheck(healthCheck);
1072 newConnection->queueQuery(sender, std::move(query));
1073 }
1074 else {
9fd8dc1f 1075 auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
f05cd66c
RG
1076 connection->queueQuery(sender, std::move(query));
1077 }
1078
8229d4f0 1079 return true;
cf25b82b 1080#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
6e069889 1081 return false;
cf25b82b 1082#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
8229d4f0 1083}
f05cd66c
RG
1084
/* Clears the outgoing DoH connections held by the calling thread's connections
   manager and returns the value reported by its clear() method, or 0 when built
   without DoH / nghttp2 support. */
size_t clearH2Connections()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  const size_t clearedCount = t_downstreamDoHConnectionsManager.clear();
  return clearedCount;
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return 0;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1093
1094size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now)
1095{
1096 size_t got = 0;
cf25b82b 1097#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
f05cd66c
RG
1098 auto expiredReadConns = mplexer.getTimeouts(now, false);
1099 for (const auto& cbData : expiredReadConns) {
1100 if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
1101 auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
1102 vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName());
1103 conn->handleTimeout(now, false);
1104 ++got;
1105 }
1106 }
1107
1108 auto expiredWriteConns = mplexer.getTimeouts(now, true);
1109 for (const auto& cbData : expiredWriteConns) {
1110 if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
1111 auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
1112 vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName());
1113 conn->handleTimeout(now, true);
1114 ++got;
1115 }
1116 }
cf25b82b 1117#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
f05cd66c
RG
1118 return got;
1119}
767f5514
RG
1120
/* Forwards the cleanup interval `max` to the downstream DoH connections manager.
   Does nothing when built without DoH / nghttp2 support. */
void setDoHDownstreamCleanupInterval(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setCleanupInterval(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1127
/* Forwards the maximum idle time `max` to the downstream DoH connections manager.
   Does nothing when built without DoH / nghttp2 support. */
void setDoHDownstreamMaxIdleTime(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleTime(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
1134
/* Forwards the maximum number of idle connections kept per backend, `max`, to the
   downstream DoH connections manager.
   Does nothing when built without DoH / nghttp2 support. */
void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}