]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Working DoH between dnsdist and the backend!
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 19 Jul 2021 16:06:53 +0000 (18:06 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 Sep 2021 13:28:27 +0000 (15:28 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/dnsdist-backend.cc
pdns/dnsdistdist/dnsdist-nghttp2.cc
pdns/dnsdistdist/dnsdist-nghttp2.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-downstream.hh
pdns/dnsdistdist/dnsdist-tcp.hh

index 69663e5191b8cf3566e2b7273c35774e4c7c4701..b1f8465b1b7944680948e76d5cbfc55dadba3321 100644 (file)
@@ -530,6 +530,10 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
         ret->d_tlsCtx = getTLSContext(tlsParams);
       }
 
+      if (vars.count("dohPath")) {
+        ret->d_dohPath = boost::get<string>(vars.at("dohPath"));
+      }
+
       /* this needs to be done _AFTER_ the order has been set,
          since the server are kept ordered inside the pool */
       auto localPools = g_pools.getCopy();
index de23e7b93307f5f08f7dcd74ad1db9d8d83c10d9..490a3d52c8db6d3462940ae4aaf6dd1ff660da9e 100644 (file)
@@ -51,6 +51,7 @@
 #include "dnsdist-ecs.hh"
 #include "dnsdist-healthchecks.hh"
 #include "dnsdist-lua.hh"
+#include "dnsdist-nghttp2.hh"
 #include "dnsdist-proxy-protocol.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-secpoll.hh"
@@ -1505,12 +1506,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
       }
       auto cpq = std::make_unique<UDPCrossProtocolQuery>(std::move(query), std::move(ids), ss);
 
-      if (g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq))) {
-        return ;
-      }
-      else {
-        return;
-      }
+      ss->passCrossProtocolQuery(std::move(cpq));
+      return;
     }
 
     unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
@@ -2171,8 +2168,6 @@ static void sighandler(int sig)
 }
 #endif
 
-#include "dnsdist-nghttp2.hh"
-
 int main(int argc, char** argv)
 {
   try {
@@ -2553,6 +2548,8 @@ int main(int argc, char** argv)
 
     g_tcpclientthreads = std::make_unique<TCPClientCollection>(*g_maxTCPClientThreads);
 
+    initDoHWorkers();
+
     for (auto& t : todo) {
       t();
     }
@@ -2640,8 +2637,6 @@ int main(int argc, char** argv)
       secpollthread.detach();
     }
 
-    sendHTTP2Query();
-
     if(g_cmdLine.beSupervised) {
 #ifdef HAVE_SYSTEMD
       sd_notify(0, "READY=1");
index 85fa96ea2e066e5957c64fc3663ba41ce318aa66..4ba0b80876fad176d607376b4bff3205c6dba64f 100644 (file)
@@ -649,6 +649,8 @@ struct ClientState
   }
 };
 
+struct CrossProtocolQuery;
+
 struct DownstreamState
 {
    typedef std::function<std::tuple<DNSName, uint16_t, uint16_t>(const DNSName&, uint16_t, uint16_t, dnsheader*)> checkfunc_t;
@@ -662,6 +664,7 @@ struct DownstreamState
   std::vector<int> sockets;
   const std::string sourceItfName;
   std::string d_tlsSubjectName;
+  std::string d_dohPath;
   std::mutex connectLock;
   LockGuarded<std::unique_ptr<FDMultiplexer>> mplexer{nullptr};
   std::shared_ptr<TLSCtx> d_tlsCtx{nullptr};
@@ -823,6 +826,8 @@ struct DownstreamState
     return d_tcpOnly || d_tlsCtx != nullptr;
   }
 
+  bool passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq);
+
 private:
   std::string name;
   std::string nameWithAddr;
index 5928ca9e8b312329c03eee2b56681513e1ca60f5..03ae3e1110204af63f75933be85f9c1fd7d3ac32 100644 (file)
@@ -242,6 +242,7 @@ testrunner_SOURCES = \
        dnsdist-lua-ffi-interface.h dnsdist-lua-ffi-interface.inc \
        dnsdist-lua-ffi.cc dnsdist-lua-ffi.hh \
        dnsdist-lua-vars.cc \
+       dnsdist-nghttp2.cc dnsdist-nghttp2.hh \
        dnsdist-protocols.cc dnsdist-protocols.hh \
        dnsdist-proxy-protocol.cc dnsdist-proxy-protocol.hh \
        dnsdist-rings.cc dnsdist-rings.hh \
@@ -321,6 +322,7 @@ testrunner_LDFLAGS = \
        $(AM_LDFLAGS) \
        $(PROGRAM_LDFLAGS) \
        $(BOOST_UNIT_TEST_FRAMEWORK_LDFLAGS) \
+       -lnghttp2 \
        -pthread
 
 testrunner_LDADD = \
index d0ffe2eecb262c34afd33accd8655ad810de6b0f..ceef5574b4001c19dbf63e121d389ffe14bb56af 100644 (file)
  */
 
 #include "dnsdist.hh"
+#include "dnsdist-nghttp2.hh"
+#include "dnsdist-tcp.hh"
 #include "dolog.hh"
 
