import os
import tempfile
-from collections.abc import Iterable
+from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Union
from xml.etree import ElementTree
from mkosi.installer.rpm import RpmRepository, find_rpm_gpgkey, setup_rpm
from mkosi.installer.zypper import Zypper
from mkosi.log import complete_step, die
-from mkosi.mounts import finalize_certificate_mounts
-from mkosi.run import run, workdir
-from mkosi.util import flatten
+from mkosi.run import exists_in_sandbox, run, workdir
+from mkosi.util import PathString
from mkosi.versioncomp import GenericVersion
setup_rpm(context, dbbackend="ndb")
cls.package_manager(context.config).setup(context, list(cls.repositories(context)))
- if cls.package_manager(context.config) is Zypper and (gpgkeys := fetch_gpgkeys(context)):
- with complete_step("Importing GPG keys into RPM database"):
- run(
- ["rpm", "--root=/buildroot", "--import", *(workdir(key) for key in gpgkeys)],
- sandbox=context.sandbox(
- options=[
- *context.rootoptions(),
- *finalize_certificate_mounts(context.config),
- *flatten(["--ro-bind", os.fspath(key), workdir(key)] for key in gpgkeys),
- ],
- ),
- )
+ if cls.package_manager(context.config) is Zypper:
+ options = context.rootoptions()
+
+ gpgkeys: Sequence[PathString] = []
+ if (p := context.keyring_dir / "opensuse.gpg").exists():
+ gpgkeys = [workdir(p)]
+ options += ["--bind", os.fspath(p), workdir(p)]
+ else:
+ gpgkeys = fetch_gpgkeys(context)
+
+ if gpgkeys:
+ with complete_step("Importing GPG keys into RPM database"):
+ run(
+ ["rpm", "--root=/buildroot", "--import", *gpgkeys],
+ sandbox=context.sandbox(options=options),
+ )
+
+ @classmethod
+ def keyring(cls, context: Context) -> None:
+ if cls.package_manager(context.config) is not Zypper:
+ return
+
+ context.keyring_dir.mkdir(parents=True, exist_ok=True)
+
+ with (context.keyring_dir / "opensuse.gpg").open("wb") as f:
+ run(
+ # TODO: Switch to rpmkeys --export once we can rely on rpm 6.0.0 or newer.
+ ["rpm", "--root=/buildroot", "-q", "gpg-pubkey", "--qf", "%{PUBKEYS:armor}\n"],
+ stdout=f,
+ sandbox=context.sandbox(options=context.rootoptions()),
+ )
@classmethod
def install(cls, context: Context) -> None:
def fetch_gpgkeys(context: Context) -> list[Path]:
- files = set((context.metadata_dir / "cache/zypp/pubkeys").glob("*.asc"))
+ files = set()
+
+ # We have to do these lookups in the sandbox and return paths in the sandbox, so that overrides from
+ # mkosi.sandbox/ are taken into account.
with complete_step("Fetching GPG keys from configured repositories"):
for p in (context.sandbox_tree / "etc/zypp/repos.d").iterdir():
keys = value.splitlines()
for key in keys:
if key.startswith("file://"):
- path = key.removeprefix("file://").lstrip("/")
- if not (context.config.tools() / path).exists():
- die(f"Local GPG key specified ({key}) but not found at /{path}")
-
- files.add(context.config.tools() / path)
- elif key.startswith("https://") and context.config.repository_key_fetch:
- (context.workspace / "keys").mkdir(parents=True, exist_ok=True)
- curl(context.config, key, output_dir=context.workspace / "keys")
- files.add(context.workspace / "keys" / Path(key).name)
- else:
+ path = Path(key.removeprefix("file://"))
+ if path in files:
+ continue
+
+ if not exists_in_sandbox(path, sandbox=context.sandbox()):
+ die(f"Local GPG key specified ({key}) but not found at {path}")
+
+ files.add(path)
+ elif not context.config.repository_key_fetch:
die(
f"Remote GPG key specified ({key}) but RepositoryKeyFetch= is disabled",
hint="Enable RepositoryKeyFetch= or provide local keys",
"--non-interactive",
"--no-refresh",
f"--releasever={context.config.release}",
- *(["--gpg-auto-import-keys"] if context.config.repository_key_fetch else []),
*(["--no-gpg-checks"] if not context.config.repository_key_check else []),
*([f"--plus-content={repo}" for repo in context.config.repositories]),
*(["-vv"] if ARG_DEBUG.get() else []),
@classmethod
def sync(cls, context: Context, force: bool, arguments: Sequence[str] = ()) -> None:
- cls.invoke(context, "refresh", [*(["--force"] if force else []), *arguments])
+ cls.invoke(
+ context,
+ "refresh",
+ [*(["--force"] if force else []), *arguments],
+ options=["--gpg-auto-import-keys"] if context.config.repository_key_fetch else [],
+ )
@classmethod
def createrepo(cls, context: Context) -> None: