133 lines
3.3 KiB
Python
Executable File
133 lines
3.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import dataclasses
|
|
import json
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
|
|
import typing
|
|
|
|
import rpm
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Package:
|
|
__slots__ = ("_header", )
|
|
|
|
_header: rpm.hdr
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.name}-{self.version}-{self.release}"
|
|
|
|
def __eq__(self, other: Package) -> bool:
|
|
return rpm.versionCompare(self._header, other._header) == 0
|
|
|
|
def __lt__(self, other: Package) -> bool:
|
|
return rpm.versionCompare(self._header, other._header) == -1
|
|
|
|
def __gt__(self, other: Package) -> bool:
|
|
return rpm.versionCompare(self._header, other._header) == 1
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self._header[rpm.RPMTAG_NAME].decode()
|
|
|
|
@property
|
|
def version(self) -> str:
|
|
return self._header[rpm.RPMTAG_VERSION].decode()
|
|
|
|
@property
|
|
def release(self) -> str:
|
|
return self._header[rpm.RPMTAG_RELEASE].decode()
|
|
|
|
@property
|
|
def epoch(self) -> int:
|
|
return self._header[rpm.RPMTAG_EPOCH]
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"name": self.name,
|
|
"version": self.version,
|
|
"release": self.release,
|
|
"epoch": self.epoch
|
|
}
|
|
|
|
|
|
PackageMapping: typing.TypeAlias = dict[bytes, Package]
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class UpdateDiff:
|
|
__slots__ = ("new_pkg", "old_pkg")
|
|
|
|
new_pkg: Package
|
|
old_pkg: Package
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"new": self.new_pkg.to_dict(),
|
|
"old": self.old_pkg.to_dict(),
|
|
}
|
|
|
|
|
|
def unique(a: PackageMapping, b: PackageMapping) -> PackageMapping:
|
|
unique_names = set(a.keys()).difference(set(b.keys()))
|
|
return {n: a[n] for n in unique_names}
|
|
|
|
|
|
def get_update_diff(a: PackageMapping, b: PackageMapping) -> list[UpdateDiff]:
|
|
return [UpdateDiff(a[n], b[n])
|
|
for n in a.keys() & b.keys() if a[n] > b[n]]
|
|
|
|
|
|
def load_pkgs(dbpath: str | os.PathLike = None) -> PackageMapping:
|
|
if dbpath:
|
|
if not pathlib.Path(dbpath, "Packages").exists():
|
|
logging.error(f"{dbpath} database not exists")
|
|
sys.exit(1)
|
|
|
|
rpm.addMacro("_dbpath", dbpath)
|
|
|
|
pkgs = {hdr[rpm.RPMTAG_NAME]: Package(hdr) for hdr in rpm.TransactionSet().dbMatch()}
|
|
|
|
if dbpath:
|
|
rpm.delMacro("_dbpath")
|
|
|
|
return pkgs
|
|
|
|
|
|
def handle(args: argparse.Namespace) -> None:
|
|
[dbpath_a, dbpath_b] = args.dbpath_a, args.dbpath_b
|
|
[a, b] = load_pkgs(dbpath_a), load_pkgs(dbpath_b)
|
|
|
|
result_json = {
|
|
"new": [pkg.to_dict() for _, pkg in unique(a, b).items()],
|
|
"removed": [pkg.to_dict() for _, pkg in unique(b, a).items()],
|
|
"updated": [diff.to_dict() for diff in get_update_diff(a, b)],
|
|
}
|
|
|
|
print(json.dumps(result_json, indent=args.indent))
|
|
|
|
|
|
def main() -> None:
|
|
logging.basicConfig(level=logging.WARNING)
|
|
|
|
parser = argparse.ArgumentParser(description="prints diff between two RPMs Berkley DB")
|
|
parser.add_argument("-da", "--dbpath_a", dest="dbpath_a", default=None)
|
|
parser.add_argument("-db", "--dbpath_b", dest="dbpath_b", default=None)
|
|
parser.add_argument("-i", "--indent", type=int)
|
|
|
|
parser.set_defaults(handle=handle)
|
|
|
|
args = parser.parse_args()
|
|
args.handle(args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|