mirror of
https://github.com/samba-team/samba.git
synced 2024-12-23 17:34:34 +03:00
b29793ffde
Signed-off-by: Andreas Schneider <asn@samba.org> Reviewed-by: Joseph Sutton <josephsutton@catalyst.net.nz>
372 lines
13 KiB
Python
372 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import optparse
|
|
import sys
|
|
import os
|
|
from itertools import permutations
|
|
import traceback
|
|
|
|
sys.path.insert(0, "bin/python")
|
|
import samba
|
|
from samba.tests.subunitrun import SubunitOptions, TestProgram
|
|
import samba.getopt as options
|
|
|
|
from samba.auth import system_session
|
|
from ldb import SCOPE_BASE, LdbError
|
|
from ldb import Message, MessageElement, Dn
|
|
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
|
|
from samba.samdb import SamDB
|
|
|
|
from samba.tests import delete_force
|
|
|
|
TEST_DATA_DIR = os.path.join(
|
|
os.path.dirname(__file__),
|
|
'testdata')
|
|
|
|
LDB_STRERR = {}
|
|
def _build_ldb_strerr():
|
|
import ldb
|
|
for k, v in vars(ldb).items():
|
|
if k.startswith('ERR_') and isinstance(v, int):
|
|
LDB_STRERR[v] = k
|
|
|
|
_build_ldb_strerr()
|
|
|
|
|
|
class ModifyOrderTests(samba.tests.TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.admin_dsdb = get_dsdb(admin_creds)
|
|
self.base_dn = self.admin_dsdb.domain_dn()
|
|
|
|
def delete_object(self, dn):
|
|
delete_force(self.admin_dsdb, dn)
|
|
|
|
def get_user_dn(self, name):
|
|
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
|
|
|
|
def _test_modify_order(self,
|
|
start_attrs,
|
|
mod_attrs,
|
|
extra_search_attrs=(),
|
|
name=None):
|
|
if name is None:
|
|
name = traceback.extract_stack()[-2][2][5:]
|
|
|
|
if opts.normal_user:
|
|
name += '-non-admin'
|
|
username = "user123"
|
|
password = "pass123@#$@#"
|
|
self.admin_dsdb.newuser(username, password)
|
|
self.addCleanup(self.delete_object, self.get_user_dn(username))
|
|
mod_creds = self.insta_creds(template=admin_creds,
|
|
username=username,
|
|
userpass=password)
|
|
else:
|
|
mod_creds = admin_creds
|
|
|
|
mod_dsdb = get_dsdb(mod_creds)
|
|
sig = []
|
|
op_lut = ['', 'add', 'replace', 'delete']
|
|
|
|
search_attrs = set(extra_search_attrs)
|
|
lines = [name, "initial attrs:"]
|
|
for k, v in start_attrs:
|
|
lines.append("%20s: %r" % (k, v))
|
|
search_attrs.add(k)
|
|
|
|
for k, v, op in mod_attrs:
|
|
search_attrs.add(k)
|
|
|
|
search_attrs = sorted(search_attrs)
|
|
header = "\n".join(lines)
|
|
sig.append(header)
|
|
|
|
clusters = {}
|
|
for i, attrs in enumerate(permutations(mod_attrs)):
|
|
# for each permutation we construct a string describing the
|
|
# requested operations, and a string describing the result
|
|
# (which may be an exception). The we cluster the
|
|
# attribute strings by their results.
|
|
dn = "cn=ldaptest_%s_%d,cn=users,%s" % (name, i, self.base_dn)
|
|
m = Message()
|
|
m.dn = Dn(self.admin_dsdb, dn)
|
|
|
|
# We are using Message objects here for add (rather than the
|
|
# more convenient dict) because we maybe care about the order
|
|
# in which attributes are added.
|
|
|
|
for k, v in start_attrs:
|
|
m[k] = MessageElement(v, 0, k)
|
|
|
|
self.admin_dsdb.add(m)
|
|
self.addCleanup(self.delete_object, dn)
|
|
|
|
m = Message()
|
|
m.dn = Dn(mod_dsdb, dn)
|
|
|
|
attr_lines = []
|
|
for k, v, op in attrs:
|
|
if v is None:
|
|
v = dn
|
|
m[k] = MessageElement(v, op, k)
|
|
attr_lines.append("%16s %-8s %s" % (k, op_lut[op], v))
|
|
|
|
attr_str = '\n'.join(attr_lines)
|
|
|
|
try:
|
|
mod_dsdb.modify(m)
|
|
except LdbError as e:
|
|
err, _ = e.args
|
|
s = LDB_STRERR.get(err, "unknown error")
|
|
result_str = "%s (%d)" % (s, err)
|
|
else:
|
|
res = self.admin_dsdb.search(base=dn, scope=SCOPE_BASE,
|
|
attrs=search_attrs)
|
|
|
|
lines = []
|
|
for k, v in sorted(dict(res[0]).items()):
|
|
if k != "dn" or k in extra_search_attrs:
|
|
lines.append("%20s: %r" % (k, sorted(v)))
|
|
|
|
result_str = '\n'.join(lines)
|
|
|
|
clusters.setdefault(result_str, []).append(attr_str)
|
|
|
|
for s, attrs in sorted(clusters.items()):
|
|
sig.extend([
|
|
"== result ===[%3d]=======================" % len(attrs),
|
|
s,
|
|
"-- operations ---------------------------"])
|
|
for a in attrs:
|
|
sig.append(a)
|
|
sig.append("-" * 34)
|
|
|
|
sig = '\n'.join(sig).replace(self.base_dn, "{base dn}")
|
|
|
|
if opts.verbose:
|
|
print(sig)
|
|
|
|
if opts.rewrite_ground_truth:
|
|
f = open(os.path.join(TEST_DATA_DIR, name + '.expected'), 'w')
|
|
f.write(sig)
|
|
f.close()
|
|
f = open(os.path.join(TEST_DATA_DIR, name + '.expected'))
|
|
expected = f.read()
|
|
f.close()
|
|
|
|
self.assertStringsEqual(sig, expected)
|
|
|
|
def test_modify_order_mixed(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("carLicense", ["1", "2", "3"]),
|
|
("otherTelephone", "123")]
|
|
|
|
mod_attrs = [("carLicense", "3", FLAG_MOD_DELETE),
|
|
("carLicense", "4", FLAG_MOD_ADD),
|
|
("otherTelephone", "4", FLAG_MOD_REPLACE),
|
|
("otherTelephone", "123", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_mixed2(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("carLicense", ["1", "2", "3"]),
|
|
("ipPhone", "123")]
|
|
|
|
mod_attrs = [("carLicense", "3", FLAG_MOD_DELETE),
|
|
("carLicense", "4", FLAG_MOD_ADD),
|
|
("ipPhone", "4", FLAG_MOD_REPLACE),
|
|
("ipPhone", "123", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_telephone(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("otherTelephone", "123")]
|
|
|
|
mod_attrs = [("carLicense", "3", FLAG_MOD_REPLACE),
|
|
("carLicense", "4", FLAG_MOD_ADD),
|
|
("otherTelephone", "4", FLAG_MOD_REPLACE),
|
|
("otherTelephone", "4", FLAG_MOD_ADD),
|
|
("otherTelephone", "123", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_telephone_delete_delete(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("otherTelephone", "123")]
|
|
|
|
mod_attrs = [("carLicense", "3", FLAG_MOD_REPLACE),
|
|
("carLicense", "4", FLAG_MOD_DELETE),
|
|
("otherTelephone", "4", FLAG_MOD_REPLACE),
|
|
("otherTelephone", "4", FLAG_MOD_DELETE),
|
|
("otherTelephone", "123", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_objectclass(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("otherTelephone", "123")]
|
|
|
|
mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
|
|
("objectclass", "user", FLAG_MOD_DELETE),
|
|
("objectclass", "person", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_objectclass2(self):
|
|
start_attrs = [("objectclass", "user")]
|
|
|
|
mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
|
|
("objectclass", "user", FLAG_MOD_ADD),
|
|
("objectclass", "attributeSchema", FLAG_MOD_REPLACE),
|
|
("objectclass", "inetOrgPerson", FLAG_MOD_ADD),
|
|
("objectclass", "person", FLAG_MOD_DELETE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_singlevalue(self):
|
|
start_attrs = [("objectclass", "user"),
|
|
("givenName", "a")]
|
|
|
|
mod_attrs = [("givenName", "a", FLAG_MOD_REPLACE),
|
|
("givenName", ["b", "a"], FLAG_MOD_REPLACE),
|
|
("givenName", "b", FLAG_MOD_DELETE),
|
|
("givenName", "a", FLAG_MOD_DELETE),
|
|
("givenName", "c", FLAG_MOD_ADD)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_inapplicable(self):
|
|
#attributes that don't go on a user
|
|
start_attrs = [("objectclass", "user"),
|
|
("givenName", "a")]
|
|
|
|
mod_attrs = [("dhcpSites", "b", FLAG_MOD_REPLACE),
|
|
("dhcpSites", "b", FLAG_MOD_DELETE),
|
|
("dhcpSites", "c", FLAG_MOD_ADD)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_sometimes_inapplicable(self):
|
|
# attributes that don't go on a user, but do on a computer,
|
|
# which we sometimes change into.
|
|
start_attrs = [("objectclass", "user"),
|
|
("givenName", "a")]
|
|
|
|
mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
|
|
("objectclass", "person", FLAG_MOD_DELETE),
|
|
("dnsHostName", "b", FLAG_MOD_ADD),
|
|
("dnsHostName", "c", FLAG_MOD_REPLACE)]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_account_locality_device(self):
|
|
# account, locality, and device all take l (locality name) but
|
|
# only device takes owner. We shouldn't be able to change
|
|
# objectclass at all.
|
|
start_attrs = [("objectclass", "account"),
|
|
("l", "a")]
|
|
|
|
mod_attrs = [("objectclass", ["device", "top"], FLAG_MOD_REPLACE),
|
|
("l", "a", FLAG_MOD_DELETE),
|
|
("owner", "c", FLAG_MOD_ADD)
|
|
]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_container_flags_multivalue(self):
|
|
# account, locality, and device all take l (locality name)
|
|
# but only device takes owner
|
|
start_attrs = [("objectclass", "container"),
|
|
("wWWHomePage", "a")]
|
|
|
|
mod_attrs = [("flags", ["0", "1"], FLAG_MOD_ADD),
|
|
("flags", "65355", FLAG_MOD_ADD),
|
|
("flags", "65355", FLAG_MOD_DELETE),
|
|
("flags", ["2", "101"], FLAG_MOD_REPLACE),
|
|
]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_container_flags(self):
|
|
#flags should be an integer
|
|
start_attrs = [("objectclass", "container")]
|
|
|
|
mod_attrs = [("flags", "0x6", FLAG_MOD_ADD),
|
|
("flags", "5", FLAG_MOD_ADD),
|
|
("flags", "101", FLAG_MOD_REPLACE),
|
|
("flags", "c", FLAG_MOD_DELETE),
|
|
]
|
|
self._test_modify_order(start_attrs, mod_attrs)
|
|
|
|
def test_modify_order_member(self):
|
|
name = "modify_order_member_other_group"
|
|
|
|
dn2 = "cn=%s,%s" % (name, self.base_dn)
|
|
m = Message()
|
|
m.dn = Dn(self.admin_dsdb, dn2)
|
|
self.admin_dsdb.add({"dn": dn2, "objectclass": "group"})
|
|
self.addCleanup(self.delete_object, dn2)
|
|
|
|
start_attrs = [("objectclass", "group"),
|
|
("member", dn2)]
|
|
|
|
mod_attrs = [("member", None, FLAG_MOD_DELETE),
|
|
("member", None, FLAG_MOD_REPLACE),
|
|
("member", dn2, FLAG_MOD_DELETE),
|
|
("member", None, FLAG_MOD_ADD),
|
|
]
|
|
self._test_modify_order(start_attrs, mod_attrs, ["memberOf"])
|
|
|
|
|
|
def get_dsdb(creds=None):
|
|
if creds is None:
|
|
creds = admin_creds
|
|
dsdb = SamDB(host,
|
|
credentials=creds,
|
|
session_info=system_session(lp),
|
|
lp=lp)
|
|
return dsdb
|
|
|
|
|
|
parser = optparse.OptionParser("ldap_modify_order.py [options] <host>")
|
|
sambaopts = options.SambaOptions(parser)
|
|
parser.add_option_group(sambaopts)
|
|
parser.add_option_group(options.VersionOptions(parser))
|
|
credopts = options.CredentialsOptions(parser)
|
|
parser.add_option_group(credopts)
|
|
subunitopts = SubunitOptions(parser)
|
|
parser.add_option_group(subunitopts)
|
|
parser.add_option("--rewrite-ground-truth", action="store_true",
|
|
help="write expected values")
|
|
parser.add_option("-v", "--verbose", action="store_true")
|
|
parser.add_option("--normal-user", action="store_true")
|
|
|
|
opts, args = parser.parse_args()
|
|
|
|
if len(args) < 1:
|
|
parser.print_usage()
|
|
sys.exit(1)
|
|
|
|
host = args[0]
|
|
|
|
lp = sambaopts.get_loadparm()
|
|
admin_creds = credopts.get_credentials(lp)
|
|
|
|
if "://" not in host:
|
|
if os.path.isfile(host):
|
|
host = "tdb://%s" % host
|
|
else:
|
|
host = "ldap://%s" % host
|
|
|
|
|
|
TestProgram(module=__name__, opts=subunitopts)
|