]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: formatting
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Mon, 2 Aug 2021 05:00:09 +0000 (17:00 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 18 Aug 2021 22:28:33 +0000 (22:28 +0000)
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/krb5/as_req_tests.py
python/samba/tests/krb5/kdc_base_test.py
python/samba/tests/krb5/raw_testcase.py

index 10e7b6036098742bc56c8c49853aa6ddd69f75c2..09cfc9e1fc8fcf69b6c8aaa9a52a94a1be8825dc 100755 (executable)
@@ -82,16 +82,16 @@ class AsReqKerberosTests(KDCBaseTest):
             return initial_padata, req_body
 
         kdc_exchange_dict = self.as_exchange_dict(
-                         expected_crealm=expected_crealm,
-                         expected_cname=expected_cname,
-                         expected_srealm=expected_srealm,
-                         expected_sname=expected_sname,
-                         generate_padata_fn=_generate_padata_copy,
-                         check_error_fn=self.generic_check_as_error,
-                         check_rep_fn=self.generic_check_kdc_rep,
-                         expected_error_mode=expected_error_mode,
-                         client_as_etypes=client_as_etypes,
-                         expected_salt=expected_salt)
+            expected_crealm=expected_crealm,
+            expected_cname=expected_cname,
+            expected_srealm=expected_srealm,
+            expected_sname=expected_sname,
+            generate_padata_fn=_generate_padata_copy,
+            check_error_fn=self.generic_check_as_error,
+            check_rep_fn=self.generic_check_kdc_rep,
+            expected_error_mode=expected_error_mode,
+            client_as_etypes=client_as_etypes,
+            expected_salt=expected_salt)
 
         rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                          kdc_options=str(initial_kdc_options),
index 4bd856b217e0443b25b9cc3a46fe212a95dd0859..c23c71e1d747d05def15af3ec02a2fc6ab8488b4 100644 (file)
@@ -21,10 +21,7 @@ import os
 from datetime import datetime, timezone
 import tempfile
 import binascii
-import struct
 
-sys.path.insert(0, "bin/python")
-os.environ["PYTHONUNBUFFERED"] = "1"
 from collections import namedtuple
 import ldb
 from ldb import SCOPE_BASE
@@ -66,6 +63,9 @@ from samba.tests.krb5.rfc4120_constants import (
     PADATA_ETYPE_INFO2,
 )
 
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
 global_asn1_print = False
 global_hexdump = False
 
@@ -114,9 +114,9 @@ class KDCBaseTest(RawKerberosTest):
 
             session = system_session()
             type(self)._ldb = SamDB(url="ldap://%s" % self.host,
-                            session_info=session,
-                            credentials=creds,
-                            lp=lp)
+                                    session_info=session,
+                                    credentials=creds,
+                                    lp=lp)
 
         return self._ldb
 
@@ -337,6 +337,7 @@ class KDCBaseTest(RawKerberosTest):
                          require_strongest_key=False):
         if require_strongest_key:
             self.assertTrue(require_keys)
+
         def download_krbtgt_creds():
             samdb = self.get_samdb()
 
@@ -742,15 +743,16 @@ class KDCBaseTest(RawKerberosTest):
                             .replace(tzinfo=timezone.utc).timestamp())
 
         # Account for clock skew of up to five minutes.
