mirror of
https://github.com/samba-team/samba.git
synced 2025-01-20 14:03:59 +03:00
0a79ed2c0b
SEARCH_FLAG_RODC_ATTRIBUTE should be like SEARCH_FLAG_CONFIDENTIAL, but for DirSync and DRS replication. Accounts with GUID_DRS_GET_CHANGES rights should not be able to read this attribute. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15424 Signed-off-by: Andrew Bartlett <abartlet@samba.org>
1138 lines
49 KiB
Python
Executable File
1138 lines
49 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Tests that confidential attributes (or attributes protected by a ACL that
|
|
# denies read access) cannot be guessed through wildcard DB searches.
|
|
#
|
|
# Copyright (C) Catalyst.Net Ltd
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
import optparse
|
|
import sys
|
|
sys.path.insert(0, "bin/python")
|
|
|
|
import samba
|
|
import random
|
|
import statistics
|
|
import time
|
|
from samba.tests.subunitrun import SubunitOptions, TestProgram
|
|
import samba.getopt as options
|
|
from ldb import SCOPE_BASE, SCOPE_SUBTREE
|
|
from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_RODC_ATTRIBUTE, SEARCH_FLAG_PRESERVEONDELETE
|
|
from ldb import Message, MessageElement, Dn
|
|
from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
|
|
from samba.auth import system_session
|
|
from samba import gensec, sd_utils
|
|
from samba.samdb import SamDB
|
|
from samba.credentials import Credentials, DONT_USE_KERBEROS
|
|
from samba.dcerpc import security
|
|
|
|
import samba.tests
|
|
import samba.dsdb
|
|
|
|
# Build the command-line parser: Samba's standard option groups plus the
# subunit options used by the test runner.
parser = optparse.OptionParser("confidential_attr.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

# The target host is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

# Accept either a bare host name or a full URL. 'ldaphost' is always a URL
# suitable for SamDB(); 'host' is always the part after the scheme.
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    start = host.rindex("://")
    # Slice off the "<scheme>://" prefix. (str.lstrip() takes a set of
    # characters, not an offset, so passing it an int raises TypeError.)
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Request sealing on top of whatever gensec features are already enabled.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
|
|
|
|
#
|
|
# Tests start here
|
|
#
|
|
class ConfidentialAttrCommon(samba.tests.TestCase):
    """Shared fixture and assertion helpers for confidential-attribute tests.

    setUp() creates a dedicated OU containing four users:

    * an unprivileged "sneaky" user, whose connection is ``self.ldb_user``;
    * the user whose attribute value the tests try to guess
      (``self.conf_dn``, value ``self.conf_value``);
    * two further users that also have the attribute set.

    The remaining methods build LDAP search expressions that mention the
    attribute under test and assert how many objects each search returns.
    """

    def setUp(self):
        """Create the admin connection, the test OU and the test users."""
        super(ConfidentialAttrCommon, self).setUp()

        self.ldb_admin = SamDB(ldaphost, credentials=creds,
                               session_info=system_session(lp), lp=lp)
        self.user_pass = "samba123@"
        self.base_dn = self.ldb_admin.domain_dn()
        self.schema_dn = self.ldb_admin.get_schema_basedn()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)

        # the tests work by setting the 'Confidential' bit in the searchFlags
        # for an existing schema attribute. This only works against Windows if
        # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
        # schema attribute being modified. There are only a few attributes that
        # meet this criteria (most of which only apply to 'user' objects)
        self.conf_attr = "homePostalAddress"
        attr_cn = "CN=Address-Home"
        # schemaIdGuid for homePostalAddress (used for ACE tests)
        self.conf_attr_guid = "16775781-47f3-11d1-a9c3-0000f80367c1"
        # GUID used by the property-set ACE tests for this attribute
        self.conf_attr_sec_guid = "77b5b886-944a-11d1-aebd-0000f80367c1"
        self.attr_dn = "{0},{1}".format(attr_cn, self.schema_dn)

        # Start from a clean OU (a previous run may have left one behind) and
        # make sure it is removed again when the test finishes.
        userou = "OU=conf-attr-test"
        self.ou = "{0},{1}".format(userou, self.base_dn)
        samba.tests.delete_force(self.ldb_admin, self.ou,
                                 controls=['tree_delete:1'])
        self.ldb_admin.create_ou(self.ou)
        self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou,
                        controls=['tree_delete:1'])

        # use a common username prefix, so we can use sAMAccountName=CATC-* as
        # a search filter to only return the users we're interested in
        self.user_prefix = "catc-"

        # add a test object with this attribute set
        self.conf_value = "abcdef"
        self.conf_user = "{0}conf-user".format(self.user_prefix)
        self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
        self.conf_dn = self.get_user_dn(self.conf_user)
        self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)

        # add a sneaky user that will try to steal our secrets
        self.user = "{0}sneaky-user".format(self.user_prefix)
        self.ldb_admin.newuser(self.user, self.user_pass, userou=userou)
        self.ldb_user = self.get_ldb_connection(self.user, self.user_pass)

        self.all_users = [self.user, self.conf_user]

        # add some other users that also have confidential attributes, so we
        # check we don't disclose their details, particularly in '!' searches
        for i in range(1, 3):
            username = "{0}other-user{1}".format(self.user_prefix, i)
            self.ldb_admin.newuser(username, self.user_pass, userou=userou)
            userdn = self.get_user_dn(username)
            self.add_attr(userdn, self.conf_attr, "xyz{0}".format(i))
            self.all_users.append(username)

        # there are 4 users in the OU, plus the OU itself
        self.test_dn = self.ou
        self.total_objects = len(self.all_users) + 1
        # conf_user plus the two 'other' users have the attribute set
        self.objects_with_attr = 3

        # sanity-check the flag is not already set (this'll cause problems if
        # previous test run didn't clean up properly). Note the mask must be
        # built first: '&' binds tighter than '|', so an unparenthesised
        # 'flags & A|B' would always be truthy.
        protected_flags = SEARCH_FLAG_CONFIDENTIAL | SEARCH_FLAG_RODC_ATTRIBUTE
        search_flags = int(self.get_attr_search_flags(self.attr_dn))
        if search_flags & protected_flags:
            self.set_attr_search_flags(self.attr_dn,
                                       str(search_flags & ~protected_flags))
        search_flags = int(self.get_attr_search_flags(self.attr_dn))
        self.assertEqual(
            0, search_flags & protected_flags,
            f"{self.conf_attr} searchFlags did not reset to omit "
            f"SEARCH_FLAG_CONFIDENTIAL and SEARCH_FLAG_RODC_ATTRIBUTE "
            f"({search_flags})")

    def add_attr(self, dn, attr, value):
        """Add 'value' for attribute 'attr' on object 'dn' (as the admin)."""
        m = Message()
        m.dn = Dn(self.ldb_admin, dn)
        m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
        self.ldb_admin.modify(m)

    def set_attr_search_flags(self, attr_dn, flags):
        """Modifies the searchFlags for an object in the schema"""
        m = Message()
        m.dn = Dn(self.ldb_admin, attr_dn)
        m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
                                          'searchFlags')
        self.ldb_admin.modify(m)

        # note we have to update the schema for this change to take effect (on
        # Windows, at least)
        self.ldb_admin.set_schema_update_now()

    def get_attr_search_flags(self, attr_dn):
        """Return the current searchFlags value of a schema object.

        The value is returned exactly as the search yields it (the first
        value of the 'searchFlags' element); callers that need a number
        convert it with int().
        """
        res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
                                    attrs=['searchFlags'])
        return res[0]['searchFlags'][0]

    def make_attr_confidential(self):
        """Marks the attribute under test as being confidential"""

        # work out the original 'searchFlags' value before we overwrite it
        old_value = self.get_attr_search_flags(self.attr_dn)

        self.set_attr_search_flags(self.attr_dn, str(SEARCH_FLAG_CONFIDENTIAL))

        # reset the value after the test completes
        self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)

    def get_user_dn(self, name):
        """Return the DN of user 'name' inside the test OU."""
        return "CN={0},{1}".format(name, self.ou)

    def get_user_sid_string(self, username):
        """Return the objectSid of a test user as a string (for SDDL ACEs)."""
        user_dn = self.get_user_dn(username)
        user_sid = self.sd_utils.get_object_sid(user_dn)
        return str(user_sid)

    def get_ldb_connection(self, target_username, target_password):
        """Open a new SamDB connection authenticated as the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; sealing is requested and Kerberos is disabled.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        features = creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL
        creds_tmp.set_gensec_features(features)
        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
        return ldb_target

    def assert_search_result(self, expected_num, expr, samdb):
        """Assert that searching the test OU for 'expr' returns expected_num.

        The search is repeated once per requested-attribute list, because the
        number of matching objects must not depend on which attributes are
        asked for.
        """

        # try asking for different attributes back: None/all, the confidential
        # attribute itself, and a random unrelated attribute
        attr_filters = [None, ["*"], [self.conf_attr], ['name']]
        for attr in attr_filters:
            res = samdb.search(self.test_dn, expression=expr,
                               scope=SCOPE_SUBTREE, attrs=attr)
            self.assertEqual(len(res), expected_num,
                             "%u results, not %u for search %s, attr %s" %
                             (len(res), expected_num, expr, str(attr)))

    # return a selection of searches that match exactly against the test object
    def get_exact_match_searches(self):
        """Return filters that match only the object holding conf_value."""
        first_char = self.conf_value[:1]
        last_char = self.conf_value[-1:]
        test_attr = self.conf_attr

        searches = [
            # search for the attribute using a sub-string wildcard
            # (which could reveal the attribute's actual value)
            "({0}={1}*)".format(test_attr, first_char),
            "({0}=*{1})".format(test_attr, last_char),

            # sanity-check equality against an exact match on value
            "({0}={1})".format(test_attr, self.conf_value),

            # '~=' searches don't work against Samba, so there is no
            # approximate-match search in this list

            # check wildcard in an AND search...
            "(&({0}={1}*)(objectclass=*))".format(test_attr, first_char),

            # ...an OR search (against another term that will never match)
            "(|({0}={1}*)(objectclass=banana))".format(test_attr, first_char)]

        return searches

    # return searches that match any object with the attribute under test
    def get_match_all_searches(self):
        """Return filters that match every object that has the attribute."""
        searches = [
            # check a full wildcard against the confidential attribute
            # (which could reveal the attribute's presence/absence)
            "({0}=*)".format(self.conf_attr),

            # check wildcard in an AND search...
            "(&(objectclass=*)({0}=*))".format(self.conf_attr),

            # ...an OR search (against another term that will never match)
            "(|(objectclass=banana)({0}=*))".format(self.conf_attr),

            # check <=, and >= expressions that would normally find a match
            "({0}>=0)".format(self.conf_attr),
            "({0}<=ZZZZZZZZZZZ)".format(self.conf_attr)]

        return searches

    def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
        """Check searches against the attribute under test work as expected

        has_rights_to is the number of attribute-bearing objects the
        connection may read the attribute on, or the string "all".
        samdb defaults to the unprivileged user's connection.
        """

        if samdb is None:
            samdb = self.ldb_user

        if has_rights_to == "all":
            has_rights_to = self.objects_with_attr

        # these first few searches we just expect to match against the one
        # object under test that we're trying to guess the value of
        expected_num = 1 if has_rights_to > 0 else 0
        for search in self.get_exact_match_searches():
            self.assert_search_result(expected_num, search, samdb)

        # these next searches will match any objects we have rights to see
        expected_num = has_rights_to
        for search in self.get_match_all_searches():
            self.assert_search_result(expected_num, search, samdb)

    # The following are double negative searches (i.e. NOT non-matching-
    # condition) which will therefore match ALL objects, including the test
    # object(s).
    def get_negative_match_all_searches(self):
        """Return '!' filters whose inner term matches no test object."""
        first_char = self.conf_value[:1]
        last_char = self.conf_value[-1:]
        # the next character along is guaranteed to differ from the real one
        not_first_char = chr(ord(first_char) + 1)
        not_last_char = chr(ord(last_char) + 1)

        searches = [
            "(!({0}={1}*))".format(self.conf_attr, not_first_char),
            "(!({0}=*{1}))".format(self.conf_attr, not_last_char)]
        return searches

    # the following searches will not match against the test object(s). So
    # a user with sufficient rights will see an inverse sub-set of objects.
    # (An unprivileged user would either see all objects on Windows, or no
    # objects on Samba)
    def get_inverse_match_searches(self):
        """Return '!' filters whose inner term matches the test object."""
        first_char = self.conf_value[:1]
        last_char = self.conf_value[-1:]
        searches = [
            "(!({0}={1}*))".format(self.conf_attr, first_char),
            "(!({0}=*{1}))".format(self.conf_attr, last_char)]
        return searches

    def negative_searches_all_rights(self, total_objects=None):
        """Return expected '!' search counts for a user with full rights.

        The result maps each search expression to the number of objects it
        should return. total_objects defaults to self.total_objects.
        """
        expected_results = {}

        if total_objects is None:
            total_objects = self.total_objects

        # these searches should match ALL objects (including the OU)
        for search in self.get_negative_match_all_searches():
            expected_results[search] = total_objects

        # a ! wildcard should only match the objects without the attribute
        search = "(!({0}=*))".format(self.conf_attr)
        expected_results[search] = total_objects - self.objects_with_attr

        # whereas the inverse searches should match all objects *except* the
        # one under test
        for search in self.get_inverse_match_searches():
            expected_results[search] = total_objects - 1

        return expected_results

    # Returns the expected negative (i.e. '!') search behaviour when talking to
    # a DC, i.e. we assert that users
    # without rights always see ALL objects in '!' searches
    def negative_searches_return_all(self, has_rights_to=0,
                                     total_objects=None):
        """Return expected '!' search counts for a user lacking full rights.

        This only builds the expectations (search expression -> count); the
        assertions themselves are made by assert_negative_searches().
        has_rights_to is the number of objects (0 or 1) on which the user can
        read the attribute. total_objects defaults to self.total_objects.
        """
        expected_results = {}

        if total_objects is None:
            total_objects = self.total_objects

        # Windows 'hides' objects by always returning all of them, so negative
        # searches that match all objects will simply return all objects
        for search in self.get_negative_match_all_searches():
            expected_results[search] = total_objects

        # if we're matching on everything except the one object under test
        # (i.e. the inverse subset), we'll still see all objects if
        # has_rights_to == 0. Or we'll see all bar one if has_rights_to == 1.
        inverse_searches = self.get_inverse_match_searches()
        inverse_searches += ["(!({0}=*))".format(self.conf_attr)]

        for search in inverse_searches:
            expected_results[search] = total_objects - has_rights_to

        return expected_results

    # Returns the expected negative (i.e. '!') search behaviour. This varies
    # depending on what type of DC we're talking to (i.e. Windows or Samba)
    # and what access rights the user has.
    # Note we only handle has_rights_to="all", 1 (the test object), or 0 (i.e.
    # we don't have rights to any objects)
    def negative_search_expected_results(self, has_rights_to, total_objects=None):
        """Pick the expectation builder matching the user's access rights."""

        if has_rights_to == "all":
            expect_results = self.negative_searches_all_rights(total_objects)

        else:
            expect_results = self.negative_searches_return_all(has_rights_to,
                                                               total_objects)
        return expect_results

    def assert_negative_searches(self, has_rights_to=0, samdb=None):
        """Asserts user without rights cannot see objects in '!' searches"""

        if samdb is None:
            samdb = self.ldb_user

        # build a dictionary of key=search-expr, value=expected_num assertions
        expected_results = self.negative_search_expected_results(has_rights_to)

        for search, expected_num in expected_results.items():
            self.assert_search_result(expected_num, search, samdb)

    def assert_attr_returned(self, expect_attr, samdb, attrs):
        """Assert whether the attribute under test comes back for conf_dn.

        expect_attr is True if the attribute should be present in the result
        when requesting the attribute list 'attrs' over connection 'samdb'.
        """
        # does a query that should always return a successful result, and
        # checks whether the confidential attribute is present
        res = samdb.search(self.conf_dn, expression="(objectClass=*)",
                           scope=SCOPE_SUBTREE, attrs=attrs)
        self.assertEqual(1, len(res))

        attr_returned = any(self.conf_attr in msg for msg in res)
        self.assertEqual(expect_attr, attr_returned)

    def assert_attr_visible(self, expect_attr, samdb=None):
        """Assert the attribute is (or is not) returned for each attr list.

        samdb defaults to the unprivileged user's connection.
        """
        if samdb is None:
            samdb = self.ldb_user

        # sanity-check confidential attribute is/isn't returned as expected
        # based on the filter attributes we ask for
        self.assert_attr_returned(expect_attr, samdb, attrs=None)
        self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
        self.assert_attr_returned(expect_attr, samdb, attrs=[self.conf_attr])

        # filtering on a different attribute should never return the conf_attr
        self.assert_attr_returned(expect_attr=False, samdb=samdb,
                                  attrs=['name'])

    def assert_attr_visible_to_admin(self):
        """Assert the admin connection still sees the attribute everywhere."""
        # sanity-check the admin user can always see the confidential attribute
        self.assert_conf_attr_searches(has_rights_to="all",
                                       samdb=self.ldb_admin)
        self.assert_negative_searches(has_rights_to="all",
                                      samdb=self.ldb_admin)
        self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
