mirror of
https://github.com/samba-team/samba.git
synced 2025-12-20 16:23:51 +03:00
gpupdate: Test Drive Maps Client Side Extension
Signed-off-by: David Mulder <dmulder@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
committed by
Andrew Bartlett
parent
acd9248b13
commit
42d03da306
@@ -50,6 +50,7 @@ from samba.gp.gp_msgs_ext import gp_msgs_ext
|
||||
from samba.gp.gp_centrify_sudoers_ext import gp_centrify_sudoers_ext
|
||||
from samba.gp.gp_centrify_crontab_ext import gp_centrify_crontab_ext, \
|
||||
gp_user_centrify_crontab_ext
|
||||
from samba.gp.gp_drive_maps_ext import gp_drive_maps_user_ext
|
||||
from samba.common import get_bytes
|
||||
from samba.dcerpc import preg
|
||||
from samba.ndr import ndr_pack
|
||||
@@ -60,7 +61,7 @@ import hashlib
|
||||
from samba.gp_parse.gp_pol import GPPolParser
|
||||
from glob import glob
|
||||
from configparser import ConfigParser
|
||||
from samba.gp.gpclass import get_dc_hostname
|
||||
from samba.gp.gpclass import get_dc_hostname, expand_pref_variables
|
||||
from samba import Ldb
|
||||
import ldb as _ldb
|
||||
from samba.auth import system_session
|
||||
@@ -5009,6 +5010,11 @@ br"""
|
||||
</PolFile>
|
||||
"""
|
||||
|
||||
drive_maps_xml = b"""<?xml version="1.0" encoding="utf-8"?>
|
||||
<Drives clsid="{8FDDCC1A-0C3C-43cd-A6B4-71A6DF20DA8C}"><Drive clsid="{935D1B74-9CB8-4e3c-9914-7DD559B7A417}" name="A:" status="A:" image="2" changed="2023-03-08 19:23:02" uid="{1641E121-DEF3-418D-A428-2D8DF4749504}" bypassErrors="1"><Properties action="U" thisDrive="NOCHANGE" allDrives="NOCHANGE" userName="" path="\\\\example.com\\test" label="TEST" persistent="1" useLetter="0" letter="A"/></Drive>
|
||||
</Drives>
|
||||
"""
|
||||
|
||||
def days2rel_nttime(val):
|
||||
seconds = 60
|
||||
minutes = 60
|
||||
@@ -7829,3 +7835,183 @@ class GPOTests(tests.TestCase):
|
||||
# Unstage the Registry.pol files
|
||||
unstage_file(reg_pol)
|
||||
unstage_file(reg_pol2)
|
||||
|
||||
def test_gp_drive_maps_user_ext(self):
|
||||
local_path = self.lp.cache_path('gpo_cache')
|
||||
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
|
||||
xml_path = os.path.join(local_path, policies, guid,
|
||||
'USER/PREFERENCES/DRIVES/DRIVES.XML')
|
||||
cache_dir = self.lp.get('cache directory')
|
||||
store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
|
||||
|
||||
machine_creds = Credentials()
|
||||
machine_creds.guess(self.lp)
|
||||
machine_creds.set_machine_account()
|
||||
|
||||
# Initialize the group policy extension
|
||||
ext = gp_drive_maps_user_ext(self.lp, machine_creds,
|
||||
os.environ.get('DC_USERNAME'), store)
|
||||
|
||||
ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
|
||||
if ads.connect():
|
||||
gpos = ads.get_gpo_list(machine_creds.get_username())
|
||||
|
||||
# Stage the Drives.xml file with test data
|
||||
ret = stage_file(xml_path, drive_maps_xml)
|
||||
self.assertTrue(ret, 'Could not create the target %s' % xml_path)
|
||||
|
||||
# Process all gpos, intentionally skipping the privilege drop
|
||||
ext.process_group_policy([], gpos)
|
||||
# Dump the fake crontab setup for testing
|
||||
p = Popen(['crontab', '-l'], stdout=PIPE)
|
||||
crontab, _ = p.communicate()
|
||||
entry = b'@hourly gio mount smb://example.com/test'
|
||||
self.assertIn(entry, crontab,
|
||||
'The crontab entry was not installed')
|
||||
|
||||
# Check that a call to gpupdate --rsop also succeeds
|
||||
ret = rsop(self.lp)
|
||||
self.assertEquals(ret, 0, 'gpupdate --rsop failed!')
|
||||
|
||||
# Unstage the Drives.xml
|
||||
unstage_file(xml_path)
|
||||
|
||||
# Modify the policy and ensure it is updated
|
||||
xml_conf = etree.fromstring(drive_maps_xml.strip())
|
||||
drives = xml_conf.findall('Drive')
|
||||
props = drives[0].find('Properties')
|
||||
props.attrib['action'] = 'D'
|
||||
ret = stage_file(xml_path,
|
||||
etree.tostring(xml_conf, encoding='unicode'))
|
||||
self.assertTrue(ret, 'Could not create the target %s' % xml_path)
|
||||
|
||||
# Process all gpos, intentionally skipping the privilege drop
|
||||
ext.process_group_policy([], gpos)
|
||||
# Dump the fake crontab setup for testing
|
||||
p = Popen(['crontab', '-l'], stdout=PIPE)
|
||||
crontab, _ = p.communicate()
|
||||
self.assertNotIn(entry+b'\n', crontab,
|
||||
'The old crontab entry was not removed')
|
||||
entry = entry + b' --unmount'
|
||||
self.assertIn(entry, crontab,
|
||||
'The crontab entry was not installed')
|
||||
|
||||
# Remove policy
|
||||
gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
|
||||
del_gpos = get_deleted_gpos_list(gp_db, [])
|
||||
ext.process_group_policy(del_gpos, [])
|
||||
# Dump the fake crontab setup for testing
|
||||
p = Popen(['crontab', '-l'], stdout=PIPE)
|
||||
crontab, _ = p.communicate()
|
||||
self.assertNotIn(entry, crontab,
|
||||
'Unapply failed to cleanup crontab entry')
|
||||
|
||||
# Unstage the Drives.xml
|
||||
unstage_file(xml_path)
|
||||
|
||||
# Modify the policy to set 'run once', ensure there is no cron entry
|
||||
xml_conf = etree.fromstring(drive_maps_xml.strip())
|
||||
drives = xml_conf.findall('Drive')
|
||||
filters = etree.SubElement(drives[0], 'Filters')
|
||||
etree.SubElement(filters, 'FilterRunOnce')
|
||||
ret = stage_file(xml_path,
|
||||
etree.tostring(xml_conf, encoding='unicode'))
|
||||
self.assertTrue(ret, 'Could not create the target %s' % xml_path)
|
||||
|
||||
# Process all gpos, intentionally skipping the privilege drop
|
||||
ext.process_group_policy([], gpos)
|
||||
# Dump the fake crontab setup for testing
|
||||
p = Popen(['crontab', '-l'], stdout=PIPE)
|
||||
crontab, _ = p.communicate()
|
||||
entry = b'@hourly gio mount smb://example.com/test'
|
||||
self.assertNotIn(entry, crontab,
|
||||
'The crontab entry was added despite run-once request')
|
||||
|
||||
# Remove policy
|
||||
gp_db = store.get_gplog(os.environ.get('DC_USERNAME'))
|
||||
del_gpos = get_deleted_gpos_list(gp_db, [])
|
||||
ext.process_group_policy(del_gpos, [])
|
||||
|
||||
# Unstage the Drives.xml
|
||||
unstage_file(xml_path)
|
||||
|
||||
def test_expand_pref_variables(self):
|
||||
cache_path = self.lp.cache_path(os.path.join('gpo_cache'))
|
||||
gpt_path = 'TEST'
|
||||
username = 'test_uname'
|
||||
test_vars = { 'AppDataDir': os.path.expanduser('~/.config'),
|
||||
'ComputerName': self.lp.get('netbios name'),
|
||||
'DesktopDir': os.path.expanduser('~/Desktop'),
|
||||
'DomainName': self.lp.get('realm'),
|
||||
'GptPath': os.path.join(cache_path,
|
||||
check_safe_path(gpt_path).upper()),
|
||||
'LogonDomain': self.lp.get('realm'),
|
||||
'LogonUser': username,
|
||||
'SystemDrive': '/',
|
||||
'TempDir': '/tmp'
|
||||
}
|
||||
for exp_var, val in test_vars.items():
|
||||
self.assertEqual(expand_pref_variables('%%%s%%' % exp_var,
|
||||
gpt_path,
|
||||
self.lp,
|
||||
username),
|
||||
val, 'Failed to expand variable %s' % exp_var)
|
||||
# With the time variables, we can't test for an exact time, so let's do
|
||||
# simple checks instead.
|
||||
time_vars = ['DateTime', 'DateTimeEx', 'LocalTime',
|
||||
'LocalTimeEx', 'TimeStamp']
|
||||
for time_var in time_vars:
|
||||
self.assertNotEqual(expand_pref_variables('%%%s%%' % time_var,
|
||||
gpt_path,
|
||||
self.lp,
|
||||
username),
|
||||
None, 'Failed to expand variable %s' % time_var)
|
||||
|
||||
# Here we test to ensure undefined preference variables cause an error.
|
||||
# The reason for testing these is to ensure we don't apply nonsense
|
||||
# policies when they can't be defined. Also, these tests will fail if
|
||||
# one of these is implemented in the future (forcing us to write a test
|
||||
# anytime these are implemented).
|
||||
undef_vars = ['BinaryComputerSid',
|
||||
'BinaryUserSid',
|
||||
'CommonAppdataDir',
|
||||
'CommonDesktopDir',
|
||||
'CommonFavoritesDir',
|
||||
'CommonProgramsDir',
|
||||
'CommonStartUpDir',
|
||||
'CurrentProccessId',
|
||||
'CurrentThreadId',
|
||||
'FavoritesDir',
|
||||
'GphPath',
|
||||
'GroupPolicyVersion',
|
||||
'LastDriveMapped',
|
||||
'LastError',
|
||||
'LastErrorText',
|
||||
'LdapComputerSid',
|
||||
'LdapUserSid',
|
||||
'LogonServer',
|
||||
'LogonUserSid',
|
||||
'MacAddress',
|
||||
'NetPlacesDir',
|
||||
'OsVersion',
|
||||
'ProgramFilesDir',
|
||||
'ProgramsDir',
|
||||
'RecentDocumentsDir',
|
||||
'ResultCode',
|
||||
'ResultText',
|
||||
'ReversedComputerSid',
|
||||
'ReversedUserSid',
|
||||
'SendToDir',
|
||||
'StartMenuDir',
|
||||
'StartUpDir',
|
||||
'SystemDir',
|
||||
'TraceFile',
|
||||
'WindowsDir'
|
||||
]
|
||||
for undef_var in undef_vars:
|
||||
try:
|
||||
expand_pref_variables('%%%s%%' % undef_var, gpt_path, self.lp)
|
||||
except NameError:
|
||||
pass
|
||||
else:
|
||||
self.fail('Undefined variable %s caused no error' % undef_var)
|
||||
|
||||
Reference in New Issue
Block a user