mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
gpo: Test the new get_applied functions
Signed-off-by: David Mulder <dmulder@suse.com> Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Aurelien Aptel <aaptel@suse.com>
This commit is contained in:
parent
ba87e23b38
commit
cb3eb79eff
@ -17,15 +17,58 @@
|
|||||||
import os
|
import os
|
||||||
from samba import gpo, tests
|
from samba import gpo, tests
|
||||||
from samba.gpclass import register_gp_extension, list_gp_extensions, \
|
from samba.gpclass import register_gp_extension, list_gp_extensions, \
|
||||||
unregister_gp_extension
|
unregister_gp_extension, gp_log, GPOStorage
|
||||||
from samba.param import LoadParm
|
from samba.param import LoadParm
|
||||||
from samba.gpclass import check_refresh_gpo_list, check_safe_path, \
|
from samba.gpclass import check_refresh_gpo_list, check_safe_path, \
|
||||||
check_guid, parse_gpext_conf, atomic_write_conf
|
check_guid, parse_gpext_conf, atomic_write_conf
|
||||||
|
from subprocess import Popen, PIPE
|
||||||
|
from tempfile import NamedTemporaryFile
|
||||||
|
|
||||||
poldir = r'\\addom.samba.example.com\sysvol\addom.samba.example.com\Policies'
|
poldir = r'\\addom.samba.example.com\sysvol\addom.samba.example.com\Policies'
|
||||||
dspath = 'CN=Policies,CN=System,DC=addom,DC=samba,DC=example,DC=com'
|
dspath = 'CN=Policies,CN=System,DC=addom,DC=samba,DC=example,DC=com'
|
||||||
gpt_data = '[General]\nVersion=%d'
|
gpt_data = '[General]\nVersion=%d'
|
||||||
|
|
||||||
|
def days2rel_nttime(val):
|
||||||
|
seconds = 60
|
||||||
|
minutes = 60
|
||||||
|
hours = 24
|
||||||
|
sam_add = 10000000
|
||||||
|
return -(val * seconds * minutes * hours * sam_add)
|
||||||
|
|
||||||
|
def gpupdate_force(lp):
|
||||||
|
gpupdate = lp.get('gpo update command')
|
||||||
|
gpupdate.append('--force')
|
||||||
|
|
||||||
|
return Popen(gpupdate, stdout=PIPE, stderr=PIPE).wait()
|
||||||
|
|
||||||
|
def gpupdate_unapply(lp):
|
||||||
|
gpupdate = lp.get('gpo update command')
|
||||||
|
gpupdate.append('--unapply')
|
||||||
|
|
||||||
|
return Popen(gpupdate, stdout=PIPE, stderr=PIPE).wait()
|
||||||
|
|
||||||
|
def stage_file(path, data):
|
||||||
|
dirname = os.path.dirname(path)
|
||||||
|
if not os.path.exists(dirname):
|
||||||
|
try:
|
||||||
|
os.makedirs(dirname)
|
||||||
|
except OSError as e:
|
||||||
|
if not (e.errno == errno.EEXIST and os.path.isdir(dirname)):
|
||||||
|
return False
|
||||||
|
if os.path.exists(path):
|
||||||
|
os.rename(path, '%s.bak' % path)
|
||||||
|
with NamedTemporaryFile(delete=False, dir=os.path.dirname(path)) as f:
|
||||||
|
f.write(data)
|
||||||
|
os.rename(f.name, path)
|
||||||
|
os.chmod(path, 0o644)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def unstage_file(path):
|
||||||
|
backup = '%s.bak' % path
|
||||||
|
if os.path.exists(backup):
|
||||||
|
os.rename(backup, path)
|
||||||
|
elif os.path.exists(path):
|
||||||
|
os.remove(path)
|
||||||
|
|
||||||
class GPOTests(tests.TestCase):
|
class GPOTests(tests.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -148,3 +191,51 @@ class GPOTests(tests.TestCase):
|
|||||||
'Failed to find test variable in gpext.conf')
|
'Failed to find test variable in gpext.conf')
|
||||||
parser.remove_section('test_section')
|
parser.remove_section('test_section')
|
||||||
atomic_write_conf(lp, parser)
|
atomic_write_conf(lp, parser)
|
||||||
|
|
||||||
|
def test_gp_log_get_applied(self):
|
||||||
|
local_path = self.lp.get('path', 'sysvol')
|
||||||
|
guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
|
||||||
|
'{6AC1786C-016F-11D2-945F-00C04FB984F9}']
|
||||||
|
gpofile = '%s/addom.samba.example.com/Policies/%s/MACHINE/Microsoft/' \
|
||||||
|
'Windows NT/SecEdit/GptTmpl.inf'
|
||||||
|
stage = '[System Access]\nMinimumPasswordAge = 998\n'
|
||||||
|
cache_dir = self.lp.get('cache directory')
|
||||||
|
store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
|
||||||
|
for guid in guids:
|
||||||
|
gpttmpl = gpofile % (local_path, guid)
|
||||||
|
ret = stage_file(gpttmpl, stage)
|
||||||
|
self.assertTrue(ret, 'Could not create the target %s' % gpttmpl)
|
||||||
|
|
||||||
|
ret = gpupdate_force(self.lp)
|
||||||
|
self.assertEquals(ret, 0, 'gpupdate force failed')
|
||||||
|
|
||||||
|
gp_db = store.get_gplog('ADDC$')
|
||||||
|
|
||||||
|
applied_guids = gp_db.get_applied_guids()
|
||||||
|
self.assertEquals(len(applied_guids), 2, 'The guids were not found')
|
||||||
|
self.assertIn(guids[0], applied_guids,
|
||||||
|
'%s not in applied guids' % guids[0])
|
||||||
|
self.assertIn(guids[1], applied_guids,
|
||||||
|
'%s not in applied guids' % guids[1])
|
||||||
|
|
||||||
|
applied_settings = gp_db.get_applied_settings(applied_guids)
|
||||||
|
for policy in applied_settings:
|
||||||
|
self.assertIn('System Access', policy[1],
|
||||||
|
'System Access policies not set')
|
||||||
|
self.assertIn('minPwdAge', policy[1]['System Access'],
|
||||||
|
'minPwdAge policy not set')
|
||||||
|
if policy[0] == guids[0]:
|
||||||
|
self.assertEqual(int(policy[1]['System Access']['minPwdAge']),
|
||||||
|
days2rel_nttime(1),
|
||||||
|
'minPwdAge policy not set')
|
||||||
|
elif policy[0] == guids[1]:
|
||||||
|
self.assertEqual(int(policy[1]['System Access']['minPwdAge']),
|
||||||
|
days2rel_nttime(998),
|
||||||
|
'minPwdAge policy not set')
|
||||||
|
|
||||||
|
for guid in guids:
|
||||||
|
gpttmpl = gpofile % (local_path, guid)
|
||||||
|
unstage_file(gpttmpl)
|
||||||
|
|
||||||
|
ret = gpupdate_unapply(self.lp)
|
||||||
|
self.assertEquals(ret, 0, 'gpupdate unapply failed')
|
||||||
|
Loading…
Reference in New Issue
Block a user