]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5/as_canonicalization_tests.py: Refactor account creation
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 15 Jun 2021 23:49:05 +0000 (11:49 +1200)
committerStefan Metzmacher <metze@samba.org>
Thu, 1 Jul 2021 17:46:31 +0000 (17:46 +0000)
Making this test a subclass of KDCBaseTest allows us to make use of its
methods for obtaining credentials and creating accounts, which helps to
eliminate some duplicated code.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
python/samba/tests/krb5/as_canonicalization_tests.py

index 43f532dc483b424a7306fa40dc2cef36032e5a46..abb3f96a1e64a09e8a6c4b28c6deea1cfdffe9e0 100755 (executable)
@@ -25,20 +25,11 @@ import pyasn1
 sys.path.insert(0, "bin/python")
 os.environ["PYTHONUNBUFFERED"] = "1"
 
-from samba.tests.krb5.raw_testcase import RawKerberosTest
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
-import samba
-from samba.auth import system_session
-from samba.credentials import (
-    Credentials,
-    DONT_USE_KERBEROS)
+from samba.credentials import DONT_USE_KERBEROS
 from samba.dcerpc.misc import SEC_CHAN_WKSTA
-from samba.dsdb import (
-    UF_WORKSTATION_TRUST_ACCOUNT,
-    UF_PASSWD_NOTREQD,
-    UF_NORMAL_ACCOUNT)
-from samba.samdb import SamDB
-from samba.tests import delete_force, DynamicTestCase
+from samba.tests import DynamicTestCase
 from samba.tests.krb5.rfc4120_constants import (
     AES256_CTS_HMAC_SHA1_96,
     AES128_CTS_HMAC_SHA1_96,
@@ -96,12 +87,12 @@ class TestData:
         else:
             client_name_type = NT_PRINCIPAL
 
-        self.cname = RawKerberosTest.PrincipalName_create(
+        self.cname = KDCBaseTest.PrincipalName_create(
             name_type=client_name_type, names=[self.user_name])
         if TestOptions.AsReqSelf.is_set(options):
             self.sname = self.cname
         else:
-            self.sname = RawKerberosTest.PrincipalName_create(
+            self.sname = KDCBaseTest.PrincipalName_create(
                 name_type=NT_SRV_INST, names=["krbtgt", self.realm])
         self.canonicalize = TestOptions.Canonicalize.is_set(options)
 
@@ -141,7 +132,7 @@ USER_NAME = "tstkrb5cnnusr"
 
 
 @DynamicTestCase
-class KerberosASCanonicalizationTests(RawKerberosTest):
+class KerberosASCanonicalizationTests(KDCBaseTest):
 
     @classmethod
     def setUpDynamicTestCases(cls):
@@ -170,114 +161,37 @@ class KerberosASCanonicalizationTests(RawKerberosTest):
                 name = build_test_name(ct, x)
                 cls.generate_dynamic_test("test", name, x, ct)
 
-    @classmethod
-    def setUpClass(cls):
-        cls.lp = cls.get_loadparm(cls)
-        cls.username = os.environ["USERNAME"]
-        cls.password = os.environ["PASSWORD"]
-        cls.host = os.environ["SERVER"]
-
-        c = Credentials()
-        c.set_username(cls.username)
-        c.set_password(cls.password)
-        try:
-            realm = os.environ["REALM"]
-            c.set_realm(realm)
-        except KeyError:
-            pass
-        try:
-            domain = os.environ["DOMAIN"]
-            c.set_domain(domain)
-        except KeyError:
-            pass
+    def user_account_creds(self):
+        if self.user_creds is None:
+            samdb = self.get_samdb()
+            self.user_creds, _ = self.create_account(samdb, USER_NAME)
 
-        c.guess()
+        return self.user_creds
 
-        cls.credentials = c
+    def machine_account_creds(self):
+        if self.machine_creds is None:
+            samdb = self.get_samdb()
+            self.machine_creds, _ = self.create_account(samdb,
+                                                        MACHINE_NAME,
+                                                        machine_account=True)
+            self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
+            self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
 
-        cls.session = system_session()
-        cls.ldb = SamDB(url="ldap://%s" % cls.host,
-                        session_info=cls.session,
-                        credentials=cls.credentials,
-                        lp=cls.lp)
-        cls.create_machine_account()
-        cls.create_user_account()
-
-    @classmethod
-    def tearDownClass(cls):
-        super(KerberosASCanonicalizationTests, cls).tearDownClass()
-        delete_force(cls.ldb, cls.machine_dn)
-        delete_force(cls.ldb, cls.user_dn)
+        return self.machine_creds
 
     def setUp(self):
-        super(KerberosASCanonicalizationTests, self).setUp()
+        super().setUp()
         self.do_asn1_print = global_asn1_print
         self.do_hexdump = global_hexdump
 
-    #
-    # Create a test user account
-    @classmethod
-    def create_user_account(cls):
-        cls.user_pass = samba.generate_random_password(32, 32)
-        cls.user_name = USER_NAME
-        cls.user_dn = "cn=%s,%s" % (cls.user_name, cls.ldb.domain_dn())
-
-        # remove the account if it exists, this will happen if a previous test
-        # run failed
-        delete_force(cls.ldb, cls.user_dn)
-
-        utf16pw = ('"%s"' % cls.user_pass).encode('utf-16-le')
-        cls.ldb.add({
-            "dn": cls.user_dn,
-            "objectclass": "user",
-            "sAMAccountName": "%s" % cls.user_name,
-            "userAccountControl": str(UF_NORMAL_ACCOUNT),
-            "unicodePwd": utf16pw})
-
-        cls.user_creds = Credentials()
-        cls.user_creds.guess(cls.lp)
-        cls.user_creds.set_realm(cls.ldb.domain_dns_name().upper())
-        cls.user_creds.set_domain(cls.ldb.domain_netbios_name().upper())
-        cls.user_creds.set_password(cls.user_pass)
-        cls.user_creds.set_username(cls.user_name)
-        cls.user_creds.set_workstation(cls.machine_name)
-
-    #
-    # Create the machine account
-    @classmethod
-    def create_machine_account(cls):
-        cls.machine_pass = samba.generate_random_password(32, 32)
-        cls.machine_name = MACHINE_NAME
-        cls.machine_dn = "cn=%s,%s" % (cls.machine_name, cls.ldb.domain_dn())
-
-        # remove the account if it exists, this will happen if a previous test
-        # run failed
-        delete_force(cls.ldb, cls.machine_dn)
-
-        utf16pw = ('"%s"' % cls.machine_pass).encode('utf-16-le')
-        cls.ldb.add({
-            "dn": cls.machine_dn,
-            "objectclass": "computer",
-            "sAMAccountName": "%s$" % cls.machine_name,
-            "userAccountControl":
-                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
-            "unicodePwd": utf16pw})
-
-        cls.machine_creds = Credentials()
-        cls.machine_creds.guess(cls.lp)
-        cls.machine_creds.set_realm(cls.ldb.domain_dns_name().upper())
-        cls.machine_creds.set_domain(cls.ldb.domain_netbios_name().upper())
-        cls.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
-        cls.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
-        cls.machine_creds.set_password(cls.machine_pass)
-        cls.machine_creds.set_username(cls.machine_name + "$")
-        cls.machine_creds.set_workstation(cls.machine_name)
+        self.user_creds = None
+        self.machine_creds = None
 
     def _test_with_args(self, x, ct):
         if ct == CredentialsType.User:
-            creds = self.user_creds
+            creds = self.user_account_creds()
         elif ct == CredentialsType.Machine:
-            creds = self.machine_creds
+            creds = self.machine_account_creds()
         else:
             raise Exception("Unexpected credential type")
         data = TestData(x, creds)