1
0
mirror of https://github.com/samba-team/samba.git synced 2025-07-28 11:42:03 +03:00

s4-python: Remove obsolete and broken torture modules.

The functionality of these modules is already present in a more current
form in other modules.
This commit is contained in:
Jelmer Vernooij
2010-04-08 18:53:14 +02:00
parent 19e1537fdf
commit be4b688175
5 changed files with 0 additions and 963 deletions

View File

@ -1,51 +0,0 @@
#!/usr/bin/python
import sys
from optparse import OptionParser
# Parse command line
parser = OptionParser()
parser.add_option("-b", "--binding", action="store", type="string",
dest="binding")
parser.add_option("-d", "--domain", action="store", type="string",
dest="domain")
parser.add_option("-u", "--username", action="store", type="string",
dest="username")
parser.add_option("-p", "--password", action="store", type="string",
dest="password")
(options, args) = parser.parse_args()
if not options.binding:
parser.error('You must supply a binding string')
if not options.username or not options.password or not options.domain:
parser.error('You must supply a domain, username and password')
binding = options.binding
domain = options.domain
username = options.username
password = options.password
if len(args) == 0:
parser.error('You must supply the name of a module to test')
# Import and test
for test in args:
try:
module = __import__('torture_%s' % test)
except ImportError:
print 'No such module "%s"' % test
sys.exit(1)
if not hasattr(module, 'runtests'):
print 'Module "%s" does not have a runtests function' % test
module.runtests(binding, (domain, username, password))

View File

@ -1,437 +0,0 @@
import sys, string
import dcerpc
def ResizeBufferCall(fn, pipe, r):
r['buffer'] = None
r['buf_size'] = 0
result = fn(pipe, r)
if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER or \
result['result'] == dcerpc.WERR_MORE_DATA:
r['buffer'] = result['buf_size'] * '\x00'
r['buf_size'] = result['buf_size']
result = fn(pipe, r)
return result
def test_OpenPrinterEx(pipe, printer):
print 'spoolss_OpenPrinterEx(%s)' % printer
printername = '\\\\%s' % dcerpc.dcerpc_server_name(pipe)
if printer is not None:
printername = printername + '\\%s' % printer
r = {}
r['printername'] = printername
r['datatype'] = None
r['devmode_ctr'] = {}
r['devmode_ctr']['size'] = 0
r['devmode_ctr']['devmode'] = None
r['access_mask'] = 0x02000000
r['level'] = 1
r['userlevel'] = {}
r['userlevel']['level1'] = {}
r['userlevel']['level1']['size'] = 0
r['userlevel']['level1']['client'] = None
r['userlevel']['level1']['user'] = None
r['userlevel']['level1']['build'] = 1381
r['userlevel']['level1']['major'] = 2
r['userlevel']['level1']['minor'] = 0
r['userlevel']['level1']['processor'] = 0
result = dcerpc.spoolss_OpenPrinterEx(pipe, r)
return result['handle']
def test_ClosePrinter(pipe, handle):
r = {}
r['handle'] = handle
dcerpc.spoolss_ClosePrinter(pipe, r)
def test_GetPrinter(pipe, handle):
r = {}
r['handle'] = handle
for level in [0, 1, 2, 3, 4, 5, 6, 7]:
print 'spoolss_GetPrinter(level = %d)' % level
r['level'] = level
r['buffer'] = None
r['buf_size'] = 0
result = ResizeBufferCall(dcerpc.spoolss_GetPrinter, pipe, r)
def test_EnumForms(pipe, handle):
print 'spoolss_EnumForms()'
r = {}
r['handle'] = handle
r['level'] = 1
r['buffer'] = None
r['buf_size'] = 0
result = ResizeBufferCall(dcerpc.spoolss_EnumForms, pipe, r)
forms = dcerpc.unmarshall_spoolss_FormInfo_array(
result['buffer'], r['level'], result['count'])
for form in forms:
r = {}
r['handle'] = handle
r['formname'] = form['info1']['formname']
r['level'] = 1
result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
def test_EnumPorts(pipe, handle):
print 'spoolss_EnumPorts()'
for level in [1, 2]:
r = {}
r['handle'] = handle
r['servername'] = None
r['level'] = level
result = ResizeBufferCall(dcerpc.spoolss_EnumPorts, pipe, r)
ports = dcerpc.unmarshall_spoolss_PortInfo_array(
result['buffer'], r['level'], result['count'])
if level == 1:
port_names = map(lambda x: x['info1']['port_name'], ports)
def test_DeleteForm(pipe, handle, formname):
r = {}
r['handle'] = handle
r['formname'] = formname
dcerpc.spoolss_DeleteForm(pipe, r)
def test_GetForm(pipe, handle, formname):
r = {}
r['handle'] = handle
r['formname'] = formname
r['level'] = 1
result = ResizeBufferCall(dcerpc.spoolss_GetForm, pipe, r)
return result['info']['info1']
def test_SetForm(pipe, handle, form):
print 'spoolss_SetForm()'
r = {}
r['handle'] = handle
r['level'] = 1
r['formname'] = form['info1']['formname']
r['info'] = form
dcerpc.spoolss_SetForm(pipe, r)
newform = test_GetForm(pipe, handle, r['formname'])
if form['info1'] != newform:
print 'SetForm: mismatch: %s != %s' % \
(r['info']['info1'], f)
sys.exit(1)
def test_AddForm(pipe, handle):
print 'spoolss_AddForm()'
formname = '__testform__'
r = {}
r['handle'] = handle
r['level'] = 1
r['info'] = {}
r['info']['info1'] = {}
r['info']['info1']['formname'] = formname
r['info']['info1']['flags'] = 0x0002
r['info']['info1']['width'] = 100
r['info']['info1']['length'] = 100
r['info']['info1']['left'] = 0
r['info']['info1']['top'] = 1000
r['info']['info1']['right'] = 2000
r['info']['info1']['bottom'] = 3000
try:
result = dcerpc.spoolss_AddForm(pipe, r)
except dcerpc.WERROR, arg:
if arg[0] == dcerpc.WERR_ALREADY_EXISTS:
test_DeleteForm(pipe, handle, formname)
result = dcerpc.spoolss_AddForm(pipe, r)
f = test_GetForm(pipe, handle, formname)
if r['info']['info1'] != f:
print 'AddForm: mismatch: %s != %s' % \
(r['info']['info1'], f)
sys.exit(1)
r['formname'] = formname
test_SetForm(pipe, handle, r['info'])
test_DeleteForm(pipe, handle, formname)
def test_EnumJobs(pipe, handle):
print 'spoolss_EnumJobs()'
r = {}
r['handle'] = handle
r['firstjob'] = 0
r['numjobs'] = 0xffffffff
r['level'] = 1
result = ResizeBufferCall(dcerpc.spoolss_EnumJobs, pipe, r)
if result['buffer'] is None:
return
jobs = dcerpc.unmarshall_spoolss_JobInfo_array(
result['buffer'], r['level'], result['count'])
for job in jobs:
s = {}
s['handle'] = handle
s['job_id'] = job['info1']['job_id']
s['level'] = 1
result = ResizeBufferCall(dcerpc.spoolss_GetJob, pipe, s)
if result['info'] != job:
print 'EnumJobs: mismatch: %s != %s' % (result['info'], job)
sys.exit(1)
# TODO: AddJob, DeleteJob, ScheduleJob
def test_EnumPrinterData(pipe, handle):
print 'test_EnumPrinterData()'
enum_index = 0
while 1:
r = {}
r['handle'] = handle
r['enum_index'] = enum_index
r['value_offered'] = 0
r['data_size'] = 0
result = dcerpc.spoolss_EnumPrinterData(pipe, r)
r['value_offered'] = result['value_needed']
r['data_size'] = result['data_size']
result = dcerpc.spoolss_EnumPrinterData(pipe, r)
if result['result'] == dcerpc.WERR_NO_MORE_ITEMS:
break
s = {}
s['handle'] = handle
s['value_name'] = result['value_name']
result2 = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, s)
if result['buffer'][:result2['buf_size']] != result2['buffer']:
print 'EnumPrinterData/GetPrinterData mismatch'
sys.exit(1)
enum_index += 1
def test_SetPrinterDataEx(pipe, handle):
valuename = '__printerdataextest__'
data = '12345'
r = {}
r['handle'] = handle
r['key_name'] = 'DsSpooler'
r['value_name'] = valuename
r['type'] = 3
r['buffer'] = data
r['buf_size'] = len(data)
result = dcerpc.spoolss_SetPrinterDataEx(pipe, r)
def test_EnumPrinterDataEx(pipe, handle):
r = {}
r['handle'] = handle
r['key_name'] = 'DsSpooler'
r['buf_size'] = 0
result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
if result['result'] == dcerpc.WERR_MORE_DATA:
r['buf_size'] = result['buf_size']
result = dcerpc.spoolss_EnumPrinterDataEx(pipe, r)
# TODO: test spoolss_GetPrinterDataEx()
def test_SetPrinterData(pipe, handle):
print 'testing spoolss_SetPrinterData()'
valuename = '__printerdatatest__'
data = '12345'
r = {}
r['handle'] = handle
r['value_name'] = valuename
r['type'] = 3 # REG_BINARY
r['buffer'] = data
r['real_len'] = 5
dcerpc.spoolss_SetPrinterData(pipe, r)
s = {}
s['handle'] = handle
s['value_name'] = valuename
result = ResizeBufferCall(dcerpc.spoolss_GetPrinterData, pipe, r)
if result['buffer'] != data:
print 'SetPrinterData: mismatch'
sys.exit(1)
dcerpc.spoolss_DeletePrinterData(pipe, r)
def test_EnumPrinters(pipe):
print 'testing spoolss_EnumPrinters()'
printer_names = None
r = {}
r['flags'] = 0x02
r['server'] = None
for level in [0, 1, 2, 4, 5]:
print 'test_EnumPrinters(level = %d)' % level
r['level'] = level
result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
printers = dcerpc.unmarshall_spoolss_PrinterInfo_array(
result['buffer'], r['level'], result['count'])
if level == 2:
for p in printers:
# A nice check is for the specversion in the
# devicemode. This has always been observed to be
# 1025.
if p['info2']['devmode']['specversion'] != 1025:
print 'test_EnumPrinters: specversion != 1025'
sys.exit(1)
r['level'] = 1
result = ResizeBufferCall(dcerpc.spoolss_EnumPrinters, pipe, r)
for printer in dcerpc.unmarshall_spoolss_PrinterInfo_array(
result['buffer'], r['level'], result['count']):
if string.find(printer['info1']['name'], '\\\\') == 0:
print 'Skipping remote printer %s' % printer['info1']['name']
continue
printername = string.split(printer['info1']['name'], ',')[0]
handle = test_OpenPrinterEx(pipe, printername)
test_GetPrinter(pipe, handle)
test_EnumPorts(pipe, handle)
test_EnumForms(pipe, handle)
test_AddForm(pipe, handle)
test_EnumJobs(pipe, handle)
test_EnumPrinterData(pipe, handle)
test_EnumPrinterDataEx(pipe, handle)
test_SetPrinterData(pipe, handle)
# test_SetPrinterDataEx(pipe, handle)
test_ClosePrinter(pipe, handle)
def test_EnumPrinterDrivers(pipe):
print 'test spoolss_EnumPrinterDrivers()'
for level in [1, 2, 3]:
r = {}
r['server'] = None
r['environment'] = None
r['level'] = level
result = ResizeBufferCall(dcerpc.spoolss_EnumPrinterDrivers, pipe, r)
drivers = dcerpc.unmarshall_spoolss_DriverInfo_array(
result['buffer'], r['level'], result['count'])
if level == 1:
driver_names = map(lambda x: x['info1']['driver_name'], drivers)
def test_PrintServer(pipe):
handle = test_OpenPrinterEx(pipe, None)
# EnumForms and AddForm tests return WERR_BADFID here (??)
test_ClosePrinter(pipe, handle)
def runtests(binding, domain, username, password):
print 'Testing SPOOLSS pipe'
pipe = dcerpc.pipe_connect(binding,
dcerpc.DCERPC_SPOOLSS_UUID, dcerpc.DCERPC_SPOOLSS_VERSION,
domain, username, password)
test_EnumPrinters(pipe)
test_EnumPrinterDrivers(pipe)
test_PrintServer(pipe)

