#!/usr/bin/python3
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import gettext
import locale
import logging
import subprocess

import rpm

from gpoa.storage import registry_factory
from util.gpoa_ini_parsing import GpoaConfigObj
from util.logging import log
from util.util import get_uid_by_username, string_to_literal_eval


def is_rpm_installed(rpm_name):
    '''
    Check if the package named 'rpm_name' is installed.

    Looks the name up in the RPM database and returns True when at least
    one entry matches, False otherwise.
    '''
    ts = rpm.TransactionSet()
    return ts.dbMatch('name', rpm_name).count() > 0


class Pkcon_applier:
    '''
    Install and remove packages requested by policy, using pkcon.

    On construction the requested install/remove lists are read (from the
    per-user dconf policy ini file when a user is given, otherwise from the
    registry storage) and reduced to the packages that actually need work:
    'install_packages' holds requested packages that are not installed yet,
    'remove_packages' holds requested removals that are currently installed.
    '''

    def __init__(self, user=None):
        '''
        Read the package policy and compute the pending work sets.

        user -- optional user name; when set, settings are read from that
                user's dconf policy ini file instead of the registry.
        '''
        self.__install_key_name = 'Install'
        self.__remove_key_name = 'Remove'
        self.__hklm_branch = '/Software/BaseALT/Policies/Packages'
        self.__install_command = ['/usr/bin/pkcon', '-y', 'install']
        self.__remove_command = ['/usr/bin/pkcon', '-y', 'remove']
        self.__reinstall_command = ['/usr/bin/pkcon', '-y', 'reinstall']
        self.install_packages = set()
        self.remove_packages = set()

        if user:
            uid = get_uid_by_username(user)
            #TODO: It is necessary to redo reading from the GVariant database file policy{uid}
            try:
                packages_dict = GpoaConfigObj(f'/etc/dconf/db/policy{uid}.d/policy{uid}.ini')
            except Exception:
                # Best effort: an unreadable policy file means "no package
                # settings". Only ordinary errors are swallowed, so
                # KeyboardInterrupt/SystemExit still propagate.
                packages_dict = {}

            # The ini section name is the branch path without its leading '/'.
            section = packages_dict.get(self.__hklm_branch[1:], {})
            self.install_packages_setting = string_to_literal_eval(
                section.get(self.__install_key_name, {}))
            self.remove_packages_setting = string_to_literal_eval(
                section.get(self.__remove_key_name, {}))
        else:
            storage = registry_factory(username=user)
            install_branch = '{}/{}'.format(self.__hklm_branch, self.__install_key_name)
            remove_branch = '{}/{}'.format(self.__hklm_branch, self.__remove_key_name)
            self.install_packages_setting = storage.get_key_value(install_branch)
            self.remove_packages_setting = storage.get_key_value(remove_branch)

        # `or ()` keeps construction from failing when a setting is missing.
        # NOTE(review): confirm what get_key_value / string_to_literal_eval
        # return for an absent key (None is assumed to be possible).
        for package in self.install_packages_setting or ():
            if not is_rpm_installed(package):
                self.install_packages.add(package)

        for package in self.remove_packages_setting or ():
            # A removal request overrides an install request for the same package.
            self.install_packages.discard(package)
            if is_rpm_installed(package):
                self.remove_packages.add(package)

    def apply(self):
        '''
        Refresh the package index, then remove and install pending packages.

        A failure on one package is logged (E58 for removal, E57 for
        installation) and does not stop processing of the remaining ones.
        '''
        log('D142')
        self.update()

        for package in self.remove_packages:
            try:
                log('D149', {'name': package})
                self.remove_pkg(package)
            except Exception as exc:
                log('E58', {'exc': exc})

        for package in self.install_packages:
            try:
                log('D148', {'name': package})
                self.install_pkg(package)
            except Exception as exc:
                log('E57', {'exc': exc})

    def install_pkg(self, package_name):
        '''
        Install 'package_name' with pkcon and return the command output.

        Raises subprocess.CalledProcessError on a non-zero exit status.
        '''
        fullcmd = [*self.__install_command, package_name]
        return subprocess.check_output(fullcmd)

    def reinstall_pkg(self, package_name):
        '''
        Placeholder: reinstalling is not implemented, this is a no-op.
        '''
        pass

    def remove_pkg(self, package_name):
        '''
        Remove 'package_name' with pkcon and return the command output.

        Raises subprocess.CalledProcessError on a non-zero exit status.
        '''
        # Build a fresh list: appending to self.__remove_command directly
        # would make every later call also pass all previous package names.
        fullcmd = [*self.__remove_command, package_name]
        return subprocess.check_output(fullcmd)

    def update(self):
        '''
        Update APT-RPM database.

        Runs 'apt-get update' and logs every non-blank output line as D143.
        Any failure is logged as E56 and not re-raised.
        '''
        try:
            res = subprocess.check_output(['/usr/bin/apt-get', 'update'], encoding='utf-8')
            for mslog in res.split('\n'):
                if not mslog.strip():
                    # ''.split(' ') is [''], which is truthy, so blank lines
                    # must be filtered out explicitly.
                    continue
                ms = mslog.split(' ')
                # NOTE(review): the [1:-1] slice drops the last token of each
                # line; kept as in the original - confirm this is intended.
                log('D143', {ms[0]: ms[1:-1]})
        except Exception as exc:
            log('E56', {'msg': exc})


def main():
    '''
    Command-line entry point: parse arguments and apply the package policy.
    '''
    locale.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
    gettext.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
    gettext.textdomain('gpoa')
    logger = logging.getLogger()

    parser = argparse.ArgumentParser(description='Package applier')
    parser.add_argument('--user', type=str, help='user', nargs='?', default=None)
    parser.add_argument('--loglevel', type=int, help='loglevel', nargs='?', default=30)
    args = parser.parse_args()

    logger.setLevel(args.loglevel)

    # An empty --user value is treated the same as no user at all.
    applier = Pkcon_applier(args.user or None)
    applier.apply()


if __name__ == '__main__':
    main()