1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-08-25 01:49:35 +03:00

Compare commits

..

2 Commits

55 changed files with 469 additions and 2053 deletions

1
.gitignore vendored
View File

@ -2,5 +2,4 @@ __pycache__
*~
_opam
_build
*.pyc

23
dist/gpupdate-setup vendored
View File

@ -50,7 +50,7 @@ def get_default_policy_name():
dcpolicy = 'ad-domain-controller'
try:
if smbopts().get_server_role() == 'active directory domain controller':
if smbopt.get_server_role() == 'active directory domain controller':
return dcpolicy
except:
pass
@ -86,8 +86,6 @@ def parse_arguments():
help='Disable Group Policy subsystem')
parser_write = subparsers.add_parser('write',
help='Operate on Group Policies (enable or disable)')
parser_default = subparsers.add_parser('default-policy',
help='Show name of default policy')
parser_active = subparsers.add_parser('active-policy',
help='Show name of policy enabled')
@ -146,17 +144,17 @@ def get_status():
return os.path.islink(systemd_unit_link)
def get_active_policy_name():
def get_active_policy():
policy_dir = '/usr/share/local-policy'
etc_policy_dir = '/etc/local-policy'
actual_policy_name = 'unknown'
default_policy_name = os.path.join(policy_dir, get_default_policy_name())
active_policy_path = os.path.join(etc_policy_dir, 'active')
active_policy_name = os.path.join(etc_policy_dir, 'active')
if os.path.islink(active_policy_path):
active_policy_path = os.path.realpath(active_policy_path)
actual_policy_name = os.path.realpath(default_policy_name)
if os.path.isdir(active_policy_path):
actual_policy_name = os.path.basename(active_policy_path)
if os.path.isdir(active_policy_name):
actual_policy_name = os.path.realpath(active_policy_name)
return actual_policy_name
@ -223,10 +221,7 @@ def main():
disable_gp()
if arguments.action == 'active-policy':
print(get_active_policy_name())
if arguments.action == 'default-policy':
print(get_default_policy_name())
print(get_active_policy())
if __name__ == '__main__':
main()

View File

@ -1,4 +1,5 @@
#%PAM-1.0
session required pam_mktemp.so
session required pam_mkhomedir.so silent
session required pam_limits.so
-session required pam_oddjob_gpupdate.so
session optional pam_env.so user_readenv=1 conffile=/etc/gpupdate/environment user_envfile=.gpupdate_environment

View File

@ -18,51 +18,6 @@
from abc import ABC
import logging
from util.logging import slogm
def check_experimental_enabled(storage):
experimental_enable_flag = 'Software\\BaseALT\\Policies\\GPUpdate\\GlobalExperimental'
flag = storage.get_hklm_entry(experimental_enable_flag)
result = False
if flag and '1' == flag.data:
result = True
return result
def check_module_enabled(storage, module_name):
gpupdate_module_enable_branch = 'Software\\BaseALT\\Policies\\GPUpdate'
gpupdate_module_flag = '{}\\{}'.format(gpupdate_module_enable_branch, module_name)
flag = storage.get_hklm_entry(gpupdate_module_flag)
result = None
if flag:
if '1' == flag.data:
result = True
if '0' == flag.data:
result = False
return result
def check_enabled(storage, module_name, is_experimental):
module_enabled = check_module_enabled(storage, module_name)
exp_enabled = check_experimental_enabled(storage)
result = False
if None == module_enabled:
if is_experimental and exp_enabled:
result = True
if not is_experimental:
result = True
else:
result = module_enabled
return result
class applier_frontend(ABC):
@classmethod
def __init__(self, regobj):

View File

@ -1,97 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
import subprocess
def getprops(param_list):
props = dict()
for entry in param_list:
lentry = entry.lower()
if lentry.startswith('action'):
props['action'] = lentry.rpartition('=')[2]
if lentry.startswith('protocol'):
props['protocol'] = lentry.rpartition('=')[2]
if lentry.startswith('dir'):
props['dir'] = lentry.rpartition('=')[2]
return props
def get_ports(param_list):
portlist = list()
for entry in param_list:
lentry = entry.lower()
if lentry.startswith('lport'):
port = lentry.rpartition('=')[2]
portlist.append(port)
return portlist
class PortState(Enum):
OPEN = 'Allow'
CLOSE = 'Deny'
class Protocol(Enum):
TCP = 'tcp'
UDP = 'udp'
class FirewallMode(Enum):
ROUTER = 'router'
GATEWAY = 'gateway'
HOST = 'host'
# This shi^Wthing named alterator-net-iptables is unable to work in
# multi-threaded environment
class FirewallRule:
__alterator_command = '/usr/bin/alterator-net-iptables'
def __init__(self, data):
data_array = data.split('|')
self.version = data_array[0]
self.ports = get_ports(data_array[1:])
self.properties = getprops(data_array[1:])
def apply(self):
tcp_command = []
udp_command = []
for port in self.ports:
tcp_port = '{}'.format(port)
udp_port = '{}'.format(port)
if PortState.OPEN.value == self.properties['action']:
tcp_port = '+' + tcp_port
udp_port = '+' + udp_port
if PortState.CLOSE.value == self.properties['action']:
tcp_port = '-' + tcp_port
udp_port = '-' + udp_port
portcmd = [
self.__alterator_command
, 'write'
, '-m', FirewallMode.HOST.value
, '-t', tcp_port
, '-u', udp_port
]
proc = subprocess.Popen(portcmd)
proc.wait()

View File

@ -1,72 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pathlib import Path
from gpt.folders import (
FileAction
, action_letter2enum
)
def remove_dir_tree(path, delete_files=False, delete_folder=False, delete_sub_folders=False):
for entry in path.iterdir():
if entry.is_file():
entry.unlink()
if entry.is_dir():
if delete_sub_folders:
remove_dir_tree(entry,
delete_files,
delete_folder,
delete_sub_folders)
if delete_folder:
path.rmdir()
def str2bool(boolstr):
if boolstr.lower in ['true', 'yes', '1']:
return True
return False
class Folder:
def __init__(self, folder_object):
self.folder_path = Path(folder_object.path)
self.action = action_letter2enum(folder_object.action)
self.delete_files = str2bool(folder_object.delete_files)
self.delete_folder = str2bool(folder_object.delete_folder)
self.delete_sub_folders = str2bool(folder_object.delete_sub_folders)
def _create_action(self):
self.folder_path.mkdir(parents=True, exist_ok=True)
def _delete_action(self):
remove_dir_tree(self.folder_path,
self.delete_files,
self.delete_folders,
self.delete_sub_folders)
def action(self):
if self.action == FileAction.CREATE:
self._create_action()
if self.action == FileAction.DELETE:
self._delete_action()
if self.action == FileAction.REPLACE:
self._delete_action()
self._create_action()

View File

@ -28,15 +28,11 @@ class polkit:
__template_loader = jinja2.FileSystemLoader(searchpath=__template_path)
__template_environment = jinja2.Environment(loader=__template_loader)
def __init__(self, template_name, arglist, username=None):
def __init__(self, template_name, arglist):
self.template_name = template_name
self.args = arglist
self.username = username
self.infilename = '{}.rules.j2'.format(self.template_name)
if self.username:
self.outfile = os.path.join(self.__policy_dir, '{}.{}.rules'.format(self.template_name, self.username))
else:
self.outfile = os.path.join(self.__policy_dir, '{}.rules'.format(self.template_name))
self.outfile = os.path.join(self.__policy_dir, '{}.rules'.format(self.template_name))
def generate(self):
try:

View File

@ -16,10 +16,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
def read_tasks(filename):
pass
from util.rpm import (
install_rpm,
remove_rpm
)
def merge_tasks(storage, sid, task_objects, policy_name):
for task in task_objects:
class rpm:
def __init__(self, name, action):
self.name = name
self.action = action
def apply(self):
pass

View File

@ -16,10 +16,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
import logging
import json
@ -29,9 +26,6 @@ from util.logging import slogm
from util.util import is_machine_name
class chromium_applier(applier_frontend):
__module_name = 'ChromiumApplier'
__module_enabled = False
__module_experimental = True
__registry_branch = 'Software\\Policies\\Google\\Chrome'
__managed_policies_path = '/etc/chromium/policies/managed'
__recommended_policies_path = '/etc/chromium/policies/recommended'
@ -45,11 +39,6 @@ class chromium_applier(applier_frontend):
self.username = username
self._is_machine_name = is_machine_name(self.username)
self.policies = dict()
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def get_hklm_string_entry(self, hive_subkey):
query_str = '{}\\{}'.format(self.__registry_branch, hive_subkey)
@ -142,12 +131,7 @@ class chromium_applier(applier_frontend):
'''
All actual job done here.
'''
if self.__module_enabled:
logging.debug(slogm('Running Chromium applier for machine'))
self.machine_apply()
else:
logging.debug(slogm('Chromium applier for machine will not be started'))
self.machine_apply()
#if not self._is_machine_name:
# logging.debug('Running user applier for Chromium')
# self.user_apply()

View File

@ -1,164 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import fileinput
import jinja2
import os
import subprocess
import logging
from pathlib import Path
from .applier_frontend import (
applier_frontend
, check_enabled
)
from gpt.drives import json2drive
from util.util import get_homedir
from util.logging import slogm
def storage_get_drives(storage, sid):
drives = storage.get_drives(sid)
drive_list = list()
for drv_obj in drives:
drive_list.append(drv_obj)
return drive_list
def add_line_if_missing(filename, ins_line):
with open(filename, 'r+') as f:
for line in f:
if ins_line == line.strip():
break
else:
f.write(ins_line + '\n')
f.flush()
class cifs_applier(applier_frontend):
def __init__(self, storage):
pass
def apply(self):
pass
class cifs_applier_user(applier_frontend):
__module_name = 'CIFSApplierUser'
__module_enabled = False
__module_experimental = True
__auto_file = '/etc/auto.master'
__auto_dir = '/etc/auto.master.gpupdate.d'
__template_path = '/usr/share/gpupdate/templates'
__template_mountpoints = 'autofs_mountpoints.j2'
__template_identity = 'autofs_identity.j2'
__template_auto = 'autofs_auto.j2'
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
self.home = get_homedir(username)
conf_file = '{}.conf'.format(sid)
autofs_file = '{}.autofs'.format(sid)
cred_file = '{}.creds'.format(sid)
self.auto_master_d = Path(self.__auto_dir)
self.user_config = self.auto_master_d / conf_file
if os.path.exists(self.user_config.resolve()):
self.user_config.unlink()
self.user_autofs = self.auto_master_d / autofs_file
if os.path.exists(self.user_autofs.resolve()):
self.user_autofs.unlink()
self.user_creds = self.auto_master_d / cred_file
self.mount_dir = Path(os.path.join(self.home, 'net'))
self.drives = storage_get_drives(self.storage, self.sid)
self.template_loader = jinja2.FileSystemLoader(searchpath=self.__template_path)
self.template_env = jinja2.Environment(loader=self.template_loader)
self.template_mountpoints = self.template_env.get_template(self.__template_mountpoints)
self.template_indentity = self.template_env.get_template(self.__template_identity)
self.template_auto = self.template_env.get_template(self.__template_auto)
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def user_context_apply(self):
'''
Nothing to implement.
'''
pass
def __admin_context_apply(self):
# Create /etc/auto.master.gpupdate.d directory
self.auto_master_d.mkdir(parents=True, exist_ok=True)
# Create user's destination mount directory
self.mount_dir.mkdir(parents=True, exist_ok=True)
# Add pointer to /etc/auto.master.gpiupdate.d in /etc/auto.master
auto_destdir = '+dir:{}'.format(self.__auto_dir)
add_line_if_missing(self.__auto_file, auto_destdir)
# Collect data for drive settings
drive_list = list()
for drv in self.drives:
drive_settings = dict()
drive_settings['dir'] = drv.dir
drive_settings['login'] = drv.login
drive_settings['password'] = drv.password
drive_settings['path'] = drv.path.replace('\\', '/')
drive_list.append(drive_settings)
if len(drive_list) > 0:
mount_settings = dict()
mount_settings['drives'] = drive_list
mount_text = self.template_mountpoints.render(**mount_settings)
with open(self.user_config.resolve(), 'w') as f:
f.truncate()
f.write(mount_text)
f.flush()
autofs_settings = dict()
autofs_settings['home_dir'] = self.home
autofs_settings['mount_file'] = self.user_config.resolve()
autofs_text = self.template_auto.render(**autofs_settings)
with open(self.user_autofs.resolve(), 'w') as f:
f.truncate()
f.write(autofs_text)
f.flush()
subprocess.check_call(['/bin/systemctl', 'restart', 'autofs'])
def admin_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running CIFS applier for user in administrator context'))
self.__admin_context_apply()
else:
logging.debug(slogm('CIFS applier for user in administrator context will not be started'))

