import json
import logging
import os
+import re
import resource
import shutil
import subprocess
Verb,
flatten,
format_rlimit,
+ is_apt_distribution,
patch_file,
prepend_to_environ_path,
tmp_dir,
def make_cpio(root: Path, files: Iterator[Path], output: Path) -> None:
- with complete_step("Creating archive…"):
+ with complete_step(f"Creating cpio {output}…"):
cmd: list[PathString] = [
"cpio", "-o", "--reproducible", "--null", "-H", "newc", "--quiet", "-D", root, "-O", output
]
yield kver.name, kimg
+def module_path_to_name(path: Path) -> str:
+ return path.name.partition(".")[0]
+
+
+def resolve_module_dependencies(root: Path, kver: str, modules: Sequence[str]) -> tuple[list[Path], list[Path]]:
+ """
+ Returns a tuple of lists containing the paths to the module and firmware dependencies of the given list
+ of module names (including the given module paths themselves).
+ """
+ allmodules = root.joinpath("usr/lib/modules").joinpath(kver).joinpath("kernel").glob("**/*.ko*")
+ nametofile = {module_path_to_name(m): m for m in allmodules}
+
+ # We could run modinfo once for each module but that's slow. Luckily we can pass multiple modules to
+ # modinfo and it'll process them all in a single go. We get the modinfo for all modules to build two maps
+ # that map the path of the module to its module dependencies and its firmware dependencies respectively.
+ info = run(["modinfo", "--basedir", root, "--set-version", kver, "--null", *nametofile.keys()],
+ text=True, stdout=subprocess.PIPE).stdout
+
+ moddep = {}
+ firmwaredep = {}
+
+ name = ""
+ depends = []
+ firmware = []
+ for line in info.split("\0"):
+ key, sep, value = line.partition(":")
+ if not sep:
+ key, sep, value = line.partition("=")
+
+ if key in ("depends", "softdep"):
+ for d in value.strip().split(","):
+ if not d:
+ continue
+
+ if d not in nametofile:
+ logging.warning(f"{d} is a dependency of {name} but is not installed, ignoring ")
+ continue
+
+ depends.append(d)
+
+ if key == "firmware":
+ for f in root.joinpath("usr/lib/firmware").glob(f"{value.strip()}*"):
+ firmware.append(f)
+
+ if key == "filename":
+ if name:
+ moddep[name] = depends
+ firmwaredep[name] = firmware
+
+ depends = []
+ firmware = []
+
+ name = module_path_to_name(Path(value))
+
+ # Make sure we add the last module as well.
+ moddep[name] = depends
+ firmwaredep[name] = firmware
+
+ todo = [*modules]
+ mods = set()
+ firmware = set()
+
+ while len(todo) > 0:
+ m = todo.pop()
+ if not m or m in mods:
+ continue
+
+ mods.add(m)
+ todo += moddep.get(m, [])
+ firmware.update(firmwaredep.get(m, []))
+
+ return [nametofile[m] for m in mods], list(firmware)
+
+
def gen_kernel_modules_initrd(state: MkosiState, kver: str) -> Path:
kmods = state.workspace / f"initramfs-kernel-modules-{kver}.img"
def files() -> Iterator[Path]:
- yield state.root.joinpath("usr/lib/modules").relative_to(state.root)
- yield state.root.joinpath("usr/lib/modules").joinpath(kver).relative_to(state.root)
- for p in find_files(state.root / "usr/lib/modules" / kver, state.root):
- if p.name != "vmlinuz":
- yield p
+ modulesd = state.root / "usr/lib/modules" / kver
+ yield modulesd.parent.relative_to(state.root)
+ yield modulesd.relative_to(state.root)
+ yield modulesd.joinpath("kernel").relative_to(state.root)
+
+ for p in modulesd.joinpath("kernel").glob("**/*"):
+ if p.is_dir():
+ yield p.relative_to(state.root)
+
+ modules = set(modulesd.joinpath("kernel").glob("**/*.ko*"))
+
+ for pattern in state.config.kernel_modules_initrd_exclude:
+ regex = re.compile(pattern)
+ exclude = set()
+ for m in modules:
+ rel = m.relative_to(modulesd / "kernel")
+ if regex.search(str(rel)):
+ logging.debug(f"Excluding module {rel}")
+ exclude.add(m)
+
+ modules -= exclude
+
+ if state.config.kernel_modules_initrd_include:
+ include: set[Path] = set()
+ for pattern in state.config.kernel_modules_initrd_include:
+ regex = re.compile(pattern)
+ for m in modules:
+ rel = m.relative_to(modulesd / "kernel")
+ if regex.search(str(rel)):
+ logging.debug(f"Including module {m}")
+ include.add(m)
+
+ modules = include
+
+ names = [module_path_to_name(m) for m in modules]
+ mods, firmware = resolve_module_dependencies(state.root, kver, names)
+
+ for m in mods:
+ rel = m.relative_to(state.root)
+ logging.debug(f"Adding module {rel}")
+ yield rel
+
+ for fw in firmware:
+ rel = fw.relative_to(state.root)
+ logging.debug(f"Adding firmware {rel}")
+ yield rel
+
+ for p in modulesd.iterdir():
+ if not p.name.startswith("modules"):
+ continue
+
+ yield p.relative_to(state.root)
+
+ if modulesd.joinpath("vdso").exists():
+ yield modulesd.joinpath("vdso").relative_to(state.root)
+
+ for p in modulesd.joinpath("vdso").iterdir():
+ yield p.relative_to(state.root)
with complete_step(f"Generating kernel modules initrd for kernel {kver}"):
make_cpio(state.root, files(), kmods)
+ # Debian/Ubuntu do not compress their kernel modules, so we compress the initramfs instead. Note that
+ # this is not ideal since the compressed kernel modules will all be decompressed on boot which
+ # requires significant memory.
+ if is_apt_distribution(state.config.distribution):
+ maybe_compress(state, Compression.zst, kmods, kmods)
+
return kmods
"--repository-key-check", yes_no(state.config.repository_key_check),
"--repositories", ",".join(state.config.repositories),
"--repo-dir", ",".join(str(p) for p in state.config.repo_dirs),
- "--compress-output", str(state.config.compress_output),
+ *(["--compress-output", str(state.config.compress_output)] if state.config.compress_output else []),
"--with-network", yes_no(state.config.with_network),
"--cache-only", yes_no(state.config.cache_only),
*(["--output-dir", str(state.config.output_dir)] if state.config.output_dir else []),
"--pcr-banks", "sha1,sha256",
]
- cmd += [state.root / kimg] + initrds + [gen_kernel_modules_initrd(state, kver)]
+ cmd += [state.root / kimg] + initrds
+
+ if state.config.kernel_modules_initrd:
+ cmd += [gen_kernel_modules_initrd(state, kver)]
run(cmd)
die(f"Unknown compression {compression}")
-def maybe_compress(state: MkosiState, src: Path, dst: Optional[Path] = None) -> None:
- if not state.config.compress_output or src.is_dir():
+def maybe_compress(state: MkosiState, compression: Compression, src: Path, dst: Optional[Path] = None) -> None:
+ if not compression or src.is_dir():
if dst:
shutil.move(src, dst)
return
if not dst:
- dst = src.parent / f"{src.name}.{state.config.compress_output}"
+ dst = src.parent / f"{src.name}.{compression}"
with complete_step(f"Compressing {src}"):
with src.open("rb") as i:
src.unlink() # if src == dst, make sure dst doesn't truncate the src file but creates a new file.
with dst.open("wb") as o:
- run(compressor_command(state.config.compress_output),
- user=state.uid, group=state.gid, stdin=i, stdout=o)
+ run(compressor_command(compression), user=state.uid, group=state.gid, stdin=i, stdout=o)
def copy_nspawn_settings(state: MkosiState) -> None:
for kver, _ in gen_kernel_images(state):
with complete_step(f"Running depmod for {kver}"):
- run(["depmod", "--basedir", state.root, kver])
+ run(["depmod", "--all", "--basedir", state.root, kver])
def run_sysusers(state: MkosiState) -> None:
_, split_paths = invoke_repart(state, split=True)
for p in split_paths:
- maybe_compress(state, p)
+ maybe_compress(state, state.config.compress_output, p)
make_tar(state)
make_initrd(state)
with complete_step("Building image"):
build_image(state, manifest=manifest, for_cache=False)
- maybe_compress(state,
+ maybe_compress(state, state.config.compress_output,
state.staging / state.config.output_with_format,
state.staging / state.config.output_with_compression)