From: Nicki Křížek Date: Wed, 1 Oct 2025 15:53:18 +0000 (+0200) Subject: Use CmdResult to decode stdout/stderr from isctest.run.cmd() X-Git-Tag: v9.21.17~53^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ac998da3f6c45073ea188763e7b9c84fa34e45ff;p=thirdparty%2Fbind9.git Use CmdResult to decode stdout/stderr from isctest.run.cmd() Avoid repeating the .decode("utf-8") snippet when processing command output and provide a helper instead, which leads to more concise code. --- diff --git a/bin/tests/system/checkconf-keys/tests_checkconf_keys.py b/bin/tests/system/checkconf-keys/tests_checkconf_keys.py index c89f156f825..1e512cf2469 100644 --- a/bin/tests/system/checkconf-keys/tests_checkconf_keys.py +++ b/bin/tests/system/checkconf-keys/tests_checkconf_keys.py @@ -38,125 +38,117 @@ def test_dnssecpolicy_keystore(): # Superfluous key file. zone = "superfluous-keyfile.kz.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-superfluous-keyfile.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") - assert f"zone '{zone}': wrong number of key files (3, expected 2)" in err + assert f"zone '{zone}': wrong number of key files (3, expected 2)" in cmd.out # Missing key file. zone = "missing-keyfile.kz.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-missing-keyfile.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") - assert f"zone '{zone}': wrong number of key files (1, expected 2)" in err + assert f"zone '{zone}': wrong number of key files (1, expected 2)" in cmd.out # Mismatch algorithm. zone = "bad-algorithm.kz.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-algorithm.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 2 assert ( f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy alternative-kz" - in err + in cmd.out ) assert ( f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy alternative-kz" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'ksk algorithm:RSASHA256 length:2048 tag-range:0-65535'" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'zsk algorithm:RSASHA256 length:2048 tag-range:0-65535'" - in err + in cmd.out ) # Mismatch length zone = "bad-length.csk.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-length.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 1 assert ( f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy alternative-csk" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy alternative-csk key:'csk algorithm:RSASHA256 length:2048 tag-range:0-65535'" - in err + in cmd.out ) # Mismatch tag range zone = "bad-tagrange.csk.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-tagrange.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 1 assert ( f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy tagrange-csk" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy tagrange-csk key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-32767'" - in err + in cmd.out ) # Mismatch role zone = "bad-role.kz.example" - out = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False) - err = out.stdout.decode("utf-8") + cmd = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False) keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 2 assert ( f"zone '{zone}': no key file found matching dnssec-policy default-kz key:'zsk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'" - in err + in cmd.out ) # Mismatch algorithm (default policy) zone = "bad-default-algorithm.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-default-algorithm.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 1 assert ( f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy default" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'" - in err + in cmd.out ) # Mismatch role (default policy) zone = "bad-default-kz.example" - out = isctest.run.cmd( + cmd = isctest.run.cmd( [CHECKCONF, "-k", "bad-default-kz.conf"], raise_on_exception=False ) - err = out.stdout.decode("utf-8") keys = isctest.kasp.keydir_to_keylist(zone) assert len(keys) == 2 assert ( f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy default" - in err + in cmd.out ) assert ( f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy default" - in err + in cmd.out ) assert ( f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'" - in err + in cmd.out ) - assert f"zone '{zone}': wrong number of key files (2, expected 1)" in err + assert f"zone '{zone}': wrong number of key files (2, expected 1)" in cmd.out diff --git a/bin/tests/system/checkconf/tests_checkconf.py b/bin/tests/system/checkconf/tests_checkconf.py index 83abdb1e2f1..3b7ef067806 100644 --- a/bin/tests/system/checkconf/tests_checkconf.py +++ b/bin/tests/system/checkconf/tests_checkconf.py @@ -15,25 +15,23 @@ import isctest def test_checkconf_effective(): - proc = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"]) - checkconf_output = proc.stdout.decode() - assert "listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in checkconf_output - assert 'view "_bind" chaos {' in checkconf_output - assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output - assert 'view "foo" {\n}' in checkconf_output + cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"]) + assert b"listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in cmd.proc.stdout + assert 'view "_bind" chaos {' in cmd.out + assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out + assert b'view "foo" {\n}' in cmd.proc.stdout # builtin-trust-anchors is non documented and internal clause only, it must # not be visible. - assert "builtin-trust-anchors" not in checkconf_output + assert "builtin-trust-anchors" not in cmd.out def test_checkconf_builtin(): - proc = isctest.run.cmd([os.environ["CHECKCONF"], "-b"]) - checkconf_output = proc.stdout.decode() - assert 'listen-on {\n\t\t"any";\n\t};' in checkconf_output - assert 'view "_bind" chaos {' in checkconf_output - assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output + cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-b"]) + assert b'listen-on {\n\t\t"any";\n\t};' in cmd.proc.stdout + assert 'view "_bind" chaos {' in cmd.out + assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out # builtin-trust-anchors is non documented and internal clause only, it must # not be visible. - assert "builtin-trust-anchors" not in checkconf_output + assert "builtin-trust-anchors" not in cmd.out diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index 5e859a44c24..b6c36e39085 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -102,10 +102,10 @@ def verify_zone(zone, transfer): verifier = isctest.run.cmd(verify_cmd) - if verifier.returncode != 0: + if verifier.rc != 0: isctest.log.error(f"dnssec-verify {zone} failed") - return verifier.returncode == 0 + return verifier.rc == 0 def read_statefile(server, zone): @@ -210,10 +210,10 @@ def rekey(zone): ] controller = isctest.run.cmd(rndc_cmd) - if controller.returncode != 0: + if controller.rc != 0: isctest.log.error(f"rndc loadkeys {zone} failed") - assert controller.returncode == 0 + assert controller.rc == 0 class CheckDSTest(NamedTuple): diff --git a/bin/tests/system/dnssec/tests_delv.py b/bin/tests/system/dnssec/tests_delv.py index aa374584bac..9727db3a014 100644 --- a/bin/tests/system/dnssec/tests_delv.py +++ b/bin/tests/system/dnssec/tests_delv.py @@ -64,125 +64,121 @@ def delv(*args, tkeys=False): delv_cmd.extend(["@10.53.0.4", "-a", tfile, "-p", os.environ["PORT"]]) delv_cmd.extend(args) - return ( - isctest.run.cmd(delv_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - .stdout.decode("utf-8") - .strip() - ) + return isctest.run.cmd(delv_cmd, stderr=subprocess.STDOUT) def test_positive_validation_delv(): # check positive validation NSEC response = delv("a", "a.example") - assert grep_c("a.example..*10.0.0.1", response) - assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response) + assert grep_c("a.example..*10.0.0.1", response.out) + assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) # check positive validation NSEC (trsuted-keys) response = delv("a", "a.example", tkeys=True) - assert grep_c("a.example..*10.0.0.1", response) - assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response) + assert grep_c("a.example..*10.0.0.1", response.out) + assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) # check positive validation NSEC3 response = delv("a", "a.nsec3.example") - assert grep_c("a.nsec3.example..*10.0.0.1", response) - assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response) + assert grep_c("a.nsec3.example..*10.0.0.1", response.out) + assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) # check positive validation OPTOUT response = delv("a", "a.optout.example") - assert grep_c("a.optout.example..*10.0.0.1", response) - assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response) + assert grep_c("a.optout.example..*10.0.0.1", response.out) + assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) # check positive wildcard validation NSEC response = delv("a", "a.wild.example") - assert grep_c("a.wild.example..*10.0.0.27", response) - assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response) + assert grep_c("a.wild.example..*10.0.0.27", response.out) + assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out) # check positive wildcard validation NSEC3 response = delv("a", "a.wild.nsec3.example") - assert grep_c("a.wild.nsec3.example..*10.0.0.6", response) - assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response) + assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out) + assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) # check positive wildcard validation OPTOUT response = delv("a", "a.wild.optout.example") - assert grep_c("a.wild.optout.example..*10.0.0.6", response) - assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response) + assert grep_c("a.wild.optout.example..*10.0.0.6", response.out) + assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out) def test_negative_validation_delv(): # checking negative validation NXDOMAIN NSEC response = delv("a", "q.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) # checking negative validation NODATA NSEC response = delv("txt", "a.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) # checking negative validation NXDOMAIN NSEC3 response = delv("a", "q.nsec3.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) # checking negative validation NODATA NSEC3 response = delv("txt", "a.nsec3.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) # checking negative validation NXDOMAIN OPTOUT response = delv("a", "q.optout.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) # checking negative validation NODATA OPTOUT response = delv("txt", "a.optout.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) # checking negative wildcard validation NSEC response = delv("txt", "b.wild.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) # checking negative wildcard validation NSEC3 response = delv("txt", "b.wild.nsec3.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) # checking negative wildcard validation OPTOUT response = delv("txt", "b.wild.optout.example") - assert grep_c("resolution failed: ncache nxrrset", response) + assert grep_c("resolution failed: ncache nxrrset", response.out) def test_insecure_validation_delv(): # check 1-server insecurity proof NSEC response = delv("a", "a.insecure.example") - assert grep_c("a.insecure.example..*10.0.0.1", response) + assert grep_c("a.insecure.example..*10.0.0.1", response.out) # check 1-server insecurity proof NSEC3 response = delv("a", "a.insecure.nsec3.example") - assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response) + assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out) # check 1-server insecurity proof NSEC3 response = delv("a", "a.insecure.optout.example") - assert grep_c("a.insecure.optout.example..*10.0.0.1", response) + assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out) # check 1-server negative insecurity proof NSEC response = delv("a", "q.insecure.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) # check 1-server negative insecurity proof NSEC3 response = delv("a", "q.insecure.nsec3.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) # check 1-server negative insecurity proof OPTOUT response = delv("a", "q.insecure.optout.example") - assert grep_c("resolution failed: ncache nxdomain", response) + assert grep_c("resolution failed: ncache nxdomain", response.out) def test_validation_failure_delv(): # check failed validation due to bogus data response = delv("+cd", "a", "a.bogus.example") - assert grep_c("resolution failed: RRSIG failed to verify", response) + assert grep_c("resolution failed: RRSIG failed to verify", response.out) # check failed validation due to missing key record response = delv("+cd", "a", "a.b.keyless.example") - assert grep_c("resolution failed: insecurity proof failed", response) + assert grep_c("resolution failed: insecurity proof failed", response.out) def test_revoked_key_delv(): # check failed validation succeeds when a revoked key is encountered response = delv("+cd", "soa", "revkey.example") - assert grep_c("fully validated", response) + assert grep_c("fully validated", response.out) diff --git a/bin/tests/system/dnssec/tests_signing.py b/bin/tests/system/dnssec/tests_signing.py index 2ff3e0e063e..b0295e5840c 100644 --- a/bin/tests/system/dnssec/tests_signing.py +++ b/bin/tests/system/dnssec/tests_signing.py @@ -63,14 +63,14 @@ def grep_c(regex, filename): def keygen(*args): keygen_cmd = [os.environ.get("KEYGEN")] keygen_cmd.extend(args) - return isctest.run.cmd(keygen_cmd, log_stdout=True).stdout.decode("utf-8").strip() + return isctest.run.cmd(keygen_cmd).out.strip() # run dnssec-settime def settime(*args): settime_cmd = [os.environ.get("SETTIME")] settime_cmd.extend(args) - return isctest.run.cmd(settime_cmd, log_stdout=True).stdout.decode("utf-8").strip() + return isctest.run.cmd(settime_cmd).out.strip() @pytest.mark.parametrize( diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index a8680fed1d1..aec43785eaa 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -48,7 +48,7 @@ def test_dnstap_dispatch_socket_addresses(ns3): os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses") # Read the contents of the dnstap file using dnstap-read. - run = isctest.run.cmd( + dnstapread = isctest.run.cmd( [isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"], ) @@ -64,7 +64,7 @@ def test_dnstap_dispatch_socket_addresses(ns3): bad_frames = [] inspected_frames = 0 addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$" - for line in run.stdout.decode("utf-8").splitlines(): + for line in dnstapread.out.splitlines(): _, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6) # Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames. if frame_type not in ("RQ", "RR"): diff --git a/bin/tests/system/doth/conftest.py b/bin/tests/system/doth/conftest.py index 27dca7ee8a4..b95baeaab5a 100644 --- a/bin/tests/system/doth/conftest.py +++ b/bin/tests/system/doth/conftest.py @@ -25,10 +25,10 @@ def gnutls_cli_executable(): pytest.skip("gnutls-cli not found in PATH") # Ensure gnutls-cli supports the --logfile command-line option. - output = isctest.run.cmd( + cmd = isctest.run.cmd( [executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False - ).stdout - if b"illegal option" in output: + ) + if "illegal option" in cmd.out: pytest.skip("gnutls-cli does not support the --logfile option") return executable diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 1a657a26dbb..2e1b2c36fb8 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,7 +15,6 @@ import glob import os from pathlib import Path import re -import subprocess import time from typing import Dict, List, Optional, Tuple, Union @@ -533,8 +532,8 @@ class Key: str(self.keyfile), ] - out = isctest.run.cmd(dsfromkey_command) - dsfromkey = out.stdout.decode("utf-8").split() + cmd = isctest.run.cmd(dsfromkey_command) + dsfromkey = cmd.out.split() rdata_fromfile = " ".join(dsfromkey[:7]) rdata_fromwire = " ".join(cds[:7]) @@ -837,18 +836,14 @@ def check_dnssec_verify(server, zone, tsig=None): file.write(rr.to_text()) file.write("\n") - try: - verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile] - verified = isctest.run.cmd(verify_command) - except subprocess.CalledProcessError: - pass - - if verified: - break + verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile] + verified = isctest.run.cmd(verify_command, raise_on_exception=False) + if verified.rc == 0: + return time.sleep(1) - assert verified + assert False, "zone not verified" def check_dnssecstatus(server, zone, keys, policy=None, view=None, verbose=False): diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index e4fbd459dd2..a052f65d3de 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -18,6 +18,18 @@ from typing import List, Optional import isctest.log +class CmdResult: + def __init__(self, proc=None): + self.proc = proc + self.rc = self.proc.returncode + self.out = "" + self.err = "" + if self.proc.stdout: + self.out = self.proc.stdout.decode("utf-8") + if self.proc.stderr: + self.err = self.proc.stderr.decode("utf-8") + + def cmd( args, cwd=None, @@ -29,7 +41,7 @@ def cmd( input_text: Optional[bytes] = None, raise_on_exception=True, env: Optional[dict] = None, -): +) -> CmdResult: """Execute a command with given args as subprocess.""" isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}") @@ -59,13 +71,13 @@ def cmd( env=env, ) print_debug_logs(proc) - return proc + return CmdResult(proc) except subprocess.CalledProcessError as exc: print_debug_logs(exc) isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}") if raise_on_exception: raise exc - return exc + return CmdResult(exc) def _run_script( @@ -107,11 +119,11 @@ class Dig: def __init__(self, base_params: str = ""): self.base_params = base_params - def __call__(self, params: str) -> str: - """Run the dig command with the given parameters and return the decoded output.""" + def __call__(self, params: str) -> CmdResult: + """Run the dig command with the given parameters.""" return cmd( [os.environ.get("DIG")] + f"{self.base_params} {params}".split(), - ).stdout.decode("utf-8") + ) def shell(script: str, args: Optional[List[str]] = None) -> None: diff --git a/bin/tests/system/kasp/tests_kasp.py b/bin/tests/system/kasp/tests_kasp.py index 2d6aa4b46e8..d5acb31c154 100644 --- a/bin/tests/system/kasp/tests_kasp.py +++ b/bin/tests/system/kasp/tests_kasp.py @@ -1282,7 +1282,7 @@ def test_kasp_dnssec_keygen(): zone, ] - return isctest.run.cmd(keygen_command).stdout.decode("utf-8") + return isctest.run.cmd(keygen_command).out isctest.log.info( "check that 'dnssec-keygen -k' (configured policy) created valid files" @@ -1327,7 +1327,7 @@ def test_kasp_dnssec_keygen(): str(publish), key.path, ] - out = isctest.run.cmd(settime).stdout.decode("utf-8") + isctest.run.cmd(settime) isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup") assert key.get_metadata("Publish", file=key.privatefile) == str(publish) @@ -1378,7 +1378,7 @@ def test_kasp_dnssec_keygen(): str(now), key.path, ] - out = isctest.run.cmd(settime).stdout.decode("utf-8") + isctest.run.cmd(settime) isctest.kasp.check_keys("kasp", keys, expected) isctest.kasp.check_keytimes(keys, expected) @@ -1415,7 +1415,7 @@ def test_kasp_dnssec_keygen(): str(now), key.path, ] - out = isctest.run.cmd(settime).stdout.decode("utf-8") + isctest.run.cmd(settime) isctest.kasp.check_keys("kasp", keys, expected) isctest.kasp.check_keytimes(keys, expected) @@ -1463,7 +1463,7 @@ def test_kasp_dnssec_keygen(): str(soon), key.path, ] - out = isctest.run.cmd(settime).stdout.decode("utf-8") + isctest.run.cmd(settime) isctest.kasp.check_keys("kasp", keys, expected) isctest.kasp.check_keytimes(keys, expected) diff --git a/bin/tests/system/keepalive/tests_keepalive.py b/bin/tests/system/keepalive/tests_keepalive.py index 9190ddd8b81..5314d7ee57b 100644 --- a/bin/tests/system/keepalive/tests_keepalive.py +++ b/bin/tests/system/keepalive/tests_keepalive.py @@ -31,25 +31,27 @@ def test_dig_tcp_keepalive_handling(named_port, ns2): dig = isctest.run.Dig(f"-p {str(named_port)}") isctest.log.info("check that dig handles TCP keepalive in query") - assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2") + assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that dig added TCP keepalive was received") assert get_keepalive_options_received() == 1 isctest.log.info("check that TCP keepalive is added for TCP responses") - assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2") + assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that TCP keepalive requires TCP") - assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2") + assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out isctest.log.info("check the default keepalive value") - assert "; TCP-KEEPALIVE: 30.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.3" + assert ( + "; TCP-KEEPALIVE: 30.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.3").out ) isctest.log.info("check a keepalive configured value") - assert "; TCP-KEEPALIVE: 15.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + assert ( + "; TCP-KEEPALIVE: 15.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check a re-configured keepalive value") @@ -59,8 +61,9 @@ def test_dig_tcp_keepalive_handling(named_port, ns2): assert "tcp-keepalive-timeout=300" in response assert "tcp-advertised-timeout=200" in response assert "tcp-primaries-timeout=100" in response - assert "; TCP-KEEPALIVE: 20.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + assert ( + "; TCP-KEEPALIVE: 20.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check server config entry") diff --git a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py index 8fddbd4dd59..da9b3913135 100644 --- a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py +++ b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py @@ -74,18 +74,16 @@ def token_init_and_cleanup(): ) try: - output = isctest.run.cmd( - token_init_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") - assert "The token has been initialized and is reassigned to slot" in output + cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV) + assert "The token has been initialized and is reassigned to slot" in cmd.out yield finally: - output = isctest.run.cmd( + cmd = isctest.run.cmd( token_cleanup_command, env=EMPTY_OPENSSL_CONF_ENV, raise_on_exception=False, - ).stdout.decode("utf-8") - assert re.search("Found token (.*) with matching token label", output) + ) + assert re.search("Found token (.*) with matching token label", cmd.out) # pylint: disable-msg=too-many-locals @@ -125,11 +123,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): HSMPIN, ] - output = isctest.run.cmd( - pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") + cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV) - assert "Key pair generated" in output + assert "Key pair generated" in cmd.out def keyfromlabel(alg_name, zone, key_id, key_flag): key_flag = key_flag.split() if key_flag else [] @@ -145,12 +141,12 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): zone, ] - output = isctest.run.cmd(keyfrlab_command) - output_decoded = output.stdout.decode("utf-8").rstrip() + ".key" + cmd = isctest.run.cmd(keyfrlab_command) + keyfile = cmd.out.rstrip() + ".key" - assert os.path.exists(output_decoded) + assert os.path.exists(keyfile) - return output_decoded + return keyfile if f"{alg_name.upper()}_SUPPORTED" not in os.environ: pytest.skip(f"{alg_name} is not supported") diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index 374865b042e..fc869bb5272 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -97,8 +97,8 @@ def ksr(zone, policy, action, options="", raise_on_exception=True): zone, ] - out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception) - return out.stdout.decode("utf-8"), out.stderr.decode("utf-8") + cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception) + return cmd.out, cmd.err # TODO return cmd instead def check_keys( diff --git a/bin/tests/system/masterfile/tests_masterfile.py b/bin/tests/system/masterfile/tests_masterfile.py index 9b3d0b114f0..ae85d7bde43 100644 --- a/bin/tests/system/masterfile/tests_masterfile.py +++ b/bin/tests/system/masterfile/tests_masterfile.py @@ -149,7 +149,7 @@ def test_masterfile_missing_master_file_servfail(): def test_masterfile_owner_inheritance(): """Test owner inheritance after $INCLUDE""" - checker_output = isctest.run.cmd( + cmd = isctest.run.cmd( [ os.environ["CHECKZONE"], "-D", @@ -157,12 +157,12 @@ def test_masterfile_owner_inheritance(): "example", "zone/inheritownerafterinclude.db", ] - ).stdout.decode("utf-8") + ) owner_inheritance_zone = """ example. 0 IN SOA . . 0 0 0 0 0 example. 0 IN TXT "this should be at the zone apex" example. 0 IN NS . """ - checker_zone = dns.zone.from_text(checker_output, origin="example.") + checker_zone = dns.zone.from_text(cmd.out, origin="example.") expected = dns.zone.from_text(owner_inheritance_zone, origin="example.") isctest.check.zones_equal(checker_zone, expected, compare_ttl=True) diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index 695f2420b94..c356595afe4 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -73,8 +73,8 @@ def dsfromkey(key): "-w", str(key.keyfile), ] - out = isctest.run.cmd(dsfromkey_command) - return out.stdout.decode("utf-8").split() + cmd = isctest.run.cmd(dsfromkey_command) + return cmd.out.split() def check_dnssec(server, zone, keys, expected): @@ -102,12 +102,11 @@ def check_no_dnssec_in_journal(server, zone): f"{server.identifier}/{zone}.db.jnl", ] - out = isctest.run.cmd(journalprint) - contents = out.stdout.decode("utf-8") + cmd = isctest.run.cmd(journalprint) pattern = re.compile( r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE ) - match = pattern.search(contents) + match = pattern.search(cmd.out) assert not match, f"{match.group(1)} record found in journal" diff --git a/bin/tests/system/nzd2nzf/tests_nzd2nzf.py b/bin/tests/system/nzd2nzf/tests_nzd2nzf.py index e25f2dfc9a9..5ad766c4cfd 100644 --- a/bin/tests/system/nzd2nzf/tests_nzd2nzf.py +++ b/bin/tests/system/nzd2nzf/tests_nzd2nzf.py @@ -44,13 +44,11 @@ def test_nzd2nzf(ns1): # dump "_default.nzd" to "_default.nzf" and check that it contains the expected content cfg_dir = "ns1" - stdout = isctest.run.cmd( - [os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir - ).stdout.decode("utf-8") - assert f"zone {zone_data}" in stdout + cmd = isctest.run.cmd([os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir) + assert f"zone {zone_data}" in cmd.out nzf_filename = os.path.join(cfg_dir, "_default.nzf") with open(nzf_filename, "w", encoding="utf-8") as nzf_file: - nzf_file.write(stdout) + nzf_file.write(cmd.out) # delete "_default.nzd" database nzd_filename = os.path.join(cfg_dir, "_default.nzd") diff --git a/bin/tests/system/rollover-multisigner/tests_rollover_multisigner.py b/bin/tests/system/rollover-multisigner/tests_rollover_multisigner.py index 7aad2d98cdf..9c4cc47b8b3 100644 --- a/bin/tests/system/rollover-multisigner/tests_rollover_multisigner.py +++ b/bin/tests/system/rollover-multisigner/tests_rollover_multisigner.py @@ -58,7 +58,7 @@ def test_rollover_multisigner(ns3, alg, size): zone, ] - return isctest.run.cmd(keygen_command).stdout.decode("utf-8") + return isctest.run.cmd(keygen_command).out zone = "multisigner-model2.kasp" diff --git a/bin/tests/system/rrchecker/tests_rrchecker.py b/bin/tests/system/rrchecker/tests_rrchecker.py index d1cc1b71927..715e891af8e 100644 --- a/bin/tests/system/rrchecker/tests_rrchecker.py +++ b/bin/tests/system/rrchecker/tests_rrchecker.py @@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts( ], ) def test_rrchecker_list_standard_names(option, expected_result): - stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8") - values = [line for line in stdout.split("\n") if line.strip()] + cmd = isctest.run.cmd([os.environ["RRCHECKER"], option]) + values = [line for line in cmd.out.split("\n") if line.strip()] assert sorted(values) == sorted(expected_result) def run_rrchecker(option, rr_class, rr_type, rr_rest): - rrchecker_output = ( - isctest.run.cmd( - [os.environ["RRCHECKER"], option], - input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), - ) - .stdout.decode("utf-8") - .strip() + cmd = isctest.run.cmd( + [os.environ["RRCHECKER"], option], + input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), ) - return rrchecker_output.split() + return cmd.out.strip().split() @pytest.mark.parametrize( @@ -162,7 +158,7 @@ def test_rrchecker_conversions(option): ".", tempzone_file, ], - ).stdout.decode("utf-8") + ).out checkzone_output = [ line for line in checkzone_output.splitlines() if not line.startswith(";") ] diff --git a/bin/tests/system/tools/tests_tools_nsec3hash.py b/bin/tests/system/tools/tests_tools_nsec3hash.py index 605e5e55af4..19571882c15 100644 --- a/bin/tests/system/tools/tests_tools_nsec3hash.py +++ b/bin/tests/system/tools/tests_tools_nsec3hash.py @@ -52,16 +52,12 @@ def test_nsec3_hashes(domain, nsec3hash): algorithm = "1" iterations = "12" - output = isctest.run.cmd( - [NSEC3HASH, salt, algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain]) + assert nsec3hash in cmd.out flags = "0" - output = isctest.run.cmd( - [NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]) + assert nsec3hash in cmd.out @pytest.mark.parametrize( @@ -78,11 +74,11 @@ def test_nsec3_empty_salt(salt_emptiness_args): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output - assert "salt=-" in output + ) + assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out + assert "salt=-" in cmd.out @pytest.mark.parametrize( @@ -98,7 +94,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [ NSEC3HASH, "-r", @@ -108,8 +104,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): salt_emptiness_arg, domain, ] - ).stdout.decode("utf-8") - assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output + ) + assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out @pytest.mark.parametrize( @@ -145,10 +141,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None: ) # calculate the hash using nsec3hash: - output = isctest.run.cmd( - [NSEC3HASH, salt_text, "1", str(it), str(domain)] - ).stdout.decode("ascii") - hash2 = output.partition(" ")[0] + cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)]) + hash2 = cmd.out.partition(" ")[0] assert hash1 == hash2 diff --git a/bin/tests/system/verify/tests_verify.py b/bin/tests/system/verify/tests_verify.py index 203c2181be1..9ec09331764 100644 --- a/bin/tests/system/verify/tests_verify.py +++ b/bin/tests/system/verify/tests_verify.py @@ -11,6 +11,7 @@ import os import re +import subprocess import pytest @@ -63,12 +64,12 @@ def test_verify_good_zone_nsec_next_name_case_mismatch(): def get_bad_zone_output(zone): only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else [] - output = isctest.run.cmd( + cmd = isctest.run.cmd( [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"], + stderr=subprocess.STDOUT, raise_on_exception=False, ) - stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "") - return stream + return cmd.out @pytest.mark.parametrize( @@ -153,27 +154,25 @@ def test_verify_bad_zone_files_unequal_nsec3_chains(): def test_verify_soa_not_at_top_error(): # when -o is not used, origin is set to zone file name, # which should cause an error in this case - output = isctest.run.cmd( - [VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" in output + cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" in cmd.err # checking error message when an invalid -o is specified # and a SOA record not at top of zone is found def test_verify_invalid_o_option_soa_not_at_top_error(): - output = isctest.run.cmd( + cmd = isctest.run.cmd( [VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"], raise_on_exception=False, - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" not in output + ) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" not in cmd.err # checking dnssec-verify -J reads journal file def test_verify_j_reads_journal_file(): - output = isctest.run.cmd( + cmd = isctest.run.cmd( [ VERIFY, "-o", @@ -182,5 +181,5 @@ def test_verify_j_reads_journal_file(): "zones/updated.other.jnl", "zones/updated.other", ] - ).stdout.decode("utf-8") - assert "Loading zone 'updated' from file 'zones/updated.other'" in output + ) + assert "Loading zone 'updated' from file 'zones/updated.other'" in cmd.out