]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Convert DoH to pdns::channel
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 8 Apr 2022 15:57:44 +0000 (17:57 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 13 Jun 2023 07:59:36 +0000 (09:59 +0200)
pdns/dnsdistdist/doh.cc
pdns/doh.hh

index 021eec32d2d00d022753a7ed596933da601dc4ca..9feca652187628e6254d9a81e99de31aca51c010 100644 (file)
@@ -172,38 +172,20 @@ struct DOHServerConfig
 {
   DOHServerConfig(uint32_t idleTimeout, uint32_t internalPipeBufferSize): accept_ctx(std::make_shared<DOHAcceptContext>())
   {
-    int fd[2];
 #ifndef USE_SINGLE_ACCEPTOR_THREAD
-    if (pipe(fd) < 0) {
-      unixDie("Creating a pipe for DNS over HTTPS");
-    }
-    dohquerypair[0] = fd[1];
-    dohquerypair[1] = fd[0];
-
-    setNonBlocking(dohquerypair[0]);
-    if (internalPipeBufferSize > 0) {
-      setPipeBufferSize(dohquerypair[0], internalPipeBufferSize);
+    {
+      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, internalPipeBufferSize);
+      d_querySender = std::move(sender);
+      d_queryReceiver = std::move(receiver);
     }
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
 
-    if (pipe(fd) < 0) {
-#ifndef USE_SINGLE_ACCEPTOR_THREAD
-      close(dohquerypair[0]);
-      close(dohquerypair[1]);
-#endif /* USE_SINGLE_ACCEPTOR_THREAD */
-      unixDie("Creating a pipe for DNS over HTTPS");
-    }
-
-    dohresponsepair[0] = fd[1];
-    dohresponsepair[1] = fd[0];
-
-    setNonBlocking(dohresponsepair[0]);
-    if (internalPipeBufferSize > 0) {
-      setPipeBufferSize(dohresponsepair[0], internalPipeBufferSize);
+    {
+      auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, internalPipeBufferSize);
+      d_responseSender = std::move(sender);
+      d_responseReceiver = std::move(receiver);
     }
 
-    setNonBlocking(dohresponsepair[1]);
-
     h2o_config_init(&h2o_config);
     h2o_config.http2.idle_timeout = idleTimeout * 1000;
     /* if you came here for a way to make the number of concurrent streams (concurrent requests per connection)
@@ -225,42 +207,28 @@ struct DOHServerConfig
   ClientState* cs{nullptr};
   std::shared_ptr<DOHFrontend> df{nullptr};
 #ifndef USE_SINGLE_ACCEPTOR_THREAD
-  int dohquerypair[2]{-1,-1};
+  pdns::channel::Sender<DOHUnit, void(*)(DOHUnit*)> d_querySender;
+  pdns::channel::Receiver<DOHUnit, void(*)(DOHUnit*)> d_queryReceiver;
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
-  int dohresponsepair[2]{-1,-1};
+  pdns::channel::Sender<DOHUnit, void(*)(DOHUnit*)> d_responseSender;
+  pdns::channel::Receiver<DOHUnit, void(*)(DOHUnit*)> d_responseReceiver;
 };
 
 /* This internal function sends back the object to the main thread to send a reply.
    The caller should NOT release or touch the unit after calling this function */
 static void sendDoHUnitToTheMainThread(DOHUnitUniquePtr&& du, const char* description)
 {
-  /* taking a naked pointer since we are about to send that pointer over a pipe */
-  auto ptr = du.release();
-  /* increasing the reference counter. This should not be strictly needed because
-     we already hold a reference and will only release it if we failed to send the
-     pointer over the pipe, but TSAN seems confused when the responder thread gets
-     a reply from a backend before the send() syscall sending the corresponding query
-     to that backend has returned in the initial thread.
-     The memory barrier needed to increase that counter seems to work around that.
-  */
-  ptr->get();
-  static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
-
-  ssize_t sent = write(ptr->rsock, &ptr, sizeof(ptr));
-  if (sent != sizeof(ptr)) {
-    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+  if (du->responseSender == nullptr) {
+    return;
+  }
+  try {
+    if (!du->responseSender->send(std::move(du))) {
       ++g_stats.dohResponsePipeFull;
       vinfolog("Unable to pass a %s to the DoH worker thread because the pipe is full", description);
     }
-    else {
-      vinfolog("Unable to pass a %s to the DoH worker thread because we couldn't write to the pipe: %s", description, stringerror());
-    }
-
-    /* we fail to write over the pipe so we do not need to hold to that ref anymore */
-    ptr->release();
+  } catch (const std::exception& e) {
+    vinfolog("Unable to pass a %s to the DoH worker thread because we couldn't write to the pipe: %s", description, e.what());
   }
-  /* we decrement the counter incremented above at the beginning of that function */
-  ptr->release();
 }
 
 /* This function is called from other threads than the main DoH one,
@@ -452,7 +420,7 @@ public:
     }
 
     auto du = std::move(response.d_idstate.du);
-    if (du->rsock == -1) {
+    if (du->responseSender == nullptr) {
       return;
     }
 
@@ -514,7 +482,7 @@ public:
       return;
     }
 
-    if (query.du->rsock == -1) {
+    if (query.du->responseSender == nullptr) {
       return;
     }
 
@@ -869,13 +837,13 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re
     /* we are doing quite some copies here, sorry about that,
        but we can't keep accessing the req object once we are in a different thread
        because the request might get killed by h2o at pretty much any time */
-    auto du = std::make_unique<DOHUnit>(std::move(query), std::move(path), std::string(req->authority.base, req->authority.len));
+    auto du = std::unique_ptr<DOHUnit, void(*)(DOHUnit*)>(new DOHUnit(std::move(query), std::move(path), std::string(req->authority.base, req->authority.len)), DOHUnit::release);
     du->dsc = dsc;
     du->req = req;
     du->ids.origDest = local;
     du->ids.origRemote = remote;
     du->ids.protocol = dnsdist::Protocol::DoH;
-    du->rsock = dsc->dohresponsepair[0];
+    du->responseSender = &dsc->d_responseSender;
     if (req->scheme != nullptr) {
       du->scheme = std::string(req->scheme->name.base, req->scheme->name.len);
     }
@@ -897,32 +865,21 @@ static void doh_dispatch_query(DOHServerConfig* dsc, h2o_handler_t* self, h2o_re
     }
 #endif /* HAVE_H2O_SOCKET_GET_SSL_SERVER_NAME */
     du->self = reinterpret_cast<DOHUnit**>(h2o_mem_alloc_shared(&req->pool, sizeof(*self), on_generator_dispose));
-    auto ptr = du.release();
-    *(ptr->self) = ptr;
+    *(du->self) = du.get();
 
 #ifdef USE_SINGLE_ACCEPTOR_THREAD
-    processDOHQuery(DOHUnitUniquePtr(ptr, DOHUnit::release), true);
+    processDOHQuery(DOHUnitUniquePtr(du.release(), DOHUnit::release), true);
 #else /* USE_SINGLE_ACCEPTOR_THREAD */
-    try  {
-      static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
-      ssize_t sent = write(dsc->dohquerypair[0], &ptr, sizeof(ptr));
-      if (sent != sizeof(ptr)) {
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          ++g_stats.dohQueryPipeFull;
-          vinfolog("Unable to pass a DoH query to the DoH worker thread because the pipe is full");
-        }
-        else {
-          vinfolog("Unable to pass a DoH query to the DoH worker thread because we couldn't write to the pipe: %s", stringerror());
-        }
-        ptr->release();
-        ptr = nullptr;
+    try {
+      if (!dsc->d_querySender.send(std::move(du))) {
+        ++g_stats.dohQueryPipeFull;
+        vinfolog("Unable to pass a DoH query to the DoH worker thread because the pipe is full");
         h2o_send_error_500(req, "Internal Server Error", "Internal Server Error", 0);
       }
     }
     catch (...) {
-      if (ptr != nullptr) {
-        ptr->release();
-      }
+      vinfolog("Unable to pass a DoH query to the DoH worker thread because we couldn't write to the pipe: %s", stringerror());
+      h2o_send_error_500(req, "Internal Server Error", "Internal Server Error", 0);
     }
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
   }
@@ -1268,23 +1225,17 @@ void DOHUnit::setHTTPResponse(uint16_t statusCode, PacketBuffer&& body_, const s
 /* query has been parsed by h2o, which called doh_handler() in the main DoH thread.
    In order not to block for long, doh_handler() called doh_dispatch_query() which allocated
    a DOHUnit object and passed it to us */
-static void dnsdistclient(int qsock)
+static void dnsdistclient(pdns::channel::Receiver<DOHUnit, void(*)(DOHUnit*)>&& receiver)
 {
   setThreadName("dnsdist/doh-cli");
 
   for(;;) {
     try {
-      DOHUnit* ptr = nullptr;
-      ssize_t got = read(qsock, &ptr, sizeof(ptr));
-      if (got < 0) {
-        warnlog("Error receiving internal DoH query: %s", strerror(errno));
-        continue;
-      }
-      else if (static_cast<size_t>(got) < sizeof(ptr)) {
+      auto tmp = receiver.receive(DOHUnit::release);
+      if (!tmp) {
         continue;
       }
-
-      DOHUnitUniquePtr du(ptr, DOHUnit::release);
+      auto du = std::move(*tmp);
       /* we are not in the main DoH thread anymore, so there is a real risk of
          a race condition where h2o kills the query while we are processing it,
          so we can't touch the content of du->req until we are back into the
@@ -1308,7 +1259,7 @@ static void dnsdistclient(int qsock)
 #endif /* USE_SINGLE_ACCEPTOR_THREAD */
 
 /* Called in the main DoH thread if h2o finds that dnsdist gave us an answer by writing into
-   the dohresponsepair[0] side of the pipe so from:
+   the response channel so from:
    - handleDOHTimeout() when we did not get a response fast enough (called
      either from the health check thread (active) or from the frontend ones (reused))
    - dnsdistclient (error 500 because processDOHQuery() returned a negative value)
@@ -1321,23 +1272,21 @@ static void on_dnsdist(h2o_socket_t *listener, const char *err)
      for the CPU, the first thing we need to do is to send responses to free slots
      anyway, otherwise queries and responses are piling up in our pipes, consuming
      memory and likely coming up too late after the client has gone away */
+  DOHServerConfig* dsc = reinterpret_cast<DOHServerConfig*>(listener->data);
   while (true) {
-    DOHUnit *ptr = nullptr;
-    DOHServerConfig* dsc = reinterpret_cast<DOHServerConfig*>(listener->data);
-    ssize_t got = read(dsc->dohresponsepair[1], &ptr, sizeof(ptr));
-
-    if (got < 0) {
-      if (errno != EWOULDBLOCK && errno != EAGAIN) {
-        errlog("Error reading a DOH internal response: %s", strerror(errno));
+    std::unique_ptr<DOHUnit, void(*)(DOHUnit*)> du{nullptr, DOHUnit::release};
+    try {
+      auto tmp = dsc->d_responseReceiver.receive(DOHUnit::release);
+      if (!tmp) {
+        return;
       }
-      return;
+      du = std::move(*tmp);
     }
-    else if (static_cast<size_t>(got) != sizeof(ptr)) {
-      errlog("Error reading a DoH internal response, got %d bytes instead of the expected %d", got, sizeof(ptr));
+    catch (const std::exception& e) {
+      errlog("Error reading a DOH internal response: %s", e.what());
       return;
     }
 
-    DOHUnitUniquePtr du(ptr, DOHUnit::release);
     if (!du->req) { // it got killed in flight
       du->self = nullptr;
       continue;
@@ -1647,7 +1596,7 @@ void dohThread(ClientState* cs)
     dsc->h2o_config.server_name = h2o_iovec_init(df->d_serverTokens.c_str(), df->d_serverTokens.size());
 
 #ifndef USE_SINGLE_ACCEPTOR_THREAD
-    std::thread dnsdistThread(dnsdistclient, dsc->dohquerypair[1]);
+    std::thread dnsdistThread(dnsdistclient, std::move(dsc->d_queryReceiver));
     dnsdistThread.detach(); // gets us better error reporting
 #endif
 
@@ -1668,7 +1617,7 @@ void dohThread(ClientState* cs)
     dsc->h2o_ctx.storage.entries[0].data = dsc.get();
     ++dsc->h2o_ctx.storage.size;
 
-    auto sock = h2o_evloop_socket_create(dsc->h2o_ctx.loop, dsc->dohresponsepair[1], H2O_SOCKET_FLAG_DONT_READ);
+    auto sock = h2o_evloop_socket_create(dsc->h2o_ctx.loop, dsc->d_responseReceiver.getDescriptor(), H2O_SOCKET_FLAG_DONT_READ);
     sock->data = dsc.get();
 
     // this listens to responses from dnsdist to turn into http responses
index 96e65f1be2b331417f6b39d414e01429d1b7bac0..5cc94439d8f814524347db08ce4c0e7341d21801 100644 (file)
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
+
 #pragma once
 
 #include <unordered_map>
 
+#include "channel.hh"
 #include "iputils.hh"
 #include "libssl.hh"
 #include "noinitvector.hh"
@@ -247,6 +249,7 @@ struct DOHUnit
   st_h2o_req_t* req{nullptr};
   DOHUnit** self{nullptr};
   DOHServerConfig* dsc{nullptr};
+  pdns::channel::Sender<DOHUnit, void(*)(DOHUnit*)>* responseSender{nullptr};
   std::atomic<uint64_t> d_refcnt{1};
   size_t query_at{0};
   size_t proxyProtocolPayloadSize{0};