View File

@ -16,32 +16,24 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from .appliers.control import control
from util.logging import slogm
import logging
class control_applier(applier_frontend):
__module_name = 'ControlApplier'
__module_experimental = False
__module_enabled = True
_registry_branch = 'Software\\BaseALT\\Policies\\Control'
def __init__(self, storage):
self.storage = storage
self.control_settings = self.storage.filter_hklm_entries('Software\\BaseALT\\Policies\\Control%')
self.controls = list()
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
def apply(self):
'''
Trigger control facility invocation.
'''
for setting in self.control_settings:
valuename = setting.hive_key.rpartition('\\')[2]
try:
@ -57,13 +49,3 @@ class control_applier(applier_frontend):
for cont in self.controls:
cont.set_control_status()
def apply(self):
'''
Trigger control facility invocation.
'''
if self.__module_enabled:
logging.debug(slogm('Running Control applier for machine'))
self.run()
else:
logging.debug(slogm('Control applier for machine will not be started'))

View File

@ -18,14 +18,8 @@
import logging
import os
import json
import cups
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from gpt.printers import json2printer
from util.rpm import is_rpm_installed
from util.logging import slogm
@ -38,81 +32,42 @@ def storage_get_printers(storage, sid):
printers = list()
for prnj in printer_objs:
printers.append(prnj)
prn_obj = json2printer(prnj)
printers.append(prn_obj)
return printers
def connect_printer(connection, prn):
def write_printer(prn):
'''
Dump printer cinfiguration to disk as CUPS config
'''
# PPD file location
printer_driver = 'generic'
pjson = json.loads(prn.printer)
printer_parts = pjson['printer']['path'].partition(' ')
# Printer queue name in CUPS
printer_name = printer_parts[2].replace('(', '').replace(')', '')
# Printer description in CUPS
printer_info = printer_name
printer_uri = printer_parts[0].replace('\\', '/')
printer_uri = 'smb:' + printer_uri
connection.addPrinter(
name=printer_name
, info=printer_info
, device=printer_uri
#filename=printer_driver
)
printer_config_path = os.path.join('/etc/cups', prn.name)
with open(printer_config_path, 'r') as f:
print(prn.cups_config(), file=f)
class cups_applier(applier_frontend):
__module_name = 'CUPSApplier'
__module_experimental = True
__module_enabled = False
def __init__(self, storage):
self.storage = storage
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
if not is_rpm_installed('cups'):
logging.warning(slogm('CUPS is not installed: no printer settings will be deployed'))
return
self.cups_connection = cups.Connection()
self.printers = storage_get_printers(self.storage, self.storage.get_info('machine_sid'))
if self.printers:
for prn in self.printers:
connect_printer(self.cups_connection, prn)
def apply(self):
'''
Perform configuration of printer which is assigned to computer.
'''
if self.__module_enabled:
logging.debug(slogm('Running CUPS applier for machine'))
self.run()
else:
logging.debug(slogm('CUPS applier for machine will not be started'))
if not is_rpm_installed('cups'):
logging.warning(slogm('CUPS is not installed: no printer settings will be deployed'))
return
printers = storage_get_printers(self.storage, self.storage.get_info('machine_sid'))
if printers:
for prn in printers:
write_printer(prn)
class cups_applier_user(applier_frontend):
__module_name = 'CUPSApplierUser'
__module_experimental = True
__module_enabled = False
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_enabled
)
def user_context_apply(self):
'''
@ -121,25 +76,17 @@ class cups_applier_user(applier_frontend):
'''
pass
def run(self):
if not is_rpm_installed('cups'):
logging.warning(slogm('CUPS is not installed: no printer settings will be deployed'))
return
self.cups_connection = cups.Connection()
self.printers = storage_get_printers(self.storage, self.sid)
if self.printers:
for prn in self.printers:
connect_printer(self.cups_connection, prn)
def admin_context_apply(self):
'''
Perform printer configuration assigned for user.
'''
if self.__module_enabled:
logging.debug(slogm('Running CUPS applier for user in administrator context'))
self.run()
else:
logging.debug(slogm('CUPS applier for user in administrator context will not be started'))
if not is_rpm_installed('cups'):
logging.warning(slogm('CUPS is not installed: no printer settings will be deployed'))
return
printers = storage_get_printers(self.storage, self.sid)
if printers:
for prn in printers:
write_printer(prn)

View File

@ -30,17 +30,11 @@ import json
import os
import configparser
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from util.logging import slogm
from util.util import is_machine_name
class firefox_applier(applier_frontend):
__module_name = 'FirefoxApplier'
__module_experimental = True
__module_enabled = False
__registry_branch = 'Software\\Policies\\Mozilla\\Firefox'
__firefox_installdir = '/usr/lib64/firefox/distribution'
__user_settings_dir = '.mozilla/firefox'
@ -52,11 +46,6 @@ class firefox_applier(applier_frontend):
self._is_machine_name = is_machine_name(self.username)
self.policies = dict()
self.policies_json = dict({ 'policies': self.policies })
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def get_profiles(self):
'''
@ -148,11 +137,7 @@ class firefox_applier(applier_frontend):
logging.debug(slogm('Found Firefox profile in {}/{}'.format(profiledir, profile)))
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Firefox applier for machine'))
self.machine_apply()
else:
logging.debug(slogm('Firefox applier for machine will not be started'))
self.machine_apply()
#if not self._is_machine_name:
# logging.debug('Running user applier for Firefox')
# self.user_apply()

View File

@ -1,53 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from util.logging import slogm
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .appliers.firewall_rule import FirewallRule
class firewall_applier(applier_frontend):
__module_name = 'FirewallApplier'
__module_experimental = True
__module_enabled = False
__firewall_branch = 'SOFTWARE\\Policies\\Microsoft\\WindowsFirewall\\FirewallRules'
def __init__(self, storage):
self.storage = storage
self.firewall_settings = self.storage.filter_hklm_entries('{}%'.format(self.__firewall_branch))
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
for setting in self.firewall_settings:
rule = FirewallRule(setting.data)
rule.apply()
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Firewall applier for machine'))
self.run()
else:
logging.debug(slogm('Firewall applier will not be started'))

View File

@ -1,84 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pathlib import Path
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .appliers.folder import Folder
from util.logging import slogm
import logging
class folder_applier(applier_frontend):
__module_name = 'FoldersApplier'
__module_experimental = False
__module_enabled = True
def __init__(self, storage, sid):
self.storage = storage
self.sid = sid
self.folders = self.storage.get_folders(self.sid)
self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_enabled)
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Folder applier for machine'))
for directory_obj in self.folders:
fld = Folder(directory_obj)
fld.action()
else:
logging.debug(slogm('Folder applier for machine will not be started'))
class folder_applier_user(applier_frontend):
__module_name = 'FoldersApplierUser'
__module_experimental = False
__module_enabled = True
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
self.folders = self.storage.get_folders(self.sid)
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
for directory_obj in self.folders:
fld = Folder(directory_obj)
fld.action()
def admin_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Folder applier for user in administrator context'))
self.run()
else:
logging.debug(slogm('Folder applier for user in administrator context will not be started'))
def user_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Folder applier for user in user context'))
self.run()
else:
logging.debug(slogm('Folder applier for user administrator context will not be started'))

View File

@ -19,18 +19,12 @@
from storage import registry_factory
from .control_applier import control_applier
from .polkit_applier import (
polkit_applier
, polkit_applier_user
)
from .polkit_applier import polkit_applier
from .systemd_applier import systemd_applier
from .firefox_applier import firefox_applier
from .chromium_applier import chromium_applier
from .cups_applier import cups_applier
from .package_applier import (
package_applier
, package_applier_user
)
from .package_applier import package_applier
from .shortcut_applier import (
shortcut_applier,
shortcut_applier_user
@ -39,13 +33,6 @@ from .gsettings_applier import (
gsettings_applier,
gsettings_applier_user
)
from .firewall_applier import firewall_applier
from .folder_applier import (
folder_applier
, folder_applier_user
)
from .quota_applier import quota_applier
from .cifs_applier import cifs_applier_user
from util.windows import get_sid
from util.users import (
is_root,
@ -54,8 +41,13 @@ from util.users import (
with_privileges
)
from util.logging import slogm
from util.paths import (
frontend_module_dir
)
import logging
import os
import subprocess
def determine_username(username=None):
'''
@ -85,6 +77,12 @@ class frontend_manager:
'''
def __init__(self, username, is_machine):
frontend_module_files = frontend_module_dir().glob('**/*')
self.frontend_module_binaries = list()
for exe in frontend_module_files:
if (exe.is_file() and os.access(exe.resolve(), os.X_OK)):
self.frontend_module_binaries.append(exe)
self.storage = registry_factory('registry')
self.username = determine_username(username)
self.is_machine = is_machine
@ -92,29 +90,22 @@ class frontend_manager:
self.sid = get_sid(self.storage.get_info('domain'), self.username, is_machine)
self.machine_appliers = dict({
'control': control_applier(self.storage)
, 'polkit': polkit_applier(self.storage)
, 'systemd': systemd_applier(self.storage)
, 'firefox': firefox_applier(self.storage, self.sid, self.username)
, 'chromium': chromium_applier(self.storage, self.sid, self.username)
, 'shortcuts': shortcut_applier(self.storage)
, 'gsettings': gsettings_applier(self.storage)
, 'cups': cups_applier(self.storage)
, 'firewall': firewall_applier(self.storage)
, 'folders': folder_applier(self.storage, self.sid)
, 'quota': quota_applier(self.storage)
, 'package': package_applier(self.storage)
'control': control_applier(self.storage),
'polkit': polkit_applier(self.storage),
'systemd': systemd_applier(self.storage),
'firefox': firefox_applier(self.storage, self.sid, self.username),
'chromium': chromium_applier(self.storage, self.sid, self.username),
'shortcuts': shortcut_applier(self.storage),
'gsettings': gsettings_applier(self.storage),
'cups': cups_applier(self.storage),
'package': package_applier(self.storage)
})
# User appliers are expected to work with user-writable
# files and settings, mostly in $HOME.
self.user_appliers = dict({
'shortcuts': shortcut_applier_user(self.storage, self.sid, self.username)
, 'folders': folder_applier_user(self.storage, self.sid, self.username)
, 'gsettings': gsettings_applier_user(self.storage, self.sid, self.username)
, 'cifs': cifs_applier_user(self.storage, self.sid, self.username)
, 'package': package_applier_user(self.storage, self.sid, self.username)
, 'polkit': polkit_applier_user(self.storage, self.sid, self.username)
'shortcuts': shortcut_applier_user(self.storage, self.sid, self.username),
'gsettings': gsettings_applier_user(self.storage, self.sid, self.username)
})
def machine_apply(self):
@ -125,33 +116,36 @@ class frontend_manager:
logging.error('Not sufficient privileges to run machine appliers')
return
logging.debug(slogm('Applying computer part of settings'))
for applier_name, applier_object in self.machine_appliers.items():
try:
applier_object.apply()
except Exception as exc:
logging.error('Error occured while running applier {}: {}'.format(applier_name, exc))
for exe in self.frontend_module_binaries:
subprocess.check_call([exe.resolve()])
self.machine_appliers['systemd'].apply()
self.machine_appliers['control'].apply()
self.machine_appliers['polkit'].apply()
self.machine_appliers['firefox'].apply()
self.machine_appliers['chromium'].apply()
self.machine_appliers['shortcuts'].apply()
self.machine_appliers['gsettings'].apply()
self.machine_appliers['cups'].apply()
#self.machine_appliers['package'].apply()
def user_apply(self):
'''
Run appliers for users.
'''
if is_root():
for applier_name, applier_object in self.user_appliers.items():
try:
applier_object.admin_context_apply()
except Exception as exc:
logging.error('Error occured while running applier {}: {}'.format(applier_name, exc))
logging.debug(slogm('Running user appliers from administrator context'))
self.user_appliers['shortcuts'].admin_context_apply()
self.user_appliers['gsettings'].admin_context_apply()
try:
with_privileges(self.username, applier_object.user_context_apply)
except Exception as exc:
logging.error('Error occured while running applier {}: {}'.format(applier_name, exc))
logging.debug(slogm('Running user appliers for user context'))
with_privileges(self.username, self.user_appliers['shortcuts'].user_context_apply)
with_privileges(self.username, self.user_appliers['gsettings'].user_context_apply)
else:
for applier_name, applier_object in self.user_appliers.items():
try:
applier_object.user_context_apply()
except Exception as exc:
logging.error('Error occured while running applier {}: {}'.format(applier_name, exc))
logging.debug(slogm('Running user appliers from user context'))
self.user_appliers['shortcuts'].user_context_apply()
self.user_appliers['gsettings'].user_context_apply()
def apply_parameters(self):
'''

