]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5/raw_testcase.py: Add method to obtain Kerberos keys over DRS
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 15 Jun 2021 01:15:10 +0000 (13:15 +1200)
committerStefan Metzmacher <metze@samba.org>
Thu, 1 Jul 2021 17:46:31 +0000 (17:46 +0000)
This requires admin credentials, and removes the need to pass these keys
as environment variables.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
python/samba/tests/krb5/kdc_base_test.py

index e1b73dd8ff73983e4d74e970ed061bee701409f0..7ae22bc59291fc24123a9aab0586efb10cf1c94c 100644 (file)
@@ -20,6 +20,8 @@ import sys
 import os
 from datetime import datetime, timezone
 import tempfile
+import binascii
+import struct
 
 sys.path.insert(0, "bin/python")
 os.environ["PYTHONUNBUFFERED"] = "1"
@@ -29,7 +31,8 @@ from ldb import SCOPE_BASE
 from samba import generate_random_password
 from samba.auth import system_session
 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
-from samba.dcerpc import krb5pac, krb5ccache, security
+from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
+from samba.drs_utils import drsuapi_connect
 from samba.dsdb import (
     DS_DOMAIN_FUNCTION_2000,
     DS_DOMAIN_FUNCTION_2008,
@@ -37,6 +40,7 @@ from samba.dsdb import (
     UF_NORMAL_ACCOUNT
 )
 from samba.ndr import ndr_pack, ndr_unpack
+from samba import net
 from samba.samdb import SamDB
 
 from samba.tests import delete_force
@@ -191,6 +195,100 @@ class KDCBaseTest(RawKerberosTest):
 
         return (creds, dn)
 
+    def get_keys(self, samdb, dn):
+        admin_creds = self.get_admin_creds()
+
+        dns_hostname = samdb.host_dns_name()
+        (bind, handle, _) = drsuapi_connect(dns_hostname,
+                                            self.get_lp(),
+                                            admin_creds)
+
+        destination_dsa_guid = misc.GUID(samdb.get_ntds_GUID())
+
+        req = drsuapi.DsGetNCChangesRequest8()
+
+        req.destination_dsa_guid = destination_dsa_guid
+        req.source_dsa_invocation_id = misc.GUID()
+
+        naming_context = drsuapi.DsReplicaObjectIdentifier()
+        naming_context.dn = str(dn)
+
+        req.naming_context = naming_context
+
+        hwm = drsuapi.DsReplicaHighWaterMark()
+        hwm.tmp_highest_usn = 0
+        hwm.reserved_usn = 0
+        hwm.highest_usn = 0
+
+        req.highwatermark = hwm
+        req.uptodateness_vector = None
+
+        req.replica_flags = 0
+
+        req.max_object_count = 1
+        req.max_ndr_size = 402116
+        req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
+
+        attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+                  drsuapi.DRSUAPI_ATTID_unicodePwd]
+
+        partial_attribute_set = drsuapi.DsPartialAttributeSet()
+        partial_attribute_set.version = 1
+        partial_attribute_set.attids = attids
+        partial_attribute_set.num_attids = len(attids)
+
+        req.partial_attribute_set = partial_attribute_set
+
+        req.partial_attribute_set_ex = None
+        req.mapping_ctr.num_mappings = 0
+        req.mapping_ctr.mappings = None
+
+        _, ctr = bind.DsGetNCChanges(handle, 8, req)
+        identifier = ctr.first_object.object.identifier
+        attributes = ctr.first_object.object.attribute_ctr.attributes
+
+        rid = identifier.sid.split()[1]
+
+        forced_keys = dict()
+
+        net_ctx = net.Net(admin_creds)
+
+        keys = {}
+
+        for attr in attributes:
+            if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
+                net_ctx.replicate_decrypt(bind, attr, rid)
+                attr_val = attr.value_ctr.values[0].blob
+
+                spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
+                                 attr_val)
+                for pkg in spl.sub.packages:
+                    if pkg.name == 'Primary:Kerberos-Newer-Keys':
+                        krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
+                        krb5_new_keys = ndr_unpack(
+                            drsblobs.package_PrimaryKerberosBlob,
+                            krb5_new_keys_raw)
+                        for key in krb5_new_keys.ctr.keys:
+                            keytype = key.keytype
+                            if keytype in (kcrypto.Enctype.AES256,
+                                           kcrypto.Enctype.AES128):
+                                keys[keytype] = key.value.hex()
+            elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
+                net_ctx.replicate_decrypt(bind, attr, rid)
+                pwd = attr.value_ctr.values[0].blob
+                keys[kcrypto.Enctype.RC4] = pwd.hex()
+
+        default_enctypes = self.get_default_enctypes()
+
+        if default_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
+            self.assertIn(kcrypto.Enctype.RC4, keys)
+        if default_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
+            self.assertIn(kcrypto.Enctype.AES256, keys)
+        if default_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
+            self.assertIn(kcrypto.Enctype.AES128, keys)
+
+        return keys
+
     def as_req(self, cname, sname, realm, etypes, padata=None):
         '''Send a Kerberos AS_REQ, returns the undecoded response
         '''