]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: add simple DoQ tests, misc cleanup, removes unnecessary chans/threads
authorCharles-Henri Bruyand <charles-henri.bruyand@open-xchange.com>
Thu, 14 Sep 2023 13:59:06 +0000 (15:59 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 9 Oct 2023 11:36:45 +0000 (13:36 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdistdist/docs/reference/config.rst
pdns/dnsdistdist/doq.cc
pdns/dnsdistdist/doq.hh
regression-tests.dnsdist/dnsdisttests.py
regression-tests.dnsdist/requirements.txt
regression-tests.dnsdist/test_DOQ.py [new file with mode: 0644]

index 39a30ba053c38a8db5cf524e1058b2880456e230..6d39d29e0d5a754eeaa17082eb10cc345c7c07e7 100644 (file)
@@ -2510,9 +2510,6 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 
     if (vars) {
       parseLocalBindVars(vars, reusePort, tcpFastOpenQueueSize, interface, cpus, tcpListenQueueSize, maxInFlightQueriesPerConn, tcpMaxConcurrentConnections);
-//      getOptionalValue<int>(vars, "idleTimeout", frontend->d_idleTimeout);
-
-      getOptionalValue<int>(vars, "internalPipeBufferSize", frontend->d_internalPipeBufferSize);
 
       parseTLSConfig(frontend->d_tlsConfig, "addDOQLocal", vars);
 
index fbb8e73f77645ce30b9a5aa269760b2c5bffe0c4..6684b3554a4059bbcd7ccf08cde05c2c697314b9 100644 (file)
@@ -189,7 +189,6 @@ Listen Sockets
   * ``interface=""``: str - Set the network interface to use.
   * ``cpus={}``: table - Set the CPU affinity for this listener thread, asking the scheduler to run it on a single CPU id, or a set of CPU ids. This parameter is only available if the OS provides the pthread_setaffinity_np() function.
   * ``idleTimeout=30``: int - Set the idle timeout, in seconds.
-  * ``internalPipeBufferSize=0``: int - Set the size in bytes of the internal buffer of the pipes used internally to pass queries and responses between threads. Requires support for ``F_SETPIPE_SZ`` which is present in Linux since 2.6.35. The actual size might be rounded up to a multiple of a page size. 0 means that the OS default size is used. The default value is 0, except on Linux where it is 1048576 since 1.6.0.
 
 .. function:: addTLSLocal(address, certFile(s), keyFile(s) [, options])
 
index c8e8bbe04bc58c9a2701bd2fccaba63c5c5a8a71..b0e05d6a459bb741998aa810409b6f11d96e8ba9 100644 (file)
 #include "dnsdist-proxy-protocol.hh"
 
 
+static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description);
 class DOQServerConfig
 {
 public:
-  DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_, uint32_t internalPipeBufferSize) :
+  DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_) :
     config(std::move(config_))
   {
-    {
-      auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
-      d_querySender = std::move(sender);
-      d_queryReceiver = std::move(receiver);
-    }
-    {
-      auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
-      d_responseSender = std::move(sender);
-      d_responseReceiver = std::move(receiver);
-    }
   }
   DOQServerConfig(const DOQServerConfig&) = delete;
   DOQServerConfig(DOQServerConfig&&) = default;
@@ -60,13 +51,9 @@ public:
   QuicheConfig config;
   ClientState* cs{nullptr};
   std::shared_ptr<DOQFrontend> df{nullptr};
-  pdns::channel::Sender<DOQUnit> d_querySender;
-  pdns::channel::Receiver<DOQUnit> d_queryReceiver;
-  pdns::channel::Sender<DOQUnit> d_responseSender;
-  pdns::channel::Receiver<DOQUnit> d_responseReceiver;
 };
 
-#if 1
+#if 0
 #define DEBUGLOG_ENABLED
 #define DEBUGLOG(x) std::cerr<<x<<std::endl;
 #else
@@ -79,22 +66,6 @@ static constexpr size_t TOKEN_LEN = 32; /* check if this needs to be authenticat
 
 static std::map<PacketBuffer, Connection> s_connections;
 
-/* This internal function sends back the object to the main thread to send a reply.
-   The caller should NOT release or touch the unit after calling this function */
-static void sendDOQUnitToTheMainThread(DOQUnitUniquePtr&& du, const char* description)
-{
-  if (du->responseSender == nullptr) {
-    return;
-  }
-  try {
-    if (!du->responseSender->send(std::move(du))) {
-      vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
-    }
-  } catch (const std::exception& e) {
-    vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
-  }
-}
-
 class DOQTCPCrossQuerySender final : public TCPQuerySender
 {
 public:
@@ -114,9 +85,6 @@ public:
     }
 
     auto du = std::move(response.d_idstate.doqu);