View File

@ -20,10 +20,7 @@ import logging
import os
import subprocess
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from .appliers.gsettings import (
system_gsetting,
user_gsetting
@ -31,9 +28,6 @@ from .appliers.gsettings import (
from util.logging import slogm
class gsettings_applier(applier_frontend):
__module_name = 'GSettingsApplier'
__module_experimental = True
__module_enabled = False
__registry_branch = 'Software\\BaseALT\\Policies\\gsettings'
__global_schema = '/usr/share/glib-2.0/schemas'
@ -43,13 +37,8 @@ class gsettings_applier(applier_frontend):
self.gsettings_keys = self.storage.filter_hklm_entries(gsettings_filter)
self.gsettings = list()
self.override_file = os.path.join(self.__global_schema, '0_policy.gschema.override')
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
def apply(self):
# Cleanup settings from previous run
if os.path.exists(self.override_file):
logging.debug(slogm('Removing GSettings policy file from previous run'))
@ -73,17 +62,7 @@ class gsettings_applier(applier_frontend):
except Exception as exc:
logging.debug(slogm('Error recompiling global GSettings schemas'))
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running GSettings applier for machine'))
else:
logging.debug(slogm('GSettings applier for machine will not be started'))
class gsettings_applier_user(applier_frontend):
__module_name = 'GSettingsApplierUser'
__module_experimental = True
__module_enabled = False
__registry_branch = 'Software\\BaseALT\\Policies\\gsettings'
def __init__(self, storage, sid, username):
@ -93,9 +72,8 @@ class gsettings_applier_user(applier_frontend):
gsettings_filter = '{}%'.format(self.__registry_branch)
self.gsettings_keys = self.storage.filter_hkcu_entries(self.sid, gsettings_filter)
self.gsettings = list()
self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_enabled)
def run(self):
def user_context_apply(self):
for setting in self.gsettings_keys:
valuename = setting.hive_key.rpartition('\\')[2]
rp = valuename.rpartition('.')
@ -106,13 +84,6 @@ class gsettings_applier_user(applier_frontend):
for gsetting in self.gsettings:
gsetting.apply()
def user_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running GSettings applier for user in user context'))
self.run()
else:
logging.debug(slogm('GSettings applier for user in user context will not be started'))
def admin_context_apply(self):
'''
Not implemented because there is no point of doing so.

View File

@ -17,84 +17,20 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from util.logging import slogm
from util.rpm import (
update
, install_rpm
, remove_rpm
)
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from .appliers.rpm import rpm
class package_applier(applier_frontend):
__module_name = 'PackagesApplier'
__module_experimental = True
__module_enabled = False
__install_key_name = 'Install'
__remove_key_name = 'Remove'
__hklm_branch = 'Software\\BaseALT\\Policies\\Packages'
def __init__(self, storage):
self.storage = storage
install_branch = '{}\\{}%'.format(self.__hklm_branch, self.__install_key_name)
remove_branch = '{}\\{}%'.format(self.__hklm_branch, self.__remove_key_name)
self.install_packages_setting = self.storage.filter_hklm_entries(install_branch)
self.remove_packages_setting = self.storage.filter_hklm_entries(remove_branch)
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
if 0 < self.install_packages_setting.count() or 0 < self.remove_packages_setting.count():
update()
for package in self.install_packages_setting:
try:
install_rpm(package.data)
except Exception as exc:
logging.error(exc)
for package in self.remove_packages_setting:
try:
remove_rpm(package.data)
except Exception as exc:
logging.error(exc)
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Package applier for machine'))
self.run()
else:
logging.debug(slogm('Package applier for machine will not be started'))
pass
class package_applier_user(applier_frontend):
__module_name = 'PackagesApplierUser'
__module_experimental = True
__module_enabled = False
__install_key_name = 'Install'
__remove_key_name = 'Remove'
__hkcu_branch = 'Software\\BaseALT\\Policies\\Packages'
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
install_branch = '{}\\{}%'.format(self.__hkcu_branch, self.__install_key_name)
remove_branch = '{}\\{}%'.format(self.__hkcu_branch, self.__remove_key_name)
self.install_packages_setting = self.storage.filter_hkcu_entries(self.sid, install_branch)
self.remove_packages_setting = self.storage.filter_hkcu_entries(self.sid, remove_branch)
self.__module_enabled = check_enabled(self.storage, self.__module_name, self.__module_enabled)
def __init__(self):
pass
def user_context_apply(self):
'''
@ -102,29 +38,10 @@ class package_applier_user(applier_frontend):
'''
pass
def run(self):
if 0 < self.install_packages_setting.count() or 0 < self.remove_packages_setting.count():
update()
for package in self.install_packages_setting:
try:
install_rpm(package.data)
except Exception as exc:
logging.debug(exc)
for package in self.remove_packages_setting:
try:
remove_rpm(package.data)
except Exception as exc:
logging.debug(exc)
def admin_context_apply(self):
'''
Install software assigned to specified username regardless
which computer he uses to log into system.
'''
if self.__module_enabled:
logging.debug(slogm('Running Package applier for user in administrator context'))
self.run()
else:
logging.debug(slogm('Package applier for user in administrator context will not be started'))
pass

View File

@ -16,22 +16,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from .appliers.polkit import polkit
from util.logging import slogm
import logging
class polkit_applier(applier_frontend):
__module_name = 'PolkitApplier'
__module_experimental = False
__module_enabled = True
__deny_all = 'Software\\Policies\\Microsoft\\Windows\\RemovableStorageDevices\\Deny_All'
__polkit_map = {
__deny_all: ['49-gpoa_disk_permissions', { 'Deny_All': 0 }]
__deny_all: ['99-gpoa_disk_permissions', { 'Deny_All': 0 }]
}
def __init__(self, storage):
@ -47,66 +41,11 @@ class polkit_applier(applier_frontend):
logging.debug(slogm('Deny_All setting not found'))
self.policies = []
self.policies.append(polkit(template_file, template_vars))
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def apply(self):
'''
Trigger control facility invocation.
'''
if self.__module_enabled:
logging.debug(slogm('Running Polkit applier for machine'))
for policy in self.policies:
policy.generate()
else:
logging.debug(slogm('Polkit applier for machine will not be started'))
class polkit_applier_user(applier_frontend):
__module_name = 'PolkitApplierUser'
__module_experimental = False
__module_enabled = True
__deny_all = 'Software\\Policies\\Microsoft\\Windows\\RemovableStorageDevices\\Deny_All'
__polkit_map = {
__deny_all: ['48-gpoa_disk_permissions_user', { 'Deny_All': 0, 'User': '' }]
}
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
deny_all = storage.filter_hkcu_entries(self.sid, self.__deny_all).first()
# Deny_All hook: initialize defaults
template_file = self.__polkit_map[self.__deny_all][0]
template_vars = self.__polkit_map[self.__deny_all][1]
if deny_all:
logging.debug(slogm('Deny_All setting for user {} found: {}'.format(self.username, deny_all.data)))
self.__polkit_map[self.__deny_all][1]['Deny_All'] = deny_all.data
self.__polkit_map[self.__deny_all][1]['User'] = self.username
else:
logging.debug(slogm('Deny_All setting not found'))
self.policies = []
self.policies.append(polkit(template_file, template_vars, self.username))
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def user_context_apply(self):
pass
def admin_context_apply(self):
'''
Trigger control facility invocation.
'''
if self.__module_enabled:
logging.debug(slogm('Running Polkit applier for user in administrator context'))
for policy in self.policies:
policy.generate()
else:
logging.debug(slogm('Polkit applier for user in administrator context will not be started'))
for policy in self.policies:
policy.generate()

View File

@ -1,61 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from enum import Enum
from util.logging import slogm
class QuotaUnits(Enum):
KILOBYTES = 1
MEGABYTES = 2
class quota_applier:
__registry_branch = 'Software\\Policies\\Microsoft\\Windows NT\\DiskQuota'
__diskquota_key_enabled = 'Enable'
# Hard limit
__diskquota_key_limit = 'Limit'
__diskquota_key_limit_units = 'LimitUnits'
# Warning limit
__diskquota_key_threshold = 'Threshold'
__diskquota_key_threshold_units = 'ThresholdUnits'
# Switch between hard and soft quota
__diskquota_key_limit_type = 'Enforce'
__module_name = 'QuotaApplier'
__module_enabled = False
__module_experimental = True
def __init__(self, storage, username, sid):
self.storage = storage
self.username = username
self.sid = sid
def run(self):
pass
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Quota applier for machine'))
self.run()
else:
logging.debug(slogm('Quota applier for machine will not be started'))

View File

@ -19,10 +19,7 @@
import logging
import subprocess
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from gpt.shortcuts import json2sc
from util.windows import expand_windows_var
from util.logging import slogm
@ -50,12 +47,7 @@ def write_shortcut(shortcut, username=None):
:username: None means working with machine variables and paths
'''
dest_abspath = shortcut.dest
if not dest_abspath.startswith('/') and not dest_abspath.startswith('%'):
dest_abspath = '%HOME%/' + dest_abspath
logging.debug(slogm('Try to expand path for shortcut: {} for {}'.format(dest_abspath, username)))
dest_abspath = expand_windows_var(dest_abspath, username).replace('\\', '/') + '.desktop'
dest_abspath = expand_windows_var(shortcut.dest, username).replace('\\', '/') + '.desktop'
# Check that we're working for user, not on global system level
if username:
@ -66,35 +58,15 @@ def write_shortcut(shortcut, username=None):
if not homedir_exists(username):
logging.warning(slogm('No home directory exists for user {}: will not create link {}'.format(username, dest_abspath)))
return None
else:
logging.warning(slogm('User\'s shortcut not placed to home directory for {}: bad path {}'.format(username, dest_abspath)))
return None
if '%' in dest_abspath:
logging.debug(slogm('Fail for writing shortcut to file with \'%\': {}'.format(dest_abspath)))
return None
if not dest_abspath.startswith('/'):
logging.debug(slogm('Fail for writing shortcut to not absolute path \'%\': {}'.format(dest_abspath)))
return None
logging.debug(slogm('Writing shortcut file to {}'.format(dest_abspath)))
shortcut.write_desktop(dest_abspath)
class shortcut_applier(applier_frontend):
__module_name = 'ShortcutsApplier'
__module_experimental = False
__module_enabled = True
def __init__(self, storage):
self.storage = storage
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
def apply(self):
shortcuts = storage_get_shortcuts(self.storage, self.storage.get_info('machine_sid'))
if shortcuts:
for sc in shortcuts:
@ -107,24 +79,13 @@ class shortcut_applier(applier_frontend):
# /usr/local/share/applications
subprocess.check_call(['/usr/bin/update-desktop-database'])
def apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Shortcut applier for machine'))
self.run()
else:
logging.debug(slogm('Shortcut applier for machine will not be started'))
class shortcut_applier_user(applier_frontend):
__module_name = 'ShortcutsApplierUser'
__module_experimental = False
__module_enabled = True
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
def run(self):
def user_context_apply(self):
shortcuts = storage_get_shortcuts(self.storage, self.sid)
if shortcuts:
@ -134,17 +95,13 @@ class shortcut_applier_user(applier_frontend):
else:
logging.debug(slogm('No shortcuts to process for {}'.format(self.sid)))
def user_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Shortcut applier for user in user context'))
self.run()
else:
logging.debug(slogm('Shortcut applier for user in user context will not be started'))
def admin_context_apply(self):
if self.__module_enabled:
logging.debug(slogm('Running Shortcut applier for user in administrator context'))
self.run()
else:
logging.debug(slogm('Shortcut applier for user in administrator context will not be started'))
shortcuts = storage_get_shortcuts(self.storage, self.sid)
if shortcuts:
for sc in shortcuts:
if not sc.is_usercontext():
write_shortcut(sc, self.username)
else:
logging.debug(slogm('No shortcuts to process for {}'.format(self.sid)))

View File

@ -16,32 +16,24 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .applier_frontend import (
applier_frontend
, check_enabled
)
from .applier_frontend import applier_frontend
from .appliers.systemd import systemd_unit
from util.logging import slogm
import logging
class systemd_applier(applier_frontend):
__module_name = 'SystemdApplier'
__module_experimental = False
__module_enabled = True
__registry_branch = 'Software\\BaseALT\\Policies\\SystemdUnits'
def __init__(self, storage):
self.storage = storage
self.systemd_unit_settings = self.storage.filter_hklm_entries('Software\\BaseALT\\Policies\\SystemdUnits%')
self.units = []
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
, self.__module_experimental
)
def run(self):
def apply(self):
'''
Trigger control facility invocation.
'''
for setting in self.systemd_unit_settings:
valuename = setting.hive_key.rpartition('\\')[2]
try:
@ -55,20 +47,7 @@ class systemd_applier(applier_frontend):
except:
logging.error(slogm('Failed applying unit {}'.format(unit.unit_name)))
def apply(self):
'''
Trigger control facility invocation.
'''
if self.__module_enabled:
logging.debug(slogm('Running systemd applier for machine'))
self.run()
else:
logging.debug(slogm('systemd applier for machine will not be started'))
class systemd_applier_user(applier_frontend):
__module_name = 'SystemdApplierUser'
__module_experimental = False
__module_enabled = True
__registry_branch = 'Software\\BaseALT\\Policies\\SystemdUnits'
def __init__(self, storage, sid, username):

