1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

tests/rodc: Add password lockout tests with RODC-auth, RWDC-check

This occurs when the password is preloaded, and the bad logins and
successes must be forwarded the the RWDC.

The password server MUST be localdc.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Garming Sam 2017-04-21 15:21:58 +12:00 committed by Andrew Bartlett
parent e418db6ea1
commit 7dfe7df6d0
2 changed files with 504 additions and 32 deletions

View File

@ -105,7 +105,8 @@ class BasePasswordTestCase(samba.tests.TestCase):
userAccountControl=None,
msDSUserAccountControlComputed=None,
effective_bad_password_count=None,
msg=None):
msg=None,
badPwdCountOnly=False):
print '-=' * 36
if msg is not None:
print "\033[01;32m %s \033[00m\n" % msg
@ -128,17 +129,18 @@ class BasePasswordTestCase(samba.tests.TestCase):
res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
self.assertTrue(len(res) == 1)
self._check_attribute(res, "badPwdCount", badPwdCount)
self._check_attribute(res, "badPasswordTime", badPasswordTime)
self._check_attribute(res, "logonCount", logonCount)
self._check_attribute(res, "lastLogon", lastLogon)
self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp)
self._check_attribute(res, "lockoutTime", lockoutTime)
self._check_attribute(res, "userAccountControl", userAccountControl)
self._check_attribute(res, "msDS-User-Account-Control-Computed",
msDSUserAccountControlComputed)
self._check_attribute(res, "badPasswordTime", badPasswordTime)
if not badPwdCountOnly:
self._check_attribute(res, "logonCount", logonCount)
self._check_attribute(res, "lastLogon", lastLogon)
self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp)
self._check_attribute(res, "userAccountControl", userAccountControl)
self._check_attribute(res, "msDS-User-Account-Control-Computed",
msDSUserAccountControlComputed)
lastLogon = int(res[0]["lastLogon"][0])
logonCount = int(res[0]["logonCount"][0])
lastLogon = int(res[0]["lastLogon"][0])
logonCount = int(res[0]["logonCount"][0])
samr_user = self._open_samr_user(res)
uinfo3 = self.samr.QueryUserInfo(samr_user, 3)
@ -148,16 +150,21 @@ class BasePasswordTestCase(samba.tests.TestCase):
self.samr.Close(samr_user)
expected_acb_info = 0
if userAccountControl & dsdb.UF_NORMAL_ACCOUNT:
expected_acb_info |= samr.ACB_NORMAL
if userAccountControl & dsdb.UF_ACCOUNTDISABLE:
expected_acb_info |= samr.ACB_DISABLED
if userAccountControl & dsdb.UF_PASSWD_NOTREQD:
expected_acb_info |= samr.ACB_PWNOTREQ
if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT:
expected_acb_info |= samr.ACB_AUTOLOCK
if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED:
expected_acb_info |= samr.ACB_PW_EXPIRED
if not badPwdCountOnly:
if userAccountControl & dsdb.UF_NORMAL_ACCOUNT:
expected_acb_info |= samr.ACB_NORMAL
if userAccountControl & dsdb.UF_ACCOUNTDISABLE:
expected_acb_info |= samr.ACB_DISABLED
if userAccountControl & dsdb.UF_PASSWD_NOTREQD:
expected_acb_info |= samr.ACB_PWNOTREQ
if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT:
expected_acb_info |= samr.ACB_AUTOLOCK
if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED:
expected_acb_info |= samr.ACB_PW_EXPIRED
self.assertEquals(uinfo3.acct_flags, expected_acb_info)
self.assertEquals(uinfo3.last_logon, lastLogon)
self.assertEquals(uinfo3.logon_count, logonCount)
expected_bad_password_count = 0
if badPwdCount is not None:
@ -165,22 +172,21 @@ class BasePasswordTestCase(samba.tests.TestCase):
if effective_bad_password_count is None:
effective_bad_password_count = expected_bad_password_count
self.assertEquals(uinfo3.acct_flags, expected_acb_info)
self.assertEquals(uinfo3.bad_password_count, expected_bad_password_count)
self.assertEquals(uinfo3.last_logon, lastLogon)
self.assertEquals(uinfo3.logon_count, logonCount)
self.assertEquals(uinfo5.acct_flags, expected_acb_info)
self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count)
self.assertEquals(uinfo5.last_logon, lastLogon)
self.assertEquals(uinfo5.logon_count, logonCount)
if not badPwdCountOnly:
self.assertEquals(uinfo5.acct_flags, expected_acb_info)
self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count)
self.assertEquals(uinfo5.last_logon, lastLogon)
self.assertEquals(uinfo5.logon_count, logonCount)
self.assertEquals(uinfo16.acct_flags, expected_acb_info)
self.assertEquals(uinfo16.acct_flags, expected_acb_info)
self.assertEquals(uinfo21.acct_flags, expected_acb_info)
self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count)
self.assertEquals(uinfo21.last_logon, lastLogon)
self.assertEquals(uinfo21.logon_count, logonCount)
self.assertEquals(uinfo21.acct_flags, expected_acb_info)
self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count)
self.assertEquals(uinfo21.last_logon, lastLogon)
self.assertEquals(uinfo21.logon_count, logonCount)
# check LDAP again and make sure the samr.QueryUserInfo
# doesn't have any impact.