-        self.assertLess(cred.authtime - 5*60,
+        self.assertLess(cred.authtime - 5 * 60,
                         datetime.now(timezone.utc).timestamp(),
                         "Ticket not yet valid - clocks may be out of sync.")
-        self.assertLess(cred.starttime - 5*60,
+        self.assertLess(cred.starttime - 5 * 60,
                         datetime.now(timezone.utc).timestamp(),
                         "Ticket not yet valid - clocks may be out of sync.")
-        self.assertGreater(cred.endtime - 60*60,
+        self.assertGreater(cred.endtime - 60 * 60,
                            datetime.now(timezone.utc).timestamp(),
-                           "Ticket already expired/about to expire - clocks may be out of sync.")
+                           "Ticket already expired/about to expire - "
+                           "clocks may be out of sync.")
 
         cred.renew_till = cred.endtime
         cred.is_skey = 0
index 9c090e4d005c137b3aeb29983a09e3f2801b24ca..de9c25751d283c55fec73e855af2d45274e23d46 100644 (file)
@@ -24,11 +24,19 @@ import datetime
 import random
 import binascii
 import itertools
+from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
+from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
+from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
+from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
+
+from pyasn1.codec.ber.encoder import BitStringEncoder
 
-import samba.tests
 from samba.credentials import Credentials
-from samba.tests import TestCaseInTempDir
 from samba.dcerpc import security
+
+import samba.tests
+from samba.tests import TestCaseInTempDir
+
 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
 from samba.tests.krb5.rfc4120_constants import (
     KDC_ERR_ETYPE_NOSUPP,
@@ -53,13 +61,6 @@ from samba.tests.krb5.rfc4120_constants import (
 )
 import samba.tests.krb5.kcrypto as kcrypto
 
-from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
-from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
-from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
-from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
-
-from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder
-
 
 def BitStringEncoder_encodeValue32(
         self, value, asn1Spec, encodeFun, **options):
@@ -217,6 +218,7 @@ class Krb5EncryptionKey(object):
         }
         return EncryptionKey_obj
 
+
 class KerberosCredentials(Credentials):
     def __init__(self):
         super(KerberosCredentials, self).__init__()
@@ -293,6 +295,7 @@ class KerberosCredentials(Credentials):
     def get_forced_salt(self):
         return self.forced_salt
 
+
 class KerberosTicketCreds(object):
     def __init__(self, ticket, session_key,
                  crealm=None, cname=None,
@@ -311,14 +314,15 @@ class KerberosTicketCreds(object):
         self.encpart_private = encpart_private
         return
 
+
 class RawKerberosTest(TestCaseInTempDir):
     """A raw Kerberos Test case."""
 
     etypes_to_test = (
-        { "value": -1111, "name": "dummy", },
-        { "value": kcrypto.Enctype.AES256, "name": "aes128", },
-        { "value": kcrypto.Enctype.AES128, "name": "aes256", },
-        { "value": kcrypto.Enctype.RC4, "name": "rc4", },
+        {"value": -1111, "name": "dummy", },
+        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
+        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
+        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
     )
 
     setup_etype_test_permutations_done = False
@@ -332,7 +336,7 @@ class RawKerberosTest(TestCaseInTempDir):
 
         num_idxs = len(cls.etypes_to_test)
         permutations = []
-        for num in range(1, num_idxs+1):
+        for num in range(1, num_idxs + 1):
             chunk = list(itertools.permutations(range(num_idxs), num))
             for e in chunk:
                 el = list(e)
@@ -349,7 +353,7 @@ class RawKerberosTest(TestCaseInTempDir):
                     name += "_%s" % n
                 etypes += (cls.etypes_to_test[idx]["value"],)
 
-            r = { "name": name, "etypes": etypes, }
+            r = {"name": name, "etypes": etypes, }
             res.append(r)
 
         cls.etype_test_permutations = res
@@ -386,7 +390,8 @@ class RawKerberosTest(TestCaseInTempDir):
         self.do_asn1_print = False
         self.do_hexdump = False
 
-        strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True)
+        strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
+                                                        allow_missing=True)
         if strict_checking is None:
             strict_checking = '1'
         self.strict_checking = bool(int(strict_checking))
@@ -440,8 +445,9 @@ class RawKerberosTest(TestCaseInTempDir):
         val = None
         if prefix is not None:
             allow_missing_prefix = allow_missing or fallback_default
-            val = samba.tests.env_get_var_value('%s_%s' % (prefix, varname),
-                                    allow_missing=allow_missing_prefix)
+            val = samba.tests.env_get_var_value(
+                '%s_%s' % (prefix, varname),
+                allow_missing=allow_missing_prefix)
         else:
             fallback_default = True
         if val is None and fallback_default:
@@ -506,7 +512,8 @@ class RawKerberosTest(TestCaseInTempDir):
         if aes256_key is not None:
             c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
         aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
-                                      fallback_default=False, allow_missing=True)
+                                      fallback_default=False,
+                                      allow_missing=True)
         if aes128_key is not None:
             c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
         rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
@@ -536,11 +543,12 @@ class RawKerberosTest(TestCaseInTempDir):
         env_err = None
         try:
             # Try to obtain them from the environment
