# # Support for parsing libvirt's domcapabilities XML # # Copyright 2014 Red Hat, Inc. # # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. import re import xml.etree.ElementTree as ET import libvirt from .domain import DomainCpu from .logger import log from .xmlbuilder import XMLBuilder, XMLChildProperty, XMLProperty ######################################## # Genering and handling # ######################################## class _Value(XMLBuilder): XML_NAME = "value" value = XMLProperty(".") class _HasValues(XMLBuilder): values = XMLChildProperty(_Value) def get_values(self): return [v.value for v in self.values] def has_value(self, val): return val in self.get_values() class _Enum(_HasValues): XML_NAME = "enum" name = XMLProperty("./@name") class _CapsBlock(_HasValues): supported = XMLProperty("./@supported", is_yesno=True) _supported_present = XMLProperty("./@supported") enums = XMLChildProperty(_Enum) @property def present(self): return self._supported_present is not None def enum_names(self): return [e.name for e in self.enums] def has_enum(self, name): return name in self.enum_names() def get_enum(self, name): for enum in self.enums: if enum.name == name: return enum # Didn't find a match. Could be talking to older libvirt, or # driver with incomplete info. Return a stub enum return _Enum(self.conn) def _make_capsblock(xml_root_name): """ Build a class object representing a list of in the XML. For example, domcapabilities may have a block like: sdl vnc spice To build a class that tracks that whole block, call this like _make_capsblock("graphics") """ class TmpClass(_CapsBlock): pass setattr(TmpClass, "XML_NAME", xml_root_name) return TmpClass ################################ # SEV launch security handling # ################################ class _SEV(XMLBuilder): XML_NAME = "sev" supported = XMLProperty("./@supported", is_yesno=True) maxESGuests = XMLProperty("./maxESGuests") ############################# # Misc toplevel XML classes # ############################# class _OS(_CapsBlock): XML_NAME = "os" loader = XMLChildProperty(_make_capsblock("loader"), is_single=True) class _Devices(_CapsBlock): XML_NAME = "devices" hostdev = XMLChildProperty(_make_capsblock("hostdev"), is_single=True) disk = XMLChildProperty(_make_capsblock("disk"), is_single=True) video = XMLChildProperty(_make_capsblock("video"), is_single=True) graphics = XMLChildProperty(_make_capsblock("graphics"), is_single=True) tpm = XMLChildProperty(_make_capsblock("tpm"), is_single=True) filesystem = XMLChildProperty(_make_capsblock("filesystem"), is_single=True) redirdev = XMLChildProperty(_make_capsblock("redirdev"), is_single=True) channel = XMLChildProperty(_make_capsblock("channel"), is_single=True) panic = XMLChildProperty(_make_capsblock("panic"), is_single=True) class _Features(_CapsBlock): XML_NAME = "features" gic = XMLChildProperty(_make_capsblock("gic"), is_single=True) sev = XMLChildProperty(_SEV, is_single=True) hyperv = XMLChildProperty(_make_capsblock("hyperv"), is_single=True) class _MemoryBacking(_CapsBlock): XML_NAME = "memoryBacking" ############### # CPU classes # ############### class _CPUModel(XMLBuilder): XML_NAME = "model" model = XMLProperty(".") usable = XMLProperty("./@usable") fallback = XMLProperty("./@fallback") class _CPUMode(_CapsBlock): XML_NAME = "mode" name = XMLProperty("./@name") models = XMLChildProperty(_CPUModel) def get_model(self, name): for model in self.models: if model.model == name: return model class _CPU(XMLBuilder): XML_NAME = "cpu" modes = XMLChildProperty(_CPUMode) def get_mode(self, name): for mode in self.modes: if mode.name == name: return mode ############################# # CPU flags/baseline helpers# ############################# def _convert_mode_to_cpu(xml, arch): root = ET.fromstring(xml) root.tag = "cpu" root.attrib = {} aelement = ET.SubElement(root, "arch") aelement.text = arch return ET.tostring(root, encoding="unicode") def _get_expanded_cpu(domcaps, mode): cpuXML = _convert_mode_to_cpu(mode.get_xml(), domcaps.arch) log.debug("Generated CPU XML for security flag baseline:\n%s", cpuXML) try: expandedXML = domcaps.conn.baselineHypervisorCPU( domcaps.path, domcaps.arch, domcaps.machine, domcaps.domain, [cpuXML], libvirt.VIR_CONNECT_BASELINE_CPU_EXPAND_FEATURES) except (libvirt.libvirtError, AttributeError): expandedXML = domcaps.conn.baselineCPU([cpuXML], libvirt.VIR_CONNECT_BASELINE_CPU_EXPAND_FEATURES) return DomainCpu(domcaps.conn, expandedXML) def _lookup_cpu_security_features(domcaps): ret = [] sec_features = [ 'spec-ctrl', 'ssbd', 'ibpb', 'virt-ssbd', 'md-clear'] for m in domcaps.cpu.modes: if m.name != "host-model" or not m.supported: continue # pragma: no cover try: cpu = _get_expanded_cpu(domcaps, m) except libvirt.libvirtError as e: # pragma: no cover log.warning(_("Failed to get expanded CPU XML: %s"), e) break for feature in cpu.features: if feature.name in sec_features: ret.append(feature.name) log.debug("Found host-model security features: %s", ret) return ret ################################# # DomainCapabilities main class # ################################# class DomainCapabilities(XMLBuilder): XML_NAME = "domainCapabilities" os = XMLChildProperty(_OS, is_single=True) cpu = XMLChildProperty(_CPU, is_single=True) devices = XMLChildProperty(_Devices, is_single=True) features = XMLChildProperty(_Features, is_single=True) memorybacking = XMLChildProperty(_MemoryBacking, is_single=True) arch = XMLProperty("./arch") domain = XMLProperty("./domain") machine = XMLProperty("./machine") path = XMLProperty("./path") ################ # Init helpers # ################ @staticmethod def build_from_params(conn, emulator, arch, machine, hvtype): xml = None if conn.support.conn_domain_capabilities(): try: xml = conn.getDomainCapabilities(emulator, arch, machine, hvtype) log.debug("Fetched domain capabilities for (%s,%s,%s,%s): %s", emulator, arch, machine, hvtype, xml) except Exception: # pragma: no cover log.debug("Error fetching domcapabilities XML", exc_info=True) if not xml: # If not supported, just use a stub object return DomainCapabilities(conn) return DomainCapabilities(conn, parsexml=xml) @staticmethod def build_from_guest(guest): return DomainCapabilities.build_from_params(guest.conn, guest.emulator, guest.os.arch, guest.os.machine, guest.type) ######################### # UEFI/firmware methods # ######################### # Mapping of UEFI binary names to their associated architectures. We # only use this info to do things automagically for the user, it shouldn't # validate anything the user explicitly enters. _uefi_arch_patterns = { "i686": [ r".*edk2-i386-.*\.fd", # upstream qemu r".*ovmf-ia32.*", # fedora, gerd's firmware repo ], "x86_64": [ r".*edk2-x86_64-.*\.fd", # upstream qemu r".*OVMF_CODE\.fd", # RHEL r".*ovmf-x64/OVMF.*\.fd", # gerd's firmware repo r".*ovmf-x86_64-.*", # SUSE r".*ovmf.*", ".*OVMF.*", # generic attempt at a catchall ], "aarch64": [ r".*AAVMF_CODE\.fd", # RHEL r".*aarch64/QEMU_EFI.*", # fedora, gerd's firmware repo r".*aarch64.*", # generic attempt at a catchall r".*edk2-aarch64-code\.fd", # upstream qemu ], "armv7l": [ r".*AAVMF32_CODE\.fd", # Debian qemu-efi-arm package r".*arm/QEMU_EFI.*", # fedora, gerd's firmware repo r".*edk2-arm-code\.fd" # upstream qemu ], "riscv64": [ r".*RISCV_VIRT_CODE\..*", # Fedora, Debian r".*riscv64.*", # generic attempt at a catchall ], "loongarch64": [ r".*loongarch64/QEMU_CODE\..*", # Fedora r".*loongarch64.*", # generic attempt at a catchall ], } def find_uefi_path_for_arch(self): """ Search the loader paths for one that matches the passed arch """ if not self.arch_can_uefi(): return # pragma: no cover firmware_files = [f.value for f in self.os.loader.values] if self.conn.is_bhyve(): for firmware_file in firmware_files: if 'BHYVE_UEFI.fd' in firmware_file: return firmware_file return (firmware_files and firmware_files[0] or None) # pragma: no cover patterns = self._uefi_arch_patterns.get(self.arch) for pattern in patterns: for path in firmware_files: if re.match(pattern, path): return path def label_for_firmware_path(self, path): """ Return a pretty label for passed path, based on if we know about it or not """ if not path: if self.arch in ["i686", "x86_64"]: return _("BIOS") return _("Default") for arch, patterns in self._uefi_arch_patterns.items(): for pattern in patterns: if re.match(pattern, path): return (_("UEFI %(arch)s: %(path)s") % {"arch": arch, "path": path}) return _("Custom: %(path)s" % {"path": path}) def arch_can_uefi(self): """ Return True if we know how to setup UEFI for the passed arch """ return self.arch in self._uefi_arch_patterns def supports_uefi_loader(self): """ Return True if libvirt advertises support for UEFI loader """ return self.os.loader.get_enum("readonly").has_value("yes") def supports_firmware_efi(self): return self.os.get_enum("firmware").has_value("efi") ####################### # CPU support methods # ####################### def supports_safe_host_model(self): """ Return True if domcaps reports support for cpu mode=host-model. host-model in fact predates this support, however it wasn't general purpose safe prior to domcaps advertisement. """ m = self.cpu.get_mode("host-model") return (m and m.supported and m.models[0].fallback == "forbid") def supports_safe_host_passthrough(self): """ Return True if host-passthrough is safe enough to use by default. We limit this to domcaps new enough to report whether host-passthrough is migratable or not, which also means libvirt is about new enough to not taint the VM for using host-passthrough """ m = self.cpu.get_mode("host-passthrough") return (m and m.supported and "on" in m.get_enum("hostPassthroughMigratable").get_values()) def supports_maximum_cpu_mode(self): m = self.cpu.get_mode("maximum") return (m and m.supported) def get_cpu_models(self): models = [] for m in self.cpu.modes: if m.name == "custom" and m.supported: for model in m.models: if model.usable != "no": models.append(model.model) return models _features = None def get_cpu_security_features(self): if self._features is None: self._features = _lookup_cpu_security_features(self) or [] return self._features ######################## # Misc support methods # ######################## def supports_sev_launch_security(self, check_es=False): """ Returns False if either libvirt doesn't advertise support for SEV at all (< libvirt-4.5.0) or if it explicitly advertises it as unsupported on the platform """ if check_es: return bool(self.features.sev.supported and self.features.sev.maxESGuests) return bool(self.features.sev.supported) def supports_video_bochs(self): """ Returns False if either libvirt or qemu do not have support to bochs video type. """ return self.devices.video.get_enum("modelType").has_value("bochs") def supports_video_qxl(self): if not self.devices.video.has_enum("modelType"): # qxl long predates modelType in domcaps, so if it is missing, # use spice support as a rough value return self.supports_graphics_spice() return self.devices.video.get_enum("modelType").has_value("qxl") def supports_video_virtio(self): return self.devices.video.get_enum("modelType").has_value("virtio") def supports_tpm_emulator(self): """ Returns False if either libvirt or qemu do not have support for emulating a TPM. """ models = self.devices.tpm.get_enum("model").get_values() backends = self.devices.tpm.get_enum("backendModel").get_values() if self.arch == "armv7l" and models == ["tpm-tis"]: # libvirt as of 8.4.0 can advertise armv7l tpm-tis support, # but then explicitly rejects that config. If we see it, # assume TPM is not supported # https://gitlab.com/libvirt/libvirt/-/issues/329 return False return len(models) > 0 and bool("emulator" in backends) def supports_graphics_spice(self): if not self.devices.graphics.supported: # domcaps is too old, or the driver doesn't advertise graphics # support. Use our pre-existing logic if not self.conn.is_qemu() and not self.conn.is_test(): return False return self.conn.caps.host.cpu.arch in ["i686", "x86_64"] return self.devices.graphics.get_enum("type").has_value("spice") def supports_channel_spicevmc(self): """ Return False if libvirt explicitly advertises no support for spice channel """ if self.devices.channel.supported is None: # Follow the original behavior in case of talking to older # libvirt. return True return self.devices.channel.get_enum("type").has_value("spicevmc") def supports_redirdev_usb(self): """ Return False if libvirt explicitly advertises no support for USB redirect """ if self.devices.redirdev.supported is None: # Follow the original behavior in case of talking to older # libvirt. return True return self.devices.redirdev.get_enum("bus").has_value("usb") def supports_filesystem_virtiofs(self): """ Return True if libvirt advertises support for virtiofs """ return self.devices.filesystem.get_enum( "driverType").has_value("virtiofs") def supports_memorybacking_memfd(self): """ Return True if libvirt advertises support for memfd memory backend """ return self.memorybacking.get_enum("sourceType").has_value("memfd") def supported_hyperv_features(self): """ Return list of supported Hyper-V features. """ if not self.features.hyperv.supported: return [] return self.features.hyperv.get_enum("features").get_values() def supported_panic_models(self): """ Return list of supported panic device models. """ return self.devices.panic.get_enum("model").get_values()