--- /dev/null
+# Generate a Certificate Signing Request for a certificate
+#
+# Copyright (C) Catalyst.Net Ltd 2025
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+from typing import Optional
+
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
+from cryptography.hazmat.primitives.serialization import (
+ load_der_private_key,
+ load_pem_private_key,
+)
+from cryptography.x509.base import CertificateSigningRequest
+from samba import asn1, ldb
+from samba.samdb import SamDB
+
+from samba.domain.models import User
+
+
+ID_PKINIT_MS_SAN = x509.ObjectIdentifier("1.3.6.1.4.1.311.20.2.3")
+szOID_NTDS_CA_SECURITY_EXT = x509.ObjectIdentifier("1.3.6.1.4.1.311.25.2")
+
+
+# As the version of python3-cryptography used in CI is too old to include the
+# method x509.Name.from_rfc4514_string(), we must implement it ourselves.
+def x509_name_from_rfc4514_string(rfc4514_string: str) -> x509.Name:
+ # Derived from https://datatracker.ietf.org/doc/html/rfc4514#page-7
+ name_oid_map = {
+ "CN": x509.NameOID.COMMON_NAME,
+ "L": x509.NameOID.LOCALITY_NAME,
+ "ST": x509.NameOID.STATE_OR_PROVINCE_NAME,
+ "O": x509.NameOID.ORGANIZATION_NAME,
+ "OU": x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
+ "C": x509.NameOID.COUNTRY_NAME,
+ "STREET": x509.NameOID.STREET_ADDRESS,
+ "DC": x509.NameOID.DOMAIN_COMPONENT,
+ "UID": x509.NameOID.USER_ID,
+ }
+
+ def name_to_name_oid(name: str) -> x509.ObjectIdentifier:
+ try:
+ return name_oid_map[name]
+ except KeyError:
+ raise ValueError(f"Unknown component ‘{name}’ in RFC4514 string")
+
+ try:
+ dn = ldb.Dn(ldb.Ldb(), rfc4514_string)
+ except ValueError:
+ raise ValueError("Unable to parse RFC4514 string as DN")
+
+ return x509.Name([
+ x509.RelativeDistinguishedName([
+ x509.NameAttribute(
+ name_to_name_oid(dn.get_component_name(i)), dn.get_component_value(i)
+ )
+ ])
+ for i in reversed(range(len(dn)))
+ ])
+
+
+def get_private_key(
+ data: bytes, encoding: Optional[str] = None, password: Optional[str] = None
+) -> RSAPrivateKey:
+ """decode a key in PEM or DER format.
+
+ So far only RSA keys are supported.
+ """
+ encoded_password = None
+ if password is not None:
+ encoded_password = password.encode("utf-8")
+
+ if encoding is None:
+ if data[:11] == b"-----BEGIN ":
+ encoding = "PEM"
+ else:
+ encoding = "DER"
+
+ encoding = encoding.upper()
+
+ # The cryptography module also supports ssh keys, PKCS1, and other formats,
+ # as well as non-RSA keys. It might not be wise to tolerate all of this, but
+ # we can do it by adding to key_fns here.
+ if encoding == "PEM":
+ key_fns = [load_pem_private_key]
+ elif encoding == "DER":
+ key_fns = [load_der_private_key]
+ else:
+ raise ValueError(
+ f"Private key encoding '{encoding}' not supported (try 'PEM' or 'DER')"
+ )
+
+ key = None
+ for fn in key_fns:
+ try:
+ key = fn(data, encoded_password)
+ break
+ except ValueError:
+ continue
+ except TypeError:
+ if password is None:
+ raise ValueError("No password supplied to decrypt private key")
+ else:
+ raise ValueError("Password supplied but private key isn’t encrypted")
+
+ if key is None:
+ raise ValueError("could not decode private key")
+
+ if not isinstance(key, RSAPrivateKey):
+ raise ValueError(f"Currently only RSA Private Keys are supported (not '{key}')")
+
+ return key
+
+
+def generate_csr(
+ samdb: SamDB,
+ user: User,
+ subject_name: str,
+ private_key_filename: str,
+ *,
+ private_key_encoding: Optional[str] = "auto",
+ private_key_pass: Optional[str] = None,
+) -> CertificateSigningRequest:
+ if private_key_encoding == "auto":
+ private_key_encoding = None
+
+ certificate_signature = hashes.SHA256
+
+ account_name = user.account_name
+ if user.user_principal_name is not None:
+ account_upn = user.user_principal_name
+ else:
+ realm = samdb.domain_dns_name()
+ account_upn = f"{account_name}@{realm.lower()}"
+
+ builder = x509.CertificateSigningRequestBuilder()
+ # Add the subject name.
+ builder = builder.subject_name(x509_name_from_rfc4514_string(subject_name))
+
+ with open(private_key_filename, "rb") as private_key_file:
+ private_key_bytes = private_key_file.read()
+
+ private_key = get_private_key(
+ private_key_bytes, encoding=private_key_encoding, password=private_key_pass
+ )
+ public_key = private_key.public_key()
+
+ # Add the SubjectAlternativeName. Windows uses this to map the account
+ # to the certificate.
+
+ encoded_upn = account_upn.encode("utf-8")
+ encoded_upn = bytes([0x0C]) + asn1.asn1_length(encoded_upn) + encoded_upn
+
+ ms_upn_san = x509.OtherName(ID_PKINIT_MS_SAN, encoded_upn)
+ alt_names = [ms_upn_san]
+ builder = builder.add_extension(
+ x509.SubjectAlternativeName(alt_names),
+ critical=False,
+ )
+
+ builder = builder.add_extension(
+ x509.BasicConstraints(ca=False, path_length=None),
+ critical=True,
+ )
+
+ # The key identifier is used to identify the certificate.
+ subject_key_id = x509.SubjectKeyIdentifier.from_public_key(public_key)
+ builder = builder.add_extension(
+ subject_key_id,
+ critical=True,
+ )
+
+ # Add the key usages for which this certificate is valid. Windows
+ # doesn’t actually require this extension to be present.
+ builder = builder.add_extension(
+ # Heimdal requires that the certificate be valid for digital
+ # signatures.
+ x509.KeyUsage(
+ digital_signature=True,
+ content_commitment=False,
+ key_encipherment=False,
+ data_encipherment=False,
+ key_agreement=False,
+ key_cert_sign=False,
+ crl_sign=False,
+ encipher_only=False,
+ decipher_only=False,
+ ),
+ critical=True,
+ )
+
+ # Windows doesn’t require this extension to be present either; but if
+ # it is, Windows will not accept the certificate unless either client
+ # authentication or smartcard logon is specified, returning
+ # KDC_ERR_INCONSISTENT_KEY_PURPOSE otherwise.
+ builder = builder.add_extension(
+ x509.ExtendedKeyUsage([
+ x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
+ ]),
+ critical=False,
+ )
+
+ # If the certificate predates (as ours does) the existence of the
+ # account that presents it Windows will refuse to accept it unless
+ # there exists a strong mapping from one to the other. This strong
+ # mapping will in this case take the form of a certificate extension
+ # described in [MS-WCCE] 2.2.2.7.7.4 (szOID_NTDS_CA_SECURITY_EXT) and
+ # containing the account’s SID.
+
+ # Encode this structure manually until we are able to produce the same
+ # ASN.1 encoding that Windows does.
+
+ encoded_sid = user.object_sid.encode("utf-8")
+
+ # The OCTET STRING tag, followed by length and encoded SID…
+ security_ext = bytes([0x04]) + asn1.asn1_length(encoded_sid) + (encoded_sid)
+
+ # …enclosed in a construct tagged with the application-specific value
+ # 0…
+ security_ext = bytes([0xA0]) + asn1.asn1_length(security_ext) + (security_ext)
+
+ # …preceded by the extension OID…
+
+ encoded_oid = bytes.fromhex("060a2b060104018237190201")
+ security_ext = encoded_oid + security_ext
+
+ # …and another application-specific tag 0…
+ # (This is the part about which I’m unsure. This length is not just of
+ # the OID, but of the entire structure so far, as if there’s some
+ # nesting going on. So far I haven’t been able to replicate this with
+ # pyasn1.)
+ security_ext = bytes([0xA0]) + asn1.asn1_length(security_ext) + (security_ext)
+
+ # …all enclosed in a structure with a SEQUENCE tag.
+ security_ext = bytes([0x30]) + asn1.asn1_length(security_ext) + (security_ext)
+
+ # Add the security extension to the certificate.
+ builder = builder.add_extension(
+ x509.UnrecognizedExtension(
+ szOID_NTDS_CA_SECURITY_EXT,
+ security_ext,
+ ),
+ critical=False,
+ )
+
+ # Sign the certificate with the user’s private key.
+ return builder.sign(
+ private_key=private_key,
+ algorithm=certificate_signature(),
+ backend=default_backend(),
+ )
from samba.common import get_bytes
from subprocess import check_call, CalledProcessError
from . import common
+from .computer_generate_csr import cmd_computer_generate_csr
from .computer_keytrust import cmd_computer_keytrust
subcommands["create"] = cmd_computer_add()
subcommands["delete"] = cmd_computer_delete()
subcommands["edit"] = cmd_computer_edit()
+ subcommands["generate-csr"] = cmd_computer_generate_csr()
subcommands["list"] = cmd_computer_list()
subcommands["show"] = cmd_computer_show()
subcommands["move"] = cmd_computer_move()
--- /dev/null
+# samba-tool commands to generate a Certificate Signing Request for a computer’s
+# certificate
+#
+# Copyright (C) Catalyst.Net Ltd 2025
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from typing import Optional
+
+from cryptography.hazmat.primitives import serialization
+import samba.getopt as options
+from samba.netcmd import Command, Option
+from samba.netcmd import exception_to_command_error
+
+from samba.domain.models import Computer
+from samba.domain.models.exceptions import ModelError
+from samba.generate_csr import generate_csr
+
+
+class cmd_computer_generate_csr(Command):
+ """Generate a PEM‐encoded Certificate Signing Request for a computer."""
+
+ synopsis = "%prog <computername> <subject_name> <private_key_filename> <output_filename> [options]"
+
+ takes_args = [
+ "computername",
+ "subject_name",
+ "private_key_filename",
+ "output_filename",
+ ]
+
+ takes_optiongroups = {
+ "sambaopts": options.SambaOptions,
+ "credopts": options.CredentialsOptions,
+ "hostopts": options.HostOptions,
+ }
+
+ takes_options = [
+ Option(
+ "--private-key-encoding",
+ default="auto",
+ choices=("pem", "der", "auto"),
+ help="Private key encoding (optional)",
+ ),
+ Option(
+ "--private-key-pass",
+ help="Password to decrypt private key (optional)",
+ ),
+ ]
+
+ @exception_to_command_error(ValueError, ModelError, FileNotFoundError)
+ def run(
+ self,
+ computername: str,
+ subject_name: str,
+ private_key_filename: str,
+ output_filename: str,
+ *,
+ hostopts: Optional[options.HostOptions] = None,
+ sambaopts: Optional[options.SambaOptions] = None,
+ credopts: Optional[options.CredentialsOptions] = None,
+ private_key_encoding: Optional[str] = "auto",
+ private_key_pass: Optional[str] = None,
+ ):
+ if private_key_encoding == "auto":
+ private_key_encoding = None
+
+ samdb = self.ldb_connect(hostopts, sambaopts, credopts)
+ computer: Computer = Computer.find(samdb, computername)
+
+ csr = generate_csr(
+ samdb,
+ computer,
+ subject_name,
+ private_key_filename,
+ private_key_encoding=private_key_encoding,
+ private_key_pass=private_key_pass,
+ )
+
+ serialized = csr.public_bytes(serialization.Encoding.PEM)
+ with open(output_filename, "wb") as output_file:
+ _ = output_file.write(serialized)
from .disable import cmd_user_disable
from .edit import cmd_user_edit
from .enable import cmd_user_enable
+from .generate_csr import cmd_user_generate_csr
from .getgroups import cmd_user_getgroups
from .keytrust import cmd_user_keytrust
from .list import cmd_user_list
subcommands["list"] = cmd_user_list()
subcommands["setexpiry"] = cmd_user_setexpiry()
subcommands["password"] = cmd_user_password()
+ subcommands["generate-csr"] = cmd_user_generate_csr()
subcommands["getgroups"] = cmd_user_getgroups()
subcommands["setprimarygroup"] = cmd_user_setprimarygroup()
subcommands["setpassword"] = cmd_user_setpassword()
--- /dev/null
+# samba-tool commands to generate a Certificate Signing Request for a user’s
+# certificate
+#
+# Copyright (C) Catalyst.Net Ltd 2025
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from typing import Optional
+
+from cryptography.hazmat.primitives import serialization
+import samba.getopt as options
+from samba.netcmd import Command, Option
+from samba.netcmd import exception_to_command_error
+
+from samba.domain.models import User
+from samba.domain.models.exceptions import ModelError
+from samba.generate_csr import generate_csr
+
+
+class cmd_user_generate_csr(Command):
+ """Generate a PEM‐encoded Certificate Signing Request for a user."""
+
+ synopsis = "%prog <username> <subject_name> <private_key_filename> <output_filename> [options]"
+
+ takes_args = ["username", "subject_name", "private_key_filename", "output_filename"]
+
+ takes_optiongroups = {
+ "sambaopts": options.SambaOptions,
+ "credopts": options.CredentialsOptions,
+ "hostopts": options.HostOptions,
+ }
+
+ takes_options = [
+ Option(
+ "--private-key-encoding",
+ default="auto",
+ choices=("pem", "der", "auto"),
+ help="Private key encoding (optional)",
+ ),
+ Option(
+ "--private-key-pass",
+ help="Password to decrypt private key (optional)",
+ ),
+ ]
+
+ @exception_to_command_error(ValueError, ModelError, FileNotFoundError)
+ def run(
+ self,
+ username: str,
+ subject_name: str,
+ private_key_filename: str,
+ output_filename: str,
+ *,
+ hostopts: Optional[options.HostOptions] = None,
+ sambaopts: Optional[options.SambaOptions] = None,
+ credopts: Optional[options.CredentialsOptions] = None,
+ private_key_encoding: Optional[str] = "auto",
+ private_key_pass: Optional[str] = None,
+ ):
+ if private_key_encoding == "auto":
+ private_key_encoding = None
+
+ samdb = self.ldb_connect(hostopts, sambaopts, credopts)
+ user: User = User.find(samdb, username)
+
+ csr = generate_csr(
+ samdb,
+ user,
+ subject_name,
+ private_key_filename,
+ private_key_encoding=private_key_encoding,
+ private_key_pass=private_key_pass,
+ )
+
+ serialized = csr.public_bytes(serialization.Encoding.PEM)
+ with open(output_filename, "wb") as output_file:
+ _ = output_file.write(serialized)
--- /dev/null
+# Unix SMB/CIFS implementation.
+#
+# Tests for `samba-tool user generate-csr`
+#
+# Copyright (C) Catalyst.Net Ltd 2025
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+from typing import Optional
+
+from samba.domain.models import Computer, User
+from samba.tests.samba_tool.base import SambaToolCmdTest
+
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.x509.oid import NameOID
+
+
+szOID_NTDS_CA_SECURITY_EXT = x509.ObjectIdentifier("1.3.6.1.4.1.311.25.2")
+
+
+HOST = "ldap://{DC_SERVER}".format(**os.environ)
+CREDS = "-U{DC_USERNAME}%{DC_PASSWORD}".format(**os.environ)
+
+
+class SambaToolUserGenerateCsrTest(SambaToolCmdTest):
+ cmd = "user"
+ model = User
+ user = "alice"
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.samdb = cls.getSamDB("-H", HOST, CREDS)
+
+ def test_create_private_key_pem(self):
+ self._test_create_private_key(encoding=serialization.Encoding.PEM)
+
+ def test_create_private_key_der(self):
+ self._test_create_private_key(encoding=serialization.Encoding.DER)
+
+ def test_create_private_key_password(self):
+ self._test_create_private_key(password="pass1234")
+
+ def _test_create_private_key(
+ self,
+ *,
+ encoding: serialization.Encoding = serialization.Encoding.PEM,
+ password: Optional[str] = None,
+ ):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=2048, backend=default_backend()
+ )
+
+ if password is not None:
+ encryption_algorithm = serialization.BestAvailableEncryption(
+ password.encode("utf-8")
+ )
+ else:
+ encryption_algorithm = serialization.NoEncryption()
+
+ private_key_bytes = private_key.private_bytes(
+ encoding=encoding,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=encryption_algorithm,
+ )
+
+ user = self.model.find(self.samdb, self.user)
+ account_name = user.account_name
+ if user.user_principal_name is not None:
+ account_upn = user.user_principal_name
+ else:
+ realm = self.samdb.domain_dns_name()
+ account_upn = f"{account_name}@{realm.lower()}"
+
+ subject_name = x509.Name([
+ # Note that the subject name is used in certificate mappings
+ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "SambaState"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, "SambaSelfTesting"),
+ x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Users"),
+ x509.NameAttribute(NameOID.COMMON_NAME, account_upn),
+ ])
+ subject_name_str = subject_name.rfc4514_string()
+
+ with self.mktemp() as private_key_filename, self.mktemp() as csr_filename:
+ with open(private_key_filename, "wb") as private_key_file:
+ _ = private_key_file.write(private_key_bytes)
+
+ cmd = [
+ self.cmd,
+ "generate-csr",
+ "-H",
+ HOST,
+ CREDS,
+ self.user,
+ subject_name_str,
+ private_key_filename,
+ csr_filename,
+ ]
+ if password is not None:
+ cmd.append("--private-key-pass")
+ cmd.append(password)
+
+ result, out, err = self.runcmd(*cmd)
+ self.assertCmdSuccess(result, out, err)
+
+ with open(csr_filename, "rb") as csr_file:
+ csr_bytes = csr_file.read()
+
+ csr = x509.load_pem_x509_csr(csr_bytes)
+
+ self.assertEqual(subject_name, csr.subject)
+
+ try:
+ sid_extension = csr.extensions.get_extension_for_oid(
+ szOID_NTDS_CA_SECURITY_EXT,
+ )
+ except x509.ExtensionNotFound:
+ self.fail("expected to find SID extension")
+
+ # We don’t check the whole ASN.1 structure is correct, just that it
+ # contains the encoded SID.
+ encoded_sid = user.object_sid.encode("utf-8")
+ self.assertIn(encoded_sid, sid_extension.value.value)
+
+
+class SambaToolComputerGenerateCsrTest(SambaToolUserGenerateCsrTest):
+ cmd = "computer"
+ model = Computer
+ user = "ADDC"
+
+
+if __name__ == "__main__":
+ global_asn1_print = False
+ global_hexdump = False
+ import unittest
+
+ unittest.main()
planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_auth_policy")
planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_auth_silo")
planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_keytrust")
+planpythontestsuite("ad_dc_default", "samba.tests.samba_tool.user_generate_csr")
for env in ["ad_dc_default:local", "ad_dc_no_ntlm:local"]:
planpythontestsuite(env, "samba.tests.samba_tool.user_wdigest")
for env, nt_hash in [("ad_dc:local", True),