]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Use Text with Grep support in isctest.run.cmd()
authorNicki Křížek <nicki@isc.org>
Thu, 2 Oct 2025 16:14:13 +0000 (18:14 +0200)
committerNicki Křížek <nicki@isc.org>
Mon, 8 Dec 2025 13:57:47 +0000 (14:57 +0100)
When commands are executed using the isctest.run.cmd() command, allow
the output to be Grep-able like logs and text files.

bin/tests/system/dnssec/tests_delv.py
bin/tests/system/dnssec/tests_signing.py
bin/tests/system/isctest/run.py
bin/tests/system/isctest/text.py
bin/tests/system/keyfromlabel/tests_keyfromlabel.py
bin/tests/system/multisigner/tests_multisigner.py
bin/tests/system/verify/tests_verify.py

index 9727db3a014999baad4f1dbeab8e4e77221fc1b6..5c24abb2cf2ec2f235d206523b990202a2bd92f7 100644 (file)
@@ -10,7 +10,7 @@
 # information regarding copyright ownership.
 
 import os
-import re
+from re import compile as Re
 import subprocess
 
 import pytest
@@ -49,13 +49,6 @@ pytestmark = pytest.mark.extra_artifacts(
 )
 
 
-# helper functions
-def grep_c(regex, data):
-    blob = data.splitlines()
-    results = [x for x in blob if re.search(regex, x)]
-    return len(results)
-
-
 # run delv
 def delv(*args, tkeys=False):
     delv_cmd = [os.environ.get("DELV")]
@@ -70,115 +63,115 @@ def delv(*args, tkeys=False):
 def test_positive_validation_delv():
     # check positive validation NSEC
     response = delv("a", "a.example")
-    assert grep_c("a.example..*10.0.0.1", response.out)
-    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
+    assert Re("a.example..*10.0.0.1") in response.out
+    assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
 
     # check positive validation NSEC (trsuted-keys)
     response = delv("a", "a.example", tkeys=True)
-    assert grep_c("a.example..*10.0.0.1", response.out)
-    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
+    assert Re("a.example..*10.0.0.1") in response.out
+    assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
 
     # check positive validation NSEC3
     response = delv("a", "a.nsec3.example")
