# Superfluous key file.
zone = "superfluous-keyfile.kz.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-superfluous-keyfile.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
- assert f"zone '{zone}': wrong number of key files (3, expected 2)" in err
+ assert f"zone '{zone}': wrong number of key files (3, expected 2)" in cmd.out
# Missing key file.
zone = "missing-keyfile.kz.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-missing-keyfile.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
- assert f"zone '{zone}': wrong number of key files (1, expected 2)" in err
+ assert f"zone '{zone}': wrong number of key files (1, expected 2)" in cmd.out
# Mismatch algorithm.
zone = "bad-algorithm.kz.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-algorithm.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy alternative-kz"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy alternative-kz"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'ksk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'zsk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
- in err
+ in cmd.out
)
# Mismatch length
zone = "bad-length.csk.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-length.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy alternative-csk"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-csk key:'csk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
- in err
+ in cmd.out
)
# Mismatch tag range
zone = "bad-tagrange.csk.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-tagrange.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy tagrange-csk"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy tagrange-csk key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-32767'"
- in err
+ in cmd.out
)
# Mismatch role
zone = "bad-role.kz.example"
- out = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
- err = out.stdout.decode("utf-8")
+ cmd = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': no key file found matching dnssec-policy default-kz key:'zsk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
- in err
+ in cmd.out
)
# Mismatch algorithm (default policy)
zone = "bad-default-algorithm.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-algorithm.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy default"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
- in err
+ in cmd.out
)
# Mismatch role (default policy)
zone = "bad-default-kz.example"
- out = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-kz.conf"], raise_on_exception=False
)
- err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy default"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy default"
- in err
+ in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
- in err
+ in cmd.out
)
- assert f"zone '{zone}': wrong number of key files (2, expected 1)" in err
+ assert f"zone '{zone}': wrong number of key files (2, expected 1)" in cmd.out
def test_checkconf_effective():
- proc = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
- checkconf_output = proc.stdout.decode()
- assert "listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in checkconf_output
- assert 'view "_bind" chaos {' in checkconf_output
- assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
- assert 'view "foo" {\n}' in checkconf_output
+ cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
+ assert b"listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in cmd.proc.stdout
+ assert 'view "_bind" chaos {' in cmd.out
+ assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
+ assert b'view "foo" {\n}' in cmd.proc.stdout
# builtin-trust-anchors is non documented and internal clause only, it must
# not be visible.
- assert "builtin-trust-anchors" not in checkconf_output
+ assert "builtin-trust-anchors" not in cmd.out
def test_checkconf_builtin():
- proc = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
- checkconf_output = proc.stdout.decode()
- assert 'listen-on {\n\t\t"any";\n\t};' in checkconf_output
- assert 'view "_bind" chaos {' in checkconf_output
- assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
+ cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
+ assert b'listen-on {\n\t\t"any";\n\t};' in cmd.proc.stdout
+ assert 'view "_bind" chaos {' in cmd.out
+ assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
# builtin-trust-anchors is non documented and internal clause only, it must
# not be visible.
- assert "builtin-trust-anchors" not in checkconf_output
+ assert "builtin-trust-anchors" not in cmd.out
verifier = isctest.run.cmd(verify_cmd)
- if verifier.returncode != 0:
+ if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone} failed")
- return verifier.returncode == 0
+ return verifier.rc == 0
def read_statefile(server, zone):
]
controller = isctest.run.cmd(rndc_cmd)
- if controller.returncode != 0:
+ if controller.rc != 0:
isctest.log.error(f"rndc loadkeys {zone} failed")
- assert controller.returncode == 0
+ assert controller.rc == 0
class CheckDSTest(NamedTuple):
delv_cmd.extend(["@10.53.0.4", "-a", tfile, "-p", os.environ["PORT"]])
delv_cmd.extend(args)
- return (
- isctest.run.cmd(delv_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- .stdout.decode("utf-8")
- .strip()
- )
+ return isctest.run.cmd(delv_cmd, stderr=subprocess.STDOUT)
def test_positive_validation_delv():
# check positive validation NSEC
response = delv("a", "a.example")
- assert grep_c("a.example..*10.0.0.1", response)
- assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+ assert grep_c("a.example..*10.0.0.1", response.out)
+ assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive validation NSEC (trsuted-keys)
response = delv("a", "a.example", tkeys=True)
- assert grep_c("a.example..*10.0.0.1", response)
- assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+ assert grep_c("a.example..*10.0.0.1", response.out)
+ assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive validation NSEC3
response = delv("a", "a.nsec3.example")
- assert grep_c("a.nsec3.example..*10.0.0.1", response)
- assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+ assert grep_c("a.nsec3.example..*10.0.0.1", response.out)
+ assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive validation OPTOUT
response = delv("a", "a.optout.example")
- assert grep_c("a.optout.example..*10.0.0.1", response)
- assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+ assert grep_c("a.optout.example..*10.0.0.1", response.out)
+ assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive wildcard validation NSEC
response = delv("a", "a.wild.example")
- assert grep_c("a.wild.example..*10.0.0.27", response)
- assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
+ assert grep_c("a.wild.example..*10.0.0.27", response.out)
+ assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive wildcard validation NSEC3
response = delv("a", "a.wild.nsec3.example")
- assert grep_c("a.wild.nsec3.example..*10.0.0.6", response)
- assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+ assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out)
+ assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive wildcard validation OPTOUT
response = delv("a", "a.wild.optout.example")
- assert grep_c("a.wild.optout.example..*10.0.0.6", response)
- assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
+ assert grep_c("a.wild.optout.example..*10.0.0.6", response.out)
+ assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
def test_negative_validation_delv():
# checking negative validation NXDOMAIN NSEC
response = delv("a", "q.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA NSEC
response = delv("txt", "a.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative validation NXDOMAIN NSEC3
response = delv("a", "q.nsec3.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA NSEC3
response = delv("txt", "a.nsec3.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative validation NXDOMAIN OPTOUT
response = delv("a", "q.optout.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA OPTOUT
response = delv("txt", "a.optout.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation NSEC
response = delv("txt", "b.wild.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation NSEC3
response = delv("txt", "b.wild.nsec3.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation OPTOUT
response = delv("txt", "b.wild.optout.example")
- assert grep_c("resolution failed: ncache nxrrset", response)
+ assert grep_c("resolution failed: ncache nxrrset", response.out)
def test_insecure_validation_delv():
# check 1-server insecurity proof NSEC
response = delv("a", "a.insecure.example")
- assert grep_c("a.insecure.example..*10.0.0.1", response)
+ assert grep_c("a.insecure.example..*10.0.0.1", response.out)
# check 1-server insecurity proof NSEC3
response = delv("a", "a.insecure.nsec3.example")
- assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response)
+ assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out)
# check 1-server insecurity proof NSEC3
response = delv("a", "a.insecure.optout.example")
- assert grep_c("a.insecure.optout.example..*10.0.0.1", response)
+ assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out)
# check 1-server negative insecurity proof NSEC
response = delv("a", "q.insecure.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
# check 1-server negative insecurity proof NSEC3
response = delv("a", "q.insecure.nsec3.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
# check 1-server negative insecurity proof OPTOUT
response = delv("a", "q.insecure.optout.example")
- assert grep_c("resolution failed: ncache nxdomain", response)
+ assert grep_c("resolution failed: ncache nxdomain", response.out)
def test_validation_failure_delv():
# check failed validation due to bogus data
response = delv("+cd", "a", "a.bogus.example")
- assert grep_c("resolution failed: RRSIG failed to verify", response)
+ assert grep_c("resolution failed: RRSIG failed to verify", response.out)
# check failed validation due to missing key record
response = delv("+cd", "a", "a.b.keyless.example")
- assert grep_c("resolution failed: insecurity proof failed", response)
+ assert grep_c("resolution failed: insecurity proof failed", response.out)
def test_revoked_key_delv():
# check failed validation succeeds when a revoked key is encountered
response = delv("+cd", "soa", "revkey.example")
- assert grep_c("fully validated", response)
+ assert grep_c("fully validated", response.out)
def keygen(*args):
keygen_cmd = [os.environ.get("KEYGEN")]
keygen_cmd.extend(args)
- return isctest.run.cmd(keygen_cmd, log_stdout=True).stdout.decode("utf-8").strip()
+ return isctest.run.cmd(keygen_cmd).out.strip()
# run dnssec-settime
def settime(*args):
settime_cmd = [os.environ.get("SETTIME")]
settime_cmd.extend(args)
- return isctest.run.cmd(settime_cmd, log_stdout=True).stdout.decode("utf-8").strip()
+ return isctest.run.cmd(settime_cmd).out.strip()
@pytest.mark.parametrize(
os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
# Read the contents of the dnstap file using dnstap-read.
- run = isctest.run.cmd(
+ dnstapread = isctest.run.cmd(
[isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
)
bad_frames = []
inspected_frames = 0
addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
- for line in run.stdout.decode("utf-8").splitlines():
+ for line in dnstapread.out.splitlines():
_, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
# Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
if frame_type not in ("RQ", "RR"):
pytest.skip("gnutls-cli not found in PATH")
# Ensure gnutls-cli supports the --logfile command-line option.
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
- ).stdout
- if b"illegal option" in output:
+ )
+ if "illegal option" in cmd.out:
pytest.skip("gnutls-cli does not support the --logfile option")
return executable
import os
from pathlib import Path
import re
-import subprocess
import time
from typing import Dict, List, Optional, Tuple, Union
str(self.keyfile),
]
- out = isctest.run.cmd(dsfromkey_command)
- dsfromkey = out.stdout.decode("utf-8").split()
+ cmd = isctest.run.cmd(dsfromkey_command)
+ dsfromkey = cmd.out.split()
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
file.write(rr.to_text())
file.write("\n")
- try:
- verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
- verified = isctest.run.cmd(verify_command)
- except subprocess.CalledProcessError:
- pass
-
- if verified:
- break
+ verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
+ verified = isctest.run.cmd(verify_command, raise_on_exception=False)
+ if verified.rc == 0:
+ return
time.sleep(1)
- assert verified
+ assert False, "zone not verified"
def check_dnssecstatus(server, zone, keys, policy=None, view=None, verbose=False):
import isctest.log
+class CmdResult:
+ def __init__(self, proc=None):
+ self.proc = proc
+ self.rc = self.proc.returncode
+ self.out = ""
+ self.err = ""
+ if self.proc.stdout:
+ self.out = self.proc.stdout.decode("utf-8")
+ if self.proc.stderr:
+ self.err = self.proc.stderr.decode("utf-8")
+
+
def cmd(
args,
cwd=None,
input_text: Optional[bytes] = None,
raise_on_exception=True,
env: Optional[dict] = None,
-):
+) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
env=env,
)
print_debug_logs(proc)
- return proc
+ return CmdResult(proc)
except subprocess.CalledProcessError as exc:
print_debug_logs(exc)
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
if raise_on_exception:
raise exc
- return exc
+ return CmdResult(exc)
def _run_script(
def __init__(self, base_params: str = ""):
self.base_params = base_params
- def __call__(self, params: str) -> str:
- """Run the dig command with the given parameters and return the decoded output."""
+ def __call__(self, params: str) -> CmdResult:
+ """Run the dig command with the given parameters."""
return cmd(
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
- ).stdout.decode("utf-8")
+ )
def shell(script: str, args: Optional[List[str]] = None) -> None:
zone,
]
- return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
+ return isctest.run.cmd(keygen_command).out
isctest.log.info(
"check that 'dnssec-keygen -k' (configured policy) created valid files"
str(publish),
key.path,
]
- out = isctest.run.cmd(settime).stdout.decode("utf-8")
+ isctest.run.cmd(settime)
isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
str(now),
key.path,
]
- out = isctest.run.cmd(settime).stdout.decode("utf-8")
+ isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
str(now),
key.path,
]
- out = isctest.run.cmd(settime).stdout.decode("utf-8")
+ isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
str(soon),
key.path,
]
- out = isctest.run.cmd(settime).stdout.decode("utf-8")
+ isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
dig = isctest.run.Dig(f"-p {str(named_port)}")
isctest.log.info("check that dig handles TCP keepalive in query")
- assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
+ assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that dig added TCP keepalive was received")
assert get_keepalive_options_received() == 1
isctest.log.info("check that TCP keepalive is added for TCP responses")
- assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
+ assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that TCP keepalive requires TCP")
- assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
+ assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
isctest.log.info("check the default keepalive value")
- assert "; TCP-KEEPALIVE: 30.0 secs" in dig(
- "+tcp +keepalive foo.example. @10.53.0.3"
+ assert (
+ "; TCP-KEEPALIVE: 30.0 secs"
+ in dig("+tcp +keepalive foo.example. @10.53.0.3").out
)
isctest.log.info("check a keepalive configured value")
- assert "; TCP-KEEPALIVE: 15.0 secs" in dig(
- "+tcp +keepalive foo.example. @10.53.0.2"
+ assert (
+ "; TCP-KEEPALIVE: 15.0 secs"
+ in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check a re-configured keepalive value")
assert "tcp-keepalive-timeout=300" in response
assert "tcp-advertised-timeout=200" in response
assert "tcp-primaries-timeout=100" in response
- assert "; TCP-KEEPALIVE: 20.0 secs" in dig(
- "+tcp +keepalive foo.example. @10.53.0.2"
+ assert (
+ "; TCP-KEEPALIVE: 20.0 secs"
+ in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check server config entry")
)
try:
- output = isctest.run.cmd(
- token_init_command, env=EMPTY_OPENSSL_CONF_ENV
- ).stdout.decode("utf-8")
- assert "The token has been initialized and is reassigned to slot" in output
+ cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
+ assert "The token has been initialized and is reassigned to slot" in cmd.out
yield
finally:
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
token_cleanup_command,
env=EMPTY_OPENSSL_CONF_ENV,
raise_on_exception=False,
- ).stdout.decode("utf-8")
- assert re.search("Found token (.*) with matching token label", output)
+ )
+ assert re.search("Found token (.*) with matching token label", cmd.out)
# pylint: disable-msg=too-many-locals
HSMPIN,
]
- output = isctest.run.cmd(
- pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
- ).stdout.decode("utf-8")
+ cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
- assert "Key pair generated" in output
+ assert "Key pair generated" in cmd.out
def keyfromlabel(alg_name, zone, key_id, key_flag):
key_flag = key_flag.split() if key_flag else []
zone,
]
- output = isctest.run.cmd(keyfrlab_command)
- output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
+ cmd = isctest.run.cmd(keyfrlab_command)
+ keyfile = cmd.out.rstrip() + ".key"
- assert os.path.exists(output_decoded)
+ assert os.path.exists(keyfile)
- return output_decoded
+ return keyfile
if f"{alg_name.upper()}_SUPPORTED" not in os.environ:
pytest.skip(f"{alg_name} is not supported")
zone,
]
- out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
- return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
+ cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
+ return cmd.out, cmd.err # TODO return cmd instead
def check_keys(
def test_masterfile_owner_inheritance():
"""Test owner inheritance after $INCLUDE"""
- checker_output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
"example",
"zone/inheritownerafterinclude.db",
]
- ).stdout.decode("utf-8")
+ )
owner_inheritance_zone = """
example. 0 IN SOA . . 0 0 0 0 0
example. 0 IN TXT "this should be at the zone apex"
example. 0 IN NS .
"""
- checker_zone = dns.zone.from_text(checker_output, origin="example.")
+ checker_zone = dns.zone.from_text(cmd.out, origin="example.")
expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)
"-w",
str(key.keyfile),
]
- out = isctest.run.cmd(dsfromkey_command)
- return out.stdout.decode("utf-8").split()
+ cmd = isctest.run.cmd(dsfromkey_command)
+ return cmd.out.split()
def check_dnssec(server, zone, keys, expected):
f"{server.identifier}/{zone}.db.jnl",
]
- out = isctest.run.cmd(journalprint)
- contents = out.stdout.decode("utf-8")
+ cmd = isctest.run.cmd(journalprint)
pattern = re.compile(
r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
)
- match = pattern.search(contents)
+ match = pattern.search(cmd.out)
assert not match, f"{match.group(1)} record found in journal"
# dump "_default.nzd" to "_default.nzf" and check that it contains the expected content
cfg_dir = "ns1"
- stdout = isctest.run.cmd(
- [os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir
- ).stdout.decode("utf-8")
- assert f"zone {zone_data}" in stdout
+ cmd = isctest.run.cmd([os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir)
+ assert f"zone {zone_data}" in cmd.out
nzf_filename = os.path.join(cfg_dir, "_default.nzf")
with open(nzf_filename, "w", encoding="utf-8") as nzf_file:
- nzf_file.write(stdout)
+ nzf_file.write(cmd.out)
# delete "_default.nzd" database
nzd_filename = os.path.join(cfg_dir, "_default.nzd")
zone,
]
- return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
+ return isctest.run.cmd(keygen_command).out
zone = "multisigner-model2.kasp"
],
)
def test_rrchecker_list_standard_names(option, expected_result):
- stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
- values = [line for line in stdout.split("\n") if line.strip()]
+ cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
+ values = [line for line in cmd.out.split("\n") if line.strip()]
assert sorted(values) == sorted(expected_result)
def run_rrchecker(option, rr_class, rr_type, rr_rest):
- rrchecker_output = (
- isctest.run.cmd(
- [os.environ["RRCHECKER"], option],
- input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
- )
- .stdout.decode("utf-8")
- .strip()
+ cmd = isctest.run.cmd(
+ [os.environ["RRCHECKER"], option],
+ input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
- return rrchecker_output.split()
+ return cmd.out.strip().split()
@pytest.mark.parametrize(
".",
tempzone_file,
],
- ).stdout.decode("utf-8")
+ ).out
checkzone_output = [
line for line in checkzone_output.splitlines() if not line.startswith(";")
]
algorithm = "1"
iterations = "12"
- output = isctest.run.cmd(
- [NSEC3HASH, salt, algorithm, iterations, domain]
- ).stdout.decode("utf-8")
- assert nsec3hash in output
+ cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
+ assert nsec3hash in cmd.out
flags = "0"
- output = isctest.run.cmd(
- [NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
- ).stdout.decode("utf-8")
- assert nsec3hash in output
+ cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
+ assert nsec3hash in cmd.out
@pytest.mark.parametrize(
iterations = "0"
domain = "com"
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
- ).stdout.decode("utf-8")
- assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
- assert "salt=-" in output
+ )
+ assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
+ assert "salt=-" in cmd.out
@pytest.mark.parametrize(
iterations = "0"
domain = "com"
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[
NSEC3HASH,
"-r",
salt_emptiness_arg,
domain,
]
- ).stdout.decode("utf-8")
- assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
+ )
+ assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
@pytest.mark.parametrize(
)
# calculate the hash using nsec3hash:
- output = isctest.run.cmd(
- [NSEC3HASH, salt_text, "1", str(it), str(domain)]
- ).stdout.decode("ascii")
- hash2 = output.partition(" ")[0]
+ cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
+ hash2 = cmd.out.partition(" ")[0]
assert hash1 == hash2
import os
import re
+import subprocess
import pytest
def get_bad_zone_output(zone):
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
+ stderr=subprocess.STDOUT,
raise_on_exception=False,
)
- stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
- return stream
+ return cmd.out
@pytest.mark.parametrize(
def test_verify_soa_not_at_top_error():
# when -o is not used, origin is set to zone file name,
# which should cause an error in this case
- output = isctest.run.cmd(
- [VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
- ).stderr.decode("utf-8")
- assert "not at top of zone" in output
- assert "use -o to specify a different zone origin" in output
+ cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
+ assert "not at top of zone" in cmd.err
+ assert "use -o to specify a different zone origin" in cmd.err
# checking error message when an invalid -o is specified
# and a SOA record not at top of zone is found
def test_verify_invalid_o_option_soa_not_at_top_error():
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
raise_on_exception=False,
- ).stderr.decode("utf-8")
- assert "not at top of zone" in output
- assert "use -o to specify a different zone origin" not in output
+ )
+ assert "not at top of zone" in cmd.err
+ assert "use -o to specify a different zone origin" not in cmd.err
# checking dnssec-verify -J reads journal file
def test_verify_j_reads_journal_file():
- output = isctest.run.cmd(
+ cmd = isctest.run.cmd(
[
VERIFY,
"-o",
"zones/updated.other.jnl",
"zones/updated.other",
]
- ).stdout.decode("utf-8")
- assert "Loading zone 'updated' from file 'zones/updated.other'" in output
+ )
+ assert "Loading zone 'updated' from file 'zones/updated.other'" in cmd.out