]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Move helper functions in statchannel into single file
authorTom Krizek <tkrizek@isc.org>
Fri, 24 Nov 2023 14:44:32 +0000 (15:44 +0100)
committerTom Krizek <tkrizek@isc.org>
Tue, 5 Dec 2023 12:26:49 +0000 (13:26 +0100)
Since dnspython is now a required dependency, there's no need to keep
these two helper files separate.

bin/tests/system/statschannel/generic.py
bin/tests/system/statschannel/generic_dnspython.py [deleted file]
bin/tests/system/statschannel/tests_json.py
bin/tests/system/statschannel/tests_xml.py

index 9e926537cc10ce932f0332cd1faa5a238c2d067f..78dc4582294fe0194c226768427a05a34b255669 100644 (file)
 # information regarding copyright ownership.
 
 from datetime import datetime, timedelta
+from collections import defaultdict
 import os
 
+import dns.message
+import dns.query
+import dns.rcode
+
 
 # ISO datetime format without msec
 fmt = "%Y-%m-%dT%H:%M:%SZ"
@@ -21,6 +26,8 @@ max_refresh = timedelta(seconds=2419200)  # 4 weeks
 max_expires = timedelta(seconds=14515200)  # 24 weeks
 dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
 
+TIMEOUT = 10
+
 
 # Generic helper functions
 def check_expires(expires, min_time, max_time):
@@ -104,3 +111,112 @@ def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
         name = load_zone(zone)
         if name == "manykeys":
             check_manykeys(name)
