From: Remi Gacogne Date: Sat, 2 Jul 2022 14:09:56 +0000 (+0200) Subject: dnsdist: Merge the 'main' and 'client' DoH threads X-Git-Tag: dnsdist-1.8.0-rc1~119^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=6c1dcafcabfdc63d64cadff945d8c1dd87c541bf;p=thirdparty%2Fpdns.git dnsdist: Merge the 'main' and 'client' DoH threads When we are in "single acceptor thread" mode, merge the 'main' and 'client' DoH threads into a single one. We use separate threads to reduce the separate the handling of the HTTP/2 traffic from the DNS handling, to reduce latency, but that does not really make sense on small devices with a single, limited CPU core. On these we prefer using as few threads as possible to reduce the context switches and the memory usage. --- diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 9f76591b5a..599acccd25 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -1640,6 +1640,10 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde }; const size_t vectSize = g_udpVectorSize; + if (vectSize > std::numeric_limits::max()) { + throw std::runtime_error("The value of setUDPMultipleMessagesVectorSize is too high, the maximum value is " + std::to_string(std::numeric_limits::max())); + } + auto recvData = std::make_unique(vectSize); auto msgVec = std::make_unique(vectSize); auto outMsgVec = std::make_unique(vectSize); diff --git a/pdns/dnsdistdist/doh.cc b/pdns/dnsdistdist/doh.cc index fd993a25d1..cc4c549bf1 100644 --- a/pdns/dnsdistdist/doh.cc +++ b/pdns/dnsdistdist/doh.cc @@ -172,6 +172,7 @@ struct DOHServerConfig DOHServerConfig(uint32_t idleTimeout, uint32_t internalPipeBufferSize): accept_ctx(std::make_shared()) { int fd[2]; +#ifndef USE_SINGLE_ACCEPTOR_THREAD if (pipe(fd) < 0) { unixDie("Creating a pipe for DNS over HTTPS"); } @@ -182,10 +183,13 @@ struct DOHServerConfig if (internalPipeBufferSize > 0) { setPipeBufferSize(dohquerypair[0], internalPipeBufferSize); } +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ if (pipe(fd) < 0) { +#ifndef USE_SINGLE_ACCEPTOR_THREAD close(dohquerypair[0]); close(dohquerypair[1]); +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ unixDie("Creating a pipe for DNS over HTTPS"); } @@ -219,7 +223,9 @@ struct DOHServerConfig std::shared_ptr accept_ctx{nullptr}; ClientState* cs{nullptr}; std::shared_ptr df{nullptr}; +#ifndef USE_SINGLE_ACCEPTOR_THREAD int dohquerypair[2]{-1,-1}; +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ int dohresponsepair[2]{-1,-1}; }; @@ -548,13 +554,30 @@ public: auto sender = std::make_shared(*query.d_idstate.cs); return sender; } + + DOHUnitUniquePtr&& releaseDU() + { + return std::move(query.d_idstate.du); + } }; /* We are not in the main DoH thread but in the DoH 'client' thread. */ -static void processDOHQuery(DOHUnitUniquePtr&& unit) +static void processDOHQuery(DOHUnitUniquePtr&& unit, bool inMainThread = false) { + const auto handleImmediateResponse = [inMainThread](DOHUnitUniquePtr&& du, const char* reason) { + if (inMainThread) { + handleResponse(*du->dsc->df, du->req, du->status_code, du->response, du->dsc->df->d_customResponseHeaders, du->contentType, true); + /* so the unique pointer is stored in the InternalState which itself is stored in the unique pointer itself. We likely need + a better design, but for now let's just reset the internal one since we know it is no longer needed. */ + du->ids.du.reset(); + } + else { + sendDoHUnitToTheMainThread(std::move(du), reason); + } + }; + auto& ids = unit->ids; ids.du = std::move(unit); auto& du = ids.du; @@ -567,9 +590,27 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) // but we should be fine as long as we don't touch du->req // outside of the main DoH thread du->status_code = 500; - sendDoHUnitToTheMainThread(std::move(du), "DoH killed in flight"); + handleImmediateResponse(std::move(du), "DoH killed in flight"); return; } + + { + // if there was no EDNS, we add it with a large buffer size + // so we can use UDP to talk to the backend. + auto dh = const_cast(reinterpret_cast(du->query.data())); + + if (!dh->arcount) { + if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) { + dh = const_cast(reinterpret_cast(du->query.data())); // may have reallocated + dh->arcount = htons(1); + du->ids.ednsAdded = true; + } + } + else { + // we leave existing EDNS in place + } + } + remote = du->ids.origRemote; DOHServerConfig* dsc = du->dsc; auto& holders = dsc->holders; @@ -579,7 +620,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) ++g_stats.nonCompliantQueries; ++cs.nonCompliantQueries; du->status_code = 400; - sendDoHUnitToTheMainThread(std::move(du), "DoH non-compliant query"); + handleImmediateResponse(std::move(du), "DoH non-compliant query"); return; } @@ -593,7 +634,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) if (!checkQueryHeaders(dh, cs)) { du->status_code = 400; - sendDoHUnitToTheMainThread(std::move(du), "DoH invalid headers"); + handleImmediateResponse(std::move(du), "DoH invalid headers"); return; } @@ -602,7 +643,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) dh->qr = true; du->response = std::move(du->query); - sendDoHUnitToTheMainThread(std::move(du), "DoH empty query"); + handleImmediateResponse(std::move(du), "DoH empty query"); return; } @@ -620,7 +661,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) if (result == ProcessQueryResult::Drop) { du->status_code = 403; - sendDoHUnitToTheMainThread(std::move(du), "DoH dropped query"); + handleImmediateResponse(std::move(du), "DoH dropped query"); return; } @@ -633,20 +674,20 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) handleResponseSent(ids.qname, QType(ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoH, dnsdist::Protocol::DoH); } - sendDoHUnitToTheMainThread(std::move(du), "DoH self-answered response"); + handleImmediateResponse(std::move(du), "DoH self-answered response"); return; } if (result != ProcessQueryResult::PassToBackend) { du->status_code = 500; - sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available"); + handleImmediateResponse(std::move(du), "DoH no backend available"); return; } auto downstream = du->downstream; if (downstream == nullptr) { du->status_code = 502; - sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available"); + handleImmediateResponse(std::move(du), "DoH no backend available"); return; } @@ -671,22 +712,29 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit) return; } else { - cpq->handleInternalError(); + if (inMainThread) { + du = cpq->releaseDU(); + du->status_code = 502; + handleImmediateResponse(std::move(du), "DoH internal error"); + } + else { + cpq->handleInternalError(); + } return; } } ComboAddress dest = dq.ids.origDest; if (!assignOutgoingUDPQueryToBackend(downstream, htons(queryId), dq, du->query, dest)) { - du->status_code = 500; - sendDoHUnitToTheMainThread(std::move(du), "DoH internal error"); + du->status_code = 502; + handleImmediateResponse(std::move(du), "DoH internal error"); return; } } catch (const std::exception& e) { vinfolog("Got an error in DOH question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what()); du->status_code = 500; - sendDoHUnitToTheMainThread(std::move(du), "DoH internal error"); + handleImmediateResponse(std::move(du), "DoH internal error"); return; } @@ -788,10 +836,13 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re du->sni = sni; } #endif /* HAVE_H2O_SOCKET_GET_SSL_SERVER_NAME */ - du->self = reinterpret_cast(h2o_mem_alloc_shared(&req->pool, sizeof(*self), on_generator_dispose)); auto ptr = du.release(); *(ptr->self) = ptr; + +#ifdef USE_SINGLE_ACCEPTOR_THREAD + processDOHQuery(DOHUnitUniquePtr(ptr, DOHUnit::release), true); +#else /* USE_SINGLE_ACCEPTOR_THREAD */ try { static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail"); ssize_t sent = write(dsc->dohquerypair[0], &ptr, sizeof(ptr)); @@ -813,6 +864,7 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re ptr->release(); } } +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ } catch (const std::exception& e) { vinfolog("Had error parsing DoH DNS packet from %s: %s", remote.toStringWithPort(), e.what()); @@ -1159,8 +1211,9 @@ void DOHUnit::setHTTPResponse(uint16_t statusCode, PacketBuffer&& body_, const s contentType = contentType_; } +#ifndef USE_SINGLE_ACCEPTOR_THREAD /* query has been parsed by h2o, which called doh_handler() in the main DoH thread. - In order not to blockfor long, doh_handler() called doh_dispatch_query() which allocated + In order not to block for long, doh_handler() called doh_dispatch_query() which allocated a DOHUnit object and passed it to us */ static void dnsdistclient(int qsock) { @@ -1189,22 +1242,7 @@ static void dnsdistclient(int qsock) continue; } - // if there was no EDNS, we add it with a large buffer size - // so we can use UDP to talk to the backend. - auto dh = const_cast(reinterpret_cast(du->query.data())); - - if (!dh->arcount) { - if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) { - dh = const_cast(reinterpret_cast(du->query.data())); // may have reallocated - dh->arcount = htons(1); - du->ids.ednsAdded = true; - } - } - else { - // we leave existing EDNS in place - } - - processDOHQuery(std::move(du)); + processDOHQuery(std::move(du), false); } catch (const std::exception& e) { errlog("Error while processing query received over DoH: %s", e.what()); @@ -1214,6 +1252,7 @@ static void dnsdistclient(int qsock) } } } +#endif /* USE_SINGLE_ACCEPTOR_THREAD */ /* Called in the main DoH thread if h2o finds that dnsdist gave us an answer by writing into the dohresponsepair[0] side of the pipe so from: @@ -1529,8 +1568,10 @@ void dohThread(ClientState* cs) dsc->df = cs->dohFrontend; dsc->h2o_config.server_name = h2o_iovec_init(df->d_serverTokens.c_str(), df->d_serverTokens.size()); +#ifndef USE_SINGLE_ACCEPTOR_THREAD std::thread dnsdistThread(dnsdistclient, dsc->dohquerypair[1]); dnsdistThread.detach(); // gets us better error reporting +#endif setThreadName("dnsdist/doh"); // I wonder if this registers an IP address.. I think it does