]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Pass credentials as files where applicable
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Sun, 23 Mar 2025 18:36:49 +0000 (19:36 +0100)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Fri, 12 Sep 2025 11:28:15 +0000 (13:28 +0200)
- Credentials can be rather large, which leads to huge command lines
  if we pass them as a (encoded) string
- Credentials can be security sensitive, and passing them as a string
  makes it very easy to discover them via `ps` or similar.

Let's solve both issues by always passing credentials as files.

Fixes #3856

mkosi/__init__.py
mkosi/config.py
mkosi/qemu.py
mkosi/vmspawn.py

index d7444145e2803ca4c99aab579c47dc6006ea408a..62f7e2f2889f8c228a450abc968484f6f20b6737 100644 (file)
@@ -4280,14 +4280,12 @@ def run_shell(args: Args, config: Config) -> None:
 
     # Underscores are not allowed in machine names so replace them with hyphens.
     name = config.machine_or_name().replace("_", "-")
-    cmdline += ["--machine", name]
-
-    for k, v in finalize_credentials(config).items():
-        cmdline += [f"--set-credential={k}:{v}"]
-
-    cmdline += ["--register", yes_no(finalize_register(config))]
+    cmdline += ["--machine", name, "--register", yes_no(finalize_register(config))]
 
     with contextlib.ExitStack() as stack:
+        for f in finalize_credentials(config, stack).iterdir():
+            cmdline += [f"--load-credential={f.name}:{f}"]
+
         # Make sure the latest nspawn settings are always used.
         if config.nspawn_settings:
             if not (config.output_dir_or_cwd() / f"{name}.nspawn").exists():
index d61cc6f517c4c0f67608a9c79d4940f6997ca0f9..ba9afc565d5fa47494414ce9c90b96be1347d650 100644 (file)
@@ -1216,12 +1216,15 @@ def config_match_version(match: str, value: str) -> bool:
 def config_make_dict_parser(
     *,
     delimiter: Optional[str] = None,
-    parse: Callable[[str], tuple[str, str]],
+    parse: Callable[[str], tuple[str, PathString]],
     unescape: bool = False,
     allow_paths: bool = False,
     reset: bool = True,
-) -> ConfigParseCallback[dict[str, str]]:
-    def config_parse_dict(value: Optional[str], old: Optional[dict[str, str]]) -> Optional[dict[str, str]]:
+) -> ConfigParseCallback[dict[str, PathString]]:
+    def config_parse_dict(
+        value: Optional[str],
+        old: Optional[dict[str, PathString]],
+    ) -> Optional[dict[str, PathString]]:
         new = old.copy() if old else {}
 
         if value is None:
@@ -1229,21 +1232,11 @@ def config_make_dict_parser(
 
         if allow_paths and value and "=" not in value:
             if Path(value).is_dir():
-                for p in sorted(Path(value).iterdir()):
-                    if p.is_dir():
-                        continue
-
-                    if os.access(p, os.X_OK):
-                        new[p.name] = run([p], stdout=subprocess.PIPE, env=os.environ).stdout
-                    else:
-                        new[p.name] = p.read_text()
+                new.update({p.name: p.absolute() for p in sorted(Path(value).iterdir()) if not p.is_dir()})
             elif (p := Path(value)).exists():
-                if os.access(p, os.X_OK):
-                    new[p.name] = run([p], stdout=subprocess.PIPE, env=os.environ).stdout
-                else:
-                    new[p.name] = p.read_text()
+                new[p.name] = p.absolute()
             else:
-                die(f"{p} does not exist")
+                die(f"{p.absolute()} does not exist")
 
             return new
 
@@ -2103,7 +2096,7 @@ class Config:
 
     nspawn_settings: Optional[Path]
     ephemeral: bool
-    credentials: dict[str, str]
+    credentials: dict[str, PathString]
     kernel_command_line_extra: list[str]
     register: ConfigFeature
     storage_target_mode: ConfigFeature
index c143171f0a6e5a5ca9dd329b12b06c576cf6eff7..764507bff44a2c793db706d38cbcbfc5ad510134 100644 (file)
@@ -910,13 +910,21 @@ def finalize_kernel_command_line_extra(config: Config) -> list[str]:
     return cmdline
 
 
-def finalize_credentials(config: Config) -> dict[str, str]:
-    creds = {
-        "firstboot.locale": "C.UTF-8",
-        **config.credentials,
-    }
+def finalize_credentials(config: Config, stack: contextlib.ExitStack) -> Path:
+    d = Path(stack.enter_context(tempfile.TemporaryDirectory(prefix="mkosi-credentials-")))
+
+    (d / "firstboot.locale").write_text("C.UTF-8")
+
+    for k, v in config.credentials.items():
+        with (d / k).open("w") as f:
+            if isinstance(v, str):
+                f.write(v)
+            elif os.access(v, os.X_OK):
+                run([v], stdout=f, env=os.environ)
+            else:
+                f.write(v.read_text())
 
-    if "firstboot.timezone" not in creds:
+    if not (d / "firstboot.timezone").exists():
         if config.find_binary("timedatectl"):
             tz = run(
                 ["timedatectl", "show", "-p", "Timezone", "--value"],
@@ -928,9 +936,9 @@ def finalize_credentials(config: Config) -> dict[str, str]:
         else:
             tz = "UTC"
 
-        creds["firstboot.timezone"] = tz
+        (d / "firstboot.timezone").write_text(tz)
 
-    if "ssh.authorized_keys.root" not in creds:
+    if not (d / "ssh.authorized_keys.root").exists():
         if config.ssh_certificate:
             pubkey = run(
                 ["openssl", "x509", "-in", workdir(config.ssh_certificate), "-pubkey", "-noout"],
@@ -940,22 +948,24 @@ def finalize_credentials(config: Config) -> dict[str, str]:
                     options=["--ro-bind", config.ssh_certificate, workdir(config.ssh_certificate)],
                 ),
             ).stdout.strip()
-            sshpubkey = run(
-                ["ssh-keygen", "-f", "/dev/stdin", "-i", "-m", "PKCS8"],
-                input=pubkey,
-                stdout=subprocess.PIPE,
-                # ssh-keygen insists on being able to resolve the current user which doesn't always work
-                # (think sssd or similar) so let's switch to root which is always resolvable.
-                sandbox=config.sandbox(options=["--become-root", "--ro-bind", "/etc/passwd", "/etc/passwd"]),
-            ).stdout.strip()
-            creds["ssh.authorized_keys.root"] = sshpubkey
+            with (d / "ssh.authorized_keys.root").open("w") as f:
+                run(
+                    ["ssh-keygen", "-f", "/dev/stdin", "-i", "-m", "PKCS8"],
+                    input=pubkey,
+                    stdout=f,
+                    # ssh-keygen insists on being able to resolve the current user which doesn't always work
+                    # (think sssd or similar) so let's switch to root which is always resolvable.
+                    sandbox=config.sandbox(
+                        options=["--become-root", "--ro-bind", "/etc/passwd", "/etc/passwd"]
+                    ),
+                )
         elif config.ssh in (Ssh.always, Ssh.runtime):
             die(
                 "Ssh= is enabled but no SSH certificate was found",
                 hint="Run 'mkosi genkey' to automatically create one",
             )
 
-    return creds
+    return d
 
 
 def scope_cmd(
@@ -1390,10 +1400,13 @@ def run_qemu(args: Args, config: Config) -> None:
                 ]  # fmt: skip
                 kcl += ["root=root", "rootfstype=virtiofs"]
 
-        credentials = finalize_credentials(config)
+        credentials = finalize_credentials(config, stack)
 
         def add_virtiofs_mount(
-            sock: Path, dst: PathString, cmdline: list[PathString], credentials: dict[str, str]
+            sock: Path,
+            dst: PathString,
+            cmdline: list[PathString],
+            credentials: Path,
         ) -> None:
             tag = os.fspath(dst)
             if len(tag.encode()) > VIRTIOFS_MAX_TAG_LEN:
@@ -1404,13 +1417,16 @@ def run_qemu(args: Args, config: Config) -> None:
                 "-device", f"vhost-user-fs-pci,queue-size=1024,chardev={sock.name},tag={tag}",
             ]  # fmt: skip
 
-            if "fstab.extra" not in credentials:
-                credentials["fstab.extra"] = ""
+            if not (credentials / "fstab.extra").exists():
+                fstab = ""
+            else:
+                fstab = (credentials / "fstab.extra").read_text()
 
