repos-cmp/repos_cmp/lists.py
Ivan A. Melnikov 8388c0627b lists: Parallelize read_pkglist_heders_for_repo
Use multiprocessing module to read the headers for each
repo in parallel. Usually, reading the data for a repository means
loading 6 lists, so the gain is considerable.
2023-12-05 13:27:43 +04:00

171 lines
5.1 KiB
Python

import collections
import logging
import multiprocessing
import os
import shutil
import subprocess
try:
import rpm
except ImportError:
rpm = None
from repos_cmp import rpm_ffi
LOG = logging.getLogger(__name__)
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
                                           'version', 'release'])):
    """Package identity: name, epoch, version and release.

    ``epoch`` is ``None`` when the package has no epoch.  The other
    fields are stored exactly as they were handed to the constructor.
    """

    if rpm:
        # Only defined when the rpm Python module could be imported.
        @classmethod
        def from_header(cls, header):
            """Build a NEVR from an rpm header object."""
            return cls(header[rpm.RPMTAG_NAME],
                       header[rpm.RPMTAG_EPOCH],
                       header[rpm.RPMTAG_VERSION],
                       header[rpm.RPMTAG_RELEASE])

    @classmethod
    def from_tsv_line(cls, line):
        """Create a NEVR object from a tab-separated line.

        The line must consist of exactly four fields separated by TAB
        characters: name, epoch, version, release.  Both ``bytes`` and
        ``str`` lines are accepted; name, version and release keep the
        type of the line.  An epoch that is empty, ``(none)`` or ``None``
        becomes ``None``; any other epoch is converted with ``int``.

        Returns:
            The parsed NEVR, or ``None`` (after logging an error) when
            the line cannot be parsed.
        """
        # The separator and the "no epoch" markers must have the same type
        # as the line: on Python 3 bytes never compare equal to str, so
        # str markers would never match a bytes epoch field.
        if isinstance(line, str):
            separator = '\t'
            no_epoch = ('', '(none)', 'None')
        else:
            separator = b'\t'
            no_epoch = (b'', b'(none)', b'None')

        try:
            n, e, v, r = line.split(separator)
        except Exception:
            LOG.error("Failed to parse line: %s", line, exc_info=True)
            return None

        if e in no_epoch:
            e = None
        else:
            try:
                e = int(e)
            except Exception:
                LOG.error("Failed to parse epoch from line: %s",
                          line, exc_info=True)
                return None
        return cls(n, e, v, r)

    def format_evr(self):
        """Return ``version-release``, prefixed by ``epoch:`` if set."""
        if self.epoch is None:
            return '%s-%s' % (self.version, self.release)
        else:
            return '%s:%s-%s' % (self.epoch, self.version, self.release)

    @property
    def evr(self):
        """The ``(epoch, version, release)`` part as a plain tuple."""
        return self[1:]
def format_evr(nevr):
    """Format the EVR of *nevr*, or return ``'MISSING'`` for a falsy value."""
    if not nevr:
        return 'MISSING'
    return nevr.format_evr()
def _uncompressed(path):
    """Open *path* for binary reading, decompressing ``.xz`` files.

    Files whose name does not end in ``.xz`` are opened directly.  For
    ``.xz`` files a decompressor is started as a child process (``pixz``
    when it is found on PATH, ``xz`` otherwise) and the read end of its
    stdout pipe is returned; the ``Popen`` object itself is not kept.
    """
    if path.endswith('.xz'):
        pixz_binary = shutil.which('pixz')
        if pixz_binary:
            command = [pixz_binary, '-d', '-i', path, '-o', '/dev/stdout']
        else:
            command = ['xz', '-dc', path]
        decompressor = subprocess.Popen(command, stdout=subprocess.PIPE)
        return decompressor.stdout
    return open(path, 'rb')
def read_pkglist_headers_rpm(path):
    """Load the rpm headers stored in the package list at *path*.

    The list is opened through ``_uncompressed`` and its file descriptor
    is handed to ``rpm.readHeaderListFromFD``.

    Raises:
        RuntimeError: if the ``rpm`` Python module is not available.
    """
    LOG.info("Reading %s using python-module-rpm", path)
    if rpm:
        with _uncompressed(path) as stream:
            return rpm.readHeaderListFromFD(stream.fileno())
    raise RuntimeError('rpm module is not avalable')