-            creds = self._get_krb5_creds_from_env(prefix,
-                                                  default_username=default_username,
-                                                  allow_missing_password=allow_missing_password,
-                                                  allow_missing_keys=allow_missing_keys,
-                                                  require_strongest_key=require_strongest_key)
+            creds = self._get_krb5_creds_from_env(
+                prefix,
+                default_username=default_username,
+                allow_missing_password=allow_missing_password,
+                allow_missing_keys=allow_missing_keys,
+                require_strongest_key=require_strongest_key)
         except Exception as err:
             # An error occurred, so save it for later
             env_err = err
@@ -886,8 +894,8 @@ class RawKerberosTest(TestCaseInTempDir):
         return s
 
     def get_Nonce(self):
-        nonce_min=0x7f000000
-        nonce_max=0x7fffffff
+        nonce_min = 0x7f000000
+        nonce_max = 0x7fffffff
         v = random.randint(nonce_min, nonce_max)
         return v
 
@@ -936,15 +944,20 @@ class RawKerberosTest(TestCaseInTempDir):
         if etype == kcrypto.Enctype.RC4:
             nthash = creds.get_nt_hash()
             self.assertIsNotNone(nthash, msg=fail_msg)
-            return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)
+            return self.SessionKey_create(etype=etype,
+                                          contents=nthash,
+                                          kvno=kvno)
 
         password = creds.get_password()
         self.assertIsNotNone(password, msg=fail_msg)
         salt = creds.get_forced_salt()
         if salt is None:
             salt = bytes("%s%s" % (creds.get_realm(), creds.get_username()),
-                    encoding='utf-8')
-        return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno)
+                         encoding='utf-8')
+        return self.PasswordKey_create(etype=etype,
+                                       pwd=password,
+                                       salt=salt,
+                                       kvno=kvno)
 
     def RandomKey(self, etype):
         e = kcrypto._get_enctype_profile(etype)
@@ -1020,10 +1033,12 @@ class RawKerberosTest(TestCaseInTempDir):
         return PA_ENC_TS_ENC_obj
 
     def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
-        #KERB-PA-PAC-REQUEST ::= SEQUENCE {
-        #        include-pac[0] BOOLEAN --If TRUE, and no pac present, include PAC.
-        #                               --If FALSE, and PAC present, remove PAC
-        #}
+        # KERB-PA-PAC-REQUEST ::= SEQUENCE {
+        #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
+        #                                --    include PAC.
+        #                                --If FALSE, and PAC present,
+        #                                --    remove PAC.
+        # }
         KERB_PA_PAC_REQUEST_obj = {
             'include-pac': include_pac,
         }
@@ -1031,7 +1046,7 @@ class RawKerberosTest(TestCaseInTempDir):
             return KERB_PA_PAC_REQUEST_obj
         pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
-        pa_data = self.PA_DATA_create(128, pa_pac) # PA-PAC-REQUEST
+        pa_data = self.PA_DATA_create(128, pa_pac)  # PA-PAC-REQUEST
         return pa_data
 
     def KDC_REQ_BODY_create(self,
@@ -1327,11 +1342,14 @@ class RawKerberosTest(TestCaseInTempDir):
             EncAuthorizationData=EncAuthorizationData,
             EncAuthorizationData_key=EncAuthorizationData_key,
             additional_tickets=additional_tickets)
-        req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(),
+        req_body_blob = self.der_encode(req_body,
+                                        asn1Spec=krb5_asn1.KDC_REQ_BODY(),
                                         asn1_print=asn1_print, hexdump=hexdump)
 
-        req_body_checksum = self.Checksum_create(
-            ticket_session_key, 6, req_body_blob, ctype=body_checksum_type)
+        req_body_checksum = self.Checksum_create(ticket_session_key,
+                                                 6,
+                                                 req_body_blob,
+                                                 ctype=body_checksum_type)
 
         subkey_obj = None
         if authenticator_subkey is not None:
@@ -1390,7 +1408,10 @@ class RawKerberosTest(TestCaseInTempDir):
             cksum_data += n.encode()
         cksum_data += realm.encode()
         cksum_data += "Kerberos".encode()
