2007-12-17 05:25:28 +03:00
# Unix SMB/CIFS implementation.
2010-11-28 05:15:36 +03:00
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
2014-03-27 01:42:19 +04:00
# Copyright (C) Stefan Metzmacher 2014,2015
2010-11-28 05:15:36 +03:00
#
2007-12-17 05:25:28 +03:00
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
2010-11-28 05:15:36 +03:00
#
2007-12-17 05:25:28 +03:00
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
2010-11-28 05:15:36 +03:00
#
2007-12-17 05:25:28 +03:00
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
2007-12-30 03:14:15 +03:00
""" Samba Python tests. """
2007-12-17 05:25:28 +03:00
import os
2018-04-03 05:29:26 +03:00
import tempfile
2007-12-17 05:25:28 +03:00
import ldb
import samba
2010-06-13 18:38:24 +04:00
from samba import param
2014-11-05 08:26:25 +03:00
from samba import credentials
2016-09-16 12:11:58 +03:00
from samba . credentials import Credentials
2017-03-16 06:24:20 +03:00
from samba import gensec
2014-03-27 01:42:19 +04:00
import socket
import struct
2010-09-22 23:52:29 +04:00
import subprocess
2015-02-04 18:40:29 +03:00
import sys
2007-12-20 01:27:31 +03:00
import tempfile
2014-12-14 22:59:13 +03:00
import unittest
2018-02-11 01:59:40 +03:00
import re
2017-01-02 10:52:29 +03:00
import samba . auth
import samba . dcerpc . base
2018-02-11 01:59:40 +03:00
from samba . compat import PY3 , text_type
2018-05-01 21:58:36 +03:00
from samba . compat import string_types
2018-03-15 02:44:30 +03:00
from random import randint
2018-07-21 12:05:15 +03:00
try :
from samba . samdb import SamDB
except ImportError :
# We are built without samdb support,
# imitate it so that connect_samdb() can recover
2018-07-30 04:56:55 +03:00
def SamDB ( * args , * * kwargs ) :
return None
2018-05-10 15:14:22 +03:00
import samba . ndr
import samba . dcerpc . dcerpc
import samba . dcerpc . epmapper
2007-12-17 05:25:28 +03:00
2014-12-14 22:59:13 +03:00
try :
2014-12-14 23:03:28 +03:00
from unittest import SkipTest
2014-12-14 22:59:13 +03:00
except ImportError :
2014-12-14 23:03:28 +03:00
class SkipTest ( Exception ) :
2014-12-14 22:59:13 +03:00
""" Test skipped. """
2011-11-17 05:30:38 +04:00
2018-07-30 09:18:03 +03:00
HEXDUMP_FILTER = bytearray ( [ x if ( ( len ( repr ( chr ( x ) ) ) == 3 ) and ( x < 127 ) ) else ord ( ' . ' ) for x in range ( 256 ) ] )
2010-06-19 19:48:05 +04:00
2018-07-30 09:20:39 +03:00
2014-12-14 22:59:13 +03:00
class TestCase ( unittest . TestCase ) :
2011-01-05 03:09:25 +03:00
""" A Samba test case. """
def setUp ( self ) :
super ( TestCase , self ) . setUp ( )
test_debug_level = os . getenv ( " TEST_DEBUG_LEVEL " )
if test_debug_level is not None :
test_debug_level = int ( test_debug_level )
self . _old_debug_level = samba . get_debug_level ( )
samba . set_debug_level ( test_debug_level )
self . addCleanup ( samba . set_debug_level , test_debug_level )
2010-12-15 16:57:43 +03:00
def get_loadparm ( self ) :
return env_loadparm ( )
def get_credentials ( self ) :
return cmdline_credentials
2017-07-06 07:29:14 +03:00
def get_creds_ccache_name ( self ) :
creds = self . get_credentials ( )
ccache = creds . get_named_ccache ( self . get_loadparm ( ) )
ccache_name = ccache . get_name ( )
return ccache_name
2015-06-25 15:06:40 +03:00
def hexdump ( self , src ) :
2015-06-25 11:28:31 +03:00
N = 0
result = ' '
2018-05-01 21:58:36 +03:00
is_string = isinstance ( src , string_types )
2015-06-25 11:28:31 +03:00
while src :
2015-06-25 15:06:40 +03:00
ll = src [ : 8 ]
lr = src [ 8 : 16 ]
src = src [ 16 : ]
2018-05-01 21:58:36 +03:00
if is_string :
hl = ' ' . join ( [ " %02X " % ord ( x ) for x in ll ] )
hr = ' ' . join ( [ " %02X " % ord ( x ) for x in lr ] )
ll = ll . translate ( HEXDUMP_FILTER )
lr = lr . translate ( HEXDUMP_FILTER )
else :
hl = ' ' . join ( [ " %02X " % x for x in ll ] )
hr = ' ' . join ( [ " %02X " % x for x in lr ] )
ll = ll . translate ( HEXDUMP_FILTER ) . decode ( ' utf8 ' )
lr = lr . translate ( HEXDUMP_FILTER ) . decode ( ' utf8 ' )
2018-07-30 09:18:25 +03:00
result + = " [ %04X ] %-*s %-*s %s %s \n " % ( N , 8 * 3 , hl , 8 * 3 , hr , ll , lr )
2015-06-25 15:06:40 +03:00
N + = 16
2015-06-25 11:28:31 +03:00
return result
2017-03-16 06:24:20 +03:00
def insta_creds ( self , template = None , username = None , userpass = None , kerberos_state = None ) :
if template is None :
assert template is not None
if username is not None :
assert userpass is not None
if username is None :
assert userpass is None
username = template . get_username ( )
userpass = template . get_password ( )
if kerberos_state is None :
kerberos_state = template . get_kerberos_state ( )
# get a copy of the global creds or a the passed in creds
c = Credentials ( )
c . set_username ( username )
c . set_password ( userpass )
c . set_domain ( template . get_domain ( ) )
c . set_realm ( template . get_realm ( ) )
c . set_workstation ( template . get_workstation ( ) )
2017-03-30 21:03:30 +03:00
c . set_gensec_features ( c . get_gensec_features ( )
| gensec . FEATURE_SEAL )
2017-03-16 06:24:20 +03:00
c . set_kerberos_state ( kerberos_state )
return c
2015-01-27 05:44:10 +03:00
# These functions didn't exist before Python2.7:
2015-02-04 18:40:29 +03:00
if sys . version_info < ( 2 , 7 ) :
import warnings
2014-12-15 20:35:24 +03:00
def skipTest ( self , reason ) :
raise SkipTest ( reason )
2015-02-05 21:57:26 +03:00
def assertIn ( self , member , container , msg = None ) :
self . assertTrue ( member in container , msg )
2015-02-06 00:04:44 +03:00
def assertIs ( self , a , b , msg = None ) :
self . assertTrue ( a is b , msg )
2015-01-27 05:40:34 +03:00
2015-02-06 00:04:44 +03:00
def assertIsNot ( self , a , b , msg = None ) :
self . assertTrue ( a is not b , msg )
2015-01-27 05:40:34 +03:00
2015-02-06 00:04:44 +03:00
def assertIsNotNone ( self , a , msg = None ) :
self . assertTrue ( a is not None )
def assertIsInstance ( self , a , b , msg = None ) :
self . assertTrue ( isinstance ( a , b ) , msg )
2015-01-29 01:17:13 +03:00
2015-02-05 12:25:53 +03:00
def assertIsNone ( self , a , msg = None ) :
self . assertTrue ( a is None , msg )
2015-06-29 09:41:22 +03:00
def assertGreater ( self , a , b , msg = None ) :
self . assertTrue ( a > b , msg )
def assertGreaterEqual ( self , a , b , msg = None ) :
self . assertTrue ( a > = b , msg )
def assertLess ( self , a , b , msg = None ) :
self . assertTrue ( a < b , msg )
def assertLessEqual ( self , a , b , msg = None ) :
self . assertTrue ( a < = b , msg )
2015-01-27 05:44:10 +03:00
def addCleanup ( self , fn , * args , * * kwargs ) :
self . _cleanups = getattr ( self , " _cleanups " , [ ] ) + [
( fn , args , kwargs ) ]
2018-02-11 01:59:40 +03:00
def assertRegexpMatches ( self , text , regex , msg = None ) :
# PY3 note: Python 3 will never see this, but we use
# text_type for the benefit of linters.
if isinstance ( regex , ( str , text_type ) ) :
regex = re . compile ( regex )
if not regex . search ( text ) :
self . fail ( msg )
2015-02-04 18:40:29 +03:00
def _addSkip ( self , result , reason ) :
addSkip = getattr ( result , ' addSkip ' , None )
if addSkip is not None :
addSkip ( self , reason )
else :
warnings . warn ( " TestResult has no addSkip method, skips not reported " ,
RuntimeWarning , 2 )
result . addSuccess ( self )
2015-01-30 04:16:05 +03:00
def run ( self , result = None ) :
2018-07-30 09:22:02 +03:00
if result is None :
result = self . defaultTestResult ( )
2015-02-04 18:40:29 +03:00
result . startTest ( self )
testMethod = getattr ( self , self . _testMethodName )
try :
try :
self . setUp ( )
2016-12-13 13:26:53 +03:00
except SkipTest as e :
2015-02-04 18:40:29 +03:00
self . _addSkip ( result , str ( e ) )
return
except KeyboardInterrupt :
raise
except :
result . addError ( self , self . _exc_info ( ) )
return
ok = False
try :
testMethod ( )
ok = True
2016-12-13 13:26:53 +03:00
except SkipTest as e :
2015-02-04 18:40:29 +03:00
self . _addSkip ( result , str ( e ) )
return
except self . failureException :
result . addFailure ( self , self . _exc_info ( ) )
except KeyboardInterrupt :
raise
except :
result . addError ( self , self . _exc_info ( ) )
try :
self . tearDown ( )
2016-12-13 13:26:53 +03:00
except SkipTest as e :
2015-02-04 18:40:29 +03:00
self . _addSkip ( result , str ( e ) )
except KeyboardInterrupt :
raise
except :
result . addError ( self , self . _exc_info ( ) )
ok = False
for ( fn , args , kwargs ) in reversed ( getattr ( self , " _cleanups " , [ ] ) ) :
fn ( * args , * * kwargs )
2018-07-30 09:22:02 +03:00
if ok :
result . addSuccess ( self )
2015-02-04 18:40:29 +03:00
finally :
result . stopTest ( self )
2015-01-27 05:44:10 +03:00
2018-01-05 06:45:37 +03:00
def assertStringsEqual ( self , a , b , msg = None , strip = False ) :
""" Assert equality between two strings and highlight any differences.
If strip is true , leading and trailing whitespace is ignored . """
if strip :
a = a . strip ( )
b = b . strip ( )
if a != b :
sys . stderr . write ( " The strings differ %s (lengths %d vs %d ); "
" a diff follows \n "
% ( ' when stripped ' if strip else ' ' ,
len ( a ) , len ( b ) ,
2018-07-30 09:14:43 +03:00
) )
2018-01-05 06:45:37 +03:00
from difflib import unified_diff
diff = unified_diff ( a . splitlines ( True ) ,
b . splitlines ( True ) ,
' a ' , ' b ' )
for line in diff :
sys . stderr . write ( line )
self . fail ( msg )
2010-12-15 16:57:43 +03:00
2015-01-30 04:06:33 +03:00
class LdbTestCase ( TestCase ) :
2007-12-30 03:14:15 +03:00
""" Trivial test case for running tests against a LDB. """
2010-06-19 20:58:18 +04:00
2007-12-17 05:25:28 +03:00
def setUp ( self ) :
2010-06-19 20:58:18 +04:00
super ( LdbTestCase , self ) . setUp ( )
2018-04-03 05:29:26 +03:00
self . tempfile = tempfile . NamedTemporaryFile ( delete = False )
self . filename = self . tempfile . name
2007-12-17 05:25:28 +03:00
self . ldb = samba . Ldb ( self . filename )
2007-12-17 13:12:36 +03:00
def set_modules ( self , modules = [ ] ) :
2007-12-30 03:14:15 +03:00
""" Change the modules for this Ldb. """
2007-12-17 05:25:28 +03:00
m = ldb . Message ( )
m . dn = ldb . Dn ( self . ldb , " @MODULES " )
m [ " @LIST " ] = " , " . join ( modules )
self . ldb . add ( m )
self . ldb = samba . Ldb ( self . filename )
2010-06-19 19:48:05 +04:00
class TestCaseInTempDir ( TestCase ) :
2009-04-06 01:03:13 +04:00
2007-12-20 01:27:31 +03:00
def setUp ( self ) :
super ( TestCaseInTempDir , self ) . setUp ( )
self . tempdir = tempfile . mkdtemp ( )
2012-10-27 03:58:06 +04:00
self . addCleanup ( self . _remove_tempdir )
2007-12-20 01:27:31 +03:00
2012-10-27 03:58:06 +04:00
def _remove_tempdir ( self ) :
2008-05-29 19:29:56 +04:00
self . assertEquals ( [ ] , os . listdir ( self . tempdir ) )
2007-12-26 01:36:31 +03:00
os . rmdir ( self . tempdir )
2012-10-27 03:58:06 +04:00
self . tempdir = None
2007-12-20 01:27:31 +03:00
2010-06-13 18:38:24 +04:00
def env_loadparm ( ) :
lp = param . LoadParm ( )
try :
lp . load ( os . environ [ " SMB_CONF_PATH " ] )
except KeyError :
2014-12-02 07:04:40 +03:00
raise KeyError ( " SMB_CONF_PATH not set " )
2010-06-13 18:38:24 +04:00
return lp
2010-11-28 05:34:47 +03:00
2016-09-20 22:06:39 +03:00
def env_get_var_value ( var_name , allow_missing = False ) :
2010-09-29 15:53:12 +04:00
""" Returns value for variable in os.environ
Function throws AssertionError if variable is defined .
Unit - test based python tests require certain input params
to be set in environment , otherwise they can ' t be run
"""
2016-09-20 22:06:39 +03:00
if allow_missing :
if var_name not in os . environ . keys ( ) :
return None
2010-09-29 15:53:12 +04:00
assert var_name in os . environ . keys ( ) , " Please supply %s in environment " % var_name
return os . environ [ var_name ]
2008-04-14 21:13:41 +04:00
cmdline_credentials = None
2008-04-14 20:30:07 +04:00
2018-07-30 09:20:39 +03:00
2010-06-19 19:48:05 +04:00
class RpcInterfaceTestCase ( TestCase ) :
2010-12-15 16:57:43 +03:00
""" DCE/RPC Test case. """
2009-04-06 01:17:43 +04:00
2010-06-19 19:48:05 +04:00
class ValidNetbiosNameTests ( TestCase ) :
2009-04-06 01:17:43 +04:00
def test_valid ( self ) :
2009-04-20 13:11:25 +04:00
self . assertTrue ( samba . valid_netbios_name ( " FOO " ) )
2009-04-06 01:17:43 +04:00
def test_too_long ( self ) :
2018-07-30 09:18:25 +03:00
self . assertFalse ( samba . valid_netbios_name ( " FOO " * 10 ) )
2009-04-06 01:17:43 +04:00
def test_invalid_characters ( self ) :
2009-04-20 13:11:25 +04:00
self . assertFalse ( samba . valid_netbios_name ( " *BLA " ) )
2010-09-22 23:52:29 +04:00
2012-01-09 14:55:08 +04:00
class BlackboxProcessError ( Exception ) :
""" This is raised when check_output() process returns a non-zero exit status
Exception instance should contain the exact exit code ( S . returncode ) ,
command line ( S . cmd ) , process output ( S . stdout ) and process error stream
( S . stderr )
"""
2018-02-22 04:19:11 +03:00
def __init__ ( self , returncode , cmd , stdout , stderr , msg = None ) :
2012-01-09 14:55:08 +04:00
self . returncode = returncode
self . cmd = cmd
2011-02-20 05:15:08 +03:00
self . stdout = stdout
self . stderr = stderr
2018-02-22 04:19:11 +03:00
self . msg = msg
2012-01-09 14:55:08 +04:00
2011-02-20 05:15:08 +03:00
def __str__ ( self ) :
2018-02-22 04:19:11 +03:00
s = ( " Command ' %s ' ; exit status %d ; stdout: ' %s ' ; stderr: ' %s ' " %
( self . cmd , self . returncode , self . stdout , self . stderr ) )
if self . msg is not None :
s = " %s ; message: %s " % ( s , self . msg )
return s
2011-02-20 05:15:08 +03:00
2018-07-30 09:20:39 +03:00
2015-08-17 06:33:31 +03:00
class BlackboxTestCase ( TestCaseInTempDir ) :
2010-09-22 23:52:29 +04:00
""" Base test case for blackbox tests. """
2011-02-09 04:40:17 +03:00
def _make_cmdline ( self , line ) :
2010-09-22 23:52:29 +04:00
bindir = os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , " ../../../../bin " ) )
parts = line . split ( " " )
if os . path . exists ( os . path . join ( bindir , parts [ 0 ] ) ) :
parts [ 0 ] = os . path . join ( bindir , parts [ 0 ] )
line = " " . join ( parts )
2011-02-09 04:40:17 +03:00
return line
2018-02-22 04:19:11 +03:00
def check_run ( self , line , msg = None ) :
self . check_exit_code ( line , 0 , msg = msg )
2017-08-16 04:52:25 +03:00
2018-02-22 04:19:11 +03:00
def check_exit_code ( self , line , expected , msg = None ) :
2011-02-09 04:40:17 +03:00
line = self . _make_cmdline ( line )
2017-08-16 04:52:25 +03:00
p = subprocess . Popen ( line ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
shell = True )
2017-09-15 07:13:26 +03:00
stdoutdata , stderrdata = p . communicate ( )
retcode = p . returncode
2017-08-16 04:52:25 +03:00
if retcode != expected :
raise BlackboxProcessError ( retcode ,
line ,
2017-09-15 07:13:26 +03:00
stdoutdata ,
2018-02-22 04:19:11 +03:00
stderrdata ,
msg )
2010-09-29 03:58:23 +04:00
2011-02-09 04:01:16 +03:00
def check_output ( self , line ) :
2011-02-09 04:40:17 +03:00
line = self . _make_cmdline ( line )
2011-02-20 05:17:25 +03:00
p = subprocess . Popen ( line , stdout = subprocess . PIPE , stderr = subprocess . PIPE , shell = True , close_fds = True )
2017-09-15 07:13:26 +03:00
stdoutdata , stderrdata = p . communicate ( )
retcode = p . returncode
2011-02-09 04:01:16 +03:00
if retcode :
2017-09-15 07:13:26 +03:00
raise BlackboxProcessError ( retcode , line , stdoutdata , stderrdata )
return stdoutdata
2010-09-29 03:58:23 +04:00
2014-10-13 08:11:22 +04:00
2010-11-28 05:15:36 +03:00
def connect_samdb ( samdb_url , lp = None , session_info = None , credentials = None ,
2014-10-13 08:11:22 +04:00
flags = 0 , ldb_options = None , ldap_only = False , global_schema = True ) :
2010-11-28 05:15:36 +03:00
""" Create SamDB instance and connects to samdb_url database.
2010-09-29 03:58:23 +04:00
: param samdb_url : Url for database to connect to .
: param lp : Optional loadparm object
: param session_info : Optional session information
: param credentials : Optional credentials , defaults to anonymous .
: param flags : Optional LDB flags
: param ldap_only : If set , only remote LDAP connection will be created .
2014-10-13 08:11:22 +04:00
: param global_schema : Whether to use global schema .
2010-09-29 03:58:23 +04:00
Added value for tests is that we have a shorthand function
to make proper URL for ldb . connect ( ) while using default
parameters for connection based on test environment
"""
2018-07-30 09:22:34 +03:00
if " :// " not in samdb_url :
2010-09-29 03:58:23 +04:00
if not ldap_only and os . path . isfile ( samdb_url ) :
samdb_url = " tdb:// %s " % samdb_url
else :
samdb_url = " ldap:// %s " % samdb_url
# use 'paged_search' module when connecting remotely
if samdb_url . startswith ( " ldap:// " ) :
ldb_options = [ " modules:paged_searches " ]
2010-11-28 05:15:36 +03:00
elif ldap_only :
raise AssertionError ( " Trying to connect to %s while remote "
" connection is required " % samdb_url )
2010-09-29 03:58:23 +04:00
# set defaults for test environment
2010-11-28 05:15:36 +03:00
if lp is None :
lp = env_loadparm ( )
if session_info is None :
session_info = samba . auth . system_session ( lp )
if credentials is None :
credentials = cmdline_credentials
2010-09-29 03:58:23 +04:00
return SamDB ( url = samdb_url ,
lp = lp ,
session_info = session_info ,
credentials = credentials ,
flags = flags ,
2014-10-13 08:11:22 +04:00
options = ldb_options ,
global_schema = global_schema )
2010-11-22 16:03:59 +03:00
2010-11-28 05:15:36 +03:00
def connect_samdb_ex ( samdb_url , lp = None , session_info = None , credentials = None ,
flags = 0 , ldb_options = None , ldap_only = False ) :
2010-11-22 16:03:59 +03:00
""" Connects to samdb_url database
: param samdb_url : Url for database to connect to .
: param lp : Optional loadparm object
: param session_info : Optional session information
: param credentials : Optional credentials , defaults to anonymous .
: param flags : Optional LDB flags
: param ldap_only : If set , only remote LDAP connection will be created .
: return : ( sam_db_connection , rootDse_record ) tuple
"""
2010-11-28 05:15:36 +03:00
sam_db = connect_samdb ( samdb_url , lp , session_info , credentials ,
2010-11-22 16:03:59 +03:00
flags , ldb_options , ldap_only )
# fetch RootDse
2010-11-28 05:15:36 +03:00
res = sam_db . search ( base = " " , expression = " " , scope = ldb . SCOPE_BASE ,
attrs = [ " * " ] )
2010-11-22 16:03:59 +03:00
return ( sam_db , res [ 0 ] )
2010-11-24 18:47:27 +03:00
2010-11-28 05:15:36 +03:00
2014-11-05 08:26:25 +03:00
def connect_samdb_env ( env_url , env_username , env_password , lp = None ) :
""" Connect to SamDB by getting URL and Credentials from environment
: param env_url : Environment variable name to get lsb url from
: param env_username : Username environment variable
: param env_password : Password environment variable
: return : sam_db_connection
"""
samdb_url = env_get_var_value ( env_url )
creds = credentials . Credentials ( )
if lp is None :
# guess Credentials parameters here. Otherwise workstation
# and domain fields are NULL and gencache code segfalts
lp = param . LoadParm ( )
creds . guess ( lp )
creds . set_username ( env_get_var_value ( env_username ) )
creds . set_password ( env_get_var_value ( env_password ) )
return connect_samdb ( samdb_url , credentials = creds , lp = lp )
2017-06-07 08:44:25 +03:00
def delete_force ( samdb , dn , * * kwargs ) :
2010-11-24 18:47:27 +03:00
try :
2017-06-07 08:44:25 +03:00
samdb . delete ( dn , * * kwargs )
2016-12-13 13:26:53 +03:00
except ldb . LdbError as error :
( num , errstr ) = error . args
2014-11-02 19:11:20 +03:00
assert num == ldb . ERR_NO_SUCH_OBJECT , " ldb.delete() failed: %s " % errstr
2018-03-15 02:44:30 +03:00
2018-07-30 09:20:39 +03:00
2018-03-15 02:44:30 +03:00
def create_test_ou ( samdb , name ) :
""" Creates a unique OU for the test """
# Add some randomness to the test OU. Replication between the testenvs is
# constantly happening in the background. Deletion of the last test's
# objects can be slow to replicate out. So the OU created by a previous
# testenv may still exist at the point that tests start on another testenv.
rand = randint ( 1 , 10000000 )
2018-07-30 09:18:03 +03:00
dn = ldb . Dn ( samdb , " OU= %s %d , %s " % ( name , rand , samdb . get_default_basedn ( ) ) )
2018-07-30 09:16:43 +03:00
samdb . add ( { " dn " : dn , " objectclass " : " organizationalUnit " } )
2018-03-15 02:44:30 +03:00
return dn