]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdistdist/doq.cc
doq: initialize stateless reset token
[thirdparty/pdns.git] / pdns / dnsdistdist / doq.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "doq.hh"
24
25 #include "dnsdist-tcp.hh"
26 #include "dnsdist-random.hh"
27 #include "dnsparser.hh"
28 #include "dolog.hh"
29 #include "iputils.hh"
30 #include "misc.hh"
31 #include "sstuff.hh"
32 #include "threadname.hh"
33 #include "dnsdist-ecs.hh"
34 #include "dnsdist-proxy-protocol.hh"
35 #include "sodcrypto.hh"
36
// Symmetric key used to encrypt (mintToken) and decrypt (validateToken) the
// tokens carried in QUIC Retry packets. It is produced by newKey() when the
// statics of this translation unit are initialized.
static std::string s_quicRetryTokenKey = newKey(false);
38
39 #ifdef HAVE_DNS_OVER_QUIC
40
41 #include <quiche.h>
42
// Owning handle on a quiche connection, released with quiche_conn_free().
using QuicheConnection = std::unique_ptr<quiche_conn, decltype(&quiche_conn_free)>;
// Owning handle on a quiche configuration, released with quiche_config_free().
using QuicheConfig = std::unique_ptr<quiche_config, decltype(&quiche_config_free)>;
45
46 class Connection
47 {
48 public:
49 Connection(const ComboAddress& peer, std::unique_ptr<quiche_conn, decltype(&quiche_conn_free)>&& conn) :
50 d_peer(peer), d_conn(std::move(conn))
51 {
52 }
53 Connection(const Connection&) = delete;
54 Connection(Connection&&) = default;
55 Connection& operator=(const Connection&) = delete;
56 Connection& operator=(Connection&&) = default;
57 ~Connection() = default;
58
59 ComboAddress d_peer;
60 QuicheConnection d_conn;
61 };
62
63 #endif
64
// Forward declaration: the definition comes later in this file, but the
// cross-protocol classes below already need to call it.
static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description);
66 struct DOQServerConfig
67 {
68 DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_, uint32_t internalPipeBufferSize) :
69 config(std::move(config_))
70 {
71 {
72 auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
73 d_responseSender = std::move(sender);
74 d_responseReceiver = std::move(receiver);
75 }
76 }
77 DOQServerConfig(const DOQServerConfig&) = delete;
78 DOQServerConfig(DOQServerConfig&&) = default;
79 DOQServerConfig& operator=(const DOQServerConfig&) = delete;
80 DOQServerConfig& operator=(DOQServerConfig&&) = default;
81 ~DOQServerConfig() = default;
82
83 LocalHolders holders;
84 QuicheConfig config;
85 ClientState* cs{nullptr};
86 std::shared_ptr<DOQFrontend> df{nullptr};
87 pdns::channel::Sender<DOQUnit> d_responseSender;
88 pdns::channel::Receiver<DOQUnit> d_responseReceiver;
89 };
90
/* Debug tracing helper. Change the '#if 0' below to '#if 1' to get the
   DEBUGLOG() messages written to std::cerr; otherwise DEBUGLOG() expands
   to nothing. DEBUGLOG_ENABLED additionally enables the collection of
   connection statistics when a connection is closed. */
#if 0
#define DEBUGLOG_ENABLED
#define DEBUGLOG(x) std::cerr << x << std::endl;
#else
#define DEBUGLOG(x)
#endif
97
// Maximum UDP payload size, in bytes, configured in quiche for both
// directions; also the size of the buffers used to build outgoing packets.
static constexpr size_t MAX_DATAGRAM_SIZE = 1350;
// Length, in bytes, of the connection IDs generated by getCID(); also passed
// to quiche_header_info() when parsing incoming packets.
static constexpr size_t LOCAL_CONN_ID_LEN = 16;

