name=f"mkosi-swtpm-{config.machine_or_name()}",
description=f"swtpm for {config.machine_or_name()}",
),
+ env=scope_env(),
stdout=None if ARG_DEBUG.get() else subprocess.DEVNULL,
)
return None
+def unshare_version() -> str:
+ return run(["unshare", "--version"], stdout=subprocess.PIPE).stdout.strip().split()[-1]
+
+
@contextlib.contextmanager
def start_virtiofsd(config: Config, directory: PathString, *, name: str, selinux: bool = False) -> Iterator[Path]:
uidmap = Path(directory).stat().st_uid == INVOKING_USER.uid
cmdline += ["--fd", str(SD_LISTEN_FDS_START)]
+ name = f"mkosi-virtiofsd-{name}"
+ description = f"virtiofsd for {directory}"
uid = gid = None
runas = []
- if uidmap and os.getuid() != INVOKING_USER.uid:
- uid = INVOKING_USER.uid
- gid = INVOKING_USER.gid
- elif not uidmap and os.getuid() != 0:
+ scope = []
+ if uidmap:
+ uid = INVOKING_USER.uid if os.getuid() != INVOKING_USER.uid else None
+ gid = INVOKING_USER.gid if os.getuid() != INVOKING_USER.uid else None
+ scope = scope_cmd(name=name, description=description, user=uid, group=gid)
+ elif not uidmap and (os.getuid() == 0 or unshare_version() >= "2.38"):
runas = become_root_cmd()
+ scope = scope_cmd(name=name, description=description)
with spawn(
cmdline,
# in the user namespace it spawns, so by specifying --uid 0 --gid 0 we'll get a userns with the current
# uid/gid mapped to root in the userns. --cap-add=all is required to make virtiofsd work. Since it drops
# capabilities itself, we don't bother figuring out the exact set of capabilities it needs.
- user=uid,
- group=gid,
+ user=uid if not scope else None,
+ group=gid if not scope else None,
+ preexec_fn=become_root if not scope else None,
+ env=scope_env() if scope else {},
sandbox=config.sandbox(
binary=virtiofsd,
mounts=[Mount(directory, directory)],
options=["--uid", "0", "--gid", "0", "--cap-add", "all"],
setup=runas,
),
- scope=scope_cmd(
- name=f"mkosi-virtiofsd-{name}",
- description=f"virtiofsd for {directory}",
- user=uid,
- group=gid,
- ),
+ scope=scope,
) as (proc, innerpid):
yield path
kill(proc, innerpid, signal.SIGTERM)
user=config.forward_journal.parent.stat().st_uid if INVOKING_USER.invoked_as_root else None,
group=config.forward_journal.parent.stat().st_gid if INVOKING_USER.invoked_as_root else None,
),
+ env=scope_env(),
foreground=False,
) as (proc, innerpid):
yield
p.unlink(missing_ok=True)
+def scope_env() -> dict[str, str]:
+ if not find_binary("systemd-run"):
+ return {}
+ elif os.getuid() != 0 and "DBUS_SESSION_BUS_ADDRESS" in os.environ and "XDG_RUNTIME_DIR" in os.environ:
+ return {
+ "DBUS_SESSION_BUS_ADDRESS": os.environ["DBUS_SESSION_BUS_ADDRESS"],
+ "XDG_RUNTIME_DIR": os.environ["XDG_RUNTIME_DIR"]
+ }
+ elif os.getuid() == 0 and "DBUS_SYSTEM_ADDRESS" in os.environ:
+ return {"DBUS_SYSTEM_ADDRESS" : os.environ["DBUS_SYSTEM_ADDRESS"]}
+ else:
+ return {}
+
+
def scope_cmd(
name: str,
description: str,
group: Optional[int] = None,
properties: Sequence[str] = (),
) -> list[str]:
+ if not scope_env():
+ return []
+
return [
"systemd-run",
"--system" if os.getuid() == 0 else "--user",