# # Copyright 2006-2007, 2013 Red Hat, Inc. # # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. import configparser import logging import re from .osdict import OSDB ############################################### # Helpers for detecting distro from given URL # ############################################### class _DistroCache(object): def __init__(self, fetcher): self._fetcher = fetcher self._filecache = {} self._treeinfo = None self.treeinfo_family = None self.treeinfo_version = None self.treeinfo_name = None self.treeinfo_matched = False self.suse_content = None self.checked_for_suse_content = False self.debian_media_type = None self.libosinfo_os_variant = None self.libosinfo_mediaobj = None def acquire_file_content(self, path): if path not in self._filecache: try: content = self._fetcher.acquireFileContent(path) except ValueError: content = None logging.debug("Failed to acquire file=%s", path) self._filecache[path] = content return self._filecache[path] @property def treeinfo(self): if self._treeinfo: return self._treeinfo treeinfostr = self.acquire_file_content(".treeinfo") if treeinfostr is None: return None # If the file doesn't parse or there's no 'family', this will # error, but that should be fine because we aren't going to # successfully detect the tree anyways treeinfo = configparser.SafeConfigParser() treeinfo.read_string(treeinfostr) self.treeinfo_family = treeinfo.get("general", "family") self._treeinfo = treeinfo logging.debug("treeinfo family=%s", self.treeinfo_family) if self._treeinfo.has_option("general", "version"): self.treeinfo_version = self._treeinfo.get("general", "version") logging.debug("Found treeinfo version=%s", self.treeinfo_version) if self._treeinfo.has_option("general", "name"): self.treeinfo_name = self._treeinfo.get("general", "name") logging.debug("Found treeinfo name=%s", self.treeinfo_name) return self._treeinfo def treeinfo_family_regex(self, famregex): if not self.treeinfo: return False ret = bool(re.match(famregex, self.treeinfo_family)) self.treeinfo_matched = ret if not ret: logging.debug("Didn't match treeinfo family regex=%s", famregex) return ret def content_regex(self, filename, regex): """ Fetch 'filename' and return True/False if it matches the regex """ content = self.acquire_file_content(filename) if content is None: return False for line in content.splitlines(): if re.match(regex, line): return True logging.debug("found filename=%s but regex=%s didn't match", filename, regex) return False def get_treeinfo_media(self, typ): """ Pull kernel/initrd/boot.iso paths out of the treeinfo for the passed data """ def _get_treeinfo_path(media_name): image_type = self.treeinfo.get("general", "arch") if typ == "xen": image_type = "xen" return self.treeinfo.get("images-%s" % image_type, media_name) try: return [(_get_treeinfo_path("kernel"), _get_treeinfo_path("initrd"))] except Exception: logging.debug("Failed to parse treeinfo kernel/initrd", exc_info=True) return [] def split_version(self): verstr = self.treeinfo_version def _safeint(c): try: return int(c) except Exception: return 0 # Parse a string like 6.9 or 7.4 into its two parts # centos altarch's have just version=7 update = 0 version = _safeint(verstr) if verstr.count(".") >= 1: version = _safeint(verstr.split(".")[0]) update = _safeint(verstr.split(".")[1]) logging.debug("converted verstr=%s to version=%s update=%s", verstr, version, update) return version, update def fetcher_is_iso(self): return self._fetcher.is_iso() def guess_os_from_iso(self): ret = OSDB.guess_os_by_iso(self._fetcher.location) if not ret: return False self.libosinfo_os_variant, self.libosinfo_mediaobj = ret if (not self.libosinfo_mediaobj.get_kernel_path() or not self.libosinfo_mediaobj.get_initrd_path()): # This can happen if the media is live media, or just # with incomplete libosinfo data logging.debug("libosinfo didn't report any media kernel/initrd " "path for detected os_variant=%s", self.libosinfo_mediaobj) return False return True class _SUSEContent(object): """ Helper class tracking the SUSE 'content' files """ def __init__(self, content_str): self.content_str = content_str self.content_dict = {} for line in self.content_str.splitlines(): for prefix in ["LABEL", "DISTRO", "VERSION", "BASEARCHS", "DEFAULTBASE", "REPOID"]: if line.startswith(prefix + " "): self.content_dict[prefix] = line.split(" ", 1)[1] logging.debug("SUSE content dict: %s", str(self.content_dict)) self.tree_arch = self._get_tree_arch() self.product_name = self._get_product_name() self.product_version = self._get_product_version() logging.debug("SUSE content product_name=%s product_version=%s " "tree_arch=%s", self.product_name, self.product_version, self.tree_arch) def _get_tree_arch(self): # Examples: # opensuse 11.4: BASEARCHS i586 x86_64 # opensuse 12.3: BASEARCHS i586 x86_64 # opensuse 10.3: DEFAULTBASE i586 distro_arch = (self.content_dict.get("BASEARCHS") or self.content_dict.get("DEFAULTBASE")) if not distro_arch and "REPOID" in self.content_dict: distro_arch = self.content_dict["REPOID"].rsplit('/', 1)[1] if not distro_arch: return None tree_arch = distro_arch.strip() # Fix for 13.2 official oss repo if tree_arch.find("i586-x86_64") != -1: tree_arch = "x86_64" return tree_arch def _get_product_name(self): """ Parse the SUSE product name. Examples: SUSE Linux Enterprise Server 11 SP4 openSUSE 11.4 """ # Some field examples in the wild # # opensuse 10.3: LABEL openSUSE 10.3 # opensuse 11.4: LABEL openSUSE 11.4 # opensuse 12.3: LABEL openSUSE # sles11sp4 DVD: LABEL SUSE Linux Enterprise Server 11 SP4 # # # DISTRO cpe:/o:opensuse:opensuse:13.2,openSUSE # DISTRO cpe:/o:suse:sled:12:sp3,SUSE Linux Enterprise Desktop 12 SP3 # # As of 2018 all latest distros match only DISTRO and REPOID. product_name = None if "LABEL" in self.content_dict: product_name = self.content_dict["LABEL"] elif "," in self.content_dict.get("DISTRO", ""): product_name = self.content_dict["DISTRO"].rsplit(",", 1)[1] logging.debug("SUSE content product_name=%s", product_name) return product_name def _get_product_version(self): # Some example fields: # # opensuse 10.3: VERSION 10.3 # opensuse 12.3: VERSION 12.3 # SLES-10-SP4-DVD-x86_64-GM-DVD1.iso: VERSION 10.4-0 # # REPOID obsproduct://build.suse.de/SUSE:SLE-11-SP4:GA/SUSE_SLES/11.4/DVD/x86_64 # REPOID obsproduct://build.suse.de/SUSE:SLE-12-SP3:GA/SLES/12.3/DVD/aarch64 # # As of 2018 all latest distros match only DISTRO and REPOID. if not self.product_name: return None distro_version = self.content_dict.get("VERSION", "") if "-" in distro_version: distro_version = distro_version.split('-', 1)[0] # Special case, parse version out of a line like this # cpe:/o:opensuse:opensuse:13.2,openSUSE if (not distro_version and re.match("^.*:.*,openSUSE*", self.content_dict["DISTRO"])): distro_version = self.content_dict["DISTRO"].rsplit( ",", 1)[0].strip().rsplit(":")[4] distro_version = distro_version.strip() if "Enterprise" in self.product_name or "SLES" in self.product_name: sle_version = self.product_name.strip().rsplit(' ')[4] if len(self.product_name.strip().rsplit(' ')) > 5: sle_version = (sle_version + '.' + self.product_name.strip().rsplit(' ')[5][2]) distro_version = sle_version return distro_version def getDistroStore(guest, fetcher): logging.debug("Finding distro store for location=%s", fetcher.location) arch = guest.os.arch _type = guest.os.os_type osobj = guest.osinfo stores = _allstores[:] cache = _DistroCache(fetcher) # If user manually specified an os_distro, bump its URL class # to the top of the list if osobj.distro: logging.debug("variant=%s has distro=%s, looking for matching " "distro store to prioritize", osobj.name, osobj.distro) found_store = None for store in stores: if osobj.distro in store.matching_distros: found_store = store if found_store: logging.debug("Prioritizing distro store=%s", found_store) stores.remove(found_store) stores.insert(0, found_store) else: logging.debug("No matching store found, not prioritizing anything") for sclass in stores: if not sclass.is_valid(cache): continue store = sclass(fetcher.location, arch, _type, cache) logging.debug("Detected class=%s osvariant=%s", store.__class__.__name__, store.get_osdict_info()) return store # No distro was detected. See if the URL even resolves, and if not # give the user a hint that maybe they mistyped. This won't always # be true since some webservers don't allow directory listing. # https://www.redhat.com/archives/virt-tools-list/2014-December/msg00048.html extramsg = "" if not fetcher.can_access(): extramsg = (": " + _("The URL could not be accessed, maybe you mistyped?")) raise ValueError( _("Could not find an installable distribution at '%s'%s\n\n" "The location must be the root directory of an install tree.\n" "See virt-install man page for various distro examples." % (fetcher.location, extramsg))) ################## # Distro classes # ################## class _DistroTree(object): """ Class for determining the kernel/initrd path for an install tree (URL, ISO, or local directory) """ PRETTY_NAME = None matching_distros = [] def __init__(self, location, arch, vmtype, cache): self.type = vmtype self.arch = arch self.uri = location self.cache = cache if self.cache.libosinfo_os_variant: self._os_variant = self.cache.libosinfo_os_variant else: self._os_variant = self._detect_version() if self._os_variant and not OSDB.lookup_os(self._os_variant): logging.debug("Detected os_variant as %s, which is not " "in osdict.", self._os_variant) self._os_variant = None self._kernel_paths = [] if self.cache.treeinfo_matched: self._kernel_paths = self.cache.get_treeinfo_media(self.type) else: self._set_manual_kernel_paths() def _set_manual_kernel_paths(self): """ If kernel/initrd path could not be determined from a source like treeinfo, subclasses can override this to set a list of manual paths """ pass def _detect_version(self): """ Hook for subclasses to detect media os variant. """ logging.debug("%s does not implement any osdict detection", self) return None ############## # Public API # ############## @classmethod def is_valid(cls, cache): raise NotImplementedError def get_kernel_paths(self): return self._kernel_paths def get_osdict_info(self): """ Return detected osdict value """ return self._os_variant class _FedoraDistro(_DistroTree): PRETTY_NAME = "Fedora" matching_distros = ["fedora"] @classmethod def is_valid(cls, cache): famregex = ".*Fedora.*" return cache.treeinfo_family_regex(famregex) def _detect_version(self): latest_variant = "fedora-unknown" verstr = self.cache.treeinfo_version if not verstr: logging.debug("No treeinfo version? Assume latest_variant=%s", latest_variant) return latest_variant # rawhide trees changed to use version=Rawhide in Apr 2016 if verstr in ["development", "rawhide", "Rawhide"]: logging.debug("treeinfo version=%s, using latest_variant=%s", verstr, latest_variant) return latest_variant # treeinfo version is just an integer variant = "fedora" + verstr if OSDB.lookup_os(variant): return variant logging.debug("variant=%s from treeinfo version=%s not found, " "using latest_variant=%s", variant, verstr, latest_variant) return latest_variant class _RHELDistro(_DistroTree): PRETTY_NAME = "Red Hat Enterprise Linux" matching_distros = ["rhel"] _variant_prefix = "rhel" @classmethod def is_valid(cls, cache): # Matches: # Red Hat Enterprise Linux # RHEL Atomic Host famregex = ".*(Red Hat Enterprise Linux|RHEL).*" return cache.treeinfo_family_regex(famregex) def _detect_version(self): if not self.cache.treeinfo_version: logging.debug("No treeinfo version? Not setting an os_variant") return version, update = self.cache.split_version() # start with example base=rhel7, then walk backwards # through the OS list to find the latest os name that matches # this way we handle rhel7.6 from treeinfo when osdict only # knows about rhel7.5 base = self._variant_prefix + str(version) while update >= 0: tryvar = base + ".%s" % update if OSDB.lookup_os(tryvar): return tryvar update -= 1 class _CentOSDistro(_RHELDistro): PRETTY_NAME = "CentOS" matching_distros = ["centos"] _variant_prefix = "centos" @classmethod def is_valid(cls, cache): famregex = ".*(CentOS|Scientific).*" return cache.treeinfo_family_regex(famregex) class _SuseDistro(_RHELDistro): PRETTY_NAME = None _suse_regex = [] matching_distros = [] _variant_prefix = NotImplementedError famregex = NotImplementedError @classmethod def is_valid(cls, cache): if cache.treeinfo_family_regex(cls.famregex): return True if not cache.checked_for_suse_content: cache.checked_for_suse_content = True content_str = cache.acquire_file_content("content") if content_str is None: return False try: cache.suse_content = _SUSEContent(content_str) except Exception as e: logging.debug("Error parsing SUSE content file: %s", str(e)) return False if not cache.suse_content: return False for regex in cls._suse_regex: if re.match(regex, cache.suse_content.product_name): return True return False def _set_manual_kernel_paths(self): # We only reach here if no treeinfo was matched tree_arch = self.cache.suse_content.tree_arch if re.match(r'i[4-9]86', tree_arch): tree_arch = 'i386' oldkern = "linux" oldinit = "initrd" if tree_arch == "x86_64": oldkern += "64" oldinit += "64" if self.type == "xen": # Matches Opensuse > 10.2 and sles 10 self._kernel_paths.append( ("boot/%s/vmlinuz-xen" % tree_arch, "boot/%s/initrd-xen" % tree_arch)) if (tree_arch == "s390x" and str(self._os_variant).startswith(("sles11", "sled11"))): self._kernel_paths.append( ("boot/s390x/vmrdr.ikr", "boot/s390x/initrd")) # Tested with SLES 12 for ppc64le, all s390x self._kernel_paths.append( ("boot/%s/linux" % tree_arch, "boot/%s/initrd" % tree_arch)) # Tested with Opensuse 10.0 self._kernel_paths.append( ("boot/loader/%s" % oldkern, "boot/loader/%s" % oldinit)) # Tested with Opensuse >= 10.2, 11, and sles 10 self._kernel_paths.append( ("boot/%s/loader/linux" % tree_arch, "boot/%s/loader/initrd" % tree_arch)) def _detect_osdict_from_suse_content(self): if not self.cache.suse_content: return distro_version = self.cache.suse_content.product_version if not distro_version: return version = distro_version.split('.', 1)[0].strip() if len(version) == 8: # Tumbleweed 8 digit date return "opensusetumbleweed" if int(version) < 10: return self._variant_prefix + "9" if str(self._variant_prefix).startswith(("sles", "sled")): sp_version = "" if len(distro_version.split('.', 1)) == 2: sp_version = 'sp' + distro_version.split('.', 1)[1].strip() return self._variant_prefix + version + sp_version return self._variant_prefix + distro_version def _detect_osdict_from_url(self): root = "opensuse" oses = [n for n in OSDB.list_os() if n.name.startswith(root)] for osobj in oses: codename = osobj.name[len(root):] if re.search("/%s/" % codename, self.uri): return osobj.name def _detect_from_treeinfo(self): if not self.cache.treeinfo_name: return if re.search("openSUSE Tumbleweed", self.cache.treeinfo_name): return "opensusetumbleweed" version, update = self.cache.split_version() base = self._variant_prefix + str(version) while update >= 0: tryvar = base # SLE doesn't use '.0' for initial releases in # osinfo-db (sles11, sles12, etc) if update > 0 or not base.startswith('sle'): tryvar += ".%s" % update if OSDB.lookup_os(tryvar): return tryvar update -= 1 def _detect_version(self): var = self._detect_from_treeinfo() if not var: var = self._detect_osdict_from_url() if not var: var = self._detect_osdict_from_suse_content() return var class _SLESDistro(_SuseDistro): PRETTY_NAME = "SLES" matching_distros = ["sles"] _variant_prefix = "sles" _suse_regex = [".*SUSE Linux Enterprise Server*", ".*SUSE SLES*"] famregex = ".*SUSE Linux Enterprise.*" class _SLEDDistro(_SuseDistro): PRETTY_NAME = "SLED" matching_distros = ["sled"] _variant_prefix = "sled" _suse_regex = [".*SUSE Linux Enterprise Desktop*"] famregex = ".*SUSE Linux Enterprise.*" class _OpensuseDistro(_SuseDistro): PRETTY_NAME = "openSUSE" matching_distros = ["opensuse"] _variant_prefix = "opensuse" _suse_regex = [".*openSUSE.*"] famregex = ".*openSUSE.*" class _DebianDistro(_DistroTree): # ex. http://ftp.egr.msu.edu/debian/dists/sarge/main/installer-i386/ # daily builds: https://d-i.debian.org/daily-images/amd64/ PRETTY_NAME = "Debian" matching_distros = ["debian"] _debname = "debian" @classmethod def is_valid(cls, cache): def check_manifest(mfile): is_ubuntu = cls._debname == "ubuntu" if cache.content_regex(mfile, ".*[Uu]buntu.*"): return is_ubuntu return cache.content_regex(mfile, ".*[Dd]ebian.*") media_type = None if check_manifest("current/images/MANIFEST"): media_type = "url" elif check_manifest("daily/MANIFEST"): media_type = "daily" elif cache.content_regex(".disk/info", "%s.*" % cls._debname.capitalize()): # There's two cases here: # 1) Direct access ISO, attached as CDROM afterwards. We # use one set of kernels in that case which seem to # assume the prescence of CDROM media # 2) ISO mounted and exported over URL. We use a different # set of kernels that expect to boot from the network if cache.fetcher_is_iso(): media_type = "disk" else: media_type = "mounted_iso_url" if media_type: cache.debian_media_type = media_type return bool(media_type) def _set_manual_kernel_paths(self): if self.cache.debian_media_type == "disk": self._set_installcd_paths() else: self._set_url_paths() def _find_treearch(self): for pattern in [r"^.*/installer-(\w+)/?$", r"^.*/daily-images/(\w+)/?$"]: arch = re.findall(pattern, self.uri) if not arch: continue logging.debug("Found pattern=%s treearch=%s in uri", pattern, arch[0]) return arch[0] # Check for standard arch strings which will be # in the URI name for --location $ISO mounts for arch in ["i386", "amd64", "x86_64", "arm64"]: if arch in self.uri: logging.debug("Found treearch=%s in uri", arch) if arch == "x86_64": arch = "amd64" return arch # Otherwise default to i386 arch = "i386" logging.debug("No treearch found in uri, defaulting to arch=%s", arch) return arch def _set_url_paths(self): url_prefix = "current/images" if self.cache.debian_media_type == "daily": url_prefix = "daily" elif self.cache.debian_media_type == "mounted_iso_url": url_prefix = "install" tree_arch = self._find_treearch() hvmroot = "%s/netboot/%s-installer/%s/" % (url_prefix, self._debname, tree_arch) initrd_basename = "initrd.gz" kernel_basename = "linux" if tree_arch in ["ppc64el"]: kernel_basename = "vmlinux" if tree_arch == "s390x": hvmroot = "%s/generic/" % url_prefix kernel_basename = "kernel.%s" % self._debname initrd_basename = "initrd.%s" % self._debname if self.type == "xen": xenroot = "%s/netboot/xen/" % url_prefix self._kernel_paths.append( (xenroot + "vmlinuz", xenroot + "initrd.gz")) self._kernel_paths.append( (hvmroot + kernel_basename, hvmroot + initrd_basename)) def _set_installcd_paths(self): if self._debname == "ubuntu": if not self.arch == "s390x": kpair = ("install/vmlinuz", "install/initrd.gz") else: kpair = ("boot/kernel.ubuntu", "boot/initrd.ubuntu") elif self.arch == "x86_64": kpair = ("install.amd/vmlinuz", "install.amd/initrd.gz") elif self.arch == "i686": kpair = ("install.386/vmlinuz", "install.386/initrd.gz") elif self.arch == "aarch64": kpair = ("install.a64/vmlinuz", "install.a64/initrd.gz") elif self.arch == "ppc64le": kpair = ("install/vmlinux", "install/initrd.gz") elif self.arch == "s390x": kpair = ("boot/linux_vm", "boot/root.bin") else: kpair = ("install/vmlinuz", "install/initrd.gz") self._kernel_paths += [kpair] return True def _detect_version(self): oses = [n for n in OSDB.list_os() if n.name.startswith(self._debname)] if self.cache.debian_media_type == "daily": logging.debug("Appears to be debian 'daily' URL, using latest " "debian OS") return oses[0].name for osobj in oses: if osobj.codename: # Ubuntu codenames look like 'Warty Warthog' codename = osobj.codename.split()[0].lower() else: if " " not in osobj.label: continue # Debian labels look like 'Debian Sarge' codename = osobj.label.split()[1].lower() if ("/%s/" % codename) in self.uri: logging.debug("Found codename=%s in the URL string", codename) return osobj.name class _UbuntuDistro(_DebianDistro): # https://archive.ubuntu.com/ubuntu/dists/natty/main/installer-amd64/ PRETTY_NAME = "Ubuntu" matching_distros = ["ubuntu"] _debname = "ubuntu" class _ALTLinuxDistro(_DistroTree): PRETTY_NAME = "ALT Linux" matching_distros = ["altlinux"] def _set_manual_kernel_paths(self): self._kernel_paths = [ ("syslinux/alt0/vmlinuz", "syslinux/alt0/full.cz")] @classmethod def is_valid(cls, cache): # altlinux doesn't have installable URLs, so this is just for ISO return cache.content_regex(".disk/info", ".*ALT .*") class _MandrivaDistro(_DistroTree): # ftp://ftp.uwsg.indiana.edu/linux/mandrake/official/2007.1/x86_64/ PRETTY_NAME = "Mandriva/Mageia" matching_distros = ["mandriva", "mes"] @classmethod def is_valid(cls, cache): return cache.content_regex("VERSION", ".*(Mandriva|Mageia).*") def _set_manual_kernel_paths(self): # At least Mageia 5 uses arch in the names self._kernel_paths += [ ("isolinux/%s/vmlinuz" % self.arch, "isolinux/%s/all.rdz" % self.arch)] # Kernels for HVM: valid for releases 2007.1, 2008.*, 2009.0 self._kernel_paths += [ ("isolinux/alt0/vmlinuz", "isolinux/alt0/all.rdz")] class _GenericTreeinfoDistro(_DistroTree): """ Generic catchall class for .treeinfo using distros """ PRETTY_NAME = "Generic Treeinfo" matching_distros = [] @classmethod def is_valid(cls, cache): if cache.treeinfo: cache.treeinfo_matched = True return True return False class _LibosinfoDistro(_DistroTree): """ For ISO media detection that was fully handled by libosinfo """ PRETTY_NAME = "Libosinfo detected" matching_distros = [] @classmethod def is_valid(cls, cache): if cache.fetcher_is_iso(): return cache.guess_os_from_iso() return False def _set_manual_kernel_paths(self): self._kernel_paths += [ (self.cache.libosinfo_mediaobj.get_kernel_path(), self.cache.libosinfo_mediaobj.get_initrd_path()) ] # Build list of all *Distro classes def _build_distro_list(): allstores = [] for obj in list(globals().values()): if (isinstance(obj, type) and issubclass(obj, _DistroTree) and obj.PRETTY_NAME): allstores.append(obj) # Always stick Libosinfo first, it takes priority allstores.remove(_LibosinfoDistro) allstores.insert(0, _LibosinfoDistro) # Always stick GenericDistro at the end, since it's a catchall allstores.remove(_GenericTreeinfoDistro) allstores.append(_GenericTreeinfoDistro) return allstores _allstores = _build_distro_list()