from __future__ import print_function import collections import itertools import logging import rpm import lists LOG = logging.getLogger(__name__) class Dependency(collections.namedtuple( 'Dependency', ['name', 'flags', 'version'])): HEADER_TRIPLETS = { 'require': (rpm.RPMTAG_REQUIRENAME, rpm.RPMTAG_REQUIREFLAGS, rpm.RPMTAG_REQUIREVERSION), 'provide': (rpm.RPMTAG_PROVIDENAME, rpm.RPMTAG_PROVIDEFLAGS, rpm.RPMTAG_PROVIDEVERSION) } @classmethod def from_header(cls, header, kind): tp, tf, tv = cls.HEADER_TRIPLETS[kind] triplets = itertools.izip(header[tp], header[tf], header[tv]) for name, flags, version in triplets: if not name.startswith('rpmlib('): yield cls(name, flags, version or None) if kind == 'provide': for name in header[rpm.RPMTAG_FILENAMES]: yield cls(name, 0, None) class DependencyInfo(collections.namedtuple( 'DependencyInfo', ['src_req', 'srcrpms', 'bin_req', 'reverse_prov'])): @classmethod def load(cls, repo): src_list, bin_list = lists.read_pkglist_heders_for_repo( repo['path'], repo['arch'], components=['classic']) def reqs(header): return frozenset(Dependency.from_header(header, 'require')) def name(header): return header[rpm.RPMTAG_NAME] LOG.info('Building BuildReq cache...') src_req = dict((name(h), reqs(h)) for h in src_list) LOG.info('Building SourceRPM cache...') srcrpms = dict((name(h), h[rpm.RPMTAG_SOURCERPM].rsplit('-', 2)[0]) for h in bin_list) LOG.info('Building Req cache...') bin_req = dict((name(h), reqs(h)) for h in bin_list) LOG.info('Building reverse Prov cache...') reverse_prov = collections.defaultdict(set) for h in bin_list: for p in Dependency.from_header(h, 'provide'): reverse_prov[p.name].add((name(h), p)) LOG.info('Info loading: DONE') return cls(src_req, srcrpms, bin_req, reverse_prov) def _srpm_report(self, srpm): result = collections.defaultdict(lambda: collections.defaultdict(set)) for dep in self.src_req[srpm]: for p in self.reverse_prov[dep.name]: bin_name = p[0] src_name = self.srcrpms[bin_name] result[src_name][p[0]].add(dep.name) return result def _format_srpm_report(self, report, colorize=None): for name, provs in sorted(report.iteritems()): if colorize: yield "%-8s" % colorize(name) yield '%-30s' % name for k, v in provs.iteritems(): yield ' %s [%s]' % (k, ' '.join(sorted(v))) yield '\n' def simple_srpm_report(self, srpm, colorize=None): return ''.join(self._format_srpm_report( self._srpm_report(srpm), colorize)) def recursive_srpm_report(self, srpm, colorize): stack = [srpm] seen = set(stack) output = [] while stack: cur = stack.pop() report = self._srpm_report(cur) for item in report: if colorize(item) == 'RED' and item not in seen: stack.append(item) seen.add(item) output.append('== %s ==\n' % cur) output.extend(self._format_srpm_report(report, colorize)) output.append('\n') return ''.join(output)