From: Iker Pedrosa Date: Mon, 7 Oct 2024 13:44:17 +0000 (+0200) Subject: Tests: implement system test framework X-Git-Tag: 4.17.3~88 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=128650dfd440f8a9f44d3c8af77e06dd4eea4dd4;p=thirdparty%2Fshadow.git Tests: implement system test framework As discussed at length, this is the implementation of the new system tests framework for shadow. This is a proof of concept that contains the key elements to be able to run basic user (i.e. useradd, usermod) and group (i.e. usermod) tests. If you like the framework the rest of the functionality will be added in the future. Some useful facts: * It is implemented in python * It is based on pytest and pytest-mh * It works on all the distributions that are part of our CI * It can be run in the cloud (VM or container) as well as on-premises * After the execution of each test the environment is cleaned up * Logs and other artifacts for failed tests are collected * It has a rich API that can be extended and extended to cover new functionalities Closes: https://github.com/shadow-maint/shadow/issues/835 Signed-off-by: Iker Pedrosa --- diff --git a/tests/system/framework/__init__.py b/tests/system/framework/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/system/framework/config.py b/tests/system/framework/config.py new file mode 100644 index 000000000..69ab951e0 --- /dev/null +++ b/tests/system/framework/config.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from typing import Type + +from pytest_mh import MultihostConfig, MultihostDomain, MultihostHost, MultihostRole + +__all__ = [ + "ShadowMultihostConfig", + "ShadowMultihostDomain", +] + + +class ShadowMultihostConfig(MultihostConfig): + @property + def id_to_domain_class(self) -> dict[str, Type[MultihostDomain]]: + """ + All domains are mapped to :class:`ShadowMultihostDomain`. + + :rtype: Class name. + """ + return {"*": ShadowMultihostDomain} + + +class ShadowMultihostDomain(MultihostDomain[ShadowMultihostConfig]): + @property + def role_to_host_class(self) -> dict[str, Type[MultihostHost]]: + """ + Map roles to classes: + + * shadow to ShadowHost + + :rtype: Class name. + """ + from .hosts.shadow import ShadowHost + + return { + "shadow": ShadowHost, + } + + @property + def role_to_role_class(self) -> dict[str, Type[MultihostRole]]: + """ + Map roles to classes: + + * shadow to Shadow + + :rtype: Class name. + """ + from .roles.shadow import Shadow + + return { + "shadow": Shadow, + } diff --git a/tests/system/framework/fixtures.py b/tests/system/framework/fixtures.py new file mode 100644 index 000000000..d9775db0d --- /dev/null +++ b/tests/system/framework/fixtures.py @@ -0,0 +1,45 @@ +"""Pytest fixtures.""" + +from __future__ import annotations + +import os + +import pytest + + +@pytest.fixture(scope="session") +def datadir(request: pytest.FixtureRequest) -> str: + """ + Data directory shared for all tests. + + :return: Path to the data directory ``(root-pytest-dir)/data``. + :rtype: str + """ + return os.path.join(request.node.path, "data") + + +@pytest.fixture(scope="module") +def moduledatadir(datadir: str, request: pytest.FixtureRequest) -> str: + """ + Data directory shared for all tests within a single module. + + :return: Path to the data directory ``(root-pytest-dir)/data/$module_name``. + :rtype: str + """ + name = request.module.__name__ + return os.path.join(datadir, name) + + +@pytest.fixture(scope="function") +def testdatadir(moduledatadir: str, request: pytest.FixtureRequest) -> str: + """ + Data directory for current test. + + :return: Path to the data directory ``(root-pytest-dir)/data/$module_name/$test_name``. + :rtype: str + """ + if not isinstance(request.node, pytest.Function): + raise TypeError(f"Excepted pytest.Function, got {type(request.node)}") + + name = request.node.originalname + return os.path.join(moduledatadir, name) diff --git a/tests/system/framework/hosts/__init__.py b/tests/system/framework/hosts/__init__.py new file mode 100644 index 000000000..feb44d4e2 --- /dev/null +++ b/tests/system/framework/hosts/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost hosts.""" + +from __future__ import annotations diff --git a/tests/system/framework/hosts/base.py b/tests/system/framework/hosts/base.py new file mode 100644 index 000000000..a5ee5d914 --- /dev/null +++ b/tests/system/framework/hosts/base.py @@ -0,0 +1,107 @@ +"""Base classes and objects for shadow specific multihost hosts.""" + +from __future__ import annotations + +import csv + +from pytest_mh import MultihostBackupHost, MultihostHost +from pytest_mh.utils.fs import LinuxFileSystem + +from ..config import ShadowMultihostDomain + +__all__ = [ + "BaseHost", + "BaseLinuxHost", +] + + +class BaseHost(MultihostBackupHost[ShadowMultihostDomain]): + """ + Base class for all shadow hosts. + """ + + def __init__(self, *args, **kwargs) -> None: + # restore is handled in topology controllers + super().__init__(*args, **kwargs) + + @property + def features(self) -> dict[str, bool]: + """ + Features supported by the host. + """ + return {} + + +class BaseLinuxHost(MultihostHost[ShadowMultihostDomain]): + """ + Base Linux host. + + Adds linux specific reentrant utilities. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.fs: LinuxFileSystem = LinuxFileSystem(self) + self._os_release: dict = {} + self._distro_name: str = "unknown" + self._distro_major: int = 0 + self._distro_minor: int = 0 + + def _distro_information(self): + """ + Pulls distro information from a host from /ets/os-release + """ + self.logger.info(f"Detecting distro information on {self.hostname}") + os_release = self.fs.read("/etc/os-release") + self._os_release = dict(csv.reader([x for x in os_release.splitlines() if x], delimiter="=")) + if "NAME" in self._os_release: + self._distro_name = self._os_release["NAME"] + if "VERSION_ID" not in self._os_release: + return + if "." in self._os_release["VERSION_ID"]: + self._distro_major = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[0]) + self._distro_minor = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[1]) + else: + self._distro_major = int(self._os_release["VERSION_ID"]) + + @property + def distro_name(self) -> str: + """ + Host distribution + + :return: Distribution name or "unknown" + :rtype: str + """ + # NAME item from os-release + if not self._os_release: + self._distro_information() + return self._distro_name + + @property + def distro_major(self) -> int: + """ + Host distribution major version + + :return: Major version + :rtype: int + """ + # First part of VERSION_ID from os-release + # Returns zero when could not detect + if not self._os_release: + self._distro_information() + return self._distro_major + + @property + def distro_minor(self) -> int: + """ + Host distribution minor version + + :return: Minor version + :rtype: int + """ + # Second part of VERSION_ID from os-release + # Returns zero when no minor version is present + if not self._os_release: + self._distro_information() + return self._distro_minor diff --git a/tests/system/framework/hosts/shadow.py b/tests/system/framework/hosts/shadow.py new file mode 100644 index 000000000..1fb656b63 --- /dev/null +++ b/tests/system/framework/hosts/shadow.py @@ -0,0 +1,175 @@ +"""shadow multihost host.""" + +from __future__ import annotations + +from pathlib import PurePosixPath +from typing import Any + +from pytest_mh.conn import ProcessLogLevel + +from .base import BaseHost, BaseLinuxHost + +__all__ = [ + "ShadowHost", +] + + +class ShadowHost(BaseHost, BaseLinuxHost): + """ + shadow host object. + + This is the host where the tests are run. + + .. note:: + + Full backup and restore of shadow state is supported. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self._features: dict[str, bool] | None = None + """Features dictionary.""" + + self._backup_path: PurePosixPath | None = None + """Path to backup files.""" + + self._verify_files: [dict[str, str]] = [ + {"origin": "/etc/passwd", "backup": "passwd"}, + {"origin": "/etc/shadow", "backup": "shadow"}, + {"origin": "/etc/group", "backup": "group"}, + {"origin": "/etc/gshadow", "backup": "gshadow"}, + ] + """Files to verify for mismatch.""" + + def pytest_setup(self) -> None: + super().pytest_setup() + + def start(self) -> None: + """ + Not supported. + + :raises NotImplementedError: _description_ + """ + raise NotImplementedError("Starting shadow service is not implemented.") + + def stop(self) -> None: + """ + Not supported. + + :raises NotImplementedError: _description_ + """ + raise NotImplementedError("Stopping shadow service is not implemented.") + + def backup(self) -> Any: + """ + Backup all shadow data. + + :return: Backup data. + :rtype: Any + """ + self.logger.info("Creating backup of shadow host") + + result = self.conn.run( + """ + set -ex + + function backup { + if [ -d "$1" ] || [ -f "$1" ]; then + cp --force --archive "$1" "$2" + fi + } + + path=`mktemp -d` + backup /etc/login.defs "$path/login.defs" + backup /etc/default/useradd "$path/useradd" + backup /etc/passwd "$path/passwd" + backup /etc/shadow "$path/shadow" + backup /etc/group "$path/group" + backup /etc/gshadow "$path/gshadow" + backup /etc/subuid "$path/subuid" + backup /etc/subgid "$path/subgid" + backup /home "$path/home" + backup /var/log/secure "$path/secure" + + echo $path + """, + log_level=ProcessLogLevel.Error, + ) + + self._backup_path = PurePosixPath(result.stdout_lines[-1].strip()) + + return PurePosixPath(result.stdout_lines[-1].strip()) + + def restore(self, backup_data: Any | None) -> None: + """ + Restore all shadow data. + + :return: Backup data. + :rtype: Any + """ + if backup_data is None: + return + + if not isinstance(backup_data, PurePosixPath): + raise TypeError(f"Expected PurePosixPath, got {type(backup_data)}") + + backup_path = str(backup_data) + + self.logger.info(f"Restoring shadow data from {backup_path}") + self.conn.run( + f""" + set -ex + + function restore {{ + rm --force --recursive "$2" + if [ -d "$1" ] || [ -f "$1" ]; then + cp --force --archive "$1" "$2" + fi + }} + + rm --force --recursive /var/log/secure + restore "{backup_path}/login.defs" /etc/login.defs + restore "{backup_path}/useradd" /etc/default/useradd + restore "{backup_path}/passwd" /etc/passwd + restore "{backup_path}/shadow" /etc/shadow + restore "{backup_path}/group" /etc/group + restore "{backup_path}/gshadow" /etc/gshadow + restore "{backup_path}/subuid" /etc/subuid + restore "{backup_path}/subgid" /etc/subgid + restore "{backup_path}/home" /home + restore "{backup_path}/secure" /var/log/secure + """, + log_level=ProcessLogLevel.Error, + ) + + def detect_file_mismatches(self) -> None: + """ + Shadow binaries modify a number of files, but usually do not modify all of them. This is why we add an + additional check at the end of the test to verify that the files that should not have been modified are still + intact. + """ + self.logger.info(f"Detecting mismatches in shadow files {self._backup_path}") + + for x in self._verify_files: + result = self.conn.run( + f""" + set -ex + + cmp {x['origin']} {self._backup_path}/{x['backup']} + """, + log_level=ProcessLogLevel.Error, + raise_on_error=False, + ) + if result.rc != 0: + self.logger.error(f"File mismatch in '{x['origin']}' and '{self._backup_path}/{x['backup']}'") + result.throw() + + def discard_file(self, origin: str) -> None: + """ + Discard modified files from the files that should be verified. + """ + for x in self._verify_files: + if x["origin"] == origin: + self._verify_files.remove(x) + break diff --git a/tests/system/framework/markers.py b/tests/system/framework/markers.py new file mode 100644 index 000000000..89caa05d5 --- /dev/null +++ b/tests/system/framework/markers.py @@ -0,0 +1,100 @@ +"""Pytest fixtures.""" + +from __future__ import annotations + +from functools import partial + +import pytest +from pytest_mh import MultihostItemData, Topology + +from .misc import to_list_of_strings +from .roles.base import BaseRole +from .topology import KnownTopology, KnownTopologyGroup + + +def pytest_configure(config: pytest.Config): + """ + Pytest hook: register multihost plugin. + """ + + # register additional markers + config.addinivalue_line( + "markers", + "builtwith(feature): Run test only if shadow was built with given feature", + ) + + +def builtwith(item: pytest.Function, requirements: dict[str, str], **kwargs: BaseRole): + def value_error(msg: str) -> ValueError: + return ValueError(f"{item.nodeid}::{item.originalname}: @pytest.mark.builtwith: {msg}") + + errors: list[str] = [] + for role, features in requirements.items(): + if role not in kwargs: + raise value_error(f"unknown fixture '{role}'") + + if not isinstance(kwargs[role], BaseRole): + raise value_error(f"fixture '{role}' is not instance of BaseRole") + + obj = kwargs[role] + for feature in to_list_of_strings(features): + if feature not in obj.features: + raise value_error(f"unknown feature '{feature}' in '{role}'") + + if not obj.features[feature]: + errors.append(f'{role} does not support "{feature}"') + + if len(errors) == 1: + return (False, errors[0]) + elif len(errors) > 1: + return (False, str(errors)) + + # All requirements were passed + return True + + +@pytest.hookimpl(tryfirst=True) +def pytest_runtest_setup(item: pytest.Item) -> None: + if not isinstance(item, pytest.Function): + raise TypeError(f"Unexpected item type: {type(item)}") + + topology: list[Topology] = [] + mh_item_data: MultihostItemData | None = MultihostItemData.GetData(item) + for mark in item.iter_markers("builtwith"): + requirements: dict[str, str] = {} + + if len(mark.args) == 1 and not mark.kwargs: + # @pytest.mark.builtwith("feature_x") + # -> check if "feature_x" is supported by shadow + requirements["shadow"] = mark.args[0] + topology = [] + elif not mark.args and mark.kwargs: + # @pytest.mark.builtwith(shadow="feature_x", another_host="feature_x") -> + # -> check if "feature_x" is supported by both shadow and another_host + requirements = dict(mark.kwargs) + topology = [] + elif ( + len(mark.args) == 1 + and isinstance(mark.args[0], (Topology, KnownTopology, KnownTopologyGroup)) + and mark.kwargs + ): + # @pytest.mark.builtwith(KnownTopology.Shadow, shadow="feature_x") -> + # -> check if "feature_x" is supported by shadow only if the test runs on shadow topology + requirements = dict(mark.kwargs) + if isinstance(mark.args[0], Topology): + topology = [mark.args[0]] + elif isinstance(mark.args[0], KnownTopology): + topology = [mark.args[0].value.topology] + elif isinstance(mark.args[0], KnownTopologyGroup): + topology = [x.value.topology for x in mark.args[0].value] + else: + raise ValueError(f"{item.nodeid}::{item.originalname}: invalid arguments for @pytest.mark.builtwith") + + if mh_item_data is None: + raise ValueError(f"{item.nodeid}::{item.originalname}: multihost item data is not set") + + if mh_item_data.topology_mark is None: + raise ValueError(f"{item.nodeid}::{item.originalname}: multihost topology mark is not set") + + if not topology or mh_item_data.topology_mark.topology in topology: + item.add_marker(pytest.mark.require(partial(builtwith, item=item, requirements=requirements))) diff --git a/tests/system/framework/misc/__init__.py b/tests/system/framework/misc/__init__.py new file mode 100644 index 000000000..92e0b76b9 --- /dev/null +++ b/tests/system/framework/misc/__init__.py @@ -0,0 +1,42 @@ +"""Miscellaneous functions.""" + +from __future__ import annotations + +from typing import Any + + +def to_list(value: Any | list[Any] | None) -> list[Any]: + """ + Convert value into a list. + + - if value is ``None`` then return an empty list + - if value is already a list then return it unchanged + - if value is not a list then return ``[value]`` + + :param value: Value that should be converted to a list. + :type value: Any | list[Any] | None + :return: List with the value as an element. + :rtype: list[Any] + """ + if value is None: + return [] + + if isinstance(value, list): + return value + + return [value] + + +def to_list_of_strings(value: Any | list[Any] | None) -> list[str]: + """ + Convert given list or single value to list of strings. + + The ``value`` is first converted to a list and then ``str(item)`` is run on + each of its item. + + :param value: Value to convert. + :type value: Any | list[Any] | None + :return: List of strings. + :rtype: list[str] + """ + return [str(x) for x in to_list(value)] diff --git a/tests/system/framework/misc/errors.py b/tests/system/framework/misc/errors.py new file mode 100644 index 000000000..e4955af58 --- /dev/null +++ b/tests/system/framework/misc/errors.py @@ -0,0 +1,42 @@ +from __future__ import annotations + + +class ExpectScriptError(Exception): + """ + Expect script error. + + Seeing this exception means that there is an unhandled path or other error + in the expect script that was executed. The script needs to be fixed. + """ + + def __init__(self, code: int, msg: str | None = None) -> None: + """ + :param code: Expect script error code. + :type code: int + :param msg: Error message, defaults to None (translate error code to message) + :type msg: str | None, optional + """ + self.code: int = code + if msg is None: + msg = self.code_to_message(code) + + super().__init__(msg) + + def code_to_message(self, code: int) -> str: + """ + Translate expect script error codes used in this framework to message. + + :param code: Expect script error code. + :type code: int + :return: Error message. + :rtype: str + """ + match code: + case 201: + return "Timeout, unexpected output" + case 202: + return "Unexpected end of file" + case 203: + return "Unexpected code path" + + return "Unknown error code" diff --git a/tests/system/framework/roles/__init__.py b/tests/system/framework/roles/__init__.py new file mode 100644 index 000000000..9a45a54bc --- /dev/null +++ b/tests/system/framework/roles/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost roles.""" + +from __future__ import annotations diff --git a/tests/system/framework/roles/base.py b/tests/system/framework/roles/base.py new file mode 100644 index 000000000..616ba4eb3 --- /dev/null +++ b/tests/system/framework/roles/base.py @@ -0,0 +1,172 @@ +"""Base classes and objects for shadow specific multihost roles.""" + +from __future__ import annotations + +from typing import Any, Generic, TypeGuard, TypeVar + +from pytest_mh import MultihostRole +from pytest_mh.cli import CLIBuilder +from pytest_mh.conn import Bash, Shell +from pytest_mh.conn.ssh import SSHClient +from pytest_mh.utils.coredumpd import Coredumpd +from pytest_mh.utils.firewall import Firewalld +from pytest_mh.utils.fs import LinuxFileSystem +from pytest_mh.utils.journald import JournaldUtils +from pytest_mh.utils.tc import LinuxTrafficControl + +from ..hosts.base import BaseHost +from ..utils.tools import LinuxToolsUtils + +HostType = TypeVar("HostType", bound=BaseHost) +RoleType = TypeVar("RoleType", bound=MultihostRole) + + +__all__ = [ + "HostType", + "RoleType", + "DeleteAttribute", + "BaseObject", + "BaseRole", + "BaseLinuxRole", +] + + +class DeleteAttribute(object): + """ + This class is used to distinguish between setting an attribute to an empty + value and deleting it completely. + """ + + pass + + +class BaseObject(Generic[HostType, RoleType]): + """ + Base class for object management classes (like users or groups). + + It provides shortcuts to low level functionality to easily enable execution + of remote commands. It also defines multiple helper methods that are shared + across roles. + """ + + def __init__(self, role: RoleType) -> None: + self.role: RoleType = role + """Multihost role object.""" + + self.host: HostType = role.host + """Multihost host object.""" + + self.cli: CLIBuilder = self.host.cli + """Command line builder to easy build command line for execution.""" + + +class BaseRole(MultihostRole[HostType]): + """ + Base role class. Roles are the main interface to the remote hosts that can + be directly accessed in test cases as fixtures. + + All changes to the remote host that were done through the role object API + are automatically reverted when a test is finished. + """ + + Delete: DeleteAttribute = DeleteAttribute() + """ + Use this to indicate that you want to delete an attribute instead of setting + it to an empty value. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def is_delete_attribute(self, value: Any) -> TypeGuard[DeleteAttribute]: + """ + Return ``True`` if the value is :attr:`DeleteAttribute` + + :param value: Value to test. + :type value: Any + :return: Return ``True`` if the value is :attr:`DeleteAttribute` + :rtype: TypeGuard[DeleteAttribute] + """ + return isinstance(value, DeleteAttribute) + + @property + def features(self) -> dict[str, bool]: + """ + Features supported by the role. + """ + return self.host.features + + def ssh(self, user: str, password: str, *, shell: Shell | None = None) -> SSHClient: + """ + Open SSH connection to the host as given user. + + :param user: Username. + :type user: str + :param password: User password. + :type password: str + :param shell: Shell that will run the commands, defaults to ``None`` (= ``Bash``) + :type shell: Shell | None, optional + :return: SSH client connection. + :rtype: SSHClient + """ + if shell is None: + shell = Bash() + + host = self.host.hostname + port = 22 + + if isinstance(self.host.conn, SSHClient): + host = getattr(self.host.conn, "host", host) + port = getattr(self.host.conn, "port", 22) + + return SSHClient( + host=host, + port=port, + user=user, + password=password, + shell=shell, + logger=self.logger, + ) + + +class BaseLinuxRole(BaseRole[HostType]): + """ + Base linux role. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.fs: LinuxFileSystem = LinuxFileSystem(self.host) + """ + File system manipulation. + """ + + self.firewall: Firewalld = Firewalld(self.host).postpone_setup() + """ + Configure firewall using firewalld. + """ + + self.tc: LinuxTrafficControl = LinuxTrafficControl(self.host).postpone_setup() + """ + Traffic control manipulation. + """ + + self.tools: LinuxToolsUtils = LinuxToolsUtils(self.host) + """ + Standard tools interface. + """ + + self.journald: JournaldUtils = JournaldUtils(self.host) + """ + Journald utilities. + """ + + coredumpd_config = self.host.config.get("coredumpd", {}) + coredumpd_mode = coredumpd_config.get("mode", "ignore") + coredumpd_filter = coredumpd_config.get("filter", None) + + self.coredumpd: Coredumpd = Coredumpd(self.host, self.fs, mode=coredumpd_mode, filter=coredumpd_filter) + """ + Coredumpd utilities. + """ diff --git a/tests/system/framework/roles/shadow.py b/tests/system/framework/roles/shadow.py new file mode 100644 index 000000000..51d7493ad --- /dev/null +++ b/tests/system/framework/roles/shadow.py @@ -0,0 +1,129 @@ +"""shadow multihost role.""" + +from __future__ import annotations + +import shlex +from typing import Dict + +from pytest_mh.conn import ProcessLogLevel, ProcessResult + +from ..hosts.shadow import ShadowHost +from .base import BaseLinuxRole + +__all__ = [ + "Shadow", +] + + +class Shadow(BaseLinuxRole[ShadowHost]): + """ + shadow role. + + Provides unified Python API for managing and testing shadow. + """ + + def __init__(self, *args, **kwargs) -> None: + """ + Set up the environment. + """ + super().__init__(*args, **kwargs) + + def teardown(self) -> None: + """ + Detect file mismatches before cleaning up the environment. + """ + self.host.detect_file_mismatches() + """ + Clean up the environment. + """ + super().teardown() + + def _parse_args(self, *args) -> Dict[str, str]: + args_list = shlex.split(*args[0]) + name = args_list[-1] + + return {"name": name} + + def useradd(self, *args) -> ProcessResult: + """ + Create user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Creating user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("useradd " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def usermod(self, *args) -> ProcessResult: + """ + Modify user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Modifying user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("usermod " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def userdel(self, *args) -> ProcessResult: + """ + Delete user. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Deleting user "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("userdel " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/passwd") + self.host.discard_file("/etc/shadow") + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupadd(self, *args) -> ProcessResult: + """ + Create group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Creating group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupadd " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupmod(self, *args) -> ProcessResult: + """ + Modify group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Modifying group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupmod " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd + + def groupdel(self, *args) -> ProcessResult: + """ + Delete group. + """ + args_dict = self._parse_args(args) + self.logger.info(f'Deleting group "{args_dict["name"]}" on {self.host.hostname}') + cmd = self.host.conn.run("groupdel " + args[0], log_level=ProcessLogLevel.Error) + + self.host.discard_file("/etc/group") + self.host.discard_file("/etc/gshadow") + + return cmd diff --git a/tests/system/framework/topology.py b/tests/system/framework/topology.py new file mode 100644 index 000000000..88f01d013 --- /dev/null +++ b/tests/system/framework/topology.py @@ -0,0 +1,55 @@ +"""Predefined well-known topologies.""" + +from __future__ import annotations + +from enum import unique +from typing import final + +from pytest_mh import KnownTopologyBase, KnownTopologyGroupBase, Topology, TopologyDomain, TopologyMark + +__all__ = [ + "KnownTopology", + "KnownTopologyGroup", +] + + +@final +@unique +class KnownTopology(KnownTopologyBase): + """ + Well-known topologies that can be given to ``pytest.mark.topology`` + directly. It is expected to use these values in favor of providing + custom marker values. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.Shadow) + def test_ldap(shadow: Shadow): + assert True + """ + + Shadow = TopologyMark( + name="shadow", + topology=Topology(TopologyDomain("shadow", shadow=1)), + fixtures=dict(shadow="shadow.shadow[0]"), + ) + + +class KnownTopologyGroup(KnownTopologyGroupBase): + """ + Groups of well-known topologies that can be given to ``pytest.mark.topology`` + directly. It is expected to use these values in favor of providing + custom marker values. + + The test is parametrized and runs multiple times, once per each topology. + + .. code-block:: python + :caption: Example usage (runs on Shadow topology) + + @pytest.mark.topology(KnownTopologyGroup.AnyProvider) + def test_ldap(shadow: Shadow): + assert True + """ + + AnyProvider = [KnownTopology.Shadow] diff --git a/tests/system/framework/utils/__init__.py b/tests/system/framework/utils/__init__.py new file mode 100644 index 000000000..1ab732490 --- /dev/null +++ b/tests/system/framework/utils/__init__.py @@ -0,0 +1,3 @@ +"""shadow multihost utils used by roles.""" + +from __future__ import annotations diff --git a/tests/system/framework/utils/tools.py b/tests/system/framework/utils/tools.py new file mode 100644 index 000000000..64a298c20 --- /dev/null +++ b/tests/system/framework/utils/tools.py @@ -0,0 +1,475 @@ +"""Run various standard Linux commands on remote host.""" + +from __future__ import annotations + +from typing import Any + +import jc +from pytest_mh import MultihostHost, MultihostUtility +from pytest_mh.conn import Process + +__all__ = [ + "UnixObject", + "UnixUser", + "UnixGroup", + "IdEntry", + "PasswdEntry", + "GroupEntry", + "InitgroupsEntry", + "LinuxToolsUtils", + "KillCommand", + "GetentUtils", +] + + +class UnixObject(object): + """ + Generic Unix object. + """ + + def __init__(self, id: int | None, name: str | None) -> None: + """ + :param id: Object ID. + :type id: int | None + :param name: Object name. + :type name: str | None + """ + self.id: int | None = id + """ + ID. + """ + + self.name: str | None = name + """ + Name. + """ + + def __str__(self) -> str: + return f'({self.id},"{self.name}")' + + def __repr__(self) -> str: + return str(self) + + def __eq__(self, o: object) -> bool: + if isinstance(o, str): + return o == self.name + elif isinstance(o, int): + return o == self.id + elif isinstance(o, tuple): + if len(o) != 2 or not isinstance(o[0], int) or not isinstance(o[1], str): + raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") + + (id, name) = o + return id == self.id and name == self.name + elif isinstance(o, UnixObject): + # Fallback to identity comparison + return NotImplemented + + raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}") + + +class UnixUser(UnixObject): + """ + Unix user. + """ + + pass + + +class UnixGroup(UnixObject): + """ + Unix group. + """ + + pass + + +class IdEntry(object): + """ + Result of ``id`` + """ + + def __init__(self, user: UnixUser, group: UnixGroup, groups: list[UnixGroup]) -> None: + self.user: UnixUser = user + """ + User information. + """ + + self.group: UnixGroup = group + """ + Primary group. + """ + + self.groups: list[UnixGroup] = groups + """ + Secondary groups. + """ + + def memberof(self, groups: int | str | tuple[int, str] | list[int | str | tuple[int, str]]) -> bool: + """ + Check if the user is member of give group(s). + + Group specification can be either a single gid or group name. But it can + be also a tuple of (gid, name) where both gid and name must match or list + of groups where the user must be member of all given groups. + + :param groups: _description_ + :type groups: int | str | tuple + :return: _description_ + :rtype: bool + """ + if isinstance(groups, (int, str, tuple)): + return groups in self.groups + + return all(x in self.groups for x in groups) + + def __str__(self) -> str: + return f"{{user={str(self.user)},group={str(self.group)},groups={str(self.groups)}}}" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> IdEntry: + user = UnixUser(d["uid"]["id"], d["uid"].get("name", None)) + group = UnixGroup(d["gid"]["id"], d["gid"].get("name", None)) + groups = [] + + for secondary_group in d["groups"]: + groups.append(UnixGroup(secondary_group["id"], secondary_group.get("name", None))) + + return cls(user, group, groups) + + @classmethod + def FromOutput(cls, stdout: str) -> IdEntry: + jcresult = jc.parse("id", stdout) + + if not isinstance(jcresult, dict): + raise TypeError(f"Unexpected type: {type(jcresult)}, expecting dict") + + return cls.FromDict(jcresult) + + +class PasswdEntry(object): + """ + Result of ``getent passwd`` + """ + + def __init__(self, name: str, password: str, uid: int, gid: int, gecos: str, home: str, shell: str) -> None: + self.name: str | None = name + """ + User name. + """ + + self.password: str | None = password + """ + User password. + """ + + self.uid: int = uid + """ + User id. + """ + + self.gid: int = gid + """ + Group id. + """ + + self.gecos: str | None = gecos + """ + GECOS. + """ + + self.home: str | None = home + """ + Home directory. + """ + + self.shell: str | None = shell + """ + Login shell. + """ + + def __str__(self) -> str: + return f"({self.name}:{self.password}:{self.uid}:{self.gid}:{self.gecos}:{self.home}:{self.shell})" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> PasswdEntry: + return cls( + name=d.get("username", None), + password=d.get("password", None), + uid=d.get("uid", None), + gid=d.get("gid", None), + gecos=d.get("comment", None), + home=d.get("home", None), + shell=d.get("shell", None), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> PasswdEntry: + result = jc.parse("passwd", stdout) + + if not isinstance(result, list): + raise TypeError(f"Unexpected type: {type(result)}, expecting list") + + if len(result) != 1: + raise ValueError("More then one entry was returned") + + return cls.FromDict(result[0]) + + +class GroupEntry(object): + """ + Result of ``getent group`` + """ + + def __init__(self, name: str, password: str, gid: int, members: list[str]) -> None: + self.name: str | None = name + """ + Group name. + """ + + self.password: str | None = password + """ + Group password. + """ + + self.gid: int = gid + """ + Group id. + """ + + self.members: list[str] = members + """ + Group members. + """ + + def __str__(self) -> str: + return f'({self.name}:{self.password}:{self.gid}:{",".join(self.members)})' + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> GroupEntry: + return cls( + name=d.get("group_name", None), + password=d.get("password", None), + gid=d.get("gid", None), + members=d.get("members", []), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> GroupEntry: + result = jc.parse("group", stdout) + + if not isinstance(result, list): + raise TypeError(f"Unexpected type: {type(result)}, expecting list") + + if len(result) != 1: + raise ValueError("More then one entry was returned") + + return cls.FromDict(result[0]) + + +class InitgroupsEntry(object): + """ + Result of ``getent initgroups`` + + If user does not exist or does not have any supplementary groups then ``self.groups`` is empty. + """ + + def __init__(self, name: str, groups: list[int]) -> None: + self.name: str = name + """ + Exact username for which ``initgroups`` was called + """ + + self.groups: list[int] = groups + """ + Group ids that ``name`` is member of. + """ + + def __str__(self) -> str: + return f'({self.name}:{",".join([str(i) for i in self.groups])})' + + def __repr__(self) -> str: + return str(self) + + def memberof(self, groups: list[int]) -> bool: + """ + Check if the user is member of given groups. + + This method checks only supplementary groups not the primary group. + + :param groups: List of group ids + :type groups: list[int] + :return: If user is member of all given groups True, otherwise False. + :rtype: bool + """ + + return all(x in self.groups for x in groups) + + @classmethod + def FromDict(cls, d: dict[str, Any]) -> InitgroupsEntry: + return cls( + name=d["name"], + groups=d.get("groups", []), + ) + + @classmethod + def FromOutput(cls, stdout: str) -> InitgroupsEntry: + result: list[str] = stdout.split() + + dictionary: dict[str, str | list[int]] = {} + dictionary["name"] = result[0] + + if len(result) > 1: + dictionary["groups"] = [int(x) for x in result[1:]] + + return cls.FromDict(dictionary) + + +class LinuxToolsUtils(MultihostUtility[MultihostHost]): + """ + Run various standard commands on remote host. + """ + + def __init__(self, host: MultihostHost) -> None: + """ + :param host: Remote host. + :type host: MultihostHost + """ + super().__init__(host) + + self.getent: GetentUtils = GetentUtils(host) + """ + Run ``getent`` command. + """ + + def id(self, name: str | int) -> IdEntry | None: + """ + Run ``id`` command. + + :param name: User name or id. + :type name: str | int + :return: id data, None if not found + :rtype: IdEntry | None + """ + command = self.host.conn.exec(["id", name], raise_on_error=False) + if command.rc != 0: + return None + + return IdEntry.FromOutput(command.stdout) + + def grep(self, pattern: str, paths: str | list[str], args: list[str] | None = None) -> bool: + """ + Run ``grep`` command. + + :param pattern: Pattern to match. + :type pattern: str + :param paths: Paths to search. + :type paths: str | list[str] + :param args: Additional arguments to ``grep`` command, defaults to None. + :type args: list[str] | None, optional + :return: True if grep returned 0, False otherwise. + :rtype: bool + """ + if args is None: + args = [] + + paths = [paths] if isinstance(paths, str) else paths + command = self.host.conn.exec(["grep", *args, pattern, *paths]) + + return command.rc == 0 + + +class KillCommand(object): + def __init__(self, host: MultihostHost, process: Process, pid: int) -> None: + self.host = host + self.process = process + self.pid = pid + self.__killed: bool = False + + def kill(self) -> None: + if self.__killed: + return + + self.host.conn.exec(["kill", self.pid]) + self.__killed = True + + def __enter__(self) -> KillCommand: + return self + + def __exit__(self, exception_type, exception_value, traceback) -> None: + self.kill() + self.process.wait() + + +class GetentUtils(MultihostUtility[MultihostHost]): + """ + Interface to getent command. + """ + + def __init__(self, host: MultihostHost) -> None: + """ + :param host: Remote host. + :type host: MultihostHost + """ + super().__init__(host) + + def passwd(self, name: str | int, *, service: str | None = None) -> PasswdEntry | None: + """ + Call ``getent passwd $name`` + + :param name: User name or id. + :type name: str | int + :param service: Service used, defaults to None + :type service: str | None + :return: passwd data, None if not found + :rtype: PasswdEntry | None + """ + return self.__exec(PasswdEntry, "passwd", name, service) + + def group(self, name: str | int, *, service: str | None = None) -> GroupEntry | None: + """ + Call ``getent group $name`` + + :param name: Group name or id. + :type name: str | int + :param service: Service used, defaults to None + :type service: str | None + :return: group data, None if not found + :rtype: PasswdEntry | None + """ + return self.__exec(GroupEntry, "group", name, service) + + def initgroups(self, name: str, *, service: str | None = None) -> InitgroupsEntry: + """ + Call ``getent initgroups $name`` + + If ``name`` does not exist, group list is empty. This is standard behavior of ``getent initgroups`` + + :param name: User name. + :type name: str + :param service: Service used, defaults to None + :type service: str | None + :return: Initgroups data + :rtype: InitgroupsEntry + """ + return self.__exec(InitgroupsEntry, "initgroups", name, service) + + def __exec(self, cls, cmd: str, name: str | int, service: str | None = None) -> Any: + args = [] + if service is not None: + args = ["-s", service] + + command = self.host.conn.exec(["getent", *args, cmd, name], raise_on_error=False) + if command.rc != 0: + return None + + return cls.FromOutput(command.stdout)