View File

@ -1,220 +0,0 @@
#!/usr/bin/python
import dcerpc, samr
def test_Connect(pipe):
handle = samr.Connect(pipe)
handle = samr.Connect2(pipe)
handle = samr.Connect3(pipe)
handle = samr.Connect4(pipe)
# WIN2K3 only?
try:
handle = samr.Connect5(pipe)
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xc00000d2L: # NT_STATUS_NET_WRITE_FAULT
raise
return handle
def test_UserHandle(user_handle):
# QuerySecurity()/SetSecurity()
user_handle.SetSecurity(user_handle.QuerySecurity())
# GetUserPwInfo()
user_handle.GetUserPwInfo()
# GetUserInfo()
for level in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 20,
21, 23, 24, 25, 26]:
try:
user_handle.QueryUserInfo(level)
user_handle.QueryUserInfo2(level)
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xc0000003L: # NT_STATUS_INVALID_INFO_CLASS
raise
# GetGroupsForUser()
user_handle.GetGroupsForUser()
# TestPrivateFunctionsUser()
try:
user_handle.TestPrivateFunctionsUser()
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xC0000002L:
raise
def test_GroupHandle(group_handle):
# QuerySecurity()/SetSecurity()
group_handle.SetSecurity(group_handle.QuerySecurity())
# QueryGroupInfo()
for level in [1, 2, 3, 4, 5]:
info = group_handle.QueryGroupInfo(level)
# TODO: SetGroupinfo()
# QueryGroupMember()
group_handle.QueryGroupMember()
def test_AliasHandle(alias_handle):
# QuerySecurity()/SetSecurity()
alias_handle.SetSecurity(alias_handle.QuerySecurity())
print alias_handle.GetMembersInAlias()
def test_DomainHandle(name, sid, domain_handle):
print 'testing %s (%s)' % (name, sid)
# QuerySecurity()/SetSecurity()
domain_handle.SetSecurity(domain_handle.QuerySecurity())
# LookupNames(), none mapped
try:
domain_handle.LookupNames(['xxNONAMExx'])
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xc0000073L:
raise dcerpc.NTSTATUS(arg)
# LookupNames(), some mapped
if name != 'Builtin':
domain_handle.LookupNames(['Administrator', 'xxNONAMExx'])
# QueryDomainInfo()/SetDomainInfo()
levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]
set_ok = [1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0]
for i in range(len(levels)):
info = domain_handle.QueryDomainInfo(level = levels[i])
try:
domain_handle.SetDomainInfo(levels[i], info)
except dcerpc.NTSTATUS, arg:
if not (arg[0] == 0xc0000003L and not set_ok[i]):
raise
# QueryDomainInfo2()
levels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]
for i in range(len(levels)):
domain_handle.QueryDomainInfo2(level = levels[i])
# EnumDomainUsers
print 'testing users'
users = domain_handle.EnumDomainUsers()
rids = domain_handle.LookupNames(users)
for i in range(len(users)):
test_UserHandle(domain_handle.OpenUser(rids[0][i]))
# QueryDisplayInfo
for i in [1, 2, 3, 4, 5]:
domain_handle.QueryDisplayInfo(level = i)
domain_handle.QueryDisplayInfo2(level = i)
domain_handle.QueryDisplayInfo3(level = i)
# EnumDomainGroups
print 'testing groups'
groups = domain_handle.EnumDomainGroups()
rids = domain_handle.LookupNames(groups)
for i in range(len(groups)):
test_GroupHandle(domain_handle.OpenGroup(rids[0][i]))
# EnumDomainAliases
print 'testing aliases'
aliases = domain_handle.EnumDomainAliases()
rids = domain_handle.LookupNames(aliases)
for i in range(len(aliases)):
test_AliasHandle(domain_handle.OpenAlias(rids[0][i]))
# CreateUser
# CreateUser2
# CreateDomAlias
# RidToSid
# RemoveMemberFromForeignDomain
# CreateDomainGroup
# GetAliasMembership
# GetBootKeyInformation()
try:
domain_handle.GetBootKeyInformation()
except dcerpc.NTSTATUS, arg:
pass
# TestPrivateFunctionsDomain()
try:
domain_handle.TestPrivateFunctionsDomain()
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xC0000002L:
raise
def test_ConnectHandle(connect_handle):
print 'testing connect handle'
# QuerySecurity/SetSecurity
connect_handle.SetSecurity(connect_handle.QuerySecurity())
# Lookup bogus domain
try:
connect_handle.LookupDomain('xxNODOMAINxx')
except dcerpc.NTSTATUS, arg:
if arg[0] != 0xC00000DFL: # NT_STATUS_NO_SUCH_DOMAIN
raise
# Test all domains
for domain_name in connect_handle.EnumDomains():
connect_handle.GetDomPwInfo(domain_name)
sid = connect_handle.LookupDomain(domain_name)
domain_handle = connect_handle.OpenDomain(sid)
test_DomainHandle(domain_name, sid, domain_handle)
# TODO: Test Shutdown() function
def runtests(binding, creds):
print 'Testing SAMR pipe'
pipe = dcerpc.pipe_connect(binding,
dcerpc.DCERPC_SAMR_UUID, int(dcerpc.DCERPC_SAMR_VERSION), creds)
handle = test_Connect(pipe)
test_ConnectHandle(handle)

