import os
import random
import shutil
+import signal
import socket
import struct
import subprocess
from mkosi.log import ARG_DEBUG, die
from mkosi.mounts import finalize_source_mounts
from mkosi.partition import finalize_root, find_partitions
-from mkosi.run import SD_LISTEN_FDS_START, AsyncioThread, find_binary, fork_and_wait, run, spawn
+from mkosi.run import SD_LISTEN_FDS_START, AsyncioThread, find_binary, fork_and_wait, kill, run, spawn
from mkosi.sandbox import Mount
from mkosi.tree import copy_tree, rmtree
from mkosi.types import PathString
cmdline,
pass_fds=(sock.fileno(),),
sandbox=config.sandbox(mounts=[Mount(state, state)]),
- ) as proc:
+ ) as (proc, innerpid):
allocate_scope(
config,
name=f"mkosi-swtpm-{config.machine_or_name()}",
- pid=proc.pid,
+ pid=innerpid,
description=f"swtpm for {config.machine_or_name()}",
)
yield path
- proc.terminate()
+ kill(proc, innerpid, signal.SIGTERM)
def find_virtiofsd(*, tools: Path = Path("/")) -> Optional[Path]:
mounts=[Mount(directory, directory)],
options=["--uid", "0", "--gid", "0", "--cap-add", "all"],
),
- ) as proc:
+ ) as (proc, innerpid):
allocate_scope(
config,
name=f"mkosi-virtiofsd-{name}",
- pid=proc.pid,
+ pid=innerpid,
description=f"virtiofsd for {directory}",
)
yield path
- proc.terminate()
+ kill(proc, innerpid, signal.SIGTERM)
@contextlib.contextmanager
# If all logs go into a single file, disable compact mode to allow for journal files exceeding 4G.
env={"SYSTEMD_JOURNAL_COMPACT": "0" if config.forward_journal.suffix == ".journal" else "1"},
foreground=False,
- ) as proc:
+ ) as (proc, innerpid):
allocate_scope(
config,
name=f"mkosi-journal-remote-{config.machine_or_name()}",
- pid=proc.pid,
+ pid=innerpid,
description=f"mkosi systemd-journal-remote for {config.machine_or_name()}",
)
yield
- proc.terminate()
+ kill(proc, innerpid, signal.SIGTERM)
+
@contextlib.contextmanager
log=False,
foreground=True,
sandbox=config.sandbox(network=True, devices=True, relaxed=True),
- ) as qemu:
+ ) as (proc, innerpid):
# We have to close these before we wait for qemu otherwise we'll deadlock as qemu will never exit.
for fd in qemu_device_fds.values():
os.close(fd)
allocate_scope(
config,
name=name,
- pid=qemu.pid,
+ pid=innerpid,
description=f"mkosi Virtual Machine {name}",
)
- register_machine(config, qemu.pid, fname)
+ register_machine(config, innerpid, fname)
- if qemu.wait() == 0 and (status := int(notifications.get("EXIT_STATUS", 0))):
+ if proc.wait() == 0 and (status := int(notifications.get("EXIT_STATUS", 0))):
raise subprocess.CalledProcessError(status, cmdline)
preexec_fn=preexec_fn,
success_exit_status=success_exit_status,
sandbox=sandbox,
- ) as process:
+ innerpid=False,
+ ) as (process, _):
out, err = process.communicate(input)
return CompletedProcess(cmdline, process.returncode, out, err)
preexec_fn: Optional[Callable[[], None]] = None,
success_exit_status: Sequence[int] = (0,),
sandbox: AbstractContextManager[Sequence[PathString]] = contextlib.nullcontext([]),
-) -> Iterator[Popen]:
+ innerpid: bool = True,
+) -> Iterator[tuple[Popen, int]]:
assert sorted(set(pass_fds)) == list(pass_fds)
cmdline = [os.fspath(x) for x in cmdline]
# command.
prefix += ["sh", "-c", f"LISTEN_FDS={len(pass_fds)} LISTEN_PID=$$ exec $0 \"$@\""]
+ if prefix and innerpid:
+ r, w = os.pipe2(os.O_CLOEXEC)
+ # Make sure that the write end won't be overridden in preexec() when we're moving fds forward.
+ q = fcntl.fcntl(w, fcntl.F_DUPFD_CLOEXEC, SD_LISTEN_FDS_START + len(pass_fds) + 1)
+ os.close(w)
+ w = q
+ # dash doesn't support working with file descriptors higher than 9 so make sure we use bash.
+ prefix += ["bash", "-c", f"echo $$ >&{w} && exec {w}>&- && exec $0 \"$@\""]
+ else:
+ r, w = (None, None)
+
try:
with subprocess.Popen(
prefix + cmdline,
group=group,
# pass_fds only comes into effect after python has invoked the preexec function, so we make sure that
# pass_fds contains the file descriptors to keep open after we've done our transformation in preexec().
- pass_fds=[SD_LISTEN_FDS_START + i for i in range(len(pass_fds))],
+ pass_fds=[SD_LISTEN_FDS_START + i for i in range(len(pass_fds))] + ([w] if w else []),
env=env,
cwd=cwd,
preexec_fn=preexec,
) as proc:
+ if w:
+ os.close(w)
+ pid = proc.pid
try:
- yield proc
+ if r:
+ with open(r) as f:
+ s = f.read()
+ if s:
+ pid = int(s)
+
+ yield proc, pid
except BaseException:
- proc.terminate()
+ kill(proc, pid, signal.SIGTERM)
raise
finally:
returncode = proc.wait()
return None
+def kill(process: Popen, innerpid: int, signal: int) -> None:
+    """Signal the (possibly sandboxed) command started via spawn() if it is still running.
+
+    When spawn() wraps the command in a sandbox prefix, process.pid is the pid of the
+    wrapper, not of the command itself; the command's real pid is reported back over a
+    pipe and passed here as *innerpid*, so the signal reaches the actual command.
+    """
+    # NOTE(review): the parameter name shadows the imported `signal` module; harmless
+    # here since the module itself is never referenced inside this function.
+    # Refresh process.returncode; if the wrapper has already exited there is
+    # nothing left to signal.
+    process.poll()
+    if process.returncode is not None:
+        return
+
+    try:
+        os.kill(innerpid, signal)
+    # Handle the race condition where the process might exit between us calling poll() and us calling os.kill().
+    except ProcessLookupError:
+        pass
+
+
class AsyncioThread(threading.Thread):
"""
The default threading.Thread() is not interruptable, so we make our own version by using the concurrency
# execute using flock so they don't execute before they can get a lock on the same temporary file, then we
# unshare the user namespace and finally we unlock the temporary file, which allows the newuidmap and newgidmap
# processes to execute. we then wait for the processes to finish before continuing.
- with flock(lock) as fd, spawn(newuidmap) as uidmap, spawn(newgidmap) as gidmap:
+ with (
+ flock(lock) as fd,
+ spawn(newuidmap, innerpid=False) as (uidmap, _),
+ spawn(newgidmap, innerpid=False) as (gidmap, _)
+ ):
unshare(CLONE_NEWUSER)
fcntl.flock(fd, fcntl.LOCK_UN)
uidmap.wait()