-        cksum = self.Checksum_create(tgt_session_key, 17, cksum_data, ctype)
+        cksum = self.Checksum_create(tgt_session_key,
+                                     17,
+                                     cksum_data,
+                                     ctype)
 
         PA_S4U2Self_obj = {
             'name': name,
@@ -1403,20 +1424,20 @@ class RawKerberosTest(TestCaseInTempDir):
         return self.PA_DATA_create(129, pa_s4u2self)
 
     def _generic_kdc_exchange(self,
-                              kdc_exchange_dict, # required
-                              kdc_options=None, # required
-                              cname=None, # optional
-                              realm=None, # required
-                              sname=None, # optional
-                              from_time=None, # optional
-                              till_time=None, # required
-                              renew_time=None, # optional
-                              nonce=None, # required
-                              etypes=None, # required
-                              addresses=None, # optional
-                              EncAuthorizationData=None, # optional
-                              EncAuthorizationData_key=None, # optional
-                              additional_tickets=None): # optional
+                              kdc_exchange_dict,  # required
+                              kdc_options=None,  # required
+                              cname=None,  # optional
+                              realm=None,  # required
+                              sname=None,  # optional
+                              from_time=None,  # optional
+                              till_time=None,  # required
+                              renew_time=None,  # optional
+                              nonce=None,  # required
+                              etypes=None,  # required
+                              addresses=None,  # optional
+                              EncAuthorizationData=None,  # optional
+                              EncAuthorizationData_key=None,  # optional
+                              additional_tickets=None):  # optional
 
         check_error_fn = kdc_exchange_dict['check_error_fn']
         check_rep_fn = kdc_exchange_dict['check_rep_fn']
@@ -1431,19 +1452,20 @@ class RawKerberosTest(TestCaseInTempDir):
         if nonce is None:
             nonce = self.get_Nonce()
 
-        req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options,
-                                            cname=cname,
-                                            realm=realm,
-                                            sname=sname,
-                                            from_time=from_time,
-                                            till_time=till_time,
-                                            renew_time=renew_time,
-                                            nonce=nonce,
-                                            etypes=etypes,
-                                            addresses=addresses,
-                                            EncAuthorizationData=EncAuthorizationData,
-                                            EncAuthorizationData_key=EncAuthorizationData_key,
-                                            additional_tickets=additional_tickets)
+        req_body = self.KDC_REQ_BODY_create(
+            kdc_options=kdc_options,
+            cname=cname,
+            realm=realm,
+            sname=sname,
+            from_time=from_time,
+            till_time=till_time,
+            renew_time=renew_time,
+            nonce=nonce,
+            etypes=etypes,
+            addresses=addresses,
+            EncAuthorizationData=EncAuthorizationData,
+            EncAuthorizationData_key=EncAuthorizationData_key,
+            additional_tickets=additional_tickets)
         if generate_padata_fn is not None:
             # This can alter req_body...
             padata, req_body = generate_padata_fn(kdc_exchange_dict,
@@ -1455,10 +1477,10 @@ class RawKerberosTest(TestCaseInTempDir):
         kdc_exchange_dict['req_padata'] = padata
         kdc_exchange_dict['req_body'] = req_body
 
-        req_obj,req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
-                                                  padata=padata,
-                                                  req_body=req_body,
-                                                  asn1Spec=req_asn1Spec())
+        req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
+                                                   padata=padata,
+                                                   req_body=req_body,
+                                                   asn1Spec=req_asn1Spec())
 
         rep = self.send_recv_transaction(req_decoded)
         self.assertIsNotNone(rep)
@@ -1571,7 +1593,7 @@ class RawKerberosTest(TestCaseInTempDir):
         rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
         msg_type = kdc_exchange_dict['rep_msg_type']
 
-        self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
+        self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
         padata = self.getElementValue(rep, 'padata')
         self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
         self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
@@ -1579,22 +1601,23 @@ class RawKerberosTest(TestCaseInTempDir):
         ticket = self.getElementValue(rep, 'ticket')
         ticket_encpart = None
         ticket_cipher = None
-        if ticket is not None: # Never None, but gives indentation
+        if ticket is not None:  # Never None, but gives indentation
             self.assertElementPresent(ticket, 'tkt-vno')
             self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
             self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
             self.assertElementPresent(ticket, 'enc-part')
             ticket_encpart = self.getElementValue(ticket, 'enc-part')
-            if ticket_encpart is not None: # Never None, but gives indentation
+            if ticket_encpart is not None:  # Never None, but gives indentation
                 self.assertElementPresent(ticket_encpart, 'etype')
                 # 'unspecified' means present, with any value != 0