-    if (du->responseSender == nullptr) {
-      return;
-    }
 
     du->response = std::move(response.d_buffer);
     du->ids = std::move(response.d_idstate);
@@ -136,7 +104,7 @@ public:
       if (!processResponse(dr.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
         if (dr.ids.doqu) {
 
-          sendDOQUnitToTheMainThread(std::move(dr.ids.doqu), "Response dropped by rules");
+          sendBackDOQUnit(std::move(dr.ids.doqu), "Response dropped by rules");
         }
         return;
       }
@@ -164,7 +132,7 @@ public:
       ++du->ids.cs->responses;
     }
 
-    sendDOQUnitToTheMainThread(std::move(du), "cross-protocol response");
+    sendBackDOQUnit(std::move(du), "Cross-protocol response");
   }
 
   void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
@@ -174,33 +142,7 @@ public:
 
   void notifyIOError(const struct timeval& now, TCPResponse&& response) override
   {
-    // auto& query = response.d_idstate;
-    // if (!query.du) {
-    //   return;
-    // }
-
-    // auto dohUnit = getDUFromIDS(query);
-    // if (dohUnit->responseSender == nullptr) {
-    //   return;
-    // }
-
-    // du->ids = std::move(query);
-    // sendDOQUnitToTheMainThread(std::move(du), "cross-protocol error response");
-  }
- // void notifyIOError(InternalQueryState&& query, const struct timeval& now) override
-  // {
-  //   if (!query.doqu) {
-  //     return;
-  //   }
-
-  //   if (query.doqu->responseSender == nullptr) {
-  //     return;
-  //   }
-
-  //   auto du = std::move(query.doqu);
-  //   du->ids = std::move(query);
-  //   sendDOQUnitToTheMainThread(std::move(du), "cross-protocol error response");
-  // }
+  }
 };
 
 class DOQCrossProtocolQuery : public CrossProtocolQuery
@@ -234,7 +176,7 @@ public:
 
   void handleInternalError()
   {
-    sendDOQUnitToTheMainThread(std::move(query.d_idstate.doqu), "DOQ internal error");
+    sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
   }
 
   std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
@@ -271,11 +213,15 @@ std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::m
 /* Always called from the main DoQ thread */
 static void handleResponse(DOQFrontend& df, Connection& conn, const uint64_t streamID, const PacketBuffer& response)
 {
-  uint16_t responseSize = static_cast<uint16_t>(response.size());
-  const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
-  auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, sizeBytes, sizeof(sizeBytes), false);
-  if (res == sizeof(sizeBytes)) {
-    res = quiche_conn_stream_send(conn.d_conn.get(), streamID, response.data(), response.size(), true);
+  if (response.size() == 0) {
+    quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, 0x5);
+  } else {
+    uint16_t responseSize = static_cast<uint16_t>(response.size());
+    const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
+    auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, sizeBytes, sizeof(sizeBytes), false);
+    if (res == sizeof(sizeBytes)) {
+      res = quiche_conn_stream_send(conn.d_conn.get(), streamID, response.data(), response.size(), true);
+    }
   }
 }
 
@@ -315,12 +261,12 @@ void DOQFrontend::setup()
   quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO);
   // quiche_config_log_keys(config.get());
 
