--- /dev/null
+#!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1+
+
+# pylint: disable=missing-docstring,invalid-name,import-outside-toplevel
+# pylint: disable=consider-using-with,unspecified-encoding,line-too-long
+# pylint: disable=too-many-locals,too-many-statements,too-many-return-statements
+# pylint: disable=too-many-branches
+
+import argparse
+import collections
+import dataclasses
+import fnmatch
+import itertools
+import json
+import os
+import pathlib
+import re
+import shlex
+import subprocess
+import tempfile
+import typing
+
+import pefile
+
+EFI_ARCH_MAP = {
+ # host_arch glob : [efi_arch, 32_bit_efi_arch if mixed mode is supported]
+ 'x86_64' : ['x64', 'ia32'],
+ 'i[3456]86' : ['ia32'],
+ 'aarch64' : ['aa64'],
+ 'arm[45678]*l' : ['arm'],
+ 'riscv64' : ['riscv64'],
+}
+EFI_ARCHES: list[str] = sum(EFI_ARCH_MAP.values(), [])
+
+def guess_efi_arch():
+ arch = os.uname().machine
+
+ for glob, mapping in EFI_ARCH_MAP.items():
+ if fnmatch.fnmatch(arch, glob):
+ efi_arch, *fallback = mapping
+ break
+ else:
+ raise ValueError(f'Unsupported architecture {arch}')
+
+ # This makes sense only on some architectures, but it also probably doesn't
+ # hurt on others, so let's just apply the check everywhere.
+ if fallback:
+ fw_platform_size = pathlib.Path('/sys/firmware/efi/fw_platform_size')
+ try:
+ size = fw_platform_size.read_text().strip()
+ except FileNotFoundError:
+ pass
+ else:
+ if int(size) == 32:
+ efi_arch = fallback[0]
+
+ print(f'Host arch {arch!r}, EFI arch {efi_arch!r}')
+ return efi_arch
+
+
+def shell_join(cmd):
+ # TODO: drop in favour of shlex.join once shlex.join supports pathlib.Path.
+ return ' '.join(shlex.quote(str(x)) for x in cmd)
+
+
+def pe_executable_size(filename):
+ pe = pefile.PE(filename)
+ section = pe.sections[-1]
+ return section.VirtualAddress + section.Misc_VirtualSize
+
+
+def round_up(x, blocksize=4096):
+ return (x + blocksize - 1) // blocksize * blocksize
+
+
+@dataclasses.dataclass
+class Section:
+ name: str
+ content: pathlib.Path
+ tmpfile: typing.IO | None = None
+ flags: list[str] | None = dataclasses.field(default=None)
+ offset: int | None = None
+ measure: bool = False
+
+ @classmethod
+ def create(cls, name, contents, flags=None, measure=False):
+ if isinstance(contents, str):
+ tmp = tempfile.NamedTemporaryFile(mode='wt', prefix=f'tmp{name}')
+ tmp.write(contents)
+ tmp.flush()
+ contents = pathlib.Path(tmp.name)
+ else:
+ tmp = None
+
+ return cls(name, contents, tmpfile=tmp, flags=flags, measure=measure)
+
+ @classmethod
+ def parse_arg(cls, s):
+ try:
+ name, contents, *rest = s.split(':')
+ except ValueError as e:
+ raise ValueError(f'Cannot parse section spec (name or contents missing): {s!r}') from e
+ if rest:
+ raise ValueError(f'Cannot parse section spec (extraneous parameters): {s!r}')
+
+ if contents.startswith('@'):
+ contents = pathlib.Path(contents[1:])
+
+ return cls.create(name, contents)
+
+ def size(self):
+ return self.content.stat().st_size
+
+ def check_name(self):
+ # PE section names with more than 8 characters are legal, but our stub does
+ # not support them.
+ if not self.name.isascii() or not self.name.isprintable():
+ raise ValueError(f'Bad section name: {self.name!r}')
+ if len(self.name) > 8:
+ raise ValueError(f'Section name too long: {self.name!r}')
+
+
+@dataclasses.dataclass
+class UKI:
+ executable: list[pathlib.Path|str]
+ sections: list[Section] = dataclasses.field(default_factory=list, init=False)
+ offset: int | None = dataclasses.field(default=None, init=False)
+
+ def __post_init__(self):
+ self.offset = round_up(pe_executable_size(self.executable))
+
+ def add_section(self, section):
+ assert self.offset
+ assert section.offset is None
+
+ if section.name in [s.name for s in self.sections]:
+ raise ValueError(f'Duplicate section {section.name}')
+
+ section.offset = self.offset
+ self.offset += round_up(section.size())
+ self.sections += [section]
+
+
+def parse_banks(s):
+ banks = re.split(r',|\s+', s)
+ # TODO: do some sanity checking here
+ return banks
+
+
+KNOWN_PHASES = (
+ 'enter-initrd',
+ 'leave-initrd',
+ 'sysinit',
+ 'ready',
+ 'shutdown',
+ 'final',
+)
+
+def parse_phase_paths(s):
+ # Split on commas or whitespace here. Commas might be hard to parse visually.
+ paths = re.split(r',|\s+', s)
+
+ for path in paths:
+ for phase in path.split(':'):
+ if phase not in KNOWN_PHASES:
+ raise argparse.ArgumentTypeError(f'Unknown boot phase {phase!r} ({path=})')
+
+ return paths
+
+
+def check_splash(filename):
+ if filename is None:
+ return
+
+ # import is delayed, to avoid import when the splash image is not used
+ try:
+ from PIL import Image
+ except ImportError:
+ return
+
+ img = Image.open(filename, formats=['BMP'])
+ print(f'Splash image {filename} is {img.width}×{img.height} pixels')
+
+
+def check_inputs(opts):
+ for name, value in vars(opts).items():
+ if name in {'output', 'tools'}:
+ continue
+
+ if not isinstance(value, pathlib.Path):
+ continue
+
+ # Open file to check that we can read it, or generate an exception
+ value.open().close()
+
+ check_splash(opts.splash)
+
+
+def find_tool(name, fallback=None, opts=None):
+ if opts and opts.tools:
+ tool = opts.tools / name
+ if tool.exists():
+ return tool
+
+ return fallback or name
+
+
+def combine_signatures(pcrsigs):
+ combined = collections.defaultdict(list)
+ for pcrsig in pcrsigs:
+ for bank, sigs in pcrsig.items():
+ for sig in sigs:
+ if sig not in combined[bank]:
+ combined[bank] += [sig]
+ return json.dumps(combined)
+
+
+def call_systemd_measure(uki, linux, opts):
+ measure_tool = find_tool('systemd-measure',
+ '/usr/lib/systemd/systemd-measure',
+ opts=opts)
+
+ banks = opts.pcr_banks or ()
+
+ # PCR measurement
+
+ if opts.measure:
+ pp_groups = opts.phase_path_groups or []
+
+ cmd = [
+ measure_tool,
+ 'calculate',
+ f'--linux={linux}',
+ *(f"--{s.name.removeprefix('.')}={s.content}"
+ for s in uki.sections
+ if s.measure),
+ *(f'--bank={bank}'
+ for bank in banks),
+ # For measurement, the keys are not relevant, so we can lump all the phase paths
+ # into one call to systemd-measure calculate.
+ *(f'--phase={phase_path}'
+ for phase_path in itertools.chain.from_iterable(pp_groups)),
+ ]
+
+ print('+', shell_join(cmd))
+ subprocess.check_call(cmd)
+
+ # PCR signing
+
+ if opts.pcr_private_keys:
+ n_priv = len(opts.pcr_private_keys or ())
+ pp_groups = opts.phase_path_groups or [None] * n_priv
+ pub_keys = opts.pcr_public_keys or [None] * n_priv
+
+ pcrsigs = []
+
+ cmd = [
+ measure_tool,
+ 'sign',
+ f'--linux={linux}',
+ *(f"--{s.name.removeprefix('.')}={s.content}"
+ for s in uki.sections
+ if s.measure),
+ *(f'--bank={bank}'
+ for bank in banks),
+ ]
+
+ for priv_key, pub_key, group in zip(opts.pcr_private_keys,
+ pub_keys,
+ pp_groups):
+ extra = [f'--private-key={priv_key}']
+ if pub_key:
+ extra += [f'--public-key={pub_key}']
+ extra += [f'--phase={phase_path}' for phase_path in group or ()]
+
+ print('+', shell_join(cmd + extra))
+ pcrsig = subprocess.check_output(cmd + extra, text=True)
+ pcrsig = json.loads(pcrsig)
+ pcrsigs += [pcrsig]
+
+ combined = combine_signatures(pcrsigs)
+ uki.add_section(Section.create('.pcrsig', combined))
+
+
+def make_uki(opts):
+ # kernel payload signing
+
+ sbsign_tool = find_tool('sbsign', opts=opts)
+ sbsign_invocation = [
+ sbsign_tool,
+ '--key', opts.sb_key,
+ '--cert', opts.sb_cert,
+ ]
+
+ if opts.signing_engine is not None:
+ sbsign_invocation += ['--engine', opts.signing_engine]
+
+ sign_kernel = opts.sign_kernel
+ if sign_kernel is None and opts.sb_key:
+ # figure out if we should sign the kernel
+ sbverify_tool = find_tool('sbverify', opts=opts)
+
+ cmd = [
+ sbverify_tool,
+ '--list',
+ opts.linux,
+ ]
+
+ print('+', shell_join(cmd))
+ info = subprocess.check_output(cmd, text=True)
+
+ # sbverify has wonderful API
+ if 'No signature table present' in info:
+ sign_kernel = True
+
+ if sign_kernel:
+ linux_signed = tempfile.NamedTemporaryFile(prefix='linux-signed')
+ linux = linux_signed.name
+
+ cmd = [
+ *sbsign_invocation,
+ opts.linux,
+ '--output', linux,
+ ]
+
+ print('+', shell_join(cmd))
+ subprocess.check_call(cmd)
+ else:
+ linux = opts.linux
+
+ uki = UKI(opts.stub)
+
+ # TODO: derive public key from from opts.pcr_private_keys?
+ pcrpkey = opts.pcrpkey
+ if pcrpkey is None:
+ if opts.pcr_public_keys and len(opts.pcr_public_keys) == 1:
+ pcrpkey = opts.pcr_public_keys[0]
+
+ sections = [
+ # name, content, measure?
+ ('.osrel', opts.os_release, True ),
+ ('.cmdline', opts.cmdline, True ),
+ ('.dtb', opts.devicetree, True ),
+ ('.splash', opts.splash, True ),
+ ('.pcrpkey', pcrpkey, True ),
+ ('.initrd', opts.initrd, True ),
+ ('.uname', opts.uname, False),
+
+ # linux shall be last to leave breathing room for decompression.
+ # We'll add it later.
+ ]
+
+ for name, content, measure in sections:
+ if content:
+ uki.add_section(Section.create(name, content, measure=measure))
+
+ # systemd-measure doesn't know about those extra sections
+ for section in opts.sections:
+ uki.add_section(section)
+
+ # PCR measurement and signing
+
+ call_systemd_measure(uki, linux, opts=opts)
+
+ # UKI creation
+
+ uki.add_section(
+ Section.create('.linux', linux, measure=True,
+ flags=['code', 'readonly']))
+
+ if opts.sb_key:
+ unsigned = tempfile.NamedTemporaryFile(prefix='uki')
+ output = unsigned.name
+ else:
+ output = opts.output
+
+ objcopy_tool = find_tool('objcopy', opts=opts)
+
+ cmd = [
+ objcopy_tool,
+ opts.stub,
+ *itertools.chain.from_iterable(
+ ('--add-section', f'{s.name}={s.content}',
+ '--change-section-vma', f'{s.name}=0x{s.offset:x}')
+ for s in uki.sections),
+ *itertools.chain.from_iterable(
+ ('--set-section-flags', f"{s.name}={','.join(s.flags)}")
+ for s in uki.sections
+ if s.flags is not None),
+ output,
+ ]
+ print('+', shell_join(cmd))
+ subprocess.check_call(cmd)
+
+ # UKI signing
+
+ if opts.sb_key:
+ cmd = [
+ *sbsign_invocation,
+ unsigned.name,
+ '--output', opts.output,
+ ]
+ print('+', shell_join(cmd))
+ subprocess.check_call(cmd)
+
+ # We end up with no executable bits, let's reapply them
+ os.umask(umask := os.umask(0))
+ os.chmod(opts.output, 0o777 & ~umask)
+
+ print(f"Wrote {'signed' if opts.sb_key else 'unsigned'} {opts.output}")
+
+
+def parse_args(args=None):
+ p = argparse.ArgumentParser(
+ description='Build and sign Unified Kernel Images',
+ allow_abbrev=False,
+ usage='''\
+usage: ukify [options…] linux initrd
+ ukify -h | --help
+''')
+
+ # Suppress printing of usage synopsis on errors
+ p.error = lambda message: p.exit(2, f'{p.prog}: error: {message}\n')
+
+ p.add_argument('linux',
+ type=pathlib.Path,
+ help='vmlinuz file [.linux section]')
+ p.add_argument('initrd',
+ type=pathlib.Path,
+ help='initrd file [.initrd section]')
+
+ p.add_argument('--cmdline',
+ metavar='TEXT|@PATH',
+ help='kernel command line [.cmdline section]')
+
+ p.add_argument('--os-release',
+ metavar='TEXT|@PATH',
+ help='path to os-release file [.osrel section]')
+
+ p.add_argument('--devicetree',
+ metavar='PATH',
+ type=pathlib.Path,
+ help='Device Tree file [.dtb section]')
+ p.add_argument('--splash',
+ metavar='BMP',
+ type=pathlib.Path,
+ help='splash image bitmap file [.splash section]')
+ p.add_argument('--pcrpkey',
+ metavar='KEY',
+ type=pathlib.Path,
+ help='embedded public key to seal secrets to [.pcrpkey section]')
+ p.add_argument('--uname',
+ metavar='VERSION',
+ help='"uname -r" information [.uname section]')
+
+ p.add_argument('--efi-arch',
+ metavar='ARCH',
+ choices=('ia32', 'x64', 'arm', 'aa64', 'riscv64'),
+ help='target EFI architecture')
+
+ p.add_argument('--stub',
+ type=pathlib.Path,
+ help='path the the sd-stub file [.text,.data,… sections]')
+
+ p.add_argument('--section',
+ dest='sections',
+ metavar='NAME:TEXT|@PATH',
+ type=Section.parse_arg,
+ action='append',
+ default=[],
+ help='additional section as name and contents [NAME section]')
+
+ p.add_argument('--pcr-private-key',
+ dest='pcr_private_keys',
+ metavar='PATH',
+ type=pathlib.Path,
+ action='append',
+ help='private part of the keypair for signing PCR signatures')
+ p.add_argument('--pcr-public-key',
+ dest='pcr_public_keys',
+ metavar='PATH',
+ type=pathlib.Path,
+ action='append',
+ help='public part of the keypair for signing PCR signatures')
+ p.add_argument('--phases',
+ dest='phase_path_groups',
+ metavar='PHASE-PATH…',
+ type=parse_phase_paths,
+ action='append',
+ help='phase-paths to create signatures for')
+
+ p.add_argument('--pcr-banks',
+ metavar='BANK…',
+ type=parse_banks)
+
+ p.add_argument('--signing-engine',
+ metavar='ENGINE',
+ help='OpenSSL engine to use for signing')
+ p.add_argument('--secureboot-private-key',
+ dest='sb_key',
+ help='path to key file or engine-specific designation for SB signing')
+ p.add_argument('--secureboot-certificate',
+ dest='sb_cert',
+ help='path to certificate file or engine-specific designation for SB signing')
+
+ p.add_argument('--sign-kernel',
+ action=argparse.BooleanOptionalAction,
+ help='Sign the embedded kernel')
+
+ p.add_argument('--tools',
+ type=pathlib.Path,
+ help='a directory with systemd-measure and other tools')
+
+ p.add_argument('--output', '-o',
+ type=pathlib.Path,
+ help='output file path')
+
+ p.add_argument('--measure',
+ action=argparse.BooleanOptionalAction,
+ help='print systemd-measure output for the UKI')
+
+ opts = p.parse_args(args)
+
+ if opts.cmdline and opts.cmdline.startswith('@'):
+ opts.cmdline = pathlib.Path(opts.cmdline[1:])
+
+ if opts.os_release is not None and opts.os_release.startswith('@'):
+ opts.os_release = pathlib.Path(opts.os_release[1:])
+ elif opts.os_release is None:
+ p = pathlib.Path('/etc/os-release')
+ if not p.exists():
+ p = pathlib.Path('/usr/lib/os-release')
+ opts.os_release = p
+
+ if opts.efi_arch is None:
+ opts.efi_arch = guess_efi_arch()
+
+ if opts.stub is None:
+ opts.stub = f'/usr/lib/systemd/boot/efi/linux{opts.efi_arch}.efi.stub'
+
+ if opts.signing_engine is None:
+ opts.sb_key = pathlib.Path(opts.sb_key) if opts.sb_key else None
+ opts.sb_cert = pathlib.Path(opts.sb_cert) if opts.sb_cert else None
+
+ if bool(opts.sb_key) ^ bool(opts.sb_cert):
+ raise ValueError('--secureboot-private-key= and --secureboot-certificate= must be specified together')
+
+ if opts.sign_kernel and not opts.sb_key:
+ raise ValueError('--sign-kernel requires --secureboot-private-key= and --secureboot-certificate= to be specified')
+
+ n_pcr_pub = None if opts.pcr_public_keys is None else len(opts.pcr_public_keys)
+ n_pcr_priv = None if opts.pcr_private_keys is None else len(opts.pcr_private_keys)
+ n_phase_path_groups = None if opts.phase_path_groups is None else len(opts.phase_path_groups)
+ if n_pcr_pub is not None and n_pcr_pub != n_pcr_priv:
+ raise ValueError('--pcr-public-key= specifications must match --pcr-private-key=')
+ if n_phase_path_groups is not None and n_phase_path_groups != n_pcr_priv:
+ raise ValueError('--phases= specifications must match --pcr-private-key=')
+
+ if opts.output is None:
+ suffix = '.efi' if opts.sb_key else '.unsigned.efi'
+ opts.output = opts.linux.name + suffix
+
+ for section in opts.sections:
+ section.check_name()
+
+ return opts
+
+
+def main():
+ opts = parse_args()
+ check_inputs(opts)
+ make_uki(opts)
+
+
+if __name__ == '__main__':
+ main()