sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
-from samba.tests.krb5.raw_testcase import RawKerberosTest
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
-import samba
-from samba.auth import system_session
-from samba.credentials import (
- Credentials,
- DONT_USE_KERBEROS)
+from samba.credentials import DONT_USE_KERBEROS
from samba.dcerpc.misc import SEC_CHAN_WKSTA
-from samba.dsdb import (
- UF_WORKSTATION_TRUST_ACCOUNT,
- UF_PASSWD_NOTREQD,
- UF_NORMAL_ACCOUNT)
-from samba.samdb import SamDB
-from samba.tests import delete_force, DynamicTestCase
+from samba.tests import DynamicTestCase
from samba.tests.krb5.rfc4120_constants import (
AES256_CTS_HMAC_SHA1_96,
AES128_CTS_HMAC_SHA1_96,
else:
client_name_type = NT_PRINCIPAL
- self.cname = RawKerberosTest.PrincipalName_create(
+ self.cname = KDCBaseTest.PrincipalName_create(
name_type=client_name_type, names=[self.user_name])
if TestOptions.AsReqSelf.is_set(options):
self.sname = self.cname
else:
- self.sname = RawKerberosTest.PrincipalName_create(
+ self.sname = KDCBaseTest.PrincipalName_create(
name_type=NT_SRV_INST, names=["krbtgt", self.realm])
self.canonicalize = TestOptions.Canonicalize.is_set(options)
@DynamicTestCase
-class KerberosASCanonicalizationTests(RawKerberosTest):
+class KerberosASCanonicalizationTests(KDCBaseTest):
@classmethod
def setUpDynamicTestCases(cls):
name = build_test_name(ct, x)
cls.generate_dynamic_test("test", name, x, ct)
- @classmethod
- def setUpClass(cls):
- cls.lp = cls.get_loadparm(cls)
- cls.username = os.environ["USERNAME"]
- cls.password = os.environ["PASSWORD"]
- cls.host = os.environ["SERVER"]
-
- c = Credentials()
- c.set_username(cls.username)
- c.set_password(cls.password)
- try:
- realm = os.environ["REALM"]
- c.set_realm(realm)
- except KeyError:
- pass
- try:
- domain = os.environ["DOMAIN"]
- c.set_domain(domain)
- except KeyError:
- pass
+ def user_account_creds(self):
+ if self.user_creds is None:
+ samdb = self.get_samdb()
+ self.user_creds, _ = self.create_account(samdb, USER_NAME)
- c.guess()
+ return self.user_creds
- cls.credentials = c
+ def machine_account_creds(self):
+ if self.machine_creds is None:
+ samdb = self.get_samdb()
+ self.machine_creds, _ = self.create_account(samdb,
+ MACHINE_NAME,
+ machine_account=True)
+ self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
+ self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
- cls.session = system_session()
- cls.ldb = SamDB(url="ldap://%s" % cls.host,
- session_info=cls.session,
- credentials=cls.credentials,
- lp=cls.lp)
- cls.create_machine_account()
- cls.create_user_account()
-
- @classmethod
- def tearDownClass(cls):
- super(KerberosASCanonicalizationTests, cls).tearDownClass()
- delete_force(cls.ldb, cls.machine_dn)
- delete_force(cls.ldb, cls.user_dn)
+ return self.machine_creds
def setUp(self):
- super(KerberosASCanonicalizationTests, self).setUp()
+ super().setUp()
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
- #
- # Create a test user account
- @classmethod
- def create_user_account(cls):
- cls.user_pass = samba.generate_random_password(32, 32)
- cls.user_name = USER_NAME
- cls.user_dn = "cn=%s,%s" % (cls.user_name, cls.ldb.domain_dn())
-
- # remove the account if it exists, this will happen if a previous test
- # run failed
- delete_force(cls.ldb, cls.user_dn)
-
- utf16pw = ('"%s"' % cls.user_pass).encode('utf-16-le')
- cls.ldb.add({
- "dn": cls.user_dn,
- "objectclass": "user",
- "sAMAccountName": "%s" % cls.user_name,
- "userAccountControl": str(UF_NORMAL_ACCOUNT),
- "unicodePwd": utf16pw})
-
- cls.user_creds = Credentials()
- cls.user_creds.guess(cls.lp)
- cls.user_creds.set_realm(cls.ldb.domain_dns_name().upper())
- cls.user_creds.set_domain(cls.ldb.domain_netbios_name().upper())
- cls.user_creds.set_password(cls.user_pass)
- cls.user_creds.set_username(cls.user_name)
- cls.user_creds.set_workstation(cls.machine_name)
-
- #
- # Create the machine account
- @classmethod
- def create_machine_account(cls):
- cls.machine_pass = samba.generate_random_password(32, 32)
- cls.machine_name = MACHINE_NAME
- cls.machine_dn = "cn=%s,%s" % (cls.machine_name, cls.ldb.domain_dn())
-
- # remove the account if it exists, this will happen if a previous test
- # run failed
- delete_force(cls.ldb, cls.machine_dn)
-
- utf16pw = ('"%s"' % cls.machine_pass).encode('utf-16-le')
- cls.ldb.add({
- "dn": cls.machine_dn,
- "objectclass": "computer",
- "sAMAccountName": "%s$" % cls.machine_name,
- "userAccountControl":
- str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
- "unicodePwd": utf16pw})
-
- cls.machine_creds = Credentials()
- cls.machine_creds.guess(cls.lp)
- cls.machine_creds.set_realm(cls.ldb.domain_dns_name().upper())
- cls.machine_creds.set_domain(cls.ldb.domain_netbios_name().upper())
- cls.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
- cls.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
- cls.machine_creds.set_password(cls.machine_pass)
- cls.machine_creds.set_username(cls.machine_name + "$")
- cls.machine_creds.set_workstation(cls.machine_name)
+ self.user_creds = None
+ self.machine_creds = None
def _test_with_args(self, x, ct):
if ct == CredentialsType.User:
- creds = self.user_creds
+ creds = self.user_account_creds()
elif ct == CredentialsType.Machine:
- creds = self.machine_creds
+ creds = self.machine_account_creds()
else:
raise Exception("Unexpected credential type")
data = TestData(x, creds)