)
-def luks_open(dev: str, passphrase: Dict[str, str]) -> str:
- name = str(uuid.uuid4())
-
- if passphrase["type"] == "stdin":
- passphrase_content = (passphrase["content"] + "\n").encode("utf-8")
- run(["cryptsetup", "open", "--type", "luks", dev, name], input=passphrase_content)
- else:
- assert passphrase["type"] == "file"
- run(["cryptsetup", "--key-file", passphrase["content"], "open", "--type", "luks", dev, name])
-
- return os.path.join("/dev/mapper", name)
-
-
-def luks_close(dev: Optional[str], text: str) -> None:
- if dev is None:
- return
-
- with complete_step(text):
- run(["cryptsetup", "close", dev])
-
-
def luks_format_root(
args: CommandLineArguments,
loopdev: str,
luks_format(partition(loopdev, args.tmp_partno), args.passphrase)
+@contextlib.contextmanager
+def luks_open(dev: str, passphrase: Dict[str, str], partition: str) -> Generator[str, None, None]:
+ name = str(uuid.uuid4())
+
+ with complete_step(f"Setting up LUKS on {partition}…"):
+ if passphrase["type"] == "stdin":
+ passphrase_content = (passphrase["content"] + "\n").encode("utf-8")
+ run(["cryptsetup", "open", "--type", "luks", dev, name], input=passphrase_content)
+ else:
+ assert passphrase["type"] == "file"
+ run(["cryptsetup", "--key-file", passphrase["content"], "open", "--type", "luks", dev, name])
+
+ path = os.path.join("/dev/mapper", name)
+
+ try:
+ yield path
+ finally:
+ with complete_step(f"Closing LUKS {partition}"):
+ run(["cryptsetup", "close", path])
+
+
def luks_setup_root(
args: CommandLineArguments, loopdev: str, do_run_build_script: bool, inserting_generated_root: bool = False
-) -> Optional[str]:
+) -> ContextManager[Optional[str]]:
if args.encrypt != "all":
- return None
+ return contextlib.nullcontext()
if args.root_partno is None:
- return None
+ return contextlib.nullcontext()
if is_generated_root(args) and not inserting_generated_root:
- return None
+ return contextlib.nullcontext()
if do_run_build_script:
- return None
+ return contextlib.nullcontext()
assert args.passphrase is not None
- with complete_step("Opening LUKS root partition…"):
- return luks_open(partition(loopdev, args.root_partno), args.passphrase)
+ return luks_open(partition(loopdev, args.root_partno), args.passphrase, "root partition")
-def luks_setup_home(args: CommandLineArguments, loopdev: str, do_run_build_script: bool) -> Optional[str]:
+def luks_setup_home(
+ args: CommandLineArguments, loopdev: str, do_run_build_script: bool
+) -> ContextManager[Optional[str]]:
if args.encrypt is None:
- return None
+ return contextlib.nullcontext()
if args.home_partno is None:
- return None
+ return contextlib.nullcontext()
if do_run_build_script:
- return None
+ return contextlib.nullcontext()
assert args.passphrase is not None
- with complete_step("Opening LUKS home partition…"):
- return luks_open(partition(loopdev, args.home_partno), args.passphrase)
+ return luks_open(partition(loopdev, args.home_partno), args.passphrase, "home partition")
-def luks_setup_srv(args: CommandLineArguments, loopdev: str, do_run_build_script: bool) -> Optional[str]:
+def luks_setup_srv(
+ args: CommandLineArguments, loopdev: str, do_run_build_script: bool
+) -> ContextManager[Optional[str]]:
if args.encrypt is None:
- return None
+ return contextlib.nullcontext()
if args.srv_partno is None:
- return None
+ return contextlib.nullcontext()
if do_run_build_script:
- return None
+ return contextlib.nullcontext()
assert args.passphrase is not None
- with complete_step("Opening LUKS server data partition…"):
- return luks_open(partition(loopdev, args.srv_partno), args.passphrase)
+ return luks_open(partition(loopdev, args.srv_partno), args.passphrase, "server data partition")
-def luks_setup_var(args: CommandLineArguments, loopdev: str, do_run_build_script: bool) -> Optional[str]:
+def luks_setup_var(
+ args: CommandLineArguments, loopdev: str, do_run_build_script: bool
+) -> ContextManager[Optional[str]]:
if args.encrypt is None:
- return None
+ return contextlib.nullcontext()
if args.var_partno is None:
- return None
+ return contextlib.nullcontext()
if do_run_build_script:
- return None
+ return contextlib.nullcontext()
assert args.passphrase is not None
- with complete_step("Opening LUKS variable data partition…"):
- return luks_open(partition(loopdev, args.var_partno), args.passphrase)
+ return luks_open(partition(loopdev, args.var_partno), args.passphrase, "variable data partition")
-def luks_setup_tmp(args: CommandLineArguments, loopdev: str, do_run_build_script: bool) -> Optional[str]:
+def luks_setup_tmp(
+ args: CommandLineArguments, loopdev: str, do_run_build_script: bool
+) -> ContextManager[Optional[str]]:
if args.encrypt is None:
- return None
+ return contextlib.nullcontext()
if args.tmp_partno is None:
- return None
+ return contextlib.nullcontext()
if do_run_build_script:
- return None
+ return contextlib.nullcontext()
assert args.passphrase is not None
- with complete_step("Opening LUKS temporary data partition…"):
- return luks_open(partition(loopdev, args.tmp_partno), args.passphrase)
+ return luks_open(partition(loopdev, args.tmp_partno), args.passphrase, "temporary data partition")
class LuksSetupOutput(NamedTuple):
return
assert loopdev is not None
- try:
- root = luks_setup_root(args, loopdev, do_run_build_script)
- try:
- home = luks_setup_home(args, loopdev, do_run_build_script)
- try:
- srv = luks_setup_srv(args, loopdev, do_run_build_script)
- try:
- var = luks_setup_var(args, loopdev, do_run_build_script)
- try:
- tmp = luks_setup_tmp(args, loopdev, do_run_build_script)
-
- yield LuksSetupOutput(
- optional_partition(loopdev, args.root_partno) if root is None else root,
- optional_partition(loopdev, args.home_partno) if home is None else home,
- optional_partition(loopdev, args.srv_partno) if srv is None else srv,
- optional_partition(loopdev, args.var_partno) if var is None else var,
- optional_partition(loopdev, args.tmp_partno) if tmp is None else tmp,
- )
- finally:
- luks_close(tmp, "Closing LUKS temporary data partition")
- finally:
- luks_close(var, "Closing LUKS variable data partition")
- finally:
- luks_close(srv, "Closing LUKS server data partition")
- finally:
- luks_close(home, "Closing LUKS home partition")
- finally:
- luks_close(root, "Closing LUKS root partition")
+ with contextlib.ExitStack() as stack:
+ root = stack.enter_context(luks_setup_root(args, loopdev, do_run_build_script))
+ home = stack.enter_context(luks_setup_home(args, loopdev, do_run_build_script))
+ srv = stack.enter_context(luks_setup_srv(args, loopdev, do_run_build_script))
+ var = stack.enter_context(luks_setup_var(args, loopdev, do_run_build_script))
+ tmp = stack.enter_context(luks_setup_tmp(args, loopdev, do_run_build_script))
+
+ yield LuksSetupOutput(
+ optional_partition(loopdev, args.root_partno) if root is None else root,
+ optional_partition(loopdev, args.home_partno) if home is None else home,
+ optional_partition(loopdev, args.srv_partno) if srv is None else srv,
+ optional_partition(loopdev, args.var_partno) if var is None else var,
+ optional_partition(loopdev, args.tmp_partno) if tmp is None else tmp,
+ )
def prepare_root(args: CommandLineArguments, dev: Optional[str], cached: bool) -> None:
if args.password == "":
with complete_step("Deleting root password"):
- def jj(line: str) -> str:
+ def delete_root_pw(line: str) -> str:
if line.startswith("root:"):
return ":".join(["root", ""] + line.split(":")[2:])
return line
- patch_file(os.path.join(root, "etc/passwd"), jj)
+ patch_file(os.path.join(root, "etc/passwd"), delete_root_pw)
elif args.password:
with complete_step("Setting root password"):
if args.password_is_hashed:
else:
password = crypt.crypt(args.password, crypt.mksalt(crypt.METHOD_SHA512))
- def jj(line: str) -> str:
+ def set_root_pw(line: str) -> str:
if line.startswith("root:"):
return ":".join(["root", password] + line.split(":")[2:])
return line
- patch_file(os.path.join(root, "etc/shadow"), jj)
+ patch_file(os.path.join(root, "etc/shadow"), set_root_pw)
def invoke_fstrim(args: CommandLineArguments, root: str, do_run_build_script: bool, for_cache: bool) -> None:
MkosiPrinter.print_step("Writing partition...")
- if args.root_partno == partno:
- luks_format_root(args, loopdev, False, False, True)
- dev = luks_setup_root(args, loopdev, False, True)
- else:
- dev = None
+ with contextlib.ExitStack() as stack:
+ if args.root_partno == partno:
+ luks_format_root(args, loopdev, False, False, True)
+ dev = stack.enter_context(luks_setup_root(args, loopdev, False, True))
+ else:
+ dev = None
- path = dev if dev is not None else partition(loopdev, partno)
- try:
+ path = dev if dev is not None else partition(loopdev, partno)
run(["dd", f"if={blob.name}", f"of={path}", "conv=nocreat,sparse"])
- finally:
- luks_close(dev, "Closing LUKS root partition")
args.ran_sfdisk = True