--- /dev/null
+from __future__ import annotations
+
+from typing import Type
+
+from pytest_mh import MultihostConfig, MultihostDomain, MultihostHost, MultihostRole
+
+__all__ = [
+ "ShadowMultihostConfig",
+ "ShadowMultihostDomain",
+]
+
+
+class ShadowMultihostConfig(MultihostConfig):
+ @property
+ def id_to_domain_class(self) -> dict[str, Type[MultihostDomain]]:
+ """
+ All domains are mapped to :class:`ShadowMultihostDomain`.
+
+ :rtype: Class name.
+ """
+ return {"*": ShadowMultihostDomain}
+
+
+class ShadowMultihostDomain(MultihostDomain[ShadowMultihostConfig]):
+ @property
+ def role_to_host_class(self) -> dict[str, Type[MultihostHost]]:
+ """
+ Map roles to classes:
+
+ * shadow to ShadowHost
+
+ :rtype: Class name.
+ """
+ from .hosts.shadow import ShadowHost
+
+ return {
+ "shadow": ShadowHost,
+ }
+
+ @property
+ def role_to_role_class(self) -> dict[str, Type[MultihostRole]]:
+ """
+ Map roles to classes:
+
+ * shadow to Shadow
+
+ :rtype: Class name.
+ """
+ from .roles.shadow import Shadow
+
+ return {
+ "shadow": Shadow,
+ }
--- /dev/null
+"""Pytest fixtures."""
+
+from __future__ import annotations
+
+import os
+
+import pytest
+
+
+@pytest.fixture(scope="session")
+def datadir(request: pytest.FixtureRequest) -> str:
+ """
+ Data directory shared for all tests.
+
+ :return: Path to the data directory ``(root-pytest-dir)/data``.
+ :rtype: str
+ """
+ return os.path.join(request.node.path, "data")
+
+
+@pytest.fixture(scope="module")
+def moduledatadir(datadir: str, request: pytest.FixtureRequest) -> str:
+ """
+ Data directory shared for all tests within a single module.
+
+ :return: Path to the data directory ``(root-pytest-dir)/data/$module_name``.
+ :rtype: str
+ """
+ name = request.module.__name__
+ return os.path.join(datadir, name)
+
+
+@pytest.fixture(scope="function")
+def testdatadir(moduledatadir: str, request: pytest.FixtureRequest) -> str:
+ """
+ Data directory for current test.
+
+ :return: Path to the data directory ``(root-pytest-dir)/data/$module_name/$test_name``.
+ :rtype: str
+ """
+ if not isinstance(request.node, pytest.Function):
+ raise TypeError(f"Excepted pytest.Function, got {type(request.node)}")
+
+ name = request.node.originalname
+ return os.path.join(moduledatadir, name)
--- /dev/null
+"""shadow multihost hosts."""
+
+from __future__ import annotations
--- /dev/null
+"""Base classes and objects for shadow specific multihost hosts."""
+
+from __future__ import annotations
+
+import csv
+
+from pytest_mh import MultihostBackupHost, MultihostHost
+from pytest_mh.utils.fs import LinuxFileSystem
+
+from ..config import ShadowMultihostDomain
+
+__all__ = [
+ "BaseHost",
+ "BaseLinuxHost",
+]
+
+
+class BaseHost(MultihostBackupHost[ShadowMultihostDomain]):
+ """
+ Base class for all shadow hosts.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ # restore is handled in topology controllers
+ super().__init__(*args, **kwargs)
+
+ @property
+ def features(self) -> dict[str, bool]:
+ """
+ Features supported by the host.
+ """
+ return {}
+
+
+class BaseLinuxHost(MultihostHost[ShadowMultihostDomain]):
+ """
+ Base Linux host.
+
+ Adds linux specific reentrant utilities.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self.fs: LinuxFileSystem = LinuxFileSystem(self)
+ self._os_release: dict = {}
+ self._distro_name: str = "unknown"
+ self._distro_major: int = 0
+ self._distro_minor: int = 0
+
+ def _distro_information(self):
+ """
+ Pulls distro information from a host from /ets/os-release
+ """
+ self.logger.info(f"Detecting distro information on {self.hostname}")
+ os_release = self.fs.read("/etc/os-release")
+ self._os_release = dict(csv.reader([x for x in os_release.splitlines() if x], delimiter="="))
+ if "NAME" in self._os_release:
+ self._distro_name = self._os_release["NAME"]
+ if "VERSION_ID" not in self._os_release:
+ return
+ if "." in self._os_release["VERSION_ID"]:
+ self._distro_major = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[0])
+ self._distro_minor = int(self._os_release["VERSION_ID"].split(".", maxsplit=1)[1])
+ else:
+ self._distro_major = int(self._os_release["VERSION_ID"])
+
+ @property
+ def distro_name(self) -> str:
+ """
+ Host distribution
+
+ :return: Distribution name or "unknown"
+ :rtype: str
+ """
+ # NAME item from os-release
+ if not self._os_release:
+ self._distro_information()
+ return self._distro_name
+
+ @property
+ def distro_major(self) -> int:
+ """
+ Host distribution major version
+
+ :return: Major version
+ :rtype: int
+ """
+ # First part of VERSION_ID from os-release
+ # Returns zero when could not detect
+ if not self._os_release:
+ self._distro_information()
+ return self._distro_major
+
+ @property
+ def distro_minor(self) -> int:
+ """
+ Host distribution minor version
+
+ :return: Minor version
+ :rtype: int
+ """
+ # Second part of VERSION_ID from os-release
+ # Returns zero when no minor version is present
+ if not self._os_release:
+ self._distro_information()
+ return self._distro_minor
--- /dev/null
+"""shadow multihost host."""
+
+from __future__ import annotations
+
+from pathlib import PurePosixPath
+from typing import Any
+
+from pytest_mh.conn import ProcessLogLevel
+
+from .base import BaseHost, BaseLinuxHost
+
+__all__ = [
+ "ShadowHost",
+]
+
+
+class ShadowHost(BaseHost, BaseLinuxHost):
+ """
+ shadow host object.
+
+ This is the host where the tests are run.
+
+ .. note::
+
+ Full backup and restore of shadow state is supported.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self._features: dict[str, bool] | None = None
+ """Features dictionary."""
+
+ self._backup_path: PurePosixPath | None = None
+ """Path to backup files."""
+
+ self._verify_files: [dict[str, str]] = [
+ {"origin": "/etc/passwd", "backup": "passwd"},
+ {"origin": "/etc/shadow", "backup": "shadow"},
+ {"origin": "/etc/group", "backup": "group"},
+ {"origin": "/etc/gshadow", "backup": "gshadow"},
+ ]
+ """Files to verify for mismatch."""
+
+ def pytest_setup(self) -> None:
+ super().pytest_setup()
+
+ def start(self) -> None:
+ """
+ Not supported.
+
+ :raises NotImplementedError: _description_
+ """
+ raise NotImplementedError("Starting shadow service is not implemented.")
+
+ def stop(self) -> None:
+ """
+ Not supported.
+
+ :raises NotImplementedError: _description_
+ """
+ raise NotImplementedError("Stopping shadow service is not implemented.")
+
+ def backup(self) -> Any:
+ """
+ Backup all shadow data.
+
+ :return: Backup data.
+ :rtype: Any
+ """
+ self.logger.info("Creating backup of shadow host")
+
+ result = self.conn.run(
+ """
+ set -ex
+
+ function backup {
+ if [ -d "$1" ] || [ -f "$1" ]; then
+ cp --force --archive "$1" "$2"
+ fi
+ }
+
+ path=`mktemp -d`
+ backup /etc/login.defs "$path/login.defs"
+ backup /etc/default/useradd "$path/useradd"
+ backup /etc/passwd "$path/passwd"
+ backup /etc/shadow "$path/shadow"
+ backup /etc/group "$path/group"
+ backup /etc/gshadow "$path/gshadow"
+ backup /etc/subuid "$path/subuid"
+ backup /etc/subgid "$path/subgid"
+ backup /home "$path/home"
+ backup /var/log/secure "$path/secure"
+
+ echo $path
+ """,
+ log_level=ProcessLogLevel.Error,
+ )
+
+ self._backup_path = PurePosixPath(result.stdout_lines[-1].strip())
+
+ return PurePosixPath(result.stdout_lines[-1].strip())
+
+ def restore(self, backup_data: Any | None) -> None:
+ """
+ Restore all shadow data.
+
+ :return: Backup data.
+ :rtype: Any
+ """
+ if backup_data is None:
+ return
+
+ if not isinstance(backup_data, PurePosixPath):
+ raise TypeError(f"Expected PurePosixPath, got {type(backup_data)}")
+
+ backup_path = str(backup_data)
+
+ self.logger.info(f"Restoring shadow data from {backup_path}")
+ self.conn.run(
+ f"""
+ set -ex
+
+ function restore {{
+ rm --force --recursive "$2"
+ if [ -d "$1" ] || [ -f "$1" ]; then
+ cp --force --archive "$1" "$2"
+ fi
+ }}
+
+ rm --force --recursive /var/log/secure
+ restore "{backup_path}/login.defs" /etc/login.defs
+ restore "{backup_path}/useradd" /etc/default/useradd
+ restore "{backup_path}/passwd" /etc/passwd
+ restore "{backup_path}/shadow" /etc/shadow
+ restore "{backup_path}/group" /etc/group
+ restore "{backup_path}/gshadow" /etc/gshadow
+ restore "{backup_path}/subuid" /etc/subuid
+ restore "{backup_path}/subgid" /etc/subgid
+ restore "{backup_path}/home" /home
+ restore "{backup_path}/secure" /var/log/secure
+ """,
+ log_level=ProcessLogLevel.Error,
+ )
+
+ def detect_file_mismatches(self) -> None:
+ """
+ Shadow binaries modify a number of files, but usually do not modify all of them. This is why we add an
+ additional check at the end of the test to verify that the files that should not have been modified are still
+ intact.
+ """
+ self.logger.info(f"Detecting mismatches in shadow files {self._backup_path}")
+
+ for x in self._verify_files:
+ result = self.conn.run(
+ f"""
+ set -ex
+
+ cmp {x['origin']} {self._backup_path}/{x['backup']}
+ """,
+ log_level=ProcessLogLevel.Error,
+ raise_on_error=False,
+ )
+ if result.rc != 0:
+ self.logger.error(f"File mismatch in '{x['origin']}' and '{self._backup_path}/{x['backup']}'")
+ result.throw()
+
+ def discard_file(self, origin: str) -> None:
+ """
+ Discard modified files from the files that should be verified.
+ """
+ for x in self._verify_files:
+ if x["origin"] == origin:
+ self._verify_files.remove(x)
+ break
--- /dev/null
+"""Pytest fixtures."""
+
+from __future__ import annotations
+
+from functools import partial
+
+import pytest
+from pytest_mh import MultihostItemData, Topology
+
+from .misc import to_list_of_strings
+from .roles.base import BaseRole
+from .topology import KnownTopology, KnownTopologyGroup
+
+
+def pytest_configure(config: pytest.Config):
+ """
+ Pytest hook: register multihost plugin.
+ """
+
+ # register additional markers
+ config.addinivalue_line(
+ "markers",
+ "builtwith(feature): Run test only if shadow was built with given feature",
+ )
+
+
+def builtwith(item: pytest.Function, requirements: dict[str, str], **kwargs: BaseRole):
+ def value_error(msg: str) -> ValueError:
+ return ValueError(f"{item.nodeid}::{item.originalname}: @pytest.mark.builtwith: {msg}")
+
+ errors: list[str] = []
+ for role, features in requirements.items():
+ if role not in kwargs:
+ raise value_error(f"unknown fixture '{role}'")
+
+ if not isinstance(kwargs[role], BaseRole):
+ raise value_error(f"fixture '{role}' is not instance of BaseRole")
+
+ obj = kwargs[role]
+ for feature in to_list_of_strings(features):
+ if feature not in obj.features:
+ raise value_error(f"unknown feature '{feature}' in '{role}'")
+
+ if not obj.features[feature]:
+ errors.append(f'{role} does not support "{feature}"')
+
+ if len(errors) == 1:
+ return (False, errors[0])
+ elif len(errors) > 1:
+ return (False, str(errors))
+
+ # All requirements were passed
+ return True
+
+
+@pytest.hookimpl(tryfirst=True)
+def pytest_runtest_setup(item: pytest.Item) -> None:
+ if not isinstance(item, pytest.Function):
+ raise TypeError(f"Unexpected item type: {type(item)}")
+
+ topology: list[Topology] = []
+ mh_item_data: MultihostItemData | None = MultihostItemData.GetData(item)
+ for mark in item.iter_markers("builtwith"):
+ requirements: dict[str, str] = {}
+
+ if len(mark.args) == 1 and not mark.kwargs:
+ # @pytest.mark.builtwith("feature_x")
+ # -> check if "feature_x" is supported by shadow
+ requirements["shadow"] = mark.args[0]
+ topology = []
+ elif not mark.args and mark.kwargs:
+ # @pytest.mark.builtwith(shadow="feature_x", another_host="feature_x") ->
+ # -> check if "feature_x" is supported by both shadow and another_host
+ requirements = dict(mark.kwargs)
+ topology = []
+ elif (
+ len(mark.args) == 1
+ and isinstance(mark.args[0], (Topology, KnownTopology, KnownTopologyGroup))
+ and mark.kwargs
+ ):
+ # @pytest.mark.builtwith(KnownTopology.Shadow, shadow="feature_x") ->
+ # -> check if "feature_x" is supported by shadow only if the test runs on shadow topology
+ requirements = dict(mark.kwargs)
+ if isinstance(mark.args[0], Topology):
+ topology = [mark.args[0]]
+ elif isinstance(mark.args[0], KnownTopology):
+ topology = [mark.args[0].value.topology]
+ elif isinstance(mark.args[0], KnownTopologyGroup):
+ topology = [x.value.topology for x in mark.args[0].value]
+ else:
+ raise ValueError(f"{item.nodeid}::{item.originalname}: invalid arguments for @pytest.mark.builtwith")
+
+ if mh_item_data is None:
+ raise ValueError(f"{item.nodeid}::{item.originalname}: multihost item data is not set")
+
+ if mh_item_data.topology_mark is None:
+ raise ValueError(f"{item.nodeid}::{item.originalname}: multihost topology mark is not set")
+
+ if not topology or mh_item_data.topology_mark.topology in topology:
+ item.add_marker(pytest.mark.require(partial(builtwith, item=item, requirements=requirements)))
--- /dev/null
+"""Miscellaneous functions."""
+
+from __future__ import annotations
+
+from typing import Any
+
+
+def to_list(value: Any | list[Any] | None) -> list[Any]:
+ """
+ Convert value into a list.
+
+ - if value is ``None`` then return an empty list
+ - if value is already a list then return it unchanged
+ - if value is not a list then return ``[value]``
+
+ :param value: Value that should be converted to a list.
+ :type value: Any | list[Any] | None
+ :return: List with the value as an element.
+ :rtype: list[Any]
+ """
+ if value is None:
+ return []
+
+ if isinstance(value, list):
+ return value
+
+ return [value]
+
+
+def to_list_of_strings(value: Any | list[Any] | None) -> list[str]:
+ """
+ Convert given list or single value to list of strings.
+
+ The ``value`` is first converted to a list and then ``str(item)`` is run on
+ each of its item.
+
+ :param value: Value to convert.
+ :type value: Any | list[Any] | None
+ :return: List of strings.
+ :rtype: list[str]
+ """
+ return [str(x) for x in to_list(value)]
--- /dev/null
+from __future__ import annotations
+
+
+class ExpectScriptError(Exception):
+ """
+ Expect script error.
+
+ Seeing this exception means that there is an unhandled path or other error
+ in the expect script that was executed. The script needs to be fixed.
+ """
+
+ def __init__(self, code: int, msg: str | None = None) -> None:
+ """
+ :param code: Expect script error code.
+ :type code: int
+ :param msg: Error message, defaults to None (translate error code to message)
+ :type msg: str | None, optional
+ """
+ self.code: int = code
+ if msg is None:
+ msg = self.code_to_message(code)
+
+ super().__init__(msg)
+
+ def code_to_message(self, code: int) -> str:
+ """
+ Translate expect script error codes used in this framework to message.
+
+ :param code: Expect script error code.
+ :type code: int
+ :return: Error message.
+ :rtype: str
+ """
+ match code:
+ case 201:
+ return "Timeout, unexpected output"
+ case 202:
+ return "Unexpected end of file"
+ case 203:
+ return "Unexpected code path"
+
+ return "Unknown error code"
--- /dev/null
+"""shadow multihost roles."""
+
+from __future__ import annotations
--- /dev/null
+"""Base classes and objects for shadow specific multihost roles."""
+
+from __future__ import annotations
+
+from typing import Any, Generic, TypeGuard, TypeVar
+
+from pytest_mh import MultihostRole
+from pytest_mh.cli import CLIBuilder
+from pytest_mh.conn import Bash, Shell
+from pytest_mh.conn.ssh import SSHClient
+from pytest_mh.utils.coredumpd import Coredumpd
+from pytest_mh.utils.firewall import Firewalld
+from pytest_mh.utils.fs import LinuxFileSystem
+from pytest_mh.utils.journald import JournaldUtils
+from pytest_mh.utils.tc import LinuxTrafficControl
+
+from ..hosts.base import BaseHost
+from ..utils.tools import LinuxToolsUtils
+
+HostType = TypeVar("HostType", bound=BaseHost)
+RoleType = TypeVar("RoleType", bound=MultihostRole)
+
+
+__all__ = [
+ "HostType",
+ "RoleType",
+ "DeleteAttribute",
+ "BaseObject",
+ "BaseRole",
+ "BaseLinuxRole",
+]
+
+
+class DeleteAttribute(object):
+ """
+ This class is used to distinguish between setting an attribute to an empty
+ value and deleting it completely.
+ """
+
+ pass
+
+
+class BaseObject(Generic[HostType, RoleType]):
+ """
+ Base class for object management classes (like users or groups).
+
+ It provides shortcuts to low level functionality to easily enable execution
+ of remote commands. It also defines multiple helper methods that are shared
+ across roles.
+ """
+
+ def __init__(self, role: RoleType) -> None:
+ self.role: RoleType = role
+ """Multihost role object."""
+
+ self.host: HostType = role.host
+ """Multihost host object."""
+
+ self.cli: CLIBuilder = self.host.cli
+ """Command line builder to easy build command line for execution."""
+
+
+class BaseRole(MultihostRole[HostType]):
+ """
+ Base role class. Roles are the main interface to the remote hosts that can
+ be directly accessed in test cases as fixtures.
+
+ All changes to the remote host that were done through the role object API
+ are automatically reverted when a test is finished.
+ """
+
+ Delete: DeleteAttribute = DeleteAttribute()
+ """
+ Use this to indicate that you want to delete an attribute instead of setting
+ it to an empty value.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ def is_delete_attribute(self, value: Any) -> TypeGuard[DeleteAttribute]:
+ """
+ Return ``True`` if the value is :attr:`DeleteAttribute`
+
+ :param value: Value to test.
+ :type value: Any
+ :return: Return ``True`` if the value is :attr:`DeleteAttribute`
+ :rtype: TypeGuard[DeleteAttribute]
+ """
+ return isinstance(value, DeleteAttribute)
+
+ @property
+ def features(self) -> dict[str, bool]:
+ """
+ Features supported by the role.
+ """
+ return self.host.features
+
+ def ssh(self, user: str, password: str, *, shell: Shell | None = None) -> SSHClient:
+ """
+ Open SSH connection to the host as given user.
+
+ :param user: Username.
+ :type user: str
+ :param password: User password.
+ :type password: str
+ :param shell: Shell that will run the commands, defaults to ``None`` (= ``Bash``)
+ :type shell: Shell | None, optional
+ :return: SSH client connection.
+ :rtype: SSHClient
+ """
+ if shell is None:
+ shell = Bash()
+
+ host = self.host.hostname
+ port = 22
+
+ if isinstance(self.host.conn, SSHClient):
+ host = getattr(self.host.conn, "host", host)
+ port = getattr(self.host.conn, "port", 22)
+
+ return SSHClient(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ shell=shell,
+ logger=self.logger,
+ )
+
+
+class BaseLinuxRole(BaseRole[HostType]):
+ """
+ Base linux role.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+
+ self.fs: LinuxFileSystem = LinuxFileSystem(self.host)
+ """
+ File system manipulation.
+ """
+
+ self.firewall: Firewalld = Firewalld(self.host).postpone_setup()
+ """
+ Configure firewall using firewalld.
+ """
+
+ self.tc: LinuxTrafficControl = LinuxTrafficControl(self.host).postpone_setup()
+ """
+ Traffic control manipulation.
+ """
+
+ self.tools: LinuxToolsUtils = LinuxToolsUtils(self.host)
+ """
+ Standard tools interface.
+ """
+
+ self.journald: JournaldUtils = JournaldUtils(self.host)
+ """
+ Journald utilities.
+ """
+
+ coredumpd_config = self.host.config.get("coredumpd", {})
+ coredumpd_mode = coredumpd_config.get("mode", "ignore")
+ coredumpd_filter = coredumpd_config.get("filter", None)
+
+ self.coredumpd: Coredumpd = Coredumpd(self.host, self.fs, mode=coredumpd_mode, filter=coredumpd_filter)
+ """
+ Coredumpd utilities.
+ """
--- /dev/null
+"""shadow multihost role."""
+
+from __future__ import annotations
+
+import shlex
+from typing import Dict
+
+from pytest_mh.conn import ProcessLogLevel, ProcessResult
+
+from ..hosts.shadow import ShadowHost
+from .base import BaseLinuxRole
+
+__all__ = [
+ "Shadow",
+]
+
+
+class Shadow(BaseLinuxRole[ShadowHost]):
+ """
+ shadow role.
+
+ Provides unified Python API for managing and testing shadow.
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ """
+ Set up the environment.
+ """
+ super().__init__(*args, **kwargs)
+
+ def teardown(self) -> None:
+ """
+ Detect file mismatches before cleaning up the environment.
+ """
+ self.host.detect_file_mismatches()
+ """
+ Clean up the environment.
+ """
+ super().teardown()
+
+ def _parse_args(self, *args) -> Dict[str, str]:
+ args_list = shlex.split(*args[0])
+ name = args_list[-1]
+
+ return {"name": name}
+
+ def useradd(self, *args) -> ProcessResult:
+ """
+ Create user.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Creating user "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("useradd " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/passwd")
+ self.host.discard_file("/etc/shadow")
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
+
+ def usermod(self, *args) -> ProcessResult:
+ """
+ Modify user.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Modifying user "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("usermod " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/passwd")
+ self.host.discard_file("/etc/shadow")
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
+
+ def userdel(self, *args) -> ProcessResult:
+ """
+ Delete user.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Deleting user "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("userdel " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/passwd")
+ self.host.discard_file("/etc/shadow")
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
+
+ def groupadd(self, *args) -> ProcessResult:
+ """
+ Create group.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Creating group "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("groupadd " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
+
+ def groupmod(self, *args) -> ProcessResult:
+ """
+ Modify group.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Modifying group "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("groupmod " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
+
+ def groupdel(self, *args) -> ProcessResult:
+ """
+ Delete group.
+ """
+ args_dict = self._parse_args(args)
+ self.logger.info(f'Deleting group "{args_dict["name"]}" on {self.host.hostname}')
+ cmd = self.host.conn.run("groupdel " + args[0], log_level=ProcessLogLevel.Error)
+
+ self.host.discard_file("/etc/group")
+ self.host.discard_file("/etc/gshadow")
+
+ return cmd
--- /dev/null
+"""Predefined well-known topologies."""
+
+from __future__ import annotations
+
+from enum import unique
+from typing import final
+
+from pytest_mh import KnownTopologyBase, KnownTopologyGroupBase, Topology, TopologyDomain, TopologyMark
+
+__all__ = [
+ "KnownTopology",
+ "KnownTopologyGroup",
+]
+
+
+@final
+@unique
+class KnownTopology(KnownTopologyBase):
+ """
+ Well-known topologies that can be given to ``pytest.mark.topology``
+ directly. It is expected to use these values in favor of providing
+ custom marker values.
+
+ .. code-block:: python
+ :caption: Example usage
+
+ @pytest.mark.topology(KnownTopology.Shadow)
+ def test_ldap(shadow: Shadow):
+ assert True
+ """
+
+ Shadow = TopologyMark(
+ name="shadow",
+ topology=Topology(TopologyDomain("shadow", shadow=1)),
+ fixtures=dict(shadow="shadow.shadow[0]"),
+ )
+
+
+class KnownTopologyGroup(KnownTopologyGroupBase):
+ """
+ Groups of well-known topologies that can be given to ``pytest.mark.topology``
+ directly. It is expected to use these values in favor of providing
+ custom marker values.
+
+ The test is parametrized and runs multiple times, once per each topology.
+
+ .. code-block:: python
+ :caption: Example usage (runs on Shadow topology)
+
+ @pytest.mark.topology(KnownTopologyGroup.AnyProvider)
+ def test_ldap(shadow: Shadow):
+ assert True
+ """
+
+ AnyProvider = [KnownTopology.Shadow]
--- /dev/null
+"""shadow multihost utils used by roles."""
+
+from __future__ import annotations
--- /dev/null
+"""Run various standard Linux commands on remote host."""
+
+from __future__ import annotations
+
+from typing import Any
+
+import jc
+from pytest_mh import MultihostHost, MultihostUtility
+from pytest_mh.conn import Process
+
+__all__ = [
+ "UnixObject",
+ "UnixUser",
+ "UnixGroup",
+ "IdEntry",
+ "PasswdEntry",
+ "GroupEntry",
+ "InitgroupsEntry",
+ "LinuxToolsUtils",
+ "KillCommand",
+ "GetentUtils",
+]
+
+
+class UnixObject(object):
+ """
+ Generic Unix object.
+ """
+
+ def __init__(self, id: int | None, name: str | None) -> None:
+ """
+ :param id: Object ID.
+ :type id: int | None
+ :param name: Object name.
+ :type name: str | None
+ """
+ self.id: int | None = id
+ """
+ ID.
+ """
+
+ self.name: str | None = name
+ """
+ Name.
+ """
+
+ def __str__(self) -> str:
+ return f'({self.id},"{self.name}")'
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def __eq__(self, o: object) -> bool:
+ if isinstance(o, str):
+ return o == self.name
+ elif isinstance(o, int):
+ return o == self.id
+ elif isinstance(o, tuple):
+ if len(o) != 2 or not isinstance(o[0], int) or not isinstance(o[1], str):
+ raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}")
+
+ (id, name) = o
+ return id == self.id and name == self.name
+ elif isinstance(o, UnixObject):
+ # Fallback to identity comparison
+ return NotImplemented
+
+ raise NotImplementedError(f"Unable to compare {type(o)} with {self.__class__}")
+
+
+class UnixUser(UnixObject):
+ """
+ Unix user.
+ """
+
+ pass
+
+
+class UnixGroup(UnixObject):
+ """
+ Unix group.
+ """
+
+ pass
+
+
+class IdEntry(object):
+ """
+ Result of ``id``
+ """
+
+ def __init__(self, user: UnixUser, group: UnixGroup, groups: list[UnixGroup]) -> None:
+ self.user: UnixUser = user
+ """
+ User information.
+ """
+
+ self.group: UnixGroup = group
+ """
+ Primary group.
+ """
+
+ self.groups: list[UnixGroup] = groups
+ """
+ Secondary groups.
+ """
+
+ def memberof(self, groups: int | str | tuple[int, str] | list[int | str | tuple[int, str]]) -> bool:
+ """
+ Check if the user is member of give group(s).
+
+ Group specification can be either a single gid or group name. But it can
+ be also a tuple of (gid, name) where both gid and name must match or list
+ of groups where the user must be member of all given groups.
+
+ :param groups: _description_
+ :type groups: int | str | tuple
+ :return: _description_
+ :rtype: bool
+ """
+ if isinstance(groups, (int, str, tuple)):
+ return groups in self.groups
+
+ return all(x in self.groups for x in groups)
+
+ def __str__(self) -> str:
+ return f"{{user={str(self.user)},group={str(self.group)},groups={str(self.groups)}}}"
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ @classmethod
+ def FromDict(cls, d: dict[str, Any]) -> IdEntry:
+ user = UnixUser(d["uid"]["id"], d["uid"].get("name", None))
+ group = UnixGroup(d["gid"]["id"], d["gid"].get("name", None))
+ groups = []
+
+ for secondary_group in d["groups"]:
+ groups.append(UnixGroup(secondary_group["id"], secondary_group.get("name", None)))
+
+ return cls(user, group, groups)
+
+ @classmethod
+ def FromOutput(cls, stdout: str) -> IdEntry:
+ jcresult = jc.parse("id", stdout)
+
+ if not isinstance(jcresult, dict):
+ raise TypeError(f"Unexpected type: {type(jcresult)}, expecting dict")
+
+ return cls.FromDict(jcresult)
+
+
+class PasswdEntry(object):
+ """
+ Result of ``getent passwd``
+ """
+
+ def __init__(self, name: str, password: str, uid: int, gid: int, gecos: str, home: str, shell: str) -> None:
+ self.name: str | None = name
+ """
+ User name.
+ """
+
+ self.password: str | None = password
+ """
+ User password.
+ """
+
+ self.uid: int = uid
+ """
+ User id.
+ """
+
+ self.gid: int = gid
+ """
+ Group id.
+ """
+
+ self.gecos: str | None = gecos
+ """
+ GECOS.
+ """
+
+ self.home: str | None = home
+ """
+ Home directory.
+ """
+
+ self.shell: str | None = shell
+ """
+ Login shell.
+ """
+
+ def __str__(self) -> str:
+ return f"({self.name}:{self.password}:{self.uid}:{self.gid}:{self.gecos}:{self.home}:{self.shell})"
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ @classmethod
+ def FromDict(cls, d: dict[str, Any]) -> PasswdEntry:
+ return cls(
+ name=d.get("username", None),
+ password=d.get("password", None),
+ uid=d.get("uid", None),
+ gid=d.get("gid", None),
+ gecos=d.get("comment", None),
+ home=d.get("home", None),
+ shell=d.get("shell", None),
+ )
+
+ @classmethod
+ def FromOutput(cls, stdout: str) -> PasswdEntry:
+ result = jc.parse("passwd", stdout)
+
+ if not isinstance(result, list):
+ raise TypeError(f"Unexpected type: {type(result)}, expecting list")
+
+ if len(result) != 1:
+ raise ValueError("More then one entry was returned")
+
+ return cls.FromDict(result[0])
+
+
+class GroupEntry(object):
+ """
+ Result of ``getent group``
+ """
+
+ def __init__(self, name: str, password: str, gid: int, members: list[str]) -> None:
+ self.name: str | None = name
+ """
+ Group name.
+ """
+
+ self.password: str | None = password
+ """
+ Group password.
+ """
+
+ self.gid: int = gid
+ """
+ Group id.
+ """
+
+ self.members: list[str] = members
+ """
+ Group members.
+ """
+
+ def __str__(self) -> str:
+ return f'({self.name}:{self.password}:{self.gid}:{",".join(self.members)})'
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ @classmethod
+ def FromDict(cls, d: dict[str, Any]) -> GroupEntry:
+ return cls(
+ name=d.get("group_name", None),
+ password=d.get("password", None),
+ gid=d.get("gid", None),
+ members=d.get("members", []),
+ )
+
+ @classmethod
+ def FromOutput(cls, stdout: str) -> GroupEntry:
+ result = jc.parse("group", stdout)
+
+ if not isinstance(result, list):
+ raise TypeError(f"Unexpected type: {type(result)}, expecting list")
+
+ if len(result) != 1:
+ raise ValueError("More then one entry was returned")
+
+ return cls.FromDict(result[0])
+
+
+class InitgroupsEntry(object):
+ """
+ Result of ``getent initgroups``
+
+ If user does not exist or does not have any supplementary groups then ``self.groups`` is empty.
+ """
+
+ def __init__(self, name: str, groups: list[int]) -> None:
+ self.name: str = name
+ """
+ Exact username for which ``initgroups`` was called
+ """
+
+ self.groups: list[int] = groups
+ """
+ Group ids that ``name`` is member of.
+ """
+
+ def __str__(self) -> str:
+ return f'({self.name}:{",".join([str(i) for i in self.groups])})'
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def memberof(self, groups: list[int]) -> bool:
+ """
+ Check if the user is member of given groups.
+
+ This method checks only supplementary groups not the primary group.
+
+ :param groups: List of group ids
+ :type groups: list[int]
+ :return: If user is member of all given groups True, otherwise False.
+ :rtype: bool
+ """
+
+ return all(x in self.groups for x in groups)
+
+ @classmethod
+ def FromDict(cls, d: dict[str, Any]) -> InitgroupsEntry:
+ return cls(
+ name=d["name"],
+ groups=d.get("groups", []),
+ )
+
+ @classmethod
+ def FromOutput(cls, stdout: str) -> InitgroupsEntry:
+ result: list[str] = stdout.split()
+
+ dictionary: dict[str, str | list[int]] = {}
+ dictionary["name"] = result[0]
+
+ if len(result) > 1:
+ dictionary["groups"] = [int(x) for x in result[1:]]
+
+ return cls.FromDict(dictionary)
+
+
+class LinuxToolsUtils(MultihostUtility[MultihostHost]):
+ """
+ Run various standard commands on remote host.
+ """
+
+ def __init__(self, host: MultihostHost) -> None:
+ """
+ :param host: Remote host.
+ :type host: MultihostHost
+ """
+ super().__init__(host)
+
+ self.getent: GetentUtils = GetentUtils(host)
+ """
+ Run ``getent`` command.
+ """
+
+ def id(self, name: str | int) -> IdEntry | None:
+ """
+ Run ``id`` command.
+
+ :param name: User name or id.
+ :type name: str | int
+ :return: id data, None if not found
+ :rtype: IdEntry | None
+ """
+ command = self.host.conn.exec(["id", name], raise_on_error=False)
+ if command.rc != 0:
+ return None
+
+ return IdEntry.FromOutput(command.stdout)
+
+ def grep(self, pattern: str, paths: str | list[str], args: list[str] | None = None) -> bool:
+ """
+ Run ``grep`` command.
+
+ :param pattern: Pattern to match.
+ :type pattern: str
+ :param paths: Paths to search.
+ :type paths: str | list[str]
+ :param args: Additional arguments to ``grep`` command, defaults to None.
+ :type args: list[str] | None, optional
+ :return: True if grep returned 0, False otherwise.
+ :rtype: bool
+ """
+ if args is None:
+ args = []
+
+ paths = [paths] if isinstance(paths, str) else paths
+ command = self.host.conn.exec(["grep", *args, pattern, *paths])
+
+ return command.rc == 0
+
+
+class KillCommand(object):
+ def __init__(self, host: MultihostHost, process: Process, pid: int) -> None:
+ self.host = host
+ self.process = process
+ self.pid = pid
+ self.__killed: bool = False
+
+ def kill(self) -> None:
+ if self.__killed:
+ return
+
+ self.host.conn.exec(["kill", self.pid])
+ self.__killed = True
+
+ def __enter__(self) -> KillCommand:
+ return self
+
+ def __exit__(self, exception_type, exception_value, traceback) -> None:
+ self.kill()
+ self.process.wait()
+
+
+class GetentUtils(MultihostUtility[MultihostHost]):
+ """
+ Interface to getent command.
+ """
+
+ def __init__(self, host: MultihostHost) -> None:
+ """
+ :param host: Remote host.
+ :type host: MultihostHost
+ """
+ super().__init__(host)
+
+ def passwd(self, name: str | int, *, service: str | None = None) -> PasswdEntry | None:
+ """
+ Call ``getent passwd $name``
+
+ :param name: User name or id.
+ :type name: str | int
+ :param service: Service used, defaults to None
+ :type service: str | None
+ :return: passwd data, None if not found
+ :rtype: PasswdEntry | None
+ """
+ return self.__exec(PasswdEntry, "passwd", name, service)
+
+ def group(self, name: str | int, *, service: str | None = None) -> GroupEntry | None:
+ """
+ Call ``getent group $name``
+
+ :param name: Group name or id.
+ :type name: str | int
+ :param service: Service used, defaults to None
+ :type service: str | None
+ :return: group data, None if not found
+ :rtype: PasswdEntry | None
+ """
+ return self.__exec(GroupEntry, "group", name, service)
+
+ def initgroups(self, name: str, *, service: str | None = None) -> InitgroupsEntry:
+ """
+ Call ``getent initgroups $name``
+
+ If ``name`` does not exist, group list is empty. This is standard behavior of ``getent initgroups``
+
+ :param name: User name.
+ :type name: str
+ :param service: Service used, defaults to None
+ :type service: str | None
+ :return: Initgroups data
+ :rtype: InitgroupsEntry
+ """
+ return self.__exec(InitgroupsEntry, "initgroups", name, service)
+
+ def __exec(self, cls, cmd: str, name: str | int, service: str | None = None) -> Any:
+ args = []
+ if service is not None:
+ args = ["-s", service]
+
+ command = self.host.conn.exec(["getent", *args, cmd, name], raise_on_error=False)
+ if command.rc != 0:
+ return None
+
+ return cls.FromOutput(command.stdout)