diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml
index 6cb07c95f98..53d1b1b9596 100644
--- a/.github/workflows/linter.yml
+++ b/.github/workflows/linter.yml
@@ -35,3 +35,26 @@ jobs:
           MULTI_STATUS: false
           VALIDATE_ALL_CODEBASE: false
           VALIDATE_GITHUB_ACTIONS: true
+
+      - name: Check that tabs are not used in Python code
+        run: sh -c '! git grep -P "\\t" -- src/ukify/ukify.py'
+
+      - name: Install ruff and mypy
+        run: |
+          python3 -m pip install --break-system-packages --upgrade setuptools wheel pip
+          python3 -m pip install --break-system-packages mypy types-Pillow ruff
+
+      - name: Run mypy
+        run: |
+          python3 -m mypy --version
+          python3 -m mypy src/ukify/ukify.py
+
+      - name: Run ruff check
+        run: |
+          ruff --version
+          ruff check src/ukify/ukify.py
+
+      - name: Run ruff format
+        run: |
+          ruff --version
+          ruff format --check src/ukify/ukify.py
diff --git a/src/ukify/mypy.ini b/src/ukify/mypy.ini
new file mode 100644
index 00000000000..bae4e70f44e
--- /dev/null
+++ b/src/ukify/mypy.ini
@@ -0,0 +1,21 @@
+[mypy]
+python_version = 3.9
+allow_redefinition = True
+# belonging to --strict
+warn_unused_configs = true
+disallow_untyped_calls = true
+disallow_untyped_defs = true
+disallow_untyped_decorators = true
+disallow_incomplete_defs = true
+check_untyped_defs = true
+no_implicit_optional = true
+warn_redundant_casts = true
+warn_unused_ignores = false
+warn_return_any = true
+no_implicit_reexport = true
+# extra options not in --strict
+pretty = true
+show_error_codes = true
+show_column_numbers = true
+warn_unreachable = true
+strict_equality = true
diff --git a/src/ukify/ruff.toml b/src/ukify/ruff.toml
new file mode 100644
index 00000000000..6c0ec6ceb84
--- /dev/null
+++ b/src/ukify/ruff.toml
@@ -0,0 +1,6 @@
+target-version = "py39"
+line-length = 109
+lint.select = ["E", "F", "I", "UP"]
+
+[format]
+quote-style = "single"
diff --git a/src/ukify/ukify.py b/src/ukify/ukify.py
index 20c1c5ca5e9..93ad78b01d1 100755
--- a/src/ukify/ukify.py
+++ b/src/ukify/ukify.py
@@ -20,34 +20,39 @@
 # pylint: disable=unnecessary-lambda-assignment
 
 import argparse
+import collections
 import configparser
 import contextlib
-import collections
 import dataclasses
 import datetime
 import fnmatch
 import itertools
 import json
 import os
-import pathlib
 import pprint
 import pydoc
 import re
 import shlex
 import shutil
 import socket
+import struct
 import subprocess
 import sys
 import tempfile
 import textwrap
-import struct
+from collections.abc import Iterable, Iterator, Sequence
 from hashlib import sha256
-from typing import (Any,
-                    Callable,
-                    IO,
-                    Optional,
-                    Sequence,
-                    Union)
+from pathlib import Path
+from types import ModuleType
+from typing import (
+    IO,
+    Any,
+    Callable,
+    Optional,
+    TypeVar,
+    Union,
+    cast,
+)
 
 import pefile  # type: ignore
 
@@ -55,31 +60,33 @@ __version__ = '{{PROJECT_VERSION}} ({{VERSION_TAG}})'
 
 EFI_ARCH_MAP = {
     # host_arch glob : [efi_arch, 32_bit_efi_arch if mixed mode is supported]
-    'x86_64'       : ['x64', 'ia32'],
-    'i[3456]86'    : ['ia32'],
-    'aarch64'      : ['aa64'],
+    'x86_64': ['x64', 'ia32'],
+    'i[3456]86': ['ia32'],
+    'aarch64': ['aa64'],
     'armv[45678]*l': ['arm'],
-    'loongarch32'  : ['loongarch32'],
-    'loongarch64'  : ['loongarch64'],
-    'riscv32'      : ['riscv32'],
-    'riscv64'      : ['riscv64'],
-}
+    'loongarch32': ['loongarch32'],
+    'loongarch64': ['loongarch64'],
+    'riscv32': ['riscv32'],
+    'riscv64': ['riscv64'],
+}  # fmt: skip
 
 EFI_ARCHES: list[str] = sum(EFI_ARCH_MAP.values(), [])
 
 # Default configuration directories and file name.
-# When the user does not specify one, the directories are searched in this order and the first file found is used. +# When the user does not specify one, the directories are searched in this order and the first file found is +# used. DEFAULT_CONFIG_DIRS = ['/etc/systemd', '/run/systemd', '/usr/local/lib/systemd', '/usr/lib/systemd'] DEFAULT_CONFIG_FILE = 'ukify.conf' + class Style: - bold = "\033[0;1;39m" if sys.stderr.isatty() else "" - gray = "\033[0;38;5;245m" if sys.stderr.isatty() else "" - red = "\033[31;1m" if sys.stderr.isatty() else "" - yellow = "\033[33;1m" if sys.stderr.isatty() else "" - reset = "\033[0m" if sys.stderr.isatty() else "" + bold = '\033[0;1;39m' if sys.stderr.isatty() else '' + gray = '\033[0;38;5;245m' if sys.stderr.isatty() else '' + red = '\033[31;1m' if sys.stderr.isatty() else '' + yellow = '\033[33;1m' if sys.stderr.isatty() else '' + reset = '\033[0m' if sys.stderr.isatty() else '' -def guess_efi_arch(): +def guess_efi_arch() -> str: arch = os.uname().machine for glob, mapping in EFI_ARCH_MAP.items(): @@ -92,7 +99,7 @@ def guess_efi_arch(): # This makes sense only on some architectures, but it also probably doesn't # hurt on others, so let's just apply the check everywhere. if fallback: - fw_platform_size = pathlib.Path('/sys/firmware/efi/fw_platform_size') + fw_platform_size = Path('/sys/firmware/efi/fw_platform_size') try: size = fw_platform_size.read_text().strip() except FileNotFoundError: @@ -114,22 +121,23 @@ def page(text: str, enabled: Optional[bool]) -> None: print(text) -def shell_join(cmd): - # TODO: drop in favour of shlex.join once shlex.join supports pathlib.Path. +def shell_join(cmd: list[Union[str, Path]]) -> str: + # TODO: drop in favour of shlex.join once shlex.join supports Path. return ' '.join(shlex.quote(str(x)) for x in cmd) -def round_up(x, blocksize=4096): +def round_up(x: int, blocksize: int = 4096) -> int: return (x + blocksize - 1) // blocksize * blocksize -def try_import(modname, name=None): +def try_import(modname: str, name: Optional[str] = None) -> ModuleType: try: return __import__(modname) except ImportError as e: raise ValueError(f'Kernel is compressed with {name or modname}, but module unavailable') from e -def get_zboot_kernel(f): + +def get_zboot_kernel(f: IO[bytes]) -> bytes: """Decompress zboot efistub kernel if compressed. Return contents.""" # See linux/drivers/firmware/efi/libstub/Makefile.zboot # and linux/drivers/firmware/efi/libstub/zboot-header.S @@ -141,7 +149,7 @@ def get_zboot_kernel(f): # Reading 4 bytes from address 0x0c is the size of compressed data, # but it needs to be corrected according to the compressed type. - f.seek(0xc) + f.seek(0xC) _sizes = f.read(4) size = struct.unpack(' bytes: """Decompress file if compressed. Return contents.""" f = open(filename, 'rb') start = f.read(4) @@ -191,20 +200,20 @@ def maybe_decompress(filename): if start.startswith(b'\x1f\x8b'): gzip = try_import('gzip') - return gzip.open(f).read() + return cast(bytes, gzip.open(f).read()) if start.startswith(b'\x28\xb5\x2f\xfd'): zstd = try_import('zstd') - return zstd.uncompress(f.read()) + return cast(bytes, zstd.uncompress(f.read())) if start.startswith(b'\x02\x21\x4c\x18'): lz4 = try_import('lz4.frame', 'lz4') - return lz4.frame.decompress(f.read()) + return cast(bytes, lz4.frame.decompress(f.read())) if start.startswith(b'\x04\x22\x4d\x18'): print('Newer lz4 stream format detected! 
This may not boot!') lz4 = try_import('lz4.frame', 'lz4') - return lz4.frame.decompress(f.read()) + return cast(bytes, lz4.frame.decompress(f.read())) if start.startswith(b'\x89LZO'): # python3-lzo is not packaged for Fedora @@ -212,13 +221,13 @@ def maybe_decompress(filename): if start.startswith(b'BZh'): bz2 = try_import('bz2', 'bzip2') - return bz2.open(f).read() + return cast(bytes, bz2.open(f).read()) if start.startswith(b'\x5d\x00\x00'): lzma = try_import('lzma') - return lzma.open(f).read() + return cast(bytes, lzma.open(f).read()) - raise NotImplementedError(f'unknown file format (starts with {start})') + raise NotImplementedError(f'unknown file format (starts with {start!r})') class Uname: @@ -234,7 +243,7 @@ class Uname: TEXT_PATTERN = rb'Linux version (?P\d\.\S+) \(' @classmethod - def scrape_x86(cls, filename, opts=None): + def scrape_x86(cls, filename: Path, opts: Optional[argparse.Namespace] = None) -> str: # Based on https://gitlab.archlinux.org/archlinux/mkinitcpio/mkinitcpio/-/blob/master/functions#L136 # and https://docs.kernel.org/arch/x86/boot.html#the-real-mode-kernel-header with open(filename, 'rb') as f: @@ -243,18 +252,18 @@ class Uname: if magic != b'HdrS': raise ValueError('Real-Mode Kernel Header magic not found') f.seek(0x20E) - offset = f.read(1)[0] + f.read(1)[0]*256 # Pointer to kernel version string + offset = f.read(1)[0] + f.read(1)[0] * 256 # Pointer to kernel version string f.seek(0x200 + offset) text = f.read(128) text = text.split(b'\0', maxsplit=1)[0] - text = text.decode() + decoded = text.decode() - if not (m := re.match(cls.VERSION_PATTERN, text)): + if not (m := re.match(cls.VERSION_PATTERN, decoded)): raise ValueError(f'Cannot parse version-host-release uname string: {text!r}') return m.group('version') @classmethod - def scrape_elf(cls, filename, opts=None): + def scrape_elf(cls, filename: Path, opts: Optional[argparse.Namespace] = None) -> str: readelf = find_tool('readelf', opts=opts) cmd = [ @@ -276,7 +285,7 @@ class Uname: return text.rstrip('\0') @classmethod - def scrape_generic(cls, filename, opts=None): + def scrape_generic(cls, filename: Path, opts: Optional[argparse.Namespace] = None) -> str: # import libarchive # libarchive-c fails with # ArchiveError: Unrecognized archive format (errno=84, retcode=-30, archive_p=94705420454656) @@ -290,7 +299,7 @@ class Uname: return m.group('version').decode() @classmethod - def scrape(cls, filename, opts=None): + def scrape(cls, filename: Path, opts: Optional[argparse.Namespace] = None) -> Optional[str]: for func in (cls.scrape_x86, cls.scrape_elf, cls.scrape_generic): try: version = func(filename, opts=opts) @@ -300,46 +309,48 @@ class Uname: print(str(e)) return None + DEFAULT_SECTIONS_TO_SHOW = { - '.linux' : 'binary', - '.initrd' : 'binary', - '.ucode' : 'binary', - '.splash' : 'binary', - '.dtb' : 'binary', - '.cmdline' : 'text', - '.osrel' : 'text', - '.uname' : 'text', - '.pcrpkey' : 'text', - '.pcrsig' : 'text', - '.sbat' : 'text', - '.sbom' : 'binary', - '.profile' : 'text', -} + '.linux': 'binary', + '.initrd': 'binary', + '.ucode': 'binary', + '.splash': 'binary', + '.dtb': 'binary', + '.cmdline': 'text', + '.osrel': 'text', + '.uname': 'text', + '.pcrpkey': 'text', + '.pcrsig': 'text', + '.sbat': 'text', + '.sbom': 'binary', + '.profile': 'text', +} # fmt: skip + @dataclasses.dataclass class Section: name: str - content: Optional[pathlib.Path] - tmpfile: Optional[IO] = None + content: Optional[Path] + tmpfile: Optional[IO[Any]] = None measure: bool = False output_mode: Optional[str] = 
None virtual_size: Optional[int] = None @classmethod - def create(cls, name, contents, **kwargs): + def create(cls, name: str, contents: Union[str, bytes, Path, None], **kwargs: Any) -> 'Section': if isinstance(contents, (str, bytes)): mode = 'wt' if isinstance(contents, str) else 'wb' tmp = tempfile.NamedTemporaryFile(mode=mode, prefix=f'tmp{name}') tmp.write(contents) tmp.flush() - contents = pathlib.Path(tmp.name) + contents = Path(tmp.name) else: tmp = None return cls(name, contents, tmpfile=tmp, **kwargs) @classmethod - def parse_input(cls, s): + def parse_input(cls, s: str) -> 'Section': try: name, contents, *rest = s.split(':') except ValueError as e: @@ -348,23 +359,24 @@ class Section: raise ValueError(f'Cannot parse section spec (extraneous parameters): {s!r}') if contents.startswith('@'): - contents = pathlib.Path(contents[1:]) + sec = cls.create(name, Path(contents[1:])) + else: + sec = cls.create(name, contents) - sec = cls.create(name, contents) sec.check_name() return sec @classmethod - def parse_output(cls, s): + def parse_output(cls, s: str) -> 'Section': if not (m := re.match(r'([a-zA-Z0-9_.]+):(text|binary)(?:@(.+))?', s)): raise ValueError(f'Cannot parse section spec: {s!r}') name, ttype, out = m.groups() - out = pathlib.Path(out) if out else None + out = Path(out) if out else None return cls.create(name, out, output_mode=ttype) - def check_name(self): + def check_name(self) -> None: # PE section names with more than 8 characters are legal, but our stub does # not support them. if not self.name.isascii() or not self.name.isprintable(): @@ -375,15 +387,15 @@ class Section: @dataclasses.dataclass class UKI: - executable: list[Union[pathlib.Path, str]] + executable: list[Union[Path, str]] sections: list[Section] = dataclasses.field(default_factory=list, init=False) - def add_section(self, section): + def add_section(self, section: Section) -> None: start = 0 # Start search at last .profile section, if there is one for i, s in enumerate(self.sections): - if s.name == ".profile": + if s.name == '.profile': start = i + 1 if any(section.name == s.name for s in self.sections[start:]): @@ -392,7 +404,7 @@ class UKI: self.sections += [section] -def parse_banks(s): +def parse_banks(s: str) -> list[str]: banks = re.split(r',|\s+', s) # TODO: do some sanity checking here return banks @@ -407,7 +419,8 @@ KNOWN_PHASES = ( 'final', ) -def parse_phase_paths(s): + +def parse_phase_paths(s: str) -> list[str]: # Split on commas or whitespace here. Commas might be hard to parse visually. 
paths = re.split(r',|\s+', s) @@ -419,7 +432,7 @@ def parse_phase_paths(s): return paths -def check_splash(filename): +def check_splash(filename: Optional[str]) -> None: if filename is None: return @@ -433,50 +446,56 @@ def check_splash(filename): print(f'Splash image {filename} is {img.width}×{img.height} pixels') -def check_inputs(opts): +def check_inputs(opts: argparse.Namespace) -> None: for name, value in vars(opts).items(): if name in {'output', 'tools'}: continue - if isinstance(value, pathlib.Path): + if isinstance(value, Path): # Open file to check that we can read it, or generate an exception value.open().close() elif isinstance(value, list): for item in value: - if isinstance(item, pathlib.Path): + if isinstance(item, Path): item.open().close() check_splash(opts.splash) -def check_cert_and_keys_nonexistent(opts): +def check_cert_and_keys_nonexistent(opts: argparse.Namespace) -> None: # Raise if any of the keys and certs are found on disk paths = itertools.chain( (opts.sb_key, opts.sb_cert), - *((priv_key, pub_key) - for priv_key, pub_key, _ in key_path_groups(opts))) + *((priv_key, pub_key) for priv_key, pub_key, _ in key_path_groups(opts)), + ) for path in paths: - if path and pathlib.Path(path).exists(): + if path and Path(path).exists(): raise ValueError(f'{path} is present') -def find_tool(name, fallback=None, opts=None): +def find_tool( + name: str, + fallback: Optional[str] = None, + opts: Optional[argparse.Namespace] = None, + msg: str = 'Tool {name} not installed!', +) -> Union[str, Path]: if opts and opts.tools: for d in opts.tools: tool = d / name if tool.exists(): - return tool + return cast(Path, tool) if shutil.which(name) is not None: return name if fallback is None: - print(f"Tool {name} not installed!") + raise ValueError(msg.format(name=name)) return fallback -def combine_signatures(pcrsigs): - combined = collections.defaultdict(list) + +def combine_signatures(pcrsigs: list[dict[str, str]]) -> str: + combined: collections.defaultdict[str, list[str]] = collections.defaultdict(list) for pcrsig in pcrsigs: for bank, sigs in pcrsig.items(): for sig in sigs: @@ -485,7 +504,7 @@ def combine_signatures(pcrsigs): return json.dumps(combined) -def key_path_groups(opts): +def key_path_groups(opts: argparse.Namespace) -> Iterator: if not opts.pcr_private_keys: return @@ -493,19 +512,23 @@ def key_path_groups(opts): pub_keys = opts.pcr_public_keys or [None] * n_priv pp_groups = opts.phase_path_groups or [None] * n_priv - yield from zip(opts.pcr_private_keys, - pub_keys, - pp_groups) + yield from zip( + opts.pcr_private_keys, + pub_keys, + pp_groups, + ) -def pe_strip_section_name(name): - return name.rstrip(b"\x00").decode() +def pe_strip_section_name(name: bytes) -> str: + return name.rstrip(b'\x00').decode() -def call_systemd_measure(uki, opts, profile_start=0): - measure_tool = find_tool('systemd-measure', - '/usr/lib/systemd/systemd-measure', - opts=opts) +def call_systemd_measure(uki: UKI, opts: argparse.Namespace, profile_start: int = 0) -> None: + measure_tool = find_tool( + 'systemd-measure', + '/usr/lib/systemd/systemd-measure', + opts=opts, + ) banks = opts.pcr_banks or () @@ -518,7 +541,7 @@ def call_systemd_measure(uki, opts, profile_start=0): if profile_start != 0: for section in uki.sections: # If we reach the first .profile section the base is over - if section.name == ".profile": + if section.name == '.profile': break # Only some sections are measured @@ -537,14 +560,11 @@ def call_systemd_measure(uki, opts, profile_start=0): cmd = [ measure_tool, 
'calculate', - *(f"--{s.name.removeprefix('.')}={s.content}" - for s in to_measure.values()), - *(f'--bank={bank}' - for bank in banks), + *(f"--{s.name.removeprefix('.')}={s.content}" for s in to_measure.values()), + *(f'--bank={bank}' for bank in banks), # For measurement, the keys are not relevant, so we can lump all the phase paths # into one call to systemd-measure calculate. - *(f'--phase={phase_path}' - for phase_path in itertools.chain.from_iterable(pp_groups)), + *(f'--phase={phase_path}' for phase_path in itertools.chain.from_iterable(pp_groups)), ] print('+', shell_join(cmd)) @@ -558,10 +578,8 @@ def call_systemd_measure(uki, opts, profile_start=0): cmd = [ measure_tool, 'sign', - *(f"--{s.name.removeprefix('.')}={s.content}" - for s in to_measure.values()), - *(f'--bank={bank}' - for bank in banks), + *(f"--{s.name.removeprefix('.')}={s.content}" for s in to_measure.values()), + *(f'--bank={bank}' for bank in banks), ] for priv_key, pub_key, group in key_path_groups(opts): @@ -574,8 +592,8 @@ def call_systemd_measure(uki, opts, profile_start=0): extra += [f'--public-key={pub_key}'] extra += [f'--phase={phase_path}' for phase_path in group or ()] - print('+', shell_join(cmd + extra)) - pcrsig = subprocess.check_output(cmd + extra, text=True) + print('+', shell_join(cmd + extra)) # type: ignore + pcrsig = subprocess.check_output(cmd + extra, text=True) # type: ignore pcrsig = json.loads(pcrsig) pcrsigs += [pcrsig] @@ -583,7 +601,7 @@ def call_systemd_measure(uki, opts, profile_start=0): uki.add_section(Section.create('.pcrsig', combined)) -def join_initrds(initrds): +def join_initrds(initrds: list[Path]) -> Union[Path, bytes, None]: if not initrds: return None if len(initrds) == 1: @@ -599,7 +617,10 @@ def join_initrds(initrds): return b''.join(seq) -def pairwise(iterable): +T = TypeVar('T') + + +def pairwise(iterable: Iterable[T]) -> Iterator[tuple[T, Optional[T]]]: a, b = itertools.tee(iterable) next(b, None) return zip(a, b) @@ -609,7 +630,7 @@ class PEError(Exception): pass -def pe_add_sections(uki: UKI, output: str): +def pe_add_sections(uki: UKI, output: str) -> None: pe = pefile.PE(uki.executable, fast_load=True) # Old stubs do not have the symbol/string table stripped, even though image files should not have one. @@ -637,15 +658,23 @@ def pe_add_sections(uki: UKI, output: str): padp = section.PointerToRawData - oldp padsz = section.SizeOfRawData - oldsz - for later_section in pe.sections[i+1:]: + for later_section in pe.sections[i + 1 :]: later_section.PointerToRawData += padp + padsz - pe.__data__ = pe.__data__[:oldp] + bytes(padp) + pe.__data__[oldp:oldp+oldsz] + bytes(padsz) + pe.__data__[oldp+oldsz:] + pe.__data__ = ( + pe.__data__[:oldp] + + bytes(padp) + + pe.__data__[oldp : oldp + oldsz] + + bytes(padsz) + + pe.__data__[oldp + oldsz :] + ) # We might not have any space to add new sections. Let's try our best to make some space by padding the # SizeOfHeaders to a multiple of the file alignment. This is safe because the first section's data starts # at a multiple of the file alignment, so all space before that is unused. 
- pe.OPTIONAL_HEADER.SizeOfHeaders = round_up(pe.OPTIONAL_HEADER.SizeOfHeaders, pe.OPTIONAL_HEADER.FileAlignment) + pe.OPTIONAL_HEADER.SizeOfHeaders = round_up( + pe.OPTIONAL_HEADER.SizeOfHeaders, pe.OPTIONAL_HEADER.FileAlignment + ) pe = pefile.PE(data=pe.write(), fast_load=True) warnings = pe.get_warnings() @@ -702,12 +731,22 @@ def pe_add_sections(uki: UKI, output: str): raise PEError(f'Not enough space in existing section {section.name} to append new data.') padding = bytes(new_section.SizeOfRawData - new_section.Misc_VirtualSize) - pe.__data__ = pe.__data__[:s.PointerToRawData] + data + padding + pe.__data__[pe.sections[i+1].PointerToRawData:] + pe.__data__ = ( + pe.__data__[: s.PointerToRawData] + + data + + padding + + pe.__data__[pe.sections[i + 1].PointerToRawData :] + ) s.SizeOfRawData = new_section.SizeOfRawData s.Misc_VirtualSize = new_section.Misc_VirtualSize break else: - pe.__data__ = pe.__data__[:] + bytes(new_section.PointerToRawData - len(pe.__data__)) + data + bytes(new_section.SizeOfRawData - len(data)) + pe.__data__ = ( + pe.__data__[:] + + bytes(new_section.PointerToRawData - len(pe.__data__)) + + data + + bytes(new_section.SizeOfRawData - len(data)) + ) pe.FILE_HEADER.NumberOfSections += 1 pe.OPTIONAL_HEADER.SizeOfInitializedData += new_section.Misc_VirtualSize @@ -722,21 +761,22 @@ def pe_add_sections(uki: UKI, output: str): pe.write(output) -def merge_sbat(input_pe: [pathlib.Path], input_text: [str]) -> str: + +def merge_sbat(input_pe: list[Path], input_text: list[str]) -> str: sbat = [] for f in input_pe: try: pe = pefile.PE(f, fast_load=True) except pefile.PEFormatError: - print(f"{f} is not a valid PE file, not extracting SBAT section.") + print(f'{f} is not a valid PE file, not extracting SBAT section.') continue for section in pe.sections: - if pe_strip_section_name(section.Name) == ".sbat": - split = section.get_data().rstrip(b"\x00").decode().splitlines() + if pe_strip_section_name(section.Name) == '.sbat': + split = section.get_data().rstrip(b'\x00').decode().splitlines() if not split[0].startswith('sbat,'): - print(f"{f} does not contain a valid SBAT section, skipping.") + print(f'{f} does not contain a valid SBAT section, skipping.') continue # Filter out the sbat line, we'll add it back later, there needs to be only one and it # needs to be first. 
@@ -744,49 +784,63 @@ def merge_sbat(input_pe: [pathlib.Path], input_text: [str]) -> str: for t in input_text: if t.startswith('@'): - t = pathlib.Path(t[1:]).read_text() + t = Path(t[1:]).read_text() split = t.splitlines() if not split[0].startswith('sbat,'): - print(f"{t} does not contain a valid SBAT section, skipping.") + print(f'{t} does not contain a valid SBAT section, skipping.') continue sbat += split[1:] - return 'sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md\n' + '\n'.join(sbat) + "\n\x00" + return ( + 'sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md\n' + + '\n'.join(sbat) + + '\n\x00' + ) -def signer_sign(cmd): + +def signer_sign(cmd: list[Union[str, Path]]) -> None: print('+', shell_join(cmd)) subprocess.check_call(cmd) -def find_sbsign(opts=None): - return find_tool('sbsign', opts=opts) -def sbsign_sign(sbsign_tool, input_f, output_f, opts=None): +def sbsign_sign( + sbsign_tool: Union[str, Path], + input_f: str, + output_f: str, + opts: argparse.Namespace, +) -> None: sign_invocation = [ sbsign_tool, '--key', opts.sb_key, '--cert', opts.sb_cert, - ] + ] # fmt: skip if opts.signing_engine is not None: sign_invocation += ['--engine', opts.signing_engine] sign_invocation += [ input_f, '--output', output_f, - ] + ] # fmt: skip signer_sign(sign_invocation) -def find_pesign(opts=None): - return find_tool('pesign', opts=opts) -def pesign_sign(pesign_tool, input_f, output_f, opts=None): +def pesign_sign( + pesign_tool: Union[str, Path], + input_f: str, + output_f: str, + opts: argparse.Namespace, +) -> None: sign_invocation = [ - pesign_tool, '-s', '--force', + pesign_tool, + '-s', + '--force', '-n', opts.sb_certdir, '-c', opts.sb_cert_name, '-i', input_f, '-o', output_f, - ] + ] # fmt: skip signer_sign(sign_invocation) + SBVERIFY = { 'name': 'sbverify', 'option': '--list', @@ -797,10 +851,11 @@ PESIGCHECK = { 'name': 'pesign', 'option': '-i', 'output': 'No signatures found.', - 'flags': '-S' + 'flags': '-S', } -def verify(tool, opts): + +def verify(tool: dict[str, str], opts: argparse.Namespace) -> bool: verify_tool = find_tool(tool['name'], opts=opts) cmd = [ verify_tool, @@ -815,35 +870,46 @@ def verify(tool, opts): return tool['output'] in info -def make_uki(opts): + +STUB_SBAT = """\ +sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md +uki,1,UKI,uki,1,https://uapi-group.org/specifications/specs/unified_kernel_image/ +""" + +ADDON_SBAT = """\ +sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md +uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/latest/systemd-stub.html +""" + + +def make_uki(opts: argparse.Namespace) -> None: # kernel payload signing sign_tool = None sign_args_present = opts.sb_key or opts.sb_cert_name sign_kernel = opts.sign_kernel - sign = None + sign: Optional[Callable[[Union[str, Path], str, str, argparse.Namespace], None]] = None linux = opts.linux if sign_args_present: if opts.signtool == 'sbsign': - sign_tool = find_sbsign(opts=opts) + sign_tool = find_tool('sbsign', opts=opts, msg='sbsign, required for signing, is not installed') sign = sbsign_sign verify_tool = SBVERIFY else: - sign_tool = find_pesign(opts=opts) + sign_tool = find_tool('pesign', opts=opts, msg='pesign, required for signing, is not installed') sign = pesign_sign verify_tool = PESIGCHECK - if sign_tool is None: - raise ValueError(f'{opts.signtool}, required for signing, is not installed') - if sign_kernel is None and opts.linux is not None: # figure out if 
we should sign the kernel sign_kernel = verify(verify_tool, opts) if sign_kernel: + assert sign is not None + assert sign_tool is not None linux_signed = tempfile.NamedTemporaryFile(prefix='linux-signed') - linux = pathlib.Path(linux_signed.name) + linux = Path(linux_signed.name) sign(sign_tool, opts.linux, linux, opts=opts) if opts.uname is None and opts.linux is not None: @@ -857,13 +923,14 @@ def make_uki(opts): if pcrpkey is None: if opts.pcr_public_keys and len(opts.pcr_public_keys) == 1: pcrpkey = opts.pcr_public_keys[0] - # If we are getting a certificate when using an engine, we need to convert it to public key format - if opts.signing_engine is not None and pathlib.Path(pcrpkey).exists(): + # If we are getting a certificate when using an engine, we need to convert it to public key + # format + if opts.signing_engine is not None and Path(pcrpkey).exists(): from cryptography.hazmat.primitives import serialization from cryptography.x509 import load_pem_x509_certificate try: - cert = load_pem_x509_certificate(pathlib.Path(pcrpkey).read_bytes()) + cert = load_pem_x509_certificate(Path(pcrpkey).read_bytes()) except ValueError: raise ValueError(f'{pcrpkey} must be an X.509 certificate when signing with an engine') else: @@ -873,7 +940,11 @@ def make_uki(opts): ) elif opts.pcr_private_keys and len(opts.pcr_private_keys) == 1: from cryptography.hazmat.primitives import serialization - privkey = serialization.load_pem_private_key(pathlib.Path(opts.pcr_private_keys[0]).read_bytes(), password=None) + + privkey = serialization.load_pem_private_key( + Path(opts.pcr_private_keys[0]).read_bytes(), + password=None, + ) pcrpkey = privkey.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, @@ -881,19 +952,19 @@ def make_uki(opts): sections = [ # name, content, measure? - ('.osrel', opts.os_release, True ), - ('.cmdline', opts.cmdline, True ), - ('.dtb', opts.devicetree, True ), - ('.uname', opts.uname, True ), - ('.splash', opts.splash, True ), - ('.pcrpkey', pcrpkey, True ), - ('.initrd', initrd, True ), - ('.ucode', opts.microcode, True ), - ] + ('.osrel', opts.os_release, True), + ('.cmdline', opts.cmdline, True), + ('.dtb', opts.devicetree, True), + ('.uname', opts.uname, True), + ('.splash', opts.splash, True), + ('.pcrpkey', pcrpkey, True), + ('.initrd', initrd, True), + ('.ucode', opts.microcode, True), + ] # fmt: skip # If we're building a PE profile binary, the ".profile" section has to be the first one. if opts.profile and not opts.join_profiles: - uki.add_section(Section.create(".profile", opts.profile, measure=True)) + uki.add_section(Section.create('.profile', opts.profile, measure=True)) for name, content, measure in sections: if content: @@ -907,7 +978,7 @@ def make_uki(opts): try: virtual_size = pefile.PE(linux, fast_load=True).OPTIONAL_HEADER.SizeOfImage except pefile.PEFormatError: - print(f"{linux} is not a valid PE file, not using SizeOfImage.") + print(f'{linux} is not a valid PE file, not using SizeOfImage.') virtual_size = None uki.add_section(Section.create('.linux', linux, measure=True, virtual_size=virtual_size)) @@ -915,26 +986,23 @@ def make_uki(opts): # Don't add a sbat section to profile PE binaries. if opts.join_profiles or not opts.profile: if linux is not None: - # Merge the .sbat sections from stub, kernel and parameter, so that revocation can be done on either. + # Merge the .sbat sections from stub, kernel and parameter, so that revocation can be done on + # either. 
input_pes = [opts.stub, linux] if not opts.sbat: - opts.sbat = ["""sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md -uki,1,UKI,uki,1,https://uapi-group.org/specifications/specs/unified_kernel_image/ -"""] + opts.sbat = [STUB_SBAT] else: # Addons don't use the stub so we add SBAT manually input_pes = [] if not opts.sbat: - opts.sbat = ["""sbat,1,SBAT Version,sbat,1,https://github.com/rhboot/shim/blob/main/SBAT.md -uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/latest/systemd-stub.html -"""] + opts.sbat = [ADDON_SBAT] uki.add_section(Section.create('.sbat', merge_sbat(input_pes, opts.sbat), measure=linux is not None)) # If we're building a UKI with additional profiles, the .profile section for the base profile has to be # the last one so that everything before it is shared between profiles. The only thing we don't share # between profiles is the .pcrsig section which is appended later and doesn't make sense to share. if opts.profile and opts.join_profiles: - uki.add_section(Section.create(".profile", opts.profile, measure=True)) + uki.add_section(Section.create('.profile', opts.profile, measure=True)) # PCR measurement and signing @@ -942,7 +1010,18 @@ uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/l # UKI profiles - to_import = {'.linux', '.osrel', '.cmdline', '.initrd', '.ucode', '.splash', '.dtb', '.uname', '.sbat', '.profile'} + to_import = { + '.linux', + '.osrel', + '.cmdline', + '.initrd', + '.ucode', + '.splash', + '.dtb', + '.uname', + '.sbat', + '.profile', + } for profile in opts.join_profiles: pe = pefile.PE(profile, fast_load=True) @@ -952,12 +1031,14 @@ uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/l names = [n for n in names if n in to_import] if len(names) == 0: - raise ValueError(f"Found no valid sections in PE profile binary {profile}") + raise ValueError(f'Found no valid sections in PE profile binary {profile}') - if names[0] != ".profile": - raise ValueError(f'Expected .profile section as first valid section in PE profile binary {profile} but got {names[0]}') + if names[0] != '.profile': + raise ValueError( + f'Expected .profile section as first valid section in PE profile binary {profile} but got {names[0]}' # noqa: E501 + ) - if names.count(".profile") > 1: + if names.count('.profile') > 1: raise ValueError(f'Profile PE binary {profile} contains multiple .profile sections') for section in pe.sections: @@ -967,7 +1048,9 @@ uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/l continue print(f"Copying section '{n}' from '{profile}': {section.Misc_VirtualSize} bytes") - uki.add_section(Section.create(n, section.get_data(length=section.Misc_VirtualSize), measure=True)) + uki.add_section( + Section.create(n, section.get_data(length=section.Misc_VirtualSize), measure=True) + ) call_systemd_measure(uki, opts=opts, profile_start=prev_len + 1) @@ -984,8 +1067,9 @@ uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/l # UKI signing if sign_args_present: - assert sign - sign(sign_tool, unsigned_output, opts.output, opts=opts) + assert sign is not None + assert sign_tool is not None + sign(sign_tool, unsigned_output, opts.output, opts) # We end up with no executable bits, let's reapply them os.umask(umask := os.umask(0)) @@ -995,7 +1079,7 @@ uki-addon,1,UKI Addon,addon,1,https://www.freedesktop.org/software/systemd/man/l @contextlib.contextmanager -def temporary_umask(mask: int): +def temporary_umask(mask: 
int) -> Iterator[None]: # Drop bits from umask old = os.umask(0) os.umask(old | mask) @@ -1006,13 +1090,12 @@ def temporary_umask(mask: int): def generate_key_cert_pair( - common_name: str, - valid_days: int, - keylength: int = 2048, -) -> tuple[bytes]: - + common_name: str, + valid_days: int, + keylength: int = 2048, +) -> tuple[bytes, bytes]: from cryptography import x509 - from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa # We use a keylength of 2048 bits. That is what Microsoft documents as @@ -1025,25 +1108,34 @@ def generate_key_cert_pair( public_exponent=65537, key_size=keylength, ) - cert = x509.CertificateBuilder( - ).subject_name( - x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)]) - ).issuer_name( - x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)]) - ).not_valid_before( - now, - ).not_valid_after( - now + datetime.timedelta(days=valid_days) - ).serial_number( - x509.random_serial_number() - ).public_key( - key.public_key() - ).add_extension( - x509.BasicConstraints(ca=False, path_length=None), - critical=True, - ).sign( - private_key=key, - algorithm=hashes.SHA256(), + cert = ( + x509.CertificateBuilder() + .subject_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)]), + ) + .issuer_name( + x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name)]), + ) + .not_valid_before( + now, + ) + .not_valid_after( + now + datetime.timedelta(days=valid_days), + ) + .serial_number( + x509.random_serial_number(), + ) + .public_key( + key.public_key(), + ) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign( + private_key=key, + algorithm=hashes.SHA256(), + ) ) cert_pem = cert.public_bytes( @@ -1058,7 +1150,7 @@ def generate_key_cert_pair( return key_pem, cert_pem -def generate_priv_pub_key_pair(keylength : int = 2048) -> tuple[bytes]: +def generate_priv_pub_key_pair(keylength: int = 2048) -> tuple[bytes, bytes]: from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa @@ -1079,7 +1171,7 @@ def generate_priv_pub_key_pair(keylength : int = 2048) -> tuple[bytes]: return priv_key_pem, pub_key_pem -def generate_keys(opts): +def generate_keys(opts: argparse.Namespace) -> None: work = False # This will generate keys and certificates and write them to the paths that @@ -1104,7 +1196,7 @@ def generate_keys(opts): print(f'Writing private key for PCR signing to {priv_key}') with temporary_umask(0o077): - pathlib.Path(priv_key).write_bytes(priv_key_pem) + Path(priv_key).write_bytes(priv_key_pem) if pub_key: print(f'Writing public key for PCR signing to {pub_key}') pub_key.write_bytes(pub_key_pem) @@ -1112,17 +1204,20 @@ def generate_keys(opts): work = True if not work: - raise ValueError('genkey: --secureboot-private-key=/--secureboot-certificate= or --pcr-private-key/--pcr-public-key must be specified') + raise ValueError( + 'genkey: --secureboot-private-key=/--secureboot-certificate= or --pcr-private-key/--pcr-public-key must be specified' # noqa: E501 + ) -def inspect_section(opts, section): +def inspect_section( + opts: argparse.Namespace, + section: pefile.SectionStructure, +) -> tuple[str, Optional[dict[str, Union[int, str]]]]: name = pe_strip_section_name(section.Name) # find the config for this section in opts and whether to show it config = 
opts.sections_by_name.get(name, None) - show = (config or - opts.all or - (name in DEFAULT_SECTIONS_TO_SHOW and not opts.sections)) + show = config or opts.all or (name in DEFAULT_SECTIONS_TO_SHOW and not opts.sections) if not show: return name, None @@ -1134,37 +1229,37 @@ def inspect_section(opts, section): digest = sha256(data).hexdigest() struct = { - 'size' : size, - 'sha256' : digest, + 'size': size, + 'sha256': digest, } if ttype == 'text': try: struct['text'] = data.decode() except UnicodeDecodeError as e: - print(f"Section {name!r} is not valid text: {e}") + print(f'Section {name!r} is not valid text: {e}') struct['text'] = '(not valid UTF-8)' if config and config.content: - assert isinstance(config.content, pathlib.Path) + assert isinstance(config.content, Path) config.content.write_bytes(data) if opts.json == 'off': - print(f"{name}:\n size: {size} bytes\n sha256: {digest}") + print(f'{name}:\n size: {size} bytes\n sha256: {digest}') if ttype == 'text': text = textwrap.indent(struct['text'].rstrip(), ' ' * 4) - print(f" text:\n{text}") + print(f' text:\n{text}') return name, struct -def inspect_sections(opts): +def inspect_sections(opts: argparse.Namespace) -> None: indent = 4 if opts.json == 'pretty' else None for file in opts.files: pe = pefile.PE(file, fast_load=True) gen = (inspect_section(opts, section) for section in pe.sections) - descs = {key:val for (key, val) in gen if val} + descs = {key: val for (key, val) in gen if val} if opts.json != 'off': json.dump(descs, sys.stdout, indent=indent) @@ -1173,10 +1268,10 @@ def inspect_sections(opts): class ConfigItem: @staticmethod def config_list_prepend( - namespace: argparse.Namespace, - group: Optional[str], - dest: str, - value: Any, + namespace: argparse.Namespace, + group: Optional[str], + dest: str, + value: Any, ) -> None: "Prepend value to namespace." @@ -1189,10 +1284,10 @@ class ConfigItem: @staticmethod def config_set_if_unset( - namespace: argparse.Namespace, - group: Optional[str], - dest: str, - value: Any, + namespace: argparse.Namespace, + group: Optional[str], + dest: str, + value: Any, ) -> None: "Set namespace. to value only if it was None" @@ -1203,10 +1298,10 @@ class ConfigItem: @staticmethod def config_set( - namespace: argparse.Namespace, - group: Optional[str], - dest: str, - value: Any, + namespace: argparse.Namespace, + group: Optional[str], + dest: str, + value: Any, ) -> None: "Set namespace. 
to value only if it was None" @@ -1216,10 +1311,10 @@ class ConfigItem: @staticmethod def config_set_group( - namespace: argparse.Namespace, - group: Optional[str], - dest: str, - value: Any, + namespace: argparse.Namespace, + group: Optional[str], + dest: str, + value: Any, ) -> None: "Set namespace.[idx] to value, with idx derived from group" @@ -1231,8 +1326,11 @@ class ConfigItem: old = getattr(namespace, dest, None) if old is None: old = [] - setattr(namespace, dest, - old + ([None] * (idx - len(old))) + [value]) + setattr( + namespace, + dest, + old + ([None] * (idx - len(old))) + [value], + ) @staticmethod def parse_boolean(s: str) -> bool: @@ -1259,8 +1357,7 @@ class ConfigItem: # metadata for config file parsing config_key: Optional[str] = None - config_push: Callable[[argparse.Namespace, Optional[str], str, Any], None] = \ - config_set_if_unset + config_push: Callable[[argparse.Namespace, Optional[str], str, Any], None] = config_set_if_unset def _names(self) -> tuple[str, ...]: return self.name if isinstance(self.name, tuple) else (self.name,) @@ -1271,15 +1368,23 @@ class ConfigItem: return self.dest return self._names()[0].lstrip('-').replace('-', '_') - def add_to(self, parser: argparse.ArgumentParser): - kwargs = { key:val - for key in dataclasses.asdict(self) - if (key not in ('name', 'config_key', 'config_push') and - (val := getattr(self, key)) is not None) } + def add_to(self, parser: argparse.ArgumentParser) -> None: + kwargs = { + key: val + for key in dataclasses.asdict(self) + if (key not in ('name', 'config_key', 'config_push') and (val := getattr(self, key)) is not None) + } args = self._names() parser.add_argument(*args, **kwargs) - def apply_config(self, namespace, section, group, key, value) -> None: + def apply_config( + self, + namespace: argparse.Namespace, + section: str, + group: Optional[str], + key: str, + value: Any, + ) -> None: assert f'{section}/{key}' == self.config_key dest = self.argparse_dest() @@ -1292,7 +1397,7 @@ class ConfigItem: elif self.type: conv = self.type else: - conv = lambda s:s + conv = lambda s: s # noqa: E731 # This is a bit ugly, but --initrd is the only option which is specified # with multiple args on the command line and a space-separated list in the @@ -1322,268 +1427,250 @@ VERBS = ('build', 'genkey', 'inspect') CONFIG_ITEMS = [ ConfigItem( 'positional', - metavar = 'VERB', - nargs = '*', - help = argparse.SUPPRESS, + metavar='VERB', + nargs='*', + help=argparse.SUPPRESS, ), - ConfigItem( '--version', - action = 'version', - version = f'ukify {__version__}', + action='version', + version=f'ukify {__version__}', ), - ConfigItem( '--summary', - help = 'print parsed config and exit', - action = 'store_true', + help='print parsed config and exit', + action='store_true', ), - ConfigItem( ('--config', '-c'), - metavar = 'PATH', - type = pathlib.Path, - help = 'configuration file', + metavar='PATH', + type=Path, + help='configuration file', ), - ConfigItem( '--linux', - type = pathlib.Path, - help = 'vmlinuz file [.linux section]', - config_key = 'UKI/Linux', + type=Path, + help='vmlinuz file [.linux section]', + config_key='UKI/Linux', ), - ConfigItem( '--os-release', - metavar = 'TEXT|@PATH', - help = 'path to os-release file [.osrel section]', - config_key = 'UKI/OSRelease', + metavar='TEXT|@PATH', + help='path to os-release file [.osrel section]', + config_key='UKI/OSRelease', ), - ConfigItem( '--cmdline', - metavar = 'TEXT|@PATH', - help = 'kernel command line [.cmdline section]', - config_key = 'UKI/Cmdline', + 
metavar='TEXT|@PATH', + help='kernel command line [.cmdline section]', + config_key='UKI/Cmdline', ), - ConfigItem( '--initrd', - metavar = 'INITRD', - type = pathlib.Path, - action = 'append', - help = 'initrd file [part of .initrd section]', - config_key = 'UKI/Initrd', - config_push = ConfigItem.config_list_prepend, + metavar='INITRD', + type=Path, + action='append', + help='initrd file [part of .initrd section]', + config_key='UKI/Initrd', + config_push=ConfigItem.config_list_prepend, ), - ConfigItem( '--microcode', - metavar = 'UCODE', - type = pathlib.Path, - help = 'microcode file [.ucode section]', - config_key = 'UKI/Microcode', + metavar='UCODE', + type=Path, + help='microcode file [.ucode section]', + config_key='UKI/Microcode', ), - ConfigItem( '--splash', - metavar = 'BMP', - type = pathlib.Path, - help = 'splash image bitmap file [.splash section]', - config_key = 'UKI/Splash', + metavar='BMP', + type=Path, + help='splash image bitmap file [.splash section]', + config_key='UKI/Splash', ), - ConfigItem( '--devicetree', - metavar = 'PATH', - type = pathlib.Path, - help = 'Device Tree file [.dtb section]', - config_key = 'UKI/DeviceTree', + metavar='PATH', + type=Path, + help='Device Tree file [.dtb section]', + config_key='UKI/DeviceTree', ), - ConfigItem( '--uname', metavar='VERSION', help='"uname -r" information [.uname section]', - config_key = 'UKI/Uname', + config_key='UKI/Uname', ), - ConfigItem( '--sbat', - metavar = 'TEXT|@PATH', - help = 'SBAT policy [.sbat section]', - default = [], - action = 'append', - config_key = 'UKI/SBAT', + metavar='TEXT|@PATH', + help='SBAT policy [.sbat section]', + default=[], + action='append', + config_key='UKI/SBAT', ), - ConfigItem( '--pcrpkey', - metavar = 'KEY', - type = pathlib.Path, - help = 'embedded public key to seal secrets to [.pcrpkey section]', - config_key = 'UKI/PCRPKey', + metavar='KEY', + type=Path, + help='embedded public key to seal secrets to [.pcrpkey section]', + config_key='UKI/PCRPKey', ), - ConfigItem( '--section', - dest = 'sections', - metavar = 'NAME:TEXT|@PATH', - action = 'append', - default = [], - help = 'section as name and contents [NAME section] or section to print', + dest='sections', + metavar='NAME:TEXT|@PATH', + action='append', + default=[], + help='section as name and contents [NAME section] or section to print', ), - ConfigItem( '--profile', - metavar = 'TEST|@PATH', - help = 'Profile information [.profile section]', - config_key = 'UKI/Profile', + metavar='TEST|@PATH', + help='Profile information [.profile section]', + config_key='UKI/Profile', ), - ConfigItem( '--join-profile', - dest = 'join_profiles', - metavar = 'PATH', - action = 'append', - default = [], - help = 'A PE binary containing an additional profile to add to the UKI', + dest='join_profiles', + metavar='PATH', + action='append', + default=[], + help='A PE binary containing an additional profile to add to the UKI', ), - ConfigItem( '--efi-arch', - metavar = 'ARCH', - choices = ('ia32', 'x64', 'arm', 'aa64', 'riscv64'), - help = 'target EFI architecture', - config_key = 'UKI/EFIArch', + metavar='ARCH', + choices=('ia32', 'x64', 'arm', 'aa64', 'riscv64'), + help='target EFI architecture', + config_key='UKI/EFIArch', ), - ConfigItem( '--stub', - type = pathlib.Path, - help = 'path to the sd-stub file [.text,.data,… sections]', - config_key = 'UKI/Stub', + type=Path, + help='path to the sd-stub file [.text,.data,… sections]', + config_key='UKI/Stub', ), - ConfigItem( '--pcr-banks', - metavar = 'BANK…', - type = parse_banks, - config_key 
= 'UKI/PCRBanks', + metavar='BANK…', + type=parse_banks, + config_key='UKI/PCRBanks', ), - ConfigItem( '--signing-engine', - metavar = 'ENGINE', - help = 'OpenSSL engine to use for signing', - config_key = 'UKI/SigningEngine', + metavar='ENGINE', + help='OpenSSL engine to use for signing', + config_key='UKI/SigningEngine', ), ConfigItem( '--signtool', - choices = ('sbsign', 'pesign'), - dest = 'signtool', - help = 'whether to use sbsign or pesign. It will also be inferred by the other \ - parameters given: when using --secureboot-{private-key/certificate}, sbsign \ - will be used, otherwise pesign will be used', - config_key = 'UKI/SecureBootSigningTool', + choices=('sbsign', 'pesign'), + dest='signtool', + help=( + 'whether to use sbsign or pesign. It will also be inferred by the other ' + 'parameters given: when using --secureboot-{private-key/certificate}, sbsign ' + 'will be used, otherwise pesign will be used' + ), + config_key='UKI/SecureBootSigningTool', ), ConfigItem( '--secureboot-private-key', - dest = 'sb_key', - help = 'required by --signtool=sbsign. Path to key file or engine-specific designation for SB signing', - config_key = 'UKI/SecureBootPrivateKey', + dest='sb_key', + help='required by --signtool=sbsign. Path to key file or engine-specific designation for SB signing', + config_key='UKI/SecureBootPrivateKey', ), ConfigItem( '--secureboot-certificate', - dest = 'sb_cert', - help = 'required by --signtool=sbsign. sbsign needs a path to certificate file or engine-specific designation for SB signing', - config_key = 'UKI/SecureBootCertificate', + dest='sb_cert', + help=( + 'required by --signtool=sbsign. sbsign needs a path to certificate file or engine-specific designation for SB signing' # noqa: E501 + ), + config_key='UKI/SecureBootCertificate', ), ConfigItem( '--secureboot-certificate-dir', - dest = 'sb_certdir', - default = '/etc/pki/pesign', - help = 'required by --signtool=pesign. Path to nss certificate database directory for PE signing. Default is /etc/pki/pesign', - config_key = 'UKI/SecureBootCertificateDir', - config_push = ConfigItem.config_set + dest='sb_certdir', + default='/etc/pki/pesign', + help=( + 'required by --signtool=pesign. Path to nss certificate database directory for PE signing. Default is /etc/pki/pesign' # noqa: E501 + ), + config_key='UKI/SecureBootCertificateDir', + config_push=ConfigItem.config_set, ), ConfigItem( '--secureboot-certificate-name', - dest = 'sb_cert_name', - help = 'required by --signtool=pesign. pesign needs a certificate nickname of nss certificate database entry to use for PE signing', - config_key = 'UKI/SecureBootCertificateName', + dest='sb_cert_name', + help=( + 'required by --signtool=pesign. 
pesign needs a certificate nickname of nss certificate database entry to use for PE signing' # noqa: E501 + ), + config_key='UKI/SecureBootCertificateName', ), ConfigItem( '--secureboot-certificate-validity', - metavar = 'DAYS', - type = int, - dest = 'sb_cert_validity', - default = 365 * 10, - help = "period of validity (in days) for a certificate created by 'genkey'", - config_key = 'UKI/SecureBootCertificateValidity', - config_push = ConfigItem.config_set + metavar='DAYS', + type=int, + dest='sb_cert_validity', + default=365 * 10, + help="period of validity (in days) for a certificate created by 'genkey'", + config_key='UKI/SecureBootCertificateValidity', + config_push=ConfigItem.config_set, ), - ConfigItem( '--sign-kernel', - action = argparse.BooleanOptionalAction, - help = 'Sign the embedded kernel', - config_key = 'UKI/SignKernel', + action=argparse.BooleanOptionalAction, + help='Sign the embedded kernel', + config_key='UKI/SignKernel', ), - ConfigItem( '--pcr-private-key', - dest = 'pcr_private_keys', - action = 'append', - help = 'private part of the keypair or engine-specific designation for signing PCR signatures', - config_key = 'PCRSignature:/PCRPrivateKey', - config_push = ConfigItem.config_set_group, + dest='pcr_private_keys', + action='append', + help='private part of the keypair or engine-specific designation for signing PCR signatures', + config_key='PCRSignature:/PCRPrivateKey', + config_push=ConfigItem.config_set_group, ), ConfigItem( '--pcr-public-key', - dest = 'pcr_public_keys', - metavar = 'PATH', - type = pathlib.Path, - action = 'append', - help = 'public part of the keypair or engine-specific designation for signing PCR signatures', - config_key = 'PCRSignature:/PCRPublicKey', - config_push = ConfigItem.config_set_group, + dest='pcr_public_keys', + metavar='PATH', + type=Path, + action='append', + help='public part of the keypair or engine-specific designation for signing PCR signatures', + config_key='PCRSignature:/PCRPublicKey', + config_push=ConfigItem.config_set_group, ), ConfigItem( '--phases', - dest = 'phase_path_groups', - metavar = 'PHASE-PATH…', - type = parse_phase_paths, - action = 'append', - help = 'phase-paths to create signatures for', - config_key = 'PCRSignature:/Phases', - config_push = ConfigItem.config_set_group, + dest='phase_path_groups', + metavar='PHASE-PATH…', + type=parse_phase_paths, + action='append', + help='phase-paths to create signatures for', + config_key='PCRSignature:/Phases', + config_push=ConfigItem.config_set_group, ), - ConfigItem( '--tools', - type = pathlib.Path, - action = 'append', - help = 'Directories to search for tools (systemd-measure, …)', + type=Path, + action='append', + help='Directories to search for tools (systemd-measure, …)', ), - ConfigItem( ('--output', '-o'), - type = pathlib.Path, - help = 'output file path', + type=Path, + help='output file path', ), - ConfigItem( '--measure', - action = argparse.BooleanOptionalAction, - help = 'print systemd-measure output for the UKI', + action=argparse.BooleanOptionalAction, + help='print systemd-measure output for the UKI', ), - ConfigItem( '--json', - choices = ('pretty', 'short', 'off'), - default = 'off', - help = 'generate JSON output', + choices=('pretty', 'short', 'off'), + default='off', + help='generate JSON output', ), ConfigItem( '-j', @@ -1592,20 +1679,17 @@ CONFIG_ITEMS = [ const='pretty', help='equivalent to --json=pretty', ), - ConfigItem( '--all', - help = 'print all sections', - action = 'store_true', + help='print all sections', + 
action='store_true', ), ] -CONFIGFILE_ITEMS = { item.config_key:item - for item in CONFIG_ITEMS - if item.config_key } +CONFIGFILE_ITEMS = {item.config_key: item for item in CONFIG_ITEMS if item.config_key} -def apply_config(namespace, filename=None): +def apply_config(namespace: argparse.Namespace, filename: Union[str, Path, None] = None) -> None: if filename is None: if namespace.config: # Config set by the user, use that. @@ -1614,7 +1698,7 @@ def apply_config(namespace, filename=None): else: # Try to look for a config file then use the first one found. for config_dir in DEFAULT_CONFIG_DIRS: - filename = pathlib.Path(config_dir) / DEFAULT_CONFIG_FILE + filename = Path(config_dir) / DEFAULT_CONFIG_FILE if filename.is_file(): # Found a config file, use it. print(f'Using found config file: {filename}') @@ -1634,19 +1718,20 @@ def apply_config(namespace, filename=None): delimiters='=', empty_lines_in_values=False, interpolation=None, - strict=False) + strict=False, + ) # Do not make keys lowercase - cp.optionxform = lambda option: option + cp.optionxform = lambda option: option # type: ignore # The API is not great. read = cp.read(filename) if not read: - raise IOError(f'Failed to read {filename}') + raise OSError(f'Failed to read {filename}') for section_name, section in cp.items(): idx = section_name.find(':') if idx >= 0: - section_name, group = section_name[:idx+1], section_name[idx+1:] + section_name, group = section_name[: idx + 1], section_name[idx + 1 :] if not section_name or not group: raise ValueError('Section name components cannot be empty') if ':' in group: @@ -1660,8 +1745,8 @@ def apply_config(namespace, filename=None): print(f'Unknown config setting [{section_name}] {key}=') -def config_example(): - prev_section = None +def config_example() -> Iterator[str]: + prev_section: Optional[str] = None for item in CONFIG_ITEMS: section, key, value = item.config_example() if section: @@ -1679,20 +1764,21 @@ class PagerHelpAction(argparse._HelpAction): # pylint: disable=protected-access parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Union[str, Sequence[Any], None] = None, - option_string: Optional[str] = None + option_string: Optional[str] = None, ) -> None: page(parser.format_help(), True) parser.exit() -def create_parser(): +def create_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description='Build and sign Unified Kernel Images', - usage='\n ' + textwrap.dedent('''\ + usage='\n ' + + textwrap.dedent("""\ ukify {b}build{e} [--linux=LINUX] [--initrd=INITRD] [options…] ukify {b}genkey{e} [options…] ukify {b}inspect{e} FILE… [options…] - ''').format(b=Style.bold, e=Style.reset), + """).format(b=Style.bold, e=Style.reset), allow_abbrev=False, add_help=False, epilog='\n '.join(('config file:', *config_example())), @@ -1703,19 +1789,19 @@ def create_parser(): item.add_to(p) # Suppress printing of usage synopsis on errors - p.error = lambda message: p.exit(2, f'{p.prog}: error: {message}\n') + p.error = lambda message: p.exit(2, f'{p.prog}: error: {message}\n') # type: ignore # Make --help paged p.add_argument( '-h', '--help', action=PagerHelpAction, help='show this help message and exit', - ) + ) # fmt: skip return p -def finalize_options(opts): +def finalize_options(opts: argparse.Namespace) -> None: # Figure out which syntax is being used, one of: # ukify verb --arg --arg --arg # ukify linux initrd… @@ -1734,9 +1820,9 @@ def finalize_options(opts): else: print("Assuming obsolete command line syntax with no verb. 
Please use 'build'.") if opts.positional: - opts.linux = pathlib.Path(opts.positional[0]) + opts.linux = Path(opts.positional[0]) # If we have initrds from parsing config files, append our positional args at the end - opts.initrd = (opts.initrd or []) + [pathlib.Path(arg) for arg in opts.positional[1:]] + opts.initrd = (opts.initrd or []) + [Path(arg) for arg in opts.positional[1:]] opts.verb = 'build' # Check that --pcr-public-key=, --pcr-private-key=, and --phases= @@ -1750,7 +1836,7 @@ def finalize_options(opts): raise ValueError('--phases= specifications must match --pcr-private-key=') if opts.cmdline and opts.cmdline.startswith('@'): - opts.cmdline = pathlib.Path(opts.cmdline[1:]) + opts.cmdline = Path(opts.cmdline[1:]) elif opts.cmdline: # Drop whitespace from the command line. If we're reading from a file, # we copy the contents verbatim. But configuration specified on the command line @@ -1758,11 +1844,11 @@ def finalize_options(opts): opts.cmdline = ' '.join(opts.cmdline.split()) if opts.os_release and opts.os_release.startswith('@'): - opts.os_release = pathlib.Path(opts.os_release[1:]) + opts.os_release = Path(opts.os_release[1:]) elif not opts.os_release and opts.linux: - p = pathlib.Path('/etc/os-release') + p = Path('/etc/os-release') if not p.exists(): - p = pathlib.Path('/usr/lib/os-release') + p = Path('/usr/lib/os-release') opts.os_release = p if opts.efi_arch is None: @@ -1770,32 +1856,40 @@ def finalize_options(opts): if opts.stub is None: if opts.linux is not None: - opts.stub = pathlib.Path(f'/usr/lib/systemd/boot/efi/linux{opts.efi_arch}.efi.stub') + opts.stub = Path(f'/usr/lib/systemd/boot/efi/linux{opts.efi_arch}.efi.stub') else: - opts.stub = pathlib.Path(f'/usr/lib/systemd/boot/efi/addon{opts.efi_arch}.efi.stub') + opts.stub = Path(f'/usr/lib/systemd/boot/efi/addon{opts.efi_arch}.efi.stub') if opts.signing_engine is None: if opts.sb_key: - opts.sb_key = pathlib.Path(opts.sb_key) + opts.sb_key = Path(opts.sb_key) if opts.sb_cert: - opts.sb_cert = pathlib.Path(opts.sb_cert) + opts.sb_cert = Path(opts.sb_cert) if bool(opts.sb_key) ^ bool(opts.sb_cert): # one param only given, sbsign needs both - raise ValueError('--secureboot-private-key= and --secureboot-certificate= must be specified together') + raise ValueError( + '--secureboot-private-key= and --secureboot-certificate= must be specified together' + ) elif bool(opts.sb_key) and bool(opts.sb_cert): # both param given, infer sbsign and in case it was given, ensure signtool=sbsign if opts.signtool and opts.signtool != 'sbsign': - raise ValueError(f'Cannot provide --signtool={opts.signtool} with --secureboot-private-key= and --secureboot-certificate=') + raise ValueError( + f'Cannot provide --signtool={opts.signtool} with --secureboot-private-key= and --secureboot-certificate=' # noqa: E501 + ) opts.signtool = 'sbsign' elif bool(opts.sb_cert_name): # sb_cert_name given, infer pesign and in case it was given, ensure signtool=pesign if opts.signtool and opts.signtool != 'pesign': - raise ValueError(f'Cannot provide --signtool={opts.signtool} with --secureboot-certificate-name=') + raise ValueError( + f'Cannot provide --signtool={opts.signtool} with --secureboot-certificate-name=' + ) opts.signtool = 'pesign' if opts.sign_kernel and not opts.sb_key and not opts.sb_cert_name: - raise ValueError('--sign-kernel requires either --secureboot-private-key= and --secureboot-certificate= (for sbsign) or --secureboot-certificate-name= (for pesign) to be specified') + raise ValueError( + '--sign-kernel requires either 
--secureboot-private-key= and --secureboot-certificate= (for sbsign) or --secureboot-certificate-name= (for pesign) to be specified' # noqa: E501 + ) if opts.join_profiles and not opts.profile: # If any additional profiles are added, we need a base profile as well so add one if @@ -1812,24 +1906,22 @@ def finalize_options(opts): f = Section.parse_output if opts.verb == 'inspect' else Section.parse_input opts.sections = [f(s) for s in opts.sections] # A convenience dictionary to make it easy to look up sections - opts.sections_by_name = {s.name:s for s in opts.sections} - - if opts.summary: - # TODO: replace pprint() with some fancy formatting. - pprint.pprint(vars(opts)) - sys.exit() + opts.sections_by_name = {s.name: s for s in opts.sections} -def parse_args(args=None): +def parse_args(args: Optional[list[str]] = None) -> argparse.Namespace: opts = create_parser().parse_args(args) apply_config(opts) finalize_options(opts) return opts -def main(): +def main() -> None: opts = parse_args() - if opts.verb == 'build': + if opts.summary: + # TODO: replace pprint() with some fancy formatting. + pprint.pprint(vars(opts)) + elif opts.verb == 'build': check_inputs(opts) make_uki(opts) elif opts.verb == 'genkey':
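The lint and type-check steps added to linter.yml above can also be run locally before sending a patch. A minimal sketch, assuming ruff and mypy are installed (for instance via the same pip invocation the workflow uses) and that the commands are run from the top of the source tree, as in CI:

  # install the checkers, mirroring the 'Install ruff and mypy' workflow step
  python3 -m pip install --break-system-packages mypy types-Pillow ruff

  # type-check, lint, and verify formatting of ukify.py
  python3 -m mypy src/ukify/ukify.py
  ruff check src/ukify/ukify.py
  ruff format --check src/ukify/ukify.py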