1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-01-22 10:03:34 +03:00
gpupdate/gpoa/gpoa

200 lines
6.4 KiB
Python
Executable File

#! /usr/bin/env python3
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import os
import signal
import gettext
import locale
from backend import backend_factory, save_dconf
from frontend.frontend_manager import frontend_manager, determine_username
from plugin import plugin_manager
from messages import message_with_code
from storage import Dconf_registry
from util.util import get_machine_name
from util.users import (
is_root,
get_process_user
)
from util.arguments import (
set_loglevel
)
from util.logging import log
from util.exceptions import geterr
from util.signals import signal_handler
def parse_arguments():
    '''
    Build the gpoa command-line parser and parse the process arguments.

    The help text of the optional positional ``user`` argument embeds
    the value returned by get_machine_name().

    Returns the argparse namespace with the attributes: user, dc,
    nodomain, noupdate, noplugins, list_backends, force and loglevel.
    '''
    parser = argparse.ArgumentParser(
        description='Generate configuration out of parsed policies')

    user_help = 'Domain username ({}) to parse policies for'.format(
        get_machine_name())
    parser.add_argument('user', type=str, nargs='?', help=user_help)
    parser.add_argument('--dc',
                        type=str,
                        help='FQDN of the domain to replicate SYSVOL from')

    # Plain on/off switches; the tuple order is the order shown by --help.
    switches = (
        ('--nodomain', 'Operate without domain (apply local policy)'),
        ('--noupdate', "Don't try to update storage, only run appliers"),
        ('--noplugins', "Don't start plugins"),
        ('--list-backends', 'Show list of available backends'),
        ('--force', 'Force GPT download'),
    )
    for flag, description in switches:
        parser.add_argument(flag, action='store_true', help=description)

    parser.add_argument('--loglevel',
                        type=int,
                        default=4,
                        help='Set logging verbosity level')

    return parser.parse_args()
class gpoa_controller:
    '''
    Command-line controller: parses the arguments, then drives the
    plugins, the storage backend and the frontend appliers.
    '''
    # Parsed command-line arguments; filled in by __init__.
    __args = None

    def __init__(self):
        '''
        Parse the command line, configure logging and translations and
        work out whose policies are going to be processed.

        Sets ``username``, ``is_machine`` and ``noupdate`` on the
        instance.

        Raises:
            Exception: when the process is not root and no username was
                given (machine mode); the text comes from message
                code E34.
        '''
        self.__args = parse_arguments()
        self.is_machine = False
        self.noupdate = self.__args.noupdate
        set_loglevel(self.__args.loglevel)

        locale.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
        gettext.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
        gettext.textdomain('gpoa')

        # Without an explicit username the machine itself is the target.
        if not self.__args.user:
            self.username = get_machine_name()
            self.is_machine = True
        else:
            self.username = self.__args.user

        logdata = {
            'username': self.username,
            'is_machine': self.is_machine,
            'process_username': get_process_user(),
            'process_uid': os.getuid(),
        }
        log('D61' if self.is_machine else 'D1', logdata)

        # NOTE(review): assumed to run in both machine and user mode;
        # the original indentation was lost - confirm.
        self.username = determine_username(self.username)

        if not is_root():
            # A non-root process never updates the storage.
            self.noupdate = True

            if self.is_machine:
                msgtext = message_with_code('E34')
                log('E34', {'username': self.username})
                raise Exception(msgtext)

            log('D59', {'username': self.username})
        else:
            log('D60', {'username': self.username})

    def run(self):
        '''
        GPOA controller entry point.

        With --list-backends only the backend names are printed.
        Otherwise the --force flag is handed to Dconf_registry, then
        the plugins and the backend are started.
        '''
        if self.__args.list_backends:
            print('local')
            print('samba')
            return
        Dconf_registry._force = self.__args.force
        self.start_plugins()
        self.start_backend()

    def start_backend(self):
        '''
        Function to start update of settings storage.

        Creates the backend, lets it retrieve and store the settings,
        then starts the frontend and saves dconf. Exceptions are not
        propagated: a backend creation failure is logged as E12, a
        failure in any later step as E3.
        '''
        dc = self.__args.dc
        nodomain = bool(self.__args.nodomain)

        # NOTE(review): when noupdate is set (always the case for a
        # non-root process) this returns without starting the frontend,
        # although the --noupdate help says "only run appliers" -
        # confirm this is intended.
        if self.noupdate or not is_root():
            return

        back = None
        try:
            back = backend_factory(dc, self.username, self.is_machine, nodomain)
        except Exception as exc:
            logdata = {'msg': str(exc)}
            # Attach the exception details to the log record, as the
            # E3 and E4 handlers do, instead of printing them to stdout.
            logdata.update(geterr())
            log('E12', logdata)

        if not back:
            return

        try:
            back.retrieve_and_store()
            # Start frontend only on successful backend finish
            self.start_frontend()
            save_dconf(self.username, self.is_machine, nodomain)
        except Exception as exc:
            logdata = {'message': str(exc)}
            # In case we're handling "E3" - it means that this is a
            # very specific exception that was not handled properly on
            # lower levels of code so we're also logging the details
            # reported by geterr().
            logdata.update(geterr())
            log('E3', logdata)

    def start_frontend(self):
        '''
        Function to start appliers.

        Any exception raised by the frontend manager is logged as E4
        and not propagated.
        '''
        try:
            appl = frontend_manager(self.username, self.is_machine)
            appl.apply_parameters()
        except Exception as exc:
            logdata = {'message': str(exc)}
            logdata.update(geterr())
            log('E4', logdata)

    def start_plugins(self):
        '''
        Function to start supplementary facilities.

        Does nothing when --noplugins was given.
        '''
        if self.__args.noplugins:
            return
        plugin_manager().run()
def main():
    '''
    Create the GPOA controller and run it.
    '''
    gpoa_controller().run()
if __name__ == "__main__":
    # signal.signal() returns the handler it replaces, so a single call
    # both installs signal_handler for SIGINT and keeps the previous
    # handler in default_handler.
    default_handler = signal.signal(signal.SIGINT, signal_handler)
    main()