]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
pytests/varlink: Add varlink tests
authorSamuel Cabrero <scabrero@samba.org>
Tue, 21 Feb 2023 17:02:26 +0000 (18:02 +0100)
committerAndreas Schneider <asn@cryptomilk.org>
Thu, 20 Feb 2025 08:07:32 +0000 (08:07 +0000)
Signed-off-by: Samuel Cabrero <scabrero@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/varlink/base.py [new file with mode: 0644]
python/samba/tests/varlink/getgrouprecord.py [new file with mode: 0644]
python/samba/tests/varlink/getmemberships.py [new file with mode: 0644]
python/samba/tests/varlink/getuserrecord.py [new file with mode: 0644]
script/autobuild.py
selftest/target/Samba4.pm
source4/selftest/tests.py

diff --git a/python/samba/tests/varlink/base.py b/python/samba/tests/varlink/base.py
new file mode 100644 (file)
index 0000000..3e313ec
--- /dev/null
@@ -0,0 +1,89 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests, base class """
+
+from samba.tests.samba_tool.base import SambaToolCmdTest
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.credentials import Credentials
+import os, pwd, grp
+import varlink
+import samba
+import subprocess
+
+class VarlinkTestCase(SambaToolCmdTest):
+
+    def setUp(self):
+        super().setUp()
+        sdir = samba.tests.env_get_var_value("SELFTEST_WINBINDD_SOCKET_DIR")
+        uri = "unix:" + os.path.join(sdir, "org.samba.selftest")
+        self.cli = varlink.Client.new_with_address(uri)
+        self.assertIsNotNone(self.cli)
+
+        self.lp = samba.tests.env_loadparm()
+        self.domain = samba.tests.env_get_var_value("DOMAIN")
+        self.winbind_separator = self.lp.get('winbind separator')
+        self.varlink_service = self.lp.get('winbind varlink : service name')
+
+        self.bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
+        self.netcmd = os.path.join(self.bindir, "net")
+
+        self.ldb = SamDB(
+            session_info=system_session(),
+            credentials=Credentials(),
+            lp=self.lp)
+
+        self.users = []
+        self.groups = []
+        members = []
+        for i in range(0, 3):
+            username = "vl_test_user_%d" % i
+            groupname = "vl_test_group_%d" % i
+
+            subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s"
+                              % (self.domain, username.upper())], stdout=subprocess.PIPE)
+            self.runsubcmd("user", "create", username, self.random_password())
+
+            subprocess.Popen([self.netcmd, "cache", "del", "NAME2SID/%s\\%s"
+                              % (self.domain, groupname.upper())], stdout=subprocess.PIPE)
+            self.runsubcmd("group", "create", groupname)
+
+            members.append(username)
+            for m in members:
+                self.runsubcmd("group", "addmembers", groupname, m)
+
+            grent = grp.getgrnam(groupname)
+            self.groups.append({"groupname": groupname,
+                                "gid": grent.gr_gid,
+                                "members": members.copy()})
+
+            pwent = pwd.getpwnam(username)
+            self.users.append({"username": username,
+                   "uid": pwent.pw_uid,
+                   "gid": pwent.pw_gid,
+                   "shell": pwent.pw_shell,
+                   "dir": pwent.pw_dir})
+
+    def tearDown(self):
+        for group in self.groups:
+            self.runsubcmd("group", "delete", group["groupname"])
+
+        for user in self.users:
+            self.runsubcmd("user", "delete", user["username"])
+
+        super().tearDown()
diff --git a/python/samba/tests/varlink/getgrouprecord.py b/python/samba/tests/varlink/getgrouprecord.py
new file mode 100644 (file)
index 0000000..127464b
--- /dev/null
@@ -0,0 +1,61 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2024
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetUserRecordTests(VarlinkTestCase):
+    def setUp(self):
+        super().setUp()
+
+    def tearDown(self):
+        super().tearDown()
+
+    def testGetGroupRecord(self):
+        for group in self.groups:
+            with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+                full_groupname = "%s%s%s" % (self.domain,
+                                             self.winbind_separator,
+                                             group["groupname"])
+                full_members_names = []
+                for m in group["members"]:
+                    full_members_names.append("%s%s%s" % (self.domain,
+                                              self.winbind_separator,
+                                              m))
+                r = conn.GetGroupRecord(service=self.varlink_service,
+                                        groupName=full_groupname)
+                self.assertIsNotNone(r)
+                self.assertFalse(r.incomplete)
+                self.assertIsNotNone(r.record)
+                self.assertEqual(r.record["service"], self.varlink_service)
+                self.assertEqual(r.record["groupName"], full_groupname)
+                self.assertEqual(r.record["gid"], group["gid"])
+                self.assertEqual(sorted(r.record["members"]),
+                                 sorted(full_members_names))
+
+
+if __name__ == "__main__":
+    import unittest
+    unittest.main()
diff --git a/python/samba/tests/varlink/getmemberships.py b/python/samba/tests/varlink/getmemberships.py
new file mode 100644 (file)
index 0000000..0308ccc
--- /dev/null
@@ -0,0 +1,89 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2024
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+import pwd
+import grp
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetMembershipsTests(VarlinkTestCase):
+    def setUp(self):
+        super().setUp()
+
+    def tearDown(self):
+        super().tearDown()
+
+    def testGetMembershipsByGroup(self):
+        for group in self.groups:
+            full_name = "%s%s%s" % (self.domain,
+                                    self.winbind_separator,
+                                    group["groupname"])
+            full_members_names = []
+            for m in group["members"]:
+                full_members_names.append("%s%s%s" % (self.domain,
+                                          self.winbind_separator,
+                                          m))
+            vl_members = []
+            with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+                for r in conn.GetMemberships(service=self.varlink_service,
+                                             groupName=full_name,
+                                             _more=True):
+                    self.assertIsNotNone(r)
+                    vl_members.append(r.userName)
+            self.assertEqual(sorted(vl_members),
+                             sorted(full_members_names))
+
+    def testGetMembershipsByUser(self):
+        for user in self.users:
+            full_username = "%s%s%s" % (self.domain,
+                                        self.winbind_separator,
+                                        user["username"])
+            pwent = pwd.getpwnam(full_username)
+            glgid = os.getgrouplist(pwent.pw_name, pwent.pw_gid)
+            nss_list = []
+            for gid in glgid:
+                grent = grp.getgrgid(gid)
+                # nss_wrapper looks into files first, and "ADDOMAIN/domain users" is
+                # mapped to "users" from files NSS group db.
+                gname = grent.gr_name
+                if gname == "users":
+                    gname = "%s%s%s" % (self.domain,
+                                        self.winbind_separator,
+                                        "domain users")
+                nss_list.append(gname)
+
+            vl_list = []
+            with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+                for r in conn.GetMemberships(service=self.varlink_service,
+                                             userName=full_username,
+                                             _more=True):
+                    self.assertIsNotNone(r)
+                    vl_list.append(r.groupName)
+
+            self.assertEqual(sorted(nss_list), sorted(vl_list))
+
+if __name__ == "__main__":
+    import unittest
+    unittest.main()
diff --git a/python/samba/tests/varlink/getuserrecord.py b/python/samba/tests/varlink/getuserrecord.py
new file mode 100644 (file)
index 0000000..c82c65d
--- /dev/null
@@ -0,0 +1,56 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Samuel Cabrero <scabrero@samba.org> 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Winbind varlink service tests """
+
+import sys
+import os
+
+sys.path.insert(0, "bin/python")
+os.environ["PYTHONUNBUFFERED"] = "1"
+
+from samba.tests.varlink.base import VarlinkTestCase
+
+
+class VarlinkGetUserRecordTests(VarlinkTestCase):
+    def setUp(self):
+        super().setUp()
+
+    def tearDown(self):
+        super().tearDown()
+
+    def testGetUserRecord(self):
+        for user in self.users:
+            with self.cli.open("io.systemd.UserDatabase", namespaced=True) as conn:
+                full_username = "%s%s%s" % (self.domain,
+                                            self.winbind_separator,
+                                            user["username"])
+                r = conn.GetUserRecord(service=self.varlink_service,
+                                        userName=full_username)
+                self.assertIsNotNone(r)
+                self.assertFalse(r.incomplete)
+                self.assertIsNotNone(r.record)
+                self.assertEqual(r.record["service"], self.varlink_service)
+                self.assertEqual(r.record["userName"], full_username)
+                self.assertEqual(r.record["uid"], user["uid"])
+                self.assertEqual(r.record["gid"], user["gid"])
+                self.assertEqual(r.record["shell"], user["shell"])
+                self.assertEqual(r.record["homeDirectory"], user["dir"])
+
+if __name__ == "__main__":
+    import unittest
+    unittest.main()
index 137a6eb09999020d996a6e26065b4b998c7b59ca..8817968d760c6fb8b2245b8c93a874f0eced1edd 100755 (executable)
@@ -282,7 +282,7 @@ tasks = {
     "samba-mit-build": {
         "git-clone-required": True,
         "sequence": [
-            ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc" + samba_configure_params),
+            ("configure", "./configure.developer --with-system-mitkrb5 --with-experimental-mit-ad-dc --with-systemd-userdb" + samba_configure_params),
             ("make", "make -j"),
             ("check-clean-tree", CLEAN_SOURCE_TREE_CMD),
             ("chmod-R-a-w", "chmod -R a-w ."),
index 9da339f62397a606800e7fcfe4e6b49e1edd7a34..1316e3e0581ab246b0894277507ad137ab3c7b81 100755 (executable)
@@ -821,6 +821,10 @@ sub provision_raw_step1($$)
         idmap_ldb:use rfc2307=yes
        winbind enum users = yes
        winbind enum groups = yes
+       winbind expand groups = 1
+
+       winbind varlink : socket directory = $ctx->{winbindd_socket_dir}
+       winbind varlink : service name = org.samba.selftest
 
         rpc server port:netlogon = 1026
        include system krb5 conf = no
@@ -2165,6 +2169,9 @@ sub provision_ad_dc()
        dsdb event notification = true
        dsdb password event notification = true
        dsdb group change notification = true
+
+       winbind varlink service = yes
+
         $smbconf_args
 ";
 
index fab95fcfef3f35d49bacd52c7ea1795dec28c0ce..1e8bc42b055a6f26a0aca9cf1aeb25ee617f7518 100755 (executable)
@@ -2250,3 +2250,8 @@ if have_cluster_support:
     planpythontestsuite("clusteredmember:local",
                         "samba.tests.blackbox.rpcd_witness_samba_only",
                         environ=cluster_environ)
+
+if 'WITH_SYSTEMD_USERDB' in config_hash:
+    planpythontestsuite("ad_dc:local", "samba.tests.varlink.getuserrecord")
+    planpythontestsuite("ad_dc:local", "samba.tests.varlink.getgrouprecord")
+    planpythontestsuite("ad_dc:local", "samba.tests.varlink.getmemberships")