View File

@ -112,7 +112,473 @@ def get_server_ref_from_samdb(samdb):
return res[0]['serverReference'][0]
class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
counter = itertools.count(1).next
def _check_account_initial(self, dn):
self.force_replication()
return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
def _check_account(self, dn,
badPwdCount=None,
badPasswordTime=None,
logonCount=None,
lastLogon=None,
lastLogonTimestamp=None,
lockoutTime=None,
userAccountControl=None,
msDSUserAccountControlComputed=None,
effective_bad_password_count=None,
msg=None,
badPwdCountOnly=False):
# Wait for the RWDC to get any delayed messages
# e.g. SendToSam or KRB5 bad passwords via winbindd
if (self.kerberos and isinstance(badPasswordTime, tuple) or
badPwdCount == 0):
time.sleep(5)
return super(RodcRwdcCachedTests,
self)._check_account(dn, badPwdCount, badPasswordTime,
logonCount, lastLogon,
lastLogonTimestamp, lockoutTime,
userAccountControl,
msDSUserAccountControlComputed,
effective_bad_password_count, msg,
True)
def force_replication(self, base=None):
if base is None:
base = self.base_dn
# XXX feels like a horrendous way to do it.
credstring = '-U%s%%%s' % (CREDS.get_username(),
CREDS.get_password())
cmd = ['bin/samba-tool',
'drs', 'replicate',
RODC, RWDC, base,
credstring,
'--sync-forced']
p = subprocess.Popen(cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode:
print "failed with code %s" % p.returncode
print ' '.join(cmd)
print "stdout"
print stdout
print "stderr"
print stderr
raise RodcRwdcTestException()
def tearDown(self):
super(RodcRwdcCachedTests, self).tearDown()
set_auto_replication(RWDC, True)
def setUp(self):
self.kerberos = False # To be set later
self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
session_info=system_session(LP), lp=LP)
self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
session_info=system_session(LP), lp=LP)
# Define variables for BasePasswordTestCase
self.lp = LP
self.global_creds = CREDS
self.host = RWDC
self.host_url = 'ldap://%s' % RWDC
self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
credentials=self.global_creds, lp=self.lp)
super(RodcRwdcCachedTests, self).setUp()
self.host_url = 'ldap://%s' % RODC
self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
self.base_dn = self.rwdc_db.domain_dn()
root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
attrs=['dsServiceName'])
self.service = root[0]['dsServiceName'][0]
self.tag = uuid.uuid4().hex
self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
self.rwdc_db.set_dsheuristics("000000001")
set_auto_replication(RWDC, False)
# make sure DCs are synchronized before the test
self.force_replication()
def test_login_lockout_krb5(self):
username = self.lockout1krb5_creds.get_username()
userpass = self.lockout1krb5_creds.get_password()
userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
preload_rodc_user(userdn)
self.kerberos = True
self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
res = self.rodc_db.search(self.rodc_dn,
scope=ldb.SCOPE_BASE,
attrs=['msDS-RevealOnDemandGroup'])
group = res[0]['msDS-RevealOnDemandGroup'][0]
m = ldb.Message()
m.dn = ldb.Dn(self.rwdc_db, group)
m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
self.rwdc_db.modify(m)
m = ldb.Message()
m.dn = ldb.Dn(self.ldb, self.base_dn)
self.account_lockout_duration = 10
account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
ldb.FLAG_MOD_REPLACE,
"lockoutDuration")
self.lockout_observation_window = 10
lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
ldb.FLAG_MOD_REPLACE,
"lockOutObservationWindow")
self.rwdc_db.modify(m)
self.force_replication()
self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
def test_login_lockout_ntlm(self):
username = self.lockout1ntlm_creds.get_username()
userpass = self.lockout1ntlm_creds.get_password()
userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
preload_rodc_user(userdn)
self.kerberos = False
self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
res = self.rodc_db.search(self.rodc_dn,
scope=ldb.SCOPE_BASE,
attrs=['msDS-RevealOnDemandGroup'])
group = res[0]['msDS-RevealOnDemandGroup'][0]
m = ldb.Message()
m.dn = ldb.Dn(self.rwdc_db, group)
m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
self.rwdc_db.modify(m)
self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
def _test_login_lockout_rodc_rwdc(self, creds, userdn):
username = creds.get_username()
userpass = creds.get_password()
# Open a second LDB connection with the user credentials. Use the
# command line credentials for informations like the domain, the realm
# and the workstation.
creds_lockout = self.insta_creds(creds)
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
badPasswordTime = 0
logonCount = 0
lastLogon = 0
lastLogonTimestamp=0
logoncount_relation = ''
lastlogon_relation = ''
res = self._check_account(userdn,
badPwdCount=1,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0,
msg='lastlogontimestamp with wrong password')
badPasswordTime = int(res[0]["badPasswordTime"][0])
# Correct old password
creds_lockout.set_password(userpass)
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
# lastLogonTimestamp should not change
# lastLogon increases if badPwdCount is non-zero (!)
res = self._check_account(userdn,
badPwdCount=0,
badPasswordTime=badPasswordTime,
logonCount=(logoncount_relation, logonCount),
lastLogon=('greater', lastLogon),
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0,
msg='LLTimestamp is updated to lastlogon')
logonCount = int(res[0]["logonCount"][0])
lastLogon = int(res[0]["lastLogon"][0])
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
res = self._check_account(userdn,
badPwdCount=1,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=2,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
print "two failed password change"
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=3,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
lockoutTime=("greater", badPasswordTime),
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
badPasswordTime = int(res[0]["badPasswordTime"][0])
lockoutTime = int(res[0]["lockoutTime"][0])
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=3,
badPasswordTime=badPasswordTime,
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
lockoutTime=lockoutTime,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=3,
badPasswordTime=badPasswordTime,
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
lockoutTime=lockoutTime,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
# The correct password, but we are locked out
creds_lockout.set_password(userpass)
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=3,
badPasswordTime=badPasswordTime,
logonCount=logonCount,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
lockoutTime=lockoutTime,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
# wait for the lockout to end
time.sleep(self.account_lockout_duration + 1)
print self.account_lockout_duration + 1
res = self._check_account(userdn,
badPwdCount=3, effective_bad_password_count=0,
badPasswordTime=badPasswordTime,
logonCount=logonCount,
lockoutTime=lockoutTime,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
# The correct password after letting the timeout expire
creds_lockout.set_password(userpass)
creds_lockout2 = self.insta_creds(creds_lockout)
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
time.sleep(3)
res = self._check_account(userdn,
badPwdCount=0,
badPasswordTime=badPasswordTime,
logonCount=(logoncount_relation, logonCount),
lastLogon=(lastlogon_relation, lastLogon),
lastLogonTimestamp=lastLogonTimestamp,
lockoutTime=lockoutTime,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0,
msg="lastLogon is way off")
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=1,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lockoutTime=lockoutTime,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=2,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lockoutTime=lockoutTime,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
time.sleep(self.lockout_observation_window + 1)
res = self._check_account(userdn,
badPwdCount=2, effective_bad_password_count=0,
badPasswordTime=badPasswordTime,
logonCount=logonCount,
lockoutTime=lockoutTime,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
try:
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
self.fail()
except LdbError, (num, msg):
self.assertEquals(num, ERR_INVALID_CREDENTIALS)
res = self._check_account(userdn,
badPwdCount=1,
badPasswordTime=("greater", badPasswordTime),
logonCount=logonCount,
lockoutTime=lockoutTime,
lastLogon=lastLogon,
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
# The correct password without letting the timeout expire
creds_lockout.set_password(userpass)
ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
res = self._check_account(userdn,
badPwdCount=0,
badPasswordTime=badPasswordTime,
logonCount=(logoncount_relation, logonCount),
lockoutTime=lockoutTime,
lastLogon=("greater", lastLogon),
lastLogonTimestamp=lastLogonTimestamp,
userAccountControl=
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
counter = itertools.count(1).next