import random
import binascii
import itertools
+from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
+from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
+from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
+from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
+
+from pyasn1.codec.ber.encoder import BitStringEncoder
-import samba.tests
from samba.credentials import Credentials
-from samba.tests import TestCaseInTempDir
from samba.dcerpc import security
+
+import samba.tests
+from samba.tests import TestCaseInTempDir
+
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_ETYPE_NOSUPP,
)
import samba.tests.krb5.kcrypto as kcrypto
-from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
-from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
-from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
-from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
-
-from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder
-
def BitStringEncoder_encodeValue32(
self, value, asn1Spec, encodeFun, **options):
}
return EncryptionKey_obj
+
class KerberosCredentials(Credentials):
def __init__(self):
super(KerberosCredentials, self).__init__()
def get_forced_salt(self):
return self.forced_salt
+
class KerberosTicketCreds(object):
def __init__(self, ticket, session_key,
crealm=None, cname=None,
self.encpart_private = encpart_private
return
+
class RawKerberosTest(TestCaseInTempDir):
"""A raw Kerberos Test case."""
etypes_to_test = (
- { "value": -1111, "name": "dummy", },
- { "value": kcrypto.Enctype.AES256, "name": "aes128", },
- { "value": kcrypto.Enctype.AES128, "name": "aes256", },
- { "value": kcrypto.Enctype.RC4, "name": "rc4", },
+ {"value": -1111, "name": "dummy", },
+ {"value": kcrypto.Enctype.AES256, "name": "aes128", },
+ {"value": kcrypto.Enctype.AES128, "name": "aes256", },
+ {"value": kcrypto.Enctype.RC4, "name": "rc4", },
)
setup_etype_test_permutations_done = False
num_idxs = len(cls.etypes_to_test)
permutations = []
- for num in range(1, num_idxs+1):
+ for num in range(1, num_idxs + 1):
chunk = list(itertools.permutations(range(num_idxs), num))
for e in chunk:
el = list(e)
name += "_%s" % n
etypes += (cls.etypes_to_test[idx]["value"],)
- r = { "name": name, "etypes": etypes, }
+ r = {"name": name, "etypes": etypes, }
res.append(r)
cls.etype_test_permutations = res
self.do_asn1_print = False
self.do_hexdump = False
- strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True)
+ strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
+ allow_missing=True)
if strict_checking is None:
strict_checking = '1'
self.strict_checking = bool(int(strict_checking))
val = None
if prefix is not None:
allow_missing_prefix = allow_missing or fallback_default
- val = samba.tests.env_get_var_value('%s_%s' % (prefix, varname),
- allow_missing=allow_missing_prefix)
+ val = samba.tests.env_get_var_value(
+ '%s_%s' % (prefix, varname),
+ allow_missing=allow_missing_prefix)
else:
fallback_default = True
if val is None and fallback_default:
if aes256_key is not None:
c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
- fallback_default=False, allow_missing=True)
+ fallback_default=False,
+ allow_missing=True)
if aes128_key is not None:
c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
env_err = None
try:
# Try to obtain them from the environment
- creds = self._get_krb5_creds_from_env(prefix,
- default_username=default_username,
- allow_missing_password=allow_missing_password,
- allow_missing_keys=allow_missing_keys,
- require_strongest_key=require_strongest_key)
+ creds = self._get_krb5_creds_from_env(
+ prefix,
+ default_username=default_username,
+ allow_missing_password=allow_missing_password,
+ allow_missing_keys=allow_missing_keys,
+ require_strongest_key=require_strongest_key)
except Exception as err:
# An error occurred, so save it for later
env_err = err
return s
def get_Nonce(self):
- nonce_min=0x7f000000
- nonce_max=0x7fffffff
+ nonce_min = 0x7f000000
+ nonce_max = 0x7fffffff
v = random.randint(nonce_min, nonce_max)
return v
if etype == kcrypto.Enctype.RC4:
nthash = creds.get_nt_hash()
self.assertIsNotNone(nthash, msg=fail_msg)
- return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)
+ return self.SessionKey_create(etype=etype,
+ contents=nthash,
+ kvno=kvno)
password = creds.get_password()
self.assertIsNotNone(password, msg=fail_msg)
salt = creds.get_forced_salt()
if salt is None:
salt = bytes("%s%s" % (creds.get_realm(), creds.get_username()),
- encoding='utf-8')
- return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno)
+ encoding='utf-8')
+ return self.PasswordKey_create(etype=etype,
+ pwd=password,
+ salt=salt,
+ kvno=kvno)
def RandomKey(self, etype):
e = kcrypto._get_enctype_profile(etype)
return PA_ENC_TS_ENC_obj
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
- #KERB-PA-PAC-REQUEST ::= SEQUENCE {
- # include-pac[0] BOOLEAN --If TRUE, and no pac present, include PAC.
- # --If FALSE, and PAC present, remove PAC
- #}
+ # KERB-PA-PAC-REQUEST ::= SEQUENCE {
+ # include-pac[0] BOOLEAN --If TRUE, and no pac present,
+ # -- include PAC.
+ # --If FALSE, and PAC present,
+ # -- remove PAC.
+ # }
KERB_PA_PAC_REQUEST_obj = {
'include-pac': include_pac,
}
return KERB_PA_PAC_REQUEST_obj
pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
- pa_data = self.PA_DATA_create(128, pa_pac) # PA-PAC-REQUEST
+ pa_data = self.PA_DATA_create(128, pa_pac) # PA-PAC-REQUEST
return pa_data
def KDC_REQ_BODY_create(self,
EncAuthorizationData=EncAuthorizationData,
EncAuthorizationData_key=EncAuthorizationData_key,
additional_tickets=additional_tickets)
- req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(),
+ req_body_blob = self.der_encode(req_body,
+ asn1Spec=krb5_asn1.KDC_REQ_BODY(),
asn1_print=asn1_print, hexdump=hexdump)
- req_body_checksum = self.Checksum_create(
- ticket_session_key, 6, req_body_blob, ctype=body_checksum_type)
+ req_body_checksum = self.Checksum_create(ticket_session_key,
+ 6,
+ req_body_blob,
+ ctype=body_checksum_type)
subkey_obj = None
if authenticator_subkey is not None:
cksum_data += n.encode()
cksum_data += realm.encode()
cksum_data += "Kerberos".encode()
- cksum = self.Checksum_create(tgt_session_key, 17, cksum_data, ctype)
+ cksum = self.Checksum_create(tgt_session_key,
+ 17,
+ cksum_data,
+ ctype)
PA_S4U2Self_obj = {
'name': name,
return self.PA_DATA_create(129, pa_s4u2self)
def _generic_kdc_exchange(self,
- kdc_exchange_dict, # required
- kdc_options=None, # required
- cname=None, # optional
- realm=None, # required
- sname=None, # optional
- from_time=None, # optional
- till_time=None, # required
- renew_time=None, # optional
- nonce=None, # required
- etypes=None, # required
- addresses=None, # optional
- EncAuthorizationData=None, # optional
- EncAuthorizationData_key=None, # optional
- additional_tickets=None): # optional
+ kdc_exchange_dict, # required
+ kdc_options=None, # required
+ cname=None, # optional
+ realm=None, # required
+ sname=None, # optional
+ from_time=None, # optional
+ till_time=None, # required
+ renew_time=None, # optional
+ nonce=None, # required
+ etypes=None, # required
+ addresses=None, # optional
+ EncAuthorizationData=None, # optional
+ EncAuthorizationData_key=None, # optional
+ additional_tickets=None): # optional
check_error_fn = kdc_exchange_dict['check_error_fn']
check_rep_fn = kdc_exchange_dict['check_rep_fn']
if nonce is None:
nonce = self.get_Nonce()
- req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options,
- cname=cname,
- realm=realm,
- sname=sname,
- from_time=from_time,
- till_time=till_time,
- renew_time=renew_time,
- nonce=nonce,
- etypes=etypes,
- addresses=addresses,
- EncAuthorizationData=EncAuthorizationData,
- EncAuthorizationData_key=EncAuthorizationData_key,
- additional_tickets=additional_tickets)
+ req_body = self.KDC_REQ_BODY_create(
+ kdc_options=kdc_options,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ from_time=from_time,
+ till_time=till_time,
+ renew_time=renew_time,
+ nonce=nonce,
+ etypes=etypes,
+ addresses=addresses,
+ EncAuthorizationData=EncAuthorizationData,
+ EncAuthorizationData_key=EncAuthorizationData_key,
+ additional_tickets=additional_tickets)
if generate_padata_fn is not None:
# This can alter req_body...
padata, req_body = generate_padata_fn(kdc_exchange_dict,
kdc_exchange_dict['req_padata'] = padata
kdc_exchange_dict['req_body'] = req_body
- req_obj,req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
- padata=padata,
- req_body=req_body,
- asn1Spec=req_asn1Spec())
+ req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
+ padata=padata,
+ req_body=req_body,
+ asn1Spec=req_asn1Spec())
rep = self.send_recv_transaction(req_decoded)
self.assertIsNotNone(rep)
rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
msg_type = kdc_exchange_dict['rep_msg_type']
- self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
+ self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
padata = self.getElementValue(rep, 'padata')
self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
ticket = self.getElementValue(rep, 'ticket')
ticket_encpart = None
ticket_cipher = None
- if ticket is not None: # Never None, but gives indentation
+ if ticket is not None: # Never None, but gives indentation
self.assertElementPresent(ticket, 'tkt-vno')
self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
self.assertElementPresent(ticket, 'enc-part')
ticket_encpart = self.getElementValue(ticket, 'enc-part')
- if ticket_encpart is not None: # Never None, but gives indentation
+ if ticket_encpart is not None: # Never None, but gives indentation
self.assertElementPresent(ticket_encpart, 'etype')
# 'unspecified' means present, with any value != 0
- self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno)
+ self.assertElementKVNO(ticket_encpart, 'kvno',
+ self.unspecified_kvno)
self.assertElementPresent(ticket_encpart, 'cipher')
ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
self.assertElementPresent(rep, 'enc-part')
encpart = self.getElementValue(rep, 'enc-part')
encpart_cipher = None
- if encpart is not None: # Never None, but gives indentation
+ if encpart is not None: # Never None, but gives indentation
self.assertElementPresent(encpart, 'etype')
self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
self.assertElementPresent(encpart, 'cipher')
encpart_decryption_key = None
if check_padata_fn is not None:
- # See if get the decryption key from the preauth phase
- encpart_decryption_key,encpart_decryption_usage = \
- check_padata_fn(kdc_exchange_dict, callback_dict,
- rep, padata)
+ # See if we can get the decryption key from the preauth phase
+ encpart_decryption_key, encpart_decryption_usage = (
+ check_padata_fn(kdc_exchange_dict, callback_dict,
+ rep, padata))
ticket_private = None
if ticket_decryption_key is not None:
- self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype)
- self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno)
- ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher)
- ticket_private = self.der_decode(ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart())
+ self.assertElementEqual(ticket_encpart, 'etype',
+ ticket_decryption_key.etype)
+ self.assertElementKVNO(ticket_encpart, 'kvno',
+ ticket_decryption_key.kvno)
+ ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
+ ticket_cipher)
+ ticket_private = self.der_decode(
+ ticket_decpart,
+ asn1Spec=krb5_asn1.EncTicketPart())
encpart_private = None
if encpart_decryption_key is not None:
- self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype)
- self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno)
- rep_decpart = encpart_decryption_key.decrypt(encpart_decryption_usage, encpart_cipher)
- encpart_private = self.der_decode(rep_decpart, asn1Spec=rep_encpart_asn1Spec())
+ self.assertElementEqual(encpart, 'etype',
+ encpart_decryption_key.etype)
+ self.assertElementKVNO(encpart, 'kvno',
+ encpart_decryption_key.kvno)
+ rep_decpart = encpart_decryption_key.decrypt(
+ encpart_decryption_usage,
+ encpart_cipher)
+ encpart_private = self.der_decode(
+ rep_decpart,
+ asn1Spec=rep_encpart_asn1Spec())
if check_kdc_private_fn is not None:
check_kdc_private_fn(kdc_exchange_dict, callback_dict,
self.assertElementPresent(ticket_private, 'flags')
self.assertElementPresent(ticket_private, 'key')
ticket_key = self.getElementValue(ticket_private, 'key')
- if ticket_key is not None: # Never None, but gives indentation
+ if ticket_key is not None: # Never None, but gives indentation
self.assertElementPresent(ticket_key, 'keytype')
self.assertElementPresent(ticket_key, 'keyvalue')
ticket_session_key = self.EncryptionKey_import(ticket_key)
- self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm)
- self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname)
+ self.assertElementEqualUTF8(ticket_private, 'crealm',
+ expected_crealm)
+ self.assertElementEqualPrincipal(ticket_private, 'cname',
+ expected_cname)
self.assertElementPresent(ticket_private, 'transited')
self.assertElementPresent(ticket_private, 'authtime')
if self.strict_checking:
if encpart_private is not None:
self.assertElementPresent(encpart_private, 'key')
encpart_key = self.getElementValue(encpart_private, 'key')
- if encpart_key is not None: # Never None, but gives indentation
+ if encpart_key is not None: # Never None, but gives indentation
self.assertElementPresent(encpart_key, 'keytype')
self.assertElementPresent(encpart_key, 'keyvalue')
encpart_session_key = self.EncryptionKey_import(encpart_key)
self.assertElementPresent(encpart_private, 'last-req')
self.assertElementPresent(encpart_private, 'nonce')
- # TODO self.assertElementPresent(encpart_private, 'key-expiration')
+ # TODO self.assertElementPresent(encpart_private,
+ # 'key-expiration')
self.assertElementPresent(encpart_private, 'flags')
self.assertElementPresent(encpart_private, 'authtime')
if self.strict_checking:
self.assertElementPresent(encpart_private, 'starttime')
self.assertElementPresent(encpart_private, 'endtime')
# TODO self.assertElementPresent(encpart_private, 'renew-till')
- self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm)
- self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname)
+ self.assertElementEqualUTF8(encpart_private, 'srealm',
+ expected_srealm)
+ self.assertElementEqualPrincipal(encpart_private, 'sname',
+ expected_sname)
# TODO self.assertElementMissing(encpart_private, 'caddr')
if ticket_session_key is not None and encpart_session_key is not None:
- self.assertEqual(ticket_session_key.etype, encpart_session_key.etype)
- self.assertEqual(ticket_session_key.key.contents, encpart_session_key.key.contents)
+ self.assertEqual(ticket_session_key.etype,
+ encpart_session_key.etype)
+ self.assertEqual(ticket_session_key.key.contents,
+ encpart_session_key.key.contents)
if encpart_session_key is not None:
session_key = encpart_session_key
else:
session_key = ticket_session_key
- ticket_creds = KerberosTicketCreds(ticket,
- session_key,
- crealm=expected_crealm,
- cname=expected_cname,
- srealm=expected_srealm,
- sname=expected_sname,
- decryption_key=ticket_decryption_key,
- ticket_private=ticket_private,
- encpart_private=encpart_private)
+ ticket_creds = KerberosTicketCreds(
+ ticket,
+ session_key,
+ crealm=expected_crealm,
+ cname=expected_cname,
+ srealm=expected_srealm,
+ sname=expected_sname,
+ decryption_key=ticket_decryption_key,
+ ticket_private=ticket_private,
+ encpart_private=encpart_private)
kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
return
if kcrypto.Enctype.RC4 in proposed_etypes:
expect_etype_info = True
for etype in proposed_etypes:
- if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
+ if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
expect_etype_info = False
if etype not in client_as_etypes:
continue
- if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
+ if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
if etype > expected_aes_type:
expected_aes_type = etype
if etype in (kcrypto.Enctype.RC4,):
if self.strict_checking:
self.assertIsNotNone(edata)
if edata is not None:
- rep_padata = self.der_decode(edata, asn1Spec=krb5_asn1.METHOD_DATA())
+ rep_padata = self.der_decode(edata,
+ asn1Spec=krb5_asn1.METHOD_DATA())
self.assertGreater(len(rep_padata), 0)
else:
rep_padata = []
if self.strict_checking:
for i in range(0, len(expected_patypes)):
- self.assertElementEqual(rep_padata[i], 'padata-type', expected_patypes[i])
+ self.assertElementEqual(rep_padata[i],
+ 'padata-type',
+ expected_patypes[i])
self.assertEqual(len(rep_padata), len(expected_patypes))
etype_info2 = None
pavalue = self.getElementValue(pa, 'padata-value')
if patype == PADATA_ETYPE_INFO2:
self.assertIsNone(etype_info2)
- etype_info2 = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO2())
+ etype_info2 = self.der_decode(pavalue,
+ asn1Spec=krb5_asn1.ETYPE_INFO2())
continue
if patype == PADATA_ETYPE_INFO:
self.assertIsNone(etype_info)
- etype_info = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO())
+ etype_info = self.der_decode(pavalue,
+ asn1Spec=krb5_asn1.ETYPE_INFO())
continue
if patype == PADATA_ENC_TIMESTAMP:
self.assertIsNone(enc_timestamp)
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
body_checksum_type = kdc_exchange_dict['body_checksum_type']
- req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
+ req_body_blob = self.der_encode(req_body,
+ asn1Spec=krb5_asn1.KDC_REQ_BODY())
req_body_checksum = self.Checksum_create(tgt.session_key,
KU_TGS_REQ_AUTH_CKSUM,
subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
- authenticator_obj = self.Authenticator_create(crealm=tgt.crealm,
- cname=tgt.cname,
- cksum=req_body_checksum,
- cusec=cusec,
- ctime=ctime,
- subkey=subkey_obj,
- seq_number=seq_number,
- authorization_data=None)
- authenticator_blob = self.der_encode(authenticator_obj, asn1Spec=krb5_asn1.Authenticator())
+ authenticator_obj = self.Authenticator_create(
+ crealm=tgt.crealm,
+ cname=tgt.cname,
+ cksum=req_body_checksum,
+ cusec=cusec,
+ ctime=ctime,
+ subkey=subkey_obj,
+ seq_number=seq_number,
+ authorization_data=None)
+ authenticator_blob = self.der_encode(
+ authenticator_obj,
+ asn1Spec=krb5_asn1.Authenticator())
authenticator = self.EncryptedData_create(tgt.session_key,
KU_TGS_REQ_AUTH,
ap_options = krb5_asn1.APOptions('0')
ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
- ticket=tgt.ticket,
- authenticator=authenticator)
+ ticket=tgt.ticket,
+ authenticator=authenticator)
ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
padata = [pa_tgs_req]
return preauth_key, as_rep_usage
kdc_exchange_dict = self.as_exchange_dict(
- expected_crealm=expected_crealm,
- expected_cname=expected_cname,
- expected_srealm=expected_srealm,
- expected_sname=expected_sname,
- ticket_decryption_key=ticket_decryption_key,
- generate_padata_fn=_generate_padata_copy,
- check_error_fn=self.generic_check_as_error,
- check_rep_fn=self.generic_check_kdc_rep,
- check_padata_fn=_check_padata_preauth_key,
- check_kdc_private_fn=self.generic_check_kdc_private,
- expected_error_mode=expected_error_mode,
- client_as_etypes=client_as_etypes,
- expected_salt=expected_salt)
+ expected_crealm=expected_crealm,
+ expected_cname=expected_cname,
+ expected_srealm=expected_srealm,
+ expected_sname=expected_sname,
+ ticket_decryption_key=ticket_decryption_key,
+ generate_padata_fn=_generate_padata_copy,
+ check_error_fn=self.generic_check_as_error,
+ check_rep_fn=self.generic_check_kdc_rep,
+ check_padata_fn=_check_padata_preauth_key,
+ check_kdc_private_fn=self.generic_check_kdc_private,
+ expected_error_mode=expected_error_mode,
+ client_as_etypes=client_as_etypes,
+ expected_salt=expected_salt)
rep = self._generic_kdc_exchange(kdc_exchange_dict,
kdc_options=str(kdc_options),
till_time=till,
etypes=etypes)
- if expected_error_mode == 0: # AS-REP
+ if expected_error_mode == 0: # AS-REP
return rep
return kdc_exchange_dict['preauth_etype_info2']