|
|
|
|
|
|
class ConfidentialAttrTest(ConfidentialAttrCommon):
    """Confidential-attribute tests, with and without overriding ACEs."""

    def _check_user_access(self, has_rights_to, expect_attr):
        """Run the three standard checks as the unprivileged test user."""
        self.assert_conf_attr_searches(has_rights_to=has_rights_to)
        self.assert_negative_searches(has_rights_to=has_rights_to)
        self.assert_attr_visible(expect_attr=expect_attr)

    def test_basic_search(self):
        """Basic test confidential attributes aren't disclosed via searches"""

        # while the attribute is still non-confidential, an ordinary user can
        # see it and match on it in searches
        self._check_user_access("all", True)

        # once it is confidential the same user must neither see it nor be
        # able to match on it indirectly through the search expression
        self.make_attr_confidential()
        self._check_user_access(0, False)

        # the admin must be unaffected
        self.assert_attr_visible_to_admin()

    def _test_search_with_allow_acl(self, allow_ace):
        """Checks a ACE with 'CR' rights can override a confidential attr"""
        # with the attribute confidential, the user starts with no access
        self.make_attr_confidential()
        self._check_user_access(0, False)

        # grant access on the object under test only
        self.sd_utils.dacl_add_ace(self.conf_dn, allow_ace)

        # the user now has rights to exactly that one object
        self._check_user_access(1, True)

        # the admin must be unaffected
        self.assert_attr_visible_to_admin()

    def test_search_with_attr_acl_override(self):
        """Make the confidential attr visible via an OA attr ACE"""

        # an object ACE granting SEC_ADS_CONTROL_ACCESS ('CR') on the
        # attribute under test lets the user see it again
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_allow_acl(
            f"(OA;;CR;{self.conf_attr_guid};;{sid})")

    def test_search_with_propset_acl_override(self):
        """Make the confidential attr visible via a Property-set ACE"""

        # as above, but 'CR' is granted on the property-set containing the
        # attribute (i.e. the attributeSecurityGuid)
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_allow_acl(
            f"(OA;;CR;{self.conf_attr_sec_guid};;{sid})")

    def test_search_with_acl_override(self):
        """Make the confidential attr visible via a general 'allow' ACE"""

        # a plain allow ACE carrying the 'CR' bit for the user
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_allow_acl(f"(A;;CR;;;{sid})")

    def test_search_with_blanket_oa_acl(self):
        """Make the confidential attr visible via a non-specific OA ACE"""

        # an Object Access (OA) ACE with no GUID should behave just like an
        # 'Access' (A) ACE
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_allow_acl(f"(OA;;CR;;;{sid})")

    def _test_search_with_neutral_acl(self, neutral_ace):
        """Checks that a user does NOT gain access via an unrelated ACE"""

        # with the attribute confidential, the user starts with no access
        self.make_attr_confidential()
        self._check_user_access(0, False)

        # add the ACE to the object under test...
        self.sd_utils.dacl_add_ace(self.conf_dn, neutral_ace)

        # ...which must not change what the user can see
        self._check_user_access(0, False)

        # the admin must be unaffected
        self.assert_attr_visible_to_admin()

    def test_search_with_neutral_acl(self):
        """Give the user all rights *except* CR for any attributes"""

        # every right apart from 'CR' should make no difference
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_neutral_acl(
            f"(A;;RPWPCCDCLCLORCWOWDSDDTSW;;;{sid})")

    def test_search_with_neutral_attr_acl(self):
        """Give the user all rights *except* CR for the attribute under test"""

        # every object-access right apart from 'CR' should make no difference
        sid = self.get_user_sid_string(self.user)
        all_but_cr = "RPWPCCDCLCLORCWOWDSDDTSW"
        self._test_search_with_neutral_acl(
            f"(OA;;{all_but_cr};{self.conf_attr_guid};;{sid})")

    def test_search_with_neutral_cr_acl(self):
        """Give the user CR rights for *another* unrelated attribute"""

        # object-access 'CR' rights on an unrelated attribute; the GUID is
        # sAMAccountName's (for no particular reason)
        sid = self.get_user_sid_string(self.user)
        other_attr_guid = "3e0abfd0-126a-11d0-a060-00aa006c33ed"
        self._test_search_with_neutral_acl(
            f"(OA;;CR;{other_attr_guid};;{sid})")
