repos-cmp/repos_cmp/lists.py
Ivan A. Melnikov 90d99e3882 lists: Read only the first of compressed pkglists
By default, genbasedir saves several copies of every
package list, compressing them with different methods
(xz, bz2, uncompressed). With this change, we read
only the first one, thus avoiding extra work
and (false) warnings about duplicate binaries.
2023-06-02 15:52:46 +04:00

162 lines
4.7 KiB
Python

import collections
import logging
import os
import shutil
import subprocess
try:
import rpm
except ImportError:
rpm = None
from repos_cmp import rpm_ffi
LOG = logging.getLogger(__name__)
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
'version', 'release'])):
if rpm:
@classmethod
def from_header(cls, header):
return cls(header[rpm.RPMTAG_NAME],
header[rpm.RPMTAG_EPOCH],
header[rpm.RPMTAG_VERSION],
header[rpm.RPMTAG_RELEASE])
@classmethod
def from_tsv_line(cls, line):
"""Creates a NEVR object from a tab-separated line.
The line should have the following format:
name\tepoch\tversion\trelease
"""
try:
n, e, v, r = line.split(b'\t')
except Exception:
LOG.error("Failed to parse line: %s", line, exc_info=True)
return None
if e in ('', '(none)', 'None'):
e = None
else:
try:
e = int(e)
except Exception:
LOG.error("Failed to parse epoch from line: %s",
line, exc_info=True)
return None
return cls(n, e, v, r)
def format_evr(self):
if self.epoch is None:
return '%s-%s' % (self.version, self.release)
else:
return '%s:%s-%s' % (self.epoch, self.version, self.release)
@property
def evr(self):
return self[1:]
def format_evr(nevr):
return nevr.format_evr() if nevr else 'MISSING'
def _uncompressed(path):
if not path.endswith('.xz'):
return open(path, 'rb')
pixz = shutil.which('pixz')
if pixz:
return subprocess.Popen([pixz, '-d', '-i', path, '-o', '/dev/stdout'],
stdout=subprocess.PIPE).stdout
return subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE).stdout
def read_pkglist_headers_rpm(path):
LOG.info("Reading %s using python-module-rpm", path)
if not rpm:
raise RuntimeError('rpm module is not avalable')
with _uncompressed(path) as input_file:
return rpm.readHeaderListFromFD(input_file.fileno())
def read_pkglist_heders_for_repo(repo_path, arches, components=None):
bin_headers = []
src_headers = []
seen = set()
for arch in arches:
basedir = os.path.join(repo_path, arch, 'base')
for pkglist in os.listdir(basedir):
parts = pkglist.split('.', 3)
if parts[0] not in ('pkglist', 'srclist'):
continue
if components is not None and parts[1] not in components:
continue
what = basedir, parts[0], parts[1]
if what in seen:
LOG.info('Ignoring %s/%s', basedir, pkglist)
continue
seen.add(what)
(src_headers if parts[0] == 'srclist' else bin_headers).extend(
read_pkglist_headers_rpm(os.path.join(basedir, pkglist)))
return src_headers, bin_headers
def _read_pkglist_rpm(path):
return (NEVR.from_header(h) for h in read_pkglist_headers_rpm(path))
_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'
def _read_pkglist_pkglist_query(path):
LOG.info("Reading %s using pkglist-query", path)
with _uncompressed(path) as input_file:
query = subprocess.Popen(
["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
stdin=input_file, stdout=subprocess.PIPE)
return (NEVR.from_tsv_line(line)
for line in query.communicate()[0].splitlines())
def read_pkglist(path):
if rpm:
result = _read_pkglist_rpm(path)
else:
result = _read_pkglist_pkglist_query(path)
return [r for r in result if r]
def read_src_dot_list(repo_path):
path = os.path.join(repo_path, 'files/list/src.list.xz')
LOG.info("Reading src.list %s", path)
result = []
with _uncompressed(path) as input_file:
for line in input_file:
try:
name, evr = line.split(b'\t', 2)[:2]
e, v, r = rpm_ffi.parse_evr(evr)
result.append(NEVR(name, e, v, r))
except Exception:
LOG.warning('Failed to parse line %r', line, exc_info=True)
return frozenset(result)
def read_srclists(prefix, arches):
result = frozenset()
for arch in arches:
srclist = os.path.join(prefix, arch, 'base', 'srclist.classic.xz')
result = result.union(read_pkglist(srclist))
if not result:
raise RuntimeError('Empty lists at %s' % prefix)
return result
def read_all_srclists(repos):
return dict((name, read_src_dot_list(v['path']))
for name, v in repos.items())