bool dnssecOK{false};
bool useZeroScope{false};
bool forwardedOverUDP{false};
+ bool selfGenerated{false};
};
struct IDState
#include "config.h"
#include "threadname.hh"
#include "dnsdist.hh"
+#include "dnsdist-async.hh"
#include "dnsdist-ecs.hh"
#include "dnsdist-lua.hh"
#include "dnsdist-lua-ffi.hh"
DNSAction::Action operator()(DNSQuestion* dq, std::string* ruleresult) const override
{
- auto lock = g_lua.lock();
try {
- auto ret = d_func(dq);
- if (ruleresult) {
- if (boost::optional<std::string> rule = std::get<1>(ret)) {
- *ruleresult = *rule;
- }
- else {
- // default to empty string
- ruleresult->clear();
+ DNSAction::Action result;
+ {
+ auto lock = g_lua.lock();
+ auto ret = d_func(dq);
+ if (ruleresult) {
+ if (boost::optional<std::string> rule = std::get<1>(ret)) {
+ *ruleresult = *rule;
+ }
+ else {
+ // default to empty string
+ ruleresult->clear();
+ }
}
+ result = static_cast<Action>(std::get<0>(ret));
}
- return static_cast<Action>(std::get<0>(ret));
+ dnsdist::handleQueuedAsynchronousEvents();
+ return result;
} catch (const std::exception &e) {
warnlog("LuaAction failed inside Lua, returning ServFail: %s", e.what());
} catch (...) {
{}
DNSResponseAction::Action operator()(DNSResponse* dr, std::string* ruleresult) const override
{
- auto lock = g_lua.lock();
try {
- auto ret = d_func(dr);
- if (ruleresult) {
- if (boost::optional<std::string> rule = std::get<1>(ret)) {
- *ruleresult = *rule;
- }
- else {
- // default to empty string
- ruleresult->clear();
+ DNSResponseAction::Action result;
+ {
+ auto lock = g_lua.lock();
+ auto ret = d_func(dr);
+ if (ruleresult) {
+ if (boost::optional<std::string> rule = std::get<1>(ret)) {
+ *ruleresult = *rule;
+ }
+ else {
+ // default to empty string
+ ruleresult->clear();
+ }
}
+ result = static_cast<Action>(std::get<0>(ret));
}
- return static_cast<Action>(std::get<0>(ret));
+ dnsdist::handleQueuedAsynchronousEvents();
+ return result;
} catch (const std::exception &e) {
warnlog("LuaResponseAction failed inside Lua, returning ServFail: %s", e.what());
} catch (...) {
{
dnsdist_ffi_dnsquestion_t dqffi(dq);
try {
- auto lock = g_lua.lock();
- auto ret = d_func(&dqffi);
- if (ruleresult) {
- if (dqffi.result) {
- *ruleresult = *dqffi.result;
- }
- else {
- // default to empty string
- ruleresult->clear();
+ DNSAction::Action result;
+ {
+ auto lock = g_lua.lock();
+ auto ret = d_func(&dqffi);
+ if (ruleresult) {
+ if (dqffi.result) {
+ *ruleresult = *dqffi.result;
+ }
+ else {
+ // default to empty string
+ ruleresult->clear();
+ }
}
+ result = static_cast<DNSAction::Action>(ret);
}
- return static_cast<DNSAction::Action>(ret);
+ dnsdist::handleQueuedAsynchronousEvents();
+ return result;
} catch (const std::exception &e) {
warnlog("LuaFFIAction failed inside Lua, returning ServFail: %s", e.what());
} catch (...) {
ruleresult->clear();
}
}
+ dnsdist::handleQueuedAsynchronousEvents();
return static_cast<DNSAction::Action>(ret);
}
catch (const std::exception &e) {
{
dnsdist_ffi_dnsresponse_t drffi(dr);
try {
- auto lock = g_lua.lock();
- auto ret = d_func(&drffi);
- if (ruleresult) {
- if (drffi.result) {
- *ruleresult = *drffi.result;
- }
- else {
- // default to empty string
- ruleresult->clear();
+ DNSResponseAction::Action result;
+ {
+ auto lock = g_lua.lock();
+ auto ret = d_func(&drffi);
+ if (ruleresult) {
+ if (drffi.result) {
+ *ruleresult = *drffi.result;
+ }
+ else {
+ // default to empty string
+ ruleresult->clear();
+ }
}
+ result = static_cast<DNSResponseAction::Action>(ret);
}
- return static_cast<DNSResponseAction::Action>(ret);
+ dnsdist::handleQueuedAsynchronousEvents();
+ return result;
} catch (const std::exception &e) {
warnlog("LuaFFIResponseAction failed inside Lua, returning ServFail: %s", e.what());
} catch (...) {
ruleresult->clear();
}
}
+ dnsdist::handleQueuedAsynchronousEvents();
return static_cast<DNSResponseAction::Action>(ret);
}
catch (const std::exception &e) {
--state->d_currentQueriesCount;
- if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
- const auto& ds = currentResponse.d_connection->getDS();
+ const auto& ds = currentResponse.d_connection ? currentResponse.d_connection->getDS() : currentResponse.d_ds;
+ if (currentResponse.d_idstate.selfGenerated == false && ds) {
const auto& ids = currentResponse.d_idstate;
double udiff = ids.queryRealTime.udiff();
vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f usec", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), currentResponse.d_buffer.size(), udiff);
std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
- if (response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
+ if (!response.isAsync() && response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
// if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
if (!response.d_connection->willBeReusable(true)) {
// if it can't be reused even by us, well
return;
}
- try {
- auto& ids = response.d_idstate;
- unsigned int qnameWireLength;
- if (!response.d_connection || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getDS(), qnameWireLength)) {
- state->terminateClientConnection();
- return;
- }
+ if (!response.isAsync()) {
+ try {
+ auto& ids = response.d_idstate;
+ unsigned int qnameWireLength;
+ if (!response.d_connection || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getDS(), qnameWireLength)) {
+ state->terminateClientConnection();
+ return;
+ }
- if (response.d_connection->getDS()) {
- ++response.d_connection->getDS()->responses;
- }
+ if (response.d_connection->getDS()) {
+ ++response.d_connection->getDS()->responses;
+ }
- DNSResponse dr(ids, response.d_buffer, response.d_connection->getDS());
+ DNSResponse dr(ids, response.d_buffer, response.d_connection->getDS());
+ dr.d_incomingTCPState = state;
- memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
+ memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));
- if (!processResponse(response.d_buffer, *state->d_threadData.localRespRuleActions, *state->d_threadData.localCacheInsertedRespRuleActions, dr, false)) {
+ if (!processResponse(response.d_buffer, *state->d_threadData.localRespRuleActions, *state->d_threadData.localCacheInsertedRespRuleActions, dr, false)) {
+ state->terminateClientConnection();
+ return;
+ }
+
+ if (dr.isAsynchronous()) {
+ /* we are done for now */
+ return;
+ }
+ }
+ catch (const std::exception& e) {
+ vinfolog("Unexpected exception while handling response from backend: %s", e.what());
state->terminateClientConnection();
return;
}
}
- catch (const std::exception& e) {
- vinfolog("Unexpected exception while handling response from backend: %s", e.what());
- state->terminateClientConnection();
- return;
- }
++g_stats.responses;
++state->d_ci.cs->responses;
class TCPCrossProtocolQuery : public CrossProtocolQuery
{
public:
- TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState>& ds, std::shared_ptr<IncomingTCPConnectionState> sender): CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), ds), d_sender(std::move(sender))
+ TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState> ds, std::shared_ptr<IncomingTCPConnectionState> sender): CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), ds), d_sender(std::move(sender))
{
proxyProtocolPayloadSize = 0;
}
return d_sender;
}
+ DNSQuestion getDQ() override
+ {
+ auto& ids = query.d_idstate;
+ DNSQuestion dq(ids, query.d_buffer);
+ dq.d_incomingTCPState = d_sender;
+ return dq;
+ }
+
+ DNSResponse getDR() override
+ {
+ auto& ids = query.d_idstate;
+ DNSResponse dr(ids, query.d_buffer, downstream);
+ dr.d_incomingTCPState = d_sender;
+ return dr;
+ }
+
private:
std::shared_ptr<IncomingTCPConnectionState> d_sender;
};
+std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dq)
+{
+ auto state = dq.getIncomingTCPState();
+ if (!state) {
+ throw std::runtime_error("Trying to create a TCP cross protocol query without a valid TCP state");
+ }
+
+ dq.ids.origID = dq.getHeader()->id;
+ return std::make_unique<TCPCrossProtocolQuery>(std::move(dq.getMutableData()), std::move(dq.ids), nullptr, std::move(state));
+}
+
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
{
if (d_threadData.crossProtocolResponsesPipe == -1) {
TCPResponse response;
dh->rcode = RCode::NotImp;
dh->qr = true;
- response.d_selfGenerated = true;
+ response.d_idstate.selfGenerated = true;
response.d_buffer = std::move(state->d_buffer);
state->d_state = IncomingTCPConnectionState::State::idle;
++state->d_currentQueriesCount;
DNSQuestion dq(ids, state->d_buffer);
const uint16_t* flags = getFlagsFromDNSHeader(dq.getHeader());
ids.origFlags = *flags;
-
+ dq.d_incomingTCPState = state;
dq.sni = state->d_handler.getServerNameIndication();
+
if (state->d_proxyProtocolValues) {
/* we need to copy them, because the next queries received on that connection will
need to get the _unaltered_ values */
}
std::shared_ptr<DownstreamState> ds;
- auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);
+ auto result = processQuery(dq, state->d_threadData.holders, ds);
if (result == ProcessQueryResult::Drop) {
state->terminateClientConnection();
return;
}
+ else if (result == ProcessQueryResult::Asynchronous) {
+ /* we are done for now */
+ ++state->d_currentQueriesCount;
+ return;
+ }
// the buffer might have been invalidated by now
const dnsheader* dh = dq.getHeader();
memcpy(&response.d_cleartextDH, dh, sizeof(response.d_cleartextDH));
response.d_idstate = std::move(ids);
response.d_idstate.origID = dh->id;
+ response.d_idstate.selfGenerated = true;
response.d_idstate.cs = state->d_ci.cs;
response.d_buffer = std::move(state->d_buffer);
}
if (cs.d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > cs.d_tcpConcurrentConnectionsLimit) {
+ vinfolog("Dropped TCP connection from %s because of concurrent connections limit", remote.toStringWithPort());
return;
}
#endif
#include "dnsdist.hh"
+#include "dnsdist-async.hh"
#include "dnsdist-cache.hh"
#include "dnsdist-carbon.hh"
#include "dnsdist-console.hh"
return true;
}
-bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& insertedRespRuleActions, DNSResponse& dr, bool muted)
+bool processResponseAfterRules(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted)
{
- if (!applyRulesToResponse(respRuleActions, dr)) {
- return false;
- }
-
bool zeroScope = false;
if (!fixUpResponse(response, dr.ids.qname, dr.ids.origFlags, dr.ids.ednsAdded, dr.ids.ecsAdded, dr.ids.useZeroScope ? &zeroScope : nullptr)) {
return false;
}
- if (dr.ids.packetCache && !dr.ids.skipCache && response.size() <= s_maxPacketCacheEntrySize) {
+ if (dr.ids.packetCache && !dr.ids.selfGenerated && !dr.ids.skipCache && response.size() <= s_maxPacketCacheEntrySize) {
if (!dr.ids.useZeroScope) {
/* if the query was not suitable for zero-scope, for
example because it had an existing ECS entry so the hash is
dr.ids.packetCache->insert(cacheKey, zeroScope ? boost::none : dr.ids.subnet, dr.ids.cacheFlags, dr.ids.dnssecOK, dr.ids.qname, dr.ids.qtype, dr.ids.qclass, response, dr.ids.forwardedOverUDP, dr.getHeader()->rcode, dr.ids.tempFailureTTL);
- if (!applyRulesToResponse(insertedRespRuleActions, dr)) {
+ if (!applyRulesToResponse(cacheInsertedRespRuleActions, dr)) {
return false;
}
}
return true;
}
+bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted)
+{
+ if (!applyRulesToResponse(respRuleActions, dr)) {
+ return false;
+ }
+
+ if (dr.isAsynchronous()) {
+ return true;
+ }
+
+ return processResponseAfterRules(response, cacheInsertedRespRuleActions, dr, muted);
+}
+
static size_t getInitialUDPPacketBufferSize()
{
static_assert(s_udpIncomingBufferSize <= s_initialUDPPacketBufferSize, "The incoming buffer size should not be larger than s_initialUDPPacketBufferSize");
return s_udpIncomingBufferSize + g_proxyProtocolMaximumSize;
}
-static bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
+bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
{
#ifndef DISABLE_DELAY_PIPE
if (delayMsec && g_delay) {
doLatencyStats(incomingProtocol, udiff);
}
-static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, const std::shared_ptr<DownstreamState>& ds, bool selfGenerated)
+static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, const std::shared_ptr<DownstreamState>& ds, bool isAsync, bool selfGenerated)
{
DNSResponse dr(ids, response, ds);
dnsheader cleartextDH;
memcpy(&cleartextDH, dr.getHeader(), sizeof(cleartextDH));
- if (!processResponse(response, respRuleActions, cacheInsertedRespRuleActions, dr, ids.cs && ids.cs->muted)) {
- return;
+ if (!isAsync) {
+ if (!processResponse(response, respRuleActions, cacheInsertedRespRuleActions, dr, ids.cs && ids.cs->muted)) {
+ return;
+ }
+
+ if (dr.isAsynchronous()) {
+ return;
+ }
}
++g_stats.responses;
continue;
}
- handleResponseForUDPClient(*ids, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dss, false);
+ handleResponseForUDPClient(*ids, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dss, false, false);
}
}
catch (const std::exception& e) {
bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop)
{
+ if (dq.isAsynchronous()) {
+ return false;
+ }
+
switch(action) {
case DNSAction::Action::Allow:
return true;
#endif /* DISABLE_RECVMMSG */
/* self-generated responses or cache hits */
-static bool prepareOutgoingResponse(LocalHolders& holders, ClientState& cs, DNSQuestion& dq, bool cacheHit)
+static bool prepareOutgoingResponse(LocalHolders& holders, const ClientState& cs, DNSQuestion& dq, bool cacheHit)
{
std::shared_ptr<DownstreamState> ds{nullptr};
DNSResponse dr(dq.ids, dq.getMutableData(), ds);
+ dr.d_incomingTCPState = dq.d_incomingTCPState;
+ dr.ids.selfGenerated = true;
if (!applyRulesToResponse(cacheHit ? *holders.cacheHitRespRuleactions : *holders.selfAnsweredRespRuleactions, dr)) {
return false;
ac(&dr, &result);
}
+ if (cacheHit) {
+ ++g_stats.cacheHits;
+ }
+
+ if (dr.isAsynchronous()) {
+ return false;
+ }
+
#ifdef HAVE_DNSCRYPT
if (!cs.muted) {
if (!encryptResponse(dq.getMutableData(), dq.getMaximumSize(), dq.overTCP(), dq.ids.dnsCryptQuery)) {
}
#endif /* HAVE_DNSCRYPT */
- if (cacheHit) {
- ++g_stats.cacheHits;
- }
-
return true;
}
-ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
+ProcessQueryResult processQueryAfterRules(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
{
const uint16_t queryId = ntohs(dq.getHeader()->id);
try {
- /* we need an accurate ("real") value for the response and
- to store into the IDS, but not for insertion into the
- rings for example */
- struct timespec now;
- gettime(&now);
-
- if (!applyRulesToQuery(holders, dq, now)) {
- return ProcessQueryResult::Drop;
- }
-
if (dq.getHeader()->qr) { // something turned it into a response
fixUpQueryTurnedResponse(dq, dq.ids.origFlags);
vinfolog("Packet cache hit for query for %s|%s from %s (%s, %d bytes)", dq.ids.qname.toLogString(), QType(dq.ids.qtype).toString(), dq.ids.origRemote.toStringWithPort(), dq.ids.protocol.toString(), dq.getData().size());
- if (!prepareOutgoingResponse(holders, cs, dq, true)) {
+ if (!prepareOutgoingResponse(holders, *dq.ids.cs, dq, true)) {
return ProcessQueryResult::Drop;
}
else if (dq.ids.protocol == dnsdist::Protocol::DoH && !forwardedOverUDP) {
/* do a second-lookup for UDP responses, but we do not want TC=1 answers */
if (dq.ids.packetCache->get(dq, dq.getHeader()->id, &dq.ids.cacheKeyUDP, dq.ids.subnet, dq.ids.dnssecOK, true, allowExpired, false, false, false)) {
- if (!prepareOutgoingResponse(holders, cs, dq, true)) {
+ if (!prepareOutgoingResponse(holders, *dq.ids.cs, dq, true)) {
return ProcessQueryResult::Drop;
}
fixUpQueryTurnedResponse(dq, dq.ids.origFlags);
- if (!prepareOutgoingResponse(holders, cs, dq, false)) {
+ if (!prepareOutgoingResponse(holders, *dq.ids.cs, dq, false)) {
return ProcessQueryResult::Drop;
}
++g_stats.responses;
return ProcessQueryResult::PassToBackend;
}
catch (const std::exception& e){
- vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.overTCP() ? "TCP" : "UDP"), dq.ids.origRemote.toStringWithPort(), queryId, e.what());
+ vinfolog("Got an error while parsing a %s query (after applying rules) from %s, id %d: %s", (dq.overTCP() ? "TCP" : "UDP"), dq.ids.origRemote.toStringWithPort(), queryId, e.what());
}
return ProcessQueryResult::Drop;
}
class UDPTCPCrossQuerySender : public TCPQuerySender
{
public:
- UDPTCPCrossQuerySender(const ClientState& cs, const std::shared_ptr<DownstreamState>& ds): d_cs(cs), d_ds(ds)
+ UDPTCPCrossQuerySender()
{
}
return true;
}
- const ClientState* getClientState() const override
- {
- return &d_cs;
- }
-
void handleResponse(const struct timeval& now, TCPResponse&& response) override
{
- if (!d_ds && !response.d_selfGenerated) {
+ if (!response.d_ds && !response.d_idstate.selfGenerated) {
throw std::runtime_error("Passing a cross-protocol answer originated from UDP without a valid downstream");
}
static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
- handleResponseForUDPClient(ids, response.d_buffer, *localRespRuleActions, *localCacheInsertedRespRuleActions, d_ds, response.d_selfGenerated);
+ handleResponseForUDPClient(ids, response.d_buffer, *localRespRuleActions, *localCacheInsertedRespRuleActions, response.d_ds, response.isAsync(), response.d_idstate.selfGenerated);
}
void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
{
// nothing to do
}
-private:
- const ClientState& d_cs;
- const std::shared_ptr<DownstreamState> d_ds{nullptr};
};
class UDPCrossProtocolQuery : public CrossProtocolQuery
{
public:
- UDPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState>& ds)
+ UDPCrossProtocolQuery(PacketBuffer&& buffer_, InternalQueryState&& ids_, std::shared_ptr<DownstreamState> ds): CrossProtocolQuery(InternalQuery(std::move(buffer_), std::move(ids_)), ds)
{
- uint16_t z = 0;
- getEDNSUDPPayloadSizeAndZ(reinterpret_cast<const char*>(buffer.data()), buffer.size(), &ids.udpPayloadSize, &z);
- if (ids.udpPayloadSize < 512) {
- ids.udpPayloadSize = 512;
+ auto& ids = query.d_idstate;
+ const auto& buffer = query.d_buffer;
+
+ if (ids.udpPayloadSize == 0) {
+ uint16_t z = 0;
+ getEDNSUDPPayloadSizeAndZ(reinterpret_cast<const char*>(buffer.data()), buffer.size(), &ids.udpPayloadSize, &z);
+ if (ids.udpPayloadSize < 512) {
+ ids.udpPayloadSize = 512;
+ }
}
- query = InternalQuery(std::move(buffer), std::move(ids));
- downstream = ds;
}
~UDPCrossProtocolQuery()
std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
{
- auto sender = std::make_shared<UDPTCPCrossQuerySender>(*query.d_idstate.cs, downstream);
- return sender;
+ return s_sender;
}
+private:
+ static std::shared_ptr<UDPTCPCrossQuerySender> s_sender;
};
+std::shared_ptr<UDPTCPCrossQuerySender> UDPCrossProtocolQuery::s_sender = std::make_shared<UDPTCPCrossQuerySender>();
+
+std::unique_ptr<CrossProtocolQuery> getUDPCrossProtocolQueryFromDQ(DNSQuestion& dq);
+std::unique_ptr<CrossProtocolQuery> getUDPCrossProtocolQueryFromDQ(DNSQuestion& dq)
+{
+ dq.ids.origID = dq.getHeader()->id;
+ return std::make_unique<UDPCrossProtocolQuery>(std::move(dq.getMutableData()), std::move(dq.ids), nullptr);
+}
+
+ProcessQueryResult processQuery(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
+{
+ const uint16_t queryId = ntohs(dq.getHeader()->id);
+
+ try {
+ /* we need an accurate ("real") value for the response and
+ to store into the IDS, but not for insertion into the
+ rings for example */
+ struct timespec now;
+ gettime(&now);
+
+ if (!applyRulesToQuery(holders, dq, now)) {
+ return ProcessQueryResult::Drop;
+ }
+
+ if (dq.isAsynchronous()) {
+ return ProcessQueryResult::Asynchronous;
+ }
+
+ return processQueryAfterRules(dq, holders, selectedBackend);
+ }
+ catch (const std::exception& e){
+ vinfolog("Got an error while parsing a %s query from %s, id %d: %s", (dq.overTCP() ? "TCP" : "UDP"), dq.ids.origRemote.toStringWithPort(), queryId, e.what());
+ }
+ return ProcessQueryResult::Drop;
+}
+
bool assignOutgoingUDPQueryToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t queryID, DNSQuestion& dq, PacketBuffer& query, ComboAddress& dest)
{
bool doh = dq.ids.du != nullptr;
}
std::shared_ptr<DownstreamState> ss{nullptr};
- auto result = processQuery(dq, cs, holders, ss);
+ auto result = processQuery(dq, holders, ss);
- if (result == ProcessQueryResult::Drop) {
+ if (result == ProcessQueryResult::Drop || result == ProcessQueryResult::Asynchronous) {
return;
}
/* we use dest, always, because we don't want to use the listening address to send a response since it could be 0.0.0.0 */
sendUDPResponse(cs.udpFD, query, dq.ids.delayMsec, dest, remote);
- handleResponseSent(ids, 0., remote, ComboAddress(), query.size(), *dh, dnsdist::Protocol::DoUDP);
+ handleResponseSent(dq.ids.qname, dq.ids.qtype, 0., remote, ComboAddress(), query.size(), *dh, dnsdist::Protocol::DoUDP, dnsdist::Protocol::DoUDP);
return;
}
#endif
}
+ dnsdist::g_asyncHolder = std::make_unique<dnsdist::AsynchronousHolder>();
+
auto todo = setupLua(*(g_lua.lock()), false, false, g_cmdLine.config);
auto localPools = g_pools.getCopy();
using QTag = std::unordered_map<string, string>;
+class IncomingTCPConnectionState;
+
+struct ClientState;
+
struct DNSQuestion
{
DNSQuestion(InternalQueryState& ids_, PacketBuffer& data_):
DNSQuestion(const DNSQuestion&) = delete;
DNSQuestion& operator=(const DNSQuestion&) = delete;
DNSQuestion(DNSQuestion&&) = default;
+ virtual ~DNSQuestion() = default;
std::string getTrailingData() const;
bool setTrailingData(const std::string&);
return ids.queryRealTime.d_start;
}
+ bool isAsynchronous() const
+ {
+ return asynchronous;
+ }
+
+ std::shared_ptr<IncomingTCPConnectionState> getIncomingTCPState() const
+ {
+ return d_incomingTCPState;
+ }
+
+ ClientState* getFrontend() const
+ {
+ return ids.cs;
+ }
+
protected:
PacketBuffer& data;
std::unique_ptr<Netmask> ecs{nullptr};
std::string sni; /* Server Name Indication, if any (DoT or DoH) */
mutable std::unique_ptr<EDNSOptionViewMap> ednsOptions; /* this needs to be mutable because it is parsed just in time, when DNSQuestion is read-only */
+ std::shared_ptr<IncomingTCPConnectionState> d_incomingTCPState{nullptr};
std::unique_ptr<std::vector<ProxyProtocolValue>> proxyProtocolValues{nullptr};
uint16_t ecsPrefixLength;
uint8_t ednsRCode{0};
bool ecsOverride;
bool useECS{true};
bool addXPF{true};
+ bool asynchronous{false};
};
+struct DownstreamState;
+
struct DNSResponse : DNSQuestion
{
DNSResponse(InternalQueryState& ids_, PacketBuffer& data_, const std::shared_ptr<DownstreamState>& downstream):
void resetLuaSideEffect(); // reset to indeterminate state
bool responseContentMatches(const PacketBuffer& response, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const std::shared_ptr<DownstreamState>& remote, unsigned int& qnameWireLength);
-bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& insertedRespRuleActions, DNSResponse& dr, bool muted);
-bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop);
bool checkQueryHeaders(const struct dnsheader* dh, ClientState& cs);
static const uint16_t s_udpIncomingBufferSize{1500}; // don't accept UDP queries larger than this value
static const size_t s_maxPacketCacheEntrySize{4096}; // don't cache responses larger than this value
-enum class ProcessQueryResult : uint8_t { Drop, SendAnswer, PassToBackend };
-ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend);
+enum class ProcessQueryResult : uint8_t { Drop, SendAnswer, PassToBackend, Asynchronous };
+ProcessQueryResult processQuery(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend);
+ProcessQueryResult processQueryAfterRules(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend);
+bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& insertedRespRuleActions, DNSResponse& dr, bool muted);
+bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop);
+bool processResponseAfterRules(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted);
bool assignOutgoingUDPQueryToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t queryID, DNSQuestion& dq, PacketBuffer& query, ComboAddress& dest);
ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss, const int sd, const PacketBuffer& request, bool healthCheck = false);
+bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote);
void handleResponseSent(const DNSName& qname, const QType& qtype, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, dnsdist::Protocol incomingProtocol);
void handleResponseSent(const InternalQueryState& ids, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol);
dns.cc dns.hh \
dns_random.hh \
dnscrypt.cc dnscrypt.hh \
+ dnsdist-async.cc dnsdist-async.hh \
dnsdist-backend.cc \
dnsdist-cache.cc dnsdist-cache.hh \
dnsdist-carbon.cc dnsdist-carbon.hh \
dnsdist-ecs.cc dnsdist-ecs.hh \
dnsdist-healthchecks.cc dnsdist-healthchecks.hh \
dnsdist-idstate.hh \
+ dnsdist-internal-queries.cc dnsdist-internal-queries.hh \
dnsdist-kvs.hh dnsdist-kvs.cc \
dnsdist-lbpolicies.cc dnsdist-lbpolicies.hh \
dnsdist-lua-actions.cc \
credentials.cc credentials.hh \
dns.cc dns.hh \
dnscrypt.cc dnscrypt.hh \
+ dnsdist-async.cc dnsdist-async.hh \
dnsdist-backend.cc \
dnsdist-cache.cc dnsdist-cache.hh \
dnsdist-dnsparser.cc dnsdist-dnsparser.hh \
test-dnsdist-connections-cache.cc \
test-dnsdist-dnsparser.cc \
test-dnsdist_cc.cc \
+ test-dnsdistasync.cc \
test-dnsdistbackend_cc.cc \
test-dnsdistdynblocks_hh.cc \
test-dnsdistkvs_cc.cc \
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "dnsdist-async.hh"
+#include "dnsdist-internal-queries.hh"
+#include "dolog.hh"
+#include "threadname.hh"
+
+namespace dnsdist
+{
+
+AsynchronousHolder::AsynchronousHolder(bool failOpen)
+{
+ d_data = std::make_shared<Data>();
+ d_data->d_failOpen = failOpen;
+
+ int fds[2] = {-1, -1};
+ if (pipe(fds) < 0) {
+ throw std::runtime_error("Error creating the AsynchronousHolder pipe: " + stringerror());
+ }
+
+ for (size_t idx = 0; idx < (sizeof(fds) / sizeof(*fds)); idx++) {
+ if (!setNonBlocking(fds[idx])) {
+ int err = errno;
+ close(fds[0]);
+ close(fds[1]);
+ throw std::runtime_error("Error setting the AsynchronousHolder pipe non-blocking: " + stringerror(err));
+ }
+ }
+
+ d_data->d_notifyPipe = FDWrapper(fds[1]);
+ d_data->d_watchPipe = FDWrapper(fds[0]);
+
+ std::thread main([data = this->d_data] { mainThread(data); });
+ main.detach();
+}
+
+AsynchronousHolder::~AsynchronousHolder()
+{
+ try {
+ stop();
+ }
+ catch (...) {
+ }
+}
+
+bool AsynchronousHolder::notify() const
+{
+ const char data = 0;
+ bool failed = false;
+ do {
+ auto written = write(d_data->d_notifyPipe.getHandle(), &data, sizeof(data));
+ if (written == 0) {
+ break;
+ }
+ if (written > 0 && static_cast<size_t>(written) == sizeof(data)) {
+ return true;
+ }
+ if (errno != EINTR) {
+ failed = true;
+ }
+ } while (!failed);
+
+ return false;
+}
+
+bool AsynchronousHolder::wait(const AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
+{
+ readyFDs.clear();
+ mplexer.getAvailableFDs(readyFDs, atMostMs);
+ if (readyFDs.size() == 0) {
+ /* timeout */
+ return true;
+ }
+
+ while (true) {
+ /* we might have been notified several times, let's read
+ as much as possible before returning */
+ char dummy = 0;
+ auto got = read(data.d_watchPipe.getHandle(), &dummy, sizeof(dummy));
+ if (got == 0) {
+ break;
+ }
+ if (got > 0 && static_cast<size_t>(got) != sizeof(dummy)) {
+ continue;
+ }
+ if (got == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ break;
+ }
+ }
+
+ return false;
+}
+
+void AsynchronousHolder::stop()
+{
+ {
+ auto content = d_data->d_content.lock();
+ d_data->d_done = true;
+ }
+
+ notify();
+}
+
+void AsynchronousHolder::mainThread(std::shared_ptr<Data> data)
+{
+ setThreadName("dnsdist/async");
+ struct timeval now;
+ std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>> expiredEvents;
+
+ auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(1));
+ mplexer->addReadFD(data->d_watchPipe.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
+ std::vector<int> readyFDs;
+
+ while (true) {
+ bool shouldWait = true;
+ int timeout = -1;
+ {
+ auto content = data->d_content.lock();
+ if (data->d_done) {
+ return;
+ }
+
+ if (!content->empty()) {
+ gettimeofday(&now, nullptr);
+ struct timeval next = getNextTTD(*content);
+ if (next <= now) {
+ pickupExpired(*content, now, expiredEvents);
+ shouldWait = false;
+ }
+ else {
+ auto remainingUsec = uSec(next - now);
+ timeout = std::round(remainingUsec / 1000.0);
+ if (timeout == 0 && remainingUsec > 0) {
+ /* if we have less than 1 ms, let's wait at least 1 ms */
+ timeout = 1;
+ }
+ }
+ }
+ }
+
+ if (shouldWait) {
+ auto timedOut = wait(*data, *mplexer, readyFDs, timeout);
+ if (timedOut) {
+ auto content = data->d_content.lock();
+ gettimeofday(&now, nullptr);
+ pickupExpired(*content, now, expiredEvents);
+ }
+ }
+
+ while (!expiredEvents.empty()) {
+ auto [queryID, query] = std::move(expiredEvents.front());
+ expiredEvents.pop_front();
+ if (!data->d_failOpen) {
+ vinfolog("Asynchronous query %d has expired at %d.%d, notifying the sender", queryID, now.tv_sec, now.tv_usec);
+ auto sender = query->getTCPQuerySender();
+ if (sender) {
+ sender->notifyIOError(std::move(query->query.d_idstate), now);
+ }
+ }
+ else {
+ vinfolog("Asynchronous query %d has expired at %d.%d, resuming", queryID, now.tv_sec, now.tv_usec);
+ resumeQuery(std::move(query));
+ }
+ }
+ }
+}
+
+void AsynchronousHolder::push(uint16_t asyncID, uint16_t queryID, const struct timeval& ttd, std::unique_ptr<CrossProtocolQuery>&& query)
+{
+ bool needNotify = false;
+ {
+ auto content = d_data->d_content.lock();
+ if (!content->empty()) {
+ /* the thread is already waiting on a TTD expiry in addition to notifications,
+ let's not wake it unless our TTD comes before the current one */
+ const struct timeval next = getNextTTD(*content);
+ if (ttd < next) {
+ needNotify = true;
+ }
+ }
+ else {
+ /* the thread is currently only waiting for a notify */
+ needNotify = true;
+ }
+ content->insert({std::move(query), ttd, asyncID, queryID});
+ }
+
+ if (needNotify) {
+ notify();
+ }
+}
+
+std::unique_ptr<CrossProtocolQuery> AsynchronousHolder::get(uint16_t asyncID, uint16_t queryID)
+{
+ /* no need to notify, worst case the thread wakes up for nothing because this was the next TTD */
+ auto content = d_data->d_content.lock();
+ auto it = content->find(std::tie(queryID, asyncID));
+ if (it == content->end()) {
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ vinfolog("Asynchronous object %d not found at %d.%d", queryID, now.tv_sec, now.tv_usec);
+ return nullptr;
+ }
+
+ auto result = std::move(it->d_query);
+ content->erase(it);
+ return result;
+}
+
+void AsynchronousHolder::pickupExpired(content_t& content, const struct timeval& now, std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>>& events)
+{
+ auto& idx = content.get<TTDTag>();
+ for (auto it = idx.begin(); it != idx.end() && it->d_ttd < now;) {
+ events.emplace_back(it->d_queryID, std::move(it->d_query));
+ it = idx.erase(it);
+ }
+}
+
+struct timeval AsynchronousHolder::getNextTTD(const content_t& content)
+{
+ if (content.empty()) {
+ throw std::runtime_error("AsynchronousHolder::getNextTTD() called on an empty holder");
+ }
+
+ return content.get<TTDTag>().begin()->d_ttd;
+}
+
+bool AsynchronousHolder::empty()
+{
+ return d_data->d_content.read_only_lock()->empty();
+}
+
+static bool resumeResponse(std::unique_ptr<CrossProtocolQuery>&& response)
+{
+ try {
+ auto& ids = response->query.d_idstate;
+ DNSResponse dr = response->getDR();
+
+ LocalHolders holders;
+ auto result = processResponseAfterRules(response->query.d_buffer, *holders.cacheInsertedRespRuleActions, dr, ids.cs->muted);
+ if (!result) {
+ /* easy */
+ return true;
+ }
+
+ auto sender = response->getTCPQuerySender();
+ if (sender) {
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+
+ TCPResponse resp(std::move(response->query.d_buffer), std::move(response->query.d_idstate), nullptr, response->downstream);
+ resp.d_async = true;
+ sender->handleResponse(now, std::move(resp));
+ }
+ }
+ catch (const std::exception& e) {
+ vinfolog("Got exception while resuming cross-protocol response: %s", e.what());
+ return false;
+ }
+
+ return true;
+}
+
+static LockGuarded<std::deque<std::unique_ptr<CrossProtocolQuery>>> s_asynchronousEventsQueue;
+
+bool queueQueryResumptionEvent(std::unique_ptr<CrossProtocolQuery>&& query)
+{
+ s_asynchronousEventsQueue.lock()->push_back(std::move(query));
+ return true;
+}
+
+void handleQueuedAsynchronousEvents()
+{
+ while (true) {
+ std::unique_ptr<CrossProtocolQuery> query;
+ {
+ // we do not want to hold the lock while resuming
+ auto queue = s_asynchronousEventsQueue.lock();
+ if (queue->empty()) {
+ return;
+ }
+
+ query = std::move(queue->front());
+ queue->pop_front();
+ }
+ if (query && !resumeQuery(std::move(query))) {
+ vinfolog("Unable to resume asynchronous query event");
+ }
+ }
+}
+
+bool resumeQuery(std::unique_ptr<CrossProtocolQuery>&& query)
+{
+ if (query->d_isResponse) {
+ return resumeResponse(std::move(query));
+ }
+
+ auto& ids = query->query.d_idstate;
+ DNSQuestion dq = query->getDQ();
+ LocalHolders holders;
+
+ auto result = processQueryAfterRules(dq, holders, query->downstream);
+ if (result == ProcessQueryResult::Drop) {
+ /* easy */
+ return true;
+ }
+ else if (result == ProcessQueryResult::PassToBackend) {
+ if (query->downstream == nullptr) {
+ return false;
+ }
+
+#ifdef HAVE_DNS_OVER_HTTPS
+ if (dq.ids.du != nullptr) {
+ dq.ids.du->downstream = query->downstream;
+ }
+#endif
+
+ if (query->downstream->isTCPOnly() || !(dq.getProtocol().isUDP() || dq.getProtocol() == dnsdist::Protocol::DoH)) {
+ query->downstream->passCrossProtocolQuery(std::move(query));
+ return true;
+ }
+
+ auto queryID = dq.getHeader()->id;
+ /* at this point 'du', if it is not nullptr, is owned by the DoHCrossProtocolQuery
+ which will stop existing when we return, so we need to increment the reference count
+ */
+ return assignOutgoingUDPQueryToBackend(query->downstream, queryID, dq, query->query.d_buffer, ids.origDest);
+ }
+ else if (result == ProcessQueryResult::SendAnswer) {
+ auto sender = query->getTCPQuerySender();
+ if (!sender) {
+ return false;
+ }
+
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+
+ TCPResponse response(std::move(query->query.d_buffer), std::move(query->query.d_idstate), nullptr, query->downstream);
+ response.d_async = true;
+ response.d_idstate.selfGenerated = true;
+
+ try {
+ sender->handleResponse(now, std::move(response));
+ return true;
+ }
+ catch (const std::exception& e) {
+ vinfolog("Got exception while resuming cross-protocol self-answered query: %s", e.what());
+ return false;
+ }
+ }
+ else if (result == ProcessQueryResult::Asynchronous) {
+ /* nope */
+ errlog("processQueryAfterRules returned 'asynchronous' while trying to resume an already asynchronous query");
+ return false;
+ }
+
+ return false;
+}
+
+bool suspendQuery(DNSQuestion& dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
+{
+ if (!g_asyncHolder) {
+ return false;
+ }
+
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ struct timeval ttd = now;
+ ttd.tv_sec += timeoutMs / 1000;
+ ttd.tv_usec += (timeoutMs % 1000) * 1000;
+ if (ttd.tv_usec >= 1000000) {
+ ttd.tv_sec++;
+ ttd.tv_usec -= 1000000;
+ }
+
+ vinfolog("Suspending asynchronous query %d at %d.%d until %d.%d", queryID, now.tv_sec, now.tv_usec, ttd.tv_sec, ttd.tv_usec);
+ auto query = getInternalQueryFromDQ(dq, false);
+
+ g_asyncHolder->push(asyncID, queryID, ttd, std::move(query));
+ return true;
+}
+
+bool suspendResponse(DNSResponse& dr, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
+{
+ if (!g_asyncHolder) {
+ return false;
+ }
+
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ struct timeval ttd = now;
+ ttd.tv_sec += timeoutMs / 1000;
+ ttd.tv_usec += (timeoutMs % 1000) * 1000;
+ if (ttd.tv_usec >= 1000000) {
+ ttd.tv_sec++;
+ ttd.tv_usec -= 1000000;
+ }
+
+ vinfolog("Suspending asynchronous response %d at %d.%d until %d.%d", queryID, now.tv_sec, now.tv_usec, ttd.tv_sec, ttd.tv_usec);
+ auto query = getInternalQueryFromDQ(dr, true);
+ query->d_isResponse = true;
+ query->downstream = dr.d_downstream;
+
+ g_asyncHolder->push(asyncID, queryID, ttd, std::move(query));
+ return true;
+}
+
+std::unique_ptr<AsynchronousHolder> g_asyncHolder;
+}
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include <thread>
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
+
+#include "dnsdist-tcp.hh"
+
+namespace dnsdist
+{
+class AsynchronousHolder
+{
+public:
+ AsynchronousHolder(bool failOpen = true);
+ ~AsynchronousHolder();
+ void push(uint16_t asyncID, uint16_t queryID, const struct timeval& ttd, std::unique_ptr<CrossProtocolQuery>&& query);
+ std::unique_ptr<CrossProtocolQuery> get(uint16_t asyncID, uint16_t queryID);
+ bool empty();
+ void stop();
+
+private:
+ struct TTDTag
+ {
+ };
+ struct IDTag
+ {
+ };
+
+ struct Entry
+ {
+ /* not used by any of the indexes, so mutable */
+ mutable std::unique_ptr<CrossProtocolQuery> d_query;
+ struct timeval d_ttd;
+ uint16_t d_asyncID;
+ uint16_t d_queryID;
+ };
+
+ typedef multi_index_container<
+ Entry,
+ indexed_by<
+ ordered_unique<tag<IDTag>,
+ composite_key<
+ Entry,
+ member<Entry, uint16_t, &Entry::d_queryID>,
+ member<Entry, uint16_t, &Entry::d_asyncID>>>,
+ ordered_non_unique<tag<TTDTag>,
+ member<Entry, struct timeval, &Entry::d_ttd>>>>
+ content_t;
+
+ static void pickupExpired(content_t&, const struct timeval& now, std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>>& expiredEvents);
+ static struct timeval getNextTTD(const content_t&);
+
+ struct Data
+ {
+ LockGuarded<content_t> d_content;
+ FDWrapper d_notifyPipe;
+ FDWrapper d_watchPipe;
+ bool d_failOpen{true};
+ bool d_done{false};
+ };
+ std::shared_ptr<Data> d_data{nullptr};
+
+ static void mainThread(std::shared_ptr<Data> data);
+ static bool wait(const Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs);
+ bool notify() const;
+};
+
+bool suspendQuery(DNSQuestion& dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs);
+bool suspendResponse(DNSResponse& dr, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs);
+bool queueQueryResumptionEvent(std::unique_ptr<CrossProtocolQuery>&& query);
+bool resumeQuery(std::unique_ptr<CrossProtocolQuery>&& query);
+void handleQueuedAsynchronousEvents();
+
+extern std::unique_ptr<AsynchronousHolder> g_asyncHolder;
+}
return true;
}
- const ClientState* getClientState() const override
- {
- return nullptr;
- }
-
void handleResponse(const struct timeval& now, TCPResponse&& response) override
{
d_data->d_buffer = std::move(response.d_buffer);
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "dnsdist-internal-queries.hh"
+#include "dnsdist-tcp.hh"
+#include "doh.hh"
+
+std::unique_ptr<CrossProtocolQuery> getUDPCrossProtocolQueryFromDQ(DNSQuestion& dq);
+
+namespace dnsdist
+{
+std::unique_ptr<CrossProtocolQuery> getInternalQueryFromDQ(DNSQuestion& dq, bool isResponse)
+{
+ auto protocol = dq.getProtocol();
+ if (protocol == dnsdist::Protocol::DoUDP || protocol == dnsdist::Protocol::DNSCryptUDP) {
+ return getUDPCrossProtocolQueryFromDQ(dq);
+ }
+#ifdef HAVE_DNS_OVER_HTTPS
+ else if (protocol == dnsdist::Protocol::DoH) {
+ return getDoHCrossProtocolQueryFromDQ(dq, isResponse);
+ }
+#endif
+ else {
+ return getTCPCrossProtocolQueryFromDQ(dq);
+ }
+}
+}
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include <memory>
+#include "dnsdist.hh"
+
+namespace dnsdist
+{
+std::unique_ptr<CrossProtocolQuery> getInternalQueryFromDQ(DNSQuestion& dq, bool isResponse);
+}
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "dnsdist.hh"
+#include "dnsdist-async.hh"
#include "dnsdist-lua.hh"
#include "dnsdist-lua-network.hh"
#include "dolog.hh"
}
return listener->addUnixListeningEndpoint(path, endpointID, [cb](dnsdist::NetworkListener::EndpointID endpoint, std::string&& dgram, const std::string& from) {
- auto lock = g_lua.lock();
- cb(endpoint, dgram, from);
+ {
+ auto lock = g_lua.lock();
+ cb(endpoint, dgram, from);
+ }
+ dnsdist::handleQueuedAsynchronousEvents();
});
});
size_t dnsdist_ffi_dnsquestion_get_qname_hash(const dnsdist_ffi_dnsquestion_t* dq, size_t init) __attribute__ ((visibility ("default")));
uint16_t dnsdist_ffi_dnsquestion_get_qtype(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
uint16_t dnsdist_ffi_dnsquestion_get_qclass(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
+uint16_t dnsdist_ffi_dnsquestion_get_id(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
int dnsdist_ffi_dnsquestion_get_rcode(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
void* dnsdist_ffi_dnsquestion_get_header(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
uint16_t dnsdist_ffi_dnsquestion_get_len(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
bool dnsdist_ffi_dnsquestion_get_do(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsquestion_get_sni(const dnsdist_ffi_dnsquestion_t* dq, const char** sni, size_t* sniSize) __attribute__ ((visibility ("default")));
const char* dnsdist_ffi_dnsquestion_get_tag(const dnsdist_ffi_dnsquestion_t* dq, const char* label) __attribute__ ((visibility ("default")));
+size_t dnsdist_ffi_dnsquestion_get_tag_raw(const dnsdist_ffi_dnsquestion_t* dq, const char* label, char* buffer, size_t bufferSize) __attribute__ ((visibility ("default")));
const char* dnsdist_ffi_dnsquestion_get_http_path(dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
const char* dnsdist_ffi_dnsquestion_get_http_query_string(dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
const char* dnsdist_ffi_dnsquestion_get_http_host(dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
const char* dnsdist_ffi_dnsquestion_get_http_scheme(dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
size_t dnsdist_ffi_dnsquestion_get_mac_addr(const dnsdist_ffi_dnsquestion_t* dq, void* buffer, size_t bufferSize) __attribute__ ((visibility ("default")));
+uint64_t dnsdist_ffi_dnsquestion_get_elapsed_us(const dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
// returns the length of the resulting 'out' array. 'out' is not set if the length is 0
size_t dnsdist_ffi_dnsquestion_get_edns_options(dnsdist_ffi_dnsquestion_t* ref, const dnsdist_ffi_ednsoption_t** out) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsquestion_set_temp_failure_ttl(dnsdist_ffi_dnsquestion_t* dq, uint32_t tempFailureTTL) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsquestion_unset_temp_failure_ttl(dnsdist_ffi_dnsquestion_t* dq) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsquestion_set_tag(dnsdist_ffi_dnsquestion_t* dq, const char* label, const char* value) __attribute__ ((visibility ("default")));
+void dnsdist_ffi_dnsquestion_set_tag_raw(dnsdist_ffi_dnsquestion_t* dq, const char* label, const char* value, size_t valueSize) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsquestion_set_http_response(dnsdist_ffi_dnsquestion_t* dq, uint16_t statusCode, const char* body, size_t bodyLen, const char* contentType) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsresponse_set_max_returned_ttl(dnsdist_ffi_dnsresponse_t* dr, uint32_t max) __attribute__ ((visibility ("default")));
void dnsdist_ffi_dnsresponse_clear_records_type(dnsdist_ffi_dnsresponse_t* dr, uint16_t qtype) __attribute__ ((visibility ("default")));
+bool dnsdist_ffi_dnsquestion_set_async(dnsdist_ffi_dnsquestion_t* dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs) __attribute__ ((visibility ("default")));
+bool dnsdist_ffi_dnsresponse_set_async(dnsdist_ffi_dnsquestion_t* dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs) __attribute__ ((visibility ("default")));
+
+bool dnsdist_ffi_resume_from_async(uint16_t asyncID, uint16_t queryID, const char* tag, size_t tagSize, const char* tagValue, size_t tagValueSize, bool useCache) __attribute__ ((visibility ("default")));
+bool dnsdist_ffi_drop_from_async(uint16_t asyncID, uint16_t queryID) __attribute__ ((visibility ("default")));
+bool dnsdist_ffi_set_answer_from_async(uint16_t asyncID, uint16_t queryID, const char* raw, size_t rawSize) __attribute__ ((visibility ("default")));
+bool dnsdist_ffi_set_rcode_from_async(uint16_t asyncID, uint16_t queryID, uint8_t rcode, bool clearAnswers) __attribute__ ((visibility ("default")));
+
typedef struct dnsdist_ffi_proxy_protocol_value {
const char* value;
uint16_t size;
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#include "dnsdist-async.hh"
#include "dnsdist-dnsparser.hh"
#include "dnsdist-lua-ffi.hh"
#include "dnsdist-mac-address.hh"
return dq->dq->ids.qclass;
}
+uint16_t dnsdist_ffi_dnsquestion_get_id(const dnsdist_ffi_dnsquestion_t* dq)
+{
+ if (dq == nullptr) {
+ return 0;
+ }
+ return ntohs(dq->dq->getHeader()->id);
+}
+
static void dnsdist_ffi_comboaddress_to_raw(const ComboAddress& ca, const void** addr, size_t* addrSize)
{
if (ca.isIPv4()) {
if (dq == nullptr) {
return 0;
}
-
auto ret = dnsdist::MacAddressesCache::get(dq->dq->ids.origRemote, reinterpret_cast<unsigned char*>(buffer), bufferSize);
if (ret != 0) {
return 0;
return 6;
}
+uint64_t dnsdist_ffi_dnsquestion_get_elapsed_us(const dnsdist_ffi_dnsquestion_t* dq)
+{
+ if (dq == nullptr) {
+ return 0;
+ }
+
+ return dq->dq->ids.queryRealTime.udiff();
+}
+
void dnsdist_ffi_dnsquestion_get_masked_remoteaddr(dnsdist_ffi_dnsquestion_t* dq, const void** addr, size_t* addrSize, uint8_t bits)
{
dq->maskedRemote = Netmask(dq->dq->ids.origRemote, bits).getMaskedNetwork();
{
const char * result = nullptr;
- if (dq->dq->ids.qTag != nullptr) {
+ if (dq != nullptr && dq->dq != nullptr && dq->dq->ids.qTag != nullptr) {
const auto it = dq->dq->ids.qTag->find(label);
if (it != dq->dq->ids.qTag->cend()) {
result = it->second.c_str();
return result;
}
+size_t dnsdist_ffi_dnsquestion_get_tag_raw(const dnsdist_ffi_dnsquestion_t* dq, const char* label, char* buffer, size_t bufferSize)
+{
+ if (dq == nullptr || dq->dq == nullptr || dq->dq->ids.qTag == nullptr || label == nullptr || buffer == nullptr || bufferSize == 0) {
+ return 0;
+ }
+
+ const auto it = dq->dq->ids.qTag->find(label);
+ if (it == dq->dq->ids.qTag->cend()) {
+ return 0;
+ }
+
+ if (it->second.size() > bufferSize) {
+ return 0;
+ }
+
+ memcpy(buffer, it->second.c_str(), it->second.size());
+ return it->second.size();
+}
+
const char* dnsdist_ffi_dnsquestion_get_http_path(dnsdist_ffi_dnsquestion_t* dq)
{
if (!dq->httpPath) {
size_t dnsdist_ffi_dnsquestion_get_tag_array(dnsdist_ffi_dnsquestion_t* dq, const dnsdist_ffi_tag_t** out)
{
- if (dq->dq->ids.qTag == nullptr || dq->dq->ids.qTag->size() == 0) {
+ if (dq == nullptr || dq->dq == nullptr || dq->dq->ids.qTag == nullptr || dq->dq->ids.qTag->size() == 0) {
return 0;
}
dq->dq->setTag(label, value);
}
+void dnsdist_ffi_dnsquestion_set_tag_raw(dnsdist_ffi_dnsquestion_t* dq, const char* label, const char* value, size_t valueSize)
+{
+ dq->dq->setTag(label, std::string(value, valueSize));
+}
+
size_t dnsdist_ffi_dnsquestion_get_trailing_data(dnsdist_ffi_dnsquestion_t* dq, const char** out)
{
dq->trailingData = dq->dq->getTrailingData();
}
}
+bool dnsdist_ffi_dnsquestion_set_async(dnsdist_ffi_dnsquestion_t* dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
+{
+ try {
+ dq->dq->asynchronous = true;
+ dnsdist::suspendQuery(*dq->dq, asyncID, queryID, timeoutMs);
+ return true;
+ }
+ catch (const std::exception& e) {
+ vinfolog("Error in dnsdist_ffi_dnsquestion_set_async: %s", e.what());
+ }
+ catch (...) {
+ vinfolog("Exception in dnsdist_ffi_dnsquestion_set_async");
+ }
+
+ return false;
+}
+
+bool dnsdist_ffi_dnsresponse_set_async(dnsdist_ffi_dnsquestion_t* dq, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
+{
+ try {
+ dq->dq->asynchronous = true;
+ auto dr = dynamic_cast<DNSResponse*>(dq->dq);
+ if (!dr) {
+ vinfolog("Passed a DNSQuestion instead of a DNSResponse to dnsdist_ffi_dnsresponse_set_async");
+ return false;
+ }
+
+ dnsdist::suspendResponse(*dr, asyncID, queryID, timeoutMs);
+ return true;
+ }
+ catch (const std::exception& e) {
+ vinfolog("Error in dnsdist_ffi_dnsresponse_set_async: %s", e.what());
+ }
+ catch (...) {
+ vinfolog("Exception in dnsdist_ffi_dnsresponse_set_async");
+ }
+ return false;
+}
+
+bool dnsdist_ffi_resume_from_async(uint16_t asyncID, uint16_t queryID, const char* tag, size_t tagSize, const char* tagValue, size_t tagValueSize, bool useCache)
+{
+ if (!dnsdist::g_asyncHolder) {
+ vinfolog("Unable to resume, no asynchronous holder");
+ return false;
+ }
+
+ auto query = dnsdist::g_asyncHolder->get(asyncID, queryID);
+ if (!query) {
+ vinfolog("Unable to resume, no object found for asynchronous ID %d and query ID %d", asyncID, queryID);
+ return false;
+ }
+
+ auto& ids = query->query.d_idstate;
+ if (tag != nullptr && tagSize > 0) {
+ if (!ids.qTag) {
+ ids.qTag = std::make_unique<QTag>();
+ }
+ (*ids.qTag)[std::string(tag, tagSize)] = std::string(tagValue, tagValueSize);
+ }
+
+ ids.skipCache = !useCache;
+
+ return dnsdist::queueQueryResumptionEvent(std::move(query));
+}
+
+bool dnsdist_ffi_set_rcode_from_async(uint16_t asyncID, uint16_t queryID, uint8_t rcode, bool clearAnswers)
+{
+ if (!dnsdist::g_asyncHolder) {
+ return false;
+ }
+
+ auto query = dnsdist::g_asyncHolder->get(asyncID, queryID);
+ if (!query) {
+ vinfolog("Unable to resume with a custom response code, no object found for asynchronous ID %d and query ID %d", asyncID, queryID);
+ return false;
+ }
+
+ const auto qnameLength = query->query.d_idstate.qname.wirelength();
+ auto& buffer = query->query.d_buffer;
+ if (buffer.size() < sizeof(dnsheader) + qnameLength + sizeof(uint16_t) + sizeof(uint16_t)) {
+ return false;
+ }
+
+ EDNS0Record edns0;
+ bool hadEDNS = false;
+ if (clearAnswers) {
+ hadEDNS = getEDNS0Record(buffer, edns0);
+ }
+
+ auto dh = reinterpret_cast<dnsheader*>(buffer.data());
+ dh->rcode = rcode;
+ dh->ad = false;
+ dh->aa = false;
+ dh->ra = dh->rd;
+ dh->qr = true;
+
+ if (clearAnswers) {
+ dh->ancount = 0;
+ dh->nscount = 0;
+ dh->arcount = 0;
+ buffer.resize(sizeof(dnsheader) + qnameLength + sizeof(uint16_t) + sizeof(uint16_t));
+ if (hadEDNS) {
+ if (!addEDNS(buffer, query->query.d_idstate.protocol.isUDP() ? 4096 : std::numeric_limits<uint16_t>::max(), edns0.extFlags & htons(EDNS_HEADER_FLAG_DO), g_PayloadSizeSelfGenAnswers, 0)) {
+ return false;
+ }
+ }
+ }
+
+ query->query.d_idstate.skipCache = true;
+
+ return dnsdist::queueQueryResumptionEvent(std::move(query));
+}
+
+bool dnsdist_ffi_drop_from_async(uint16_t asyncID, uint16_t queryID)
+{
+ if (!dnsdist::g_asyncHolder) {
+ return false;
+ }
+
+ auto query = dnsdist::g_asyncHolder->get(asyncID, queryID);
+ if (!query) {
+ vinfolog("Unable to drop, no object found for asynchronous ID %d and query ID %d", asyncID, queryID);
+ return false;
+ }
+
+ auto sender = query->getTCPQuerySender();
+ if (!sender) {
+ return false;
+ }
+
+ struct timeval now;
+ gettimeofday(&now, nullptr);
+ sender->notifyIOError(std::move(query->query.d_idstate), now);
+
+ return true;
+}
+
+bool dnsdist_ffi_set_answer_from_async(uint16_t asyncID, uint16_t queryID, const char* raw, size_t rawSize)
+{
+ if (rawSize < sizeof(dnsheader)) {
+ return false;
+ }
+ if (!dnsdist::g_asyncHolder) {
+ return false;
+ }
+
+ auto query = dnsdist::g_asyncHolder->get(asyncID, queryID);
+ if (!query) {
+ vinfolog("Unable to resume with a custom answer, no object found for asynchronous ID %d and query ID %d", asyncID, queryID);
+ return false;
+ }
+
+ auto oldId = reinterpret_cast<const dnsheader*>(query->query.d_buffer.data())->id;
+ query->query.d_buffer.clear();
+ query->query.d_buffer.insert(query->query.d_buffer.begin(), raw, raw + rawSize);
+ reinterpret_cast<dnsheader*>(query->query.d_buffer.data())->id = oldId;
+
+ query->query.d_idstate.skipCache = true;
+
+ return dnsdist::queueQueryResumptionEvent(std::move(query));
+}
+
static constexpr char s_lua_ffi_code[] = R"FFICodeContent(
local ffi = require("ffi")
local C = ffi.C
vinfolog("Error parsing address passed to dnsdist_ffi_packetcache_get_domain_list_by_addr: %s", e.what());
return 0;
}
+ catch (const PDNSException& e) {
+ vinfolog("Error parsing address passed to dnsdist_ffi_packetcache_get_domain_list_by_addr: %s", e.reason);
+ return 0;
+ }
const auto localPools = g_pools.getCopy();
auto it = localPools.find(poolName);
vinfolog("Unable to convert address in dnsdist_ffi_ring_get_entries_by_addr: %s", e.what());
return 0;
}
+ catch (const PDNSException& e) {
+ vinfolog("Unable to convert address in dnsdist_ffi_ring_get_entries_by_addr: %s", e.reason);
+ return 0;
+ }
auto list = std::make_unique<dnsdist_ffi_ring_entry_list_t>();
#include "dnsdist-lua-network.hh"
#include "dolog.hh"
+#include "threadname.hh"
namespace dnsdist
{
NetworkListener::NetworkListener() :
- d_mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent()))
+ d_mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(10)))
{
}
void NetworkListener::mainThread()
{
+ setThreadName("dnsdist/lua-net");
struct timeval now;
while (true) {
}
}
- request.d_sender->handleResponse(now, TCPResponse(std::move(request.d_buffer), std::move(request.d_query.d_idstate), shared_from_this()));
+ request.d_sender->handleResponse(now, TCPResponse(std::move(request.d_buffer), std::move(request.d_query.d_idstate), shared_from_this(), d_ds));
}
catch (const std::exception& e) {
vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
bool ConnectionToBackend::reconnect()
{
std::unique_ptr<TLSSession> tlsSession{nullptr};
- if (d_handler) {
+ if (d_handler) {
DEBUGLOG("closing socket "<<d_handler->getDescriptor());
if (d_handler->isTLS()) {
if (d_handler->hasTLSSessionBeenResumed()) {
DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
++d_ds->tcpNewConnections;
try {
- auto socket = std::make_unique<Socket>(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
- DEBUGLOG("result of socket() is "<<socket->getHandle());
+ auto socket = Socket(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
+ DEBUGLOG("result of socket() is "<<socket.getHandle());
/* disable NAGLE, which does not play nicely with delayed ACKs.
In theory we could be wasting up to 500 milliseconds waiting for
the other end to acknowledge our initial packet before we could
send the rest. */
- setTCPNoDelay(socket->getHandle());
+ setTCPNoDelay(socket.getHandle());
#ifdef SO_BINDTODEVICE
if (!d_ds->d_config.sourceItfName.empty()) {
- int res = setsockopt(socket->getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
+ int res = setsockopt(socket.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
if (res != 0) {
vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
}
#endif
if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
- SSetsockopt(socket->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
+ SSetsockopt(socket.getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
if (d_ds->d_config.ipBindAddrNoPort) {
- SSetsockopt(socket->getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
+ SSetsockopt(socket.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
}
#endif
- socket->bind(d_ds->d_config.sourceAddr, false);
+ socket.bind(d_ds->d_config.sourceAddr, false);
}
-
- socket->setNonBlocking();
+ socket.setNonBlocking();
gettimeofday(&d_connectionStartTime, nullptr);
- auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket->releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx, d_connectionStartTime.tv_sec);
+ auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket.releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx, d_connectionStartTime.tv_sec);
if (!tlsSession && d_ds->d_tlsCtx) {
tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
}
auto pendingResponses = std::move(d_pendingResponses);
d_pendingResponses.clear();
- auto increaseCounters = [reason](std::shared_ptr<TCPQuerySender>& sender) {
+ auto increaseCounters = [reason](const ClientState* cs) {
if (reason == FailureReason::timeout) {
- const ClientState* cs = sender->getClientState();
if (cs) {
++cs->tcpDownstreamTimeouts;
}
}
else if (reason == FailureReason::gaveUp) {
- const ClientState* cs = sender->getClientState();
if (cs) {
++cs->tcpGaveUp;
}
try {
if (d_state == State::sendingQueryToBackend) {
+ increaseCounters(d_currentQuery.d_query.d_idstate.cs);
auto sender = d_currentQuery.d_sender;
if (sender->active()) {
- increaseCounters(sender);
sender->notifyIOError(std::move(d_currentQuery.d_query.d_idstate), now);
}
}
for (auto& query : pendingQueries) {
+ increaseCounters(query.d_query.d_idstate.cs);
auto sender = query.d_sender;
if (sender->active()) {
- increaseCounters(sender);
sender->notifyIOError(std::move(query.d_query.d_idstate), now);
}
}
for (auto& response : pendingResponses) {
+ increaseCounters(response.second.d_query.d_idstate.cs);
auto sender = response.second.d_sender;
if (sender->active()) {
- increaseCounters(sender);
sender->notifyIOError(std::move(response.second.d_query.d_idstate), now);
}
}
TCPResponse response;
response.d_buffer = std::move(d_responseBuffer);
response.d_connection = conn;
+ response.d_ds = conn->d_ds;
/* we don't move the whole IDS because we will need for the responses to come */
response.d_idstate.qtype = it->second.d_query.d_idstate.qtype;
response.d_idstate.qname = it->second.d_query.d_idstate.qname;
if (sender->active()) {
DEBUGLOG("passing response to client connection for "<<ids.qname);
// make sure that we still exist after calling handleResponse()
- sender->handleResponse(now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn));
+ sender->handleResponse(now, TCPResponse(std::move(d_responseBuffer), std::move(ids), conn, conn->d_ds));
}
if (!d_pendingQueries.empty()) {
return d_ioState != nullptr;
}
- const ClientState* getClientState() const override
- {
- return d_ci.cs;
- }
-
std::string toString() const
{
ostringstream o;
int fd{-1};
};
-struct InternalQuery
+class InternalQuery
{
+public:
InternalQuery()
{
}
memset(&d_cleartextDH, 0, sizeof(d_cleartextDH));
}
- TCPResponse(PacketBuffer&& buffer, InternalQueryState&& state, std::shared_ptr<ConnectionToBackend> conn) :
- TCPQuery(std::move(buffer), std::move(state)), d_connection(conn)
+ TCPResponse(PacketBuffer&& buffer, InternalQueryState&& state, std::shared_ptr<ConnectionToBackend> conn, std::shared_ptr<DownstreamState> ds) :
+ TCPQuery(std::move(buffer), std::move(state)), d_connection(conn), d_ds(ds)
{
- memset(&d_cleartextDH, 0, sizeof(d_cleartextDH));
+ if (d_buffer.size() >= sizeof(dnsheader)) {
+ memcpy(&d_cleartextDH, reinterpret_cast<const dnsheader*>(d_buffer.data()), sizeof(d_cleartextDH));
+ }
+ else {
+ memset(&d_cleartextDH, 0, sizeof(d_cleartextDH));
+ }
+ }
+
+ bool isAsync() const
+ {
+ return d_async;
}
std::shared_ptr<ConnectionToBackend> d_connection{nullptr};
+ std::shared_ptr<DownstreamState> d_ds{nullptr};
dnsheader d_cleartextDH;
- bool d_selfGenerated{false};
+ bool d_async{false};
};
class TCPQuerySender
}
virtual bool active() const = 0;
- virtual const ClientState* getClientState() const = 0;
virtual void handleResponse(const struct timeval& now, TCPResponse&& response) = 0;
virtual void handleXFRResponse(const struct timeval& now, TCPResponse&& response) = 0;
virtual void notifyIOError(InternalQueryState&& query, const struct timeval& now) = 0;
}
virtual std::shared_ptr<TCPQuerySender> getTCPQuerySender() = 0;
+ virtual DNSQuestion getDQ()
+ {
+ auto& ids = query.d_idstate;
+ DNSQuestion dq(ids, query.d_buffer);
+ return dq;
+ }
+
+ virtual DNSResponse getDR()
+ {
+ auto& ids = query.d_idstate;
+ DNSResponse dr(ids, query.d_buffer, downstream);
+ return dr;
+ }
InternalQuery query;
std::shared_ptr<DownstreamState> downstream{nullptr};
size_t proxyProtocolPayloadSize{0};
- bool isXFR{false};
+ bool d_isResponse{false};
};
class TCPClientCollection
};
extern std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
+
+std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dq);
class DoHTCPCrossQuerySender : public TCPQuerySender
{
public:
- DoHTCPCrossQuerySender(const ClientState& cs): d_cs(cs)
+ DoHTCPCrossQuerySender()
{
}
return true;
}
- const ClientState* getClientState() const override
- {
- return &d_cs;
- }
-
void handleResponse(const struct timeval& now, TCPResponse&& response) override
{
if (!response.d_idstate.du) {
du->ids = std::move(response.d_idstate);
DNSResponse dr(du->ids, du->response, du->downstream);
- static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
- static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
-
dnsheader cleartextDH;
memcpy(&cleartextDH, dr.getHeader(), sizeof(cleartextDH));
- dr.ids.du = std::move(du);
+ if (!response.isAsync()) {
+ static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
+ static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
- if (!processResponse(dr.ids.du->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
- if (dr.ids.du) {
- dr.ids.du->status_code = 503;
- sendDoHUnitToTheMainThread(std::move(dr.ids.du), "Response dropped by rules");
+ dr.ids.du = std::move(du);
+
+ if (!processResponse(dr.ids.du->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
+ if (dr.ids.du) {
+ dr.ids.du->status_code = 503;
+ sendDoHUnitToTheMainThread(std::move(dr.ids.du), "Response dropped by rules");
+ }
+ return;
}
- return;
- }
- du = std::move(dr.ids.du);
+ if (dr.isAsynchronous()) {
+ return;
+ }
- double udiff = du->ids.queryRealTime.udiff();
- vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
+ du = std::move(dr.ids.du);
+ }
+
+ if (!du->ids.selfGenerated) {
+ double udiff = du->ids.queryRealTime.udiff();
+ vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
- auto backendProtocol = du->downstream->getProtocol();
- if (backendProtocol == dnsdist::Protocol::DoUDP && du->tcp) {
- backendProtocol = dnsdist::Protocol::DoTCP;
+ auto backendProtocol = du->downstream->getProtocol();
+ if (backendProtocol == dnsdist::Protocol::DoUDP && du->tcp) {
+ backendProtocol = dnsdist::Protocol::DoTCP;
+ }
+ handleResponseSent(du->ids, udiff, du->ids.origRemote, du->downstream->d_config.remote, du->response.size(), cleartextDH, backendProtocol);
}
- handleResponseSent(du->ids, udiff, du->ids.origRemote, du->downstream->d_config.remote, du->response.size(), cleartextDH, backendProtocol);
++g_stats.responses;
if (du->ids.cs) {
du->status_code = 502;
sendDoHUnitToTheMainThread(std::move(du), "cross-protocol error response");
}
-protected:
- const ClientState& d_cs;
};
class DoHCrossProtocolQuery : public CrossProtocolQuery
{
public:
- DoHCrossProtocolQuery(DOHUnitUniquePtr&& du)
+ DoHCrossProtocolQuery(DOHUnitUniquePtr&& du, bool isResponse)
{
- query = InternalQuery(std::move(du->query), std::move(du->ids));
+ if (isResponse) {
+ /* happens when a response becomes async */
+ query = InternalQuery(std::move(du->response), std::move(du->ids));
+ }
+ else {
+ /* we need to duplicate the query here because we might need
+ the existing query later if we get a truncated answer */
+ query = InternalQuery(PacketBuffer(du->query), std::move(du->ids));
+ }
+
/* it might have been moved when we moved du->ids */
if (du) {
query.d_idstate.du = std::move(du);
std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
{
query.d_idstate.du->downstream = downstream;
- auto sender = std::make_shared<DoHTCPCrossQuerySender>(*query.d_idstate.cs);
- return sender;
+ return s_sender;
+ }
+
+ DNSQuestion getDQ() override
+ {
+ auto& ids = query.d_idstate;
+ DNSQuestion dq(ids, query.d_buffer);
+ return dq;
}
+ DNSResponse getDR() override
+ {
+ auto& ids = query.d_idstate;
+ DNSResponse dr(ids, query.d_buffer, downstream);
+ return dr;
+ }
+
DOHUnitUniquePtr&& releaseDU()
{
return std::move(query.d_idstate.du);
}
+
+private:
+ static std::shared_ptr<DoHTCPCrossQuerySender> s_sender;
};
+std::shared_ptr<DoHTCPCrossQuerySender> DoHCrossProtocolQuery::s_sender = std::make_shared<DoHTCPCrossQuerySender>();
+
+std::unique_ptr<CrossProtocolQuery> getDoHCrossProtocolQueryFromDQ(DNSQuestion& dq, bool isResponse)
+{
+ if (!dq.ids.du) {
+ throw std::runtime_error("Trying to create a DoH cross protocol query without a valid DoH unit");
+ }
+
+ auto du = std::move(dq.ids.du);
+ if (&dq.ids != &du->ids) {
+ du->ids = std::move(dq.ids);
+ }
+
+ du->ids.origID = dq.getHeader()->id;
+
+ if (!isResponse) {
+ if (du->query.data() != dq.getMutableData().data()) {
+ du->query = std::move(dq.getMutableData());
+ }
+ }
+ else {
+ if (du->response.data() != dq.getMutableData().data()) {
+ du->response = std::move(dq.getMutableData());
+ }
+ }
+
+ return std::make_unique<DoHCrossProtocolQuery>(std::move(du), isResponse);
+}
+
/*
We are not in the main DoH thread but in the DoH 'client' thread.
*/
queryId = ntohs(dh->id);
}
+ auto downstream = du->downstream;
du->ids.qname = DNSName(reinterpret_cast<const char*>(du->query.data()), du->query.size(), sizeof(dnsheader), false, &du->ids.qtype, &du->ids.qclass);
DNSQuestion dq(du->ids, du->query);
const uint16_t* flags = getFlagsFromDNSHeader(dq.getHeader());
du->ids.cs = &cs;
dq.sni = std::move(du->sni);
- auto result = processQuery(dq, cs, holders, du->downstream);
+ auto result = processQuery(dq, holders, downstream);
if (result == ProcessQueryResult::Drop) {
du->status_code = 403;
handleImmediateResponse(std::move(du), "DoH dropped query");
return;
}
-
- if (result == ProcessQueryResult::SendAnswer) {
+ else if (result == ProcessQueryResult::Asynchronous) {
+ return;
+ }
+ else if (result == ProcessQueryResult::SendAnswer) {
if (du->response.empty()) {
du->response = std::move(du->query);
}
if (du->response.size() >= sizeof(dnsheader) && du->contentType.empty()) {
auto dh = reinterpret_cast<const struct dnsheader*>(du->response.data());
- handleResponseSent(ids.qname, QType(ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoH, dnsdist::Protocol::DoH);
+ handleResponseSent(du->ids.qname, QType(du->ids.qtype), 0., du->ids.origDest, ComboAddress(), du->response.size(), *dh, dnsdist::Protocol::DoH, dnsdist::Protocol::DoH);
}
handleImmediateResponse(std::move(du), "DoH self-answered response");
return;
return;
}
- auto downstream = du->downstream;
if (downstream == nullptr) {
du->status_code = 502;
handleImmediateResponse(std::move(du), "DoH no backend available");
du->tcp = true;
/* this moves du->ids, careful! */
- auto cpq = std::make_unique<DoHCrossProtocolQuery>(std::move(du));
+ auto cpq = std::make_unique<DoHCrossProtocolQuery>(std::move(du), false);
cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
if (downstream->passCrossProtocolQuery(std::move(cpq))) {
du->truncated = false;
du->response.clear();
- auto cpq = std::make_unique<DoHCrossProtocolQuery>(std::move(du));
+ auto cpq = std::make_unique<DoHCrossProtocolQuery>(std::move(du), false);
if (g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq))) {
continue;
const dnsheader* dh = reinterpret_cast<const struct dnsheader*>(du->response.data());
if (!dh->tc) {
static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localRespRuleActions = g_respruleactions.getLocal();
- static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localcacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
+ static thread_local LocalStateHolder<vector<DNSDistResponseRuleAction>> localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
DNSResponse dr(du->ids, du->response, du->downstream);
dnsheader cleartextDH;
memcpy(&cleartextDH, dr.getHeader(), sizeof(cleartextDH));
dr.ids.du = std::move(du);
- if (!processResponse(dr.ids.du->response, *localRespRuleActions, *localcacheInsertedRespRuleActions, dr, false)) {
+ if (!processResponse(dr.ids.du->response, *localRespRuleActions, *localCacheInsertedRespRuleActions, dr, false)) {
if (dr.ids.du) {
dr.ids.du->status_code = 503;
sendDoHUnitToTheMainThread(std::move(dr.ids.du), "Response dropped by rules");
return;
}
+ if (dr.isAsynchronous()) {
+ return;
+ }
+
du = std::move(dr.ids.du);
double udiff = du->ids.queryRealTime.udiff();
vinfolog("Got answer from %s, relayed to %s (https), took %f usec", du->downstream->d_config.remote.toStringWithPort(), du->ids.origRemote.toStringWithPort(), udiff);
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist-async.hh"
+
+BOOST_AUTO_TEST_SUITE(test_dnsdistasync)
+
+class DummyQuerySender : public TCPQuerySender
+{
+public:
+ bool active() const override
+ {
+ return true;
+ }
+
+ void handleResponse(const struct timeval&, TCPResponse&&) override
+ {
+ }
+
+ void handleXFRResponse(const struct timeval&, TCPResponse&&) override
+ {
+ }
+
+ void notifyIOError(InternalQueryState&&, const struct timeval&) override
+ {
+ errorRaised = true;
+ }
+
+ bool errorRaised{false};
+};
+
+struct DummyCrossProtocolQuery : public CrossProtocolQuery
+{
+ DummyCrossProtocolQuery() :
+ CrossProtocolQuery()
+ {
+ d_sender = std::make_shared<DummyQuerySender>();
+ }
+
+ std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
+ {
+ return d_sender;
+ }
+
+ std::shared_ptr<DummyQuerySender> d_sender;
+};
+
+BOOST_AUTO_TEST_CASE(test_Basic)
+{
+ auto holder = std::make_unique<dnsdist::AsynchronousHolder>();
+ BOOST_CHECK(holder->empty());
+
+ {
+ auto query = holder->get(0, 0);
+ BOOST_CHECK(query == nullptr);
+ }
+
+ {
+ uint16_t asyncID = 1;
+ uint16_t queryID = 42;
+ struct timeval ttd;
+ gettimeofday(&ttd, nullptr);
+ // timeout in 100 ms
+ ttd.tv_usec += 100000;
+
+ holder->push(asyncID, queryID, ttd, std::make_unique<DummyCrossProtocolQuery>());
+ BOOST_CHECK(!holder->empty());
+
+ auto query = holder->get(0, 0);
+ BOOST_CHECK(query == nullptr);
+
+ query = holder->get(asyncID, queryID);
+ BOOST_CHECK(holder->empty());
+
+ query = holder->get(asyncID, queryID);
+ BOOST_CHECK(query == nullptr);
+
+ // sleep for 200 ms, to be sure the main thread has
+ // been awakened
+ usleep(200000);
+ }
+
+ holder->stop();
+}
+
+BOOST_AUTO_TEST_CASE(test_TimeoutFailClose)
+{
+ auto holder = std::make_unique<dnsdist::AsynchronousHolder>(false);
+ uint16_t asyncID = 1;
+ uint16_t queryID = 42;
+ struct timeval ttd;
+ gettimeofday(&ttd, nullptr);
+ // timeout in 10 ms
+ ttd.tv_usec += 10000;
+
+ std::shared_ptr<DummyQuerySender> sender{nullptr};
+ {
+ auto query = std::make_unique<DummyCrossProtocolQuery>();
+ sender = query->d_sender;
+ BOOST_REQUIRE(sender != nullptr);
+ holder->push(asyncID, queryID, ttd, std::move(query));
+ BOOST_CHECK(!holder->empty());
+ }
+
+ // sleep for 20 ms, to be sure
+ usleep(20000);
+
+ BOOST_CHECK(holder->empty());
+ BOOST_CHECK(sender->errorRaised);
+
+ holder->stop();
+}
+
+BOOST_AUTO_TEST_CASE(test_AddingExpiredEvent)
+{
+ auto holder = std::make_unique<dnsdist::AsynchronousHolder>(false);
+ uint16_t asyncID = 1;
+ uint16_t queryID = 42;
+ struct timeval ttd;
+ gettimeofday(&ttd, nullptr);
+ // timeout was 10 ms ago, for some reason (long processing time, CPU starvation...)
+ ttd.tv_usec -= 10000;
+
+ std::shared_ptr<DummyQuerySender> sender{nullptr};
+ {
+ auto query = std::make_unique<DummyCrossProtocolQuery>();
+ sender = query->d_sender;
+ BOOST_REQUIRE(sender != nullptr);
+ holder->push(asyncID, queryID, ttd, std::move(query));
+ BOOST_CHECK(!holder->empty());
+ }
+
+ // sleep for 20 ms
+ usleep(20000);
+
+ BOOST_CHECK(holder->empty());
+ BOOST_CHECK(sender->errorRaised);
+
+ holder->stop();
+}
+
+BOOST_AUTO_TEST_SUITE_END();
return true;
}
- const ClientState* getClientState() const override
- {
- return nullptr;
- }
-
void handleResponse(const struct timeval& now, TCPResponse&& response) override
{
if (d_customHandler) {
{
}
-static std::function<ProcessQueryResult(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)> s_processQuery;
+static std::function<ProcessQueryResult(DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend)> s_processQuery;
-ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
+ProcessQueryResult processQuery(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
{
if (s_processQuery) {
- return s_processQuery(dq, cs, holders, selectedBackend);
+ return s_processQuery(dq, selectedBackend);
}
return ProcessQueryResult::Drop;
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::Drop;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
// Would be nicer to actually turn it into a response
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
// Would be nicer to actually turn it into a response
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
throw std::runtime_error("Something unexpected happened");
};
s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 });
s_steps.push_back({ ExpectedStep::ExpectedRequest::closeClient, IOState::Done });
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
// Would be nicer to actually turn it into a response
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, query.size() - 2 - 2 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
/* should not be reached */
BOOST_CHECK(false);
return ProcessQueryResult::SendAnswer;
{ ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, 1 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 0 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, s_proxyProtocolMinimumHeaderSize },
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
{ ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, proxyPayload.size() - s_proxyProtocolMinimumHeaderSize - 1},
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
/* closing a connection to the backend */
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* closing a connection to the backend */
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* closing a connection to the backend */
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* closing a connection to the backend */
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* closing client connection */
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
return ProcessQueryResult::SendAnswer;
};
s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool {
/* closing backend connection */
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* close the connection with the client */
s_steps.push_back({ ExpectedStep::ExpectedRequest::closeClient, IOState::Done });
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
#endif
}
+ {
+ /* 2 queries on the same connection, asynchronously handled, check that we only read the first one (no OOOR as maxInFlight is 0) */
+ TEST_INIT("=> 2 queries on the same connection, async");
+
+ size_t count = 2;
+
+ s_readBuffer = query;
+
+ for (size_t idx = 0; idx < count; idx++) {
+ appendPayloadEditingID(s_readBuffer, query, idx);
+ appendPayloadEditingID(s_backendReadBuffer, query, idx);
+ }
+
+ s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 },
+ /* close the connection with the client */
+ { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }
+ };
+
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ selectedBackend = backend;
+ dq.asynchronous = true;
+ /* note that we do nothing with the query, we just tell the frontend it was dealt with */
+ return ProcessQueryResult::Asynchronous;
+ };
+ s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool {
+ return true;
+ };
+
+ auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
+ IncomingTCPConnectionState::handleIO(state, now);
+ BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U);
+
+ /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
+ IncomingTCPConnectionState::clearAllDownstreamConnections();
+ }
}
BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR)
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend,&responses](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend,&responses](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
static size_t count = 0;
if (count++ == 3) {
/* self answered */
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
};
counter = 0;
- s_processQuery = [backend,&counter](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend,&counter](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
if (counter == 0) {
++counter;
selectedBackend = backend;
};
counter = 0;
- s_processQuery = [backend,&counter](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend,&counter](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
if (counter == 0) {
++counter;
selectedBackend = backend;
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = proxyEnabledBackend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 },
};
- s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = proxyEnabledBackend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 },
};
- s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = proxyEnabledBackend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done, 0 },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend1](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend1](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend1;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeClient, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
{ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done },
};
- s_processQuery = [backend](DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
selectedBackend = backend;
return ProcessQueryResult::PassToBackend;
};
/* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 5U);
}
+
+ {
+ /* 2 queries on the same connection, asynchronously handled, check that we only read all of them (OOOR as maxInFlight is 65535) */
+ TEST_INIT("=> 2 queries on the same connection, async with OOOR");
+
+ size_t count = 2;
+
+ s_readBuffer = queries.at(0);
+
+ for (size_t idx = 0; idx < count; idx++) {
+ appendPayloadEditingID(s_readBuffer, queries.at(idx), idx);
+ appendPayloadEditingID(s_backendReadBuffer, queries.at(idx), idx);
+ }
+
+ bool timeout = false;
+ s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 },
+ { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&timeout](int desc) {
+ timeout = true;
+ }},
+ /* close the connection with the client */
+ { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }
+ };
+
+ s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult {
+ selectedBackend = backend;
+ dq.asynchronous = true;
+ /* note that we do nothing with the query, we just tell the frontend it was dealt with */
+ return ProcessQueryResult::Asynchronous;
+ };
+ s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool {
+ return true;
+ };
+
+ auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now);
+ IncomingTCPConnectionState::handleIO(state, now);
+ while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) {
+ threadData.mplexer->run(&now);
+ }
+
+ struct timeval later = now;
+ later.tv_sec += g_tcpRecvTimeout + 1;
+ auto expiredConns = threadData.mplexer->getTimeouts(later);
+ BOOST_CHECK_EQUAL(expiredConns.size(), 1U);
+ for (const auto& cbData : expiredConns) {
+ if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
+ auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
+ cbState->handleTimeout(cbState, false);
+ }
+ }
+
+ BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U);
+
+ /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */
+ IncomingTCPConnectionState::clearAllDownstreamConnections();
+ }
}
BOOST_AUTO_TEST_SUITE_END();
void release()
{
}
+
size_t proxyProtocolPayloadSize{0};
uint16_t status_code{200};
};
void handleUDPResponseForDoH(std::unique_ptr<DOHUnit, void(*)(DOHUnit*)>&&, PacketBuffer&& response, InternalQueryState&& state);
+struct CrossProtocolQuery;
+struct DNSQuestion;
+
+std::unique_ptr<CrossProtocolQuery> getDoHCrossProtocolQueryFromDQ(DNSQuestion& dq, bool isResponse);
+
#endif /* HAVE_DNS_OVER_HTTPS */
using DOHUnitUniquePtr = std::unique_ptr<DOHUnit, void(*)(DOHUnit*)>;
return LockGuardedHolder<T>(d_value, d_mutex);
}
- LockGuardedHolder<const T> read_only_lock() const
+ LockGuardedHolder<const T> read_only_lock()
{
return LockGuardedHolder<const T>(d_value, d_mutex);
}
#include "dnsdist.hh"
#include "dnsdist-ecs.hh"
+#include "dnsdist-internal-queries.hh"
+#include "dnsdist-tcp.hh"
#include "dnsdist-xpf.hh"
#include "dolog.hh"
#include "ednscookies.hh"
#include "ednssubnet.hh"
-bool DNSDistSNMPAgent::sendBackendStatusChangeTrap(DownstreamState const&)
+ProcessQueryResult processQueryAfterRules(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend)
+{
+ return ProcessQueryResult::Drop;
+}
+
+bool processResponseAfterRules(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted)
+{
+ return false;
+}
+
+bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote)
{
return false;
}
return false;
}
+namespace dnsdist {
+std::unique_ptr<CrossProtocolQuery> getInternalQueryFromDQ(DNSQuestion& dq, bool isResponse)
+{
+ return nullptr;
+}
+}
+
+bool DNSDistSNMPAgent::sendBackendStatusChangeTrap(DownstreamState const&)
+{
+ return false;
+}
+
BOOST_AUTO_TEST_SUITE(test_dnsdist_cc)
static const uint16_t ECSSourcePrefixV4 = 24;