1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00
samba-mirror/lib/ldb/tests/python/index.py
Douglas Bagnall e49a0b444a ldb: remove some 'if PY3's in tests
Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: David Mulder <dmulder@suse.com>

Autobuild-User(master): David Mulder <dmulder@samba.org>
Autobuild-Date(master): Mon Feb 22 15:50:55 UTC 2021 on sn-devel-184
2021-02-22 15:50:55 +00:00

1455 lines
58 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Tests for truncated index keys
#
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for truncated index keys
Databases such as lmdb have a maximum key length, these tests ensure that
ldb behaves correctly in those circumstances.
"""
import os
from unittest import TestCase
import sys
import ldb
import shutil
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"
def tempdir():
import tempfile
try:
dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
except KeyError:
dir_prefix = None
return tempfile.mkdtemp(dir=dir_prefix)
def contains(result, dn):
if result is None:
return False
for r in result:
if str(r["dn"]) == dn:
return True
return False
class LdbBaseTest(TestCase):
def setUp(self):
super(LdbBaseTest, self).setUp()
try:
if self.prefix is None:
self.prefix = TDB_PREFIX
except AttributeError:
self.prefix = TDB_PREFIX
def tearDown(self):
super(LdbBaseTest, self).tearDown()
def url(self):
return self.prefix + self.filename
def flags(self):
if self.prefix == MDB_PREFIX:
return ldb.FLG_NOSYNC
else:
return 0
class MaxIndexKeyLengthTests(LdbBaseTest):
def checkGuids(self, key, guids):
#
# This check relies on the current implementation where the indexes
# are in the same database as the data.
#
# It checks that the index record exists, unless guids is None then
# the record must not exist. And the it contains the expected guid
# entries.
#
# The caller needs to provide the GUID's in the expected order
#
res = self.l.search(
base=key,
scope=ldb.SCOPE_BASE)
if guids is None:
self.assertEqual(len(res), 0)
return
self.assertEqual(len(res), 1)
# The GUID index format has only one value
index = res[0]["@IDX"][0]
self.assertEqual(len(guids), len(index))
self.assertEqual(guids, index)
def tearDown(self):
shutil.rmtree(self.testdir)
super(MaxIndexKeyLengthTests, self).tearDown()
# Ensure the LDB is closed now, so we close the FD
del(self.l)
def setUp(self):
super(MaxIndexKeyLengthTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "key_len_test.ldb")
# Note that the maximum key length is set to 54
# This accounts for the 4 bytes added by the dn formatting
# a leading dn=, and a trailing zero terminator
#
self.l = ldb.Ldb(self.url(),
options=[
"modules:rdn_name",
"max_key_len_for_self_test:54"])
self.l.add({"dn": "@ATTRIBUTES",
"uniqueThing": "UNIQUE_INDEX"})
self.l.add({"dn": "@INDEXLIST",
"@IDXATTR": [
b"uniqueThing",
b"notUnique",
b"base64____lt",
b"base64_____eq",
b"base64______gt"],
"@IDXONE": [b"1"],
"@IDXGUID": [b"objectUUID"],
"@IDX_DN_GUID": [b"GUID"]})
# Add a value to a unique index that exceeds the maximum key length
# This should be rejected.
def test_add_long_unique_add(self):
try:
self.l.add({"dn": "OU=UNIQUE_MAX_LEN,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef",
"uniqueThing": "01234567890123456789012345678901"})
# index key will be
# "@INDEX:UNIQUETHING:01234567890123456789012345678901"
self.fail("Should have failed on long index key")
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
# Test that DN's longer the maximum key length can be added
# and that duplicate DN's are rejected correctly
def test_add_long_dn_add(self):
#
# For all entries the DN index key gets truncated to
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
#
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcdef")
# Key is equal to max length does not get inserted into the truncated
# key namespace
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
# This key should not get truncated, as it's one character less than
# max, and will not be in the truncate name space
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde7"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde7")
try:
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcde2"})
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
try:
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
"objectUUID": b"0123456789abcde3"})
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
try:
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde4"})
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
try:
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde6"})
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
try:
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde8"})
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
def test_rename_truncated_dn_keys(self):
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcdef")
# Non conflicting rename, should succeed
self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
# Index should be unchanged.
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcdef")
# Conflicting rename should fail
try:
self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
"OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
def test_delete_truncated_dn_keys(self):
#
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
#
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
# Try to delete a non existent DN with a truncated key
try:
self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
except ldb.LdbError as err:
enum = err.args[0]
self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
# Ensure that non of the other truncated DN's got deleted
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
self.assertEqual(len(res), 1)
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 1)
# Ensure that the non truncated DN did not get deleted
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 1)
# Check the indexes are correct
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcdef")
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
# delete an existing entry
self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
# Ensure it got deleted
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
self.assertEqual(len(res), 0)
# Ensure that non of the other truncated DN's got deleted
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 1)
# Ensure the non truncated entry did not get deleted.
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 1)
# Check the indexes are correct
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1")
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
# delete an existing entry
self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
# Ensure it got deleted
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 0)
# Ensure that non of the non truncated DN's got deleted
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 1)
# Check the indexes are correct
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
None)
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
# delete an existing entry
self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
# Ensure it got deleted
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBAxxx")
self.assertEqual(len(res), 0)
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
None)
def test_search_truncated_dn_keys(self):
#
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
#
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
self.assertEqual(len(res), 1)
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 1)
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 1)
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
self.assertEqual(len(res), 0)
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 0)
# Non existent, key one less than truncation limit
res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 0)
def test_search_dn_filter_truncated_dn_keys(self):
#
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
#
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
self.assertEqual(len(res), 1)
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 1)
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 1)
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
self.assertEqual(len(res), 0)
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
self.assertEqual(len(res), 0)
# Non existent, key one less than truncation limit
res = self.l.search(
expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA")
self.assertEqual(len(res), 0)
def test_search_one_level_truncated_dn_keys(self):
#
# Except for the base DN's
# all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA
# The base DN-s truncate to
# @INDEX:@IDXDN:OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR
#
self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcdef"})
self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcd1f"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR",
b"0123456789abcd1f" + b"0123456789abcdef")
self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde1"})
self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcd11"})
self.checkGuids(
"@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
b"0123456789abcd11" + b"0123456789abcde1")
self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde2"})
self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcdf2"})
self.checkGuids(
"@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
b"0123456789abcde2" + b"0123456789abcdf2")
self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde3"})
self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcd13"})
self.checkGuids(
"@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
b"0123456789abcd13" + b"0123456789abcde3")
# This key is not truncated as it's the max_key_len
self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
"objectUUID": b"0123456789abcde7"})
self.checkGuids(
"@INDEX:@IDXDN:OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
b"0123456789abcde7")
res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
scope=ldb.SCOPE_ONELEVEL)
self.assertEqual(len(res), 3)
self.assertTrue(
contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
scope=ldb.SCOPE_ONELEVEL)
self.assertEqual(len(res), 3)
self.assertTrue(
contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
scope=ldb.SCOPE_ONELEVEL)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA"))
def test_search_sub_tree_truncated_dn_keys(self):
#
# Except for the base DN's
# all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_SUB_TREE,DC=SAMBA
# The base DN-s truncate to
# @INDEX:@IDXDN:OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR
#
self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcdef"})
self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcde4"})
self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3",
"objectUUID": b"0123456789abcde8"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR",
b"0123456789abcde4" + b"0123456789abcde8" + b"0123456789abcdef")
self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde1"})
self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcde5")
self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde2"})
self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcde6"})
self.checkGuids(
"@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
b"0123456789abcde2" + b"0123456789abcde6")
self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde3"})
self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcde7"})
self.checkGuids(
"@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
b"0123456789abcde3" + b"0123456789abcde7")
self.l.add({"dn": "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4",
"objectUUID": b"0123456789abcde9"})
self.checkGuids(
"@INDEX#@IDXDN#OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
b"0123456789abcde9")
res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 4)
self.assertTrue(
contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 4)
self.assertTrue(
contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3"))
res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4"))
def test_search_base_truncated_dn_keys(self):
#
# For all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
#
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
"objectUUID": b"0123456789abcdef"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde1" + b"0123456789abcdef")
self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
"objectUUID": b"0123456789abcde5"})
self.checkGuids(
"@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
b"0123456789abcde5")
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 1)
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 1)
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 1)
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 0)
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 0)
# Non existent, key one less than truncation limit
res = self.l.search(
base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
scope=ldb.SCOPE_BASE)
self.assertEqual(len(res), 0)
#
# Test non unique index searched with truncated keys
#
def test_index_truncated_keys(self):
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
lt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
# > than max length and differs in values that will be truncated
gt_max_b = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
# Add two entries with the same value, key length = max so no
# truncation.
self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": eq_max,
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0")
self.l.add({"dn": "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": eq_max,
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde1")
#
# An entry outside the tree
#
self.l.add({"dn": "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
"notUnique": eq_max,
"objectUUID": b"0123456789abcd11"})
self.checkGuids(
"@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcd11" + b"0123456789abcde0" + b"0123456789abcde1")
# Key longer than max so should get truncated to same key as
# the previous two entries
self.l.add({"dn": "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": gt_max,
"objectUUID": b"0123456789abcde2"})
# But in the truncated key space
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde2")
# Key longer than max so should get truncated to same key as
# the previous entries but differs in the chars after max length
self.l.add({"dn": "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": gt_max_b,
"objectUUID": b"0123456789abcd22"})
# But in the truncated key space
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcd22" + b"0123456789abcde2")
#
# An entry outside the tree
#
self.l.add({"dn": "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
"notUnique": gt_max,
"objectUUID": b"0123456789abcd12"})
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcd12" + b"0123456789abcd22" + b"0123456789abcde2")
# Key shorter than max
#
self.l.add({"dn": "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": lt_max,
"objectUUID": b"0123456789abcde3"})
self.checkGuids(
"@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde3")
#
# An entry outside the tree
#
self.l.add({"dn": "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
"notUnique": lt_max,
"objectUUID": b"0123456789abcd13"})
self.checkGuids(
"@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcd13" + b"0123456789abcde3")
#
# search for target is max value not truncated
# should return ou's 01, 02
#
expression = "(notUnique=" + eq_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 2)
self.assertTrue(
contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# search for target is max value not truncated
# search one level up the tree, scope is ONE_LEVEL
# So should get no matches
#
expression = "(notUnique=" + eq_max.decode('ascii') + ")"
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 0)
#
# search for target is max value not truncated
# search one level up the tree, scope is SUBTREE
# So should get 3 matches
#
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_SUBTREE,
expression=expression)
self.assertEqual(len(res), 3)
self.assertTrue(
contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
#
# search for target is max value + 1 so truncated
# should return ou 23 as it's gt_max_b being searched for
#
expression = "(notUnique=" + gt_max_b.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# search for target is max value + 1 so truncated
# should return ou 03 as it's gt_max being searched for
#
expression = "(notUnique=" + gt_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# scope one level and one level up one level up should get no matches
#
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 0)
#
# scope sub tree and one level up one level up should get 2 matches
#
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_SUBTREE,
expression=expression)
self.assertEqual(len(res), 2)
self.assertTrue(
contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
#
# search for target is max value - 1 so not truncated
# should return ou 04
#
expression = "(notUnique=" + lt_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# scope one level and one level up one level up should get no matches
#
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 0)
#
# scope sub tree and one level up one level up should get 2 matches
#
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_SUBTREE,
expression=expression)
self.assertEqual(len(res), 2)
self.assertTrue(
contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
#
# Test index key truncation for base64 encoded values
#
def test_index_truncated_base64_encoded_keys(self):
value = b"aaaaaaaaaaaaaaaaaaaa\x02"
# base64 encodes to "YWFhYWFhYWFhYWFhYWFhYWFhYWEC"
# One less than max key length
self.l.add({"dn": "OU=01,OU=BASE64,DC=SAMBA,DC=ORG",
"base64____lt": value,
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX:BASE64____LT::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
b"0123456789abcde0")
# Equal max key length
self.l.add({"dn": "OU=02,OU=BASE64,DC=SAMBA,DC=ORG",
"base64_____eq": value,
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX:BASE64_____EQ::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
b"0123456789abcde1")
# One greater than max key length
self.l.add({"dn": "OU=03,OU=BASE64,DC=SAMBA,DC=ORG",
"base64______gt": value,
"objectUUID": b"0123456789abcde2"})
self.checkGuids(
"@INDEX#BASE64______GT##YWFhYWFhYWFhYWFhYWFhYWFhYWE",
b"0123456789abcde2")
#
# Test adding to non unique index with identical multivalued index
# attributes
#
def test_index_multi_valued_identical_keys(self):
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
as_eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
bs_eq_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
try:
self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [bs_eq_max, as_eq_max, as_eq_max],
"objectUUID": b"0123456789abcde0"})
self.fail("Exception not thrown")
except ldb.LdbError as e:
code = e.args[0]
self.assertEqual(ldb.ERR_ATTRIBUTE_OR_VALUE_EXISTS, code)
#
# Test non unique index with multivalued index attributes
# searched with non truncated keys
#
def test_search_index_multi_valued_truncated_keys(self):
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
"objectUUID": b"0123456789abcde0"})
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
expression = "(notUnique=" + aa_gt_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
expression = "(notUnique=" + ab_gt_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
expression = "(notUnique=" + bb_gt_max.decode('ascii') + ")"
res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_ONELEVEL,
expression=expression)
self.assertEqual(len(res), 1)
self.assertTrue(
contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# Test deletion of records with non unique index with multivalued index
# attributes
# replicate this to test modify with modify flags i.e. DELETE, REPLACE
#
def test_delete_index_multi_valued_truncated_keys(self):
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
cc_gt_max = b"cccccccccccccccccccccccccccccccccc"
self.l.add({"dn": "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
"objectUUID": b"0123456789abcde0"})
self.l.add({"dn": "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [aa_gt_max, ab_gt_max, cc_gt_max],
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde0" +
b"0123456789abcde1" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
res = self.l.search(
base="DC=SAMBA,DC=ORG",
expression="(notUnique=" + aa_gt_max.decode("ascii") + ")")
self.assertEqual(2, len(res))
self.assertTrue(
contains(res, "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.l.delete("OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
None)
self.l.delete("OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
None)
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
None)
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
None)
#
# Test modification of records with non unique index with multivalued index
# attributes
#
def test_modify_index_multi_valued_truncated_keys(self):
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
cc_gt_max = b"cccccccccccccccccccccccccccccccccc"
self.l.add({"dn": "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
"objectUUID": b"0123456789abcde0"})
self.l.add({"dn": "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG",
"notUnique": [aa_gt_max, ab_gt_max, cc_gt_max],
"objectUUID": b"0123456789abcde1"})
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde0" +
b"0123456789abcde1" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
res = self.l.search(
base="DC=SAMBA,DC=ORG",
expression="(notUnique=" + aa_gt_max.decode("ascii") + ")")
self.assertEqual(2, len(res))
self.assertTrue(
contains(res, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG"))
self.assertTrue(
contains(res, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG"))
#
# Modify that does not change the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[aa_gt_max, ab_gt_max, bb_gt_max],
ldb.FLAG_MOD_REPLACE,
"notUnique")
self.l.modify(msg)
#
# As the modify is replacing the attribute with the same contents
# there should be no changes to the indexes.
#
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde0" +
b"0123456789abcde1" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
#
# Modify that removes a value from the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[aa_gt_max, bb_gt_max],
ldb.FLAG_MOD_REPLACE,
"notUnique")
self.l.modify(msg)
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" +
b"0123456789abcde1" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
#
# Modify that does a constrained delete the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[ab_gt_max],
ldb.FLAG_MOD_DELETE,
"notUnique")
self.l.modify(msg)
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
#
# Modify that does an unconstrained delete the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[],
ldb.FLAG_MOD_DELETE,
"notUnique")
self.l.modify(msg)
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
None)
#
# Modify that adds a value to the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[cc_gt_max],
ldb.FLAG_MOD_ADD,
"notUnique")
self.l.modify(msg)
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
#
# Modify that adds a values to the indexed attribute
#
msg = ldb.Message()
msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
msg["notUnique"] = ldb.MessageElement(
[aa_gt_max, ab_gt_max],
ldb.FLAG_MOD_ADD,
"notUnique")
self.l.modify(msg)
self.checkGuids(
"@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
b"0123456789abcde0" +
b"0123456789abcde1" + b"0123456789abcde1")
self.checkGuids(
"@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
b"0123456789abcde0")
self.checkGuids(
"@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
b"0123456789abcde1")
#
# Test Sub tree searches when checkBaseOnSearch is enabled and the
# DN indexes are truncated and collide.
#
def test_check_base_on_search_truncated_dn_keys(self):
#
# Except for the base DN's
# all entries the DN index key gets truncated to
# 0 1 2 3 4 5
# 12345678901234567890123456789012345678901234567890
# @INDEX:@IDXDN:OU=??,OU=CHECK_BASE_DN_XXXX,DC=SAMBA
# The base DN-s truncate to
# @INDEX:@IDXDN:OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR
#
checkbaseonsearch = {"dn": "@OPTIONS",
"checkBaseOnSearch": b"TRUE"}
self.l.add(checkbaseonsearch)
self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcdef"})
self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcdee"})
self.checkGuids(
"@INDEX#@IDXDN#OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR",
b"0123456789abcdee" + b"0123456789abcdef")
self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcdec"})
self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcdeb"})
self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
"objectUUID": b"0123456789abcded"})
self.checkGuids(
"@INDEX#@IDXDN#OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA",
b"0123456789abcdeb" + b"0123456789abcdec" + b"0123456789abcded")
self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
"objectUUID": b"0123456789abcde0"})
self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
"objectUUID": b"0123456789abcde1"})
self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
"objectUUID": b"0123456789abcde2"})
self.checkGuids(
"@INDEX#@IDXDN#OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA",
b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcde2")
res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 3)
self.assertTrue(
contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
self.assertTrue(
contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
scope=ldb.SCOPE_SUBTREE)
self.assertEqual(len(res), 3)
self.assertTrue(
contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
self.assertTrue(
contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
try:
res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
scope=ldb.SCOPE_SUBTREE)
self.fail("Expected exception no thrown")
except ldb.LdbError as e:
code = e.args[0]
self.assertEqual(ldb.ERR_NO_SUCH_OBJECT, code)
# Run the index truncation tests against an lmdb backend
class MaxIndexKeyLengthTestsLmdb(MaxIndexKeyLengthTests):
def setUp(self):
if os.environ.get('HAVE_LMDB', '1') == '0':
self.skipTest("No lmdb backend")
self.prefix = MDB_PREFIX
super(MaxIndexKeyLengthTestsLmdb, self).setUp()
def tearDown(self):
super(MaxIndexKeyLengthTestsLmdb, self).tearDown()
class OrderedIntegerRangeTests(LdbBaseTest):
def tearDown(self):
shutil.rmtree(self.testdir)
super(OrderedIntegerRangeTests, self).tearDown()
# Ensure the LDB is closed now, so we close the FD
del(self.l)
def setUp(self):
super(OrderedIntegerRangeTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "ordered_integer_test.ldb")
self.l = ldb.Ldb(self.url(),
options=self.options())
self.l.add({"dn": "@ATTRIBUTES",
"int64attr": "ORDERED_INTEGER"})
self.l.add({"dn": "@INDEXLIST",
"@IDXATTR": [b"int64attr"],
"@IDXONE": [b"1"],
"@IDXGUID": [b"objectUUID"],
"@IDX_DN_GUID": [b"GUID"]})
def options(self):
if self.prefix == MDB_PREFIX:
return ['modules:rdn_name',
'disable_full_db_scan_for_self_test:1']
else:
return ['modules:rdn_name']
def test_comparison_expression(self):
int64_max = 2**63-1
int64_min = -2**63
test_nums = list(range(-5, 5))
test_nums += list(range(int64_max-5, int64_max+1))
test_nums += list(range(int64_min, int64_min+5))
test_nums = sorted(test_nums)
for (i, num) in enumerate(test_nums):
ouuid = 0x0123456789abcdef + i
ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode())
self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i),
"objectUUID": ouuid_s,
"int64attr": str(num)})
def assert_int64_expr(expr, py_expr=None):
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_SUBTREE,
expression="(int64attr%s)" % (expr))
if not py_expr:
py_expr = expr
expect = [n for n in test_nums if eval(str(n) + py_expr)]
vals = sorted([int(r.get("int64attr")[0]) for r in res])
self.assertEqual(len(res), len(expect))
self.assertEqual(set(vals), set(expect))
self.assertEqual(expect, vals)
assert_int64_expr(">=-2")
assert_int64_expr("<=2")
assert_int64_expr(">=" + str(int64_min))
assert_int64_expr("<=" + str(int64_min))
assert_int64_expr("<=" + str(int64_min+1))
assert_int64_expr("<=" + str(int64_max))
assert_int64_expr(">=" + str(int64_max))
assert_int64_expr(">=" + str(int64_max-1))
assert_int64_expr("=10", "==10")
def test_comparison_expression_duplicates(self):
int64_max = 2**63-1
int64_min = -2**63
test_nums = list(range(-5, 5)) * 3
test_nums += list(range(-20, 20, 5)) * 2
test_nums += list(range(-50, 50, 15))
test_nums = sorted(test_nums)
for (i, num) in enumerate(test_nums):
ouuid = 0x0123456789abcdef + i
ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode())
self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i),
"objectUUID": ouuid_s,
"int64attr": str(num)})
def assert_int64_expr(expr, py_expr=None):
res = self.l.search(base="DC=SAMBA,DC=ORG",
scope=ldb.SCOPE_SUBTREE,
expression="(int64attr%s)" % (expr))
if not py_expr:
py_expr = expr
expect = [n for n in test_nums if eval(str(n) + py_expr)]
vals = sorted([int(r.get("int64attr")[0]) for r in res])
self.assertEqual(len(res), len(expect))
self.assertEqual(set(vals), set(expect))
self.assertEqual(expect, vals)
assert_int64_expr(">=-2")
assert_int64_expr("<=2")
assert_int64_expr(">=" + str(int64_min))
assert_int64_expr("<=" + str(int64_min))
assert_int64_expr("<=" + str(int64_min+1))
assert_int64_expr("<=" + str(int64_max))
assert_int64_expr(">=" + str(int64_max))
assert_int64_expr(">=" + str(int64_max-1))
assert_int64_expr("=-5", "==-5")
assert_int64_expr("=5", "==5")
# Run the ordered integer range tests against an lmdb backend
class OrderedIntegerRangeTestsLmdb(OrderedIntegerRangeTests):
def setUp(self):
if os.environ.get('HAVE_LMDB', '1') == '0':
self.skipTest("No lmdb backend")
self.prefix = MDB_PREFIX
super(OrderedIntegerRangeTestsLmdb, self).setUp()
def tearDown(self):
super(OrderedIntegerRangeTestsLmdb, self).tearDown()
# Run the index truncation tests against an lmdb backend
class RejectSubDBIndex(LdbBaseTest):
def setUp(self):
if os.environ.get('HAVE_LMDB', '1') == '0':
self.skipTest("No lmdb backend")
self.prefix = MDB_PREFIX
super(RejectSubDBIndex, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir,
"reject_subidx_test.ldb")
self.l = ldb.Ldb(self.url(),
options=[
"modules:rdn_name"])
def tearDown(self):
super(RejectSubDBIndex, self).tearDown()
def test_try_subdb_index(self):
try:
self.l.add({"dn": "@INDEXLIST",
"@IDX_LMDB_SUBDB": [b"1"],
"@IDXONE": [b"1"],
"@IDXGUID": [b"objectUUID"],
"@IDX_DN_GUID": [b"GUID"],
})
except ldb.LdbError as e:
code = e.args[0]
string = e.args[1]
self.assertEqual(ldb.ERR_OPERATIONS_ERROR, code)
self.assertIn("sub-database index", string)
if __name__ == '__main__':
import unittest
unittest.TestProgram()