// Known connections, keyed by the server-side connection ID (which is looked
// up from the destination connection ID of incoming packets).
// NOTE(review): no locking is visible around this map; accesses appear to be
// meant to stay on the doqThread() thread -- confirm.
static std::map<PacketBuffer, Connection> s_connections;
102
103 class DOQTCPCrossQuerySender final : public TCPQuerySender
104 {
105 public:
106 DOQTCPCrossQuerySender()
107 {
108 }
109
110 bool active() const override
111 {
112 return true;
113 }
114
115 void handleResponse(const struct timeval& now, TCPResponse&& response) override
116 {
117 if (!response.d_idstate.doqu) {
118 return;
119 }
120
121 auto du = std::move(response.d_idstate.doqu);
122 if (du->dsc == nullptr) {
123 return;
124 }
125
126 du->response = std::move(response.d_buffer);
127 du->ids = std::move(response.d_idstate);
128 DNSResponse dr(du->ids, du->response, du->downstream);
129
130 dnsheader cleartextDH;
131 memcpy(&cleartextDH, dr.getHeader(), sizeof(cleartextDH));
132
133 if (!response.isAsync()) {
134
135 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
136 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
137
138 dr.ids.doqu = std::move(du);
139
140 if (!processResponse(dr.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
141 if (dr.ids.doqu) {
142
143 sendBackDOQUnit(std::move(dr.ids.doqu), "Response dropped by rules");
144 }
145 return;
146 }
147
148 if (dr.isAsynchronous()) {
149 return;
150 }
151
152 du = std::move(dr.ids.doqu);
153 }
154
155 if (!du->ids.selfGenerated) {
156 double udiff = du->ids.queryRealTime.udiff();
157 vinfolog("Got answer from %s, relayed to %s (quic), took %f us", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
158
159 auto backendProtocol = du->downstream->getProtocol();
160 if (backendProtocol == dnsdist::Protocol::DoUDP && du->tcp) {
161 backendProtocol = dnsdist::Protocol::DoTCP;
162 }
163 handleResponseSent(du->ids, udiff, du->ids.origRemote, du->downstream->d_config.remote, du->response.size(), cleartextDH, backendProtocol, true);
164 }
165
166 ++dnsdist::metrics::g_stats.responses;
167 if (du->ids.cs) {
168 ++du->ids.cs->responses;
169 }
170
171 sendBackDOQUnit(std::move(du), "Cross-protocol response");
172 }
173
174 void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
175 {
176 return handleResponse(now, std::move(response));
177 }
178
179 void notifyIOError(const struct timeval& now, TCPResponse&& response) override
180 {
181 }
182 };
183
184 class DOQCrossProtocolQuery : public CrossProtocolQuery
185 {
186 public:
187 DOQCrossProtocolQuery(DOQUnitUniquePtr&& du, bool isResponse)
188 {
189 if (isResponse) {
190 /* happens when a response becomes async */
191 query = InternalQuery(std::move(du->response), std::move(du->ids));
192 }
193 else {
194 /* we need to duplicate the query here because we might need
195 the existing query later if we get a truncated answer */
196 query = InternalQuery(PacketBuffer(du->query), std::move(du->ids));
197 }
198
199 /* it might have been moved when we moved du->ids */
200 if (du) {
201 query.d_idstate.doqu = std::move(du);
202 }
203
204 /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
205 clearing query.d_proxyProtocolPayloadAdded and du->proxyProtocolPayloadSize.
206 Leave it for now because we know that the onky case where the payload has been
207 added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
208 and we know the TCP/DoT code can handle it. */
209 query.d_proxyProtocolPayloadAdded = query.d_idstate.doqu->proxyProtocolPayloadSize > 0;
210 downstream = query.d_idstate.doqu->downstream;
211 }
212
213 void handleInternalError()
214 {
215 sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
216 }
217
218 std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
219 {
220 query.d_idstate.doqu->downstream = downstream;
221 return s_sender;
222 }
223
224 DNSQuestion getDQ() override
225 {
226 auto& ids = query.d_idstate;
227 DNSQuestion dq(ids, query.d_buffer);
228 return dq;
229 }
230
231 DNSResponse getDR() override
232 {
233 auto& ids = query.d_idstate;
234 DNSResponse dr(ids, query.d_buffer, downstream);
235 return dr;
236 }
237
238 DOQUnitUniquePtr&& releaseDU()
239 {
240 return std::move(query.d_idstate.doqu);
241 }
242
243 private:
244 static std::shared_ptr<DOQTCPCrossQuerySender> s_sender;
245 };
246
// Single sender instance shared by all DOQCrossProtocolQuery objects, as
// returned by getTCPQuerySender().
std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::make_shared<DOQTCPCrossQuerySender>();
248
249 static void handleResponse(DOQFrontend& df, Connection& conn, const uint64_t streamID, const PacketBuffer& response)
250 {
251 if (response.size() == 0) {
252 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, 0x5);
253 }
254 else {
255 uint16_t responseSize = static_cast<uint16_t>(response.size());
256 const uint8_t sizeBytes[] = {static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
257 auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, sizeBytes, sizeof(sizeBytes), false);
258 if (res == sizeof(sizeBytes)) {
259 res = quiche_conn_stream_send(conn.d_conn.get(), streamID, response.data(), response.size(), true);
260 }
261 }
262 }
263
264 static void fillRandom(PacketBuffer& buffer, size_t size)
265 {
266 buffer.reserve(size);
267 while (size > 0) {
268 buffer.insert(buffer.end(), dnsdist::getRandomValue(std::numeric_limits<uint8_t>::max()));
269 --size;
270 }
271 }
272 void DOQFrontend::setup()
273 {
274 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
275 for (const auto& pair : d_tlsConfig.d_certKeyPairs) {
276 auto res = quiche_config_load_cert_chain_from_pem_file(config.get(), pair.d_cert.c_str());
277 if (res != 0) {
278 throw std::runtime_error("Error loading the server certificate: " + std::to_string(res));
279 }
280 if (pair.d_key) {
281 res = quiche_config_load_priv_key_from_pem_file(config.get(), pair.d_key->c_str());
282 if (res != 0) {
283 throw std::runtime_error("Error loading the server key: " + std::to_string(res));
284 }
285 }
286 }
287
288 {
289 const std::array<uint8_t, 4> alpn{'\x03', 'd', 'o', 'q'};
290 auto res = quiche_config_set_application_protos(config.get(),
291 alpn.data(),
292 alpn.size());
293 if (res != 0) {
294 throw std::runtime_error("Error setting ALPN: " + std::to_string(res));
295 }
296 }
297
298 quiche_config_set_max_idle_timeout(config.get(), d_idleTimeout * 1000);
299 quiche_config_set_max_recv_udp_payload_size(config.get(), MAX_DATAGRAM_SIZE);
300 quiche_config_set_max_send_udp_payload_size(config.get(), MAX_DATAGRAM_SIZE);
301 quiche_config_set_initial_max_data(config.get(), 10000000);
302 quiche_config_set_initial_max_stream_data_bidi_local(config.get(), 1000000);
303 quiche_config_set_initial_max_stream_data_bidi_remote(config.get(), 1000000);
304 quiche_config_set_initial_max_streams_bidi(config.get(), 100);
305 quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO);
306
307 {
308 PacketBuffer resetToken;
309 fillRandom(resetToken, 16);
310 quiche_config_set_stateless_reset_token(config.get(), reinterpret_cast<const uint8_t*>(resetToken.data()));
311 }
312
313 d_server_config = std::make_shared<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
314 }
315
316 static std::optional<PacketBuffer> getCID()
317 {
318 PacketBuffer buffer;
319
320 fillRandom(buffer, LOCAL_CONN_ID_LEN);
321
322 return buffer;
323 }
324
325 static constexpr size_t MAX_TOKEN_LEN = std::tuple_size<decltype(SodiumNonce::value)>{} /* nonce */ + sizeof(uint64_t) /* TTD */ + 16 /* IPv6 */ + QUICHE_MAX_CONN_ID_LEN;
326
327 static PacketBuffer mintToken(const PacketBuffer& dcid, const ComboAddress& peer)
328 {
329 try {
330 SodiumNonce nonce;
331 nonce.init();
332
333 const auto addrBytes = peer.toByteString();
334 // this token will be valid for 60s
335 const uint64_t ttd = time(nullptr) + 60U;
336 PacketBuffer plainTextToken;
337 plainTextToken.reserve(sizeof(ttd) + addrBytes.size() + dcid.size());
338 plainTextToken.insert(plainTextToken.end(), reinterpret_cast<const char*>(&ttd), reinterpret_cast<const char*>(&ttd) + sizeof(ttd));
339 plainTextToken.insert(plainTextToken.end(), addrBytes.begin(), addrBytes.end());
340 plainTextToken.insert(plainTextToken.end(), dcid.begin(), dcid.end());
341 const auto encryptedToken = sodEncryptSym(std::string_view(reinterpret_cast<const char*>(plainTextToken.data()), plainTextToken.size()), s_quicRetryTokenKey, nonce, false);
342 // a bit sad, let's see if we can do better later
343 auto encryptedTokenPacket = PacketBuffer(encryptedToken.begin(), encryptedToken.end());
344 encryptedTokenPacket.insert(encryptedTokenPacket.begin(), nonce.value.begin(), nonce.value.end());
345 return encryptedTokenPacket;
346 }
347 catch (const std::exception& exp) {
348 vinfolog("Error while minting DoQ token: %s", exp.what());
349 throw;
350 }
351 }
352
353 // returns the original destination ID if the token is valid, nothing otherwise
354 static std::optional<PacketBuffer> validateToken(const PacketBuffer& token, const PacketBuffer& dcid, const ComboAddress& peer)
355 {
356 try {
357 SodiumNonce nonce;
358 auto addrBytes = peer.toByteString();
359 const uint64_t now = time(nullptr);
360 const auto minimumSize = nonce.value.size() + sizeof(now) + addrBytes.size();
361 if (token.size() <= minimumSize) {
362 return std::nullopt;
363 }
364
365 memcpy(nonce.value.data(), token.data(), nonce.value.size());
366
367 auto cipher = std::string_view(reinterpret_cast<const char*>(&token.at(nonce.value.size())), token.size() - nonce.value.size());
368 auto plainText = sodDecryptSym(cipher, s_quicRetryTokenKey, nonce, false);
369
370 if (plainText.size() <= sizeof(now) + addrBytes.size()) {
371 return std::nullopt;
372 }
373
374 uint64_t ttd{0};
375 memcpy(&ttd, plainText.data(), sizeof(ttd));
376 if (ttd < now) {
377 return std::nullopt;
378 }
379
380 if (std::memcmp(&plainText.at(sizeof(ttd)), &*addrBytes.begin(), addrBytes.size()) != 0) {
381 return std::nullopt;
382 }
383 return PacketBuffer(plainText.begin() + (sizeof(ttd) + addrBytes.size()), plainText.end());
384 }
385 catch (const std::exception& exp) {
386 vinfolog("Error while validating DoQ token: %s", exp.what());
387 return std::nullopt;
388 }
389 }
390
391 static void handleStatelessRetry(Socket& sock, const PacketBuffer& clientConnID, const PacketBuffer& serverConnID, const ComboAddress& peer, uint32_t version)
392 {
393 auto newServerConnID = getCID();
394 if (!newServerConnID) {
395 return;
396 }
397
398 auto token = mintToken(serverConnID, peer);
399
400 PacketBuffer out(MAX_DATAGRAM_SIZE);
401 auto written = quiche_retry(clientConnID.data(), clientConnID.size(),
402 serverConnID.data(), serverConnID.size(),
403 newServerConnID->data(), newServerConnID->size(),
404 token.data(), token.size(),
405 version,
406 out.data(), out.size());
407
408 if (written < 0) {
409 DEBUGLOG("failed to create retry packet " << written);
410 return;
411 }
412
413 out.resize(written);
414 sock.sendTo(std::string(out.begin(), out.end()), peer);
415 }
416
417 static void handleVersionNegociation(Socket& sock, const PacketBuffer& clientConnID, const PacketBuffer& serverConnID, const ComboAddress& peer)
418 {
419 PacketBuffer out(MAX_DATAGRAM_SIZE);
420
421 auto written = quiche_negotiate_version(clientConnID.data(), clientConnID.size(),
422 serverConnID.data(), serverConnID.size(),
423 out.data(), out.size());
424
425 if (written < 0) {
426 DEBUGLOG("failed to create vneg packet " << written);
427 return;
428 }
429 sock.sendTo(reinterpret_cast<const char*>(out.data()), written, peer);
430 }
431
432 static std::optional<std::reference_wrapper<Connection>> getConnection(const PacketBuffer& id)
433 {
434 auto it = s_connections.find(id);
435 if (it == s_connections.end()) {
436 return std::nullopt;
437 }
438 return it->second;
439 }
440
441 static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description)
442 {
443 if (du->dsc == nullptr) {
444 return;
445 }
446 try {
447 if (!du->dsc->d_responseSender.send(std::move(du))) {
448 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
449 }
450 }
451 catch (const std::exception& e) {
452 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
453 }
454 }
455
456 static std::optional<std::reference_wrapper<Connection>> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer)
457 {
458 auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
459 originalDestinationID.data(), originalDestinationID.size(),
460 (struct sockaddr*)&local,
461 local.getSocklen(),
462 (struct sockaddr*)&peer,
463 peer.getSocklen(),
464 config.get()),
465 quiche_conn_free);
466 auto conn = Connection(peer, std::move(quicheConn));
467 auto pair = s_connections.emplace(serverSideID, std::move(conn));
468 return pair.first->second;
469 }
470
471 static void flushEgress(Socket& sock, Connection& conn)
472 {
473 std::array<uint8_t, MAX_DATAGRAM_SIZE> out;
474 quiche_send_info send_info;
475
476 while (true) {
477 auto written = quiche_conn_send(conn.d_conn.get(), out.data(), out.size(), &send_info);
478
479 if (written == QUICHE_ERR_DONE) {
480 return;
481 }
482
483 if (written < 0) {
484 return;
485 }
486 // FIXME pacing (as send_info.at should tell us when to send the packet) ?
487 sock.sendTo(reinterpret_cast<const char*>(out.data()), written, conn.d_peer);
488 }
489 }
490
491 std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dq, bool isResponse)
492 {
493 if (!dq.ids.doqu) {
494 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
495 }
496
497 auto du = std::move(dq.ids.doqu);
498 if (&dq.ids != &du->ids) {
499 du->ids = std::move(dq.ids);
500 }
501
502 du->ids.origID = dq.getHeader()->id;
503
504 if (!isResponse) {
505 if (du->query.data() != dq.getMutableData().data()) {
506 du->query = std::move(dq.getMutableData());
507 }
508 }
509 else {
510 if (du->response.data() != dq.getMutableData().data()) {
511 du->response = std::move(dq.getMutableData());
512 }
513 }
514
515 return std::make_unique<DOQCrossProtocolQuery>(std::move(du), isResponse);
516 }
517
518 /*
519 We are not in the main DoQ thread but in the DoQ 'client' thread.
520 */
521 static void processDOQQuery(DOQUnitUniquePtr&& unit)
522 {
523 const auto handleImmediateResponse = [](DOQUnitUniquePtr&& du, const char* reason) {
524 DEBUGLOG("handleImmediateResponse() reason=" << reason);
525 auto conn = getConnection(du->serverConnID);
526 handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
527 du->ids.doqu.reset();
528 };
529
530 auto& ids = unit->ids;
531 ids.doqu = std::move(unit);
532 auto& du = ids.doqu;
533 uint16_t queryId = 0;
534 ComboAddress remote;
535
536 try {
537
538 remote = du->ids.origRemote;
539 DOQServerConfig* dsc = du->dsc;
540 auto& holders = dsc->holders;
541 ClientState& cs = *dsc->cs;
542
543 if (du->query.size() < sizeof(dnsheader)) {
544 ++dnsdist::metrics::g_stats.nonCompliantQueries;
545 ++cs.nonCompliantQueries;
546 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
547 dh->rcode = RCode::ServFail;
548 dh->qr = true;
549 du->response = std::move(du->query);
550
551 handleImmediateResponse(std::move(du), "DoQ non-compliant query");
552 return;
553 }
554
555 ++cs.queries;
556 ++dnsdist::metrics::g_stats.queries;
557 du->ids.queryRealTime.start();
558
559 {
560 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
561 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
562
563 if (!checkQueryHeaders(dh, cs)) {
564 dh->rcode = RCode::ServFail;
565 dh->qr = true;
566 du->response = std::move(du->query);
567
568 handleImmediateResponse(std::move(du), "DoQ invalid headers");
569 return;
570 }
571
572 if (dh->qdcount == 0) {
573 dh->rcode = RCode::NotImp;
574 dh->qr = true;
575 du->response = std::move(du->query);
576
577 handleImmediateResponse(std::move(du), "DoQ empty query");
578 return;
579 }
580
581 queryId = ntohs(dh->id);
582 }
583
584 auto downstream = du->downstream;
585 du->ids.qname = DNSName(reinterpret_cast<const char*>(du->query.data()), du->query.size(), sizeof(dnsheader), false, &du->ids.qtype, &du->ids.qclass);
586 DNSQuestion dq(du->ids, du->query);
587 const uint16_t* flags = getFlagsFromDNSHeader(dq.getHeader());
588 ids.origFlags = *flags;
589 du->ids.cs = &cs;
590
591 auto result = processQuery(dq, holders, downstream);
592 if (result == ProcessQueryResult::Drop) {
593 handleImmediateResponse(std::move(du), "DoQ dropped query");
594 return;
595 }
596 else if (result == ProcessQueryResult::Asynchronous) {
597 return;
598 }
599 else if (result == ProcessQueryResult::SendAnswer) {
600 if (du->response.empty()) {
601 du->response = std::move(du->query);
602 }
603 if (du->response.size() >= sizeof(dnsheader)) {
604 auto dh = reinterpret_cast<const struct dnsheader*>(du->response.data());
605
606 handleResponseSent(du->ids.qname, QType(du->ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoQ, dnsdist::Protocol::DoQ, false);
607 }
608 handleImmediateResponse(std::move(du), "DoQ self-answered response");
609 return;
610 }
611
612 ++dnsdist::metrics::g_stats.responses;
613 if (du->ids.cs != nullptr) {
614 ++du->ids.cs->responses;
615 }
616
617 if (result != ProcessQueryResult::PassToBackend) {
618 handleImmediateResponse(std::move(du), "DoQ no backend available");
619 return;
620 }
621
622 if (downstream == nullptr) {
623 handleImmediateResponse(std::move(du), "DoQ no backend available");
624 return;
625 }
626
627 du->downstream = downstream;
628
629 std::string proxyProtocolPayload;
630 /* we need to do this _before_ creating the cross protocol query because
631 after that the buffer will have been moved */
632 if (downstream->d_config.useProxyProtocol) {
633 proxyProtocolPayload = getProxyProtocolPayload(dq);
634 }
635
636 du->ids.origID = htons(queryId);
637 du->tcp = true;
638
639 /* this moves du->ids, careful! */
640 auto cpq = std::make_unique<DOQCrossProtocolQuery>(std::move(du), false);
641 cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
642
643 if (downstream->passCrossProtocolQuery(std::move(cpq))) {
644 return;
645 }
646 else {
647 du = cpq->releaseDU();
648 handleImmediateResponse(std::move(du), "DoQ internal error");
649 return;
650 }
651 }
652 catch (const std::exception& e) {
653 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
654 handleImmediateResponse(std::move(du), "DoQ internal error");
655 return;
656 }
657
658 return;
659 }
660
661 static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
662 {
663 try {
664 /* we only parse it there as a sanity check, we will parse it again later */
665 DNSPacketMangler mangler(reinterpret_cast<char*>(query.data()), query.size());
666 mangler.skipDomainName();
667 mangler.skipBytes(4);
668 // Should we ensure message id is 0 ?
669
670 auto du = std::make_unique<DOQUnit>(std::move(query));
671 du->dsc = &dsc;
672 du->ids.origDest = local;
673 du->ids.origRemote = remote;
674 du->ids.protocol = dnsdist::Protocol::DoQ;
675 du->serverConnID = serverConnID;
676 du->streamID = streamID;
677
678 processDOQQuery(std::move(du));
679 }
680 catch (const std::exception& exp) {
681 vinfolog("Had error parsing DoQ DNS packet from %s: %s", remote.toStringWithPort(), exp.what());
682 }
683 }
684
685 static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
686 {
687 for (;;) {
688 try {
689 auto tmp = receiver.receive();
690 if (!tmp) {
691 return;
692 }
693
694 auto du = std::move(*tmp);
695 auto conn = getConnection(du->serverConnID);
696
697 handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
698 }
699 catch (const std::exception& e) {
700 errlog("Error while processing response received over DoQ: %s", e.what());
701 }
702 catch (...) {
703 errlog("Unspecified error while processing response received over DoQ");
704 }
705 }
706 }
707
708 // this is the entrypoint from dnsdist.cc
709 void doqThread(ClientState* cs)
710 {
711 try {
712 std::shared_ptr<DOQFrontend>& frontend = cs->doqFrontend;
713
714 frontend->d_server_config->cs = cs;
715 frontend->d_server_config->df = cs->doqFrontend;
716
717 setThreadName("dnsdist/doq");
718
719 Socket sock(cs->udpFD);
720
721 PacketBuffer buffer(std::numeric_limits<unsigned short>::max());
722 auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
723
724 auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
725 mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
726 mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
727 while (true) {
728 std::vector<int> readyFDs;
729 mplexer->getAvailableFDs(readyFDs, 500);
730
731 if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
732 std::string bufferStr;
733 ComboAddress client;
734 sock.recvFrom(bufferStr, client);
735
736 uint32_t version{0};
737 uint8_t type;
738 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> scid;
739 size_t scid_len = scid.size();
740 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> dcid;
741 size_t dcid_len = dcid.size();
742 std::array<uint8_t, MAX_TOKEN_LEN> token;
743 size_t token_len = token.size();
744
745 auto res = quiche_header_info(reinterpret_cast<const uint8_t*>(bufferStr.data()), bufferStr.size(), LOCAL_CONN_ID_LEN,
746 &version, &type,
747 scid.data(), &scid_len,
748 dcid.data(), &dcid_len,
749 token.data(), &token_len);
750 if (res != 0) {
751 continue;
752 }
753
754 // destination connection ID, will have to be sent as original destination connection ID
755 PacketBuffer serverConnID(dcid.begin(), dcid.begin() + dcid_len);
756 // source connection ID, will have to be sent as destination connection ID
757 PacketBuffer clientConnID(scid.begin(), scid.begin() + scid_len);
758 auto conn = getConnection(serverConnID);
759
760 if (!conn) {
761 DEBUGLOG("Connection not found");
762 if (!quiche_version_is_supported(version)) {
763 DEBUGLOG("Unsupported version");
764 handleVersionNegociation(sock, clientConnID, serverConnID, client);
765 continue;
766 }
767
768 if (token_len == 0) {
769 /* stateless retry */
770 DEBUGLOG("No token received");
771 handleStatelessRetry(sock, clientConnID, serverConnID, client, version);
772 continue;
773 }
774
775 PacketBuffer tokenBuf(token.begin(), token.begin() + token_len);
776 auto originalDestinationID = validateToken(tokenBuf, serverConnID, client);
777 if (!originalDestinationID) {
778 DEBUGLOG("Discarding invalid token");
779 continue;
780 }
781
782 DEBUGLOG("Creating a new connection");
783 conn = createConnection(frontend->d_server_config->config, serverConnID, *originalDestinationID, tokenBuf, cs->local, client);
784 if (!conn) {
785 continue;
786 }
787 }
788 quiche_recv_info recv_info = {
789 (struct sockaddr*)&client,
790 client.getSocklen(),
791
792 (struct sockaddr*)&cs->local,
793 cs->local.getSocklen(),
794 };
795
796 auto done = quiche_conn_recv(conn->get().d_conn.get(), reinterpret_cast<uint8_t*>(bufferStr.data()), bufferStr.size(), &recv_info);
797 if (done < 0) {
798 continue;
799 }
800
801 if (quiche_conn_is_established(conn->get().d_conn.get())) {
802 auto readable = std::unique_ptr<quiche_stream_iter, decltype(&quiche_stream_iter_free)>(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free);
803
804 uint64_t streamID = 0;
805 while (quiche_stream_iter_next(readable.get(), &streamID)) {
806 bool fin = false;
807 buffer.resize(std::numeric_limits<unsigned short>::max());
808 auto received = quiche_conn_stream_recv(conn->get().d_conn.get(), streamID,
809 buffer.data(), buffer.size(),
810 &fin);
811 if (received < 2) {
812 break;
813 }
814 buffer.resize(received);
815
816 if (fin) {
817 // we skip message length, should we verify ?
818 buffer.erase(buffer.begin(), buffer.begin() + 2);
819 if (buffer.size() >= sizeof(dnsheader)) {
820 doq_dispatch_query(*(frontend->d_server_config), std::move(buffer), cs->local, client, serverConnID, streamID);
821 }
822 }
823 }
824 }
825 else {
826 DEBUGLOG("Connection not established");
827 }
828 // }
829 }
830
831 if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
832 flushResponses(frontend->d_server_config->d_responseReceiver);
833 }
834
835 for (auto conn = s_connections.begin(); conn != s_connections.end();) {
836 quiche_conn_on_timeout(conn->second.d_conn.get());
837
838 flushEgress(sock, conn->second);
839
840 if (quiche_conn_is_closed(conn->second.d_conn.get())) {
841 #ifdef DEBUGLOG_ENABLED
842 quiche_stats stats;
843 quiche_path_stats path_stats;
844
845 quiche_conn_stats(conn->second.d_conn.get(), &stats);
846 quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
847
848 DEBUGLOG("Connection closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd);
849 #endif
850 conn = s_connections.erase(conn);
851 }
852 else {
853 ++conn;
854 }
855 }
856 }
857 }
858 catch (const std::exception& e) {
859 DEBUGLOG("Caught fatal error: " << e.what());
860 }
861 }