From: Gary Lockyer Date: Thu, 15 Jun 2017 03:55:43 +0000 (+1200) Subject: pycredentials: add function to return the netr_Authenticator X-Git-Tag: tevent-0.9.32~29 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b68a3374a56c7117aa1d9fedf56940b6a1ed1cae;p=thirdparty%2Fsamba.git pycredentials: add function to return the netr_Authenticator Add method new_client_authenticator that returns data to allow a netr_Authenticator to be constructed. Allows python to make netr_LogonSamLogonWithFlags, netr_LogonGetDomainInfo and similar calls Signed-off-by: Gary Lockyer Reviewed-by: Garming Sam Reviewed-by: Andrew Bartlett --- diff --git a/auth/credentials/pycredentials.c b/auth/credentials/pycredentials.c index fee9556b180..30698d49dc5 100644 --- a/auth/credentials/pycredentials.c +++ b/auth/credentials/pycredentials.c @@ -26,6 +26,8 @@ #include "libcli/util/pyerrors.h" #include "param/pyparam.h" #include +#include "libcli/auth/libcli_auth.h" +#include "auth/credentials/credentials_internal.h" void initcredentials(void); @@ -584,6 +586,39 @@ static PyObject *py_creds_get_gensec_features(PyObject *self, PyObject *args) return PyInt_FromLong(gensec_features); } +static PyObject *py_creds_new_client_authenticator(PyObject *self, + PyObject *args) +{ + struct netr_Authenticator auth; + struct cli_credentials *creds = NULL; + struct netlogon_creds_CredentialState *nc = NULL; + PyObject *ret = NULL; + + creds = PyCredentials_AsCliCredentials(self); + if (creds == NULL) { + PyErr_SetString(PyExc_RuntimeError, + "Failed to get credentials from python"); + return NULL; + } + + nc = creds->netlogon_creds; + if (nc == NULL) { + PyErr_SetString(PyExc_ValueError, + "No netlogon credentials cannot make " + "client authenticator"); + return NULL; + } + + netlogon_creds_client_authenticator( + nc, + &auth); + ret = Py_BuildValue("{ss#si}", + "credential", + (const char *) &auth.cred, sizeof(auth.cred), + "timestamp", auth.timestamp); + return ret; +} + static PyObject *py_creds_set_secure_channel_type(PyObject *self, PyObject *args) { unsigned int channel_type; @@ -700,6 +735,11 @@ static PyMethodDef py_creds_methods[] = { { "set_forced_sasl_mech", py_creds_set_forced_sasl_mech, METH_VARARGS, "S.set_forced_sasl_mech(name) -> None\n" "Set forced SASL mechanism." }, + { "new_client_authenticator", + py_creds_new_client_authenticator, + METH_NOARGS, + "S.new_client_authenticator() -> Authenticator\n" + "Get a new client NETLOGON_AUTHENTICATOR"}, { "set_secure_channel_type", py_creds_set_secure_channel_type, METH_VARARGS, NULL }, { NULL } diff --git a/python/samba/tests/py_credentials.py b/python/samba/tests/py_credentials.py new file mode 100644 index 00000000000..fd9853ae7e1 --- /dev/null +++ b/python/samba/tests/py_credentials.py @@ -0,0 +1,241 @@ +# Integration tests for pycredentials +# +# Copyright (C) Catalyst IT Ltd. 2017 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +from samba.tests import TestCase, delete_force +import os + +import samba +from samba.auth import system_session +from samba.credentials import Credentials, CLI_CRED_NTLMv2_AUTH +from samba.dcerpc import netlogon, ntlmssp +from samba.dcerpc.netlogon import netr_Authenticator, netr_WorkstationInformation +from samba.dcerpc.misc import SEC_CHAN_WKSTA +from samba.dsdb import ( + UF_WORKSTATION_TRUST_ACCOUNT, + UF_PASSWD_NOTREQD, + UF_NORMAL_ACCOUNT) +from samba.ndr import ndr_pack +from samba.samdb import SamDB +""" +Integration tests for pycredentials +""" + +MACHINE_NAME = "PCTM" +USER_NAME = "PCTU" + +class PyCredentialsTests(TestCase): + + def setUp(self): + super(PyCredentialsTests, self).setUp() + + self.server = os.environ["SERVER"] + self.domain = os.environ["DOMAIN"] + self.host = os.environ["SERVER_IP"] + self.lp = self.get_loadparm() + + self.credentials = self.get_credentials() + + self.session = system_session() + self.ldb = SamDB(url="ldap://%s" % self.host, + session_info=self.session, + credentials=self.credentials, + lp=self.lp) + + self.create_machine_account() + self.create_user_account() + + + def tearDown(self): + super(PyCredentialsTests, self).tearDown() + delete_force(self.ldb, self.machine_dn) + delete_force(self.ldb, self.user_dn) + + # Until a successful netlogon connection has been established there will + # not be a valid authenticator associated with the credentials + # and new_client_authenticator should throw a ValueError + def test_no_netlogon_connection(self): + self.assertRaises(ValueError, + self.machine_creds.new_client_authenticator) + + # Once a netlogon connection has been established, + # new_client_authenticator should return a value + # + def test_have_netlogon_connection(self): + c = self.get_netlogon_connection() + a = self.machine_creds.new_client_authenticator() + self.assertIsNotNone(a) + + # Get an authenticator and use it on a sequence of operations requiring + # an authenticator + def test_client_authenticator(self): + c = self.get_netlogon_connection() + (authenticator, subsequent) = self.get_authenticator(c) + self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent) + (authenticator, subsequent) = self.get_authenticator(c) + self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent) + (authenticator, subsequent) = self.get_authenticator(c) + self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent) + (authenticator, subsequent) = self.get_authenticator(c) + self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent) + + + + # + # Establish aealed schannel netlogon connection over TCP/IP + def get_netlogon_connection(self): + return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server, + self.lp, + self.machine_creds) + + # + # Create the machine account + def create_machine_account(self): + self.machine_pass = samba.generate_random_password(32, 32) + self.machine_name = MACHINE_NAME + self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn()) + + # remove the account if it exists, this will happen if a previous test + # run failed + delete_force(self.ldb, self.machine_dn) + + utf16pw = unicode( + '"' + self.machine_pass.encode('utf-8') + '"', 'utf-8' + ).encode('utf-16-le') + self.ldb.add({ + "dn": self.machine_dn, + "objectclass": "computer", + "sAMAccountName": "%s$" % self.machine_name, + "userAccountControl": + str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD), + "unicodePwd": utf16pw}) + + self.machine_creds = Credentials() + self.machine_creds.guess(self.get_loadparm()) + self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA) + self.machine_creds.set_password(self.machine_pass) + self.machine_creds.set_username(self.machine_name + "$") + + # + # Create a test user account + def create_user_account(self): + self.user_pass = samba.generate_random_password(32, 32) + self.user_name = USER_NAME + self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn()) + + # remove the account if it exists, this will happen if a previous test + # run failed + delete_force(self.ldb, self.user_dn) + + utf16pw = unicode( + '"' + self.user_pass.encode('utf-8') + '"', 'utf-8' + ).encode('utf-16-le') + self.ldb.add({ + "dn": self.user_dn, + "objectclass": "user", + "sAMAccountName": "%s" % self.user_name, + "userAccountControl": str(UF_NORMAL_ACCOUNT), + "unicodePwd": utf16pw}) + + self.user_creds = Credentials() + self.user_creds.guess(self.get_loadparm()) + self.user_creds.set_password(self.user_pass) + self.user_creds.set_username(self.user_name) + pass + + # + # Get the authenticator from the machine creds. + def get_authenticator(self, c): + auth = self.machine_creds.new_client_authenticator(); + current = netr_Authenticator() + current.cred.data = [ord(x) for x in auth["credential"]] + current.timestamp = auth["timestamp"] + + subsequent = netr_Authenticator() + return (current, subsequent) + + def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent): + logon = samlogon_logon_info(self.domain, + self.machine_name, + self.user_creds) + + logon_level = netlogon.NetlogonNetworkTransitiveInformation + validation_level = netlogon.NetlogonValidationSamInfo4 + netr_flags = 0 + c.netr_LogonSamLogonWithFlags(self.server, + self.user_creds.get_workstation(), + current, + subsequent, + logon_level, + logon, + validation_level, + netr_flags) + + def do_NetrLogonGetDomainInfo(self, c, current, subsequent): + query = netr_WorkstationInformation() + + c.netr_LogonGetDomainInfo(self.server, + self.user_creds.get_workstation(), + current, + subsequent, + 2, + query) + +# +# Build the logon data required by NetrLogonSamLogonWithFlags +def samlogon_logon_info(domain_name, computer_name, creds): + + target_info_blob = samlogon_target(domain_name, computer_name) + + challenge = b"abcdefgh" + # User account under test + response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH, + challenge=challenge, + target_info=target_info_blob) + + logon = netlogon.netr_NetworkInfo() + + logon.challenge = [ord(x) for x in challenge] + logon.nt = netlogon.netr_ChallengeResponse() + logon.nt.length = len(response["nt_response"]) + logon.nt.data = [ord(x) for x in response["nt_response"]] + logon.identity_info = netlogon.netr_IdentityInfo() + + (username, domain) = creds.get_ntlm_username_domain() + logon.identity_info.domain_name.string = domain + logon.identity_info.account_name.string = username + logon.identity_info.workstation.string = creds.get_workstation() + + return logon + +# +# Build the samlogon target info. +def samlogon_target(domain_name, computer_name): + target_info = ntlmssp.AV_PAIR_LIST() + target_info.count = 3 + computername = ntlmssp.AV_PAIR() + computername.AvId = ntlmssp.MsvAvNbComputerName + computername.Value = computer_name + + domainname = ntlmssp.AV_PAIR() + domainname.AvId = ntlmssp.MsvAvNbDomainName + domainname.Value = domain_name + + eol = ntlmssp.AV_PAIR() + eol.AvId = ntlmssp.MsvAvEOL + target_info.pair = [domainname, computername, eol] + + return ndr_pack(target_info) diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index b70faafa5e5..049009fe125 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -659,6 +659,9 @@ planoldpythontestsuite("ad_dc", extra_args=['-U"$USERNAME%$PASSWORD"']) planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.lsa_string") +planoldpythontestsuite("ad_dc_ntvfs", + "samba.tests.py_credentials", + extra_args=['-U"$USERNAME%$PASSWORD"']) plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.tokengroups.krb5.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'yes', '$LOADLIST', '$LISTOPT'])