+
+bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
+{
+  if (d_dohPath.empty()) {
+    return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
+  }
+  else {
+    return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
+  }
+}
+
 bool DownstreamState::reconnect()
 {
   std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
index 9e95676b6224d5ccdb666aab4d848d66c2b533ad..592a21f6d4d64dbcf7adf5068d9de6c076536b9a 100644 (file)
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
 
 #include <nghttp2/nghttp2.h>
 
+#include "dnsdist-nghttp2.hh"
+#include "dnsdist-tcp.hh"
+#include "dnsdist-tcp-downstream.hh"
+
+#include "dolog.hh"
 #include "iputils.hh"
 #include "libssl.hh"
 #include "noinitvector.hh"
 #include "tcpiohandler.hh"
+#include "threadname.hh"
 #include "sstuff.hh"
 
 #warning remove me
 #include "dnswriter.hh"
 
-struct MyUserData
+std::atomic<uint64_t> g_dohStatesDumpRequested{0};
+std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
+
+class DoHConnectionToBackend: public TCPConnectionToBackend
+{
+public:
+  DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now);
+
+  void handleTimeout(const struct timeval& now, bool write) override
+  {
+#warning FIXME: we should notify the owners of pending queries / responses
+  }
+
+  void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
+
+  std::string toString() const override
+  {
+    ostringstream o;
+    //o << "DoH connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queries<<", pending queries count is "<<d_pendingQueries.size()<<", "<<d_pendingResponses.size()<<" pending responses";
+    o << "DoH connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket");
+    return o.str();
+  }
+
+private:
+  static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
+  static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
+  static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data);
+  static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data);
+  static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
+  static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
+  static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
+  static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
+  static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
+  static void handleIO(std::shared_ptr<DoHConnectionToBackend>& conn, const struct timeval& now);
+
+  class PendingRequest
+  {
+  public:
+    std::shared_ptr<TCPQuerySender> d_sender{nullptr};
+    TCPQuery d_query;
+    PacketBuffer d_buffer;
+    bool d_finished{false};
+  };
+  void updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback);
+  void stopIO();
+  void handleResponse(PendingRequest&& request);
+
+  //std::deque<TCPQuery> d_pendingQueries;
+
+  std::unique_ptr<nghttp2_session, void(*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del};
+  std::unordered_map<int32_t, PendingRequest> d_currentStreams;
+  PacketBuffer d_out;
+  PacketBuffer d_in;
+  size_t d_outPos{0};
+  size_t d_inPos{0};
+};
+
+
+void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
+{
+  cerr<<"handle response!"<<endl;
+  struct timeval now;
+  gettimeofday(&now, nullptr);
+  request.d_sender->handleResponse(now, TCPResponse(std::move(request.d_buffer), std::move(request.d_query.d_idstate), shared_from_this()));
+}
+
+#define MAKE_NV(NAME, VALUE, VALUELEN)                                         \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN,             \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+#define MAKE_NV2(NAME, VALUE)                                                  \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1,    \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
+{
+  /* we could use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
+     to avoid a copy and lowercasing as long as we take care of making sure that the data will outlive the request
+     and that it is already lowercased. */
+  auto payloadSize = std::to_string(query.d_buffer.size());
+  d_currentQuery = std::move(query);
+  const nghttp2_nv hdrs[] = {
+      MAKE_NV2(":method", "POST"),
+      MAKE_NV2(":scheme", "https"),
+      MAKE_NV(":authority", d_ds->d_tlsSubjectName.c_str(), d_ds->d_tlsSubjectName.size()),
+      MAKE_NV(":path", d_ds->d_dohPath.c_str(), d_ds->d_dohPath.size()),
+      MAKE_NV2("accept", "application/dns-message"),
+      MAKE_NV2("content-type", "application/dns-message"),
+      MAKE_NV("content-length", payloadSize.c_str(), payloadSize.size()),
+      MAKE_NV2("user-agent", "nghttp2-" NGHTTP2_VERSION "/dnsdist")
+  };
+
+  /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key in nva (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set
+   */
+  cerr<<"Remote size window is "<<nghttp2_session_get_remote_window_size(d_session.get())<<endl;
+
+  nghttp2_data_provider data_provider;
+  data_provider.source.ptr = this;
+  data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t
+  {
+    cerr<<"in data provider"<<endl;
+    auto userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+    if (userData->d_inPos >= userData->d_currentQuery.d_buffer.size()) {
+       *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+       cerr<<"EOF"<<endl;
+       return 0;
+    }
+    size_t remaining = userData->d_currentQuery.d_buffer.size()- userData->d_inPos;
+    size_t toCopy = length > remaining ? remaining : length;
+    memcpy(buf, &userData->d_currentQuery.d_buffer.at(userData->d_inPos), toCopy);
+    userData->d_inPos += toCopy;
+    cerr<<toCopy<<" written"<<endl;
+    return toCopy;
+  };
+
+  auto stream_id = nghttp2_submit_request(d_session.get(), nullptr, hdrs, sizeof(hdrs)/sizeof(*hdrs), &data_provider, this);
+  if (stream_id < 0) {
+    cerr<<"Could not submit HTTP request: "<<nghttp2_strerror(stream_id)<<endl;
+    return;
+  }
+  cerr<<"stream ID is "<<stream_id<<endl;
+  auto rv = nghttp2_session_send(d_session.get());
+  cerr<<"nghttp2_session_send returned "<<rv<<endl;
+  if (rv != 0) {
+    throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
+  }
+  PendingRequest request;
+  request.d_query = std::move(d_currentQuery);
+  request.d_sender = std::move(sender);
+  auto insertPair = d_currentStreams.insert({stream_id, std::move(request)});
+  if (!insertPair.second) {
+    cerr<<"collision!!"<<endl;
+    /* there is a stream ID collision, something is very wrong! */
+    nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
+    throw std::runtime_error("Stream ID collision");
+  }
+}
+
+class DoHClientThreadData
 {
-  std::unique_ptr<nghttp2_session, void(*)(nghttp2_session*)> session{nullptr, nghttp2_session_del};
-  std::unique_ptr<TCPIOHandler> handler;
-  PacketBuffer out;
-  PacketBuffer in;
-  size_t outPos{0};
-  size_t inPos{0};
+public:
+  DoHClientThreadData(): mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent()))
+  {
+  }
+
+  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
 };
 