-                self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno)
+                self.assertElementKVNO(ticket_encpart, 'kvno',
+                                       self.unspecified_kvno)
                 self.assertElementPresent(ticket_encpart, 'cipher')
                 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
         self.assertElementPresent(rep, 'enc-part')
         encpart = self.getElementValue(rep, 'enc-part')
         encpart_cipher = None
-        if encpart is not None: # Never None, but gives indentation
+        if encpart is not None:  # Never None, but gives indentation
             self.assertElementPresent(encpart, 'etype')
             self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
             self.assertElementPresent(encpart, 'cipher')
@@ -1602,24 +1625,35 @@ class RawKerberosTest(TestCaseInTempDir):
 
         encpart_decryption_key = None
         if check_padata_fn is not None:
-            # See if get the decryption key from the preauth phase
-            encpart_decryption_key,encpart_decryption_usage = \
-                    check_padata_fn(kdc_exchange_dict, callback_dict,
-                                    rep, padata)
+            # See if we can get the decryption key from the preauth phase
+            encpart_decryption_key, encpart_decryption_usage = (
+                check_padata_fn(kdc_exchange_dict, callback_dict,
+                                rep, padata))
 
         ticket_private = None
         if ticket_decryption_key is not None:
-            self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype)
-            self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno)
-            ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher)
-            ticket_private = self.der_decode(ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart())
+            self.assertElementEqual(ticket_encpart, 'etype',
+                                    ticket_decryption_key.etype)
+            self.assertElementKVNO(ticket_encpart, 'kvno',
+                                   ticket_decryption_key.kvno)
+            ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
+                                                           ticket_cipher)
+            ticket_private = self.der_decode(
+                ticket_decpart,
+                asn1Spec=krb5_asn1.EncTicketPart())
 
         encpart_private = None
         if encpart_decryption_key is not None:
-            self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype)
-            self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno)
-            rep_decpart = encpart_decryption_key.decrypt(encpart_decryption_usage, encpart_cipher)
-            encpart_private = self.der_decode(rep_decpart, asn1Spec=rep_encpart_asn1Spec())
+            self.assertElementEqual(encpart, 'etype',
+                                    encpart_decryption_key.etype)
+            self.assertElementKVNO(encpart, 'kvno',
+                                   encpart_decryption_key.kvno)
+            rep_decpart = encpart_decryption_key.decrypt(
+                encpart_decryption_usage,
+                encpart_cipher)
+            encpart_private = self.der_decode(
+                rep_decpart,
+                asn1Spec=rep_encpart_asn1Spec())
 
         if check_kdc_private_fn is not None:
             check_kdc_private_fn(kdc_exchange_dict, callback_dict,
@@ -1647,12 +1681,14 @@ class RawKerberosTest(TestCaseInTempDir):
             self.assertElementPresent(ticket_private, 'flags')
             self.assertElementPresent(ticket_private, 'key')
             ticket_key = self.getElementValue(ticket_private, 'key')
-            if ticket_key is not None: # Never None, but gives indentation
+            if ticket_key is not None:  # Never None, but gives indentation
                 self.assertElementPresent(ticket_key, 'keytype')
                 self.assertElementPresent(ticket_key, 'keyvalue')
                 ticket_session_key = self.EncryptionKey_import(ticket_key)
-            self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm)
-            self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname)
+            self.assertElementEqualUTF8(ticket_private, 'crealm',
+                                        expected_crealm)
+            self.assertElementEqualPrincipal(ticket_private, 'cname',
+                                             expected_cname)
             self.assertElementPresent(ticket_private, 'transited')
             self.assertElementPresent(ticket_private, 'authtime')
             if self.strict_checking:
@@ -1666,39 +1702,45 @@ class RawKerberosTest(TestCaseInTempDir):
         if encpart_private is not None:
             self.assertElementPresent(encpart_private, 'key')
             encpart_key = self.getElementValue(encpart_private, 'key')
-            if encpart_key is not None: # Never None, but gives indentation
+            if encpart_key is not None:  # Never None, but gives indentation
                 self.assertElementPresent(encpart_key, 'keytype')
                 self.assertElementPresent(encpart_key, 'keyvalue')
                 encpart_session_key = self.EncryptionKey_import(encpart_key)
             self.assertElementPresent(encpart_private, 'last-req')
             self.assertElementPresent(encpart_private, 'nonce')
