1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

Frontends refactored to become readable

This commit is contained in:
Игорь Чудов 2019-11-27 18:00:51 +04:00
parent 1559f90767
commit 85e3de7286
Signed by untrusted user: nir
GPG Key ID: 0F3883600CAE7AAC
10 changed files with 311 additions and 242 deletions

View File

@ -1,15 +1,15 @@
import frontend.appliers
from .appliers.control import control
from .appliers.polkit import polkit
from .appliers.systemd import systemd_unit
from .control_applier import control_applier
from .polkit_applier import polkit_applier
from .systemd_applier import systemd_applier
import logging
from xml.etree import ElementTree
#from xml.etree import ElementTree
from samba.gp_parse.gp_pol import GPPolParser
class applier_frontend:
def __init__(self, regobj):
pass
def apply(self):
pass
#from samba.gp_parse.gp_pol import GPPolParser
class entry:
def __init__(self, e_keyname, e_valuename, e_type, e_data):
@ -25,133 +25,6 @@ def preg2entries(preg_obj):
entries.append(entry_obj)
return entries
def load_xml_preg(xml_path):
'''
Parse PReg file and return its preg object
'''
logging.info('Loading PReg from XML: {}'.format(xml_path))
gpparser = GPPolParser()
xml_root = ElementTree.parse(xml_path).getroot()
gpparser.load_xml(xml_root)
gpparser.pol_file.__ndr_print__()
return gpparser.pol_file
class control_applier(applier_frontend):
_registry_branch = 'Software\\BaseALT\\Policies\\Control'
def __init__(self, polfiles):
self.polparsers = polfiles
self.control_settings = self._get_controls(self.polparsers)
self.controls = []
for setting in self.control_settings:
try:
self.controls.append(appliers.control(setting.valuename, setting.data))
except:
logging.info('Unable to work with control: {}'.format(setting.valuename))
#for e in polfile.pol_file.entries:
# print('{}:{}:{}:{}:{}'.format(e.type, e.data, e.valuename, e.keyname))
def _get_controls(self, polfiles):
'''
Extract control entries from PReg file
'''
controls = []
for parser in polfiles:
for entry in parser.entries:
if entry.keyname == self._registry_branch:
controls.append(entry)
logging.info('Found control setting: {}'.format(entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped control setting: {}\\{}'.format(entry.keyname, entry.valuename))
return controls
def apply(self):
'''
Trigger control facility invocation.
'''
for control in self.controls:
control.set_control_status()
def dump_settings(self):
'''
Write actual controls as XML and PReg files
'''
print('Dumping...')
polfile = preg.file()
polfile.header.signature = 'PReg'
polfile.header.version = 1
polfile.num_entries = len(self.control_settings)
polfile.entries = self.control_settings
print(polfile.__ndr_print__())
policy_writer = GPPolParser()
policy_writer.pol_file = polfile
policy_writer.write_xml('test_reg.xml')
policy_writer.write_binary('test_reg.pol')
class polkit_applier(applier_frontend):
_registry_branch = 'Software\\Policies\\Microsoft\\Windows\\RemovableStorageDevices'
__policy_map = {
'Deny_All': ['99-gpoa_disk_permissions', { 'Deny_All': 0 }]
}
def __init__(self, polfiles):
self.polparsers = polfiles
self.polkit_settings = self._get_policies()
self.policies = []
for setting in self.polkit_settings:
if setting.valuename in self.__policy_map.keys() and setting.keyname == self._registry_branch:
logging.info('Found key: {}, file: {} and value: {}'.format(setting.keyname, self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
#try:
self.__policy_map[setting.valuename][1][setting.valuename] = setting.data
self.policies.append(appliers.polkit(self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
#except Exception as exc:
# print(exc)
# logging.info('Unable to work with PolicyKit setting: {}'.format(setting.valuename))
#for e in polfile.pol_file.entries:
# print('{}:{}:{}:{}:{}'.format(e.type, e.data, e.valuename, e.keyname))
def _get_policies(self):
'''
Extract control entries from PReg file
'''
policies = []
for parser in self.polparsers:
for entry in parser.entries:
if entry.keyname == self._registry_branch:
policies.append(entry)
logging.info('Found PolicyKit setting: {}'.format(entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped setting: {}\\{}'.format(entry.keyname, entry.valuename))
return policies
def apply(self):
'''
Trigger control facility invocation.
'''
for policy in self.policies:
policy.generate()
def dump_settings(self):
'''
Write actual controls as XML and PReg files
'''
print('Dumping...')
polfile = preg.file()
polfile.header.signature = 'PReg'
polfile.header.version = 1
polfile.num_entries = len(self.control_settings)
polfile.entries = self.control_settings
print(polfile.__ndr_print__())
policy_writer = GPPolParser()
policy_writer.pol_file = polfile
policy_writer.write_xml('test_reg.xml')
policy_writer.write_binary('test_reg.pol')
class applier:
def __init__(self, sid, backend):
self.backend = backend
@ -159,7 +32,8 @@ class applier:
logging.info('Values: {}'.format(self.gpvalues))
capplier = control_applier(self.gpvalues)
pkapplier = polkit_applier(self.gpvalues)
self.appliers = dict({ 'control': capplier, 'polkit': pkapplier })
sdapplier = systemd_applier(self.gpvalues)
self.appliers = dict({ 'control': capplier, 'polkit': pkapplier, 'systemd': sdapplier })
def load_values(self):
'''
@ -174,7 +48,5 @@ class applier:
logging.info('Applying')
self.appliers['control'].apply()
self.appliers['polkit'].apply()
# This thing dumps Registry.pol files to disk from data structures
#print('Writing settings to file')
#self.appliers['control'].dump_settings()
self.appliers['systemd'].apply()

View File

@ -0,0 +1,7 @@
class applier_frontend(object):
def __init__(self, regobj):
pass
def apply(self):
pass

58
gpoa/frontend/appliers/__init__.py Executable file → Normal file
View File

@ -1,59 +1 @@
#! /usr/bin/env python3
import subprocess
import threading
import os
import jinja2
class control:
def __init__(self, name, value):
self.control_name = name
self.control_value = value
self.possible_values = self._query_control_values()
if self.possible_values == None:
raise Exception('Unable to query possible values')
def _query_control_values(self):
proc = subprocess.Popen(['control', self.control_name, 'list'], stdout=subprocess.PIPE)
for line in proc.stdout:
values = line.split()
return values
def _map_control_status(self, int_status):
str_status = self.possible_values[int_status].decode()
return str_status
def get_control_name(self):
return self.control_name
def get_control_status(self):
proc = subprocess.Popen(['control', self.control_name], stdout=subprocess.PIPE)
for line in proc.stdout:
return line.rstrip('\n\r')
def set_control_status(self):
status = self._map_control_status(self.control_value)
print('Setting control {} to {}'.format(self.control_name, status))
try:
proc = subprocess.Popen(['control', self.control_name, status], stdout=subprocess.PIPE)
except:
print('Unable to set {} to {}'.format(self.control_name, status))
class polkit:
__template_path = '/usr/lib/python3/site-packages/gpoa/templates'
__policy_dir = '/etc/polkit-1/rules.d'
__template_loader = jinja2.FileSystemLoader(searchpath=__template_path)
__template_environment = jinja2.Environment(loader=__template_loader)
def __init__(self, template_name, arglist):
self.template_name = template_name
self.args = arglist
self.infilename = '{}.rules.j2'.format(self.template_name)
self.outfile = os.path.join(self.__policy_dir, '{}.rules'.format(self.template_name))
def generate(self):
template = self.__template_environment.get_template(self.infilename)
text = template.render(**self.args)
with open(self.outfile, 'w') as f:
f.write(text)

View File

@ -0,0 +1,42 @@
#! /usr/bin/env python3
import subprocess
import threading
import logging
logging.basicConfig(level=logging.DEBUG)
class control:
def __init__(self, name, value):
self.control_name = name
self.control_value = value
self.possible_values = self._query_control_values()
if self.possible_values == None:
raise Exception('Unable to query possible values')
def _query_control_values(self):
proc = subprocess.Popen(['control', self.control_name, 'list'], stdout=subprocess.PIPE)
for line in proc.stdout:
values = line.split()
return values
def _map_control_status(self, int_status):
str_status = self.possible_values[int_status].decode()
return str_status
def get_control_name(self):
return self.control_name
def get_control_status(self):
proc = subprocess.Popen(['control', self.control_name], stdout=subprocess.PIPE)
for line in proc.stdout:
return line.rstrip('\n\r')
def set_control_status(self):
status = self._map_control_status(self.control_value)
logging.debug('Setting control {} to {}'.format(self.control_name, status))
try:
proc = subprocess.Popen(['control', self.control_name, status], stdout=subprocess.PIPE)
except:
logging.error('Unable to set {} to {}'.format(self.control_name, status))

View File

@ -0,0 +1,26 @@
#! /usr/bin/env python3
import os
import jinja2
import logging
logging.basicConfig(level=logging.DEBUG)
class polkit:
__template_path = '/usr/lib/python3/site-packages/gpoa/templates'
__policy_dir = '/etc/polkit-1/rules.d'
__template_loader = jinja2.FileSystemLoader(searchpath=__template_path)
__template_environment = jinja2.Environment(loader=__template_loader)
def __init__(self, template_name, arglist):
self.template_name = template_name
self.args = arglist
self.infilename = '{}.rules.j2'.format(self.template_name)
self.outfile = os.path.join(self.__policy_dir, '{}.rules'.format(self.template_name))
def generate(self):
template = self.__template_environment.get_template(self.infilename)
text = template.render(**self.args)
with open(self.outfile, 'w') as f:
f.write(text)
logging.debug('Generated file {} with arguments {}'.format(self.outfile, self.args))

View File

@ -0,0 +1,29 @@
import dbus
import logging
logging.basicConfig(level=logging.DEBUG)
class systemd_unit:
__system_bus = dbus.SystemBus()
__systemd_dbus = __system_bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
__manager = dbus.Interface(__systemd_dbus, 'org.freedesktop.systemd1.Manager')
def __init__(self, unit_name, state):
self.unit_name = unit_name
self.desired_state = state
self.unit = self.__manager.LoadUnit(self.unit_name)
self.unit_proxy = self.__system_bus.get_object('org.freedesktop.systemd1', str(self.unit))
self.unit_interface = dbus.Interface(self.unit_proxy, dbus_interface='org.freedesktop.systemd1.Unit')
def apply(self):
if self.desired_state == 1:
self.__manager.UnmaskUnitFiles([self.unit_name], dbus.Boolean(True))
self.__manager.EnableUnitFiles([self.unit_name], dbus.Boolean(True), dbus.Boolean(True))
self.__manager.StartUnit(self.unit_name, 'replace')
logging.info('Starting systemd unit: {}'.format(self.unit_name))
else:
self.__manager.MaskUnitFiles([self.unit_name], dbus.Boolean(True), dbus.Boolean(True))
self.__manager.DisableUnitFiles([self.unit_name], dbus.Boolean(True))
self.__manager.StopUnit(self.unit_name, 'replace')
logging.info('Stopping systemd unit: {}'.format(self.unit_name))

View File

@ -0,0 +1,61 @@
from .applier_frontend import applier_frontend
from .appliers.control import control
import logging
from samba.gp_parse.gp_pol import GPPolParser
class control_applier(applier_frontend):
_registry_branch = 'Software\\BaseALT\\Policies\\Control'
def __init__(self, polfiles):
self.polparsers = polfiles
self.control_settings = self._get_controls(self.polparsers)
self.controls = []
for setting in self.control_settings:
try:
self.controls.append(control(setting.valuename, setting.data))
except:
logging.info('Unable to work with control: {}'.format(setting.valuename))
#for e in polfile.pol_file.entries:
# print('{}:{}:{}:{}:{}'.format(e.type, e.data, e.valuename, e.keyname))
def _get_controls(self, polfiles):
'''
Extract control entries from PReg file
'''
controls = []
for parser in polfiles:
for entry in parser.entries:
if entry.keyname == self._registry_branch:
controls.append(entry)
logging.info('Found control setting: {}'.format(entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped control setting: {}\\{}'.format(entry.keyname, entry.valuename))
return controls
def apply(self):
'''
Trigger control facility invocation.
'''
for cont in self.controls:
cont.set_control_status()
def dump_settings(self):
'''
Write actual controls as XML and PReg files
'''
print('Dumping...')
polfile = preg.file()
polfile.header.signature = 'PReg'
polfile.header.version = 1
polfile.num_entries = len(self.control_settings)
polfile.entries = self.control_settings
print(polfile.__ndr_print__())
policy_writer = GPPolParser()
policy_writer.pol_file = polfile
policy_writer.write_xml('test_reg.xml')
policy_writer.write_binary('test_reg.pol')

View File

@ -0,0 +1,52 @@
from .applier_frontend import applier_frontend
from .appliers.polkit import polkit
import logging
from xml.etree import ElementTree
from samba.gp_parse.gp_pol import GPPolParser
class polkit_applier(applier_frontend):
__registry_branch = 'Software\\Policies\\Microsoft\\Windows\\RemovableStorageDevices'
__policy_map = {
'Deny_All': ['99-gpoa_disk_permissions', { 'Deny_All': 0 }]
}
def __init__(self, polfiles):
self.polparsers = polfiles
self.polkit_settings = self._get_policies()
self.policies = []
for setting in self.polkit_settings:
if setting.valuename in self.__policy_map.keys() and setting.keyname == self.__registry_branch:
logging.info('Found key: {}, file: {} and value: {}'.format(setting.keyname, self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
try:
self.__policy_map[setting.valuename][1][setting.valuename] = setting.data
self.policies.append(polkit(self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
except Exception as exc:
print(exc)
logging.info('Unable to work with PolicyKit setting: {}'.format(setting.valuename))
#for e in polfile.pol_file.entries:
# print('{}:{}:{}:{}:{}'.format(e.type, e.data, e.valuename, e.keyname))
def _get_policies(self):
'''
Extract control entries from PReg file
'''
policies = []
for parser in self.polparsers:
for entry in parser.entries:
if entry.keyname == self.__registry_branch:
policies.append(entry)
logging.info('Found PolicyKit setting: {}\\{}'.format(entry.keyname, entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped PolicyKit setting: {}\\{}'.format(entry.keyname, entry.valuename))
return policies
def apply(self):
'''
Trigger control facility invocation.
'''
for policy in self.policies:
policy.generate()

View File

@ -0,0 +1,70 @@
from .applier_frontend import applier_frontend
from .appliers.systemd import systemd_unit
import logging
from samba.gp_parse.gp_pol import GPPolParser
class systemd_applier(applier_frontend):
__registry_branch = 'Software\\BaseALT\\Policies\\SystemdUnits'
def __init__(self, polfiles):
self.polparsers = polfiles
self.systemd_unit_settings = self._get_unit_settings(self.polparsers)
self.units = []
for setting in self.systemd_unit_settings:
try:
self.units.append(systemd_unit(setting.valuename, setting.data))
except:
logging.info('Unable to work with systemd unit: {}'.format(setting.valuename))
def _get_unit_settings(self, polfiles):
'''
Extract control entries from PReg file
'''
units = []
for parser in polfiles:
for entry in parser.entries:
if entry.keyname == self.__registry_branch:
units.append(entry)
logging.info('Found systemd unit setting: {}'.format(entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped systemd unit setting: {}\\{}'.format(entry.keyname, entry.valuename))
return units
def apply(self):
'''
Trigger control facility invocation.
'''
for unit in self.units:
unit.apply()
class applier:
def __init__(self, sid, backend):
self.backend = backend
self.gpvalues = self.load_values()
logging.info('Values: {}'.format(self.gpvalues))
capplier = control_applier(self.gpvalues)
pkapplier = polkit_applier(self.gpvalues)
sdapplier = systemd_applier(self.gpvalues)
self.appliers = dict({ 'control': capplier, 'polkit': pkapplier, 'systemd': sdapplier })
def load_values(self):
'''
This thing returns the list of samba.preg objects for
now but it must be transformed to return registry and
its hives to read values from.
'''
logging.info('Get values from backend')
return self.backend.get_values()
def apply_parameters(self):
logging.info('Applying')
self.appliers['control'].apply()
self.appliers['polkit'].apply()
self.appliers['systemd'].apply()
# This thing dumps Registry.pol files to disk from data structures
#print('Writing settings to file')
#self.appliers['control'].dump_settings()

View File

@ -5,8 +5,7 @@ import argparse
# Facility to determine GPTs for user
import optparse
from samba import getopt as options
from samba.gpclass import get_dc_hostname, gpo_version, check_safe_path
import samba.gpo
from samba.gpclass import check_safe_path
# Primitives to work with libregistry
# samba.registry.str_regtype(2) -> 'REG_EXPAND_SZ'
@ -24,9 +23,6 @@ from samba.gp_parse.gp_pol import GPPolParser
# invocation helper).
#from samba.dcerpc import netlogon
# Registry editing facilities are buggy. Use TDB module instead.
import tdb
# This is needed by Registry.pol file search
import os
import re
@ -56,34 +52,6 @@ import sys
import logging
logging.basicConfig(level=logging.DEBUG)
class tdb_regedit:
'''
regedit object for samba-regedit which has separate registry and
not really related to registry formed in smb.conf.
'''
def __init__(self, tdb_file='/var/lib/samba/registry.tdb'):
self.registry = tdb.open(tdb_file)
def _blob2keys(self, blob):
return blob.split('')
def get_keys(self, path):
# Keys are ending with trailing zeros and are binary blobs
keys_blob = self.registry.get('{}\x00'.format(path).encode())
return self._blob2keys
def write_preg_entry(self, entry):
hive_key = 'HKLM\\{}\\{}\\{}'.format(
entry.keyname.upper(),
entry.valuename,
entry.type.to_bytes(1, byteorder='big')).upper().encode()
logging.info('Merging {}'.format(hive_key))
self.registry.transaction_start()
self.registry.store(hive_key, entry.data.to_bytes(4, byteorder='big'))
self.registry.transaction_commit()
class applier_backend:
def __init__(self):
pass
@ -95,7 +63,7 @@ class local_policy_backend(applier_backend):
self.username = username
def get_values(self):
policies = [frontend.load_xml_preg(self.__default_policy_path)]
policies = [util.load_xml_preg(self.__default_policy_path)]
return policies
class samba_backend(applier_backend):
@ -120,10 +88,10 @@ class samba_backend(applier_backend):
dict of lists with PReg file paths.
'''
if self._check_sysvol_present(gpo_obj):
print('Found SYSVOL entry {} for GPO {}'.format(gpo_obj.file_sys_path, gpo_obj.name))
logging.debug('Found SYSVOL entry {} for GPO {}'.format(gpo_obj.file_sys_path, gpo_obj.name))
path = check_safe_path(gpo_obj.file_sys_path).upper()
gpt_abspath = os.path.join(self.cache_dir, 'gpo_cache', path)
print('Path: {}'.format(path))
logging.debug('Path: {}'.format(path))
policy_files = self._find_regpol_files(gpt_abspath)
return policy_files
@ -135,7 +103,7 @@ class samba_backend(applier_backend):
self.creds = creds
self.cache_dir = self.loadparm.get('cache directory')
logging.info('Cache directory is: {}'.format(self.cache_dir))
logging.debug('Cache directory is: {}'.format(self.cache_dir))
# Regular expressions to split PReg files into user and machine parts
self._machine_pol_path_regex = re.compile(self._machine_pol_path_pattern)
@ -175,9 +143,9 @@ class samba_backend(applier_backend):
# Re-cache the retrieved values
with open(cache_file, 'wb') as f:
pickle.dump(cache, f, pickle.HIGHEST_PROTOCOL)
logging.info('Cached PReg files')
logging.debug('Cached PReg files')
logging.info('Policy files: {}'.format(self.policy_files))
logging.debug('Policy files: {}'.format(self.policy_files))
def _parse_pol_file(self, polfile):
'''
@ -199,7 +167,7 @@ class samba_backend(applier_backend):
'''
# Build hive key path from PReg's key name and value name.
hive_key = '{}\\{}'.format(entry.keyname, entry.valuename)
logging.info('Merging {}'.format(hive_key))
logging.debug('Merging {}'.format(hive_key))
# FIXME: Here should be entry.type parser in order to correctly
# represent data
@ -215,7 +183,7 @@ class samba_backend(applier_backend):
logging.info('Parsing machine regpols')
for regpol in self.policy_files['machine_regpols']:
logging.info('Processing {}'.format(regpol))
logging.debug('Processing {}'.format(regpol))
pregfile = self._parse_pol_file(regpol)
preg_objs.append(pregfile)
# Works only with full key names
@ -230,7 +198,7 @@ class samba_backend(applier_backend):
Seek through given GPT directory absolute path and return the dictionary
of user's and machine's Registry.pol files.
'''
logging.info('Finding regpols in: {}'.format(gpt_path))
logging.debug('Finding regpols in: {}'.format(gpt_path))
polfiles = dict({ 'machine_regpols': [], 'user_regpols': [] })
for root, dirs, files in os.walk(gpt_path):
for gpt_file in files:
@ -240,7 +208,7 @@ class samba_backend(applier_backend):
polfiles['machine_regpols'].append(regpol_abspath)
else:
polfiles['user_regpols'].append(regpol_abspath)
logging.info('Polfiles: {}'.format(polfiles))
logging.debug('Polfiles: {}'.format(polfiles))
return polfiles
def parse_arguments():