#!/usr/bin/env python # # Copyright (C) 2018 Jonathan Lebon # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the # Free Software Foundation, Inc., 59 Temple Place - Suite 330, # Boston, MA 02111-1307, USA. """ This is a small helper CLI tool to create and modify updateinfo.xml data. Note that the original createrepo_c API is geared towards creating the XML in a single pass (e.g. Bodhi), so there are inefficiencies in the way we modify data below (e.g. removing data involves copying). Our goal here is to make it *really* easy to use from the command-line for testing purposes. """ import os import sys import argparse from collections import namedtuple import rpm import createrepo_c as cr def main(): args = parse_args() args.func(args) def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--repo', help='rpmmd repo path', default=os.getcwd()) subparsers = parser.add_subparsers(dest='cmd', title='subcommands') subparsers.required = True add = subparsers.add_parser('add') add.set_defaults(func=cmd_add) add.add_argument('--if-not-exists', action='store_true') add.add_argument('id') add.add_argument('type', default='security', nargs='?', choices=['bugfix', 'enhancement', 'security']) add.add_argument('severity', default='none', nargs='?', choices=['none', 'low', 'moderate', 'important', 'critical']) delete = subparsers.add_parser('delete') delete.set_defaults(func=cmd_delete) add.add_argument('--if-exists', action='store_true') delete.add_argument('id') show = subparsers.add_parser('show') show.set_defaults(func=cmd_show) show.add_argument('id', nargs='?') add_pkg = subparsers.add_parser('add-pkg') add_pkg.set_defaults(func=cmd_add_pkg) add_pkg.add_argument('id') add_pkg.add_argument('name') add_pkg.add_argument('epoch', nargs='?', type=int) add_pkg.add_argument('version', nargs='?') add_pkg.add_argument('release', nargs='?') add_pkg.add_argument('arch', nargs='?') delete_pkg = subparsers.add_parser('delete-pkg') delete_pkg.set_defaults(func=cmd_delete_pkg) delete_pkg.add_argument('id') delete_pkg.add_argument('name_or_nevra') return parser.parse_args() def cmd_add(args): uinfo = get_updateinfo(args.repo) try: uinfo = add_update(uinfo, args.id, args.type, args.severity) except UpdateExistsError: if args.if_not_exists: return raise set_updateinfo(args.repo, uinfo) def cmd_show(args): uinfo = get_updateinfo(args.repo) show_updates(uinfo, args.id) def cmd_delete(args): uinfo = get_updateinfo(args.repo) try: uinfo = delete_update(uinfo, args.id) except UpdateNotExistsError: if args.if_exists: return raise set_updateinfo(args.repo, uinfo) class Nevra(namedtuple('Nevra', 'name epoch version release arch')): def __str__(self): if self.epoch is not None and self.epoch > 0: return '%s-%u:%s-%s.%s' % (self.name, self.epoch, self.version, self.release, self.arch) return '%s-%s-%s.%s' % (self.name, self.version, self.release, self.arch) # generic equals check for cr.UpdateCollectionPackage/cr.Package objects def equals_pkg(self, pkg): for attr in ['name', 'epoch', 'version', 'release', 'arch']: if getattr(self, attr) != getattr(pkg, attr): return False return True def nevra_from_rpm(rpm_filename): ts = rpm.TransactionSet() with open(rpm_filename) as f: hdr = ts.hdrFromFdno(f) return Nevra(hdr[rpm.RPMTAG_NAME], hdr[rpm.RPMTAG_EPOCH], hdr[rpm.RPMTAG_VERSION], hdr[rpm.RPMTAG_RELEASE], hdr[rpm.RPMTAG_ARCH]) def nevra_from_obj(obj): epoch = 0 if obj.epoch is not None: epoch = int(obj.epoch) return Nevra(obj.name, epoch, obj.version, obj.release, obj.arch) def nevra_from_primary(repo, name): repomd = cr.Repomd(repomd_xml(repo)) pkgs = [] def pkgcb(pkg): if pkg.name == name: pkgs.append(pkg) for record in repomd.records: if record.type == 'primary': primary_xml = os.path.join(repo, record.location_href) cr.xml_parse_primary(primary_xml, do_files=False, pkgcb=pkgcb) break if len(pkgs) == 0: raise Exception("Package '%s' not found" % name) if len(pkgs) > 1: raise Exception("Multiple packages found for '%s'" % name) return nevra_from_obj(pkgs[0]) def cmd_add_pkg(args): uinfo = get_updateinfo(args.repo) if args.epoch is None: # user only passed the 'name' param # we support passing an RPM file from which to extract fields if os.path.isfile(args.name): nevra = nevra_from_rpm(args.name) else: # try to find pkg in primary nevra = nevra_from_primary(args.repo, args.name) else: nevra = nevra_from_obj(args) uinfo = add_pkg_to_update(uinfo, args.id, nevra) set_updateinfo(args.repo, uinfo) def cmd_delete_pkg(args): uinfo = get_updateinfo(args.repo) uinfo = delete_pkg_from_update(uinfo, args.id, args.name_or_nevra) set_updateinfo(args.repo, uinfo) def get_updateinfo(repo): ''' Parse existing repo updateinfo.xml or create a new one. ''' repomd = cr.Repomd(repomd_xml(repo)) for record in repomd.records: if record.type == 'updateinfo': return cr.UpdateInfo(os.path.join(repo, record.location_href)) return cr.UpdateInfo() def repomd_xml(repo): return os.path.join(repo, "repodata/repomd.xml") def sev2xml(sev): # important -> Important, which is what yum/dnf expects return sev[0].upper() + sev[1:] class UpdateExistsError(Exception): pass class UpdateNotExistsError(Exception): pass def add_update(uinfo, uid, utype, severity): # check that the target id doesn't already exist for update in uinfo.updates: if update.id == id: raise UpdateExistsError("Update '%s' already exists" % id) rec = cr.UpdateRecord() rec.id = uid rec.type = utype rec.severity = sev2xml(severity) uinfo.append(rec) return uinfo def modify_update(uinfo, uid, func, func_data=None): # createrepo_c doesn't allow modifying the original object new_uinfo = cr.UpdateInfo() found = False for update in uinfo.updates: if update.id != uid: new_uinfo.append(update) else: found = True if func is not None: new_update = func(uid, update, func_data) if new_update is not None: new_uinfo.append(new_update) if not found: raise Exception("Update '%s' does not exist" % uid) return new_uinfo def delete_update(uinfo, uid): return modify_update(uinfo, uid, None) def show_updates(uinfo, uid): for update in uinfo.updates: if uid is not None and update.id != uid: continue print update.id, update.type, update.severity for col in update.collections: for pkg in col.packages: print " ", pkg.filename def copy_update_no_cols(update): # the default copy() also copies collections new_update = cr.UpdateRecord() new_update.id = update.id new_update.type = update.type new_update.severity = update.severity return new_update def add_pkg_to_update_cb(uid, update, nevra): if len(update.collections) > 1: # let's just pretend that never happens for our purposes raise Exception("Update '%s' has more than one collection" % uid) elif len(update.collections) == 1: col = update.collections[0] else: col = cr.UpdateCollection() new_update = copy_update_no_cols(update) new_col = cr.UpdateCollection() for pkg in col.packages: if nevra.equals_pkg(pkg): raise Exception("Update '%s' already contains pkg '%s'" % (uid, nevra)) new_col.append(pkg) pkg = cr.UpdateCollectionPackage() pkg.name = nevra.name pkg.epoch = '0' if nevra.epoch is not None: pkg.epoch = str(nevra.epoch) pkg.version = nevra.version pkg.release = nevra.release pkg.arch = nevra.arch pkg.filename = str(nevra) + ".rpm" new_col.append(pkg) new_update.append_collection(new_col) return new_update def add_pkg_to_update(uinfo, uid, nevra): return modify_update(uinfo, uid, add_pkg_to_update_cb, nevra) def delete_pkg_from_update_cb(uid, update, name_or_nevra): if len(update.collections) > 1: # let's just pretend that never happens for our purposes raise Exception("Update '%s' has more than one collection" % uid) elif len(update.collections) == 1: col = update.collections[0] else: col = cr.UpdateCollection() new_update = copy_update_no_cols(update) new_col = cr.UpdateCollection() found = False # just compare by filename to make it easier rpm_name = name_or_nevra + '.rpm' for pkg in col.packages: if pkg.filename != rpm_name and pkg.name != name_or_nevra: new_col.append(pkg) else: found = True if not found: raise Exception("Update '%s' does not have package '%s'" % (uid, name_or_nevra)) if len(new_col.packages) > 0: new_update.append_collection(new_col) return new_update def delete_pkg_from_update(uinfo, uid, name_or_nevra): return modify_update(uinfo, uid, delete_pkg_from_update_cb, name_or_nevra) def new_updateinfo_record(repo, uinfo): xml = os.path.join(repo, "repodata/updateinfo.xml.gz") with open(xml, 'w') as f: f.write(uinfo.xml_dump()) # calculate SHA256 and rename ui_rec = cr.RepomdRecord('updateinfo', xml) ui_rec.fill(cr.SHA256) ui_rec.rename_file() return ui_rec def set_updateinfo(repo, uinfo): repomd = cr.Repomd(repomd_xml(repo)) # clone new_repomd = cr.Repomd() for record in repomd.records: if record.type != 'updateinfo': new_repomd.set_record(record) else: os.unlink(os.path.join(repo, record.location_href)) uinfo_rec = new_updateinfo_record(repo, uinfo) new_repomd.set_record(uinfo_rec) with open(repomd_xml(repo), 'w') as f: f.write(new_repomd.xml_dump()) if __name__ == '__main__': sys.exit(main())