def config_make_dict_parser(
*,
delimiter: Optional[str] = None,
- parse: Callable[[str], tuple[str, str]],
+ parse: Callable[[str], tuple[str, PathString]],
unescape: bool = False,
allow_paths: bool = False,
reset: bool = True,
-) -> ConfigParseCallback[dict[str, str]]:
- def config_parse_dict(value: Optional[str], old: Optional[dict[str, str]]) -> Optional[dict[str, str]]:
+) -> ConfigParseCallback[dict[str, PathString]]:
+ def config_parse_dict(
+ value: Optional[str],
+ old: Optional[dict[str, PathString]],
+ ) -> Optional[dict[str, PathString]]:
new = old.copy() if old else {}
if value is None:
if allow_paths and value and "=" not in value:
if Path(value).is_dir():
- for p in sorted(Path(value).iterdir()):
- if p.is_dir():
- continue
-
- if os.access(p, os.X_OK):
- new[p.name] = run([p], stdout=subprocess.PIPE, env=os.environ).stdout
- else:
- new[p.name] = p.read_text()
+ new.update({p.name: p.absolute() for p in sorted(Path(value).iterdir()) if not p.is_dir()})
elif (p := Path(value)).exists():
- if os.access(p, os.X_OK):
- new[p.name] = run([p], stdout=subprocess.PIPE, env=os.environ).stdout
- else:
- new[p.name] = p.read_text()
+ new[p.name] = p.absolute()
else:
- die(f"{p} does not exist")
+ die(f"{p.absolute()} does not exist")
return new
nspawn_settings: Optional[Path]
ephemeral: bool
- credentials: dict[str, str]
+ credentials: dict[str, PathString]
kernel_command_line_extra: list[str]
register: ConfigFeature
storage_target_mode: ConfigFeature
return cmdline
-def finalize_credentials(config: Config) -> dict[str, str]:
- creds = {
- "firstboot.locale": "C.UTF-8",
- **config.credentials,
- }
+def finalize_credentials(config: Config, stack: contextlib.ExitStack) -> Path:
+ d = Path(stack.enter_context(tempfile.TemporaryDirectory(prefix="mkosi-credentials-")))
+
+ (d / "firstboot.locale").write_text("C.UTF-8")
+
+ for k, v in config.credentials.items():
+ with (d / k).open("w") as f:
+ if isinstance(v, str):
+ f.write(v)
+ elif os.access(v, os.X_OK):
+ run([v], stdout=f, env=os.environ)
+ else:
+ f.write(v.read_text())
- if "firstboot.timezone" not in creds:
+ if not (d / "firstboot.timezone").exists():
if config.find_binary("timedatectl"):
tz = run(
["timedatectl", "show", "-p", "Timezone", "--value"],
else:
tz = "UTC"
- creds["firstboot.timezone"] = tz
+ (d / "firstboot.timezone").write_text(tz)
- if "ssh.authorized_keys.root" not in creds:
+ if not (d / "ssh.authorized_keys.root").exists():
if config.ssh_certificate:
pubkey = run(
["openssl", "x509", "-in", workdir(config.ssh_certificate), "-pubkey", "-noout"],
options=["--ro-bind", config.ssh_certificate, workdir(config.ssh_certificate)],
),
).stdout.strip()
- sshpubkey = run(
- ["ssh-keygen", "-f", "/dev/stdin", "-i", "-m", "PKCS8"],
- input=pubkey,
- stdout=subprocess.PIPE,
- # ssh-keygen insists on being able to resolve the current user which doesn't always work
- # (think sssd or similar) so let's switch to root which is always resolvable.
- sandbox=config.sandbox(options=["--become-root", "--ro-bind", "/etc/passwd", "/etc/passwd"]),
- ).stdout.strip()
- creds["ssh.authorized_keys.root"] = sshpubkey
+ with (d / "ssh.authorized_keys.root").open("w") as f:
+ run(
+ ["ssh-keygen", "-f", "/dev/stdin", "-i", "-m", "PKCS8"],
+ input=pubkey,
+ stdout=f,
+ # ssh-keygen insists on being able to resolve the current user which doesn't always work
+ # (think sssd or similar) so let's switch to root which is always resolvable.
+ sandbox=config.sandbox(
+ options=["--become-root", "--ro-bind", "/etc/passwd", "/etc/passwd"]
+ ),
+ )
elif config.ssh in (Ssh.always, Ssh.runtime):
die(
"Ssh= is enabled but no SSH certificate was found",
hint="Run 'mkosi genkey' to automatically create one",
)
- return creds
+ return d
def scope_cmd(
] # fmt: skip
kcl += ["root=root", "rootfstype=virtiofs"]
- credentials = finalize_credentials(config)
+ credentials = finalize_credentials(config, stack)
def add_virtiofs_mount(
- sock: Path, dst: PathString, cmdline: list[PathString], credentials: dict[str, str]
+ sock: Path,
+ dst: PathString,
+ cmdline: list[PathString],
+ credentials: Path,
) -> None:
tag = os.fspath(dst)
if len(tag.encode()) > VIRTIOFS_MAX_TAG_LEN:
"-device", f"vhost-user-fs-pci,queue-size=1024,chardev={sock.name},tag={tag}",
] # fmt: skip
- if "fstab.extra" not in credentials:
- credentials["fstab.extra"] = ""
+ if not (credentials / "fstab.extra").exists():
+ fstab = ""
+ else:
+ fstab = (credentials / "fstab.extra").read_text()
- if credentials["fstab.extra"] and not credentials["fstab.extra"][-1] == "\n":
- credentials["fstab.extra"] += "\n"
+ if fstab and not fstab[-1] == "\n":
+ fstab += "\n"
- credentials["fstab.extra"] += f"{tag} {dst} virtiofs x-initrd.mount\n"
+ fstab += f"{tag} {dst} virtiofs x-initrd.mount\n"
+ (credentials / "fstab.extra").write_text(fstab)
if config.runtime_build_sources:
for t in config.build_sources:
sock = stack.enter_context(start_virtiofsd(config, tree.source))
add_virtiofs_mount(sock, Path("/root/src") / (tree.target or ""), cmdline, credentials)
- if config.runtime_home and (p := current_home_dir()):
- sock = stack.enter_context(start_virtiofsd(config, p))
- add_virtiofs_mount(sock, Path("/root"), cmdline, credentials)
-
if want_scratch(config) or config.output_format in (OutputFormat.disk, OutputFormat.esp):
cmdline += ["-device", "virtio-scsi-pci,id=mkosi"]
if QemuDeviceNode.vhost_vsock in qemu_device_fds:
addr, notify = stack.enter_context(vsock_notify_handler())
- credentials["vmm.notify_socket"] = addr
+ (credentials / "vmm.notify_socket").write_text(addr)
if config.forward_journal:
- credentials["journal.forward_to_socket"] = stack.enter_context(
- start_journal_remote_vsock(config)
+ (credentials / "journal.forward_to_socket").write_text(
+ stack.enter_context(start_journal_remote_vsock(config))
)
- for k, v in credentials.items():
- payload = base64.b64encode(v.encode()).decode()
+ smbiosdir = Path(stack.enter_context(tempfile.TemporaryDirectory(prefix="mkosi-smbios-")))
+
+ for p in credentials.iterdir():
+ payload = base64.b64encode(p.read_bytes())
+
if config.architecture.supports_smbios(firmware):
- cmdline += ["-smbios", f"type=11,value=io.systemd.credential.binary:{k}={payload}"]
+ with (smbiosdir / p.name).open("wb") as f:
+ f.write(f"io.systemd.credential.binary:{p.name}=".encode())
+ f.write(payload)
+
+ cmdline += ["-smbios", f"type=11,path={smbiosdir / p.name}"]
# qemu's fw_cfg device only supports keys up to 55 characters long.
- elif config.architecture.supports_fw_cfg() and len(k) <= 55 - len("opt/io.systemd.credentials/"):
- f = stack.enter_context(tempfile.NamedTemporaryFile(prefix="mkosi-fw-cfg-", mode="w"))
- f.write(v)
- f.flush()
- cmdline += ["-fw_cfg", f"name=opt/io.systemd.credentials/{k},file={f.name}"]
+ elif config.architecture.supports_fw_cfg() and len(f"opt/io.systemd.credentials/{p.name}") <= 55:
+ cmdline += ["-fw_cfg", f"name=opt/io.systemd.credentials/{p.name},file={p}"]
elif kernel:
- kcl += [f"systemd.set_credential_binary={k}:{payload}"]
+ kcl += [f"systemd.set_credential_binary={p.name}:{payload.decode()}"]
kcl += finalize_kernel_command_line_extra(config)