View File

@ -1,90 +0,0 @@
#!/usr/bin/python
import sys, os
import Tdb
def fail(msg):
print 'FAILED:', msg
sys.exit(1)
tdb_file = '/tmp/torture_tdb.tdb'
# Create temporary tdb file
t = Tdb.Tdb(tdb_file, flags = Tdb.CLEAR_IF_FIRST)
# Check non-existent key throws KeyError exception
try:
t['__none__']
except KeyError:
pass
else:
fail('non-existent key did not throw KeyError')
# Check storing key
t['bar'] = '1234'
if t['bar'] != '1234':
fail('store key failed')
# Check key exists
if not t.has_key('bar'):
fail('has_key() failed for existing key')
if t.has_key('__none__'):
fail('has_key() succeeded for non-existent key')
# Delete key
try:
del(t['__none__'])
except KeyError:
pass
else:
fail('delete of non-existent key did not throw KeyError')
del t['bar']
if t.has_key('bar'):
fail('delete of existing key did not delete key')
# Clear all keys
t.clear()
if len(t) != 0:
fail('clear failed to remove all keys')
# Other dict functions
t['a'] = '1'
t['ab'] = '12'
t['abc'] = '123'
if len(t) != 3:
fail('len method produced wrong value')
keys = t.keys()
values = t.values()
items = t.items()
if set(keys) != set(['a', 'ab', 'abc']):
fail('keys method produced wrong values')
if set(values) != set(['1', '12', '123']):
fail('values method produced wrong values')
if set(items) != set([('a', '1'), ('ab', '12'), ('abc', '123')]):
fail('values method produced wrong values')
t.close()
# Re-open read-only
t = Tdb.Tdb(tdb_file, open_flags = os.O_RDONLY)
t.keys()
t.close()
# Clean up
os.unlink(tdb_file)

