From: Samuel Cabrero Date: Tue, 21 Feb 2023 17:02:26 +0000 (+0100) Subject: pytests/varlink: Add varlink tests X-Git-Tag: tevent-0.17.0~728 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0653b4b1c95d197525c41faaff021e088b18edd1;p=thirdparty%2Fsamba.git pytests/varlink: Add varlink tests Signed-off-by: Samuel Cabrero Reviewed-by: Andreas Schneider --- diff --git a/python/samba/tests/varlink/base.py b/python/samba/tests/varlink/base.py new file mode 100644 index 00000000000..3e313ec3fdc --- /dev/null +++ b/python/samba/tests/varlink/base.py @@ -0,0 +1,89 @@ +# Unix SMB/CIFS implementation. +# +# Copyright (C) Samuel Cabrero 2023 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" Winbind varlink service tests, base class """ + +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba.auth import system_session +from samba.samdb import SamDB +from samba.credentials import Credentials +import os, pwd, grp +import varlink +import samba +import subprocess + +class VarlinkTestCase(SambaToolCmdTest): + + def setUp(self): + super().setUp() + sdir = samba.tests.env_get_var_value("SELFTEST_WINBINDD_SOCKET_DIR") + uri = "unix:" + os.path.join(sdir, "org.samba.selftest") + self.cli = varlink.Client.new_with_address(uri) + self.assertIsNotNone(self.cli) + + self.lp = samba.tests.env_loadparm() + self.domain = samba.tests.env_get_var_value("DOMAIN") + self.winbind_separator = self.lp.get('winbind separator') + self.varlink_service = self.lp.get('winbind varlink : service name') + + self.bindir = os.path.normpath(os.getenv("BINDIR", "./bin")) + self.netcmd = os.path.join(self.bindir, "net") + + self.ldb = SamDB( + session_info=system_session(), + credentials=Credentials(), + lp=self.lp) + + self.users = [] + self.groups = [] + members = [] + for i in range(0, 3): + username = "vl_test_user_%d" % i + groupname = "vl_test_group_%d" % i + + subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s" + % (self.domain, username.upper())], stdout=subprocess.PIPE) + self.runsubcmd("user", "create", username, self.random_password()) + + subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s" + % (self.domain, groupname.upper())], stdout=subprocess.PIPE) + self.runsubcmd("group", "create", groupname) + + members.append(username) + for m in members: + self.runsubcmd("group", "addmembers", groupname, m) + + grent = grp.getgrnam(groupname) + self.groups.append({"groupname": groupname, + "gid": grent.gr_gid, + "members": members.copy()}) + + pwent = pwd.getpwnam(username) + self.users.append({"username": username, + "uid": pwent.pw_uid, + "gid": pwent.pw_gid, + "shell": pwent.pw_shell, + "dir": pwent.pw_dir}) + + def tearDown(self): + for group in self.groups: + self.runsubcmd("group", "delete", group["groupname"]) + + for user in self.users: + self.runsubcmd("user", "delete", user["username"]) + + super().tearDown() diff --git a/python/samba/tests/varlink/getgrouprecord.py b/python/samba/tests/varlink/getgrouprecord.py new file mode 100644 index 00000000000..127464b2902 --- /dev/null +++ b/python/samba/tests/varlink/getgrouprecord.py @@ -0,0 +1,61 @@ +# Unix SMB/CIFS implementation. +# +# Copyright (C) Samuel Cabrero 2024 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" Winbind varlink service tests """ + +import sys +import os + +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + +from samba.tests.varlink.base import VarlinkTestCase + + +class VarlinkGetUserRecordTests(VarlinkTestCase): + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def testGetGroupRecord(self): + for group in self.groups: + with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn: + full_groupname = "%s%s%s" % (self.domain, + self.winbind_separator, + group["groupname"]) + full_members_names = [] + for m in group["members"]: + full_members_names.append("%s%s%s" % (self.domain, + self.winbind_separator, + m)) + r = conn.GetGroupRecord(service=self.varlink_service, + groupName=full_groupname) + self.assertIsNotNone(r) + self.assertFalse(r.incomplete) + self.assertIsNotNone(r.record) + self.assertEqual(r.record["service"], self.varlink_service) + self.assertEqual(r.record["groupName"], full_groupname) + self.assertEqual(r.record["gid"], group["gid"]) + self.assertEqual(sorted(r.record["members"]), + sorted(full_members_names)) + + +if __name__ == "__main__": + import unittest + unittest.main() diff --git a/python/samba/tests/varlink/getmemberships.py b/python/samba/tests/varlink/getmemberships.py new file mode 100644 index 00000000000..0308ccce368 --- /dev/null +++ b/python/samba/tests/varlink/getmemberships.py @@ -0,0 +1,89 @@ +# Unix SMB/CIFS implementation. +# +# Copyright (C) Samuel Cabrero 2024 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" Winbind varlink service tests """ + +import sys +import os +import pwd +import grp + +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + +from samba.tests.varlink.base import VarlinkTestCase + + +class VarlinkGetMembershipsTests(VarlinkTestCase): + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def testGetMembershipsByGroup(self): + for group in self.groups: + full_name = "%s%s%s" % (self.domain, + self.winbind_separator, + group["groupname"]) + full_members_names = [] + for m in group["members"]: + full_members_names.append("%s%s%s" % (self.domain, + self.winbind_separator, + m)) + vl_members = [] + with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn: + for r in conn.GetMemberships(service=self.varlink_service, + groupName=full_name, + _more=True): + self.assertIsNotNone(r) + vl_members.append(r.userName) + self.assertEqual(sorted(vl_members), + sorted(full_members_names)) + + def testGetMembershipsByUser(self): + for user in self.users: + full_username = "%s%s%s" % (self.domain, + self.winbind_separator, + user["username"]) + pwent = pwd.getpwnam(full_username) + glgid = os.getgrouplist(pwent.pw_name, pwent.pw_gid) + nss_list = [] + for gid in glgid: + grent = grp.getgrgid(gid) + # nss_wrapper looks into files first, and "ADDOMAIN/domain users" is + # mapped to "users" from files NSS group db. + gname = grent.gr_name + if gname == "users": + gname = "%s%s%s" % (self.domain, + self.winbind_separator, + "domain users") + nss_list.append(gname) + + vl_list = [] + with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn: + for r in conn.GetMemberships(service=self.varlink_service, + userName=full_username, + _more=True): + self.assertIsNotNone(r) + vl_list.append(r.groupName) + + self.assertEqual(sorted(nss_list), sorted(vl_list)) + +if __name__ == "__main__": + import unittest + unittest.main() diff --git a/python/samba/tests/varlink/getuserrecord.py b/python/samba/tests/varlink/getuserrecord.py new file mode 100644 index 00000000000..c82c65df8ad --- /dev/null +++ b/python/samba/tests/varlink/getuserrecord.py @@ -0,0 +1,56 @@ +# Unix SMB/CIFS implementation. +# +# Copyright (C) Samuel Cabrero 2023 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" Winbind varlink service tests """ + +import sys +import os + +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + +from samba.tests.varlink.base import VarlinkTestCase + + +class VarlinkGetUserRecordTests(VarlinkTestCase): + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def testGetUserRecord(self): + for user in self.users: + with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn: + full_username = "%s%s%s" % (self.domain, + self.winbind_separator, + user["username"]) + r = conn.GetUserRecord(service=self.varlink_service, + userName=full_username) + self.assertIsNotNone(r) + self.assertFalse(r.incomplete) + self.assertIsNotNone(r.record) + self.assertEqual(r.record["service"], self.varlink_service) + self.assertEqual(r.record["userName"], full_username) + self.assertEqual(r.record["uid"], user["uid"]) + self.assertEqual(r.record["gid"], user["gid"]) + self.assertEqual(r.record["shell"], user["shell"]) + self.assertEqual(r.record["homeDirectory"], user["dir"]) + +if __name__ == "__main__": + import unittest + unittest.main() diff --git a/script/autobuild.py b/script/autobuild.py index 137a6eb0999..8817968d760 100755 --- a/script/autobuild.py +++ b/script/autobuild.py @@ -282,7 +282,7 @@ tasks = { "samba-mit-build": { "git-clone-required": True, "sequence": [ - ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc" + samba_configure_params), + ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc --with-systemd-userdb" + samba_configure_params), ("make", "make -j"), ("check-clean-tree", CLEAN_SOURCE_TREE_CMD), ("chmod-R-a-w", "chmod -R a-w ."), diff --git a/selftest/target/Samba4.pm b/selftest/target/Samba4.pm index 9da339f6239..1316e3e0581 100755 --- a/selftest/target/Samba4.pm +++ b/selftest/target/Samba4.pm @@ -821,6 +821,10 @@ sub provision_raw_step1($$) idmap_ldb:use rfc2307=yes winbind enum users = yes winbind enum groups = yes + winbind expand groups = 1 + + winbind varlink : socket directory = $ctx->{winbindd_socket_dir} + winbind varlink : service name = org.samba.selftest rpc server port:netlogon = 1026 include system krb5 conf = no @@ -2165,6 +2169,9 @@ sub provision_ad_dc() dsdb event notification = true dsdb password event notification = true dsdb group change notification = true + + winbind varlink service = yes + $smbconf_args "; diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index fab95fcfef3..1e8bc42b055 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -2250,3 +2250,8 @@ if have_cluster_support: planpythontestsuite("clusteredmember:local", "samba.tests.blackbox.rpcd_witness_samba_only", environ=cluster_environ) + +if 'WITH_SYSTEMD_USERDB' in config_hash: + planpythontestsuite("ad_dc:local", "samba.tests.varlink.getuserrecord") + planpythontestsuite("ad_dc:local", "samba.tests.varlink.getgrouprecord") + planpythontestsuite("ad_dc:local", "samba.tests.varlink.getmemberships")