-            if credentials["fstab.extra"] and not credentials["fstab.extra"][-1] == "\n":
-                credentials["fstab.extra"] += "\n"
+            if fstab and not fstab[-1] == "\n":
+                fstab += "\n"
 
-            credentials["fstab.extra"] += f"{tag} {dst} virtiofs x-initrd.mount\n"
+            fstab += f"{tag} {dst} virtiofs x-initrd.mount\n"
+            (credentials / "fstab.extra").write_text(fstab)
 
         if config.runtime_build_sources:
             for t in config.build_sources:
@@ -1426,10 +1442,6 @@ def run_qemu(args: Args, config: Config) -> None:
             sock = stack.enter_context(start_virtiofsd(config, tree.source))
             add_virtiofs_mount(sock, Path("/root/src") / (tree.target or ""), cmdline, credentials)
 
-        if config.runtime_home and (p := current_home_dir()):
-            sock = stack.enter_context(start_virtiofsd(config, p))
-            add_virtiofs_mount(sock, Path("/root"), cmdline, credentials)
-
         if want_scratch(config) or config.output_format in (OutputFormat.disk, OutputFormat.esp):
             cmdline += ["-device", "virtio-scsi-pci,id=mkosi"]
 
@@ -1503,25 +1515,29 @@ def run_qemu(args: Args, config: Config) -> None:
 
         if QemuDeviceNode.vhost_vsock in qemu_device_fds:
             addr, notify = stack.enter_context(vsock_notify_handler())
-            credentials["vmm.notify_socket"] = addr
+            (credentials / "vmm.notify_socket").write_text(addr)
 
         if config.forward_journal:
-            credentials["journal.forward_to_socket"] = stack.enter_context(
-                start_journal_remote_vsock(config)
+            (credentials / "journal.forward_to_socket").write_text(
+                stack.enter_context(start_journal_remote_vsock(config))
             )
 
-        for k, v in credentials.items():
-            payload = base64.b64encode(v.encode()).decode()
+        smbiosdir = Path(stack.enter_context(tempfile.TemporaryDirectory(prefix="mkosi-smbios-")))
+
+        for p in credentials.iterdir():
+            payload = base64.b64encode(p.read_bytes())
+
             if config.architecture.supports_smbios(firmware):
-                cmdline += ["-smbios", f"type=11,value=io.systemd.credential.binary:{k}={payload}"]
+                with (smbiosdir / p.name).open("wb") as f:
+                    f.write(f"io.systemd.credential.binary:{p.name}=".encode())
+                    f.write(payload)
+
+                cmdline += ["-smbios", f"type=11,path={smbiosdir / p.name}"]
             # qemu's fw_cfg device only supports keys up to 55 characters long.
-            elif config.architecture.supports_fw_cfg() and len(k) <= 55 - len("opt/io.systemd.credentials/"):
-                f = stack.enter_context(tempfile.NamedTemporaryFile(prefix="mkosi-fw-cfg-", mode="w"))
-                f.write(v)
-                f.flush()
-                cmdline += ["-fw_cfg", f"name=opt/io.systemd.credentials/{k},file={f.name}"]
+            elif config.architecture.supports_fw_cfg() and len(f"opt/io.systemd.credentials/{p.name}") <= 55:
+                cmdline += ["-fw_cfg", f"name=opt/io.systemd.credentials/{p.name},file={p}"]
             elif kernel:
-                kcl += [f"systemd.set_credential_binary={k}:{payload}"]
+                kcl += [f"systemd.set_credential_binary={p.name}:{payload.decode()}"]
 
         kcl += finalize_kernel_command_line_extra(config)
 
index 0d04d12c8590f6c48942c70d5cfcdcf1de8a1167..b32e5027b5f9bf3517afc832264946b372284afa 100644 (file)
@@ -68,9 +68,10 @@ def run_vmspawn(args: Args, config: Config) -> None:
     elif config.runtime_network == Network.interface:
         cmdline += ["--network-tap"]
 
-    cmdline += [f"--set-credential={k}:{v}" for k, v in finalize_credentials(config).items()]
-
     with contextlib.ExitStack() as stack:
+        for f in finalize_credentials(config, stack).iterdir():
+            cmdline += [f"--load-credential={f.name}:{f}"]
+
         fname = stack.enter_context(copy_ephemeral(config, config.output_dir_or_cwd() / config.output))
 
         apply_runtime_size(config, fname)