1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-22 22:04:08 +03:00

CVE-2020-10760 dsdb: Add tests for paged_results and VLV over the Global Catalog port

This should avoid a regression.

(backported from master patch)
[abartlet@samba.org: sort=True parameter on test_paged_delete_during_search
 is not in 4.10]

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Andrew Bartlett 2020-06-08 16:32:14 +12:00 committed by Karolin Seeger
parent 4bc0ada8d9
commit ca38b0eecd
2 changed files with 107 additions and 66 deletions

View File

@ -1,2 +1,2 @@
samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr
samba4.ldap.vlv.python.*__main__.PagedResultsTests.test_paged_cant_change_controls_data samba4.ldap.vlv.python.*__main__.PagedResultsTestsRW.test_paged_cant_change_controls_data

View File

@ -152,7 +152,7 @@ class TestsWithUserOU(samba.tests.TestCase):
super(TestsWithUserOU, self).setUp() super(TestsWithUserOU, self).setUp()
self.ldb = SamDB(host, credentials=creds, self.ldb = SamDB(host, credentials=creds,
session_info=system_session(lp), lp=lp) session_info=system_session(lp), lp=lp)
self.ldb_ro = self.ldb
self.base_dn = self.ldb.domain_dn() self.base_dn = self.ldb.domain_dn()
self.ou = "ou=vlv,%s" % self.base_dn self.ou = "ou=vlv,%s" % self.base_dn
if opts.delete_in_setup: if opts.delete_in_setup:
@ -195,8 +195,60 @@ class TestsWithUserOU(samba.tests.TestCase):
self.ldb.delete(self.ou, ['tree_delete:1']) self.ldb.delete(self.ou, ['tree_delete:1'])
class VLVTests(TestsWithUserOU): class VLVTestsBase(TestsWithUserOU):
# Run a vlv search and return important fields of the response control
def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
sort_ctrl = "server_sort:1:0:%s" % attr
ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
if cookie:
ctrl += ":" + cookie
res = self.ldb_ro.search(self.ou,
expression=expr,
scope=ldb.SCOPE_ONELEVEL,
attrs=[attr],
controls=[ctrl, sort_ctrl])
results = [str(x[attr][0]) for x in res]
ctrls = [str(c) for c in res.controls if
str(c).startswith('vlv')]
self.assertEqual(len(ctrls), 1)
spl = ctrls[0].rsplit(':')
cookie = ""
if len(spl) == 6:
cookie = spl[-1]
return results, cookie
class VLVTestsRO(VLVTestsBase):
def test_vlv_simple_double_run(self):
"""Do the simplest possible VLV query to confirm if VLV
works at all. Useful for showing VLV as a whole works
on Global Catalog (for example)"""
attr = 'roomNumber'
expr = "(objectclass=user)"
# Start new search
full_results, cookie = self.vlv_search(attr, expr,
after_count=len(self.users))
results, cookie = self.vlv_search(attr, expr, cookie=cookie,
after_count=len(self.users))
expected_results = full_results
self.assertEqual(results, expected_results)
class VLVTestsGC(VLVTestsRO):
def setUp(self):
super(VLVTestsRO, self).setUp()
self.ldb_ro = SamDB(host + ":3268", credentials=creds,
session_info=system_session(lp), lp=lp)
class VLVTests(VLVTestsBase):
def get_full_list(self, attr, include_cn=False): def get_full_list(self, attr, include_cn=False):
"""Fetch the whole list sorted on the attribute, using the VLV. """Fetch the whole list sorted on the attribute, using the VLV.
This way you get a VLV cookie.""" This way you get a VLV cookie."""
@ -1077,31 +1129,6 @@ class VLVTests(TestsWithUserOU):
controls=[sort_control, controls=[sort_control,
"vlv:0:1:1:1:0:%s" % vlv_cookies[-1]]) "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
# Run a vlv search and return important fields of the response control
def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
sort_ctrl = "server_sort:1:0:%s" % attr
ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
if cookie:
ctrl += ":" + cookie
res = self.ldb.search(self.ou,
expression=expr,
scope=ldb.SCOPE_ONELEVEL,
attrs=[attr],
controls=[ctrl, sort_ctrl])
results = [str(x[attr][0]) for x in res]
ctrls = [str(c) for c in res.controls if
str(c).startswith('vlv')]
self.assertEqual(len(ctrls), 1)
spl = ctrls[0].rsplit(':')
cookie = ""
if len(spl) == 6:
cookie = spl[-1]
return results, cookie
def test_vlv_modify_during_view(self): def test_vlv_modify_during_view(self):
attr = 'roomNumber' attr = 'roomNumber'
expr = "(objectclass=user)" expr = "(objectclass=user)"
@ -1214,7 +1241,7 @@ class PagedResultsTests(TestsWithUserOU):
if subtree: if subtree:
scope = ldb.SCOPE_SUBTREE scope = ldb.SCOPE_SUBTREE
res = self.ldb.search(ou, res = self.ldb_ro.search(ou,
expression=expr, expression=expr,
scope=scope, scope=scope,
controls=controls, controls=controls,
@ -1231,6 +1258,53 @@ class PagedResultsTests(TestsWithUserOU):
cookie = spl[-1] cookie = spl[-1]
return results, cookie return results, cookie
class PagedResultsTestsRO(PagedResultsTests):
def test_paged_search_lockstep(self):
expr = "(objectClass=*)"
ps = 3
all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
# Run two different but overlapping paged searches simultaneously.
set_1_index = int((len(all_results))//3)
set_2_index = int((2*len(all_results))//3)
set_1 = all_results[set_1_index:]
set_2 = all_results[:set_2_index+1]
set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
self.assertEqual(results, set_1[:ps])
results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
self.assertEqual(results, set_2[:ps])
results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
page_size=ps)
self.assertEqual(results, set_1[ps:ps*2])
results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
page_size=ps)
self.assertEqual(results, set_2[ps:ps*2])
results, _ = self.paged_search(set_1_expr, cookie=cookie1,
page_size=len(self.users))
self.assertEqual(results, set_1[ps*2:])
results, _ = self.paged_search(set_2_expr, cookie=cookie2,
page_size=len(self.users))
self.assertEqual(results, set_2[ps*2:])
class PagedResultsTestsGC(PagedResultsTestsRO):
def setUp(self):
super(PagedResultsTestsRO, self).setUp()
self.ldb_ro = SamDB(host + ":3268", credentials=creds,
session_info=system_session(lp), lp=lp)
class PagedResultsTestsRW(PagedResultsTests):
def test_paged_delete_during_search(self): def test_paged_delete_during_search(self):
expr = "(objectClass=*)" expr = "(objectClass=*)"
@ -1611,39 +1685,6 @@ class PagedResultsTests(TestsWithUserOU):
cookie, attrs=changed_attrs, cookie, attrs=changed_attrs,
extra_ctrls=[]) extra_ctrls=[])
def test_paged_search_lockstep(self):
expr = "(objectClass=*)"
ps = 3
all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
# Run two different but overlapping paged searches simultaneously.
set_1_index = int((len(all_results))//3)
set_2_index = int((2*len(all_results))//3)
set_1 = all_results[set_1_index:]
set_2 = all_results[:set_2_index+1]
set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
self.assertEqual(results, set_1[:ps])
results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
self.assertEqual(results, set_2[:ps])
results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
page_size=ps)
self.assertEqual(results, set_1[ps:ps*2])
results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
page_size=ps)
self.assertEqual(results, set_2[ps:ps*2])
results, _ = self.paged_search(set_1_expr, cookie=cookie1,
page_size=len(self.users))
self.assertEqual(results, set_1[ps*2:])
results, _ = self.paged_search(set_2_expr, cookie=cookie2,
page_size=len(self.users))
self.assertEqual(results, set_2[ps*2:])
def test_vlv_paged(self): def test_vlv_paged(self):
"""Testing behaviour with VLV and paged_results set. """Testing behaviour with VLV and paged_results set.