|
|
|
|
|
|
# Check that a Deny ACL on an attribute doesn't reveal confidential info
|
|
class ConfidentialAttrTestDenyAcl(ConfidentialAttrCommon):
    """Checks that a deny ACE on one object hides the attribute like the
    confidential flag does, without disclosing that object in searches."""

    def assert_not_in_result(self, res, exclude_dn):
        """Assert no message in 'res' has exclude_dn as its DN."""
        # NOTE(review): msg.dn is presumably an ldb Dn object while exclude_dn
        # is passed as a str -- confirm this comparison can ever be equal.
        failure_text = "Search revealed object {0}".format(exclude_dn)
        for entry in res:
            self.assertNotEqual(entry.dn, exclude_dn, failure_text)

    # deny ACL tests are slightly different as we are only denying access to
    # the one object under test (rather than any objects with that attribute).
    # Therefore we need an extra check that we don't reveal the test object
    # in the search, if we're not supposed to
    def assert_search_result(self, expected_num, expr, samdb,
                             excl_testobj=False):
        """Assert the result count for 'expr', optionally also asserting
        that the object under test is absent from every result set."""

        # ask for different attributes back each time: None/all, the
        # attribute under test, and an unrelated attribute
        for requested in (None, ["*"], [self.conf_attr], ['name']):
            found = samdb.search(self.test_dn, expression=expr,
                                 scope=SCOPE_SUBTREE, attrs=requested)
            count = len(found)
            self.assertEqual(count, expected_num,
                             "%u results, not %u for search %s, attr %s" %
                             (count, expected_num, expr, str(requested)))

            # assert we haven't revealed the hidden test-object
            if excl_testobj:
                self.assert_not_in_result(found, exclude_dn=self.conf_dn)

    # we make a few tweaks to the regular version of this function to cater to
    # denying specifically one object via an ACE
    def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
        """Check searches against the attribute under test work as expected"""

        conn = self.ldb_user if samdb is None else samdb

        # when one object is denied via an ACE, also make sure that object
        # never shows up in the results
        hide_testobj = has_rights_to == "deny-one"

        # searches aimed at the object under test only match it when we have
        # full rights
        exact_expected = 1 if has_rights_to == "all" else 0
        for expr in self.get_exact_match_searches():
            self.assert_search_result(exact_expected, expr, conn,
                                      hide_testobj)

        # presence-style searches match every object with the attribute that
        # we have rights to see (i.e. all except the object under test)
        if has_rights_to == "all":
            wildcard_expected = self.objects_with_attr
        elif has_rights_to == "deny-one":
            wildcard_expected = self.objects_with_attr - 1
        else:
            wildcard_expected = exact_expected

        for expr in self.get_match_all_searches():
            self.assert_search_result(wildcard_expected, expr, conn,
                                      hide_testobj)

    # override method specifically for deny ACL test cases
    def negative_searches_return_all(self, has_rights_to=0,
                                     total_objects=None):
        """Return expected '!' search counts once one object is denied.

        Both arguments are accepted for signature compatibility with the
        base class; the expectations are derived from self.total_objects and
        self.objects_with_attr only.
        """
        # When a user lacks access rights to an object, Windows 'hides' it in
        # '!' searches by always returning it, regardless of whether it matches
        always_everything = (self.get_negative_match_all_searches() +
                             self.get_inverse_match_searches())
        expectations = dict.fromkeys(always_everything, self.total_objects)

        # in the wildcard case, the one object we don't have rights to gets
        # bundled in with the objects that don't have the attribute at all
        readable_with_attr = self.objects_with_attr - 1
        wildcard = "(!({0}=*))".format(self.conf_attr)
        expectations[wildcard] = self.total_objects - readable_with_attr
        return expectations

    # override method specifically for deny ACL test cases
    def assert_negative_searches(self, has_rights_to=0, samdb=None):
        """Asserts user without rights cannot see objects in '!' searches"""

        conn = self.ldb_user if samdb is None else samdb

        # As the deny ACL is only denying access to one particular object, an
        # extra check that the denied object is not returned would only be
        # valid if the '!'/negative search behaviour were to suppress objects
        # we don't have access rights to -- so it stays switched off here
        hide_testobj = False

        # map of search-expr -> expected number of results
        expectations = self.negative_search_expected_results(has_rights_to)

        for expr, expected_num in expectations.items():
            self.assert_search_result(expected_num, expr, conn,
                                      excl_testobj=hide_testobj)

    def _test_search_with_deny_acl(self, ace):
        """Check the user loses sight of the attribute once 'ace' is added."""

        def check_user(rights, visible):
            self.assert_conf_attr_searches(has_rights_to=rights)
            self.assert_negative_searches(has_rights_to=rights)
            self.assert_attr_visible(expect_attr=visible)

        # initially the user can see the attribute
        check_user("all", True)

        # deny access to the attribute on the object under test
        self.sd_utils.dacl_add_ace(self.conf_dn, ace)

        # now the user must not be able to see it on that one object
        check_user("deny-one", False)

        # the admin must be unaffected
        self.assert_attr_visible_to_admin()

    def test_search_with_deny_attr_acl(self):
        """Checks a deny ACE works the same way as a confidential attribute"""

        # deny the user Read Property (RP) access to the attribute (which is
        # similar to making the attribute confidential)
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_deny_acl(
            f"(OD;;RP;{self.conf_attr_guid};;{sid})")

    def test_search_with_deny_acl(self):
        """Checks a blanket deny ACE denies access to an object's attributes"""

        # a blanket deny ACE for Read Property (RP) rights
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_deny_acl(f"(D;;RP;;;{sid})")

    def test_search_with_deny_propset_acl(self):
        """Checks a deny ACE on the attribute's Property-Set"""

        # deny Read Property (RP) rights on the attribute's property-set GUID
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_deny_acl(
            f"(OD;;RP;{self.conf_attr_sec_guid};;{sid})")

    def test_search_with_blanket_oa_deny_acl(self):
        """Checks a non-specific 'OD' ACE works the same as a 'D' ACE"""

        # an 'Object Deny' (OD) ACE with no GUID should behave the same way
        # as a 'Deny' (D) ACE
        sid = self.get_user_sid_string(self.user)
        self._test_search_with_deny_acl(f"(OD;;RP;;;{sid})")
