self.assertTrue(msg.HasField("openTelemetry"))
def sendQueryAndGetProtobuf(
- self, useTCP=False, traceID="", spanID="", ednsTraceIDOpt=65500
+ self,
+ useTCP=False,
+ traceID="",
+ spanID="",
+ ednsTraceIDOpt=65500,
+ dropped=False,
+ querySentByDNSDist=True,
):
name = "query.ot.tests.powerdns.com."
else:
(receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEqual(query, receivedQuery)
- self.assertEqual(response, receivedResponse)
+ if querySentByDNSDist:
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+
+ if not dropped:
+ self.assertTrue(receivedResponse)
+ self.assertEqual(response, receivedResponse)
if self._protobufQueue.empty():
# let the protobuf messages the time to get there
# check the protobuf message corresponding to the UDP query
return self.getFirstProtobufMessage()
-
-class DNSDistOpenTelemetryProtobufBaseTest(DNSDistOpenTelemetryProtobufTest):
- def doTest(self, wasDelayed=False, useTCP=False, traceID="", spanID=""):
- msg = self.sendQueryAndGetProtobuf(useTCP, traceID, spanID)
-
- self.assertTrue(msg.HasField("openTelemetryTraceID"))
- self.assertNotEqual(msg.openTelemetryTraceID, "")
-
- if traceID != "":
- self.assertEqual(msg.openTelemetryTraceID, binascii.a2b_hex(traceID))
-
- self.assertTrue(msg.HasField("openTelemetryData"))
- traces_data = opentelemetry.proto.trace.v1.trace_pb2.TracesData()
- traces_data.ParseFromString(msg.openTelemetryData)
- ot_data = google.protobuf.json_format.MessageToDict(
- traces_data, preserving_proto_field_name=True
- )
-
- self.assertEqual(len(ot_data["resource_spans"]), 1)
- self.assertEqual(len(ot_data["resource_spans"][0]["resource"]["attributes"]), 1)
+ def checkOTData(
+ self,
+ otData,
+ hasProcessResponseAfterRules=False,
+ useTCP=False,
+ hasRemoteLogResponseAction=True,
+ hasSelectBackendForOutgoingQuery=True,
+ hasResponse=True,
+ extraFunctions=set(),
+ ):
+ self.assertEqual(len(otData["resource_spans"]), 1)
+ self.assertEqual(len(otData["resource_spans"][0]["resource"]["attributes"]), 1)
# Ensure all attributes exist
- for field in ot_data["resource_spans"][0]["resource"]["attributes"]:
+ for field in otData["resource_spans"][0]["resource"]["attributes"]:
self.assertIn(field["key"], ["service.name"])
# Ensure the values are correct
# TODO: query.remote with port
msg_scope_attr_keys = [
v["key"]
- for v in ot_data["resource_spans"][0]["scope_spans"][0]["scope"][
+ for v in otData["resource_spans"][0]["scope_spans"][0]["scope"][
"attributes"
]
]
root_span_attr_keys = [
v["key"]
- for v in ot_data["resource_spans"][0]["scope_spans"][0]["spans"][0][
+ for v in otData["resource_spans"][0]["scope_spans"][0]["spans"][0][
"attributes"
]
]
# No way to guess the test port, but check the rest of the values
root_span_attrs = {
v["key"]: v["value"]["string_value"]
- for v in ot_data["resource_spans"][0]["scope_spans"][0]["spans"][0][
+ for v in otData["resource_spans"][0]["scope_spans"][0]["spans"][0][
"attributes"
]
if v["key"] not in ["query.remote.port"]
)
msg_span_name = {
- v["name"] for v in ot_data["resource_spans"][0]["scope_spans"][0]["spans"]
+ v["name"] for v in otData["resource_spans"][0]["scope_spans"][0]["spans"]
}
funcs = {
"applyRulesToQuery",
"Rule: Enable tracing",
"applyRulesChainToQuery",
- "selectBackendForOutgoingQuery",
- "processResponse",
"applyRulesToResponse",
- "Rule: Do PB logging",
}
+ if hasSelectBackendForOutgoingQuery:
+ funcs.add("selectBackendForOutgoingQuery")
+
+ if hasResponse:
+ funcs.add("processResponse")
+
+ if hasRemoteLogResponseAction:
+ funcs.add("Rule: Do PB logging")
+
if useTCP:
funcs.add("IncomingTCPConnectionState::handleQuery")
else:
funcs.add("processUDPQuery")
- funcs.add("assignOutgoingUDPQueryToBackend")
+ if hasSelectBackendForOutgoingQuery:
+ funcs.add("assignOutgoingUDPQueryToBackend")
- if wasDelayed:
+ if hasProcessResponseAfterRules:
funcs.add("processResponseAfterRules")
+ funcs = funcs.union(extraFunctions)
+
self.assertSetEqual(msg_span_name, funcs)
+
+class DNSDistOpenTelemetryProtobufBaseTest(DNSDistOpenTelemetryProtobufTest):
+ def doTest(
+ self, hasProcessResponseAfterRules=False, useTCP=False, traceID="", spanID=""
+ ):
+ msg = self.sendQueryAndGetProtobuf(useTCP, traceID, spanID)
+
+ self.assertTrue(msg.HasField("openTelemetryTraceID"))
+ self.assertNotEqual(msg.openTelemetryTraceID, "")
+
+ if traceID != "":
+ self.assertEqual(msg.openTelemetryTraceID, binascii.a2b_hex(traceID))
+
+ self.assertTrue(msg.HasField("openTelemetryData"))
+ traces_data = opentelemetry.proto.trace.v1.trace_pb2.TracesData()
+ traces_data.ParseFromString(msg.openTelemetryData)
+ ot_data = google.protobuf.json_format.MessageToDict(
+ traces_data, preserving_proto_field_name=True
+ )
+
+ self.checkOTData(ot_data, hasProcessResponseAfterRules, useTCP)
+
traceId = base64.b64encode(msg.openTelemetryTraceID).decode()
for msg_span in ot_data["resource_spans"][0]["scope_spans"][0]["spans"]:
self.assertEqual(
self.doTest(True)
def testTCP(self):
- self.doTest(wasDelayed=True, useTCP=True)
+ self.doTest(hasProcessResponseAfterRules=True, useTCP=True)
class TestOpenTelemetryTracingBaseDelayLua(DNSDistOpenTelemetryProtobufBaseTest):
self.doTest(True)
def testTCP(self):
- self.doTest(wasDelayed=True, useTCP=True)
+ self.doTest(hasProcessResponseAfterRules=True, useTCP=True)
class TestOpenTelemetryTracingUseIncomingYAML(DNSDistOpenTelemetryProtobufBaseTest):
rl = newRemoteLogger('127.0.0.1:%d')
setOpenTelemetryTracing(true)
-addAction(AllRule(), SetTraceAction(true, true), {name="Enable tracing"})
+addAction(AllRule(), SetTraceAction(true, {}, true), {name="Enable tracing"})
addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, false, {}, {}, false), {name="Do PB logging"})
"""
def testEnabledButUnset(self):
self.doTest()
+
+
+class TestOpenTelemetryTracingBaseYAMLIncludedRemoteLoggerDropped(
+ DNSDistOpenTelemetryProtobufTest
+):
+ _yaml_config_params = [
+ "_testServerPort",
+ "_protobufServerPort",
+ ]
+ _yaml_config_template = """---
+logging:
+ open_telemetry_tracing: true
+
+backends:
+ - address: 127.0.0.1:%d
+ protocol: Do53
+
+remote_logging:
+ protobuf_loggers:
+ - name: pblog
+ address: 127.0.0.1:%d
+
+query_rules:
+ - name: Enable tracing
+ selector:
+ type: All
+ action:
+ type: SetTrace
+ value: true
+ remote_loggers:
+ - pblog
+
+response_rules:
+ - name: Drop
+ selector:
+ type: All
+ action:
+ type: Drop
+"""
+
+ def doTest(self, useTCP=False):
+ msg = self.sendQueryAndGetProtobuf(useTCP=useTCP, dropped=True)
+ traces_data = opentelemetry.proto.trace.v1.trace_pb2.TracesData()
+ traces_data.ParseFromString(msg.openTelemetryData)
+ ot_data = google.protobuf.json_format.MessageToDict(
+ traces_data, preserving_proto_field_name=True
+ )
+
+ self.checkOTData(
+ ot_data,
+ hasProcessResponseAfterRules=False,
+ hasRemoteLogResponseAction=False,
+ useTCP=useTCP,
+ extraFunctions={
+ "Rule: Drop",
+ },
+ )
+
+ def testBasic(self):
+ self.doTest(False)
+
+ def testTCP(self):
+ self.doTest(True)
+
+
+class TestOpenTelemetryTracingBaseLuaIncludedRemoteLoggerDropped(
+ TestOpenTelemetryTracingBaseYAMLIncludedRemoteLoggerDropped
+):
+ _yaml_config_params = []
+ _yaml_config_template = ""
+
+ _config_params = [
+ "_testServerPort",
+ "_protobufServerPort",
+ ]
+ _config_template = """
+newServer{address="127.0.0.1:%d"}
+rl = newRemoteLogger('127.0.0.1:%d')
+setOpenTelemetryTracing(true)
+
+addAction(AllRule(), SetTraceAction(true, {rl}), {name="Enable tracing"})
+addResponseAction(AllRule(), DropResponseAction(), {name="Drop"})
+"""
+
+
+class TestOpenTelemetryTracingBaseYAMLIncludedRemoteLoggerSpoofed(
+ DNSDistOpenTelemetryProtobufTest
+):
+ _yaml_config_params = [
+ "_testServerPort",
+ "_protobufServerPort",
+ ]
+ _yaml_config_template = """---
+logging:
+ open_telemetry_tracing: true
+
+backends:
+ - address: 127.0.0.1:%d
+ protocol: Do53
+
+remote_logging:
+ protobuf_loggers:
+ - name: pblog
+ address: 127.0.0.1:%d
+
+query_rules:
+ - name: Enable tracing
+ selector:
+ type: All
+ action:
+ type: SetTrace
+ value: true
+ remote_loggers:
+ - pblog
+ - name: Spoof A record
+ selector:
+ type: All
+ action:
+ type: Spoof
+ ips:
+ - 192.0.2.1
+"""
+
+ def doTest(self, useTCP=False):
+ msg = self.sendQueryAndGetProtobuf(
+ useTCP=useTCP, querySentByDNSDist=False, dropped=True
+ )
+ traces_data = opentelemetry.proto.trace.v1.trace_pb2.TracesData()
+ traces_data.ParseFromString(msg.openTelemetryData)
+ ot_data = google.protobuf.json_format.MessageToDict(
+ traces_data, preserving_proto_field_name=True
+ )
+
+ self.checkOTData(
+ ot_data,
+ hasProcessResponseAfterRules=False,
+ hasRemoteLogResponseAction=False,
+ useTCP=useTCP,
+ hasSelectBackendForOutgoingQuery=False,
+ hasResponse=False,
+ extraFunctions={
+ "Rule: Spoof A record",
+ },
+ )
+
+ def testBasic(self):
+ self.doTest()
+
+ def testTCP(self):
+ self.doTest(useTCP=True)