]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Use Re() for creating regular expressions
authorNicki Křížek <nicki@isc.org>
Thu, 2 Oct 2025 09:47:56 +0000 (11:47 +0200)
committerNicki Křížek <nicki@isc.org>
Mon, 8 Dec 2025 13:57:47 +0000 (14:57 +0100)
It's a fairly common pattern to use regular expression in our tests.
Instead of using the fairly verbose re.compile(), import that function
as Re() instead to allow for more brevity in the test syntax.

12 files changed:
bin/tests/system/cipher-suites/tests_cipher_suites.py
bin/tests/system/configloading/tests_configloading.py
bin/tests/system/conftest.py
bin/tests/system/dnssec/tests_signing.py
bin/tests/system/dnssec/tests_tat.py
bin/tests/system/dnssec/tests_validation_multiview.py
bin/tests/system/isctest/kasp.py
bin/tests/system/isctest/log/watchlog.py
bin/tests/system/isctest/vars/openssl.py
bin/tests/system/multisigner/tests_multisigner.py
bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py
bin/tests/system/xferquota/tests_xferquota.py

index edc3c09598a0314a18b8153c6fa4cd44231460d4..86adc29da11257b0aab19fce6e580482455e6de3 100644 (file)
@@ -9,7 +9,7 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import re
+from re import compile as Re
 
 import pytest
 
