from samba import generate_random_bytes as get_random_bytes
from samba.common import get_string, get_bytes
+
class Enctype(object):
DES_CRC = 1
DES_MD4 = 2
res |= x ^ y
return res == 0
+
def SIMPLE_HASH(string, algo_cls):
hash_ctx = hashes.Hash(algo_cls(), default_backend())
hash_ctx.update(string)
return hash_ctx.finalize()
+
def HMAC_HASH(key, string, algo_cls):
hmac_ctx = hmac.HMAC(key, algo_cls(), default_backend())
hmac_ctx.update(string)
return hmac_ctx.finalize()
+
def _nfold(str, nbytes):
# Convert str to a string of length nbytes using the RFC 3961 nfold
# operation.
# Rotate the bytes in str to the right by nbits bits.
def rotate_right(str, nbits):
- nbytes, remain = (nbits//8) % len(str), nbits % 8
- return bytes([(str[i-nbytes] >> remain) |
- (str[i-nbytes-1] << (8-remain) & 0xff)
- for i in range(len(str))])
+ nbytes, remain = (nbits // 8) % len(str), nbits % 8
+ return bytes([
+ (str[i - nbytes] >> remain)
+ | (str[i - nbytes - 1] << (8 - remain) & 0xff)
+ for i in range(len(str))])
# Add equal-length strings together with end-around carry.
def add_ones_complement(str1, str2):
v = [a + b for a, b in zip(str1, str2)]
# Propagate carry bits to the left until there aren't any left.
while any(x & ~0xff for x in v):
- v = [(v[i-n+1]>>8) + (v[i]&0xff) for i in range(n)]
+ v = [(v[i - n + 1] >> 8) + (v[i] & 0xff) for i in range(n)]
return bytes([x for x in v])
# Concatenate copies of str to produce the least common multiple
slen = len(str)
lcm = nbytes * slen // gcd(nbytes, slen)
bigstr = b''.join((rotate_right(str, 13 * i) for i in range(lcm // slen)))
- slices = (bigstr[p:p+nbytes] for p in range(0, lcm, nbytes))
+ slices = (bigstr[p:p + nbytes] for p in range(0, lcm, nbytes))
return reduce(add_ones_complement, slices)
return b if bin(b & ~1).count('1') % 2 else b | 1
assert len(seed) == 7
firstbytes = [parity(b & ~1) for b in seed]
- lastbyte = parity(sum((seed[i]&1) << i+1 for i in range(7)))
+ lastbyte = parity(sum((seed[i] & 1) << i + 1 for i in range(7)))
keybytes = bytes([b for b in firstbytes + [lastbyte]])
if _is_weak_des_key(keybytes):
keybytes[7] = bytes([keybytes[7] ^ 0xF0])
if len(ciphertext) == 16:
return aes_decrypt(ciphertext)
# Split the ciphertext into blocks. The last block may be partial.
- cblocks = [ciphertext[p:p+16] for p in range(0, len(ciphertext), 16)]
+ cblocks = [ciphertext[p:p + 16] for p in range(0, len(ciphertext), 16)]
lastlen = len(cblocks[-1])
# CBC-decrypt all but the last two blocks.
prev_cblock = bytes(16)
# will be the omitted bytes of ciphertext from the final
# block.
b = aes_decrypt(cblocks[-2])
- lastplaintext =_xorbytes(b[:lastlen], cblocks[-1])
+ lastplaintext = _xorbytes(b[:lastlen], cblocks[-1])
omitted = b[lastlen:]
# Decrypt the final cipher block plus the omitted bytes to get
# the second-to-last plaintext block.
cksum = HMAC_HASH(ki, confounder + plaintext, hashes.MD5)
ke = HMAC_HASH(ki, cksum, hashes.MD5)
- encryptor = Cipher(ciphers.ARC4(ke), None, default_backend()).encryptor()
+ encryptor = Cipher(
+ ciphers.ARC4(ke), None, default_backend()).encryptor()
ctext = encryptor.update(confounder + plaintext)
return cksum + ctext
ki = HMAC_HASH(key.contents, cls.usage_str(keyusage), hashes.MD5)
ke = HMAC_HASH(ki, cksum, hashes.MD5)
- decryptor = Cipher(ciphers.ARC4(ke), None, default_backend()).decryptor()
+ decryptor = Cipher(
+ ciphers.ARC4(ke), None, default_backend()).decryptor()
basic_plaintext = decryptor.update(basic_ctext)
exp_cksum = HMAC_HASH(ki, basic_plaintext, hashes.MD5)
c.verify(key, keyusage, text, cksum)
-def prfplus(key, pepper, l):
- # Produce l bytes of output using the RFC 6113 PRF+ function.
+def prfplus(key, pepper, ln):
+ # Produce ln bytes of output using the RFC 6113 PRF+ function.
out = b''
count = 1
- while len(out) < l:
+ while len(out) < ln:
out += prf(key, bytes([count]) + pepper)
count += 1
- return out[:l]
+ return out[:ln]
def cf2(enctype, key1, key2, pepper1, pepper2):
return e.random_to_key(_xorbytes(prfplus(key1, pepper1, e.seedsize),
prfplus(key2, pepper2, e.seedsize)))
+
def h(hexstr):
return bytes.fromhex(hexstr)
+
class KcrytoTest(TestCase):
"""kcrypto Test case."""
conf = h('94B491F481485B9A0678CD3C4EA386AD')
keyusage = 2
plain = b'9 bytesss'
- ctxt = h('68FB9679601F45C78857B2BF820FD6E53ECA8D42FD4B1D7024A09205ABB7CD2E'
- 'C26C355D2F')
+ ctxt = h('68FB9679601F45C78857B2BF820FD6E53ECA8D42FD4B1D7024A09205ABB7'
+ 'CD2EC26C355D2F')
k = Key(Enctype.AES128, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain)
def test_aes256_crypt(self):
# AES256 encrypt and decrypt
- kb = h('F1C795E9248A09338D82C3F8D5B567040B0110736845041347235B1404231398')
+ kb = h('F1C795E9248A09338D82C3F8D5B567040B0110736845041347235B14042313'
+ '98')
conf = h('E45CA518B42E266AD98E165E706FFB60')
keyusage = 4
plain = b'30 bytes bytes bytes bytes byt'
- ctxt = h('D1137A4D634CFECE924DBC3BF6790648BD5CFF7DE0E7B99460211D0DAEF3D79A'
- '295C688858F3B34B9CBD6EEBAE81DAF6B734D4D498B6714F1C1D')
+ ctxt = h('D1137A4D634CFECE924DBC3BF6790648BD5CFF7DE0E7B99460211D0DAEF3'
+ 'D79A295C688858F3B34B9CBD6EEBAE81DAF6B734D4D498B6714F1C1D')
k = Key(Enctype.AES256, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain)
def test_aes256_checksum(self):
# AES256 checksum
- kb = h('B1AE4CD8462AFF1677053CC9279AAC30B796FB81CE21474DD3DDBCFEA4EC76D7')
+ kb = h('B1AE4CD8462AFF1677053CC9279AAC30B796FB81CE21474DD3DDBC'
+ 'FEA4EC76D7')
keyusage = 4
plain = b'fourteen'
cksum = h('E08739E3279E2903EC8E3836')
string = b'X' * 64
salt = b'pass phrase equals block size'
params = h('000004B0')
- kb = h('89ADEE3608DB8BC71F1BFBFE459486B05618B70CBAE22092534E56C553BA4B34')
+ kb = h('89ADEE3608DB8BC71F1BFBFE459486B05618B70CBAE22092534E56'
+ 'C553BA4B34')
k = string_to_key(Enctype.AES256, string, salt, params)
self.assertEqual(k.contents, kb)
def test_aes256_cf2(self):
# AES256 cf2
- kb = h('4D6CA4E629785C1F01BAF55E2E548566B9617AE3A96868C337CB93B5E72B1C7B')
+ kb = h('4D6CA4E629785C1F01BAF55E2E548566B9617AE3A96868C337CB93B5'
+ 'E72B1C7B')
k1 = string_to_key(Enctype.AES256, b'key1', b'key1')
k2 = string_to_key(Enctype.AES256, b'key2', b'key2')
k = cf2(Enctype.AES256, k1, k2, b'a', b'b')
conf = h('94690A17B2DA3C9B')
keyusage = 3
plain = b'13 bytes byte'
- ctxt = h('839A17081ECBAFBCDC91B88C6955DD3C4514023CF177B77BF0D0177A16F705E8'
- '49CB7781D76A316B193F8D30')
+ ctxt = h('839A17081ECBAFBCDC91B88C6955DD3C4514023CF177B77BF0D0177A16F7'
+ '05E849CB7781D76A316B193F8D30')
k = Key(Enctype.DES3, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), _zeropad(plain, 8))
conf = h('37245E73A45FBF72')
keyusage = 4
plain = b'30 bytes bytes bytes bytes byt'
- ctxt = h('95F9047C3AD75891C2E9B04B16566DC8B6EB9CE4231AFB2542EF87A7B5A0F260'
- 'A99F0460508DE0CECC632D07C354124E46C5D2234EB8')
+ ctxt = h('95F9047C3AD75891C2E9B04B16566DC8B6EB9CE4231AFB2542EF87A7B5A0'
+ 'F260A99F0460508DE0CECC632D07C354124E46C5D2234EB8')
k = Key(Enctype.RC4, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain)
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder
-def BitStringEncoder_encodeValue32(self, value, asn1Spec, encodeFun, **options):
+
+
+def BitStringEncoder_encodeValue32(
+ self, value, asn1Spec, encodeFun, **options):
#
# BitStrings like KDCOptions or TicketFlags should at least
# be 32-Bit on the wire
padding = 0
ret = b'\x00' + substrate + (b'\x00' * padding)
return ret, False, True
+
+
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
+
def BitString_NamedValues_prettyPrint(self, scope=0):
ret = "%s" % self.asBinary()
bits = []
highest_bit = 32
for byte in self.asNumbers():
- for bit in [7,6,5,4,3,2,1,0]:
+ for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
mask = 1 << bit
if byte & mask:
val = 1
delim = ",\n%s " % indent
ret += "\n%s)" % indent
return ret
-krb5_asn1.TicketFlags.prettyPrintNamedValues = krb5_asn1.TicketFlagsValues.namedValues
-krb5_asn1.TicketFlags.namedValues = krb5_asn1.TicketFlagsValues.namedValues
-krb5_asn1.TicketFlags.prettyPrint = BitString_NamedValues_prettyPrint
-krb5_asn1.KDCOptions.prettyPrintNamedValues = krb5_asn1.KDCOptionsValues.namedValues
-krb5_asn1.KDCOptions.namedValues = krb5_asn1.KDCOptionsValues.namedValues
-krb5_asn1.KDCOptions.prettyPrint = BitString_NamedValues_prettyPrint
+
+
+krb5_asn1.TicketFlags.prettyPrintNamedValues =\
+ krb5_asn1.TicketFlagsValues.namedValues
+krb5_asn1.TicketFlags.namedValues =\
+ krb5_asn1.TicketFlagsValues.namedValues
+krb5_asn1.TicketFlags.prettyPrint =\
+ BitString_NamedValues_prettyPrint
+krb5_asn1.KDCOptions.prettyPrintNamedValues =\
+ krb5_asn1.KDCOptionsValues.namedValues
+krb5_asn1.KDCOptions.namedValues =\
+ krb5_asn1.KDCOptionsValues.namedValues
+krb5_asn1.KDCOptions.prettyPrint =\
+ BitString_NamedValues_prettyPrint
+
def Integer_NamedValues_prettyPrint(self, scope=0):
intval = int(self)
name = "<__unknown__>"
ret = "%d (0x%x) %s" % (intval, intval, name)
return ret
-krb5_asn1.NameType.prettyPrintNamedValues = krb5_asn1.NameTypeValues.namedValues
-krb5_asn1.NameType.prettyPrint = Integer_NamedValues_prettyPrint
-krb5_asn1.AuthDataType.prettyPrintNamedValues = krb5_asn1.AuthDataTypeValues.namedValues
-krb5_asn1.AuthDataType.prettyPrint = Integer_NamedValues_prettyPrint
-krb5_asn1.PADataType.prettyPrintNamedValues = krb5_asn1.PADataTypeValues.namedValues
-krb5_asn1.PADataType.prettyPrint = Integer_NamedValues_prettyPrint
-krb5_asn1.EncryptionType.prettyPrintNamedValues = krb5_asn1.EncryptionTypeValues.namedValues
-krb5_asn1.EncryptionType.prettyPrint = Integer_NamedValues_prettyPrint
-krb5_asn1.ChecksumType.prettyPrintNamedValues = krb5_asn1.ChecksumTypeValues.namedValues
-krb5_asn1.ChecksumType.prettyPrint = Integer_NamedValues_prettyPrint
+
+
+krb5_asn1.NameType.prettyPrintNamedValues =\
+ krb5_asn1.NameTypeValues.namedValues
+krb5_asn1.NameType.prettyPrint =\
+ Integer_NamedValues_prettyPrint
+krb5_asn1.AuthDataType.prettyPrintNamedValues =\
+ krb5_asn1.AuthDataTypeValues.namedValues
+krb5_asn1.AuthDataType.prettyPrint =\
+ Integer_NamedValues_prettyPrint
+krb5_asn1.PADataType.prettyPrintNamedValues =\
+ krb5_asn1.PADataTypeValues.namedValues
+krb5_asn1.PADataType.prettyPrint =\
+ Integer_NamedValues_prettyPrint
+krb5_asn1.EncryptionType.prettyPrintNamedValues =\
+ krb5_asn1.EncryptionTypeValues.namedValues
+krb5_asn1.EncryptionType.prettyPrint =\
+ Integer_NamedValues_prettyPrint
+krb5_asn1.ChecksumType.prettyPrintNamedValues =\
+ krb5_asn1.ChecksumTypeValues.namedValues
+krb5_asn1.ChecksumType.prettyPrint =\
+ Integer_NamedValues_prettyPrint
+
class Krb5EncryptionKey(object):
def __init__(self, key, kvno):
EncryptionKey_obj = {
'keytype': self.etype,
'keyvalue': self.key.contents,
- };
+ }
return EncryptionKey_obj
+
class RawKerberosTest(TestCase):
"""A raw Kerberos Test case."""
self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
self.s.settimeout(10)
self.s.connect(self.a[0][4])
- except socket.error as e:
+ except socket.error:
self.s.close()
raise
- except IOError as e:
+ except IOError:
self.s.close()
raise
- except Exception as e:
+ except Exception:
raise
finally:
pass
domain = samba.tests.env_get_var_value('DOMAIN')
realm = samba.tests.env_get_var_value('REALM')
username = samba.tests.env_get_var_value('SERVICE_USERNAME')
- password = samba.tests.env_get_var_value('SERVICE_PASSWORD',
- allow_missing=allow_missing_password)
+ password = samba.tests.env_get_var_value(
+ 'SERVICE_PASSWORD',
+ allow_missing=allow_missing_password)
c.set_domain(domain)
c.set_realm(realm)
c.set_username(username)
if hexdump is None:
hexdump = self.do_hexdump
if hexdump:
- sys.stderr.write("%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
-
- def der_decode(self, blob, asn1Spec=None, native_encode=True, asn1_print=None, hexdump=None):
+ sys.stderr.write(
+ "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
+
+ def der_decode(
+ self,
+ blob,
+ asn1Spec=None,
+ native_encode=True,
+ asn1_print=None,
+ hexdump=None):
if asn1Spec is not None:
class_name = type(asn1Spec).__name__.split(':')[0]
else:
class_name = "<None-asn1Spec>"
self.hex_dump(class_name, blob, hexdump=hexdump)
- obj,_ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
+ obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
self.asn1_dump(None, obj, asn1_print=asn1_print)
if native_encode:
obj = pyasn1_native_encode(obj)
return obj
- def der_encode(self, obj, asn1Spec=None, native_decode=True, asn1_print=None, hexdump=None):
+ def der_encode(
+ self,
+ obj,
+ asn1Spec=None,
+ native_decode=True,
+ asn1_print=None,
+ hexdump=None):
if native_decode:
obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
class_name = type(obj).__name__.split(':')[0]
def send_pdu(self, req, asn1_print=None, hexdump=None):
try:
- k5_pdu = self.der_encode(req, native_decode=False, asn1_print=asn1_print, hexdump=False)
+ k5_pdu = self.der_encode(
+ req, native_decode=False, asn1_print=asn1_print, hexdump=False)
header = struct.pack('>I', len(k5_pdu))
req_pdu = header
req_pdu += k5_pdu
self._disconnect("recv_raw: EOF")
return None
self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
- except socket.timeout as e:
+ except socket.timeout:
self.s.settimeout(10)
sys.stderr.write("recv_raw: TIMEOUT\n")
pass
rep_pdu = None
rep = None
try:
- raw_pdu = self.recv_raw(num_recv=4, hexdump=hexdump, timeout=timeout)
+ raw_pdu = self.recv_raw(
+ num_recv=4, hexdump=hexdump, timeout=timeout)
if raw_pdu is None:
return (None, None)
header = struct.unpack(">I", raw_pdu[0:4])
missing = k5_len
rep_pdu = b''
while missing > 0:
- raw_pdu = self.recv_raw(num_recv=missing, hexdump=hexdump, timeout=timeout)
+ raw_pdu = self.recv_raw(
+ num_recv=missing, hexdump=hexdump, timeout=timeout)
self.assertGreaterEqual(len(raw_pdu), 1)
rep_pdu += raw_pdu
missing = k5_len - len(rep_pdu)
- k5_raw = self.der_decode(rep_pdu, asn1Spec=None, native_encode=False,
- asn1_print=False, hexdump=False)
- pvno=k5_raw['field-0']
+ k5_raw = self.der_decode(
+ rep_pdu,
+ asn1Spec=None,
+ native_encode=False,
+ asn1_print=False,
+ hexdump=False)
+ pvno = k5_raw['field-0']
self.assertEqual(pvno, 5)
- msg_type=k5_raw['field-1']
- self.assertIn(msg_type, [11,13,30])
+ msg_type = k5_raw['field-1']
+ self.assertIn(msg_type, [11, 13, 30])
if msg_type == 11:
- asn1Spec=krb5_asn1.AS_REP()
+ asn1Spec = krb5_asn1.AS_REP()
elif msg_type == 13:
- asn1Spec=krb5_asn1.TGS_REP()
+ asn1Spec = krb5_asn1.TGS_REP()
elif msg_type == 30:
- asn1Spec=krb5_asn1.KRB_ERROR()
+ asn1Spec = krb5_asn1.KRB_ERROR()
rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
asn1_print=asn1_print, hexdump=False)
finally:
self.assertIsNone(self.s, msg="Is connected")
return
- def send_recv_transaction(self, req, asn1_print=None, hexdump=None, timeout=None):
+ def send_recv_transaction(
+ self,
+ req,
+ asn1_print=None,
+ hexdump=None,
+ timeout=None):
self.connect()
try:
self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
- rep = self.recv_pdu(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
+ rep = self.recv_pdu(
+ asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
except Exception:
self._disconnect("transaction failed")
raise
def assertPrincipalEqual(self, princ1, princ2):
self.assertEqual(princ1['name-type'], princ2['name-type'])
- self.assertEqual(len(princ1['name-string']), len(princ2['name-string']),
- msg="princ1=%s != princ2=%s" % (princ1, princ2))
+ self.assertEqual(
+ len(princ1['name-string']),
+ len(princ2['name-string']),
+ msg="princ1=%s != princ2=%s" % (princ1, princ2))
for idx in range(len(princ1['name-string'])):
- self.assertEqual(princ1['name-string'][idx], princ2['name-string'][idx],
- msg="princ1=%s != princ2=%s" % (princ1, princ2))
+ self.assertEqual(
+ princ1['name-string'][idx],
+ princ2['name-string'][idx],
+ msg="princ1=%s != princ2=%s" % (princ1, princ2))
return
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
salt = None
try:
salt = etype_info2['salt']
- except:
+ except Exception:
pass
if e == kcrypto.Enctype.RC4:
return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
password = creds.get_password()
- return self.PasswordKey_create(etype=e, pwd=password, salt=salt, kvno=kvno)
+ return self.PasswordKey_create(
+ etype=e, pwd=password, salt=salt, kvno=kvno)
def RandomKey(self, etype):
e = kcrypto._get_enctype_profile(etype)
'cipher': ciphertext
}
if key.kvno is not None:
- EncryptedData_obj['kvno'] = key.kvno
+ EncryptedData_obj['kvno'] = key.kvno
return EncryptedData_obj
def Checksum_create(self, key, usage, plaintext, ctype=None):
- #Checksum ::= SEQUENCE {
+ # Checksum ::= SEQUENCE {
# cksumtype [0] Int32,
# checksum [1] OCTET STRING
- #}
+ # }
if ctype is None:
ctype = key.ctype
checksum = key.make_checksum(usage, plaintext, ctype=ctype)
return PA_DATA_obj
def PA_ENC_TS_ENC_create(self, ts, usec):
- #PA-ENC-TS-ENC ::= SEQUENCE {
+ # PA-ENC-TS-ENC ::= SEQUENCE {
# patimestamp[0] KerberosTime, -- client's time
# pausec[1] krb5int32 OPTIONAL
- #}
+ # }
PA_ENC_TS_ENC_obj = {
'patimestamp': ts,
'pausec': usec,
additional_tickets,
asn1_print=None,
hexdump=None):
- #KDC-REQ-BODY ::= SEQUENCE {
+ # KDC-REQ-BODY ::= SEQUENCE {
# kdc-options [0] KDCOptions,
# cname [1] PrincipalName OPTIONAL
# -- Used only in AS-REQ --,
# till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32,
- # etype [8] SEQUENCE OF Int32 -- EncryptionType
+ # etype [8] SEQUENCE OF Int32
+ # -- EncryptionType
# -- in preference order --,
# addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL
# -- AuthorizationData --,
# additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
# -- NOTE: not empty
- #}
+ # }
if EncAuthorizationData is not None:
- enc_ad_plain = self.der_encode(EncAuthorizationData,
- asn1Spec=krb5_asn1.AuthorizationData(),
- asn1_print=asn1_print,
- hexdump=hexdump)
- enc_ad = self.EncryptedData_create(EncAuthorizationData_key, enc_ad_plain)
+ enc_ad_plain = self.der_encode(
+ EncAuthorizationData,
+ asn1Spec=krb5_asn1.AuthorizationData(),
+ asn1_print=asn1_print,
+ hexdump=hexdump)
+ enc_ad = self.EncryptedData_create(
+ EncAuthorizationData_key, enc_ad_plain)
else:
enc_ad = None
KDC_REQ_BODY_obj = {
asn1Spec=None,
asn1_print=None,
hexdump=None):
- #KDC-REQ ::= SEQUENCE {
+ # KDC-REQ ::= SEQUENCE {
# -- NOTE: first tag is [1], not [0]
# pvno [1] INTEGER (5) ,
# msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
# padata [3] SEQUENCE OF PA-DATA OPTIONAL
# -- NOTE: not empty --,
# req-body [4] KDC-REQ-BODY
- #}
+ # }
#
KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(kdc_options,
cname,
if padata is not None:
KDC_REQ_obj['padata'] = padata
if asn1Spec is not None:
- KDC_REQ_decoded = pyasn1_native_decode(KDC_REQ_obj, asn1Spec=asn1Spec)
+ KDC_REQ_decoded = pyasn1_native_decode(
+ KDC_REQ_obj, asn1Spec=asn1Spec)
else:
KDC_REQ_decoded = None
return KDC_REQ_obj, KDC_REQ_decoded
def AS_REQ_create(self,
- padata, # optional
- kdc_options, # required
- cname, # optional
- realm, # required
- sname, # optional
- from_time, # optional
- till_time, # required
- renew_time, # optional
- nonce, # required
- etypes, # required
- addresses, # optional
+ padata, # optional
+ kdc_options, # required
+ cname, # optional
+ realm, # required
+ sname, # optional
+ from_time, # optional
+ till_time, # required
+ renew_time, # optional
+ nonce, # required
+ etypes, # required
+ addresses, # optional
EncAuthorizationData,
EncAuthorizationData_key,
additional_tickets,
native_decoded_only=True,
asn1_print=None,
hexdump=None):
- #KDC-REQ ::= SEQUENCE {
+ # KDC-REQ ::= SEQUENCE {
# -- NOTE: first tag is [1], not [0]
# pvno [1] INTEGER (5) ,
# msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
# padata [3] SEQUENCE OF PA-DATA OPTIONAL
# -- NOTE: not empty --,
# req-body [4] KDC-REQ-BODY
- #}
+ # }
#
- #KDC-REQ-BODY ::= SEQUENCE {
+ # KDC-REQ-BODY ::= SEQUENCE {
# kdc-options [0] KDCOptions,
# cname [1] PrincipalName OPTIONAL
# -- Used only in AS-REQ --,
# till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32,
- # etype [8] SEQUENCE OF Int32 -- EncryptionType
+ # etype [8] SEQUENCE OF Int32
+ # -- EncryptionType
# -- in preference order --,
# addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL
# -- AuthorizationData --,
# additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
# -- NOTE: not empty
- #}
- obj,decoded = self.KDC_REQ_create(msg_type=10,
- padata=padata,
- kdc_options=kdc_options,
- cname=cname,
- realm=realm,
- sname=sname,
- from_time=from_time,
- till_time=till_time,
- renew_time=renew_time,
- nonce=nonce,
- etypes=etypes,
- addresses=addresses,
- EncAuthorizationData=EncAuthorizationData,
- EncAuthorizationData_key=EncAuthorizationData_key,
- additional_tickets=additional_tickets,
- asn1Spec=krb5_asn1.AS_REQ(),
- asn1_print=asn1_print,
- hexdump=hexdump)
+ # }
+ obj, decoded = self.KDC_REQ_create(
+ msg_type=10,
+ padata=padata,
+ kdc_options=kdc_options,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ from_time=from_time,
+ till_time=till_time,
+ renew_time=renew_time,
+ nonce=nonce,
+ etypes=etypes,
+ addresses=addresses,
+ EncAuthorizationData=EncAuthorizationData,
+ EncAuthorizationData_key=EncAuthorizationData_key,
+ additional_tickets=additional_tickets,
+ asn1Spec=krb5_asn1.AS_REQ(),
+ asn1_print=asn1_print,
+ hexdump=hexdump)
if native_decoded_only:
return decoded
return decoded, obj
# ap-options [2] APOptions,
# ticket [3] Ticket,
# authenticator [4] EncryptedData -- Authenticator
- #}
+ # }
AP_REQ_obj = {
'pvno': 5,
'msg-type': 14,
}
return AP_REQ_obj
- def Authenticator_create(self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
- authorization_data):
+ def Authenticator_create(
+ self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
+ authorization_data):
# -- Unencrypted authenticator
# Authenticator ::= [APPLICATION 2] SEQUENCE {
# authenticator-vno [0] INTEGER (5),
# subkey [6] EncryptionKey OPTIONAL,
# seq-number [7] UInt32 OPTIONAL,
# authorization-data [8] AuthorizationData OPTIONAL
- #}
+ # }
Authenticator_obj = {
'authenticator-vno': 5,
'crealm': crealm,
return Authenticator_obj
def TGS_REQ_create(self,
- padata, # optional
+ padata, # optional
cusec,
ctime,
ticket,
- kdc_options, # required
- cname, # optional
- realm, # required
- sname, # optional
- from_time, # optional
- till_time, # required
- renew_time, # optional
- nonce, # required
- etypes, # required
- addresses, # optional
+ kdc_options, # required
+ cname, # optional
+ realm, # required
+ sname, # optional
+ from_time, # optional
+ till_time, # required
+ renew_time, # optional
+ nonce, # required
+ etypes, # required
+ addresses, # optional
EncAuthorizationData,
EncAuthorizationData_key,
additional_tickets,
native_decoded_only=True,
asn1_print=None,
hexdump=None):
- #KDC-REQ ::= SEQUENCE {
+ # KDC-REQ ::= SEQUENCE {
# -- NOTE: first tag is [1], not [0]
# pvno [1] INTEGER (5) ,
# msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
# padata [3] SEQUENCE OF PA-DATA OPTIONAL
# -- NOTE: not empty --,
# req-body [4] KDC-REQ-BODY
- #}
+ # }
#
- #KDC-REQ-BODY ::= SEQUENCE {
+ # KDC-REQ-BODY ::= SEQUENCE {
# kdc-options [0] KDCOptions,
# cname [1] PrincipalName OPTIONAL
# -- Used only in AS-REQ --,
# till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32,
- # etype [8] SEQUENCE OF Int32 -- EncryptionType
+ # etype [8] SEQUENCE OF Int32
+ # -- EncryptionType
# -- in preference order --,
# addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL
# -- AuthorizationData --,
# additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
# -- NOTE: not empty
- #}
-
- req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options,
- cname=None,
- realm=realm,
- sname=sname,
- from_time=from_time,
- till_time=till_time,
- renew_time=renew_time,
- nonce=nonce,
- etypes=etypes,
- addresses=addresses,
- EncAuthorizationData=EncAuthorizationData,
- EncAuthorizationData_key=EncAuthorizationData_key,
- additional_tickets=additional_tickets)
+ # }
+
+ req_body = self.KDC_REQ_BODY_create(
+ kdc_options=kdc_options,
+ cname=None,
+ realm=realm,
+ sname=sname,
+ from_time=from_time,
+ till_time=till_time,
+ renew_time=renew_time,
+ nonce=nonce,
+ etypes=etypes,
+ addresses=addresses,
+ EncAuthorizationData=EncAuthorizationData,
+ EncAuthorizationData_key=EncAuthorizationData_key,
+ additional_tickets=additional_tickets)
req_body = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(),
asn1_print=asn1_print, hexdump=hexdump)
- req_body_checksum = self.Checksum_create(ticket_session_key, 6, req_body,
- ctype=body_checksum_type)
+ req_body_checksum = self.Checksum_create(
+ ticket_session_key, 6, req_body, ctype=body_checksum_type)
subkey_obj = None
if authenticator_subkey is not None:
subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe)
- authenticator = self.Authenticator_create(crealm=realm,
- cname=cname,
- cksum=req_body_checksum,
- cusec=cusec,
- ctime=ctime,
- subkey=subkey_obj,
- seq_number=seq_number,
- authorization_data=None)
- authenticator = self.der_encode(authenticator, asn1Spec=krb5_asn1.Authenticator(),
- asn1_print=asn1_print, hexdump=hexdump)
-
- authenticator = self.EncryptedData_create(ticket_session_key, 7, authenticator)
+ authenticator = self.Authenticator_create(
+ crealm=realm,
+ cname=cname,
+ cksum=req_body_checksum,
+ cusec=cusec,
+ ctime=ctime,
+ subkey=subkey_obj,
+ seq_number=seq_number,
+ authorization_data=None)
+ authenticator = self.der_encode(
+ authenticator,
+ asn1Spec=krb5_asn1.Authenticator(),
+ asn1_print=asn1_print,
+ hexdump=hexdump)
+
+ authenticator = self.EncryptedData_create(
+ ticket_session_key, 7, authenticator)
ap_options = krb5_asn1.APOptions('0')
ap_req = self.AP_REQ_create(ap_options=str(ap_options),
else:
padata = [pa_tgs_req]
- obj,decoded = self.KDC_REQ_create(msg_type=12,
- padata=padata,
- kdc_options=kdc_options,
- cname=None,
- realm=realm,
- sname=sname,
- from_time=from_time,
- till_time=till_time,
- renew_time=renew_time,
- nonce=nonce,
- etypes=etypes,
- addresses=addresses,
- EncAuthorizationData=EncAuthorizationData,
- EncAuthorizationData_key=EncAuthorizationData_key,
- additional_tickets=additional_tickets,
- asn1Spec=krb5_asn1.TGS_REQ(),
- asn1_print=asn1_print,
- hexdump=hexdump)
+ obj, decoded = self.KDC_REQ_create(
+ msg_type=12,
+ padata=padata,
+ kdc_options=kdc_options,
+ cname=None,
+ realm=realm,
+ sname=sname,
+ from_time=from_time,
+ till_time=till_time,
+ renew_time=renew_time,
+ nonce=nonce,
+ etypes=etypes,
+ addresses=addresses,
+ EncAuthorizationData=EncAuthorizationData,
+ EncAuthorizationData_key=EncAuthorizationData_key,
+ additional_tickets=additional_tickets,
+ asn1Spec=krb5_asn1.TGS_REQ(),
+ asn1_print=asn1_print,
+ hexdump=hexdump)
if native_decoded_only:
return decoded
return decoded, obj
'cksum': cksum,
'auth': "Kerberos",
}
- pa_s4u2self = self.der_encode(PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
+ pa_s4u2self = self.der_encode(
+ PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
return self.PA_DATA_create(129, pa_s4u2self)