-static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data) {
+void DoHConnectionToBackend::handleIO(std::shared_ptr<DoHConnectionToBackend>& conn, const struct timeval& now)
+{
+}
+
+void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
+{
+  cerr<<"in "<<__PRETTY_FUNCTION__<<", param is "<<param.type().name()<<endl;
+  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
+  if (fd != conn->getHandle()) {
+    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
+  }
+
+  IOStateGuard ioGuard(conn->d_ioState);
+  do {
+    conn->d_inPos = 0;
+    conn->d_in.resize(conn->d_in.size() + 512);
+    cerr<<"trying to read "<<conn->d_in.size()<<endl;
+    try {
+      IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
+      // userData.d_handler->tryRead(userData.d_in, pos, userData.d_in.size());
+      cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
+      conn->d_in.resize(conn->d_inPos);
+      if (newState == IOState::Done) {
+        auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos);
+        cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
+        /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB,
+           all data should be consumed before returning */
+        if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
+          cerr<<"Fatal error: "<<nghttp2_strerror((int)readlen)<<endl;
+          return;
+        }
+        int rv = nghttp2_session_send(conn->d_session.get());
+        cerr<<"nghttp2_session_send returned "<<rv<<endl;
+      }
+      else {
+        if (newState == IOState::NeedWrite) {
+          conn->updateIO(IOState::NeedWrite, handleReadableIOCallback);
+        }
+        ioGuard.release();
+        break;
+      }
+    }
+    catch (const std::exception& e) {
+      cerr<<"got exception "<<e.what()<<endl;
+      break;
+    }
+  }
+  while (true);
+
+  //struct timeval now;
+  //gettimeofday(&now, nullptr);
+  //handleIO(conn, now);
+}
+
+void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
+{
+  cerr<<"in "<<__PRETTY_FUNCTION__<<", param is "<<param.type().name()<<endl;
+  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
+  if (fd != conn->getHandle()) {
+    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
+  }
+  IOStateGuard ioGuard(conn->d_ioState);
+
+  cerr<<"trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
+  try {
+    IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
+    cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_inPos<<" bytes remaining"<<endl;
+    if (newState == IOState::NeedRead) {
+      conn->updateIO(IOState::NeedRead, handleWritableIOCallback);
+    }
+    else if (newState == IOState::Done) {
+      conn->d_out.clear();
+      conn->d_outPos = 0;
+      conn->stopIO();
+      conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
+    }
+    ioGuard.release();
+  }
+  catch (const std::exception& e) {
+    cerr<<"got exception "<<e.what()<<endl;
+  }
+
+  //struct timeval now;
+  //gettimeofday(&now, nullptr);
+  //handleIO(conn, now);
+}
+
+void DoHConnectionToBackend::stopIO()
+{
+  d_ioState->reset();
+}
+
+void DoHConnectionToBackend::updateIO(IOState newState, FDMultiplexer::callbackfunc_t callback)
+{
+  struct timeval now;
+  gettimeofday(&now, nullptr);
+  boost::optional<struct timeval> ttd{boost::none};
+  if (newState == IOState::NeedRead) {
+    ttd = getBackendReadTTD(now);
+  }
+  else if (isFresh() && d_queries == 0) {
+    /* first write just after the non-blocking connect */
+    ttd = getBackendConnectTTD(now);
+  }
+  else {
+    ttd = getBackendWriteTTD(now);
+  }
+
+  auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
+  if (shared) {
+    if (newState == IOState::NeedRead) {
+      d_ioState->update(newState, callback, shared, ttd);
+    }
+    else if (newState == IOState::NeedWrite) {
+      d_ioState->update(newState, callback, shared, ttd);
+    }
+  }
+}
+
+ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
   cerr<<"asked to send "<<length<<" bytes"<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
-  userData->out.insert(userData->out.end(), data, data + length);
-  userData->handler->write(userData->out.data() + userData->outPos, userData->out.size() - userData->outPos, timeval{2, 0});
-  userData->out.clear();
+  DoHConnectionToBackend* userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+  bool bufferWasEmpty = userData->d_out.empty();
+  userData->d_out.insert(userData->d_out.end(), data, data + length);
+
+  if (bufferWasEmpty) {
+    auto state = userData->d_handler->tryWrite(userData->d_out, userData->d_outPos, userData->d_out.size());
+    if (state == IOState::Done) {
+      userData->d_out.clear();
+#warning FIXME from now on we need to read, as we might get an answer
+      cerr<<"FIXME now we need to read!"<<endl;
+      //if (currentIOState does not have NeedRead) {
+      //  userData->addToIOState(IOState::NeedRead);
+      //}
+    }
+    else {
+#warning write me should be addIO() instead, perhaps?
+      cerr<<"now we need to wait for a writable (or readable) socket"<<endl;
+      userData->updateIO(state, handleWritableIOCallback);
+    }
+  }
+
   return length;
 }
 
