# information regarding copyright ownership.
from datetime import datetime, timedelta
+from collections import defaultdict
import os
+import dns.message
+import dns.query
+import dns.rcode
+
# ISO datetime format without msec
fmt = "%Y-%m-%dT%H:%M:%SZ"
max_expires = timedelta(seconds=14515200) # 24 weeks
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
+TIMEOUT = 10
+
# Generic helper functions
def check_expires(expires, min_time, max_time):
name = load_zone(zone)
if name == "manykeys":
check_manykeys(name)
+
+
+def create_msg(qname, qtype):
+ msg = dns.message.make_query(
+ qname, qtype, want_dnssec=True, use_edns=0, payload=4096
+ )
+
+ return msg
+
+
+def udp_query(ip, port, msg):
+ ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
+ assert ans.rcode() == dns.rcode.NOERROR
+
+ return ans
+
+
+def tcp_query(ip, port, msg):
+ ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
+ assert ans.rcode() == dns.rcode.NOERROR
+
+ return ans
+
+
+def create_expected(data):
+ expected = {
+ "dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
+ "dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
+ "dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
+ "dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
+ "dns-udp-requests-sizes-received-ipv4": defaultdict(int),
+ "dns-udp-requests-sizes-received-ipv6": defaultdict(int),
+ "dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
+ "dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
+ }
+
+ for k, v in data.items():
+ for kk, vv in v.items():
+ expected[k][kk] += vv
+
+ return expected
+
+
+def update_expected(expected, key, msg):
+ msg_len = len(msg.to_wire())
+ bucket_num = (msg_len // 16) * 16
+ bucket = "{}-{}".format(bucket_num, bucket_num + 15)
+
+ expected[key][bucket] += 1
+
+
+def check_traffic(data, expected):
+ def ordered(obj):
+ if isinstance(obj, dict):
+ return sorted((k, ordered(v)) for k, v in obj.items())
+ if isinstance(obj, list):
+ return sorted(ordered(x) for x in obj)
+ return obj
+
+ ordered_data = ordered(data)
+ ordered_expected = ordered(expected)
+
+ assert len(ordered_data) == 8
+ assert len(ordered_expected) == 8
+ assert len(data) == len(ordered_data)
+ assert len(expected) == len(ordered_expected)
+
+ assert ordered_data == ordered_expected
+
+
+def test_traffic(fetch_traffic, **kwargs):
+ statsip = kwargs["statsip"]
+ statsport = kwargs["statsport"]
+ port = kwargs["port"]
+
+ data = fetch_traffic(statsip, statsport)
+ exp = create_expected(data)
+
+ msg = create_msg("short.example.", "TXT")
+ update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
+ ans = udp_query(statsip, port, msg)
+ update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
+ data = fetch_traffic(statsip, statsport)
+
+ check_traffic(data, exp)
+
+ msg = create_msg("long.example.", "TXT")
+ update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
+ ans = udp_query(statsip, port, msg)
+ update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
+ data = fetch_traffic(statsip, statsport)
+
+ check_traffic(data, exp)
+
+ msg = create_msg("short.example.", "TXT")
+ update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
+ ans = tcp_query(statsip, port, msg)
+ update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
+ data = fetch_traffic(statsip, statsport)
+
+ check_traffic(data, exp)
+
+ msg = create_msg("long.example.", "TXT")
+ update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
+ ans = tcp_query(statsip, port, msg)
+ update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
+ data = fetch_traffic(statsip, statsport)
+
+ check_traffic(data, exp)
+++ /dev/null
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-from collections import defaultdict
-
-import dns.message
-import dns.query
-import dns.rcode
-
-
-TIMEOUT = 10
-
-
-def create_msg(qname, qtype):
- msg = dns.message.make_query(
- qname, qtype, want_dnssec=True, use_edns=0, payload=4096
- )
-
- return msg
-
-
-def udp_query(ip, port, msg):
- ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
-def tcp_query(ip, port, msg):
- ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
- assert ans.rcode() == dns.rcode.NOERROR
-
- return ans
-
-
-def create_expected(data):
- expected = {
- "dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
- "dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
- "dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
- "dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
- "dns-udp-requests-sizes-received-ipv4": defaultdict(int),
- "dns-udp-requests-sizes-received-ipv6": defaultdict(int),
- "dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
- "dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
- }
-
- for k, v in data.items():
- for kk, vv in v.items():
- expected[k][kk] += vv
-
- return expected
-
-
-def update_expected(expected, key, msg):
- msg_len = len(msg.to_wire())
- bucket_num = (msg_len // 16) * 16
- bucket = "{}-{}".format(bucket_num, bucket_num + 15)
-
- expected[key][bucket] += 1
-
-
-def check_traffic(data, expected):
- def ordered(obj):
- if isinstance(obj, dict):
- return sorted((k, ordered(v)) for k, v in obj.items())
- if isinstance(obj, list):
- return sorted(ordered(x) for x in obj)
- return obj
-
- ordered_data = ordered(data)
- ordered_expected = ordered(expected)
-
- assert len(ordered_data) == 8
- assert len(ordered_expected) == 8
- assert len(data) == len(ordered_data)
- assert len(expected) == len(ordered_expected)
-
- assert ordered_data == ordered_expected
-
-
-def test_traffic(fetch_traffic, **kwargs):
- statsip = kwargs["statsip"]
- statsport = kwargs["statsport"]
- port = kwargs["port"]
-
- data = fetch_traffic(statsip, statsport)
- exp = create_expected(data)
-
- msg = create_msg("short.example.", "TXT")
- update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
- update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("long.example.", "TXT")
- update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
- ans = udp_query(statsip, port, msg)
- update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("short.example.", "TXT")
- update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
- update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
-
- msg = create_msg("long.example.", "TXT")
- update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
- ans = tcp_query(statsip, port, msg)
- update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
- data = fetch_traffic(statsip, statsport)
-
- check_traffic(data, exp)
def test_traffic_json(named_port, statsport):
- generic_dnspython = pytest.importorskip("generic_dnspython")
- generic_dnspython.test_traffic(
+ generic.test_traffic(
fetch_traffic_json, statsip="10.53.0.2", statsport=statsport, port=named_port
)
def test_traffic_xml(named_port, statsport):
- generic_dnspython = pytest.importorskip("generic_dnspython")
- generic_dnspython.test_traffic(
+ generic.test_traffic(
fetch_traffic_xml, statsip="10.53.0.2", statsport=statsport, port=named_port
)