0729487ae5
Pick up security advisories when checking for pending updates and include them in the `cached-update` property. On the client-side, display them in the output of `status`. This was part of the original vision for how useful a smart `check` mode could be. It directly impacts how one manages their individual system (e.g. when to reboot), and paves the way for integration into higher-level apps that act at the cluster level. Closes: #1249 Approved by: cgwalters
389 lines
11 KiB
Python
Executable File
389 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Copyright (C) 2018 Jonathan Lebon <jlebon@redhat.com>
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, write to the
|
|
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
|
# Boston, MA 02111-1307, USA.
|
|
|
|
"""
|
|
This is a small helper CLI tool to create and modify updateinfo.xml data.
|
|
Note that the original createrepo_c API is geared towards creating the XML
|
|
in a single pass (e.g. Bodhi), so there are inefficiencies in the way we
|
|
modify data below (e.g. removing data involves copying). Our goal here is
|
|
to make it *really* easy to use from the command-line for testing purposes.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
|
|
from collections import namedtuple
|
|
|
|
import rpm
|
|
import createrepo_c as cr
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
args.func(args)
|
|
|
|
|
|
def parse_args():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--repo', help='rpmmd repo path', default=os.getcwd())
|
|
subparsers = parser.add_subparsers(dest='cmd', title='subcommands')
|
|
subparsers.required = True
|
|
|
|
add = subparsers.add_parser('add')
|
|
add.set_defaults(func=cmd_add)
|
|
add.add_argument('--if-not-exists', action='store_true')
|
|
add.add_argument('id')
|
|
add.add_argument('type', default='security', nargs='?',
|
|
choices=['bugfix', 'enhancement', 'security'])
|
|
add.add_argument('severity', default='none', nargs='?',
|
|
choices=['none', 'low', 'moderate',
|
|
'important', 'critical'])
|
|
|
|
delete = subparsers.add_parser('delete')
|
|
delete.set_defaults(func=cmd_delete)
|
|
add.add_argument('--if-exists', action='store_true')
|
|
delete.add_argument('id')
|
|
|
|
show = subparsers.add_parser('show')
|
|
show.set_defaults(func=cmd_show)
|
|
show.add_argument('id', nargs='?')
|
|
|
|
add_pkg = subparsers.add_parser('add-pkg')
|
|
add_pkg.set_defaults(func=cmd_add_pkg)
|
|
add_pkg.add_argument('id')
|
|
add_pkg.add_argument('name')
|
|
add_pkg.add_argument('epoch', nargs='?', type=int)
|
|
add_pkg.add_argument('version', nargs='?')
|
|
add_pkg.add_argument('release', nargs='?')
|
|
add_pkg.add_argument('arch', nargs='?')
|
|
|
|
delete_pkg = subparsers.add_parser('delete-pkg')
|
|
delete_pkg.set_defaults(func=cmd_delete_pkg)
|
|
delete_pkg.add_argument('id')
|
|
delete_pkg.add_argument('name_or_nevra')
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def cmd_add(args):
|
|
|
|
uinfo = get_updateinfo(args.repo)
|
|
|
|
try:
|
|
uinfo = add_update(uinfo, args.id, args.type, args.severity)
|
|
except UpdateExistsError:
|
|
if args.if_not_exists:
|
|
return
|
|
raise
|
|
|
|
set_updateinfo(args.repo, uinfo)
|
|
|
|
|
|
def cmd_show(args):
|
|
uinfo = get_updateinfo(args.repo)
|
|
show_updates(uinfo, args.id)
|
|
|
|
|
|
def cmd_delete(args):
|
|
|
|
uinfo = get_updateinfo(args.repo)
|
|
|
|
try:
|
|
uinfo = delete_update(uinfo, args.id)
|
|
except UpdateNotExistsError:
|
|
if args.if_exists:
|
|
return
|
|
raise
|
|
|
|
set_updateinfo(args.repo, uinfo)
|
|
|
|
|
|
class Nevra(namedtuple('Nevra', 'name epoch version release arch')):
|
|
|
|
def __str__(self):
|
|
if self.epoch is not None and self.epoch > 0:
|
|
return '%s-%u:%s-%s.%s' % (self.name, self.epoch, self.version,
|
|
self.release, self.arch)
|
|
return '%s-%s-%s.%s' % (self.name, self.version,
|
|
self.release, self.arch)
|
|
|
|
# generic equals check for cr.UpdateCollectionPackage/cr.Package objects
|
|
def equals_pkg(self, pkg):
|
|
for attr in ['name', 'epoch', 'version', 'release', 'arch']:
|
|
if getattr(self, attr) != getattr(pkg, attr):
|
|
return False
|
|
return True
|
|
|
|
|
|
def nevra_from_rpm(rpm_filename):
|
|
|
|
ts = rpm.TransactionSet()
|
|
with open(rpm_filename) as f:
|
|
hdr = ts.hdrFromFdno(f)
|
|
|
|
return Nevra(hdr[rpm.RPMTAG_NAME],
|
|
hdr[rpm.RPMTAG_EPOCH],
|
|
hdr[rpm.RPMTAG_VERSION],
|
|
hdr[rpm.RPMTAG_RELEASE],
|
|
hdr[rpm.RPMTAG_ARCH])
|
|
|
|
|
|
def nevra_from_obj(obj):
|
|
epoch = 0
|
|
if obj.epoch is not None:
|
|
epoch = int(obj.epoch)
|
|
return Nevra(obj.name, epoch, obj.version, obj.release, obj.arch)
|
|
|
|
|
|
def nevra_from_primary(repo, name):
|
|
repomd = cr.Repomd(repomd_xml(repo))
|
|
|
|
pkgs = []
|
|
def pkgcb(pkg):
|
|
if pkg.name == name:
|
|
pkgs.append(pkg)
|
|
|
|
for record in repomd.records:
|
|
if record.type == 'primary':
|
|
primary_xml = os.path.join(repo, record.location_href)
|
|
cr.xml_parse_primary(primary_xml, do_files=False, pkgcb=pkgcb)
|
|
break
|
|
|
|
if len(pkgs) == 0:
|
|
raise Exception("Package '%s' not found" % name)
|
|
if len(pkgs) > 1:
|
|
raise Exception("Multiple packages found for '%s'" % name)
|
|
return nevra_from_obj(pkgs[0])
|
|
|
|
|
|
def cmd_add_pkg(args):
|
|
uinfo = get_updateinfo(args.repo)
|
|
if args.epoch is None: # user only passed the 'name' param
|
|
# we support passing an RPM file from which to extract fields
|
|
if os.path.isfile(args.name):
|
|
nevra = nevra_from_rpm(args.name)
|
|
else:
|
|
# try to find pkg in primary
|
|
nevra = nevra_from_primary(args.repo, args.name)
|
|
else:
|
|
nevra = nevra_from_obj(args)
|
|
uinfo = add_pkg_to_update(uinfo, args.id, nevra)
|
|
set_updateinfo(args.repo, uinfo)
|
|
|
|
|
|
def cmd_delete_pkg(args):
|
|
uinfo = get_updateinfo(args.repo)
|
|
uinfo = delete_pkg_from_update(uinfo, args.id, args.name_or_nevra)
|
|
set_updateinfo(args.repo, uinfo)
|
|
|
|
|
|
def get_updateinfo(repo):
|
|
'''
|
|
Parse existing repo updateinfo.xml or create a new one.
|
|
'''
|
|
repomd = cr.Repomd(repomd_xml(repo))
|
|
for record in repomd.records:
|
|
if record.type == 'updateinfo':
|
|
return cr.UpdateInfo(os.path.join(repo, record.location_href))
|
|
return cr.UpdateInfo()
|
|
|
|
|
|
def repomd_xml(repo):
|
|
return os.path.join(repo, "repodata/repomd.xml")
|
|
|
|
|
|
def sev2xml(sev):
|
|
# important -> Important, which is what yum/dnf expects
|
|
return sev[0].upper() + sev[1:]
|
|
|
|
|
|
class UpdateExistsError(Exception):
|
|
pass
|
|
|
|
class UpdateNotExistsError(Exception):
|
|
pass
|
|
|
|
def add_update(uinfo, uid, utype, severity):
|
|
|
|
# check that the target id doesn't already exist
|
|
for update in uinfo.updates:
|
|
if update.id == id:
|
|
raise UpdateExistsError("Update '%s' already exists" % id)
|
|
|
|
rec = cr.UpdateRecord()
|
|
rec.id = uid
|
|
rec.type = utype
|
|
rec.severity = sev2xml(severity)
|
|
uinfo.append(rec)
|
|
return uinfo
|
|
|
|
|
|
def modify_update(uinfo, uid, func, func_data=None):
|
|
|
|
# createrepo_c doesn't allow modifying the original object
|
|
new_uinfo = cr.UpdateInfo()
|
|
|
|
found = False
|
|
for update in uinfo.updates:
|
|
if update.id != uid:
|
|
new_uinfo.append(update)
|
|
else:
|
|
found = True
|
|
if func is not None:
|
|
new_update = func(uid, update, func_data)
|
|
if new_update is not None:
|
|
new_uinfo.append(new_update)
|
|
if not found:
|
|
raise Exception("Update '%s' does not exist" % uid)
|
|
return new_uinfo
|
|
|
|
|
|
def delete_update(uinfo, uid):
|
|
return modify_update(uinfo, uid, None)
|
|
|
|
|
|
def show_updates(uinfo, uid):
|
|
for update in uinfo.updates:
|
|
if uid is not None and update.id != uid:
|
|
continue
|
|
print update.id, update.type, update.severity
|
|
for col in update.collections:
|
|
for pkg in col.packages:
|
|
print " ", pkg.filename
|
|
|
|
|
|
def copy_update_no_cols(update):
|
|
# the default copy() also copies collections
|
|
new_update = cr.UpdateRecord()
|
|
new_update.id = update.id
|
|
new_update.type = update.type
|
|
new_update.severity = update.severity
|
|
return new_update
|
|
|
|
|
|
def add_pkg_to_update_cb(uid, update, nevra):
|
|
|
|
if len(update.collections) > 1:
|
|
# let's just pretend that never happens for our purposes
|
|
raise Exception("Update '%s' has more than one collection" % uid)
|
|
elif len(update.collections) == 1:
|
|
col = update.collections[0]
|
|
else:
|
|
col = cr.UpdateCollection()
|
|
|
|
new_update = copy_update_no_cols(update)
|
|
new_col = cr.UpdateCollection()
|
|
|
|
for pkg in col.packages:
|
|
if nevra.equals_pkg(pkg):
|
|
raise Exception("Update '%s' already contains pkg '%s'" %
|
|
(uid, nevra))
|
|
new_col.append(pkg)
|
|
|
|
pkg = cr.UpdateCollectionPackage()
|
|
pkg.name = nevra.name
|
|
pkg.epoch = '0'
|
|
if nevra.epoch is not None:
|
|
pkg.epoch = str(nevra.epoch)
|
|
pkg.version = nevra.version
|
|
pkg.release = nevra.release
|
|
pkg.arch = nevra.arch
|
|
pkg.filename = str(nevra) + ".rpm"
|
|
new_col.append(pkg)
|
|
|
|
new_update.append_collection(new_col)
|
|
|
|
return new_update
|
|
|
|
|
|
def add_pkg_to_update(uinfo, uid, nevra):
|
|
return modify_update(uinfo, uid, add_pkg_to_update_cb, nevra)
|
|
|
|
|
|
def delete_pkg_from_update_cb(uid, update, name_or_nevra):
|
|
|
|
if len(update.collections) > 1:
|
|
# let's just pretend that never happens for our purposes
|
|
raise Exception("Update '%s' has more than one collection" % uid)
|
|
elif len(update.collections) == 1:
|
|
col = update.collections[0]
|
|
else:
|
|
col = cr.UpdateCollection()
|
|
|
|
new_update = copy_update_no_cols(update)
|
|
new_col = cr.UpdateCollection()
|
|
|
|
found = False
|
|
# just compare by filename to make it easier
|
|
rpm_name = name_or_nevra + '.rpm'
|
|
for pkg in col.packages:
|
|
if pkg.filename != rpm_name and pkg.name != name_or_nevra:
|
|
new_col.append(pkg)
|
|
else:
|
|
found = True
|
|
if not found:
|
|
raise Exception("Update '%s' does not have package '%s'" %
|
|
(uid, name_or_nevra))
|
|
|
|
if len(new_col.packages) > 0:
|
|
new_update.append_collection(new_col)
|
|
|
|
return new_update
|
|
|
|
|
|
def delete_pkg_from_update(uinfo, uid, name_or_nevra):
|
|
return modify_update(uinfo, uid, delete_pkg_from_update_cb, name_or_nevra)
|
|
|
|
|
|
def new_updateinfo_record(repo, uinfo):
|
|
|
|
xml = os.path.join(repo, "repodata/updateinfo.xml.gz")
|
|
with open(xml, 'w') as f:
|
|
f.write(uinfo.xml_dump())
|
|
|
|
# calculate SHA256 and rename
|
|
ui_rec = cr.RepomdRecord('updateinfo', xml)
|
|
ui_rec.fill(cr.SHA256)
|
|
ui_rec.rename_file()
|
|
return ui_rec
|
|
|
|
|
|
def set_updateinfo(repo, uinfo):
|
|
repomd = cr.Repomd(repomd_xml(repo))
|
|
|
|
# clone
|
|
new_repomd = cr.Repomd()
|
|
for record in repomd.records:
|
|
if record.type != 'updateinfo':
|
|
new_repomd.set_record(record)
|
|
else:
|
|
os.unlink(os.path.join(repo, record.location_href))
|
|
|
|
uinfo_rec = new_updateinfo_record(repo, uinfo)
|
|
new_repomd.set_record(uinfo_rec)
|
|
|
|
with open(repomd_xml(repo), 'w') as f:
|
|
f.write(new_repomd.xml_dump())
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|