various-tools/bdbdiff

133 lines
3.3 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import dataclasses
import json
import logging
import os
import pathlib
import sys
import typing
import rpm
@dataclasses.dataclass
class Package:
__slots__ = ("_header", )
_header: rpm.hdr
def __str__(self) -> str:
return f"{self.name}-{self.version}-{self.release}"
def __eq__(self, other: Package) -> bool:
return rpm.versionCompare(self._header, other._header) == 0
def __lt__(self, other: Package) -> bool:
return rpm.versionCompare(self._header, other._header) == -1
def __gt__(self, other: Package) -> bool:
return rpm.versionCompare(self._header, other._header) == 1
@property
def name(self) -> str:
return self._header[rpm.RPMTAG_NAME].decode()
@property
def version(self) -> str:
return self._header[rpm.RPMTAG_VERSION].decode()
@property
def release(self) -> str:
return self._header[rpm.RPMTAG_RELEASE].decode()
@property
def epoch(self) -> int:
return self._header[rpm.RPMTAG_EPOCH]
def to_dict(self) -> dict:
return {
"name": self.name,
"version": self.version,
"release": self.release,
"epoch": self.epoch
}
PackageMapping: typing.TypeAlias = dict[bytes, Package]
@dataclasses.dataclass
class UpdateDiff:
__slots__ = ("new_pkg", "old_pkg")
new_pkg: Package
old_pkg: Package
def to_dict(self) -> dict:
return {
"new": self.new_pkg.to_dict(),
"old": self.old_pkg.to_dict(),
}
def unique(a: PackageMapping, b: PackageMapping) -> PackageMapping:
unique_names = set(a.keys()).difference(set(b.keys()))
return {n: a[n] for n in unique_names}
def get_update_diff(a: PackageMapping, b: PackageMapping) -> list[UpdateDiff]:
return [UpdateDiff(a[n], b[n])
for n in a.keys() & b.keys() if a[n] > b[n]]
def load_pkgs(dbpath: str | os.PathLike = None) -> PackageMapping:
if dbpath:
if not pathlib.Path(dbpath, "Packages").exists():
logging.error(f"{dbpath} database not exists")
sys.exit(1)
rpm.addMacro("_dbpath", dbpath)
pkgs = {hdr[rpm.RPMTAG_NAME]: Package(hdr) for hdr in rpm.TransactionSet().dbMatch()}
if dbpath:
rpm.delMacro("_dbpath")
return pkgs
def handle(args: argparse.Namespace) -> None:
[dbpath_a, dbpath_b] = args.dbpath_a, args.dbpath_b
[a, b] = load_pkgs(dbpath_a), load_pkgs(dbpath_b)
result_json = {
"new": [pkg.to_dict() for _, pkg in unique(a, b).items()],
"removed": [pkg.to_dict() for _, pkg in unique(b, a).items()],
"updated": [diff.to_dict() for diff in get_update_diff(a, b)],
}
print(json.dumps(result_json, indent=args.indent))
def main() -> None:
logging.basicConfig(level=logging.WARNING)
parser = argparse.ArgumentParser(description="prints diff between two RPMs Berkley DB")
parser.add_argument("-da", "--dbpath_a", dest="dbpath_a", default=None)
parser.add_argument("-db", "--dbpath_b", dest="dbpath_b", default=None)
parser.add_argument("-i", "--indent", type=int)
parser.set_defaults(handle=handle)
args = parser.parse_args()
args.handle(args)
if __name__ == '__main__':
main()