-    assert grep_c("a.nsec3.example..*10.0.0.1", response.out)
-    assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
+    assert Re("a.nsec3.example..*10.0.0.1") in response.out
+    assert Re("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
 
     # check positive validation OPTOUT
     response = delv("a", "a.optout.example")
-    assert grep_c("a.optout.example..*10.0.0.1", response.out)
-    assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
+    assert Re("a.optout.example..*10.0.0.1") in response.out
+    assert Re("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
 
     # check positive wildcard validation NSEC
     response = delv("a", "a.wild.example")
-    assert grep_c("a.wild.example..*10.0.0.27", response.out)
-    assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
+    assert Re("a.wild.example..*10.0.0.27") in response.out
+    assert Re("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
 
     # check positive wildcard validation NSEC3
     response = delv("a", "a.wild.nsec3.example")
-    assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out)
-    assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
+    assert Re("a.wild.nsec3.example..*10.0.0.6") in response.out
+    assert Re("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
 
     # check positive wildcard validation OPTOUT
     response = delv("a", "a.wild.optout.example")
-    assert grep_c("a.wild.optout.example..*10.0.0.6", response.out)
-    assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
+    assert Re("a.wild.optout.example..*10.0.0.6") in response.out
+    assert Re("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
 
 
 def test_negative_validation_delv():
     # checking negative validation NXDOMAIN NSEC
     response = delv("a", "q.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
     # checking negative validation NODATA NSEC
     response = delv("txt", "a.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
     # checking negative validation NXDOMAIN NSEC3
     response = delv("a", "q.nsec3.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
     # checking negative validation NODATA NSEC3
     response = delv("txt", "a.nsec3.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
     # checking negative validation NXDOMAIN OPTOUT
     response = delv("a", "q.optout.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
     # checking negative validation NODATA OPTOUT
     response = delv("txt", "a.optout.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
     # checking negative wildcard validation NSEC
     response = delv("txt", "b.wild.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
     # checking negative wildcard validation NSEC3
     response = delv("txt", "b.wild.nsec3.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
     # checking negative wildcard validation OPTOUT
     response = delv("txt", "b.wild.optout.example")
-    assert grep_c("resolution failed: ncache nxrrset", response.out)
+    assert "resolution failed: ncache nxrrset" in response.out
 
 
 def test_insecure_validation_delv():
     # check 1-server insecurity proof NSEC
     response = delv("a", "a.insecure.example")
-    assert grep_c("a.insecure.example..*10.0.0.1", response.out)
+    assert Re("a.insecure.example..*10.0.0.1") in response.out
 
     # check 1-server insecurity proof NSEC3
     response = delv("a", "a.insecure.nsec3.example")
-    assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out)
+    assert Re("a.insecure.nsec3.example..*10.0.0.1") in response.out
 
     # check 1-server insecurity proof NSEC3
     response = delv("a", "a.insecure.optout.example")
-    assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out)
+    assert Re("a.insecure.optout.example..*10.0.0.1") in response.out
 
     # check 1-server negative insecurity proof NSEC
     response = delv("a", "q.insecure.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
     # check 1-server negative insecurity proof NSEC3
     response = delv("a", "q.insecure.nsec3.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
     # check 1-server negative insecurity proof OPTOUT
     response = delv("a", "q.insecure.optout.example")
-    assert grep_c("resolution failed: ncache nxdomain", response.out)
+    assert "resolution failed: ncache nxdomain" in response.out
 
 
 def test_validation_failure_delv():
     # check failed validation due to bogus data
     response = delv("+cd", "a", "a.bogus.example")
-    assert grep_c("resolution failed: RRSIG failed to verify", response.out)
+    assert "resolution failed: RRSIG failed to verify" in response.out
 
     # check failed validation due to missing key record
     response = delv("+cd", "a", "a.b.keyless.example")
-    assert grep_c("resolution failed: insecurity proof failed", response.out)
+    assert "resolution failed: insecurity proof failed" in response.out
 
 
 def test_revoked_key_delv():
     # check failed validation succeeds when a revoked key is encountered
     response = delv("+cd", "soa", "revkey.example")
-    assert grep_c("fully validated", response.out)
+    assert "fully validated" in response.out
index 2a1abd531d3933da2773b4739bbed1b1c6ee884d..682c3d7cfb1665601bfecf12e505763f888e5590 100644 (file)
@@ -52,14 +52,6 @@ pytestmark = pytest.mark.extra_artifacts(
 )
 
 
-# helper functions
-def grep_c(regex, filename):
-    with open(filename, "r", encoding="utf-8") as f:
-        blob = f.read().splitlines()
-    results = [x for x in blob if re.search(regex, x)]
-    return len(results)
-
-
 # run dnssec-keygen
 def keygen(*args):
     keygen_cmd = [os.environ.get("KEYGEN")]
@@ -126,7 +118,7 @@ def test_split_dnssec():
     isctest.check.adflag(res2)
 
 
-def test_expiring_rrsig():
+def test_expiring_rrsig(ns3):
     # check soon-to-expire RRSIGs without a replacement private
     # key aren't deleted. this response has to have an RRSIG:
     msg = isctest.query.create("expiring.example.", "NS")
@@ -135,8 +127,7 @@ def test_expiring_rrsig():
     assert sigs
 
     # check that named doesn't loop when private keys are not available
-    n = grep_c("reading private key file expiring.example", "ns3/named.run")
-    assert n < 15
+    assert len(ns3.log.grep("reading private key file expiring.example")) < 15
 
     # check expired signatures stay place when updates are disabled
     msg = isctest.query.create("expired.example", "SOA")
index a052f65d3de013b1ce20b86d34d363604554514e..530d413751f54f4ad912f381485bfe69d18f22d9 100644 (file)
@@ -16,18 +16,19 @@ import time
 from typing import List, Optional
 
 import isctest.log
+import isctest.text
 
 
 class CmdResult:
     def __init__(self, proc=None):
         self.proc = proc
         self.rc = self.proc.returncode
-        self.out = ""
-        self.err = ""
+        self.out = isctest.text.Text("")
+        self.err = isctest.text.Text("")
         if self.proc.stdout:
-            self.out = self.proc.stdout.decode("utf-8")
+            self.out = isctest.text.Text(self.proc.stdout.decode("utf-8"))
         if self.proc.stderr:
-            self.err = self.proc.stderr.decode("utf-8")
+            self.err = isctest.text.Text(self.proc.stderr.decode("utf-8"))
 
 
 def cmd(
index 46b710e454f5461b69c9b798c4c8a6fe0166a33c..ca3cc835e71f7d129306bfb0f98654ca29b7e5d2 100644 (file)
@@ -65,6 +65,15 @@ class Grep(abc.ABC):
         return True
 
 
+class Text(Grep, str):  # type: ignore
+    """
+    Wrapper around classic string with grep support.
+    """
+
+    def readlines(self):
+        yield from self.splitlines(keepends=True)
+
+
 class TextFile(Grep):
     """
     Text file wrapper with grep support.
index da9b3913135e10acde38ffcd6b13f1caa41649e1..0eb9370060aaed763fbe08b96393b4cc3d05c5e1 100644 (file)
@@ -11,7 +11,7 @@
 
 import hashlib
 import os
-import re
+from re import compile as Re
 import shutil
 
 import pytest
@@ -83,7 +83,7 @@ def token_init_and_cleanup():
             env=EMPTY_OPENSSL_CONF_ENV,
             raise_on_exception=False,
         )
-        assert re.search("Found token (.*) with matching token label", cmd.out)
+        assert Re("Found token (.*) with matching token label") in cmd.out
 
 
 # pylint: disable-msg=too-many-locals
index 40f6d9e04ccb676366eb4a02286a0227683abc48..020d39791d1feb6a0dbd31fdcf3b1856faf4c286 100644 (file)
@@ -11,7 +11,6 @@
 
 from datetime import timedelta
 import os
-import re
 from re import compile as Re
 
 import pytest
@@ -104,9 +103,9 @@ def check_no_dnssec_in_journal(server, zone):
     ]
 
     cmd = isctest.run.cmd(journalprint)
-    pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
-    match = pattern.search(cmd.out)
-    assert not match, f"{match.group(1)} record found in journal"
+    assert (
+        Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)") not in cmd.out
+    ), "dnssec record found in journal"
 
 
 def wait_for_serial(primary, server, zone):
index 9ec0933176434ff3548d94fe56f313a6cef847fc..e5e780a5324921e924a7006ef5a9f925b9d0d44a 100644 (file)
@@ -11,7 +11,7 @@
 
 import os
 import re
-import subprocess
+from re import compile as Re
 
 import pytest
 
@@ -62,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
     )
 
 
-def get_bad_zone_output(zone):
-    only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
+def verify_bad_zone(zone):
+    only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else []
     cmd = isctest.run.cmd(
         [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
-        stderr=subprocess.STDOUT,
         raise_on_exception=False,
     )
-    return cmd.out
+    assert cmd.rc != 0
+    return cmd
 
 
 @pytest.mark.parametrize(
@@ -81,7 +81,8 @@ def get_bad_zone_output(zone):
     ],
 )
 def test_verify_bad_zone_files_dnskeyonly(zone):
-    assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone))
+    cmd = verify_bad_zone(zone)
+    assert "DNSKEY is not signed" in cmd.err
 
 
 @pytest.mark.parametrize(
@@ -98,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone):
     ],
 )
 def test_verify_bad_zone_files_expired(zone):
-    assert re.match(
-        r".*signature has expired.*|.*No self-signed .*DNSKEY found.*",
-        get_bad_zone_output(zone),
-    )
+    cmd = verify_bad_zone(zone)
+    assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err
 
 
 @pytest.mark.parametrize(
@@ -113,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone):
     ],
 )
 def test_verify_bad_zone_files_unexpected_nsec_rrset(zone):
-    assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone))
+    cmd = verify_bad_zone(zone)
+    assert "unexpected NSEC RRset at" in cmd.err
 
 
 def test_verify_bad_zone_files_bad_nsec_record():
-    assert re.match(
-        r".*Bad NSEC record for.*, next name mismatch.*",
-        get_bad_zone_output("ksk+zsk.nsec.broken-chain"),
-    )
+    cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain")
+    assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err
 
 
 def test_verify_bad_zone_files_bad_bitmap():
-    assert re.match(
-        r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap")
-    )
+    cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap")
+    assert "bit map mismatch" in cmd.err
 
 
 def test_verify_bad_zone_files_missing_nsec3_record():
-    assert re.match(
-        r".*Missing NSEC3 record for.*",
-        get_bad_zone_output("ksk+zsk.nsec3.missing-empty"),
-    )
+    cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty")
+    assert "Missing NSEC3 record for" in cmd.err
 
 
 def test_verify_bad_zone_files_no_dnssec_keys():
-    assert re.match(
-        r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned")
-    )
+    cmd = verify_bad_zone("unsigned")
+    assert "Zone contains no DNSSEC keys" in cmd.err
 
 
 def test_verify_bad_zone_files_unequal_nsec3_chains():
-    assert re.match(
-        r".*Expected and found NSEC3 chains not equal.*",
-        get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"),
-    )
+    cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3")
+    assert "Expected and found NSEC3 chains not equal" in cmd.err
 
 
 # checking error message when -o is not used