#include "dnsdist-tcp.hh"
#include "dnsdist-random.hh"
+#include "dnsparser.hh"
#include "dolog.hh"
#include "iputils.hh"
#include "misc.hh"
#include "sstuff.hh"
-#include "dnsparser.hh"
#include "threadname.hh"
#include "dnsdist-ecs.hh"
#include "dnsdist-proxy-protocol.hh"
static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description);
struct DOQServerConfig
{
- DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_) :
+ DOQServerConfig(std::unique_ptr<quiche_config, decltype(&quiche_config_free)>&& config_, uint32_t internalPipeBufferSize) :
config(std::move(config_))
{
+ {
+ auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
+ d_responseSender = std::move(sender);
+ d_responseReceiver = std::move(receiver);
+ }
+
}
DOQServerConfig(const DOQServerConfig&) = delete;
DOQServerConfig(DOQServerConfig&&) = default;
QuicheConfig config;
ClientState* cs{nullptr};
std::shared_ptr<DOQFrontend> df{nullptr};
+ pdns::channel::Sender<DOQUnit> d_responseSender;
+ pdns::channel::Receiver<DOQUnit> d_responseReceiver;
};
#if 0
}
auto du = std::move(response.d_idstate.doqu);
+ if (du->responseSender == nullptr) {
+ return;
+ }
du->response = std::move(response.d_buffer);
du->ids = std::move(response.d_idstate);
quiche_config_set_cc_algorithm(config.get(), QUICHE_CC_RENO);
// quiche_config_log_keys(config.get());
- d_server_config = std::make_shared<DOQServerConfig>(std::move(config));
+ d_server_config = std::make_shared<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
}
static std::optional<PacketBuffer> getCID()
static void sendBackDOQUnit(DOQUnitUniquePtr&& du, const char* description)
{
- DEBUGLOG("Handling back a " << description);
- auto conn = getConnection(du->serverConnID);
- handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+ if (du->responseSender == nullptr) {
+ return;
+ }
+ try {
+ if (!du->responseSender->send(std::move(du))) {
+ vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
+ }
+ } catch (const std::exception& e) {
+ vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
+ }
}
static std::optional<std::reference_wrapper<Connection>> createConnection(QuicheConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const PacketBuffer& token, const ComboAddress& local, const ComboAddress& peer)
du->ids.protocol = dnsdist::Protocol::DoQ;
du->serverConnID = serverConnID;
du->streamID = streamID;
+ du->responseSender = &dsc.d_responseSender;
processDOQQuery(std::move(du));
}
}
}
+static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
+{
+ for(;;) {
+ try {
+ auto tmp = receiver.receive();
+ if (!tmp) {
+ return ;
+ }
+
+ auto du = std::move(*tmp);
+ auto conn = getConnection(du->serverConnID);
+
+ handleResponse(*du->dsc->df, *conn, du->streamID, du->response);
+
+ }
+ catch (const std::exception& e) {
+ errlog("Error while processing response received over DoQ: %s", e.what());
+ }
+ catch (...) {
+ errlog("Unspecified error while processing response received over DoQ");
+ }
+ }
+}
+
// this is the entrypoint from dnsdist.cc
void doqThread(ClientState* cs)
{
Socket sock(cs->udpFD);
PacketBuffer buffer(std::numeric_limits<unsigned short>::max());
+ auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+ auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
+ mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
+ mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
while (true) {
- std::string bufferStr;
- ComboAddress client;
- if (waitForData(sock.getHandle(), 1, 0) > 0) {
+ std::vector<int> readyFDs;
+ mplexer->getAvailableFDs(readyFDs, 500);
+
+ if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
+ std::string bufferStr;
+ ComboAddress client;
sock.recvFrom(bufferStr, client);
uint32_t version{0};
else {
DEBUGLOG("Connection not established");
}
+ // }
+ }
+
+ if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
+ flushResponses(frontend->d_server_config->d_responseReceiver);
}
for (auto conn = s_connections.begin(); conn != s_connections.end();) {