-static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) {
+int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
+  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+  cerr<<"Frame type is "<<std::to_string(frame->hd.type)<<endl;
   switch (frame->hd.type) {
   case NGHTTP2_HEADERS:
+    cerr<<"got headers"<<endl;
     if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
       cerr<<"All headers received"<<endl;
     }
@@ -49,22 +356,63 @@ static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame*
       cerr<<"- "<<frame->settings.iv[idx].settings_id<<" "<<frame->settings.iv[idx].value<<endl;
     }
     break;
+  case NGHTTP2_DATA:
+    cerr<<"got data"<<endl;
+    break;
+  case NGHTTP2_PRIORITY:
+    cerr<<"got priority"<<endl;
+    break;
+  case NGHTTP2_GOAWAY:
+    cerr<<"Got GO AWAY"<<endl;
+    break;
+  }
+
+  /* is this the last frame for this stream? */
+  if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+    auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
+    if (stream != conn->d_currentStreams.end()) {
+      cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
+      stream->second.d_finished = true;
+
+      auto request = std::move(stream->second);
+      conn->d_currentStreams.erase(stream->first);
+      conn->handleResponse(std::move(request));
+    }
+    else {
+      cerr<<"Stream "<<frame->hd.stream_id<<" NOT FOUND"<<endl;
+    }
   }
 
   return 0;
 }
 
-static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data) {
+int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
-  cerr<<"Got data of size "<<len<<endl;
-  cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
+  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+  cerr<<"Got data of size "<<len<<" for stream "<<stream_id<<endl;
+  auto stream = conn->d_currentStreams.find(stream_id);
+  if (stream == conn->d_currentStreams.end()) {
+    cerr<<"Unable to match the stream ID "<<stream_id<<" to a known one!"<<endl;
+    return NGHTTP2_ERR_CALLBACK_FAILURE;
+  }
+  stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len);
+  if (stream->second.d_finished) {
+    cerr<<"we now have the full response!"<<endl;
+    auto request = std::move(stream->second);
+    conn->d_currentStreams.erase(stream->first);
+    conn->handleResponse(std::move(request));
+    cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
+  }
+  else {
+    cerr<<"but the stream is not finished yet"<<endl;
+  }
+
   return 0;
 }
 
-static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data) {
+int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
+  //DoHConnectionToBackend* userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
 
   cerr<<"Stream "<<stream_id<<" closed with error_code="<<error_code<<endl;
   auto rv = nghttp2_session_terminate_session(session, NGHTTP2_NO_ERROR);
@@ -75,9 +423,9 @@ static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id,
   return 0;
 }
 