+
+
+def create_msg(qname, qtype):
+    msg = dns.message.make_query(
+        qname, qtype, want_dnssec=True, use_edns=0, payload=4096
+    )
+
+    return msg
+
+
+def udp_query(ip, port, msg):
+    ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
+    assert ans.rcode() == dns.rcode.NOERROR
+
+    return ans
+
+
+def tcp_query(ip, port, msg):
+    ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
+    assert ans.rcode() == dns.rcode.NOERROR
+
+    return ans
+
+
+def create_expected(data):
+    expected = {
+        "dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
+        "dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
+        "dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
+        "dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
+        "dns-udp-requests-sizes-received-ipv4": defaultdict(int),
+        "dns-udp-requests-sizes-received-ipv6": defaultdict(int),
+        "dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
+        "dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
+    }
+
+    for k, v in data.items():
+        for kk, vv in v.items():
+            expected[k][kk] += vv
+
+    return expected
+
+
+def update_expected(expected, key, msg):
+    msg_len = len(msg.to_wire())
+    bucket_num = (msg_len // 16) * 16
+    bucket = "{}-{}".format(bucket_num, bucket_num + 15)
+
+    expected[key][bucket] += 1
+
+
+def check_traffic(data, expected):
+    def ordered(obj):
+        if isinstance(obj, dict):
+            return sorted((k, ordered(v)) for k, v in obj.items())
+        if isinstance(obj, list):
+            return sorted(ordered(x) for x in obj)
+        return obj
+
+    ordered_data = ordered(data)
+    ordered_expected = ordered(expected)
+
+    assert len(ordered_data) == 8
+    assert len(ordered_expected) == 8
+    assert len(data) == len(ordered_data)
+    assert len(expected) == len(ordered_expected)
+
+    assert ordered_data == ordered_expected
+
+
+def test_traffic(fetch_traffic, **kwargs):
+    statsip = kwargs["statsip"]
+    statsport = kwargs["statsport"]
+    port = kwargs["port"]
+
+    data = fetch_traffic(statsip, statsport)
+    exp = create_expected(data)
+
+    msg = create_msg("short.example.", "TXT")
+    update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
+    ans = udp_query(statsip, port, msg)
+    update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
+    data = fetch_traffic(statsip, statsport)
+
+    check_traffic(data, exp)
+
+    msg = create_msg("long.example.", "TXT")
+    update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
+    ans = udp_query(statsip, port, msg)
+    update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
+    data = fetch_traffic(statsip, statsport)
+
+    check_traffic(data, exp)
+
+    msg = create_msg("short.example.", "TXT")
+    update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
+    ans = tcp_query(statsip, port, msg)
+    update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
+    data = fetch_traffic(statsip, statsport)
+
+    check_traffic(data, exp)
+
+    msg = create_msg("long.example.", "TXT")
+    update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
+    ans = tcp_query(statsip, port, msg)
+    update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
+    data = fetch_traffic(statsip, statsport)
+
+    check_traffic(data, exp)
diff --git a/bin/tests/system/statschannel/generic_dnspython.py b/bin/tests/system/statschannel/generic_dnspython.py
deleted file mode 100644 (file)
index 34a0398..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0.  If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-from collections import defaultdict
-
-import dns.message
-import dns.query
-import dns.rcode
-
-
-TIMEOUT = 10
-
-
-def create_msg(qname, qtype):
-    msg = dns.message.make_query(
-        qname, qtype, want_dnssec=True, use_edns=0, payload=4096
-    )
-
-    return msg
-
-
-def udp_query(ip, port, msg):
-    ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
-    assert ans.rcode() == dns.rcode.NOERROR
-
-    return ans
-
-
-def tcp_query(ip, port, msg):
-    ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
-    assert ans.rcode() == dns.rcode.NOERROR
-
-    return ans
-
-
-def create_expected(data):
-    expected = {
-        "dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
-        "dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
-        "dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
-        "dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
-        "dns-udp-requests-sizes-received-ipv4": defaultdict(int),
-        "dns-udp-requests-sizes-received-ipv6": defaultdict(int),
-        "dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
-        "dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
-    }
-
-    for k, v in data.items():
-        for kk, vv in v.items():
-            expected[k][kk] += vv
-
-    return expected
-
-
-def update_expected(expected, key, msg):
-    msg_len = len(msg.to_wire())
-    bucket_num = (msg_len // 16) * 16
-    bucket = "{}-{}".format(bucket_num, bucket_num + 15)
-
-    expected[key][bucket] += 1
-
-
-def check_traffic(data, expected):
-    def ordered(obj):
-        if isinstance(obj, dict):
-            return sorted((k, ordered(v)) for k, v in obj.items())
-        if isinstance(obj, list):
-            return sorted(ordered(x) for x in obj)
-        return obj
-
-    ordered_data = ordered(data)
-    ordered_expected = ordered(expected)
-
-    assert len(ordered_data) == 8
-    assert len(ordered_expected) == 8
-    assert len(data) == len(ordered_data)
-    assert len(expected) == len(ordered_expected)
-
-    assert ordered_data == ordered_expected
-
-
-def test_traffic(fetch_traffic, **kwargs):
-    statsip = kwargs["statsip"]
-    statsport = kwargs["statsport"]
-    port = kwargs["port"]
-
-    data = fetch_traffic(statsip, statsport)
-    exp = create_expected(data)
-
-    msg = create_msg("short.example.", "TXT")
-    update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
-    ans = udp_query(statsip, port, msg)
-    update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
-    data = fetch_traffic(statsip, statsport)
-
-    check_traffic(data, exp)
-
-    msg = create_msg("long.example.", "TXT")
-    update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
-    ans = udp_query(statsip, port, msg)
-    update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
-    data = fetch_traffic(statsip, statsport)
-
-    check_traffic(data, exp)
-
-    msg = create_msg("short.example.", "TXT")
-    update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
-    ans = tcp_query(statsip, port, msg)
-    update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
-    data = fetch_traffic(statsip, statsport)
-
-    check_traffic(data, exp)
-
-    msg = create_msg("long.example.", "TXT")
-    update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
-    ans = tcp_query(statsip, port, msg)
-    update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
-    data = fetch_traffic(statsip, statsport)
-
-    check_traffic(data, exp)
index c4599258ea4365a56ff2ede78286ad280172c4ec..cf1deb6fd3ec0400907baa3cdc77c122cbfce1fa 100755 (executable)
@@ -99,7 +99,6 @@ def test_zone_with_many_keys_json(statsport):
 
 
 def test_traffic_json(named_port, statsport):
-    generic_dnspython = pytest.importorskip("generic_dnspython")
-    generic_dnspython.test_traffic(
+    generic.test_traffic(
         fetch_traffic_json, statsip="10.53.0.2", statsport=statsport, port=named_port
     )
index 7f0b37e846368bedf6959eb9807824010f8b23b8..29c2e88bfd598e7a6355c6fac10d1625c3e80a34 100755 (executable)
@@ -129,7 +129,6 @@ def test_zone_with_many_keys_xml(statsport):
 
 
 def test_traffic_xml(named_port, statsport):
-    generic_dnspython = pytest.importorskip("generic_dnspython")
-    generic_dnspython.test_traffic(
+    generic.test_traffic(
         fetch_traffic_xml, statsip="10.53.0.2", statsport=statsport, port=named_port
     )