1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-25 17:57:42 +03:00

tests/krb5/raw_testcase.py: introduce a _generic_kdc_exchange() infrastructure

This will allow us to write tests, which will all cross check almost
every aspect of the KDC response (including encrypted parts).

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Stefan Metzmacher 2020-04-21 11:07:45 +02:00
parent 69ce2a6408
commit 6e2f2adc8e
2 changed files with 645 additions and 0 deletions

View File

@ -30,6 +30,27 @@ from samba.credentials import Credentials
from samba.tests import TestCaseInTempDir
from samba.dcerpc import security
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_ETYPE_NOSUPP,
KDC_ERR_PREAUTH_REQUIRED,
KRB_AS_REP,
KRB_AS_REQ,
KRB_ERROR,
KRB_TGS_REP,
KRB_TGS_REQ,
KU_AS_REP_ENC_PART,
KU_TGS_REP_ENC_PART_SESSION,
KU_TGS_REP_ENC_PART_SUB_KEY,
KU_TGS_REQ_AUTH,
KU_TGS_REQ_AUTH_CKSUM,
KU_TICKET,
PADATA_ENC_TIMESTAMP,
PADATA_ETYPE_INFO,
PADATA_ETYPE_INFO2,
PADATA_KDC_REQ,
PADATA_PK_AS_REQ,
PADATA_PK_AS_REP_19
)
import samba.tests.krb5.kcrypto as kcrypto
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
@ -272,6 +293,24 @@ class KerberosCredentials(Credentials):
def get_forced_salt(self):
return self.forced_salt
class KerberosTicketCreds(object):
def __init__(self, ticket, session_key,
crealm=None, cname=None,
srealm=None, sname=None,
decryption_key=None,
ticket_private=None,
encpart_private=None):
self.ticket = ticket
self.session_key = session_key
self.crealm = crealm
self.cname = cname
self.srealm = srealm
self.sname = sname
self.decryption_key = decryption_key
self.ticket_private = ticket_private
self.encpart_private = encpart_private
return
class RawKerberosTest(TestCaseInTempDir):
"""A raw Kerberos Test case."""
@ -758,6 +797,12 @@ class RawKerberosTest(TestCaseInTempDir):
(s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
return s
def get_Nonce(self):
nonce_min=0x7f000000
nonce_max=0x7fffffff
v = random.randint(nonce_min, nonce_max)
return v
def SessionKey_create(self, etype, contents, kvno=None):
key = kcrypto.Key(etype, contents)
return Krb5EncryptionKey(key, kvno)
@ -1268,3 +1313,592 @@ class RawKerberosTest(TestCaseInTempDir):
pa_s4u2self = self.der_encode(
PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
return self.PA_DATA_create(129, pa_s4u2self)
def _generic_kdc_exchange(self,
kdc_exchange_dict, # required
kdc_options=None, # required
cname=None, # optional
realm=None, # required
sname=None, # optional
from_time=None, # optional
till_time=None, # required
renew_time=None, # optional
nonce=None, # required
etypes=None, # required
addresses=None, # optional
EncAuthorizationData=None, # optional
EncAuthorizationData_key=None, # optional
additional_tickets=None): # optional
check_error_fn = kdc_exchange_dict['check_error_fn']
check_rep_fn = kdc_exchange_dict['check_rep_fn']
generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
callback_dict = kdc_exchange_dict['callback_dict']
req_msg_type = kdc_exchange_dict['req_msg_type']
req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
rep_msg_type = kdc_exchange_dict['rep_msg_type']
if till_time is None:
till_time = self.get_KerberosTime(offset=36000)
if nonce is None:
nonce = self.get_Nonce()
req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options,
cname=cname,
realm=realm,
sname=sname,
from_time=from_time,
till_time=till_time,
renew_time=renew_time,
nonce=nonce,
etypes=etypes,
addresses=addresses,
EncAuthorizationData=EncAuthorizationData,
EncAuthorizationData_key=EncAuthorizationData_key,
additional_tickets=additional_tickets)
if generate_padata_fn is not None:
# This can alter req_body...
padata, req_body = generate_padata_fn(kdc_exchange_dict,
callback_dict,
req_body)
else:
padata = None
kdc_exchange_dict['req_padata'] = padata
kdc_exchange_dict['req_body'] = req_body
req_obj,req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
padata=padata,
req_body=req_body,
asn1Spec=req_asn1Spec())
rep = self.send_recv_transaction(req_decoded)
self.assertIsNotNone(rep)
msg_type = self.getElementValue(rep, 'msg-type')
self.assertIsNotNone(msg_type)
allowed_msg_types = ()
if check_error_fn is not None:
allowed_msg_types = (KRB_ERROR,)
if check_rep_fn is not None:
allowed_msg_types += (rep_msg_type,)
self.assertIn(msg_type, allowed_msg_types)
if msg_type == KRB_ERROR:
return check_error_fn(kdc_exchange_dict,
callback_dict,
rep)
return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
def as_exchange_dict(self,
expected_crealm=None,
expected_cname=None,
expected_srealm=None,
expected_sname=None,
ticket_decryption_key=None,
generate_padata_fn=None,
check_error_fn=None,
check_rep_fn=None,
check_padata_fn=None,
check_kdc_private_fn=None,
callback_dict=dict(),
expected_error_mode=None,
client_as_etypes=None,
expected_salt=None):
kdc_exchange_dict = {
'req_msg_type': KRB_AS_REQ,
'req_asn1Spec': krb5_asn1.AS_REQ,
'rep_msg_type': KRB_AS_REP,
'rep_asn1Spec': krb5_asn1.AS_REP,
'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
'expected_crealm': expected_crealm,
'expected_cname': expected_cname,
'expected_srealm': expected_srealm,
'expected_sname': expected_sname,
'ticket_decryption_key': ticket_decryption_key,
'generate_padata_fn': generate_padata_fn,
'check_error_fn': check_error_fn,
'check_rep_fn': check_rep_fn,
'check_padata_fn': check_padata_fn,
'check_kdc_private_fn': check_kdc_private_fn,
'callback_dict': callback_dict,
'expected_error_mode': expected_error_mode,
'client_as_etypes': client_as_etypes,
'expected_salt': expected_salt,
}
return kdc_exchange_dict
def tgs_exchange_dict(self,
expected_crealm=None,
expected_cname=None,
expected_srealm=None,
expected_sname=None,
ticket_decryption_key=None,
generate_padata_fn=None,
check_error_fn=None,
check_rep_fn=None,
check_padata_fn=None,
check_kdc_private_fn=None,
callback_dict=dict(),
tgt=None,
authenticator_subkey=None,
body_checksum_type=None):
kdc_exchange_dict = {
'req_msg_type': KRB_TGS_REQ,
'req_asn1Spec': krb5_asn1.TGS_REQ,
'rep_msg_type': KRB_TGS_REP,
'rep_asn1Spec': krb5_asn1.TGS_REP,
'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
'expected_crealm': expected_crealm,
'expected_cname': expected_cname,
'expected_srealm': expected_srealm,
'expected_sname': expected_sname,
'ticket_decryption_key': ticket_decryption_key,
'generate_padata_fn': generate_padata_fn,
'check_error_fn': check_error_fn,
'check_rep_fn': check_rep_fn,
'check_padata_fn': check_padata_fn,
'check_kdc_private_fn': check_kdc_private_fn,
'callback_dict': callback_dict,
'tgt': tgt,
'body_checksum_type': body_checksum_type,
'authenticator_subkey': authenticator_subkey,
}
return kdc_exchange_dict
def generic_check_kdc_rep(self,
kdc_exchange_dict,
callback_dict,
rep):
expected_crealm = kdc_exchange_dict['expected_crealm']
expected_cname = kdc_exchange_dict['expected_cname']
expected_srealm = kdc_exchange_dict['expected_srealm']
expected_sname = kdc_exchange_dict['expected_sname']
ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
check_padata_fn = kdc_exchange_dict['check_padata_fn']
check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
msg_type = kdc_exchange_dict['rep_msg_type']
self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
padata = self.getElementValue(rep, 'padata')
self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
self.assertElementPresent(rep, 'ticket')
ticket = self.getElementValue(rep, 'ticket')
ticket_encpart = None
ticket_cipher = None
if ticket is not None: # Never None, but gives indentation
self.assertElementPresent(ticket, 'tkt-vno')
self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
self.assertElementPresent(ticket, 'enc-part')
ticket_encpart = self.getElementValue(ticket, 'enc-part')
if ticket_encpart is not None: # Never None, but gives indentation
self.assertElementPresent(ticket_encpart, 'etype')
# 0 means present, with any value != 0
self.assertElementKVNO(ticket_encpart, 'kvno', 0)
self.assertElementPresent(ticket_encpart, 'cipher')
ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
self.assertElementPresent(rep, 'enc-part')
encpart = self.getElementValue(rep, 'enc-part')
encpart_cipher = None
if encpart is not None: # Never None, but gives indentation
self.assertElementPresent(encpart, 'etype')
self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
self.assertElementPresent(encpart, 'cipher')
encpart_cipher = self.getElementValue(encpart, 'cipher')
encpart_decryption_key = None
if check_padata_fn is not None:
# See if get the decryption key from the preauth phase
encpart_decryption_key,encpart_decryption_usage = \
check_padata_fn(kdc_exchange_dict, callback_dict,
rep, padata)
ticket_private = None
if ticket_decryption_key is not None:
self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype)
self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno)
ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher)
ticket_private = self.der_decode(ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart())
encpart_private = None
if encpart_decryption_key is not None:
self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype)
self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno)
rep_decpart = encpart_decryption_key.decrypt(encpart_decryption_usage, encpart_cipher)
encpart_private = self.der_decode(rep_decpart, asn1Spec=rep_encpart_asn1Spec())
if check_kdc_private_fn is not None:
check_kdc_private_fn(kdc_exchange_dict, callback_dict,
rep, ticket_private, encpart_private)
return rep
def generic_check_kdc_private(self,
kdc_exchange_dict,
callback_dict,
rep,
ticket_private,
encpart_private):
expected_crealm = kdc_exchange_dict['expected_crealm']
expected_cname = kdc_exchange_dict['expected_cname']
expected_srealm = kdc_exchange_dict['expected_srealm']
expected_sname = kdc_exchange_dict['expected_sname']
ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
ticket = self.getElementValue(rep, 'ticket')
ticket_session_key = None
if ticket_private is not None:
self.assertElementPresent(ticket_private, 'flags')
self.assertElementPresent(ticket_private, 'key')
ticket_key = self.getElementValue(ticket_private, 'key')
if ticket_key is not None: # Never None, but gives indentation
self.assertElementPresent(ticket_key, 'keytype')
self.assertElementPresent(ticket_key, 'keyvalue')
ticket_session_key = self.EncryptionKey_import(ticket_key)
self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm)
self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname)
self.assertElementPresent(ticket_private, 'transited')
self.assertElementPresent(ticket_private, 'authtime')
if self.strict_checking:
self.assertElementPresent(ticket_private, 'starttime')
self.assertElementPresent(ticket_private, 'endtime')
# TODO self.assertElementPresent(ticket_private, 'renew-till')
# TODO self.assertElementMissing(ticket_private, 'caddr')
self.assertElementPresent(ticket_private, 'authorization-data')
encpart_session_key = None
if encpart_private is not None:
self.assertElementPresent(encpart_private, 'key')
encpart_key = self.getElementValue(encpart_private, 'key')
if encpart_key is not None: # Never None, but gives indentation
self.assertElementPresent(encpart_key, 'keytype')
self.assertElementPresent(encpart_key, 'keyvalue')
encpart_session_key = self.EncryptionKey_import(encpart_key)
self.assertElementPresent(encpart_private, 'last-req')
self.assertElementPresent(encpart_private, 'nonce')
# TODO self.assertElementPresent(encpart_private, 'key-expiration')
self.assertElementPresent(encpart_private, 'flags')
self.assertElementPresent(encpart_private, 'authtime')
if self.strict_checking:
self.assertElementPresent(encpart_private, 'starttime')
self.assertElementPresent(encpart_private, 'endtime')
# TODO self.assertElementPresent(encpart_private, 'renew-till')
self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm)
self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname)
# TODO self.assertElementMissing(encpart_private, 'caddr')
if ticket_session_key is not None and encpart_session_key is not None:
self.assertEqual(ticket_session_key.etype, encpart_session_key.etype)
self.assertEqual(ticket_session_key.key.contents, encpart_session_key.key.contents)
if encpart_session_key is not None:
session_key = encpart_session_key
else:
session_key = ticket_session_key
ticket_creds = KerberosTicketCreds(ticket,
session_key,
crealm=expected_crealm,
cname=expected_cname,
srealm=expected_srealm,
sname=expected_sname,
decryption_key=ticket_decryption_key,
ticket_private=ticket_private,
encpart_private=encpart_private)
kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
return
def generic_check_as_error(self,
kdc_exchange_dict,
callback_dict,
rep):
expected_crealm = kdc_exchange_dict['expected_crealm']
expected_cname = kdc_exchange_dict['expected_cname']
expected_srealm = kdc_exchange_dict['expected_srealm']
expected_sname = kdc_exchange_dict['expected_sname']
expected_salt = kdc_exchange_dict['expected_salt']
client_as_etypes = kdc_exchange_dict['client_as_etypes']
expected_error_mode = kdc_exchange_dict['expected_error_mode']
req_body = kdc_exchange_dict['req_body']
proposed_etypes = req_body['etype']
kdc_exchange_dict['preauth_etype_info2'] = None
expect_etype_info2 = ()
expect_etype_info = False
unexpect_etype_info = True
expected_aes_type = 0
expected_rc4_type = 0
if kcrypto.Enctype.RC4 in proposed_etypes:
expect_etype_info = True
for etype in proposed_etypes:
if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
expect_etype_info = False
if etype not in client_as_etypes:
continue
if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128):
if etype > expected_aes_type:
expected_aes_type = etype
if etype in (kcrypto.Enctype.RC4,):
unexpect_etype_info = False
if etype > expected_rc4_type:
expected_rc4_type = etype
if expected_aes_type != 0:
expect_etype_info2 += (expected_aes_type,)
if expected_rc4_type != 0:
expect_etype_info2 += (expected_rc4_type,)
expected_error = KDC_ERR_ETYPE_NOSUPP
expected_patypes = ()
if expect_etype_info:
self.assertGreater(len(expect_etype_info2), 0)
expected_patypes += (PADATA_ETYPE_INFO,)
if len(expect_etype_info2) != 0:
expected_error = KDC_ERR_PREAUTH_REQUIRED
expected_patypes += (PADATA_ETYPE_INFO2,)
expected_patypes += (PADATA_ENC_TIMESTAMP,)
expected_patypes += (PADATA_PK_AS_REQ,)
expected_patypes += (PADATA_PK_AS_REP_19,)
self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
self.assertElementEqual(rep, 'error-code', expected_error)
self.assertElementMissing(rep, 'ctime')
self.assertElementMissing(rep, 'cusec')
self.assertElementPresent(rep, 'stime')
self.assertElementPresent(rep, 'susec')
# error-code checked above
if self.strict_checking:
self.assertElementMissing(rep, 'crealm')
self.assertElementMissing(rep, 'cname')
self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
if self.strict_checking:
self.assertElementMissing(rep, 'e-text')
if expected_error_mode != KDC_ERR_PREAUTH_REQUIRED:
self.assertElementMissing(rep, 'e-data')
return
edata = self.getElementValue(rep, 'e-data')
if self.strict_checking:
self.assertIsNotNone(edata)
if edata is not None:
rep_padata = self.der_decode(edata, asn1Spec=krb5_asn1.METHOD_DATA())
self.assertGreater(len(rep_padata), 0)
else:
rep_padata = []
if self.strict_checking:
for i in range(0, len(expected_patypes)):
self.assertElementEqual(rep_padata[i], 'padata-type', expected_patypes[i])
self.assertEqual(len(rep_padata), len(expected_patypes))
etype_info2 = None
etype_info = None
enc_timestamp = None
pk_as_req = None
pk_as_rep19 = None
for pa in rep_padata:
patype = self.getElementValue(pa, 'padata-type')
pavalue = self.getElementValue(pa, 'padata-value')
if patype == PADATA_ETYPE_INFO2:
self.assertIsNone(etype_info2)
etype_info2 = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO2())
continue
if patype == PADATA_ETYPE_INFO:
self.assertIsNone(etype_info)
etype_info = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO())
continue
if patype == PADATA_ENC_TIMESTAMP:
self.assertIsNone(enc_timestamp)
enc_timestamp = pavalue
self.assertEqual(len(enc_timestamp), 0)
continue
if patype == PADATA_PK_AS_REQ:
self.assertIsNone(pk_as_req)
pk_as_req = pavalue
self.assertEqual(len(pk_as_req), 0)
continue
if patype == PADATA_PK_AS_REP_19:
self.assertIsNone(pk_as_rep19)
pk_as_rep19 = pavalue
self.assertEqual(len(pk_as_rep19), 0)
continue
if expected_error == KDC_ERR_ETYPE_NOSUPP:
self.assertIsNone(etype_info2)
self.assertIsNone(etype_info)
if self.strict_checking:
self.assertIsNotNone(enc_timestamp)
self.assertIsNotNone(pk_as_req)
self.assertIsNotNone(pk_as_rep19)
return
self.assertIsNotNone(etype_info2)
if expect_etype_info:
self.assertIsNotNone(etype_info)
else:
if self.strict_checking:
self.assertIsNone(etype_info)
if unexpect_etype_info:
self.assertIsNone(etype_info)
self.assertGreaterEqual(len(etype_info2), 1)
self.assertLessEqual(len(etype_info2), len(expect_etype_info2))
if self.strict_checking:
self.assertEqual(len(etype_info2), len(expect_etype_info2))
for i in range(0, len(etype_info2)):
e = self.getElementValue(etype_info2[i], 'etype')
self.assertEqual(e, expect_etype_info2[i])
salt = self.getElementValue(etype_info2[i], 'salt')
if e == kcrypto.Enctype.RC4:
self.assertIsNone(salt)
else:
self.assertIsNotNone(salt)
if expected_salt is not None:
self.assertEqual(salt, expected_salt)
s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
if self.strict_checking:
self.assertIsNone(s2kparams)
if etype_info is not None:
self.assertEqual(len(etype_info), 1)
e = self.getElementValue(etype_info[0], 'etype')
self.assertEqual(e, kcrypto.Enctype.RC4)
self.assertEqual(e, expect_etype_info2[0])
salt = self.getElementValue(etype_info[0], 'salt')
if self.strict_checking:
self.assertIsNotNone(salt)
self.assertEqual(len(salt), 0)
self.assertIsNotNone(enc_timestamp)
self.assertIsNotNone(pk_as_req)
self.assertIsNotNone(pk_as_rep19)
kdc_exchange_dict['preauth_etype_info2'] = etype_info2
return
def generate_simple_tgs_padata(self,
kdc_exchange_dict,
callback_dict,
req_body):
tgt = kdc_exchange_dict['tgt']
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
body_checksum_type = kdc_exchange_dict['body_checksum_type']
req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
req_body_checksum = self.Checksum_create(tgt.session_key,
KU_TGS_REQ_AUTH_CKSUM,
req_body_blob,
ctype=body_checksum_type)
subkey_obj = None
if authenticator_subkey is not None:
subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
authenticator_obj = self.Authenticator_create(crealm=tgt.crealm,
cname=tgt.cname,
cksum=req_body_checksum,
cusec=cusec,
ctime=ctime,
subkey=subkey_obj,
seq_number=seq_number,
authorization_data=None)
authenticator_blob = self.der_encode(authenticator_obj, asn1Spec=krb5_asn1.Authenticator())
authenticator = self.EncryptedData_create(tgt.session_key,
KU_TGS_REQ_AUTH,
authenticator_blob)
ap_options = krb5_asn1.APOptions('0')
ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
ticket=tgt.ticket,
authenticator=authenticator)
ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
padata = [pa_tgs_req]
return padata, req_body
def check_simple_tgs_padata(self,
kdc_exchange_dict,
callback_dict,
rep,
padata):
tgt = kdc_exchange_dict['tgt']
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
if authenticator_subkey is not None:
subkey = authenticator_subkey
subkey_usage = KU_TGS_REP_ENC_PART_SUB_KEY
else:
subkey = tgt.session_key
subkey_usage = KU_TGS_REP_ENC_PART_SESSION
return subkey, subkey_usage
def _test_as_exchange(self,
cname,
realm,
sname,
till,
client_as_etypes,
expected_error_mode,
expected_crealm,
expected_cname,
expected_srealm,
expected_sname,
expected_salt,
etypes,
padata,
kdc_options,
preauth_key=None,
ticket_decryption_key=None):
def _generate_padata_copy(_kdc_exchange_dict,
_callback_dict,
req_body):
return padata, req_body
def _check_padata_preauth_key(_kdc_exchange_dict,
_callback_dict,
rep,
padata):
as_rep_usage = KU_AS_REP_ENC_PART
return preauth_key, as_rep_usage
kdc_exchange_dict = self.as_exchange_dict(
expected_crealm=expected_crealm,
expected_cname=expected_cname,
expected_srealm=expected_srealm,
expected_sname=expected_sname,
ticket_decryption_key=ticket_decryption_key,
generate_padata_fn=_generate_padata_copy,
check_error_fn=self.generic_check_as_error,
check_rep_fn=self.generic_check_kdc_rep,
check_padata_fn=_check_padata_preauth_key,
check_kdc_private_fn=self.generic_check_kdc_private,
expected_error_mode=expected_error_mode,
client_as_etypes=client_as_etypes,
expected_salt=expected_salt)
rep = self._generic_kdc_exchange(kdc_exchange_dict,
kdc_options=str(kdc_options),
cname=cname,
realm=realm,
sname=sname,
till_time=till,
etypes=etypes)
if expected_error_mode == 0: # AS-REP
return rep
return kdc_exchange_dict['preauth_etype_info2']

