diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index bf0e6642527..37c278e01dc 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -34,7 +34,13 @@ from enum import Enum from collections import namedtuple import ldb from ldb import SCOPE_BASE -from samba import NTSTATUSError, common, generate_random_password, ntstatus +from samba import ( + NTSTATUSError, + arcfour_encrypt, + common, + generate_random_password, + ntstatus, +) from samba.auth import system_session from samba.credentials import ( Credentials, @@ -42,6 +48,7 @@ from samba.credentials import ( DONT_USE_KERBEROS, MUST_USE_KERBEROS, ) +from samba.crypto import des_crypt_blob_16, md4_hash_blob from samba.dcerpc import ( claims, drsblobs, @@ -3171,51 +3178,132 @@ class KDCBaseTest(RawKerberosTest): # Test the two SAMR password change methods implemented in Samba. If the # user is protected, we should get an ACCOUNT_RESTRICTION error indicating - # that the password change is not allowed; otherwise we should get a - # WRONG_PASSWORD error. - def _test_samr_change_password(self, creds, expect_error): + # that the password change is not allowed. + def _test_samr_change_password(self, creds, expect_error, + connect_error=None): samdb = self.get_samdb() server_name = samdb.host_dns_name() - conn = samr.samr(f'ncacn_np:{server_name}[krb5,seal,smb2]') + try: + conn = samr.samr(f'ncacn_np:{server_name}[seal,smb2]', + self.get_lp(), + creds) + except NTSTATUSError as err: + self.assertIsNotNone(connect_error, + 'connection unexpectedly failed') + self.assertIsNone(expect_error, 'don’t specify both errors') - username = creds.get_username() + num, _ = err.args + self.assertEqual(num, connect_error) + + return + else: + self.assertIsNone(connect_error, 'expected connection to fail') + + # Get the NT hash. + nt_hash = creds.get_nt_hash() + + # Generate a new UTF-16 password. + new_password_str = generate_random_password(32, 32) + new_password = new_password_str.encode('utf-16le') + + # Generate the MD4 hash of the password. + new_password_md4 = md4_hash_blob(new_password) + + # Prefix the password with padding so it is 512 bytes long. + new_password_len = len(new_password) + remaining_len = 512 - new_password_len + new_password = bytes(remaining_len) + new_password + + # Append the 32-bit length of the password. + new_password += int.to_bytes(new_password_len, + length=4, + byteorder='little') + + # Create a key from the MD4 hash of the new password. + key = new_password_md4[:14] + + # Encrypt the old NT hash with DES to obtain the verifier. + verifier = des_crypt_blob_16(nt_hash, key) server = lsa.String() server.string = server_name account = lsa.String() - account.string = username + account.string = creds.get_username() + + nt_verifier = samr.Password() + nt_verifier.hash = list(verifier) nt_password = samr.CryptPassword() - nt_verifier = samr.Password() + nt_password.data = list(arcfour_encrypt(nt_hash, new_password)) if not self.expect_nt_hash: expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED - with self.assertRaises(NTSTATUSError) as err: + try: conn.ChangePasswordUser2(server=server, account=account, nt_password=nt_password, nt_verifier=nt_verifier, - lm_change=True, + lm_change=False, lm_password=None, lm_verifier=None) + except NTSTATUSError as err: + num, _ = err.args + self.assertIsNotNone(expect_error, + f'unexpectedly failed with {num:08X}') + self.assertEqual(num, expect_error) + else: + self.assertIsNone(expect_error, 'expected to fail') - num, _ = err.exception.args - self.assertEqual(num, expect_error) + creds.set_password(new_password_str) - with self.assertRaises(NTSTATUSError) as err: + # Get the NT hash. + nt_hash = creds.get_nt_hash() + + # Generate a new UTF-16 password. + new_password = generate_random_password(32, 32) + new_password = new_password.encode('utf-16le') + + # Generate the MD4 hash of the password. + new_password_md4 = md4_hash_blob(new_password) + + # Prefix the password with padding so it is 512 bytes long. + new_password_len = len(new_password) + remaining_len = 512 - new_password_len + new_password = bytes(remaining_len) + new_password + + # Append the 32-bit length of the password. + new_password += int.to_bytes(new_password_len, + length=4, + byteorder='little') + + # Create a key from the MD4 hash of the new password. + key = new_password_md4[:14] + + # Encrypt the old NT hash with DES to obtain the verifier. + verifier = des_crypt_blob_16(nt_hash, key) + + nt_verifier.hash = list(verifier) + + nt_password.data = list(arcfour_encrypt(nt_hash, new_password)) + + try: conn.ChangePasswordUser3(server=server, account=account, nt_password=nt_password, nt_verifier=nt_verifier, - lm_change=True, + lm_change=False, lm_password=None, lm_verifier=None, password3=None) + except NTSTATUSError as err: + self.assertIsNotNone(expect_error, 'unexpectedly failed') - num, _ = err.exception.args - self.assertEqual(num, expect_error) + num, _ = err.args + self.assertEqual(num, expect_error) + else: + self.assertIsNone(expect_error, 'expected to fail') # Test SamLogon. Authentication should succeed for non-protected accounts, # and fail for protected accounts. diff --git a/python/samba/tests/krb5/protected_users_tests.py b/python/samba/tests/krb5/protected_users_tests.py index e29d7c0f6c9..27356cbd589 100755 --- a/python/samba/tests/krb5/protected_users_tests.py +++ b/python/samba/tests/krb5/protected_users_tests.py @@ -60,6 +60,17 @@ class ProtectedUsersTests(KDCBaseTest): self.do_asn1_print = global_asn1_print self.do_hexdump = global_hexdump + samdb = self.get_samdb() + + # Get the old ‘minPwdAge’. + minPwdAge = samdb.get_minPwdAge() + + # Reset the ‘minPwdAge’ as it was before. + self.addCleanup(samdb.set_minPwdAge, minPwdAge) + + # Set it temporarily to ‘0’. + samdb.set_minPwdAge('0') + # Get account credentials for testing. def _get_creds(self, protected, @@ -134,7 +145,7 @@ class ProtectedUsersTests(KDCBaseTest): self._test_samr_change_password( client_creds, - expect_error=ntstatus.NT_STATUS_WRONG_PASSWORD) + expect_error=None) def test_samr_change_password_protected(self): # Use a non-cached account so that it is not locked out for other