-            # TODO self.assertElementPresent(encpart_private, 'key-expiration')
+            # TODO self.assertElementPresent(encpart_private,
+            #                                'key-expiration')
             self.assertElementPresent(encpart_private, 'flags')
             self.assertElementPresent(encpart_private, 'authtime')
             if self.strict_checking:
                 self.assertElementPresent(encpart_private, 'starttime')
             self.assertElementPresent(encpart_private, 'endtime')
             # TODO self.assertElementPresent(encpart_private, 'renew-till')
-            self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm)
-            self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname)
+            self.assertElementEqualUTF8(encpart_private, 'srealm',
+                                        expected_srealm)
+            self.assertElementEqualPrincipal(encpart_private, 'sname',
+                                             expected_sname)
             # TODO self.assertElementMissing(encpart_private, 'caddr')
 
         if ticket_session_key is not None and encpart_session_key is not None:
-            self.assertEqual(ticket_session_key.etype, encpart_session_key.etype)
-            self.assertEqual(ticket_session_key.key.contents, encpart_session_key.key.contents)
+            self.assertEqual(ticket_session_key.etype,
+                             encpart_session_key.etype)
+            self.assertEqual(ticket_session_key.key.contents,
+                             encpart_session_key.key.contents)
         if encpart_session_key is not None:
             session_key = encpart_session_key
         else:
             session_key = ticket_session_key
-        ticket_creds = KerberosTicketCreds(ticket,
-                                           session_key,
-                                           crealm=expected_crealm,
-                                           cname=expected_cname,
-                                           srealm=expected_srealm,
-                                           sname=expected_sname,
-                                           decryption_key=ticket_decryption_key,
-                                           ticket_private=ticket_private,
-                                           encpart_private=encpart_private)
+        ticket_creds = KerberosTicketCreds(
+            ticket,
+            session_key,
+            crealm=expected_crealm,
+            cname=expected_cname,
+            srealm=expected_srealm,
+            sname=expected_sname,
+            decryption_key=ticket_decryption_key,
+            ticket_private=ticket_private,
+            encpart_private=encpart_private)
 
         kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
         return
@@ -1728,11 +1770,11 @@ class RawKerberosTest(TestCaseInTempDir):
         if kcrypto.Enctype.RC4 in proposed_etypes:
             expect_etype_info = True
         for etype in proposed_etypes:
-            if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
+            if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
                 expect_etype_info = False
             if etype not in client_as_etypes:
                 continue
-            if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
+            if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
                 if etype > expected_aes_type:
                     expected_aes_type = etype
             if etype in (kcrypto.Enctype.RC4,):
@@ -1779,14 +1821,17 @@ class RawKerberosTest(TestCaseInTempDir):
         if self.strict_checking:
             self.assertIsNotNone(edata)
         if edata is not None:
-            rep_padata = self.der_decode(edata, asn1Spec=krb5_asn1.METHOD_DATA())
+            rep_padata = self.der_decode(edata,
+                                         asn1Spec=krb5_asn1.METHOD_DATA())
             self.assertGreater(len(rep_padata), 0)
         else:
             rep_padata = []
 
         if self.strict_checking:
             for i in range(0, len(expected_patypes)):
-                self.assertElementEqual(rep_padata[i], 'padata-type', expected_patypes[i])
+                self.assertElementEqual(rep_padata[i],
+                                        'padata-type',
+                                        expected_patypes[i])
             self.assertEqual(len(rep_padata), len(expected_patypes))
 
         etype_info2 = None
@@ -1799,11 +1844,13 @@ class RawKerberosTest(TestCaseInTempDir):
             pavalue = self.getElementValue(pa, 'padata-value')
             if patype == PADATA_ETYPE_INFO2:
                 self.assertIsNone(etype_info2)
-                etype_info2 = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO2())
+                etype_info2 = self.der_decode(pavalue,
+                                              asn1Spec=krb5_asn1.ETYPE_INFO2())
                 continue
             if patype == PADATA_ETYPE_INFO:
                 self.assertIsNone(etype_info)
-                etype_info = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO())
+                etype_info = self.der_decode(pavalue,
+                                             asn1Spec=krb5_asn1.ETYPE_INFO())
                 continue
             if patype == PADATA_ENC_TIMESTAMP:
                 self.assertIsNone(enc_timestamp)
@@ -1881,7 +1928,8 @@ class RawKerberosTest(TestCaseInTempDir):
         authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
         body_checksum_type = kdc_exchange_dict['body_checksum_type']
 
-        req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
+        req_body_blob = self.der_encode(req_body,
+                                        asn1Spec=krb5_asn1.KDC_REQ_BODY())
 
         req_body_checksum = self.Checksum_create(tgt.session_key,
                                                  KU_TGS_REQ_AUTH_CKSUM,
@@ -1893,15 +1941,18 @@ class RawKerberosTest(TestCaseInTempDir):
             subkey_obj = authenticator_subkey.export_obj()
         seq_number = random.randint(0, 0xfffffffe)
         (ctime, cusec) = self.get_KerberosTimeWithUsec()
-        authenticator_obj = self.Authenticator_create(crealm=tgt.crealm,
-                                                      cname=tgt.cname,
-                                                      cksum=req_body_checksum,
-                                                      cusec=cusec,
-                                                      ctime=ctime,
-                                                      subkey=subkey_obj,
-                                                      seq_number=seq_number,
-                                                      authorization_data=None)
-        authenticator_blob = self.der_encode(authenticator_obj, asn1Spec=krb5_asn1.Authenticator())
+        authenticator_obj = self.Authenticator_create(
+            crealm=tgt.crealm,
+            cname=tgt.cname,
+            cksum=req_body_checksum,
+            cusec=cusec,
+            ctime=ctime,
+            subkey=subkey_obj,
+            seq_number=seq_number,
+            authorization_data=None)
+        authenticator_blob = self.der_encode(
+            authenticator_obj,
+            asn1Spec=krb5_asn1.Authenticator())
 
         authenticator = self.EncryptedData_create(tgt.session_key,
                                                   KU_TGS_REQ_AUTH,
@@ -1909,8 +1960,8 @@ class RawKerberosTest(TestCaseInTempDir):
 
         ap_options = krb5_asn1.APOptions('0')
         ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
-                                    ticket=tgt.ticket,
-                                    authenticator=authenticator)
+                                        ticket=tgt.ticket,
+                                        authenticator=authenticator)
         ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
         padata = [pa_tgs_req]
@@ -1964,19 +2015,19 @@ class RawKerberosTest(TestCaseInTempDir):
             return preauth_key, as_rep_usage
 
         kdc_exchange_dict = self.as_exchange_dict(
-                         expected_crealm=expected_crealm,
-                         expected_cname=expected_cname,
-                         expected_srealm=expected_srealm,
-                         expected_sname=expected_sname,
-                         ticket_decryption_key=ticket_decryption_key,
-                         generate_padata_fn=_generate_padata_copy,
-                         check_error_fn=self.generic_check_as_error,
-                         check_rep_fn=self.generic_check_kdc_rep,
-                         check_padata_fn=_check_padata_preauth_key,
-                         check_kdc_private_fn=self.generic_check_kdc_private,
-                         expected_error_mode=expected_error_mode,
-                         client_as_etypes=client_as_etypes,
-                         expected_salt=expected_salt)
+            expected_crealm=expected_crealm,
+            expected_cname=expected_cname,
+            expected_srealm=expected_srealm,
+            expected_sname=expected_sname,
+            ticket_decryption_key=ticket_decryption_key,
+            generate_padata_fn=_generate_padata_copy,
+            check_error_fn=self.generic_check_as_error,
+            check_rep_fn=self.generic_check_kdc_rep,
+            check_padata_fn=_check_padata_preauth_key,
+            check_kdc_private_fn=self.generic_check_kdc_private,
+            expected_error_mode=expected_error_mode,
+            client_as_etypes=client_as_etypes,
+            expected_salt=expected_salt)
 
         rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                          kdc_options=str(kdc_options),
@@ -1986,7 +2037,7 @@ class RawKerberosTest(TestCaseInTempDir):
                                          till_time=till,
                                          etypes=etypes)
 
-        if expected_error_mode == 0: # AS-REP
+        if expected_error_mode == 0:  # AS-REP
             return rep
 
         return kdc_exchange_dict['preauth_etype_info2']