1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-11-12 04:24:18 +03:00

Compare commits

..

93 Commits

Author SHA1 Message Date
8292aa69b3 Frontend expanded with binary module support 2020-04-28 00:46:05 +04:00
aa03e6dfa4 Added paths to directories with modules 2020-04-28 00:45:20 +04:00
NIR
0328afa788 Merge pull request #55 from altlinux/rpm_install_improvement
RPM helper improved
2020-04-28 00:10:27 +04:00
edead53d7e RPM test improved accordingly 2020-04-28 00:07:04 +04:00
db1a82f930 util.rpm: Removed strange and ugly functions and introduced simple functions instead 2020-04-28 00:02:27 +04:00
fd7cfe2b83 util.rpm: Package.reinstall() command added 2020-04-28 00:01:46 +04:00
b3694a8b4d RPM module improved with corner case checks 2020-04-24 15:19:28 +04:00
060f125258 Half-assed test for RPM 2020-04-24 15:18:51 +04:00
1f4960cb48 RPM helper improved 2020-04-23 23:10:08 +04:00
Evgeny Sinelnikov
a0c5b1a2b1 Merge pull request #45 from altlinux/preg_key_deletion
Skip branch deletion keys
2020-04-23 01:14:16 +04:00
Evgeny Sinelnikov
2f7e6f3a98 0.5.0-alt1
- Update samba format: local.xml -> Machine/Registry.pol.xml
- Add support of ad-domain-controller local policy profile
- Set properly URL of project
2020-04-22 00:54:48 +04:00
Evgeny Sinelnikov
393529f0af Merge pull request #46 from altlinux/proper_dist
Proper dist as installation directory
2020-04-22 00:43:33 +04:00
Evgeny Sinelnikov
ccf73c4fc6 Merge pull request #51 from altlinux/gpupdate_suppress
Suppress extra output of gpupdate-setup
2020-04-22 00:37:51 +04:00
Evgeny Sinelnikov
a7aa12d42d Merge pull request #49 from altlinux/samba_format
Samba format: local.xml -> Machine/Registry.pol.xml
2020-04-22 00:36:15 +04:00
Evgeny Sinelnikov
d12d4c4227 Merge pull request #52 from altlinux/gpoa_fixes_for_server
Bunch of fixes for bugs found while testing GPOA on ALT Server
2020-04-22 00:32:00 +04:00
e8833ddee0 There were two systemctl calls in gpupdate-setup 2020-04-21 23:58:05 +04:00
18e1911bb5 desktop-file-utils is missing from ALT Server by default 2020-04-21 23:56:43 +04:00
fa34dc9e96 python3-module-dbus does not added to dependencies automatically 2020-04-21 23:56:11 +04:00
34ed296546 Set GPOA loglevel to FATAL in gpupdate-setup 2020-04-21 23:41:25 +04:00
Evgeny Sinelnikov
8b63d294d3 gpupdate-setup: fix break symlink recreate during enable 2020-04-21 23:14:10 +04:00
Evgeny Sinelnikov
d38e937e22 gpupdate-setup: add support domain_controller local policy profile 2020-04-21 22:20:14 +04:00
Evgeny Sinelnikov
c70280a964 Get machine local Registry policy in Samba backup format 2020-04-21 22:20:14 +04:00
41e950172b Created directory for temporary testing files 2020-04-21 22:17:35 +04:00
25381e1a04 Updated specfile to install supplementary files from ./dist 2020-04-21 21:01:11 +04:00
f7233c539e Moved system-policy-gpupdate to ./dist 2020-04-21 20:58:49 +04:00
0b3c004d0b Moved gpupdate.service to ./dist 2020-04-21 20:58:20 +04:00
ac37d736cb Moved gpupdate-user.service to ./dist 2020-04-21 20:57:54 +04:00
91da6ff912 test.storage.test_preg_special_values - half-assed unit test for starred PReg value names 2020-04-21 20:47:21 +04:00
ae7d1ed0dc util.merge_polfile(): accept registry name and path to ease testing 2020-04-21 20:46:32 +04:00
ce0e3f1901 storage.registry_factory(): accept target registry directory 2020-04-21 20:45:51 +04:00
edfca0e31d storage.sqlite_registry: Accept target registry directory 2020-04-21 20:45:01 +04:00
77da991c6f storage.sqlite_registry: use record types from separate module 2020-04-21 20:42:43 +04:00
d61583e704 Storage testing module created 2020-04-21 20:41:23 +04:00
1d31f17bb3 Records from sqlite_registry moved into separate module to ease testing and adding storages 2020-04-21 20:40:33 +04:00
3276be53cc Testing data for storage added 2020-04-21 20:39:45 +04:00
442e7986d5 Skip branch deletion keys 2020-04-21 19:08:03 +04:00
Evgeny Sinelnikov
faa0265fd7 Merge pull request #44 from altlinux/gpupdate_setup_fix
Correct systemctl run for global enable gpupdate-user.service
2020-04-21 14:13:10 +04:00
0150e60f3d Correct systemctl call 2020-04-21 12:29:33 +04:00
Evgeny Sinelnikov
7572fa1ed7 0.4.5-alt1
- Add support for control system-policy and requires to new pam-config
2020-04-20 03:13:23 +04:00
Evgeny Sinelnikov
1fa9b67fb2 Correctly enable and disable system-policy for gpupdate 2020-04-20 03:13:10 +04:00
Evgeny Sinelnikov
0be9e4b317 0.4.4-alt1
- Add gpupdate-setup initialization script supported local-policy profiles
2020-04-19 12:45:42 +04:00
Evgeny Sinelnikov
c16312161f Merge pull request #41 from altlinux/gpsetup
Ok, prepare to new prerelease.
2020-04-16 20:49:40 +04:00
Evgeny Sinelnikov
ed9477c0fc gpupdate-setup: adjust get_active_policy() 2020-04-16 15:56:08 +04:00
Evgeny Sinelnikov
75485eeb62 gpupdate-setup: get default localpolicy from /etc/altlinux-release 2020-04-16 06:31:45 +04:00
Evgeny Sinelnikov
633637bee2 gpupdate-setup: add enable and disable actions 2020-04-16 06:29:41 +04:00
Evgeny Sinelnikov
7d46cd69e0 Add version requires to libnss-role and local-policy 2020-04-16 04:33:00 +04:00
Evgeny Sinelnikov
2901f54830 Replace os.system with checked call of subprocess 2020-04-16 04:16:30 +04:00
Evgeny Sinelnikov
8f349a96c7 gpupdate-setup: set status as default action 2020-04-16 04:15:13 +04:00
ebbdf7c033 Fixed symlink update functionality 2020-04-14 16:08:06 +04:00
4e8888086f Added check for /etc/local-policy path for gpupdate 2020-04-14 16:07:32 +04:00
2571e27235 gpsetup 2020-04-03 19:01:38 +04:00
207b7eb029 Minifix for testing policy existence 2020-03-27 15:50:09 +04:00
5668dae81e gpupdate-setup updated with interface for backend 2020-03-26 17:10:33 +04:00
ce9891802f gpupdate-setup installed in /usr/sbin 2020-03-23 16:14:25 +04:00
d0ff27c45c gpupdate-setup script added to atomize actions of Group Policy switch 2020-03-23 16:07:18 +04:00
Evgeny Sinelnikov
e66f1fd5a4 0.4.3-alt1
- Fix polfile merging
- Add support controls with strings values
- Initial SIGINT signal handling
2020-03-06 19:30:13 +04:00
Evgeny Sinelnikov
c383631fde Merge pull request #39 from altlinux/initial_signal_handling
Initial signal handling tested.
2020-03-06 19:05:43 +04:00
Evgeny Sinelnikov
b519249aff Merge pull request #38 from altlinux/controls_with_strings_testing
Controls with strings testing tested.
2020-03-06 19:05:14 +04:00
b52c14a66f Signal handlers for gpoa and gpupdate introduced 2020-03-06 18:57:31 +04:00
c205940b08 Signal handling function introduced 2020-03-06 18:57:04 +04:00
a4e2b3638d New exit code value introduced 2020-03-06 18:56:41 +04:00
34344d66d8 Unit test for out-of-range control 2020-03-06 18:43:23 +04:00
32d7546a5a Numerous Popen fixes in control applier
This commit fixes errors caused by missed .wait() in Popen calls like:

