Major port_stats refactoring
- introduce the module - rewrite srclists/stats in the terms of it - get rid of the hard dependency on python-module-rpm (the new code can use pkglist-query and rpmevrcmp instead)
This commit is contained in:
parent
a8d7275657
commit
e66fbdd2bb
0
port_stats/__init__.py
Normal file
0
port_stats/__init__.py
Normal file
59
port_stats/colorize.py
Normal file
59
port_stats/colorize.py
Normal file
@ -0,0 +1,59 @@
|
||||
|
||||
import collections
|
||||
import subprocess
|
||||
|
||||
|
||||
COLORS = ('RED', 'ORANGE', 'YELLOW', 'SLATE', 'GREEN', 'EXTRA')
|
||||
|
||||
LEGEND = {
|
||||
"RED": "totally missing",
|
||||
"GREEN": "same NEVR in both repositories",
|
||||
"EXTRA": "present in new repo only (distro-specific or not yet deleted)",
|
||||
"ORANGE": "EV is different (we need the latest version)",
|
||||
"YELLOW": "EV is the same, release is smaller (we need the latest build)",
|
||||
"SLATE": "EV is the same, release is larger (we need to push the fixes)"
|
||||
}
|
||||
|
||||
def _compare_evr(nevr1, nevr2):
|
||||
p = subprocess.Popen(['rpmevrcmp',
|
||||
nevr1.format_evr(), nevr2.format_evr()],
|
||||
stdout=subprocess.PIPE)
|
||||
return int(p.communicate()[0])
|
||||
|
||||
|
||||
def _colorize(base_package, new_package):
|
||||
if base_package == new_package:
|
||||
return 'GREEN'
|
||||
if new_package is None:
|
||||
return 'RED'
|
||||
if base_package is None:
|
||||
return 'EXTRA'
|
||||
|
||||
if base_package.epoch != new_package.epoch:
|
||||
return 'ORANGE'
|
||||
if base_package.version != new_package.version:
|
||||
return 'ORANGE'
|
||||
|
||||
c = _compare_evr(base_package, new_package)
|
||||
if c < 0: # base package is newer
|
||||
return 'YELLOW'
|
||||
return 'SLATE'
|
||||
|
||||
|
||||
def colorize(base, new):
|
||||
b_idx = dict((p.name, p) for p in base)
|
||||
n_idx = dict((p.name, p) for p in new)
|
||||
|
||||
by_name = dict()
|
||||
by_color = collections.defaultdict(list)
|
||||
|
||||
for name in frozenset(b_idx).union(n_idx):
|
||||
bp = b_idx.get(name)
|
||||
np = n_idx.get(name)
|
||||
color = _colorize(bp, np)
|
||||
by_name[name] = (color, bp, np)
|
||||
by_color[color].append((name, bp, np))
|
||||
|
||||
by_color = dict((color, sorted(by_color.get(color, [])))
|
||||
for color in COLORS)
|
||||
return by_name, by_color
|
102
port_stats/lists.py
Normal file
102
port_stats/lists.py
Normal file
@ -0,0 +1,102 @@
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import collections
|
||||
import sys
|
||||
|
||||
try:
|
||||
import rpm
|
||||
except ImportError:
|
||||
rpm = None
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NEVR(collections.namedtuple('NEVR', ['name', 'epoch',
|
||||
'version', 'release'])):
|
||||
|
||||
if rpm:
|
||||
@classmethod
|
||||
def from_header(cls, header):
|
||||
return cls(header[rpm.RPMTAG_NAME],
|
||||
header[rpm.RPMTAG_EPOCH],
|
||||
header[rpm.RPMTAG_VERSION],
|
||||
header[rpm.RPMTAG_RELEASE])
|
||||
|
||||
@classmethod
|
||||
def from_tsv_line(cls, line):
|
||||
"""Creates a NEVR object from a tab-separated line.
|
||||
|
||||
The line should have the following format:
|
||||
name\tepoch\tversion\trelease
|
||||
"""
|
||||
try:
|
||||
n, e, v, r = line.split('\t')
|
||||
except Exception:
|
||||
LOG.error("Failed to parse line: %s", line, exc_info=True)
|
||||
return None
|
||||
|
||||
if e in ('', '(none)', 'None'):
|
||||
e = None
|
||||
else:
|
||||
try:
|
||||
e = int(e)
|
||||
except Exception:
|
||||
LOG.error("Failed to parse epoch from line: %s",
|
||||
line, exc_info=True)
|
||||
return None
|
||||
return cls(n, e, v, r)
|
||||
|
||||
def format_evr(self):
|
||||
if self.epoch is None:
|
||||
return '%s-%s' % (self.version, self.release)
|
||||
else:
|
||||
return '%s:%s-%s' %(self.epoch, self.version, self.release)
|
||||
|
||||
|
||||
def format_evr(nevr):
|
||||
return nevr.format_evr() if nevr else 'MISSING'
|
||||
|
||||
def _read_pkglist_rpm(path):
|
||||
if path.endswith('.xz'):
|
||||
xz = subprocess.Popen(['xz', '-dc', path], stdout=subprocess.PIPE)
|
||||
input_file = xz.stdout
|
||||
else:
|
||||
input_file = open(path, 'rb')
|
||||
|
||||
try:
|
||||
headers = rpm.readHeaderListFromFD(input_file.fileno())
|
||||
finally:
|
||||
input_file.close()
|
||||
return (NEVR.from_header(h) for h in headers)
|
||||
|
||||
|
||||
|
||||
_PKGLIST_QUERY_FORMAT='%{NAME}\t%{EPOCH}\t%{VERSION}\t%{RELEASE}\n'
|
||||
|
||||
|
||||
def _read_pkglist_pkglist_query(path):
|
||||
if path.endswith('.xz'):
|
||||
xz = subprocess.Popen(["xz", '-dc', path], stdout=subprocess.PIPE)
|
||||
try:
|
||||
query = subprocess.Popen(
|
||||
["pkglist-query", _PKGLIST_QUERY_FORMAT, '-'],
|
||||
stdin=xz.stdout, stdout=subprocess.PIPE)
|
||||
finally:
|
||||
xz.stdout.close() # Allow xz to receive a SIGPIPE if p2 exits.
|
||||
else:
|
||||
query = subprocess.Popen(
|
||||
["pkglist-query", _PKGLIST_QUERY_FORMAT, path],
|
||||
stdout=subprocess.PIPE)
|
||||
return (NEVR.from_tsv_line(l)
|
||||
for l in query.communicate()[0].splitlines())
|
||||
|
||||
|
||||
def read_pkglist(path):
|
||||
if rpm:
|
||||
result = _read_pkglist_rpm(path)
|
||||
else:
|
||||
LOG.info("Module `rpm` not found; trying pkglist-query...")
|
||||
result = _read_pkglist_pkglist_query(path)
|
||||
return [r for r in result if r]
|
10
port_stats/reports.py
Normal file
10
port_stats/reports.py
Normal file
@ -0,0 +1,10 @@
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from port_stats import colorize
|
||||
|
||||
def print_totals(by_color, out_file):
|
||||
for color in colorize.COLORS:
|
||||
message = "%10s:%6s // %s" % (
|
||||
color, len(by_color[color]), colorize.LEGEND[color])
|
||||
print(message, file=out_file)
|
41
port_stats/srclist_stats.py
Normal file
41
port_stats/srclist_stats.py
Normal file
@ -0,0 +1,41 @@
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
|
||||
from port_stats import colorize
|
||||
from port_stats import lists
|
||||
from port_stats import reports
|
||||
|
||||
|
||||
def read_lists(arg):
|
||||
result = frozenset()
|
||||
for filename in arg.split(','):
|
||||
result = result.union(lists.read_pkglist(filename))
|
||||
return result
|
||||
|
||||
|
||||
def print_names(color, packages):
|
||||
print("\n\n%s (%s):" % (color, colorize.LEGEND[color]))
|
||||
for name, bp, np in packages:
|
||||
print("\t%s\t%s -> %s" %
|
||||
(name, lists.format_evr(np), lists.format_evr(bp)))
|
||||
|
||||
|
||||
def main(base_arch_paths, new_arch_paths):
|
||||
base = read_lists(base_arch_paths)
|
||||
new = read_lists(new_arch_paths)
|
||||
|
||||
print("Base repo size:", len(base))
|
||||
print("New repo size:", len(new))
|
||||
|
||||
by_name, by_color = colorize.colorize(base, new)
|
||||
|
||||
print("Statistics:")
|
||||
reports.print_totals(by_color, sys.stdout)
|
||||
for color in colorize.COLORS:
|
||||
print_names(color, by_color[color])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(*sys.argv[1:]))
|
Loading…
Reference in New Issue
Block a user