]> git.ipfire.org Git - thirdparty/systemd.git/commitdiff
ukify: add helper to create UKIs
authorZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Sat, 29 Oct 2022 18:07:46 +0000 (20:07 +0200)
committerZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Wed, 7 Dec 2022 14:32:13 +0000 (15:32 +0100)
Features:
- adds sections .linux, .initrd, .uname, .osrel, .pcrpkey, .pcrsig, .cmdline, .splash
- multiple initrds can be concatenated
- section flags are set properly (READONLY, DATA or CODE)
- uses systemd-measure to precalculate pcr measurements and create a signed json policy
- the inner linux image will be signed automatically with sbsign if unsigned
- uses sbsign to sign the output image
- offsets are calculated so that sections are placed adjacent, with .linux last
- custom sections are possible
- multiple pcr signing keys can be specified and different boot phase paths can be
  signed with different keys
- most things can be overriden (path to tools, stub file, signing keys, pcr banks,
  boot phase paths, whether to sign things)
- superficial verification of slash bmp is done
- kernel uname "scraping" from the kernel if not specified (in a later patch)

TODO:
- change systemd-measure to not require a functional TPM2. W/o this, we'd need
  to support all banks in the build machine, which is hard to guarantee.
- load signing keys from /etc/kernel/
- supress exceptions, so if something external fails, the user will not see a traceback
- conversion to BMP from other formats

$ sudo /usr/lib/systemd/ukify \
  --tools=build/ \
  --measure \
  /lib/modules/6.0.5-300.fc37.x86_64/vmlinuz \
  /boot/08a5690a2eed47cf92ac0a5d2e3cf6b0/6.0.5-300.fc37.x86_64/initrd \
  --secureboot-private-key=server.key --secureboot-certificate=server.crt \
  --pcr-private-key=tpm2-pcr-private.pem --pcr-public-key=tpm2-pcr-public.pem \
  --cmdline='rw quiet' \
  --section test:TESTTESTTEST \
  --section test2:TESTTESTTEST2 \
  --pcr-banks=sha1 \
  --uname="$(uname -rv)"

Host arch 'x86_64', efi arch 'x64'
+ sbverify --list /lib/modules/6.0.5-300.fc37.x86_64/vmlinuz
+ build/systemd-measure calculate --linux=/lib/modules/6.0.5-300.fc37.x86_64/vmlinuz --osrel=/etc/os-release --cmdline=/tmp/tmpcmdline_5aufjir --pcrpkey=tpm2-pcr-public.pem --initrd=/boot/08a5690a2eed47cf92ac0a5d2e3cf6b0/6.0.5-300.fc37.x86_64/initrd --bank=sha1
11:sha1=03df5e5243bc002b959d52359fe04e266d0b5ebf
11:sha1=54949b82bae32e80343ff0f01eeeeb75f4c07d3f
11:sha1=0fc62be88aa9c5ad7282aa8adb504f451bcec9df
11:sha1=b71155e7fcd467f7c1696f675e37887032e2eafa
+ build/systemd-measure sign --linux=/lib/modules/6.0.5-300.fc37.x86_64/vmlinuz --osrel=/etc/os-release --cmdline=/tmp/tmpcmdline_5aufjir --pcrpkey=tpm2-pcr-public.pem --initrd=/boot/08a5690a2eed47cf92ac0a5d2e3cf6b0/6.0.5-300.fc37.x86_64/initrd --bank=sha1 --private-key=tpm2-pcr-private.pem --public-key=tpm2-pcr-public.pem
+ objcopy /usr/lib/systemd/boot/efi/linuxx64.efi.stub --add-section .osrel=/etc/os-release --change-section-vma .osrel=0x22000 --add-section .cmdline=/tmp/tmpcmdline_5aufjir --change-section-vma .cmdline=0x23000 --add-section .pcrpkey=tpm2-pcr-public.pem --change-section-vma .pcrpkey=0x24000 --add-section .initrd=/boot/08a5690a2eed47cf92ac0a5d2e3cf6b0/6.0.5-300.fc37.x86_64/initrd --change-section-vma .initrd=0x25000 --add-section .uname=/tmp/tmpuname0v3uzh5r --change-section-vma .uname=0x4009000 --add-section .test=/tmp/tmptestuxve59c8 --change-section-vma .test=0x400a000 --add-section .test2=/tmp/tmptest2_i143p9i --change-section-vma .test2=0x400b000 --add-section .pcrsig=/tmp/tmppcrsigdtcqxz_w --change-section-vma .pcrsig=0x400c000 --add-section .linux=/lib/modules/6.0.5-300.fc37.x86_64/vmlinuz --change-section-vma .linux=0x400d000 /tmp/uki4vsbf7y8
+ sbsign --key server.key --cert server.crt /tmp/uki4vsbf7y8 --output vmlinuz.efi
warning: data remaining[79849520 vs 79866644]: gaps between PE/COFF sections?
warning: data remaining[79849520 vs 79866648]: gaps between PE/COFF sections?
Signing Unsigned original image
Wrote signed vmlinuz.efi

