1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-14 19:24:43 +03:00
samba-mirror/python/samba/tests/krb5/lockout_tests.py
Joseph Sutton 4ae7f1cb98 tests/krb5: Remove client_as_etypes parameter
The client_as_etypes parameter previously indicated which etypes we
thought the client supported. In practice, this was rarely specified, so
we simply assumed that all three main enctypes were supported.

Now that we have removed this parameter, rewrite the etype-info padata
checking code to be simpler, and no longer to contain loops.

Use get_default_enctypes() to determine which enctypes are supported.
For tests that inherit from KDCBaseTest, this is based on the domain
functional level, and will be more correct for tests that previously
passed in client_as_etypes=None.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2023-03-03 01:07:36 +00:00

1089 lines
40 KiB
Python
Executable File

#!/usr/bin/env python3
# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
# Copyright (C) Catalyst.Net Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import os
sys.path.insert(0, 'bin/python')
os.environ['PYTHONUNBUFFERED'] = '1'
from concurrent import futures
from enum import Enum
from functools import partial
from multiprocessing import Pipe
import time
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers.base import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
import ldb
from samba import (
NTSTATUSError,
dsdb,
generate_random_bytes,
generate_random_password,
ntstatus,
unix2nttime,
werror,
)
from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
from samba.crypto import (
aead_aes_256_cbc_hmac_sha512_blob,
des_crypt_blob_16,
md4_hash_blob,
sha512_pbkdf2,
)
from samba.dcerpc import lsa, samr
from samba.samdb import SamDB
from samba.tests import connect_samdb, env_get_var_value, env_loadparm
from samba.tests.krb5.as_req_tests import AsReqBaseTest
from samba.tests.krb5 import kcrypto
from samba.tests.krb5.kdc_base_test import KDCBaseTest
from samba.tests.krb5.raw_testcase import KerberosCredentials
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_CLIENT_REVOKED,
KDC_ERR_PREAUTH_FAILED,
KRB_AS_REP,
KRB_ERROR,
NT_PRINCIPAL,
NT_SRV_INST,
)
global_asn1_print = False
global_hexdump = False
class ConnectionResult(Enum):
LOCKED_OUT = 1
WRONG_PASSWORD = 2
SUCCESS = 3
def connect_kdc(pipe,
url,
hostname,
username,
password,
domain,
realm,
workstation,
dn,
expect_error=True):
AsReqBaseTest.setUpClass()
as_req_base = AsReqBaseTest()
as_req_base.setUp()
user_creds = KerberosCredentials()
user_creds.set_username(username)
user_creds.set_password(password)
user_creds.set_domain(domain)
user_creds.set_realm(realm)
user_creds.set_workstation(workstation)
user_creds.set_kerberos_state(DONT_USE_KERBEROS)
user_name = user_creds.get_username()
cname = as_req_base.PrincipalName_create(name_type=NT_PRINCIPAL,
names=user_name.split('/'))
krbtgt_creds = as_req_base.get_krbtgt_creds()
krbtgt_supported_etypes = krbtgt_creds.tgs_supported_enctypes
realm = krbtgt_creds.get_realm()
krbtgt_account = krbtgt_creds.get_username()
sname = as_req_base.PrincipalName_create(name_type=NT_SRV_INST,
names=[krbtgt_account, realm])
expected_salt = user_creds.get_salt()
till = as_req_base.get_KerberosTime(offset=36000)
kdc_options = krb5_asn1.KDCOptions('postdated')
preauth_key = as_req_base.PasswordKey_from_creds(user_creds,
kcrypto.Enctype.AES256)
ts_enc_padata = as_req_base.get_enc_timestamp_pa_data_from_key(preauth_key)
padata = [ts_enc_padata]
krbtgt_decryption_key = (
as_req_base.TicketDecryptionKey_from_creds(krbtgt_creds))
etypes = as_req_base.get_default_enctypes()
if expect_error:
expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
KDC_ERR_PREAUTH_FAILED)
else:
expected_error_modes = 0
# Remove the LDAP connection.
del type(as_req_base)._ldb
# Indicate that we're ready. This ensures we hit the right transaction
# lock.
pipe.send_bytes(b'0')
# Wait for the main process to take out a transaction lock.
if not pipe.poll(timeout=5):
raise AssertionError('main process failed to indicate readiness')
# Try making a Kerberos AS-REQ to the KDC. This should fail, either due to
# the user's account being locked out or due to using the wrong password.
as_rep, kdc_exchange_dict = as_req_base._test_as_exchange(
cname=cname,
realm=realm,
sname=sname,
till=till,
expected_error_mode=expected_error_modes,
expected_crealm=realm,
expected_cname=cname,
expected_srealm=realm,
expected_sname=sname,
expected_salt=expected_salt,
etypes=etypes,
padata=padata,
kdc_options=kdc_options,
expected_supported_etypes=krbtgt_supported_etypes,
expected_account_name=user_name,
preauth_key=preauth_key,
ticket_decryption_key=krbtgt_decryption_key,
pac_request=True)
as_req_base.assertIsNotNone(as_rep)
msg_type = as_rep['msg-type']
if expect_error and msg_type != KRB_ERROR or (
not expect_error and msg_type != KRB_AS_REP):
raise AssertionError(f'wrong message type {msg_type}')
if not expect_error:
return ConnectionResult.SUCCESS
error_code = as_rep['error-code']
if error_code == KDC_ERR_CLIENT_REVOKED:
return ConnectionResult.LOCKED_OUT
elif error_code == KDC_ERR_PREAUTH_FAILED:
return ConnectionResult.WRONG_PASSWORD
else:
raise AssertionError(f'wrong error code {error_code}')
def connect_ntlm(pipe,
url,
hostname,
username,
password,
domain,
realm,
workstation,
dn):
user_creds = KerberosCredentials()
user_creds.set_username(username)
user_creds.set_password(password)
user_creds.set_domain(domain)
user_creds.set_workstation(workstation)
user_creds.set_kerberos_state(DONT_USE_KERBEROS)
# Indicate that we're ready. This ensures we hit the right transaction
# lock.
pipe.send_bytes(b'0')
# Wait for the main process to take out a transaction lock.
if not pipe.poll(timeout=5):
raise AssertionError('main process failed to indicate readiness')
try:
# Try connecting to SamDB. This should fail, either due to our
# account being locked out or due to using the wrong password.
SamDB(url=url,
credentials=user_creds,
lp=env_loadparm())
except ldb.LdbError as err:
num, estr = err.args
if num != ldb.ERR_INVALID_CREDENTIALS:
raise AssertionError(f'connection raised wrong error code '
f'({err})')
if f'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr:
return ConnectionResult.LOCKED_OUT
elif f'data {werror.WERR_LOGON_FAILURE:x},' in estr:
return ConnectionResult.WRONG_PASSWORD
else:
raise AssertionError(f'connection raised wrong error code '
f'({estr})')
else:
return ConnectionResult.SUCCESS
def connect_samr(pipe,
url,
hostname,
username,
password,
domain,
realm,
workstation,
dn):
# Get the user's NT hash.
user_creds = KerberosCredentials()
user_creds.set_password(password)
nt_hash = user_creds.get_nt_hash()
# Generate a new UTF-16 password.
new_password = generate_random_password(32, 32)
new_password = new_password.encode('utf-16le')
# Generate the MD4 hash of the password.
new_password_md4 = md4_hash_blob(new_password)
# Prefix the password with padding so it is 512 bytes long.
new_password_len = len(new_password)
remaining_len = 512 - new_password_len
new_password = bytes(remaining_len) + new_password
# Append the 32-bit length of the password..
new_password += int.to_bytes(new_password_len,
length=4,
byteorder='little')
# Encrypt the password with RC4 and the existing NT hash.
encryptor = Cipher(algorithms.ARC4(nt_hash),
None,
default_backend()).encryptor()
new_password = encryptor.update(new_password)
# Create a key from the MD4 hash of the new password.
key = new_password_md4[:14]
# Encrypt the old NT hash with DES to obtain the verifier.
verifier = des_crypt_blob_16(nt_hash, key)
server = lsa.String()
server.string = hostname
account = lsa.String()
account.string = username
nt_password = samr.CryptPassword()
nt_password.data = list(new_password)
nt_verifier = samr.Password()
nt_verifier.hash = list(verifier)
conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
# Indicate that we're ready. This ensures we hit the right transaction
# lock.
pipe.send_bytes(b'0')
# Wait for the main process to take out a transaction lock.
if not pipe.poll(timeout=5):
raise AssertionError('main process failed to indicate readiness')
try:
# Try changing the password. This should fail, either due to our
# account being locked out or due to using the wrong password.
conn.ChangePasswordUser3(server=server,
account=account,
nt_password=nt_password,
nt_verifier=nt_verifier,
lm_change=True,
lm_password=None,
lm_verifier=None,
password3=None)
except NTSTATUSError as err:
num, estr = err.args
if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
return ConnectionResult.LOCKED_OUT
elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
return ConnectionResult.WRONG_PASSWORD
else:
raise AssertionError(f'pwd change raised wrong error code '
f'({num:08X})')
else:
return ConnectionResult.SUCCESS
def connect_samr_aes(pipe,
url,
hostname,
username,
password,
domain,
realm,
workstation,
dn):
# Get the user's NT hash.
user_creds = KerberosCredentials()
user_creds.set_password(password)
nt_hash = user_creds.get_nt_hash()
# Generate a new UTF-16 password.
new_password = generate_random_password(32, 32)
new_password = new_password.encode('utf-16le')
# Prepend the 16-bit length of the password..
new_password_len = int.to_bytes(len(new_password),
length=2,
byteorder='little')
new_password = new_password_len + new_password
server = lsa.String()
server.string = hostname
account = lsa.String()
account.string = username
# Derive a key from the user's NT hash.
iv = generate_random_bytes(16)
iterations = 5555
cek = sha512_pbkdf2(nt_hash, iv, iterations)
enc_key_salt = (b'Microsoft SAM encryption key '
b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
mac_key_salt = (b'Microsoft SAM MAC key '
b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
# Encrypt the new password.
ciphertext, auth_data = aead_aes_256_cbc_hmac_sha512_blob(new_password,
cek,
enc_key_salt,
mac_key_salt,
iv)
# Create the new password structure
pwd_buf = samr.EncryptedPasswordAES()
pwd_buf.auth_data = list(auth_data)
pwd_buf.salt = list(iv)
pwd_buf.cipher_len = len(ciphertext)
pwd_buf.cipher = list(ciphertext)
pwd_buf.PBKDF2Iterations = iterations
conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
# Indicate that we're ready. This ensures we hit the right transaction
# lock.
pipe.send_bytes(b'0')
# Wait for the main process to take out a transaction lock.
if not pipe.poll(timeout=5):
raise AssertionError('main process failed to indicate readiness')
try:
# Try changing the password. This should fail, either due to our
# account being locked out or due to using the wrong password.
conn.ChangePasswordUser4(server=server,
account=account,
password=pwd_buf)
except NTSTATUSError as err:
num, estr = err.args
if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
return ConnectionResult.LOCKED_OUT
elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
return ConnectionResult.WRONG_PASSWORD
else:
raise AssertionError(f'pwd change raised wrong error code '
f'({num:08X})')
else:
return ConnectionResult.SUCCESS
def ldap_pwd_change(pipe,
url,
hostname,
username,
password,
domain,
realm,
workstation,
dn):
lp = env_loadparm()
admin_creds = KerberosCredentials()
admin_creds.guess(lp)
admin_creds.set_username(env_get_var_value('ADMIN_USERNAME'))
admin_creds.set_password(env_get_var_value('ADMIN_PASSWORD'))
admin_creds.set_kerberos_state(MUST_USE_KERBEROS)
samdb = SamDB(url=url,
credentials=admin_creds,
lp=lp)
old_utf16pw = f'"{password}"'.encode('utf-16le')
new_password = generate_random_password(32, 32)
new_utf16pw = f'"{new_password}"'.encode('utf-16le')
msg = ldb.Message(ldb.Dn(samdb, dn))
msg['0'] = ldb.MessageElement(old_utf16pw,
ldb.FLAG_MOD_DELETE,
'unicodePwd')
msg['1'] = ldb.MessageElement(new_utf16pw,
ldb.FLAG_MOD_ADD,
'unicodePwd')
# Indicate that we're ready. This ensures we hit the right transaction
# lock.
pipe.send_bytes(b'0')
# Wait for the main process to take out a transaction lock.
if not pipe.poll(timeout=5):
raise AssertionError('main process failed to indicate readiness')
# Try changing the user's password. This should fail, either due to the
# user's account being locked out or due to specifying the wrong password.
try:
samdb.modify(msg)
except ldb.LdbError as err:
num, estr = err.args
if num != ldb.ERR_CONSTRAINT_VIOLATION:
raise AssertionError(f'pwd change raised wrong error code ({err})')
if f'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr:
return ConnectionResult.LOCKED_OUT
elif f'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr:
return ConnectionResult.WRONG_PASSWORD
else:
raise AssertionError(f'pwd change raised wrong error code '
f'({estr})')
else:
return ConnectionResult.SUCCESS
class LockoutTests(KDCBaseTest):
def setUp(self):
super().setUp()
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
samdb = self.get_samdb()
base_dn = ldb.Dn(samdb, samdb.domain_dn())
def modify_attr(attr, value):
if value is None:
value = []
flag = ldb.FLAG_MOD_DELETE
else:
value = str(value)
flag = ldb.FLAG_MOD_REPLACE
msg = ldb.Message(base_dn)
msg[attr] = ldb.MessageElement(
value, flag, attr)
samdb.modify(msg)
res = samdb.search(base_dn,
scope=ldb.SCOPE_BASE,
attrs=['lockoutDuration',
'lockoutThreshold',
'msDS-LogonTimeSyncInterval'])
self.assertEqual(1, len(res))
# Reset the lockout duration as it was before.
lockout_duration = res[0].get('lockoutDuration', idx=0)
self.addCleanup(modify_attr, 'lockoutDuration', lockout_duration)
# Set the new lockout duration: locked out accounts now stay locked
# out.
modify_attr('lockoutDuration', 0)
# Reset the lockout threshold as it was before.
lockout_threshold = res[0].get('lockoutThreshold', idx=0)
self.addCleanup(modify_attr, 'lockoutThreshold', lockout_threshold)
# Set the new lockout threshold.
self.lockout_threshold = 3
modify_attr('lockoutThreshold', self.lockout_threshold)
# Reset the logon time sync interval as it was before.
sync_interval = res[0].get('msDS-LogonTimeSyncInterval', idx=0)
self.addCleanup(modify_attr,
'msDS-LogonTimeSyncInterval',
sync_interval)
# Set the new logon time sync interval. Setting it to 0 eliminates the
# need for this attribute to be updated on logon, and thus the
# requirement to take out a transaction.
modify_attr('msDS-LogonTimeSyncInterval', 0)
# Get the old 'minPwdAge'.
minPwdAge = samdb.get_minPwdAge()
# Reset the 'minPwdAge' as it was before.
self.addCleanup(samdb.set_minPwdAge, minPwdAge)
# Set it temporarily to '0'.
samdb.set_minPwdAge('0')
def assertLocalSamDB(self, samdb):
if samdb.url.startswith('tdb://'):
return
if samdb.url.startswith('mdb://'):
return
self.fail(f'connection to {samdb.url} is not local!')
def wait_for_ready(self, pipe, future):
if pipe.poll(timeout=5):
return
# We failed to read a response from the pipe, so see if the test raised
# an exception with more information.
if future.done():
exception = future.exception(timeout=0)
if exception is not None:
raise exception
self.fail('test failed to indicate readiness')
def test_lockout_transaction_kdc(self):
self.do_lockout_transaction(connect_kdc)
def test_lockout_transaction_ntlm(self):
self.do_lockout_transaction(connect_ntlm)
def test_lockout_transaction_samr(self):
self.do_lockout_transaction(connect_samr)
def test_lockout_transaction_samr_aes(self):
if not self.gnutls_pbkdf2_support:
self.skipTest('gnutls_pbkdf2() is not available')
self.do_lockout_transaction(connect_samr_aes)
def test_lockout_transaction_ldap_pw_change(self):
self.do_lockout_transaction(ldap_pwd_change)
# Tests to ensure we can handle the account being renamed. We do not test
# renames with SAMR password changes, because in that case the entire
# process happens inside a transaction, and the password change method only
# receives the account username. By the time it searches for the account,
# it will have already been renamed, and so it will always fail to find the
# account.
def test_lockout_transaction_rename_kdc(self):
self.do_lockout_transaction(connect_kdc, rename=True)
def test_lockout_transaction_rename_ntlm(self):
self.do_lockout_transaction(connect_ntlm, rename=True)
def test_lockout_transaction_rename_ldap_pw_change(self):
self.do_lockout_transaction(ldap_pwd_change, rename=True)
def test_lockout_transaction_bad_pwd_kdc(self):
self.do_lockout_transaction(connect_kdc, correct_pw=False)
def test_lockout_transaction_bad_pwd_ntlm(self):
self.do_lockout_transaction(connect_ntlm, correct_pw=False)
def test_lockout_transaction_bad_pwd_samr(self):
self.do_lockout_transaction(connect_samr, correct_pw=False)
def test_lockout_transaction_bad_pwd_samr_aes(self):
if not self.gnutls_pbkdf2_support:
self.skipTest('gnutls_pbkdf2() is not available')
self.do_lockout_transaction(connect_samr_aes, correct_pw=False)
def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
self.do_lockout_transaction(ldap_pwd_change, correct_pw=False)
def test_bad_pwd_count_transaction_kdc(self):
self.do_bad_pwd_count_transaction(connect_kdc)
def test_bad_pwd_count_transaction_ntlm(self):
self.do_bad_pwd_count_transaction(connect_ntlm)
def test_bad_pwd_count_transaction_samr(self):
self.do_bad_pwd_count_transaction(connect_samr)
def test_bad_pwd_count_transaction_samr_aes(self):
if not self.gnutls_pbkdf2_support:
self.skipTest('gnutls_pbkdf2() is not available')
self.do_bad_pwd_count_transaction(connect_samr_aes)
def test_bad_pwd_count_transaction_ldap_pw_change(self):
self.do_bad_pwd_count_transaction(ldap_pwd_change)
def test_bad_pwd_count_transaction_rename_kdc(self):
self.do_bad_pwd_count_transaction(connect_kdc, rename=True)
def test_bad_pwd_count_transaction_rename_ntlm(self):
self.do_bad_pwd_count_transaction(connect_ntlm, rename=True)
def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
self.do_bad_pwd_count_transaction(ldap_pwd_change, rename=True)
def test_lockout_race_kdc(self):
self.do_lockout_race(connect_kdc)
def test_lockout_race_ntlm(self):
self.do_lockout_race(connect_ntlm)
def test_lockout_race_samr(self):
self.do_lockout_race(connect_samr)
def test_lockout_race_samr_aes(self):
if not self.gnutls_pbkdf2_support:
self.skipTest('gnutls_pbkdf2() is not available')
self.do_lockout_race(connect_samr_aes)
def test_lockout_race_ldap_pw_change(self):
self.do_lockout_race(ldap_pwd_change)
def test_logon_without_transaction_ntlm(self):
self.do_logon_without_transaction(connect_ntlm)
# Tests to ensure that the connection functions work correctly in the happy
# path.
def test_logon_kdc(self):
self.do_logon(partial(connect_kdc, expect_error=False))
def test_logon_ntlm(self):
self.do_logon(connect_ntlm)
def test_logon_samr(self):
self.do_logon(connect_samr)
def test_logon_samr_aes(self):
if not self.gnutls_pbkdf2_support:
self.skipTest('gnutls_pbkdf2() is not available')
self.do_logon(connect_samr_aes)
def test_logon_ldap_pw_change(self):
self.do_logon(ldap_pwd_change)
# Test that connection without a correct password works.
def do_logon(self, connect_fn):
# Create the user account for testing.
user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
use_cache=False)
user_dn = user_creds.get_dn()
admin_creds = self.get_admin_creds()
lp = self.get_lp()
# Get a connection to our local SamDB.
samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
credentials=admin_creds)
self.assertLocalSamDB(samdb)
password = user_creds.get_password()
# Prepare to connect to the server with a valid password.
our_pipe, their_pipe = Pipe(duplex=True)
# Inform the test function that it may proceed.
our_pipe.send_bytes(b'0')
result = connect_fn(pipe=their_pipe,
url=f'ldap://{samdb.host_dns_name()}',
hostname=samdb.host_dns_name(),
username=user_creds.get_username(),
password=password,
domain=user_creds.get_domain(),
realm=user_creds.get_realm(),
workstation=user_creds.get_workstation(),
dn=str(user_dn))
# The connection should succeed.
self.assertEqual(result, ConnectionResult.SUCCESS)
# Lock out the account while holding a transaction lock, then release the
# lock. A logon attempt already in progress should reread the account
# details and recognise the account is locked out. The account can
# additionally be renamed within the transaction to ensure that, by using
# the GUID, rereading the account's details still succeeds.
def do_lockout_transaction(self, connect_fn,
rename=False,
correct_pw=True):
# Create the user account for testing.
user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
use_cache=False)
user_dn = user_creds.get_dn()
admin_creds = self.get_admin_creds()
lp = self.get_lp()
# Get a connection to our local SamDB.
samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
credentials=admin_creds)
self.assertLocalSamDB(samdb)
password = user_creds.get_password()
if not correct_pw:
password = password[:-1]
# Prepare to connect to the server.
with futures.ProcessPoolExecutor(max_workers=1) as executor:
our_pipe, their_pipe = Pipe(duplex=True)
connect_future = executor.submit(
connect_fn,
pipe=their_pipe,
url=f'ldap://{samdb.host_dns_name()}',
hostname=samdb.host_dns_name(),
username=user_creds.get_username(),
password=password,
domain=user_creds.get_domain(),
realm=user_creds.get_realm(),
workstation=user_creds.get_workstation(),
dn=str(user_dn))
# Wait until the test process indicates it's ready.
self.wait_for_ready(our_pipe, connect_future)
# Take out a transaction.
samdb.transaction_start()
try:
# Lock out the account. We must do it using an actual password
# check like so, rather than directly with a database
# modification, so that the account is also added to the
# auxiliary bad password database.
old_utf16pw = f'"Secret007"'.encode('utf-16le') # invalid pwd
new_utf16pw = f'"Secret008"'.encode('utf-16le')
msg = ldb.Message(user_dn)
msg['0'] = ldb.MessageElement(old_utf16pw,
ldb.FLAG_MOD_DELETE,
'unicodePwd')
msg['1'] = ldb.MessageElement(new_utf16pw,
ldb.FLAG_MOD_ADD,
'unicodePwd')
for i in range(self.lockout_threshold):
try:
samdb.modify(msg)
except ldb.LdbError as err:
num, estr = err.args
# We get an error, but the bad password count should
# still be updated.
self.assertEqual(num, ldb.ERR_OPERATIONS_ERROR)
self.assertEqual('Failed to obtain remote address for '
'the LDAP client while changing the '
'password',
estr)
else:
self.fail('pwd change should have failed')
# Ensure the account is locked out.
res = samdb.search(
user_dn, scope=ldb.SCOPE_BASE,
attrs=['msDS-User-Account-Control-Computed'])
self.assertEqual(1, len(res))
uac = int(res[0].get('msDS-User-Account-Control-Computed',
idx=0))
self.assertTrue(uac & dsdb.UF_LOCKOUT)
# Now the bad password database has been updated, inform the
# test process that it may proceed.
our_pipe.send_bytes(b'0')
# Wait one second to ensure the test process hits the
# transaction lock.
time.sleep(1)
if rename:
# While we're at it, rename the account to ensure that is
# also safe if a race occurs.
msg = ldb.Message(user_dn)
new_username = self.get_new_username()
msg['sAMAccountName'] = ldb.MessageElement(
new_username,
ldb.FLAG_MOD_REPLACE,
'sAMAccountName')
samdb.modify(msg)
except Exception:
samdb.transaction_cancel()
raise
# Commit the local transaction.
samdb.transaction_commit()
result = connect_future.result(timeout=5)
self.assertEqual(result, ConnectionResult.LOCKED_OUT)
# Update the bad password count while holding a transaction lock, then
# release the lock. A logon attempt already in progress should reread the
# account details and ensure the bad password count is atomically
# updated. The account can additionally be renamed within the transaction
# to ensure that, by using the GUID, rereading the account's details still
# succeeds.
def do_bad_pwd_count_transaction(self, connect_fn, rename=False):
# Create the user account for testing.
user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
use_cache=False)
user_dn = user_creds.get_dn()
admin_creds = self.get_admin_creds()
lp = self.get_lp()
# Get a connection to our local SamDB.
samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
credentials=admin_creds)
self.assertLocalSamDB(samdb)
# Prepare to connect to the server with an invalid password.
with futures.ProcessPoolExecutor(max_workers=1) as executor:
our_pipe, their_pipe = Pipe(duplex=True)
connect_future = executor.submit(
connect_fn,
pipe=their_pipe,
url=f'ldap://{samdb.host_dns_name()}',
hostname=samdb.host_dns_name(),
username=user_creds.get_username(),
password=user_creds.get_password()[:-1], # invalid password
domain=user_creds.get_domain(),
realm=user_creds.get_realm(),
workstation=user_creds.get_workstation(),
dn=str(user_dn))
# Wait until the test process indicates it's ready.
self.wait_for_ready(our_pipe, connect_future)
# Take out a transaction.
samdb.transaction_start()
try:
# Inform the test process that it may proceed.
our_pipe.send_bytes(b'0')
# Wait one second to ensure the test process hits the
# transaction lock.
time.sleep(1)
# Set badPwdCount to 1.
msg = ldb.Message(user_dn)
now = int(time.time())
bad_pwd_time = unix2nttime(now)
msg['badPwdCount'] = ldb.MessageElement(
'1',
ldb.FLAG_MOD_REPLACE,
'badPwdCount')
msg['badPasswordTime'] = ldb.MessageElement(
str(bad_pwd_time),
ldb.FLAG_MOD_REPLACE,
'badPasswordTime')
if rename:
# While we're at it, rename the account to ensure that is
# also safe if a race occurs.
new_username = self.get_new_username()
msg['sAMAccountName'] = ldb.MessageElement(
new_username,
ldb.FLAG_MOD_REPLACE,
'sAMAccountName')
samdb.modify(msg)
# Ensure the account is not yet locked out.
res = samdb.search(
user_dn, scope=ldb.SCOPE_BASE,
attrs=['msDS-User-Account-Control-Computed'])
self.assertEqual(1, len(res))
uac = int(res[0].get('msDS-User-Account-Control-Computed',
idx=0))
self.assertFalse(uac & dsdb.UF_LOCKOUT)
except Exception:
samdb.transaction_cancel()
raise
# Commit the local transaction.
samdb.transaction_commit()
result = connect_future.result(timeout=5)
self.assertEqual(result, ConnectionResult.WRONG_PASSWORD, result)
# Check that badPwdCount has now increased to 2.
res = samdb.search(user_dn,
scope=ldb.SCOPE_BASE,
attrs=['badPwdCount'])
self.assertEqual(1, len(res))
bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
self.assertEqual(2, bad_pwd_count)
# Attempt to log in to the account with an incorrect password, using
# lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
# password' errors and one 'locked out' error, showing that the bad
# password count is checked and incremented atomically.
def do_lockout_race(self, connect_fn):
# Create the user account for testing.
user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
use_cache=False)
user_dn = user_creds.get_dn()
admin_creds = self.get_admin_creds()
lp = self.get_lp()
# Get a connection to our local SamDB.
samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
credentials=admin_creds)
self.assertLocalSamDB(samdb)
# Prepare to connect to the server with an invalid password, using four
# simultaneous requests. Only three of those attempts should get
# through before the account is locked out.
num_attempts = self.lockout_threshold + 1
with futures.ProcessPoolExecutor(max_workers=num_attempts) as executor:
connect_futures = []
our_pipes = []
for i in range(num_attempts):
our_pipe, their_pipe = Pipe(duplex=True)
our_pipes.append(our_pipe)
connect_future = executor.submit(
connect_fn,
pipe=their_pipe,
url=f'ldap://{samdb.host_dns_name()}',
hostname=samdb.host_dns_name(),
username=user_creds.get_username(),
password=user_creds.get_password()[:-1], # invalid pw
domain=user_creds.get_domain(),
realm=user_creds.get_realm(),
workstation=user_creds.get_workstation(),
dn=str(user_dn))
connect_futures.append(connect_future)
# Wait until the test process indicates it's ready.
self.wait_for_ready(our_pipe, connect_future)
# Take out a transaction.
samdb.transaction_start()
try:
# Inform the test processes that they may proceed.
for our_pipe in our_pipes:
our_pipe.send_bytes(b'0')
# Wait one second to ensure the test processes hit the
# transaction lock.
time.sleep(1)
except Exception:
samdb.transaction_cancel()
raise
# Commit the local transaction.
samdb.transaction_commit()
lockouts = 0
wrong_passwords = 0
for i, connect_future in enumerate(connect_futures):
result = connect_future.result(timeout=5)
if result == ConnectionResult.LOCKED_OUT:
lockouts += 1
elif result == ConnectionResult.WRONG_PASSWORD:
wrong_passwords += 1
else:
self.fail(f'process {i} gave an unexpected result '
f'{result}')
self.assertEqual(wrong_passwords, self.lockout_threshold)
self.assertEqual(lockouts, num_attempts - self.lockout_threshold)
# Ensure the account is now locked out.
res = samdb.search(
user_dn, scope=ldb.SCOPE_BASE,
attrs=['badPwdCount',
'msDS-User-Account-Control-Computed'])
self.assertEqual(1, len(res))
bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
self.assertEqual(self.lockout_threshold, bad_pwd_count)
uac = int(res[0].get('msDS-User-Account-Control-Computed',
idx=0))
self.assertTrue(uac & dsdb.UF_LOCKOUT)
# Test that logon is possible even while we locally hold a transaction
# lock. This test only works with NTLM authentication; Kerberos
# authentication must take out a transaction to update the logonCount
# attribute, and LDAP and SAMR password changes both take out a transaction
# to effect the password change. NTLM is the only logon method that does
# not require a transaction, and can thus be performed while we're holding
# the lock.
def do_logon_without_transaction(self, connect_fn):
# Create the user account for testing.
user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
use_cache=False)
user_dn = user_creds.get_dn()
admin_creds = self.get_admin_creds()
lp = self.get_lp()
# Get a connection to our local SamDB.
samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
credentials=admin_creds)
self.assertLocalSamDB(samdb)
password = user_creds.get_password()
# Prepare to connect to the server with a valid password.
with futures.ProcessPoolExecutor(max_workers=1) as executor:
our_pipe, their_pipe = Pipe(duplex=True)
connect_future = executor.submit(
connect_fn,
pipe=their_pipe,
url=f'ldap://{samdb.host_dns_name()}',
hostname=samdb.host_dns_name(),
username=user_creds.get_username(),
password=password,
domain=user_creds.get_domain(),
realm=user_creds.get_realm(),
workstation=user_creds.get_workstation(),
dn=str(user_dn))
# Wait until the test process indicates it's ready.
self.wait_for_ready(our_pipe, connect_future)
# Take out a transaction.
samdb.transaction_start()
try:
# Inform the test process that it may proceed.
our_pipe.send_bytes(b'0')
# The connection should succeed, despite our holding a
# transaction.
result = connect_future.result(timeout=5)
self.assertEqual(result, ConnectionResult.SUCCESS)
except Exception:
samdb.transaction_cancel()
raise
# Commit the local transaction.
samdb.transaction_commit()
if __name__ == '__main__':
global_asn1_print = False
global_hexdump = False
import unittest
unittest.main()