]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdistdist/doq.cc
Merge pull request #13768 from rgacogne/ddist-maintenance-hook
[thirdparty/pdns.git] / pdns / dnsdistdist / doq.cc
CommitLineData
57a94421
CHB
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23#include "doq.hh"
24
acde6658
RG
25#ifdef HAVE_DNS_OVER_QUIC
26#include <quiche.h>
27
57a94421
CHB
28#include "dolog.hh"
29#include "iputils.hh"
30#include "misc.hh"
31#include "sstuff.hh"
57a94421 32#include "threadname.hh"
acde6658 33
d5d9573c 34#include "dnsdist-dnsparser.hh"
559942b3 35#include "dnsdist-ecs.hh"
57a94421 36#include "dnsdist-proxy-protocol.hh"
acde6658
RG
37#include "dnsdist-tcp.hh"
38#include "dnsdist-random.hh"
fc7c7538 39
ed16c81f 40#include "doq-common.hh"
57a94421 41
ed16c81f 42using namespace dnsdist::doq;
805b22fe 43
d0439b42
CHB
/* Compile-time switch for verbose debugging output: change the `0` below to
   `1` to get DEBUGLOG() messages on stderr and to define DEBUGLOG_ENABLED.
   When disabled, DEBUGLOG() expands to nothing, so its argument is never
   evaluated. */
#if 0
#define DEBUGLOG_ENABLED
/* wrapped in do/while so that `DEBUGLOG(x);` is exactly one statement, even
   when used as the unbraced body of an if/else */
#define DEBUGLOG(x)              \
  do {                           \
    std::cerr << x << std::endl; \
  } while (false)