@@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts(
 @pytest.fixture(scope="module")
 def transfers_complete(servers):
     for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
-        pattern = re.compile(
+        pattern = Re(
             f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
         )
         for ns in ["ns2", "ns3", "ns4", "ns5"]:
index de3d25aa71e16174aecf03a8dbb0d8486616a232..ec017a8bfe5ea410f971afe7201f987444af4666 100644 (file)
@@ -9,8 +9,7 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import re
-
+from re import compile as Re
 
 import isctest
 
@@ -59,7 +58,7 @@ def test_reload_fails_log(ns1, templates):
         "apply_configuration",
         "loop exclusive mode: starting",
         "apply_configuration: configure_views",
-        re.compile(r".*port '9999999' out of range"),
+        Re(r".*port '9999999' out of range"),
         "apply_configuration: detaching views",
         "loop exclusive mode: ending",
         "reloading configuration failed",
index bbb2c283859608fd5c8555013551645beddd3b57..9eff2ee8b2b2a64cb184ff4320a42ff607b74135 100644 (file)
@@ -12,7 +12,7 @@
 import filecmp
 import os
 from pathlib import Path
-import re
+from re import compile as Re
 import shutil
 import subprocess
 import tempfile
@@ -53,7 +53,7 @@ else:
 
 XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
 FILE_DIR = os.path.abspath(Path(__file__).parent)
-ENV_RE = re.compile(b"([^=]+)=(.*)")
+ENV_RE = Re(b"([^=]+)=(.*)")
 PRIORITY_TESTS = [
     # Tests that are scheduled first. Speeds up parallel execution.
     "rpz/",
@@ -62,9 +62,9 @@ PRIORITY_TESTS = [
     "timeouts/",
     "upforwd/",
 ]
-PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
-SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
-SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
+PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
+SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
+SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
 
 # ----------------------- Global requirements ----------------------------
 
index b0295e5840cca8836215e57385219367603c2d73..2a1abd531d3933da2773b4739bbed1b1c6ee884d 100644 (file)
@@ -12,6 +12,7 @@
 from collections import namedtuple
 import os
 import re
+from re import compile as Re
 import struct
 import time
 
@@ -475,7 +476,7 @@ def test_offline_ksk_signing(ns2):
         os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private")
 
     def loadkeys():
-        pattern = re.compile(f"{zone}/IN.*next key event")
+        pattern = Re(f"{zone}/IN.*next key event")
         with ns2.watch_log_from_here() as watcher:
             ns2.rndc(f"loadkeys {zone}", log=False)
             watcher.wait_for_line(pattern)
index 609f110c5985449fece47f889bab3161d9fa0f8f..e37644f3a9a80a15afb9721220e2b4da51e56341 100644 (file)
@@ -10,7 +10,7 @@
 # information regarding copyright ownership.
 
 import os
-import re
+from re import compile as Re
 
 from dns import edns
 
@@ -67,7 +67,7 @@ def test_tat_queries(ns1, ns6):
     msg = isctest.query.create(".", "DNSKEY")
     opt = edns.GenericOption(14, b"\xff\xff")
     msg.use_edns(edns=True, options=[opt])
-    pattern = re.compile("trust-anchor-telemetry './IN' from .* 65535")
+    pattern = Re("trust-anchor-telemetry './IN' from .* 65535")
     with ns1.watch_log_from_here() as watcher:
         res = isctest.query.tcp(msg, "10.53.0.1")
         watcher.wait_for_line(pattern)
@@ -79,7 +79,7 @@ def test_tat_queries(ns1, ns6):
     opt1 = edns.GenericOption(14, b"\xff\xff")
     opt2 = edns.GenericOption(14, b"\xff\xfe")
     msg.use_edns(edns=True, options=[opt2, opt1])
-    pattern = re.compile("trust-anchor-telemetry './IN' from .* 65534")
+    pattern = Re("trust-anchor-telemetry './IN' from .* 65534")
     with ns1.watch_log_from_here() as watcher:
         res = isctest.query.tcp(msg, "10.53.0.1")
         isctest.check.noerror(res)
index 085f388b2c9092bd04ffa245ff34a956ccfacc9e..015e458349c0a098d007a9aa8edacffb452b604f 100644 (file)
@@ -10,7 +10,7 @@
 # information regarding copyright ownership.
 
 import os
-import re
+from re import compile as Re
 
 import isctest
 
@@ -47,7 +47,7 @@ def test_staticstub_delegations():
 
 def test_validator_logging(ns4):
     # check that validator logging includes the view name with multiple views
-    pattern = re.compile("view rec: *validat")
+    pattern = Re("view rec: *validat")
     with ns4.watch_log_from_start() as watcher:
         msg = isctest.query.create("secure.example", "NS")
         isctest.query.tcp(msg, "10.53.0.4")
index 2e1b2c36fb8b911fb3828f92bfa50981866f3d17..f46ceabe73782547606fe99fecd3c4c4e0025f32 100644 (file)
@@ -15,6 +15,7 @@ import glob
 import os
 from pathlib import Path
 import re
+from re import compile as Re
 import time
 from typing import Dict, List, Optional, Tuple, Union
 
@@ -1475,7 +1476,7 @@ def next_key_event_equals(server, zone, next_event):
         waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
 
     with server.watch_log_from_start() as watcher:
-        watcher.wait_for_line(re.compile(waitfor))
+        watcher.wait_for_line(Re(waitfor))
 
     # WMM: The with code below is extracting the line the watcher was
     # waiting for. If WatchLog.wait_for_line()` returned the matched string,
index 6d85e9817d1ba28ed510dae779306ad994949619..d2d8521ebda589ee16fa848c630fc00510eb19a6 100644 (file)
@@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa
 import abc
 import os
 import re
+from re import compile as Re
 import time
 
 
@@ -213,7 +214,7 @@ class WatchLog(abc.ABC):
             if isinstance(string, Pattern):
                 patterns.append(string)
             elif isinstance(string, str):
-                pattern = re.compile(re.escape(string))
+                pattern = Re(re.escape(string))
                 patterns.append(pattern)
             else:
                 raise WatchLogException(
@@ -256,13 +257,14 @@ class WatchLog(abc.ABC):
         Recommended use:
 
         ```python
+        from re import compile as Re
         import isctest
 
         def test_foo(servers):
             with servers["ns1"].watch_log_from_start() as watcher:
                 watcher.wait_for_line("all zones loaded")
 
-            pattern = re.compile(r"next key event in ([0-9]+) seconds")
+            pattern = Re(r"next key event in ([0-9]+) seconds")
             with servers["ns1"].watch_log_from_here() as watcher:
                 # ... do stuff here ...
                 match = watcher.wait_for_line(pattern)
@@ -321,7 +323,8 @@ class WatchLog(abc.ABC):
         >>> # Different values must be returned depending on which line is
         >>> # found in the log file.
         >>> import tempfile
-        >>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
+        >>> from re import compile as Re
+        >>> patterns = [Re(r"bar ([0-9])"), "qux"]
         >>> with tempfile.NamedTemporaryFile("w") as file:
         ...     print("foo bar 3", file=file, flush=True)
         ...     with WatchLogFromStart(file.name) as watcher:
@@ -443,7 +446,8 @@ class WatchLog(abc.ABC):
         >>> assert ret[1].group(0) == "foo"
 
         >>> import tempfile
-        >>> bar_pattern = re.compile('bar')
+        >>> from re import compile as Re
+        >>> bar_pattern = Re('bar')
         >>> patterns = ['foo', bar_pattern]
         >>> with tempfile.NamedTemporaryFile("w") as file:
         ...     print("bar", file=file, flush=True)
index 19d62d1811418a87432700c11104d88b3b38b6b4..3d1829e7235396fef2a854f1c93ce8b36f9d4b97 100644 (file)
@@ -10,7 +10,7 @@
 # information regarding copyright ownership.
 
 import os
-import re
+from re import compile as Re
 from typing import Optional
 
 from .. import log
@@ -30,7 +30,7 @@ def parse_openssl_config(path: Optional[str]):
         return
     assert os.path.isfile(path), f"{path} exists, but it's not a file"
 
-    regex = re.compile(r"([^=]+)=(.*)")
+    regex = Re(r"([^=]+)=(.*)")
     log.debug(f"parsing openssl config: {path}")
     with open(path, "r", encoding="utf-8") as conf:
         for line in conf:
index c356595afe4439fda0660780d676ee891af07ef9..900cc7e7b46972400eed65bce0de8308d15a0cb9 100644 (file)
@@ -12,6 +12,7 @@
 from datetime import timedelta
 import os
 import re
+from re import compile as Re
 
 import pytest
 
@@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone):
     ]
 
     cmd = isctest.run.cmd(journalprint)
-    pattern = re.compile(
-        r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
-    )
+    pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
     match = pattern.search(cmd.out)
     assert not match, f"{match.group(1)} record found in journal"
 
index 612be36b1d28e3150f9023a7a8266aee54d7b00d..54cf2fcaf23c4d7430a165d0dc9a53103cb76b98 100644 (file)
@@ -9,7 +9,7 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-import re
+from re import compile as Re
 
 import isctest
 
@@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns):
 
 
 def wait_for_sending_notify(ns1, ns, key_name):
-    pattern = re.compile(
+    pattern = Re(
         f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)"
     )
     with ns1.watch_log_from_start() as watcher:
index a4edc5998f019a4d24bee255fba928f34c6924a0..a0d042d1356e1f5572eceedd336462e64690a46e 100644 (file)
@@ -12,6 +12,7 @@
 import glob
 import os
 import re
+from re import compile as Re
 import shutil
 import signal
 import time
@@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2):
         isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
 
     query_and_compare(axfr_msg)
-    pattern = re.compile(
+    pattern = Re(
         f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
         f"Transfer completed: .*\\(serial 2\\)"
     )