]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Use CmdResult to decode stdout/stderr from isctest.run.cmd()
authorNicki Křížek <nicki@isc.org>
Wed, 1 Oct 2025 15:53:18 +0000 (17:53 +0200)
committerNicki Křížek <nicki@isc.org>
Mon, 8 Dec 2025 13:57:47 +0000 (14:57 +0100)
Avoid repeating the .decode("utf-8") snippet when processing command
output and provide a helper instead, which leads to more concise code.

20 files changed:
bin/tests/system/checkconf-keys/tests_checkconf_keys.py
bin/tests/system/checkconf/tests_checkconf.py
bin/tests/system/checkds/tests_checkds.py
bin/tests/system/dnssec/tests_delv.py
bin/tests/system/dnssec/tests_signing.py
bin/tests/system/dnstap/tests_dnstap.py
bin/tests/system/doth/conftest.py
bin/tests/system/isctest/kasp.py
bin/tests/system/isctest/run.py
bin/tests/system/kasp/tests_kasp.py
bin/tests/system/keepalive/tests_keepalive.py
bin/tests/system/keyfromlabel/tests_keyfromlabel.py
bin/tests/system/ksr/tests_ksr.py
bin/tests/system/masterfile/tests_masterfile.py
bin/tests/system/multisigner/tests_multisigner.py
bin/tests/system/nzd2nzf/tests_nzd2nzf.py
bin/tests/system/rollover-multisigner/tests_rollover_multisigner.py
bin/tests/system/rrchecker/tests_rrchecker.py
bin/tests/system/tools/tests_tools_nsec3hash.py
bin/tests/system/verify/tests_verify.py

index c89f156f825909db560daacab1114e6cd25c030e..1e512cf246931fab0a0d2be096a14e2a46eb31fd 100644 (file)
@@ -38,125 +38,117 @@ def test_dnssecpolicy_keystore():
 
     # Superfluous key file.
     zone = "superfluous-keyfile.kz.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-superfluous-keyfile.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
-    assert f"zone '{zone}': wrong number of key files (3, expected 2)" in err
+    assert f"zone '{zone}': wrong number of key files (3, expected 2)" in cmd.out
 
     # Missing key file.
     zone = "missing-keyfile.kz.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-missing-keyfile.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