-  d_server_config = std::make_shared<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
+  d_server_config = std::make_shared<DOQServerConfig>(std::move(config));
 }
 
 static std::optional<PacketBuffer> getCID()
 {
-  // FIXME remplacer par notre truc de random
+  // FIXME replace it
   int rng = open("/dev/urandom", O_RDONLY);
   if (rng < 0) {
     return std::nullopt;
@@ -339,15 +285,6 @@ static std::optional<PacketBuffer> getCID()
 static PacketBuffer mintToken(const PacketBuffer& dcid, const ComboAddress& peer)
 {
   // FIXME: really check whether this needs to be authenticated, via HMAC for example
-  // client recoit un datagram
-  // challenge avec token
-  // suffisement d'infos pour binder a la bonne adresse
-  // filer l'original CID fille par le client.
-  // -> ne pas garder l'etat
-  // -> inclure l'info dans le token
-  // -> voir avec libsodium ?
-  // -> token plus gros avec HMAC
-  // -> regarder ce que font les autres implementations de QUIC
   const std::array keyword = {'q', 'u', 'i', 'c', 'h', 'e'};
   auto addrBytes = peer.toByteString();
   PacketBuffer token;
@@ -426,6 +363,13 @@ static std::optional<std::reference_wrapper<Connection>> getConnection(const Pac
   return it->second;
 }
 
+static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description)
+{
+  DEBUGLOG("Handling back a " << description);
+  auto conn = getConnection(du->serverConnID);
+  handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+}
+
 static std::optional<std::reference_wrapper<Connection>> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer)
 {
   auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
@@ -459,10 +403,6 @@ static void flushEgress(Socket& sock, Connection& conn) {
   }
 
   // FIXME: update timers
-  // -> on peut appeler une fonction quiche pour savoir quand prochain timeout
-  // -> pas ici ?
-  // -> fin de loop event quand est le prochain plus petit timeout a venir
-  // -> relancer le multiplexer pour au plus ce temps la
 }
 
 std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dq, bool isResponse)
@@ -495,20 +435,15 @@ std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion&
 /*
    We are not in the main DoQ thread but in the DoQ 'client' thread.
 */
-static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
+static void processDOQQuery(DOQUnitUniquePtr&& unit)
 {
-  const auto handleImmediateResponse = [inMainThread](DOQUnitUniquePtr&& du, const char* reason) {
+  const auto handleImmediateResponse = [](DOQUnitUniquePtr&& du, const char* reason) {
     DEBUGLOG("handleImmediateResponse() reason=" << reason);
-    if (inMainThread) {
       auto conn = getConnection(du->serverConnID);
       handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
       /* so the unique pointer is stored in the InternalState which itself is stored in the unique pointer itself. We likely need
          a better design, but for now let's just reset the internal one since we know it is no longer needed. */
       du->ids.doqu.reset();
-    }
-    else {
-      sendDOQUnitToTheMainThread(std::move(du), reason);
-    }
   };
 
   auto& ids = unit->ids;
@@ -543,6 +478,11 @@ static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
     if (du->query.size() < sizeof(dnsheader)) {
       // ++dnsdist::metrics::g_stats.nonCompliantQueries;
       // ++cs.nonCompliantQueries;
+      struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
+      dh->rcode = RCode::ServFail;
+      dh->qr = true;
+      du->response = std::move(du->query);
+
       handleImmediateResponse(std::move(du), "DoQ non-compliant query");
       return;
     }
@@ -556,7 +496,10 @@ static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
       struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
 
       if (!checkQueryHeaders(dh, cs)) {
-        // du->status_code = 400;
+        dh->rcode = RCode::ServFail;
+        dh->qr = true;
+        du->response = std::move(du->query);
+
         handleImmediateResponse(std::move(du), "DoQ invalid headers");
         return;
       }
@@ -631,13 +574,8 @@ static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
       return;
     }
     else {
-      if (inMainThread) {
-        du = cpq->releaseDU();
-        handleImmediateResponse(std::move(du), "DoQ internal error");
-      }
-      else {
-        cpq->handleInternalError();
-      }
+      du = cpq->releaseDU();
+      handleImmediateResponse(std::move(du), "DoQ internal error");
       return;
     }
   }
@@ -650,54 +588,6 @@ static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
   return;
 }
 
