8388c0627b
Use multiprocessing module to read the headers for each repo in parallel. Usually, reading the data for a repository means loading 6 lists, so the gain is considerable.
171 lines
5.1 KiB
Python
171 lines
5.1 KiB
Python
|
|
import collections
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
|
|
try:
|
|
import rpm
|
|
except ImportError:
|
|
rpm = None
|
|
|
|
from repos_cmp import rpm_ffi
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
                                           'version', 'release'])):
    """Name/Epoch/Version/Release identity of an RPM package.

    ``epoch`` is an int or None; the other fields are kept exactly as
    read (bytes when parsed from pkglist-query output, str/bytes from
    rpm headers depending on the bindings).
    """

    if rpm:
        @classmethod
        def from_header(cls, header):
            """Build a NEVR from an ``rpm`` header object."""
            return cls(header[rpm.RPMTAG_NAME],
                       header[rpm.RPMTAG_EPOCH],
                       header[rpm.RPMTAG_VERSION],
                       header[rpm.RPMTAG_RELEASE])

    @classmethod
    def from_tsv_line(cls, line):
        """Creates a NEVR object from a tab-separated line of bytes.

        The line should have the following format:
        name\tepoch\tversion\trelease

        Returns None (after logging) when the line cannot be parsed.
        """
        try:
            n, e, v, r = line.split(b'\t')
        except Exception:
            LOG.error("Failed to parse line: %s", line, exc_info=True)
            return None

        # BUG FIX: *line* is bytes, so the "missing epoch" markers must
        # be bytes as well.  The old str tuple ('', '(none)', 'None')
        # never matched, sending every epoch-less package into int()
        # which failed, so those packages were silently dropped.
        if e in (b'', b'(none)', b'None'):
            e = None
        else:
            try:
                e = int(e)
            except Exception:
                LOG.error("Failed to parse epoch from line: %s",
                          line, exc_info=True)
                return None
        return cls(n, e, v, r)

    def format_evr(self):
        """Return '[epoch:]version-release' for display."""
        if self.epoch is None:
            return '%s-%s' % (self.version, self.release)
        else:
            return '%s:%s-%s' % (self.epoch, self.version, self.release)

    @property
    def evr(self):
        # (epoch, version, release) — everything but the name.
        return self[1:]
|
|
|
|
|
|
def format_evr(nevr):
    """Human-readable EVR of *nevr*, or 'MISSING' when it is falsy."""
    if not nevr:
        return 'MISSING'
    return nevr.format_evr()
|
|
|
|
|
|
def _uncompressed(path):
|
|
if not path.endswith('.xz'):
|
|
return open(path, 'rb')
|
|
pixz = shutil.which('pixz')
|
|
if pixz:
|
|
return subprocess.Popen([pixz, '-d', '-i', path, '-o', '/dev/stdout'],
|
|
stdout=subprocess.PIPE).stdout
|
|
return subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE).stdout
|
|
|
|
|
|
def read_pkglist_headers_rpm(path):
    """Read a (possibly .xz-compressed) pkglist file into rpm headers.

    Requires the python rpm bindings.

    Raises:
        RuntimeError: when the ``rpm`` module could not be imported.
    """
    # Check the precondition before logging that we are going to read —
    # the old order logged an action that was then refused.
    if not rpm:
        # Typo fix: 'avalable' -> 'available' in the error message.
        raise RuntimeError('rpm module is not available')
    LOG.info("Reading %s using python-module-rpm", path)
    with _uncompressed(path) as input_file:
        return rpm.readHeaderListFromFD(input_file.fileno())
