mirror of
https://github.com/samba-team/samba.git
synced 2025-03-07 00:58:40 +03:00
s4-tests: Added a speedtest for LDAP search operations with different accounts.
Autobuild-User: Nadezhda Ivanova <nivanova@samba.org> Autobuild-Date: Wed Dec 15 21:32:09 CET 2010 on sn-devel-104
This commit is contained in:
parent
aab37c3146
commit
6bb89aaa0d
@ -42,7 +42,7 @@ from samba.ndr import ndr_pack, ndr_unpack
|
||||
from samba.dcerpc import security
|
||||
|
||||
from samba.auth import system_session
|
||||
from samba import gensec
|
||||
from samba import gensec, sd_utils
|
||||
from samba.samdb import SamDB
|
||||
from samba.credentials import Credentials
|
||||
import samba.tests
|
||||
@ -77,12 +77,6 @@ creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
|
||||
|
||||
class SpeedTest(samba.tests.TestCase):
|
||||
|
||||
def find_basedn(self, ldb):
|
||||
res = ldb.search(base="", expression="", scope=SCOPE_BASE,
|
||||
attrs=["defaultNamingContext"])
|
||||
self.assertEquals(len(res), 1)
|
||||
return res[0]["defaultNamingContext"][0]
|
||||
|
||||
def find_domain_sid(self, ldb):
|
||||
res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
|
||||
return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
|
||||
@ -90,8 +84,8 @@ class SpeedTest(samba.tests.TestCase):
|
||||
def setUp(self):
|
||||
super(SpeedTest, self).setUp()
|
||||
self.ldb_admin = ldb
|
||||
self.base_dn = self.find_basedn(self.ldb_admin)
|
||||
self.domain_sid = self.find_domain_sid(self.ldb_admin)
|
||||
self.base_dn = ldb.domain_dn()
|
||||
self.domain_sid = security.dom_sid(ldb.get_domain_sid())
|
||||
self.user_pass = "samba123@"
|
||||
print "baseDN: %s" % self.base_dn
|
||||
|
||||
@ -129,6 +123,11 @@ url: www.example.com
|
||||
for dn in dn_list:
|
||||
delete_force(self.ldb_admin, dn)
|
||||
|
||||
class SpeedTestAddDel(SpeedTest):
|
||||
|
||||
def setUp(self):
|
||||
super(SpeedTestAddDel, self).setUp()
|
||||
|
||||
def run_bundle(self, num):
|
||||
print "\n=== Test ADD/DEL %s user objects ===\n" % num
|
||||
avg_add = Decimal("0.0")
|
||||
@ -169,6 +168,62 @@ url: www.example.com
|
||||
"""
|
||||
self.run_bundle(10000)
|
||||
|
||||
class AclSearchSpeedTest(SpeedTest):
|
||||
|
||||
def setUp(self):
|
||||
super(AclSearchSpeedTest, self).setUp()
|
||||
self.ldb_admin.newuser("acltestuser", "samba123@")
|
||||
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
|
||||
self.ldb_user = self.get_ldb_connection("acltestuser", "samba123@")
|
||||
self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn("acltestuser"))
|
||||
|
||||
def tearDown(self):
|
||||
super(AclSearchSpeedTest, self).tearDown()
|
||||
delete_force(self.ldb_admin, self.get_user_dn("acltestuser"))
|
||||
|
||||
def run_search_bundle(self, num, _ldb):
|
||||
print "\n=== Creating %s user objects ===\n" % num
|
||||
self.create_bundle(num)
|
||||
mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
|
||||
for i in range(num):
|
||||
self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" %
|
||||
(i+1, self.base_dn), mod)
|
||||
print "\n=== %s user objects created ===\n" % num
|
||||
print "\n=== Test search on %s user objects ===\n" % num
|
||||
avg_search = Decimal("0.0")
|
||||
for x in [1, 2, 3]:
|
||||
start = time.time()
|
||||
res = _ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
|
||||
res_search = Decimal( str(time.time() - start) )
|
||||
avg_search += res_search
|
||||
print " Attempt %s SEARCH: %.3fs" % ( x, float(res_search) )
|
||||
print "Average Search: %.3fs" % float( Decimal(avg_search) / Decimal("3.0") )
|
||||
self.remove_bundle(num)
|
||||
|
||||
def get_user_dn(self, name):
|
||||
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
|
||||
|
||||
def get_ldb_connection(self, target_username, target_password):
|
||||
creds_tmp = Credentials()
|
||||
creds_tmp.set_username(target_username)
|
||||
creds_tmp.set_password(target_password)
|
||||
creds_tmp.set_domain(creds.get_domain())
|
||||
creds_tmp.set_realm(creds.get_realm())
|
||||
creds_tmp.set_workstation(creds.get_workstation())
|
||||
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
|
||||
| gensec.FEATURE_SEAL)
|
||||
ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
|
||||
return ldb_target
|
||||
|
||||
def test_search_01000(self):
|
||||
self.run_search_bundle(1000, self.ldb_admin)
|
||||
|
||||
def test_search2_01000(self):
|
||||
# allow the user to see objects but not attributes, all attributes will be filtered out
|
||||
mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
|
||||
self.sd_utils.dacl_add_ace("CN=Users,%s" % self.base_dn, mod)
|
||||
self.run_search_bundle(1000, self.ldb_user)
|
||||
|
||||
# Important unit running information
|
||||
|
||||
if not "://" in host:
|
||||
@ -179,7 +234,8 @@ ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, optio
|
||||
|
||||
runner = SubunitTestRunner()
|
||||
rc = 0
|
||||
if not runner.run(unittest.makeSuite(SpeedTest)).wasSuccessful():
|
||||
if not runner.run(unittest.makeSuite(SpeedTestAddDel)).wasSuccessful():
|
||||
rc = 1
|
||||
if not runner.run(unittest.makeSuite(AclSearchSpeedTest)).wasSuccessful():
|
||||
rc = 1
|
||||
|
||||
sys.exit(rc)
|
||||
|
Loading…
x
Reference in New Issue
Block a user