View File

@ -1,165 +0,0 @@
#!/usr/bin/python
import sys, dcerpc
def test_OpenHKLM(pipe):
r = {}
r['unknown'] = {}
r['unknown']['unknown0'] = 0x9038
r['unknown']['unknown1'] = 0x0000
r['access_required'] = 0x02000000
result = dcerpc.winreg_OpenHKLM(pipe, r)
return result['handle']
def test_QueryInfoKey(pipe, handle):
r = {}
r['handle'] = handle
r['class'] = {}
r['class']['name'] = None
return dcerpc.winreg_QueryInfoKey(pipe, r)
def test_CloseKey(pipe, handle):
r = {}
r['handle'] = handle
dcerpc.winreg_CloseKey(pipe, r)
def test_FlushKey(pipe, handle):
r = {}
r['handle'] = handle
dcerpc.winreg_FlushKey(pipe, r)
def test_GetVersion(pipe, handle):
r = {}
r['handle'] = handle
dcerpc.winreg_GetVersion(pipe, r)
def test_GetKeySecurity(pipe, handle):
r = {}
r['handle'] = handle
r['unknown'] = 4
r['size'] = None
r['data'] = {}
r['data']['max_len'] = 0
r['data']['data'] = ''
result = dcerpc.winreg_GetKeySecurity(pipe, r)
print result
if result['result'] == dcerpc.WERR_INSUFFICIENT_BUFFER:
r['size'] = {}
r['size']['max_len'] = result['data']['max_len']
r['size']['offset'] = 0
r['size']['len'] = result['data']['max_len']
result = dcerpc.winreg_GetKeySecurity(pipe, r)
print result
sys.exit(1)
def test_Key(pipe, handle, name, depth = 0):
# Don't descend too far. Registries can be very deep.
if depth > 2:
return
try:
keyinfo = test_QueryInfoKey(pipe, handle)
except dcerpc.WERROR, arg:
if arg[0] == dcerpc.WERR_ACCESS_DENIED:
return
test_GetVersion(pipe, handle)
test_FlushKey(pipe, handle)
test_GetKeySecurity(pipe, handle)
# Enumerate values in this key
r = {}
r['handle'] = handle
r['name_in'] = {}
r['name_in']['len'] = 0
r['name_in']['max_len'] = (keyinfo['max_valnamelen'] + 1) * 2
r['name_in']['buffer'] = {}
r['name_in']['buffer']['max_len'] = keyinfo['max_valnamelen'] + 1
r['name_in']['buffer']['offset'] = 0
r['name_in']['buffer']['len'] = 0
r['type'] = 0
r['value_in'] = {}
r['value_in']['max_len'] = keyinfo['max_valbufsize']
r['value_in']['offset'] = 0
r['value_in']['len'] = 0
r['value_len1'] = keyinfo['max_valbufsize']
r['value_len2'] = 0
for i in range(0, keyinfo['num_values']):
r['enum_index'] = i
dcerpc.winreg_EnumValue(pipe, r)
# Recursively test subkeys of this key
r = {}
r['handle'] = handle
r['key_name_len'] = 0
r['unknown'] = 0x0414
r['in_name'] = {}
r['in_name']['unknown'] = 0x20a
r['in_name']['key_name'] = {}
r['in_name']['key_name']['name'] = None
r['class'] = {}
r['class']['name'] = None
r['last_changed_time'] = {}
r['last_changed_time']['low'] = 0
r['last_changed_time']['high'] = 0
for i in range(0, keyinfo['num_subkeys']):
r['enum_index'] = i
subkey = dcerpc.winreg_EnumKey(pipe, r)
s = {}
s['handle'] = handle
s['keyname'] = {}
s['keyname']['name'] = subkey['out_name']['name']
s['unknown'] = 0
s['access_mask'] = 0x02000000
result = dcerpc.winreg_OpenKey(pipe, s)
test_Key(pipe, result['handle'], name + '/' + s['keyname']['name'],
depth + 1)
test_CloseKey(pipe, result['handle'])
# Enumerate values
def runtests(binding, domain, username, password):
print 'Testing WINREG pipe'
pipe = dcerpc.pipe_connect(binding,
dcerpc.DCERPC_WINREG_UUID, dcerpc.DCERPC_WINREG_VERSION,
domain, username, password)
handle = test_OpenHKLM(pipe)
test_Key(pipe, handle, 'HKLM')