-static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data) {
+int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
+  //DoHConnectionToBackend* userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
 
   switch (frame->hd.type) {
   case NGHTTP2_HEADERS:
@@ -92,9 +440,9 @@ static int on_header_callback(nghttp2_session* session, const nghttp2_frame* fra
   return 0;
 }
 
-static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) {
+int DoHConnectionToBackend::on_begin_headers_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data) {
   cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
-  MyUserData* userData = reinterpret_cast<MyUserData*>(user_data);
+  //DoHConnectionToBackend* userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
 
   switch (frame->hd.type) {
   case NGHTTP2_HEADERS:
@@ -106,25 +454,34 @@ static int on_begin_headers_callback(nghttp2_session* session, const nghttp2_fra
   return 0;
 }
 
-static void doReadData(MyUserData& userData)
+int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data) {
+  cerr<<"in "<<__PRETTY_FUNCTION__<<endl;
+  cerr<<"Error is "<<std::string(msg, len)<<endl;
+  //DoHConnectionToBackend* userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+
+  return 0;
+}
+
+#if 0
+static void doReadData(DoHConnectionToBackend& userData)
 {
   do {
     size_t pos = 0;
-    userData.in.resize(512);
-    cerr<<"trying to read "<<userData.in.size()<<endl;
+    userData.d_in.resize(512);
+    cerr<<"trying to read "<<userData.d_in.size()<<endl;
     try {
-      pos = userData.handler->read(userData.in.data(), userData.in.size(), timeval{2, 0}, timeval{2, 0}, true);
-      // userData.handler->tryRead(userData.in, pos, userData.in.size());
+      pos = userData.d_handler->read(userData.d_in.data(), userData.d_in.size(), timeval{2, 0}, timeval{2, 0}, true);
+      // userData.d_handler->tryRead(userData.d_in, pos, userData.d_in.size());
       cerr<<"got "<<pos<<endl;
-      userData.in.resize(pos);
+      userData.d_in.resize(pos);
       if (pos > 0) {
-        auto readlen = nghttp2_session_mem_recv(userData.session.get(), userData.in.data(), pos);
+        auto readlen = nghttp2_session_mem_recv(userData.d_session.get(), userData.d_in.data(), pos);
         cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
         if (readlen < 0) {
           cerr<<"Fatal error: "<<nghttp2_strerror((int)readlen)<<endl;
           return;
         }
-        int rv = nghttp2_session_send(userData.session.get());
+        int rv = nghttp2_session_send(userData.d_session.get());
         cerr<<"nghttp2_session_send returned "<<rv<<endl;
       }
       else {
@@ -139,17 +496,6 @@ static void doReadData(MyUserData& userData)
   while (true);
 }
 
-#define MAKE_NV(NAME, VALUE, VALUELEN)                                         \
-  {                                                                            \
-    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, VALUELEN,             \
-        NGHTTP2_NV_FLAG_NONE                                                   \
-  }
-
-#define MAKE_NV2(NAME, VALUE)                                                  \
-  {                                                                            \
-    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1,    \
-        NGHTTP2_NV_FLAG_NONE                                                   \
-  }
 void sendHTTP2Query()
 {
   auto remote = ComboAddress("9.9.9.11:443");
@@ -163,9 +509,9 @@ void sendHTTP2Query()
   // FIXME
   auto fd = sock.getHandle();
   setTCPNoDelay(fd);
-  MyUserData userData;
-  userData.handler = std::make_unique<TCPIOHandler>(host, sock.releaseHandle(), timeval{2, 0}, tlsCtx, time(nullptr));
-  userData.handler->connect(true, remote, timeval{2, 0});
+  DoHConnectionToBackend userData;
+  userData.d_handler = std::make_unique<TCPIOHandler>(host, sock.releaseHandle(), timeval{2, 0}, tlsCtx, time(nullptr));
+  userData.d_handler->connect(true, remote, timeval{2, 0});
 
   /* check ALPN:
 SSL_get0_next_proto_negotiated(ssl, &alpn, &alpnlen);
@@ -203,7 +549,7 @@ SSL_get0_next_proto_negotiated(ssl, &alpn, &alpnlen);
     return;
   }
 
-  userData.session = std::unique_ptr<nghttp2_session, void(*)(nghttp2_session*)>(sess, nghttp2_session_del);
+  userData.d_session = std::unique_ptr<nghttp2_session, void(*)(nghttp2_session*)>(sess, nghttp2_session_del);
   sess = nullptr;
 
   callbacks.reset();
@@ -216,20 +562,20 @@ SSL_get0_next_proto_negotiated(ssl, &alpn, &alpnlen);
     {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16*1024*1024}
   };
    /* client 24 bytes magic string will be sent by nghttp2 library */
-  int rv = nghttp2_submit_settings(userData.session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(*iv));
+  int rv = nghttp2_submit_settings(userData.d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(*iv));
   if (rv != 0) {
     cerr<<"Could not submit SETTINGS: "<<nghttp2_strerror(rv)<<endl;
     return;
   }
 
-  GenericDNSPacketWriter<PacketBuffer> pw(userData.in, DNSName("doh.dnsdist.org."), QType::A, QClass::IN, 0);
+  GenericDNSPacketWriter<PacketBuffer> pw(userData.d_in, DNSName("doh.dnsdist.org."), QType::A, QClass::IN, 0);
   pw.getHeader()->rd = 1;
   pw.commit();
 
   /* we could use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
      to avoid a copy and lowercasing as long as we take care of making sure that the data will outlive the request
      and that it is already lowercased. */
-  auto payloadSize = std::to_string(userData.in.size());
+  auto payloadSize = std::to_string(userData.d_in.size());
   const nghttp2_nv hdrs[] = {
       MAKE_NV2(":method", "POST"),
       MAKE_NV2(":scheme", "https"),
@@ -243,40 +589,380 @@ SSL_get0_next_proto_negotiated(ssl, &alpn, &alpnlen);
 
   /* f data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key in nva (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set
    */
-  cerr<<"Remote size window is "<<nghttp2_session_get_remote_window_size(userData.session.get())<<endl;
+  cerr<<"Remote size window is "<<nghttp2_session_get_remote_window_size(userData.d_session.get())<<endl;
 
   nghttp2_data_provider data_provider;
   data_provider.source.ptr = &userData;
   data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t
   {
     cerr<<"in data provider"<<endl;
-    auto userData = reinterpret_cast<MyUserData*>(user_data);
-    if (userData->inPos >= userData->in.size()) {
+    auto userData = reinterpret_cast<DoHConnectionToBackend*>(user_data);
+    if (userData->d_inPos >= userData->d_in.size()) {
        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
        cerr<<"EOF"<<endl;
        return 0;
     }
-    size_t remaining = userData->in.size()- userData->inPos;
+    size_t remaining = userData->d_in.size()- userData->d_inPos;
     size_t toCopy = length > remaining ? remaining : length;
-    memcpy(buf, &userData->in.at(userData->inPos), toCopy);
-    userData->inPos += toCopy;
+    memcpy(buf, &userData->d_in.at(userData->d_inPos), toCopy);
+    userData->d_inPos += toCopy;
     cerr<<toCopy<<" written"<<endl;
     return toCopy;
   };
 
-  auto stream_id = nghttp2_submit_request(userData.session.get(), nullptr, hdrs, sizeof(hdrs)/sizeof(*hdrs), &data_provider, &userData);
+  auto stream_id = nghttp2_submit_request(userData.d_session.get(), nullptr, hdrs, sizeof(hdrs)/sizeof(*hdrs), &data_provider, &userData);
   if (stream_id < 0) {
     cerr<<"Could not submit HTTP request: "<<nghttp2_strerror(stream_id)<<endl;
     return;
   }
-  rv = nghttp2_session_send(userData.session.get());
+  rv = nghttp2_session_send(userData.d_session.get());
 
   setNonBlocking(fd);
 
   doReadData(userData);
-  cerr<<"After reading data, remote size window is "<<nghttp2_session_get_remote_window_size(userData.session.get())<<endl;
-  cerr<<"Max number of streams from remote is "<<nghttp2_session_get_remote_settings(userData.session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
-  cerr<<"our own is "<<nghttp2_session_get_local_settings(userData.session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
+  cerr<<"After reading data, remote size window is "<<nghttp2_session_get_remote_window_size(userData.d_session.get())<<endl;
+  cerr<<"Max number of streams from remote is "<<nghttp2_session_get_remote_settings(userData.d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
+  cerr<<"our own is "<<nghttp2_session_get_local_settings(userData.d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
   // min(nghttp2_session_get_stream_remote_window_size(), nghttp2_session_get_remote_window_size())
 #warning for later: how do we know how many streams are left? the window size?
 }
+#endif
+
+DoHConnectionToBackend::DoHConnectionToBackend(std::shared_ptr<DownstreamState> ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): TCPConnectionToBackend(ds, mplexer, now)
+{
+  // inherit most of the stuff from the TCPConnectionToBackend()
+
+  /* check ALPN:
+SSL_get0_next_proto_negotiated(ssl, &alpn, &alpnlen);
+#if OPENSSL_VERSION_NUMBER >= 0x10002000L
+    if (alpn == NULL) {
+      SSL_get0_alpn_selected(ssl, &alpn, &alpnlen);
+    }
+#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
+
+    if (alpn == NULL || alpnlen != 2 || memcmp("h2", alpn, 2) != 0) {
+      fprintf(stderr, "h2 is not negotiated\n");
+      delete_http2_session_data(session_data);
+      return;
+    }
+  */
+  d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
+
+  nghttp2_session_callbacks* cbs = nullptr;
+  if (nghttp2_session_callbacks_new(&cbs) != 0) {
+    cerr<<"unable to create a callback object for a new HTTP/2 session"<<endl;
+    return;
+  }
+  std::unique_ptr<nghttp2_session_callbacks, void(*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del);
+  cbs = nullptr;
+
+  nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback);
+  nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback);
+  nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback);
+  nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback);
+  nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback);
+  nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks.get(), on_begin_headers_callback);
+  nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback);
+
+  nghttp2_session* sess = nullptr;
+  if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
+    cerr<<"Coult not allocate a new HTTP/2 session"<<endl;
+    return;
+  }
+
+  d_session = std::unique_ptr<nghttp2_session, void(*)(nghttp2_session*)>(sess, nghttp2_session_del);
+  sess = nullptr;
+
+  callbacks.reset();
+
+#warning we should make the 100 configurable here, as we might want a lower number before receiving the one actually supported by the server
+#warning we should also make the window size configurable, but 16M is a nice default
+  nghttp2_settings_entry iv[] = {
+    {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100},
+    {NGHTTP2_SETTINGS_ENABLE_PUSH, 0},
+    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16*1024*1024}
+  };
+   /* client 24 bytes magic string will be sent by nghttp2 library */
+  int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv)/sizeof(*iv));
+  if (rv != 0) {
+    cerr<<"Could not submit SETTINGS: "<<nghttp2_strerror(rv)<<endl;
+    return;
+  }
+}
+
+class DownstreamDoHConnectionsManager
+{
+public:
+  static std::shared_ptr<DoHConnectionToBackend> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now);
+  static void releaseDownstreamConnection(std::shared_ptr<DoHConnectionToBackend>&& conn);
+  static void cleanupClosedConnections(struct timeval now);
+  static size_t clear();
+
+  static void setMaxCachedConnectionsPerDownstream(size_t max)
+  {
+    s_maxCachedConnectionsPerDownstream = max;
+  }
+
+  static void setCleanupInterval(uint16_t interval)
+  {
+    s_cleanupInterval = interval;
+  }
+
+private:
+  static thread_local map<boost::uuids::uuid, std::deque<std::shared_ptr<DoHConnectionToBackend>>> t_downstreamConnections;
+  static size_t s_maxCachedConnectionsPerDownstream;
+  static time_t s_nextCleanup;
+  static uint16_t s_cleanupInterval;
+};
+
+struct DoHClientCollection::DoHWorkerThread
+{
+  DoHWorkerThread()
+  {
+  }
+
+  DoHWorkerThread(int crossProtocolPipe): d_crossProtocolQueryPipe(crossProtocolPipe)
+  {
+  }
+
+  DoHWorkerThread(DoHWorkerThread&& rhs): d_crossProtocolQueryPipe(rhs.d_crossProtocolQueryPipe)
+  {
+    rhs.d_crossProtocolQueryPipe = -1;
+  }
+
+  DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
+  {
+    if (d_crossProtocolQueryPipe != -1) {
+      close(d_crossProtocolQueryPipe);
+    }
+
+    d_crossProtocolQueryPipe = rhs.d_crossProtocolQueryPipe;
+    rhs.d_crossProtocolQueryPipe = -1;
+
+    return *this;
+  }
+
+  DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
+  DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
+
+  ~DoHWorkerThread()
+  {
+    if (d_crossProtocolQueryPipe != -1) {
+      close(d_crossProtocolQueryPipe);
+    }
+  }
+
+  int d_crossProtocolQueryPipe{-1};
+};
+
+DoHClientCollection::DoHClientCollection(size_t maxThreads): d_clientThreads(maxThreads), d_maxThreads(maxThreads)
+{
+}
+
+bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq)
+{
+  if (d_numberOfThreads == 0) {
+    throw std::runtime_error("No DoH worker thread yet");
+  }
+
+  uint64_t pos = d_pos++;
+  auto pipe = d_clientThreads.at(pos % d_numberOfThreads).d_crossProtocolQueryPipe;
+  auto tmp = cpq.release();
+
+  if (write(pipe, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+    delete tmp;
+    tmp = nullptr;
+    return false;
+  }
+
+  return true;
+}
+
+std::shared_ptr<DoHConnectionToBackend> DownstreamDoHConnectionsManager::getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<DownstreamState>& ds, const struct timeval& now)
+{
+  return std::make_shared<DoHConnectionToBackend>(ds, mplexer, now);
+}
+
+static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
+{
+  auto threadData = boost::any_cast<DoHClientThreadData*>(param);
+  CrossProtocolQuery* tmp{nullptr};
+
+  ssize_t got = read(pipefd, &tmp, sizeof(tmp));
+  if (got == 0) {
+    throw std::runtime_error("EOF while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
+  }
+  else if (got == -1) {
+    if (errno == EAGAIN || errno == EINTR) {
+      return;
+    }
+    throw std::runtime_error("Error while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror());
+  }
+  else if (got != sizeof(tmp)) {
+    throw std::runtime_error("Partial read while reading from the DoH cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
+  }
+
+  try {
+    struct timeval now;
+    gettimeofday(&now, nullptr);
+
+    std::shared_ptr<TCPQuerySender> tqs = tmp->getTCPQuerySender();
+    auto query = std::move(tmp->query);
+    auto downstreamServer = std::move(tmp->downstream);
+    delete tmp;
+    tmp = nullptr;
+
+    auto downstream = DownstreamDoHConnectionsManager::getConnectionToDownstream(threadData->mplexer, downstreamServer, now);
+    
+#warning what about the proxy protocol payload, here, do we need to remove it? we likely need to handle forward-for headers?
+    downstream->queueQuery(tqs, std::move(query));
+  }
+  catch (...) {
+    delete tmp;
+    tmp = nullptr;
+    throw;
+  }
+}
+
+static void dohClientThread(int crossProtocolPipeFD)
+{
+  setThreadName("dnsdist/dohClie");
+
+  DoHClientThreadData data;
+
+  data.mplexer->addReadFD(crossProtocolPipeFD, handleCrossProtocolQuery, &data);
+
+  struct timeval now;
+  gettimeofday(&now, nullptr);
+  time_t lastTimeoutScan = now.tv_sec;
+
+  for (;;) {
+    data.mplexer->run(&now);
+
+    if (now.tv_sec > lastTimeoutScan) {
+      lastTimeoutScan = now.tv_sec;
+      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
+      for (const auto& cbData : expiredReadConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName());
+          conn->handleTimeout(now, false);
+        }
+      }
+
+      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
+      for (const auto& cbData : expiredWriteConns) {
+        if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
+          auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
+          vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName());
+          conn->handleTimeout(now, true);
+        }
+      }
+
+      if (g_dohStatesDumpRequested > 0) {
+        /* just to keep things clean in the output, debug only */
+        static std::mutex s_lock;
+        std::lock_guard<decltype(s_lock)> lck(s_lock);
+        if (g_dohStatesDumpRequested > 0) {
+          /* no race here, we took the lock so it can only be increased in the meantime */
+          --g_dohStatesDumpRequested;
+          errlog("Dumping the DoH client states, as requested:");
+          data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd)
+          {
+            struct timeval lnow;
+            gettimeofday(&lnow, nullptr);
+            if (ttd.tv_sec > 0) {
+            errlog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec-lnow.tv_sec));
+            }
+            else {
+              errlog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
+            }
+
+            if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
+              auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
+              errlog(" - %s", conn->toString());
+            }
+            else if (param.type() == typeid(DoHClientThreadData*)) {
+              errlog(" - Worker thread pipe");
+            }
+          });
+        }
+      }
+    }
+  }
+}
+
+void DoHClientCollection::addThread()
+{
+  auto preparePipe = [](int fds[2], const std::string& type) -> bool {
+    if (pipe(fds) < 0) {
+      errlog("Error creating the DoH thread %s pipe: %s", type, stringerror());
+      return false;
+    }
+
+    if (!setNonBlocking(fds[0])) {
+      int err = errno;
+      close(fds[0]);
+      close(fds[1]);
+      errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err));
+      return false;
+    }
+
+    if (!setNonBlocking(fds[1])) {
+      int err = errno;
+      close(fds[0]);
+      close(fds[1]);
+      errlog("Error setting the DoH thread %s pipe non-blocking: %s", type, stringerror(err));
+      return false;
+    }
+
+    if (g_tcpInternalPipeBufferSize > 0 && getPipeBufferSize(fds[0]) < g_tcpInternalPipeBufferSize) {
+      setPipeBufferSize(fds[0], g_tcpInternalPipeBufferSize);
+    }
+
+    return true;
+  };
+
+  int crossProtocolFDs[2] = { -1, -1};
+  if (!preparePipe(crossProtocolFDs, "cross-protocol")) {
+    return;
+  }
+
+  vinfolog("Adding DoH Client thread");
+
+  {
+    std::lock_guard<std::mutex> lock(d_mutex);
+
+    if (d_numberOfThreads >= d_clientThreads.size()) {
+      vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads.load(), d_clientThreads.size());
+      close(crossProtocolFDs[0]);
+      close(crossProtocolFDs[1]);
+      return;
+    }
+
+    /* from now on this side of the pipe will be managed by that object,
+       no need to worry about it */
+    DoHWorkerThread worker(crossProtocolFDs[1]);
+    try {
+      std::thread t1(dohClientThread, crossProtocolFDs[0]);
+      t1.detach();
+    }
+    catch (const std::runtime_error& e) {
+      /* the thread creation failed, don't leak */
+      errlog("Error creating a DoH thread: %s", e.what());
+      close(crossProtocolFDs[0]);
+      return;
+    }
+
+    d_clientThreads.at(d_numberOfThreads) = std::move(worker);
+    ++d_numberOfThreads;
+  }
+}
+
+bool initDoHWorkers()
+{
+#warning FIXME: number of DoH threads
+  g_dohClientThreads = std::make_unique<DoHClientCollection>(1);
+  g_dohClientThreads->addThread();
+  return true;
+}
index 49ce32361655e5d038b54faf895e9dc7ad851a2f..97137358236bd0b5067ce23dca65480de49dcdd3 100644 (file)
@@ -1,3 +1,63 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
 #pragma once
 
