def ksk_remove():
isctest.log.info("remove the KSK from disk")
- os.rename(f"ns2/{KSK}.key", f"ns2/{KSK}.key.bak")
- os.rename(f"ns2/{KSK}.private", f"ns2/{KSK}.private.bak")
+ os.rename(f"ns2/{ksk}.key", f"ns2/{ksk}.key.bak")
+ os.rename(f"ns2/{ksk}.private", f"ns2/{ksk}.private.bak")
def ksk_recover():
isctest.log.info("put back the KSK")
- os.rename(f"ns2/{KSK}.key.bak", f"ns2/{KSK}.key")
- os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private")
+ os.rename(f"ns2/{ksk}.key.bak", f"ns2/{ksk}.key")
+ os.rename(f"ns2/{ksk}.private.bak", f"ns2/{ksk}.private")
def loadkeys():
pattern = Re(f"{zone}/IN.*next key event")
ksk_only_types = ["DNSKEY", "CDNSKEY", "CDS"]
zone = "updatecheck-kskonly.secure"
- KSK = getfrom(f"ns2/{zone}.ksk.key")
- ZSK = getfrom(f"ns2/{zone}.zsk.key")
- KSKID = int(getfrom(f"ns2/{zone}.ksk.id"))
- ZSKID = int(getfrom(f"ns2/{zone}.zsk.id"))
+ ksk = getfrom(f"ns2/{zone}.ksk.key")
+ zsk = getfrom(f"ns2/{zone}.zsk.key")
+ ksk_id = int(getfrom(f"ns2/{zone}.ksk.id"))
+ zsk_id = int(getfrom(f"ns2/{zone}.zsk.id"))
# set key state for KSK. the ZSK rollovers below assume that there is a
# chain of trust established, so we tell named that the DS is in
# omnipresent state.
timings = SettimeOptions(d="OMNIPRESENT now")
- setkeytimes(KSK, timings, keydir="ns2")
+ setkeytimes(ksk, timings, keydir="ns2")
isctest.log.info("check state before KSK is made offline")
isctest.log.info("make sure certain types are signed with KSK only")
- check_signing_keys(ksk_only_types, expect=[KSKID], prohibit=[ZSKID])
+ check_signing_keys(ksk_only_types, expect=[ksk_id], prohibit=[zsk_id])
isctest.log.info("check SOA is signed with ZSK only")
- check_signing_keys(["SOA"], expect=[ZSKID], prohibit=[KSKID])
+ check_signing_keys(["SOA"], expect=[zsk_id], prohibit=[ksk_id])
isctest.log.info("roll the ZSK")
- ZSK2 = keygen(
+ zsk_2 = keygen(
"-qKns2",
"-Pnone",
"-Anone",
os.environ["DEFAULT_BITS"],
zone,
)
- ZSKID2 = getkeyid(ZSK2)
+ zsk_2_id = getkeyid(zsk_2)
isctest.log.info("prepublish new ZSK")
- ns2.rndc(f"dnssec -rollover -key {ZSKID} {zone}")
+ ns2.rndc(f"dnssec -rollover -key {zsk_id} {zone}")
isctest.run.retry_with_timeout(check_zskcount, 5)
isctest.log.info("make the new ZSK active")
timings = SettimeOptions(I="now")
- setkeytimes(ZSK, timings, keydir="ns2")
+ setkeytimes(zsk, timings, keydir="ns2")
timings = SettimeOptions(
A="now",
k="OMNIPRESENT now",
)
- setkeytimes(ZSK2, timings, keydir="ns2")
+ setkeytimes(zsk_2, timings, keydir="ns2")
loadkeys()
with ns2.watch_log_from_start() as watcher:
watcher.wait_for_line(
- [f"{ZSKID2} (ZSK) is now active", f"{ZSKID} (ZSK) is now inactive"]
+ [f"{zsk_2_id} (ZSK) is now active", f"{zsk_id} (ZSK) is now inactive"]
)
ksk_remove()
"redo the tests now that the zone is updated and the KSK is offline"
)
isctest.log.info("make sure certain types are signed with KSK only")
- check_signing_keys(ksk_only_types, expect=[KSKID], prohibit=[ZSKID, ZSKID2])
+ check_signing_keys(ksk_only_types, expect=[ksk_id], prohibit=[zsk_id, zsk_2_id])
isctest.log.info("check TXT, SOA are signed with ZSK2 only")
def check_txt_soa_zsk2():
return check_signing_keys(
- ["TXT", "SOA"], expect=[ZSKID2], prohibit=[KSKID, ZSKID]
+ ["TXT", "SOA"], expect=[zsk_2_id], prohibit=[ksk_id, zsk_id]
)
isctest.run.retry_with_timeout(check_txt_soa_zsk2, 5)
ksk_recover()
isctest.log.info("roll the ZSK again")
- ZSK3 = keygen(
+ zsk_3 = keygen(
"-qKns2",
"-Pnone",
"-Anone",
os.environ["DEFAULT_BITS"],
zone,
)
- ZSKID3 = getkeyid(ZSK3)
+ zsk_3_id = getkeyid(zsk_3)
isctest.log.info("delete old ZSK, schedule ZSK2 inactive, pre-publish ZSK3")
z="HIDDEN now",
D="now",
)
- setkeytimes(ZSK, timings, keydir="ns2")
+ setkeytimes(zsk, timings, keydir="ns2")
timings = SettimeOptions(
k="OMNIPRESENT now",
z="OMNIPRESENT now",
)
- setkeytimes(ZSK2, timings, keydir="ns2")
+ setkeytimes(zsk_2, timings, keydir="ns2")
loadkeys()
- ns2.rndc(f"dnssec -rollover -key {ZSKID2} {zone}")
+ ns2.rndc(f"dnssec -rollover -key {zsk_2_id} {zone}")
with ns2.watch_log_from_start() as watcher:
- watcher.wait_for_line(f"{ZSKID3} (ZSK) is now published")
+ watcher.wait_for_line(f"{zsk_3_id} (ZSK) is now published")
ksk_remove()
isctest.log.info("redo the tests now that the ZSK roll has deleted the old key")
isctest.log.info("make sure certain types are signed with KSK only")
- check_signing_keys(ksk_only_types, expect=[KSKID], prohibit=[ZSKID, ZSKID2, ZSKID3])
+ check_signing_keys(
+ ksk_only_types, expect=[ksk_id], prohibit=[zsk_id, zsk_2_id, zsk_3_id]
+ )
isctest.log.info("check A, TXT, SOA are signed with ZSK2 only")
def check_a_txt_soa_zsk2():
return check_signing_keys(
- ["A", "TXT", "SOA"], expect=[ZSKID2], prohibit=[KSKID, ZSKID, ZSKID3]
+ ["A", "TXT", "SOA"], expect=[zsk_2_id], prohibit=[ksk_id, zsk_id, zsk_3_id]
)
isctest.run.retry_with_timeout(check_a_txt_soa_zsk2, 5)
isctest.log.info("make ZSK3 active")
timings = SettimeOptions(I="now")
- setkeytimes(ZSK2, timings, keydir="ns2")
+ setkeytimes(zsk_2, timings, keydir="ns2")
timings = SettimeOptions(
k="OMNIPRESENT now",
A="now",
)
- setkeytimes(ZSK3, timings, keydir="ns2")
+ setkeytimes(zsk_3, timings, keydir="ns2")
loadkeys()
with ns2.watch_log_from_start() as watcher:
watcher.wait_for_line(
- [f"{ZSKID3} (ZSK) is now active", f"{ZSKID2} (ZSK) is now inactive"]
+ [f"{zsk_3_id} (ZSK) is now active", f"{zsk_2_id} (ZSK) is now inactive"]
)
ksk_remove()
isctest.log.info("redo the tests one last time")
isctest.log.info("make sure certain types are signed with KSK only")
- check_signing_keys(ksk_only_types, expect=[KSKID], prohibit=[ZSKID, ZSKID2, ZSKID3])
+ check_signing_keys(
+ ksk_only_types, expect=[ksk_id], prohibit=[zsk_id, zsk_2_id, zsk_3_id]
+ )
isctest.log.info("check A, TXT, SOA are signed with ZSK2 only")
def check_aaaa_a_txt_soa_zsk3():
return check_signing_keys(
["AAAA", "A", "TXT", "SOA"],
- expect=[ZSKID3],
- prohibit=[KSKID, ZSKID, ZSKID2],
+ expect=[zsk_3_id],
+ prohibit=[ksk_id, zsk_id, zsk_2_id],
)
isctest.run.retry_with_timeout(check_aaaa_a_txt_soa_zsk3, 5)
# global start-time variable
# pylint: disable=global-statement
-# pylint: disable=global-variable-not-assigned
-start = 0
+START = 0
def test_initial():
def test_nta_install(servers):
- global start
+ global START
ns4 = servers["ns4"]
ns4.rndc("nta -f -l 20s bogus.example")
response = ns4.rndc("nta -d")
assert len(response.out.splitlines()) == 5
- start = time.time()
+ START = time.time()
def test_nta_behavior(servers):
- global start
- assert start, "test_nta_behavior must be run as part of the full NTA test"
+ assert START, "test_nta_behavior must be run as part of the full NTA test"
m = isctest.query.create("a.bogus.example.", "A")
res = isctest.query.tcp(m, "10.53.0.4")
# is configured to 9s, so at t=10 the NTAs for secure.example and
# fakenode.secure.example should both be lifted, while badds.example
# should still be going.
- delay = start + 10 - time.time()
+ delay = START + 10 - time.time()
if delay > 0:
time.sleep(delay)
# bogus.example was set to expire in 20s, so at t=13
# it should still be NTA'd, but badds.example used the default
# lifetime of 12s, so it should revert to SERVFAIL now.
- delay = start + 13 - time.time()
+ delay = START + 13 - time.time()
if delay > 0:
time.sleep(delay)
isctest.check.adflag(res)
# at t=21, all the NTAs should have expired.
- delay = start + 21 - time.time()
+ delay = START + 21 - time.time()
if delay > 0:
time.sleep(delay)
def test_nta_restarts(servers):
- global start
- assert start, "test_nta_restarts must be run as part of the full NTA test"
+ global START
+ assert START, "test_nta_restarts must be run as part of the full NTA test"
# test NTA persistence across restarts
ns4 = servers["ns4"]
response = ns4.rndc("nta -d")
assert active(response.out) == 0
- start = time.time()
+ START = time.time()
ns4.rndc("nta -f -l 30s bogus.example")
ns4.rndc("nta -f -l 10s badds.example")
response = ns4.rndc("nta -d")
# wait 14s before restarting. badds.example's NTA (lifetime=10s) should
# have expired, and bogus.example should still be running.
- delay = start + 14 - time.time()
+ delay = START + 14 - time.time()
if delay > 0:
time.sleep(delay)
ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
def test_nta_regular(servers):
- global start
- assert start, "test_nta_regular must be run as part of the full NTA test"
+ global START
+ assert START, "test_nta_regular must be run as part of the full NTA test"
# check "regular" attribute in NTA file
ns4 = servers["ns4"]
# nta-recheck is configured as 9s, so at t=12 the NTA for
# secure.example. should be lifted as it is not a "forced" NTA.
- start = time.mktime(now)
- delay = start + 12 - time.time()
+ START = time.mktime(now)
+ delay = START + 12 - time.time()
if delay > 0:
time.sleep(delay)
def test_nta_forced(servers):
- global start
- assert start, "test_nta_regular must be run as part of the full NTA test"
+ global START
+ assert START, "test_nta_regular must be run as part of the full NTA test"
# check "forced" attribute in NTA file
ns4 = servers["ns4"]
# nta-recheck is configured as 9s. at t=12 the NTA for
# secure.example. should NOT be lifted as it is "forced".
- start = time.mktime(now)
- delay = start + 12 - time.time()
+ START = time.mktime(now)
+ delay = START + 12 - time.time()
if delay > 0:
time.sleep(delay)