From: Jennifer Sutton Date: Wed, 8 Oct 2025 01:34:25 +0000 (+1300) Subject: samba-tool: Add subcommand to generate Certificate Signing Requests with SID extension X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=db6f50b7cff616b4d67ec62dab6e0208f5cbb79d;p=thirdparty%2Fsamba.git samba-tool: Add subcommand to generate Certificate Signing Requests with SID extension Signed-off-by: Jennifer Sutton Reviewed-by: Douglas Bagnall Reviewed-by: Gary Lockyer --- diff --git a/python/samba/generate_csr.py b/python/samba/generate_csr.py new file mode 100644 index 00000000000..a486e844b93 --- /dev/null +++ b/python/samba/generate_csr.py @@ -0,0 +1,266 @@ +# Generate a Certificate Signing Request for a certificate +# +# Copyright (C) Catalyst.Net Ltd 2025 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from typing import Optional + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.hazmat.primitives.serialization import ( + load_der_private_key, + load_pem_private_key, +) +from cryptography.x509.base import CertificateSigningRequest +from samba import asn1, ldb +from samba.samdb import SamDB + +from samba.domain.models import User + + +ID_PKINIT_MS_SAN = x509.ObjectIdentifier("1.3.6.1.4.1.311.20.2.3") +szOID_NTDS_CA_SECURITY_EXT = x509.ObjectIdentifier("1.3.6.1.4.1.311.25.2") + + +# As the version of python3-cryptography used in CI is too old to include the +# method x509.Name.from_rfc4514_string(), we must implement it ourselves. +def x509_name_from_rfc4514_string(rfc4514_string: str) -> x509.Name: + # Derived from https://datatracker.ietf.org/doc/html/rfc4514#page-7 + name_oid_map = { + "CN": x509.NameOID.COMMON_NAME, + "L": x509.NameOID.LOCALITY_NAME, + "ST": x509.NameOID.STATE_OR_PROVINCE_NAME, + "O": x509.NameOID.ORGANIZATION_NAME, + "OU": x509.NameOID.ORGANIZATIONAL_UNIT_NAME, + "C": x509.NameOID.COUNTRY_NAME, + "STREET": x509.NameOID.STREET_ADDRESS, + "DC": x509.NameOID.DOMAIN_COMPONENT, + "UID": x509.NameOID.USER_ID, + } + + def name_to_name_oid(name: str) -> x509.ObjectIdentifier: + try: + return name_oid_map[name] + except KeyError: + raise ValueError(f"Unknown component ‘{name}’ in RFC4514 string") + + try: + dn = ldb.Dn(ldb.Ldb(), rfc4514_string) + except ValueError: + raise ValueError("Unable to parse RFC4514 string as DN") + + return x509.Name([ + x509.RelativeDistinguishedName([ + x509.NameAttribute( + name_to_name_oid(dn.get_component_name(i)), dn.get_component_value(i) + ) + ]) + for i in reversed(range(len(dn))) + ]) + + +def get_private_key( + data: bytes, encoding: Optional[str] = None, password: Optional[str] = None +) -> RSAPrivateKey: + """decode a key in PEM or DER format. + + So far only RSA keys are supported. + """ + encoded_password = None + if password is not None: + encoded_password = password.encode("utf-8") + + if encoding is None: + if data[:11] == b"-----BEGIN ": + encoding = "PEM" + else: + encoding = "DER" + + encoding = encoding.upper() + + # The cryptography module also supports ssh keys, PKCS1, and other formats, + # as well as non-RSA keys. It might not be wise to tolerate all of this, but + # we can do it by adding to key_fns here. + if encoding == "PEM": + key_fns = [load_pem_private_key] + elif encoding == "DER": + key_fns = [load_der_private_key] + else: + raise ValueError( + f"Private key encoding '{encoding}' not supported (try 'PEM' or 'DER')" + ) + + key = None + for fn in key_fns: + try: + key = fn(data, encoded_password) + break + except ValueError: + continue + except TypeError: + if password is None: + raise ValueError("No password supplied to decrypt private key") + else: + raise ValueError("Password supplied but private key isn’t encrypted") + + if key is None: + raise ValueError("could not decode private key") + + if not isinstance(key, RSAPrivateKey): + raise ValueError(f"Currently only RSA Private Keys are supported (not '{key}')") + + return key + + +def generate_csr( + samdb: SamDB, + user: User, + subject_name: str, + private_key_filename: str, + *, + private_key_encoding: Optional[str] = "auto", + private_key_pass: Optional[str] = None, +) -> CertificateSigningRequest: + if private_key_encoding == "auto": + private_key_encoding = None + + certificate_signature = hashes.SHA256 + + account_name = user.account_name + if user.user_principal_name is not None: + account_upn = user.user_principal_name + else: + realm = samdb.domain_dns_name() + account_upn = f"{account_name}@{realm.lower()}" + + builder = x509.CertificateSigningRequestBuilder() + # Add the subject name. + builder = builder.subject_name(x509_name_from_rfc4514_string(subject_name)) + + with open(private_key_filename, "rb") as private_key_file: + private_key_bytes = private_key_file.read() + + private_key = get_private_key( + private_key_bytes, encoding=private_key_encoding, password=private_key_pass + ) + public_key = private_key.public_key() + + # Add the SubjectAlternativeName. Windows uses this to map the account + # to the certificate. + + encoded_upn = account_upn.encode("utf-8") + encoded_upn = bytes([0x0C]) + asn1.asn1_length(encoded_upn) + encoded_upn + + ms_upn_san = x509.OtherName(ID_PKINIT_MS_SAN, encoded_upn) + alt_names = [ms_upn_san] + builder = builder.add_extension( + x509.SubjectAlternativeName(alt_names), + critical=False, + ) + + builder = builder.add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + + # The key identifier is used to identify the certificate. + subject_key_id = x509.SubjectKeyIdentifier.from_public_key(public_key) + builder = builder.add_extension( + subject_key_id, + critical=True, + ) + + # Add the key usages for which this certificate is valid. Windows + # doesn’t actually require this extension to be present. + builder = builder.add_extension( + # Heimdal requires that the certificate be valid for digital + # signatures. + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + + # Windows doesn’t require this extension to be present either; but if + # it is, Windows will not accept the certificate unless either client + # authentication or smartcard logon is specified, returning + # KDC_ERR_INCONSISTENT_KEY_PURPOSE otherwise. + builder = builder.add_extension( + x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH, + ]), + critical=False, + ) + + # If the certificate predates (as ours does) the existence of the + # account that presents it Windows will refuse to accept it unless + # there exists a strong mapping from one to the other. This strong + # mapping will in this case take the form of a certificate extension + # described in [MS-WCCE] 2.2.2.7.7.4 (szOID_NTDS_CA_SECURITY_EXT) and + # containing the account’s SID. + + # Encode this structure manually until we are able to produce the same + # ASN.1 encoding that Windows does. + + encoded_sid = user.object_sid.encode("utf-8") + + # The OCTET STRING tag, followed by length and encoded SID… + security_ext = bytes([0x04]) + asn1.asn1_length(encoded_sid) + (encoded_sid) + + # …enclosed in a construct tagged with the application-specific value + # 0… + security_ext = bytes([0xA0]) + asn1.asn1_length(security_ext) + (security_ext) + + # …preceded by the extension OID… + + encoded_oid = bytes.fromhex("060a2b060104018237190201") + security_ext = encoded_oid + security_ext + + # …and another application-specific tag 0… + # (This is the part about which I’m unsure. This length is not just of + # the OID, but of the entire structure so far, as if there’s some + # nesting going on. So far I haven’t been able to replicate this with + # pyasn1.) + security_ext = bytes([0xA0]) + asn1.asn1_length(security_ext) + (security_ext) + + # …all enclosed in a structure with a SEQUENCE tag. + security_ext = bytes([0x30]) + asn1.asn1_length(security_ext) + (security_ext) + + # Add the security extension to the certificate. + builder = builder.add_extension( + x509.UnrecognizedExtension( + szOID_NTDS_CA_SECURITY_EXT, + security_ext, + ), + critical=False, + ) + + # Sign the certificate with the user’s private key. + return builder.sign( + private_key=private_key, + algorithm=certificate_signature(), + backend=default_backend(), + ) diff --git a/python/samba/netcmd/computer.py b/python/samba/netcmd/computer.py index cd5389cf8ec..39f6c627039 100644 --- a/python/samba/netcmd/computer.py +++ b/python/samba/netcmd/computer.py @@ -36,6 +36,7 @@ from samba.samdb import SamDB from samba.common import get_bytes from subprocess import check_call, CalledProcessError from . import common +from .computer_generate_csr import cmd_computer_generate_csr from .computer_keytrust import cmd_computer_keytrust @@ -723,6 +724,7 @@ class cmd_computer(SuperCommand): subcommands["create"] = cmd_computer_add() subcommands["delete"] = cmd_computer_delete() subcommands["edit"] = cmd_computer_edit() + subcommands["generate-csr"] = cmd_computer_generate_csr() subcommands["list"] = cmd_computer_list() subcommands["show"] = cmd_computer_show() subcommands["move"] = cmd_computer_move() diff --git a/python/samba/netcmd/computer_generate_csr.py b/python/samba/netcmd/computer_generate_csr.py new file mode 100644 index 00000000000..094002e359d --- /dev/null +++ b/python/samba/netcmd/computer_generate_csr.py @@ -0,0 +1,93 @@ +# samba-tool commands to generate a Certificate Signing Request for a computer’s +# certificate +# +# Copyright (C) Catalyst.Net Ltd 2025 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from typing import Optional + +from cryptography.hazmat.primitives import serialization +import samba.getopt as options +from samba.netcmd import Command, Option +from samba.netcmd import exception_to_command_error + +from samba.domain.models import Computer +from samba.domain.models.exceptions import ModelError +from samba.generate_csr import generate_csr + + +class cmd_computer_generate_csr(Command): + """Generate a PEM‐encoded Certificate Signing Request for a computer.""" + + synopsis = "%prog [options]" + + takes_args = [ + "computername", + "subject_name", + "private_key_filename", + "output_filename", + ] + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option( + "--private-key-encoding", + default="auto", + choices=("pem", "der", "auto"), + help="Private key encoding (optional)", + ), + Option( + "--private-key-pass", + help="Password to decrypt private key (optional)", + ), + ] + + @exception_to_command_error(ValueError, ModelError, FileNotFoundError) + def run( + self, + computername: str, + subject_name: str, + private_key_filename: str, + output_filename: str, + *, + hostopts: Optional[options.HostOptions] = None, + sambaopts: Optional[options.SambaOptions] = None, + credopts: Optional[options.CredentialsOptions] = None, + private_key_encoding: Optional[str] = "auto", + private_key_pass: Optional[str] = None, + ): + if private_key_encoding == "auto": + private_key_encoding = None + + samdb = self.ldb_connect(hostopts, sambaopts, credopts) + computer: Computer = Computer.find(samdb, computername) + + csr = generate_csr( + samdb, + computer, + subject_name, + private_key_filename, + private_key_encoding=private_key_encoding, + private_key_pass=private_key_pass, + ) + + serialized = csr.public_bytes(serialization.Encoding.PEM) + with open(output_filename, "wb") as output_file: + _ = output_file.write(serialized) diff --git a/python/samba/netcmd/user/__init__.py b/python/samba/netcmd/user/__init__.py index e73f2d323e0..ccef9b25de4 100644 --- a/python/samba/netcmd/user/__init__.py +++ b/python/samba/netcmd/user/__init__.py @@ -26,6 +26,7 @@ from .delete import cmd_user_delete from .disable import cmd_user_disable from .edit import cmd_user_edit from .enable import cmd_user_enable +from .generate_csr import cmd_user_generate_csr from .getgroups import cmd_user_getgroups from .keytrust import cmd_user_keytrust from .list import cmd_user_list @@ -57,6 +58,7 @@ class cmd_user(SuperCommand): subcommands["list"] = cmd_user_list() subcommands["setexpiry"] = cmd_user_setexpiry() subcommands["password"] = cmd_user_password() + subcommands["generate-csr"] = cmd_user_generate_csr() subcommands["getgroups"] = cmd_user_getgroups() subcommands["setprimarygroup"] = cmd_user_setprimarygroup() subcommands["setpassword"] = cmd_user_setpassword() diff --git a/python/samba/netcmd/user/generate_csr.py b/python/samba/netcmd/user/generate_csr.py new file mode 100644 index 00000000000..79d9dda1bc1 --- /dev/null +++ b/python/samba/netcmd/user/generate_csr.py @@ -0,0 +1,88 @@ +# samba-tool commands to generate a Certificate Signing Request for a user’s +# certificate +# +# Copyright (C) Catalyst.Net Ltd 2025 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from typing import Optional + +from cryptography.hazmat.primitives import serialization +import samba.getopt as options +from samba.netcmd import Command, Option +from samba.netcmd import exception_to_command_error + +from samba.domain.models import User +from samba.domain.models.exceptions import ModelError +from samba.generate_csr import generate_csr + + +class cmd_user_generate_csr(Command): + """Generate a PEM‐encoded Certificate Signing Request for a user.""" + + synopsis = "%prog [options]" + + takes_args = ["username", "subject_name", "private_key_filename", "output_filename"] + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option( + "--private-key-encoding", + default="auto", + choices=("pem", "der", "auto"), + help="Private key encoding (optional)", + ), + Option( + "--private-key-pass", + help="Password to decrypt private key (optional)", + ), + ] + + @exception_to_command_error(ValueError, ModelError, FileNotFoundError) + def run( + self, + username: str, + subject_name: str, + private_key_filename: str, + output_filename: str, + *, + hostopts: Optional[options.HostOptions] = None, + sambaopts: Optional[options.SambaOptions] = None, + credopts: Optional[options.CredentialsOptions] = None, + private_key_encoding: Optional[str] = "auto", + private_key_pass: Optional[str] = None, + ): + if private_key_encoding == "auto": + private_key_encoding = None + + samdb = self.ldb_connect(hostopts, sambaopts, credopts) + user: User = User.find(samdb, username) + + csr = generate_csr( + samdb, + user, + subject_name, + private_key_filename, + private_key_encoding=private_key_encoding, + private_key_pass=private_key_pass, + ) + + serialized = csr.public_bytes(serialization.Encoding.PEM) + with open(output_filename, "wb") as output_file: + _ = output_file.write(serialized) diff --git a/python/samba/tests/samba_tool/user_generate_csr.py b/python/samba/tests/samba_tool/user_generate_csr.py new file mode 100644 index 00000000000..7623c1eb9db --- /dev/null +++ b/python/samba/tests/samba_tool/user_generate_csr.py @@ -0,0 +1,154 @@ +# Unix SMB/CIFS implementation. +# +# Tests for `samba-tool user generate-csr` +# +# Copyright (C) Catalyst.Net Ltd 2025 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import os +from typing import Optional + +from samba.domain.models import Computer, User +from samba.tests.samba_tool.base import SambaToolCmdTest + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID + + +szOID_NTDS_CA_SECURITY_EXT = x509.ObjectIdentifier("1.3.6.1.4.1.311.25.2") + + +HOST = "ldap://{DC_SERVER}".format(**os.environ) +CREDS = "-U{DC_USERNAME}%{DC_PASSWORD}".format(**os.environ) + + +class SambaToolUserGenerateCsrTest(SambaToolCmdTest): + cmd = "user" + model = User + user = "alice" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.samdb = cls.getSamDB("-H", HOST, CREDS) + + def test_create_private_key_pem(self): + self._test_create_private_key(encoding=serialization.Encoding.PEM) + + def test_create_private_key_der(self): + self._test_create_private_key(encoding=serialization.Encoding.DER) + + def test_create_private_key_password(self): + self._test_create_private_key(password="pass1234") + + def _test_create_private_key( + self, + *, + encoding: serialization.Encoding = serialization.Encoding.PEM, + password: Optional[str] = None, + ): + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + + if password is not None: + encryption_algorithm = serialization.BestAvailableEncryption( + password.encode("utf-8") + ) + else: + encryption_algorithm = serialization.NoEncryption() + + private_key_bytes = private_key.private_bytes( + encoding=encoding, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=encryption_algorithm, + ) + + user = self.model.find(self.samdb, self.user) + account_name = user.account_name + if user.user_principal_name is not None: + account_upn = user.user_principal_name + else: + realm = self.samdb.domain_dns_name() + account_upn = f"{account_name}@{realm.lower()}" + + subject_name = x509.Name([ + # Note that the subject name is used in certificate mappings + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "SambaState"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "SambaSelfTesting"), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Users"), + x509.NameAttribute(NameOID.COMMON_NAME, account_upn), + ]) + subject_name_str = subject_name.rfc4514_string() + + with self.mktemp() as private_key_filename, self.mktemp() as csr_filename: + with open(private_key_filename, "wb") as private_key_file: + _ = private_key_file.write(private_key_bytes) + + cmd = [ + self.cmd, + "generate-csr", + "-H", + HOST, + CREDS, + self.user, + subject_name_str, + private_key_filename, + csr_filename, + ] + if password is not None: + cmd.append("--private-key-pass") + cmd.append(password) + + result, out, err = self.runcmd(*cmd) + self.assertCmdSuccess(result, out, err) + + with open(csr_filename, "rb") as csr_file: + csr_bytes = csr_file.read() + + csr = x509.load_pem_x509_csr(csr_bytes) + + self.assertEqual(subject_name, csr.subject) + + try: + sid_extension = csr.extensions.get_extension_for_oid( + szOID_NTDS_CA_SECURITY_EXT, + ) + except x509.ExtensionNotFound: + self.fail("expected to find SID extension") + + # We don’t check the whole ASN.1 structure is correct, just that it + # contains the encoded SID. + encoded_sid = user.object_sid.encode("utf-8") + self.assertIn(encoded_sid, sid_extension.value.value) + + +class SambaToolComputerGenerateCsrTest(SambaToolUserGenerateCsrTest): + cmd = "computer" + model = Computer + user = "ADDC" + + +if __name__ == "__main__": + global_asn1_print = False + global_hexdump = False + import unittest + + unittest.main() diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 06f2ff08d64..fb756b164ef 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -1232,6 +1232,7 @@ planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.samba_tool.user") planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_auth_policy") planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_auth_silo") planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_keytrust") +planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_generate_csr") for env in ["ad_dc_default:local", "ad_dc_no_ntlm:local"]: planpythontestsuite(env, "samba.tests.samba_tool.user_wdigest") for env, nt_hash in [("ad_dc:local", True),