-void sendHTTP2Query();
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "stat_t.hh"
+
+struct CrossProtocolQuery;
+
+class DoHClientCollection
+{
+public:
+  DoHClientCollection(size_t maxThreads);
+
+  bool hasReachedMaxThreads() const
+  {
+    return d_numberOfThreads >= d_maxThreads;
+  }
+
+  uint64_t getThreadsCount() const
+  {
+    return d_numberOfThreads;
+  }
+
+  bool passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq);
+  void addThread();
+
+private:
+  struct DoHWorkerThread;
+
+  std::mutex d_mutex;
+  std::vector<DoHWorkerThread> d_clientThreads;
+  pdns::stat_t d_numberOfThreads{0};
+  pdns::stat_t d_pos{0};
+  const uint64_t d_maxThreads{0};
+};
+
+extern std::unique_ptr<DoHClientCollection> g_dohClientThreads;
+extern std::atomic<uint64_t> g_dohStatesDumpRequested;
+
+bool initDoHWorkers();
index d480b38935c6c8f268887fa5c00fdec609f504d0..2201f2df4c18a46ce43a64a61cf5420ad6ceaefd 100644 (file)
@@ -225,8 +225,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
             conn->d_pendingResponses.clear();
             conn->d_currentPos = 0;
 
-            if (conn->d_state == State::doingHandshake ||
-                conn->d_state == State::sendingQueryToBackend) {
+            if (conn->d_state == State::sendingQueryToBackend) {
               iostate = IOState::NeedWrite;
               // resume sending query
             }
@@ -295,7 +294,7 @@ void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t
   }
 
   struct timeval now;
