1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-04 17:47:26 +03:00

CVE-2022-2031 tests/krb5: Add kpasswd_exchange() method

Now we can test the kpasswd service from Python.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
This commit is contained in:
Joseph Sutton 2022-05-24 19:57:57 +12:00 committed by Jule Anger
parent 332fd6032a
commit 6a2ec50bfd

View File

@ -26,6 +26,8 @@ import binascii
import itertools
import collections
from enum import Enum
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
@ -33,6 +35,8 @@ from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder
from pyasn1.error import PyAsn1Error
from samba.credentials import Credentials
from samba.dcerpc import krb5pac, security
from samba.gensec import FEATURE_SEAL
@ -52,6 +56,7 @@ from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_SKEW,
KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
KERB_ERR_TYPE_EXTENDED,
KRB_AP_REP,
KRB_AP_REQ,
KRB_AS_REP,
KRB_AS_REQ,
@ -61,6 +66,7 @@ from samba.tests.krb5.rfc4120_constants import (
KRB_TGS_REQ,
KU_AP_REQ_AUTH,
KU_AS_REP_ENC_PART,
KU_AP_REQ_ENC_PART,
KU_AS_REQ,
KU_ENC_CHALLENGE_KDC,
KU_FAST_ENC,
@ -76,6 +82,7 @@ from samba.tests.krb5.rfc4120_constants import (
KU_TGS_REQ_AUTH_DAT_SESSION,
KU_TGS_REQ_AUTH_DAT_SUBKEY,
KU_TICKET,
NT_PRINCIPAL,
NT_SRV_INST,
NT_WELLKNOWN,
PADATA_ENCRYPTED_CHALLENGE,
@ -525,6 +532,10 @@ class KerberosTicketCreds:
class RawKerberosTest(TestCaseInTempDir):
"""A raw Kerberos Test case."""
class KpasswdMode(Enum):
SET = object()
CHANGE = object()
pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
krb5pac.PAC_TYPE_KDC_CHECKSUM,
krb5pac.PAC_TYPE_TICKET_CHECKSUM}
@ -1931,6 +1942,224 @@ class RawKerberosTest(TestCaseInTempDir):
return msg
def get_enc_part(self, obj, key, usage):
self.assertElementEqual(obj, 'pvno', 5)
enc_part = obj['enc-part']
self.assertElementEqual(enc_part, 'etype', key.etype)
self.assertElementKVNO(enc_part, 'kvno', key.kvno)
enc_part = key.decrypt(usage, enc_part['cipher'])
return enc_part
def kpasswd_exchange(self,
ticket,
new_password,
expected_code,
expected_msg,
mode,
target_princ=None,
target_realm=None,
ap_options=None,
send_seq_number=True):
if mode is self.KpasswdMode.SET:
version = 0xff80
user_data = self.ChangePasswdDataMS_create(new_password,
target_princ,
target_realm)
elif mode is self.KpasswdMode.CHANGE:
self.assertIsNone(target_princ,
'target_princ only valid for pw set')
self.assertIsNone(target_realm,
'target_realm only valid for pw set')
version = 1
user_data = new_password.encode('utf-8')
else:
self.fail(f'invalid mode {mode}')
subkey = self.RandomKey(kcrypto.Enctype.AES256)
if ap_options is None:
ap_options = '0'
ap_options = str(krb5_asn1.APOptions(ap_options))
kdc_exchange_dict = {
'tgt': ticket,
'authenticator_subkey': subkey,
'auth_data': None,
'ap_options': ap_options,
}
if send_seq_number:
seq_number = random.randint(0, 0xfffffffe)
else:
seq_number = None
ap_req = self.generate_ap_req(kdc_exchange_dict,
None,
req_body=None,
armor=False,
usage=KU_AP_REQ_AUTH,
seq_number=seq_number)
self.connect(self.host, port=464)
self.assertIsNotNone(self.s)
family = self.s.family
if family == socket.AF_INET:
addr_type = 2 # IPv4
elif family == socket.AF_INET6:
addr_type = 24 # IPv6
else:
self.fail(f'unknown family {family}')
def create_address(ip):
return {
'addr-type': addr_type,
'address': socket.inet_pton(family, ip),
}
local_ip = self.s.getsockname()[0]
local_address = create_address(local_ip)
# remote_ip = self.s.getpeername()[0]
# remote_address = create_address(remote_ip)
# TODO: due to a bug (?), MIT Kerberos will not accept the request
# unless r-address is set to our _local_ address. Heimdal, on the other
# hand, requires the r-address is set to the remote address (as
# expected). To avoid problems, avoid sending r-address for now.
remote_address = None
msg = self.kpasswd_create(subkey,
user_data,
version,
seq_number,
ap_req,
local_address,
remote_address)
self.send_msg(msg)
rep_pdu = self.recv_pdu_raw()
self._disconnect('transaction done')
self.assertIsNotNone(rep_pdu)
header = rep_pdu[:6]
reply = rep_pdu[6:]
reply_len = (header[0] << 8) | header[1]
reply_version = (header[2] << 8) | header[3]
ap_rep_len = (header[4] << 8) | header[5]
self.assertEqual(reply_len, len(rep_pdu))
self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW
self.assertLess(ap_rep_len, reply_len)
self.assertNotEqual(0x7e, rep_pdu[1])
self.assertNotEqual(0x5e, rep_pdu[1])
if ap_rep_len:
# We received an AP-REQ and KRB-PRIV as a response. This may or may
# not indicate an error, depending on the status code.
ap_rep = reply[:ap_rep_len]
krb_priv = reply[ap_rep_len:]
key = ticket.session_key
ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
enc_part = self.der_decode(
enc_part, asn1Spec=krb5_asn1.EncAPRepPart())
self.assertElementPresent(enc_part, 'ctime')
self.assertElementPresent(enc_part, 'cusec')
# self.assertElementMissing(enc_part, 'subkey') # TODO
# self.assertElementPresent(enc_part, 'seq-number') # TODO
try:
krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
except PyAsn1Error:
self.fail()
self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
priv_enc_part = self.der_decode(
priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())
self.assertElementMissing(priv_enc_part, 'timestamp')
self.assertElementMissing(priv_enc_part, 'usec')
# self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
# self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
# self.assertElementMissing(priv_enc_part, 'r-address') # TODO
result_data = priv_enc_part['user-data']
else:
# We received a KRB-ERROR as a response, indicating an error.
krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())
sname = self.PrincipalName_create(
name_type=NT_PRINCIPAL,
names=['kadmin', 'changepw'])
realm = self.get_krbtgt_creds().get_realm().upper()
self.assertElementEqual(krb_error, 'pvno', 5)
self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
self.assertElementMissing(krb_error, 'ctime')
self.assertElementMissing(krb_error, 'usec')
self.assertElementPresent(krb_error, 'stime')
self.assertElementPresent(krb_error, 'susec')
error_code = krb_error['error-code']
if isinstance(expected_code, int):
self.assertEqual(error_code, expected_code)
else:
self.assertIn(error_code, expected_code)
self.assertElementMissing(krb_error, 'crealm')
self.assertElementMissing(krb_error, 'cname')
self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
self.assertElementEqualPrincipal(krb_error, 'sname', sname)
self.assertElementMissing(krb_error, 'e-text')
result_data = krb_error['e-data']
status = result_data[:2]
message = result_data[2:]
status_code = (status[0] << 8) | status[1]
if isinstance(expected_code, int):
self.assertEqual(status_code, expected_code)
else:
self.assertIn(status_code, expected_code)
if not message:
self.assertEqual(0, status_code,
'got an error result, but no message')
return
# Check the first character of the message.
if message[0]:
if isinstance(expected_msg, bytes):
self.assertEqual(message, expected_msg)
else:
self.assertIn(message, expected_msg)
else:
# We got AD password policy information.
self.assertEqual(30, len(message))
(empty_bytes,
min_length,
history_length,
properties,
expire_time,
min_age) = struct.unpack('>HIIIQQ', message)
def _generic_kdc_exchange(self,
kdc_exchange_dict, # required
cname=None, # optional
@ -2041,7 +2270,7 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertIsNotNone(generate_fast_fn)
fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
callback_dict,
req_body,
None,
armor=True)
fast_armor_type = kdc_exchange_dict['fast_armor_type']
@ -3438,31 +3667,39 @@ class RawKerberosTest(TestCaseInTempDir):
kdc_exchange_dict,
_callback_dict,
req_body,
armor):
armor,
usage=None,
seq_number=None):
req_body_checksum = None
if armor:
self.assertIsNone(req_body)
tgt = kdc_exchange_dict['armor_tgt']
authenticator_subkey = kdc_exchange_dict['armor_subkey']
req_body_checksum = None
else:
tgt = kdc_exchange_dict['tgt']
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
body_checksum_type = kdc_exchange_dict['body_checksum_type']
req_body_blob = self.der_encode(req_body,
asn1Spec=krb5_asn1.KDC_REQ_BODY())
if req_body is not None:
body_checksum_type = kdc_exchange_dict['body_checksum_type']
req_body_checksum = self.Checksum_create(tgt.session_key,
KU_TGS_REQ_AUTH_CKSUM,
req_body_blob,
ctype=body_checksum_type)
req_body_blob = self.der_encode(
req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
req_body_checksum = self.Checksum_create(
tgt.session_key,
KU_TGS_REQ_AUTH_CKSUM,
req_body_blob,
ctype=body_checksum_type)
auth_data = kdc_exchange_dict['auth_data']
subkey_obj = None
if authenticator_subkey is not None:
subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe)
if seq_number is None:
seq_number = random.randint(0, 0xfffffffe)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
authenticator_obj = self.Authenticator_create(
crealm=tgt.crealm,
@ -3477,7 +3714,8 @@ class RawKerberosTest(TestCaseInTempDir):
authenticator_obj,
asn1Spec=krb5_asn1.Authenticator())
usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
if usage is None:
usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
authenticator = self.EncryptedData_create(tgt.session_key,
usage,
authenticator_blob)