bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
{
+ auto closer = cpq->query.d_idstate.getCloser(__func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
if (!d_config.d_dohPath.empty()) {
return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
static void prepareQueryForSending(TCPQuery& query, uint16_t queryID, ConnectionState connectionState)
{
+ auto closer = query.d_idstate.getCloser(__func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
if (connectionState == ConnectionState::needProxy) {
if (query.d_proxyProtocolPayload.size() > 0 && !query.d_proxyProtocolPayloadAdded) {
query.d_buffer.insert(query.d_buffer.begin(), query.d_proxyProtocolPayload.begin(), query.d_proxyProtocolPayload.end());
editPayloadID(query.d_buffer, queryID, query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0, true);
}
+static const string classnamePrefix = "TCPConnectionToBackend::";
IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
{
conn->d_currentQuery = std::move(conn->d_pendingQueries.front());
+ auto closer = conn->d_currentQuery.d_query.d_idstate.getCloser(classnamePrefix + __func__);
uint16_t id = conn->d_highestStreamID;
prepareQueryForSending(conn->d_currentQuery.d_query, id, conn->needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);
IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
+ auto closer = conn->d_currentQuery.d_query.d_idstate.getCloser(classnamePrefix + __func__);
(void)now;
DEBUGLOG("sending query to backend "<<conn->getDS()->getNameWithAddr()<<" over FD "<<conn->d_handler->getDescriptor());
void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
{
+ auto closer = query.d_idstate.getCloser(classnamePrefix + __func__);
if (!d_ioState) {
d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
}
notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
return IOState::Done;
}
+ auto closer = it->second.d_query.d_idstate.getCloser(classnamePrefix + __func__);
editPayloadID(d_responseBuffer, ntohs(it->second.d_query.d_idstate.origID), 0, false);
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
{
+ auto closer = response.d_idstate.getCloser(__func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
// queue response
state->d_queuedResponses.emplace_back(std::move(response));
DEBUGLOG("queueing response, state is " << (int)state->d_state << ", queue size is now " << state->d_queuedResponses.size());
/* called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
{
+ auto closer = response.d_idstate.getCloser(__func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
if (std::this_thread::get_id() != d_creatorThreadID) {
handleCrossProtocolResponse(now, std::move(response));
return;
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
{
+ auto closer = response.d_idstate.getCloser(__func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
try {
auto ptr = std::make_unique<TCPCrossProtocolResponse>(std::move(response), state, now);
}
}
- pdns::trace::dnsdist::Tracer::Closer closer;
- if (auto tracer = ids.getTracer(); tracer != nullptr) {
- // TODO: figure out if this is a root span
- closer = tracer->openSpan("IncomingTCPConnectionState::handleQuery", tracer->getLastSpanID());
- }
+ static const std::string classnamePrefix = "IncomingTCPConnectionState::";
+ auto closer = ids.getCloser(classnamePrefix + __func__); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast
ids.qname = DNSName(reinterpret_cast<const char*>(query.data()), static_cast<int>(query.size()), sizeof(dnsheader), false, &ids.qtype, &ids.qclass);
prependSizeToTCPQuery(query, 0);
- auto downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);
+ std::shared_ptr<TCPConnectionToBackend> downstreamConnection;
+ {
+ auto dscCloser = dnsQuestion.ids.getCloser("getDownstreamConnection");
+ downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);
+ }
if (backend->d_config.useProxyProtocol) {
/* if we ever sent a TLV over a connection, we can never go back */
downstreamConnection->setProxyProtocolValuesSent(std::move(dnsQuestion.proxyProtocolValues));
}
- TCPQuery tcpquery(std::move(query), std::move(ids));
- tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
+ TCPQuery tcpquery;
+ {
+ auto tcpqueryCloser = dnsQuestion.ids.getCloser("createTCPQuery");
+ tcpquery = {std::move(query), std::move(ids)};
+ tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
+ }
vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", tcpquery.d_idstate.qname.toLogString(), QType(tcpquery.d_idstate.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), tcpquery.d_buffer.size(), backend->getNameWithAddr());
std::shared_ptr<TCPQuerySender> incoming = state;
class DNSDistOpenTelemetryProtobufBaseTest(DNSDistOpenTelemetryProtobufTest):
def doTest(
- self, hasProcessResponseAfterRules=False, useTCP=False, traceID="", spanID=""
+ self,
+ hasProcessResponseAfterRules=False,
+ useTCP=False,
+ traceID="",
+ spanID="",
+ extraFunctions=set(),
):
msg = self.sendQueryAndGetProtobuf(useTCP, traceID, spanID)
traces_data, preserving_proto_field_name=True
)
- self.checkOTData(ot_data, hasProcessResponseAfterRules, useTCP)
+ self.checkOTData(
+ ot_data, hasProcessResponseAfterRules, useTCP, extraFunctions=extraFunctions
+ )
traceId = base64.b64encode(msg.openTelemetryTraceID).decode()
for msg_span in ot_data["resource_spans"][0]["scope_spans"][0]["spans"]:
self.doTest()
def testTCP(self):
- self.doTest(useTCP=True)
+ self.doTest(
+ useTCP=True,
+ extraFunctions={
+ "createTCPQuery",
+ "TCPConnectionToBackend::handleResponse",
+ "getDownstreamConnection",
+ "TCPConnectionToBackend::sendQuery",
+ "handleResponse",
+ "prepareQueryForSending",
+ "TCPConnectionToBackend::queueQuery",
+ },
+ )
class TestOpenTelemetryTracingBaseLua(DNSDistOpenTelemetryProtobufBaseTest):
self.doTest()
def testTCP(self):
- self.doTest(useTCP=True)
+ self.doTest(
+ useTCP=True,
+ extraFunctions={
+ "createTCPQuery",
+ "TCPConnectionToBackend::handleResponse",
+ "getDownstreamConnection",
+ "TCPConnectionToBackend::sendQuery",
+ "handleResponse",
+ "prepareQueryForSending",
+ "TCPConnectionToBackend::queueQuery",
+ },
+ )
class TestOpenTelemetryTracingBaseDelayYAML(DNSDistOpenTelemetryProtobufBaseTest):
self.doTest(True)
def testTCP(self):
- self.doTest(hasProcessResponseAfterRules=True, useTCP=True)
+ self.doTest(
+ hasProcessResponseAfterRules=True,
+ useTCP=True,
+ extraFunctions={
+ "queueResponse",
+ "handleResponse",
+ "TCPConnectionToBackend::queueQuery",
+ "createTCPQuery",
+ "prepareQueryForSending",
+ "getDownstreamConnection",
+ "TCPConnectionToBackend::handleResponse",
+ "TCPConnectionToBackend::sendQuery",
+ },
+ )
class TestOpenTelemetryTracingBaseDelayLua(DNSDistOpenTelemetryProtobufBaseTest):
self.doTest(True)
def testTCP(self):
- self.doTest(hasProcessResponseAfterRules=True, useTCP=True)
+ self.doTest(
+ hasProcessResponseAfterRules=True,
+ useTCP=True,
+ extraFunctions={
+ "queueResponse",
+ "handleResponse",
+ "TCPConnectionToBackend::queueQuery",
+ "createTCPQuery",
+ "prepareQueryForSending",
+ "getDownstreamConnection",
+ "TCPConnectionToBackend::handleResponse",
+ "TCPConnectionToBackend::sendQuery",
+ },
+ )
class TestOpenTelemetryTracingUseIncomingYAML(DNSDistOpenTelemetryProtobufBaseTest):
type: Drop
"""
- def doTest(self, useTCP=False):
+ def doTest(self, useTCP=False, extraFunctions=set()):
msg = self.sendQueryAndGetProtobuf(useTCP=useTCP, dropped=True)
traces_data = opentelemetry.proto.trace.v1.trace_pb2.TracesData()
traces_data.ParseFromString(msg.openTelemetryData)
traces_data, preserving_proto_field_name=True
)
+ funcs = extraFunctions.union(
+ {
+ "ResponseRule: Drop",
+ }
+ )
+
self.checkOTData(
ot_data,
hasProcessResponseAfterRules=False,
hasRemoteLogResponseAction=False,
useTCP=useTCP,
- extraFunctions={
- "ResponseRule: Drop",
- },
+ extraFunctions=funcs,
)
def testBasic(self):
self.doTest(False)
def testTCP(self):
- self.doTest(True)
+ self.doTest(
+ True,
+ extraFunctions={
+ "handleResponse",
+ "TCPConnectionToBackend::queueQuery",
+ "getDownstreamConnection",
+ "createTCPQuery",
+ "TCPConnectionToBackend::handleResponse",
+ "TCPConnectionToBackend::sendQuery",
+ "prepareQueryForSending",
+ },
+ )
class TestOpenTelemetryTracingBaseLuaIncludedRemoteLoggerDropped(
- 192.0.2.1
"""
- def doTest(self, useTCP=False):
+ def doTest(self, useTCP=False, extraFunctions=set()):
msg = self.sendQueryAndGetProtobuf(
useTCP=useTCP, querySentByDNSDist=False, dropped=True
)
traces_data, preserving_proto_field_name=True
)
+ funcs = extraFunctions.union({"Rule: Spoof A record"})
self.checkOTData(
ot_data,
hasProcessResponseAfterRules=False,
useTCP=useTCP,
hasSelectBackendForOutgoingQuery=False,
hasResponse=False,
- extraFunctions={
- "Rule: Spoof A record",
- },
+ extraFunctions=funcs,
)
def testBasic(self):
self.doTest()
def testTCP(self):
- self.doTest(useTCP=True)
+ self.doTest(useTCP=True, extraFunctions={"queueResponse"})