def read_pkglist_heders_for_repo(repo_path, arches, components=None):
    """Read the rpm headers of all package lists of a repository.

    For every arch in *arches* the directory ``<repo_path>/<arch>/base``
    is scanned for files named ``pkglist.<component>[...]`` (binary
    packages) and ``srclist.<component>[...]`` (source packages).  Files
    ending in ``.bz2`` are ignored, and only one file is read for each
    (directory, kind, component) combination.  The selected lists are
    read in parallel by a pool of worker processes.

    Args:
        repo_path: root directory of the repository.
        arches: iterable of architecture directory names.
        components: optional container of component names; when given,
            only lists whose component is in it are read.

    Returns:
        A ``(source_headers, binary_headers)`` tuple of two lists.
    """
    bin_lists = []
    src_lists = []
    seen = set()
    # collect the files
    for arch in arches:
        basedir = os.path.join(repo_path, arch, 'base')
        # os.listdir() returns names in arbitrary order; sorting makes the
        # choice between duplicates of the same list (e.g. plain vs. .xz)
        # reproducible from run to run.
        for pkglist in sorted(os.listdir(basedir)):
            if pkglist.endswith('.bz2'):
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            parts = pkglist.split('.', 3)
            # A bare 'pkglist' / 'srclist' without a component suffix has
            # no parts[1]; skip it instead of failing with IndexError.
            if len(parts) < 2 or parts[0] not in ('pkglist', 'srclist'):
                continue
            kind, component = parts[0], parts[1]
            if components is not None and component not in components:
                continue
            what = basedir, kind, component
            if what in seen:
                LOG.info('Ignoring %s/%s', basedir, pkglist)
                continue
            seen.add(what)
            target = src_lists if kind == 'srclist' else bin_lists
            target.append(os.path.join(basedir, pkglist))

    # Nothing to read: do not start worker processes for no work.
    if not src_lists and not bin_lists:
        return [], []

    with multiprocessing.Pool() as p:
        # Submit both batches before waiting, so that source and binary
        # lists are processed concurrently.
        src_res = p.map_async(read_pkglist_headers_rpm, src_lists)
        bin_res = p.map_async(read_pkglist_headers_rpm, bin_lists)
        src_headers = [h for chunk in src_res.get() for h in chunk]
        bin_headers = [h for chunk in bin_res.get() for h in chunk]
    return src_headers, bin_headers
def _read_pkglist_rpm(path):
    """Read the list at *path* with the rpm module.

    The headers are loaded immediately; the returned generator converts
    them to NEVR objects one by one as it is consumed.
    """
    headers = read_pkglist_headers_rpm(path)
    return (NEVR.from_header(header) for header in headers)
# Query format passed to pkglist-query: one record per line with the
# name, epoch, version and release separated by tabs.
_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'


def _read_pkglist_pkglist_query(path):
    """Read the list at *path* with the external ``pkglist-query`` tool.

    The (decompressed) list is fed to ``pkglist-query`` on stdin and each
    line of its output is parsed with ``NEVR.from_tsv_line``.  A non-zero
    exit status of the tool is logged as an error; whatever output was
    produced is still parsed.

    Returns:
        A generator of NEVR objects; entries are ``None`` for lines that
        could not be parsed.
    """
    LOG.info("Reading %s using pkglist-query", path)
    with _uncompressed(path) as input_file:
        query = subprocess.Popen(
            ["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
            stdin=input_file, stdout=subprocess.PIPE)
        # communicate() reads all output and waits for the tool to exit,
        # which also makes query.returncode available.
        output = query.communicate()[0]
    if query.returncode:
        # Without this check a failed run is indistinguishable from an
        # empty package list.
        LOG.error("pkglist-query exited with status %s while reading %s",
                  query.returncode, path)
    return (NEVR.from_tsv_line(line) for line in output.splitlines())
def read_pkglist(path):
    """Read the package list at *path* into a list of NEVR objects.

    Uses the rpm module when it is available and falls back to the
    ``pkglist-query`` tool otherwise.  Falsy entries (lines that failed
    to parse) are dropped from the result.
    """
    reader = _read_pkglist_rpm if rpm else _read_pkglist_pkglist_query
    return list(filter(None, reader(path)))
def read_src_dot_list(repo_path):
    """Read ``files/list/src.list.xz`` under *repo_path*.

    The first two tab-separated fields of every line are taken as the
    package name and its EVR string; the latter is split with
    ``rpm_ffi.parse_evr``.  Lines that fail to parse are logged with a
    warning and skipped.

    Returns:
        A frozenset of NEVR objects.
    """
    path = os.path.join(repo_path, 'files/list/src.list.xz')
    LOG.info("Reading src.list %s", path)
    entries = []
    with _uncompressed(path) as stream:
        for raw_line in stream:
            try:
                fields = raw_line.split(b'\t', 2)
                name, evr = fields[:2]
                epoch, version, release = rpm_ffi.parse_evr(evr)
                entries.append(NEVR(name, epoch, version, release))
            except Exception:
                LOG.warning('Failed to parse line %r', raw_line,
                            exc_info=True)
    return frozenset(entries)
def read_srclists(prefix, arches):
    """Merge ``srclist.classic.xz`` of every arch under *prefix*.

    Returns:
        A frozenset with the entries of ``<prefix>/<arch>/base/
        srclist.classic.xz`` for all *arches*.

    Raises:
        RuntimeError: if the merged result is empty.
    """
    per_arch = (
        read_pkglist(os.path.join(prefix, arch, 'base', 'srclist.classic.xz'))
        for arch in arches
    )
    merged = frozenset().union(*per_arch)
    if merged:
        return merged
    raise RuntimeError('Empty lists at %s' % prefix)
def read_all_srclists(repos):
    """Map every repository name to the contents of its src.list.

    *repos* maps a name to a mapping that has a ``'path'`` key; that
    path is passed to ``read_src_dot_list``.
    """
    return {
        name: read_src_dot_list(settings['path'])
        for name, settings in repos.items()
    }