]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdistdist/doq.cc
dnsdist: Use the correct source IP for outgoing QUIC datagrams
[thirdparty/pdns.git] / pdns / dnsdistdist / doq.cc
CommitLineData
57a94421
CHB
/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
22
23#include "doq.hh"
24
acde6658
RG
25#ifdef HAVE_DNS_OVER_QUIC
26#include <quiche.h>
27
57a94421
CHB
28#include "dolog.hh"
29#include "iputils.hh"
30#include "misc.hh"
31#include "sstuff.hh"
57a94421 32#include "threadname.hh"
acde6658 33
d5d9573c 34#include "dnsdist-dnsparser.hh"
559942b3 35#include "dnsdist-ecs.hh"
57a94421 36#include "dnsdist-proxy-protocol.hh"
acde6658
RG
37#include "dnsdist-tcp.hh"
38#include "dnsdist-random.hh"
fc7c7538 39
ed16c81f 40#include "doq-common.hh"
57a94421 41
ed16c81f 42using namespace dnsdist::doq;
805b22fe 43
d0439b42
CHB
44#if 0
45#define DEBUGLOG_ENABLED
46#define DEBUGLOG(x) std::cerr << x << std::endl;
47#else
48#define DEBUGLOG(x)
49#endif
50
805b22fe
RG
51class Connection
52{
53public:
699689aa
RG
54 Connection(const ComboAddress& peer, const ComboAddress& localAddr, QuicheConfig config, QuicheConnection conn) :
55 d_peer(peer), d_localAddr(localAddr), d_conn(std::move(conn)), d_config(std::move(config))
805b22fe
RG
56 {
57 }
58 Connection(const Connection&) = delete;
59 Connection(Connection&&) = default;
60 Connection& operator=(const Connection&) = delete;
61 Connection& operator=(Connection&&) = default;
62 ~Connection() = default;
63
64 ComboAddress d_peer;
699689aa 65 ComboAddress d_localAddr;
805b22fe 66 QuicheConnection d_conn;
3791b473
RG
67 QuicheConfig d_config;
68
6c66428a 69 std::unordered_map<uint64_t, PacketBuffer> d_streamBuffers;
c6886da0 70 std::unordered_map<uint64_t, PacketBuffer> d_streamOutBuffers;
805b22fe
RG
71};
72
3201998f
RG
73static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description);
74
a1a819fd 75struct DOQServerConfig
57a94421 76{
acde6658 77 DOQServerConfig(QuicheConfig&& config_, uint32_t internalPipeBufferSize) :
57a94421
CHB
78 config(std::move(config_))
79 {
f7d508b5
CHB
80 {
81 auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
82 d_responseSender = std::move(sender);
83 d_responseReceiver = std::move(receiver);
84 }
57a94421
CHB
85 }
86 DOQServerConfig(const DOQServerConfig&) = delete;
87 DOQServerConfig(DOQServerConfig&&) = default;
88 DOQServerConfig& operator=(const DOQServerConfig&) = delete;
89 DOQServerConfig& operator=(DOQServerConfig&&) = default;
90 ~DOQServerConfig() = default;
91
f016a7d5
RG
92 using ConnectionsMap = std::map<PacketBuffer, Connection>;
93
57a94421 94 LocalHolders holders;
f016a7d5 95 ConnectionsMap d_connections;
57a94421 96 QuicheConfig config;
ec9548d7 97 ClientState* clientState{nullptr};
57a94421 98 std::shared_ptr<DOQFrontend> df{nullptr};
f7d508b5
CHB
99 pdns::channel::Sender<DOQUnit> d_responseSender;
100 pdns::channel::Receiver<DOQUnit> d_responseReceiver;
57a94421
CHB
101};
102
acde6658
RG
103/* these might seem useless, but they are needed because
104 they need to be declared _after_ the definition of DOQServerConfig
105 so that we can use a unique_ptr in DOQFrontend */
3201998f
RG
106DOQFrontend::DOQFrontend() = default;
107DOQFrontend::~DOQFrontend() = default;
acde6658 108
57a94421
CHB
109class DOQTCPCrossQuerySender final : public TCPQuerySender
110{
111public:
3201998f 112 DOQTCPCrossQuerySender() = default;
57a94421 113
3201998f 114 [[nodiscard]] bool active() const override
57a94421
CHB
115 {
116 return true;
117 }
118
3201998f 119 void handleResponse([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
57a94421
CHB
120 {
121 if (!response.d_idstate.doqu) {
122 return;
123 }
124
3201998f
RG
125 auto unit = std::move(response.d_idstate.doqu);
126 if (unit->dsc == nullptr) {
f7d508b5
CHB
127 return;
128 }
57a94421 129
3201998f
RG
130 unit->response = std::move(response.d_buffer);
131 unit->ids = std::move(response.d_idstate);
132 DNSResponse dnsResponse(unit->ids, unit->response, unit->downstream);
57a94421 133
c0e87fe4 134 dnsheader cleartextDH{};
90686725 135 memcpy(&cleartextDH, dnsResponse.getHeader().get(), sizeof(cleartextDH));
57a94421
CHB
136
137 if (!response.isAsync()) {
138
3fc92d67
RG
139 static thread_local LocalStateHolder<vector<dnsdist::rules::ResponseRuleAction>> localRespRuleActions = dnsdist::rules::getResponseRuleChainHolder(dnsdist::rules::ResponseRuleChain::ResponseRules).getLocal();
140 static thread_local LocalStateHolder<vector<dnsdist::rules::ResponseRuleAction>> localCacheInsertedRespRuleActions = dnsdist::rules::getResponseRuleChainHolder(dnsdist::rules::ResponseRuleChain::CacheInsertedResponseRules).getLocal();
57a94421 141
3201998f 142 dnsResponse.ids.doqu = std::move(unit);
57a94421 143
3201998f
RG
144 if (!processResponse(dnsResponse.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dnsResponse, false)) {
145 if (dnsResponse.ids.doqu) {
57a94421 146
3201998f 147 sendBackDOQUnit(std::move(dnsResponse.ids.doqu), "Response dropped by rules");
57a94421
CHB
148 }
149 return;
150 }
151
3201998f 152 if (dnsResponse.isAsynchronous()) {
57a94421
CHB
153 return;
154 }
155
3201998f 156 unit = std::move(dnsResponse.ids.doqu);
57a94421
CHB
157 }
158
3201998f
RG
159 if (!unit->ids.selfGenerated) {
160 double udiff = unit->ids.queryRealTime.udiff();
a6a6615c 161 vinfolog("Got answer from %s, relayed to %s (quic, %d bytes), took %f us", unit->downstream->d_config.remote.toStringWithPort(), unit->ids.origRemote.toStringWithPort(), unit->response.size(), udiff);
57a94421 162
3201998f
RG
163 auto backendProtocol = unit->downstream->getProtocol();
164 if (backendProtocol == dnsdist::Protocol::DoUDP && unit->tcp) {
57a94421
CHB
165 backendProtocol = dnsdist::Protocol::DoTCP;
166 }
3201998f 167 handleResponseSent(unit->ids, udiff, unit->ids.origRemote, unit->downstream->d_config.remote, unit->response.size(), cleartextDH, backendProtocol, true);
57a94421
CHB
168 }
169
170 ++dnsdist::metrics::g_stats.responses;
3201998f
RG
171 if (unit->ids.cs != nullptr) {
172 ++unit->ids.cs->responses;
57a94421
CHB
173 }
174
3201998f 175 sendBackDOQUnit(std::move(unit), "Cross-protocol response");
57a94421
CHB
176 }
177
178 void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
179 {
180 return handleResponse(now, std::move(response));
181 }
182
ef4350f8 183 void notifyIOError([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
57a94421 184 {
ef4350f8
RG
185 if (!response.d_idstate.doqu) {
186 return;
187 }
188
189 auto unit = std::move(response.d_idstate.doqu);
190 if (unit->dsc == nullptr) {
191 return;
192 }
193
194 /* this will signal an error */
195 unit->response.clear();
196 unit->ids = std::move(response.d_idstate);
197 sendBackDOQUnit(std::move(unit), "Cross-protocol error");
3dc49a89 198 }
57a94421
CHB
199};
200
201class DOQCrossProtocolQuery : public CrossProtocolQuery
202{
203public:
3201998f 204 DOQCrossProtocolQuery(DOQUnitUniquePtr&& unit, bool isResponse)
57a94421
CHB
205 {
206 if (isResponse) {
207 /* happens when a response becomes async */
3201998f 208 query = InternalQuery(std::move(unit->response), std::move(unit->ids));
57a94421
CHB
209 }
210 else {
211 /* we need to duplicate the query here because we might need
212 the existing query later if we get a truncated answer */
3201998f 213 query = InternalQuery(PacketBuffer(unit->query), std::move(unit->ids));
57a94421
CHB
214 }
215
3201998f
RG
216 /* it might have been moved when we moved unit->ids */
217 if (unit) {
218 query.d_idstate.doqu = std::move(unit);
57a94421
CHB
219 }
220
221 /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
3201998f 222 clearing query.d_proxyProtocolPayloadAdded and unit->proxyProtocolPayloadSize.
57a94421
CHB
223 Leave it for now because we know that the onky case where the payload has been
224 added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
225 and we know the TCP/DoT code can handle it. */
226 query.d_proxyProtocolPayloadAdded = query.d_idstate.doqu->proxyProtocolPayloadSize > 0;
227 downstream = query.d_idstate.doqu->downstream;
228 }
229
230 void handleInternalError()
231 {
3dc49a89 232 sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
57a94421
CHB
233 }
234
235 std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
236 {
237 query.d_idstate.doqu->downstream = downstream;
238 return s_sender;
239 }
240
241 DNSQuestion getDQ() override
242 {
243 auto& ids = query.d_idstate;
3201998f
RG
244 DNSQuestion dnsQuestion(ids, query.d_buffer);
245 return dnsQuestion;
57a94421
CHB
246 }
247
248 DNSResponse getDR() override
249 {
250 auto& ids = query.d_idstate;
3201998f
RG
251 DNSResponse dnsResponse(ids, query.d_buffer, downstream);
252 return dnsResponse;
182af582 253 }
57a94421
CHB
254
255 DOQUnitUniquePtr&& releaseDU()
256 {
257 return std::move(query.d_idstate.doqu);
258 }
259
260private:
261 static std::shared_ptr<DOQTCPCrossQuerySender> s_sender;
262};
263
264std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::make_shared<DOQTCPCrossQuerySender>();
265
c6886da0
RG
266static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBuffer& response)
267{
268 size_t pos = 0;
269 while (pos < response.size()) {
270 auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true);
271 if (res == QUICHE_ERR_DONE) {
272 response.erase(response.begin(), response.begin() + static_cast<ssize_t>(pos));
273 return false;
274 }
275 if (res < 0) {
276 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_INTERNAL_ERROR));
277 return true;
278 }
279 pos += res;
280 }
281
282 return true;
283}
284
285static void handleResponse(DOQFrontend& frontend, Connection& conn, const uint64_t streamID, PacketBuffer& response)
57a94421 286{
3201998f
RG
287 if (response.empty()) {
288 ++frontend.d_errorResponses;
6c66428a
RG
289 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR));
290 return;
182af582 291 }
3201998f
RG
292 ++frontend.d_validResponses;
293 auto responseSize = static_cast<uint16_t>(response.size());
6c66428a 294 const std::array<uint8_t, 2> sizeBytes = {static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
c6886da0
RG
295 response.insert(response.begin(), sizeBytes.begin(), sizeBytes.end());
296 if (!tryWriteResponse(conn, streamID, response)) {
297 conn.d_streamOutBuffers[streamID] = std::move(response);
3201998f 298 }
57a94421
CHB
299}
300
301void DOQFrontend::setup()
302{
303 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
ed16c81f 304 d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
2e7989f5 305 configureQuiche(config, d_quicheParams, false);
acde6658 306 d_server_config = std::make_unique<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
57a94421
CHB
307}
308
3791b473
RG
309void DOQFrontend::reloadCertificates()
310{
311 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
312 d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
313 configureQuiche(config, d_quicheParams, false);
314 std::atomic_store_explicit(&d_server_config->config, std::move(config), std::memory_order_release);
315}
316
f016a7d5 317static std::optional<std::reference_wrapper<Connection>> getConnection(DOQServerConfig::ConnectionsMap& connMap, const PacketBuffer& connID)
57a94421 318{
f016a7d5
RG
319 auto iter = connMap.find(connID);
320 if (iter == connMap.end()) {
57a94421
CHB
321 return std::nullopt;
322 }
f016a7d5 323 return iter->second;
57a94421
CHB
324}
325
3201998f 326static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description)
3dc49a89 327{
3201998f 328 if (unit->dsc == nullptr) {
f7d508b5
CHB
329 return;
330 }
331 try {
3201998f 332 if (!unit->dsc->d_responseSender.send(std::move(unit))) {
f089d71c 333 ++dnsdist::metrics::g_stats.doqResponsePipeFull;
f7d508b5
CHB
334 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
335 }
357b3faa
CHB
336 }
337 catch (const std::exception& e) {
f7d508b5
CHB
338 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
339 }
3dc49a89
CHB
340}
341
699689aa 342static std::optional<std::reference_wrapper<Connection>> createConnection(DOQServerConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const ComboAddress& peer, const ComboAddress& localAddr)
57a94421 343{
0011a825 344 auto quicheConfig = std::atomic_load_explicit(&config.config, std::memory_order_acquire);
57a94421
CHB
345 auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
346 originalDestinationID.data(), originalDestinationID.size(),
3201998f 347 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
699689aa
RG
348 reinterpret_cast<const struct sockaddr*>(&localAddr),
349 localAddr.getSocklen(),
3201998f
RG
350 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
351 reinterpret_cast<const struct sockaddr*>(&peer),
57a94421 352 peer.getSocklen(),
0011a825 353 quicheConfig.get()),
182af582 354 quiche_conn_free);
1cba32b0 355
ed16c81f
CHB
356 if (config.df && !config.df->d_quicheParams.d_keyLogFile.empty()) {
357 quiche_conn_set_keylog_path(quicheConn.get(), config.df->d_quicheParams.d_keyLogFile.c_str());
1cba32b0
RG
358 }
359
699689aa 360 auto conn = Connection(peer, localAddr, std::move(quicheConfig), std::move(quicheConn));
f016a7d5 361 auto pair = config.d_connections.emplace(serverSideID, std::move(conn));
57a94421
CHB
362 return pair.first->second;
363}
364
3201998f 365std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion, bool isResponse)
57a94421 366{
3201998f 367 if (!dnsQuestion.ids.doqu) {
57a94421
CHB
368 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
369 }
370
3201998f
RG
371 auto unit = std::move(dnsQuestion.ids.doqu);
372 if (&dnsQuestion.ids != &unit->ids) {
373 unit->ids = std::move(dnsQuestion.ids);
57a94421
CHB
374 }
375
3201998f 376 unit->ids.origID = dnsQuestion.getHeader()->id;
57a94421
CHB
377
378 if (!isResponse) {
3201998f
RG
379 if (unit->query.data() != dnsQuestion.getMutableData().data()) {
380 unit->query = std::move(dnsQuestion.getMutableData());
57a94421
CHB
381 }
382 }
383 else {
3201998f
RG
384 if (unit->response.data() != dnsQuestion.getMutableData().data()) {
385 unit->response = std::move(dnsQuestion.getMutableData());
57a94421
CHB
386 }
387 }
388
3201998f 389 return std::make_unique<DOQCrossProtocolQuery>(std::move(unit), isResponse);
57a94421
CHB
390}
391
3201998f 392static void processDOQQuery(DOQUnitUniquePtr&& doqUnit)
57a94421 393{
3201998f 394 const auto handleImmediateResponse = [](DOQUnitUniquePtr&& unit, [[maybe_unused]] const char* reason) {
57a94421 395 DEBUGLOG("handleImmediateResponse() reason=" << reason);
3201998f
RG
396 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
397 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
398 unit->ids.doqu.reset();
57a94421
CHB
399 };
400
3201998f
RG
401 auto& ids = doqUnit->ids;
402 ids.doqu = std::move(doqUnit);
403 auto& unit = ids.doqu;
57a94421
CHB
404 uint16_t queryId = 0;
405 ComboAddress remote;
406
407 try {
57a94421 408
3201998f
RG
409 remote = unit->ids.origRemote;
410 DOQServerConfig* dsc = unit->dsc;
57a94421 411 auto& holders = dsc->holders;
ec9548d7 412 ClientState& clientState = *dsc->clientState;
57a94421 413
2aaf9ecd
CHB
414 if (!holders.acl->match(remote)) {
415 vinfolog("Query from %s (DoQ) dropped because of ACL", remote.toStringWithPort());
416 ++dnsdist::metrics::g_stats.aclDrops;
417 unit->response.clear();
418
419 handleImmediateResponse(std::move(unit), "DoQ query dropped because of ACL");
420 return;
421 }
422
3201998f 423 if (unit->query.size() < sizeof(dnsheader)) {
182af582 424 ++dnsdist::metrics::g_stats.nonCompliantQueries;
ec9548d7 425 ++clientState.nonCompliantQueries;
d5d9573c 426 unit->response.clear();
3dc49a89 427
3201998f 428 handleImmediateResponse(std::move(unit), "DoQ non-compliant query");
57a94421
CHB
429 return;
430 }
431
ec9548d7 432 ++clientState.queries;
182af582 433 ++dnsdist::metrics::g_stats.queries;
3201998f 434 unit->ids.queryRealTime.start();
57a94421
CHB
435
436 {
437 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
d5d9573c
RG
438 dnsheader_aligned dnsHeader(unit->query.data());
439
94e3db7e 440 if (!checkQueryHeaders(*dnsHeader, clientState)) {
d5d9573c
RG
441 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
442 header.rcode = RCode::ServFail;
443 header.qr = true;
444 return true;
445 });
3201998f 446 unit->response = std::move(unit->query);
3dc49a89 447
3201998f 448 handleImmediateResponse(std::move(unit), "DoQ invalid headers");
57a94421
CHB
449 return;
450 }
451
3201998f 452 if (dnsHeader->qdcount == 0) {
d5d9573c
RG
453 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
454 header.rcode = RCode::NotImp;
455 header.qr = true;
456 return true;
457 });
3201998f 458 unit->response = std::move(unit->query);
57a94421 459
3201998f 460 handleImmediateResponse(std::move(unit), "DoQ empty query");
57a94421
CHB
461 return;
462 }
463
3201998f 464 queryId = ntohs(dnsHeader->id);
57a94421
CHB
465 }
466
3201998f
RG
467 auto downstream = unit->downstream;
468 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
469 unit->ids.qname = DNSName(reinterpret_cast<const char*>(unit->query.data()), static_cast<int>(unit->query.size()), sizeof(dnsheader), false, &unit->ids.qtype, &unit->ids.qclass);
470 DNSQuestion dnsQuestion(unit->ids, unit->query);
d5d9573c
RG
471 dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
472 const uint16_t* flags = getFlagsFromDNSHeader(&header);
473 ids.origFlags = *flags;
474 return true;
475 });
3201998f 476 unit->ids.cs = &clientState;
57a94421 477
3201998f 478 auto result = processQuery(dnsQuestion, holders, downstream);
57a94421 479 if (result == ProcessQueryResult::Drop) {
3201998f 480 handleImmediateResponse(std::move(unit), "DoQ dropped query");
57a94421
CHB
481 return;
482 }
3201998f 483 if (result == ProcessQueryResult::Asynchronous) {
57a94421
CHB
484 return;
485 }
3201998f
RG
486 if (result == ProcessQueryResult::SendAnswer) {
487 if (unit->response.empty()) {
488 unit->response = std::move(unit->query);
57a94421 489 }
3201998f 490 if (unit->response.size() >= sizeof(dnsheader)) {
d5d9573c 491 const dnsheader_aligned dnsHeader(unit->response.data());
57a94421 492
3201998f 493 handleResponseSent(unit->ids.qname, QType(unit->ids.qtype), 0., unit->ids.origDest, ComboAddress(), unit->response.size(), *dnsHeader, dnsdist::Protocol::DoQ, dnsdist::Protocol::DoQ, false);
57a94421 494 }
3201998f 495 handleImmediateResponse(std::move(unit), "DoQ self-answered response");
57a94421
CHB
496 return;
497 }
498
182af582 499 ++dnsdist::metrics::g_stats.responses;
3201998f
RG
500 if (unit->ids.cs != nullptr) {
501 ++unit->ids.cs->responses;
182af582
CHB
502 }
503
57a94421 504 if (result != ProcessQueryResult::PassToBackend) {
3201998f 505 handleImmediateResponse(std::move(unit), "DoQ no backend available");
57a94421
CHB
506 return;
507 }
508
509 if (downstream == nullptr) {
3201998f 510 handleImmediateResponse(std::move(unit), "DoQ no backend available");
57a94421
CHB
511 return;
512 }
513
3201998f 514 unit->downstream = downstream;
57a94421
CHB
515
516 std::string proxyProtocolPayload;
517 /* we need to do this _before_ creating the cross protocol query because
518 after that the buffer will have been moved */
519 if (downstream->d_config.useProxyProtocol) {
3201998f 520 proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
57a94421
CHB
521 }
522
3201998f
RG
523 unit->ids.origID = htons(queryId);
524 unit->tcp = true;
57a94421 525
3201998f
RG
526 /* this moves unit->ids, careful! */
527 auto cpq = std::make_unique<DOQCrossProtocolQuery>(std::move(unit), false);
57a94421
CHB
528 cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
529
530 if (downstream->passCrossProtocolQuery(std::move(cpq))) {
531 return;
532 }
3201998f
RG
533 // NOLINTNEXTLINE(bugprone-use-after-move): it was only moved if the call succeeded
534 unit = cpq->releaseDU();
535 handleImmediateResponse(std::move(unit), "DoQ internal error");
536 return;
57a94421
CHB
537 }
538 catch (const std::exception& e) {
539 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
3201998f 540 handleImmediateResponse(std::move(unit), "DoQ internal error");
57a94421
CHB
541 return;
542 }
57a94421
CHB
543}
544
57a94421
CHB
545static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
546{
547 try {
3201998f
RG
548 auto unit = std::make_unique<DOQUnit>(std::move(query));
549 unit->dsc = &dsc;
550 unit->ids.origDest = local;
551 unit->ids.origRemote = remote;
552 unit->ids.protocol = dnsdist::Protocol::DoQ;
553 unit->serverConnID = serverConnID;
554 unit->streamID = streamID;
57a94421 555
3201998f 556 processDOQQuery(std::move(unit));
57a94421 557 }
fc7c7538 558 catch (const std::exception& exp) {
559942b3 559 vinfolog("Had error handling DoQ DNS packet from %s: %s", remote.toStringWithPort(), exp.what());
57a94421
CHB
560 }
561}
562
f7d508b5
CHB
563static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
564{
357b3faa 565 for (;;) {
f7d508b5
CHB
566 try {
567 auto tmp = receiver.receive();
568 if (!tmp) {
357b3faa 569 return;
f7d508b5
CHB
570 }
571
f016a7d5
RG
572 auto unit = std::move(*tmp);
573 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
574 if (conn) {
575 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
576 }
f7d508b5
CHB
577 }
578 catch (const std::exception& e) {
579 errlog("Error while processing response received over DoQ: %s", e.what());
580 }
581 catch (...) {
582 errlog("Unspecified error while processing response received over DoQ");
583 }
584 }
585}
586
c6886da0
RG
587static void flushStalledResponses(Connection& conn)
588{
589 for (auto streamIt = conn.d_streamOutBuffers.begin(); streamIt != conn.d_streamOutBuffers.end();) {
590 const auto& streamID = streamIt->first;
591 auto& response = streamIt->second;
592 if (quiche_conn_stream_writable(conn.d_conn.get(), streamID, response.size()) == 1) {
593 if (tryWriteResponse(conn, streamID, response)) {
594 streamIt = conn.d_streamOutBuffers.erase(streamIt);
595 continue;
596 }
597 }
598 ++streamIt;
599 }
600}
601
b4c463e2
RG
602static void handleReadableStream(DOQFrontend& frontend, ClientState& clientState, Connection& conn, uint64_t streamID, const ComboAddress& client, const PacketBuffer& serverConnID)
603{
604 auto& streamBuffer = conn.d_streamBuffers[streamID];
0fe22933
RG
605 while (true) {
606 bool fin = false;
607 auto existingLength = streamBuffer.size();
608 streamBuffer.resize(existingLength + 512);
609 auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID,
610 &streamBuffer.at(existingLength), 512,
611 &fin);
612 if (received == 0 || received == QUICHE_ERR_DONE) {
613 streamBuffer.resize(existingLength);
b4c463e2
RG
614 return;
615 }
a6cfab84 616 if (received < 0) {
b4c463e2
RG
617 ++dnsdist::metrics::g_stats.nonCompliantQueries;
618 ++clientState.nonCompliantQueries;
619 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
620 return;
621 }
0fe22933
RG
622
623 streamBuffer.resize(existingLength + received);
624 if (fin) {
625 break;
626 }
627 }
628
629 if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) {
630 ++dnsdist::metrics::g_stats.nonCompliantQueries;
631 ++clientState.nonCompliantQueries;
632 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
633 return;
634 }
635
636 uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1);
637 streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2);
638 if (payloadLength != streamBuffer.size()) {
639 ++dnsdist::metrics::g_stats.nonCompliantQueries;
640 ++clientState.nonCompliantQueries;
641 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
642 return;
b4c463e2 643 }
0fe22933 644 DEBUGLOG("Dispatching query");
699689aa 645 doq_dispatch_query(*(frontend.d_server_config), std::move(streamBuffer), conn.d_localAddr, client, serverConnID, streamID);
0fe22933 646 conn.d_streamBuffers.erase(streamID);
b4c463e2
RG
647}
648
2e1c76aa 649static void handleSocketReadable(DOQFrontend& frontend, ClientState& clientState, Socket& sock, PacketBuffer& buffer)
cc62f9c3 650{
7769b7b7
RG
651 // destination connection ID, will have to be sent as original destination connection ID
652 PacketBuffer serverConnID;
653 // source connection ID, will have to be sent as destination connection ID
654 PacketBuffer clientConnID;
655 PacketBuffer tokenBuf;
f684e004 656 while (true) {
f684e004 657 ComboAddress client;
699689aa
RG
658 ComboAddress localAddr;
659 client.sin4.sin_family = clientState.local.sin4.sin_family;
660 localAddr.sin4.sin_family = clientState.local.sin4.sin_family;
2e1c76aa 661 buffer.resize(4096);
699689aa 662 if (!dnsdist::doq::recvAsync(sock, buffer, client, localAddr)) {
cc62f9c3
RG
663 return;
664 }
699689aa
RG
665 if (localAddr.sin4.sin_family == 0) {
666 localAddr = clientState.local;
667 }
668 else {
669 /* we don't get the port, only the address */
670 localAddr.sin4.sin_port = clientState.local.sin4.sin_port;
671 }
672
ddc643a2 673 DEBUGLOG("Received DoQ datagram of size " << buffer.size() << " from " << client.toStringWithPort());
cc62f9c3 674
f684e004
RG
675 uint32_t version{0};
676 uint8_t type{0};
677 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> scid{};
678 size_t scid_len = scid.size();
679 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> dcid{};
680 size_t dcid_len = dcid.size();
681 std::array<uint8_t, MAX_TOKEN_LEN> token{};
682 size_t token_len = token.size();
cc62f9c3 683
2e1c76aa 684 auto res = quiche_header_info(buffer.data(), buffer.size(), LOCAL_CONN_ID_LEN,
f684e004
RG
685 &version, &type,
686 scid.data(), &scid_len,
687 dcid.data(), &dcid_len,
688 token.data(), &token_len);
689 if (res != 0) {
690 DEBUGLOG("Error in quiche_header_info: " << res);
691 continue;
692 }
693
7769b7b7
RG
694 serverConnID.assign(dcid.begin(), dcid.begin() + dcid_len);
695 clientConnID.assign(scid.begin(), scid.begin() + scid_len);
f684e004 696 auto conn = getConnection(frontend.d_server_config->d_connections, serverConnID);
cc62f9c3 697
cc62f9c3 698 if (!conn) {
f684e004 699 DEBUGLOG("Connection not found");
fc16a83b
RG
700 if (type != static_cast<uint8_t>(DOQ_Packet_Types::QUIC_PACKET_TYPE_INITIAL)) {
701 DEBUGLOG("Packet is not initial");
702 continue;
703 }
704
f684e004
RG
705 if (!quiche_version_is_supported(version)) {
706 DEBUGLOG("Unsupported version");
707 ++frontend.d_doqUnsupportedVersionErrors;
699689aa 708 handleVersionNegociation(sock, clientConnID, serverConnID, client, localAddr, buffer);
f684e004
RG
709 continue;
710 }
711
712 if (token_len == 0) {
713 /* stateless retry */
714 DEBUGLOG("No token received");
699689aa 715 handleStatelessRetry(sock, clientConnID, serverConnID, client, localAddr, version, buffer);
f684e004
RG
716 continue;
717 }
718
7769b7b7 719 tokenBuf.assign(token.begin(), token.begin() + token_len);
f684e004
RG
720 auto originalDestinationID = validateToken(tokenBuf, client);
721 if (!originalDestinationID) {
722 ++frontend.d_doqInvalidTokensReceived;
723 DEBUGLOG("Discarding invalid token");
724 continue;
725 }
726
727 DEBUGLOG("Creating a new connection");
699689aa 728 conn = createConnection(*frontend.d_server_config, serverConnID, *originalDestinationID, client, localAddr);
f684e004
RG
729 if (!conn) {
730 continue;
731 }
cc62f9c3 732 }
f684e004
RG
733 DEBUGLOG("Connection found");
734 quiche_recv_info recv_info = {
735 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
736 reinterpret_cast<struct sockaddr*>(&client),
737 client.getSocklen(),
738 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
699689aa
RG
739 reinterpret_cast<struct sockaddr*>(&localAddr),
740 localAddr.getSocklen(),
f684e004 741 };
cc62f9c3 742
2e1c76aa 743 auto done = quiche_conn_recv(conn->get().d_conn.get(), buffer.data(), buffer.size(), &recv_info);
f684e004
RG
744 if (done < 0) {
745 continue;
746 }
cc62f9c3 747
8be79592 748 if (quiche_conn_is_established(conn->get().d_conn.get()) || quiche_conn_is_in_early_data(conn->get().d_conn.get())) {
f684e004 749 auto readable = std::unique_ptr<quiche_stream_iter, decltype(&quiche_stream_iter_free)>(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free);
cc62f9c3 750
f684e004
RG
751 uint64_t streamID = 0;
752 while (quiche_stream_iter_next(readable.get(), &streamID)) {
753 handleReadableStream(frontend, clientState, *conn, streamID, client, serverConnID);
754 }
4cec715d 755
699689aa 756 flushEgress(sock, conn->get().d_conn, client, localAddr, buffer);
f684e004
RG
757 }
758 else {
759 DEBUGLOG("Connection not established");
cc62f9c3 760 }
cc62f9c3
RG
761 }
762}
763
57a94421 764// this is the entrypoint from dnsdist.cc
ec9548d7 765void doqThread(ClientState* clientState)
57a94421
CHB
766{
767 try {
ec9548d7 768 std::shared_ptr<DOQFrontend>& frontend = clientState->doqFrontend;
57a94421 769
ec9548d7
CHB
770 frontend->d_server_config->clientState = clientState;
771 frontend->d_server_config->df = clientState->doqFrontend;
57a94421 772
57a94421
CHB
773 setThreadName("dnsdist/doq");
774
ec9548d7 775 Socket sock(clientState->udpFD);
f684e004 776 sock.setNonBlocking();
57a94421 777
f7d508b5 778 auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
57a94421 779
f7d508b5
CHB
780 auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
781 mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
782 mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
f684e004 783 std::vector<int> readyFDs;
2e1c76aa 784 PacketBuffer buffer(4096);
57a94421 785 while (true) {
f684e004 786 readyFDs.clear();
f7d508b5
CHB
787 mplexer->getAvailableFDs(readyFDs, 500);
788
f684e004
RG
789 try {
790 if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
2e1c76aa 791 handleSocketReadable(*frontend, *clientState, sock, buffer);
f684e004 792 }
f7d508b5 793
f684e004
RG
794 if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
795 flushResponses(frontend->d_server_config->d_responseReceiver);
796 }
182af582 797
f684e004
RG
798 for (auto conn = frontend->d_server_config->d_connections.begin(); conn != frontend->d_server_config->d_connections.end();) {
799 quiche_conn_on_timeout(conn->second.d_conn.get());
3dc49a89 800
699689aa 801 flushEgress(sock, conn->second.d_conn, conn->second.d_peer, conn->second.d_localAddr, buffer);
3dc49a89 802
f684e004 803 if (quiche_conn_is_closed(conn->second.d_conn.get())) {
3dc49a89 804#ifdef DEBUGLOG_ENABLED
f684e004
RG
805 quiche_stats stats;
806 quiche_path_stats path_stats;
3dc49a89 807
f684e004
RG
808 quiche_conn_stats(conn->second.d_conn.get(), &stats);
809 quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
3dc49a89 810
2e1c76aa 811 DEBUGLOG("Connection (DoQ) closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd);
3dc49a89 812#endif
f684e004
RG
813 conn = frontend->d_server_config->d_connections.erase(conn);
814 }
815 else {
816 flushStalledResponses(conn->second);
817 ++conn;
818 }
3dc49a89 819 }
57a94421 820 }
f684e004
RG
821 catch (const std::exception& exp) {
822 vinfolog("Caught exception in the main DoQ thread: %s", exp.what());
823 }
824 catch (...) {
825 vinfolog("Unknown exception in the main DoQ thread");
826 }
57a94421 827 }
57a94421
CHB
828 }
829 catch (const std::exception& e) {
f684e004 830 DEBUGLOG("Caught fatal error in the main DoQ thread: " << e.what());
57a94421
CHB
831 }
832}
acde6658
RG
833
834#endif /* HAVE_DNS_OVER_QUIC */