]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Merge the 'main' and 'client' DoH threads 12386/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Sat, 2 Jul 2022 14:09:56 +0000 (16:09 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 11 Jan 2023 13:52:06 +0000 (14:52 +0100)
When we are in "single acceptor thread" mode, merge the 'main' and
'client' DoH threads into a single one. We use separate threads to
reduce the separate the handling of the HTTP/2 traffic from the DNS
handling, to reduce latency, but that does not really make sense on
small devices with a single, limited CPU core. On these we prefer
using as few threads as possible to reduce the context switches and
the memory usage.

pdns/dnsdist.cc
pdns/dnsdistdist/doh.cc

index 9f76591b5a27a609d94fe0073f2dbf69c21988b2..599acccd254c1f2aab1824bb4ab4d11ff4e16280 100644 (file)
@@ -1640,6 +1640,10 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
   };
   const size_t vectSize = g_udpVectorSize;
 
+  if (vectSize > std::numeric_limits<uint16_t>::max()) {
+    throw std::runtime_error("The value of setUDPMultipleMessagesVectorSize is too high, the maximum value is " + std::to_string(std::numeric_limits<uint16_t>::max()));
+  }
+
   auto recvData = std::make_unique<MMReceiver[]>(vectSize);
   auto msgVec = std::make_unique<struct mmsghdr[]>(vectSize);
   auto outMsgVec = std::make_unique<struct mmsghdr[]>(vectSize);
index fd993a25d128095247102435bf18de3f4d5ab189..cc4c549bf1d872d40689bc0a4112c5e6f6502acb 100644 (file)
@@ -172,6 +172,7 @@ struct DOHServerConfig
   DOHServerConfig(uint32_t idleTimeout, uint32_t internalPipeBufferSize): accept_ctx(std::make_shared<DOHAcceptContext>())
   {
     int fd[2];
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
     if (pipe(fd) < 0) {
       unixDie("Creating a pipe for DNS over HTTPS");
     }
@@ -182,10 +183,13 @@ struct DOHServerConfig
     if (internalPipeBufferSize > 0) {
       setPipeBufferSize(dohquerypair[0], internalPipeBufferSize);
     }
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
 
     if (pipe(fd) < 0) {
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
       close(dohquerypair[0]);
       close(dohquerypair[1]);
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
       unixDie("Creating a pipe for DNS over HTTPS");
     }
 
@@ -219,7 +223,9 @@ struct DOHServerConfig
   std::shared_ptr<DOHAcceptContext> accept_ctx{nullptr};
   ClientState* cs{nullptr};
   std::shared_ptr<DOHFrontend> df{nullptr};
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
   int dohquerypair[2]{-1,-1};
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
   int dohresponsepair[2]{-1,-1};
 };
 
@@ -548,13 +554,30 @@ public:
     auto sender = std::make_shared<DoHTCPCrossQuerySender>(*query.d_idstate.cs);
     return sender;
   }
+
+  DOHUnitUniquePtr&& releaseDU()
+  {
+    return std::move(query.d_idstate.du);
+  }
 };
 
 /*
    We are not in the main DoH thread but in the DoH 'client' thread.
 */
-static void processDOHQuery(DOHUnitUniquePtr&& unit)
+static void processDOHQuery(DOHUnitUniquePtr&& unit, bool inMainThread = false)
 {
+  const auto handleImmediateResponse = [inMainThread](DOHUnitUniquePtr&& du, const char* reason) {
+    if (inMainThread) {
+      handleResponse(*du->dsc->df, du->req, du->status_code, du->response, du->dsc->df->d_customResponseHeaders, du->contentType, true);
+      /* so the unique pointer is stored in the InternalState which itself is stored in the unique pointer itself. We likely need
+         a better design, but for now let's just reset the internal one since we know it is no longer needed. */
+      du->ids.du.reset();
+    }
+    else {
+      sendDoHUnitToTheMainThread(std::move(du), reason);
+    }
+  };
+
   auto& ids = unit->ids;
   ids.du = std::move(unit);
   auto& du = ids.du;
@@ -567,9 +590,27 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
       // but we should be fine as long as we don't touch du->req
       // outside of the main DoH thread
       du->status_code = 500;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH killed in flight");
+      handleImmediateResponse(std::move(du), "DoH killed in flight");
       return;
     }
+
+    {
+      // if there was no EDNS, we add it with a large buffer size
+      // so we can use UDP to talk to the backend.
+      auto dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data()));
+
+      if (!dh->arcount) {
+        if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) {
+          dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data())); // may have reallocated
+          dh->arcount = htons(1);
+          du->ids.ednsAdded = true;
+        }
+      }
+      else {
+        // we leave existing EDNS in place
+      }
+    }
+
     remote = du->ids.origRemote;
     DOHServerConfig* dsc = du->dsc;
     auto& holders = dsc->holders;
@@ -579,7 +620,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
       ++g_stats.nonCompliantQueries;
       ++cs.nonCompliantQueries;
       du->status_code = 400;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH non-compliant query");
+      handleImmediateResponse(std::move(du), "DoH non-compliant query");
       return;
     }
 
@@ -593,7 +634,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
 
       if (!checkQueryHeaders(dh, cs)) {
         du->status_code = 400;
-        sendDoHUnitToTheMainThread(std::move(du), "DoH invalid headers");
+        handleImmediateResponse(std::move(du), "DoH invalid headers");
         return;
       }
 
