1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-11-08 04:23:54 +03:00

Compare commits

...

20 Commits

Author SHA1 Message Date
8292aa69b3 Frontend expanded with binary module support 2020-04-28 00:46:05 +04:00
aa03e6dfa4 Added paths to directories with modules 2020-04-28 00:45:20 +04:00
NIR
0328afa788 Merge pull request #55 from altlinux/rpm_install_improvement
RPM helper improved
2020-04-28 00:10:27 +04:00
edead53d7e RPM test improved accordingly 2020-04-28 00:07:04 +04:00
db1a82f930 util.rpm: Removed strange and ugly functions and introduced simple functions instead 2020-04-28 00:02:27 +04:00
fd7cfe2b83 util.rpm: Package.reinstall() command added 2020-04-28 00:01:46 +04:00
b3694a8b4d RPM module improved with corner case checks 2020-04-24 15:19:28 +04:00
060f125258 Half-assed test for RPM 2020-04-24 15:18:51 +04:00
1f4960cb48 RPM helper improved 2020-04-23 23:10:08 +04:00
Evgeny Sinelnikov
a0c5b1a2b1 Merge pull request #45 from altlinux/preg_key_deletion
Skip branch deletion keys
2020-04-23 01:14:16 +04:00
41e950172b Created directory for temporary testing files 2020-04-21 22:17:35 +04:00
91da6ff912 test.storage.test_preg_special_values - half-assed unit test for starred PReg value names 2020-04-21 20:47:21 +04:00
ae7d1ed0dc util.merge_polfile(): accept registry name and path to ease testing 2020-04-21 20:46:32 +04:00
ce0e3f1901 storage.registry_factory(): accept target registry directory 2020-04-21 20:45:51 +04:00
edfca0e31d storage.sqlite_registry: Accept target registry directory 2020-04-21 20:45:01 +04:00
77da991c6f storage.sqlite_registry: use record types from separate module 2020-04-21 20:42:43 +04:00
d61583e704 Storage testing module created 2020-04-21 20:41:23 +04:00
1d31f17bb3 Records from sqlite_registry moved into separate module to ease testing and adding storages 2020-04-21 20:40:33 +04:00
3276be53cc Testing data for storage added 2020-04-21 20:39:45 +04:00
442e7986d5 Skip branch deletion keys 2020-04-21 19:08:03 +04:00
12 changed files with 338 additions and 58 deletions

View File

@@ -41,8 +41,13 @@ from util.users import (
with_privileges
)
from util.logging import slogm
from util.paths import (
frontend_module_dir
)
import logging
import os
import subprocess
def determine_username(username=None):
'''
@@ -72,6 +77,12 @@ class frontend_manager:
'''
def __init__(self, username, is_machine):
frontend_module_files = frontend_module_dir().glob('**/*')
self.frontend_module_binaries = list()
for exe in frontend_module_files:
if (exe.is_file() and os.access(exe.resolve(), os.X_OK)):
self.frontend_module_binaries.append(exe)
self.storage = registry_factory('registry')
self.username = determine_username(username)
self.is_machine = is_machine
@@ -105,6 +116,10 @@ class frontend_manager:
logging.error('Not sufficient privileges to run machine appliers')
return
logging.debug(slogm('Applying computer part of settings'))
for exe in self.frontend_module_binaries:
subprocess.check_call([exe.resolve()])
self.machine_appliers['systemd'].apply()
self.machine_appliers['control'].apply()
self.machine_appliers['polkit'].apply()

View File

@@ -22,6 +22,6 @@ from .sqlite_cache import sqlite_cache
def cache_factory(cache_name):
return sqlite_cache(cache_name)
def registry_factory(registry_name):
return sqlite_registry(registry_name)
def registry_factory(registry_name='registry', registry_dir=None):
return sqlite_registry(registry_name, registry_dir)

View File

@@ -0,0 +1,59 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class samba_preg(object):
'''
Object mapping representing HKLM entry (registry key without SID)
'''
def __init__(self, preg_obj):
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class samba_hkcu_preg(object):
'''
Object mapping representing HKCU entry (registry key with SID)
'''
def __init__(self, sid, preg_obj):
self.sid = sid
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class ad_shortcut(object):
'''
Object mapping representing Windows shortcut.
'''
def __init__(self, sid, sc):
self.sid = sid
self.path = sc.dest
self.shortcut = sc.to_json()
class info_entry(object):
def __init__(self, name, value):
self.name = name
self.value = value
class printer_entry(object):
'''
Object mapping representing Windows printer of some type.
'''
def __init__(self, sid, pobj):
self.sid = sid
self.name = pobj.name
self.printer = pobj.to_json()