```
/usr/lib64/python3.7/subprocess.py:858: ResourceWarning: subprocess 4955 is still running
  ResourceWarning, source=self)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/mnt/sda1/github.com/gpupdate/gpoa/frontend/appliers/control.py:30: ResourceWarning: unclosed file <_io.BufferedReader name=3>
  self.possible_values = self._query_control_values()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
```
2020-03-06 16:08:13 +04:00
d63740f5de Basic (non-unit) tests added for controls 2020-03-06 16:06:50 +04:00
Rustem Bapin
c4197da64f Control applier is not breaking in case input value is not in possible values 2020-03-05 19:43:40 +04:00
056554c35c Fix service state handling
This commit fixes error on `gpupdate.service` unit start:

```
Unable to start systemd unit gpupdate.service
```

This error was caused by limited functionality on handling unit
states, especially for restartable units.
2020-03-04 16:24:46 +04:00
Rustem Bapin
d9bd6f663b Control applier handle both number and string values 2020-03-03 21:16:11 +04:00
NIR
ae156bcb92 Merge pull request #34 from altlinux/dbus_exception_catch
D-Bus exception catch
2020-03-03 19:20:48 +04:00
71107f72f7 Catch DBusException in dbus_runner when reply is timed out
When we try to start `oddjobd` via `D-Bus` using gpupdate we may
get the following traceback:

```
Traceback (most recent call last):
  File "/usr/bin/gpupdate", line 144, in <module>
    main()
  File "/usr/bin/gpupdate", line 139, in main
    gpo_appliers[1].run()
  File "/usr/lib/python3/site-packages/gpoa/util/dbus.py", line 48, in run
    result = self.interface.gpupdate()
  File "/usr/lib64/python3/site-packages/dbus/proxies.py", line 72, in __call__
    return self._proxy_method(*args, **keywords)
  File "/usr/lib64/python3/site-packages/dbus/proxies.py", line 147, in __call__
    **keywords)
  File "/usr/lib64/python3/site-packages/dbus/connection.py", line 653, in call_blocking
    message, timeout)
dbus.exceptions.DBusException: org.freedesktop.DBus.Error.NoReply: Did not receive a reply. Possible causes include: the remote application did not send a reply, the message bus security policy blocked the reply, the reply timeout expired, or the network connection was broken.
```

This commit catches this traceback, re-raises the exception and
establishes contract for `gpupdate` utility exit codes on fatal errors.
2020-03-03 19:18:56 +04:00
17411de005 Wiki submodule index bump 2020-03-03 17:22:19 +04:00
7ae1912455 gpupdate exit codes documented in manpage 2020-03-03 17:21:53 +04:00
7fbd1a3c40 gpt.gpt: Polfile merging quickfix 2020-03-03 17:20:50 +04:00
7b856f3d44 util.arguments.ExitCodeUpdater: Exit code contract for gpupdate utility 2020-03-03 17:18:34 +04:00
44b51fd05c .gitignore updated 2020-03-03 17:17:51 +04:00
Evgeny Sinelnikov
2778e2a043 0.4.2-alt1
- Change service type to 'simple' because gpoa is not systemd-aware
- Shortcuts fixes and improvements
2020-02-19 16:35:25 +04:00
Evgeny Sinelnikov
44f68020c5 Merge pull request #24 from altlinux/shortcuts_testing
Shortcuts fixes and improvements
2020-02-19 16:32:44 +04:00
Evgeny Sinelnikov
9ab8f41b77 Merge pull request #26 from altlinux/systemd_start_fixes
Change service type to 'simple' because gpoa is not systemd-aware
2020-02-19 16:31:33 +04:00
17467b5488 shortcut_applier: Don't try to operate on non-existent home directory for user 2020-02-19 16:09:15 +04:00
8c42f00f89 util.util: homedir_exists to check if home directory is created for user 2020-02-19 16:08:30 +04:00
a60a9d6b94 Fix for directory entry types in gpt.gpt 2020-02-19 15:23:54 +04:00
285fdae5e6 gpt.gpt: Find shortcuts and other directories without workarounds 2020-02-18 14:30:06 +04:00
be38594d07 gpt.shortcuts: Added functionality to store .desktop file type 2020-02-18 14:27:52 +04:00
fbc90b7e88 gpt.gpt: find_file improvement for cases when no root directory passed 2020-02-18 14:26:50 +04:00
a3172af037 gpt.gpt: find_dir for case-insensitive search for directory 2020-02-18 14:25:57 +04:00
6130973a09 ttype2str: function to transform TargetType object into string for JSON 2020-02-17 19:07:28 +04:00
5bcc38f80a Use RestartSec since gpupdate is not systemd-aware 2020-02-14 18:41:08 +04:00
d1cbba1834 Implement REAL unit test for shortcuts 2020-02-14 16:18:38 +04:00
33e7417811 Disable role module test 2020-02-14 16:16:53 +04:00
0c0caa4906 Fix for shortcut processor after unit-test 2020-02-14 16:16:53 +04:00
a384991be0 Test for shortcut of Type=Link added 2020-02-14 16:16:45 +04:00
38e70647f7 Unnecessary test removed 2020-02-14 16:16:13 +04:00
59ccecd457 Shortcuts fixes and improvements
- It was found that shortcut files must be executable in order to
  work correctly
- Added some checks for URL links. This should allow to create
  links pointing to network shares.
2020-02-14 16:16:07 +04:00
28da08eb24 Change service type to 'simple' because gpoa is not systemd-aware 2020-02-13 18:26:42 +04:00
46 changed files with 1136 additions and 157 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,5 @@
__pycache__
*~
_opam
_build

228
dist/gpupdate-setup vendored Executable file
View File