View File

@ -27,10 +27,7 @@ from frontend.frontend_manager import frontend_manager, determine_username
from plugin import plugin_manager
from util.util import get_machine_name
from util.kerberos import (
machine_kinit
, machine_kdestroy
)
from util.kerberos import machine_kinit
from util.users import (
is_root,
get_process_user
@ -76,7 +73,7 @@ class gpoa_controller:
user = get_machine_name()
self.is_machine = True
set_loglevel(self.__args.loglevel)
self.cache_path = '/var/cache/gpupdate/creds/krb5cc_{}'.format(os.getpid())
self.__kinit_successful = machine_kinit()
uname = get_process_user()
uid = os.getuid()
@ -91,12 +88,9 @@ class gpoa_controller:
'''
GPOA controller entry point
'''
self.__kinit_successful = machine_kinit(self.cache_path)
self.start_plugins()
self.start_backend()
self.start_frontend()
if self.__kinit_successful:
machine_kdestroy()
def start_backend(self):
'''
@ -111,10 +105,7 @@ class gpoa_controller:
if is_root():
back = backend_factory(dc, self.username, self.is_machine, nodomain)
if back:
try:
back.retrieve_and_store()
except Exception as exc:
logging.error(slogm('Backend execution error: {}'.format(str(exc))))
back.retrieve_and_store()
def start_frontend(self):
'''
@ -139,7 +130,6 @@ def main():
controller.run()
if __name__ == "__main__":
default_handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal_handler)
main()

View File