|
|
|
|
|
|
# Check that using the dirsync controls doesn't reveal confidential attributes
|
|
class ConfidentialAttrTestDirsync(ConfidentialAttrCommon):
|
|
|
|
def setUp(self):
    """Extend the common fixture with the dirsync control and filters."""
    super(ConfidentialAttrTestDirsync, self).setUp()
    self.dirsync = ["dirsync:1:1:1000"]

    # dirsync searches have to be made on the base DN, so the inverse ('!')
    # searches need an extra filter, otherwise thousands of objects would
    # be returned
    not_deleted = "(!(isDeleted=*))"
    self.extra_filter = f"(&(samaccountname={self.user_prefix}*){not_deleted})"
    self.single_obj_filter = f"(&(samaccountname={self.conf_user}){not_deleted})"

    # Note dirsync behaviour is slightly different for the attribute under
    # test - when you have full access rights, it only returns the objects
    # that actually have this attribute (i.e. it doesn't return an empty
    # message with just the DN). So the attribute is only ever requested
    # together with 'name', to avoid complicating the assertions further
    self.attr_filters = [None, ["*"], ["name"], [self.conf_attr, "name"]]
|
|
|
|
# override method specifically for dirsync, i.e. add dirsync controls
|
|
def assert_search_result(self, expected_num, expr, samdb, base_dn=None):
    """Assert the result count for 'expr' when the dirsync control is used.

    The base_dn argument is ignored: the search is always made on
    self.base_dn.
    """

    # Note dirsync must always search on the partition base DN
    base_dn = self.base_dn

    # the extra filter is needed for dirsync because:
    # - searching on the base DN would otherwise make the '!' searches
    #   return thousands of unrelated results, and
    # - the test attribute is made preserve-on-delete in one case, so
    #   results from any previous test runs have to be weeded out
    combined = "(&{0}{1})".format(expr, self.extra_filter)

    # when any results are expected, only the first attribute list is tried
    if expected_num > 0:
        requested_lists = [self.attr_filters[0]]
    else:
        requested_lists = self.attr_filters

    for requested in requested_lists:
        found = samdb.search(base_dn, expression=combined,
                             scope=SCOPE_SUBTREE, attrs=requested,
                             controls=self.dirsync)
        count = len(found)
        self.assertEqual(count, expected_num,
                         "%u results, not %u for search %s, attr %s" %
                         (count, expected_num, combined, str(requested)))