@@ -0,0 +1,228 @@
#! /usr/bin/env python3
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import argparse
import subprocess
import re
from gpoa.util.samba import smbopts
def command(args):
try:
subprocess.check_call(args.split())
except:
print ('command: \'%s\' error' % args)
def from_command(args):
try:
with subprocess.Popen(args.split(), stdout=subprocess.PIPE) as proc:
value = proc.stdout.readline().decode('utf-8')
proc.wait()
except:
print ('from_command: \'%s\' error' % args)
return 'local'
return value.strip()
def get_default_policy_name():
localpolicy = 'workstation'
dcpolicy = 'ad-domain-controller'
try:
if smbopt.get_server_role() == 'active directory domain controller':
return dcpolicy
except:
pass
try:
release = '/etc/altlinux-release'
if os.path.isfile(release):
f = open(release)
s = f.readline()
if re.search('server', s, re.I):
localpolicy = 'server'
except:
pass
return localpolicy
def parse_arguments():
'''
Parse CLI arguments.
'''
parser = argparse.ArgumentParser(prog='gpupdate-setup')
subparsers = parser.add_subparsers(dest='action',
metavar='action',
help='Group Policy management actions (default action is status)')
parser_list = subparsers.add_parser('list',
help='List avalable types of local policy')
parser_status = subparsers.add_parser('status',
help='Show current Group Policy status')
parser_enable = subparsers.add_parser('enable',
help='Enable Group Policy subsystem')
parser_disable = subparsers.add_parser('disable',
help='Disable Group Policy subsystem')
parser_write = subparsers.add_parser('write',
help='Operate on Group Policies (enable or disable)')
parser_active = subparsers.add_parser('active-policy',
help='Show name of policy enabled')
parser_write.add_argument('status',
choices=['enable', 'disable'],
help='Enable or disable Group Policies')
parser_write.add_argument('localpolicy',
default=None,
nargs='?',
help='Name of local policy to enable')
parser_enable.add_argument('localpolicy',
default=None,
nargs='?',
help='Name of local policy to enable')
return parser.parse_args()
def get_policy_entries(directory):
filtered_entries = list()
if os.path.isdir(directory):
entries = [os.path.join(directory, entry) for entry in os.listdir(directory)]
for entry in entries:
if os.path.isdir(os.path.join(entry)):
if not os.path.islink(os.path.join(entry)):
if not entry.rpartition('/')[2] == 'default':
filtered_entries.append(entry)
return filtered_entries
def get_policy_variants():
'''
Get the list of local policy variants deployed on this system.
Please note that is case overlapping names the names in
/etc/local-policy must override names in /usr/share/local-policy
'''
policy_dir = '/usr/share/local-policy'
etc_policy_dir = '/etc/local-policy'
system_policies = get_policy_entries(policy_dir)
user_policies = get_policy_entries(etc_policy_dir)
general_listing = list()
general_listing.extend(system_policies)
general_listing.extend(user_policies)
return general_listing
def validate_policy_name(policy_name):
return policy_name in [os.path.basename(d) for d in get_policy_variants()]
def get_status():
systemd_unit_link = '/etc/systemd/system/multi-user.target.wants/gpupdate.service'
return os.path.islink(systemd_unit_link)
def get_active_policy():
policy_dir = '/usr/share/local-policy'
etc_policy_dir = '/etc/local-policy'
default_policy_name = os.path.join(policy_dir, get_default_policy_name())
active_policy_name = os.path.join(etc_policy_dir, 'active')
actual_policy_name = os.path.realpath(default_policy_name)
if os.path.isdir(active_policy_name):
actual_policy_name = os.path.realpath(active_policy_name)
return actual_policy_name
def disable_gp():
if from_command('/usr/sbin/control system-auth') != 'local':
command('/usr/sbin/control system-policy global')
else:
command('/usr/sbin/control system-policy local')
command('systemctl disable gpupdate.service')
command('systemctl --global disable gpupdate-user.service')
def enable_gp(policy_name):
policy_dir = '/usr/share/local-policy'
etc_policy_dir = '/etc/local-policy'
target_policy_name = get_default_policy_name()
if policy_name:
if validate_policy_name(policy_name):
target_policy_name = policy_name
print (target_policy_name)
default_policy_name = os.path.join(policy_dir, target_policy_name)
active_policy_name = os.path.join(etc_policy_dir, 'active')
if not os.path.isdir(etc_policy_dir):
os.makedirs(etc_policy_dir)
if not os.path.islink(active_policy_name):
os.symlink(default_policy_name, active_policy_name)
else:
os.unlink(active_policy_name)
os.symlink(default_policy_name, active_policy_name)
# Enable oddjobd_gpupdate in PAM config
command('/usr/sbin/control system-policy gpupdate')
# Bootstrap the Group Policy engine
command('/usr/sbin/gpoa --nodomain --loglevel 5')
# Enable gpupdate-setup.service for all users
command('systemctl --global enable gpupdate-user.service')
def main():
arguments = parse_arguments()
if arguments.action == 'list':
for entry in get_policy_variants():
print(entry.rpartition('/')[2])
if arguments.action == 'status' or arguments.action == None:
if get_status():
print('enabled')
else:
print('disabled')
if arguments.action == 'write':
if arguments.status == 'enable' or arguments.status == '#t':
enable_gp(arguments.localpolicy)
if arguments.status == 'disable' or arguments.status == '#f':
disable_gp()
if arguments.action == "enable":
enable_gp(arguments.localpolicy)
if arguments.action == "disable":
disable_gp()
if arguments.action == 'active-policy':
print(get_active_policy())
if __name__ == '__main__':
main()

View File

@@ -5,8 +5,8 @@ Description=gpupdate in userspace
# gpupdate on Windows runs once per hour
[Service]
Environment="PATH=/bin:/sbin:/usr/bin:/usr/sbin"
Type=notify
WatchdogSec=3600
Type=simple
RestartSec=3600
TimeoutSec=3000
Restart=always
ExecStart=/usr/sbin/gpoa

View File

@@ -4,8 +4,8 @@ After=sssd.service
[Service]
Environment="PATH=/bin:/sbin:/usr/bin:/usr/sbin"
Type=notify
WatchdogSec=3600
Type=simple
RestartSec=3600
TimeoutSec=3000
Restart=always
ExecStart=/usr/bin/gpupdate

View File

@@ -44,6 +44,22 @@ Show help.
\fB--user \fIusername\fR
Run \fBgpupdate\fP for \fIusername\fP.
.
.SS "EXIT CODES"
.TP
\fB0\fR
Application exited successfully.
.TP
\fB1\fR
No runner is able to start \fBgpoa\fR.
.TP
\fB2\fR
No reply from \fID-Bus\fR when starting \fBgpoa\fR for computer using
\fBoddjobd\fR via \fID-Bus\fR.
.TP
\fB3\fR
No reply from \fID-Bus\fR when starting \fBgpoa\fR for user using
\fBoddjobd\fR via \fID-Bus\fR.
.
.SH "SEE ALSO"
gpoa(1)
.SH BUGS

View File

@@ -23,6 +23,8 @@ from util.logging import slogm
class control:
def __init__(self, name, value):
if type(value) != int and type(value) != str:
raise Exception('Unknown type of value for control')
self.control_name = name
self.control_value = value
self.possible_values = self._query_control_values()
@@ -30,29 +32,65 @@ class control:
raise Exception('Unable to query possible values')
def _query_control_values(self):
proc = subprocess.Popen(['/usr/sbin/control', self.control_name, 'list'], stdout=subprocess.PIPE)
for line in proc.stdout:
values = line.split()
return values
'''
Query possible values from control in order to perform check of
parameter passed to constructor.
'''
values = list()
popen_call = ['/usr/sbin/control', self.control_name, 'list']
with subprocess.Popen(popen_call, stdout=subprocess.PIPE) as proc:
values = proc.stdout.readline().decode('utf-8').split()
proc.wait()
return values
def _map_control_status(self, int_status):
str_status = self.possible_values[int_status].decode()
'''
Get control's string value by numeric index
'''
try:
str_status = self.possible_values[int_status]
except IndexError as exc:
logging.error(slogm('Error getting control ({}) value from {} by index {}'.format(self.control_name, self.possible_values, int_status)))
str_status = None
return str_status
def get_control_name(self):
return self.control_name
def get_control_status(self):
proc = subprocess.Popen(['/usr/sbin/control', self.control_name], stdout=subprocess.PIPE)
for line in proc.stdout:
return line.rstrip('\n\r')
'''
Get current control value
'''
line = None
popen_call = ['/usr/sbin/control', self.control_name]
with subprocess.Popen(popen_call, stdout=subprocess.PIPE) as proc:
line = proc.stdout.readline().decode('utf-8').rstrip('\n\r')
proc.wait()
return line
def set_control_status(self):
status = self._map_control_status(self.control_value)
if type(self.control_value) == int:
status = self._map_control_status(self.control_value)
if status == None:
logging.error(slogm('\'{}\' is not in possible values for control {}'.format(self.control_value, self.control_name)))
return
elif type(self.control_value) == str:
if self.control_value not in self.possible_values:
logging.error(slogm('\'{}\' is not in possible values for control {}'.format(self.control_value, self.control_name)))
return
status = self.control_value
logging.debug(slogm('Setting control {} to {}'.format(self.control_name, status)))
try:
proc = subprocess.Popen(['/usr/sbin/control', self.control_name, status], stdout=subprocess.PIPE)
popen_call = ['/usr/sbin/control', self.control_name, status]
with subprocess.Popen(popen_call, stdout=subprocess.PIPE) as proc:
proc.wait()
except:
logging.error(slogm('Unable to set {} to {}'.format(self.control_name, status)))

