@contextlib.contextmanager
-def chdir(directory: PathString) -> Iterator[Path]:
+def chdir(directory: Path) -> Iterator[Path]:
c = os.getcwd()
os.chdir(directory)
try:
return result
-def remove_glob(*patterns: PathString) -> None:
+def remove_glob(*patterns: Path) -> None:
pathgen = (glob.glob(str(pattern)) for pattern in patterns)
paths: set[str] = set(sum(pathgen, [])) # uniquify
for path in paths:
for p in config.output.parent.iterdir():
if p.name.startswith(config.output.name) and "cache" not in p.name:
unlink_try_hard(p)
- unlink_try_hard(f"{config.output}.manifest")
- unlink_try_hard(f"{config.output}.changelog")
+ unlink_try_hard(Path(f"{config.output}.manifest"))
+ unlink_try_hard(Path(f"{config.output}.changelog"))
if config.checksum:
unlink_try_hard(config.output_checksum)
return
-def require_private_file(name: PathString, description: str) -> None:
+def require_private_file(name: Path, description: str) -> None:
mode = os.stat(name).st_mode & 0o777
if mode & 0o007:
warn(dedent(f"""\
return
try:
- require_private_file("mkosi.rootpw", "root password")
+ pwfile = Path("mkosi.rootpw")
+ require_private_file(pwfile, "root password")
- with open("mkosi.rootpw") as f:
- args.password = f.read().strip()
+ args.password = pwfile.read_text().strip()
except FileNotFoundError:
pass
authorized_keys = state.root / "root/.ssh/authorized_keys"
if state.config.ssh_key:
- copy_path(f"{state.config.ssh_key}.pub", authorized_keys)
+ copy_path(Path(f"{state.config.ssh_key}.pub"), authorized_keys)
elif state.config.ssh_agent is not None:
env = {"SSH_AUTH_SOCK": state.config.ssh_agent}
result = run(["ssh-add", "-L"], env=env, text=True, stdout=subprocess.PIPE)
make_output_dir(config)
make_cache_dir(config)
workspace = setup_workspace(config)
- cache = setup_package_cache(config, Path(workspace.name))
+ workspace_dir = Path(workspace.name)
+ cache = setup_package_cache(config, workspace_dir)
manifest = Manifest(config)
# Make sure tmpfiles' aging doesn't interfere with our workspace
# while we are working on it.
- with flock(workspace.name):
+ with flock(workspace_dir):
state = MkosiState(
config=config,
- workspace=Path(workspace.name),
+ workspace=workspace_dir,
cache=cache,
do_run_build_script=False,
machine_id=config.machine_id or uuid.uuid4().hex,
with contextlib.ExitStack() as stack:
if fw_supports_sb:
ovmf_vars = stack.enter_context(tempfile.NamedTemporaryFile(prefix=".mkosi-", dir=tmp_dir()))
- copy_path(find_ovmf_vars(config), ovmf_vars.name)
+ copy_path(find_ovmf_vars(config), Path(ovmf_vars.name))
cmdline += [
"-global",
"ICH9-LPC.disable_s3=1",
shutil.move(temp_new_filepath, filepath)
-def path_relative_to_cwd(path: PathString) -> Path:
+def path_relative_to_cwd(path: Path) -> Path:
"Return path as relative to $PWD if underneath, absolute path otherwise"
- path = Path(path)
-
try:
return path.relative_to(os.getcwd())
except ValueError:
cls.print_step(text2.format(*args))
-def chown_to_running_user(path: PathString) -> None:
+def chown_to_running_user(path: Path) -> None:
uid = int(os.getenv("SUDO_UID") or os.getenv("PKEXEC_UID") or str(os.getuid()))
user = pwd.getpwuid(uid).pw_name
gid = pwd.getpwuid(uid).pw_gid
from textwrap import dedent
from typing import Optional
-from mkosi.backend import MkosiState, PathString, complete_step, run
+from mkosi.backend import MkosiState, complete_step, run
def make_executable(path: Path) -> None:
@contextlib.contextmanager
-def flock(path: PathString) -> Iterator[Path]:
+def flock(path: Path) -> Iterator[Path]:
fd = os.open(path, os.O_CLOEXEC|os.O_DIRECTORY|os.O_RDONLY)
try:
fcntl.fcntl(fd, fcntl.FD_CLOEXEC)
os.close(fd)
-def copy_path(src: PathString, dst: PathString, parents: bool = False) -> None:
+def copy_path(src: Path, dst: Path, parents: bool = False) -> None:
run(["cp", "--archive", "--no-target-directory", "--reflink=auto", src, dst])