|
|
|
|
# override method specifically for dirsync, i.e. add dirsync controls
|
|
def assert_attr_returned(self, expect_attr, samdb, attrs,
                         no_result_ok=False):
    """Asserts whether a dirsync search returns the attribute under test

    Searches for the single object matched by self.single_obj_filter,
    requesting 'attrs', and checks that the presence of a non-empty
    self.conf_attr value in the results equals 'expect_attr'. Unless
    'no_result_ok' is set, exactly one result must come back.
    """

    # dirsync needs a naming context as the search base, so the
    # single-object filter is what narrows things down to the object we
    # actually care about
    found = samdb.search(self.base_dn,
                         expression=self.single_obj_filter,
                         scope=SCOPE_SUBTREE,
                         attrs=attrs,
                         controls=self.dirsync)

    if not no_result_ok:
        self.assertEqual(1, len(found))

    # the attribute counts as returned if any message carries at least
    # one value for it
    saw_attr = any(
        self.conf_attr in entry and len(entry[self.conf_attr]) > 0
        for entry in found)

    self.assertEqual(expect_attr, saw_attr)
|
|
|
|
# override method specifically for dirsync (it has slightly different
|
|
# behaviour to normal when requesting specific attributes)
|
|
def assert_attr_visible(self, expect_attr, samdb=None):
    """Asserts the attribute under test is (or isn't) visible via dirsync

    Runs assert_attr_returned() for a range of requested-attribute
    lists. Uses self.ldb_user when no connection is supplied.
    """
    conn = self.ldb_user if samdb is None else samdb

    # Each entry is (requested attributes, whether an empty result set is
    # acceptable). Asking for everything should behave as expect_attr
    # says.
    checks = [(None, False), (["*"], False)]

    if expect_attr:
        checks.append(([self.conf_attr], False))
    else:
        # The behaviour with dirsync when asking solely for an attribute
        # that you don't have rights to is a bit strange: Samba returns
        # no result rather than an empty message with just the DN
        # (presumably due to the dirsync module). That isn't disclosive,
        # because the DC behaves the same way when asked for a
        # garbage/non-existent attribute, which we check as well.
        checks.append(([self.conf_attr], True))
        checks.append((["garbage"], True))

    for wanted_attrs, empty_ok in checks:
        self.assert_attr_returned(expect_attr, conn,
                                  attrs=wanted_attrs,
                                  no_result_ok=empty_ok)

    # asking for an unrelated attribute must never hand back conf_attr
    self.assert_attr_returned(expect_attr=False, samdb=conn,
                              attrs=['name'])
