1
0
mirror of https://github.com/samba-team/samba.git synced 2025-07-31 20:22:15 +03:00

s4: Added acl search tests for anonymous connection.

The tests make sure that we comply with dsHeuristics setting and
restrict anonymous access to rootDSE. They will be enabled when the
implementation is pushed. tests are verified against win2k8.
This commit is contained in:
Nadezhda Ivanova
2010-07-14 14:44:46 +03:00
parent e30aa45666
commit d35e9008a7

View File

@ -15,15 +15,17 @@ samba.ensure_external_module("testtools", "testtools")
import samba.getopt as options
from ldb import (
SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT,
SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
from ldb import ERR_CONSTRAINT_VIOLATION
from ldb import ERR_OPERATIONS_ERROR
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
from samba.auth import system_session
from samba.auth import system_session_anonymous
from samba import gensec
from samba.samdb import SamDB
from samba.credentials import Credentials
@ -694,94 +696,77 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
class AclSearchTests(AclTests):
def setUp(self):
super(AclTests, self).setUp()
self.regular_user = "acl_search_user1"
self.create_enable_user(self.regular_user)
self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
super(AclSearchTests, self).setUp()
self.anonymous = SamDB(url=host, session_info=system_session_anonymous(),
lp=lp)
res = self.ldb_admin.search("CN=Directory Service, CN=Windows NT, CN=Services, "
+ self.configuration_dn, scope=SCOPE_BASE, attrs=["dSHeuristics"])
if "dSHeuristics" in res[0]:
self.dsheuristics = res[0]["dSHeuristics"][0]
else:
self.dsheuristics = None
def tearDown(self):
super(AclSearchTests, self).tearDown()
self.delete_force(self.ldb_admin, "CN=test_search_user1,OU=test_search_ou1," + self.base_dn)
self.set_dsheuristics(self.dsheuristics)
self.delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
self.delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
self.delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
def test_search_u1(self):
"""See if can prohibit user to read another User object"""
ou_dn = "OU=test_search_ou1," + self.base_dn
user_dn = "CN=test_search_user1," + ou_dn
# Create clean OU
self.delete_force(self.ldb_admin, ou_dn)
self.create_ou(self.ldb_admin, ou_dn)
desc = self.read_desc(ou_dn)
desc_sddl = desc.as_sddl(self.domain_sid)
# Parse descriptor's SDDL and remove all inherited ACEs reffering
# to 'Registered Users' or 'Authenticated Users'
desc_aces = re.findall("\(.*?\)", desc_sddl)
for ace in desc_aces:
if ("I" in ace) and (("RU" in ace) or ("AU" in ace)):
desc_sddl = desc_sddl.replace(ace, "")
# Add 'P' in the DACL so it breaks further inheritance
desc_sddl = desc_sddl.replace("D:AI(", "D:PAI(")
# Create a security descriptor object and OU with that descriptor
desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
self.delete_force(self.ldb_admin, ou_dn)
self.create_ou(self.ldb_admin, ou_dn, desc)
# Create clean user
self.delete_force(self.ldb_admin, user_dn)
self.create_test_user(self.ldb_admin, user_dn)
desc = self.read_desc(user_dn)
desc_sddl = desc.as_sddl(self.domain_sid)
# Parse security descriptor SDDL and remove all 'Read' ACEs
# reffering to AU
desc_aces = re.findall("\(.*?\)", desc_sddl)
for ace in desc_aces:
if ("AU" in ace) and ("R" in ace):
desc_sddl = desc_sddl.replace(ace, "")
# Create user with the edited descriptor
desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
self.delete_force(self.ldb_admin, user_dn)
self.create_test_user(self.ldb_admin, user_dn, desc)
def test_search_anonymous1(self):
"""Verify access of rootDSE with the correct request"""
res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
self.assertEquals(len(res), 1)
#verify some of the attributes
#dont care about values
self.assertTrue("ldapServiceName" in res[0])
self.assertTrue("namingContexts" in res[0])
self.assertTrue("isSynchronized" in res[0])
self.assertTrue("dsServiceName" in res[0])
self.assertTrue("supportedSASLMechanisms" in res[0])
self.assertTrue("isGlobalCatalogReady" in res[0])
self.assertTrue("domainControllerFunctionality" in res[0])
self.assertTrue("serverName" in res[0])
res = self.ldb_user.search(self.base_dn,
expression="(distinguishedName=%s)" % user_dn)
self.assertEqual(res, [])
def test_search_anonymous2(self):
"""Make sure we cannot access anything else"""
try:
res = self.anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
except LdbError, (num, _):
self.assertEquals(num, ERR_OPERATIONS_ERROR)
else:
self.fail()
try:
res = self.anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
except LdbError, (num, _):
self.assertEquals(num, ERR_OPERATIONS_ERROR)
else:
self.fail()
try:
res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
except LdbError, (num, _):
self.assertEquals(num, ERR_OPERATIONS_ERROR)
else:
self.fail()
def test_search_u2(self):
"""User's group ACEs cleared and after that granted RIGHT_DS_READ_PROPERTY to another User object"""
ou_dn = "OU=test_search_ou1," + self.base_dn
user_dn = "CN=test_search_user1," + ou_dn
# Create clean OU
self.delete_force(self.ldb_admin, ou_dn)
self.create_ou(self.ldb_admin, ou_dn)
desc = self.read_desc(ou_dn)
desc_sddl = desc.as_sddl(self.domain_sid)
# Parse descriptor's SDDL and remove all inherited ACEs reffering
# to 'Registered Users' or 'Authenticated Users'
desc_aces = re.findall("\(.*?\)", desc_sddl)
for ace in desc_aces:
if ("I" in ace) and (("RU" in ace) or ("AU" in ace)):
desc_sddl = desc_sddl.replace(ace, "")
# Add 'P' in the DACL so it breaks further inheritance
desc_sddl = desc_sddl.replace("D:AI(", "D:PAI(")
# Create a security descriptor object and OU with that descriptor
desc = security.descriptor.from_sddl(desc_sddl, self.domain_sid)
self.delete_force(self.ldb_admin, ou_dn)
self.create_ou(self.ldb_admin, ou_dn, desc)
# Create clean user
self.delete_force(self.ldb_admin, user_dn)
self.create_test_user(self.ldb_admin, user_dn)
# Parse security descriptor SDDL and remove all 'Read' ACEs
# reffering to AU
desc_aces = re.findall("\(.*?\)", desc_sddl)
for ace in desc_aces:
if ("AU" in ace) and ("R" in ace):
desc_sddl = desc_sddl.replace(ace, "")
#mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;AU)"
mod = "(A;;RP;;;AU)"
self.dacl_add_ace(user_dn, mod)
res = self.ldb_user.search(self.base_dn,
expression="(distinguishedName=%s)" % user_dn)
self.assertNotEqual(res, [])
def test_search_anonymous3(self):
"""Set dsHeuristics and repeat"""
self.set_dsheuristics("0000002")
self.create_ou(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
mod = "(A;CI;LC;;;AN)"
self.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
self.create_ou(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
res = self.anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
expression="(objectClass=*)", scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 1)
self.assertTrue("dn" in res[0])
self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
"OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
res = self.anonymous.search("CN=Configuration," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 1)
self.assertTrue("dn" in res[0])
self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
#tests on ldap delete operations
class AclDeleteTests(AclTests):