@ -22,13 +22,24 @@ from Crypto.Cipher import AES
from util.xml import get_xml_root
def read_drives(drives_file):
drives = list()
for drive in get_xml_root(drives_file):
drive_obj = drivemap()
props = drive.find('Properties')
drive_obj.set_login(props.get('username'))
drive_obj.set_pass(props.get('cpassword'))
drives.append(drive_obj)
return drives
def decrypt_pass(cpassword):
'''
AES key for cpassword decryption: http://msdn.microsoft.com/en-us/library/2c15cbf0-f086-4c74-8b70-1f2fa45dd4be%28v=PROT.13%29#endNote2
'''
if not cpassword:
return cpassword
key = (
b'\x4e\x99\x06\xe8'
b'\xfc\xb6\x6c\xc9'
@ -42,80 +53,23 @@ def decrypt_pass(cpassword):
cpass_len = len(cpassword)
padded_pass = (cpassword + "=" * ((4 - cpass_len % 4) % 4))
password = b64decode(padded_pass)
decrypter = AES.new(key, AES.MODE_CBC, '\x00' * 16)
decrypter = AES(key, AES.MODE_CBC, '\x00' * 16)
# decrypt() returns byte array which is immutable and we need to
# strip padding, then convert UTF-16LE to UTF-8
binstr = decrypter.decrypt(password)
by = list()
for item in binstr:
if item != 16:
by.append(item)
utf16str = bytes(by).decode('utf-16', 'ignore')
utf8str = utf16str.encode('utf8')
return utf8str.decode()
def read_drives(drives_file):
drives = list()
for drive in get_xml_root(drives_file):
drive_obj = drivemap()
props = drive.find('Properties')
drive_obj.set_login(props.get('username'))
drive_obj.set_pass(decrypt_pass(props.get('cpassword')))
drive_obj.set_dir(props.get('letter'))
drive_obj.set_path(props.get('path'))
drives.append(drive_obj)
return drives
def merge_drives(storage, sid, drive_objects, policy_name):
for drive in drive_objects:
storage.add_drive(sid, drive, policy_name)
def json2drive(json_str):
json_obj = json.loads(json_str)
drive_obj = drivemap()
drive_obj.set_login(json_obj['login'])
drive_obj.set_pass(json_obj['password'])
drive_obj.set_dir(json_obj['dir'])
drive_obj.set_path(json_obj['path'])
return drive_obj
return decrypter.decrypt(password)
class drivemap:
def __init__(self):
self.login = None
self.password = None
self.dir = None
self.path = None
def set_login(self, username):
self.login = username
if not username:
self.login = ''
def set_pass(self, password):
self.password = password
if not password:
self.password = ''
def set_dir(self, path):
self.dir = path
def set_path(self, path):
self.path = path
def to_json(self):
drive = dict()
drive['login'] = self.login
drive['password'] = self.password
drive['dir'] = self.dir
drive['path'] = self.path
contents = dict()
contents['drive'] = drive

View File

@ -28,10 +28,6 @@ def read_envvars(envvars_file):
return variables
def merge_envvars(storage, sid, envvars_objects, policy_name):
for envvar in envvar_objects:
pass
class envvar:
def __init__(self):
pass

View File

@ -28,10 +28,6 @@ def read_files(filesxml):
return files
def merge_files(storage, sid, file_objects, policy_name):
for fileobj in file_objects:
pass
class fileentry:
def __init__(self):
pass

View File

@ -16,83 +16,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from enum import Enum
from util.xml import get_xml_root
class FileAction(Enum):
CREATE = 'C'
REPLACE = 'R'
UPDATE = 'U'
DELETE = 'D'
def action_letter2enum(letter):
if letter in ['C', 'R', 'U', 'D']:
if letter == 'C': return FileAction.CREATE
if letter == 'R': return FileAction.REPLACE
if letter == 'U': return FileAction.UPDATE
if letter == 'D': return FileAction.DELETE
return FileAction.CREATE
def action_enum2letter(enumitem):
return enumitem.value
def folder_int2bool(val):
value = val
if type(value) == str:
value = int(value)
if value == 0:
return True
return False
def read_folders(folders_file):
folders = list()
for fld in get_xml_root(folders_file):
props = fld.find('Properties')
fld_obj = folderentry(props.get('path'))
fld_obj.set_action(action_letter2enum(props.get('action')))
fld_obj.set_delete_folder(folder_int2bool(props.get('deleteFolder')))
fld_obj.set_delete_sub_folder(folder_int2bool(props.get('deleteSubFolders')))
fld_obj.set_delete_files(folder_int2bool(props.get('deleteFiles')))
fld_obj = folderentry()
folders.append(fld_obj)
return folders
def merge_folders(storage, sid, folder_objects, policy_name):
for folder in folder_objects:
storage.add_folder(sid, folder, policy_name)
class folderentry:
def __init__(self, path):
self.path = path
self.action = FileAction.CREATE
self.delete_folder = False
self.delete_sub_folder = False
self.delete_files = False
def set_action(self, action):
self.action = action
def set_delete_folder(self, del_bool):
self.delete_folder = del_bool
def set_delete_sub_folder(self, del_bool):
self.delete_sub_folder = del_bool
def set_delete_files(self, del_bool):
self.delete_files = del_bool
def __init__(self):
pass

View File

@ -18,54 +18,18 @@
import logging
import os
from pathlib import Path
from enum import Enum, unique
from samba.gp_parse.gp_pol import GPPolParser
from storage import registry_factory
from .polfile import (
read_polfile
, merge_polfile
)
from .shortcuts import (
read_shortcuts
, merge_shortcuts
)
from .services import (
read_services
, merge_services
)
from .printers import (
read_printers
, merge_printers
)
from .inifiles import (
read_inifiles
, merge_inifiles
)
from .folders import (
read_folders
, merge_folders
)
from .files import (
read_files
, merge_files
)
from .envvars import (
read_envvars
, merge_envvars
)
from .drives import (
read_drives
, merge_drives
)
from .tasks import (
read_tasks
, merge_tasks
)
from .shortcuts import read_shortcuts
from .services import read_services
from .printers import read_printers
from .inifiles import read_inifiles
from .folders import read_folders
from .files import read_files
from .envvars import read_envvars
from .drives import read_drives
import util
import util.preg
from util.paths import (
@ -75,71 +39,6 @@ from util.paths import (
)
from util.logging import slogm
@unique
class FileType(Enum):
PREG = 'registry.pol'
SHORTCUTS = 'shortcuts.xml'
FOLDERS = 'folders.xml'
FILES = 'files.xml'
DRIVES = 'drives.xml'
SCHEDULEDTASKS = 'scheduledtasks.xml'
ENVIRONMENTVARIABLES = 'environmentvariables.xml'
INIFILES = 'inifiles.xml'
SERVICES = 'services.xml'
PRINTERS = 'printers.xml'
def get_preftype(path_to_file):
fpath = Path(path_to_file)
if fpath.exists():
file_name = fpath.name.lower()
for item in FileType:
if file_name == item.value:
return item
return None
def pref_parsers():
parsers = dict()
parsers[FileType.PREG] = read_polfile
parsers[FileType.SHORTCUTS] = read_shortcuts
parsers[FileType.FOLDERS] = read_folders
parsers[FileType.FILES] = read_files
parsers[FileType.DRIVES] = read_drives
parsers[FileType.SCHEDULEDTASKS] = read_tasks
parsers[FileType.ENVIRONMENTVARIABLES] = read_envvars
parsers[FileType.INIFILES] = read_inifiles
parsers[FileType.SERVICES] = read_services
parsers[FileType.PRINTERS] = read_printers
return parsers
def get_parser(preference_type):
parsers = pref_parsers()
return parsers[preference_type]
def pref_mergers():
mergers = dict()
mergers[FileType.PREG] = merge_polfile
mergers[FileType.SHORTCUTS] = merge_shortcuts
mergers[FileType.FOLDERS] = merge_folders
mergers[FileType.FILES] = merge_files
mergers[FileType.DRIVES] = merge_drives
mergers[FileType.SCHEDULEDTASKS] = merge_tasks
mergers[FileType.ENVIRONMENTVARIABLES] = merge_envvars
mergers[FileType.INIFILES] = merge_inifiles
mergers[FileType.SERVICES] = merge_services
mergers[FileType.PRINTERS] = merge_printers
return mergers
def get_merger(preference_type):
mergers = pref_mergers()
return mergers[preference_type]
class gpt:
__user_policy_mode_key = 'Software\\Policies\\Microsoft\\Windows\\System\\UserPolicyMode'
@ -147,8 +46,13 @@ class gpt:
self.path = gpt_path
self.sid = sid
self.storage = registry_factory('registry')
self.name = ''
self._scan_gpt()
def _scan_gpt(self):
'''
Collect the data from the specified GPT on file system (cached
by Samba).
'''
self.guid = self.path.rpartition('/')[2]
self.name = ''
if 'default' == self.guid:
@ -156,30 +60,13 @@ class gpt:
self._machine_path = find_dir(self.path, 'Machine')
self._user_path = find_dir(self.path, 'User')
self._machine_prefs = find_dir(self._machine_path, 'Preferences')
self._user_prefs = find_dir(self._user_path, 'Preferences')
self.settings_list = [
'shortcuts'
, 'drives'
, 'environmentvariables'
, 'printers'
, 'folders'
, 'files'
, 'inifiles'
, 'services'
, 'scheduledtasks'
]
self.settings = dict()
self.settings['machine'] = dict()
self.settings['user'] = dict()
self.settings['machine']['regpol'] = find_file(self._machine_path, 'registry.pol')
self.settings['user']['regpol'] = find_file(self._user_path, 'registry.pol')
for setting in self.settings_list:
machine_preffile = find_preffile(self._machine_path, setting)
user_preffile = find_preffile(self._user_path, setting)
logging.debug('Looking for {} in machine part of GPT {}: {}'.format(setting, self.name, machine_preffile))
self.settings['machine'][setting] = machine_preffile
logging.debug('Looking for {} in user part of GPT {}: {}'.format(setting, self.name, user_preffile))
self.settings['user'][setting] = user_preffile
logging.debug(slogm('Looking for machine part of GPT {}'.format(self.guid)))
self._find_machine()
logging.debug(slogm('Looking for user part of GPT {}'.format(self.guid)))
self._find_user()
def set_name(self, name):
'''
@ -202,41 +89,142 @@ class gpt:
return upm
def _find_user(self):
self._user_regpol = self._find_regpol('user')
self._user_shortcuts = self._find_shortcuts('user')
def _find_machine(self):
self._machine_regpol = self._find_regpol('machine')
self._machine_shortcuts = self._find_shortcuts('machine')
def _find_regpol(self, part):
'''
Find Registry.pol files.
'''
search_path = self._machine_path
if 'user' == part:
search_path = self._user_path
if not search_path:
return None
return find_file(search_path, 'registry.pol')
def _find_shortcuts(self, part):
'''
Find Shortcuts.xml files.
'''
shortcuts_dir = find_dir(self._machine_prefs, 'Shortcuts')
shortcuts_file = find_file(shortcuts_dir, 'shortcuts.xml')
if 'user' == part:
shortcuts_dir = find_dir(self._user_prefs, 'Shortcuts')
shortcuts_file = find_file(shortcuts_dir, 'shortcuts.xml')
return shortcuts_file
def _find_envvars(self, part):
'''
Find EnvironmentVariables.xml files.
'''
search_path = os.path.join(self._machine_path, 'Preferences', 'EnvironmentVariables')
if 'user' == part:
search_path = os.path.join(self._user_path, 'Preferences', 'EnvironmentVariables')
if not search_path:
return None
return find_file(search_path, 'environmentvariables.xml')
def _find_drives(self, part):
'''
Find Drives.xml files.
'''
search_path = os.path.join(self._machine_path, 'Preferences', 'Drives')
if 'user' == part:
search_path = os.path.join(self._user_path, 'Preferences', 'Drives')
if not search_path:
return None
return find_file(search_path, 'drives.xml')
def _find_printers(self, part):
'''
Find Printers.xml files.
'''
search_path = os.path.join(self._machine_path, 'Preferences', 'Printers')
if 'user' == part:
search_path = os.path.join(self._user_path, 'Preferences', 'Printers')
if not search_path:
return None
return find_file(search_path, 'printers.xml')
def _merge_shortcuts(self):
shortcuts = list()
if self.sid == self.storage.get_info('machine_sid'):
shortcuts = read_shortcuts(self._machine_shortcuts)
else:
shortcuts = read_shortcuts(self._user_shortcuts)
for sc in shortcuts:
self.storage.add_shortcut(self.sid, sc)
def merge(self):
'''
Merge machine and user (if sid provided) settings to storage.
'''
if self.sid == self.storage.get_info('machine_sid'):
# Merge machine settings to registry if possible
for preference_name, preference_path in self.settings['machine'].items():
if preference_path:
preference_type = get_preftype(preference_path)
logstring = 'Reading and merging {} for {}'.format(preference_type.value, self.sid)
logging.debug(logstring)
preference_parser = get_parser(preference_type)
preference_merger = get_merger(preference_type)
preference_objects = preference_parser(preference_path)
preference_merger(self.storage, self.sid, preference_objects, self.name)
if self.settings['user']['regpol']:
logging.debug(slogm('Merging machine(user) settings from {}'.format(self.settings['machine']['regpol'])))
util.preg.merge_polfile(self.settings['user']['regpol'], sid=self.sid, policy_name=self.name)
if self.settings['machine']['regpol']:
logging.debug(slogm('Merging machine settings from {}'.format(self.settings['machine']['regpol'])))
util.preg.merge_polfile(self.settings['machine']['regpol'], policy_name=self.name)
if self._machine_regpol:
logging.debug(slogm('Merging machine settings from {}'.format(self._machine_regpol)))
util.preg.merge_polfile(self._machine_regpol)
if self._user_regpol:
logging.debug(slogm('Merging machine(user) settings from {}'.format(self._machine_regpol)))
util.preg.merge_polfile(self._user_regpol, self.sid)
if self._machine_shortcuts:
logging.debug(slogm('Merging machine shortcuts from {}'.format(self._machine_shortcuts)))
self._merge_shortcuts()
else:
# Merge user settings if UserPolicyMode set accordingly
# and user settings (for HKCU) are exist.
policy_mode = upm2str(self.get_policy_mode())
if 'Merge' == policy_mode or 'Not configured' == policy_mode:
for preference_name, preference_path in self.settings['user'].items():
if preference_path:
preference_type = get_preftype(preference_path)
logstring = 'Reading and merging {} for {}'.format(preference_type.value, self.sid)
logging.debug(logstring)
preference_parser = get_parser(preference_type)
preference_merger = get_merger(preference_type)
preference_objects = preference_parser(preference_path)
preference_merger(self.storage, self.sid, preference_objects, self.name)
if self._user_regpol:
logging.debug(slogm('Merging user settings from {} for {}'.format(self._user_regpol, self.sid)))
util.preg.merge_polfile(self._user_regpol, self.sid)
if self._user_shortcuts:
logging.debug(slogm('Merging user shortcuts from {} for {}'.format(self._user_shortcuts, self.sid)))
self._merge_shortcuts()
def __str__(self):
template = '''
GUID: {}
Name: {}
For SID: {}
Machine part: {}
Machine Registry.pol: {}
Machine Shortcuts.xml: {}
User part: {}
User Registry.pol: {}
User Shortcuts.xml: {}
'''
result = template.format(
self.guid,
self.name,
self.sid,
self._machine_path,
self._machine_regpol,
self._machine_shortcuts,
self._user_path,
self._user_regpol,
self._user_shortcuts,
)
return result
def find_dir(search_path, name):
'''
@ -281,33 +269,6 @@ def find_file(search_path, name):
return None
def find_preferences(search_path):
'''
Find 'Preferences' directory
'''
if not search_path:
return None
return find_dir(search_path, 'Preferences')
def find_preffile(search_path, prefname):
'''
Find file with path like Preferences/prefname/prefname.xml
'''
# Look for 'Preferences' directory
prefdir = find_preferences(search_path)
if not prefdir:
return None
# Then search for preference directory
pref_dir = find_dir(prefdir, prefname)
file_name = '{}.xml'.format(prefname)
# And then try to find the corresponding file.
pref_file = find_file(pref_dir, file_name)
return pref_file
def lp2gpt():
'''
Convert local-policy to full-featured GPT.

View File

@ -28,10 +28,6 @@ def read_inifiles(inifiles_file):
return inifiles
def merge_inifiles(storage, sid, inifile_objects, policy_name):
for inifile in inifile_objects:
pass
def inifile():
def __init__(self):
pass

View File

@ -1,32 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from util.preg import (
load_preg
)
def read_polfile(filename):
return load_preg(filename).entries
def merge_polfile(storage, sid, policy_objects, policy_name):
for entry in policy_objects:
if not sid:
storage.add_hklm_entry(entry, policy_name)
else:
storage.add_hkcu_entry(entry, sid, policy_name)

View File

@ -41,10 +41,6 @@ def read_printers(printers_file):
return printers
def merge_printers(storage, sid, printer_objects, policy_name):
for device in printer_objects:
storage.add_printer(sid, device, policy_name)
def json2printer(json_str):
'''
Build printer object out of string-serialized JSON.

View File

@ -39,10 +39,6 @@ def read_services(service_file):
return services
def merge_services(storage, sid, service_objects, policy_name):
for srv in service_objects:
pass
class service:
def __init__(self, name):
self.unit = name

View File

@ -84,15 +84,10 @@ def read_shortcuts(shortcuts_file):
sc.set_clsid(link.get('clsid'))
sc.set_guid(link.get('uid'))
sc.set_usercontext(link.get('userContext', False))
sc.set_icon(props.get('iconPath'))
shortcuts.append(sc)
return shortcuts
def merge_shortcuts(storage, sid, shortcut_objects, policy_name):
for shortcut in shortcut_objects:
storage.add_shortcut(sid, shortcut, policy_name)
def json2sc(json_str):
'''
Build shortcut out of string-serialized JSON
@ -105,8 +100,6 @@ def json2sc(json_str):
sc.set_clsid(json_obj['clsid'])
sc.set_guid(json_obj['guid'])
sc.set_usercontext(json_obj['is_in_user_context'])
if 'icon' in json_obj:
sc.set_icon(json_obj['icon'])
return sc
@ -124,7 +117,6 @@ class shortcut:
self.arguments = arguments
self.name = name
self.changed = ''
self.icon = None
self.is_in_user_context = self.set_usercontext()
self.type = ttype
@ -144,9 +136,6 @@ class shortcut:
def set_guid(self, uid):
self.guid = uid
def set_icon(self, icon_name):
self.icon = icon_name
def set_type(self, ttype):
'''
Set type of the hyperlink - FILESYSTEM or URL
@ -183,8 +172,7 @@ class shortcut:
content['changed'] = self.changed
content['is_in_user_context'] = self.is_in_user_context
content['type'] = ttype2str(self.type)
if self.icon:
content['icon'] = self.icon
result = self.desktop()
result.content.update(content)
@ -211,9 +199,6 @@ class shortcut:
self.desktop_file.set('Terminal', 'false')
self.desktop_file.set('Exec', '{} {}'.format(self.path, self.arguments))
if self.icon:
self.desktop_file.set('Icon', self.icon)
return self.desktop_file
def write_desktop(self, dest):

View File

@ -39,8 +39,6 @@ from util.dbus import (
)
from util.signals import signal_handler
from messages import message_with_code
logging.basicConfig(level=logging.DEBUG)
class file_runner:
@ -130,7 +128,7 @@ def runner_factory(args, target):
user_runner = file_runner(username)
return (computer_runner, user_runner)
else:
logging.error(message_with_code('E1'))
logging.error('Insufficient permissions to run gpupdate')
return None
@ -153,7 +151,7 @@ def main():
logging.error('Error running GPOA for user: {}'.format(exc))
return int(ExitCodeUpdater.FAIL_GPUPDATE_USER_NOREPLY)
else:
logging.error(message_with_code('E2'))
logging.error('gpupdate will not be started')
return int(ExitCodeUpdater.FAIL_NO_RUNNER)
return int(ExitCodeUpdater.EXIT_SUCCESS)

View File

@ -1,65 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
def info_code(code):
info_ids = dict()
info_ids[1] = ''
info_ids[2] = ''
return info_ids.get(code, 'Unknown info code')
def error_code(code):
error_ids = dict()
error_ids[1] = 'Insufficient permissions to run gpupdate'
error_ids[2] = 'gpupdate will not be started'
return error_ids.get(code, 'Unknown error code')
def debug_code(code):
debug_ids = dict()
debug_ids[1] = ''
debug_ids[2] = ''
return debug_ids.get(code, 'Unknown debug code')
def warning_code(code):
warning_ids = dict()
warning_ids[1] = ''
warning_ids[2] = ''
return warning_ids.get(code, 'Unknown warning code')
def get_message(code):
retstr = 'Unknown message type, no message assigned'
if code.startswith('E'):
retstr = error_code(int(code[1:]))
if code.startswith('I'):
retstr = info_code(int(code[1:]))
if code.startswith('D'):
retstr = debug_code(int(code[1:]))
if code.startswith('W'):
retstr = warning_code(int(code[1:]))
return retstr
def message_with_code(code):
retstr = '[' + code[0:1] + code[1:].rjust(5, '0') + ']: ' + get_message(code)
return retstr

View File

@ -20,131 +20,40 @@ class samba_preg(object):
'''
Object mapping representing HKLM entry (registry key without SID)
'''
def __init__(self, preg_obj, policy_name):
self.policy_name = policy_name
def __init__(self, preg_obj):
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
def update_fields(self):
fields = dict()
fields['policy_name'] = self.policy_name
fields['type'] = self.type
fields['data'] = self.data
return fields
class samba_hkcu_preg(object):
'''
Object mapping representing HKCU entry (registry key with SID)
'''
def __init__(self, sid, preg_obj, policy_name):
def __init__(self, sid, preg_obj):
self.sid = sid
self.policy_name = policy_name
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
def update_fields(self):
fields = dict()
fields['policy_name'] = self.policy_name
fields['type'] = self.type
fields['data'] = self.data
return fields
class ad_shortcut(object):
'''
Object mapping representing Windows shortcut.
'''
def __init__(self, sid, sc, policy_name):
def __init__(self, sid, sc):
self.sid = sid
self.policy_name = policy_name
self.path = sc.dest
self.shortcut = sc.to_json()
def update_fields(self):
fields = dict()
fields['policy_name'] = self.policy_name
fields['path'] = self.path
fields['shortcut'] = self.shortcut
return fields
class info_entry(object):
def __init__(self, name, value):
self.name = name
self.value = value
def update_fields(self):
fields = dict()
fields['value'] = self.value
return fields
class printer_entry(object):
'''
Object mapping representing Windows printer of some type.
'''
def __init__(self, sid, pobj, policy_name):
def __init__(self, sid, pobj):
self.sid = sid
self.policy_name = policy_name
self.name = pobj.name
self.printer = pobj.to_json()
def update_fields(self):
fields = dict()
fields['policy_name'] = self.policy_name
fields['name'] = self.name
fields['printer'] = self.printer.to_json()
return fields
class drive_entry(object):
'''
Object mapping representing Samba share bound to drive letter
'''
def __init__(self, sid, dobj, policy_name):
self.sid = sid
self.policy_name = policy_name
self.login = dobj.login
self.password = dobj.password
self.dir = dobj.dir
self.path = dobj.path
def update_fields(self):
fields = dict()
fields['policy_name'] = self.policy_name
fields['login'] = self.login
fields['password'] = self.password
fields['dir'] = self.dir
fields['path'] = self.path
return fields
class folder_entry(object):
'''
Object mapping representing file system directory
'''
def __init__(self, sid, fobj, policy_name):
self.sid = sid
self.policy_name = policy_name
self.path = fobj.path
self.action = fobj.action.value
self.delete_folder = str(fobj.delete_folder)
self.delete_sub_folder = str(fobj.delete_sub_folder)
self.delete_files = str(fobj.delete_files)
def update_fields(self):
'''
Return list of fields to update
'''
fields = dict()
fields['policy_name'] = self.policy_name
fields['action'] = self.action
fields['delete_folder'] = self.delete_folder
fields['delete_sub_folder'] = self.delete_sub_folder
fields['delete_files'] = self.delete_files
return fields

View File

@ -42,8 +42,6 @@ from .record_types import (
, ad_shortcut
, info_entry
, printer_entry
, drive_entry
, folder_entry
)
class sqlite_registry(registry):
@ -63,69 +61,40 @@ class sqlite_registry(registry):
Column('value', String(65536))
)
self.__hklm = Table(
'HKLM'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('hive_key', String(65536), unique=True)
, Column('policy_name', String)
, Column('type', Integer)
, Column('data', String)
'HKLM',
self.__metadata,
Column('id', Integer, primary_key=True),
Column('hive_key', String(65536), unique=True),
Column('type', Integer),
Column('data', String)
)
self.__hkcu = Table(
'HKCU'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('sid', String)
, Column('hive_key', String(65536))
, Column('policy_name', String)
, Column('type', Integer)
, Column('data', String)
, UniqueConstraint('sid', 'hive_key')
'HKCU',
self.__metadata,
Column('id', Integer, primary_key=True),
Column('sid', String),
Column('hive_key', String(65536)),
Column('type', Integer),
Column('data', String),
UniqueConstraint('sid', 'hive_key')
)
self.__shortcuts = Table(
'Shortcuts'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('sid', String)
, Column('path', String)
, Column('policy_name', String)
, Column('shortcut', String)
, UniqueConstraint('sid', 'path')
'Shortcuts',
self.__metadata,
Column('id', Integer, primary_key=True),
Column('sid', String),
Column('path', String),
Column('shortcut', String),
UniqueConstraint('sid', 'path')
)
self.__printers = Table(
'Printers'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('sid', String)
, Column('name', String)
, Column('policy_name', String)
, Column('printer', String)
, UniqueConstraint('sid', 'name')
)
self.__drives = Table(
'Drives'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('sid', String)
, Column('login', String)
, Column('password', String)
, Column('dir', String)
, Column('policy_name', String)
, Column('path', String)
, UniqueConstraint('sid', 'dir')
)
self.__folders = Table(
'Folders'
, self.__metadata
, Column('id', Integer, primary_key=True)
, Column('sid', String)
, Column('path', String)
, Column('policy_name', String)
, Column('action', String)
, Column('delete_folder', String)
, Column('delete_sub_folder', String)
, Column('delete_files', String)
, UniqueConstraint('sid', 'path')
'Printers',
self.__metadata,
Column('id', Integer, primary_key=True),
Column('sid', String),
Column('name', String),
Column('printer', String),
UniqueConstraint('sid', 'name')
)
self.__metadata.create_all(self.db_cnt)
Session = sessionmaker(bind=self.db_cnt)
@ -136,8 +105,6 @@ class sqlite_registry(registry):
mapper(samba_hkcu_preg, self.__hkcu)
mapper(ad_shortcut, self.__shortcuts)
mapper(printer_entry, self.__printers)
mapper(drive_entry, self.__drives)
mapper(folder_entry, self.__folders)
except:
pass
#logging.error('Error creating mapper')
@ -154,69 +121,62 @@ class sqlite_registry(registry):
try:
self._add(row)
except:
update_obj = dict({ 'value': row.value })
(self
.db_session.query(info_entry)
.filter(info_entry.name == row.name)
.update(row.update_fields()))
.update(update_obj))
self.db_session.commit()
def _hklm_upsert(self, row):
try:
self._add(row)
except:
update_obj = dict({'type': row.type, 'data': row.data })
(self
.db_session
.query(samba_preg)
.filter(samba_preg.hive_key == row.hive_key)
.update(row.update_fields()))
.update(update_obj))
self.db_session.commit()
def _hkcu_upsert(self, row):
try:
self._add(row)
except Exception as exc:
except:
update_obj = dict({'type': row.type, 'data': row.data })
(self
.db_session
.query(samba_hkcu_preg)
.query(samba_preg)
.filter(samba_hkcu_preg.sid == row.sid)
.filter(samba_hkcu_preg.hive_key == row.hive_key)
.update(row.update_fields()))
.update(update_obj))
self.db_session.commit()
def _shortcut_upsert(self, row):
try:
self._add(row)
except:
update_obj = dict({ 'shortcut': row.shortcut })
(self
.db_session
.query(ad_shortcut)
.filter(ad_shortcut.sid == row.sid)
.filter(ad_shortcut.path == row.path)
.update(row.update_fields()))
.update(update_obj))
self.db_session.commit()
def _printer_upsert(self, row):
try:
self._add(row)
except:
update_obj = dict({ 'printer': row.printer })
(self
.db_session
.query(printer_entry)
.filter(printer_entry.sid == row.sid)
.filter(printer_entry.name == row.name)
.update(row.update_fields()))
self.db_session.commit()
def _drive_upsert(self, row):
try:
self._add(row)
except:
(self
.db_session
.query(drive_entry)
.filter(drive_entry.sid == row.sid)
.filter(drive_entry.dir == row.dir)
.update(row.update_fields()))
.update(update_obj))
self.db_session.commit()
def set_info(self, name, value):
@ -224,99 +184,70 @@ class sqlite_registry(registry):
logging.debug(slogm('Setting info {}:{}'.format(name, value)))
self._info_upsert(ientry)
def add_hklm_entry(self, preg_entry, policy_name):
def add_hklm_entry(self, preg_entry):
'''
Write PReg entry to HKEY_LOCAL_MACHINE
'''
pentry = samba_preg(preg_entry, policy_name)
pentry = samba_preg(preg_entry)
if not pentry.hive_key.rpartition('\\')[2].startswith('**'):
self._hklm_upsert(pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(pentry.hive_key)))
def add_hkcu_entry(self, preg_entry, sid, policy_name):
def add_hkcu_entry(self, preg_entry, sid):
'''
Write PReg entry to HKEY_CURRENT_USER
'''
hkcu_pentry = samba_hkcu_preg(sid, preg_entry, policy_name)
hkcu_pentry = samba_hkcu_preg(sid, preg_entry)
if not hkcu_pentry.hive_key.rpartition('\\')[2].startswith('**'):
logging.debug(slogm('Adding HKCU entry for {}'.format(sid)))
self._hkcu_upsert(hkcu_pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(hkcu_pentry.hive_key)))
def add_shortcut(self, sid, sc_obj, policy_name):
def add_shortcut(self, sid, sc_obj):
'''
Store shortcut information in the database
'''
sc_entry = ad_shortcut(sid, sc_obj, policy_name)
sc_entry = ad_shortcut(sid, sc_obj)
logging.debug(slogm('Saving info about {} link for {}'.format(sc_entry.path, sid)))
self._shortcut_upsert(sc_entry)
def add_printer(self, sid, pobj, policy_name):
def add_printer(self, sid, pobj):
'''
Store printer configuration in the database
'''
prn_entry = printer_entry(sid, pobj, policy_name)
prn_entry = printer_entry(sid, pobj)
logging.debug(slogm('Saving info about printer {} for {}'.format(prn_entry.name, sid)))
self._printer_upsert(prn_entry)
def add_drive(self, sid, dobj, policy_name):
drv_entry = drive_entry(sid, dobj, policy_name)
logging.debug(slogm('Saving info about drive mapping {} for {}'.format(drv_entry.path, sid)))
self._drive_upsert(drv_entry)
def add_folder(self, sid, fobj, policy_name):
fld_entry = folder_entry(sid, fobj, policy_name)
logstring = 'Saving info about folder {} for {}'.format(fld_entry.path, sid)
logging.debug(logstring)
try:
self._add(fld_entry)
except Exception as exc:
(self
._filter_sid_obj(folder_entry, sid)
.filter(folder_entry.path == fld_entry.path)
.update(fld_entry.update_fields()))
self.db_session.commit()
def _filter_sid_obj(self, row_object, sid):
def get_shortcuts(self, sid):
res = (self
.db_session
.query(row_object)
.filter(row_object.sid == sid))
return res
def _filter_sid_list(self, row_object, sid):
res = (self
.db_session
.query(row_object)
.filter(row_object.sid == sid)
.query(ad_shortcut)
.filter(ad_shortcut.sid == sid)
.all())
return res
def get_shortcuts(self, sid):
return self._filter_sid_list(ad_shortcut, sid)
def get_printers(self, sid):
return self._filter_sid_list(printer_entry, sid)
def get_drives(self, sid):
return self._filter_sid_list(drive_entry, sid)
def get_folders(self, sid):
return self._filter_sid_list(folder_entry, sid)
res = (self
.db_session
.query(printer_entry)
.filter(printer_entry.sid == sid)
.all())
return res
def get_hkcu_entry(self, sid, hive_key):
res = (self
.db_session
.query(samba_hkcu_preg)
.query(samba_preg)
.filter(samba_hkcu_preg.sid == sid)
.filter(samba_hkcu_preg.hive_key == hive_key)
.first())
# Try to get the value from machine SID as a default if no option is set.
if not res:
machine_sid = self.get_info('machine_sid')
res = self.db_session.query(samba_hkcu_preg).filter(samba_hkcu_preg.sid == machine_sid).filter(samba_hkcu_preg.hive_key == hive_key).first()
res = self.db_session.query(samba_preg).filter(samba_hkcu_preg.sid == machine_sid).filter(samba_hkcu_preg.hive_key == hive_key).first()
return res
def filter_hkcu_entries(self, sid, startswith):
@ -351,16 +282,31 @@ class sqlite_registry(registry):
return res
def wipe_user(self, sid):
self._wipe_sid(samba_hkcu_preg, sid)
self._wipe_sid(ad_shortcut, sid)
self._wipe_sid(printer_entry, sid)
self._wipe_sid(drive_entry, sid)
self.wipe_hkcu(sid)
self.wipe_shortcuts(sid)
self.wipe_printers(sid)
def _wipe_sid(self, row_object, sid):
def wipe_shortcuts(self, sid):
(self
.db_session
.query(row_object)
.filter(row_object.sid == sid)
.query(ad_shortcut)
.filter(ad_shortcut.sid == sid)
.delete())
self.db_session.commit()
def wipe_printers(self, sid):
(self
.db_session
.query(printer_entry)
.filter(printer_entry.sid == sid)
.delete())
self.db_session.commit()
def wipe_hkcu(self, sid):
(self
.db_session
.query(samba_hkcu_preg)
.filter(samba_hkcu_preg.sid == sid)
.delete())
self.db_session.commit()

