import collections
import logging
import multiprocessing
import os
import shutil
import subprocess

try:
    import rpm
except ImportError:
    rpm = None

from repos_cmp import rpm_ffi

LOG = logging.getLogger(__name__)

# Byte spellings of an absent epoch in a tab-separated NEVR line.  The line
# is split on b'\t', so the epoch field is always bytes; comparing it with
# str values (as an earlier revision did) can never match on Python 3.
_NO_EPOCH = (b'', b'(none)', b'None')


class NEVR(collections.namedtuple('NEVR',
                                  ['name', 'epoch', 'version', 'release'])):
    """Package identity: name, epoch, version and release.

    ``epoch`` is ``None`` when the package has no epoch.
    """

    if rpm:
        # Only available when the optional ``rpm`` module was imported.
        @classmethod
        def from_header(cls, header):
            """Build a NEVR from the name/epoch/version/release header tags."""
            return cls(header[rpm.RPMTAG_NAME],
                       header[rpm.RPMTAG_EPOCH],
                       header[rpm.RPMTAG_VERSION],
                       header[rpm.RPMTAG_RELEASE])

    @classmethod
    def from_tsv_line(cls, line):
        """Create a NEVR object from a tab-separated bytes line.

        The line must consist of exactly four fields separated by TAB
        characters: name, epoch, version, release.  An epoch that is empty,
        ``(none)`` or ``None`` is stored as ``None``; any other epoch must
        parse as an integer.  name, version and release are kept as bytes.

        Returns the NEVR, or ``None`` (after logging an error) when the
        line cannot be parsed.
        """
        try:
            n, e, v, r = line.split(b'\t')
        except (AttributeError, TypeError, ValueError):
            # Not a bytes-like line, or not exactly four fields.
            LOG.error("Failed to parse line: %s", line, exc_info=True)
            return None

        if e in _NO_EPOCH:
            e = None
        else:
            try:
                e = int(e)
            except ValueError:
                LOG.error("Failed to parse epoch from line: %s",
                          line, exc_info=True)
                return None
        return cls(n, e, v, r)

    def format_evr(self):
        """Return ``version-release``, prefixed by ``epoch:`` when set."""
        # NOTE(review): %s-formatting a bytes field on Python 3 yields its
        # repr (b'...'); confirm which field types reach this method.
        if self.epoch is None:
            return '%s-%s' % (self.version, self.release)
        return '%s:%s-%s' % (self.epoch, self.version, self.release)

    @property
    def evr(self):
        """The ``(epoch, version, release)`` part of the tuple."""
        return self[1:]


def format_evr(nevr):
    """Format the EVR of *nevr*, or return 'MISSING' for a falsy value."""
    if not nevr:
        return 'MISSING'
    return nevr.format_evr()


def _uncompressed(path):
    """Open *path* for binary reading, decompressing ``.xz`` files.

    Files not ending in ``.xz`` are opened directly.  Otherwise the stdout
    pipe of a decompressor process is returned: ``pixz`` when it is found
    on PATH, ``xz`` otherwise.
    """
    if not path.endswith('.xz'):
        return open(path, 'rb')

    # NOTE(review): only the pipe is handed back, so the child process is
    # never waited on and its exit status is not checked.
    pixz = shutil.which('pixz')
    if pixz:
        command = [pixz, '-d', '-i', path, '-o', '/dev/stdout']
    else:
        command = ['xz', '-dc', path]
    return subprocess.Popen(command, stdout=subprocess.PIPE).stdout


def read_pkglist_headers_rpm(path):
    """Read the headers stored in the pkglist at *path* via the rpm module.

    Returns the result of ``rpm.readHeaderListFromFD`` for the (possibly
    decompressed) file.

    Raises:
        RuntimeError: if the ``rpm`` module could not be imported.
    """
    LOG.info("Reading %s using python-module-rpm", path)
    if not rpm:
        raise RuntimeError('rpm module is not avalable')
    with _uncompressed(path) as input_file:
        return rpm.readHeaderListFromFD(input_file.fileno())