-  gettimeofday(&now, 0);
+  gettimeofday(&now, nullptr);
   handleIO(conn, now);
 }
 
@@ -309,7 +308,7 @@ void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
     throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries");
   }
 
-  // if we are not already sending a query or in the middle of reading a response (so idle or doingHandshake),
+  // if we are not already sending a query or in the middle of reading a response (so idle),
   // start sending the query
   if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
     DEBUGLOG("Sending new query to backend right away");
@@ -615,7 +614,7 @@ IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBa
   }
 }
 
-uint16_t TCPConnectionToBackend::getQueryIdFromResponse()
+uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
 {
   if (d_responseBuffer.size() < sizeof(dnsheader)) {
     throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer.size()) + ") response from " + d_ds->getNameWithAddr());
index 3745de96571d7cb34b431233b2166854f709c0c2..9301ad70d89c2a12632a7b8f0ae526754e1a2115 100644 (file)
@@ -15,7 +15,7 @@ public:
     reconnect();
   }
 
-  ~TCPConnectionToBackend();
+  virtual ~TCPConnectionToBackend();
 
   int getHandle() const
   {
@@ -112,8 +112,8 @@ public:
     return ds == d_ds;
   }
 
-  void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query);
-  void handleTimeout(const struct timeval& now, bool write);
+  virtual void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query);
+  virtual void handleTimeout(const struct timeval& now, bool write);
   void release();
 
   void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