View File

@@ -40,14 +40,23 @@ class systemd_unit:
self.manager.EnableUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True))
self.manager.StartUnit(self.unit_name, 'replace')
logging.info(slogm('Starting systemd unit: {}'.format(self.unit_name)))
if self._get_state() != 'active':
# In case the service has 'RestartSec' property set it
# switches to 'activating (auto-restart)' state instead of
# 'active' so we consider 'activating' a valid state too.
service_state = self._get_state()
if not service_state in ['active', 'activating']:
logging.error(slogm('Unable to start systemd unit {}'.format(self.unit_name)))
else:
self.manager.StopUnit(self.unit_name, 'replace')
self.manager.DisableUnitFiles([self.unit_name], dbus.Boolean(False))
self.manager.MaskUnitFiles([self.unit_name], dbus.Boolean(False), dbus.Boolean(True))
logging.info(slogm('Stopping systemd unit: {}'.format(self.unit_name)))
if self._get_state() != 'stopped':
service_state = self._get_state()
if not service_state in ['stopped']:
logging.error(slogm('Unable to stop systemd unit {}'.format(self.unit_name)))
def _get_state(self):

View File

@@ -39,6 +39,9 @@ class control_applier(applier_frontend):
try:
self.controls.append(control(valuename, int(setting.data)))
logging.info(slogm('Working with control {}'.format(valuename)))
except ValueError as exc:
self.controls.append(control(valuename, setting.data))
logging.info(slogm('Working with control {} with string value'.format(valuename)))
except Exception as exc:
logging.info(slogm('Unable to work with control {}: {}'.format(valuename, exc)))
#for e in polfile.pol_file.entries:

View File

@@ -41,8 +41,13 @@ from util.users import (
with_privileges
)
from util.logging import slogm
from util.paths import (
frontend_module_dir
)
import logging
import os
import subprocess
def determine_username(username=None):
'''
@@ -72,6 +77,12 @@ class frontend_manager:
'''
def __init__(self, username, is_machine):
frontend_module_files = frontend_module_dir().glob('**/*')
self.frontend_module_binaries = list()
for exe in frontend_module_files:
if (exe.is_file() and os.access(exe.resolve(), os.X_OK)):
self.frontend_module_binaries.append(exe)
self.storage = registry_factory('registry')
self.username = determine_username(username)
self.is_machine = is_machine
@@ -105,6 +116,10 @@ class frontend_manager:
logging.error('Not sufficient privileges to run machine appliers')
return
logging.debug(slogm('Applying computer part of settings'))
for exe in self.frontend_module_binaries:
subprocess.check_call([exe.resolve()])
self.machine_appliers['systemd'].apply()
self.machine_appliers['control'].apply()
self.machine_appliers['polkit'].apply()

View File

@@ -23,6 +23,10 @@ from .applier_frontend import applier_frontend
from gpt.shortcuts import json2sc
from util.windows import expand_windows_var
from util.logging import slogm
from util.util import (
get_homedir,
homedir_exists
)
def storage_get_shortcuts(storage, sid):
'''
@@ -44,6 +48,17 @@ def write_shortcut(shortcut, username=None):
:username: None means working with machine variables and paths
'''
dest_abspath = expand_windows_var(shortcut.dest, username).replace('\\', '/') + '.desktop'
# Check that we're working for user, not on global system level
if username:
# Check that link destination path starts with specification of
# user's home directory
if dest_abspath.startswith(get_homedir(username)):
# Don't try to operate on non-existent directory
if not homedir_exists(username):
logging.warning(slogm('No home directory exists for user {}: will not create link {}'.format(username, dest_abspath)))
return None
logging.debug(slogm('Writing shortcut file to {}'.format(dest_abspath)))
shortcut.write_desktop(dest_abspath)

View File

@@ -20,6 +20,7 @@
import argparse
import logging
import os
import signal
from backend import backend_factory
from frontend.frontend_manager import frontend_manager, determine_username
@@ -35,6 +36,7 @@ from util.arguments import (
set_loglevel
)
from util.logging import slogm
from util.signals import signal_handler
def parse_arguments():
arguments = argparse.ArgumentParser(description='Generate configuration out of parsed policies')
@@ -128,5 +130,6 @@ def main():
controller.run()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
main()

View File

