]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdistdist/doq.cc
Merge pull request #11431 from jroessler-ox/docs-kskzskroll-update
[thirdparty/pdns.git] / pdns / dnsdistdist / doq.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "doq.hh"
24
25 #ifdef HAVE_DNS_OVER_QUIC
26 #include <quiche.h>
27
28 #include "dolog.hh"
29 #include "iputils.hh"
30 #include "misc.hh"
31 #include "sstuff.hh"
32 #include "threadname.hh"
33
34 #include "dnsdist-dnsparser.hh"
35 #include "dnsdist-ecs.hh"
36 #include "dnsdist-proxy-protocol.hh"
37 #include "dnsdist-tcp.hh"
38 #include "dnsdist-random.hh"
39
40 #include "doq-common.hh"
41
42 using namespace dnsdist::doq;
43
44 #if 0
45 #define DEBUGLOG_ENABLED
46 #define DEBUGLOG(x) std::cerr << x << std::endl;
47 #else
48 #define DEBUGLOG(x)
49 #endif
50
51 class Connection
52 {
53 public:
54 Connection(const ComboAddress& peer, QuicheConfig config, QuicheConnection conn) :
55 d_peer(peer), d_conn(std::move(conn)), d_config(std::move(config))
56 {
57 }
58 Connection(const Connection&) = delete;
59 Connection(Connection&&) = default;
60 Connection& operator=(const Connection&) = delete;
61 Connection& operator=(Connection&&) = default;
62 ~Connection() = default;
63
64 ComboAddress d_peer;
65 QuicheConnection d_conn;
66 QuicheConfig d_config;
67
68 std::unordered_map<uint64_t, PacketBuffer> d_streamBuffers;
69 std::unordered_map<uint64_t, PacketBuffer> d_streamOutBuffers;
70 };
71
72 static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description);
73
74 struct DOQServerConfig
75 {
76 DOQServerConfig(QuicheConfig&& config_, uint32_t internalPipeBufferSize) :
77 config(std::move(config_))
78 {
79 {
80 auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
81 d_responseSender = std::move(sender);
82 d_responseReceiver = std::move(receiver);
83 }
84 }
85 DOQServerConfig(const DOQServerConfig&) = delete;
86 DOQServerConfig(DOQServerConfig&&) = default;
87 DOQServerConfig& operator=(const DOQServerConfig&) = delete;
88 DOQServerConfig& operator=(DOQServerConfig&&) = default;
89 ~DOQServerConfig() = default;
90
91 using ConnectionsMap = std::map<PacketBuffer, Connection>;
92
93 LocalHolders holders;
94 ConnectionsMap d_connections;
95 QuicheConfig config;
96 ClientState* clientState{nullptr};
97 std::shared_ptr<DOQFrontend> df{nullptr};
98 pdns::channel::Sender<DOQUnit> d_responseSender;
99 pdns::channel::Receiver<DOQUnit> d_responseReceiver;
100 };
101
102 /* these might seem useless, but they are needed because
103 they need to be declared _after_ the definition of DOQServerConfig
104 so that we can use a unique_ptr in DOQFrontend */
105 DOQFrontend::DOQFrontend() = default;
106 DOQFrontend::~DOQFrontend() = default;
107
108 class DOQTCPCrossQuerySender final : public TCPQuerySender
109 {
110 public:
111 DOQTCPCrossQuerySender() = default;
112
113 [[nodiscard]] bool active() const override
114 {
115 return true;
116 }
117
118 void handleResponse([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
119 {
120 if (!response.d_idstate.doqu) {
121 return;
122 }
123
124 auto unit = std::move(response.d_idstate.doqu);
125 if (unit->dsc == nullptr) {
126 return;
127 }
128
129 unit->response = std::move(response.d_buffer);
130 unit->ids = std::move(response.d_idstate);
131 DNSResponse dnsResponse(unit->ids, unit->response, unit->downstream);
132
133 dnsheader cleartextDH{};
134 memcpy(&cleartextDH, dnsResponse.getHeader().get(), sizeof(cleartextDH));
135
136 if (!response.isAsync()) {
137
138 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
139 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
140
141 dnsResponse.ids.doqu = std::move(unit);
142
143 if (!processResponse(dnsResponse.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dnsResponse, false)) {
144 if (dnsResponse.ids.doqu) {
145
146 sendBackDOQUnit(std::move(dnsResponse.ids.doqu), "Response dropped by rules");
147 }
148 return;
149 }
150
151 if (dnsResponse.isAsynchronous()) {
152 return;
153 }
154
155 unit = std::move(dnsResponse.ids.doqu);
156 }
157
158 if (!unit->ids.selfGenerated) {
159 double udiff = unit->ids.queryRealTime.udiff();
160 vinfolog("Got answer from %s, relayed to %s (quic, %d bytes), took %f us", unit->downstream->d_config.remote.toStringWithPort(), unit->ids.origRemote.toStringWithPort(), unit->response.size(), udiff);
161
162 auto backendProtocol = unit->downstream->getProtocol();
163 if (backendProtocol == dnsdist::Protocol::DoUDP && unit->tcp) {
164 backendProtocol = dnsdist::Protocol::DoTCP;
165 }
166 handleResponseSent(unit->ids, udiff, unit->ids.origRemote, unit->downstream->d_config.remote, unit->response.size(), cleartextDH, backendProtocol, true);
167 }
168
169 ++dnsdist::metrics::g_stats.responses;
170 if (unit->ids.cs != nullptr) {
171 ++unit->ids.cs->responses;
172 }
173
174 sendBackDOQUnit(std::move(unit), "Cross-protocol response");
175 }
176
177 void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
178 {
179 return handleResponse(now, std::move(response));
180 }
181
182 void notifyIOError([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
183 {
184 if (!response.d_idstate.doqu) {
185 return;
186 }
187
188 auto unit = std::move(response.d_idstate.doqu);
189 if (unit->dsc == nullptr) {
190 return;
191 }
192
193 /* this will signal an error */
194 unit->response.clear();
195 unit->ids = std::move(response.d_idstate);
196 sendBackDOQUnit(std::move(unit), "Cross-protocol error");
197 }
198 };
199
/* CrossProtocolQuery wrapping a DoQ unit. It is used when a DoQ query is forwarded
   to a backend over TCP/DoT, or when a response becomes asynchronous. The unit is
   owned through query.d_idstate.doqu. */
class DOQCrossProtocolQuery : public CrossProtocolQuery
{
public:
  /* Builds the internal query from the unit and takes ownership of it.
     - isResponse == true: the unit's response buffer and IDState are moved in;
     - isResponse == false: the query buffer is copied and the IDState moved in.
     Also records whether a proxy protocol payload is already part of the buffer,
     and which backend the unit was assigned to. */
  DOQCrossProtocolQuery(DOQUnitUniquePtr&& unit, bool isResponse)
  {
    if (isResponse) {
      /* happens when a response becomes async */
      query = InternalQuery(std::move(unit->response), std::move(unit->ids));
    }
    else {
      /* we need to duplicate the query here because we might need
         the existing query later if we get a truncated answer */
      query = InternalQuery(PacketBuffer(unit->query), std::move(unit->ids));
    }

    /* it might have been moved when we moved unit->ids: processDOQQuery() passes
       unit->ids.doqu itself (the unit owning itself), in which case moving
       unit->ids already transferred ownership into query.d_idstate.doqu and
       left `unit` empty */
    if (unit) {
      query.d_idstate.doqu = std::move(unit);
    }

    /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
       clearing query.d_proxyProtocolPayloadAdded and unit->proxyProtocolPayloadSize.
       Leave it for now because we know that the only case where the payload has been
       added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
       and we know the TCP/DoT code can handle it. */
    query.d_proxyProtocolPayloadAdded = query.d_idstate.doqu->proxyProtocolPayloadSize > 0;
    downstream = query.d_idstate.doqu->downstream;
  }

  /* Hands the unit back to the DoQ thread through sendBackDOQUnit(). */
  void handleInternalError()
  {
    sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
  }

  /* Records the selected backend on the unit and returns the shared sender
     that receives answers and I/O errors for DoQ cross-protocol queries. */
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
  {
    query.d_idstate.doqu->downstream = downstream;
    return s_sender;
  }

  /* DNSQuestion over the internal query's IDState and buffer. */
  DNSQuestion getDQ() override
  {
    auto& ids = query.d_idstate;
    DNSQuestion dnsQuestion(ids, query.d_buffer);
    return dnsQuestion;
  }

  /* DNSResponse over the internal query's IDState and buffer, for the assigned backend. */
  DNSResponse getDR() override
  {
    auto& ids = query.d_idstate;
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
    return dnsResponse;
  }

  /* Returns the unit as an rvalue reference so that the caller can take
     ownership back (e.g. after passing the query to a backend failed).
     Nothing is moved until the caller actually moves from the result. */
  DOQUnitUniquePtr&& releaseDU()
  {
    return std::move(query.d_idstate.doqu);
  }

private:
  /* one sender, without state of its own, shared by every DoQ cross-protocol query */
  static std::shared_ptr<DOQTCPCrossQuerySender> s_sender;
};

std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::make_shared<DOQTCPCrossQuerySender>();
264
265 static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBuffer& response)
266 {
267 size_t pos = 0;
268 while (pos < response.size()) {
269 auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true);
270 if (res == QUICHE_ERR_DONE) {
271 response.erase(response.begin(), response.begin() + static_cast<ssize_t>(pos));
272 return false;
273 }
274 if (res < 0) {
275 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_INTERNAL_ERROR));
276 return true;
277 }
278 pos += res;
279 }
280
281 return true;
282 }
283
284 static void handleResponse(DOQFrontend& frontend, Connection& conn, const uint64_t streamID, PacketBuffer& response)
285 {
286 if (response.empty()) {
287 ++frontend.d_errorResponses;
288 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR));
289 return;
290 }
291 ++frontend.d_validResponses;
292 auto responseSize = static_cast<uint16_t>(response.size());
293 const std::array<uint8_t, 2> sizeBytes = {static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
294 response.insert(response.begin(), sizeBytes.begin(), sizeBytes.end());
295 if (!tryWriteResponse(conn, streamID, response)) {
296 conn.d_streamOutBuffers[streamID] = std::move(response);
297 }
298 }
299
300 void DOQFrontend::setup()
301 {
302 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
303 d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
304 configureQuiche(config, d_quicheParams, false);
305 d_server_config = std::make_unique<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
306 }
307
308 void DOQFrontend::reloadCertificates()
309 {
310 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
311 d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
312 configureQuiche(config, d_quicheParams, false);
313 std::atomic_store_explicit(&d_server_config->config, std::move(config), std::memory_order_release);
314 }
315
316 static std::optional<std::reference_wrapper<Connection>> getConnection(DOQServerConfig::ConnectionsMap& connMap, const PacketBuffer& connID)
317 {
318 auto iter = connMap.find(connID);
319 if (iter == connMap.end()) {
320 return std::nullopt;
321 }
322 return iter->second;
323 }
324
325 static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description)
326 {
327 if (unit->dsc == nullptr) {
328 return;
329 }
330 try {
331 if (!unit->dsc->d_responseSender.send(std::move(unit))) {
332 ++dnsdist::metrics::g_stats.doqResponsePipeFull;
333 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
334 }
335 }
336 catch (const std::exception& e) {
337 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
338 }
339 }
340
341 static std::optional<std::reference_wrapper<Connection>> createConnection(DOQServerConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const ComboAddress& local, const ComboAddress& peer)
342 {
343 auto quicheConfig = std::atomic_load_explicit(&config.config, std::memory_order_acquire);
344 auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
345 originalDestinationID.data(), originalDestinationID.size(),
346 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
347 reinterpret_cast<const struct sockaddr*>(&local),
348 local.getSocklen(),
349 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
350 reinterpret_cast<const struct sockaddr*>(&peer),
351 peer.getSocklen(),
352 quicheConfig.get()),
353 quiche_conn_free);
354
355 if (config.df && !config.df->d_quicheParams.d_keyLogFile.empty()) {
356 quiche_conn_set_keylog_path(quicheConn.get(), config.df->d_quicheParams.d_keyLogFile.c_str());
357 }
358
359 auto conn = Connection(peer, std::move(quicheConfig), std::move(quicheConn));
360 auto pair = config.d_connections.emplace(serverSideID, std::move(conn));
361 return pair.first->second;
362 }
363
364 std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion, bool isResponse)
365 {
366 if (!dnsQuestion.ids.doqu) {
367 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
368 }
369
370 auto unit = std::move(dnsQuestion.ids.doqu);
371 if (&dnsQuestion.ids != &unit->ids) {
372 unit->ids = std::move(dnsQuestion.ids);
373 }
374
375 unit->ids.origID = dnsQuestion.getHeader()->id;
376
377 if (!isResponse) {
378 if (unit->query.data() != dnsQuestion.getMutableData().data()) {
379 unit->query = std::move(dnsQuestion.getMutableData());
380 }
381 }
382 else {
383 if (unit->response.data() != dnsQuestion.getMutableData().data()) {
384 unit->response = std::move(dnsQuestion.getMutableData());
385 }
386 }
387
388 return std::make_unique<DOQCrossProtocolQuery>(std::move(unit), isResponse);
389 }
390
391 static void processDOQQuery(DOQUnitUniquePtr&& doqUnit)
392 {
393 const auto handleImmediateResponse = [](DOQUnitUniquePtr&& unit, [[maybe_unused]] const char* reason) {
394 DEBUGLOG("handleImmediateResponse() reason=" << reason);
395 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
396 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
397 unit->ids.doqu.reset();
398 };
399
400 auto& ids = doqUnit->ids;
401 ids.doqu = std::move(doqUnit);
402 auto& unit = ids.doqu;
403 uint16_t queryId = 0;
404 ComboAddress remote;
405
406 try {
407
408 remote = unit->ids.origRemote;
409 DOQServerConfig* dsc = unit->dsc;
410 auto& holders = dsc->holders;
411 ClientState& clientState = *dsc->clientState;
412
413 if (!holders.acl->match(remote)) {
414 vinfolog("Query from %s (DoQ) dropped because of ACL", remote.toStringWithPort());
415 ++dnsdist::metrics::g_stats.aclDrops;
416 unit->response.clear();
417
418 handleImmediateResponse(std::move(unit), "DoQ query dropped because of ACL");
419 return;
420 }
421
422 if (unit->query.size() < sizeof(dnsheader)) {
423 ++dnsdist::metrics::g_stats.nonCompliantQueries;
424 ++clientState.nonCompliantQueries;
425 unit->response.clear();
426
427 handleImmediateResponse(std::move(unit), "DoQ non-compliant query");
428 return;
429 }
430
431 ++clientState.queries;
432 ++dnsdist::metrics::g_stats.queries;
433 unit->ids.queryRealTime.start();
434
435 {
436 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
437 dnsheader_aligned dnsHeader(unit->query.data());
438
439 if (!checkQueryHeaders(*dnsHeader, clientState)) {
440 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
441 header.rcode = RCode::ServFail;
442 header.qr = true;
443 return true;
444 });
445 unit->response = std::move(unit->query);
446
447 handleImmediateResponse(std::move(unit), "DoQ invalid headers");
448 return;
449 }
450
451 if (dnsHeader->qdcount == 0) {
452 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
453 header.rcode = RCode::NotImp;
454 header.qr = true;
455 return true;
456 });
457 unit->response = std::move(unit->query);
458
459 handleImmediateResponse(std::move(unit), "DoQ empty query");
460 return;
461 }
462
463 queryId = ntohs(dnsHeader->id);
464 }
465
466 auto downstream = unit->downstream;
467 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
468 unit->ids.qname = DNSName(reinterpret_cast<const char*>(unit->query.data()), static_cast<int>(unit->query.size()), sizeof(dnsheader), false, &unit->ids.qtype, &unit->ids.qclass);
469 DNSQuestion dnsQuestion(unit->ids, unit->query);
470 dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
471 const uint16_t* flags = getFlagsFromDNSHeader(&header);
472 ids.origFlags = *flags;
473 return true;
474 });
475 unit->ids.cs = &clientState;
476
477 auto result = processQuery(dnsQuestion, holders, downstream);
478 if (result == ProcessQueryResult::Drop) {
479 handleImmediateResponse(std::move(unit), "DoQ dropped query");
480 return;
481 }
482 if (result == ProcessQueryResult::Asynchronous) {
483 return;
484 }
485 if (result == ProcessQueryResult::SendAnswer) {
486 if (unit->response.empty()) {
487 unit->response = std::move(unit->query);
488 }
489 if (unit->response.size() >= sizeof(dnsheader)) {
490 const dnsheader_aligned dnsHeader(unit->response.data());
491
492 handleResponseSent(unit->ids.qname, QType(unit->ids.qtype), 0., unit->ids.origDest, ComboAddress(), unit->response.size(), *dnsHeader, dnsdist::Protocol::DoQ, dnsdist::Protocol::DoQ, false);
493 }
494 handleImmediateResponse(std::move(unit), "DoQ self-answered response");
495 return;
496 }
497
498 ++dnsdist::metrics::g_stats.responses;
499 if (unit->ids.cs != nullptr) {
500 ++unit->ids.cs->responses;
501 }
502
503 if (result != ProcessQueryResult::PassToBackend) {
504 handleImmediateResponse(std::move(unit), "DoQ no backend available");
505 return;
506 }
507
508 if (downstream == nullptr) {
509 handleImmediateResponse(std::move(unit), "DoQ no backend available");
510 return;
511 }
512
513 unit->downstream = downstream;
514
515 std::string proxyProtocolPayload;
516 /* we need to do this _before_ creating the cross protocol query because
517 after that the buffer will have been moved */
518 if (downstream->d_config.useProxyProtocol) {
519 proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
520 }
521
522 unit->ids.origID = htons(queryId);
523 unit->tcp = true;
524
525 /* this moves unit->ids, careful! */
526 auto cpq = std::make_unique<DOQCrossProtocolQuery>(std::move(unit), false);
527 cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
528
529 if (downstream->passCrossProtocolQuery(std::move(cpq))) {
530 return;
531 }
532 // NOLINTNEXTLINE(bugprone-use-after-move): it was only moved if the call succeeded
533 unit = cpq->releaseDU();
534 handleImmediateResponse(std::move(unit), "DoQ internal error");
535 return;
536 }
537 catch (const std::exception& e) {
538 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
539 handleImmediateResponse(std::move(unit), "DoQ internal error");
540 return;
541 }
542 }
543
544 static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
545 {
546 try {
547 auto unit = std::make_unique<DOQUnit>(std::move(query));
548 unit->dsc = &dsc;
549 unit->ids.origDest = local;
550 unit->ids.origRemote = remote;
551 unit->ids.protocol = dnsdist::Protocol::DoQ;
552 unit->serverConnID = serverConnID;
553 unit->streamID = streamID;
554
555 processDOQQuery(std::move(unit));
556 }
557 catch (const std::exception& exp) {
558 vinfolog("Had error handling DoQ DNS packet from %s: %s", remote.toStringWithPort(), exp.what());
559 }
560 }
561
562 static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
563 {
564 for (;;) {
565 try {
566 auto tmp = receiver.receive();
567 if (!tmp) {
568 return;
569 }
570
571 auto unit = std::move(*tmp);
572 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
573 if (conn) {
574 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
575 }
576 }
577 catch (const std::exception& e) {
578 errlog("Error while processing response received over DoQ: %s", e.what());
579 }
580 catch (...) {
581 errlog("Unspecified error while processing response received over DoQ");
582 }
583 }
584 }
585
586 static void flushStalledResponses(Connection& conn)
587 {
588 for (auto streamIt = conn.d_streamOutBuffers.begin(); streamIt != conn.d_streamOutBuffers.end();) {
589 const auto& streamID = streamIt->first;
590 auto& response = streamIt->second;
591 if (quiche_conn_stream_writable(conn.d_conn.get(), streamID, response.size()) == 1) {
592 if (tryWriteResponse(conn, streamID, response)) {
593 streamIt = conn.d_streamOutBuffers.erase(streamIt);
594 continue;
595 }
596 }
597 ++streamIt;
598 }
599 }
600
601 static void handleReadableStream(DOQFrontend& frontend, ClientState& clientState, Connection& conn, uint64_t streamID, const ComboAddress& client, const PacketBuffer& serverConnID)
602 {
603 auto& streamBuffer = conn.d_streamBuffers[streamID];
604 while (true) {
605 bool fin = false;
606 auto existingLength = streamBuffer.size();
607 streamBuffer.resize(existingLength + 512);
608 auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID,
609 &streamBuffer.at(existingLength), 512,
610 &fin);
611 if (received == 0 || received == QUICHE_ERR_DONE) {
612 streamBuffer.resize(existingLength);
613 return;
614 }
615 if (received < 0) {
616 ++dnsdist::metrics::g_stats.nonCompliantQueries;
617 ++clientState.nonCompliantQueries;
618 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
619 return;
620 }
621
622 streamBuffer.resize(existingLength + received);
623 if (fin) {
624 break;
625 }
626 }
627
628 if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) {
629 ++dnsdist::metrics::g_stats.nonCompliantQueries;
630 ++clientState.nonCompliantQueries;
631 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
632 return;
633 }
634
635 uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1);
636 streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2);
637 if (payloadLength != streamBuffer.size()) {
638 ++dnsdist::metrics::g_stats.nonCompliantQueries;
639 ++clientState.nonCompliantQueries;
640 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
641 return;
642 }
643 DEBUGLOG("Dispatching query");
644 doq_dispatch_query(*(frontend.d_server_config), std::move(streamBuffer), clientState.local, client, serverConnID, streamID);
645 conn.d_streamBuffers.erase(streamID);
646 }
647
648 static void handleSocketReadable(DOQFrontend& frontend, ClientState& clientState, Socket& sock, PacketBuffer& buffer)
649 {
650 // destination connection ID, will have to be sent as original destination connection ID
651 PacketBuffer serverConnID;
652 // source connection ID, will have to be sent as destination connection ID
653 PacketBuffer clientConnID;
654 PacketBuffer tokenBuf;
655 while (true) {
656 ComboAddress client;
657 buffer.resize(4096);
658 if (!sock.recvFromAsync(buffer, client) || buffer.empty()) {
659 return;
660 }
661 DEBUGLOG("Received DoQ datagram of size " << buffer.size() << " from " << client.toStringWithPort());
662
663 uint32_t version{0};
664 uint8_t type{0};
665 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> scid{};
666 size_t scid_len = scid.size();
667 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> dcid{};
668 size_t dcid_len = dcid.size();
669 std::array<uint8_t, MAX_TOKEN_LEN> token{};
670 size_t token_len = token.size();
671
672 auto res = quiche_header_info(buffer.data(), buffer.size(), LOCAL_CONN_ID_LEN,
673 &version, &type,
674 scid.data(), &scid_len,
675 dcid.data(), &dcid_len,
676 token.data(), &token_len);
677 if (res != 0) {
678 DEBUGLOG("Error in quiche_header_info: " << res);
679 continue;
680 }
681
682 serverConnID.assign(dcid.begin(), dcid.begin() + dcid_len);
683 clientConnID.assign(scid.begin(), scid.begin() + scid_len);
684 auto conn = getConnection(frontend.d_server_config->d_connections, serverConnID);
685
686 if (!conn) {
687 DEBUGLOG("Connection not found");
688 if (type != static_cast<uint8_t>(DOQ_Packet_Types::QUIC_PACKET_TYPE_INITIAL)) {
689 DEBUGLOG("Packet is not initial");
690 continue;
691 }
692
693 if (!quiche_version_is_supported(version)) {
694 DEBUGLOG("Unsupported version");
695 ++frontend.d_doqUnsupportedVersionErrors;
696 handleVersionNegociation(sock, clientConnID, serverConnID, client, buffer);
697 continue;
698 }
699
700 if (token_len == 0) {
701 /* stateless retry */
702 DEBUGLOG("No token received");
703 handleStatelessRetry(sock, clientConnID, serverConnID, client, version, buffer);
704 continue;
705 }
706
707 tokenBuf.assign(token.begin(), token.begin() + token_len);
708 auto originalDestinationID = validateToken(tokenBuf, client);
709 if (!originalDestinationID) {
710 ++frontend.d_doqInvalidTokensReceived;
711 DEBUGLOG("Discarding invalid token");
712 continue;
713 }
714
715 DEBUGLOG("Creating a new connection");
716 conn = createConnection(*frontend.d_server_config, serverConnID, *originalDestinationID, clientState.local, client);
717 if (!conn) {
718 continue;
719 }
720 }
721 DEBUGLOG("Connection found");
722 quiche_recv_info recv_info = {
723 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
724 reinterpret_cast<struct sockaddr*>(&client),
725 client.getSocklen(),
726 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
727 reinterpret_cast<struct sockaddr*>(&clientState.local),
728 clientState.local.getSocklen(),
729 };
730
731 auto done = quiche_conn_recv(conn->get().d_conn.get(), buffer.data(), buffer.size(), &recv_info);
732 if (done < 0) {
733 continue;
734 }
735
736 if (quiche_conn_is_established(conn->get().d_conn.get()) || quiche_conn_is_in_early_data(conn->get().d_conn.get())) {
737 auto readable = std::unique_ptr<quiche_stream_iter, decltype(&quiche_stream_iter_free)>(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free);
738
739 uint64_t streamID = 0;
740 while (quiche_stream_iter_next(readable.get(), &streamID)) {
741 handleReadableStream(frontend, clientState, *conn, streamID, client, serverConnID);
742 }
743
744 flushEgress(sock, conn->get().d_conn, client, buffer);
745 }
746 else {
747 DEBUGLOG("Connection not established");
748 }
749 }
750 }
751
752 // this is the entrypoint from dnsdist.cc
753 void doqThread(ClientState* clientState)
754 {
755 try {
756 std::shared_ptr<DOQFrontend>& frontend = clientState->doqFrontend;
757
758 frontend->d_server_config->clientState = clientState;
759 frontend->d_server_config->df = clientState->doqFrontend;
760
761 setThreadName("dnsdist/doq");
762
763 Socket sock(clientState->udpFD);
764 sock.setNonBlocking();
765
766 auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
767
768 auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
769 mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
770 mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
771 std::vector<int> readyFDs;
772 PacketBuffer buffer(4096);
773 while (true) {
774 readyFDs.clear();
775 mplexer->getAvailableFDs(readyFDs, 500);
776
777 try {
778 if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
779 handleSocketReadable(*frontend, *clientState, sock, buffer);
780 }
781
782 if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
783 flushResponses(frontend->d_server_config->d_responseReceiver);
784 }
785
786 for (auto conn = frontend->d_server_config->d_connections.begin(); conn != frontend->d_server_config->d_connections.end();) {
787 quiche_conn_on_timeout(conn->second.d_conn.get());
788
789 flushEgress(sock, conn->second.d_conn, conn->second.d_peer, buffer);
790
791 if (quiche_conn_is_closed(conn->second.d_conn.get())) {
792 #ifdef DEBUGLOG_ENABLED
793 quiche_stats stats;
794 quiche_path_stats path_stats;
795
796 quiche_conn_stats(conn->second.d_conn.get(), &stats);
797 quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
798
799 DEBUGLOG("Connection (DoQ) closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd);
800 #endif
801 conn = frontend->d_server_config->d_connections.erase(conn);
802 }
803 else {
804 flushStalledResponses(conn->second);
805 ++conn;
806 }
807 }
808 }
809 catch (const std::exception& exp) {
810 vinfolog("Caught exception in the main DoQ thread: %s", exp.what());
811 }
812 catch (...) {
813 vinfolog("Unknown exception in the main DoQ thread");
814 }
815 }
816 }
817 catch (const std::exception& e) {
818 DEBUGLOG("Caught fatal error in the main DoQ thread: " << e.what());
819 }
820 }
821
822 #endif /* HAVE_DNS_OVER_QUIC */