6634f39f4b
Sometimes, when lists are updated while we are reading them, we get 0 results, and all our reports become very confusing. This change adds a check to raise an exception in this case.
128 lines
3.5 KiB
Python
128 lines
3.5 KiB
Python
|
|
import collections
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
try:
|
|
import rpm
|
|
except ImportError:
|
|
rpm = None
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
                                           'version', 'release'])):
    """Package identity tuple: name, epoch, version and release.

    ``epoch`` is ``None`` when the package has no epoch.
    """

    if rpm:
        # This constructor needs the rpm tag constants, so it only exists
        # when the rpm bindings were imported successfully.
        @classmethod
        def from_header(cls, header):
            """Create a NEVR from an rpm header object."""
            return cls(header[rpm.RPMTAG_NAME],
                       header[rpm.RPMTAG_EPOCH],
                       header[rpm.RPMTAG_VERSION],
                       header[rpm.RPMTAG_RELEASE])

    @classmethod
    def from_tsv_line(cls, line):
        """Create a NEVR object from a tab-separated line.

        The line should have the following format:
            name<TAB>epoch<TAB>version<TAB>release

        An epoch of '', '(none)' or 'None' is stored as ``None``; any other
        epoch is converted to ``int``.

        ``line`` may be text or (on Python 3) ``bytes``, e.g. a line taken
        from raw subprocess output; bytes are decoded as UTF-8 first.

        Returns ``None`` (after logging the error) when the line cannot
        be parsed.
        """
        try:
            # On Python 3, bytes.split('\t') raises TypeError, which used to
            # turn every line of raw subprocess output into a parse failure.
            # On Python 2 ``bytes is str``, so this branch is never taken.
            if isinstance(line, bytes) and not isinstance(line, str):
                line = line.decode('utf-8')
            n, e, v, r = line.split('\t')
        except Exception:
            # Deliberately broad: a malformed line is logged and skipped.
            LOG.error("Failed to parse line: %s", line, exc_info=True)
            return None

        if e in ('', '(none)', 'None'):
            e = None
        else:
            try:
                e = int(e)
            except ValueError:
                LOG.error("Failed to parse epoch from line: %s",
                          line, exc_info=True)
                return None
        return cls(n, e, v, r)

    def format_evr(self):
        """Return 'version-release', prefixed with 'epoch:' if epoch is set."""
        if self.epoch is None:
            return '%s-%s' % (self.version, self.release)
        return '%s:%s-%s' % (self.epoch, self.version, self.release)

    @property
    def evr(self):
        """The (epoch, version, release) part as a plain tuple."""
        return self[1:]
|
|
|
|
|
|
def format_evr(nevr):
    """Format the EVR of ``nevr``, or return 'MISSING' for a falsy value."""
    if not nevr:
        return 'MISSING'
    return nevr.format_evr()
|
|
|
|
|
|
def read_pkglist_headers_rpm(path):
    """Read rpm headers from the package list at ``path``.

    A path ending in '.xz' is decompressed through an ``xz -dc`` child
    process; any other path is opened directly in binary mode. The headers
    are read with ``rpm.readHeaderListFromFD``.

    Raises RuntimeError when the rpm module is not available.
    """
    LOG.info("Reading %s using python-module-rpm", path)
    if not rpm:
        raise RuntimeError('rpm module is not available')

    xz = None
    if path.endswith('.xz'):
        xz = subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE)
        input_file = xz.stdout
    else:
        input_file = open(path, 'rb')

    try:
        return rpm.readHeaderListFromFD(input_file.fileno())
    finally:
        input_file.close()
        if xz is not None:
            # Reap the decompressor so it does not linger as a zombie, and
            # report a failed decompression instead of hiding it behind a
            # short or empty header list.
            if xz.wait() != 0:
                LOG.warning("xz exited with status %s while reading %s",
                            xz.returncode, path)
|
|
|
|
|
|
def _read_pkglist_rpm(path):
    """Return a generator of NEVR objects built from the rpm headers at ``path``.

    The headers are read immediately; the NEVR objects are created lazily
    as the generator is consumed.
    """
    headers = read_pkglist_headers_rpm(path)
    return (NEVR.from_header(header) for header in headers)
|
|
|
|
|
|
# Format string handed to pkglist-query: a newline-terminated record of the
# tab-separated NAME, EPOCH, VERSION and RELEASE tags, i.e. the layout that
# NEVR.from_tsv_line parses.
_PKGLIST_QUERY_FORMAT = '%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'
|
|
|
|
|
|
def _read_pkglist_pkglist_query(path):
    """Read the package list at ``path`` using the pkglist-query tool.

    A path ending in '.xz' is piped through ``xz -dc`` into
    ``pkglist-query ... -``; any other path is passed to pkglist-query
    directly. The tool's output is collected in full before returning.

    Returns a generator that yields ``NEVR.from_tsv_line(line)`` for each
    output line.
    """
    LOG.info("Reading %s using pkglist-query", path)
    xz = None
    try:
        if path.endswith('.xz'):
            xz = subprocess.Popen(["xz", '-dc', path],
                                  stdout=subprocess.PIPE)
            try:
                query = subprocess.Popen(
                    ["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
                    stdin=xz.stdout, stdout=subprocess.PIPE)
            finally:
                # Drop our copy of the pipe so that xz receives a SIGPIPE
                # if pkglist-query exits early.
                xz.stdout.close()
        else:
            query = subprocess.Popen(
                ["pkglist-query", _PKGLIST_QUERY_FORMAT, path],
                stdout=subprocess.PIPE)
        output = query.communicate()[0]
    finally:
        if xz is not None:
            # Reap the decompressor so it does not linger as a zombie.
            xz.wait()

    # A failed child yields a short or empty list; make that visible in the
    # log instead of letting it pass silently.
    if query.returncode != 0:
        LOG.warning("pkglist-query exited with status %s while reading %s",
                    query.returncode, path)
    if xz is not None and xz.returncode != 0:
        LOG.warning("xz exited with status %s while reading %s",
                    xz.returncode, path)

    return (NEVR.from_tsv_line(line) for line in output.splitlines())
|
|
|
|
|
|
def read_pkglist(path):
    """Return the list of NEVR entries read from the package list at ``path``.

    Uses the rpm bindings when they are available and falls back to the
    pkglist-query tool otherwise. Falsy entries (such as lines that failed
    to parse) are dropped.
    """
    reader = _read_pkglist_rpm if rpm else _read_pkglist_pkglist_query
    return [nevr for nevr in reader(path) if nevr]
|
|
|
|
|
|
def read_srclists(prefix, arches):
    """Return a frozenset of all source packages listed under ``prefix``.

    For each arch in ``arches`` the list at
    ``<prefix>/<arch>/base/srclist.classic.xz`` is read and merged in.

    Raises RuntimeError when the merged result is empty.
    """
    packages = set()
    for arch in arches:
        srclist_path = os.path.join(prefix, arch, 'base',
                                    'srclist.classic.xz')
        packages.update(read_pkglist(srclist_path))
    if not packages:
        raise RuntimeError('Empty lists at %s' % prefix)
    return frozenset(packages)
|
|
|
|
|
|
def read_all_srclists(repos):
    """Read the source lists of every repository in ``repos``.

    ``repos`` maps a repository name to a dict with 'path' and 'arch'
    entries; the result maps each name to what ``read_srclists`` returns
    for that path and those arches.
    """
    srclists = {}
    for name, repo in repos.items():
        srclists[name] = read_srclists(repo['path'], repo['arch'])
    return srclists
|