|
|
|
|
# override method specifically for dirsync (total object count differs)
|
|
def assert_negative_searches(self, has_rights_to=0, samdb=None):
    """Asserts user without rights cannot see objects in '!' searches"""
    conn = self.ldb_user if samdb is None else samdb

    # The dirsync searches carry an extra filter, so only the user
    # objects are counted here (the parent OU is not included)
    user_count = len(self.all_users)
    expectations = self.negative_search_expected_results(has_rights_to,
                                                         user_count)

    for expression, wanted_count in expectations.items():
        self.assert_search_result(wanted_count, expression, conn)
|
|
|
|
def test_search_with_dirsync(self):
    """Checks dirsync controls don't reveal confidential attributes"""

    def check_access(rights, visible, **conn):
        # the same trio of checks is repeated for every stage of the
        # test; 'conn' optionally carries samdb=<connection>
        self.assert_conf_attr_searches(has_rights_to=rights, **conn)
        self.assert_attr_visible(expect_attr=visible, **conn)
        self.assert_negative_searches(has_rights_to=rights, **conn)

    # before the attribute is made confidential, the user sees everything
    check_access("all", True)

    # make the test attribute confidential and check the user can't see
    # it, even when using the dirsync controls
    self.make_attr_confidential()
    check_access(0, False)

    # as a final sanity-check, make sure the admin can still see the attr
    check_access("all", True, samdb=self.ldb_admin)
|
|
|
|
def get_guid_string(self, dn):
    """Returns an object's GUID (in string format)"""
    admin = self.ldb_admin

    # base-scope lookup of just the objectGUID attribute on 'dn'
    matches = admin.search(base=dn, attrs=["objectGUID"],
                           scope=SCOPE_BASE)
    raw_guid = matches[0]['objectGUID'][0]

    # let the schema-aware formatter produce the value, then decode the
    # resulting bytes to a str
    formatted = admin.schema_format_value("objectGUID", raw_guid)
    return formatted.decode('utf-8')
|
|
|
|
def make_attr_preserve_on_delete(self):
    """Marks the attribute under test as being preserve on delete"""

    # read the current 'searchFlags' so the new flag is added to it
    current_flags = int(self.get_attr_search_flags(self.attr_dn))

    # this is only meant to be called once the confidential flag is set
    self.assertNotEqual(0, current_flags & SEARCH_FLAG_CONFIDENTIAL)

    new_flags = current_flags | SEARCH_FLAG_PRESERVEONDELETE
    self.set_attr_search_flags(self.attr_dn, str(new_flags))
