"""
import os
-import random
-import shutil
from dnstest.utils import *
from dnstest.keys import Keymgr
+from dnstest.keystore import KeystorePEM
from dnstest.test import Test
def check_key_count(server, keystore, expected):
- ksdir = os.path.join(server.keydir, keystore)
try:
- files = len([name for name in os.listdir(ksdir)])
+ files = len([name for name in os.listdir(keystore.config())])
except FileNotFoundError:
files = 0
- compare(files, expected, "privkey count in %s" % ksdir)
+ compare(files, expected, "privkey count in %s" % keystore.id)
t = Test()
zone = t.zone("catalog.") # has zero TTL => faster key rollovers
t.link(zone, server)
+keys1 = KeystorePEM("keys1")
+keys2 = KeystorePEM("keys2")
+
server.dnssec(zone).enable = True
server.dnssec(zone).propagation_delay = 1
-server.dnssec(zone).keystore = [ "keys1", "keys2" ]
+server.dnssec(zone).keystore = [ keys1, keys2 ]
t.start()
serial = server.zone_wait(zone)
-check_key_count(server, "keys1", 2)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys1, 2)
+check_key_count(server, keys2, 0)
-server.dnssec(zone).keystore = [ "keys2", "keys1" ]
+server.dnssec(zone).keystore = [ keys2, keys1 ]
server.gen_confile()
server.reload()
server.ctl("zone-key-rollover %s zsk" % zone[0].name)
serial += 2 # wait for three increments which is whole ZSK rollover
serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys1", 1)
-check_key_count(server, "keys2", 1)
+check_key_count(server, keys1, 1)
+check_key_count(server, keys2, 1)
backup_dir = os.path.join(server.dir, "backup1")
server.ctl("zone-backup +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
-shutil.rmtree(os.path.join(server.keydir, "keys1"))
-shutil.rmtree(os.path.join(server.keydir, "keys2"))
+keys1.clear()
+keys2.clear()
server.ctl("zone-restore +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 2) # restore puts all keys to first configured keystore no matter where they were at backup
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 2) # restore puts all keys to first configured keystore no matter where they were at backup
server.ctl("zone-sign %s" % zone[0].name, wait=True) # check that signing still works after restore
serial = server.zone_wait(zone, serial)
server.flush(zone[0], wait=True)
server.zone_verify(zone[0])
-server.dnssec(zone).keystore = [ "keys0ksk", "keys1", "keys2" ]
+keys0ksk = KeystorePEM("keys0ksk", ksk_only=True)
+
+server.dnssec(zone).keystore = [ keys0ksk, keys1, keys2 ]
server.gen_confile()
server.reload()
server.ctl("zone-key-rollover %s ksk" % zone[0].name)
serial += 1 # wait for two increments
serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 2)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 2)
server.ctl("zone-ksk-submitted %s" % zone[0].name)
serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 1)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 1)
server.ctl("zone-key-rollover %s zsk" % zone[0].name)
serial += 2 # wait for three increments which is whole ZSK rollover
serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 1)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 1)
+check_key_count(server, keys2, 0)
Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=yes")
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 1)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 1)
Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=no")
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 2)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 2)
Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+07147.key"))
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+18635.key"))
-check_key_count(server, "keys0ksk", 3)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 3)
+check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "8329a00d5dceefdcbbf7b8a3cdf61fe944c51d6f.pem"), "ksk=yes")
-check_key_count(server, "keys0ksk", 4)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 4)
+check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "894d4240398f459f59f4a99cd4c5b658c9a62d54.pem"), "ksk=no")
-check_key_count(server, "keys0ksk", 4)
-check_key_count(server, "keys1", 4)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys0ksk, 4)
+check_key_count(server, keys1, 4)
+check_key_count(server, keys2, 0)
t.end()
--- /dev/null
+#!/usr/bin/env python3
+
+import os
+import shutil
+from subprocess import Popen
+from dnstest.context import Context
+
+class Keystore(object):
+ def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
+ self.id = id
+ self.ksk_only = ksk_only
+ self.key_label = key_label
+
+class KeystorePEM(Keystore):
+ def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
+ super().__init__(id, ksk_only, key_label)
+
+ def config(self):
+ return os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
+
+ def backend(self):
+ return "pem"
+
+ def clear(self):
+ shutil.rmtree(self.config())
+
+class KeystoreSoftHSM(Keystore):
+ so_pin = "12345"
+
+ def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None,
+ token: str = None, passwd: str = None, so_path: str = None):
+ super().__init__(id, ksk_only, key_label)
+ self.so_path = so_path if so_path else "/usr/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so"
+ self.passwd = passwd if passwd else "1234"
+ self.token = token if token else "knot"
+ self.dir = os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
+ self.init()
+
+ def config(self):
+ return f"pkcs11:token={self.token};pin-value={self.passwd} {self.so_path}"
+
+ def backend(self):
+ return "pkcs11"
+
+ def config_file(self):
+ return os.path.join(self.dir, "softhsm.conf")
+
+ def clear(self):
+ shutil.rmtree(os.path.join(self.dir, "tokens"))
+
+ def init(self, keystore=None):
+ if not os.path.isdir(self.dir):
+ os.makedirs(os.path.join(self.dir, "tokens"))
+ with open(self.config_file(), "w") as conf_file:
+ config = (
+ f"directories.tokendir = {self.dir}/tokens/\n"
+ "objectstore.backend = file\n"
+ "log.level = INFO\n"
+ )
+ conf_file.write(config)
+
+ if keystore:
+ self.clear()
+ shutil.copytree(os.path.join(keystore.dir, "tokens"), os.path.join(self.dir, "tokens"))
+ else:
+ init_process = Popen(
+ ['softhsm2-util', '--init-token', '--free', f'--label={self.token}',
+ f'--pin={self.passwd}', f'--so-pin={self.so_pin}', f'--module={self.so_path}'],
+ stdout=open(os.path.join(self.dir, "stdout"), mode='a'),
+ stderr=open(os.path.join(self.dir, "stderr"), mode='a'),
+ env=dict(os.environ, SOFTHSM2_CONF=self.config_file()))
+ init_process.wait()
+
+ def link(self, server):
+ server.softhsm_conf = self.config_file()