1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-02-02 21:47:31 +03:00
gpupdate/gpoa/gpupdate

196 lines
5.5 KiB
Python
Executable File

#! /usr/bin/env python3
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import locale
import gettext
import subprocess
import os
import sys
import pwd
import signal
from util.users import (
is_root
)
from util.arguments import (
process_target,
set_loglevel,
ExitCodeUpdater
)
from util.dbus import (
is_oddjobd_gpupdate_accessible,
dbus_runner
)
from util.signals import signal_handler
from util.logging import log
#logging.basicConfig(level=logging.DEBUG)
class file_runner:
_gpoa_exe = '/usr/sbin/gpoa'
def __init__(self, loglevel, username=None):
self._user = username
self._loglevel = loglevel
def run(self):
'''
Call gpoa utility to generate scripts
'''
gpoa_cmd = [self._gpoa_exe]
if self._loglevel != None:
gpoa_cmd += ["--loglevel", str(self._loglevel)]
if self._user:
gpoa_cmd += [self._user]
subprocess.check_output(gpoa_cmd)
def parse_cli_arguments():
'''
Command line argument parser
'''
argparser = argparse.ArgumentParser(description='Update group policies for computer and the specified user')
argparser.add_argument('-u',
'--user',
default=None,
help='Name of the user for GPO update')
argparser.add_argument('-t',
'--target',
default=None,
type=str.upper,
choices=["ALL", "USER", "COMPUTER"],
help='Specify if it is needed to update user\'s or computer\'s policies')
argparser.add_argument('-l',
'--loglevel',
type=int,
default=5,
help='Set logging verbosity level')
argparser.add_argument('-s',
'--system',
action='store_true',
default=None,
help='Run gpoa directly in system mode')
return argparser.parse_args()
def runner_factory(args, target):
'''
Return the necessary runner class according to some
factors taken into account.
'''
username = None
target = target.upper()
if is_root():
# Only root may specify any username to update.
try:
if args.user:
username = pwd.getpwnam(args.user).pw_name
else:
target = 'COMPUTER'
except:
username = None
logdata = dict({'username': args.user})
log('W1', logdata)
else:
# User may only perform gpupdate for machine (None) or
# itself (os.getusername()).
username = pwd.getpwuid(os.getuid()).pw_name
if args.user != username:
logdata = dict({'username': username})
log('W2', logdata)
if args.system:
return try_directly(username, target, args.loglevel)
else:
return try_by_oddjob(username, target)
def try_by_oddjob(username, target):
'''
Run group policies applying by oddjob service
'''
if is_oddjobd_gpupdate_accessible():
log('D13')
computer_runner = None
user_runner = None
if target == 'ALL' or target == 'COMPUTER':
computer_runner = dbus_runner()
if username:
if target == 'ALL' or target == 'USER':
user_runner = dbus_runner(username)
return (computer_runner, user_runner)
else:
log('W3')
return None
def try_directly(username, target, loglevel):
'''
Run group policies applying directly
'''
if is_root():
log('D14')
computer_runner = None
user_runner = None
if target == 'ALL' or target == 'COMPUTER':
computer_runner = file_runner(loglevel)
if target == 'ALL' or target == 'USER':
user_runner = file_runner(loglevel, username)
return (computer_runner, user_runner)
else:
log('E1')
return None
def main():
args = parse_cli_arguments()
locale.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
gettext.bindtextdomain('gpoa', '/usr/lib/python3/site-packages/gpoa/locale')
gettext.textdomain('gpoa')
set_loglevel(args.loglevel)
gpo_appliers = runner_factory(args, process_target(args.target))
if gpo_appliers:
if gpo_appliers[0]:
try:
gpo_appliers[0].run()
except Exception as exc:
logdata = dict({'error': str(exc)})
log('E5')
return int(ExitCodeUpdater.FAIL_GPUPDATE_COMPUTER_NOREPLY)
if gpo_appliers[1]:
try:
gpo_appliers[1].run()
except Exception as exc:
logdata = dict({'error': str(exc)})
log('E6', logdata)
return int(ExitCodeUpdater.FAIL_GPUPDATE_USER_NOREPLY)
else:
log('E2')
return int(ExitCodeUpdater.FAIL_NO_RUNNER)
return int(ExitCodeUpdater.EXIT_SUCCESS)
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
sys.exit(int(main()))