@@ -602,7 +643,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
         dh->qr = true;
         du->response = std::move(du->query);
 
-        sendDoHUnitToTheMainThread(std::move(du), "DoH empty query");
+        handleImmediateResponse(std::move(du), "DoH empty query");
         return;
       }
 
@@ -620,7 +661,7 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
 
     if (result == ProcessQueryResult::Drop) {
       du->status_code = 403;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH dropped query");
+      handleImmediateResponse(std::move(du), "DoH dropped query");
       return;
     }
 
@@ -633,20 +674,20 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
 
         handleResponseSent(ids.qname, QType(ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoH, dnsdist::Protocol::DoH);
       }
-      sendDoHUnitToTheMainThread(std::move(du), "DoH self-answered response");
+      handleImmediateResponse(std::move(du), "DoH self-answered response");
       return;
     }
 
     if (result != ProcessQueryResult::PassToBackend) {
       du->status_code = 500;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available");
+      handleImmediateResponse(std::move(du), "DoH no backend available");
       return;
     }
 
     auto downstream = du->downstream;
     if (downstream == nullptr) {
       du->status_code = 502;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH no backend available");
+      handleImmediateResponse(std::move(du), "DoH no backend available");
       return;
     }
 
@@ -671,22 +712,29 @@ static void processDOHQuery(DOHUnitUniquePtr&& unit)
         return;
       }
       else {
-        cpq->handleInternalError();
+        if (inMainThread) {
+          du = cpq->releaseDU();
+          du->status_code = 502;
+          handleImmediateResponse(std::move(du), "DoH internal error");
+        }
+        else {
+          cpq->handleInternalError();
+        }
         return;
       }
     }
 
     ComboAddress dest = dq.ids.origDest;
     if (!assignOutgoingUDPQueryToBackend(downstream, htons(queryId), dq, du->query, dest)) {
-      du->status_code = 500;
-      sendDoHUnitToTheMainThread(std::move(du), "DoH internal error");
+      du->status_code = 502;
+      handleImmediateResponse(std::move(du), "DoH internal error");
       return;
     }
   }
   catch (const std::exception& e) {
     vinfolog("Got an error in DOH question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
     du->status_code = 500;
-    sendDoHUnitToTheMainThread(std::move(du), "DoH internal error");
+    handleImmediateResponse(std::move(du), "DoH internal error");
     return;
   }
 
@@ -788,10 +836,13 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re
       du->sni = sni;
     }
 #endif /* HAVE_H2O_SOCKET_GET_SSL_SERVER_NAME */
-
     du->self = reinterpret_cast<DOHUnit**>(h2o_mem_alloc_shared(&req->pool, sizeof(*self), on_generator_dispose));
     auto ptr = du.release();
     *(ptr->self) = ptr;
+
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+    processDOHQuery(DOHUnitUniquePtr(ptr, DOHUnit::release), true);
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
     try  {
       static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
       ssize_t sent = write(dsc->dohquerypair[0], &ptr, sizeof(ptr));
@@ -813,6 +864,7 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re
         ptr->release();
       }
     }
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
   }
   catch (const std::exception& e) {
     vinfolog("Had error parsing DoH DNS packet from %s: %s", remote.toStringWithPort(), e.what());
@@ -1159,8 +1211,9 @@ void DOHUnit::setHTTPResponse(uint16_t statusCode, PacketBuffer&& body_, const s
   contentType = contentType_;
 }
 
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
 /* query has been parsed by h2o, which called doh_handler() in the main DoH thread.
-   In order not to blockfor long, doh_handler() called doh_dispatch_query() which allocated
+   In order not to block for long, doh_handler() called doh_dispatch_query() which allocated
    a DOHUnit object and passed it to us */
 static void dnsdistclient(int qsock)
 {
@@ -1189,22 +1242,7 @@ static void dnsdistclient(int qsock)
         continue;
       }
 
-      // if there was no EDNS, we add it with a large buffer size
-      // so we can use UDP to talk to the backend.
-      auto dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data()));
-
-      if (!dh->arcount) {
-        if (generateOptRR(std::string(), du->query, 4096, 4096, 0, false)) {
-          dh = const_cast<struct dnsheader*>(reinterpret_cast<const struct dnsheader*>(du->query.data())); // may have reallocated
-          dh->arcount = htons(1);
-          du->ids.ednsAdded = true;
-        }
-      }
-      else {
-        // we leave existing EDNS in place
-      }
-
-      processDOHQuery(std::move(du));
+      processDOHQuery(std::move(du), false);
     }
     catch (const std::exception& e) {
       errlog("Error while processing query received over DoH: %s", e.what());
@@ -1214,6 +1252,7 @@ static void dnsdistclient(int qsock)
     }
   }
 }
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
 
 /* Called in the main DoH thread if h2o finds that dnsdist gave us an answer by writing into
    the dohresponsepair[0] side of the pipe so from:
@@ -1529,8 +1568,10 @@ void dohThread(ClientState* cs)
     dsc->df = cs->dohFrontend;
     dsc->h2o_config.server_name = h2o_iovec_init(df->d_serverTokens.c_str(), df->d_serverTokens.size());
 
+#ifndef USE_SINGLE_ACCEPTOR_THREAD
     std::thread dnsdistThread(dnsdistclient, dsc->dohquerypair[1]);
     dnsdistThread.detach(); // gets us better error reporting
+#endif
 
     setThreadName("dnsdist/doh");
     // I wonder if this registers an IP address.. I think it does