2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include "dnsdist-tcp.hh"
26 #include "dnsdist-random.hh"
27 #include "dnsparser.hh"
32 #include "threadname.hh"
33 #include "dnsdist-ecs.hh"
34 #include "dnsdist-proxy-protocol.hh"
35 #include "sodcrypto.hh"
37 static std::string s_quicRetryTokenKey
= newKey(false);
39 #ifdef HAVE_DNS_OVER_QUIC
43 using QuicheConnection
= std::unique_ptr
<quiche_conn
, decltype(&quiche_conn_free
)>;
44 using QuicheConfig
= std::unique_ptr
<quiche_config
, decltype(&quiche_config_free
)>;
49 Connection(const ComboAddress
& peer
, std::unique_ptr
<quiche_conn
, decltype(&quiche_conn_free
)>&& conn
) :
50 d_peer(peer
), d_conn(std::move(conn
))
53 Connection(const Connection
&) = delete;
54 Connection(Connection
&&) = default;
55 Connection
& operator=(const Connection
&) = delete;
56 Connection
& operator=(Connection
&&) = default;
57 ~Connection() = default;
60 QuicheConnection d_conn
;
65 static void sendBackDOQUnit(DOQUnitUniquePtr
&& du
, const char* description
);
66 struct DOQServerConfig
68 DOQServerConfig(std::unique_ptr
<quiche_config
, decltype(&quiche_config_free
)>&& config_
, uint32_t internalPipeBufferSize
) :
69 config(std::move(config_
))
72 auto [sender
, receiver
] = pdns::channel::createObjectQueue
<DOQUnit
>(pdns::channel::SenderBlockingMode::SenderNonBlocking
, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking
, internalPipeBufferSize
);
73 d_responseSender
= std::move(sender
);
74 d_responseReceiver
= std::move(receiver
);
77 DOQServerConfig(const DOQServerConfig
&) = delete;
78 DOQServerConfig(DOQServerConfig
&&) = default;
79 DOQServerConfig
& operator=(const DOQServerConfig
&) = delete;
80 DOQServerConfig
& operator=(DOQServerConfig
&&) = default;
81 ~DOQServerConfig() = default;
85 ClientState
* cs
{nullptr};
86 std::shared_ptr
<DOQFrontend
> df
{nullptr};
87 pdns::channel::Sender
<DOQUnit
> d_responseSender
;
88 pdns::channel::Receiver
<DOQUnit
> d_responseReceiver
;
92 #define DEBUGLOG_ENABLED
93 #define DEBUGLOG(x) std::cerr << x << std::endl;
98 static constexpr size_t MAX_DATAGRAM_SIZE
= 1350;
99 static constexpr size_t LOCAL_CONN_ID_LEN
= 16;
101 static std::map
<PacketBuffer
, Connection
> s_connections
;
103 class DOQTCPCrossQuerySender final
: public TCPQuerySender
106 DOQTCPCrossQuerySender()
110 bool active() const override
115 void handleResponse(const struct timeval
& now
, TCPResponse
&& response
) override
117 if (!response
.d_idstate
.doqu
) {
121 auto du
= std::move(response
.d_idstate
.doqu
);
122 if (du
->dsc
== nullptr) {
126 du
->response
= std::move(response
.d_buffer
);
127 du
->ids
= std::move(response
.d_idstate
);
128 DNSResponse
dr(du
->ids
, du
->response
, du
->downstream
);
130 dnsheader cleartextDH
;
131 memcpy(&cleartextDH
, dr
.getHeader(), sizeof(cleartextDH
));
133 if (!response
.isAsync()) {
135 static thread_local LocalStateHolder
<vector
<DNSDistResponseRuleAction
>> localRespRuleActions
= g_respruleactions
.getLocal();
136 static thread_local LocalStateHolder
<vector
<DNSDistResponseRuleAction
>> localCacheInsertedRespRuleActions
= g_cacheInsertedRespRuleActions
.getLocal();
138 dr
.ids
.doqu
= std::move(du
);
140 if (!processResponse(dr
.ids
.doqu
->response
, *localRespRuleActions
, *localCacheInsertedRespRuleActions
, dr
, false)) {
143 sendBackDOQUnit(std::move(dr
.ids
.doqu
), "Response dropped by rules");
148 if (dr
.isAsynchronous()) {
152 du
= std::move(dr
.ids
.doqu
);
155 if (!du
->ids
.selfGenerated
) {
156 double udiff
= du
->ids
.queryRealTime
.udiff();
157 vinfolog("Got answer from %s, relayed to %s (quic), took %f us", du
->downstream
->d_config
.remote
.toStringWithPort(), du
->ids
.origRemote
.toStringWithPort(), udiff
);
159 auto backendProtocol
= du
->downstream
->getProtocol();
160 if (backendProtocol
== dnsdist::Protocol::DoUDP
&& du
->tcp
) {
161 backendProtocol
= dnsdist::Protocol::DoTCP
;
163 handleResponseSent(du
->ids
, udiff
, du
->ids
.origRemote
, du
->downstream
->d_config
.remote
, du
->response
.size(), cleartextDH
, backendProtocol
, true);
166 ++dnsdist::metrics::g_stats
.responses
;
168 ++du
->ids
.cs
->responses
;
171 sendBackDOQUnit(std::move(du
), "Cross-protocol response");
174 void handleXFRResponse(const struct timeval
& now
, TCPResponse
&& response
) override
176 return handleResponse(now
, std::move(response
));
179 void notifyIOError(const struct timeval
& now
, TCPResponse
&& response
) override
184 class DOQCrossProtocolQuery
: public CrossProtocolQuery
187 DOQCrossProtocolQuery(DOQUnitUniquePtr
&& du
, bool isResponse
)
190 /* happens when a response becomes async */
191 query
= InternalQuery(std::move(du
->response
), std::move(du
->ids
));
194 /* we need to duplicate the query here because we might need
195 the existing query later if we get a truncated answer */
196 query
= InternalQuery(PacketBuffer(du
->query
), std::move(du
->ids
));
199 /* it might have been moved when we moved du->ids */
201 query
.d_idstate
.doqu
= std::move(du
);
204 /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
205 clearing query.d_proxyProtocolPayloadAdded and du->proxyProtocolPayloadSize.
206 Leave it for now because we know that the onky case where the payload has been
207 added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
208 and we know the TCP/DoT code can handle it. */
209 query
.d_proxyProtocolPayloadAdded
= query
.d_idstate
.doqu
->proxyProtocolPayloadSize
> 0;
210 downstream
= query
.d_idstate
.doqu
->downstream
;
213 void handleInternalError()
215 sendBackDOQUnit(std::move(query
.d_idstate
.doqu
), "DOQ internal error");
218 std::shared_ptr
<TCPQuerySender
> getTCPQuerySender() override
220 query
.d_idstate
.doqu
->downstream
= downstream
;
224 DNSQuestion
getDQ() override
226 auto& ids
= query
.d_idstate
;
227 DNSQuestion
dq(ids
, query
.d_buffer
);
231 DNSResponse
getDR() override
233 auto& ids
= query
.d_idstate
;
234 DNSResponse
dr(ids
, query
.d_buffer
, downstream
);
238 DOQUnitUniquePtr
&& releaseDU()
240 return std::move(query
.d_idstate
.doqu
);
244 static std::shared_ptr
<DOQTCPCrossQuerySender
> s_sender
;
247 std::shared_ptr
<DOQTCPCrossQuerySender
> DOQCrossProtocolQuery::s_sender
= std::make_shared
<DOQTCPCrossQuerySender
>();
249 static void handleResponse(DOQFrontend
& df
, Connection
& conn
, const uint64_t streamID
, const PacketBuffer
& response
)
251 if (response
.size() == 0) {
252 quiche_conn_stream_shutdown(conn
.d_conn
.get(), streamID
, QUICHE_SHUTDOWN_WRITE
, 0x5);
255 uint16_t responseSize
= static_cast<uint16_t>(response
.size());
256 const uint8_t sizeBytes
[] = {static_cast<uint8_t>(responseSize
/ 256), static_cast<uint8_t>(responseSize
% 256)};
257 auto res
= quiche_conn_stream_send(conn
.d_conn
.get(), streamID
, sizeBytes
, sizeof(sizeBytes
), false);
258 if (res
== sizeof(sizeBytes
)) {
259 res
= quiche_conn_stream_send(conn
.d_conn
.get(), streamID
, response
.data(), response
.size(), true);
264 static void fillRandom(PacketBuffer
& buffer
, size_t size
)
266 buffer
.reserve(size
);
268 buffer
.insert(buffer
.end(), dnsdist::getRandomValue(std::numeric_limits
<uint8_t>::max()));
272 void DOQFrontend::setup()
274 auto config
= QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION
), quiche_config_free
);
275 for (const auto& pair
: d_tlsConfig
.d_certKeyPairs
) {
276 auto res
= quiche_config_load_cert_chain_from_pem_file(config
.get(), pair
.d_cert
.c_str());
278 throw std::runtime_error("Error loading the server certificate: " + std::to_string(res
));
281 res
= quiche_config_load_priv_key_from_pem_file(config
.get(), pair
.d_key
->c_str());
283 throw std::runtime_error("Error loading the server key: " + std::to_string(res
));
289 const std::array
<uint8_t, 4> alpn
{'\x03', 'd', 'o', 'q'};
290 auto res
= quiche_config_set_application_protos(config
.get(),
294 throw std::runtime_error("Error setting ALPN: " + std::to_string(res
));
298 quiche_config_set_max_idle_timeout(config
.get(), d_idleTimeout
* 1000);
299 quiche_config_set_max_recv_udp_payload_size(config
.get(), MAX_DATAGRAM_SIZE
);
300 quiche_config_set_max_send_udp_payload_size(config
.get(), MAX_DATAGRAM_SIZE
);
301 quiche_config_set_initial_max_data(config
.get(), 10000000);
302 quiche_config_set_initial_max_stream_data_bidi_local(config
.get(), 1000000);
303 quiche_config_set_initial_max_stream_data_bidi_remote(config
.get(), 1000000);
304 quiche_config_set_initial_max_streams_bidi(config
.get(), 100);
305 quiche_config_set_cc_algorithm(config
.get(), QUICHE_CC_RENO
);
308 PacketBuffer resetToken
;
309 fillRandom(resetToken
, 16);
310 quiche_config_set_stateless_reset_token(config
.get(), reinterpret_cast<const uint8_t*>(resetToken
.data()));
313 d_server_config
= std::make_shared
<DOQServerConfig
>(std::move(config
), d_internalPipeBufferSize
);
316 static std::optional
<PacketBuffer
> getCID()
320 fillRandom(buffer
, LOCAL_CONN_ID_LEN
);
325 static constexpr size_t MAX_TOKEN_LEN
= std::tuple_size
<decltype(SodiumNonce::value
)>{} /* nonce */ + sizeof(uint64_t) /* TTD */ + 16 /* IPv6 */ + QUICHE_MAX_CONN_ID_LEN
;
327 static PacketBuffer
mintToken(const PacketBuffer
& dcid
, const ComboAddress
& peer
)
333 const auto addrBytes
= peer
.toByteString();
334 // this token will be valid for 60s
335 const uint64_t ttd
= time(nullptr) + 60U;
336 PacketBuffer plainTextToken
;
337 plainTextToken
.reserve(sizeof(ttd
) + addrBytes
.size() + dcid
.size());
338 plainTextToken
.insert(plainTextToken
.end(), reinterpret_cast<const char*>(&ttd
), reinterpret_cast<const char*>(&ttd
) + sizeof(ttd
));
339 plainTextToken
.insert(plainTextToken
.end(), addrBytes
.begin(), addrBytes
.end());
340 plainTextToken
.insert(plainTextToken
.end(), dcid
.begin(), dcid
.end());
341 const auto encryptedToken
= sodEncryptSym(std::string_view(reinterpret_cast<const char*>(plainTextToken
.data()), plainTextToken
.size()), s_quicRetryTokenKey
, nonce
, false);
342 // a bit sad, let's see if we can do better later
343 auto encryptedTokenPacket
= PacketBuffer(encryptedToken
.begin(), encryptedToken
.end());
344 encryptedTokenPacket
.insert(encryptedTokenPacket
.begin(), nonce
.value
.begin(), nonce
.value
.end());
345 return encryptedTokenPacket
;
347 catch (const std::exception
& exp
) {
348 vinfolog("Error while minting DoQ token: %s", exp
.what());
353 // returns the original destination ID if the token is valid, nothing otherwise
354 static std::optional
<PacketBuffer
> validateToken(const PacketBuffer
& token
, const PacketBuffer
& dcid
, const ComboAddress
& peer
)
358 auto addrBytes
= peer
.toByteString();
359 const uint64_t now
= time(nullptr);
360 const auto minimumSize
= nonce
.value
.size() + sizeof(now
) + addrBytes
.size();
361 if (token
.size() <= minimumSize
) {
365 memcpy(nonce
.value
.data(), token
.data(), nonce
.value
.size());
367 auto cipher
= std::string_view(reinterpret_cast<const char*>(&token
.at(nonce
.value
.size())), token
.size() - nonce
.value
.size());
368 auto plainText
= sodDecryptSym(cipher
, s_quicRetryTokenKey
, nonce
, false);
370 if (plainText
.size() <= sizeof(now
) + addrBytes
.size()) {
375 memcpy(&ttd
, plainText
.data(), sizeof(ttd
));
380 if (std::memcmp(&plainText
.at(sizeof(ttd
)), &*addrBytes
.begin(), addrBytes
.size()) != 0) {
383 return PacketBuffer(plainText
.begin() + (sizeof(ttd
) + addrBytes
.size()), plainText
.end());
385 catch (const std::exception
& exp
) {
386 vinfolog("Error while validating DoQ token: %s", exp
.what());
391 static void handleStatelessRetry(Socket
& sock
, const PacketBuffer
& clientConnID
, const PacketBuffer
& serverConnID
, const ComboAddress
& peer
, uint32_t version
)
393 auto newServerConnID
= getCID();
394 if (!newServerConnID
) {
398 auto token
= mintToken(serverConnID
, peer
);
400 PacketBuffer
out(MAX_DATAGRAM_SIZE
);
401 auto written
= quiche_retry(clientConnID
.data(), clientConnID
.size(),
402 serverConnID
.data(), serverConnID
.size(),
403 newServerConnID
->data(), newServerConnID
->size(),
404 token
.data(), token
.size(),
406 out
.data(), out
.size());
409 DEBUGLOG("failed to create retry packet " << written
);
414 sock
.sendTo(std::string(out
.begin(), out
.end()), peer
);
417 static void handleVersionNegociation(Socket
& sock
, const PacketBuffer
& clientConnID
, const PacketBuffer
& serverConnID
, const ComboAddress
& peer
)
419 PacketBuffer
out(MAX_DATAGRAM_SIZE
);
421 auto written
= quiche_negotiate_version(clientConnID
.data(), clientConnID
.size(),
422 serverConnID
.data(), serverConnID
.size(),
423 out
.data(), out
.size());
426 DEBUGLOG("failed to create vneg packet " << written
);
429 sock
.sendTo(reinterpret_cast<const char*>(out
.data()), written
, peer
);
432 static std::optional
<std::reference_wrapper
<Connection
>> getConnection(const PacketBuffer
& id
)
434 auto it
= s_connections
.find(id
);
435 if (it
== s_connections
.end()) {
441 static void sendBackDOQUnit(DOQUnitUniquePtr
&& du
, const char* description
)
443 if (du
->dsc
== nullptr) {
447 if (!du
->dsc
->d_responseSender
.send(std::move(du
))) {
448 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description
);
451 catch (const std::exception
& e
) {
452 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description
, e
.what());
456 static std::optional
<std::reference_wrapper
<Connection
>> createConnection(QuicheConfig
& config
, const PacketBuffer
& serverSideID
, const PacketBuffer
& originalDestinationID
, const PacketBuffer
& token
, const ComboAddress
& local
, const ComboAddress
& peer
)
458 auto quicheConn
= QuicheConnection(quiche_accept(serverSideID
.data(), serverSideID
.size(),
459 originalDestinationID
.data(), originalDestinationID
.size(),
460 (struct sockaddr
*)&local
,
462 (struct sockaddr
*)&peer
,
466 auto conn
= Connection(peer
, std::move(quicheConn
));
467 auto pair
= s_connections
.emplace(serverSideID
, std::move(conn
));
468 return pair
.first
->second
;
471 static void flushEgress(Socket
& sock
, Connection
& conn
)
473 std::array
<uint8_t, MAX_DATAGRAM_SIZE
> out
;
474 quiche_send_info send_info
;
477 auto written
= quiche_conn_send(conn
.d_conn
.get(), out
.data(), out
.size(), &send_info
);
479 if (written
== QUICHE_ERR_DONE
) {
486 // FIXME pacing (as send_info.at should tell us when to send the packet) ?
487 sock
.sendTo(reinterpret_cast<const char*>(out
.data()), written
, conn
.d_peer
);
491 std::unique_ptr
<CrossProtocolQuery
> getDOQCrossProtocolQueryFromDQ(DNSQuestion
& dq
, bool isResponse
)
494 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
497 auto du
= std::move(dq
.ids
.doqu
);
498 if (&dq
.ids
!= &du
->ids
) {
499 du
->ids
= std::move(dq
.ids
);
502 du
->ids
.origID
= dq
.getHeader()->id
;
505 if (du
->query
.data() != dq
.getMutableData().data()) {
506 du
->query
= std::move(dq
.getMutableData());
510 if (du
->response
.data() != dq
.getMutableData().data()) {
511 du
->response
= std::move(dq
.getMutableData());
515 return std::make_unique
<DOQCrossProtocolQuery
>(std::move(du
), isResponse
);
519 We are not in the main DoQ thread but in the DoQ 'client' thread.
521 static void processDOQQuery(DOQUnitUniquePtr
&& unit
)
523 const auto handleImmediateResponse
= [](DOQUnitUniquePtr
&& du
, const char* reason
) {
524 DEBUGLOG("handleImmediateResponse() reason=" << reason
);
525 auto conn
= getConnection(du
->serverConnID
);
526 handleResponse(*du
->dsc
->df
, *conn
, du
->streamID
, du
->response
);
527 du
->ids
.doqu
.reset();
530 auto& ids
= unit
->ids
;
531 ids
.doqu
= std::move(unit
);
533 uint16_t queryId
= 0;
538 remote
= du
->ids
.origRemote
;
539 DOQServerConfig
* dsc
= du
->dsc
;
540 auto& holders
= dsc
->holders
;
541 ClientState
& cs
= *dsc
->cs
;
543 if (du
->query
.size() < sizeof(dnsheader
)) {
544 ++dnsdist::metrics::g_stats
.nonCompliantQueries
;
545 ++cs
.nonCompliantQueries
;
546 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(du
->query
.data());
547 dh
->rcode
= RCode::ServFail
;
549 du
->response
= std::move(du
->query
);
551 handleImmediateResponse(std::move(du
), "DoQ non-compliant query");
556 ++dnsdist::metrics::g_stats
.queries
;
557 du
->ids
.queryRealTime
.start();
560 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
561 struct dnsheader
* dh
= reinterpret_cast<struct dnsheader
*>(du
->query
.data());
563 if (!checkQueryHeaders(dh
, cs
)) {
564 dh
->rcode
= RCode::ServFail
;
566 du
->response
= std::move(du
->query
);
568 handleImmediateResponse(std::move(du
), "DoQ invalid headers");
572 if (dh
->qdcount
== 0) {
573 dh
->rcode
= RCode::NotImp
;
575 du
->response
= std::move(du
->query
);
577 handleImmediateResponse(std::move(du
), "DoQ empty query");
581 queryId
= ntohs(dh
->id
);
584 auto downstream
= du
->downstream
;
585 du
->ids
.qname
= DNSName(reinterpret_cast<const char*>(du
->query
.data()), du
->query
.size(), sizeof(dnsheader
), false, &du
->ids
.qtype
, &du
->ids
.qclass
);
586 DNSQuestion
dq(du
->ids
, du
->query
);
587 const uint16_t* flags
= getFlagsFromDNSHeader(dq
.getHeader());
588 ids
.origFlags
= *flags
;
591 auto result
= processQuery(dq
, holders
, downstream
);
592 if (result
== ProcessQueryResult::Drop
) {
593 handleImmediateResponse(std::move(du
), "DoQ dropped query");
596 else if (result
== ProcessQueryResult::Asynchronous
) {
599 else if (result
== ProcessQueryResult::SendAnswer
) {
600 if (du
->response
.empty()) {
601 du
->response
= std::move(du
->query
);
603 if (du
->response
.size() >= sizeof(dnsheader
)) {
604 auto dh
= reinterpret_cast<const struct dnsheader
*>(du
->response
.data());
606 handleResponseSent(du
->ids
.qname
, QType(du
->ids
.qtype
), 0., du
->ids
.origDest
, ComboAddress(), du
->response
.size(), *dh
, dnsdist::Protocol::DoQ
, dnsdist::Protocol::DoQ
, false);
608 handleImmediateResponse(std::move(du
), "DoQ self-answered response");
612 ++dnsdist::metrics::g_stats
.responses
;
613 if (du
->ids
.cs
!= nullptr) {
614 ++du
->ids
.cs
->responses
;
617 if (result
!= ProcessQueryResult::PassToBackend
) {
618 handleImmediateResponse(std::move(du
), "DoQ no backend available");
622 if (downstream
== nullptr) {
623 handleImmediateResponse(std::move(du
), "DoQ no backend available");
627 du
->downstream
= downstream
;
629 std::string proxyProtocolPayload
;
630 /* we need to do this _before_ creating the cross protocol query because
631 after that the buffer will have been moved */
632 if (downstream
->d_config
.useProxyProtocol
) {
633 proxyProtocolPayload
= getProxyProtocolPayload(dq
);
636 du
->ids
.origID
= htons(queryId
);
639 /* this moves du->ids, careful! */
640 auto cpq
= std::make_unique
<DOQCrossProtocolQuery
>(std::move(du
), false);
641 cpq
->query
.d_proxyProtocolPayload
= std::move(proxyProtocolPayload
);
643 if (downstream
->passCrossProtocolQuery(std::move(cpq
))) {
647 du
= cpq
->releaseDU();
648 handleImmediateResponse(std::move(du
), "DoQ internal error");
652 catch (const std::exception
& e
) {
653 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote
.toStringWithPort(), queryId
, e
.what());
654 handleImmediateResponse(std::move(du
), "DoQ internal error");
661 static void doq_dispatch_query(DOQServerConfig
& dsc
, PacketBuffer
&& query
, const ComboAddress
& local
, const ComboAddress
& remote
, const PacketBuffer
& serverConnID
, const uint64_t streamID
)
664 /* we only parse it there as a sanity check, we will parse it again later */
665 DNSPacketMangler
mangler(reinterpret_cast<char*>(query
.data()), query
.size());
666 mangler
.skipDomainName();
667 mangler
.skipBytes(4);
668 // Should we ensure message id is 0 ?
670 auto du
= std::make_unique
<DOQUnit
>(std::move(query
));
672 du
->ids
.origDest
= local
;
673 du
->ids
.origRemote
= remote
;
674 du
->ids
.protocol
= dnsdist::Protocol::DoQ
;
675 du
->serverConnID
= serverConnID
;
676 du
->streamID
= streamID
;
678 processDOQQuery(std::move(du
));
680 catch (const std::exception
& exp
) {
681 vinfolog("Had error parsing DoQ DNS packet from %s: %s", remote
.toStringWithPort(), exp
.what());
685 static void flushResponses(pdns::channel::Receiver
<DOQUnit
>& receiver
)
689 auto tmp
= receiver
.receive();
694 auto du
= std::move(*tmp
);
695 auto conn
= getConnection(du
->serverConnID
);
697 handleResponse(*du
->dsc
->df
, *conn
, du
->streamID
, du
->response
);
699 catch (const std::exception
& e
) {
700 errlog("Error while processing response received over DoQ: %s", e
.what());
703 errlog("Unspecified error while processing response received over DoQ");
708 // this is the entrypoint from dnsdist.cc
709 void doqThread(ClientState
* cs
)
712 std::shared_ptr
<DOQFrontend
>& frontend
= cs
->doqFrontend
;
714 frontend
->d_server_config
->cs
= cs
;
715 frontend
->d_server_config
->df
= cs
->doqFrontend
;
717 setThreadName("dnsdist/doq");
719 Socket
sock(cs
->udpFD
);
721 PacketBuffer
buffer(std::numeric_limits
<unsigned short>::max());
722 auto mplexer
= std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent());
724 auto responseReceiverFD
= frontend
->d_server_config
->d_responseReceiver
.getDescriptor();
725 mplexer
->addReadFD(sock
.getHandle(), [](int, FDMultiplexer::funcparam_t
&) {});
726 mplexer
->addReadFD(responseReceiverFD
, [](int, FDMultiplexer::funcparam_t
&) {});
728 std::vector
<int> readyFDs
;
729 mplexer
->getAvailableFDs(readyFDs
, 500);
731 if (std::find(readyFDs
.begin(), readyFDs
.end(), sock
.getHandle()) != readyFDs
.end()) {
732 std::string bufferStr
;
734 sock
.recvFrom(bufferStr
, client
);
738 std::array
<uint8_t, QUICHE_MAX_CONN_ID_LEN
> scid
;
739 size_t scid_len
= scid
.size();
740 std::array
<uint8_t, QUICHE_MAX_CONN_ID_LEN
> dcid
;
741 size_t dcid_len
= dcid
.size();
742 std::array
<uint8_t, MAX_TOKEN_LEN
> token
;
743 size_t token_len
= token
.size();
745 auto res
= quiche_header_info(reinterpret_cast<const uint8_t*>(bufferStr
.data()), bufferStr
.size(), LOCAL_CONN_ID_LEN
,
747 scid
.data(), &scid_len
,
748 dcid
.data(), &dcid_len
,
749 token
.data(), &token_len
);
754 // destination connection ID, will have to be sent as original destination connection ID
755 PacketBuffer
serverConnID(dcid
.begin(), dcid
.begin() + dcid_len
);
756 // source connection ID, will have to be sent as destination connection ID
757 PacketBuffer
clientConnID(scid
.begin(), scid
.begin() + scid_len
);
758 auto conn
= getConnection(serverConnID
);
761 DEBUGLOG("Connection not found");
762 if (!quiche_version_is_supported(version
)) {
763 DEBUGLOG("Unsupported version");
764 handleVersionNegociation(sock
, clientConnID
, serverConnID
, client
);
768 if (token_len
== 0) {
769 /* stateless retry */
770 DEBUGLOG("No token received");
771 handleStatelessRetry(sock
, clientConnID
, serverConnID
, client
, version
);
775 PacketBuffer
tokenBuf(token
.begin(), token
.begin() + token_len
);
776 auto originalDestinationID
= validateToken(tokenBuf
, serverConnID
, client
);
777 if (!originalDestinationID
) {
778 DEBUGLOG("Discarding invalid token");
782 DEBUGLOG("Creating a new connection");
783 conn
= createConnection(frontend
->d_server_config
->config
, serverConnID
, *originalDestinationID
, tokenBuf
, cs
->local
, client
);
788 quiche_recv_info recv_info
= {
789 (struct sockaddr
*)&client
,
792 (struct sockaddr
*)&cs
->local
,
793 cs
->local
.getSocklen(),
796 auto done
= quiche_conn_recv(conn
->get().d_conn
.get(), reinterpret_cast<uint8_t*>(bufferStr
.data()), bufferStr
.size(), &recv_info
);
801 if (quiche_conn_is_established(conn
->get().d_conn
.get())) {
802 auto readable
= std::unique_ptr
<quiche_stream_iter
, decltype(&quiche_stream_iter_free
)>(quiche_conn_readable(conn
->get().d_conn
.get()), quiche_stream_iter_free
);
804 uint64_t streamID
= 0;
805 while (quiche_stream_iter_next(readable
.get(), &streamID
)) {
807 buffer
.resize(std::numeric_limits
<unsigned short>::max());
808 auto received
= quiche_conn_stream_recv(conn
->get().d_conn
.get(), streamID
,
809 buffer
.data(), buffer
.size(),
814 buffer
.resize(received
);
817 // we skip message length, should we verify ?
818 buffer
.erase(buffer
.begin(), buffer
.begin() + 2);
819 if (buffer
.size() >= sizeof(dnsheader
)) {
820 doq_dispatch_query(*(frontend
->d_server_config
), std::move(buffer
), cs
->local
, client
, serverConnID
, streamID
);
826 DEBUGLOG("Connection not established");
831 if (std::find(readyFDs
.begin(), readyFDs
.end(), responseReceiverFD
) != readyFDs
.end()) {
832 flushResponses(frontend
->d_server_config
->d_responseReceiver
);
835 for (auto conn
= s_connections
.begin(); conn
!= s_connections
.end();) {
836 quiche_conn_on_timeout(conn
->second
.d_conn
.get());
838 flushEgress(sock
, conn
->second
);
840 if (quiche_conn_is_closed(conn
->second
.d_conn
.get())) {
841 #ifdef DEBUGLOG_ENABLED
843 quiche_path_stats path_stats
;
845 quiche_conn_stats(conn
->second
.d_conn
.get(), &stats
);
846 quiche_conn_path_stats(conn
->second
.d_conn
.get(), 0, &path_stats
);
848 DEBUGLOG("Connection closed, recv=" << stats
.recv
<< " sent=" << stats
.sent
<< " lost=" << stats
.lost
<< " rtt=" << path_stats
.rtt
<< "ns cwnd=" << path_stats
.cwnd
);
850 conn
= s_connections
.erase(conn
);
858 catch (const std::exception
& e
) {
859 DEBUGLOG("Caught fatal error: " << e
.what());