#include "dnsdist-proxy-protocol.hh"
+static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description);
class DOQServerConfig
{
public:
- DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_, uint32_t internalPipeBufferSize) :
+ DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_) :
config(std::move(config_))
{
- {
- auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
- d_querySender = std::move(sender);
- d_queryReceiver = std::move(receiver);
- }
- {
- auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
- d_responseSender = std::move(sender);
- d_responseReceiver = std::move(receiver);
- }
}
DOQServerConfig(const DOQServerConfig&) = delete;
DOQServerConfig(DOQServerConfig&&) = default;
QuicheConfig config;
ClientState* cs{nullptr};
std::shared_ptr<DOQFrontend> df{nullptr};
- pdns::channel::Sender<DOQUnit> d_querySender;
- pdns::channel::Receiver<DOQUnit> d_queryReceiver;
- pdns::channel::Sender<DOQUnit> d_responseSender;
- pdns::channel::Receiver<DOQUnit> d_responseReceiver;
};
-#if 1
+#if 0
#define DEBUGLOG_ENABLED
#define DEBUGLOG(x) std::cerr<<x<<std::endl;
#else
static std::map<PacketBuffer, Connection> s_connections;
-/* This internal function sends back the object to the main thread to send a reply.
- The caller should NOT release or touch the unit after calling this function */
-static void sendDOQUnitToTheMainThread(DOQUnitUniquePtr&& du, const char* description)
-{
- if (du->responseSender == nullptr) {
- return;
- }
- try {
- if (!du->responseSender->send(std::move(du))) {
- vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
- }
- } catch (const std::exception& e) {
- vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
- }
-}
-
class DOQTCPCrossQuerySender final : public TCPQuerySender
{
public:
}
auto du = std::move(response.d_idstate.doqu);
- if (du->responseSender == nullptr) {
- return;
- }
du->response = std::move(response.d_buffer);
du->ids = std::move(response.d_idstate);
if (!processResponse(dr.ids.doqu->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
if (dr.ids.doqu) {
- sendDOQUnitToTheMainThread(std::move(dr.ids.doqu), "Response dropped by rules");
+ sendBackDOQUnit(std::move(dr.ids.doqu), "Response dropped by rules");
}
return;
}
++du->ids.cs->responses;
}
- sendDOQUnitToTheMainThread(std::move(du), "cross-protocol response");
+ sendBackDOQUnit(std::move(du), "Cross-protocol response");
}
void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
void notifyIOError(const struct timeval& now, TCPResponse&& response) override
{
- // auto& query = response.d_idstate;
- // if (!query.du) {
- // return;
- // }
-
- // auto dohUnit = getDUFromIDS(query);
- // if (dohUnit->responseSender == nullptr) {
- // return;
- // }
-
- // du->ids = std::move(query);
- // sendDOQUnitToTheMainThread(std::move(du), "cross-protocol error response");
- }
- // void notifyIOError(InternalQueryState&& query, const struct timeval& now) override
- // {
- // if (!query.doqu) {
- // return;
- // }
-
- // if (query.doqu->responseSender == nullptr) {
- // return;
- // }
-
- // auto du = std::move(query.doqu);
- // du->ids = std::move(query);
- // sendDOQUnitToTheMainThread(std::move(du), "cross-protocol error response");
- // }
+ }
};
class DOQCrossProtocolQuery : public CrossProtocolQuery
void handleInternalError()
{
- sendDOQUnitToTheMainThread(std::move(query.d_idstate.doqu), "DOQ internal error");
+ sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
}
std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
/* Always called from the main DoQ thread */
static void handleResponse(DOQFrontend& df, Connection& conn, const uint64_t streamID, const PacketBuffer& response)
{
- uint16_t responseSize = static_cast<uint16_t>(response.size());
- const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
- auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, sizeBytes, sizeof(sizeBytes), false);
- if (res == sizeof(sizeBytes)) {
- res = quiche_conn_stream_send(conn.d_conn.get(), streamID, response.data(), response.size(), true);
+ if (response.size() == 0) {
+ quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, 0x5);
+ } else {
+ uint16_t responseSize = static_cast<uint16_t>(response.size());
+ const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
+ auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, sizeBytes, sizeof(sizeBytes), false);
+ if (res == sizeof(sizeBytes)) {
+ res = quiche_conn_stream_send(conn.d_conn.get(), streamID, response.data(), response.size(), true);
+ }
}
}
quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO);
// quiche_config_log_keys(config.get());
- d_server_config = std::make_shared<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
+ d_server_config = std::make_shared<DOQServerConfig>(std::move(config));
}
static std::optional<PacketBuffer> getCID()
{
- // FIXME remplacer par notre truc de random
+ // FIXME replace it
int rng = open("/dev/urandom", O_RDONLY);
if (rng < 0) {
return std::nullopt;
static PacketBuffer mintToken(const PacketBuffer& dcid, const ComboAddress& peer)
{
// FIXME: really check whether this needs to be authenticated, via HMAC for example
- // client recoit un datagram
- // challenge avec token
- // suffisement d'infos pour binder a la bonne adresse
- // filer l'original CID fille par le client.
- // -> ne pas garder l'etat
- // -> inclure l'info dans le token
- // -> voir avec libsodium ?
- // -> token plus gros avec HMAC
- // -> regarder ce que font les autres implementations de QUIC
const std::array keyword = {'q', 'u', 'i', 'c', 'h', 'e'};
auto addrBytes = peer.toByteString();
PacketBuffer token;
return it->second;
}
+static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description)
+{
+ DEBUGLOG("Handling back a " << description);
+ auto conn = getConnection(du->serverConnID);
+ handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+}
+
static std::optional<std::reference_wrapper<Connection>> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer)
{
auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
}
// FIXME: update timers
- // -> on peut appeler une fonction quiche pour savoir quand prochain timeout
- // -> pas ici ?
- // -> fin de loop event quand est le prochain plus petit timeout a venir
- // -> relancer le multiplexer pour au plus ce temps la
}
std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dq, bool isResponse)
/*
We are not in the main DoQ thread but in the DoQ 'client' thread.
*/
-static void processDOQQuery(DOQUnitUniquePtr&& unit, bool inMainThread = false)
+static void processDOQQuery(DOQUnitUniquePtr&& unit)
{
- const auto handleImmediateResponse = [inMainThread](DOQUnitUniquePtr&& du, const char* reason) {
+ const auto handleImmediateResponse = [](DOQUnitUniquePtr&& du, const char* reason) {
DEBUGLOG("handleImmediateResponse() reason=" << reason);
- if (inMainThread) {
auto conn = getConnection(du->serverConnID);
handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
/* so the unique pointer is stored in the InternalState which itself is stored in the unique pointer itself. We likely need
a better design, but for now let's just reset the internal one since we know it is no longer needed. */
du->ids.doqu.reset();
- }
- else {
- sendDOQUnitToTheMainThread(std::move(du), reason);
- }
};
auto& ids = unit->ids;
if (du->query.size() < sizeof(dnsheader)) {
// ++dnsdist::metrics::g_stats.nonCompliantQueries;
// ++cs.nonCompliantQueries;
+ struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
+ dh->rcode = RCode::ServFail;
+ dh->qr = true;
+ du->response = std::move(du->query);
+
handleImmediateResponse(std::move(du), "DoQ non-compliant query");
return;
}
struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(du->query.data());
if (!checkQueryHeaders(dh, cs)) {
- // du->status_code = 400;
+ dh->rcode = RCode::ServFail;
+ dh->qr = true;
+ du->response = std::move(du->query);
+
handleImmediateResponse(std::move(du), "DoQ invalid headers");
return;
}
return;
}
else {
- if (inMainThread) {
- du = cpq->releaseDU();
- handleImmediateResponse(std::move(du), "DoQ internal error");
- }
- else {
- cpq->handleInternalError();
- }
+ du = cpq->releaseDU();
+ handleImmediateResponse(std::move(du), "DoQ internal error");
return;
}
}
return;
}
-static void flushResponses(pdns::channel::Receiver<DOQUnit>&& receiver)
-{
- setThreadName("dnsdist/doq-responder");
-
- for(;;) {
- try {
- auto tmp = receiver.receive();
- if (!tmp) {
- return ;
- }
-
- auto du = std::move(*tmp);
- auto conn = getConnection(du->serverConnID);
-
- handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
-
- }
- catch (const std::exception& e) {
- errlog("Error while processing response received over DoQ: %s", e.what());
- }
- catch (...) {
- errlog("Unspecified error while processing response received over DoQ");
- }
- }
-}
-
-static void dnsdistclient(pdns::channel::Receiver<DOQUnit>&& receiver)
-{
- setThreadName("dnsdist/doq-cli");
-
- for(;;) {
- try {
- auto tmp = receiver.receive();
- if (!tmp) {
- continue;
- }
- auto du = std::move(*tmp);
- processDOQQuery(std::move(du), false);
- }
- catch (const std::exception& e) {
- errlog("Error while processing query received over DoQ: %s", e.what());
- }
- catch (...) {
- errlog("Unspecified error while processing query received over DoQ");
- }
- }
-}
-
static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
{
try {
du->ids.origDest = local;
du->ids.origRemote = remote;
du->ids.protocol = dnsdist::Protocol::DoQ;
- du->responseSender = &dsc.d_responseSender;
du->serverConnID = serverConnID;
du->streamID = streamID;
- try {
- if (!dsc.d_querySender.send(std::move(du))) {
- vinfolog("Unable to pass a DoQ query to the DoQ worker thread because the pipe is full");
- }
- }
- catch (...) {
- vinfolog("Unable to pass a DoQ query to the DoQ worker thread because we couldn't write to the pipe: %s", stringerror());
- }
+ processDOQQuery(std::move(du));
}
catch (const std::exception& e) {
vinfolog("Had error parsing DoQ DNS packet from %s: %s", remote.toStringWithPort(), e.what());
frontend->d_server_config->cs = cs;
frontend->d_server_config->df = cs->doqFrontend;
- std::thread dnsdistThread(dnsdistclient, std::move(frontend->d_server_config->d_queryReceiver));
- dnsdistThread.detach();
- std::thread responderThread(flushResponses, std::move(frontend->d_server_config->d_responseReceiver));
- responderThread.detach();
setThreadName("dnsdist/doq");
Socket sock(cs->udpFD);
buffer.resize(received);
if (fin) {
+ // we skip message length, should we verify ?
buffer.erase(buffer.begin(), buffer.begin() + 2);
if (buffer.size() >= sizeof(dnsheader)) {
doq_dispatch_query(*(frontend->d_server_config), std::move(buffer), cs->local, client, serverConnID, streamID);
else {
DEBUGLOG("Connection not established");
}
- /* FIXME: we should handle closed connections, timeouts */
+ /* FIXME: we should handle timeouts */
// pacing QUIC ?
// quiche_send_info.at Queue avec les paquets a envoyer par date.
}
- for (auto& conn : s_connections) {
- flushEgress(sock, conn.second);
+ for (auto conn = s_connections.begin(); conn != s_connections.end();) {
+ quiche_conn_on_timeout(conn->second.d_conn.get());
+
+ flushEgress(sock, conn->second);
+
+ if (quiche_conn_is_closed(conn->second.d_conn.get())) {
+#ifdef DEBUGLOG_ENABLED
+ quiche_stats stats;
+ quiche_path_stats path_stats;
+
+ quiche_conn_stats(conn->second.d_conn.get(), &stats);
+ quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
+
+ DEBUGLOG("Connection closed, recv="<<stats.recv<<" sent="<<stats.sent<<" lost="<<stats.lost<<" rtt="<<path_stats.rtt<<"ns cwnd="<<path_stats.cwnd);
+#endif
+ conn = s_connections.erase(conn);
+ } else {
+ ++conn;
+ }
}
}
-
}
catch (const std::exception& e) {
DEBUGLOG("Caught fatal error: " << e.what());
--- /dev/null
+#!/usr/bin/env python
+import dns
+import clientsubnetoption
+
+from dnsdisttests import DNSDistTest
+from dnsdisttests import pickAvailablePort
+
+class TestDOQ(DNSDistTest):
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = '127.0.0.1'
+ _caCert = 'ca.pem'
+ _doqServerPort = 8853
+ _config_template = """
+ newServer{address="127.0.0.1:%d"}
+
+ addAction("drop.doq.tests.powerdns.com.", DropAction())
+ addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
+ addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
+ addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))
+
+ addDOQLocal("127.0.0.1:%d", "%s", "%s")
+ """
+ _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+ _verboseMode = True
+
+ def testDOQSimple(self):
+ """
+ DOQ: Simple query
+ """
+ name = 'simple.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.id = 0
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+ expectedQuery.id = 0
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, caFile=self._caCert)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEqual(expectedQuery, receivedQuery)
+
+ def testDOQMultipleStreams(self):
+ """
+ DOQ: Test multiple queries using the same connection
+ """
+
+ name = 'simple.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.id = 0
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+ expectedQuery.id = 0
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ connection = self.getDOQConnection(self._doqServerPort, self._serverName, self._caCert)
+
+ (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, connection=connection)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEqual(expectedQuery, receivedQuery)
+
+ (receivedQuery, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=response, connection=connection)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEqual(expectedQuery, receivedQuery)
+
+ def testDropped(self):
+ """
+ DOQ: Dropped query
+ """
+ name = 'drop.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ dropped = False
+ try:
+ (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+ # dns.quic doesn't seem to report correctly the quic error so the connection timeout
+ except dns.exception.Timeout :
+ dropped = True
+ self.assertTrue(dropped)
+
+ def testRefused(self):
+ """
+ DOQ: Refused
+ """
+ name = 'refused.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.id = 0
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.REFUSED)
+
+ (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ def testSpoof(self):
+ """
+ DOQ: Spoofed
+ """
+ name = 'spoof.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.id = 0
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '1.2.3.4')
+ expectedResponse.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+ self.assertEqual(receivedResponse, expectedResponse)
+
+ def testDOQNoBackend(self):
+ """
+ DOQ: No backend
+ """
+ name = 'no-backend.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ dropped = False
+ try:
+ (_, receivedResponse) = self.sendDOQQuery(self._doqServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
+ except dns.exception.Timeout :
+ dropped = True
+ self.assertTrue(dropped)
+ # dns.quic doesn't seem to report correctly the quic error so the connection timeout