#else
#define DEBUGLOG(x)
#endif
50
805b22fe
RG
51class Connection
52{
53public:
7450fc08 54 Connection(const ComboAddress& peer, QuicheConnection&& conn) :
805b22fe
RG
55 d_peer(peer), d_conn(std::move(conn))
56 {
57 }
58 Connection(const Connection&) = delete;
59 Connection(Connection&&) = default;
60 Connection& operator=(const Connection&) = delete;
61 Connection& operator=(Connection&&) = default;
62 ~Connection() = default;
63
64 ComboAddress d_peer;
65 QuicheConnection d_conn;
6c66428a 66 std::unordered_map<uint64_t, PacketBuffer> d_streamBuffers;
c6886da0 67 std::unordered_map<uint64_t, PacketBuffer> d_streamOutBuffers;
805b22fe
RG
68};
69
3201998f
RG
70static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description);
71
a1a819fd 72struct DOQServerConfig
57a94421 73{
acde6658 74 DOQServerConfig(QuicheConfig&& config_, uint32_t internalPipeBufferSize) :
57a94421
CHB
75 config(std::move(config_))
76 {
f7d508b5
CHB
77 {
78 auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
79 d_responseSender = std::move(sender);
80 d_responseReceiver = std::move(receiver);
81 }
57a94421
CHB
82 }
83 DOQServerConfig(const DOQServerConfig&) = delete;
84 DOQServerConfig(DOQServerConfig&&) = default;
85 DOQServerConfig& operator=(const DOQServerConfig&) = delete;
86 DOQServerConfig& operator=(DOQServerConfig&&) = default;
87 ~DOQServerConfig() = default;
88
f016a7d5
RG
89 using ConnectionsMap = std::map<PacketBuffer, Connection>;
90
57a94421 91 LocalHolders holders;
f016a7d5 92 ConnectionsMap d_connections;
57a94421 93 QuicheConfig config;
ec9548d7 94 ClientState* clientState{nullptr};
57a94421 95 std::shared_ptr<DOQFrontend> df{nullptr};
f7d508b5
CHB
96 pdns::channel::Sender<DOQUnit> d_responseSender;
97 pdns::channel::Receiver<DOQUnit> d_responseReceiver;
57a94421
CHB
98};
99
acde6658
RG
100/* these might seem useless, but they are needed because
101 they need to be declared _after_ the definition of DOQServerConfig
102 so that we can use a unique_ptr in DOQFrontend */
3201998f
RG
103DOQFrontend::DOQFrontend() = default;
104DOQFrontend::~DOQFrontend() = default;
acde6658 105
57a94421
CHB
106class DOQTCPCrossQuerySender final : public TCPQuerySender
107{
108public:
3201998f 109 DOQTCPCrossQuerySender() = default;
57a94421 110
3201998f 111 [[nodiscard]] bool active() const override
57a94421
CHB
112 {
113 return true;
114 }
115
3201998f 116 void handleResponse([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
57a94421
CHB
117 {
118 if (!response.d_idstate.doqu) {
119 return;
120 }
121
3201998f
RG
122 auto unit = std::move(response.d_idstate.doqu);
123 if (unit->dsc == nullptr) {
f7d508b5
CHB
124 return;
125 }
57a94421 126
3201998f
RG
127 unit->response = std::move(response.d_buffer);
128 unit->ids = std::move(response.d_idstate);
129 DNSResponse dnsResponse(unit->ids, unit->response, unit->downstream);
57a94421 130
c0e87fe4 131 dnsheader cleartextDH{};
90686725 132 memcpy(&cleartextDH, dnsResponse.getHeader().get(), sizeof(cleartextDH));
57a94421
CHB
133
134 if (!response.isAsync()) {
135
136 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
137 static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
138
3201998f 139 dnsResponse.ids.doqu = std::move(unit);
57a94421 140
3201998f
RG
141 if (!processResponse(dnsResponse.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dnsResponse, false)) {
142 if (dnsResponse.ids.doqu) {
57a94421 143
3201998f 144 sendBackDOQUnit(std::move(dnsResponse.ids.doqu), "Response dropped by rules");
57a94421
CHB
145 }
146 return;
147 }
148
3201998f 149 if (dnsResponse.isAsynchronous()) {
57a94421
CHB
150 return;
151 }
152
3201998f 153 unit = std::move(dnsResponse.ids.doqu);
57a94421
CHB
154 }
155
3201998f
RG
156 if (!unit->ids.selfGenerated) {
157 double udiff = unit->ids.queryRealTime.udiff();
a6a6615c 158 vinfolog("Got answer from %s, relayed to %s (quic, %d bytes), took %f us", unit->downstream->d_config.remote.toStringWithPort(), unit->ids.origRemote.toStringWithPort(), unit->response.size(), udiff);
57a94421 159
3201998f
RG
160 auto backendProtocol = unit->downstream->getProtocol();
161 if (backendProtocol == dnsdist::Protocol::DoUDP && unit->tcp) {
57a94421
CHB
162 backendProtocol = dnsdist::Protocol::DoTCP;
163 }
3201998f 164 handleResponseSent(unit->ids, udiff, unit->ids.origRemote, unit->downstream->d_config.remote, unit->response.size(), cleartextDH, backendProtocol, true);
57a94421
CHB
165 }
166
167 ++dnsdist::metrics::g_stats.responses;
3201998f
RG
168 if (unit->ids.cs != nullptr) {
169 ++unit->ids.cs->responses;
57a94421
CHB
170 }
171
3201998f 172 sendBackDOQUnit(std::move(unit), "Cross-protocol response");
57a94421
CHB
173 }
174
175 void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
176 {
177 return handleResponse(now, std::move(response));
178 }
179
ef4350f8 180 void notifyIOError([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
57a94421 181 {
ef4350f8
RG
182 if (!response.d_idstate.doqu) {
183 return;
184 }
185
186 auto unit = std::move(response.d_idstate.doqu);
187 if (unit->dsc == nullptr) {
188 return;
189 }
190
191 /* this will signal an error */
192 unit->response.clear();
193 unit->ids = std::move(response.d_idstate);
194 sendBackDOQUnit(std::move(unit), "Cross-protocol error");
3dc49a89 195 }
57a94421
CHB
196};
197
198class DOQCrossProtocolQuery : public CrossProtocolQuery
199{
200public:
3201998f 201 DOQCrossProtocolQuery(DOQUnitUniquePtr&& unit, bool isResponse)
57a94421
CHB
202 {
203 if (isResponse) {
204 /* happens when a response becomes async */
3201998f 205 query = InternalQuery(std::move(unit->response), std::move(unit->ids));
57a94421
CHB
206 }
207 else {
208 /* we need to duplicate the query here because we might need
209 the existing query later if we get a truncated answer */
3201998f 210 query = InternalQuery(PacketBuffer(unit->query), std::move(unit->ids));
57a94421
CHB
211 }
212
3201998f
RG
213 /* it might have been moved when we moved unit->ids */
214 if (unit) {
215 query.d_idstate.doqu = std::move(unit);
57a94421
CHB
216 }
217
218 /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
3201998f 219 clearing query.d_proxyProtocolPayloadAdded and unit->proxyProtocolPayloadSize.
57a94421
CHB
220 Leave it for now because we know that the onky case where the payload has been
221 added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
222 and we know the TCP/DoT code can handle it. */
223 query.d_proxyProtocolPayloadAdded = query.d_idstate.doqu->proxyProtocolPayloadSize > 0;
224 downstream = query.d_idstate.doqu->downstream;
225 }
226
227 void handleInternalError()
228 {
3dc49a89 229 sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
57a94421
CHB
230 }
231
232 std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
233 {
234 query.d_idstate.doqu->downstream = downstream;
235 return s_sender;
236 }
237
238 DNSQuestion getDQ() override
239 {
240 auto& ids = query.d_idstate;
3201998f
RG
241 DNSQuestion dnsQuestion(ids, query.d_buffer);
242 return dnsQuestion;
57a94421
CHB
243 }
244
245 DNSResponse getDR() override
246 {
247 auto& ids = query.d_idstate;
3201998f
RG
248 DNSResponse dnsResponse(ids, query.d_buffer, downstream);
249 return dnsResponse;
182af582 250 }
57a94421
CHB
251
252 DOQUnitUniquePtr&& releaseDU()
253 {
254 return std::move(query.d_idstate.doqu);
255 }
256
257private:
258 static std::shared_ptr<DOQTCPCrossQuerySender> s_sender;
259};
260
261std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::make_shared<DOQTCPCrossQuerySender>();
262
c6886da0
RG
263static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBuffer& response)
264{
265 size_t pos = 0;
266 while (pos < response.size()) {
267 auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true);
268 if (res == QUICHE_ERR_DONE) {
269 response.erase(response.begin(), response.begin() + static_cast<ssize_t>(pos));
270 return false;
271 }
272 if (res < 0) {
273 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_INTERNAL_ERROR));
274 return true;
275 }
276 pos += res;
277 }
278
279 return true;
280}
281
282static void handleResponse(DOQFrontend& frontend, Connection& conn, const uint64_t streamID, PacketBuffer& response)
57a94421 283{
3201998f
RG
284 if (response.empty()) {
285 ++frontend.d_errorResponses;
6c66428a
RG
286 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR));
287 return;
182af582 288 }
3201998f
RG
289 ++frontend.d_validResponses;
290 auto responseSize = static_cast<uint16_t>(response.size());
6c66428a 291 const std::array<uint8_t, 2> sizeBytes = {static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
c6886da0
RG
292 response.insert(response.begin(), sizeBytes.begin(), sizeBytes.end());
293 if (!tryWriteResponse(conn, streamID, response)) {
294 conn.d_streamOutBuffers[streamID] = std::move(response);
3201998f 295 }
57a94421
CHB
296}
297
298void DOQFrontend::setup()
299{
300 auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
ed16c81f 301 d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
2e7989f5 302 configureQuiche(config, d_quicheParams, false);
acde6658 303 d_server_config = std::make_unique<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
57a94421
CHB
304}
305
f016a7d5 306static std::optional<std::reference_wrapper<Connection>> getConnection(DOQServerConfig::ConnectionsMap& connMap, const PacketBuffer& connID)
57a94421 307{
f016a7d5
RG
308 auto iter = connMap.find(connID);
309 if (iter == connMap.end()) {
57a94421
CHB
310 return std::nullopt;
311 }
f016a7d5 312 return iter->second;
57a94421
CHB
313}
314
3201998f 315static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description)
3dc49a89 316{
3201998f 317 if (unit->dsc == nullptr) {
f7d508b5
CHB
318 return;
319 }
320 try {
3201998f 321 if (!unit->dsc->d_responseSender.send(std::move(unit))) {
f089d71c 322 ++dnsdist::metrics::g_stats.doqResponsePipeFull;
f7d508b5
CHB
323 vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
324 }
357b3faa
CHB
325 }
326 catch (const std::exception& e) {
f7d508b5
CHB
327 vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
328 }
3dc49a89
CHB
329}
330
f016a7d5 331static std::optional<std::reference_wrapper<Connection>> createConnection(DOQServerConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const ComboAddress& local, const ComboAddress& peer)
57a94421
CHB
332{
333 auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
334 originalDestinationID.data(), originalDestinationID.size(),
3201998f
RG
335 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
336 reinterpret_cast<const struct sockaddr*>(&local),
57a94421 337 local.getSocklen(),
3201998f
RG
338 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
339 reinterpret_cast<const struct sockaddr*>(&peer),
57a94421 340 peer.getSocklen(),
1cba32b0 341 config.config.get()),
182af582 342 quiche_conn_free);
1cba32b0 343
ed16c81f
CHB
344 if (config.df && !config.df->d_quicheParams.d_keyLogFile.empty()) {
345 quiche_conn_set_keylog_path(quicheConn.get(), config.df->d_quicheParams.d_keyLogFile.c_str());
1cba32b0
RG
346 }
347
57a94421 348 auto conn = Connection(peer, std::move(quicheConn));
f016a7d5 349 auto pair = config.d_connections.emplace(serverSideID, std::move(conn));
57a94421
CHB
350 return pair.first->second;
351}
352
3201998f 353std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion, bool isResponse)
57a94421 354{
3201998f 355 if (!dnsQuestion.ids.doqu) {
57a94421
CHB
356 throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
357 }
358
3201998f
RG
359 auto unit = std::move(dnsQuestion.ids.doqu);
360 if (&dnsQuestion.ids != &unit->ids) {
361 unit->ids = std::move(dnsQuestion.ids);
57a94421
CHB
362 }
363
3201998f 364 unit->ids.origID = dnsQuestion.getHeader()->id;
57a94421
CHB
365
366 if (!isResponse) {
3201998f
RG
367 if (unit->query.data() != dnsQuestion.getMutableData().data()) {
368 unit->query = std::move(dnsQuestion.getMutableData());
57a94421
CHB
369 }
370 }
371 else {
3201998f
RG
372 if (unit->response.data() != dnsQuestion.getMutableData().data()) {
373 unit->response = std::move(dnsQuestion.getMutableData());
57a94421
CHB
374 }
375 }
376
3201998f 377 return std::make_unique<DOQCrossProtocolQuery>(std::move(unit), isResponse);
57a94421
CHB
378}
379
3201998f 380static void processDOQQuery(DOQUnitUniquePtr&& doqUnit)
57a94421 381{
3201998f 382 const auto handleImmediateResponse = [](DOQUnitUniquePtr&& unit, [[maybe_unused]] const char* reason) {
57a94421 383 DEBUGLOG("handleImmediateResponse() reason=" << reason);
3201998f
RG
384 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
385 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
386 unit->ids.doqu.reset();
57a94421
CHB
387 };
388
3201998f
RG
389 auto& ids = doqUnit->ids;
390 ids.doqu = std::move(doqUnit);
391 auto& unit = ids.doqu;
57a94421
CHB
392 uint16_t queryId = 0;
393 ComboAddress remote;
394
395 try {
57a94421 396
3201998f
RG
397 remote = unit->ids.origRemote;
398 DOQServerConfig* dsc = unit->dsc;
57a94421 399 auto& holders = dsc->holders;
ec9548d7 400 ClientState& clientState = *dsc->clientState;
57a94421 401
2aaf9ecd
CHB
402 if (!holders.acl->match(remote)) {
403 vinfolog("Query from %s (DoQ) dropped because of ACL", remote.toStringWithPort());
404 ++dnsdist::metrics::g_stats.aclDrops;
405 unit->response.clear();
406
407 handleImmediateResponse(std::move(unit), "DoQ query dropped because of ACL");
408 return;
409 }
410
3201998f 411 if (unit->query.size() < sizeof(dnsheader)) {
182af582 412 ++dnsdist::metrics::g_stats.nonCompliantQueries;
ec9548d7 413 ++clientState.nonCompliantQueries;
d5d9573c 414 unit->response.clear();
3dc49a89 415
3201998f 416 handleImmediateResponse(std::move(unit), "DoQ non-compliant query");
57a94421
CHB
417 return;
418 }
419
ec9548d7 420 ++clientState.queries;
182af582 421 ++dnsdist::metrics::g_stats.queries;
3201998f 422 unit->ids.queryRealTime.start();
57a94421
CHB
423
424 {
425 /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
d5d9573c
RG
426 dnsheader_aligned dnsHeader(unit->query.data());
427
94e3db7e 428 if (!checkQueryHeaders(*dnsHeader, clientState)) {
d5d9573c
RG
429 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
430 header.rcode = RCode::ServFail;
431 header.qr = true;
432 return true;
433 });
3201998f 434 unit->response = std::move(unit->query);
3dc49a89 435
3201998f 436 handleImmediateResponse(std::move(unit), "DoQ invalid headers");
57a94421
CHB
437 return;
438 }
439
3201998f 440 if (dnsHeader->qdcount == 0) {
d5d9573c
RG
441 dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
442 header.rcode = RCode::NotImp;
443 header.qr = true;
444 return true;
445 });
3201998f 446 unit->response = std::move(unit->query);
57a94421 447
3201998f 448 handleImmediateResponse(std::move(unit), "DoQ empty query");
57a94421
CHB
449 return;
450 }
451
3201998f 452 queryId = ntohs(dnsHeader->id);
57a94421
CHB
453 }
454
3201998f
RG
455 auto downstream = unit->downstream;
456 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
457 unit->ids.qname = DNSName(reinterpret_cast<const char*>(unit->query.data()), static_cast<int>(unit->query.size()), sizeof(dnsheader), false, &unit->ids.qtype, &unit->ids.qclass);
458 DNSQuestion dnsQuestion(unit->ids, unit->query);
d5d9573c
RG
459 dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
460 const uint16_t* flags = getFlagsFromDNSHeader(&header);
461 ids.origFlags = *flags;
462 return true;
463 });
3201998f 464 unit->ids.cs = &clientState;
57a94421 465
3201998f 466 auto result = processQuery(dnsQuestion, holders, downstream);
57a94421 467 if (result == ProcessQueryResult::Drop) {
3201998f 468 handleImmediateResponse(std::move(unit), "DoQ dropped query");
57a94421
CHB
469 return;
470 }
3201998f 471 if (result == ProcessQueryResult::Asynchronous) {
57a94421
CHB
472 return;
473 }
3201998f
RG
474 if (result == ProcessQueryResult::SendAnswer) {
475 if (unit->response.empty()) {
476 unit->response = std::move(unit->query);
57a94421 477 }
3201998f 478 if (unit->response.size() >= sizeof(dnsheader)) {
d5d9573c 479 const dnsheader_aligned dnsHeader(unit->response.data());
57a94421 480
3201998f 481 handleResponseSent(unit->ids.qname, QType(unit->ids.qtype), 0., unit->ids.origDest, ComboAddress(), unit->response.size(), *dnsHeader, dnsdist::Protocol::DoQ, dnsdist::Protocol::DoQ, false);
57a94421 482 }
3201998f 483 handleImmediateResponse(std::move(unit), "DoQ self-answered response");
57a94421
CHB
484 return;
485 }
486
182af582 487 ++dnsdist::metrics::g_stats.responses;
3201998f
RG
488 if (unit->ids.cs != nullptr) {
489 ++unit->ids.cs->responses;
182af582
CHB
490 }
491
57a94421 492 if (result != ProcessQueryResult::PassToBackend) {
3201998f 493 handleImmediateResponse(std::move(unit), "DoQ no backend available");
57a94421
CHB
494 return;
495 }
496
497 if (downstream == nullptr) {
3201998f 498 handleImmediateResponse(std::move(unit), "DoQ no backend available");
57a94421
CHB
499 return;
500 }
501
3201998f 502 unit->downstream = downstream;
57a94421
CHB
503
504 std::string proxyProtocolPayload;
505 /* we need to do this _before_ creating the cross protocol query because
506 after that the buffer will have been moved */
507 if (downstream->d_config.useProxyProtocol) {
3201998f 508 proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
57a94421
CHB
509 }
510
3201998f
RG
511 unit->ids.origID = htons(queryId);
512 unit->tcp = true;
57a94421 513
3201998f
RG
514 /* this moves unit->ids, careful! */
515 auto cpq = std::make_unique<DOQCrossProtocolQuery>(std::move(unit), false);
57a94421
CHB
516 cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
517
518 if (downstream->passCrossProtocolQuery(std::move(cpq))) {
519 return;
520 }
3201998f
RG
521 // NOLINTNEXTLINE(bugprone-use-after-move): it was only moved if the call succeeded
522 unit = cpq->releaseDU();
523 handleImmediateResponse(std::move(unit), "DoQ internal error");
524 return;
57a94421
CHB
525 }
526 catch (const std::exception& e) {
527 vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
3201998f 528 handleImmediateResponse(std::move(unit), "DoQ internal error");
57a94421
CHB
529 return;
530 }
57a94421
CHB
531}
532
57a94421
CHB
533static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
534{
535 try {
3201998f
RG
536 auto unit = std::make_unique<DOQUnit>(std::move(query));
537 unit->dsc = &dsc;
538 unit->ids.origDest = local;
539 unit->ids.origRemote = remote;
540 unit->ids.protocol = dnsdist::Protocol::DoQ;
541 unit->serverConnID = serverConnID;
542 unit->streamID = streamID;
57a94421 543
3201998f 544 processDOQQuery(std::move(unit));
57a94421 545 }
fc7c7538 546 catch (const std::exception& exp) {
559942b3 547 vinfolog("Had error handling DoQ DNS packet from %s: %s", remote.toStringWithPort(), exp.what());
57a94421
CHB
548 }
549}
550
f7d508b5
CHB
551static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
552{
357b3faa 553 for (;;) {
f7d508b5
CHB
554 try {
555 auto tmp = receiver.receive();
556 if (!tmp) {
357b3faa 557 return;
f7d508b5
CHB
558 }
559
f016a7d5
RG
560 auto unit = std::move(*tmp);
561 auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
562 if (conn) {
563 handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
564 }
f7d508b5
CHB
565 }
566 catch (const std::exception& e) {
567 errlog("Error while processing response received over DoQ: %s", e.what());
568 }
569 catch (...) {
570 errlog("Unspecified error while processing response received over DoQ");
571 }
572 }
573}
574
c6886da0
RG
575static void flushStalledResponses(Connection& conn)
576{
577 for (auto streamIt = conn.d_streamOutBuffers.begin(); streamIt != conn.d_streamOutBuffers.end();) {
578 const auto& streamID = streamIt->first;
579 auto& response = streamIt->second;
580 if (quiche_conn_stream_writable(conn.d_conn.get(), streamID, response.size()) == 1) {
581 if (tryWriteResponse(conn, streamID, response)) {
582 streamIt = conn.d_streamOutBuffers.erase(streamIt);
583 continue;
584 }
585 }
586 ++streamIt;
587 }
588}
589
b4c463e2
RG
590static void handleReadableStream(DOQFrontend& frontend, ClientState& clientState, Connection& conn, uint64_t streamID, const ComboAddress& client, const PacketBuffer& serverConnID)
591{
592 auto& streamBuffer = conn.d_streamBuffers[streamID];
0fe22933
RG
593 while (true) {
594 bool fin = false;
595 auto existingLength = streamBuffer.size();
596 streamBuffer.resize(existingLength + 512);
597 auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID,
598 &streamBuffer.at(existingLength), 512,
599 &fin);
600 if (received == 0 || received == QUICHE_ERR_DONE) {
601 streamBuffer.resize(existingLength);
b4c463e2
RG
602 return;
603 }
a6cfab84 604 if (received < 0) {
b4c463e2
RG
605 ++dnsdist::metrics::g_stats.nonCompliantQueries;
606 ++clientState.nonCompliantQueries;
607 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
608 return;
609 }
0fe22933
RG
610
611 streamBuffer.resize(existingLength + received);
612 if (fin) {
613 break;
614 }
615 }
616
617 if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) {
618 ++dnsdist::metrics::g_stats.nonCompliantQueries;
619 ++clientState.nonCompliantQueries;
620 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
621 return;
622 }
623
624 uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1);
625 streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2);
626 if (payloadLength != streamBuffer.size()) {
627 ++dnsdist::metrics::g_stats.nonCompliantQueries;
628 ++clientState.nonCompliantQueries;
629 quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
630 return;
b4c463e2 631 }
0fe22933
RG
632 DEBUGLOG("Dispatching query");
633 doq_dispatch_query(*(frontend.d_server_config), std::move(streamBuffer), clientState.local, client, serverConnID, streamID);
634 conn.d_streamBuffers.erase(streamID);
b4c463e2
RG
635}
636
2e1c76aa 637static void handleSocketReadable(DOQFrontend& frontend, ClientState& clientState, Socket& sock, PacketBuffer& buffer)
cc62f9c3 638{
7769b7b7
RG
639 // destination connection ID, will have to be sent as original destination connection ID
640 PacketBuffer serverConnID;
641 // source connection ID, will have to be sent as destination connection ID
642 PacketBuffer clientConnID;
643 PacketBuffer tokenBuf;
f684e004 644 while (true) {
f684e004 645 ComboAddress client;
2e1c76aa 646 buffer.resize(4096);
a6cfab84 647 if (!sock.recvFromAsync(buffer, client) || buffer.empty()) {
cc62f9c3
RG
648 return;
649 }
ddc643a2 650 DEBUGLOG("Received DoQ datagram of size " << buffer.size() << " from " << client.toStringWithPort());
cc62f9c3 651
f684e004
RG
652 uint32_t version{0};
653 uint8_t type{0};
654 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> scid{};
655 size_t scid_len = scid.size();
656 std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> dcid{};
657 size_t dcid_len = dcid.size();
658 std::array<uint8_t, MAX_TOKEN_LEN> token{};
659 size_t token_len = token.size();
cc62f9c3 660
2e1c76aa 661 auto res = quiche_header_info(buffer.data(), buffer.size(), LOCAL_CONN_ID_LEN,
f684e004
RG
662 &version, &type,
663 scid.data(), &scid_len,
664 dcid.data(), &dcid_len,
665 token.data(), &token_len);
666 if (res != 0) {
667 DEBUGLOG("Error in quiche_header_info: " << res);
668 continue;
669 }
670
7769b7b7
RG
671 serverConnID.assign(dcid.begin(), dcid.begin() + dcid_len);
672 clientConnID.assign(scid.begin(), scid.begin() + scid_len);
f684e004 673 auto conn = getConnection(frontend.d_server_config->d_connections, serverConnID);
cc62f9c3 674
cc62f9c3 675 if (!conn) {
f684e004 676 DEBUGLOG("Connection not found");
fc16a83b
RG
677 if (type != static_cast<uint8_t>(DOQ_Packet_Types::QUIC_PACKET_TYPE_INITIAL)) {
678 DEBUGLOG("Packet is not initial");
679 continue;
680 }
681
f684e004
RG
682 if (!quiche_version_is_supported(version)) {
683 DEBUGLOG("Unsupported version");
684 ++frontend.d_doqUnsupportedVersionErrors;
7769b7b7 685 handleVersionNegociation(sock, clientConnID, serverConnID, client, buffer);
f684e004
RG
686 continue;
687 }
688
689 if (token_len == 0) {
690 /* stateless retry */
691 DEBUGLOG("No token received");
7769b7b7 692 handleStatelessRetry(sock, clientConnID, serverConnID, client, version, buffer);
f684e004
RG
693 continue;
694 }
695
7769b7b7 696 tokenBuf.assign(token.begin(), token.begin() + token_len);
f684e004
RG
697 auto originalDestinationID = validateToken(tokenBuf, client);
698 if (!originalDestinationID) {
699 ++frontend.d_doqInvalidTokensReceived;
700 DEBUGLOG("Discarding invalid token");
701 continue;
702 }
703
704 DEBUGLOG("Creating a new connection");
705 conn = createConnection(*frontend.d_server_config, serverConnID, *originalDestinationID, clientState.local, client);
706 if (!conn) {
707 continue;
708 }
cc62f9c3 709 }
f684e004
RG
710 DEBUGLOG("Connection found");
711 quiche_recv_info recv_info = {
712 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
713 reinterpret_cast<struct sockaddr*>(&client),
714 client.getSocklen(),
715 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
716 reinterpret_cast<struct sockaddr*>(&clientState.local),
717 clientState.local.getSocklen(),
718 };
cc62f9c3 719
2e1c76aa 720 auto done = quiche_conn_recv(conn->get().d_conn.get(), buffer.data(), buffer.size(), &recv_info);
f684e004
RG
721 if (done < 0) {
722 continue;
723 }
cc62f9c3 724
8be79592 725 if (quiche_conn_is_established(conn->get().d_conn.get()) || quiche_conn_is_in_early_data(conn->get().d_conn.get())) {
f684e004 726 auto readable = std::unique_ptr<quiche_stream_iter, decltype(&quiche_stream_iter_free)>(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free);
cc62f9c3 727
f684e004
RG
728 uint64_t streamID = 0;
729 while (quiche_stream_iter_next(readable.get(), &streamID)) {
730 handleReadableStream(frontend, clientState, *conn, streamID, client, serverConnID);
731 }
4cec715d 732
7769b7b7 733 flushEgress(sock, conn->get().d_conn, client, buffer);
f684e004
RG
734 }
735 else {
736 DEBUGLOG("Connection not established");
cc62f9c3 737 }
cc62f9c3
RG
738 }
739}
740
57a94421 741// this is the entrypoint from dnsdist.cc
ec9548d7 742void doqThread(ClientState* clientState)
57a94421
CHB
743{
744 try {
ec9548d7 745 std::shared_ptr<DOQFrontend>& frontend = clientState->doqFrontend;
57a94421 746
ec9548d7
CHB
747 frontend->d_server_config->clientState = clientState;
748 frontend->d_server_config->df = clientState->doqFrontend;
57a94421 749
57a94421
CHB
750 setThreadName("dnsdist/doq");
751
ec9548d7 752 Socket sock(clientState->udpFD);
f684e004 753 sock.setNonBlocking();
57a94421 754
f7d508b5 755 auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
57a94421 756
f7d508b5
CHB
757 auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
758 mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
759 mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
f684e004 760 std::vector<int> readyFDs;
2e1c76aa 761 PacketBuffer buffer(4096);
57a94421 762 while (true) {
f684e004 763 readyFDs.clear();
f7d508b5
CHB
764 mplexer->getAvailableFDs(readyFDs, 500);
765
f684e004
RG
766 try {
767 if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
2e1c76aa 768 handleSocketReadable(*frontend, *clientState, sock, buffer);
f684e004 769 }
f7d508b5 770
f684e004
RG
771 if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
772 flushResponses(frontend->d_server_config->d_responseReceiver);
773 }
182af582 774
f684e004
RG
775 for (auto conn = frontend->d_server_config->d_connections.begin(); conn != frontend->d_server_config->d_connections.end();) {
776 quiche_conn_on_timeout(conn->second.d_conn.get());
3dc49a89 777
7769b7b7 778 flushEgress(sock, conn->second.d_conn, conn->second.d_peer, buffer);
3dc49a89 779
f684e004 780 if (quiche_conn_is_closed(conn->second.d_conn.get())) {
3dc49a89 781#ifdef DEBUGLOG_ENABLED
f684e004
RG
782 quiche_stats stats;
783 quiche_path_stats path_stats;
3dc49a89 784
f684e004
RG
785 quiche_conn_stats(conn->second.d_conn.get(), &stats);
786 quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
3dc49a89 787
2e1c76aa 788 DEBUGLOG("Connection (DoQ) closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd);
3dc49a89 789#endif
f684e004
RG
790 conn = frontend->d_server_config->d_connections.erase(conn);
791 }
792 else {
793 flushStalledResponses(conn->second);
794 ++conn;
795 }
3dc49a89 796 }
57a94421 797 }
f684e004
RG
798 catch (const std::exception& exp) {
799 vinfolog("Caught exception in the main DoQ thread: %s", exp.what());
800 }
801 catch (...) {
802 vinfolog("Unknown exception in the main DoQ thread");
803 }
57a94421 804 }
57a94421
CHB
805 }
806 catch (const std::exception& e) {
f684e004 807 DEBUGLOG("Caught fatal error in the main DoQ thread: " << e.what());
57a94421
CHB
808 }
809}
acde6658
RG
810
811#endif /* HAVE_DNS_OVER_QUIC */