-static void flushResponses(pdns::channel::Receiver<DOQUnit>&& receiver)
-{
-  setThreadName("dnsdist/doq-responder");
-
-  for(;;) {
-    try {
-      auto tmp = receiver.receive();
-      if (!tmp) {
-        return ;
-      }
-
-      auto du = std::move(*tmp);
-      auto conn = getConnection(du->serverConnID);
-
-      handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
-      
-    }
-    catch (const std::exception& e) {
-      errlog("Error while processing response received over DoQ: %s", e.what());
-    }
-    catch (...) {
-      errlog("Unspecified error while processing response received over DoQ");
-    }
-  }
-}
-
-static void dnsdistclient(pdns::channel::Receiver<DOQUnit>&& receiver)
-{
-  setThreadName("dnsdist/doq-cli");
-
-  for(;;) {
-    try {
-      auto tmp = receiver.receive();
-      if (!tmp) {
-        continue;
-      }
-      auto du = std::move(*tmp);
-      processDOQQuery(std::move(du), false);
-    }
-    catch (const std::exception& e) {
-      errlog("Error while processing query received over DoQ: %s", e.what());
-    }
-    catch (...) {
-      errlog("Unspecified error while processing query received over DoQ");
-    }
-  }
-}
-
 static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
 {
   try {
@@ -712,18 +602,10 @@ static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const
     du->ids.origDest = local;
     du->ids.origRemote = remote;
     du->ids.protocol = dnsdist::Protocol::DoQ;
-    du->responseSender = &dsc.d_responseSender;
     du->serverConnID = serverConnID;
     du->streamID = streamID;
 
-    try {
-      if (!dsc.d_querySender.send(std::move(du))) {
-        vinfolog("Unable to pass a DoQ query to the DoQ worker thread because the pipe is full");
-      }
-    }
-    catch (...) {
-      vinfolog("Unable to pass a DoQ query to the DoQ worker thread because we couldn't write to the pipe: %s", stringerror());
-    }
+    processDOQQuery(std::move(du));
   }
   catch (const std::exception& e) {
     vinfolog("Had error parsing DoQ DNS packet from %s: %s", remote.toStringWithPort(), e.what());
@@ -739,10 +621,6 @@ void doqThread(ClientState* cs)
     frontend->d_server_config->cs = cs;
     frontend->d_server_config->df = cs->doqFrontend;
 
-    std::thread dnsdistThread(dnsdistclient, std::move(frontend->d_server_config->d_queryReceiver));
-    dnsdistThread.detach();
-    std::thread responderThread(flushResponses, std::move(frontend->d_server_config->d_responseReceiver));
-    responderThread.detach();
     setThreadName("dnsdist/doq");
 
     Socket sock(cs->udpFD);
@@ -838,6 +716,7 @@ void doqThread(ClientState* cs)
             buffer.resize(received);
 
             if (fin) {
+              // we skip message length, should we verify ?
               buffer.erase(buffer.begin(), buffer.begin() + 2);
               if (buffer.size() >= sizeof(dnsheader)) {
                 doq_dispatch_query(*(frontend->d_server_config), std::move(buffer), cs->local, client, serverConnID, streamID);
@@ -848,15 +727,31 @@ void doqThread(ClientState* cs)
         else {
           DEBUGLOG("Connection not established");
         }
-        /* FIXME: we should handle closed connections, timeouts */
+        /* FIXME: we should handle timeouts */
         // pacing QUIC ?
         // quiche_send_info.at Queue avec les paquets a envoyer par date.
       }
-      for (auto& conn : s_connections) {
-        flushEgress(sock, conn.second);
+      for (auto conn = s_connections.begin(); conn != s_connections.end();) {
+        quiche_conn_on_timeout(conn->second.d_conn.get());
+
+        flushEgress(sock, conn->second);
+
+        if (quiche_conn_is_closed(conn->second.d_conn.get())) {
+#ifdef DEBUGLOG_ENABLED
+          quiche_stats stats;
+          quiche_path_stats path_stats;
+
+          quiche_conn_stats(conn->second.d_conn.get(), &stats);
+          quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
+
+          DEBUGLOG("Connection closed, recv="<<stats.recv<<" sent="<<stats.sent<<" lost="<<stats.lost<<" rtt="<<path_stats.rtt<<"ns cwnd="<<path_stats.cwnd);
+#endif
+          conn = s_connections.erase(conn);
+        } else {
+          ++conn;
+        }
       }
     }
-
   }
   catch (const std::exception& e) {
     DEBUGLOG("Caught fatal error: " << e.what());
index a145f83842d6e82d492f8f5b36cdf34e80d5b8da..461f09fbd182a8cae902ad92d5baccc5497b7262 100644 (file)
@@ -71,13 +71,6 @@ struct DOQFrontend
   ComboAddress d_local;
 
   void setup();
-#ifdef __linux__
-  // On Linux this gives us 128k pending queries (default is 8192 queries),
-  // which should be enough to deal with huge spikes
-  uint32_t d_internalPipeBufferSize{1024*1024};
-#else
-  uint32_t d_internalPipeBufferSize{0};
-#endif
 };
 
 struct DOQUnit
@@ -96,9 +89,7 @@ struct DOQUnit
   std::shared_ptr<DownstreamState> downstream{nullptr};
   DOQServerConfig* dsc{nullptr};
   pdns::channel::Sender<DOQUnit>* responseSender{nullptr};
-  size_t query_at{0};
   size_t proxyProtocolPayloadSize{0};
-  int rsock{-1};
   uint64_t streamID{0};
   PacketBuffer serverConnID;
   /* whether the query was re-sent to the backend over
index 75f12e6202211f9141dd145b5b83c0757a779533..7925c37cf19f4e3a46ff82e793592d1005d316d9 100644 (file)
@@ -1089,3 +1089,35 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase):
 
     def sendDOTQueryWrapper(self, query, response, useQueue=True):
         return self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response, self._caCert, useQueue=useQueue)
+
+    @classmethod
+    def getDOQConnection(cls, port, servername, caFile=None, source=None, source_port=0):
+
+        manager = dns.quic.SyncQuicManager(
+            verify_mode=caFile
+        )
+
+        return manager.connect(servername, port, source, source_port)
+
+    @classmethod
+    def sendDOQQuery(cls, port, servername, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, fromQueue=None, toQueue=None, connection=None):
+
+        if response:
+            if toQueue:
+                toQueue.put(response, True, timeout)
+            else:
+                cls._toResponderQueue.put(response, True, timeout)
+
+        message = dns.query.quic(query, servername, timeout, port, verify=caFile, connection=connection)
+
+        receivedQuery = None
+
+        if useQueue:
+            if fromQueue:
+                if not fromQueue.empty():
+                    receivedQuery = fromQueue.get(True, timeout)
+            else:
+                if not cls._fromResponderQueue.empty():
+                    receivedQuery = cls._fromResponderQueue.get(True, timeout)
+
+        return (receivedQuery, message)
index 3bc85702d228e1ef2676a45a2b6af7408bf28207..4c6b1020bdab2d939f83f544ab89df5618eac95d 100644 (file)
@@ -11,3 +11,4 @@ pycurl>=7.43.0
 lmdb>=0.95
 cdbx==0.1.2
 h2>=4.0.0
+aioquic
diff --git a/regression-tests.dnsdist/test_DOQ.py b/regression-tests.dnsdist/test_DOQ.py
new file mode 100644 (file)
index 0000000..b2ecea8
--- /dev/null
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+import dns
+import clientsubnetoption
+
+from dnsdisttests import DNSDistTest
+from dnsdisttests import pickAvailablePort
+
+class TestDOQ(DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = '127.0.0.1'
+    _caCert = 'ca.pem'
+    _doqServerPort = 8853
+    _config_template = """
+    newServer{address="127.0.0.1:%d"}
+
+    addAction("drop.doq.tests.powerdns.com.", DropAction())
+    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
+    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
+    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))
+
+    addDOQLocal("127.0.0.1:%d", "%s", "%s")
+    """
+    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+    _verboseMode = True
+
+    def testDOQSimple(self):
+        """
+        DOQ: Simple query
+        """
+        name = 'simple.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEqual(expectedQuery, receivedQuery)
+
+    def testDOQMultipleStreams(self):
+        """
+        DOQ: Test multiple queries using the same connection
+        """
+
+        name = 'simple.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        connection = self.getDOQConnection(self._doqServerPort, self._serverName, self._caCert)
+
+        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, connection=connection)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEqual(expectedQuery, receivedQuery)
+
+        (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, connection=connection)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEqual(expectedQuery, receivedQuery)
+
+    def testDropped(self):
+        """
+        DOQ: Dropped query
+        """
+        name = 'drop.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        dropped = False
+        try:
+            (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+            # dns.quic doesn't seem to report correctly the quic error so the connection timeout
+        except dns.exception.Timeout :
+            dropped = True
+        self.assertTrue(dropped)
+
+    def testRefused(self):
+        """
+        DOQ: Refused
+        """
+        name = 'refused.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.id = 0
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.set_rcode(dns.rcode.REFUSED)
+
+        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+        self.assertEqual(receivedResponse, expectedResponse)
+
+    def testSpoof(self):
+        """
+        DOQ: Spoofed
+        """
+        name = 'spoof.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.id = 0
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '1.2.3.4')
+        expectedResponse.answer.append(rrset)
+
+        (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+        self.assertEqual(receivedResponse, expectedResponse)
+
+    def testDOQNoBackend(self):
+        """
+        DOQ: No backend
+        """
+        name = 'no-backend.doq.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        dropped = False
+        try:
+            (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+        except dns.exception.Timeout :
+            dropped = True
+        self.assertTrue(dropped)
+            # dns.quic doesn't seem to report correctly the quic error so the connection timeout