|
|
|
|
def change_attr_under_test(self, attr_name, attr_cn):
    """Switches the test code over to a different attribute

    Records the new attribute name and its schema DN, then sets the
    attribute on the user-under-test and on the 'other-user' accounts.
    """
    self.conf_attr = attr_name
    self.attr_dn = f"{attr_cn},{self.schema_dn}"

    # give the user-under-test a value for the new attribute
    self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)

    # 2 other users also have the attribute-under-test set (to a
    # randomish value). Normally the setUp() does this, so repeat it here
    # for the new attribute.
    other_users = (name for name in self.all_users
                   if "other-user" in name)
    for name in other_users:
        self.add_attr(self.get_user_dn(name), self.conf_attr, "xyz-blah")
|
|
|
|
def test_search_with_dirsync_deleted_objects(self):
    """Checks dirsync doesn't reveal confidential info for deleted objs"""

    # change the attribute we're testing (we'll preserve on delete for
    # this test case, which means the attribute-under-test hangs around
    # after the test case finishes, and would interfere with the searches
    # for subsequent other test cases)
    self.change_attr_under_test("carLicense", "CN=carLicense")

    # Windows dirsync behaviour is a little strange when you request
    # attributes that deleted objects no longer have, so just request
    # 'all attributes' to simplify the test logic
    self.attr_filters = [None, ["*"]]

    # normally dirsync uses extra filters to exclude deleted objects that
    # we're not interested in. Override these filters so they WILL
    # include deleted objects, but only from this particular test run. We
    # can do this by matching lastKnownParent against this test case's
    # OU, which will match any deleted child objects.
    ou_guid = self.get_guid_string(self.ou)
    deleted_filter = "(lastKnownParent=<GUID={0}>)".format(ou_guid)

    # the extra-filter will get combined via AND with the search
    # expression we're testing, i.e. filter on the confidential attribute
    # AND only include non-deleted objects, OR deleted objects from this
    # test run
    exclude_deleted_objs_filter = self.extra_filter
    self.extra_filter = "(|{0}{1})".format(exclude_deleted_objs_filter,
                                           deleted_filter)

    # for matching on a single object, the search expression becomes:
    # match exactly by account-name AND either a non-deleted object OR a
    # deleted object from this test run
    match_by_name = "(samaccountname={0})".format(self.conf_user)
    not_deleted = "(!(isDeleted=*))"
    self.single_obj_filter = "(&{0}(|{1}{2}))".format(match_by_name,
                                                      not_deleted,
                                                      deleted_filter)

    # check that the search filters work as expected
    self.assert_conf_attr_searches(has_rights_to="all")
    self.assert_attr_visible(expect_attr=True)
    self.assert_negative_searches(has_rights_to="all")

    # make the test attribute confidential *and* preserve on delete.
    self.make_attr_confidential()
    self.make_attr_preserve_on_delete()

    # check we can't see the objects now, even with using dirsync
    # controls
    self.assert_conf_attr_searches(has_rights_to=0)
    self.assert_attr_visible(expect_attr=False)
    self.assert_negative_searches(has_rights_to=0)

    # now delete the users (except for the user whose LDB connection
    # we're currently using). Compare the names by value: an identity
    # ('is not') check only works if both names happen to be the very
    # same string object, and would otherwise delete the connected user.
    for user in self.all_users:
        if user != self.user:
            self.ldb_admin.delete(self.get_user_dn(user))

    # check we still can't see the objects
    self.assert_conf_attr_searches(has_rights_to=0)
    self.assert_negative_searches(has_rights_to=0)
