port-compare/port_stats/lists.py
Ivan A. Melnikov cfd8dee1c6 More python3 compatibility
Try to convert all names, versions and releases
to str when reading.

Now we can actually use the interactive shell.
2023-08-02 17:05:46 +04:00

173 lines
5.0 KiB
Python

import collections
import logging
import os
import subprocess
try:
import rpm
except ImportError:
rpm = None
from port_stats import rpm_ffi
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
def _as_str(item):
    """Coerce ``item`` to ``str``.

    ``bytes`` are decoded as UTF-8 with undecodable bytes replaced,
    ``str`` values are returned untouched, and anything else goes
    through ``str()``.
    """
    if isinstance(item, bytes):
        return item.decode('utf-8', errors='replace')
    return item if isinstance(item, str) else str(item)
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
                                           'version', 'release'])):
    """Name / epoch / version / release of a package.

    ``epoch`` is ``None`` when the package has no epoch; ``format_evr``
    relies on that to leave out the ``epoch:`` prefix.
    """

    if rpm:
        @classmethod
        def from_header(cls, header):
            """Build a NEVR from an rpm header.

            Only defined when the ``rpm`` module could be imported.
            """
            epoch = header[rpm.RPMTAG_EPOCH]
            # A missing epoch must stay None: stringifying it would
            # yield the literal 'None' and defeat format_evr().
            # NOTE(review): a present epoch is still stringified here
            # while from_tsv_line() yields an int -- confirm which
            # representation the callers expect.
            if epoch is not None:
                epoch = _as_str(epoch)
            return cls(_as_str(header[rpm.RPMTAG_NAME]),
                       epoch,
                       _as_str(header[rpm.RPMTAG_VERSION]),
                       _as_str(header[rpm.RPMTAG_RELEASE]))

    @classmethod
    def from_tsv_line(cls, line):
        """Creates a NEVR object from a tab-separated line.

        The line should have the following format:
            name\\tepoch\\tversion\\trelease

        ``line`` may be ``bytes`` (as read from a subprocess pipe) or
        ``str``.  An epoch field that is empty, ``(none)`` or ``None``
        means "no epoch"; any other value must parse as an integer.

        Returns the new NEVR, or ``None`` (after logging an error) when
        the line cannot be parsed.
        """
        # Normalise to str first so that the separator and the
        # "no epoch" markers below are compared against the same type;
        # with raw bytes the markers never matched and every epoch-less
        # line was rejected.
        text = _as_str(line)
        try:
            n, e, v, r = text.split('\t')
        except ValueError:
            LOG.error("Failed to parse line: %s", line, exc_info=True)
            return None

        if e in ('', '(none)', 'None'):
            e = None
        else:
            try:
                e = int(e)
            except ValueError:
                LOG.error("Failed to parse epoch from line: %s",
                          line, exc_info=True)
                return None
        return cls(n, e, v, r)

    def format_evr(self):
        """Return ``version-release``, prefixed by ``epoch:`` if set."""
        if self.epoch is None:
            return '%s-%s' % (self.version, self.release)
        else:
            return '%s:%s-%s' % (self.epoch, self.version, self.release)

    @property
    def evr(self):
        """The ``(epoch, version, release)`` part as a plain tuple."""
        return self[1:]
def format_evr(nevr):
    """Format the EVR of ``nevr``, or ``'MISSING'`` for a falsy value."""
    if not nevr:
        return 'MISSING'
    return nevr.format_evr()
def read_pkglist_headers_rpm(path):
    """Read the rpm headers stored in a package list file.

    A path ending in ``.xz`` is decompressed on the fly by an external
    ``xz -dc`` process; any other path is opened directly in binary
    mode.

    Args:
        path: path of the (optionally xz-compressed) header list.

    Returns:
        The result of ``rpm.readHeaderListFromFD`` for that file
        (callers in this module iterate over it as a sequence of
        headers).

    Raises:
        RuntimeError: if the ``rpm`` Python module is not available.
    """
    LOG.info("Reading %s using python-module-rpm", path)
    if not rpm:
        raise RuntimeError('rpm module is not avalable')

    xz = None
    if path.endswith('.xz'):
        xz = subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE)
        input_file = xz.stdout
    else:
        input_file = open(path, 'rb')
    try:
        return rpm.readHeaderListFromFD(input_file.fileno())
    finally:
        input_file.close()
        if xz is not None:
            # Reap the decompressor; without this the child lingers as
            # a zombie until the Popen object is collected.
            xz.wait()
