]> git.ipfire.org Git - thirdparty/knot-dns.git/commitdiff
tests-extra: add SoftHSM backend support 1837/head
authorJan Hák <jan.hak@nic.cz>
Fri, 21 Nov 2025 12:28:01 +0000 (13:28 +0100)
committerDaniel Salzman <daniel.salzman@nic.cz>
Fri, 12 Dec 2025 15:32:49 +0000 (15:32 +0000)
tests-extra/README
tests-extra/data/knot.supp
tests-extra/tests/dnssec/keystores/test.py
tests-extra/tests/dnssec/softhsm/test.py [new file with mode: 0644]
tests-extra/tools/dnstest/__init__.py
tests-extra/tools/dnstest/keystore.py [new file with mode: 0644]
tests-extra/tools/dnstest/params.py
tests-extra/tools/dnstest/server.py

index b4c4829735955646278d44584bb3f35d65bee2ab..55e4164afe09090185645067e751c0c0e7941752 100644 (file)
@@ -13,6 +13,7 @@ ldnsutils
 lsof
 gawk
 objdump
+softhsm2
 (valgrind)
 (gdb)
 
index 62dff053c55e313c4b5ead292bf75d74ba3b1c76..dc8ae402729278e182081646480150a28ce30689 100644 (file)
    ...
    fun:allocate_stack
 }
