import os
from enum import Enum, auto
from threading import Thread
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from gi.repository import GLib # type: ignore[import]
from pydbus import SystemBus # type: ignore[import]
SESSION = auto()
+def _clean_error_message(msg: str) -> str:
+ if "org.freedesktop.systemd1." in msg:
+ msg = msg.split("org.freedesktop.systemd1.", maxsplit=1)[1]
+ return msg
+
+
+T = TypeVar("T")
+
+
+def _wrap_dbus_errors(func: Callable[..., T]) -> Callable[..., T]:
+ def inner(*args: Any, **kwargs: Any) -> T:
+ try:
+ return func(*args, **kwargs)
+ except GLib.Error as e:
+ raise SubprocessControllerException(_clean_error_message(str(e))) from e
+
+ return inner
+
+
def _create_object_proxy(type_: SystemdType, bus_name: str, path: Optional[str] = None) -> Any:
bus: Any = SystemBus() if type_ is SystemdType.SYSTEM else SessionBus()
systemd = bus.get(bus_name, path)
raise SubprocessControllerException(f"Job completed with state '{result_state}' instead of expected 'done'")
+@_wrap_dbus_errors
def get_unit_file_state(
type_: SystemdType,
unit_name: str,
return _create_manager_proxy(type_).ListUnits()
+@_wrap_dbus_errors
def list_units(type_: SystemdType) -> List[Unit]:
return [Unit(name=str(u[0]), state=str(u[4])) for u in _list_units_internal(type_)] # type: ignore[call-arg]
+@_wrap_dbus_errors
def list_unit_names(type_: SystemdType) -> List[str]:
return [str(u[0]) for u in _list_units_internal(type_)]
+@_wrap_dbus_errors
def reset_failed_unit(typ: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(typ)
systemd.ResetFailedUnit(unit_name)
+@_wrap_dbus_errors
def restart_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
),
),
("TimeoutStopUSec", GLib.Variant("t", 10000000)),
- ("Restart", GLib.Variant("s", "on-abnormal")),
+ # The manager assumes that the subprocess is always running. When this is not the case, we end up with a hard
+ # crash, because no code is expecting to deal with this state. Therefore, there is no condition under which
+ # kresd instances could be not running ==> we configure systemd to restart them always
+ ("Restart", GLib.Variant("s", "always")),
("LimitNOFILE", GLib.Variant("t", 524288)),
("Environment", GLib.Variant("as", [f"SYSTEMD_INSTANCE={kres_id}"])),
]
],
),
),
- ("Restart", GLib.Variant("s", "on-failure")),
+ ("Restart", GLib.Variant("s", "always")), # see reasoning at kresd instance properties
("RestartUSec", GLib.Variant("t", 30000000)),
("StartLimitIntervalUSec", GLib.Variant("t", 400000000)),
("StartLimitBurst", GLib.Variant("u", 10)),
return val
+@_wrap_dbus_errors
def start_transient_kresd_unit(
config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType
) -> None:
raise SubprocessControllerException(f"Failed to start systemd transient service '{name}'") from e
+@_wrap_dbus_errors
def start_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
_wait_for_job_completion(systemd, job)
+@_wrap_dbus_errors
def stop_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
_wait_for_job_completion(systemd, job)
+@_wrap_dbus_errors
def list_unit_files(type_: SystemdType) -> List[str]:
systemd = _create_manager_proxy(type_)
files = systemd.ListUnitFiles()
return [str(x[0]) for x in files]
+@_wrap_dbus_errors
def can_load_unit(type_: SystemdType, unit_name: str) -> bool:
systemd = _create_manager_proxy(type_)
try: