from mkosi.util import (
Compression,
Distribution,
+ InvokingUser,
ManifestFormat,
OutputFormat,
Verb,
- current_user,
flatten,
format_rlimit,
patch_file,
cmdline += ["--"]
cmdline += config.cmdline
- uid = current_user().uid
+ uid = InvokingUser.uid()
if config.output_format == OutputFormat.directory:
acl_toggle_remove(config, config.output, uid, allow=False)
def expand_specifier(s: str) -> str:
- return s.replace("%u", current_user().name)
+ return s.replace("%u", InvokingUser.name())
def needs_build(config: Union[argparse.Namespace, MkosiConfig]) -> bool:
from mkosi.util import (
Compression,
Distribution,
+ InvokingUser,
ManifestFormat,
OutputFormat,
Verb,
chdir,
- current_user,
detect_distribution,
flatten,
is_apt_distribution,
path = Path(value)
if expanduser:
- user = current_user()
- if path.is_relative_to("~") and not user.is_running_user():
- path = user.home / path.relative_to("~")
- else:
- path = path.expanduser()
+ if path.is_relative_to("~") and not InvokingUser.is_running_user():
+ path = InvokingUser.home() / path.relative_to("~")
+ path = path.expanduser()
if required and not path.exists():
die(f"{value} does not exist")
from mkosi.log import ARG_DEBUG, ARG_DEBUG_SHELL, die
from mkosi.types import _FILE, CompletedProcess, PathString, Popen
-from mkosi.util import current_user
+from mkosi.util import InvokingUser
CLONE_NEWNS = 0x00020000
CLONE_NEWUSER = 0x10000000
The function returns the UID-GID pair of the invoking user in the namespace (65436, 65436).
"""
if os.getuid() == 0:
- user = current_user()
- return user.uid, user.gid
+ return InvokingUser.uid_gid()
subuid = read_subrange(Path("/etc/subuid"))
subgid = read_subrange(Path("/etc/subgid"))
import ast
import contextlib
-import dataclasses
import enum
import functools
import getpass
return list(itertools.chain.from_iterable(lists))
-@dataclasses.dataclass
class InvokingUser:
- _pw: Optional[pwd.struct_passwd] = None
+ @staticmethod
+ def _uid_from_env() -> Optional[str]:
+ return os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID")
@classmethod
- def for_uid(cls, uid: int) -> "InvokingUser":
- return cls(pwd.getpwuid(uid))
-
- @property
- def uid(self) -> int:
- if self._pw is not None:
- return self._pw.pw_uid
- return os.getuid()
-
- @property
- def gid(self) -> int:
- if self._pw is not None:
- return self._pw.pw_gid
- return os.getgid()
-
- @property
- def name(self) -> str:
- if self._pw is not None:
- return self._pw.pw_name
- return getpass.getuser()
+ def uid(cls) -> int:
+ return int(cls._uid_from_env() or os.getuid())
- @property
- def home(self) -> Path:
- if self._pw is not None:
- return Path(self._pw.pw_dir)
- return Path.home()
+ @classmethod
+ def uid_gid(cls) -> tuple[int, int]:
+ if uid := cls._uid_from_env:
+ gid = int(os.getenv("SUDO_GID") or pwd.getpwuid(uid).pw_gid)
+ return uid, gid
+ return os.getuid(), os.getgid()
- def is_running_user(self) -> bool:
- if self._pw is not None:
- return self._pw.pw_uid == os.getuid()
- return True
+ @classmethod
+ def name(cls) -> str:
+ if uid := cls._uid_from_env():
+ return pwd.getpwuid(uid).pw_name
+ return getpass.getuser()
+ @classmethod
+ def home(cls) -> Path:
+ home = os.getenv("SUDO_HOME") or os.getenv("HOME") or pwd.getpwuid(cls.uid()).pw_dir
+ return Path(home)
-def current_user() -> InvokingUser:
- uid = os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID")
- if uid:
- return InvokingUser.for_uid(int(uid))
- return InvokingUser()
+ @classmethod
+ def is_running_user(cls) -> bool:
+ return cls.uid() == os.getuid()
@contextlib.contextmanager