@@ -58,9 +58,10 @@ class gpt:
if 'default' == self.guid:
self.guid = 'Local Policy'
self._machine_path = None
self._user_path = None
self._get_machine_user_dirs()
self._machine_path = find_dir(self.path, 'Machine')
self._user_path = find_dir(self.path, 'User')
self._machine_prefs = find_dir(self._machine_path, 'Preferences')
self._user_prefs = find_dir(self._user_path, 'Preferences')
logging.debug(slogm('Looking for machine part of GPT {}'.format(self.guid)))
self._find_machine()
@@ -88,19 +89,6 @@ class gpt:
return upm
def _get_machine_user_dirs(self):
'''
Find full path to Machine and User parts of GPT.
'''
entries = os.listdir(self.path)
for entry in entries:
full_entry_path = os.path.join(self.path, entry)
if os.path.isdir(full_entry_path):
if 'machine' == entry.lower():
self._machine_path = full_entry_path
if 'user' == entry.lower():
self._user_path = full_entry_path
def _find_user(self):
self._user_regpol = self._find_regpol('user')
self._user_shortcuts = self._find_shortcuts('user')
@@ -125,16 +113,14 @@ class gpt:
'''
Find Shortcuts.xml files.
'''
search_path = os.path.join(self._machine_path, 'Preferences', 'Shortcuts')
if 'user' == part:
try:
search_path = os.path.join(self._user_path, 'Preferences', 'Shortcuts')
except Exception as exc:
return None
if not search_path:
return None
shortcuts_dir = find_dir(self._machine_prefs, 'Shortcuts')
shortcuts_file = find_file(shortcuts_dir, 'shortcuts.xml')
return find_file(search_path, 'shortcuts.xml')
if 'user' == part:
shortcuts_dir = find_dir(self._user_prefs, 'Shortcuts')
shortcuts_file = find_file(shortcuts_dir, 'shortcuts.xml')
return shortcuts_file
def _find_envvars(self, part):
'''
@@ -194,7 +180,7 @@ class gpt:
util.preg.merge_polfile(self._machine_regpol)
if self._user_regpol:
logging.debug(slogm('Merging machine(user) settings from {}'.format(self._machine_regpol)))
util.preg.merge_polfile(self._user_regpol, self.machine_sid)
util.preg.merge_polfile(self._user_regpol, self.sid)
if self._machine_shortcuts:
logging.debug(slogm('Merging machine shortcuts from {}'.format(self._machine_shortcuts)))
self._merge_shortcuts()
@@ -240,15 +226,42 @@ User Shortcuts.xml: {}
)
return result
def find_dir(search_path, name):
'''
Attempt for case-insensitive search of directory
:param search_path: Path to get file list from
:param name: Name of the directory to search for
'''
if not search_path:
return None
try:
file_list = os.listdir(search_path)
for entry in file_list:
dir_path = os.path.join(search_path, entry)
if os.path.isdir(dir_path) and name.lower() == str(entry).lower():
return dir_path
except Exception as exc:
pass
return None
def find_file(search_path, name):
'''
Attempt for case-insensitive file search in directory.
'''
if not search_path:
return None
if not name:
return None
try:
file_list = os.listdir(search_path)
for entry in file_list:
file_path = os.path.join(search_path, entry)
if os.path.isfile(file_path) and name.lower() == entry.lower():
if os.path.isfile(file_path) and name.lower() == str(entry).lower():
return file_path
except Exception as exc:
#logging.error(exc)
@@ -260,7 +273,7 @@ def lp2gpt():
'''
Convert local-policy to full-featured GPT.
'''
lppath = os.path.join(default_policy_path(), 'local.xml')
lppath = os.path.join(default_policy_path(), 'Machine/Registry.pol.xml')
# Load settings from XML PolFile
polparser = GPPolParser()

View File

@@ -16,7 +16,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pathlib import Path
import stat
import logging
from enum import Enum
from xml.etree import ElementTree
from xdg.DesktopEntry import DesktopEntry
@@ -25,18 +28,58 @@ import json
from util.windows import transform_windows_path
from util.xml import get_xml_root
class TargetType(Enum):
FILESYSTEM = 'FILESYSTEM'
URL = 'URL'
def get_ttype(targetstr):
'''
Validation function for targetType property
:targetstr: String representing link type.
:returns: Object of type TargetType.
'''
ttype = TargetType.FILESYSTEM
if targetstr == 'URL':
ttype = TargetType.URL
return ttype
def ttype2str(ttype):
'''
Transform TargetType to string for JSON serialization
:param ttype: TargetType object
'''
result = 'FILESYSTEM'
if ttype == TargetType.URL:
result = 'URL'
return result
def read_shortcuts(shortcuts_file):
'''
Read shortcut objects from GPTs XML file
:shortcuts_file: Location of Shortcuts.xml
'''
shortcuts = list()
for link in get_xml_root(shortcuts_file):
props = link.find('Properties')
# Location of the link itself
dest = props.get('shortcutPath')
# Location where link should follow
path = transform_windows_path(props.get('targetPath'))
# Arguments to executable file
arguments = props.get('arguments')
sc = shortcut(dest, path, arguments, link.get('name'))
# URL or FILESYSTEM
target_type = get_ttype(props.get('targetType'))
sc = shortcut(dest, path, arguments, link.get('name'), target_type)
sc.set_changed(link.get('changed'))
sc.set_clsid(link.get('clsid'))
sc.set_guid(link.get('uid'))
@@ -50,8 +93,9 @@ def json2sc(json_str):
Build shortcut out of string-serialized JSON
'''
json_obj = json.loads(json_str)
link_type = get_ttype(json_obj['type'])
sc = shortcut(json_obj['dest'], json_obj['path'], json_obj['arguments'], json_obj['name'])
sc = shortcut(json_obj['dest'], json_obj['path'], json_obj['arguments'], json_obj['name'], link_type)
sc.set_changed(json_obj['changed'])
sc.set_clsid(json_obj['clsid'])
sc.set_guid(json_obj['guid'])
@@ -60,13 +104,21 @@ def json2sc(json_str):
return sc
class shortcut:
def __init__(self, dest, path, arguments, name=None):
def __init__(self, dest, path, arguments, name=None, ttype=TargetType.FILESYSTEM):
'''
:param dest: Path to resulting file on file system
:param path: Path where the link should point to
:param arguments: Arguemnts to eecutable file
:param name: Name of the application
:param type: Link type - FILESYSTEM or URL
'''
self.dest = dest
self.path = path
self.arguments = arguments
self.name = name
self.changed = ''
self.is_in_user_context = self.set_usercontext()
self.type = ttype
def __str__(self):
result = self.to_json()
@@ -84,6 +136,14 @@ class shortcut:
def set_guid(self, uid):
self.guid = uid
def set_type(self, ttype):
'''
Set type of the hyperlink - FILESYSTEM or URL
:ttype: - object of class TargetType
'''
self.type = ttype
def set_usercontext(self, usercontext=False):
'''
Perform action in user context or not
@@ -111,6 +171,7 @@ class shortcut:
content['guid'] = self.guid
content['changed'] = self.changed
content['is_in_user_context'] = self.is_in_user_context
content['type'] = ttype2str(self.type)
result = self.desktop()
result.content.update(content)
@@ -123,17 +184,30 @@ class shortcut:
'''
self.desktop_file = DesktopEntry()
self.desktop_file.addGroup('Desktop Entry')
self.desktop_file.set('Type', 'Application')
if self.type == TargetType.URL:
self.desktop_file.set('Type', 'Link')
else:
self.desktop_file.set('Type', 'Application')
self.desktop_file.set('Version', '1.0')
self.desktop_file.set('Terminal', 'false')
self.desktop_file.set('Exec', '{} {}'.format(self.path, self.arguments))
self.desktop_file.set('Name', self.name)
if self.type == TargetType.URL:
self.desktop_file.set('URL', self.path)
else:
self.desktop_file.set('Terminal', 'false')
self.desktop_file.set('Exec', '{} {}'.format(self.path, self.arguments))
return self.desktop_file
def write_desktop(self, dest):
'''
Write .desktop file to disk using path 'dest'
Write .desktop file to disk using path 'dest'. Please note that
.desktop files must have executable bit set in order to work in
GUI.
'''
self.desktop().write(dest)
sc = Path(dest)
sc.chmod(sc.stat().st_mode | stat.S_IEXEC)

View File

@@ -24,17 +24,20 @@ import os
import sys
import logging
import pwd
import signal
from util.users import (
is_root
)
from util.arguments import (
process_target
process_target,
ExitCodeUpdater
)
from util.dbus import (
is_oddjobd_gpupdate_accessible,
dbus_runner
)
from util.signals import signal_handler
logging.basicConfig(level=logging.DEBUG)
@@ -132,14 +135,28 @@ def runner_factory(args, target):
def main():
args = parse_cli_arguments()
gpo_appliers = runner_factory(args, process_target(args.target))
if gpo_appliers:
if gpo_appliers[0]:
gpo_appliers[0].run()
try:
gpo_appliers[0].run()
except Exception as exc:
logging.error('Error running GPOA for computer: {}'.format(exc))
return int(ExitCodeUpdater.FAIL_GPUPDATE_COMPUTER_NOREPLY)
if gpo_appliers[1]:
gpo_appliers[1].run()
try:
gpo_appliers[1].run()
except Exception as exc:
logging.error('Error running GPOA for user: {}'.format(exc))
return int(ExitCodeUpdater.FAIL_GPUPDATE_USER_NOREPLY)
else:
logging.error('gpupdate will not be started')
return int(ExitCodeUpdater.FAIL_NO_RUNNER)
return int(ExitCodeUpdater.EXIT_SUCCESS)
if __name__ == '__main__':
main()
signal.signal(signal.SIGINT, signal_handler)
sys.exit(int(main()))

View File

@@ -22,6 +22,6 @@ from .sqlite_cache import sqlite_cache
def cache_factory(cache_name):
return sqlite_cache(cache_name)
def registry_factory(registry_name):
return sqlite_registry(registry_name)
def registry_factory(registry_name='registry', registry_dir=None):
return sqlite_registry(registry_name, registry_dir)

View File