def read_pkglist_heders_for_repo(repo_path, arches, components=None):
    """Read the rpm headers of all package lists of a repository.

    For every architecture the directory ``<repo_path>/<arch>/base`` is
    scanned for files whose name starts with ``pkglist.`` (binary
    packages) or ``srclist.`` (source packages).

    Args:
        repo_path: root directory of the repository.
        arches: iterable of architecture directory names.
        components: optional container of component names.  When given,
            only lists whose second dot-separated name part is one of
            them are read.

    Returns:
        A ``(src_headers, bin_headers)`` tuple of lists.
    """
    bin_headers = []
    src_headers = []
    for arch in arches:
        basedir = os.path.join(repo_path, arch, 'base')
        for pkglist in os.listdir(basedir):
            parts = pkglist.split('.', 3)
            kind = parts[0]
            if kind not in ('pkglist', 'srclist'):
                continue
            if components is not None:
                # A bare "pkglist"/"srclist" file has no component part
                # and therefore cannot match the filter; skip it rather
                # than fail with IndexError on parts[1].
                if len(parts) < 2 or parts[1] not in components:
                    continue
            target = src_headers if kind == 'srclist' else bin_headers
            target.extend(
                read_pkglist_headers_rpm(os.path.join(basedir, pkglist)))
    return src_headers, bin_headers
def _read_pkglist_rpm(path):
    """Lazily yield a NEVR for every rpm header read from ``path``."""
    headers = read_pkglist_headers_rpm(path)
    return (NEVR.from_header(hdr) for hdr in headers)
# Query format handed to the external ``pkglist-query`` tool: one
# tab-separated "name, epoch, version, release" record per line.
_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'
def _read_pkglist_pkglist_query(path):
    """Read a package list through the external ``pkglist-query`` tool.

    A path ending in ``.xz`` is piped through ``xz -dc`` into
    ``pkglist-query``; any other path is passed to it directly.

    Args:
        path: path of the (optionally xz-compressed) package list.

    Returns:
        A generator with one ``NEVR.from_tsv_line`` result per output
        line of ``pkglist-query`` (entries are ``None`` for lines that
        could not be parsed).
    """
    LOG.info("Reading %s using pkglist-query", path)
    xz = None
    try:
        if path.endswith('.xz'):
            xz = subprocess.Popen(["xz", '-dc', path],
                                  stdout=subprocess.PIPE)
            try:
                query = subprocess.Popen(
                    ["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
                    stdin=xz.stdout, stdout=subprocess.PIPE)
            finally:
                # Drop our copy of the pipe so that xz receives SIGPIPE
                # if pkglist-query exits early.
                xz.stdout.close()
        else:
            query = subprocess.Popen(
                ["pkglist-query", _PKGLIST_QUERY_FORMAT, path],
                stdout=subprocess.PIPE)
        output = query.communicate()[0]
    finally:
        if xz is not None:
            # Reap the decompressor (also when spawning pkglist-query
            # failed) so it does not linger as a zombie.
            xz.wait()
    return (NEVR.from_tsv_line(line) for line in output.splitlines())
def read_pkglist(path):
    """Return the list of NEVRs found in the package list at ``path``.

    The ``rpm`` module is used when it is available, the external
    ``pkglist-query`` tool otherwise.  Falsy entries (lines that failed
    to parse) are dropped.
    """
    reader = _read_pkglist_rpm if rpm else _read_pkglist_pkglist_query
    return list(filter(None, reader(path)))
def read_src_dot_list(repo_path):
    """Read ``files/list/src.list.xz`` of a repository.

    The file is decompressed by an external ``xz -dc`` process.  Each
    line is expected to start with two tab-separated fields, the
    package name and its EVR; the EVR is split by
    ``rpm_ffi.parse_evr``.  Lines that cannot be parsed are logged at
    warning level and skipped.

    Args:
        repo_path: root directory of the repository.

    Returns:
        A frozenset of NEVR objects.
    """
    path = os.path.join(repo_path, 'files/list/src.list.xz')
    LOG.info("Reading src.list %s", path)

    xz = subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE)
    input_file = xz.stdout
    result = []
    try:
        for line in input_file:
            # Deliberately best-effort: one bad line must not abort
            # the whole read.
            try:
                name, evr = line.split(b'\t', 2)[:2]
                e, v, r = rpm_ffi.parse_evr(evr)
                result.append(NEVR(_as_str(name), e, _as_str(v), _as_str(r)))
            except Exception:
                LOG.warning('Failed to parse line %r', line, exc_info=True)
    finally:
        input_file.close()
        # Reap the decompressor so it does not linger as a zombie.
        xz.wait()
    return frozenset(result)
def read_srclists(prefix, arches):
    """Merge the ``srclist.classic.xz`` lists of several architectures.

    Reads ``<prefix>/<arch>/base/srclist.classic.xz`` for each entry of
    ``arches`` and returns the union of their contents as a frozenset.

    Raises:
        RuntimeError: if the combined result is empty.
    """
    collected = set()
    for arch in arches:
        list_path = os.path.join(prefix, arch, 'base', 'srclist.classic.xz')
        collected.update(read_pkglist(list_path))
    if not collected:
        raise RuntimeError('Empty lists at %s' % prefix)
    return frozenset(collected)
def read_all_srclists(repos):
    """Map every repository name to the contents of its src.list.

    ``repos`` maps a name to a mapping that has a ``'path'`` entry; the
    result maps the same names to ``read_src_dot_list`` of that path.
    """
    return {
        repo_name: read_src_dot_list(settings['path'])
        for repo_name, settings in repos.items()
    }