"""Functions for processing key_credential_link"""
+from hashlib import sha256
+import struct
+from typing import Optional, Union
-from samba.samdb import BinaryDn
+from cryptography.hazmat.primitives.serialization import (
+ load_der_public_key,
+ load_pem_public_key,
+ PublicFormat,
+ Encoding)
+
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
+
+from cryptography.x509 import (
+ load_pem_x509_certificate,
+ load_der_x509_certificate)
+
+
+from samba.samdb import SamDB, BinaryDn
from samba.ndr import ndr_unpack, ndr_pack
+from ldb import Dn
from samba.dcerpc import keycredlink
except Exception as e:
raise ValueError("Could not parse value as KEYCREDENTIALLINK_BLOB "
f" (internal error: {e})")
+
+
+def get_public_key(data:bytes, encoding:str):
+ """decode a key in PEM or DER format.
+
+ If it turns out to be a certificate or something, we try to get
+ the public key from that.
+
+ So far only RSA keys are supported.
+ """
+ if encoding is None:
+ if data[:11] == b'-----BEGIN ':
+ encoding = 'PEM'
+ else:
+ encoding = 'DER'
+
+ encoding = encoding.upper()
+
+ # The cryptography module also supports ssh keys, PKCS1, and other
+ # formats, as well as non-RSA keys and extracting public keys from
+ # private. It might not be wise to tolerate all of this, but we
+ # can do it by adding to key_fns and cert_fns here.
+ if encoding == 'PEM':
+ key_fns = [load_pem_public_key]
+ cert_fns = [load_pem_x509_certificate]
+ elif encoding == 'DER':
+ key_fns = [load_der_public_key]
+ cert_fns = [load_der_x509_certificate]
+ else:
+ raise ValueError(f"Public key encoding '{encoding}' not supported "
+ "(try 'PEM' or 'DER')")
+
+ key = None
+ for fn in key_fns:
+ try:
+ key = fn(data)
+ break
+ except ValueError:
+ continue
+
+ if key is None:
+ for fn in cert_fns:
+ try:
+ cert = fn(data)
+ key = cert.public_key()
+ break
+ except ValueError:
+ continue
+
+ if key is None:
+ raise ValueError("could not decode public key")
+
+ if not isinstance(key, RSAPublicKey):
+ raise ValueError("Currently only RSA Public Keys are supported "
+ f"(not '{key}')")
+
+ return key
+
+
+def kcl_entry_bytes(entry_type:int, data:bytes) -> bytes:
+ """helper to pack key credential link entries"""
+ return struct.pack('<HB', len(data), entry_type) + data
+
+
+def create_key_credential_link(samdb: SamDB,
+ target: Union[str, Dn],
+ data: bytes,
+ encoding: Optional[str] = None,
+ force: bool = False):
+ """Convert a public key in a common format into a binary DN"""
+ if not force:
+ res = samdb.search(base=target)
+ if len(res) == 0:
+ raise ValueError(f"link target {target} does not exist")
+
+ key = get_public_key(data, encoding)
+
+ if key.key_size != 2048:
+ # According to [MS-ADTS] 2.2.20.5.1, KEY_USAGE_NGC means a
+ # 2048 bit public key.
+ if not force:
+ raise ValueError(f"2048 bit RSA key expected, not {key.key_size}")
+
+ key_bytes = key.public_bytes(Encoding.DER,
+ PublicFormat.SubjectPublicKeyInfo)
+
+ # that's the key.
+ # but there's more.
+ kcl_header = bytes.fromhex("00 02 00 00") # Always version 2
+
+ # Entries are added in the enum order, as follows.
+ #
+ # Here '**' means MUST exist, '*' means SHOULD, and '-' means
+ # SHOULD which we ignore. We ignore all the un-SHOULDed values
+ # ([MS-ADTS] 2.2.20.6). For KeyUsage, only use KEY_USAGE_NGC.
+ #
+ # ** 1 KeyID hash of the key material
+ # * 2 KeyHash hash of following entries (i.e. 3, 4, 9)
+ # ** 3 KeyMaterial the key
+ # ** 4 KeyUsage KEY_USAGE_NGC, KEY_USAGE_FIDO, or KEY_USAGE_FEK
+ # 5 KeySource KEY_SOURCE_AD.
+ # 6 DeviceId 16 byte device ID (GUID, I guess) or zeros
+ # 7 CustomKeyInformation CUSTOM_KEY_INFORMATION struct
+ # - 8 KeyApproximateLastLogonTimeStamp nttime
+ # * 9 KeyCreationTime nttime
+
+ # sha256 of the actual key
+ kcl_key_id = kcl_entry_bytes(keycredlink.KeyID,
+ sha256(key_bytes).digest())
+
+ # the actual key
+ kcl_material = kcl_entry_bytes(keycredlink.KeyMaterial,
+ key_bytes)
+
+ # always KEY_USAGE_NGC
+ kcl_key_usage = kcl_entry_bytes(keycredlink.KeyUsage,
+ keycredlink.KEY_USAGE_NGC.to_bytes())
+
+ # nttime for now
+ kcl_creation = kcl_entry_bytes(keycredlink.KeyCreationTime,
+ struct.pack('<Q', samdb.get_nttime()))
+
+ # always KEY_SOURCE_AD
+ #kcl_key_source = kcl_entry_bytes(keycredlink.KeySource,
+ # KEY_SOURCE_AD.to_bytes())
+
+ # the KeyHash field is a sha256 of all the values after the
+ # KeyHash field.
+
+ kcl_key_hash = kcl_entry_bytes(keycredlink.KeyHash,
+ sha256(kcl_material +
+ kcl_key_usage +
+ kcl_creation).digest())
+
+ kcl_bytes = (kcl_header +
+ kcl_key_id +
+ kcl_key_hash +
+ kcl_material +
+ kcl_key_usage +
+ kcl_creation)
+
+ k = KeyCredentialLinkDn.from_bytes_and_dn(samdb, kcl_bytes, target)
+ return k