From: Stefan Metzmacher Date: Wed, 17 Jul 2024 16:12:31 +0000 (+0200) Subject: python:lsa_utils: Fix fallback to OpenPolicy2 X-Git-Tag: samba-4.21.7~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ccb5e9694e30028c8bf849fdab1f06ecca861d26;p=thirdparty%2Fsamba.git python:lsa_utils: Fix fallback to OpenPolicy2 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15680 Pair-Programmed-With: Andreas Schneider Signed-off-by: Andreas Schneider Signed-off-by: Stefan Metzmacher Autobuild-User(master): Andreas Schneider Autobuild-Date(master): Mon Feb 17 18:33:15 UTC 2025 on atb-devel-224 (cherry picked from commit a814f5d90a3fb85a94c9516dba224037e8fd76f1) Autobuild-User(v4-22-test): Jule Anger Autobuild-Date(v4-22-test): Thu Feb 20 11:22:18 UTC 2025 on atb-devel-224 (cherry picked from commit 29bd6fe9cbe538b267bf0ed66823cfe8599afb3d) Autobuild-User(v4-21-test): Jule Anger Autobuild-Date(v4-21-test): Thu Jun 12 12:40:31 UTC 2025 on atb-devel-224 --- diff --git a/python/samba/lsa_utils.py b/python/samba/lsa_utils.py index 571beb46c85..506dc399c93 100644 --- a/python/samba/lsa_utils.py +++ b/python/samba/lsa_utils.py @@ -20,24 +20,27 @@ from samba.dcerpc import lsa, drsblobs, misc from samba.ndr import ndr_pack from samba import ( NTSTATUSError, + ntstatus, aead_aes_256_cbc_hmac_sha512, arcfour_encrypt, ) -from samba.ntstatus import ( - NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE -) from samba import crypto from secrets import token_bytes +# FIXME from collections.abc import Callable def OpenPolicyFallback( - conn: lsa.lsarpc, + # new_lsa_conn: Callable[[], lsa.lsarpc], - FIXME the type doesn't work + # with python version 3.6 (CentOS8, SLES15). + new_lsa_conn, system_name: str, in_version: int, in_revision_info: lsa.revision_info1, sec_qos: bool, access_mask: int, ): + conn = new_lsa_conn() + attr = lsa.ObjectAttribute() if sec_qos: qos = lsa.QosInfo() @@ -48,26 +51,38 @@ def OpenPolicyFallback( attr.sec_qos = qos - try: - out_version, out_rev_info, policy = conn.OpenPolicy3( - system_name, - attr, - access_mask, - in_version, - in_revision_info - ) - except NTSTATUSError as e: - if e.args[0] == NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE: - out_version = 1 - out_rev_info = lsa.revision_info1() - out_rev_info.revision = 1 - out_rev_info.supported_features = 0 - - policy = conn.OpenPolicy2(system_name, attr, access_mask) - else: - raise - - return out_version, out_rev_info, policy + open_policy2 = False + if in_revision_info is not None: + try: + out_version, out_rev_info, policy = conn.OpenPolicy3( + system_name, + attr, + access_mask, + in_version, + in_revision_info + ) + except NTSTATUSError as e: + if e.args[0] == ntstatus.NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE: + open_policy2 = True + if e.args[0] == ntstatus.NT_STATUS_ACCESS_DENIED: + # We need a new connection + conn = new_lsa_conn(basis_connection=conn) + + open_policy2 = True + else: + raise + else: + open_policy2 = True + + if open_policy2: + out_version = 1 + out_rev_info = lsa.revision_info1() + out_rev_info.revision = 1 + out_rev_info.supported_features = 0 + + policy = conn.OpenPolicy2(system_name, attr, access_mask) + + return conn, out_version, out_rev_info, policy def CreateTrustedDomainRelax( diff --git a/python/samba/netcmd/domain/trust.py b/python/samba/netcmd/domain/trust.py index f39d4814a11..f3d75f84137 100644 --- a/python/samba/netcmd/domain/trust.py +++ b/python/samba/netcmd/domain/trust.py @@ -125,8 +125,13 @@ class DomainTrustCommand(Command): self.local_creds = local_creds return self.local_server - def new_local_lsa_connection(self): - return lsa.lsarpc(self.local_binding_string, self.local_lp, self.local_creds) + def new_local_lsa_connection(self, basis_connection=None): + return lsa.lsarpc( + self.local_binding_string, + self.local_lp, + self.local_creds, + basis_connection=basis_connection + ) def new_local_netlogon_connection(self): return netlogon.netlogon(self.local_binding_string, self.local_lp, self.local_creds) @@ -203,13 +208,23 @@ class DomainTrustCommand(Command): self.remote_creds = remote_creds return self.remote_server - def new_remote_lsa_connection(self): - return lsa.lsarpc(self.remote_binding_string, self.local_lp, self.remote_creds) + def new_remote_lsa_connection(self, basis_connection=None): + return lsa.lsarpc( + self.remote_binding_string, + self.local_lp, + self.remote_creds, + basis_connection=basis_connection + ) - def new_remote_netlogon_connection(self): - return netlogon.netlogon(self.remote_binding_string, self.local_lp, self.remote_creds) + def new_remote_netlogon_connection(self, basis_connection=None): + return netlogon.netlogon( + self.remote_binding_string, + self.local_lp, + self.remote_creds, + basis_connection=basis_connection + ) - def get_lsa_info(self, conn, policy_access): + def get_lsa_info(self, conn_fn, policy_access): in_version = 1 in_revision_info1 = lsa.revision_info1() in_revision_info1.revision = 1 @@ -217,9 +232,9 @@ class DomainTrustCommand(Command): lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER ) - out_version, out_revision_info1, policy = OpenPolicyFallback( - conn, - b''.decode('utf-8'), + conn, out_version, out_revision_info1, policy = OpenPolicyFallback( + conn_fn, + '', in_version, in_revision_info1, False, @@ -228,7 +243,7 @@ class DomainTrustCommand(Command): info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS) - return (policy, out_version, out_revision_info1, info) + return (conn, policy, out_version, out_revision_info1, info) def get_netlogon_dc_unc(self, conn, server, domain): try: @@ -508,19 +523,15 @@ class cmd_domain_trust_show(DomainTrustCommand): def run(self, domain, sambaopts=None, versionopts=None, localdcopts=None): self.setup_local_server(sambaopts, localdcopts) - try: - local_lsa = self.new_local_lsa_connection() - except RuntimeError as error: - raise self.LocalRuntimeError(self, error, "failed to connect lsa server") - try: local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -649,19 +660,16 @@ class cmd_domain_trust_modify(DomainTrustCommand): raise CommandError("modification arguments are required, try --help") self.setup_local_server(sambaopts, localdcopts) - try: - local_lsa = self.new_local_lsa_connection() - except RuntimeError as error: - raise self.LocalRuntimeError(self, error, "failed to connect to lsa server") try: local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -908,18 +916,15 @@ class cmd_domain_trust_create(DomainTrustCommand): remote_trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL local_server = self.setup_local_server(sambaopts, localdcopts) - try: - local_lsa = self.new_local_lsa_connection() - except RuntimeError as error: - raise self.LocalRuntimeError(self, error, "failed to connect lsa server") try: ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -933,18 +938,14 @@ class cmd_domain_trust_create(DomainTrustCommand): except RuntimeError as error: raise self.RemoteRuntimeError(self, error, "failed to locate remote server") - try: - remote_lsa = self.new_remote_lsa_connection() - except RuntimeError as error: - raise self.RemoteRuntimeError(self, error, "failed to connect lsa server") - try: ( + remote_lsa, remote_policy, remote_version, remote_revision_info1, remote_lsa_info - ) = self.get_lsa_info(remote_lsa, remote_policy_access) + ) = self.get_lsa_info(self.new_remote_lsa_connection, remote_policy_access) except RuntimeError as error: raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -1297,18 +1298,15 @@ class cmd_domain_trust_delete(DomainTrustCommand): remote_policy_access |= lsa.LSA_POLICY_CREATE_SECRET self.setup_local_server(sambaopts, localdcopts) - try: - local_lsa = self.new_local_lsa_connection() - except RuntimeError as error: - raise self.LocalRuntimeError(self, error, "failed to connect lsa server") try: ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -1338,18 +1336,14 @@ class cmd_domain_trust_delete(DomainTrustCommand): except RuntimeError as error: raise self.RemoteRuntimeError(self, error, "failed to locate remote server") - try: - remote_lsa = self.new_remote_lsa_connection() - except RuntimeError as error: - raise self.RemoteRuntimeError(self, error, "failed to connect lsa server") - try: ( + remote_lsa, remote_policy, remote_version, remote_revision_info1, remote_lsa_info - ) = self.get_lsa_info(remote_lsa, remote_policy_access) + ) = self.get_lsa_info(self.new_remote_lsa_connection, remote_policy_access) except RuntimeError as error: raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -1450,18 +1444,15 @@ class cmd_domain_trust_validate(DomainTrustCommand): local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION local_server = self.setup_local_server(sambaopts, localdcopts) - try: - local_lsa = self.new_local_lsa_connection() - except RuntimeError as error: - raise self.LocalRuntimeError(self, error, "failed to connect lsa server") try: ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") @@ -1897,11 +1888,12 @@ class cmd_domain_trust_namespaces(DomainTrustCommand): try: ( + local_lsa, local_policy, local_version, local_revision_info1, local_lsa_info - ) = self.get_lsa_info(local_lsa, local_policy_access) + ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access) except RuntimeError as error: raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") diff --git a/python/samba/tests/dcerpc/lsa_utils.py b/python/samba/tests/dcerpc/lsa_utils.py index fee9a45419b..8a3e7d24276 100644 --- a/python/samba/tests/dcerpc/lsa_utils.py +++ b/python/samba/tests/dcerpc/lsa_utils.py @@ -35,6 +35,7 @@ from samba.lsa_utils import ( class CreateTrustedDomain(TestCase): + smbencrypt = True def get_user_creds(self): c = Credentials() @@ -47,26 +48,35 @@ class CreateTrustedDomain(TestCase): c.set_password(password) return c - def _create_trust_relax(self, smbencrypt=True): + def new_lsa_conn(self, basis_connection=None): creds = self.get_user_creds() - - if smbencrypt: + if self.smbencrypt: creds.set_smb_encryption(SMB_ENCRYPTION_REQUIRED) else: creds.set_smb_encryption(SMB_ENCRYPTION_OFF) lp = self.get_loadparm() - binding_string = ( "ncacn_np:%s" % (samba.tests.env_get_var_value('SERVER')) ) - lsa_conn = lsa.lsarpc(binding_string, lp, creds) - if smbencrypt: + lsa_conn = lsa.lsarpc( + binding_string, + lp, + creds, + basis_connection=basis_connection + ) + + if self.smbencrypt: self.assertTrue(lsa_conn.transport_encrypted()) else: self.assertFalse(lsa_conn.transport_encrypted()) + return lsa_conn + + def _create_trust_relax(self, smbencrypt=True): + self.smbencrypt = smbencrypt + in_version = 1 in_revision_info1 = lsa.revision_info1() in_revision_info1.revision = 1 @@ -74,8 +84,13 @@ class CreateTrustedDomain(TestCase): lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER ) - out_version, out_revision_info1, pol_handle = OpenPolicyFallback( + ( lsa_conn, + out_version, + out_revision_info1, + pol_handle + ) = OpenPolicyFallback( + self.new_lsa_conn, '', in_version, in_revision_info1, @@ -148,14 +163,7 @@ class CreateTrustedDomain(TestCase): self.assertIsNone(trustdom_handle) def _create_trust_fallback(self): - creds = self.get_user_creds() - - lp = self.get_loadparm() - - binding_string = ( - "ncacn_np:%s" % (samba.tests.env_get_var_value('SERVER')) - ) - lsa_conn = lsa.lsarpc(binding_string, lp, creds) + self.smbencrypt = True in_version = 1 in_revision_info1 = lsa.revision_info1() @@ -164,8 +172,13 @@ class CreateTrustedDomain(TestCase): lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER ) - out_version, out_revision_info1, pol_handle = OpenPolicyFallback( + ( lsa_conn, + out_version, + out_revision_info1, + pol_handle + ) = OpenPolicyFallback( + self.new_lsa_conn, '', in_version, in_revision_info1,