|
|
|
|
|
|
def read_pkglist_heders_for_repo(repo_path, arches, components=None):
    """Read rpm headers for all pkglist/srclist files of a repository.

    Scans ``<repo_path>/<arch>/base`` for every arch in *arches*,
    picking files whose name starts with ``pkglist.<component>`` or
    ``srclist.<component>``.  When *components* is not None, only the
    listed components are read.  All selected files are parsed in
    parallel with a multiprocessing pool.

    Returns a tuple ``(src_headers, bin_headers)``: two flat lists of
    rpm header objects.
    """
    bin_lists = []
    src_lists = []
    seen = set()

    # collect the files
    for arch in arches:
        basedir = os.path.join(repo_path, arch, 'base')
        for pkglist in os.listdir(basedir):
            # .bz2 copies duplicate the .xz/plain lists — skip them.
            if pkglist.endswith('.bz2'):
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            # e.g. 'pkglist.classic.xz' -> ['pkglist', 'classic', 'xz']
            parts = pkglist.split('.', 3)
            if parts[0] not in ('pkglist', 'srclist'):
                continue
            if components is not None and parts[1] not in components:
                continue
            # Read only one file per (dir, kind, component) triple so a
            # list present both compressed and uncompressed is not
            # loaded twice; directory order decides which one wins.
            what = basedir, parts[0], parts[1]
            if what in seen:
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            seen.add(what)
            (src_lists if parts[0] == 'srclist' else bin_lists).append(
                os.path.join(basedir, pkglist))
    # Parse the lists in parallel: a repo usually has about six lists,
    # so the speedup from the pool is considerable.
    with multiprocessing.Pool() as p:
        src_res = p.map_async(read_pkglist_headers_rpm, src_lists)
        bin_res = p.map_async(read_pkglist_headers_rpm, bin_lists)
        return sum(src_res.get(), []), sum(bin_res.get(), [])
|
|
|
|
|
|
def _read_pkglist_rpm(path):
    """Yield a NEVR for every header in the pkglist at *path*."""
    headers = read_pkglist_headers_rpm(path)
    return (NEVR.from_header(header) for header in headers)
|
|
|
|
|
|
_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'
|
|
|
|
|
|
def _read_pkglist_pkglist_query(path):
    """Parse a pkglist with the external ``pkglist-query`` tool.

    Feeds the (decompressed) list to pkglist-query on stdin and yields
    a NEVR for each tab-separated line of its output.
    """
    LOG.info("Reading %s using pkglist-query", path)
    with _uncompressed(path) as input_file:
        query = subprocess.Popen(
            ["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
            stdin=input_file, stdout=subprocess.PIPE)
        output = query.communicate()[0]
    return (NEVR.from_tsv_line(line) for line in output.splitlines())
|
|
|
|
|
|
def read_pkglist(path):
    """Read one pkglist file into a list of NEVR tuples.

    Prefers the python rpm bindings, falling back to the external
    pkglist-query tool.  Entries that failed to parse are dropped.
    """
    reader = _read_pkglist_rpm if rpm else _read_pkglist_pkglist_query
    return [nevr for nevr in reader(path) if nevr]
|
|
|
|
|
|
def read_src_dot_list(repo_path):
    """Read ``files/list/src.list.xz`` of the repository at *repo_path*.

    Returns a frozenset of NEVR tuples, one per source package; lines
    that cannot be parsed are logged and skipped.
    """
    path = os.path.join(repo_path, 'files/list/src.list.xz')
    LOG.info("Reading src.list %s", path)

    packages = []
    with _uncompressed(path) as input_file:
        for line in input_file:
            try:
                fields = line.split(b'\t', 2)
                name, evr = fields[0], fields[1]
                # parse_evr splits 'epoch:version-release' into parts.
                epoch, version, release = rpm_ffi.parse_evr(evr)
                packages.append(NEVR(name, epoch, version, release))
            except Exception:
                LOG.warning('Failed to parse line %r', line, exc_info=True)
    return frozenset(packages)
|
|
|
|
|
|
def read_srclists(prefix, arches):
    """Read ``srclist.classic.xz`` for every arch under *prefix*.

    Returns the union of all per-arch srclists as a frozenset of NEVR
    tuples.

    Raises:
        RuntimeError: when no entries at all were read.
    """
    result = frozenset()
    for arch in arches:
        srclist = os.path.join(prefix, arch, 'base', 'srclist.classic.xz')
        result |= frozenset(read_pkglist(srclist))
    if not result:
        raise RuntimeError('Empty lists at %s' % prefix)
    return result
|
|
|
|
|
|
def read_all_srclists(repos):
    """Map each repo name to the frozenset of its source NEVRs.

    *repos* maps name -> config dict with at least a 'path' key.
    """
    return {name: read_src_dot_list(repo['path'])
            for name, repo in repos.items()}
|