|
|
|
|
def test_timing_attack(self):
    """Checks search timing doesn't reveal confidential attribute values

    Creates an msFVE-RecoveryInformation object holding a confidential
    msFVE-RecoveryPassword value, then times searches whose filter
    matches (or doesn't match) that value. The timings must be
    distinguishable for the administrator and indistinguishable for the
    unprivileged user.
    """

    # Create the machine account.
    mach_name = f'conf_timing_{random.randint(0, 0xffff)}'
    mach_dn = Dn(self.ldb_admin, f'CN={mach_name},{self.ou}')
    details = {
        'dn': mach_dn,
        'objectclass': 'computer',
        'sAMAccountName': f'{mach_name}$',
    }
    self.ldb_admin.add(details)

    # Get the machine account's GUID.
    res = self.ldb_admin.search(mach_dn,
                                attrs=['objectGUID'],
                                scope=SCOPE_BASE)
    mach_guid = res[0].get('objectGUID', idx=0)

    # Now we can create an msFVE-RecoveryInformation object that is a
    # child of the machine account object.
    recovery_dn = Dn(self.ldb_admin, str(mach_dn))
    recovery_dn.add_child('CN=recovery_info')

    secret_pw = 'Secret007'
    not_secret_pw = 'Secret008'

    secret_pw_utf8 = secret_pw.encode('utf-8')

    # The crucial attribute, msFVE-RecoveryPassword, is a confidential
    # attribute.
    conf_attr = 'msFVE-RecoveryPassword'

    m = Message(recovery_dn)
    m['objectClass'] = 'msFVE-RecoveryInformation'
    m['msFVE-RecoveryGuid'] = mach_guid
    m[conf_attr] = secret_pw
    self.ldb_admin.add(m)

    attrs = [conf_attr]

    # Search for the confidential attribute as administrator, ensuring it
    # is visible.
    res = self.ldb_admin.search(recovery_dn,
                                attrs=attrs,
                                scope=SCOPE_BASE)
    self.assertEqual(1, len(res))
    pw = res[0].get(conf_attr, idx=0)
    self.assertEqual(secret_pw_utf8, pw)

    # Repeat the search with an expression matching on the confidential
    # attribute. This should also work.
    res = self.ldb_admin.search(
        recovery_dn,
        attrs=attrs,
        expression=f'({conf_attr}={secret_pw})',
        scope=SCOPE_BASE)
    self.assertEqual(1, len(res))
    pw = res[0].get(conf_attr, idx=0)
    self.assertEqual(secret_pw_utf8, pw)

    # Search for the attribute as an unprivileged user. It should not be
    # visible.
    user_res = self.ldb_user.search(recovery_dn,
                                    attrs=attrs,
                                    scope=SCOPE_BASE)
    # Check the result count explicitly (as for the admin searches) so a
    # missing object is reported as an assertion failure rather than an
    # IndexError.
    self.assertEqual(1, len(user_res))
    pw = user_res[0].get(conf_attr, idx=0)
    # The attribute should be None.
    self.assertIsNone(pw)

    # We use LDAP_MATCHING_RULE_TRANSITIVE_EVAL to create a search
    # expression that takes a long time to execute, by setting off
    # another search each time it is evaluated. It makes no difference
    # that the object on which we're searching has no 'member' attribute.
    dummy_dn = 'cn=user,cn=users,dc=samba,dc=example,dc=com'
    slow_subexpr = f'(member:1.2.840.113556.1.4.1941:={dummy_dn})'
    slow_expr = f'(|{slow_subexpr * 100})'

    # The full search expression. It comprises a match on the
    # confidential attribute joined by an AND to our slow search
    # expression. The AND operator is short-circuiting, so if our first
    # subexpression fails to match, we'll bail out of the search early.
    # Otherwise, we'll evaluate the slow part; as its subexpressions are
    # joined by ORs, and will all fail to match, every one of them will
    # need to be evaluated. By measuring how long the search takes, we'll
    # be able to infer whether the confidential attribute matched or not.

    # This is bad if we are not an administrator, and are able to use
    # this to determine the values of confidential attributes. Therefore
    # we need to ensure we can't observe any difference in timing.
    correct_expr = f'(&({conf_attr}={secret_pw}){slow_expr})'
    wrong_expr = f'(&({conf_attr}={not_secret_pw}){slow_expr})'

    def standard_uncertainty_bounds(times):
        # one standard deviation either side of the mean
        mean = statistics.mean(times)
        stdev = statistics.stdev(times, mean)

        return (mean - stdev, mean + stdev)

    def timed_search(samdb, expression):
        # Return how long a single search takes. perf_counter() is used
        # rather than time() because it is monotonic and high-resolution,
        # so clock adjustments can't distort the measured interval.
        start = time.perf_counter()
        samdb.search(recovery_dn,
                     attrs=attrs,
                     expression=expression,
                     scope=SCOPE_BASE)
        return time.perf_counter() - start

    # Perform a number of searches with both correct and incorrect
    # expressions, and return the uncertainty bounds for each.
    def time_searches(samdb):
        warmup_samples = 3
        samples = 10
        matching_times = []
        non_matching_times = []

        # untimed searches before the measurements start
        for _ in range(warmup_samples):
            samdb.search(recovery_dn,
                         attrs=attrs,
                         expression=correct_expr,
                         scope=SCOPE_BASE)

        for _ in range(samples):
            # Measure the time taken for a search, for both a matching
            # and a non-matching search expression.
            matching_times.append(timed_search(samdb, correct_expr))
            non_matching_times.append(timed_search(samdb, wrong_expr))

        matching = standard_uncertainty_bounds(matching_times)
        non_matching = standard_uncertainty_bounds(non_matching_times)
        return matching, non_matching

    def assertRangesDistinct(a, b):
        a0, a1 = a
        b0, b1 = b
        self.assertLess(min(a1, b1), max(a0, b0))

    def assertRangesOverlap(a, b):
        a0, a1 = a
        b0, b1 = b
        self.assertGreaterEqual(min(a1, b1), max(a0, b0))

    # For an administrator, the uncertainty bounds for matching and
    # non-matching searches should be distinct. This shows that the two
    # cases are distinguishable, and therefore that confidential
    # attributes are visible.
    admin_matching, admin_non_matching = time_searches(self.ldb_admin)
    assertRangesDistinct(admin_matching, admin_non_matching)

    # The user cannot view the confidential attribute, so the uncertainty
    # bounds for matching and non-matching searches must overlap. The two
    # cases must be indistinguishable.
    user_matching, user_non_matching = time_searches(self.ldb_user)
    assertRangesOverlap(user_matching, user_non_matching)
|
|
|
|
# Check that using the dirsync controls doesn't reveal confidential
|
|
# "RODC filtered attribute" values to users with only
|
|
# GUID_DRS_GET_CHANGES. The test is so similar to the Confidential
|
|
# attribute test we base it on that.
|
|
class RodcFilteredAttrDirsync(ConfidentialAttrTestDirsync):
    """Dirsync tests for an attribute that is RODC-filtered

    Re-uses the ConfidentialAttrTestDirsync test cases, but with the test
    user granted the GUID_DRS_GET_CHANGES control access right on the
    base DN, and with the attribute under test flagged as both
    confidential and an RODC filtered attribute.
    """

    def setUp(self):
        super().setUp()
        self.dirsync = ["dirsync:1:0:1000"]

        # add an ACE on the base DN naming GUID_DRS_GET_CHANGES and the
        # test user's SID
        user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user))
        mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
                                   str(user_sid))
        self.sd_utils.dacl_add_ace(self.base_dn, mod)

        # Register the removal straight away: cleanups still run when a
        # later setUp() step fails, so the ACE can't be left behind on
        # the base DN if connecting as the user raises.
        self.addCleanup(self.sd_utils.dacl_delete_aces, self.base_dn, mod)

        # open a fresh connection as the test user now the ACE is in
        # place
        self.ldb_user = self.get_ldb_connection(self.user, self.user_pass)

    def make_attr_confidential(self):
        """Marks the attribute under test as being confidential AND RODC
        filtered (which should mean it is not visible with only
        GUID_DRS_GET_CHANGES)
        """

        # work out the original 'searchFlags' value before we overwrite it
        old_value = self.get_attr_search_flags(self.attr_dn)

        new_flags = SEARCH_FLAG_RODC_ATTRIBUTE | SEARCH_FLAG_CONFIDENTIAL
        self.set_attr_search_flags(self.attr_dn, str(new_flags))

        # reset the value after the test completes
        self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
|
|
|
|
|
|
# Hand the tests defined in this module over to TestProgram (imported from
# samba.tests.subunitrun), passing 'subunitopts', which is defined outside
# this section of the file.
TestProgram(module=__name__, opts=subunitopts)
|