]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Improve file handling in ksr test
authorNicki Křížek <nicki@isc.org>
Thu, 2 Oct 2025 17:12:50 +0000 (19:12 +0200)
committerNicki Křížek <nicki@isc.org>
Mon, 8 Dec 2025 13:57:47 +0000 (14:57 +0100)
Refactor the file handling to write to a file directly when calling
isctest.run.cmd().

Refactor the existing code to use CmdResult rather than out and err
separately.

bin/tests/system/ksr/tests_ksr.py

index fc869bb52721185b683653b08b3f42b993eaaa21..553eda1dff20ed579307b5efe36c5f8e01865e40 100644 (file)
@@ -85,7 +85,7 @@ def between(value, start, end):
     return start < value < end
 
 
-def ksr(zone, policy, action, options="", raise_on_exception=True):
+def ksr(zone, policy, action, options="", raise_on_exception=True, to_file=""):
     ksr_command = [
         os.environ.get("KSR"),
         "-l",
@@ -97,8 +97,14 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
         zone,
     ]
 
-    cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
-    return cmd.out, cmd.err  # TODO return cmd instead
+    if to_file:
+        with open(to_file, "wb") as f:
+            cmd = isctest.run.cmd(
+                ksr_command, raise_on_exception=raise_on_exception, stdout=f
+            )
+    else:
+        cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
+    return cmd
 
 
 def check_keys(
@@ -259,8 +265,9 @@ def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart
     assert count == len(bundle_lines)
 
 
-def check_keysigningrequest(out, zsks, start, end):
-    lines = out.split("\n")
+def check_keysigningrequest(path, zsks, start, end):
+    with open(path, "r", encoding="utf-8") as f:
+        lines = f.readlines()
     line_no = 0
 
     inception = start
@@ -302,14 +309,14 @@ def check_keysigningrequest(out, zsks, start, end):
 
     # trailing empty lines
     while line_no < len(lines):
-        assert lines[line_no] == ""
+        assert lines[line_no].rstrip() == ""
         line_no += 1
 
     assert line_no == len(lines)
 
 
 def check_signedkeyresponse(
-    out,
+    path,
     zone,
     ksks,
     zsks,
@@ -319,7 +326,8 @@ def check_signedkeyresponse(
     cdnskey=True,
     cds="SHA-256",
 ):
-    lines = out.split("\n")
+    with open(path, "r", encoding="utf-8") as f:
+        lines = f.readlines()
     line_no = 0
     next_bundle = end + 1
 
@@ -341,7 +349,7 @@ def check_signedkeyresponse(
 
         # ignore empty lines
         while line_no < len(lines):
-            if lines[line_no] == "":
+            if lines[line_no].rstrip() == "":
                 line_no += 1
             else:
                 break
@@ -537,32 +545,32 @@ def check_signedkeyresponse(
 
 def test_ksr_errors():
     # check that 'dnssec-ksr' errors on unknown action
-    _, err = ksr("common.test", "common", "foobar", raise_on_exception=False)
-    assert "dnssec-ksr: fatal: unknown command 'foobar'" in err
+    cmd = ksr("common.test", "common", "foobar", raise_on_exception=False)
+    assert "dnssec-ksr: fatal: unknown command 'foobar'" in cmd.err
 
     # check that 'dnssec-ksr keygen' errors on missing end date
-    _, err = ksr("common.test", "common", "keygen", raise_on_exception=False)
-    assert "dnssec-ksr: fatal: keygen requires an end date" in err
+    cmd = ksr("common.test", "common", "keygen", raise_on_exception=False)
+    assert "dnssec-ksr: fatal: keygen requires an end date" in cmd.err
 
     # check that 'dnssec-ksr keygen' errors on zone with csk
-    _, err = ksr(
+    cmd = ksr(
         "csk.test", "csk", "keygen", options="-K ns1 -e +2y", raise_on_exception=False
     )
-    assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in err
+    assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in cmd.err
 
     # check that 'dnssec-ksr request' errors on missing end date
-    _, err = ksr("common.test", "common", "request", raise_on_exception=False)
-    assert "dnssec-ksr: fatal: request requires an end date" in err
+    cmd = ksr("common.test", "common", "request", raise_on_exception=False)
+    assert "dnssec-ksr: fatal: request requires an end date" in cmd.err
 
     # check that 'dnssec-ksr sign' errors on missing ksr file
-    _, err = ksr(
+    cmd = ksr(
         "common.test",
         "common",
         "sign",
         options="-K ns1/offline -i now -e +1y",
         raise_on_exception=False,
     )
-    assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in err
+    assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in cmd.err
 
 
 def test_ksr_common(ns1):
@@ -573,15 +581,15 @@ def test_ksr_common(ns1):
 
     # create ksk
     kskdir = "ns1/offline"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 1
 
     check_keys(ksks, None)
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
-    out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
-    zsks = isctest.kasp.keystr_to_keylist(out)
+    cmd = ksr(zone, policy, "keygen", options="-i now -e +1y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out)
     assert len(zsks) == 2
 
     lifetime = timedelta(days=31 * 6)
@@ -590,8 +598,8 @@ def test_ksr_common(ns1):
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     # in the given key directory
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 2
 
     lifetime = timedelta(days=31 * 6)
@@ -608,33 +616,35 @@ def test_ksr_common(ns1):
     # check that 'dnssec-ksr request' creates correct ksr
     now = zsks[0].get_timing("Created")
     until = now + timedelta(days=365)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, now, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {now} -e +1y",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, now, until)
 
-    fname = f"{zone}.skr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -432000  # 5 days
-    check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
 
     # common test cases (2)
     n = 2
 
     # check that 'dnssec-ksr keygen' selects pregenerated keys for
     # the same time bundle
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
-    selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
+    selected_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(selected_zsks) == 2
     for index, key in enumerate(selected_zsks):
         assert zsks[index] == key
@@ -648,13 +658,13 @@ def test_ksr_common(ns1):
 
     # check that 'dnssec-ksr keygen' generates only necessary keys for
     # overlapping time bundle
-    out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
-    overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
+    overlapping_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(overlapping_zsks) == 4
 
-    selected = len(re.findall("Selecting key pair", err))
-    generated = len(re.findall("Generating key pair", err)) - len(
-        re.findall("collide", err)
+    selected = len(re.findall("Selecting key pair", cmd.err))
+    generated = len(re.findall("Generating key pair", cmd.err)) - len(
+        re.findall("collide", cmd.err)
     )
 
     assert selected == 2
@@ -673,8 +683,8 @@ def test_ksr_common(ns1):
             )
 
     # run 'dnssec-ksr keygen' again with verbosity 0
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
-    overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
+    overlapping_zsks2 = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(overlapping_zsks2) == 4
     check_keys(overlapping_zsks2, lifetime)
     for index, key in enumerate(overlapping_zsks2):
@@ -682,47 +692,41 @@ def test_ksr_common(ns1):
 
     # check that 'dnssec-ksr request' creates correct ksr if the
     # interval is shorter
-    out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
-
-    fname = f"{zone}.ksr.{n}.shorter"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, now, until)
+    ksr_fname = f"{zone}.ksr.{n}.shorter"
+    ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y", to_file=ksr_fname)
+    check_keysigningrequest(ksr_fname, zsks, now, until)
 
     # check that 'dnssec-ksr request' creates correct ksr with new interval
-    out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
     until = now + timedelta(days=365 * 2)
-    check_keysigningrequest(out, overlapping_zsks, now, until)
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y", to_file=ksr_fname)
+    check_keysigningrequest(ksr_fname, overlapping_zsks, now, until)
 
     # check that 'dnssec-ksr request' errors if there are not enough keys
-    _, err = ksr(
+    cmd = ksr(
         zone,
         policy,
         "request",
         options=f"-K ns1 -i {now} -e +3y",
         raise_on_exception=False,
     )
-    error = f"no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
-    assert f"dnssec-ksr: fatal: {error}" in err
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +2y"
+    errmsg = (
+        f"dnssec-ksr: fatal: no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
     )
+    assert errmsg in cmd.err
 
-    fname = f"{zone}.skr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -432000  # 5 days
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K ns1/offline -f {ksr_fname} -i {now} -e +2y",
+        to_file=skr_fname,
+    )
     check_signedkeyresponse(
-        out,
+        skr_fname,
         zone,
         ksks,
         overlapping_zsks,
@@ -741,8 +745,8 @@ def test_ksr_common(ns1):
     )
 
     # import skr
-    shutil.copyfile(fname, f"ns1/{fname}")
-    ns1.rndc(f"skr -import {fname} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output
@@ -767,16 +771,16 @@ def test_ksr_lastbundle(ns1):
     # create ksk
     kskdir = "ns1/offline"
     offset = -timedelta(days=365)
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 1
 
     check_keys(ksks, None, offset=offset)
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 2
 
     lifetime = timedelta(days=31 * 6)
@@ -785,25 +789,27 @@ def test_ksr_lastbundle(ns1):
     # check that 'dnssec-ksr request' creates correct ksr
     then = zsks[0].get_timing("Created") + offset
     until = then + timedelta(days=366)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, then, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {then} -e +1d",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, then, until)
 
-    fname = f"{zone}.skr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -432000  # 5 days
-    check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1d",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, then, until, refresh)
 
     # add zone
     ns1.rndc(
@@ -815,8 +821,8 @@ def test_ksr_lastbundle(ns1):
     )
 
     # import skr
-    shutil.copyfile(fname, f"ns1/{fname}")
-    ns1.rndc(f"skr -import {fname} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output
@@ -843,16 +849,16 @@ def test_ksr_inthemiddle(ns1):
     # create ksk
     kskdir = "ns1/offline"
     offset = -timedelta(days=365)
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 1
 
     check_keys(ksks, None, offset=offset)
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 4
 
     lifetime = timedelta(days=31 * 6)
@@ -862,25 +868,27 @@ def test_ksr_inthemiddle(ns1):
     then = zsks[0].get_timing("Created")
     then = then + offset
     until = then + timedelta(days=365 * 2)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, then, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {then} -e +1y",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, then, until)
 
-    fname = f"{zone}.skr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -432000  # 5 days
-    check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1y",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, then, until, refresh)
 
     # add zone
     ns1.rndc(
@@ -892,8 +900,8 @@ def test_ksr_inthemiddle(ns1):
     )
 
     # import skr
-    shutil.copyfile(fname, f"ns1/{fname}")
-    ns1.rndc(f"skr -import {fname} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output
@@ -920,35 +928,39 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
     now = KeyTimingMetadata.now()
     then = now + offset
     until = now + end
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 1
 
     # key generation
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 2
 
     # create request
     now = zsks[0].get_timing("Created")
     then = now + offset
     until = now + end
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {then} -e {until}",
+        to_file=ksr_fname,
+    )
 
     # sign request
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}"
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {then} -e {until}",
+        to_file=skr_fname,
     )
 
-    fname = f"{zone}.skr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
     # add zone
     server.rndc(
         f"addzone {zone} "
@@ -959,8 +971,8 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
     )
 
     # import skr
-    shutil.copyfile(fname, f"ns1/{fname}")
-    server.rndc(f"skr -import {fname} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    server.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test that rekey logs error
     time_remaining = 10
@@ -989,16 +1001,16 @@ def test_ksr_unlimited(ns1):
 
     # create ksk
     kskdir = "ns1/offline"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 1
 
     check_keys(ksks, None)
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 1
 
     lifetime = None
@@ -1007,26 +1019,28 @@ def test_ksr_unlimited(ns1):
     # check that 'dnssec-ksr request' creates correct ksr
     now = zsks[0].get_timing("Created")
     until = now + timedelta(days=365 * 4)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, now, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr without cdnskey
-    out, _ = ksr(
-        zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {now} -e +4y",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, now, until)
 
-    skrfile = f"{zone}.no-cdnskey.skr.{n}"
-    with open(skrfile, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr without cdnskey
     refresh = -432000  # 5 days
+    skr_fname = f"{zone}.no-cdnskey.skr.{n}"
+    ksr(
+        zone,
+        "no-cdnskey",
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
+        to_file=skr_fname,
+    )
     check_signedkeyresponse(
-        out,
+        skr_fname,
         zone,
         ksks,
         zsks,
@@ -1038,17 +1052,17 @@ def test_ksr_unlimited(ns1):
     )
 
     # check that 'dnssec-ksr sign' creates correct skr without cds
-    out, _ = ksr(
-        zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
-    )
-
-    skrfile = f"{zone}.no-cds.skr.{n}"
-    with open(skrfile, "w", encoding="utf-8") as file:
-        file.write(out)
-
     refresh = -432000  # 5 days
+    skr_fname = f"{zone}.no-cds.skr.{n}"
+    ksr(
+        zone,
+        "no-cds",
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
+        to_file=skr_fname,
+    )
     check_signedkeyresponse(
-        out,
+        skr_fname,
         zone,
         ksks,
         zsks,
@@ -1059,16 +1073,16 @@ def test_ksr_unlimited(ns1):
     )
 
     # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
-    )
-
-    skrfile = f"{zone}.{policy}.skr.{n}"
-    with open(skrfile, "w", encoding="utf-8") as file:
-        file.write(out)
-
     refresh = -432000  # 5 days
-    check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
+    skr_fname = f"{zone}.{policy}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
 
     # add zone
     ns1.rndc(
@@ -1080,8 +1094,8 @@ def test_ksr_unlimited(ns1):
     )
 
     # import skr
-    shutil.copyfile(skrfile, f"ns1/{skrfile}")
-    ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output
@@ -1103,8 +1117,8 @@ def test_ksr_twotone(ns1):
 
     # create ksk
     kskdir = "ns1/offline"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 2
 
     ksks_defalg = []
@@ -1127,8 +1141,8 @@ def test_ksr_twotone(ns1):
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     # First algorithm keys have a lifetime of 3 months, so there should
     # be 4 created keys. Second algorithm keys have a lifetime of 5
     # months, so there should be 3 created keys.  While only two time
@@ -1159,25 +1173,27 @@ def test_ksr_twotone(ns1):
     # check that 'dnssec-ksr request' creates correct ksr
     now = zsks[0].get_timing("Created")
     until = now + timedelta(days=365)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, now, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {now} -e +1y",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, now, until)
 
-    skrfile = f"{zone}.skr.{n}"
-    with open(skrfile, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -timedelta(days=5)
-    check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
 
     # add zone
     ns1.rndc(
@@ -1189,8 +1205,8 @@ def test_ksr_twotone(ns1):
     )
 
     # import skr
-    shutil.copyfile(skrfile, f"ns1/{skrfile}")
-    ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output
@@ -1218,8 +1234,8 @@ def test_ksr_kskroll(ns1):
 
     # create ksk
     kskdir = "ns1/offline"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
-    ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
+    ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
     assert len(ksks) == 2
 
     lifetime = timedelta(days=31 * 6)
@@ -1227,8 +1243,8 @@ def test_ksr_kskroll(ns1):
 
     # check that 'dnssec-ksr keygen' pregenerates right amount of keys
     zskdir = "ns1"
-    out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
-    zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
+    cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
+    zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
     assert len(zsks) == 1
 
     check_keys(zsks, None)
@@ -1236,25 +1252,27 @@ def test_ksr_kskroll(ns1):
     # check that 'dnssec-ksr request' creates correct ksr
     now = zsks[0].get_timing("Created")
     until = now + timedelta(days=365)
-    out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
-
-    fname = f"{zone}.ksr.{n}"
-    with open(fname, "w", encoding="utf-8") as file:
-        file.write(out)
-
-    check_keysigningrequest(out, zsks, now, until)
-
-    # check that 'dnssec-ksr sign' creates correct skr
-    out, _ = ksr(
-        zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
+    ksr_fname = f"{zone}.ksr.{n}"
+    ksr(
+        zone,
+        policy,
+        "request",
+        options=f"-K {zskdir} -i {now} -e +1y",
+        to_file=ksr_fname,
     )
+    check_keysigningrequest(ksr_fname, zsks, now, until)
 
-    skrfile = f"{zone}.skr.{n}"
-    with open(skrfile, "w", encoding="utf-8") as file:
-        file.write(out)
-
+    # check that 'dnssec-ksr sign' creates correct skr
     refresh = -432000  # 5 days
-    check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
+    skr_fname = f"{zone}.skr.{n}"
+    ksr(
+        zone,
+        policy,
+        "sign",
+        options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
+        to_file=skr_fname,
+    )
+    check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
 
     # add zone
     ns1.rndc(
@@ -1266,8 +1284,8 @@ def test_ksr_kskroll(ns1):
     )
 
     # import skr
-    shutil.copyfile(skrfile, f"ns1/{skrfile}")
-    ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
+    shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
+    ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
 
     # test zone is correctly signed
     # - check rndc dnssec -status output