View File

@@ -36,53 +36,21 @@ from sqlalchemy.orm import (
from util.logging import slogm
from util.paths import cache_dir
from .registry import registry
class samba_preg(object):
'''
Object mapping representing HKLM entry (registry key without SID)
'''
def __init__(self, preg_obj):
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class samba_hkcu_preg(object):
'''
Object mapping representing HKCU entry (registry key with SID)
'''
def __init__(self, sid, preg_obj):
self.sid = sid
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class ad_shortcut(object):
'''
Object mapping representing Windows shortcut.
'''
def __init__(self, sid, sc):
self.sid = sid
self.path = sc.dest
self.shortcut = sc.to_json()
class info_entry(object):
def __init__(self, name, value):
self.name = name
self.value = value
class printer_entry(object):
'''
Object mapping representing Windows printer of some type.
'''
def __init__(self, sid, pobj):
self.sid = sid
self.name = pobj.name
self.printer = pobj.to_json()
from .record_types import (
samba_preg
, samba_hkcu_preg
, ad_shortcut
, info_entry
, printer_entry
)
class sqlite_registry(registry):
def __init__(self, db_name):
def __init__(self, db_name, registry_cache_dir=None):
self.db_name = db_name
self.db_path = os.path.join('sqlite:///{}/{}.sqlite'.format(cache_dir(), self.db_name))
cdir = registry_cache_dir
if cdir == None:
cdir = cache_dir()
self.db_path = os.path.join('sqlite:///{}/{}.sqlite'.format(cdir, self.db_name))
self.db_cnt = create_engine(self.db_path, echo=False)
self.__metadata = MetaData(self.db_cnt)
self.__info = Table(
@@ -221,15 +189,21 @@ class sqlite_registry(registry):
Write PReg entry to HKEY_LOCAL_MACHINE
'''
pentry = samba_preg(preg_entry)
self._hklm_upsert(pentry)
if not pentry.hive_key.rpartition('\\')[2].startswith('**'):
self._hklm_upsert(pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(pentry.hive_key)))
def add_hkcu_entry(self, preg_entry, sid):
'''
Write PReg entry to HKEY_CURRENT_USER
'''
hkcu_pentry = samba_hkcu_preg(sid, preg_entry)
logging.debug(slogm('Adding HKCU entry for {}'.format(sid)))
self._hkcu_upsert(hkcu_pentry)
if not hkcu_pentry.hive_key.rpartition('\\')[2].startswith('**'):
logging.debug(slogm('Adding HKCU entry for {}'.format(sid)))
self._hkcu_upsert(hkcu_pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(hkcu_pentry.hive_key)))
def add_shortcut(self, sid, sc_obj):
'''

View File

@@ -0,0 +1,18 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<PolFile num_entries="3" signature="PReg" version="1">
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.cups</ValueName>
<Value> </Value>
</Entry>
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.postfix</ValueName>
<Value> </Value>
</Entry>
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.postqueue</ValueName>
<Value> </Value>
</Entry>
</PolFile>

View File

@@ -0,0 +1,47 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import unittest.mock
import os
class StorageTestCase(unittest.TestCase):
preg_xml_path = '{}/test/storage/data/Registry.pol.xml'.format(os.getcwd())
reg_name = 'registry'
# Run destructive storage tests in current directory
reg_path = '{}/test/tmp'.format(os.getcwd())
@unittest.mock.patch('util.paths.cache_dir')
def test_add_hklm_entry(self, cdir_mock):
test_sid = None
from util.preg import merge_polfile
merge_polfile(self.preg_xml_path, test_sid, self.reg_name, self.reg_path)
@unittest.mock.patch('util.paths.cache_dir')
def test_add_hkcu_entry(self, cdir_mock):
test_sid = 'test_sid'
from util.preg import merge_polfile
merge_polfile(self.preg_xml_path, test_sid, self.reg_name, self.reg_path)

5
gpoa/test/tmp/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

View File

@@ -0,0 +1,37 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from util.rpm import (
install_rpms
, remove_rpms
)
class RPMTestCase(unittest.TestCase):
@unittest.skip('test_install_rpm is not unit test')
def test_install_rpm(self):
test_package_names = ['tortoisehg', 'csync']
install_rpms(test_package_names)
@unittest.skip('test_remove_rpm is not unit test')
def test_remove_rpm(self):
test_package_names = ['tortoisehg', 'csync']
remove_rpms(test_package_names)

View File

@@ -59,3 +59,31 @@ def local_policy_cache():
return lpcache
def backend_module_dir():
backend_dir = '/usr/lib/gpoa/backend'
return pathlib.Path(backend_dir)
def frontend_module_dir():
frontend_dir = '/usr/lib/gpoa/frontend'
return pathlib.Path(frontend_dir)
def storage_module_dir():
storage_dir = '/usr/lib/gpoa/storage'
return pathlib.Path(storage_dir)
def pre_backend_plugin_dir():
pre_backend_dir = '/usr/lib/gpoa/backend_pre'
return pathlib.Path(pre_backend_dir)
def post_backend_plugin_dir():
post_backend_dir = '/usr/lib/gpoa/backend_post'
return pathlib.Path(post_backend_dir)
def pre_frontend_plugin_dir():
pre_forntend_dir = '/usr/lib/gpoa/frontend_pre'
return pathlib.Path(pre_frontend_dir)
def post_frontend_plugin_dir():
post_frontend_dir = '/usr/lib/gpoa/frontend_post'
return pathlib.Path(post_frontend_dir)

View File

@@ -76,11 +76,10 @@ def preg_keymap(preg):
return keymap
def merge_polfile(preg, sid=None):
def merge_polfile(preg, sid=None, reg_name='registry', reg_path=None):
pregfile = load_preg(preg)
logging.info(slogm('Loaded PReg {}'.format(preg)))
key_map = dict()
storage = registry_factory('registry')
storage = registry_factory(reg_name, reg_path)
for entry in pregfile.entries:
if not sid:
storage.add_hklm_entry(entry)

View File

@@ -33,6 +33,64 @@ def is_rpm_installed(rpm_name):
return False
class Package:
__install_command = ['/usr/bin/apt-get', '-y', 'install']
__remove_command = ['/usr/bin/apt-get', '-y', 'remove']
__reinstall_command = ['/usr/bin/apt-get', '-y', 'reinstall']
def __init__(self, package_name):
self.package_name = package_name
self.for_install = True
if package_name.endswith('-'):
self.package_name = package_name[:-1]
self.for_install = False
self.installed = is_rpm_installed(self.package_name)
def mark_for_install(self):
self.for_install = True
def mark_for_removal(self):
self.for_install = False
def is_installed(self):
return self.installed
def is_for_install(self):
return self.for_install
def is_for_removal(self):
return (not self.for_install)
def action(self):
if self.for_install:
if not self.is_installed():
return self.install()
else:
if self.is_installed():
return self.remove()
def install(self):
fullcmd = self.__install_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def reinstall(self):
fullcmd = self.__reinstall_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def remove(self):
fullcmd = self.__remove_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def __repr__(self):
return self.package_name
def __str__(self):
return self.package_name
def update():
'''
@@ -40,18 +98,40 @@ def update():
'''
subprocess.check_call(['/usr/bin/apt-get', 'update'])
def install_rpm(rpm_name):
'''
Install RPM from APT-RPM sources.
Install single RPM
'''
update()
subprocess.check_call(['/usr/bin/apt-get', '-y', 'install', rpm_name])
rpm = Package(rpm_name)
return rpm.install()
def remove_rpm(rpm_name):
'''
Remove RPM from file system.
Remove single RPM
'''
subprocess.check_call(['/usr/bin/apt-get', '-y', 'remove', rpm_name])
rpm = Package(rpm_name)
return rpm.remove()
def install_rpms(rpm_names):
'''
Install set of RPMs sequentially
'''
result = list()
for package in rpm_names:
result.append(install_rpm(package))
return result
def remove_rpms(rpm_names):
'''
Remove set of RPMs requentially
'''
result = list()
for package in rpm_names:
result.append(remove_rpm(package))
return result