-    assert f"zone '{zone}': wrong number of key files (1, expected 2)" in err
+    assert f"zone '{zone}': wrong number of key files (1, expected 2)" in cmd.out
 
     # Mismatch algorithm.
     zone = "bad-algorithm.kz.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-algorithm.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 2
     assert (
         f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy alternative-kz"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy alternative-kz"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'ksk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'zsk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
 
     # Mismatch length
     zone = "bad-length.csk.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-length.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 1
     assert (
         f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy alternative-csk"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy alternative-csk key:'csk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
 
     # Mismatch tag range
     zone = "bad-tagrange.csk.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-tagrange.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 1
     assert (
         f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy tagrange-csk"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy tagrange-csk key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-32767'"
-        in err
+        in cmd.out
     )
 
     # Mismatch role
     zone = "bad-role.kz.example"
-    out = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
-    err = out.stdout.decode("utf-8")
+    cmd = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 2
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy default-kz key:'zsk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
 
     # Mismatch algorithm (default policy)
     zone = "bad-default-algorithm.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-default-algorithm.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 1
     assert (
         f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy default"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
 
     # Mismatch role (default policy)
     zone = "bad-default-kz.example"
-    out = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [CHECKCONF, "-k", "bad-default-kz.conf"], raise_on_exception=False
     )
-    err = out.stdout.decode("utf-8")
     keys = isctest.kasp.keydir_to_keylist(zone)
     assert len(keys) == 2
     assert (
         f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy default"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy default"
-        in err
+        in cmd.out
     )
     assert (
         f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
-        in err
+        in cmd.out
     )
-    assert f"zone '{zone}': wrong number of key files (2, expected 1)" in err
+    assert f"zone '{zone}': wrong number of key files (2, expected 1)" in cmd.out
index 83abdb1e2f1f13d1919b972731330d417c6bc55d..3b7ef067806f684d5ba2317eb057d98ae54b8fe8 100644 (file)
@@ -15,25 +15,23 @@ import isctest
 
 
 def test_checkconf_effective():
-    proc = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
-    checkconf_output = proc.stdout.decode()
-    assert "listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in checkconf_output
-    assert 'view "_bind" chaos {' in checkconf_output
-    assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
-    assert 'view "foo" {\n}' in checkconf_output
+    cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
+    assert b"listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in cmd.proc.stdout
+    assert 'view "_bind" chaos {' in cmd.out
+    assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
+    assert b'view "foo" {\n}' in cmd.proc.stdout
 
     # builtin-trust-anchors is non documented and internal clause only, it must
     # not be visible.
-    assert "builtin-trust-anchors" not in checkconf_output
+    assert "builtin-trust-anchors" not in cmd.out
 
 
 def test_checkconf_builtin():
-    proc = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
-    checkconf_output = proc.stdout.decode()
-    assert 'listen-on  {\n\t\t"any";\n\t};' in checkconf_output
-    assert 'view "_bind" chaos {' in checkconf_output
-    assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
+    cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
+    assert b'listen-on  {\n\t\t"any";\n\t};' in cmd.proc.stdout
+    assert 'view "_bind" chaos {' in cmd.out
+    assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
 
     # builtin-trust-anchors is non documented and internal clause only, it must
     # not be visible.
-    assert "builtin-trust-anchors" not in checkconf_output
+    assert "builtin-trust-anchors" not in cmd.out
index 5e859a44c241f8324b70084e895dc94505a66108..b6c36e3908546f67fd9f9206b1849ae6798a9fbd 100755 (executable)
@@ -102,10 +102,10 @@ def verify_zone(zone, transfer):
 
     verifier = isctest.run.cmd(verify_cmd)
 
-    if verifier.returncode != 0:
+    if verifier.rc != 0:
         isctest.log.error(f"dnssec-verify {zone} failed")
 
-    return verifier.returncode == 0
+    return verifier.rc == 0
 
 
 def read_statefile(server, zone):
@@ -210,10 +210,10 @@ def rekey(zone):
     ]
     controller = isctest.run.cmd(rndc_cmd)
 
-    if controller.returncode != 0:
+    if controller.rc != 0:
         isctest.log.error(f"rndc loadkeys {zone} failed")
 
-    assert controller.returncode == 0
+    assert controller.rc == 0
 
 
 class CheckDSTest(NamedTuple):
index aa374584bacdff253d2ddf874dc42135ff6eca77..9727db3a014999baad4f1dbeab8e4e77221fc1b6 100644 (file)
@@ -64,125 +64,121 @@ def delv(*args, tkeys=False):
     delv_cmd.extend(["@10.53.0.4", "-a", tfile, "-p", os.environ["PORT"]])
     delv_cmd.extend(args)
 
-    return (
-        isctest.run.cmd(delv_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-        .stdout.decode("utf-8")
-        .strip()
-    )
+    return isctest.run.cmd(delv_cmd, stderr=subprocess.STDOUT)
 
 
 def test_positive_validation_delv():
     # check positive validation NSEC
     response = delv("a", "a.example")
-    assert grep_c("a.example..*10.0.0.1", response)
-    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+    assert grep_c("a.example..*10.0.0.1", response.out)
+    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
 
     # check positive validation NSEC (trsuted-keys)
     response = delv("a", "a.example", tkeys=True)
-    assert grep_c("a.example..*10.0.0.1", response)
-    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+    assert grep_c("a.example..*10.0.0.1", response.out)
+    assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
 
     # check positive validation NSEC3
     response = delv("a", "a.nsec3.example")
-    assert grep_c("a.nsec3.example..*10.0.0.1", response)
-    assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+    assert grep_c("a.nsec3.example..*10.0.0.1", response.out)
+    assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
 
     # check positive validation OPTOUT
     response = delv("a", "a.optout.example")
-    assert grep_c("a.optout.example..*10.0.0.1", response)
-    assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+    assert grep_c("a.optout.example..*10.0.0.1", response.out)
+    assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
 
     # check positive wildcard validation NSEC
     response = delv("a", "a.wild.example")
-    assert grep_c("a.wild.example..*10.0.0.27", response)
-    assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+    assert grep_c("a.wild.example..*10.0.0.27", response.out)
+    assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
 
     # check positive wildcard validation NSEC3
     response = delv("a", "a.wild.nsec3.example")
-    assert grep_c("a.wild.nsec3.example..*10.0.0.6", response)
-    assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+    assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out)
+    assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
 
     # check positive wildcard validation OPTOUT
     response = delv("a", "a.wild.optout.example")
-    assert grep_c("a.wild.optout.example..*10.0.0.6", response)
-    assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+    assert grep_c("a.wild.optout.example..*10.0.0.6", response.out)
+    assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
 
 
 def test_negative_validation_delv():
     # checking negative validation NXDOMAIN NSEC
     response = delv("a", "q.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
     # checking negative validation NODATA NSEC
     response = delv("txt", "a.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
     # checking negative validation NXDOMAIN NSEC3
     response = delv("a", "q.nsec3.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
     # checking negative validation NODATA NSEC3
     response = delv("txt", "a.nsec3.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
     # checking negative validation NXDOMAIN OPTOUT
     response = delv("a", "q.optout.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
     # checking negative validation NODATA OPTOUT
     response = delv("txt", "a.optout.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
     # checking negative wildcard validation NSEC
     response = delv("txt", "b.wild.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
     # checking negative wildcard validation NSEC3
     response = delv("txt", "b.wild.nsec3.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
     # checking negative wildcard validation OPTOUT
     response = delv("txt", "b.wild.optout.example")
-    assert grep_c("resolution failed: ncache nxrrset", response)
+    assert grep_c("resolution failed: ncache nxrrset", response.out)
 
 
 def test_insecure_validation_delv():
     # check 1-server insecurity proof NSEC
     response = delv("a", "a.insecure.example")
-    assert grep_c("a.insecure.example..*10.0.0.1", response)
+    assert grep_c("a.insecure.example..*10.0.0.1", response.out)
 
     # check 1-server insecurity proof NSEC3
     response = delv("a", "a.insecure.nsec3.example")
-    assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response)
+    assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out)
 
     # check 1-server insecurity proof NSEC3
     response = delv("a", "a.insecure.optout.example")
-    assert grep_c("a.insecure.optout.example..*10.0.0.1", response)
+    assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out)
 
     # check 1-server negative insecurity proof NSEC
     response = delv("a", "q.insecure.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
     # check 1-server negative insecurity proof NSEC3
     response = delv("a", "q.insecure.nsec3.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
     # check 1-server negative insecurity proof OPTOUT
     response = delv("a", "q.insecure.optout.example")
-    assert grep_c("resolution failed: ncache nxdomain", response)
+    assert grep_c("resolution failed: ncache nxdomain", response.out)
 
 
 def test_validation_failure_delv():
     # check failed validation due to bogus data
     response = delv("+cd", "a", "a.bogus.example")
-    assert grep_c("resolution failed: RRSIG failed to verify", response)
+    assert grep_c("resolution failed: RRSIG failed to verify", response.out)
 
     # check failed validation due to missing key record
     response = delv("+cd", "a", "a.b.keyless.example")
-    assert grep_c("resolution failed: insecurity proof failed", response)
+    assert grep_c("resolution failed: insecurity proof failed", response.out)
 
 
 def test_revoked_key_delv():
     # check failed validation succeeds when a revoked key is encountered
     response = delv("+cd", "soa", "revkey.example")
-    assert grep_c("fully validated", response)
+    assert grep_c("fully validated", response.out)
index 2ff3e0e063e5bf181e7733f234e7ca1a140c840a..b0295e5840cca8836215e57385219367603c2d73 100644 (file)
@@ -63,14 +63,14 @@ def grep_c(regex, filename):
 def keygen(*args):
     keygen_cmd = [os.environ.get("KEYGEN")]
     keygen_cmd.extend(args)
-    return isctest.run.cmd(keygen_cmd, log_stdout=True).stdout.decode("utf-8").strip()
+    return isctest.run.cmd(keygen_cmd).out.strip()
 
 
 # run dnssec-settime
 def settime(*args):
     settime_cmd = [os.environ.get("SETTIME")]
     settime_cmd.extend(args)
-    return isctest.run.cmd(settime_cmd, log_stdout=True).stdout.decode("utf-8").strip()
+    return isctest.run.cmd(settime_cmd).out.strip()
 
 
 @pytest.mark.parametrize(
index a8680fed1d137227848208e0a7ee2401a5f5a7c5..aec43785eaaab1f257170e715d3691deca9a3a46 100644 (file)
@@ -48,7 +48,7 @@ def test_dnstap_dispatch_socket_addresses(ns3):
     os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
 
     # Read the contents of the dnstap file using dnstap-read.
-    run = isctest.run.cmd(
+    dnstapread = isctest.run.cmd(
         [isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
     )
 
@@ -64,7 +64,7 @@ def test_dnstap_dispatch_socket_addresses(ns3):
     bad_frames = []
     inspected_frames = 0
     addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
-    for line in run.stdout.decode("utf-8").splitlines():
+    for line in dnstapread.out.splitlines():
         _, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
         # Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
         if frame_type not in ("RQ", "RR"):
index 27dca7ee8a47d63a35b70d7279de432634bab5bc..b95baeaab5a3f20a1c2b8a0ce3af526f1895166b 100644 (file)
@@ -25,10 +25,10 @@ def gnutls_cli_executable():
         pytest.skip("gnutls-cli not found in PATH")
 
     # Ensure gnutls-cli supports the --logfile command-line option.
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
-    ).stdout
-    if b"illegal option" in output:
+    )
+    if "illegal option" in cmd.out:
         pytest.skip("gnutls-cli does not support the --logfile option")
 
     return executable
index 1a657a26dbbee4898184375240e4cecb6a27d109..2e1b2c36fb8b911fb3828f92bfa50981866f3d17 100644 (file)
@@ -15,7 +15,6 @@ import glob
 import os
 from pathlib import Path
 import re
-import subprocess
 import time
 from typing import Dict, List, Optional, Tuple, Union
 
@@ -533,8 +532,8 @@ class Key:
             str(self.keyfile),
         ]
 
-        out = isctest.run.cmd(dsfromkey_command)
-        dsfromkey = out.stdout.decode("utf-8").split()
+        cmd = isctest.run.cmd(dsfromkey_command)
+        dsfromkey = cmd.out.split()
 
         rdata_fromfile = " ".join(dsfromkey[:7])
         rdata_fromwire = " ".join(cds[:7])
@@ -837,18 +836,14 @@ def check_dnssec_verify(server, zone, tsig=None):
                     file.write(rr.to_text())
                     file.write("\n")
 
-            try:
-                verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
-                verified = isctest.run.cmd(verify_command)
-            except subprocess.CalledProcessError:
-                pass
-
-        if verified:
-            break
+            verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
+            verified = isctest.run.cmd(verify_command, raise_on_exception=False)
+            if verified.rc == 0:
+                return
 
         time.sleep(1)
 
-    assert verified
+    assert False, "zone not verified"
 
 
 def check_dnssecstatus(server, zone, keys, policy=None, view=None, verbose=False):
index e4fbd459dd26cfc172836be995e05a6165b4543d..a052f65d3de013b1ce20b86d34d363604554514e 100644 (file)
@@ -18,6 +18,18 @@ from typing import List, Optional
 import isctest.log
 
 
+class CmdResult:
+    def __init__(self, proc=None):
+        self.proc = proc
+        self.rc = self.proc.returncode
+        self.out = ""
+        self.err = ""
+        if self.proc.stdout:
+            self.out = self.proc.stdout.decode("utf-8")
+        if self.proc.stderr:
+            self.err = self.proc.stderr.decode("utf-8")
+
+
 def cmd(
     args,
     cwd=None,
@@ -29,7 +41,7 @@ def cmd(
     input_text: Optional[bytes] = None,
     raise_on_exception=True,
     env: Optional[dict] = None,
-):
+) -> CmdResult:
     """Execute a command with given args as subprocess."""
     isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
 
@@ -59,13 +71,13 @@ def cmd(
             env=env,
         )
         print_debug_logs(proc)
-        return proc
+        return CmdResult(proc)
     except subprocess.CalledProcessError as exc:
         print_debug_logs(exc)
         isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
         if raise_on_exception:
             raise exc
-        return exc
+        return CmdResult(exc)
 
 
 def _run_script(
@@ -107,11 +119,11 @@ class Dig:
     def __init__(self, base_params: str = ""):
         self.base_params = base_params
 
-    def __call__(self, params: str) -> str:
-        """Run the dig command with the given parameters and return the decoded output."""
+    def __call__(self, params: str) -> CmdResult:
+        """Run the dig command with the given parameters."""
         return cmd(
             [os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
-        ).stdout.decode("utf-8")
+        )
 
 
 def shell(script: str, args: Optional[List[str]] = None) -> None:
index 2d6aa4b46e8f9fe3bb4a777baf04cc90ae648473..d5acb31c154fd3c040566ef37fb19c71c97c1ff7 100644 (file)
@@ -1282,7 +1282,7 @@ def test_kasp_dnssec_keygen():
             zone,
         ]
 
-        return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
+        return isctest.run.cmd(keygen_command).out
 
     isctest.log.info(
         "check that 'dnssec-keygen -k' (configured policy) created valid files"
@@ -1327,7 +1327,7 @@ def test_kasp_dnssec_keygen():
         str(publish),
         key.path,
     ]
-    out = isctest.run.cmd(settime).stdout.decode("utf-8")
+    isctest.run.cmd(settime)
 
     isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
     assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
@@ -1378,7 +1378,7 @@ def test_kasp_dnssec_keygen():
         str(now),
         key.path,
     ]
-    out = isctest.run.cmd(settime).stdout.decode("utf-8")
+    isctest.run.cmd(settime)
     isctest.kasp.check_keys("kasp", keys, expected)
     isctest.kasp.check_keytimes(keys, expected)
 
@@ -1415,7 +1415,7 @@ def test_kasp_dnssec_keygen():
         str(now),
         key.path,
     ]
-    out = isctest.run.cmd(settime).stdout.decode("utf-8")
+    isctest.run.cmd(settime)
     isctest.kasp.check_keys("kasp", keys, expected)
     isctest.kasp.check_keytimes(keys, expected)
 
@@ -1463,7 +1463,7 @@ def test_kasp_dnssec_keygen():
         str(soon),
         key.path,
     ]
-    out = isctest.run.cmd(settime).stdout.decode("utf-8")
+    isctest.run.cmd(settime)
     isctest.kasp.check_keys("kasp", keys, expected)
     isctest.kasp.check_keytimes(keys, expected)
 
index 9190ddd8b818ff9c4b32b73145f3a29c6e75188c..5314d7ee57bef4428192133f7f27a2f973f2d942 100644 (file)
@@ -31,25 +31,27 @@ def test_dig_tcp_keepalive_handling(named_port, ns2):
     dig = isctest.run.Dig(f"-p {str(named_port)}")
 
     isctest.log.info("check that dig handles TCP keepalive in query")
-    assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
+    assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
 
     isctest.log.info("check that dig added TCP keepalive was received")
     assert get_keepalive_options_received() == 1
 
     isctest.log.info("check that TCP keepalive is added for TCP responses")
-    assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
+    assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
 
     isctest.log.info("check that TCP keepalive requires TCP")
-    assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
+    assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
 
     isctest.log.info("check the default keepalive value")
-    assert "; TCP-KEEPALIVE: 30.0 secs" in dig(
-        "+tcp +keepalive foo.example. @10.53.0.3"
+    assert (
+        "; TCP-KEEPALIVE: 30.0 secs"
+        in dig("+tcp +keepalive foo.example. @10.53.0.3").out
     )
 
     isctest.log.info("check a keepalive configured value")
-    assert "; TCP-KEEPALIVE: 15.0 secs" in dig(
-        "+tcp +keepalive foo.example. @10.53.0.2"
+    assert (
+        "; TCP-KEEPALIVE: 15.0 secs"
+        in dig("+tcp +keepalive foo.example. @10.53.0.2").out
     )
 
     isctest.log.info("check a re-configured keepalive value")
@@ -59,8 +61,9 @@ def test_dig_tcp_keepalive_handling(named_port, ns2):
     assert "tcp-keepalive-timeout=300" in response
     assert "tcp-advertised-timeout=200" in response
     assert "tcp-primaries-timeout=100" in response
-    assert "; TCP-KEEPALIVE: 20.0 secs" in dig(
-        "+tcp +keepalive foo.example. @10.53.0.2"
+    assert (
+        "; TCP-KEEPALIVE: 20.0 secs"
+        in dig("+tcp +keepalive foo.example. @10.53.0.2").out
     )
 
     isctest.log.info("check server config entry")
index 8fddbd4dd59ad526203cd38a4597c373f6c2bb92..da9b3913135e10acde38ffcd6b13f1caa41649e1 100644 (file)
@@ -74,18 +74,16 @@ def token_init_and_cleanup():
     )
 
     try:
-        output = isctest.run.cmd(
-            token_init_command, env=EMPTY_OPENSSL_CONF_ENV
-        ).stdout.decode("utf-8")
-        assert "The token has been initialized and is reassigned to slot" in output
+        cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
+        assert "The token has been initialized and is reassigned to slot" in cmd.out
         yield
     finally:
-        output = isctest.run.cmd(
+        cmd = isctest.run.cmd(
             token_cleanup_command,
             env=EMPTY_OPENSSL_CONF_ENV,
             raise_on_exception=False,
-        ).stdout.decode("utf-8")
-        assert re.search("Found token (.*) with matching token label", output)
+        )
+        assert re.search("Found token (.*) with matching token label", cmd.out)
 
 
 # pylint: disable-msg=too-many-locals
@@ -125,11 +123,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
             HSMPIN,
         ]
 
-        output = isctest.run.cmd(
-            pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
-        ).stdout.decode("utf-8")
+        cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
 
-        assert "Key pair generated" in output
+        assert "Key pair generated" in cmd.out
 
     def keyfromlabel(alg_name, zone, key_id, key_flag):
         key_flag = key_flag.split() if key_flag else []
@@ -145,12 +141,12 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
             zone,
         ]
 
-        output = isctest.run.cmd(keyfrlab_command)
-        output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
+        cmd = isctest.run.cmd(keyfrlab_command)
+        keyfile = cmd.out.rstrip() + ".key"
 
-        assert os.path.exists(output_decoded)
+        assert os.path.exists(keyfile)
 
-        return output_decoded
+        return keyfile
 
     if f"{alg_name.upper()}_SUPPORTED" not in os.environ:
         pytest.skip(f"{alg_name} is not supported")
index 374865b042e6fa0c3465a38f1056efba2c3131fb..fc869bb52721185b683653b08b3f42b993eaaa21 100644 (file)
@@ -97,8 +97,8 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
         zone,
     ]
 
-    out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
-    return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
+    cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
+    return cmd.out, cmd.err  # TODO return cmd instead
 
 
 def check_keys(
index 9b3d0b114f0a229e54b58110c3498ab0541155bb..ae85d7bde43fe9c091084564718af65ec69a7794 100644 (file)
@@ -149,7 +149,7 @@ def test_masterfile_missing_master_file_servfail():
 
 def test_masterfile_owner_inheritance():
     """Test owner inheritance after $INCLUDE"""
-    checker_output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [
             os.environ["CHECKZONE"],
             "-D",
@@ -157,12 +157,12 @@ def test_masterfile_owner_inheritance():
             "example",
             "zone/inheritownerafterinclude.db",
         ]
-    ).stdout.decode("utf-8")
+    )
     owner_inheritance_zone = """
 example.       0       IN      SOA     . . 0 0 0 0 0
 example.       0       IN      TXT     "this should be at the zone apex"
 example.       0       IN      NS      .
 """
-    checker_zone = dns.zone.from_text(checker_output, origin="example.")
+    checker_zone = dns.zone.from_text(cmd.out, origin="example.")
     expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
     isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)
index 695f2420b9416f7269bbef9642561b2ffaa3faa9..c356595afe4439fda0660780d676ee891af07ef9 100644 (file)
@@ -73,8 +73,8 @@ def dsfromkey(key):
         "-w",
         str(key.keyfile),
     ]
-    out = isctest.run.cmd(dsfromkey_command)
-    return out.stdout.decode("utf-8").split()
+    cmd = isctest.run.cmd(dsfromkey_command)
+    return cmd.out.split()
 
 
 def check_dnssec(server, zone, keys, expected):
@@ -102,12 +102,11 @@ def check_no_dnssec_in_journal(server, zone):
         f"{server.identifier}/{zone}.db.jnl",
     ]
 
