From: Joseph Sutton Date: Fri, 8 Dec 2023 03:38:21 +0000 (+1300) Subject: tests/krb5: Add Python implementation and tests for Group Key Distribution Service X-Git-Tag: talloc-2.4.2~201 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=080a62bba875c2a5df7c04703d095142200dad0e;p=thirdparty%2Fsamba.git tests/krb5: Add Python implementation and tests for Group Key Distribution Service Signed-off-by: Joseph Sutton Reviewed-by: Andrew Bartlett Autobuild-User(master): Andrew Bartlett Autobuild-Date(master): Thu Dec 21 21:19:30 UTC 2023 on atb-devel-224 --- diff --git a/python/samba/gkdi.py b/python/samba/gkdi.py new file mode 100644 index 00000000000..9e3abb58a2f --- /dev/null +++ b/python/samba/gkdi.py @@ -0,0 +1,397 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Catalyst.Net Ltd 2023 +# +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Group Key Distribution Service module""" + +from enum import Enum +from functools import total_ordering +from typing import Final, Optional, Tuple + +from cryptography.hazmat.primitives import hashes + +from samba import _glue +from samba.dcerpc import gkdi, misc +from samba.ndr import ndr_pack, ndr_unpack +from samba.nt_time import NtTime, NtTimeDelta + + +uint64_max: Final[int] = 2**64 - 1 + +L1_KEY_ITERATION: Final[int] = _glue.GKDI_L1_KEY_ITERATION +L2_KEY_ITERATION: Final[int] = _glue.GKDI_L2_KEY_ITERATION +KEY_CYCLE_DURATION: Final[NtTimeDelta] = _glue.GKDI_KEY_CYCLE_DURATION +MAX_CLOCK_SKEW: Final[NtTimeDelta] = _glue.GKDI_MAX_CLOCK_SKEW + +KEY_LEN_BYTES: Final = 64 + + +class Algorithm(Enum): + SHA1 = "SHA1" + SHA256 = "SHA256" + SHA384 = "SHA384" + SHA512 = "SHA512" + + def algorithm(self) -> hashes.HashAlgorithm: + if self is Algorithm.SHA1: + return hashes.SHA1() + + if self is Algorithm.SHA256: + return hashes.SHA256() + + if self is Algorithm.SHA384: + return hashes.SHA384() + + if self is Algorithm.SHA512: + return hashes.SHA512() + + raise RuntimeError("unknown hash algorithm {self}") + + def __repr__(self) -> str: + return str(self) + + @staticmethod + def from_kdf_parameters(kdf_param: Optional[bytes]) -> "Algorithm": + if not kdf_param: + return Algorithm.SHA256 # the default used by Windows. + + kdf_parameters = ndr_unpack(gkdi.KdfParameters, kdf_param) + return Algorithm(kdf_parameters.hash_algorithm) + + +class GkidType(Enum): + DEFAULT = object() + L0_SEED_KEY = object() + L1_SEED_KEY = object() + L2_SEED_KEY = object() + + def description(self) -> str: + if self is GkidType.DEFAULT: + return "a default GKID" + + if self is GkidType.L0_SEED_KEY: + return "an L0 seed key" + + if self is GkidType.L1_SEED_KEY: + return "an L1 seed key" + + if self is GkidType.L2_SEED_KEY: + return "an L2 seed key" + + raise RuntimeError("unknown GKID type {self}") + + +class InvalidDerivation(Exception): + pass + + +class UndefinedStartTime(Exception): + pass + + +@total_ordering +class Gkid: + __slots__ = ["_l0_idx", "_l1_idx", "_l2_idx"] + + max_l0_idx: Final = 0x7FFF_FFFF + + def __init__(self, l0_idx: int, l1_idx: int, l2_idx: int) -> None: + if not -1 <= l0_idx <= Gkid.max_l0_idx: + raise ValueError(f"L0 index {l0_idx} out of range") + + if not -1 <= l1_idx < L1_KEY_ITERATION: + raise ValueError(f"L1 index {l1_idx} out of range") + + if not -1 <= l2_idx < L2_KEY_ITERATION: + raise ValueError(f"L2 index {l2_idx} out of range") + + if l0_idx == -1 and l1_idx != -1: + raise ValueError("invalid combination of negative and non‐negative indices") + + if l1_idx == -1 and l2_idx != -1: + raise ValueError("invalid combination of negative and non‐negative indices") + + self._l0_idx = l0_idx + self._l1_idx = l1_idx + self._l2_idx = l2_idx + + @property + def l0_idx(self) -> int: + return self._l0_idx + + @property + def l1_idx(self) -> int: + return self._l1_idx + + @property + def l2_idx(self) -> int: + return self._l2_idx + + def gkid_type(self) -> GkidType: + if self.l0_idx == -1: + return GkidType.DEFAULT + + if self.l1_idx == -1: + return GkidType.L0_SEED_KEY + + if self.l2_idx == -1: + return GkidType.L1_SEED_KEY + + return GkidType.L2_SEED_KEY + + def wrapped_l1_idx(self) -> int: + if self.l1_idx == -1: + return L1_KEY_ITERATION + + return self.l1_idx + + def wrapped_l2_idx(self) -> int: + if self.l2_idx == -1: + return L2_KEY_ITERATION + + return self.l2_idx + + def derive_l1_seed_key(self) -> "Gkid": + gkid_type = self.gkid_type() + if ( + gkid_type is not GkidType.L0_SEED_KEY + and gkid_type is not GkidType.L1_SEED_KEY + ): + raise InvalidDerivation( + "Invalid attempt to derive an L1 seed key from" + f" {gkid_type.description()}" + ) + + if self.l1_idx == 0: + raise InvalidDerivation("No further derivation of L1 seed keys is possible") + + return Gkid(self.l0_idx, self.wrapped_l1_idx() - 1, self.l2_idx) + + def derive_l2_seed_key(self) -> "Gkid": + gkid_type = self.gkid_type() + if ( + gkid_type is not GkidType.L1_SEED_KEY + and gkid_type is not GkidType.L2_SEED_KEY + ): + raise InvalidDerivation( + f"Attempt to derive an L2 seed key from {gkid_type.description()}" + ) + + if self.l2_idx == 0: + raise InvalidDerivation("No further derivation of L2 seed keys is possible") + + return Gkid(self.l0_idx, self.l1_idx, self.wrapped_l2_idx() - 1) + + def __str__(self) -> str: + return f"Gkid({self.l0_idx}, {self.l1_idx}, {self.l2_idx})" + + def __repr__(self) -> str: + cls = type(self) + return ( + f"{cls.__qualname__}({repr(self.l0_idx)}, {repr(self.l1_idx)}," + f" {repr(self.l2_idx)})" + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Gkid): + return NotImplemented + + return (self.l0_idx, self.l1_idx, self.l2_idx) == ( + other.l0_idx, + other.l1_idx, + other.l2_idx, + ) + + def __lt__(self, other: object) -> bool: + if not isinstance(other, Gkid): + return NotImplemented + + def as_tuple(gkid: Gkid) -> Tuple[int, int, int]: + l0_idx, l1_idx, l2_idx = gkid.l0_idx, gkid.l1_idx, gkid.l2_idx + + # DEFAULT is considered less than everything else, so that the + # lexical ordering requirement in [MS-GKDI] 3.1.4.1.3 (GetKey) makes + # sense. + if gkid.gkid_type() is not GkidType.DEFAULT: + # Use the wrapped indices so that L1 seed keys are considered + # greater than their children L2 seed keys, and L0 seed keys are + # considered greater than their children L1 seed keys. + l1_idx = gkid.wrapped_l1_idx() + l2_idx = gkid.wrapped_l2_idx() + + return l0_idx, l1_idx, l2_idx + + return as_tuple(self) < as_tuple(other) + + def __hash__(self) -> int: + return hash((self.l0_idx, self.l1_idx, self.l2_idx)) + + @staticmethod + def default() -> "Gkid": + return Gkid(-1, -1, -1) + + @staticmethod + def l0_seed_key(l0_idx: int) -> "Gkid": + return Gkid(l0_idx, -1, -1) + + @staticmethod + def l1_seed_key(l0_idx: int, l1_idx: int) -> "Gkid": + return Gkid(l0_idx, l1_idx, -1) + + @staticmethod + def from_nt_time(nt_time: NtTime) -> "Gkid": + l0 = nt_time // (L1_KEY_ITERATION * L2_KEY_ITERATION * KEY_CYCLE_DURATION) + l1 = ( + nt_time + % (L1_KEY_ITERATION * L2_KEY_ITERATION * KEY_CYCLE_DURATION) + // (L2_KEY_ITERATION * KEY_CYCLE_DURATION) + ) + l2 = nt_time % (L2_KEY_ITERATION * KEY_CYCLE_DURATION) // KEY_CYCLE_DURATION + + return Gkid(l0, l1, l2) + + def start_nt_time(self) -> NtTime: + gkid_type = self.gkid_type() + if gkid_type is not GkidType.L2_SEED_KEY: + raise UndefinedStartTime( + f"{gkid_type.description()} has no defined start time" + ) + + start_time = NtTime( + ( + self.l0_idx * L1_KEY_ITERATION * L2_KEY_ITERATION + + self.l1_idx * L2_KEY_ITERATION + + self.l2_idx + ) + * KEY_CYCLE_DURATION + ) + + if not 0 <= start_time <= uint64_max: + raise OverflowError(f"start time {start_time} out of range") + + return start_time + + +class SeedKeyPair: + __slots__ = ["l1_key", "l2_key", "gkid", "hash_algorithm", "root_key_id"] + + def __init__( + self, + l1_key: Optional[bytes], + l2_key: Optional[bytes], + gkid: Gkid, + hash_algorithm: Algorithm, + root_key_id: misc.GUID, + ) -> None: + if l1_key is not None and len(l1_key) != KEY_LEN_BYTES: + raise ValueError(f"L1 key ({repr(l1_key)}) must be {KEY_LEN_BYTES} bytes") + if l2_key is not None and len(l2_key) != KEY_LEN_BYTES: + raise ValueError(f"L2 key ({repr(l2_key)}) must be {KEY_LEN_BYTES} bytes") + + self.l1_key = l1_key + self.l2_key = l2_key + self.gkid = gkid + self.hash_algorithm = hash_algorithm + self.root_key_id = root_key_id + + def __str__(self) -> str: + l1_key_hex = None if self.l1_key is None else self.l1_key.hex() + l2_key_hex = None if self.l2_key is None else self.l2_key.hex() + + return ( + f"SeedKeyPair(L1Key({l1_key_hex}), L2Key({l2_key_hex}), {self.gkid}," + f" {self.root_key_id}, {self.hash_algorithm})" + ) + + def __repr__(self) -> str: + cls = type(self) + return ( + f"{cls.__qualname__}({repr(self.l1_key)}, {repr(self.l2_key)}," + f" {repr(self.gkid)}, {repr(self.hash_algorithm)}," + f" {repr(self.root_key_id)})" + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, SeedKeyPair): + return NotImplemented + + return ( + self.l1_key, + self.l2_key, + self.gkid, + self.hash_algorithm, + self.root_key_id, + ) == ( + other.l1_key, + other.l2_key, + other.gkid, + other.hash_algorithm, + other.root_key_id, + ) + + def __hash__(self) -> int: + return hash(( + self.l1_key, + self.l2_key, + self.gkid, + self.hash_algorithm, + ndr_pack(self.root_key_id), + )) + + +class GroupKey: + __slots__ = ["gkid", "key", "hash_algorithm", "root_key_id"] + + def __init__( + self, key: bytes, gkid: Gkid, hash_algorithm: Algorithm, root_key_id: misc.GUID + ) -> None: + if key is not None and len(key) != KEY_LEN_BYTES: + raise ValueError(f"Key ({repr(key)}) must be {KEY_LEN_BYTES} bytes") + + self.key = key + self.gkid = gkid + self.hash_algorithm = hash_algorithm + self.root_key_id = root_key_id + + def __str__(self) -> str: + return ( + f"GroupKey(Key({self.key.hex()}), {self.gkid}, {self.hash_algorithm}," + f" {self.root_key_id})" + ) + + def __repr__(self) -> str: + cls = type(self) + return ( + f"{cls.__qualname__}({repr(self.key)}, {repr(self.gkid)}," + f" {repr(self.hash_algorithm)}, {repr(self.root_key_id)})" + ) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, GroupKey): + return NotImplemented + + return (self.key, self.gkid, self.hash_algorithm, self.root_key_id) == ( + other.key, + other.gkid, + other.hash_algorithm, + other.root_key_id, + ) + + def __hash__(self) -> int: + return hash( + (self.key, self.gkid, self.hash_algorithm, ndr_pack(self.root_key_id)) + ) diff --git a/python/samba/tests/gkdi.py b/python/samba/tests/gkdi.py new file mode 100644 index 00000000000..53cd6146aa6 --- /dev/null +++ b/python/samba/tests/gkdi.py @@ -0,0 +1,644 @@ +# +# Helper classes for testing the Group Key Distribution Service. +# +# Copyright (C) Catalyst.Net Ltd 2023 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import sys +import os + +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + +import datetime +import secrets +from typing import Final, NewType, Optional, Tuple, Union + +import ldb + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode + +from samba import ( + HRES_E_INVALIDARG, + HRES_NTE_BAD_KEY, + HRES_NTE_NO_KEY, + ntstatus, + NTSTATUSError, + werror, +) +from samba.credentials import Credentials +from samba.dcerpc import gkdi, misc +from samba.gkdi import ( + Algorithm, + Gkid, + GkidType, + GroupKey, + KEY_CYCLE_DURATION, + KEY_LEN_BYTES, + MAX_CLOCK_SKEW, + SeedKeyPair, +) +from samba.ndr import ndr_pack, ndr_unpack +from samba.nt_time import ( + nt_time_from_datetime, + NtTime, + NtTimeDelta, + timedelta_from_nt_time_delta, +) +from samba.param import LoadParm +from samba.samdb import SamDB + +from samba.tests import delete_force, TestCase + + +HResult = NewType("HResult", int) +RootKey = NewType("RootKey", ldb.Message) + + +ROOT_KEY_START_TIME: Final = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW) + + +class GetKeyError(Exception): + def __init__(self, status: HResult, message: str): + super().__init__(status, message) + + +class GkdiBaseTest(TestCase): + # This is the NDR‐encoded security descriptor O:SYD:(A;;FRFW;;;S-1-5-9). + gmsa_sd = ( + b"\x01\x00\x04\x800\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + b"\x14\x00\x00\x00\x02\x00\x1c\x00\x01\x00\x00\x00\x00\x00\x14\x00" + b"\x9f\x01\x12\x00\x01\x01\x00\x00\x00\x00\x00\x05\t\x00\x00\x00" + b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00" + ) + + @staticmethod + def current_time(offset: Optional[datetime.timedelta] = None) -> datetime.datetime: + if offset is None: + # Allow for clock skew. + offset = timedelta_from_nt_time_delta(MAX_CLOCK_SKEW) + + current_time = datetime.datetime.now(tz=datetime.timezone.utc) + return current_time + offset + + def current_nt_time(self, offset: Optional[datetime.timedelta] = None) -> NtTime: + return nt_time_from_datetime(self.current_time(offset)) + + def current_gkid(self, offset: Optional[datetime.timedelta] = None) -> Gkid: + return Gkid.from_nt_time(self.current_nt_time(offset)) + + def gkdi_connect( + self, host: str, lp: LoadParm, server_creds: Credentials + ) -> gkdi.gkdi: + try: + return gkdi.gkdi(f"ncacn_ip_tcp:{host}[seal]", lp, server_creds) + except NTSTATUSError as err: + if err.args[0] == ntstatus.NT_STATUS_PORT_UNREACHABLE: + self.fail( + "Try starting the Microsoft Key Distribution Service (KdsSvc).\n" + "In PowerShell, run:\n\tStart-Service -Name KdsSvc" + ) + + raise + + def rpc_get_key( + self, + conn: gkdi.gkdi, + target_sd: bytes, + root_key_id: Optional[misc.GUID], + gkid: Gkid, + ) -> SeedKeyPair: + out_len, out, result = conn.GetKey( + list(target_sd), root_key_id, gkid.l0_idx, gkid.l1_idx, gkid.l2_idx + ) + result_code, result_string = result + if ( + root_key_id is None + and result_code & 0xFFFF == werror.WERR_TOO_MANY_OPEN_FILES + ): + self.fail( + "The server has given up selecting a root key because there are too" + " many keys (more than 1000) in the Master Root Keys container. Delete" + " some root keys and try again." + ) + if result != (0, None): + raise GetKeyError(result_code, result_string) + self.assertEqual(len(out), out_len, "output len mismatch") + + envelope = ndr_unpack(gkdi.GroupKeyEnvelope, bytes(out)) + + gkid = Gkid(envelope.l0_index, envelope.l1_index, envelope.l2_index) + l1_key = bytes(envelope.l1_key) if envelope.l1_key else None + l2_key = bytes(envelope.l2_key) if envelope.l2_key else None + + hash_algorithm = Algorithm.from_kdf_parameters(bytes(envelope.kdf_parameters)) + + root_key_id = envelope.root_key_id + + return SeedKeyPair(l1_key, l2_key, gkid, hash_algorithm, root_key_id) + + def get_root_key_object( + self, samdb: SamDB, root_key_id: Optional[misc.GUID], gkid: Gkid + ) -> Tuple[RootKey, misc.GUID]: + """Return a root key object and its corresponding GUID. + + *root_key_id* specifies the GUID of the root key object to return. It + can be ``None`` to indicate that the selected key should be the most + recently created key starting not after the time indicated by *gkid*. + + Bear in mind as that the Microsoft Key Distribution Service caches root + keys, the most recently created key might not be the one that Windows + chooses.""" + + root_key_attrs = [ + "cn", + "msKds-CreateTime", + "msKds-KDFAlgorithmID", + "msKds-KDFParam", + "msKds-RootKeyData", + "msKds-UseStartTime", + "msKds-Version", + ] + + gkid_start_nt_time = gkid.start_nt_time() + + exact_key_specified = root_key_id is not None + if exact_key_specified: + root_key_dn = self.get_root_key_container_dn(samdb) + root_key_dn.add_child(f"CN={root_key_id}") + + try: + root_key_res = samdb.search( + root_key_dn, scope=ldb.SCOPE_BASE, attrs=root_key_attrs + ) + except ldb.LdbError as err: + if err.args[0] == ldb.ERR_NO_SUCH_OBJECT: + raise GetKeyError(HRES_NTE_NO_KEY, "no such root key exists") + + raise + + root_key_object = root_key_res[0] + else: + root_keys = samdb.search( + self.get_root_key_container_dn(samdb), + scope=ldb.SCOPE_SUBTREE, + expression=f"(msKds-UseStartTime<={gkid_start_nt_time})", + attrs=root_key_attrs, + ) + if not root_keys: + raise GetKeyError( + HRES_NTE_NO_KEY, "no root keys exist at specified time" + ) + + def root_key_create_time(key: RootKey) -> NtTime: + create_time = key.get("msKds-CreateTime", idx=0) + if create_time is None: + return NtTime(0) + + return NtTime(int(create_time)) + + root_key_object = max(root_keys, key=root_key_create_time) + + root_key_cn = root_key_object.get("cn", idx=0) + self.assertIsNotNone(root_key_cn) + root_key_id = misc.GUID(root_key_cn) + + use_start_nt_time = NtTime( + int(root_key_object.get("msKds-UseStartTime", idx=0)) + ) + if use_start_nt_time == 0: + raise GetKeyError(HRES_NTE_BAD_KEY, "root key effective time is 0") + use_start_nt_time = NtTime( + use_start_nt_time - NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW) + ) + + if exact_key_specified and not (0 <= use_start_nt_time <= gkid_start_nt_time): + raise GetKeyError(HRES_E_INVALIDARG, "root key is not yet valid") + + return root_key_object, root_key_id + + def validate_get_key_request( + self, gkid: Gkid, current_gkid: Gkid, root_key_specified: bool + ) -> None: + if gkid > current_gkid: + raise GetKeyError( + HRES_E_INVALIDARG, "invalid request for a key from the future" + ) + + gkid_type = gkid.gkid_type() + if gkid_type is GkidType.DEFAULT: + derived_from = ( + " derived from the specified root key" if root_key_specified else "" + ) + raise NotImplementedError( + f"The latest group key{derived_from} is being requested." + ) + + if gkid_type is not GkidType.L2_SEED_KEY: + raise GetKeyError( + HRES_E_INVALIDARG, f"invalid request for {gkid_type.description()}" + ) + + def get_key( + self, + samdb: SamDB, + target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format. + root_key_id: Optional[misc.GUID], + gkid: Gkid, + *, + root_key_id_hint: Optional[misc.GUID] = None, + current_gkid: Optional[Gkid] = None, + ) -> SeedKeyPair: + """Emulate the ISDKey.GetKey() RPC method. + + When passed a NULL root key ID, GetKey() may use a cached root key + rather than picking the most recently created applicable key as the + documentation implies. If it’s important to arrive at the same result as + Windows, pass a GUID in the *root_key_id_hint* parameter to specify a + particular root key to use.""" + + if current_gkid is None: + current_gkid = self.current_gkid() + + root_key_specified = root_key_id is not None + if root_key_specified: + self.assertIsNone( + root_key_id_hint, "don’t provide both root key ID parameters" + ) + + self.validate_get_key_request(gkid, current_gkid, root_key_specified) + + root_key_object, root_key_id = self.get_root_key_object( + samdb, root_key_id if root_key_specified else root_key_id_hint, gkid + ) + + if root_key_specified: + if gkid.l0_idx < current_gkid.l0_idx: + # All of the seed keys with an L0 index less than the current L0 + # index are from the past and thus are safe to return. If the + # caller has requested a specific seed key with a past L0 index, + # return the L1 seed key (L0, 31, −1), from which any L1 or L2 + # seed key having that L0 index can be derived. + l1_gkid = Gkid(gkid.l0_idx, 31, -1) + seed_key = self.compute_seed_key( + target_sd, root_key_id, root_key_object, l1_gkid + ) + return SeedKeyPair( + seed_key.key, + None, + Gkid(gkid.l0_idx, 31, 31), + seed_key.hash_algorithm, + root_key_id, + ) + + # All of the previous seed keys with an L0 index equal to the + # current L0 index can be derived from the current seed key or from + # the next older L1 seed key. + gkid = current_gkid + + if gkid.l2_idx == 31: + # The current seed key, and all previous seed keys with that same L0 + # index, can be derived from the L1 seed key (L0, L1, 31). + l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx, -1) + seed_key = self.compute_seed_key( + target_sd, root_key_id, root_key_object, l1_gkid + ) + return SeedKeyPair( + seed_key.key, None, gkid, seed_key.hash_algorithm, root_key_id + ) + + # Compute the L2 seed key to return. + seed_key = self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid) + + next_older_seed_key = None + if gkid.l1_idx != 0: + # From the current seed key can be derived only those seed keys that + # share its L1 and L2 indices. To be able to derive previous seed + # keys with older L1 indices, the caller must be given the next + # older L1 seed key as well. + next_older_l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx - 1, -1) + next_older_seed_key = self.compute_seed_key( + target_sd, root_key_id, root_key_object, next_older_l1_gkid + ).key + + return SeedKeyPair( + next_older_seed_key, + seed_key.key, + gkid, + seed_key.hash_algorithm, + root_key_id, + ) + + def get_key_exact( + self, + samdb: SamDB, + target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format. + root_key_id: Optional[misc.GUID], + gkid: Gkid, + current_gkid: Optional[Gkid] = None, + ) -> GroupKey: + if current_gkid is None: + current_gkid = self.current_gkid() + + root_key_specified = root_key_id is not None + self.validate_get_key_request(gkid, current_gkid, root_key_specified) + + root_key_object, root_key_id = self.get_root_key_object( + samdb, root_key_id, gkid + ) + + return self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid) + + def get_root_key_data(self, root_key: RootKey) -> Tuple[bytes, Algorithm]: + version = root_key.get("msKds-Version", idx=0) + self.assertEqual(b"1", version) + + algorithm_id = root_key.get("msKds-KDFAlgorithmID", idx=0) + self.assertEqual(b"SP800_108_CTR_HMAC", algorithm_id) + + hash_algorithm = Algorithm.from_kdf_parameters( + root_key.get("msKds-KDFParam", idx=0) + ) + + root_key_data = root_key.get("msKds-RootKeyData", idx=0) + self.assertIsInstance(root_key_data, bytes) + + return root_key_data, hash_algorithm + + def compute_seed_key( + self, + target_sd: bytes, + root_key_id: misc.GUID, + root_key: RootKey, + target_gkid: Gkid, + ) -> GroupKey: + target_gkid_type = target_gkid.gkid_type() + self.assertIn( + target_gkid_type, + (GkidType.L1_SEED_KEY, GkidType.L2_SEED_KEY), + f"unexpected attempt to compute {target_gkid_type.description()}", + ) + + root_key_data, algorithm = self.get_root_key_data(root_key) + root_key_id_bytes = ndr_pack(root_key_id) + + hash_algorithm = algorithm.algorithm() + + # Derive the L0 seed key. + gkid = Gkid.l0_seed_key(target_gkid.l0_idx) + key = self.derive_key(root_key_data, root_key_id_bytes, hash_algorithm, gkid) + + # Derive the L1 seed key. + + gkid = gkid.derive_l1_seed_key() + key = self.derive_key( + key, root_key_id_bytes, hash_algorithm, gkid, target_sd=target_sd + ) + + while gkid.l1_idx != target_gkid.l1_idx: + gkid = gkid.derive_l1_seed_key() + key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid) + + # Derive the L2 seed key. + while gkid != target_gkid: + gkid = gkid.derive_l2_seed_key() + key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid) + + return GroupKey(key, gkid, algorithm, root_key_id) + + def derive_key( + self, + key: bytes, + root_key_id_bytes: bytes, + hash_algorithm: hashes.HashAlgorithm, + gkid: Gkid, + *, + target_sd: Optional[bytes] = None, + ) -> bytes: + def u32_bytes(n: int) -> bytes: + return (n & 0xFFFF_FFFF).to_bytes(length=4, byteorder="little") + + context = ( + root_key_id_bytes + + u32_bytes(gkid.l0_idx) + + u32_bytes(gkid.l1_idx) + + u32_bytes(gkid.l2_idx) + ) + if target_sd is not None: + context += target_sd + return self.kdf(hash_algorithm, key, context) + + def kdf( + self, + hash_algorithm: hashes.HashAlgorithm, + key: bytes, + context: bytes, + *, + label="KDS service", + len_in_bytes=KEY_LEN_BYTES, + ) -> bytes: + label = label.encode("utf-16-le") + b"\x00\x00" + kdf = KBKDFHMAC( + algorithm=hash_algorithm, + mode=Mode.CounterMode, + length=len_in_bytes, + rlen=4, + llen=4, + location=CounterLocation.BeforeFixed, + label=label, + context=context, + fixed=None, + backend=default_backend(), + ) + return kdf.derive(key) + + def get_config_dn(self, samdb: SamDB, dn: str) -> ldb.Dn: + config_dn = samdb.get_config_basedn() + config_dn.add_child(dn) + return config_dn + + def get_server_config_dn(self, samdb: SamDB) -> ldb.Dn: + # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution + # Service”, and “CN=SID Key Server Configuration” for “CN=Group Key + # Distribution Service Server Configuration”. + return self.get_config_dn( + samdb, + "CN=Group Key Distribution Service Server Configuration," + "CN=Server Configuration," + "CN=Group Key Distribution Service," + "CN=Services", + ) + + def get_root_key_container_dn(self, samdb: SamDB) -> ldb.Dn: + # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution Service”. + return self.get_config_dn( + samdb, + "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services", + ) + + def create_root_key( + self, + samdb: SamDB, + domain_dn: ldb.Dn, + *, + use_start_time: Optional[Union[datetime.datetime, NtTime]] = None, + hash_algorithm: Optional[Algorithm] = Algorithm.SHA512, + guid: Optional[misc.GUID] = None, + data: Optional[bytes] = None, + ) -> misc.GUID: + # [MS-GKDI] 3.1.4.1.1, “Creating a New Root Key”, states that if the + # server receives a GetKey request and the root keys container in Active + # Directory is empty, the the server must create a new root key object + # based on the default Server Configuration object. Additional root keys + # are to be created based on either the default Server Configuration + # object or an updated one specifying optional configuration values. + + guid_specified = guid is not None + if not guid_specified: + guid = misc.GUID(secrets.token_bytes(16)) + + if data is None: + data = secrets.token_bytes(KEY_LEN_BYTES) + else: + self.assertEqual( + KEY_LEN_BYTES, + len(data), + f"root key data must be {KEY_LEN_BYTES} bytes", + ) + + create_time = current_nt_time = self.current_nt_time() + + if use_start_time is None: + # Root keys created by Windows without the ‘-EffectiveImmediately’ + # parameter have an effective time of exactly ten days in the + # future, presumably to allow time for replication. + # + # Microsoft’s documentation on creating a KDS root key, located at + # https://learn.microsoft.com/en-us/windows-server/security/group-managed-service-accounts/create-the-key-distribution-services-kds-root-key, + # claims to the contrary that domain controllers will only wait up + # to ten hours before allowing Group Managed Service Accounts to be + # created. + # + # The same page includes instructions for creating a root key with + # an effective time of ten hours in the past (for testing purposes), + # but I’m not sure why — the KDS will consider a key valid for use + # immediately after its start time has passed, without bothering to + # wait ten hours first. In fact, it will consider a key to be valid + # a full ten hours (plus clock skew) *before* its declared start + # time — intentional, or (conceivably) the result of an accidental + # negation? + current_interval_start_nt_time = Gkid.from_nt_time( + current_nt_time + ).start_nt_time() + use_start_time = NtTime( + current_interval_start_nt_time + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW + ) + + if isinstance(use_start_time, datetime.datetime): + use_start_nt_time = nt_time_from_datetime(use_start_time) + else: + self.assertIsInstance(use_start_time, int) + use_start_nt_time = use_start_time + + kdf_parameters = None + if hash_algorithm is not None: + kdf_parameters = gkdi.KdfParameters() + kdf_parameters.hash_algorithm = hash_algorithm.value + kdf_parameters = ndr_pack(kdf_parameters) + + # These are the encoded p and g values, respectively, of the “2048‐bit + # MODP Group with 256‐bit Prime Order Subgroup” from RFC 5114 section + # 2.3. + field_order = ( + b"\x87\xa8\xe6\x1d\xb4\xb6f<\xff\xbb\xd1\x9ce\x19Y\x99\x8c\xee\xf6\x08" + b"f\r\xd0\xf2],\xee\xd4C^;\x00\xe0\r\xf8\xf1\xd6\x19W\xd4\xfa\xf7\xdfE" + b"a\xb2\xaa0\x16\xc3\xd9\x114\to\xaa;\xf4)m\x83\x0e\x9a|" + b" \x9e\x0cd\x97Qz\xbd" + b'Z\x8a\x9d0k\xcfg\xed\x91\xf9\xe6r[GX\xc0"\xe0\xb1\xefBu\xbf{l[\xfc\x11' + b"\xd4_\x90\x88\xb9A\xf5N\xb1\xe5\x9b\xb8\xbc9\xa0\xbf\x120\x7f\\O\xdbp\xc5" + b"\x81\xb2?v\xb6:\xca\xe1\xca\xa6\xb7\x90-RRg5H\x8a\x0e\xf1. +# + +import sys +import os + +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + +import secrets + +from typing import ClassVar, Optional + +from samba import HRES_E_INVALIDARG, HRES_NTE_BAD_KEY, HRES_NTE_NO_KEY +from samba.dcerpc import gkdi, misc +from samba.gkdi import ( + Algorithm, + Gkid, + KEY_CYCLE_DURATION, + MAX_CLOCK_SKEW, + NtTime, + NtTimeDelta, + SeedKeyPair, +) +from samba.nt_time import timedelta_from_nt_time_delta + +from samba.tests.gkdi import GetKeyError, GkdiBaseTest, ROOT_KEY_START_TIME +from samba.tests.krb5.kdc_base_test import KDCBaseTest + + +class GkdiKdcBaseTest(GkdiBaseTest, KDCBaseTest): + _root_key: ClassVar[misc.GUID] + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + + cls._root_key = None + + def setUp(self) -> None: + super().setUp() + + if self._root_key is None: + # GKDI requires a root key to operate. Creating a root key here + # saves creating one before every test. + # + # We cannot delete this key after the tests have run, as Windows + # might have decided to cache it to be used in subsequent runs. It + # will keep a root key cached even if the corresponding AD object + # has been deleted, leading to various problems later. + cls = type(self) + cls._root_key = self.new_root_key(use_start_time=ROOT_KEY_START_TIME) + + def new_root_key(self, *args, **kwargs) -> misc.GUID: + samdb = self.get_samdb() + domain_dn = self.get_server_dn(samdb) + return self.create_root_key(samdb, domain_dn, *args, **kwargs) + + def gkdi_conn(self) -> gkdi.gkdi: + # The seed keys used by Group Managed Service Accounts correspond to the + # Enterprise DCs SID (S-1-5-9); as such they can be retrieved only by + # server accounts. + return self.gkdi_connect( + self.dc_host, + self.get_lp(), + self.get_cached_creds(account_type=self.AccountType.SERVER), + ) + + def check_rpc_get_key( + self, root_key_id: Optional[misc.GUID], gkid: Gkid + ) -> SeedKeyPair: + got_key_pair = self.rpc_get_key( + self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid + ) + expected_key_pair = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + gkid, + root_key_id_hint=got_key_pair.root_key_id if root_key_id is None else None, + ) + self.assertEqual( + got_key_pair.root_key_id, + expected_key_pair.root_key_id, + "root key IDs must match", + ) + self.assertEqual(got_key_pair, expected_key_pair, "key pairs must match") + + return got_key_pair + + +class GkdiExplicitRootKeyTests(GkdiKdcBaseTest): + def test_current_l0_idx(self): + """Request a key with the current L0 index. This index is regularly + incremented every 427 days or so.""" + root_key_id = self.new_root_key() + + # It actually doesn’t matter what we specify for the L1 and L2 indices. + # We’ll get the same result regardless — they just cannot specify a key + # from the future. + self.check_rpc_get_key(root_key_id, self.current_gkid()) + + def test_previous_l0_idx(self): + """Request a key with a previous L0 index.""" + root_key_id = self.new_root_key(use_start_time=ROOT_KEY_START_TIME) + + # It actually doesn’t matter what we specify for the L1 and L2 indices. + # We’ll get the same result regardless. + previous_l0_idx = self.current_gkid().l0_idx - 1 + key = self.check_rpc_get_key(root_key_id, Gkid(previous_l0_idx, 0, 0)) + + # Expect to get an L1 seed key. + self.assertIsNotNone(key.l1_key) + self.assertIsNone(key.l2_key) + self.assertEqual(Gkid(previous_l0_idx, 31, 31), key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + + def test_algorithm_sha1(self): + """Test with the SHA1 algorithm.""" + key = self.check_rpc_get_key( + self.new_root_key(hash_algorithm=Algorithm.SHA1), + self.current_gkid(), + ) + self.assertIs(Algorithm.SHA1, key.hash_algorithm) + + def test_algorithm_sha256(self): + """Test with the SHA256 algorithm.""" + key = self.check_rpc_get_key( + self.new_root_key(hash_algorithm=Algorithm.SHA256), + self.current_gkid(), + ) + self.assertIs(Algorithm.SHA256, key.hash_algorithm) + + def test_algorithm_sha384(self): + """Test with the SHA384 algorithm.""" + key = self.check_rpc_get_key( + self.new_root_key(hash_algorithm=Algorithm.SHA384), + self.current_gkid(), + ) + self.assertIs(Algorithm.SHA384, key.hash_algorithm) + + def test_algorithm_sha512(self): + """Test with the SHA512 algorithm.""" + key = self.check_rpc_get_key( + self.new_root_key(hash_algorithm=Algorithm.SHA512), + self.current_gkid(), + ) + self.assertIs(Algorithm.SHA512, key.hash_algorithm) + + def test_algorithm_none(self): + """Test without a specified algorithm.""" + key = self.check_rpc_get_key( + self.new_root_key(hash_algorithm=None), + self.current_gkid(), + ) + self.assertIs(Algorithm.SHA256, key.hash_algorithm) + + def test_future_key(self): + """Try to request a key from the future.""" + root_key_id = self.new_root_key(use_start_time=ROOT_KEY_START_TIME) + + future_gkid = self.current_gkid( + offset=timedelta_from_nt_time_delta( + NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW) + ) + ) + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, future_gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + err.exception.args[0], + "requesting a key from the future should fail with INVALID_PARAMETER", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, future_gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + rpc_err.exception.args[0], + "requesting a key from the future should fail with INVALID_PARAMETER", + ) + + def test_root_key_use_start_time_zero(self): + """Attempt to use a root key with an effective time of zero.""" + root_key_id = self.new_root_key(use_start_time=NtTime(0)) + + gkid = self.current_gkid() + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_NTE_BAD_KEY, + err.exception.args[0], + "using a root key with an effective time of zero should fail with BAD_KEY", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_NTE_BAD_KEY, + rpc_err.exception.args[0], + "using a root key with an effective time of zero should fail with BAD_KEY", + ) + + def test_root_key_use_start_time_too_low(self): + """Attempt to use a root key with an effective time set too low.""" + root_key_id = self.new_root_key(use_start_time=NtTime(ROOT_KEY_START_TIME - 1)) + + gkid = self.current_gkid() + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + err.exception.args[0], + "using a root key with too low effective time should fail with" + " INVALID_PARAMETER", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + rpc_err.exception.args[0], + "using a root key with too low effective time should fail with" + " INVALID_PARAMETER", + ) + + def test_before_valid(self): + """Attempt to use a key before it is valid.""" + gkid = self.current_gkid() + valid_start_time = NtTime( + gkid.start_nt_time() + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW + ) + + # Using a valid root key is allowed. + valid_root_key_id = self.new_root_key(use_start_time=valid_start_time) + self.check_rpc_get_key(valid_root_key_id, gkid) + + # But attempting to use a root key that is not yet valid will result in + # an INVALID_PARAMETER error. + invalid_root_key_id = self.new_root_key(use_start_time=valid_start_time + 1) + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, invalid_root_key_id, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + err.exception.args[0], + "using a key before it is valid should fail with INVALID_PARAMETER", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, invalid_root_key_id, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + rpc_err.exception.args[0], + "using a key before it is valid should fail with INVALID_PARAMETER", + ) + + def test_non_existent_root_key(self): + """Attempt to use a non‐existent root key.""" + root_key_id = misc.GUID(secrets.token_bytes(16)) + + gkid = self.current_gkid() + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_NTE_NO_KEY, + err.exception.args[0], + "using a non‐existent root key should fail with NO_KEY", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid) + + self.assertEqual( + HRES_NTE_NO_KEY, + rpc_err.exception.args[0], + "using a non‐existent root key should fail with NO_KEY", + ) + + +class GkdiImplicitRootKeyTests(GkdiKdcBaseTest): + def test_l1_seed_key(self): + """Request a key and expect to receive an L1 seed key.""" + gkid = Gkid(300, 0, 31) + key = self.check_rpc_get_key(None, gkid) + + # Expect to get an L1 seed key. + self.assertIsNotNone(key.l1_key) + self.assertIsNone(key.l2_key) + self.assertEqual(gkid, key.gkid) + + def test_l2_seed_key(self): + """Request a key and expect to receive an L2 seed key.""" + gkid = Gkid(300, 0, 0) + key = self.check_rpc_get_key(None, gkid) + + # Expect to get an L2 seed key. + self.assertIsNone(key.l1_key) + self.assertIsNotNone(key.l2_key) + self.assertEqual(gkid, key.gkid) + + def test_both_seed_keys(self): + """Request a key and expect to receive L1 and L2 seed keys.""" + gkid = Gkid(300, 1, 0) + key = self.check_rpc_get_key(None, gkid) + + # Expect to get both L1 and L2 seed keys. + self.assertIsNotNone(key.l1_key) + self.assertIsNotNone(key.l2_key) + self.assertEqual(gkid, key.gkid) + + def test_both_seed_keys_no_hint(self): + """Request a key, but don’t specify ‘root_key_id_hint’.""" + gkid = Gkid(300, 1, 0) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + None, + gkid, + ) + + # Expect to get both L1 and L2 seed keys. + self.assertIsNotNone(key.l1_key) + self.assertIsNotNone(key.l2_key) + self.assertEqual(gkid, key.gkid) + + def test_request_l0_seed_key(self): + """Attempt to request an L0 seed key.""" + gkid = Gkid.l0_seed_key(300) + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, None, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + err.exception.args[0], + "requesting an L0 seed key should fail with INVALID_PARAMETER", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + rpc_err.exception.args[0], + "requesting an L0 seed key should fail with INVALID_PARAMETER", + ) + + def test_request_l1_seed_key(self): + """Attempt to request an L1 seed key.""" + gkid = Gkid.l1_seed_key(300, 0) + + with self.assertRaises(GetKeyError) as err: + self.get_key(self.get_samdb(), self.gmsa_sd, None, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + err.exception.args[0], + "requesting an L1 seed key should fail with INVALID_PARAMETER", + ) + + with self.assertRaises(GetKeyError) as rpc_err: + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid) + + self.assertEqual( + HRES_E_INVALIDARG, + rpc_err.exception.args[0], + "requesting an L1 seed key should fail with INVALID_PARAMETER", + ) + + def test_request_default_seed_key(self): + """Try to make a request with the default GKID.""" + gkid = Gkid.default() + + self.assertRaises( + NotImplementedError, + self.get_key, + self.get_samdb(), + self.gmsa_sd, + None, + gkid, + ) + + self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid) + + +class GkdiSelfTests(GkdiKdcBaseTest): + def test_current_l0_idx_l1_seed_key(self): + """Request a key with the current L0 index, expecting to receive an L1 + seed key.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA512, + guid=misc.GUID("89f70521-9d66-441f-c314-1b462f9b1052"), + data=bytes.fromhex( + "a6ef87dbbbf86b6bbe55750b941f13ca99efe5185e2e2bded5b838d8a0e77647" + "0537e68cae45a7a0f4b1d6c9bf5494c3f879e172e326557cdbb6a56e8799a722" + ), + ) + + current_gkid = Gkid(255, 24, 31) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(255, 2, 5), + current_gkid=current_gkid, + ) + + # Expect to get an L1 seed key. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA512, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "bd538a073490f3cf9451c933025de9b22c97eaddaffa94b379e2b919a4bed147" + "5bc67f6a9175b139c69204c57d4300a0141ffe34d12ced84614593b1aa13af1c" + ), + key.l1_key, + ) + self.assertIsNone(key.l2_key) + + def test_current_l0_idx_l2_seed_key(self): + """Request a key with the current L0 index, expecting to receive an L2 + seed key.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA512, + guid=misc.GUID("1a3d6c30-aa81-cb7f-d3fe-80775d135dfe"), + data=bytes.fromhex( + "dfd95be3153a0805c65694e7d284aace5ab0aa493350025eb8dbc6df0b4e9256" + "fb4cbfbe6237ce3732694e2608760076b67082d39abd3c0fedba1b8873645064" + ), + ) + + current_gkid = Gkid(321, 0, 12) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(321, 0, 1), + current_gkid=current_gkid, + ) + + # Expect to get an L2 seed key. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA512, key.hash_algorithm) + self.assertIsNone(key.l1_key) + self.assertEqual( + bytes.fromhex( + "bbbd9376cd16c247ed40f5912d1908218c08f0915bae02fe02cbfb3753bde406" + "f9c553acd95143cf63906a0440e3cf237d2335ae4e4b9cd2d946a71351ebcb7b" + ), + key.l2_key, + ) + + def test_current_l0_idx_both_seed_keys(self): + """Request a key with the current L0 index, expecting to receive L1 and + L2 seed keys.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA512, + guid=misc.GUID("09de0b38-c743-7abf-44ea-7a3c3e404314"), + data=bytes.fromhex( + "d5912d0eb3bd60e1371b1e525dd83be7fc5baf77018b0dba6bd948b7a98ebe5a" + "f37674332506a46c52c108a62f2a3e89251ad1bde6d539004679c0658853bb68" + ), + ) + + current_gkid = Gkid(123, 21, 0) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(123, 2, 1), + current_gkid=current_gkid, + ) + + # Expect to get both L1 and L2 seed keys. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA512, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "b1f7c5896e7dc791d9c0aaf8ca7dbab8c172a4f8b873db488a3c4cbd0f559b11" + "52ffba39d4aff2d9e8aada90b27a3c94a5af996f4b8f584a4f37ccab4d505d3d" + ), + key.l1_key, + ) + self.assertEqual( + bytes.fromhex( + "133c9bbd20d9227aeb38dfcd3be6bcbfc5983ba37202088ff5c8a70511214506" + "a69c195a8807cd844bcb955e9569c8e4d197759f28577cc126d15f16a7da4ee0" + ), + key.l2_key, + ) + + def test_previous_l0_idx(self): + """Request a key with a previous L0 index.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA512, + guid=misc.GUID("27136e8f-e093-6fe3-e57f-1d915b102e1c"), + data=bytes.fromhex( + "b41118c60a19cafa5ecf858d1a2a2216527b2daedf386e9d599e42a46add6c7d" + "c93868619761c880ff3674a77c6e5fbf3434d130a9727bb2cd2a2557bdcfc752" + ), + ) + + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(100, 20, 30), + current_gkid=Gkid(101, 2, 3), + ) + + # Expect to get an L1 seed key. + self.assertEqual(Gkid(100, 31, 31), key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA512, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "935cbdc06198eb28fa44b8d8278f51072c4613999236585041ede8e72d02fe95" + "e3454f046382cbc0a700779b79474dd7e080509d76302d2937407e96e3d3d022" + ), + key.l1_key, + ) + self.assertIsNone(key.l2_key) + + def test_sha1(self): + """Request a key derived with SHA1.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA1, + guid=misc.GUID("970abad6-fe55-073a-caf1-b801d3f26bd3"), + data=bytes.fromhex( + "3bed03bf0fb7d4013149154f24ca2d59b98db6d588cb1f54eca083855e25eb28" + "d3562a01adc78c4b70e0b72a59515863e7732b853fba02dd7646e63108441211" + ), + ) + + current_gkid = Gkid(1, 2, 3) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(1, 1, 1), + current_gkid=current_gkid, + ) + + # Expect to get both L1 and L2 seed keys. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA1, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "576cb68f2e52eb739f817b488c3590d86f1c2c365f3fc9201d9c7fee7494853d" + "58746ee13e48f18aa6fa69f7157de3d07de34e13836792b7c088ffb6914a89c2" + ), + key.l1_key, + ) + self.assertEqual( + bytes.fromhex( + "3ffb825adaf116b6533207d568a30ed3d3f21c68840941c9456684f9afa11b05" + "6e0c59391b4d88c495d984c3d680029cc5c594630f34179119c1c5acaae5e90e" + ), + key.l2_key, + ) + + def test_sha256(self): + """Request a key derived with SHA256.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA256, + guid=misc.GUID("45e26207-ed33-dcd5-925a-518a0deef69e"), + data=bytes.fromhex( + "28b5b6503d3c1d24814de781bb7bfce3ef69eed1ce4809372bee2c506270c5f0" + "b5c6df597472623f256c86daa0991e8a11a1705f21b2cfdc0bb9db4ba23246a2" + ), + ) + + current_gkid = Gkid(222, 22, 22) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(222, 11, 0), + current_gkid=current_gkid, + ) + + # Expect to get both L1 and L2 seed keys. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA256, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "57aced6e75f83f3af4f879b38b60f090b42e4bfa022fae3e6fd94280b469b0ec" + "15d8b853a870b5fbdf28708cce19273b74a573acbe0deda8ef515db4691e2dcb" + ), + key.l1_key, + ) + self.assertEqual( + bytes.fromhex( + "752a0879ae2424c0504c7493599f13e588e1bbdc252f83325ad5b1fb91c24c89" + "01d440f3ff9ffba59fcd65bb975732d9f383dd50b898174bb9393e383d25d540" + ), + key.l2_key, + ) + + def test_sha384(self): + """Request a key derived with SHA384.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA384, + guid=misc.GUID("66e6d9f7-4924-f3fc-fe34-605634d42ebd"), + data=bytes.fromhex( + "23e5ba86cbd88f7b432ee66dbb03bf4eebf401cbfc3df735d4d728b503c87f84" + "3207c6f6153f190dfe85a86cb8d8b74df13b25305981be8d7e29c96ee54c9630" + ), + ) + + current_gkid = Gkid(287, 28, 27) + key = self.get_key( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + Gkid(287, 8, 7), + current_gkid=current_gkid, + ) + + # Expect to get both L1 and L2 seed keys. + self.assertEqual(current_gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA384, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "fabadd7a9a63df57d6832df7a735aebb6e181888b2eaf301a2e4ff9a70246d38" + "ab1d2416325bf3eb726a0267bab4bd950c7291f05ea5f17197ece56992af3eb8" + ), + key.l1_key, + ) + self.assertEqual( + bytes.fromhex( + "ec1c65634b5694818e1d341da9996db8f2a1ef6a2c776a7126a7ebd18b37a073" + "afdac44c41b167b14e4b872d485bbb6d7b70964215d0e84a2ff142a9d943f205" + ), + key.l2_key, + ) + + def test_derive_key_exact(self): + """Derive a key at an exact GKID.""" + root_key_id = self.new_root_key( + use_start_time=ROOT_KEY_START_TIME, + hash_algorithm=Algorithm.SHA512, + guid=misc.GUID("d95fb06f-5a9c-1829-e20d-27f3f2ecfbeb"), + data=bytes.fromhex( + "489f3531c537774d432d6b97e3bc1f43d2e8c6dc17eb0e4fd9a0870d2f1ebf92" + "e2496668a8b5bd11aea2d32d0aab716f48fe569f5c9b50ff3f9bf5deaea572fb" + ), + ) + + gkid = Gkid(333, 22, 11) + key = self.get_key_exact( + self.get_samdb(), + self.gmsa_sd, + root_key_id, + gkid, + current_gkid=self.current_gkid(), + ) + + self.assertEqual(gkid, key.gkid) + self.assertEqual(root_key_id, key.root_key_id) + self.assertEqual(Algorithm.SHA512, key.hash_algorithm) + self.assertEqual( + bytes.fromhex( + "d6ab3b14f4f4c8908aa3464011b39f10a8bfadb9974af90f7d9a9fede2fdc6e5" + "f68a628ec00f9994a3abd8a52ae9e2db4f68e83648311e9d7765f2535515b5e2" + ), + key.key, + ) + + +if __name__ == "__main__": + import unittest + + unittest.main() diff --git a/selftest/knownfail.d/gkdi b/selftest/knownfail.d/gkdi new file mode 100644 index 00000000000..68f3dffd42e --- /dev/null +++ b/selftest/knownfail.d/gkdi @@ -0,0 +1,18 @@ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_none\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha1\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha256\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha384\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha512\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_before_valid\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_current_l0_idx\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_future_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_non_existent_root_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_previous_l0_idx\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_root_key_use_start_time_too_low\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_root_key_use_start_time_zero\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_both_seed_keys\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_l1_seed_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_l2_seed_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_default_seed_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_l0_seed_key\(ad_dc\)$ +^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_l1_seed_key\(ad_dc\)$ diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 37839e7522b..2ab930a6c9a 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -2044,6 +2044,10 @@ planoldpythontestsuite( 'ad_dc', 'samba.tests.krb5.conditional_ace_tests', environ=krb5_environ) +planoldpythontestsuite( + 'ad_dc', + 'samba.tests.krb5.gkdi_tests', + environ=krb5_environ) for env in [ 'vampire_dc',