+
+{
+   libgnutls/p11_load_module
+   Memcheck:Leak
+   match-leak-kinds: definite,reachable
+   fun:*alloc
+   ...
+   fun:gnutls_pkcs11_add_provider
+}
+
+{
+   libstdc++/exit
+   Memcheck:Leak
+   match-leak-kinds: reachable
+   fun:malloc
+   ...
+   fun:exit
+}
index 6c628aa4deef763d4f073494a00a6e34db96905e..3b57e5723453842fcb505f742aecab8e39222eaf 100644 (file)
@@ -5,19 +5,17 @@ Check of multi-keystore operation.
 """
 
 import os
-import random
-import shutil
 from dnstest.utils import *
 from dnstest.keys import Keymgr
+from dnstest.keystore import KeystorePEM
 from dnstest.test import Test
 
 def check_key_count(server, keystore, expected):
-    ksdir = os.path.join(server.keydir, keystore)
     try:
-        files = len([name for name in os.listdir(ksdir)])
+        files = len([name for name in os.listdir(keystore.config())])
     except FileNotFoundError:
         files = 0
-    compare(files, expected, "privkey count in %s" % ksdir)
+    compare(files, expected, "privkey count in %s" % keystore.id)
 
 t = Test()
 
@@ -25,17 +23,20 @@ server = t.server("knot")
 zone = t.zone("catalog.") # has zero TTL => faster key rollovers
 t.link(zone, server)
 
+keys1 = KeystorePEM("keys1")
+keys2 = KeystorePEM("keys2")
+
 server.dnssec(zone).enable = True
 server.dnssec(zone).propagation_delay = 1
-server.dnssec(zone).keystore = [ "keys1", "keys2" ]
+server.dnssec(zone).keystore = [ keys1, keys2 ]
 
 t.start()
 serial = server.zone_wait(zone)
 
-check_key_count(server, "keys1", 2)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys1, 2)
+check_key_count(server, keys2, 0)
 
-server.dnssec(zone).keystore = [ "keys2", "keys1" ]
+server.dnssec(zone).keystore = [ keys2, keys1 ]
 server.gen_confile()
 server.reload()
 server.ctl("zone-key-rollover %s zsk" % zone[0].name)
@@ -43,17 +44,17 @@ server.ctl("zone-key-rollover %s zsk" % zone[0].name)
 serial += 2 # wait for three increments which is whole ZSK rollover
 serial = server.zone_wait(zone, serial)
 
-check_key_count(server, "keys1", 1)
-check_key_count(server, "keys2", 1)
+check_key_count(server, keys1, 1)
+check_key_count(server, keys2, 1)
 
 backup_dir = os.path.join(server.dir, "backup1")
 server.ctl("zone-backup +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
-shutil.rmtree(os.path.join(server.keydir, "keys1"))
-shutil.rmtree(os.path.join(server.keydir, "keys2"))
+keys1.clear()
+keys2.clear()
 server.ctl("zone-restore +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
 
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 2) # restore puts all keys to first configured keystore no matter where they were at backup
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 2) # restore puts all keys to first configured keystore no matter where they were at backup
 
 server.ctl("zone-sign %s" % zone[0].name, wait=True) # check that signing still works after restore
 serial = server.zone_wait(zone, serial)
@@ -61,53 +62,55 @@ serial = server.zone_wait(zone, serial)
 server.flush(zone[0], wait=True)
 server.zone_verify(zone[0])
 
-server.dnssec(zone).keystore = [ "keys0ksk", "keys1", "keys2" ]
+keys0ksk = KeystorePEM("keys0ksk", ksk_only=True)
+
+server.dnssec(zone).keystore = [ keys0ksk, keys1, keys2 ]
 server.gen_confile()
 server.reload()
 
 server.ctl("zone-key-rollover %s ksk" % zone[0].name)
 serial += 1 # wait for two increments
 serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 2)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 2)
 
 server.ctl("zone-ksk-submitted %s" % zone[0].name)
 serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 0)
-check_key_count(server, "keys2", 1)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 0)
+check_key_count(server, keys2, 1)
 
 server.ctl("zone-key-rollover %s zsk" % zone[0].name)
 serial += 2 # wait for three increments which is whole ZSK rollover
 serial = server.zone_wait(zone, serial)
-check_key_count(server, "keys0ksk", 1)
-check_key_count(server, "keys1", 1)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys0ksk, 1)
+check_key_count(server, keys1, 1)
+check_key_count(server, keys2, 0)
 
 Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=yes")
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 1)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 1)
 
 Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=no")
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 2)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 2)
 
 Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+07147.key"))
-check_key_count(server, "keys0ksk", 2)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 2)
+check_key_count(server, keys1, 3)
 
 Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+18635.key"))
-check_key_count(server, "keys0ksk", 3)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 3)
+check_key_count(server, keys1, 3)
 
 Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "8329a00d5dceefdcbbf7b8a3cdf61fe944c51d6f.pem"), "ksk=yes")
-check_key_count(server, "keys0ksk", 4)
-check_key_count(server, "keys1", 3)
+check_key_count(server, keys0ksk, 4)
+check_key_count(server, keys1, 3)
 
 Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "894d4240398f459f59f4a99cd4c5b658c9a62d54.pem"), "ksk=no")
-check_key_count(server, "keys0ksk", 4)
-check_key_count(server, "keys1", 4)
-check_key_count(server, "keys2", 0)
+check_key_count(server, keys0ksk, 4)
+check_key_count(server, keys1, 4)
+check_key_count(server, keys2, 0)
 
 t.end()
diff --git a/tests-extra/tests/dnssec/softhsm/test.py b/tests-extra/tests/dnssec/softhsm/test.py
new file mode 100644 (file)
index 0000000..515951a
--- /dev/null
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+
+"""
+Backup and restore of SoftHSM keystores.
+"""
+
+from dnstest.utils import *
+from dnstest.keystore import KeystoreSoftHSM
+from dnstest.test import Test
+
+t = Test()
+
+knot1 = t.server("knot")
+knot2 = t.server("knot")
+zone = t.zone("example.com")
+t.link(zone, knot1)
+t.link(zone, knot2)
+
+keys1 = KeystoreSoftHSM("keys1")
+keys1.link(knot1)
+keys2 = KeystoreSoftHSM("keys2")
+keys2.link(knot2)
+
+knot1.dnssec(zone).enable = True
+knot1.dnssec(zone).keystore = [ keys1 ]
+
+t.start()
+
+# Wait for signed zone
+knot1.zone_wait(zone)
+resp = knot1.dig(zone[0].name, "DNSKEY")
+resp.check_count(2, "DNSKEY")
+
+# Wait for unsigned zone
+serial = knot2.zone_wait(zone)
+resp = knot2.dig(zone[0].name, "DNSKEY")
+resp.check_count(0, "DNSKEY")
+
+backup_dir = os.path.join(knot1.dir, "backup")
+knot1.ctl("zone-backup +keysonly +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
+
+keys2.init(keys1) # Synchronize tokens directory between SoftHSMs
+knot2.ctl("zone-restore +keysonly +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
+
+# Enable signing with initial keys from the backup
+knot2.dnssec(zone).enable = True
+knot2.dnssec(zone).keystore = [ keys2 ]
+knot2.gen_confile()
+knot2.reload()
+
+# Check the keysets match
+knot2.zone_wait(zone, serial)
+resp = knot2.dig(zone[0].name, "DNSKEY")
+resp.cmp(knot1)
+
+t.end()
index c91a5aa575a54c8596a42187aedc6da9f3685e35..9ddbc40e70c57337e18f1cd5bb325ca708493d9f 100644 (file)
@@ -3,6 +3,7 @@ __all__ = [
     "context",
     "inquirer",
     "keys",
+    "keystore",
     "knsupdate",
     "libknot",
     "module",
diff --git a/tests-extra/tools/dnstest/keystore.py b/tests-extra/tools/dnstest/keystore.py
new file mode 100644 (file)
index 0000000..7df3ce5
--- /dev/null
@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+
+import os
+import shutil
+from subprocess import Popen
+from dnstest.context import Context
+
+class Keystore(object):
+    def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
+        self.id = id
+        self.ksk_only = ksk_only
+        self.key_label = key_label
+
+class KeystorePEM(Keystore):
+    def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
+        super().__init__(id, ksk_only, key_label)
+
+    def config(self):
+        return os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
+
+    def backend(self):
+        return "pem"
+
+    def clear(self):
+        shutil.rmtree(self.config())
+
+class KeystoreSoftHSM(Keystore):
+    so_pin = "12345"
+
+    def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None,
+                 token: str = None, passwd: str = None, so_path: str = None):
+        super().__init__(id, ksk_only, key_label)
+        self.so_path = so_path if so_path else "/usr/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so"
+        self.passwd = passwd if passwd else "1234"
+        self.token = token if token else "knot"
+        self.dir = os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
+        self.init()
+
+    def config(self):
+        return f"pkcs11:token={self.token};pin-value={self.passwd} {self.so_path}"
+
+    def backend(self):
+        return "pkcs11"
+
+    def config_file(self):
+        return os.path.join(self.dir, "softhsm.conf")
+
+    def clear(self):
+        shutil.rmtree(os.path.join(self.dir, "tokens"))
+
+    def init(self, keystore=None):
+        if not os.path.isdir(self.dir):
+            os.makedirs(os.path.join(self.dir, "tokens"))
+            with open(self.config_file(), "w") as conf_file:
+                config = (
+                    f"directories.tokendir = {self.dir}/tokens/\n"
+                     "objectstore.backend = file\n"
+                     "log.level = INFO\n"
+                )
+                conf_file.write(config)
+
+        if keystore:
+            self.clear()
+            shutil.copytree(os.path.join(keystore.dir, "tokens"), os.path.join(self.dir, "tokens"))
+        else:
+            init_process = Popen(
+                ['softhsm2-util', '--init-token', '--free', f'--label={self.token}',
+                 f'--pin={self.passwd}', f'--so-pin={self.so_pin}', f'--module={self.so_path}'],
+                    stdout=open(os.path.join(self.dir, "stdout"), mode='a'),
+                    stderr=open(os.path.join(self.dir, "stderr"), mode='a'),
+                    env=dict(os.environ, SOFTHSM2_CONF=self.config_file()))
+            init_process.wait()
+
+    def link(self, server):
+        server.softhsm_conf = self.config_file()
index 07ace0c21755fa7890507ac6a47a411fe418488b..39cb38ffe792161aaa335e303ff2915d23b1bcb3 100644 (file)
@@ -49,7 +49,7 @@ xdp = False
 valgrind_bin = get_binary("KNOT_TEST_VALGRIND", "valgrind")
 # KNOT_TEST_VALGRIND_FLAGS - valgrind flags.
 valgrind_flags = get_param("KNOT_TEST_VALGRIND_FLAGS",
-                           "--leak-check=full --show-leak-kinds=all --track-origins=yes --vgdb=yes --verbose --num-callers=20 --trace-children=yes --trace-children-skip=/usr/*sh,/bin/*sh")
+                           "--leak-check=full --show-leak-kinds=all --track-origins=yes --vgdb=yes --verbose --num-callers=20 --trace-children=yes --trace-children-skip=/usr/*sh,/bin/*sh --gen-suppressions=all")
 # KNOT_TEST_GDB - gdb binary.
 gdb_bin = get_binary("KNOT_TEST_GDB", "gdb")
 # KNOT_TEST_VGDB - vgdb binary.
index 5873e8d6518a4d0074ab8e25494681a06de42b54..8d0f1281100a7e0056ab4408e8046c67089bd590 100644 (file)
@@ -192,6 +192,7 @@ class Server(object):
         self.valgrind_log = None
         self.session_log = None
         self.confile = None
+        self.softhsm_conf = str()
 
         self.redis_backends = list()
 
@@ -353,7 +354,9 @@ class Server(object):
                                   self.start_params,
                                   stdout=open(self.fout, mode=mode),
                                   stderr=open(self.ferr, mode=mode),
-                                  env=dict(os.environ, SSLKEYLOGFILE=self.session_log))
+                                  env=dict(os.environ,
+                                           SSLKEYLOGFILE=self.session_log,
+                                           SOFTHSM2_CONF=self.softhsm_conf))
 
             if self.valgrind:
                 time.sleep(Server.START_WAIT_VALGRIND)
@@ -1663,10 +1666,13 @@ class Knot(Server):
                 s.begin("keystore")
                 have_keystore = True
             for ks in z.dnssec.keystore:
-                s.id_item("id", ks)
-                s.item("config", ks)
-                if ks.endswith("ksk"):
-                    s.item("ksk-only", "on")
+                s.id_item("id", ks.id)
+                s.item_type("config", ks.config())
+                s.item_type("backend", ks.backend())
+                if ks.ksk_only:
+                    s.item_type("ksk-only", ks.ksk_only)
+                if ks.key_label:
+                    s.item_type("key-label", ks.key_label)
         if have_keystore:
             s.end()
 
@@ -1677,9 +1683,11 @@ class Knot(Server):
             s.begin("policy")
             s.id_item("id", z.name)
             for ci, val in self.conf["policy"][z.name].items():
-                if ci not in [ "enable", "shared_policy_with" ]:
+                if ci not in [ "enable", "shared_policy_with", "keystore" ]:
                     s.item_type(ci.replace("_", "-"), val)
 
+            if len(z.dnssec.keystore or []) > 0:
+                s.item_list("keystore", [ v.id for v in z.dnssec.keystore ])
             if zone in self.conf["dnskey-sync"]:
                 s.item("dnskey-sync", zone)
             if zone in self.conf["submission"]: