repos: New tool in the box

We try to model repository as collection of source and
binary packages with their dependencies, and do some
stuff with it.

The initial version can calculate a list of unmets.
This commit is contained in:
Ivan A. Melnikov 2021-09-02 16:33:43 +04:00
parent 33d59af9e6
commit c84192c52a
2 changed files with 177 additions and 0 deletions

168
port_stats/repos.py Normal file
View File

@ -0,0 +1,168 @@
# This is Python3
# This module does not use str: use bytes everywhere.
import collections
import logging
import rpm
from port_stats import lists
from port_stats import rpm_ffi
LOG = logging.getLogger(__name__)
class Dependency(collections.namedtuple(
'Dependency', ['name', 'flags', 'version'])):
HEADER_TRIPLETS = {
'require': (rpm.RPMTAG_REQUIRENAME,
rpm.RPMTAG_REQUIREFLAGS,
rpm.RPMTAG_REQUIREVERSION),
'provide': (rpm.RPMTAG_PROVIDENAME,
rpm.RPMTAG_PROVIDEFLAGS,
rpm.RPMTAG_PROVIDEVERSION)
}
RPMSENSE_COMPARE = (rpm.RPMSENSE_EQUAL
| rpm.RPMSENSE_GREATER
| rpm.RPMSENSE_LESS)
@classmethod
def from_header(cls, header, kind):
tp, tf, tv = cls.HEADER_TRIPLETS[kind]
triplets = zip(header[tp], header[tf], header[tv])
for name, flags, version in triplets:
if not name.startswith(b'rpmlib('):
yield cls(name, flags, version or None)
if kind == 'provide':
for name in header[rpm.RPMTAG_FILENAMES]:
yield cls(name, 0, None)
yield cls(header[rpm.RPMTAG_NAME],
rpm.RPMSENSE_EQUAL,
header.format('%{EVR}:%{DISTTAG}').encode('utf-8'))
def is_provide_for(self, other):
if self.name != other.name:
return False
if self.version is None or other.version is None:
return True
return bool(rpm_ffi.ranges_overlap(
self.name, self.version, self.flags,
other.name, other.version, other.flags))
class Source:
def __init__(self, name, epoch, version, release):
self.name = name
self.epoch = epoch
self.version = version
self.release = release
self.requires = set()
self.bin_names = set()
@classmethod
def from_header(cls, header):
'''Initialize Source from rpm.header corresponding to SRPM'''
src = cls(header[rpm.RPMTAG_NAME],
header[rpm.RPMTAG_EPOCH],
header[rpm.RPMTAG_VERSION],
header[rpm.RPMTAG_RELEASE])
src.requires.update(Dependency.from_header(header, 'require'))
return src
def __repr__(self):
return 'Source[{} {} {} {}]'.format(
self.name, self.epoch, self.version, self.release)
class Binary:
def __init__(self, name, epoch, version, release, source_rpm):
self.name = name
self.epoch = epoch
self.version = version
self.release = release
self.source_rpm = source_rpm
self.source_name = source_rpm.rsplit(b'-', 2)[0]
self.requires = set()
self.provides = set()
@classmethod
def from_header(cls, header):
'''Initialize Source from rpm.header corresponding to SRPM'''
pkg = cls(header[rpm.RPMTAG_NAME],
header[rpm.RPMTAG_EPOCH],
header[rpm.RPMTAG_VERSION],
header[rpm.RPMTAG_RELEASE],
header[rpm.RPMTAG_SOURCERPM])
pkg.requires.update(Dependency.from_header(header, 'require'))
pkg.provides.update(Dependency.from_header(header, 'provide'))
return pkg
def __repr__(self):
return 'Binary[{} {} {} {}]'.format(
self.name, self.epoch, self.version, self.release)
class Repository:
def __init__(self, repo_name, sources, binaries):
self.name = repo_name
self.sources = sources
self.binaries = binaries
self.reverse_prov = {} # name -> [(provide, binary)]
self.update_indexes()
def update_indexes(self):
rprov = collections.defaultdict(list)
for b in self.binaries.values():
for p in b.provides:
rprov[p.name].append((p, b))
self.reverse_prov = dict(rprov)
@classmethod
def load(cls, repo_name, path, arch, components=('classic',)):
src_list, bin_list = lists.read_pkglist_heders_for_repo(
path, arch, components)
sources = {}
for header in src_list:
name = header[rpm.RPMTAG_NAME]
if name not in sources:
sources[name] = Source.from_header(header)
binaries = {}
for header in bin_list:
name = header[rpm.RPMTAG_NAME]
found = binaries.get(name)
if not found:
binaries[name] = Binary.from_header(header)
else:
LOG.warning('Duplicate binaries: %s %s', found,
header.format('%{NAME}-%{EVR}:%{DISTTAG})'))
return cls(repo_name, sources, binaries)
@classmethod
def load_from_config(cls, repo_name, config):
r = config['repos'][repo_name]
return cls.load(repo_name, r['path'], r['arch'])
def providers(self, dependency):
for item in self.reverse_prov.get(dependency.name, ()):
if item[0].is_provide_for(dependency):
yield item
def unmets(self):
result = []
for pkg in self.binaries.values():
for dep in pkg.requires:
if not any(self.providers(dep)):
result.append((pkg, dep))
return result
if __name__ == '__main__':
from port_stats.utils import interactive_setup
CONFIG = interactive_setup()
repo = Repository.load_from_config('sisyphus_riscv64', CONFIG)

View File

@ -12,6 +12,9 @@ rpmEVRcmp(const char * const aE, const char * const aV, const char * const aR,
const char * const aDepend,
const char * const bE, const char * const bV, const char * const bR,
const char * const bDepend);
int rpmRangesOverlap(const char * AName, const char * AEVR, int AFlags,
const char * BName, const char * BEVR, int BFlags);
"""
@ -44,6 +47,12 @@ def parse_evr(evr):
return epoch, _pp_str(v), _pp_str(r)
def ranges_overlap(aname, aevr, aflags, bname, bevr, bflags):
return _LIBRPM.rpmRangesOverlap(
_FFI.new('char[]', aname), _FFI.new('char[]', aevr), aflags,
_FFI.new('char[]', bname), _FFI.new('char[]', bevr), bflags)
def _epoch_to_pchar(epoch, mode):
if mode not in ('pkg', 'deps'):
raise ValueError("Epoch mode should be one of "