From: Nicki Křížek Date: Thu, 2 Oct 2025 16:14:13 +0000 (+0200) Subject: Use Text with Grep support in isctest.run.cmd() X-Git-Tag: v9.21.17~53^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4b6a86b029a34c939b2e265ccab43f4f1b9a5808;p=thirdparty%2Fbind9.git Use Text with Grep support in isctest.run.cmd() When commands are executed using the isctest.run.cmd() command, allow the output to be Grep-able like logs and text files. --- diff --git a/bin/tests/system/dnssec/tests_delv.py b/bin/tests/system/dnssec/tests_delv.py index 9727db3a014..5c24abb2cf2 100644 --- a/bin/tests/system/dnssec/tests_delv.py +++ b/bin/tests/system/dnssec/tests_delv.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re import subprocess import pytest @@ -49,13 +49,6 @@ pytestmark = pytest.mark.extra_artifacts( ) -# helper functions -def grep_c(regex, data): - blob = data.splitlines() - results = [x for x in blob if re.search(regex, x)] - return len(results) - - # run delv def delv(*args, tkeys=False): delv_cmd = [os.environ.get("DELV")] @@ -70,115 +63,115 @@ def delv(*args, tkeys=False): def test_positive_validation_delv(): # check positive validation NSEC response = delv("a", "a.example") - assert grep_c("a.example..*10.0.0.1", response.out) - assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) + assert Re("a.example..*10.0.0.1") in response.out + assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out # check positive validation NSEC (trsuted-keys) response = delv("a", "a.example", tkeys=True) - assert grep_c("a.example..*10.0.0.1", response.out) - assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) + assert Re("a.example..*10.0.0.1") in response.out + assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out # check positive validation NSEC3 response = delv("a", "a.nsec3.example") - assert grep_c("a.nsec3.example..*10.0.0.1", response.out) - assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) + assert Re("a.nsec3.example..*10.0.0.1") in response.out + assert Re("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out # check positive validation OPTOUT response = delv("a", "a.optout.example") - assert grep_c("a.optout.example..*10.0.0.1", response.out) - assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) + assert Re("a.optout.example..*10.0.0.1") in response.out + assert Re("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out # check positive wildcard validation NSEC response = delv("a", "a.wild.example") - assert grep_c("a.wild.example..*10.0.0.27", response.out) - assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) + assert Re("a.wild.example..*10.0.0.27") in response.out + assert Re("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out # check positive wildcard validation NSEC3 response = delv("a", "a.wild.nsec3.example") - assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out) - assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) + assert Re("a.wild.nsec3.example..*10.0.0.6") in response.out + assert Re("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out # check positive wildcard validation OPTOUT response = delv("a", "a.wild.optout.example") - assert grep_c("a.wild.optout.example..*10.0.0.6", response.out) - assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) + assert Re("a.wild.optout.example..*10.0.0.6") in response.out + assert Re("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out def test_negative_validation_delv(): # checking negative validation NXDOMAIN NSEC response = delv("a", "q.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out # checking negative validation NODATA NSEC response = delv("txt", "a.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out # checking negative validation NXDOMAIN NSEC3 response = delv("a", "q.nsec3.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out # checking negative validation NODATA NSEC3 response = delv("txt", "a.nsec3.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out # checking negative validation NXDOMAIN OPTOUT response = delv("a", "q.optout.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out # checking negative validation NODATA OPTOUT response = delv("txt", "a.optout.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out # checking negative wildcard validation NSEC response = delv("txt", "b.wild.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out # checking negative wildcard validation NSEC3 response = delv("txt", "b.wild.nsec3.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out # checking negative wildcard validation OPTOUT response = delv("txt", "b.wild.optout.example") - assert grep_c("resolution failed: ncache nxrrset", response.out) + assert "resolution failed: ncache nxrrset" in response.out def test_insecure_validation_delv(): # check 1-server insecurity proof NSEC response = delv("a", "a.insecure.example") - assert grep_c("a.insecure.example..*10.0.0.1", response.out) + assert Re("a.insecure.example..*10.0.0.1") in response.out # check 1-server insecurity proof NSEC3 response = delv("a", "a.insecure.nsec3.example") - assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out) + assert Re("a.insecure.nsec3.example..*10.0.0.1") in response.out # check 1-server insecurity proof NSEC3 response = delv("a", "a.insecure.optout.example") - assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out) + assert Re("a.insecure.optout.example..*10.0.0.1") in response.out # check 1-server negative insecurity proof NSEC response = delv("a", "q.insecure.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out # check 1-server negative insecurity proof NSEC3 response = delv("a", "q.insecure.nsec3.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out # check 1-server negative insecurity proof OPTOUT response = delv("a", "q.insecure.optout.example") - assert grep_c("resolution failed: ncache nxdomain", response.out) + assert "resolution failed: ncache nxdomain" in response.out def test_validation_failure_delv(): # check failed validation due to bogus data response = delv("+cd", "a", "a.bogus.example") - assert grep_c("resolution failed: RRSIG failed to verify", response.out) + assert "resolution failed: RRSIG failed to verify" in response.out # check failed validation due to missing key record response = delv("+cd", "a", "a.b.keyless.example") - assert grep_c("resolution failed: insecurity proof failed", response.out) + assert "resolution failed: insecurity proof failed" in response.out def test_revoked_key_delv(): # check failed validation succeeds when a revoked key is encountered response = delv("+cd", "soa", "revkey.example") - assert grep_c("fully validated", response.out) + assert "fully validated" in response.out diff --git a/bin/tests/system/dnssec/tests_signing.py b/bin/tests/system/dnssec/tests_signing.py index 2a1abd531d3..682c3d7cfb1 100644 --- a/bin/tests/system/dnssec/tests_signing.py +++ b/bin/tests/system/dnssec/tests_signing.py @@ -52,14 +52,6 @@ pytestmark = pytest.mark.extra_artifacts( ) -# helper functions -def grep_c(regex, filename): - with open(filename, "r", encoding="utf-8") as f: - blob = f.read().splitlines() - results = [x for x in blob if re.search(regex, x)] - return len(results) - - # run dnssec-keygen def keygen(*args): keygen_cmd = [os.environ.get("KEYGEN")] @@ -126,7 +118,7 @@ def test_split_dnssec(): isctest.check.adflag(res2) -def test_expiring_rrsig(): +def test_expiring_rrsig(ns3): # check soon-to-expire RRSIGs without a replacement private # key aren't deleted. this response has to have an RRSIG: msg = isctest.query.create("expiring.example.", "NS") @@ -135,8 +127,7 @@ def test_expiring_rrsig(): assert sigs # check that named doesn't loop when private keys are not available - n = grep_c("reading private key file expiring.example", "ns3/named.run") - assert n < 15 + assert len(ns3.log.grep("reading private key file expiring.example")) < 15 # check expired signatures stay place when updates are disabled msg = isctest.query.create("expired.example", "SOA") diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index a052f65d3de..530d413751f 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -16,18 +16,19 @@ import time from typing import List, Optional import isctest.log +import isctest.text class CmdResult: def __init__(self, proc=None): self.proc = proc self.rc = self.proc.returncode - self.out = "" - self.err = "" + self.out = isctest.text.Text("") + self.err = isctest.text.Text("") if self.proc.stdout: - self.out = self.proc.stdout.decode("utf-8") + self.out = isctest.text.Text(self.proc.stdout.decode("utf-8")) if self.proc.stderr: - self.err = self.proc.stderr.decode("utf-8") + self.err = isctest.text.Text(self.proc.stderr.decode("utf-8")) def cmd( diff --git a/bin/tests/system/isctest/text.py b/bin/tests/system/isctest/text.py index 46b710e454f..ca3cc835e71 100644 --- a/bin/tests/system/isctest/text.py +++ b/bin/tests/system/isctest/text.py @@ -65,6 +65,15 @@ class Grep(abc.ABC): return True +class Text(Grep, str): # type: ignore + """ + Wrapper around classic string with grep support. + """ + + def readlines(self): + yield from self.splitlines(keepends=True) + + class TextFile(Grep): """ Text file wrapper with grep support. diff --git a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py index da9b3913135..0eb9370060a 100644 --- a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py +++ b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py @@ -11,7 +11,7 @@ import hashlib import os -import re +from re import compile as Re import shutil import pytest @@ -83,7 +83,7 @@ def token_init_and_cleanup(): env=EMPTY_OPENSSL_CONF_ENV, raise_on_exception=False, ) - assert re.search("Found token (.*) with matching token label", cmd.out) + assert Re("Found token (.*) with matching token label") in cmd.out # pylint: disable-msg=too-many-locals diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index 40f6d9e04cc..020d39791d1 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -11,7 +11,6 @@ from datetime import timedelta import os -import re from re import compile as Re import pytest @@ -104,9 +103,9 @@ def check_no_dnssec_in_journal(server, zone): ] cmd = isctest.run.cmd(journalprint) - pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE) - match = pattern.search(cmd.out) - assert not match, f"{match.group(1)} record found in journal" + assert ( + Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)") not in cmd.out + ), "dnssec record found in journal" def wait_for_serial(primary, server, zone): diff --git a/bin/tests/system/verify/tests_verify.py b/bin/tests/system/verify/tests_verify.py index 9ec09331764..e5e780a5324 100644 --- a/bin/tests/system/verify/tests_verify.py +++ b/bin/tests/system/verify/tests_verify.py @@ -11,7 +11,7 @@ import os import re -import subprocess +from re import compile as Re import pytest @@ -62,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch(): ) -def get_bad_zone_output(zone): - only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else [] +def verify_bad_zone(zone): + only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else [] cmd = isctest.run.cmd( [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"], - stderr=subprocess.STDOUT, raise_on_exception=False, ) - return cmd.out + assert cmd.rc != 0 + return cmd @pytest.mark.parametrize( @@ -81,7 +81,8 @@ def get_bad_zone_output(zone): ], ) def test_verify_bad_zone_files_dnskeyonly(zone): - assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone)) + cmd = verify_bad_zone(zone) + assert "DNSKEY is not signed" in cmd.err @pytest.mark.parametrize( @@ -98,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone): ], ) def test_verify_bad_zone_files_expired(zone): - assert re.match( - r".*signature has expired.*|.*No self-signed .*DNSKEY found.*", - get_bad_zone_output(zone), - ) + cmd = verify_bad_zone(zone) + assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err @pytest.mark.parametrize( @@ -113,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone): ], ) def test_verify_bad_zone_files_unexpected_nsec_rrset(zone): - assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone)) + cmd = verify_bad_zone(zone) + assert "unexpected NSEC RRset at" in cmd.err def test_verify_bad_zone_files_bad_nsec_record(): - assert re.match( - r".*Bad NSEC record for.*, next name mismatch.*", - get_bad_zone_output("ksk+zsk.nsec.broken-chain"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain") + assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err def test_verify_bad_zone_files_bad_bitmap(): - assert re.match( - r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap") - ) + cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap") + assert "bit map mismatch" in cmd.err def test_verify_bad_zone_files_missing_nsec3_record(): - assert re.match( - r".*Missing NSEC3 record for.*", - get_bad_zone_output("ksk+zsk.nsec3.missing-empty"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty") + assert "Missing NSEC3 record for" in cmd.err def test_verify_bad_zone_files_no_dnssec_keys(): - assert re.match( - r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned") - ) + cmd = verify_bad_zone("unsigned") + assert "Zone contains no DNSSEC keys" in cmd.err def test_verify_bad_zone_files_unequal_nsec3_chains(): - assert re.match( - r".*Expected and found NSEC3 chains not equal.*", - get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3") + assert "Expected and found NSEC3 chains not equal" in cmd.err # checking error message when -o is not used