2012-08-15 16:08:45 +04:00
# Unix SMB/CIFS implementation.
# Copyright Volker Lendecke <vl@samba.org> 2012
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
2018-12-13 02:32:17 +03:00
""" Tests for samba.samba3.libsmb. """
2012-08-15 16:08:45 +04:00
2019-01-09 00:15:49 +03:00
from samba . samba3 import libsmb_samba_internal as libsmb
2012-08-15 16:08:45 +04:00
from samba . dcerpc import security
2023-04-14 18:34:17 +03:00
from samba import NTSTATUSError , ntstatus
2020-11-17 17:24:43 +03:00
from samba . ntstatus import NT_STATUS_DELETE_PENDING
2020-06-10 12:26:00 +03:00
from samba . credentials import SMB_ENCRYPTION_REQUIRED
2022-10-19 12:34:40 +03:00
import samba . tests . libsmb
2012-08-15 16:08:45 +04:00
import threading
import sys
2020-06-10 12:26:00 +03:00
import random
2012-08-15 16:08:45 +04:00
2018-07-30 09:20:39 +03:00
2022-10-19 12:34:40 +03:00
class LibsmbTestCase ( samba . tests . libsmb . LibsmbTests ) :
2012-08-15 16:08:45 +04:00
class OpenClose ( threading . Thread ) :
def __init__ ( self , conn , filename , num_ops ) :
threading . Thread . __init__ ( self )
self . conn = conn
self . filename = filename
self . num_ops = num_ops
self . exc = False
def run ( self ) :
c = self . conn
try :
for i in range ( self . num_ops ) :
f = c . create ( self . filename , CreateDisposition = 3 ,
DesiredAccess = security . SEC_STD_DELETE )
c . delete_on_close ( f , True )
c . close ( f )
except Exception :
self . exc = sys . exc_info ( )
2020-11-17 18:11:11 +03:00
def test_OpenClose ( self ) :
2022-10-19 12:34:40 +03:00
c = libsmb . Conn (
self . server_ip ,
" tmp " ,
self . lp ,
self . creds ,
multi_threaded = True ,
force_smb1 = True )
2012-08-15 16:08:45 +04:00
mythreads = [ ]
for i in range ( 3 ) :
t = LibsmbTestCase . OpenClose ( c , " test " + str ( i ) , 10 )
mythreads . append ( t )
for t in mythreads :
t . start ( )
for t in mythreads :
t . join ( )
if t . exc :
raise t . exc [ 0 ] ( t . exc [ 1 ] )
2020-06-10 12:26:00 +03:00
def test_SMB3EncryptionRequired ( self ) :
test_dir = ' testing_ %d ' % random . randint ( 0 , 0xFFFF )
2022-10-19 12:34:40 +03:00
self . creds . set_smb_encryption ( SMB_ENCRYPTION_REQUIRED )
2020-06-10 12:26:00 +03:00
2022-10-19 12:34:40 +03:00
c = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
2020-06-10 12:26:00 +03:00
c . mkdir ( test_dir )
c . rmdir ( test_dir )
def test_SMB1EncryptionRequired ( self ) :
test_dir = ' testing_ %d ' % random . randint ( 0 , 0xFFFF )
2022-10-19 12:34:40 +03:00
self . creds . set_smb_encryption ( SMB_ENCRYPTION_REQUIRED )
2020-06-10 12:26:00 +03:00
2022-10-19 12:34:40 +03:00
c = libsmb . Conn (
self . server_ip ,
" tmp " ,
self . lp ,
self . creds ,
force_smb1 = True )
2020-06-10 12:26:00 +03:00
c . mkdir ( test_dir )
c . rmdir ( test_dir )
2018-07-30 09:21:29 +03:00
2020-11-17 17:24:43 +03:00
def test_RenameDstDelOnClose ( self ) :
dstdir = " \\ dst-subdir "
2022-10-19 12:34:40 +03:00
c1 = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
c2 = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
2020-11-17 17:24:43 +03:00
try :
c1 . deltree ( dstdir )
except :
pass
c1 . mkdir ( dstdir )
dnum = c1 . create ( dstdir , DesiredAccess = security . SEC_STD_DELETE )
c1 . delete_on_close ( dnum , 1 )
c2 . savefile ( " \\ src.txt " , b " Content " )
with self . assertRaises ( NTSTATUSError ) as cm :
c2 . rename ( " \\ src.txt " , dstdir + " \\ dst.txt " )
if ( cm . exception . args [ 0 ] != NT_STATUS_DELETE_PENDING ) :
raise AssertionError ( " Rename must fail with DELETE_PENDING " )
c1 . delete_on_close ( dnum , 0 )
c1 . close ( dnum )
try :
c1 . deltree ( dstdir )
c1 . unlink ( " \\ src.txt " )
except :
pass
2022-08-29 18:02:25 +03:00
def test_libsmb_CreateContexts ( self ) :
2022-10-19 12:34:40 +03:00
c = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
2022-08-29 18:02:25 +03:00
cc_in = [ ( libsmb . SMB2_CREATE_TAG_MXAC , b ' ' ) ]
fnum , cr , cc = c . create_ex ( " " , CreateContexts = cc_in )
self . assertEqual (
cr [ ' file_attributes ' ] & libsmb . FILE_ATTRIBUTE_DIRECTORY ,
libsmb . FILE_ATTRIBUTE_DIRECTORY )
self . assertEqual ( cc [ 0 ] [ 0 ] , libsmb . SMB2_CREATE_TAG_MXAC )
self . assertEqual ( len ( cc [ 0 ] [ 1 ] ) , 8 )
c . close ( fnum )
2022-12-16 12:43:11 +03:00
def test_libsmb_TortureCaseSensitivity ( self ) :
testdir = " test_libsmb_torture_case_sensitivity "
filename = " file "
filepath = testdir + " / " + filename
c = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
try :
c . deltree ( testdir )
except :
pass
c . mkdir ( testdir )
try :
# Now check for all possible upper-/lowercase combinations:
# - testdir/file
# - TESTDIR/file
# - testdir/FILE
# - TESTDIR/FILE
dircases = [ testdir , testdir , testdir . upper ( ) , testdir . upper ( ) ]
filecases = [ filename , filename . upper ( ) , filename , filename . upper ( ) ]
tcases = [ { ' dir ' : dir , ' file ' : file } for dir , file in zip ( dircases , filecases ) ]
for tcase in tcases :
testpath = tcase [ ' dir ' ] + " / " + tcase [ ' file ' ]
# Create the testfile
h = c . create ( filepath ,
DesiredAccess = security . SEC_FILE_ALL ,
CreateDisposition = libsmb . FILE_OPEN_IF )
c . close ( h )
# Open
c . loadfile ( testpath )
# Search
ls = [ f [ ' name ' ] for f in c . list ( tcase [ ' dir ' ] , mask = tcase [ ' file ' ] ) ]
self . assertIn ( filename , ls , msg = ' When searching for " %s " not found in " %s " ' % ( tcase [ ' file ' ] , tcase [ ' dir ' ] ) )
# Rename
c . rename ( testpath , tcase [ ' dir ' ] + " /tmp " )
c . rename ( tcase [ ' dir ' ] + " /TMP " , filepath )
c . loadfile ( testpath )
# Delete
c . unlink ( testpath )
finally :
c . deltree ( testdir )
2023-02-17 17:41:12 +03:00
def test_libsmb_TortureDirCaseSensitive ( self ) :
c = libsmb . Conn ( self . server_ip , " lowercase " , self . lp , self . creds )
c . mkdir ( " subdir " )
c . mkdir ( " subdir/b " )
ret = c . chkpath ( " SubDir/b " )
c . rmdir ( " subdir/b " )
c . rmdir ( " subdir " )
self . assertTrue ( ret )
2023-04-14 18:34:17 +03:00
def test_libsmb_shadow_depot ( self ) :
c = libsmb . Conn ( self . server_ip , " shadow_depot " , self . lp , self . creds )
try :
fnum = c . create ( " x:y " , CreateDisposition = libsmb . FILE_CREATE )
c . close ( fnum )
except :
self . fail ( )
finally :
# "c" might have crashed, get a new connection
c1 = libsmb . Conn ( self . server_ip , " shadow_depot " , self . lp , self . creds )
c1 . unlink ( " x " )
c1 = None
2023-09-20 20:53:52 +03:00
def test_gencache_pollution_bz15481 ( self ) :
c = libsmb . Conn ( self . server_ip , " tmp " , self . lp , self . creds )
fh = c . create ( " file " ,
DesiredAccess = security . SEC_STD_DELETE ,
CreateDisposition = libsmb . FILE_CREATE )
# prime the gencache File->file
fh_upper = c . create ( " File " ,
DesiredAccess = security . SEC_FILE_READ_ATTRIBUTE ,
CreateDisposition = libsmb . FILE_OPEN )
c . close ( fh_upper )
c . delete_on_close ( fh , 1 )
c . close ( fh )
fh = c . create ( " File " ,
DesiredAccess = security . SEC_STD_DELETE ,
CreateDisposition = libsmb . FILE_CREATE )
directory = c . list ( " \\ " , " File " )
c . delete_on_close ( fh , 1 )
c . close ( fh )
# Without the bugfix for 15481 we get 'file' not 'File'
self . assertEqual ( directory [ 0 ] [ ' name ' ] , ' File ' )
2023-10-07 13:28:05 +03:00
def test_stream_close_with_full_information ( self ) :
c = libsmb . Conn ( self . server_ip , " streams_xattr " , self . lp , self . creds )
try :
c . deltree ( " teststreams " )
except :
pass
c . mkdir ( " teststreams " )
fh = c . create ( " teststreams \\ stream_full_close_info.txt:Stream " ,
DesiredAccess = security . SEC_STD_DELETE ,
CreateDisposition = libsmb . FILE_CREATE )
c . delete_on_close ( fh , 1 )
try :
c . close ( fh , libsmb . SMB2_CLOSE_FLAGS_FULL_INFORMATION )
except :
self . fail ( )
c . deltree ( " teststreams " )
2012-08-15 16:08:45 +04:00
if __name__ == " __main__ " :
import unittest
unittest . main ( )