]> git.ipfire.org Git - thirdparty/mkosi.git/commitdiff
Refactor ACLs + make sure they apply to base trees
authorDaan De Meyer <daan.j.demeyer@gmail.com>
Sun, 30 Apr 2023 11:24:07 +0000 (13:24 +0200)
committerDaan De Meyer <daan.j.demeyer@gmail.com>
Sun, 30 Apr 2023 20:53:55 +0000 (22:53 +0200)
Let's refactor ACLs so that they're all applied/removed in the same
function. Let's also make sure they apply to base trees as well.

mkosi/__init__.py

index 14681173369f2de2ca819496f02ceb613aa615eb..086ebeef60304924ed92dbbfdee7e264948938e2 100644 (file)
@@ -869,37 +869,16 @@ def calculate_signature(state: MkosiState) -> None:
         )
 
 
-def acl_toggle_remove(config: MkosiConfig, root: Path, uid: int, *, allow: bool) -> None:
-    if not config.acl:
-        return
-
-    ret = run(["setfacl",
-               "--physical",
-               "--modify" if allow else "--remove",
-               f"user:{uid}:rwx" if allow else f"user:{uid}",
-               "-"],
-              check=False,
-              text=True,
-              # Supply files via stdin so we don't clutter --debug run output too much
-              input="\n".join([str(root),
-                               *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
-    )
-    if ret.returncode != 0:
-        logging.warning("Failed to set ACLs, you'll need root privileges to remove some generated files/directories")
-
-
 def save_cache(state: MkosiState) -> None:
     final, build = cache_tree_paths(state.config)
 
     with complete_step("Installing cache copies"):
         unlink_try_hard(final)
         shutil.move(state.cache_overlay, final)
-        acl_toggle_remove(state.config, final, state.uid, allow=True)
 
         if state.config.build_script:
             unlink_try_hard(build)
             shutil.move(state.build_overlay, build)
-            acl_toggle_remove(state.config, build, state.uid, allow=True)
 
 
 def dir_size(path: PathString) -> int:
@@ -1396,7 +1375,6 @@ def reuse_cache_tree(state: MkosiState) -> bool:
 
     with complete_step("Copying cached trees"):
         copy_path(final, state.root)
-        acl_toggle_remove(state.config, state.root, state.uid, allow=False)
         if state.config.build_script:
             state.build_overlay.rmdir()
             state.build_overlay.symlink_to(build)
@@ -1613,6 +1591,39 @@ def need_cache_tree(args: MkosiArgs, state: MkosiState) -> bool:
     return not final.exists() or (state.config.build_script is not None and not build.exists())
 
 
+def setfacl(root: Path, uid: int, allow: bool) -> None:
+    run(["setfacl",
+        "--physical",
+        "--modify" if allow else "--remove",
+        f"user:{uid}:rwx" if allow else f"user:{uid}",
+        "-"],
+        text=True,
+        # Supply files via stdin so we don't clutter --debug run output too much
+        input="\n".join([str(root),
+                        *(e.path for e in cast(Iterator[os.DirEntry[str]], scandir_recursive(root)) if e.is_dir())])
+    )
+
+
+@contextlib.contextmanager
+def acl_toggle_build(state: MkosiState) -> Iterator[None]:
+    if not state.config.acl:
+        yield
+        return
+
+    try:
+        for p in (*state.config.base_trees, state.config.cache_dir):
+            if p and p.is_dir():
+                with complete_step(f"Removing ACLs from {p}"):
+                    setfacl(p, state.uid, allow=False)
+
+        yield
+    finally:
+        for p in (*state.config.base_trees, state.config.cache_dir, *state.config.output_paths()):
+            if p and p.is_dir():
+                with complete_step(f"Adding ACLs to {p}"):
+                    setfacl(p, state.uid, allow=True)
+
+
 def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> None:
     workspace = tempfile.TemporaryDirectory(dir=config.workspace_dir or Path.cwd(), prefix=".mkosi.tmp")
     workspace_dir = Path(workspace.name)
@@ -1635,7 +1646,7 @@ def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> Non
 
     # Make sure tmpfiles' aging doesn't interfere with our workspace
     # while we are working on it.
-    with flock(workspace_dir), workspace:
+    with flock(workspace_dir), workspace, acl_toggle_build(state):
         # If caching is requested, then make sure we have cache trees around we can make use of
         if need_cache_tree(args, state):
             with complete_step("Building cache image"):
@@ -1651,16 +1662,11 @@ def build_stuff(uid: int, gid: int, args: MkosiArgs, config: MkosiConfig) -> Non
         calculate_signature(state)
         save_manifest(state, manifest)
 
-        if state.config.cache_dir:
-            acl_toggle_remove(state.config, state.config.cache_dir, uid, allow=True)
-
         for p in state.config.output_paths():
             if state.staging.joinpath(p.name).exists():
                 shutil.move(state.staging / p.name, p)
                 if p != state.config.output or state.config.output_format != OutputFormat.directory:
                     os.chown(p, uid, gid)
-                else:
-                    acl_toggle_remove(state.config, p, uid, allow=True)
                 if p == state.config.output:
                     compress_output(state.config, p, uid=uid, gid=gid)
 
@@ -1697,6 +1703,23 @@ def nspawn_knows_arg(arg: str) -> bool:
     return "unrecognized option" not in c.stderr
 
 
+@contextlib.contextmanager
+def acl_toggle_boot(config: MkosiConfig) -> Iterator[None]:
+    if not config.acl or config.output_format != OutputFormat.directory:
+        yield
+        return
+
+    uid = InvokingUser.uid()
+
+    try:
+        with complete_step(f"Removing ACLs from {config.output}"):
+            setfacl(config.output, uid, allow=False)
+        yield
+    finally:
+        with complete_step(f"Adding ACLs to {config.output}"):
+            setfacl(config.output, uid, allow=True)
+
+
 def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
     cmdline: list[PathString] = ["systemd-nspawn", "--quiet"]
 
@@ -1743,16 +1766,8 @@ def run_shell(args: MkosiArgs, config: MkosiConfig) -> None:
         cmdline += ["--"]
         cmdline += args.cmdline
 
-    uid = InvokingUser.uid()
-
-    if config.output_format == OutputFormat.directory:
-        acl_toggle_remove(config, config.output, uid, allow=False)
-
-    try:
+    with acl_toggle_boot(config):
         run(cmdline, stdin=sys.stdin, stdout=sys.stdout, env=os.environ, log=False)
-    finally:
-        if config.output_format == OutputFormat.directory:
-            acl_toggle_remove(config, config.output, uid, allow=True)
 
 
 def find_qemu_binary(config: MkosiConfig) -> str: