--- /dev/null
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests, base class """
+
+from samba.tests.samba_tool.base import SambaToolCmdTest
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.credentials import Credentials
+import os, pwd, grp
+import varlink
+import samba
+import subprocess
+
+class VarlinkTestCase(SambaToolCmdTest):
+
+ def setUp(self):
+ super().setUp()
+ sdir = samba.tests.env_get_var_value("SELFTEST_WINBINDD_SOCKET_DIR")
+ uri = "unix:" + os.path.join(sdir, "org.samba.selftest")
+ self.cli = varlink.Client.new_with_address(uri)
+ self.assertIsNotNone(self.cli)
+
+ self.lp = samba.tests.env_loadparm()
+ self.domain = samba.tests.env_get_var_value("DOMAIN")
+ self.winbind_separator = self.lp.get('winbind separator')
+ self.varlink_service = self.lp.get('winbind varlink : service name')
+
+ self.bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
+ self.netcmd = os.path.join(self.bindir, "net")
+
+ self.ldb = SamDB(
+ session_info=system_session(),
+ credentials=Credentials(),
+ lp=self.lp)
+
+ self.users = []
+ self.groups = []
+ members = []
+ for i in range(0, 3):
+ username = "vl_test_user_%d" % i
+ groupname = "vl_test_group_%d" % i
+
+ subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s"
+ % (self.domain, username.upper())], stdout=subprocess.PIPE)
+ self.runsubcmd("user", "create", username, self.random_password())
+
+ subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s"
+ % (self.domain, groupname.upper())], stdout=subprocess.PIPE)
+ self.runsubcmd("group", "create", groupname)
+
+ members.append(username)
+ for m in members:
+ self.runsubcmd("group", "addmembers", groupname, m)
+
+ grent = grp.getgrnam(groupname)
+ self.groups.append({"groupname": groupname,
+ "gid": grent.gr_gid,
+ "members": members.copy()})
+
+ pwent = pwd.getpwnam(username)
+ self.users.append({"username": username,
+ "uid": pwent.pw_uid,
+ "gid": pwent.pw_gid,
+ "shell": pwent.pw_shell,
+ "dir": pwent.pw_dir})
+
+ def tearDown(self):
+ for group in self.groups:
+ self.runsubcmd("group", "delete", group["groupname"])
+
+ for user in self.users:
+ self.runsubcmd("user", "delete", user["username"])
+
+ super().tearDown()
--- /dev/null
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2024
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetUserRecordTests(VarlinkTestCase):
+ def setUp(self):
+ super().setUp()
+
+ def tearDown(self):
+ super().tearDown()
+
+ def testGetGroupRecord(self):
+ for group in self.groups:
+ with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+ full_groupname = "%s%s%s" % (self.domain,
+ self.winbind_separator,
+ group["groupname"])
+ full_members_names = []
+ for m in group["members"]:
+ full_members_names.append("%s%s%s" % (self.domain,
+ self.winbind_separator,
+ m))
+ r = conn.GetGroupRecord(service=self.varlink_service,
+ groupName=full_groupname)
+ self.assertIsNotNone(r)
+ self.assertFalse(r.incomplete)
+ self.assertIsNotNone(r.record)
+ self.assertEqual(r.record["service"], self.varlink_service)
+ self.assertEqual(r.record["groupName"], full_groupname)
+ self.assertEqual(r.record["gid"], group["gid"])
+ self.assertEqual(sorted(r.record["members"]),
+ sorted(full_members_names))
+
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
--- /dev/null
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2024
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+import pwd
+import grp
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetMembershipsTests(VarlinkTestCase):
+ def setUp(self):
+ super().setUp()
+
+ def tearDown(self):
+ super().tearDown()
+
+ def testGetMembershipsByGroup(self):
+ for group in self.groups:
+ full_name = "%s%s%s" % (self.domain,
+ self.winbind_separator,
+ group["groupname"])
+ full_members_names = []
+ for m in group["members"]:
+ full_members_names.append("%s%s%s" % (self.domain,
+ self.winbind_separator,
+ m))
+ vl_members = []
+ with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+ for r in conn.GetMemberships(service=self.varlink_service,
+ groupName=full_name,
+ _more=True):
+ self.assertIsNotNone(r)
+ vl_members.append(r.userName)
+ self.assertEqual(sorted(vl_members),
+ sorted(full_members_names))
+
+ def testGetMembershipsByUser(self):
+ for user in self.users:
+ full_username = "%s%s%s" % (self.domain,
+ self.winbind_separator,
+ user["username"])
+ pwent = pwd.getpwnam(full_username)
+ glgid = os.getgrouplist(pwent.pw_name, pwent.pw_gid)
+ nss_list = []
+ for gid in glgid:
+ grent = grp.getgrgid(gid)
+ # nss_wrapper looks into files first, and "ADDOMAIN/domain users" is
+ # mapped to "users" from files NSS group db.
+ gname = grent.gr_name
+ if gname == "users":
+ gname = "%s%s%s" % (self.domain,
+ self.winbind_separator,
+ "domain users")
+ nss_list.append(gname)
+
+ vl_list = []
+ with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+ for r in conn.GetMemberships(service=self.varlink_service,
+ userName=full_username,
+ _more=True):
+ self.assertIsNotNone(r)
+ vl_list.append(r.groupName)
+
+ self.assertEqual(sorted(nss_list), sorted(vl_list))
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
--- /dev/null
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetUserRecordTests(VarlinkTestCase):
+ def setUp(self):
+ super().setUp()
+
+ def tearDown(self):
+ super().tearDown()
+
+ def testGetUserRecord(self):
+ for user in self.users:
+ with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+ full_username = "%s%s%s" % (self.domain,
+ self.winbind_separator,
+ user["username"])
+ r = conn.GetUserRecord(service=self.varlink_service,
+ userName=full_username)
+ self.assertIsNotNone(r)
+ self.assertFalse(r.incomplete)
+ self.assertIsNotNone(r.record)
+ self.assertEqual(r.record["service"], self.varlink_service)
+ self.assertEqual(r.record["userName"], full_username)
+ self.assertEqual(r.record["uid"], user["uid"])
+ self.assertEqual(r.record["gid"], user["gid"])
+ self.assertEqual(r.record["shell"], user["shell"])
+ self.assertEqual(r.record["homeDirectory"], user["dir"])
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
"samba-mit-build": {
"git-clone-required": True,
"sequence": [
- ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc" + samba_configure_params),
+ ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc --with-systemd-userdb" + samba_configure_params),
("make", "make -j"),
("check-clean-tree", CLEAN_SOURCE_TREE_CMD),
("chmod-R-a-w", "chmod -R a-w ."),
idmap_ldb:use rfc2307=yes
winbind enum users = yes
winbind enum groups = yes
+ winbind expand groups = 1
+
+ winbind varlink : socket directory = $ctx->{winbindd_socket_dir}
+ winbind varlink : service name = org.samba.selftest
rpc server port:netlogon = 1026
include system krb5 conf = no
dsdb event notification = true
dsdb password event notification = true
dsdb group change notification = true
+
+ winbind varlink service = yes
+
$smbconf_args
";
planpythontestsuite("clusteredmember:local",
"samba.tests.blackbox.rpcd_witness_samba_only",
environ=cluster_environ)
+
+if 'WITH_SYSTEMD_USERDB' in config_hash:
+ planpythontestsuite("ad_dc:local", "samba.tests.varlink.getuserrecord")
+ planpythontestsuite("ad_dc:local", "samba.tests.varlink.getgrouprecord")
+ planpythontestsuite("ad_dc:local", "samba.tests.varlink.getmemberships")