1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-22 22:04:08 +03:00

tests/rodc: Add a number of tests for RODC-RWDC interaction

This tests password fallback to RWDC in preloaded and non-preloaded
cases. It also tests some basic scenarios around what things are
replicated between the two DCs.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Pair-programmed-with: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
This commit is contained in:
Garming Sam 2017-04-10 10:16:57 +12:00 committed by Garming Sam
parent de26e2f87a
commit f4170a49fb
3 changed files with 593 additions and 0 deletions

View File

@ -328,3 +328,4 @@
^samba4.blackbox.trust_ntlm.Test10.*client.*with.Administrator@ADDOM.SAMBA.EXAMPLE.COM%locDCpass1\(fl2003dc:local\)
# We currently don't send referrals for LDAP modify of non-replicated attrs
^samba4.ldap.rodc.python\(rodc\).__main__.RodcTests.test_modify_nonreplicated.*
^samba4.ldap.rodc_rwdc.python.*.__main__.RodcRwdcTests.test_change_password_reveal_on_demand_kerberos

View File

@ -0,0 +1,585 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Test communication of credentials etc, between an RODC and a RWDC.
How does it work when the password is changed on the RWDC?
"""
import optparse
import sys
import base64
import uuid
import subprocess
import itertools
import time
sys.path.insert(0, "bin/python")
import samba
import ldb
from samba.tests.subunitrun import SubunitOptions, TestProgram
import samba.getopt as options
from samba.auth import system_session
from samba.samdb import SamDB
from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
from samba import gensec, dsdb
def passwd_encode(pw):
return base64.b64encode(('"%s"' % pw).encode('utf-16-le'))
class RodcRwdcTestException(Exception):
pass
def make_creds(username, password, kerberos_state=None):
# use the global CREDS as a template
c = Credentials()
c.set_username(username)
c.set_password(password)
c.set_domain(CREDS.get_domain())
c.set_realm(CREDS.get_realm())
c.set_workstation(CREDS.get_workstation())
if kerberos_state is None:
kerberos_state = CREDS.get_kerberos_state()
c.set_kerberos_state(kerberos_state)
print '-' * 73
if kerberos_state == MUST_USE_KERBEROS:
print "we seem to be using kerberos for %s %s" % (username, password)
elif kerberos_state == DONT_USE_KERBEROS:
print "NOT using kerberos for %s %s" % (username, password)
else:
print "kerberos state is %s" % kerberos_state
c.set_gensec_features(c.get_gensec_features() |
gensec.FEATURE_SEAL)
return c
def set_auto_replication(dc, allow):
credstring = '-U%s%%%s' % (CREDS.get_username(),
CREDS.get_password())
on_or_off = '-' if allow else '+'
for opt in ['DISABLE_INBOUND_REPL',
'DISABLE_OUTBOUND_REPL']:
cmd = ['bin/samba-tool',
'drs', 'options',
credstring, dc,
"--dsa-option=%s%s" % (on_or_off, opt)]
p = subprocess.Popen(cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode:
if 'LDAP_REFERRAL' not in stderr:
raise RodcRwdcTestException()
print ("ignoring +%s REFERRAL error; assuming %s is RODC" %
(opt, dc))
def preload_rodc_user(user_dn):
credstring = '-U%s%%%s' % (CREDS.get_username(),
CREDS.get_password())
set_auto_replication(RWDC, True)
cmd = ['bin/samba-tool',
'rodc', 'preload',
user_dn,
credstring,
'--server', RWDC,]
print ' '.join(cmd)
subprocess.check_call(cmd)
set_auto_replication(RWDC, False)
def get_server_ref_from_samdb(samdb):
server_name = samdb.get_serverName()
res = samdb.search(server_name,
scope=ldb.SCOPE_BASE,
attrs=['serverReference'])
return res[0]['serverReference'][0]
class RodcRwdcTests(samba.tests.TestCase):
counter = itertools.count(1).next
def force_replication(self, base=None):
if base is None:
base = self.base_dn
# XXX feels like a horrendous way to do it.
credstring = '-U%s%%%s' % (CREDS.get_username(),
CREDS.get_password())
cmd = ['bin/samba-tool',
'drs', 'replicate',
RODC, RWDC, base,
credstring,
'--sync-forced']
p = subprocess.Popen(cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode:
print "failed with code %s" % p.returncode
print ' '.join(cmd)
print "stdout"
print stdout
print "stderr"
print stderr
raise RodcRwdcTestException()
def tearDown(self):
super(RodcRwdcTests, self).tearDown()
self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
set_auto_replication(RWDC, True)
def setUp(self):
super(RodcRwdcTests, self).setUp()
self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
session_info=system_session(LP), lp=LP)
self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
session_info=system_session(LP), lp=LP)
self.base_dn = self.rwdc_db.domain_dn()
root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
attrs=['dsServiceName'])
self.service = root[0]['dsServiceName'][0]
self.tag = uuid.uuid4().hex
self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
self.rwdc_db.set_dsheuristics("000000001")
set_auto_replication(RWDC, False)
# make sure DCs are synchronized before the test
self.force_replication()
self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
def assertReferral(self, fn, *args, **kwargs):
try:
fn(*args, **kwargs)
self.fail("failed to raise ldap referral")
except ldb.LdbError as (code, msg):
self.assertEqual(code, ldb.ERR_REFERRAL,
"expected referral, got %s %s" % (code, msg))
def _test_rodc_dsheuristics(self):
d = self.rodc_db.get_dsheuristics()
self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
self.assertReferral(self.rodc_db.set_dsheuristics, d)
def TEST_rodc_heuristics_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_rodc_dsheuristics()
def TEST_rodc_heuristics_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_rodc_dsheuristics()
def _test_add(self, objects, cross_ncs=False):
for o in objects:
dn = o['dn']
if cross_ncs:
base = str(self.rwdc_db.get_config_basedn())
controls = ["search_options:1:2"]
cn = dn.split(',', 1)[0]
expression = '(%s)' % cn
else:
base = dn
controls = []
expression = None
try:
res = self.rodc_db.search(base,
expression=expression,
scope=ldb.SCOPE_SUBTREE,
attrs=['dn'],
controls=controls)
self.assertEqual(len(res), 0)
except ldb.LdbError, e:
if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
raise
try:
self.rwdc_db.add(o)
except ldb.LdbError as (ecode, emsg):
self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
(ecode, emsg))
if cross_ncs:
self.force_replication(base=base)
else:
self.force_replication()
try:
res = self.rodc_db.search(base,
expression=expression,
scope=ldb.SCOPE_SUBTREE,
attrs=['dn'],
controls=controls)
self.assertEqual(len(res), 1)
except ldb.LdbError, e:
self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
"replication seems to have failed")
def _test_add_replicated_objects(self, mode):
tag = "%s%s" % (self.tag, mode)
self._test_add([
{
'dn': "ou=%s1,%s" % (tag, self.base_dn),
"objectclass": "organizationalUnit"
},
{
'dn': "cn=%s2,%s" % (tag, self.base_dn),
"objectclass": "user"
},
{
'dn': "cn=%s3,%s" % (tag, self.base_dn),
"objectclass": "group"
},
])
self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
def test_add_replicated_objects_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_add_replicated_objects('kerberos')
def test_add_replicated_objects_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_add_replicated_objects('ntlm')
def _test_add_replicated_connections(self, mode):
tag = "%s%s" % (self.tag, mode)
self._test_add([
{
'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
"objectclass": "NTDSConnection",
'enabledConnection': 'TRUE',
'fromServer': self.base_dn,
'options': '0'
},
], cross_ncs=True)
self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
def test_add_replicated_connections_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_add_replicated_connections('kerberos')
def test_add_replicated_connections_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_add_replicated_connections('ntlm')
def _test_modify_replicated_attributes(self):
dn = 'CN=Guest,CN=Users,' + self.base_dn
value = self.tag
for attr in ['carLicense', 'middleName']:
m = ldb.Message()
m.dn = ldb.Dn(self.rwdc_db, dn)
m[attr] = ldb.MessageElement(value,
ldb.FLAG_MOD_REPLACE,
attr)
try:
self.rwdc_db.modify(m)
except ldb.LdbError as e:
self.fail("Failed to modify %s %s on RWDC %s with %s" %
(dn, attr, RWDC, e))
self.force_replication()
try:
res = self.rodc_db.search(dn,
scope=ldb.SCOPE_SUBTREE,
attrs=[attr])
results = [x[attr][0] for x in res]
self.assertEqual(results, [value])
except ldb.LdbError, e:
self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
"replication seems to have failed")
def test_modify_replicated_attributes_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_modify_replicated_attributes()
def test_modify_replicated_attributes_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_modify_replicated_attributes()
def _test_add_modify_delete(self):
dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
values = ["%s%s" % (i, self.tag) for i in range(3)]
attr = "carLicense"
self._test_add([
{
'dn': dn,
"objectclass": "user",
attr: values[0]
},
])
self.force_replication()
for value in values[1:]:
m = ldb.Message()
m.dn = ldb.Dn(self.rwdc_db, dn)
m[attr] = ldb.MessageElement(value,
ldb.FLAG_MOD_REPLACE,
attr)
try:
self.rwdc_db.modify(m)
except ldb.LdbError as e:
self.fail("Failed to modify %s %s on RWDC %s with %s" %
(dn, attr, RWDC, e))
self.force_replication()
try:
res = self.rodc_db.search(dn,
scope=ldb.SCOPE_SUBTREE,
attrs=[attr])
results = [x[attr][0] for x in res]
self.assertEqual(results, [value])
except ldb.LdbError, e:
self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
"replication seems to have failed")
self.rwdc_db.delete(dn)
self.force_replication()
try:
res = self.rodc_db.search(dn,
scope=ldb.SCOPE_SUBTREE,
attrs=[attr])
if len(res) > 0:
self.fail("Failed to delete %s" % (dn))
except ldb.LdbError, e:
self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
"Failed to delete %s" % (dn))
def test_add_modify_delete_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_add_modify_delete()
def test_add_modify_delete_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_add_modify_delete()
def _new_user(self):
username = "u%sX%s" % (self.tag[:12], self.counter())
password = 'password#1'
dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
o = {
'dn': dn,
"objectclass": "user",
'sAMAccountName': username,
}
try:
self.rwdc_db.add(o)
except ldb.LdbError as e:
self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
self.rwdc_db.modify_ldif("dn: %s\n"
"changetype: modify\n"
"delete: userPassword\n"
"add: userPassword\n"
"userPassword: %s\n" % (dn, password))
self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
return (dn, username, password)
def _change_password(self, user_dn, old_password, new_password):
self.rwdc_db.modify_ldif(
"dn: %s\n"
"changetype: modify\n"
"delete: userPassword\n"
"userPassword: %s\n"
"add: userPassword\n"
"userPassword: %s\n" % (user_dn, old_password, new_password))
def try_ldap_logon(self, server, creds, errno=None):
try:
tmpdb = SamDB('ldap://%s' % server, credentials=creds,
session_info=system_session(LP), lp=LP)
if errno is not None:
self.fail("logon failed to fail with ldb error %s" % errno)
except ldb.LdbError as (code, msg):
if code != errno:
if errno is None:
self.fail("logon incorrectly raised ldb error (code=%s)" %
code)
else:
self.fail("logon failed to raise correct ldb error"
"Expected: %s Got: %s" %
(errno, code))
def zero_min_password_age(self):
min_pwd_age = int(self.rwdc_db.get_minPwdAge())
if min_pwd_age != 0:
self.rwdc_db.set_minPwdAge('0')
def _test_ldap_change_password(self, errno=None):
self.zero_min_password_age()
dn, username, password = self._new_user()
creds1 = make_creds(username, password)
# With NTLM, this should fail on RODC before replication,
# because the user isn't known.
self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
self.force_replication()
# Now the user is replicated to RODC, so logon should work
self.try_ldap_logon(RODC, creds1)
passwords = ['password#%s' % i for i in range(1, 6)]
for prev, password in zip(passwords[:-1], passwords[1:]):
self._change_password(dn, prev, password)
# The password has changed enough times to make the old
# password invalid (though with kerberos that doesn't matter).
# For NTLM, the old creds should always fail
self.try_ldap_logon(RODC, creds1, errno)
self.try_ldap_logon(RWDC, creds1, errno)
creds2 = make_creds(username, password)
# new creds work straight away with NTLM, because although it
# doesn't have the password, it knows the user and forwards
# the query.
self.try_ldap_logon(RODC, creds2)
self.try_ldap_logon(RWDC, creds2)
self.force_replication()
# After another replication check RODC still works and fails,
# as appropriate to various creds
self.try_ldap_logon(RODC, creds2)
self.try_ldap_logon(RODC, creds1, errno)
prev = password
password = 'password#6'
self._change_password(dn, prev, password)
creds3 = make_creds(username, password)
# previous password should still work.
self.try_ldap_logon(RWDC, creds2)
self.try_ldap_logon(RODC, creds2)
# new password should still work.
self.try_ldap_logon(RWDC, creds3)
self.try_ldap_logon(RODC, creds3)
# old password should still fail (but not on kerberos).
self.try_ldap_logon(RWDC, creds1, errno)
self.try_ldap_logon(RODC, creds1, errno)
def test_ldap_change_password_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_ldap_change_password()
def test_ldap_change_password_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
def _test_ldap_change_password_reveal_on_demand(self, errno=None):
self.zero_min_password_age()
res = self.rodc_db.search(self.rodc_dn,
scope=ldb.SCOPE_BASE,
attrs=['msDS-RevealOnDemandGroup'])
group = res[0]['msDS-RevealOnDemandGroup'][0]
user_dn, username, password = self._new_user()
creds1 = make_creds(username, password)
m = ldb.Message()
m.dn = ldb.Dn(self.rwdc_db, group)
m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
self.rwdc_db.modify(m)
# Against Windows, this will just forward if no account exists on the KDC
# Therefore, this does not error on Windows.
self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
self.force_replication()
# The proxy case
self.try_ldap_logon(RODC, creds1)
preload_rodc_user(user_dn)
# Now the user AND password are replicated to RODC, so logon should work (not proxy case)
self.try_ldap_logon(RODC, creds1)
passwords = ['password#%s' % i for i in range(1, 6)]
for prev, password in zip(passwords[:-1], passwords[1:]):
self._change_password(user_dn, prev, password)
# The password has changed enough times to make the old
# password invalid, but the RODC shouldn't know that.
self.try_ldap_logon(RODC, creds1)
self.try_ldap_logon(RWDC, creds1, errno)
creds2 = make_creds(username, password)
self.try_ldap_logon(RWDC, creds2)
self.try_ldap_logon(RODC, creds2, errno)
def test_change_password_reveal_on_demand_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
def test_change_password_reveal_on_demand_kerberos(self):
CREDS.set_kerberos_state(MUST_USE_KERBEROS)
self._test_ldap_change_password_reveal_on_demand()
def main():
global RODC, RWDC, CREDS, LP
parser = optparse.OptionParser(
"rodc_rwdc.py [options] <rodc host> <rwdc host>")
sambaopts = options.SambaOptions(parser)
versionopts = options.VersionOptions(parser)
credopts = options.CredentialsOptions(parser)
subunitopts = SubunitOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(versionopts)
parser.add_option_group(credopts)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
LP = sambaopts.get_loadparm()
CREDS = credopts.get_credentials(LP)
CREDS.set_gensec_features(CREDS.get_gensec_features() |
gensec.FEATURE_SEAL)
try:
RODC, RWDC = args
except ValueError:
parser.print_usage()
sys.exit(1)
set_auto_replication(RWDC, True)
try:
TestProgram(module=__name__, opts=subunitopts)
finally:
set_auto_replication(RWDC, True)
main()

View File

@ -651,6 +651,13 @@ plantestsuite_loadlist("samba4.ldap.rodc.python(rodc)", "rodc",
'$SERVER', '-U"$USERNAME%$PASSWORD"',
'--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
plantestsuite_loadlist("samba4.ldap.rodc_rwdc.python(rodc)", "rodc:local",
[python,
os.path.join(samba4srcdir,
"dsdb/tests/python/rodc_rwdc.py"),
'$SERVER', '$DC_SERVER', '-U"$USERNAME%$PASSWORD"',
'--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
for env in ["ad_dc_ntvfs", "fl2000dc", "fl2003dc", "fl2008r2dc"]:
plantestsuite_loadlist("samba4.ldap_schema.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap_schema.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
plantestsuite("samba4.ldap.possibleInferiors.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/samdb/ldb_modules/tests/possibleinferiors.py"), "ldap://$SERVER", '-U"$USERNAME%$PASSWORD"', "-W$DOMAIN"])