2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #ifdef HAVE_DNS_OVER_QUIC
32 #include "threadname.hh"
34 #include "dnsdist-dnsparser.hh"
35 #include "dnsdist-ecs.hh"
36 #include "dnsdist-proxy-protocol.hh"
37 #include "dnsdist-tcp.hh"
38 #include "dnsdist-random.hh"
40 #include "doq-common.hh"
42 using namespace dnsdist::doq
;
45 #define DEBUGLOG_ENABLED
46 #define DEBUGLOG(x) std::cerr << x << std::endl;
54 Connection(const ComboAddress
& peer
, QuicheConfig config
, QuicheConnection conn
) :
55 d_peer(peer
), d_conn(std::move(conn
)), d_config(std::move(config
))
58 Connection(const Connection
&) = delete;
59 Connection(Connection
&&) = default;
60 Connection
& operator=(const Connection
&) = delete;
61 Connection
& operator=(Connection
&&) = default;
62 ~Connection() = default;
65 QuicheConnection d_conn
;
66 QuicheConfig d_config
;
68 std::unordered_map
<uint64_t, PacketBuffer
> d_streamBuffers
;
69 std::unordered_map
<uint64_t, PacketBuffer
> d_streamOutBuffers
;
72 static void sendBackDOQUnit(DOQUnitUniquePtr
&& unit
, const char* description
);
74 struct DOQServerConfig
76 DOQServerConfig(QuicheConfig
&& config_
, uint32_t internalPipeBufferSize
) :
77 config(std::move(config_
))
80 auto [sender
, receiver
] = pdns::channel::createObjectQueue
<DOQUnit
>(pdns::channel::SenderBlockingMode::SenderNonBlocking
, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking
, internalPipeBufferSize
);
81 d_responseSender
= std::move(sender
);
82 d_responseReceiver
= std::move(receiver
);
85 DOQServerConfig(const DOQServerConfig
&) = delete;
86 DOQServerConfig(DOQServerConfig
&&) = default;
87 DOQServerConfig
& operator=(const DOQServerConfig
&) = delete;
88 DOQServerConfig
& operator=(DOQServerConfig
&&) = default;
89 ~DOQServerConfig() = default;
91 using ConnectionsMap
= std::map
<PacketBuffer
, Connection
>;
94 ConnectionsMap d_connections
;
96 ClientState
* clientState
{nullptr};
97 std::shared_ptr
<DOQFrontend
> df
{nullptr};
98 pdns::channel::Sender
<DOQUnit
> d_responseSender
;
99 pdns::channel::Receiver
<DOQUnit
> d_responseReceiver
;
102 /* these might seem useless, but they are needed because
103 they need to be declared _after_ the definition of DOQServerConfig
104 so that we can use a unique_ptr in DOQFrontend */
105 DOQFrontend::DOQFrontend() = default;
106 DOQFrontend::~DOQFrontend() = default;
108 class DOQTCPCrossQuerySender final
: public TCPQuerySender
111 DOQTCPCrossQuerySender() = default;
113 [[nodiscard
]] bool active() const override
118 void handleResponse([[maybe_unused
]] const struct timeval
& now
, TCPResponse
&& response
) override
120 if (!response
.d_idstate
.doqu
) {
124 auto unit
= std::move(response
.d_idstate
.doqu
);
125 if (unit
->dsc
== nullptr) {
129 unit
->response
= std::move(response
.d_buffer
);
130 unit
->ids
= std::move(response
.d_idstate
);
131 DNSResponse
dnsResponse(unit
->ids
, unit
->response
, unit
->downstream
);
133 dnsheader cleartextDH
{};
134 memcpy(&cleartextDH
, dnsResponse
.getHeader().get(), sizeof(cleartextDH
));
136 if (!response
.isAsync()) {
138 static thread_local LocalStateHolder
<vector
<DNSDistResponseRuleAction
>> localRespRuleActions
= g_respruleactions
.getLocal();
139 static thread_local LocalStateHolder
<vector
<DNSDistResponseRuleAction
>> localCacheInsertedRespRuleActions
= g_cacheInsertedRespRuleActions
.getLocal();
141 dnsResponse
.ids
.doqu
= std::move(unit
);
143 if (!processResponse(dnsResponse
.ids
.doqu
->response
, *localRespRuleActions
, *localCacheInsertedRespRuleActions
, dnsResponse
, false)) {
144 if (dnsResponse
.ids
.doqu
) {
146 sendBackDOQUnit(std::move(dnsResponse
.ids
.doqu
), "Response dropped by rules");
151 if (dnsResponse
.isAsynchronous()) {
155 unit
= std::move(dnsResponse
.ids
.doqu
);
158 if (!unit
->ids
.selfGenerated
) {
159 double udiff
= unit
->ids
.queryRealTime
.udiff();
160 vinfolog("Got answer from %s, relayed to %s (quic, %d bytes), took %f us", unit
->downstream
->d_config
.remote
.toStringWithPort(), unit
->ids
.origRemote
.toStringWithPort(), unit
->response
.size(), udiff
);
162 auto backendProtocol
= unit
->downstream
->getProtocol();
163 if (backendProtocol
== dnsdist::Protocol::DoUDP
&& unit
->tcp
) {
164 backendProtocol
= dnsdist::Protocol::DoTCP
;
166 handleResponseSent(unit
->ids
, udiff
, unit
->ids
.origRemote
, unit
->downstream
->d_config
.remote
, unit
->response
.size(), cleartextDH
, backendProtocol
, true);
169 ++dnsdist::metrics::g_stats
.responses
;
170 if (unit
->ids
.cs
!= nullptr) {
171 ++unit
->ids
.cs
->responses
;
174 sendBackDOQUnit(std::move(unit
), "Cross-protocol response");
177 void handleXFRResponse(const struct timeval
& now
, TCPResponse
&& response
) override
179 return handleResponse(now
, std::move(response
));
182 void notifyIOError([[maybe_unused
]] const struct timeval
& now
, TCPResponse
&& response
) override
184 if (!response
.d_idstate
.doqu
) {
188 auto unit
= std::move(response
.d_idstate
.doqu
);
189 if (unit
->dsc
== nullptr) {
193 /* this will signal an error */
194 unit
->response
.clear();
195 unit
->ids
= std::move(response
.d_idstate
);
196 sendBackDOQUnit(std::move(unit
), "Cross-protocol error");
200 class DOQCrossProtocolQuery
: public CrossProtocolQuery
203 DOQCrossProtocolQuery(DOQUnitUniquePtr
&& unit
, bool isResponse
)
206 /* happens when a response becomes async */
207 query
= InternalQuery(std::move(unit
->response
), std::move(unit
->ids
));
210 /* we need to duplicate the query here because we might need
211 the existing query later if we get a truncated answer */
212 query
= InternalQuery(PacketBuffer(unit
->query
), std::move(unit
->ids
));
215 /* it might have been moved when we moved unit->ids */
217 query
.d_idstate
.doqu
= std::move(unit
);
220 /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
221 clearing query.d_proxyProtocolPayloadAdded and unit->proxyProtocolPayloadSize.
222 Leave it for now because we know that the onky case where the payload has been
223 added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
224 and we know the TCP/DoT code can handle it. */
225 query
.d_proxyProtocolPayloadAdded
= query
.d_idstate
.doqu
->proxyProtocolPayloadSize
> 0;
226 downstream
= query
.d_idstate
.doqu
->downstream
;
229 void handleInternalError()
231 sendBackDOQUnit(std::move(query
.d_idstate
.doqu
), "DOQ internal error");
234 std::shared_ptr
<TCPQuerySender
> getTCPQuerySender() override
236 query
.d_idstate
.doqu
->downstream
= downstream
;
240 DNSQuestion
getDQ() override
242 auto& ids
= query
.d_idstate
;
243 DNSQuestion
dnsQuestion(ids
, query
.d_buffer
);
247 DNSResponse
getDR() override
249 auto& ids
= query
.d_idstate
;
250 DNSResponse
dnsResponse(ids
, query
.d_buffer
, downstream
);
254 DOQUnitUniquePtr
&& releaseDU()
256 return std::move(query
.d_idstate
.doqu
);
260 static std::shared_ptr
<DOQTCPCrossQuerySender
> s_sender
;
263 std::shared_ptr
<DOQTCPCrossQuerySender
> DOQCrossProtocolQuery::s_sender
= std::make_shared
<DOQTCPCrossQuerySender
>();
265 static bool tryWriteResponse(Connection
& conn
, const uint64_t streamID
, PacketBuffer
& response
)
268 while (pos
< response
.size()) {
269 auto res
= quiche_conn_stream_send(conn
.d_conn
.get(), streamID
, &response
.at(pos
), response
.size() - pos
, true);
270 if (res
== QUICHE_ERR_DONE
) {
271 response
.erase(response
.begin(), response
.begin() + static_cast<ssize_t
>(pos
));
275 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_INTERNAL_ERROR
));
284 static void handleResponse(DOQFrontend
& frontend
, Connection
& conn
, const uint64_t streamID
, PacketBuffer
& response
)
286 if (response
.empty()) {
287 ++frontend
.d_errorResponses
;
288 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR
));
291 ++frontend
.d_validResponses
;
292 auto responseSize
= static_cast<uint16_t>(response
.size());
293 const std::array
<uint8_t, 2> sizeBytes
= {static_cast<uint8_t>(responseSize
/ 256), static_cast<uint8_t>(responseSize
% 256)};
294 response
.insert(response
.begin(), sizeBytes
.begin(), sizeBytes
.end());
295 if (!tryWriteResponse(conn
, streamID
, response
)) {
296 conn
.d_streamOutBuffers
[streamID
] = std::move(response
);
300 void DOQFrontend::setup()
302 auto config
= QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION
), quiche_config_free
);
303 d_quicheParams
.d_alpn
= std::string(DOQ_ALPN
.begin(), DOQ_ALPN
.end());
304 configureQuiche(config
, d_quicheParams
, false);
305 d_server_config
= std::make_unique
<DOQServerConfig
>(std::move(config
), d_internalPipeBufferSize
);
308 void DOQFrontend::reloadCertificates()
310 auto config
= QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION
), quiche_config_free
);
311 d_quicheParams
.d_alpn
= std::string(DOQ_ALPN
.begin(), DOQ_ALPN
.end());
312 configureQuiche(config
, d_quicheParams
, false);
313 std::atomic_store_explicit(&d_server_config
->config
, std::move(config
), std::memory_order_release
);
316 static std::optional
<std::reference_wrapper
<Connection
>> getConnection(DOQServerConfig::ConnectionsMap
& connMap
, const PacketBuffer
& connID
)
318 auto iter
= connMap
.find(connID
);
319 if (iter
== connMap
.end()) {
325 static void sendBackDOQUnit(DOQUnitUniquePtr
&& unit
, const char* description
)
327 if (unit
->dsc
== nullptr) {
331 if (!unit
->dsc
->d_responseSender
.send(std::move(unit
))) {
332 ++dnsdist::metrics::g_stats
.doqResponsePipeFull
;
333 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description
);
336 catch (const std::exception
& e
) {
337 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description
, e
.what());
341 static std::optional
<std::reference_wrapper
<Connection
>> createConnection(DOQServerConfig
& config
, const PacketBuffer
& serverSideID
, const PacketBuffer
& originalDestinationID
, const ComboAddress
& local
, const ComboAddress
& peer
)
343 auto quicheConfig
= std::atomic_load_explicit(&config
.config
, std::memory_order_acquire
);
344 auto quicheConn
= QuicheConnection(quiche_accept(serverSideID
.data(), serverSideID
.size(),
345 originalDestinationID
.data(), originalDestinationID
.size(),
346 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
347 reinterpret_cast<const struct sockaddr
*>(&local
),
349 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
350 reinterpret_cast<const struct sockaddr
*>(&peer
),
355 if (config
.df
&& !config
.df
->d_quicheParams
.d_keyLogFile
.empty()) {
356 quiche_conn_set_keylog_path(quicheConn
.get(), config
.df
->d_quicheParams
.d_keyLogFile
.c_str());
359 auto conn
= Connection(peer
, std::move(quicheConfig
), std::move(quicheConn
));
360 auto pair
= config
.d_connections
.emplace(serverSideID
, std::move(conn
));
361 return pair
.first
->second
;
364 std::unique_ptr
<CrossProtocolQuery
> getDOQCrossProtocolQueryFromDQ(DNSQuestion
& dnsQuestion
, bool isResponse
)
366 if (!dnsQuestion
.ids
.doqu
) {
367 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
370 auto unit
= std::move(dnsQuestion
.ids
.doqu
);
371 if (&dnsQuestion
.ids
!= &unit
->ids
) {
372 unit
->ids
= std::move(dnsQuestion
.ids
);
375 unit
->ids
.origID
= dnsQuestion
.getHeader()->id
;
378 if (unit
->query
.data() != dnsQuestion
.getMutableData().data()) {
379 unit
->query
= std::move(dnsQuestion
.getMutableData());
383 if (unit
->response
.data() != dnsQuestion
.getMutableData().data()) {
384 unit
->response
= std::move(dnsQuestion
.getMutableData());
388 return std::make_unique
<DOQCrossProtocolQuery
>(std::move(unit
), isResponse
);
391 static void processDOQQuery(DOQUnitUniquePtr
&& doqUnit
)
393 const auto handleImmediateResponse
= [](DOQUnitUniquePtr
&& unit
, [[maybe_unused
]] const char* reason
) {
394 DEBUGLOG("handleImmediateResponse() reason=" << reason
);
395 auto conn
= getConnection(unit
->dsc
->df
->d_server_config
->d_connections
, unit
->serverConnID
);
396 handleResponse(*unit
->dsc
->df
, *conn
, unit
->streamID
, unit
->response
);
397 unit
->ids
.doqu
.reset();
400 auto& ids
= doqUnit
->ids
;
401 ids
.doqu
= std::move(doqUnit
);
402 auto& unit
= ids
.doqu
;
403 uint16_t queryId
= 0;
408 remote
= unit
->ids
.origRemote
;
409 DOQServerConfig
* dsc
= unit
->dsc
;
410 auto& holders
= dsc
->holders
;
411 ClientState
& clientState
= *dsc
->clientState
;
413 if (!holders
.acl
->match(remote
)) {
414 vinfolog("Query from %s (DoQ) dropped because of ACL", remote
.toStringWithPort());
415 ++dnsdist::metrics::g_stats
.aclDrops
;
416 unit
->response
.clear();
418 handleImmediateResponse(std::move(unit
), "DoQ query dropped because of ACL");
422 if (unit
->query
.size() < sizeof(dnsheader
)) {
423 ++dnsdist::metrics::g_stats
.nonCompliantQueries
;
424 ++clientState
.nonCompliantQueries
;
425 unit
->response
.clear();
427 handleImmediateResponse(std::move(unit
), "DoQ non-compliant query");
431 ++clientState
.queries
;
432 ++dnsdist::metrics::g_stats
.queries
;
433 unit
->ids
.queryRealTime
.start();
436 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
437 dnsheader_aligned
dnsHeader(unit
->query
.data());
439 if (!checkQueryHeaders(*dnsHeader
, clientState
)) {
440 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit
->query
, [](dnsheader
& header
) {
441 header
.rcode
= RCode::ServFail
;
445 unit
->response
= std::move(unit
->query
);
447 handleImmediateResponse(std::move(unit
), "DoQ invalid headers");
451 if (dnsHeader
->qdcount
== 0) {
452 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit
->query
, [](dnsheader
& header
) {
453 header
.rcode
= RCode::NotImp
;
457 unit
->response
= std::move(unit
->query
);
459 handleImmediateResponse(std::move(unit
), "DoQ empty query");
463 queryId
= ntohs(dnsHeader
->id
);
466 auto downstream
= unit
->downstream
;
467 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
468 unit
->ids
.qname
= DNSName(reinterpret_cast<const char*>(unit
->query
.data()), static_cast<int>(unit
->query
.size()), sizeof(dnsheader
), false, &unit
->ids
.qtype
, &unit
->ids
.qclass
);
469 DNSQuestion
dnsQuestion(unit
->ids
, unit
->query
);
470 dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion
.getMutableData(), [&ids
](dnsheader
& header
) {
471 const uint16_t* flags
= getFlagsFromDNSHeader(&header
);
472 ids
.origFlags
= *flags
;
475 unit
->ids
.cs
= &clientState
;
477 auto result
= processQuery(dnsQuestion
, holders
, downstream
);
478 if (result
== ProcessQueryResult::Drop
) {
479 handleImmediateResponse(std::move(unit
), "DoQ dropped query");
482 if (result
== ProcessQueryResult::Asynchronous
) {
485 if (result
== ProcessQueryResult::SendAnswer
) {
486 if (unit
->response
.empty()) {
487 unit
->response
= std::move(unit
->query
);
489 if (unit
->response
.size() >= sizeof(dnsheader
)) {
490 const dnsheader_aligned
dnsHeader(unit
->response
.data());
492 handleResponseSent(unit
->ids
.qname
, QType(unit
->ids
.qtype
), 0., unit
->ids
.origDest
, ComboAddress(), unit
->response
.size(), *dnsHeader
, dnsdist::Protocol::DoQ
, dnsdist::Protocol::DoQ
, false);
494 handleImmediateResponse(std::move(unit
), "DoQ self-answered response");
498 ++dnsdist::metrics::g_stats
.responses
;
499 if (unit
->ids
.cs
!= nullptr) {
500 ++unit
->ids
.cs
->responses
;
503 if (result
!= ProcessQueryResult::PassToBackend
) {
504 handleImmediateResponse(std::move(unit
), "DoQ no backend available");
508 if (downstream
== nullptr) {
509 handleImmediateResponse(std::move(unit
), "DoQ no backend available");
513 unit
->downstream
= downstream
;
515 std::string proxyProtocolPayload
;
516 /* we need to do this _before_ creating the cross protocol query because
517 after that the buffer will have been moved */
518 if (downstream
->d_config
.useProxyProtocol
) {
519 proxyProtocolPayload
= getProxyProtocolPayload(dnsQuestion
);
522 unit
->ids
.origID
= htons(queryId
);
525 /* this moves unit->ids, careful! */
526 auto cpq
= std::make_unique
<DOQCrossProtocolQuery
>(std::move(unit
), false);
527 cpq
->query
.d_proxyProtocolPayload
= std::move(proxyProtocolPayload
);
529 if (downstream
->passCrossProtocolQuery(std::move(cpq
))) {
532 // NOLINTNEXTLINE(bugprone-use-after-move): it was only moved if the call succeeded
533 unit
= cpq
->releaseDU();
534 handleImmediateResponse(std::move(unit
), "DoQ internal error");
537 catch (const std::exception
& e
) {
538 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
539 handleImmediateResponse(std::move(unit
), "DoQ internal error");
544 static void doq_dispatch_query(DOQServerConfig
& dsc
, PacketBuffer
&& query
, const ComboAddress
& local
, const ComboAddress
& remote
, const PacketBuffer
& serverConnID
, const uint64_t streamID
)
547 auto unit
= std::make_unique
<DOQUnit
>(std::move(query
));
549 unit
->ids
.origDest
= local
;
550 unit
->ids
.origRemote
= remote
;
551 unit
->ids
.protocol
= dnsdist::Protocol::DoQ
;
552 unit
->serverConnID
= serverConnID
;
553 unit
->streamID
= streamID
;
555 processDOQQuery(std::move(unit
));
557 catch (const std::exception
& exp
) {
558 vinfolog("Had error handling DoQ DNS packet from %s: %s", remote
.toStringWithPort(), exp
.what());
562 static void flushResponses(pdns::channel::Receiver
<DOQUnit
>& receiver
)
566 auto tmp
= receiver
.receive();
571 auto unit
= std::move(*tmp
);
572 auto conn
= getConnection(unit
->dsc
->df
->d_server_config
->d_connections
, unit
->serverConnID
);
574 handleResponse(*unit
->dsc
->df
, *conn
, unit
->streamID
, unit
->response
);
577 catch (const std::exception
& e
) {
578 errlog("Error while processing response received over DoQ: %s", e
.what());
581 errlog("Unspecified error while processing response received over DoQ");
586 static void flushStalledResponses(Connection
& conn
)
588 for (auto streamIt
= conn
.d_streamOutBuffers
.begin(); streamIt
!= conn
.d_streamOutBuffers
.end();) {
589 const auto& streamID
= streamIt
->first
;
590 auto& response
= streamIt
->second
;
591 if (quiche_conn_stream_writable(conn
.d_conn
.get(), streamID
, response
.size()) == 1) {
592 if (tryWriteResponse(conn
, streamID
, response
)) {
593 streamIt
= conn
.d_streamOutBuffers
.erase(streamIt
);
601 static void handleReadableStream(DOQFrontend
& frontend
, ClientState
& clientState
, Connection
& conn
, uint64_t streamID
, const ComboAddress
& client
, const PacketBuffer
& serverConnID
)
603 auto& streamBuffer
= conn
.d_streamBuffers
[streamID
];
606 auto existingLength
= streamBuffer
.size();
607 streamBuffer
.resize(existingLength
+ 512);
608 auto received
= quiche_conn_stream_recv(conn
.d_conn
.get(), streamID
,
609 &streamBuffer
.at(existingLength
), 512,
611 if (received
== 0 || received
== QUICHE_ERR_DONE
) {
612 streamBuffer
.resize(existingLength
);
616 ++dnsdist::metrics::g_stats
.nonCompliantQueries
;
617 ++clientState
.nonCompliantQueries
;
618 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR
));
622 streamBuffer
.resize(existingLength
+ received
);
628 if (streamBuffer
.size() < (sizeof(uint16_t) + sizeof(dnsheader
))) {
629 ++dnsdist::metrics::g_stats
.nonCompliantQueries
;
630 ++clientState
.nonCompliantQueries
;
631 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR
));
635 uint16_t payloadLength
= streamBuffer
.at(0) * 256 + streamBuffer
.at(1);
636 streamBuffer
.erase(streamBuffer
.begin(), streamBuffer
.begin() + 2);
637 if (payloadLength
!= streamBuffer
.size()) {
638 ++dnsdist::metrics::g_stats
.nonCompliantQueries
;
639 ++clientState
.nonCompliantQueries
;
640 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR
));
643 DEBUGLOG("Dispatching query");
644 doq_dispatch_query(*(frontend
.d_server_config
), std::move(streamBuffer
), clientState
.local
, client
, serverConnID
, streamID
);
645 conn
.d_streamBuffers
.erase(streamID
);
648 static void handleSocketReadable(DOQFrontend
& frontend
, ClientState
& clientState
, Socket
& sock
, PacketBuffer
& buffer
)
650 // destination connection ID, will have to be sent as original destination connection ID
651 PacketBuffer serverConnID
;
652 // source connection ID, will have to be sent as destination connection ID
653 PacketBuffer clientConnID
;
654 PacketBuffer tokenBuf
;
658 if (!sock
.recvFromAsync(buffer
, client
) || buffer
.empty()) {
661 DEBUGLOG("Received DoQ datagram of size " << buffer
.size() << " from " << client
.toStringWithPort());
665 std::array
<uint8_t, QUICHE_MAX_CONN_ID_LEN
> scid
{};
666 size_t scid_len
= scid
.size();
667 std::array
<uint8_t, QUICHE_MAX_CONN_ID_LEN
> dcid
{};
668 size_t dcid_len
= dcid
.size();
669 std::array
<uint8_t, MAX_TOKEN_LEN
> token
{};
670 size_t token_len
= token
.size();
672 auto res
= quiche_header_info(buffer
.data(), buffer
.size(), LOCAL_CONN_ID_LEN
,
674 scid
.data(), &scid_len
,
675 dcid
.data(), &dcid_len
,
676 token
.data(), &token_len
);
678 DEBUGLOG("Error in quiche_header_info: " << res
);
682 serverConnID
.assign(dcid
.begin(), dcid
.begin() + dcid_len
);
683 clientConnID
.assign(scid
.begin(), scid
.begin() + scid_len
);
684 auto conn
= getConnection(frontend
.d_server_config
->d_connections
, serverConnID
);
687 DEBUGLOG("Connection not found");
688 if (type
!= static_cast<uint8_t>(DOQ_Packet_Types::QUIC_PACKET_TYPE_INITIAL
)) {
689 DEBUGLOG("Packet is not initial");
693 if (!quiche_version_is_supported(version
)) {
694 DEBUGLOG("Unsupported version");
695 ++frontend
.d_doqUnsupportedVersionErrors
;
696 handleVersionNegociation(sock
, clientConnID
, serverConnID
, client
, buffer
);
700 if (token_len
== 0) {
701 /* stateless retry */
702 DEBUGLOG("No token received");
703 handleStatelessRetry(sock
, clientConnID
, serverConnID
, client
, version
, buffer
);
707 tokenBuf
.assign(token
.begin(), token
.begin() + token_len
);
708 auto originalDestinationID
= validateToken(tokenBuf
, client
);
709 if (!originalDestinationID
) {
710 ++frontend
.d_doqInvalidTokensReceived
;
711 DEBUGLOG("Discarding invalid token");
715 DEBUGLOG("Creating a new connection");
716 conn
= createConnection(*frontend
.d_server_config
, serverConnID
, *originalDestinationID
, clientState
.local
, client
);
721 DEBUGLOG("Connection found");
722 quiche_recv_info recv_info
= {
723 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
724 reinterpret_cast<struct sockaddr
*>(&client
),
726 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
727 reinterpret_cast<struct sockaddr
*>(&clientState
.local
),
728 clientState
.local
.getSocklen(),
731 auto done
= quiche_conn_recv(conn
->get().d_conn
.get(), buffer
.data(), buffer
.size(), &recv_info
);
736 if (quiche_conn_is_established(conn
->get().d_conn
.get()) || quiche_conn_is_in_early_data(conn
->get().d_conn
.get())) {
737 auto readable
= std::unique_ptr
<quiche_stream_iter
, decltype(&quiche_stream_iter_free
)>(quiche_conn_readable(conn
->get().d_conn
.get()), quiche_stream_iter_free
);
739 uint64_t streamID
= 0;
740 while (quiche_stream_iter_next(readable
.get(), &streamID
)) {
741 handleReadableStream(frontend
, clientState
, *conn
, streamID
, client
, serverConnID
);
744 flushEgress(sock
, conn
->get().d_conn
, client
, buffer
);
747 DEBUGLOG("Connection not established");
752 // this is the entrypoint from dnsdist.cc
753 void doqThread(ClientState
* clientState
)
756 std::shared_ptr
<DOQFrontend
>& frontend
= clientState
->doqFrontend
;
758 frontend
->d_server_config
->clientState
= clientState
;
759 frontend
->d_server_config
->df
= clientState
->doqFrontend
;
761 setThreadName("dnsdist/doq");
763 Socket
sock(clientState
->udpFD
);
764 sock
.setNonBlocking();
766 auto mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
768 auto responseReceiverFD
= frontend
->d_server_config
->d_responseReceiver
.getDescriptor();
769 mplexer
->addReadFD(sock
.getHandle(), [](int, FDMultiplexer::funcparam_t
&) {});
770 mplexer
->addReadFD(responseReceiverFD
, [](int, FDMultiplexer::funcparam_t
&) {});
771 std::vector
<int> readyFDs
;
772 PacketBuffer
buffer(4096);
775 mplexer
->getAvailableFDs(readyFDs
, 500);
778 if (std::find(readyFDs
.begin(), readyFDs
.end(), sock
.getHandle()) != readyFDs
.end()) {
779 handleSocketReadable(*frontend
, *clientState
, sock
, buffer
);
782 if (std::find(readyFDs
.begin(), readyFDs
.end(), responseReceiverFD
) != readyFDs
.end()) {
783 flushResponses(frontend
->d_server_config
->d_responseReceiver
);
786 for (auto conn
= frontend
->d_server_config
->d_connections
.begin(); conn
!= frontend
->d_server_config
->d_connections
.end();) {
787 quiche_conn_on_timeout(conn
->second
.d_conn
.get());
789 flushEgress(sock
, conn
->second
.d_conn
, conn
->second
.d_peer
, buffer
);
791 if (quiche_conn_is_closed(conn
->second
.d_conn
.get())) {
792 #ifdef DEBUGLOG_ENABLED
794 quiche_path_stats path_stats
;
796 quiche_conn_stats(conn
->second
.d_conn
.get(), &stats
);
797 quiche_conn_path_stats(conn
->second
.d_conn
.get(), 0, &path_stats
);
799 DEBUGLOG("Connection (DoQ) closed, recv=" << stats
.recv
<< " sent=" << stats
.sent
<< " lost=" << stats
.lost
<< " rtt=" << path_stats
.rtt
<< "ns cwnd=" << path_stats
.cwnd
);
801 conn
= frontend
->d_server_config
->d_connections
.erase(conn
);
804 flushStalledResponses(conn
->second
);
809 catch (const std::exception
& exp
) {
810 vinfolog("Caught exception in the main DoQ thread: %s", exp
.what());
813 vinfolog("Unknown exception in the main DoQ thread");
817 catch (const std::exception
& e
) {
818 DEBUGLOG("Caught fatal error in the main DoQ thread: " << e
.what());
822 #endif /* HAVE_DNS_OVER_QUIC */