@@ -0,0 +1,59 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class samba_preg(object):
'''
Object mapping representing HKLM entry (registry key without SID)
'''
def __init__(self, preg_obj):
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class samba_hkcu_preg(object):
'''
Object mapping representing HKCU entry (registry key with SID)
'''
def __init__(self, sid, preg_obj):
self.sid = sid
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class ad_shortcut(object):
'''
Object mapping representing Windows shortcut.
'''
def __init__(self, sid, sc):
self.sid = sid
self.path = sc.dest
self.shortcut = sc.to_json()
class info_entry(object):
def __init__(self, name, value):
self.name = name
self.value = value
class printer_entry(object):
'''
Object mapping representing Windows printer of some type.
'''
def __init__(self, sid, pobj):
self.sid = sid
self.name = pobj.name
self.printer = pobj.to_json()

View File

@@ -36,53 +36,21 @@ from sqlalchemy.orm import (
from util.logging import slogm
from util.paths import cache_dir
from .registry import registry
class samba_preg(object):
'''
Object mapping representing HKLM entry (registry key without SID)
'''
def __init__(self, preg_obj):
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class samba_hkcu_preg(object):
'''
Object mapping representing HKCU entry (registry key with SID)
'''
def __init__(self, sid, preg_obj):
self.sid = sid
self.hive_key = '{}\\{}'.format(preg_obj.keyname, preg_obj.valuename)
self.type = preg_obj.type
self.data = preg_obj.data
class ad_shortcut(object):
'''
Object mapping representing Windows shortcut.
'''
def __init__(self, sid, sc):
self.sid = sid
self.path = sc.dest
self.shortcut = sc.to_json()
class info_entry(object):
def __init__(self, name, value):
self.name = name
self.value = value
class printer_entry(object):
'''
Object mapping representing Windows printer of some type.
'''
def __init__(self, sid, pobj):
self.sid = sid
self.name = pobj.name
self.printer = pobj.to_json()
from .record_types import (
samba_preg
, samba_hkcu_preg
, ad_shortcut
, info_entry
, printer_entry
)
class sqlite_registry(registry):
def __init__(self, db_name):
def __init__(self, db_name, registry_cache_dir=None):
self.db_name = db_name
self.db_path = os.path.join('sqlite:///{}/{}.sqlite'.format(cache_dir(), self.db_name))
cdir = registry_cache_dir
if cdir == None:
cdir = cache_dir()
self.db_path = os.path.join('sqlite:///{}/{}.sqlite'.format(cdir, self.db_name))
self.db_cnt = create_engine(self.db_path, echo=False)
self.__metadata = MetaData(self.db_cnt)
self.__info = Table(
@@ -221,15 +189,21 @@ class sqlite_registry(registry):
Write PReg entry to HKEY_LOCAL_MACHINE
'''
pentry = samba_preg(preg_entry)
self._hklm_upsert(pentry)
if not pentry.hive_key.rpartition('\\')[2].startswith('**'):
self._hklm_upsert(pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(pentry.hive_key)))
def add_hkcu_entry(self, preg_entry, sid):
'''
Write PReg entry to HKEY_CURRENT_USER
'''
hkcu_pentry = samba_hkcu_preg(sid, preg_entry)
logging.debug(slogm('Adding HKCU entry for {}'.format(sid)))
self._hkcu_upsert(hkcu_pentry)
if not hkcu_pentry.hive_key.rpartition('\\')[2].startswith('**'):
logging.debug(slogm('Adding HKCU entry for {}'.format(sid)))
self._hkcu_upsert(hkcu_pentry)
else:
logging.warning(slogm('Skipping branch deletion key: {}'.format(hkcu_pentry.hive_key)))
def add_shortcut(self, sid, sc_obj):
'''

8
gpoa/test/files/share.desktop Executable file
View File

@@ -0,0 +1,8 @@
[Desktop Entry]
Type=Link
Version=1.0
Name=Docs
GenericName=Link to CIFS disk
Comment=Link to Samba share
URL=smb://10.0.0.0/

View File

@@ -0,0 +1,18 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@@ -0,0 +1,18 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

Binary file not shown.

View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<PolFile num_entries="1" signature="PReg" version="1">
<Entry type="4" type_name="REG_DWORD">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>sshd-gssapi-auth</ValueName>
<Value>1</Value>
</Entry>
</PolFile>

Binary file not shown.

View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<PolFile num_entries="1" signature="PReg" version="1">
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>dvd-ram-control</ValueName>
<Value>restricted</Value>
</Entry>
</PolFile>

View File

@@ -0,0 +1,74 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from util.preg import load_preg
from frontend.appliers.control import control
class ControlTestCase(unittest.TestCase):
'''
Semi-integrational tests for control facility
'''
def test_control_with_int(self):
'''
Test procedure for control framework invocation with integer
value. The type of data loaded from PReg must be 'int' but
we're storing all values as strings inside the database. So,
for the test to be correct - we transform the value to string
first.
'''
preg_file = 'test/frontend/appliers/data/control_int.pol'
preg_data = load_preg(preg_file)
for entry in preg_data.entries:
control_name = entry.valuename
control_value = str(entry.data)
test_control = control(control_name, int(control_value))
test_control.set_control_status()
def test_control_int_out_of_range(self):
'''
Test procedure for control framework invocation with incorrect
integer value (out of range). The type of data loaded from PReg
must be 'int' but we're storing all values as strings inside the
database. So, for the test to be correct - we transform the
value to string first.
'''
control_name = 'sshd-gssapi-auth'
control_value = '50'
test_control = control(control_name, int(control_value))
test_control.set_control_status()
def test_control_with_str(self):
'''
Test procedure for control framework invocation with string
value. The type of data loaded from PReg must be 'str'.
'''
preg_file = 'test/frontend/appliers/data/control_string.pol'
preg_data = load_preg(preg_file)
for entry in preg_data.entries:
control_name = entry.valuename
control_value = entry.data
test_control = control(control_name, str(control_value))
test_control.set_control_status()

View File

@@ -0,0 +1,3 @@
<?xml version="1.0" encoding="utf-8"?>
<Shortcuts clsid="{872ECB34-B2EC-401b-A585-D32574AA90EE}"><Shortcut clsid="{4F2F7C55-2790-433e-8127-0739D1CFA327}" name="Documents" status="Far" image="1" removePolicy="1" userContext="0" bypassErrors="0" changed="2019-12-11 16:53:37" uid="{3C6978C0-F9F6-4B8B-822C-2846327A1C45}"><Properties pidl="" targetType="URL" action="R" comment="Far 32-bit" shortcutKey="0" startIn="" arguments="%HOME%" iconIndex="13" targetPath="smb://10.0.0.0/" iconPath="%SystemRoot%\system32\SHELL32.dll" window="MIN" shortcutPath="%DesktopDir%\Docs"/></Shortcut>
</Shortcuts>

View File

@@ -22,6 +22,7 @@ import unittest.mock
import os
import util.paths
import json
class GptShortcutsTestCase(unittest.TestCase):
@@ -35,5 +36,25 @@ class GptShortcutsTestCase(unittest.TestCase):
import gpt.shortcuts
testdata_path = '{}/test/gpt/data/Shortcuts.xml'.format(os.getcwd())
sc = gpt.shortcuts.read_shortcuts(testdata_path)
print(sc[0].to_json())
json_obj = json.loads(sc[0].to_json())
self.assertIsNotNone(json_obj['Desktop Entry'])
self.assertEqual(json_obj['Desktop Entry']['Type'], 'Application')
@unittest.mock.patch('util.paths.cache_dir')
def test_shortcut_link(self, cdir_mock):
'''
Test generation of .desktop file with Type=Link pointing to
Samba share.
'''
cdir_mock.return_value = '/var/cache/gpupdate'
import gpt.shortcuts
testdata_path = '{}/test/gpt/data/Shortcuts_link.xml'.format(os.getcwd())
sc = gpt.shortcuts.read_shortcuts(testdata_path)
json_obj = json.loads(sc[0].to_json())
self.assertIsNotNone(json_obj['Desktop Entry'])
self.assertEqual(json_obj['Desktop Entry']['Type'], 'Link')
self.assertEqual(json_obj['Desktop Entry']['URL'], 'smb://10.0.0.0/')

View File

@@ -0,0 +1,18 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<PolFile num_entries="3" signature="PReg" version="1">
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.cups</ValueName>
<Value> </Value>
</Entry>
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.postfix</ValueName>
<Value> </Value>
</Entry>
<Entry type="1" type_name="REG_SZ">
<Key>Software\BaseALT\Policies\Control</Key>
<ValueName>**del.postqueue</ValueName>
<Value> </Value>
</Entry>
</PolFile>

View File

@@ -0,0 +1,47 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import unittest.mock
import os
class StorageTestCase(unittest.TestCase):
preg_xml_path = '{}/test/storage/data/Registry.pol.xml'.format(os.getcwd())
reg_name = 'registry'
# Run destructive storage tests in current directory
reg_path = '{}/test/tmp'.format(os.getcwd())
@unittest.mock.patch('util.paths.cache_dir')
def test_add_hklm_entry(self, cdir_mock):
test_sid = None
from util.preg import merge_polfile
merge_polfile(self.preg_xml_path, test_sid, self.reg_name, self.reg_path)
@unittest.mock.patch('util.paths.cache_dir')
def test_add_hkcu_entry(self, cdir_mock):
test_sid = 'test_sid'
from util.preg import merge_polfile
merge_polfile(self.preg_xml_path, test_sid, self.reg_name, self.reg_path)

View File

@@ -1,16 +0,0 @@
#! /usr/bin/env python3
from gpoa.util.users import with_privileges
from gpoa.util.arguments import set_loglevel
def test_fn():
with open('testfile', 'w') as f:
f.write('test')
def main():
set_loglevel(1)
with_privileges('test', test_fn)
if '__main__' == __name__:
main()

5
gpoa/test/tmp/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

View File

@@ -21,6 +21,7 @@ import unittest
from util.roles import fill_roles
class RolesTestCase(unittest.TestCase):
@unittest.skip('Role module test disabled because of instability')
def test_roles(self):
'''
Test utility functions to work with roles

View File

@@ -0,0 +1,37 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from util.rpm import (
install_rpms
, remove_rpms
)
class RPMTestCase(unittest.TestCase):
@unittest.skip('test_install_rpm is not unit test')
def test_install_rpm(self):
test_package_names = ['tortoisehg', 'csync']
install_rpms(test_package_names)
@unittest.skip('test_remove_rpm is not unit test')
def test_remove_rpm(self):
test_package_names = ['tortoisehg', 'csync']
remove_rpms(test_package_names)

View File

@@ -18,6 +18,7 @@
import logging
import logging.handlers
from enum import IntEnum
from .logging import slogm
@@ -75,3 +76,13 @@ def process_target(target_name=None):
return target
class ExitCodeUpdater(IntEnum):
'''
Exit code contract for gpupdate application
'''
EXIT_SUCCESS = 0
FAIL_NO_RUNNER = 1
FAIL_GPUPDATE_COMPUTER_NOREPLY = 2
FAIL_GPUPDATE_USER_NOREPLY = 3
EXIT_SIGINT = 130

View File

@@ -43,14 +43,27 @@ class dbus_runner:
if self.username:
logging.info(slogm('Starting GPO applier for user {} via D-Bus'.format(self.username)))
if is_root():
result = self.interface.gpupdatefor(dbus.String(self.username))
try:
result = self.interface.gpupdatefor(dbus.String(self.username))
print_dbus_result(result)
except dbus.exceptions.DBusException as exc:
logging.error(slogm('No reply from oddjobd gpoa runner for {}'.format(self.username)))
raise exc
else:
result = self.interface.gpupdate()
print_dbus_result(result)
try:
result = self.interface.gpupdate()
print_dbus_result(result)
except dbus.exceptions.DBusException as exc:
logging.error(slogm('No reply from oddjobd gpoa runner for current user'))
raise exc
else:
logging.info(slogm('Starting GPO applier for computer via D-Bus'))
result = self.interface.gpupdate_computer()
print_dbus_result(result)
try:
result = self.interface.gpupdate_computer()
print_dbus_result(result)
except dbus.exceptions.DBusException as exc:
logging.error(slogm('No reply from oddjobd gpoa runner for computer'))
raise exc
#self.interface.Quit()

View File

@@ -17,13 +17,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pathlib
import os
def default_policy_path():
'''
Returns path pointing to Default Policy directory.
'''
return pathlib.Path('/usr/share/local-policy/default')
local_policy_default = '/usr/share/local-policy/default'
etc_local_policy_default = '/etc/local-policy/active'
result_path = pathlib.Path(local_policy_default)
if os.path.exists(etc_local_policy_default):
result_path = pathlib.Path(etc_local_policy_default)
return pathlib.Path(result_path)
def cache_dir():
@@ -50,3 +59,31 @@ def local_policy_cache():
return lpcache
def backend_module_dir():
backend_dir = '/usr/lib/gpoa/backend'
return pathlib.Path(backend_dir)
def frontend_module_dir():
frontend_dir = '/usr/lib/gpoa/frontend'
return pathlib.Path(frontend_dir)
def storage_module_dir():
storage_dir = '/usr/lib/gpoa/storage'
return pathlib.Path(storage_dir)
def pre_backend_plugin_dir():
pre_backend_dir = '/usr/lib/gpoa/backend_pre'
return pathlib.Path(pre_backend_dir)
def post_backend_plugin_dir():
post_backend_dir = '/usr/lib/gpoa/backend_post'
return pathlib.Path(post_backend_dir)
def pre_frontend_plugin_dir():
pre_forntend_dir = '/usr/lib/gpoa/frontend_pre'
return pathlib.Path(pre_frontend_dir)
def post_frontend_plugin_dir():
post_frontend_dir = '/usr/lib/gpoa/frontend_post'
return pathlib.Path(post_frontend_dir)

View File

@@ -76,11 +76,10 @@ def preg_keymap(preg):
return keymap
def merge_polfile(preg, sid=None):
def merge_polfile(preg, sid=None, reg_name='registry', reg_path=None):
pregfile = load_preg(preg)
logging.info(slogm('Loaded PReg {}'.format(preg)))
key_map = dict()
storage = registry_factory('registry')
storage = registry_factory(reg_name, reg_path)
for entry in pregfile.entries:
if not sid:
storage.add_hklm_entry(entry)

View File

@@ -33,6 +33,64 @@ def is_rpm_installed(rpm_name):
return False
class Package:
__install_command = ['/usr/bin/apt-get', '-y', 'install']
__remove_command = ['/usr/bin/apt-get', '-y', 'remove']
__reinstall_command = ['/usr/bin/apt-get', '-y', 'reinstall']
def __init__(self, package_name):
self.package_name = package_name
self.for_install = True
if package_name.endswith('-'):
self.package_name = package_name[:-1]
self.for_install = False
self.installed = is_rpm_installed(self.package_name)
def mark_for_install(self):
self.for_install = True
def mark_for_removal(self):
self.for_install = False
def is_installed(self):
return self.installed
def is_for_install(self):
return self.for_install
def is_for_removal(self):
return (not self.for_install)
def action(self):
if self.for_install:
if not self.is_installed():
return self.install()
else:
if self.is_installed():
return self.remove()
def install(self):
fullcmd = self.__install_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def reinstall(self):
fullcmd = self.__reinstall_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def remove(self):
fullcmd = self.__remove_command
fullcmd.append(self.package_name)
return subprocess.check_call(fullcmd)
def __repr__(self):
return self.package_name
def __str__(self):
return self.package_name
def update():
'''
@@ -40,18 +98,40 @@ def update():
'''
subprocess.check_call(['/usr/bin/apt-get', 'update'])
def install_rpm(rpm_name):
'''
Install RPM from APT-RPM sources.
Install single RPM
'''
update()
subprocess.check_call(['/usr/bin/apt-get', '-y', 'install', rpm_name])
rpm = Package(rpm_name)
return rpm.install()
def remove_rpm(rpm_name):
'''
Remove RPM from file system.
Remove single RPM
'''
subprocess.check_call(['/usr/bin/apt-get', '-y', 'remove', rpm_name])
rpm = Package(rpm_name)
return rpm.remove()
def install_rpms(rpm_names):
'''
Install set of RPMs sequentially
'''
result = list()
for package in rpm_names:
result.append(install_rpm(package))
return result
def remove_rpms(rpm_names):
'''
Remove set of RPMs requentially
'''
result = list()
for package in rpm_names:
result.append(remove_rpm(package))
return result

38
gpoa/util/samba.py Normal file
View File

@@ -0,0 +1,38 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import optparse
from samba import getopt as options
class smbopts:
def __init__(self, prog=None):
self.parser = optparse.OptionParser(prog)
self.sambaopts = options.SambaOptions(self.parser)
self.lp = self.sambaopts.get_loadparm()
def get_cache_dir(self):
return self._get_prop('cache directory')
def get_server_role(self):
return self._get_prop('server role')
def _get_prop(self, property_name):
return self.lp.get(property_name)

31
gpoa/util/signals.py Normal file
View File

@@ -0,0 +1,31 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import signal
from sys import exit
from .arguments import ExitCodeUpdater
default_handler = signal.getsignal(signal.SIGINT)
def signal_handler(sig_number, frame):
# Ignore extra signals
signal.signal(sig_number, signal.SIG_IGN)
print('Received signal, exiting gracefully')
exit(ExitCodeUpdater.EXIT_SIGINT)

View File

@@ -20,6 +20,7 @@
import socket
import os
import pwd
from pathlib import Path
def get_machine_name():
@@ -53,6 +54,19 @@ def get_homedir(username):
'''
return pwd.getpwnam(username).pw_dir
def homedir_exists(username):
'''
Check that home directory exists for specified user.
:param username: string representing user name to work with
:return: True in case home directory exists and False otherwise
'''
hd = Path(get_homedir(username))
if hd.exists() and hd.is_dir():
return True
return False
def mk_homedir_path(username, homedir_path):
'''

View File

@@ -21,7 +21,6 @@ import logging
import os
import pwd
import optparse
from samba import getopt as options
from samba.gpclass import get_dc_hostname, check_refresh_gpo_list
@@ -33,15 +32,14 @@ from storage import cache_factory
from .xdg import get_user_dir
from .util import get_homedir
from .logging import slogm
from .samba import smbopts
class smbcreds:
class smbcreds (smbopts):
def __init__(self, dc_fqdn=None):
self.parser = optparse.OptionParser('GPO Applier')
self.sambaopts = options.SambaOptions(self.parser)
smbopts.__init__(self, 'GPO Applier')
self.credopts = options.CredentialsOptions(self.parser)
self.lp = self.sambaopts.get_loadparm()
self.creds = self.credopts.get_credentials(self.lp, fallback_machine=True)
self.selected_dc = self.set_dc(dc_fqdn)
@@ -87,9 +85,6 @@ class smbcreds:
return dns_domainname
def get_cache_dir(self):
return self._get_prop('cache directory')
def get_gpos(self, username):
'''
Get GPO list for the specified username for the specified DC
@@ -125,9 +120,6 @@ class smbcreds:
username, self.selected_dc)))
return gpos
def _get_prop(self, property_name):
return self.lp.get(property_name)
def wbinfo_getsid(domain, user):
'''

View File

@@ -1,13 +1,13 @@
%define _unpackaged_files_terminate_build 1
Name: gpupdate
Version: 0.4.1
Version: 0.5.0
Release: alt1
Summary: GPT applier
License: GPLv3+
Group: Other
Url: http://git.altlinux.org/
Url: https://github.com/altlinux/gpupdate
BuildArch: noarch
Requires: control
@@ -15,8 +15,13 @@ Requires: local-policy >= 0.1.0
BuildRequires: rpm-build-python3
Requires: python3-module-rpm
Requires: python3-module-dbus
Requires: oddjob-%name >= 0.2.0
Requires: libnss-role
Requires: libnss-role >= 0.5.0
Requires: local-policy >= 0.3.0
Requires: pam-config >= 1.8
# This is needed by shortcuts_applier
Requires: desktop-file-utils
Source0: %name-%version.tar
@@ -42,14 +47,16 @@ ln -s %python3_sitelibdir/gpoa/gpoa \
%buildroot%_sbindir/gpoa
ln -s %python3_sitelibdir/gpoa/gpupdate \
%buildroot%_bindir/gpupdate
cp dist/gpupdate-setup \
%buildroot%_sbindir/gpupdate-setup
mkdir -p %buildroot%_datadir/%name
mv %buildroot%python3_sitelibdir/gpoa/templates \
%buildroot%_datadir/%name/
install -Dm0644 %name.service %buildroot%_unitdir/%name.service
install -Dm0644 %name.service %{buildroot}/usr/lib/systemd/user/%{name}-user.service
install -Dm0644 system-policy-%name %buildroot%_sysconfdir/pam.d/system-policy-%name
install -Dm0644 dist/%name.service %buildroot%_unitdir/%name.service
install -Dm0644 dist/%name.service %{buildroot}/usr/lib/systemd/user/%{name}-user.service
install -Dm0644 dist/system-policy-%name %buildroot%_sysconfdir/pam.d/system-policy-%name
install -Dm0644 doc/gpoa.1 %buildroot/%{_man1dir}/gpoa.1
install -Dm0644 doc/gpupdate.1 %buildroot/%{_man1dir}/gpupdate.1
@@ -61,19 +68,40 @@ install -Dm0644 doc/gpupdate.1 %buildroot/%{_man1dir}/gpupdate.1
%files
%_sbindir/gpoa
%_sbindir/gpupdate-setup
%_bindir/gpupdate
%attr(755,root,root) %python3_sitelibdir/gpoa/gpoa
%attr(755,root,root) %python3_sitelibdir/gpoa/gpupdate
%python3_sitelibdir/gpoa
%_datadir/%name
%_unitdir/%name.service
%{_man1dir}/gpoa.1.xz
%{_man1dir}/gpupdate.1.xz
/usr/lib/systemd/user/%{name}-user.service
%_man1dir/gpoa.1.*
%_man1dir/gpupdate.1.*
/usr/lib/systemd/user/%name-user.service
%_sysconfdir/pam.d/system-policy-%name
%dir %_cachedir/%name
%changelog
* Wed Apr 22 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.5.0-alt1
- Update samba format: local.xml -> Machine/Registry.pol.xml
- Add support of ad-domain-controller local policy profile
- Set properly URL of project
* Mon Apr 20 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.4.5-alt1
- Add support for control system-policy and requires to new pam-config
* Sun Apr 19 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.4.4-alt1
- Add gpupdate-setup initialization script supported local-policy profiles
* Fri Mar 06 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.4.3-alt1
- Fix polfile merging
- Add support controls with strings values
- Initial SIGINT signal handling
* Wed Feb 19 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.4.2-alt1
- Change service type to 'simple' because gpoa is not systemd-aware
- Shortcuts fixes and improvements
* Wed Feb 12 2020 Evgeny Sinelnikov <sin@altlinux.org> 0.4.1-alt1
- Update license to GPLv3+
- Run gpudate as root without user argument for Computer target only

2
wiki

Submodule wiki updated: a1333257b6...8cb284e0f7