View File

@ -28,16 +28,27 @@ ARCFOUR_HMAC_MD5 = int(
# Message types
KRB_ERROR = int(krb5_asn1.MessageTypeValues('krb-error'))
KRB_AS_REP = int(krb5_asn1.MessageTypeValues('krb-as-rep'))
KRB_AS_REQ = int(krb5_asn1.MessageTypeValues('krb-as-req'))
KRB_TGS_REP = int(krb5_asn1.MessageTypeValues('krb-tgs-rep'))
KRB_TGS_REQ = int(krb5_asn1.MessageTypeValues('krb-tgs-req'))
# PAData types
PADATA_ENC_TIMESTAMP = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-ENC-TIMESTAMP'))
PADATA_ETYPE_INFO = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-ETYPE-INFO'))
PADATA_ETYPE_INFO2 = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-ETYPE-INFO2'))
PADATA_KDC_REQ = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-KDC-REQ'))
PADATA_PK_AS_REQ = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-PK-AS-REQ'))
PADATA_PK_AS_REP_19 = int(
krb5_asn1.PADataTypeValues('kRB5-PADATA-PK-AS-REP-19'))
# Error codes
KDC_ERR_C_PRINCIPAL_UNKNOWN = 6
KDC_ERR_ETYPE_NOSUPP = 14
KDC_ERR_PREAUTH_FAILED = 24
KDC_ERR_PREAUTH_REQUIRED = 25
KDC_ERR_BADMATCH = 36