View File

@ -1,29 +0,0 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% if Deny_All == '1' %}
polkit.addRule(function (action, subject) {
if ((action.id == "org.freedesktop.udisks2.filesystem-mount" ||
action.id == "org.freedesktop.udisks2.filesystem-mount-system" ||
action.id == "org.freedesktop.udisks2.filesystem-mount-other-seat") &&
subject.user == "{{User}}" ) {
return polkit.Result.NO;
}
});
{% endif %}

View File

@ -1,20 +0,0 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{{ home_dir }}/net {{ mount_file }} -t 120

View File

@ -1,25 +0,0 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% if login %}
username={{ login }}
{% endif %}
{% if password %}
password={{ password }}
{% endif %}

View File

@ -1,21 +0,0 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{%- for drv in drives %}
{{ drv.dir }} -fstype=cifs,cruid=$USER,sec=krb5,noperm :{{ drv.path }}
{% endfor %}

View File

@ -1,39 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from frontend.appliers.rpm import rpm
class PackageTestCase(unittest.TestCase):
'''
Semi-integrational tests for packages installation/removing
'''
def test_package_not_exist(self):
packages_for_install = 'dummy1 dummy2'
packages_for_remove = 'dummy3'
test_rpm = rpm(packages_for_install, packages_for_remove)
test_rpm.apply()
def test_install_remove_same_package(self):
packages_for_install = 'gotop'
packages_for_remove = 'gotop'
test_rpm = rpm(packages_for_install, packages_for_remove)
test_rpm.apply()

