2018-02-12 22:21:42 +03:00
# Unix SMB/CIFS implementation. Tests for smb manipulation
# Copyright (C) David Mulder <dmulder@suse.com> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
2018-10-11 03:47:22 +03:00
import errno
2018-02-12 22:21:42 +03:00
from samba import gpo , tests
2018-07-11 08:09:26 +03:00
from samba . gpclass import register_gp_extension , list_gp_extensions , \
2018-10-11 03:47:22 +03:00
unregister_gp_extension , GPOStorage
2018-02-12 22:21:42 +03:00
from samba . param import LoadParm
2018-07-11 08:09:26 +03:00
from samba . gpclass import check_refresh_gpo_list , check_safe_path , \
2018-08-30 19:25:45 +03:00
check_guid , parse_gpext_conf , atomic_write_conf , get_deleted_gpos_list
2018-08-30 02:28:58 +03:00
from subprocess import Popen , PIPE
2020-06-17 00:29:40 +03:00
from tempfile import NamedTemporaryFile , TemporaryDirectory
2020-07-08 23:48:45 +03:00
from samba . gp_sec_ext import gp_krb_ext , gp_access_ext
2020-06-17 00:29:40 +03:00
from samba . gp_scripts_ext import gp_scripts_ext
2020-06-26 21:37:11 +03:00
from samba . gp_sudoers_ext import gp_sudoers_ext
2020-07-07 20:10:10 +03:00
from samba . gpclass import gp_inf_ext
2018-07-26 00:24:35 +03:00
from samba . gp_smb_conf_ext import gp_smb_conf_ext
2018-08-31 00:22:08 +03:00
import logging
from samba . credentials import Credentials
2020-07-09 17:39:41 +03:00
from samba . gp_msgs_ext import gp_msgs_ext
2018-11-06 22:55:22 +03:00
from samba . compat import get_bytes
2020-06-17 00:29:40 +03:00
from samba . dcerpc import preg
from samba . ndr import ndr_pack
2020-07-07 20:10:10 +03:00
import codecs
2018-07-26 00:24:35 +03:00
from shutil import copyfile
2018-02-12 22:21:42 +03:00
2018-12-13 23:50:02 +03:00
realm = os . environ . get ( ' REALM ' )
policies = realm + ' /POLICIES '
realm = realm . lower ( )
poldir = r ' \\ {0} \ sysvol \ {0} \ Policies ' . format ( realm )
# the first part of the base DN varies by testenv. Work it out from the realm
base_dn = ' DC= {0} ,DC=samba,DC=example,DC=com ' . format ( realm . split ( ' . ' ) [ 0 ] )
dspath = ' CN=Policies,CN=System, ' + base_dn
2018-02-12 22:21:42 +03:00
gpt_data = ' [General] \n Version= %d '
2018-08-30 02:28:58 +03:00
def days2rel_nttime ( val ) :
seconds = 60
minutes = 60
hours = 24
sam_add = 10000000
return - ( val * seconds * minutes * hours * sam_add )
def gpupdate_force ( lp ) :
gpupdate = lp . get ( ' gpo update command ' )
gpupdate . append ( ' --force ' )
return Popen ( gpupdate , stdout = PIPE , stderr = PIPE ) . wait ( )
def gpupdate_unapply ( lp ) :
gpupdate = lp . get ( ' gpo update command ' )
gpupdate . append ( ' --unapply ' )
return Popen ( gpupdate , stdout = PIPE , stderr = PIPE ) . wait ( )
def stage_file ( path , data ) :
dirname = os . path . dirname ( path )
if not os . path . exists ( dirname ) :
try :
os . makedirs ( dirname )
except OSError as e :
if not ( e . errno == errno . EEXIST and os . path . isdir ( dirname ) ) :
return False
if os . path . exists ( path ) :
os . rename ( path , ' %s .bak ' % path )
with NamedTemporaryFile ( delete = False , dir = os . path . dirname ( path ) ) as f :
2018-11-06 22:55:22 +03:00
f . write ( get_bytes ( data ) )
2018-08-30 02:28:58 +03:00
os . rename ( f . name , path )
os . chmod ( path , 0o644 )
return True
def unstage_file ( path ) :
backup = ' %s .bak ' % path
if os . path . exists ( backup ) :
os . rename ( backup , path )
elif os . path . exists ( path ) :
os . remove ( path )
2018-07-30 09:20:39 +03:00
2018-02-12 22:21:42 +03:00
class GPOTests ( tests . TestCase ) :
def setUp ( self ) :
super ( GPOTests , self ) . setUp ( )
self . server = os . environ [ " SERVER " ]
2018-12-13 23:50:02 +03:00
self . dc_account = self . server . upper ( ) + ' $ '
2018-02-12 22:21:42 +03:00
self . lp = LoadParm ( )
self . lp . load_default ( )
self . creds = self . insta_creds ( template = self . get_credentials ( ) )
def tearDown ( self ) :
super ( GPOTests , self ) . tearDown ( )
def test_gpo_list ( self ) :
global poldir , dspath
ads = gpo . ADS_STRUCT ( self . server , self . lp , self . creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( self . creds . get_username ( ) )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
names = [ ' Local Policy ' , guid ]
file_sys_paths = [ None , ' %s \\ %s ' % ( poldir , guid ) ]
ds_paths = [ None , ' CN= %s , %s ' % ( guid , dspath ) ]
for i in range ( 0 , len ( gpos ) ) :
2020-02-07 01:02:38 +03:00
self . assertEqual ( gpos [ i ] . name , names [ i ] ,
2018-07-30 09:16:12 +03:00
' The gpo name did not match expected name %s ' % gpos [ i ] . name )
2020-02-07 01:02:38 +03:00
self . assertEqual ( gpos [ i ] . file_sys_path , file_sys_paths [ i ] ,
2018-07-30 09:16:12 +03:00
' file_sys_path did not match expected %s ' % gpos [ i ] . file_sys_path )
2020-02-07 01:02:38 +03:00
self . assertEqual ( gpos [ i ] . ds_path , ds_paths [ i ] ,
2018-07-30 09:16:12 +03:00
' ds_path did not match expected %s ' % gpos [ i ] . ds_path )
2018-02-12 22:21:42 +03:00
2018-04-13 03:29:05 +03:00
def test_gpo_ads_does_not_segfault ( self ) :
try :
ads = gpo . ADS_STRUCT ( self . server , 42 , self . creds )
except :
pass
2018-02-12 22:21:42 +03:00
def test_gpt_version ( self ) :
global gpt_data
2018-01-08 17:17:29 +03:00
local_path = self . lp . cache_path ( ' gpo_cache ' )
2018-02-12 22:21:42 +03:00
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
gpo_path = os . path . join ( local_path , policies , guid )
old_vers = gpo . gpo_get_sysvol_gpt_version ( gpo_path ) [ 1 ]
with open ( os . path . join ( gpo_path , ' GPT.INI ' ) , ' w ' ) as gpt :
gpt . write ( gpt_data % 42 )
2020-02-07 01:02:38 +03:00
self . assertEqual ( gpo . gpo_get_sysvol_gpt_version ( gpo_path ) [ 1 ] , 42 ,
2018-07-30 09:16:12 +03:00
' gpo_get_sysvol_gpt_version() did not return the expected version ' )
2018-02-12 22:21:42 +03:00
with open ( os . path . join ( gpo_path , ' GPT.INI ' ) , ' w ' ) as gpt :
gpt . write ( gpt_data % old_vers )
2020-02-07 01:02:38 +03:00
self . assertEqual ( gpo . gpo_get_sysvol_gpt_version ( gpo_path ) [ 1 ] , old_vers ,
2018-07-30 09:16:12 +03:00
' gpo_get_sysvol_gpt_version() did not return the expected version ' )
2018-02-12 22:21:42 +03:00
2018-01-08 17:17:29 +03:00
def test_check_refresh_gpo_list ( self ) :
cache = self . lp . cache_path ( ' gpo_cache ' )
ads = gpo . ADS_STRUCT ( self . server , self . lp , self . creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( self . creds . get_username ( ) )
check_refresh_gpo_list ( self . server , self . lp , self . creds , gpos )
self . assertTrue ( os . path . exists ( cache ) ,
' GPO cache %s was not created ' % cache )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
2018-12-13 23:50:02 +03:00
gpt_ini = os . path . join ( cache , policies ,
2018-01-08 17:17:29 +03:00
guid , ' GPT.INI ' )
self . assertTrue ( os . path . exists ( gpt_ini ) ,
' GPT.INI was not cached for %s ' % guid )
def test_check_refresh_gpo_list_malicious_paths ( self ) :
# the path cannot contain ..
path = ' /usr/local/samba/var/locks/sysvol/../../../../../../root/ '
self . assertRaises ( OSError , check_safe_path , path )
self . assertEqual ( check_safe_path ( ' /etc/passwd ' ) , ' etc/passwd ' )
self . assertEqual ( check_safe_path ( ' \\ \\ etc/ \\ passwd ' ) , ' etc/passwd ' )
# there should be no backslashes used to delineate paths
2018-12-13 23:50:02 +03:00
before = ' sysvol/ ' + realm + ' \\ Policies/ ' \
2018-01-08 17:17:29 +03:00
' { 31B2F340-016D-11D2-945F-00C04FB984F9} \\ GPT.INI '
2018-12-13 23:50:02 +03:00
after = realm + ' /Policies/ ' \
2018-01-08 17:17:29 +03:00
' { 31B2F340-016D-11D2-945F-00C04FB984F9}/GPT.INI '
result = check_safe_path ( before )
2020-02-07 01:02:38 +03:00
self . assertEqual ( result , after , ' check_safe_path() didn \' t '
2018-07-30 09:16:12 +03:00
' correctly convert \\ to / ' )
2018-01-08 17:17:29 +03:00
2018-07-11 08:09:26 +03:00
def test_gpt_ext_register ( self ) :
this_path = os . path . dirname ( os . path . realpath ( __file__ ) )
samba_path = os . path . realpath ( os . path . join ( this_path , ' ../../../ ' ) )
ext_path = os . path . join ( samba_path , ' python/samba/gp_sec_ext.py ' )
ext_guid = ' { 827D319E-6EAC-11D2-A4EA-00C04F79F83A} '
2020-07-08 23:48:45 +03:00
ret = register_gp_extension ( ext_guid , ' gp_access_ext ' , ext_path ,
2018-07-11 08:09:26 +03:00
smb_conf = self . lp . configfile ,
machine = True , user = False )
self . assertTrue ( ret , ' Failed to register a gp ext ' )
gp_exts = list_gp_extensions ( self . lp . configfile )
self . assertTrue ( ext_guid in gp_exts . keys ( ) ,
2018-07-30 09:16:12 +03:00
' Failed to list gp exts ' )
2020-02-07 01:02:38 +03:00
self . assertEqual ( gp_exts [ ext_guid ] [ ' DllName ' ] , ext_path ,
2018-07-30 09:16:12 +03:00
' Failed to list gp exts ' )
2018-07-11 08:09:26 +03:00
unregister_gp_extension ( ext_guid )
gp_exts = list_gp_extensions ( self . lp . configfile )
self . assertTrue ( ext_guid not in gp_exts . keys ( ) ,
2018-07-30 09:16:12 +03:00
' Failed to unregister gp exts ' )
2018-07-11 08:09:26 +03:00
self . assertTrue ( check_guid ( ext_guid ) , ' Failed to parse valid guid ' )
self . assertFalse ( check_guid ( ' AAAAAABBBBBBBCCC ' ) , ' Parsed invalid guid ' )
lp , parser = parse_gpext_conf ( self . lp . configfile )
self . assertTrue ( lp and parser , ' parse_gpext_conf() invalid return ' )
parser . add_section ( ' test_section ' )
parser . set ( ' test_section ' , ' test_var ' , ext_guid )
atomic_write_conf ( lp , parser )
lp , parser = parse_gpext_conf ( self . lp . configfile )
self . assertTrue ( ' test_section ' in parser . sections ( ) ,
2018-07-30 09:16:12 +03:00
' test_section not found in gpext.conf ' )
2020-02-07 01:02:38 +03:00
self . assertEqual ( parser . get ( ' test_section ' , ' test_var ' ) , ext_guid ,
2018-07-30 09:16:12 +03:00
' Failed to find test variable in gpext.conf ' )
2018-07-11 08:09:26 +03:00
parser . remove_section ( ' test_section ' )
atomic_write_conf ( lp , parser )
2018-08-30 02:28:58 +03:00
def test_gp_log_get_applied ( self ) :
local_path = self . lp . get ( ' path ' , ' sysvol ' )
guids = [ ' { 31B2F340-016D-11D2-945F-00C04FB984F9} ' ,
' { 6AC1786C-016F-11D2-945F-00C04FB984F9} ' ]
2018-12-13 23:50:02 +03:00
gpofile = ' %s / ' + realm + ' /Policies/ %s /MACHINE/Microsoft/ ' \
2018-08-30 02:28:58 +03:00
' Windows NT/SecEdit/GptTmpl.inf '
stage = ' [System Access] \n MinimumPasswordAge = 998 \n '
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
for guid in guids :
gpttmpl = gpofile % ( local_path , guid )
ret = stage_file ( gpttmpl , stage )
self . assertTrue ( ret , ' Could not create the target %s ' % gpttmpl )
ret = gpupdate_force ( self . lp )
2020-02-07 01:02:38 +03:00
self . assertEqual ( ret , 0 , ' gpupdate force failed ' )
2018-08-30 02:28:58 +03:00
2018-12-13 23:50:02 +03:00
gp_db = store . get_gplog ( self . dc_account )
2018-08-30 02:28:58 +03:00
applied_guids = gp_db . get_applied_guids ( )
2020-02-07 01:02:38 +03:00
self . assertEqual ( len ( applied_guids ) , 2 , ' The guids were not found ' )
2018-08-30 02:28:58 +03:00
self . assertIn ( guids [ 0 ] , applied_guids ,
' %s not in applied guids ' % guids [ 0 ] )
self . assertIn ( guids [ 1 ] , applied_guids ,
' %s not in applied guids ' % guids [ 1 ] )
applied_settings = gp_db . get_applied_settings ( applied_guids )
for policy in applied_settings :
self . assertIn ( ' System Access ' , policy [ 1 ] ,
' System Access policies not set ' )
self . assertIn ( ' minPwdAge ' , policy [ 1 ] [ ' System Access ' ] ,
' minPwdAge policy not set ' )
if policy [ 0 ] == guids [ 0 ] :
self . assertEqual ( int ( policy [ 1 ] [ ' System Access ' ] [ ' minPwdAge ' ] ) ,
days2rel_nttime ( 1 ) ,
' minPwdAge policy not set ' )
elif policy [ 0 ] == guids [ 1 ] :
self . assertEqual ( int ( policy [ 1 ] [ ' System Access ' ] [ ' minPwdAge ' ] ) ,
days2rel_nttime ( 998 ) ,
' minPwdAge policy not set ' )
2018-08-30 19:25:45 +03:00
ads = gpo . ADS_STRUCT ( self . server , self . lp , self . creds )
if ads . connect ( ) :
2018-12-13 23:50:02 +03:00
gpos = ads . get_gpo_list ( self . dc_account )
2018-08-30 19:25:45 +03:00
del_gpos = get_deleted_gpos_list ( gp_db , gpos [ : - 1 ] )
self . assertEqual ( len ( del_gpos ) , 1 , ' Returned delete gpos is incorrect ' )
self . assertEqual ( guids [ - 1 ] , del_gpos [ 0 ] [ 0 ] ,
' GUID for delete gpo is incorrect ' )
self . assertIn ( ' System Access ' , del_gpos [ 0 ] [ 1 ] ,
' System Access policies not set for removal ' )
self . assertIn ( ' minPwdAge ' , del_gpos [ 0 ] [ 1 ] [ ' System Access ' ] ,
' minPwdAge policy not set for removal ' )
2018-08-30 02:28:58 +03:00
for guid in guids :
gpttmpl = gpofile % ( local_path , guid )
unstage_file ( gpttmpl )
ret = gpupdate_unapply ( self . lp )
2020-02-07 01:02:38 +03:00
self . assertEqual ( ret , 0 , ' gpupdate unapply failed ' )
2018-08-31 00:22:08 +03:00
def test_process_group_policy ( self ) :
local_path = self . lp . cache_path ( ' gpo_cache ' )
guids = [ ' { 31B2F340-016D-11D2-945F-00C04FB984F9} ' ,
' { 6AC1786C-016F-11D2-945F-00C04FB984F9} ' ]
2018-12-13 23:50:02 +03:00
gpofile = ' %s / ' + policies + ' / %s /MACHINE/MICROSOFT/ ' \
2018-08-31 00:22:08 +03:00
' WINDOWS NT/SECEDIT/GPTTMPL.INF '
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
# Initialize the group policy extension
2020-06-27 00:34:02 +03:00
ext = gp_krb_ext ( logger , self . lp , machine_creds , store )
2018-08-31 00:22:08 +03:00
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
stage = ' [Kerberos Policy] \n MaxTicketAge = %d \n '
opts = [ 100 , 200 ]
for i in range ( 0 , 2 ) :
gpttmpl = gpofile % ( local_path , guids [ i ] )
ret = stage_file ( gpttmpl , stage % opts [ i ] )
self . assertTrue ( ret , ' Could not create the target %s ' % gpttmpl )
# Process all gpos
ext . process_group_policy ( [ ] , gpos )
ret = store . get_int ( ' kdc:user_ticket_lifetime ' )
self . assertEqual ( ret , opts [ 1 ] , ' Higher priority policy was not set ' )
# Remove policy
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
del_gpos = get_deleted_gpos_list ( gp_db , [ ] )
ext . process_group_policy ( del_gpos , [ ] )
ret = store . get_int ( ' kdc:user_ticket_lifetime ' )
self . assertEqual ( ret , None , ' MaxTicketAge should not have applied ' )
# Process just the first gpo
ext . process_group_policy ( [ ] , gpos [ : - 1 ] )
ret = store . get_int ( ' kdc:user_ticket_lifetime ' )
self . assertEqual ( ret , opts [ 0 ] , ' Lower priority policy was not set ' )
# Remove policy
ext . process_group_policy ( del_gpos , [ ] )
for guid in guids :
gpttmpl = gpofile % ( local_path , guid )
unstage_file ( gpttmpl )
2020-06-17 00:29:40 +03:00
2020-06-25 23:15:18 +03:00
def test_gp_scripts ( self ) :
2020-06-17 00:29:40 +03:00
local_path = self . lp . cache_path ( ' gpo_cache ' )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
reg_pol = os . path . join ( local_path , policies , guid ,
' MACHINE/REGISTRY.POL ' )
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
# Initialize the group policy extension
ext = gp_scripts_ext ( logger , self . lp , machine_creds , store )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
2020-06-25 23:15:18 +03:00
reg_key = b ' Software \\ Policies \\ Samba \\ Unix Settings '
sections = { b ' %s \\ Daily Scripts ' % reg_key : ' .cron.daily ' ,
2020-06-26 00:03:03 +03:00
b ' %s \\ Monthly Scripts ' % reg_key : ' .cron.monthly ' ,
2020-06-26 00:23:35 +03:00
b ' %s \\ Weekly Scripts ' % reg_key : ' .cron.weekly ' ,
2020-06-25 23:15:18 +03:00
b ' %s \\ Hourly Scripts ' % reg_key : ' .cron.hourly ' }
for keyname in sections . keys ( ) :
# Stage the Registry.pol file with test data
stage = preg . file ( )
e = preg . entry ( )
e . keyname = keyname
e . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e . type = 1
e . data = b ' echo hello world '
stage . num_entries = 1
stage . entries = [ e ]
ret = stage_file ( reg_pol , ndr_pack ( stage ) )
self . assertTrue ( ret , ' Could not create the target %s ' % reg_pol )
# Process all gpos, with temp output directory
with TemporaryDirectory ( sections [ keyname ] ) as dname :
ext . process_group_policy ( [ ] , gpos , dname )
scripts = os . listdir ( dname )
self . assertEquals ( len ( scripts ) , 1 ,
' The %s script was not created ' % keyname . decode ( ) )
out , _ = Popen ( [ os . path . join ( dname , scripts [ 0 ] ) ] , stdout = PIPE ) . communicate ( )
self . assertIn ( b ' hello world ' , out ,
' %s script execution failed ' % keyname . decode ( ) )
2020-08-07 22:58:34 +03:00
# Remove policy
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
del_gpos = get_deleted_gpos_list ( gp_db , [ ] )
ext . process_group_policy ( del_gpos , [ ] )
self . assertEquals ( len ( os . listdir ( dname ) ) , 0 ,
' Unapply failed to cleanup scripts ' )
2020-06-25 23:15:18 +03:00
# Unstage the Registry.pol file
unstage_file ( reg_pol )
2020-06-26 21:37:11 +03:00
def test_gp_sudoers ( self ) :
local_path = self . lp . cache_path ( ' gpo_cache ' )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
reg_pol = os . path . join ( local_path , policies , guid ,
' MACHINE/REGISTRY.POL ' )
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
# Initialize the group policy extension
ext = gp_sudoers_ext ( logger , self . lp , machine_creds , store )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
# Stage the Registry.pol file with test data
stage = preg . file ( )
e = preg . entry ( )
e . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Sudo Rights '
e . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e . type = 1
e . data = b ' fakeu ALL=(ALL) NOPASSWD: ALL '
stage . num_entries = 1
stage . entries = [ e ]
ret = stage_file ( reg_pol , ndr_pack ( stage ) )
self . assertTrue ( ret , ' Could not create the target %s ' % reg_pol )
# Process all gpos, with temp output directory
with TemporaryDirectory ( ) as dname :
ext . process_group_policy ( [ ] , gpos , dname )
sudoers = os . listdir ( dname )
self . assertEquals ( len ( sudoers ) , 1 , ' The sudoer file was not created ' )
self . assertIn ( e . data ,
open ( os . path . join ( dname , sudoers [ 0 ] ) , ' r ' ) . read ( ) ,
' The sudoers entry was not applied ' )
2020-08-07 22:59:32 +03:00
# Remove policy
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
del_gpos = get_deleted_gpos_list ( gp_db , [ ] )
ext . process_group_policy ( del_gpos , [ ] )
self . assertEquals ( len ( os . listdir ( dname ) ) , 0 ,
' Unapply failed to cleanup scripts ' )
2020-06-26 21:37:11 +03:00
# Unstage the Registry.pol file
unstage_file ( reg_pol )
2020-07-07 20:10:10 +03:00
def test_gp_inf_ext_utf ( self ) :
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
ext = gp_inf_ext ( logger , self . lp , machine_creds , store )
test_data = ' [Kerberos Policy] \n MaxTicketAge = 99 \n '
with NamedTemporaryFile ( ) as f :
with codecs . open ( f . name , ' w ' , ' utf-16 ' ) as w :
w . write ( test_data )
try :
inf_conf = ext . read ( f . name )
except UnicodeDecodeError :
self . fail ( ' Failed to parse utf-16 ' )
self . assertIn ( ' Kerberos Policy ' , inf_conf . keys ( ) ,
' Kerberos Policy was not read from the file ' )
self . assertEquals ( inf_conf . get ( ' Kerberos Policy ' , ' MaxTicketAge ' ) ,
' 99 ' , ' MaxTicketAge was not read from the file ' )
with NamedTemporaryFile ( ) as f :
with codecs . open ( f . name , ' w ' , ' utf-8 ' ) as w :
w . write ( test_data )
inf_conf = ext . read ( f . name )
self . assertIn ( ' Kerberos Policy ' , inf_conf . keys ( ) ,
' Kerberos Policy was not read from the file ' )
self . assertEquals ( inf_conf . get ( ' Kerberos Policy ' , ' MaxTicketAge ' ) ,
' 99 ' , ' MaxTicketAge was not read from the file ' )
2020-07-07 19:35:25 +03:00
def test_rsop ( self ) :
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
local_path = self . lp . cache_path ( ' gpo_cache ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
gp_extensions = [ ]
2020-08-07 02:25:47 +03:00
gp_extensions . append ( gp_krb_ext )
gp_extensions . append ( gp_scripts_ext )
gp_extensions . append ( gp_sudoers_ext )
2020-08-19 22:02:48 +03:00
gp_extensions . append ( gp_smb_conf_ext )
2020-07-07 19:35:25 +03:00
# Create registry stage data
reg_pol = os . path . join ( local_path , policies , ' %s /MACHINE/REGISTRY.POL ' )
reg_stage = preg . file ( )
e = preg . entry ( )
e . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Daily Scripts '
e . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e . type = 1
e . data = b ' echo hello world '
2020-08-06 23:53:02 +03:00
e2 = preg . entry ( )
e2 . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Sudo Rights '
e2 . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e2 . type = 1
e2 . data = b ' fakeu ALL=(ALL) NOPASSWD: ALL '
2020-08-19 22:02:48 +03:00
e3 = preg . entry ( )
e3 . keyname = ' Software \\ Policies \\ Samba \\ smb_conf \\ apply group policies '
e3 . type = 4
e3 . data = 1
e3 . valuename = ' apply group policies '
reg_stage . num_entries = 3
reg_stage . entries = [ e , e2 , e3 ]
2020-07-07 19:35:25 +03:00
# Create krb stage date
gpofile = os . path . join ( local_path , policies , ' %s /MACHINE/MICROSOFT/ ' \
' WINDOWS NT/SECEDIT/GPTTMPL.INF ' )
krb_stage = ' [Kerberos Policy] \n MaxTicketAge = 99 \n '
for g in [ g for g in gpos if g . file_sys_path ] :
ret = stage_file ( gpofile % g . name , krb_stage )
self . assertTrue ( ret , ' Could not create the target %s ' %
( gpofile % g . name ) )
ret = stage_file ( reg_pol % g . name , ndr_pack ( reg_stage ) )
self . assertTrue ( ret , ' Could not create the target %s ' %
( reg_pol % g . name ) )
for ext in gp_extensions :
2020-08-07 02:25:47 +03:00
ext = ext ( logger , self . lp , machine_creds , store )
2020-07-07 19:35:25 +03:00
ret = ext . rsop ( g )
self . assertEquals ( len ( ret . keys ( ) ) , 1 ,
' A single policy should have been displayed ' )
# Check the Security Extension
2020-06-27 00:34:02 +03:00
if type ( ext ) == gp_krb_ext :
2020-07-07 19:35:25 +03:00
self . assertIn ( ' Kerberos Policy ' , ret . keys ( ) ,
' Kerberos Policy not found ' )
self . assertIn ( ' MaxTicketAge ' , ret [ ' Kerberos Policy ' ] ,
' MaxTicketAge setting not found ' )
self . assertEquals ( ret [ ' Kerberos Policy ' ] [ ' MaxTicketAge ' ] , ' 99 ' ,
' MaxTicketAge was not set to 99 ' )
# Check the Scripts Extension
elif type ( ext ) == gp_scripts_ext :
self . assertIn ( ' Daily Scripts ' , ret . keys ( ) ,
' Daily Scripts not found ' )
self . assertIn ( ' echo hello world ' , ret [ ' Daily Scripts ' ] ,
' Daily script was not created ' )
2020-08-06 23:53:02 +03:00
# Check the Sudoers Extension
elif type ( ext ) == gp_sudoers_ext :
self . assertIn ( ' Sudo Rights ' , ret . keys ( ) ,
' Sudoers not found ' )
self . assertIn ( ' fakeu ALL=(ALL) NOPASSWD: ALL ' ,
ret [ ' Sudo Rights ' ] ,
' Sudoers policy not created ' )
2020-08-19 22:02:48 +03:00
# Check the smb.conf Extension
elif type ( ext ) == gp_smb_conf_ext :
self . assertIn ( ' smb.conf ' , ret . keys ( ) ,
' apply group policies was not applied ' )
self . assertIn ( e3 . valuename , ret [ ' smb.conf ' ] ,
' apply group policies was not applied ' )
self . assertEquals ( ret [ ' smb.conf ' ] [ e3 . valuename ] , e3 . data ,
' apply group policies was not set ' )
2020-07-07 19:35:25 +03:00
unstage_file ( gpofile % g . name )
unstage_file ( reg_pol % g . name )
2020-08-07 00:41:13 +03:00
def test_gp_unapply ( self ) :
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
local_path = self . lp . cache_path ( ' gpo_cache ' )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
gp_extensions = [ ]
2020-08-07 02:25:47 +03:00
gp_extensions . append ( gp_krb_ext )
gp_extensions . append ( gp_scripts_ext )
gp_extensions . append ( gp_sudoers_ext )
2020-08-07 00:41:13 +03:00
# Create registry stage data
reg_pol = os . path . join ( local_path , policies , ' %s /MACHINE/REGISTRY.POL ' )
reg_stage = preg . file ( )
e = preg . entry ( )
e . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Daily Scripts '
e . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e . type = 1
e . data = b ' echo hello world '
e2 = preg . entry ( )
e2 . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Sudo Rights '
e2 . valuename = b ' Software \\ Policies \\ Samba \\ Unix Settings '
e2 . type = 1
e2 . data = b ' fakeu ALL=(ALL) NOPASSWD: ALL '
reg_stage . num_entries = 2
reg_stage . entries = [ e , e2 ]
# Create krb stage date
gpofile = os . path . join ( local_path , policies , ' %s /MACHINE/MICROSOFT/ ' \
' WINDOWS NT/SECEDIT/GPTTMPL.INF ' )
krb_stage = ' [Kerberos Policy] \n MaxTicketAge = 99 \n '
ret = stage_file ( gpofile % guid , krb_stage )
self . assertTrue ( ret , ' Could not create the target %s ' %
( gpofile % guid ) )
ret = stage_file ( reg_pol % guid , ndr_pack ( reg_stage ) )
self . assertTrue ( ret , ' Could not create the target %s ' %
( reg_pol % guid ) )
# Process all gpos, with temp output directory
remove = [ ]
with TemporaryDirectory ( ) as dname :
for ext in gp_extensions :
2020-08-07 02:25:47 +03:00
ext = ext ( logger , self . lp , machine_creds , store )
2020-08-07 00:41:13 +03:00
if type ( ext ) == gp_krb_ext :
ext . process_group_policy ( [ ] , gpos )
ret = store . get_int ( ' kdc:user_ticket_lifetime ' )
self . assertEqual ( ret , 99 , ' Kerberos policy was not set ' )
elif type ( ext ) in [ gp_scripts_ext , gp_sudoers_ext ] :
ext . process_group_policy ( [ ] , gpos , dname )
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
applied_settings = gp_db . get_applied_settings ( [ guid ] )
for _ , fname in applied_settings [ - 1 ] [ - 1 ] [ str ( ext ) ] . items ( ) :
self . assertIn ( dname , fname ,
' Test file not created in tmp dir ' )
self . assertTrue ( os . path . exists ( fname ) ,
' Test file not created ' )
remove . append ( fname )
# Unapply policy, and ensure policies are removed
gpupdate_unapply ( self . lp )
for fname in remove :
self . assertFalse ( os . path . exists ( fname ) ,
' Unapply did not remove test file ' )
ret = store . get_int ( ' kdc:user_ticket_lifetime ' )
self . assertNotEqual ( ret , 99 , ' Kerberos policy was not unapplied ' )
unstage_file ( gpofile % guid )
unstage_file ( reg_pol % guid )
2018-07-26 00:24:35 +03:00
def test_smb_conf_ext ( self ) :
local_path = self . lp . cache_path ( ' gpo_cache ' )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
reg_pol = os . path . join ( local_path , policies , guid ,
' MACHINE/REGISTRY.POL ' )
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
entries = [ ]
e = preg . entry ( )
e . keyname = ' Software \\ Policies \\ Samba \\ smb_conf \\ template homedir '
e . type = 1
e . data = ' /home/samba/ % D/ % U '
e . valuename = ' template homedir '
entries . append ( e )
e = preg . entry ( )
e . keyname = ' Software \\ Policies \\ Samba \\ smb_conf \\ apply group policies '
e . type = 4
e . data = 1
e . valuename = ' apply group policies '
entries . append ( e )
e = preg . entry ( )
e . keyname = ' Software \\ Policies \\ Samba \\ smb_conf \\ ldap timeout '
e . type = 4
e . data = 9999
e . valuename = ' ldap timeout '
entries . append ( e )
stage = preg . file ( )
stage . num_entries = len ( entries )
stage . entries = entries
ret = stage_file ( reg_pol , ndr_pack ( stage ) )
self . assertTrue ( ret , ' Failed to create the Registry.pol file ' )
with NamedTemporaryFile ( suffix = ' _smb.conf ' ) as f :
copyfile ( self . lp . configfile , f . name )
lp = LoadParm ( f . name )
# Initialize the group policy extension
ext = gp_smb_conf_ext ( logger , lp , machine_creds , store )
ext . process_group_policy ( [ ] , gpos )
lp = LoadParm ( f . name )
template_homedir = lp . get ( ' template homedir ' )
self . assertEquals ( template_homedir , ' /home/samba/ % D/ % U ' ,
' template homedir was not applied ' )
apply_group_policies = lp . get ( ' apply group policies ' )
self . assertTrue ( apply_group_policies ,
' apply group policies was not applied ' )
ldap_timeout = lp . get ( ' ldap timeout ' )
self . assertEquals ( ldap_timeout , 9999 , ' ldap timeout was not applied ' )
# Remove policy
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
del_gpos = get_deleted_gpos_list ( gp_db , [ ] )
ext . process_group_policy ( del_gpos , [ ] )
lp = LoadParm ( f . name )
template_homedir = lp . get ( ' template homedir ' )
self . assertEquals ( template_homedir , self . lp . get ( ' template homedir ' ) ,
' template homedir was not unapplied ' )
apply_group_policies = lp . get ( ' apply group policies ' )
self . assertEquals ( apply_group_policies , self . lp . get ( ' apply group policies ' ) ,
' apply group policies was not unapplied ' )
ldap_timeout = lp . get ( ' ldap timeout ' )
self . assertEquals ( ldap_timeout , self . lp . get ( ' ldap timeout ' ) ,
' ldap timeout was not unapplied ' )
# Unstage the Registry.pol file
unstage_file ( reg_pol )
2020-07-09 17:39:41 +03:00
def test_gp_motd ( self ) :
local_path = self . lp . cache_path ( ' gpo_cache ' )
guid = ' { 31B2F340-016D-11D2-945F-00C04FB984F9} '
reg_pol = os . path . join ( local_path , policies , guid ,
' MACHINE/REGISTRY.POL ' )
logger = logging . getLogger ( ' gpo_tests ' )
cache_dir = self . lp . get ( ' cache directory ' )
store = GPOStorage ( os . path . join ( cache_dir , ' gpo.tdb ' ) )
machine_creds = Credentials ( )
machine_creds . guess ( self . lp )
machine_creds . set_machine_account ( )
# Initialize the group policy extension
ext = gp_msgs_ext ( logger , self . lp , machine_creds , store )
ads = gpo . ADS_STRUCT ( self . server , self . lp , machine_creds )
if ads . connect ( ) :
gpos = ads . get_gpo_list ( machine_creds . get_username ( ) )
# Stage the Registry.pol file with test data
stage = preg . file ( )
2020-07-09 18:53:34 +03:00
e1 = preg . entry ( )
e1 . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Messages '
e1 . valuename = b ' motd '
e1 . type = 1
e1 . data = b ' Have a lot of fun! '
stage . num_entries = 2
e2 = preg . entry ( )
e2 . keyname = b ' Software \\ Policies \\ Samba \\ Unix Settings \\ Messages '
e2 . valuename = b ' issue '
e2 . type = 1
e2 . data = b ' Welcome to \\ s \\ r \\ l '
stage . entries = [ e1 , e2 ]
2020-07-09 17:39:41 +03:00
ret = stage_file ( reg_pol , ndr_pack ( stage ) )
self . assertTrue ( ret , ' Could not create the target %s ' % reg_pol )
# Process all gpos, with temp output directory
with TemporaryDirectory ( ) as dname :
ext . process_group_policy ( [ ] , gpos , dname )
motd_file = os . path . join ( dname , ' motd ' )
self . assertTrue ( os . path . exists ( motd_file ) ,
' Message of the day file not created ' )
data = open ( motd_file , ' r ' ) . read ( )
2020-07-09 18:53:34 +03:00
self . assertEquals ( data , e1 . data , ' Message of the day not applied ' )
issue_file = os . path . join ( dname , ' issue ' )
self . assertTrue ( os . path . exists ( issue_file ) ,
' Login Prompt Message file not created ' )
data = open ( issue_file , ' r ' ) . read ( )
self . assertEquals ( data , e2 . data , ' Login Prompt Message not applied ' )
2020-07-09 17:39:41 +03:00
# Unapply policy, and ensure the test files are removed
gp_db = store . get_gplog ( machine_creds . get_username ( ) )
del_gpos = get_deleted_gpos_list ( gp_db , [ ] )
ext . process_group_policy ( del_gpos , [ ] , dname )
data = open ( motd_file , ' r ' ) . read ( )
self . assertFalse ( data , ' Message of the day file not removed ' )
2020-07-09 18:53:34 +03:00
data = open ( issue_file , ' r ' ) . read ( )
self . assertFalse ( data , ' Login Prompt Message file not removed ' )
2020-07-09 17:39:41 +03:00
# Unstage the Registry.pol file
unstage_file ( reg_pol )