import os
from datetime import datetime, timezone
import tempfile
+import binascii
+import struct
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
from samba import generate_random_password
from samba.auth import system_session
from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
-from samba.dcerpc import krb5pac, krb5ccache, security
+from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
+from samba.drs_utils import drsuapi_connect
from samba.dsdb import (
DS_DOMAIN_FUNCTION_2000,
DS_DOMAIN_FUNCTION_2008,
UF_NORMAL_ACCOUNT
)
from samba.ndr import ndr_pack, ndr_unpack
+from samba import net
from samba.samdb import SamDB
from samba.tests import delete_force
return (creds, dn)
+ def get_keys(self, samdb, dn):
+ admin_creds = self.get_admin_creds()
+
+ dns_hostname = samdb.host_dns_name()
+ (bind, handle, _) = drsuapi_connect(dns_hostname,
+ self.get_lp(),
+ admin_creds)
+
+ destination_dsa_guid = misc.GUID(samdb.get_ntds_GUID())
+
+ req = drsuapi.DsGetNCChangesRequest8()
+
+ req.destination_dsa_guid = destination_dsa_guid
+ req.source_dsa_invocation_id = misc.GUID()
+
+ naming_context = drsuapi.DsReplicaObjectIdentifier()
+ naming_context.dn = str(dn)
+
+ req.naming_context = naming_context
+
+ hwm = drsuapi.DsReplicaHighWaterMark()
+ hwm.tmp_highest_usn = 0
+ hwm.reserved_usn = 0
+ hwm.highest_usn = 0
+
+ req.highwatermark = hwm
+ req.uptodateness_vector = None
+
+ req.replica_flags = 0
+
+ req.max_object_count = 1
+ req.max_ndr_size = 402116
+ req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
+
+ attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
+ drsuapi.DRSUAPI_ATTID_unicodePwd]
+
+ partial_attribute_set = drsuapi.DsPartialAttributeSet()
+ partial_attribute_set.version = 1
+ partial_attribute_set.attids = attids
+ partial_attribute_set.num_attids = len(attids)
+
+ req.partial_attribute_set = partial_attribute_set
+
+ req.partial_attribute_set_ex = None
+ req.mapping_ctr.num_mappings = 0
+ req.mapping_ctr.mappings = None
+
+ _, ctr = bind.DsGetNCChanges(handle, 8, req)
+ identifier = ctr.first_object.object.identifier
+ attributes = ctr.first_object.object.attribute_ctr.attributes
+
+ rid = identifier.sid.split()[1]
+
+ forced_keys = dict()
+
+ net_ctx = net.Net(admin_creds)
+
+ keys = {}
+
+ for attr in attributes:
+ if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
+ net_ctx.replicate_decrypt(bind, attr, rid)
+ attr_val = attr.value_ctr.values[0].blob
+
+ spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
+ attr_val)
+ for pkg in spl.sub.packages:
+ if pkg.name == 'Primary:Kerberos-Newer-Keys':
+ krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
+ krb5_new_keys = ndr_unpack(
+ drsblobs.package_PrimaryKerberosBlob,
+ krb5_new_keys_raw)
+ for key in krb5_new_keys.ctr.keys:
+ keytype = key.keytype
+ if keytype in (kcrypto.Enctype.AES256,
+ kcrypto.Enctype.AES128):
+ keys[keytype] = key.value.hex()
+ elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
+ net_ctx.replicate_decrypt(bind, attr, rid)
+ pwd = attr.value_ctr.values[0].blob
+ keys[kcrypto.Enctype.RC4] = pwd.hex()
+
+ default_enctypes = self.get_default_enctypes()
+
+ if default_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
+ self.assertIn(kcrypto.Enctype.RC4, keys)
+ if default_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
+ self.assertIn(kcrypto.Enctype.AES256, keys)
+ if default_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
+ self.assertIn(kcrypto.Enctype.AES128, keys)
+
+ return keys
+
def as_req(self, cname, sname, realm, etypes, padata=None):
'''Send a Kerberos AS_REQ, returns the undecoded response
'''