View File

@ -1,42 +0,0 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import unittest.mock
import os
import util.paths
import json
class GptDrivesTestCase(unittest.TestCase):
@unittest.mock.patch('util.paths.cache_dir')
def test_drive_reader(self, cdir_mock):
'''
Test functionality to read objects from Shortcuts.xml
'''
cdir_mock.return_value = '/var/cache/gpupdate'
import gpt.drives
testdata_path = '{}/test/gpt/data/Drives.xml'.format(os.getcwd())
drvs = gpt.drives.read_drives(testdata_path)
json_obj = json.loads(drvs[0].to_json())
self.assertIsNotNone(json_obj['drive'])

View File

@ -43,6 +43,7 @@ def set_loglevel(loglevel_num=None):
log_level = 10 * log_num
print('Setting log level to {}'.format(loglevels[log_num]))
logging.basicConfig(format=format_message)
logger = logging.getLogger()
logger.setLevel(log_level)

View File

@ -16,7 +16,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import subprocess
@ -24,47 +23,13 @@ from .util import get_machine_name
from .logging import slogm
def machine_kinit(cache_name=None):
def machine_kinit():
'''
Perform kinit with machine credentials
'''
host = get_machine_name()
os.environ['KRB5CCNAME'] = 'FILE:{}'.format(cache_name)
kinit_cmd = ['kinit', '-k', host]
if cache_name:
kinit_cmd.extend(['-c', cache_name])
proc = subprocess.Popen(kinit_cmd)
proc.wait()
result = False
if 0 == proc.returncode:
result = True
if result:
result = check_krb_ticket()
return result
def machine_kdestroy(cache_name=None):
'''
Perform kdestroy for machine credentials
'''
host = get_machine_name()
kdestroy_cmd = ['kdestroy']
if cache_name:
kdestroy_cmd.extend(['-c', cache_name])
proc = subprocess.Popen(kdestroy_cmd, stderr=subprocess.DEVNULL)
proc.wait()
if cache_name and os.path.exists(cache_name):
os.unlink(cache_name)
elif 'KRB5CCNAME' in os.environ:
path = os.environ['KRB5CCNAME'][5:]
if os.path.exists(path):
os.unlink(path)
subprocess.call(['kinit', '-k', host])
return check_krb_ticket()
def check_krb_ticket():

View File

