#!/usr/bin/python3 # # GPOA - GPO Applier for Linux # # Copyright (C) 2019-2020 BaseALT Ltd. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import rpm import subprocess from gpoa.storage import registry_factory import logging from util.logging import log import argparse import gettext import locale from messages import message_with_code from util.arguments import ( set_loglevel ) def is_rpm_installed(rpm_name): ''' Check if the package named 'rpm_name' is installed ''' ts = rpm.TransactionSet() pm = ts.dbMatch('name', rpm_name) if pm.count() > 0: return True return False class Pkcon_applier: def __init__(self, sid = None): self.__install_key_name = 'Install' self.__remove_key_name = 'Remove' self.__hkcu_branch = 'Software\\BaseALT\\Policies\\Packages' self.__hklm_branch = 'Software\\BaseALT\\Policies\\Packages' self.__install_command = ['/usr/bin/pkcon', '-y', 'install'] self.__remove_command = ['/usr/bin/pkcon', '-y', 'remove'] self.__reinstall_command = ['/usr/bin/pkcon', '-y', 'reinstall'] self.install_packages = set() self.remove_packages = set() self.storage = registry_factory('registry') if sid: install_branch_user = '{}\\{}%'.format(self.__hkcu_branch, self.__install_key_name) remove_branch_user = '{}\\{}%'.format(self.__hkcu_branch, self.__remove_key_name) self.install_packages_setting = self.storage.filter_hkcu_entries(sid, install_branch_user) self.remove_packages_setting = self.storage.filter_hkcu_entries(sid, remove_branch_user) else: install_branch = '{}\\{}%'.format(self.__hklm_branch, self.__install_key_name) remove_branch = '{}\\{}%'.format(self.__hklm_branch, self.__remove_key_name) self.install_packages_setting = self.storage.filter_hklm_entries(install_branch) self.remove_packages_setting = self.storage.filter_hklm_entries(remove_branch) for package in self.install_packages_setting: if not is_rpm_installed(package.data): self.install_packages.add(package.data) for package in self.remove_packages_setting: if package.data in self.install_packages: self.install_packages.remove(package.data) if is_rpm_installed(package.data): self.remove_packages.add(package.data) def apply(self): log('D142') self.update() for package in self.remove_packages: try: logdata = dict() logdata['name'] = package log('D149', logdata) self.remove_pkg(package) except Exception as exc: logdata = dict() logdata['exc'] = exc log('E58', logdata) for package in self.install_packages: try: logdata = dict() logdata['name'] = package log('D148', logdata) self.install_pkg(package) except Exception as exc: logdata = dict() logdata['exc'] = exc log('E57', logdata) def install_pkg(self, package_name): fullcmd = list(self.__install_command) fullcmd.append(package_name) return subprocess.check_output(fullcmd) def reinstall_pkg(self, package_name): pass def remove_pkg(self, package_name): fullcmd = self.__remove_command fullcmd.append(package_name) return subprocess.check_output(fullcmd) def update(self): ''' Update APT-RPM database. ''' try: res = subprocess.check_output(['/usr/bin/apt-get', 'update'], encoding='utf-8') msg = str(res).split('\n') logdata = dict() for mslog in msg: ms = str(mslog).split(' ') if ms: logdata = {ms[0]: ms[1:-1]} log('D143', logdata) except Exception as exc: logdata = dict() logdata['msg'] = exc log('E56',logdata) if __name__ == '__main__': locale.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale') gettext.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale') gettext.textdomain('gpoa') logger = logging.getLogger() parser = argparse.ArgumentParser(description='Package applier') parser.add_argument('--sid', type = str, help = 'sid', nargs = '?', default = None) parser.add_argument('--loglevel', type = int, help = 'loglevel', nargs = '?', default = 30) args = parser.parse_args() logger.setLevel(args.loglevel) if args.sid: applier = Pkcon_applier(args.sid) else: applier = Pkcon_applier() applier.apply()