def read_pkglist_heders_for_repo(repo_path, arches, components=None):
    """Read all source and binary package headers of a repository.

    For every arch in *arches* the directory ``<repo_path>/<arch>/base`` is
    scanned for files named ``pkglist.<component>[.<ext>]`` and
    ``srclist.<component>[.<ext>]``.  ``.bz2`` files are ignored, and so is
    every further file for a (directory, kind, component) combination that
    was already picked up.  When *components* is not None, only the listed
    components are read.

    Returns:
        A ``(source_headers, binary_headers)`` pair of lists.
    """
    bin_lists = []
    src_lists = []
    seen = set()

    # collect the files
    for arch in arches:
        basedir = os.path.join(repo_path, arch, 'base')
        for pkglist in os.listdir(basedir):
            if pkglist.endswith('.bz2'):
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            parts = pkglist.split('.', 3)
            # A name without a component part (e.g. just "pkglist") is not
            # a list file; skip it instead of failing on parts[1].
            if len(parts) < 2 or parts[0] not in ('pkglist', 'srclist'):
                continue
            kind, component = parts[0], parts[1]
            if components is not None and component not in components:
                continue
            what = basedir, kind, component
            if what in seen:
                # Same list in another form (e.g. compressed and not).
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            seen.add(what)
            target = src_lists if kind == 'srclist' else bin_lists
            target.append(os.path.join(basedir, pkglist))

    with multiprocessing.Pool() as p:
        # Queue both batches before waiting so they share the pool.
        src_res = p.map_async(read_pkglist_headers_rpm, src_lists)
        bin_res = p.map_async(read_pkglist_headers_rpm, bin_lists)
        # Flatten the per-file results in linear time.
        src_headers = [h for chunk in src_res.get() for h in chunk]
        bin_headers = [h for chunk in bin_res.get() for h in chunk]
        return src_headers, bin_headers


# Correctly spelled alias; the original name is kept for existing callers.
read_pkglist_headers_for_repo = read_pkglist_heders_for_repo


def _read_pkglist_rpm(path):
    """Lazily yield a NEVR for every header read through the rpm module."""
    return (NEVR.from_header(h) for h in read_pkglist_headers_rpm(path))


_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'


def _read_pkglist_pkglist_query(path):
    """Read NEVRs from *path* with the external ``pkglist-query`` tool.

    The (possibly decompressed) file is fed to the tool's stdin and every
    output line is parsed with ``NEVR.from_tsv_line``; lines that fail to
    parse come out as ``None``.
    """
    LOG.info("Reading %s using pkglist-query", path)
    with _uncompressed(path) as input_file:
        query = subprocess.Popen(
            ["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
            stdin=input_file, stdout=subprocess.PIPE)
        # communicate() runs right here, while the input is still open;
        # only the per-line parsing is deferred to the generator.
        output_lines = query.communicate()[0].splitlines()
        return (NEVR.from_tsv_line(line) for line in output_lines)


def read_pkglist(path):
    """Return the list of NEVRs found in the pkglist at *path*.

    Uses the rpm module when available and ``pkglist-query`` otherwise.
    Falsy entries (lines that could not be parsed) are dropped.
    """
    if rpm:
        result = _read_pkglist_rpm(path)
    else:
        result = _read_pkglist_pkglist_query(path)
    return [r for r in result if r]


def read_src_dot_list(repo_path):
    """Read ``files/list/src.list.xz`` under *repo_path* into a frozenset.

    The first two tab-separated fields of each line are taken as the
    package name and its EVR; the EVR is split with ``rpm_ffi.parse_evr``.
    Lines that fail to parse are logged as warnings and skipped.
    """
    path = os.path.join(repo_path, 'files/list/src.list.xz')
    LOG.info("Reading src.list %s", path)

    result = []
    with _uncompressed(path) as input_file:
        for line in input_file:
            try:
                # NOTE(review): with only two fields on a line the EVR would
                # keep the trailing newline - confirm the file format.
                name, evr = line.split(b'\t', 2)[:2]
                e, v, r = rpm_ffi.parse_evr(evr)
                result.append(NEVR(name, e, v, r))
            except Exception:
                # Best effort: one bad line must not abort the whole read.
                LOG.warning('Failed to parse line %r', line, exc_info=True)
    return frozenset(result)


def read_srclists(prefix, arches):
    """Return the union of ``srclist.classic.xz`` contents for *arches*.

    Raises:
        RuntimeError: if no package was found for any of the arches.
    """
    result = frozenset()
    for arch in arches:
        srclist = os.path.join(prefix, arch, 'base', 'srclist.classic.xz')
        result = result.union(read_pkglist(srclist))
    if not result:
        raise RuntimeError('Empty lists at %s' % prefix)
    return result


def read_all_srclists(repos):
    """Map each repository name in *repos* to its src.list contents.

    Every value of *repos* must provide the repository location under the
    ``'path'`` key.
    """
    return {name: read_src_dot_list(v['path']) for name, v in repos.items()}