@ -59,3 +59,31 @@ def local_policy_cache():
return lpcache
def backend_module_dir():
backend_dir = '/usr/lib/gpoa/backend'
return pathlib.Path(backend_dir)
def frontend_module_dir():
frontend_dir = '/usr/lib/gpoa/frontend'
return pathlib.Path(frontend_dir)
def storage_module_dir():
storage_dir = '/usr/lib/gpoa/storage'
return pathlib.Path(storage_dir)
def pre_backend_plugin_dir():
pre_backend_dir = '/usr/lib/gpoa/backend_pre'
return pathlib.Path(pre_backend_dir)
def post_backend_plugin_dir():
post_backend_dir = '/usr/lib/gpoa/backend_post'
return pathlib.Path(post_backend_dir)
def pre_frontend_plugin_dir():
pre_forntend_dir = '/usr/lib/gpoa/frontend_pre'
return pathlib.Path(pre_frontend_dir)
def post_frontend_plugin_dir():
post_frontend_dir = '/usr/lib/gpoa/frontend_post'
return pathlib.Path(post_frontend_dir)

View File

@ -59,12 +59,10 @@ def load_pol_preg(polfile):
with open(polfile, 'rb') as f:
data = f.read()
logging.debug('PReg length: {}'.format(len(data)))
gpparser.parse(data)
#print(gpparser.pol_file.__ndr_print__())
pentries = preg2entries(gpparser.pol_file)
return pentries
return gpparser.pol_file
def preg_keymap(preg):
@ -78,37 +76,29 @@ def preg_keymap(preg):
return keymap
def merge_polfile(preg, sid=None, reg_name='registry', reg_path=None, policy_name='Unknown'):
def merge_polfile(preg, sid=None, reg_name='registry', reg_path=None):
pregfile = load_preg(preg)
logging.info(slogm('Loaded PReg {}'.format(preg)))
storage = registry_factory(reg_name, reg_path)
for entry in pregfile.entries:
if not sid:
storage.add_hklm_entry(entry, policy_name)
storage.add_hklm_entry(entry)
else:
storage.add_hkcu_entry(entry, sid, policy_name)
storage.add_hkcu_entry(entry, sid)
class entry:
def __init__(self, e_keyname, e_valuename, e_type, e_data):
logging.debug(slogm('Entry init e_keyname {}'.format(e_keyname)))
logging.debug(slogm('Entry init e_valuename {}'.format(e_valuename)))
logging.debug(slogm('Entry init e_type {}'.format(e_type)))
logging.debug(slogm('Entry init e_data {}'.format(e_data)))
self.keyname = e_keyname
self.valuename = e_valuename
self.type = e_type
self.data = e_data
class pentries:
def __init__(self):
self.entries = list()
def preg2entries(preg_obj):
entries = pentries()
for elem in preg_obj.entries:
entries = []
for elem in prej_obj.entries:
entry_obj = entry(elem.keyname, elem.valuename, elem.type, elem.data)
entries.entries.append(entry_obj)
entries.append(entry_obj)
return entries

View File

@ -34,11 +34,11 @@ def is_rpm_installed(rpm_name):
return False
class Package:
__install_command = ['/usr/bin/apt-get', '-y', 'install']
__remove_command = ['/usr/bin/apt-get', '-y', 'remove']
__reinstall_command = ['/usr/bin/apt-get', '-y', 'reinstall']
def __init__(self, package_name):
self.__install_command = ['/usr/bin/apt-get', '-y', 'install']
self.__remove_command = ['/usr/bin/apt-get', '-y', 'remove']
self.__reinstall_command = ['/usr/bin/apt-get', '-y', 'reinstall']
self.package_name = package_name
self.for_install = True
@ -102,6 +102,7 @@ def install_rpm(rpm_name):
'''
Install single RPM
'''
update()
rpm = Package(rpm_name)
return rpm.install()

View File

@ -16,20 +16,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import signal
from sys import exit
from .arguments import ExitCodeUpdater
from .kerberos import machine_kdestroy
default_handler = signal.getsignal(signal.SIGINT)
def signal_handler(sig_number, frame):
print('Received signal, exiting gracefully')
# Ignore extra signals
signal.signal(sig_number, signal.SIG_IGN)
# Kerberos cache cleanup on interrupt
machine_kdestroy()
os._exit(ExitCodeUpdater.EXIT_SIGINT)
print('Received signal, exiting gracefully')
exit(ExitCodeUpdater.EXIT_SIGINT)

View File

@ -99,14 +99,11 @@ def with_privileges(username, func):
# We need to catch exception in order to be able to restore
# privileges later in this function
out = None
try:
out = func()
func()
except Exception as exc:
logging.debug(slogm(exc))
# Restore privileges
set_privileges('root', current_uid, 0, current_groups)
return out

View File

@ -29,9 +29,7 @@ import samba.gpo
import pysss_nss_idmap
from storage import cache_factory
from .xdg import (
xdg_get_desktop
)
from .xdg import get_user_dir
from .util import get_homedir
from .logging import slogm
from .samba import smbopts
@ -66,9 +64,8 @@ class smbcreds (smbopts):
self.selected_dc = dc_fqdn
else:
self.selected_dc = samba_dc
except Exception as exc:
except:
logging.error(slogm('Unable to determine DC hostname'))
raise exc
return self.selected_dc
@ -83,9 +80,8 @@ class smbcreds (smbopts):
res = netcmd_get_domain_infos_via_cldap(self.lp, None, self.selected_dc)
dns_domainname = res.dns_domain
logging.info(slogm('Found domain via CLDAP: {}'.format(dns_domainname)))
except Exception as exc:
except:
logging.error(slogm('Unable to retrieve domain name via CLDAP query'))
raise exc
return dns_domainname
@ -122,7 +118,6 @@ class smbcreds (smbopts):
logging.error(
slogm('Unable to refresh GPO list for {} from {}'.format(
username, self.selected_dc)))
raise exc
return gpos
@ -179,15 +174,17 @@ def expand_windows_var(text, username=None):
Scan the line for percent-encoded variables and expand them.
'''
variables = dict()
variables['HOME'] = '/etc/skel'
variables['HOME'] = '/'
variables['SystemRoot'] = '/'
variables['StartMenuDir'] = '/usr/share/applications'
variables['SystemDrive'] = '/'
variables['DesktopDir'] = xdg_get_desktop(username, variables['HOME'])
if username:
variables['HOME'] = get_homedir(username)
variables['DesktopDir'] = get_user_dir(
'DESKTOP', os.path.join(variables['HOME'], 'Desktop'))
variables['StartMenuDir'] = os.path.join(
variables['HOME'], '.local', 'share', 'applications')

View File

@ -17,20 +17,21 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from configparser import RawConfigParser, DEFAULTSECT
import os
from xdg.BaseDirectory import xdg_config_home
from .util import get_homedir
from .logging import slogm
def xdg_get_desktop(username, homedir = None):
if username:
homedir = get_homedir(username)
if not homedir:
logging.warning(
slogm('Error for get XDG_DESKTOP_DIR for unknown user with unknown homedir'))
raise "Error for get XDG_DESKTOP_DIR for unknown user with unknown homedir"
stream = os.popen('export HOME={}; xdg-user-dir DESKTOP'.format(homedir))
output = stream.read()[:-1]
return output
def get_user_dir(dir_name, default=None):
'''
Get path to XDG's user directory
'''
config = RawConfigParser(allow_no_value=True)
userdirs_path = os.path.join(xdg_config_home, 'user-dirs.dirs')
try:
with open(userdirs_path, 'r') as f:
config.read_string('[DEFAULT]\n' + f.read())
return config.get(DEFAULTSECT, 'XDG_DESKTOP_DIR')
except Exception as exc:
return default

View File

@ -1,7 +1,7 @@
%define _unpackaged_files_terminate_build 1
Name: gpupdate
Version: 0.7.0
Version: 0.5.0
Release: alt1
Summary: GPT applier
@ -11,15 +11,15 @@ Url: https://github.com/altlinux/gpupdate
BuildArch: noarch
Requires: control
Requires: local-policy >= 0.1.0
BuildRequires: rpm-build-python3
Requires: python3-module-rpm
Requires: python3-module-dbus
Requires: oddjob-%name >= 0.2.0
Requires: libnss-role >= 0.5.0
Requires: local-policy >= 0.4.0
Requires: pam-config >= 1.9.0
Requires: autofs
Requires: local-policy >= 0.3.0
Requires: pam-config >= 1.8
# This is needed by shortcuts_applier
Requires: desktop-file-utils
@ -41,8 +41,7 @@ cp -r gpoa \
mkdir -p \
%buildroot%_bindir/ \
%buildroot%_sbindir/ \
%buildroot%_cachedir/%name/ \
%buildroot%_cachedir/%name/creds
%buildroot%_cachedir/%name/
ln -s %python3_sitelibdir/gpoa/gpoa \
%buildroot%_sbindir/gpoa
@ -55,14 +54,11 @@ mkdir -p %buildroot%_datadir/%name
mv %buildroot%python3_sitelibdir/gpoa/templates \
%buildroot%_datadir/%name/
mkdir -p %buildroot%_sysconfdir/%name
touch %buildroot%_sysconfdir/%name/environment
install -Dm0644 dist/%name.service %buildroot%_unitdir/%name.service
install -Dm0644 dist/%name.service %buildroot/usr/lib/systemd/user/%name-user.service
install -Dm0644 dist/%name.service %{buildroot}/usr/lib/systemd/user/%{name}-user.service
install -Dm0644 dist/system-policy-%name %buildroot%_sysconfdir/pam.d/system-policy-%name
install -Dm0644 doc/gpoa.1 %buildroot/%_man1dir/gpoa.1
install -Dm0644 doc/gpupdate.1 %buildroot/%_man1dir/gpupdate.1
install -Dm0644 doc/gpoa.1 %buildroot/%{_man1dir}/gpoa.1
install -Dm0644 doc/gpupdate.1 %buildroot/%{_man1dir}/gpupdate.1
%preun
%preun_service gpupdate
@ -82,28 +78,10 @@ install -Dm0644 doc/gpupdate.1 %buildroot/%_man1dir/gpupdate.1
%_man1dir/gpoa.1.*
%_man1dir/gpupdate.1.*
/usr/lib/systemd/user/%name-user.service
%dir %_sysconfdir/%name
%config(noreplace) %_sysconfdir/%name/environment
%config(noreplace) %_sysconfdir/pam.d/system-policy-%name
%dir %attr(0700, root, root) %_cachedir/%name
%dir %attr(0700, root, root) %_cachedir/%name/creds
%exclude %python3_sitelibdir/gpoa/.pylintrc
%exclude %python3_sitelibdir/gpoa/.prospector.yaml
%exclude %python3_sitelibdir/gpoa/Makefile
%exclude %python3_sitelibdir/gpoa/test
%_sysconfdir/pam.d/system-policy-%name
%dir %_cachedir/%name
%changelog
* Wed Jul 01 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.7.0-alt1
- Add multiple appliers, part of which marks as experimental yet
* Wed May 20 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.6.0-alt2
- Update system-policy PAM-rules (clean system-auth-common, add pam_env support)
- Add dependency to pam-config later than 1.9.0 release
* Fri May 15 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.6.0-alt1
- Add drives policy for shared folders with cifs applier using autofs
- Update shortcuts policy with xdg-users-dir support for DESKTOP
* Wed Apr 22 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.5.0-alt1
- Update samba format: local.xml -> Machine/Registry.pol.xml
- Add support of ad-domain-controller local policy profile

2
wiki

Submodule wiki updated: 79e4ba7f58...8cb284e0f7