Ivan A. Melnikov d52b3e4725 inapt: Avoid using name property of RPM header object
This property was not supported in older python-module-rpm.
2021-08-15 20:27:15 +04:00

109 lines
3.6 KiB
Python

from __future__ import print_function
import collections
import itertools
import logging
import rpm
import lists
# Module-level logger, named after this module; used for progress messages
# while the dependency caches are being built.
LOG = logging.getLogger(__name__)
class Dependency(collections.namedtuple(
        'Dependency', ['name', 'flags', 'version'])):
    """One dependency entry (a requirement or a provide) read from an RPM header.

    Fields:
        name: the dependency name; for file provides, the file name.
        flags: the flags value stored next to the name in the header
            (0 for file provides).
        version: the version stored next to the name, or None when the
            header holds an empty value.
    """

    # For every supported dependency kind: the header tags of the three
    # parallel arrays holding names, flags and versions.
    HEADER_TRIPLETS = {
        'require': (rpm.RPMTAG_REQUIRENAME,
                    rpm.RPMTAG_REQUIREFLAGS,
                    rpm.RPMTAG_REQUIREVERSION),
        'provide': (rpm.RPMTAG_PROVIDENAME,
                    rpm.RPMTAG_PROVIDEFLAGS,
                    rpm.RPMTAG_PROVIDEVERSION)
    }

    @classmethod
    def from_header(cls, header, kind):
        """Yield the dependencies of the given kind found in *header*.

        Args:
            header: an RPM header object, indexable by rpm tag constants.
            kind: a key of HEADER_TRIPLETS ('require' or 'provide').

        Yields:
            Dependency instances. Entries whose name starts with 'rpmlib('
            are skipped. For kind == 'provide', every file name listed in
            the header is additionally yielded as an unversioned provide.

        Raises:
            KeyError: if kind is not a key of HEADER_TRIPLETS. As this is a
                generator, the error surfaces on first iteration.
        """
        tp, tf, tv = cls.HEADER_TRIPLETS[kind]
        # Builtin zip() is available on both Python 2 and Python 3, while
        # itertools.izip exists on Python 2 only.
        triplets = zip(header[tp], header[tf], header[tv])
        for name, flags, version in triplets:
            if not name.startswith('rpmlib('):
                # Normalize an empty version to None.
                yield cls(name, flags, version or None)
        if kind == 'provide':
            for name in header[rpm.RPMTAG_FILENAMES]:
                yield cls(name, 0, None)
class DependencyInfo(collections.namedtuple(
        'DependencyInfo', ['src_req', 'srcrpms',
                           'bin_req', 'reverse_prov'])):
    """Dependency caches for one repository and reports built on top of them.

    Fields:
        src_req: maps a source package name to the frozenset of Dependency
            objects it requires.
        srcrpms: maps a binary package name to the name of the source
            package it was built from.
        bin_req: maps a binary package name to the frozenset of Dependency
            objects it requires.
        reverse_prov: maps a provided name to the set of
            (binary package name, Dependency) pairs that provide it.
    """

    @classmethod
    def load(cls, repo):
        """Read the package lists of *repo* and build all caches.

        Args:
            repo: a mapping; its 'path' and 'arch' values are passed to
                lists.read_pkglist_heders_for_repo, restricted to the
                'classic' component.

        Returns:
            A populated DependencyInfo.
        """
        src_list, bin_list = lists.read_pkglist_heders_for_repo(
            repo['path'], repo['arch'], components=['classic'])

        def reqs(header):
            return frozenset(Dependency.from_header(header, 'require'))

        def name(header):
            return header[rpm.RPMTAG_NAME]

        LOG.info('Building BuildReq cache...')
        src_req = dict((name(h), reqs(h)) for h in src_list)
        LOG.info('Building SourceRPM cache...')
        # The source package name is the source RPM file name with its last
        # two '-'-separated components dropped.
        srcrpms = dict((name(h), h[rpm.RPMTAG_SOURCERPM].rsplit('-', 2)[0])
                       for h in bin_list)
        LOG.info('Building Req cache...')
        bin_req = dict((name(h), reqs(h)) for h in bin_list)
        LOG.info('Building reverse Prov cache...')
        reverse_prov = collections.defaultdict(set)
        for h in bin_list:
            for p in Dependency.from_header(h, 'provide'):
                reverse_prov[p.name].add((name(h), p))
        LOG.info('Info loading: DONE')
        return cls(src_req, srcrpms, bin_req, reverse_prov)

    def _srpm_report(self, srpm):
        """Group the providers of every requirement of source package *srpm*.

        Requirements are matched to provides by name only. Requirement names
        that nothing provides are left out of the result.

        Returns:
            A nested mapping
            {source package name: {binary package name: set of requirement
            names that binary package provides}}.

        Raises:
            KeyError: if srpm is not a key of src_req.
        """
        result = collections.defaultdict(lambda: collections.defaultdict(set))
        for dep in self.src_req[srpm]:
            # Use .get() instead of indexing: reverse_prov is a defaultdict,
            # and indexing would insert an empty entry into the shared cache
            # for every name that nothing provides.
            for provider in self.reverse_prov.get(dep.name, ()):
                bin_name = provider[0]
                src_name = self.srcrpms[bin_name]
                result[src_name][bin_name].add(dep.name)
        return result

    def _format_srpm_report(self, report, colorize=None):
        """Yield the text fragments of a report made by _srpm_report.

        One line is produced per source package, in sorted name order:
        an optional 8-column colour tag (when colorize is given), the
        source package name padded to 30 columns, then one
        ' binary [requirement names]' group per providing binary package.
        Binary packages and requirement names are sorted so the output does
        not depend on dict or set iteration order.
        """
        for name in sorted(report):
            provs = report[name]
            if colorize:
                yield "%-8s" % colorize(name)
            yield '%-30s' % name
            for bin_name in sorted(provs):
                yield ' %s [%s]' % (bin_name,
                                    ' '.join(sorted(provs[bin_name])))
            yield '\n'

    def simple_srpm_report(self, srpm, colorize=None):
        """Return the formatted one-level report for source package *srpm*.

        Args:
            srpm: source package name; must be a key of src_req.
            colorize: optional callable mapping a source package name to
                the tag printed at the start of its line.
        """
        return ''.join(self._format_srpm_report(
            self._srpm_report(srpm), colorize))

    def recursive_srpm_report(self, srpm, colorize):
        """Return a report for *srpm* and, transitively, its 'RED' providers.

        Starting from srpm, one section ('== name ==' header followed by
        the formatted report and a blank line) is produced per visited
        source package. A source package found in a report is visited in
        turn when colorize(name) returns 'RED' and it was not seen before.

        Args:
            srpm: source package name to start from.
            colorize: callable mapping a source package name to a tag;
                required here because it drives the traversal.

        Raises:
            KeyError: if a visited source package is not a key of src_req.
        """
        stack = [srpm]
        seen = set(stack)
        output = []
        while stack:
            cur = stack.pop()
            report = self._srpm_report(cur)
            # Sorted so the order of the sections is reproducible.
            for item in sorted(report):
                if colorize(item) == 'RED' and item not in seen:
                    stack.append(item)
                    seen.add(item)
            output.append('== %s ==\n' % cur)
            output.extend(self._format_srpm_report(report, colorize))
            output.append('\n')
        return ''.join(output)