@@ -123,17 +123,17 @@ public:
     return d_lastDataReceivedTime;
   }
 
-  std::string toString() const
+  virtual std::string toString() const
   {
     ostringstream o;
     o << "TCP connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queries<<", pending queries count is "<<d_pendingQueries.size()<<", "<<d_pendingResponses.size()<<" pending responses, linked to "<<(d_sender ? " a client" : "no client");
     return o.str();
   }
 
-private:
+protected:
   /* waitingForResponseFromBackend is a state where we have not yet started reading the size,
      so we can still switch to sending instead */
-  enum class State : uint8_t { idle, doingHandshake, sendingQueryToBackend, waitingForResponseFromBackend, readingResponseSizeFromBackend, readingResponseFromBackend };
+  enum class State : uint8_t { idle, sendingQueryToBackend, waitingForResponseFromBackend, readingResponseSizeFromBackend, readingResponseFromBackend };
   enum class FailureReason : uint8_t { /* too many attempts */ gaveUp, timeout, unexpectedQueryID };
 
   static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
@@ -143,7 +143,7 @@ private:
   static bool isXFRFinished(const TCPResponse& response, TCPQuery& query);
 
   IOState handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
-  uint16_t getQueryIdFromResponse();
+  uint16_t getQueryIdFromResponse() const;
   bool reconnect();
   void notifyAllQueriesFailed(const struct timeval& now, FailureReason reason);
   bool needProxyProtocolPayload() const
@@ -197,6 +197,7 @@ private:
   }
 
   PacketBuffer d_responseBuffer;
+#warning we do not need this and could append to the outgoing buffer right away but is it better?
   std::deque<TCPQuery> d_pendingQueries;
   std::unordered_map<uint16_t, TCPQuery> d_pendingResponses;
   std::unique_ptr<FDMultiplexer>& d_mplexer;
index 99da15c094350f6a9b9a92140fd65ee6d4b93433..e48c599f644c8fa2923c0fa6962c1679b7993ad8 100644 (file)
  */
 #pragma once
 
+#include <unistd.h>
+#include "iputils.hh"
+#include "dnsdist.hh"
+
 struct ConnectionInfo
 {
   ConnectionInfo(ClientState* cs_) :