src/ukify/ukify.py [new file with mode: 0755]

diff --git a/src/ukify/ukify.py b/src/ukify/ukify.py
new file mode 100755 (executable)
index 0000000..4041cef
--- /dev/null
@@ -0,0 +1,576 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1+
+
+# pylint: disable=missing-docstring,invalid-name,import-outside-toplevel
+# pylint: disable=consider-using-with,unspecified-encoding,line-too-long
+# pylint: disable=too-many-locals,too-many-statements,too-many-return-statements
+# pylint: disable=too-many-branches
+
+import argparse
+import collections
+import dataclasses
+import fnmatch
+import itertools
+import json
+import os
+import pathlib
+import re
+import shlex
+import subprocess
+import tempfile
+import typing
+
+import pefile
+
+EFI_ARCH_MAP = {
+        # host_arch glob : [efi_arch, 32_bit_efi_arch if mixed mode is supported]
+        'x86_64'       : ['x64', 'ia32'],
+        'i[3456]86'    : ['ia32'],
+        'aarch64'      : ['aa64'],
+        'arm[45678]*l' : ['arm'],
+        'riscv64'      : ['riscv64'],
+}
+EFI_ARCHES: list[str] = sum(EFI_ARCH_MAP.values(), [])
+
+def guess_efi_arch():
+    arch = os.uname().machine
+
+    for glob, mapping in EFI_ARCH_MAP.items():
+        if fnmatch.fnmatch(arch, glob):
+            efi_arch, *fallback = mapping
+            break
+    else:
+        raise ValueError(f'Unsupported architecture {arch}')
+
+    # This makes sense only on some architectures, but it also probably doesn't
+    # hurt on others, so let's just apply the check everywhere.
+    if fallback:
+        fw_platform_size = pathlib.Path('/sys/firmware/efi/fw_platform_size')
+        try:
+            size = fw_platform_size.read_text().strip()
+        except FileNotFoundError:
+            pass
+        else:
+            if int(size) == 32:
+                efi_arch = fallback[0]
+
+    print(f'Host arch {arch!r}, EFI arch {efi_arch!r}')
+    return efi_arch
+
+
+def shell_join(cmd):
+    # TODO: drop in favour of shlex.join once shlex.join supports pathlib.Path.
+    return ' '.join(shlex.quote(str(x)) for x in cmd)
+
+
+def pe_executable_size(filename):
+    pe = pefile.PE(filename)
+    section = pe.sections[-1]
+    return section.VirtualAddress + section.Misc_VirtualSize
+
+
+def round_up(x, blocksize=4096):
+    return (x + blocksize - 1) // blocksize * blocksize
+
+
+@dataclasses.dataclass
+class Section:
+    name: str
+    content: pathlib.Path
+    tmpfile: typing.IO | None = None
+    flags: list[str] | None = dataclasses.field(default=None)
+    offset: int | None = None
+    measure: bool = False
+
+    @classmethod
+    def create(cls, name, contents, flags=None, measure=False):
+        if isinstance(contents, str):
+            tmp = tempfile.NamedTemporaryFile(mode='wt', prefix=f'tmp{name}')
+            tmp.write(contents)
+            tmp.flush()
+            contents = pathlib.Path(tmp.name)
+        else:
+            tmp = None
+
+        return cls(name, contents, tmpfile=tmp, flags=flags, measure=measure)
+
+    @classmethod
+    def parse_arg(cls, s):
+        try:
+            name, contents, *rest = s.split(':')
+        except ValueError as e:
+            raise ValueError(f'Cannot parse section spec (name or contents missing): {s!r}') from e
+        if rest:
+            raise ValueError(f'Cannot parse section spec (extraneous parameters): {s!r}')
+
+        if contents.startswith('@'):
+            contents = pathlib.Path(contents[1:])
+
+        return cls.create(name, contents)
+
+    def size(self):
+        return self.content.stat().st_size
+
+    def check_name(self):
+        # PE section names with more than 8 characters are legal, but our stub does
+        # not support them.
+        if not self.name.isascii() or not self.name.isprintable():
+            raise ValueError(f'Bad section name: {self.name!r}')
+        if len(self.name) > 8:
+            raise ValueError(f'Section name too long: {self.name!r}')
+
+
+@dataclasses.dataclass
+class UKI:
+    executable: list[pathlib.Path|str]
+    sections: list[Section] = dataclasses.field(default_factory=list, init=False)
+    offset: int | None = dataclasses.field(default=None, init=False)
+
+    def __post_init__(self):
+        self.offset = round_up(pe_executable_size(self.executable))
+
+    def add_section(self, section):
+        assert self.offset
+        assert section.offset is None
+
+        if section.name in [s.name for s in self.sections]:
+            raise ValueError(f'Duplicate section {section.name}')
+
+        section.offset = self.offset
+        self.offset += round_up(section.size())
+        self.sections += [section]
+
+
+def parse_banks(s):
+    banks = re.split(r',|\s+', s)
+    # TODO: do some sanity checking here
+    return banks
+
+
+KNOWN_PHASES = (
+    'enter-initrd',
+    'leave-initrd',
+    'sysinit',
+    'ready',
+    'shutdown',
+    'final',
+)
+
+def parse_phase_paths(s):
+    # Split on commas or whitespace here. Commas might be hard to parse visually.
+    paths = re.split(r',|\s+', s)
+
+    for path in paths:
+        for phase in path.split(':'):
+            if phase not in KNOWN_PHASES:
+                raise argparse.ArgumentTypeError(f'Unknown boot phase {phase!r} ({path=})')
+
+    return paths
+
+
+def check_splash(filename):
+    if filename is None:
+        return
+
+    # import is delayed, to avoid import when the splash image is not used
+    try:
+        from PIL import Image
+    except ImportError:
+        return
+
+    img = Image.open(filename, formats=['BMP'])
+    print(f'Splash image {filename} is {img.width}×{img.height} pixels')
+
+
+def check_inputs(opts):
+    for name, value in vars(opts).items():
+        if name in {'output', 'tools'}:
+            continue
+
+        if not isinstance(value, pathlib.Path):
+            continue
+
+        # Open file to check that we can read it, or generate an exception
+        value.open().close()
+
+    check_splash(opts.splash)
+
+
+def find_tool(name, fallback=None, opts=None):
+    if opts and opts.tools:
+        tool = opts.tools / name
+        if tool.exists():
+            return tool
+
+    return fallback or name
+
+
+def combine_signatures(pcrsigs):
+    combined = collections.defaultdict(list)
+    for pcrsig in pcrsigs:
+        for bank, sigs in pcrsig.items():
+            for sig in sigs:
+                if sig not in combined[bank]:
+                    combined[bank] += [sig]
+    return json.dumps(combined)
+
+
+def call_systemd_measure(uki, linux, opts):
+    measure_tool = find_tool('systemd-measure',
+                             '/usr/lib/systemd/systemd-measure',
+                             opts=opts)
+
+    banks = opts.pcr_banks or ()
+
+    # PCR measurement
+
+    if opts.measure:
+        pp_groups = opts.phase_path_groups or []
+
+        cmd = [
+            measure_tool,
+            'calculate',
+            f'--linux={linux}',
+            *(f"--{s.name.removeprefix('.')}={s.content}"
+              for s in uki.sections
+              if s.measure),
+            *(f'--bank={bank}'
+              for bank in banks),
+            # For measurement, the keys are not relevant, so we can lump all the phase paths
+            # into one call to systemd-measure calculate.
+            *(f'--phase={phase_path}'
+              for phase_path in itertools.chain.from_iterable(pp_groups)),
+        ]
+
+        print('+', shell_join(cmd))
+        subprocess.check_call(cmd)
+
+    # PCR signing
+
+    if opts.pcr_private_keys:
+        n_priv = len(opts.pcr_private_keys or ())
+        pp_groups = opts.phase_path_groups or [None] * n_priv
+        pub_keys = opts.pcr_public_keys or [None] * n_priv
+
+        pcrsigs = []
+
+        cmd = [
+            measure_tool,
+            'sign',
+            f'--linux={linux}',
+            *(f"--{s.name.removeprefix('.')}={s.content}"
+              for s in uki.sections
+              if s.measure),
+            *(f'--bank={bank}'
+              for bank in banks),
+        ]
+
+        for priv_key, pub_key, group in zip(opts.pcr_private_keys,
+                                            pub_keys,
+                                            pp_groups):
+            extra = [f'--private-key={priv_key}']
+            if pub_key:
+                extra += [f'--public-key={pub_key}']
+            extra += [f'--phase={phase_path}' for phase_path in group or ()]
+
+            print('+', shell_join(cmd + extra))
+            pcrsig = subprocess.check_output(cmd + extra, text=True)
+            pcrsig = json.loads(pcrsig)
+            pcrsigs += [pcrsig]
+
+        combined = combine_signatures(pcrsigs)
+        uki.add_section(Section.create('.pcrsig', combined))
+
+
+def make_uki(opts):
+    # kernel payload signing
+
+    sbsign_tool = find_tool('sbsign', opts=opts)
+    sbsign_invocation = [
+        sbsign_tool,
+        '--key', opts.sb_key,
+        '--cert', opts.sb_cert,
+    ]
+
+    if opts.signing_engine is not None:
+        sbsign_invocation += ['--engine', opts.signing_engine]
+
+    sign_kernel = opts.sign_kernel
+    if sign_kernel is None and opts.sb_key:
+        # figure out if we should sign the kernel
+        sbverify_tool = find_tool('sbverify', opts=opts)
+
+        cmd = [
+            sbverify_tool,
+            '--list',
+            opts.linux,
+        ]
+
+        print('+', shell_join(cmd))
+        info = subprocess.check_output(cmd, text=True)
+
+        # sbverify has wonderful API
+        if 'No signature table present' in info:
+            sign_kernel = True
+
+    if sign_kernel:
+        linux_signed = tempfile.NamedTemporaryFile(prefix='linux-signed')
+        linux = linux_signed.name
+
+        cmd = [
+            *sbsign_invocation,
+            opts.linux,
+            '--output', linux,
+        ]
+
+        print('+', shell_join(cmd))
+        subprocess.check_call(cmd)
+    else:
+        linux = opts.linux
+
+    uki = UKI(opts.stub)
+
+    # TODO: derive public key from from opts.pcr_private_keys?
+    pcrpkey = opts.pcrpkey
+    if pcrpkey is None:
+        if opts.pcr_public_keys and len(opts.pcr_public_keys) == 1:
+            pcrpkey = opts.pcr_public_keys[0]
+
+    sections = [
+        # name,      content,         measure?
+        ('.osrel',   opts.os_release, True ),
+        ('.cmdline', opts.cmdline,    True ),
+        ('.dtb',     opts.devicetree, True ),
+        ('.splash',  opts.splash,     True ),
+        ('.pcrpkey', pcrpkey,         True ),
+        ('.initrd',  opts.initrd,     True ),
+        ('.uname',   opts.uname,      False),
+
+        # linux shall be last to leave breathing room for decompression.
+        # We'll add it later.
+    ]
+
+    for name, content, measure in sections:
+        if content:
+            uki.add_section(Section.create(name, content, measure=measure))
+
+    # systemd-measure doesn't know about those extra sections
+    for section in opts.sections:
+        uki.add_section(section)
+
+    # PCR measurement and signing
+
+    call_systemd_measure(uki, linux, opts=opts)
+
+    # UKI creation
+
+    uki.add_section(
+        Section.create('.linux', linux, measure=True,
+                       flags=['code', 'readonly']))
+
+    if opts.sb_key:
+        unsigned = tempfile.NamedTemporaryFile(prefix='uki')
+        output = unsigned.name
+    else:
+        output = opts.output
+
+    objcopy_tool = find_tool('objcopy', opts=opts)
+
+    cmd = [
+        objcopy_tool,
+        opts.stub,
+        *itertools.chain.from_iterable(
+            ('--add-section',        f'{s.name}={s.content}',
+             '--change-section-vma', f'{s.name}=0x{s.offset:x}')
+            for s in uki.sections),
+        *itertools.chain.from_iterable(
+            ('--set-section-flags',  f"{s.name}={','.join(s.flags)}")
+            for s in uki.sections
+            if s.flags is not None),
+        output,
+    ]
+    print('+', shell_join(cmd))
+    subprocess.check_call(cmd)
+
+    # UKI signing
+
+    if opts.sb_key:
+        cmd = [
+            *sbsign_invocation,
+            unsigned.name,
+            '--output', opts.output,
+        ]
+        print('+', shell_join(cmd))
+        subprocess.check_call(cmd)
+
+        # We end up with no executable bits, let's reapply them
+        os.umask(umask := os.umask(0))
+        os.chmod(opts.output, 0o777 & ~umask)
+
+    print(f"Wrote {'signed' if opts.sb_key else 'unsigned'} {opts.output}")
+
+
+def parse_args(args=None):
+    p = argparse.ArgumentParser(
+        description='Build and sign Unified Kernel Images',
+        allow_abbrev=False,
+        usage='''\
+usage: ukify [options…] linux initrd
+       ukify -h | --help
+''')
+
+    # Suppress printing of usage synopsis on errors
+    p.error = lambda message: p.exit(2, f'{p.prog}: error: {message}\n')
+
+    p.add_argument('linux',
+                   type=pathlib.Path,
+                   help='vmlinuz file [.linux section]')
+    p.add_argument('initrd',
+                   type=pathlib.Path,
+                   help='initrd file [.initrd section]')
+
+    p.add_argument('--cmdline',
+                   metavar='TEXT|@PATH',
+                   help='kernel command line [.cmdline section]')
+
+    p.add_argument('--os-release',
+                   metavar='TEXT|@PATH',
+                   help='path to os-release file [.osrel section]')
+
+    p.add_argument('--devicetree',
+                   metavar='PATH',
+                   type=pathlib.Path,
+                   help='Device Tree file [.dtb section]')
+    p.add_argument('--splash',
+                   metavar='BMP',
+                   type=pathlib.Path,
+                   help='splash image bitmap file [.splash section]')
+    p.add_argument('--pcrpkey',
+                   metavar='KEY',
+                   type=pathlib.Path,
+                   help='embedded public key to seal secrets to [.pcrpkey section]')
+    p.add_argument('--uname',
+                   metavar='VERSION',
+                   help='"uname -r" information [.uname section]')
+
+    p.add_argument('--efi-arch',
+                   metavar='ARCH',
+                   choices=('ia32', 'x64', 'arm', 'aa64', 'riscv64'),
+                   help='target EFI architecture')
+
+    p.add_argument('--stub',
+                   type=pathlib.Path,
+                   help='path the the sd-stub file [.text,.data,… sections]')
+
+    p.add_argument('--section',
+                   dest='sections',
+                   metavar='NAME:TEXT|@PATH',
+                   type=Section.parse_arg,
+                   action='append',
+                   default=[],
+                   help='additional section as name and contents [NAME section]')
+
+    p.add_argument('--pcr-private-key',
+                   dest='pcr_private_keys',
+                   metavar='PATH',
+                   type=pathlib.Path,
+                   action='append',
+                   help='private part of the keypair for signing PCR signatures')
+    p.add_argument('--pcr-public-key',
+                   dest='pcr_public_keys',
+                   metavar='PATH',
+                   type=pathlib.Path,
+                   action='append',
+                   help='public part of the keypair for signing PCR signatures')
+    p.add_argument('--phases',
+                   dest='phase_path_groups',
+                   metavar='PHASE-PATH…',
+                   type=parse_phase_paths,
+                   action='append',
+                   help='phase-paths to create signatures for')
+
+    p.add_argument('--pcr-banks',
+                   metavar='BANK…',
+                   type=parse_banks)
+
+    p.add_argument('--signing-engine',
+                   metavar='ENGINE',
+                   help='OpenSSL engine to use for signing')
+    p.add_argument('--secureboot-private-key',
+                   dest='sb_key',
+                   help='path to key file or engine-specific designation for SB signing')
+    p.add_argument('--secureboot-certificate',
+                   dest='sb_cert',
+                   help='path to certificate file or engine-specific designation for SB signing')
+
+    p.add_argument('--sign-kernel',
+                   action=argparse.BooleanOptionalAction,
+                   help='Sign the embedded kernel')
+
+    p.add_argument('--tools',
+                   type=pathlib.Path,
+                   help='a directory with systemd-measure and other tools')
+
+    p.add_argument('--output', '-o',
+                   type=pathlib.Path,
+                   help='output file path')
+
+    p.add_argument('--measure',
+                   action=argparse.BooleanOptionalAction,
+                   help='print systemd-measure output for the UKI')
+
+    opts = p.parse_args(args)
+
+    if opts.cmdline and opts.cmdline.startswith('@'):
+        opts.cmdline = pathlib.Path(opts.cmdline[1:])
+
+    if opts.os_release is not None and opts.os_release.startswith('@'):
+        opts.os_release = pathlib.Path(opts.os_release[1:])
+    elif opts.os_release is None:
+        p = pathlib.Path('/etc/os-release')
+        if not p.exists():
+            p = pathlib.Path('/usr/lib/os-release')
+        opts.os_release = p
+
+    if opts.efi_arch is None:
+        opts.efi_arch = guess_efi_arch()
+
+    if opts.stub is None:
+        opts.stub = f'/usr/lib/systemd/boot/efi/linux{opts.efi_arch}.efi.stub'
+
+    if opts.signing_engine is None:
+        opts.sb_key = pathlib.Path(opts.sb_key) if opts.sb_key else None
+        opts.sb_cert = pathlib.Path(opts.sb_cert) if opts.sb_cert else None
+
+    if bool(opts.sb_key) ^ bool(opts.sb_cert):
+        raise ValueError('--secureboot-private-key= and --secureboot-certificate= must be specified together')
+
+    if opts.sign_kernel and not opts.sb_key:
+        raise ValueError('--sign-kernel requires --secureboot-private-key= and --secureboot-certificate= to be specified')
+
+    n_pcr_pub = None if opts.pcr_public_keys is None else len(opts.pcr_public_keys)
+    n_pcr_priv = None if opts.pcr_private_keys is None else len(opts.pcr_private_keys)
+    n_phase_path_groups = None if opts.phase_path_groups is None else len(opts.phase_path_groups)
+    if n_pcr_pub is not None and n_pcr_pub != n_pcr_priv:
+        raise ValueError('--pcr-public-key= specifications must match --pcr-private-key=')
+    if n_phase_path_groups is not None and n_phase_path_groups != n_pcr_priv:
+        raise ValueError('--phases= specifications must match --pcr-private-key=')
+
+    if opts.output is None:
+        suffix = '.efi' if opts.sb_key else '.unsigned.efi'
+        opts.output = opts.linux.name + suffix
+
+    for section in opts.sections:
+        section.check_name()
+
+    return opts
+
+
+def main():
+    opts = parse_args()
+    check_inputs(opts)
+    make_uki(opts)
+
+
+if __name__ == '__main__':
+    main()