-    out = isctest.run.cmd(journalprint)
-    contents = out.stdout.decode("utf-8")
+    cmd = isctest.run.cmd(journalprint)
     pattern = re.compile(
         r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
     )
-    match = pattern.search(contents)
+    match = pattern.search(cmd.out)
     assert not match, f"{match.group(1)} record found in journal"
 
 
index e25f2dfc9a9a906f3c44870563e5615454fb3f36..5ad766c4cfd9cfb23af645188c8ea696826faebd 100644 (file)
@@ -44,13 +44,11 @@ def test_nzd2nzf(ns1):
 
     # dump "_default.nzd" to "_default.nzf" and check that it contains the expected content
     cfg_dir = "ns1"
-    stdout = isctest.run.cmd(
-        [os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir
-    ).stdout.decode("utf-8")
-    assert f"zone {zone_data}" in stdout
+    cmd = isctest.run.cmd([os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir)
+    assert f"zone {zone_data}" in cmd.out
     nzf_filename = os.path.join(cfg_dir, "_default.nzf")
     with open(nzf_filename, "w", encoding="utf-8") as nzf_file:
-        nzf_file.write(stdout)
+        nzf_file.write(cmd.out)
 
     # delete "_default.nzd" database
     nzd_filename = os.path.join(cfg_dir, "_default.nzd")
index 7aad2d98cdf1ccdb6a8766633810aa18bcd5bd90..9c4cc47b8b30fb8f671dea31611ff7902b8bdd96 100644 (file)
@@ -58,7 +58,7 @@ def test_rollover_multisigner(ns3, alg, size):
             zone,
         ]
 
-        return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
+        return isctest.run.cmd(keygen_command).out
 
     zone = "multisigner-model2.kasp"
 
index d1cc1b719271d4b754a23f4959df9959ff1bd9bc..715e891af8e852c70b3c62504ef996cc43870e39 100644 (file)
@@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts(
     ],
 )
 def test_rrchecker_list_standard_names(option, expected_result):
-    stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
-    values = [line for line in stdout.split("\n") if line.strip()]
+    cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
+    values = [line for line in cmd.out.split("\n") if line.strip()]
 
     assert sorted(values) == sorted(expected_result)
 
 
 def run_rrchecker(option, rr_class, rr_type, rr_rest):
-    rrchecker_output = (
-        isctest.run.cmd(
-            [os.environ["RRCHECKER"], option],
-            input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
-        )
-        .stdout.decode("utf-8")
-        .strip()
+    cmd = isctest.run.cmd(
+        [os.environ["RRCHECKER"], option],
+        input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
     )
-    return rrchecker_output.split()
+    return cmd.out.strip().split()
 
 
 @pytest.mark.parametrize(
@@ -162,7 +158,7 @@ def test_rrchecker_conversions(option):
             ".",
             tempzone_file,
         ],
-    ).stdout.decode("utf-8")
+    ).out
     checkzone_output = [
         line for line in checkzone_output.splitlines() if not line.startswith(";")
     ]
index 605e5e55af450687e35ca55d404d1821e5b2893b..19571882c15e46919f2b11087e964995ed2aab1a 100644 (file)
@@ -52,16 +52,12 @@ def test_nsec3_hashes(domain, nsec3hash):
     algorithm = "1"
     iterations = "12"
 
-    output = isctest.run.cmd(
-        [NSEC3HASH, salt, algorithm, iterations, domain]
-    ).stdout.decode("utf-8")
-    assert nsec3hash in output
+    cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
+    assert nsec3hash in cmd.out
 
     flags = "0"
-    output = isctest.run.cmd(
-        [NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
-    ).stdout.decode("utf-8")
-    assert nsec3hash in output
+    cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
+    assert nsec3hash in cmd.out
 
 
 @pytest.mark.parametrize(
@@ -78,11 +74,11 @@ def test_nsec3_empty_salt(salt_emptiness_args):
     iterations = "0"
     domain = "com"
 
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
-    ).stdout.decode("utf-8")
-    assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
-    assert "salt=-" in output
+    )
+    assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
+    assert "salt=-" in cmd.out
 
 
 @pytest.mark.parametrize(
@@ -98,7 +94,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
     iterations = "0"
     domain = "com"
 
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [
             NSEC3HASH,
             "-r",
@@ -108,8 +104,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
             salt_emptiness_arg,
             domain,
         ]
-    ).stdout.decode("utf-8")
-    assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
+    )
+    assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
 
 
 @pytest.mark.parametrize(
@@ -145,10 +141,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None:
     )
 
     # calculate the hash using nsec3hash:
-    output = isctest.run.cmd(
-        [NSEC3HASH, salt_text, "1", str(it), str(domain)]
-    ).stdout.decode("ascii")
-    hash2 = output.partition(" ")[0]
+    cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
+    hash2 = cmd.out.partition(" ")[0]
 
     assert hash1 == hash2
 
index 203c2181be14e586f2305f914e580e3e46d07ebb..9ec0933176434ff3548d94fe56f313a6cef847fc 100644 (file)
@@ -11,6 +11,7 @@
 
 import os
 import re
+import subprocess
 
 import pytest
 
@@ -63,12 +64,12 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
 
 def get_bad_zone_output(zone):
     only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
+        stderr=subprocess.STDOUT,
         raise_on_exception=False,
     )
-    stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
-    return stream
+    return cmd.out
 
 
 @pytest.mark.parametrize(
@@ -153,27 +154,25 @@ def test_verify_bad_zone_files_unequal_nsec3_chains():
 def test_verify_soa_not_at_top_error():
     # when -o is not used, origin is set to zone file name,
     # which should cause an error in this case
-    output = isctest.run.cmd(
-        [VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
-    ).stderr.decode("utf-8")
-    assert "not at top of zone" in output
-    assert "use -o to specify a different zone origin" in output
+    cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
+    assert "not at top of zone" in cmd.err
+    assert "use -o to specify a different zone origin" in cmd.err
 
 
 # checking error message when an invalid -o is specified
 # and a SOA record not at top of zone is found
 def test_verify_invalid_o_option_soa_not_at_top_error():
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
         raise_on_exception=False,
-    ).stderr.decode("utf-8")
-    assert "not at top of zone" in output
-    assert "use -o to specify a different zone origin" not in output
+    )
+    assert "not at top of zone" in cmd.err
+    assert "use -o to specify a different zone origin" not in cmd.err
 
 
 # checking dnssec-verify -J reads journal file
 def test_verify_j_reads_journal_file():
-    output = isctest.run.cmd(
+    cmd = isctest.run.cmd(
         [
             VERIFY,
             "-o",
@@ -182,5 +181,5 @@ def test_verify_j_reads_journal_file():
             "zones/updated.other.jnl",
             "zones/updated.other",
         ]
-    ).stdout.decode("utf-8")
-    assert "Loading zone 'updated' from file 'zones/updated.other'" in output
+    )
+    assert "Loading zone 'updated' from file 'zones/updated.other'" in cmd.out