diff --git a/python/samba/tests/krb5/as_req_tests.py b/python/samba/tests/krb5/as_req_tests.py index 10e7b603609..09cfc9e1fc8 100755 --- a/python/samba/tests/krb5/as_req_tests.py +++ b/python/samba/tests/krb5/as_req_tests.py @@ -82,16 +82,16 @@ class AsReqKerberosTests(KDCBaseTest): return initial_padata, req_body kdc_exchange_dict = self.as_exchange_dict( - expected_crealm=expected_crealm, - expected_cname=expected_cname, - expected_srealm=expected_srealm, - expected_sname=expected_sname, - generate_padata_fn=_generate_padata_copy, - check_error_fn=self.generic_check_as_error, - check_rep_fn=self.generic_check_kdc_rep, - expected_error_mode=expected_error_mode, - client_as_etypes=client_as_etypes, - expected_salt=expected_salt) + expected_crealm=expected_crealm, + expected_cname=expected_cname, + expected_srealm=expected_srealm, + expected_sname=expected_sname, + generate_padata_fn=_generate_padata_copy, + check_error_fn=self.generic_check_as_error, + check_rep_fn=self.generic_check_kdc_rep, + expected_error_mode=expected_error_mode, + client_as_etypes=client_as_etypes, + expected_salt=expected_salt) rep = self._generic_kdc_exchange(kdc_exchange_dict, kdc_options=str(initial_kdc_options), diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 4bd856b217e..c23c71e1d74 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -21,10 +21,7 @@ import os from datetime import datetime, timezone import tempfile import binascii -import struct -sys.path.insert(0, "bin/python") -os.environ["PYTHONUNBUFFERED"] = "1" from collections import namedtuple import ldb from ldb import SCOPE_BASE @@ -66,6 +63,9 @@ from samba.tests.krb5.rfc4120_constants import ( PADATA_ETYPE_INFO2, ) +sys.path.insert(0, "bin/python") +os.environ["PYTHONUNBUFFERED"] = "1" + global_asn1_print = False global_hexdump = False @@ -114,9 +114,9 @@ class KDCBaseTest(RawKerberosTest): session = system_session() type(self)._ldb = SamDB(url="ldap://%s" % self.host, - session_info=session, - credentials=creds, - lp=lp) + session_info=session, + credentials=creds, + lp=lp) return self._ldb @@ -337,6 +337,7 @@ class KDCBaseTest(RawKerberosTest): require_strongest_key=False): if require_strongest_key: self.assertTrue(require_keys) + def download_krbtgt_creds(): samdb = self.get_samdb() @@ -742,15 +743,16 @@ class KDCBaseTest(RawKerberosTest): .replace(tzinfo=timezone.utc).timestamp()) # Account for clock skew of up to five minutes. - self.assertLess(cred.authtime - 5*60, + self.assertLess(cred.authtime - 5 * 60, datetime.now(timezone.utc).timestamp(), "Ticket not yet valid - clocks may be out of sync.") - self.assertLess(cred.starttime - 5*60, + self.assertLess(cred.starttime - 5 * 60, datetime.now(timezone.utc).timestamp(), "Ticket not yet valid - clocks may be out of sync.") - self.assertGreater(cred.endtime - 60*60, + self.assertGreater(cred.endtime - 60 * 60, datetime.now(timezone.utc).timestamp(), - "Ticket already expired/about to expire - clocks may be out of sync.") + "Ticket already expired/about to expire - " + "clocks may be out of sync.") cred.renew_till = cred.endtime cred.is_skey = 0 diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 9c090e4d005..de9c25751d2 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -24,11 +24,19 @@ import datetime import random import binascii import itertools +from pyasn1.codec.der.decoder import decode as pyasn1_der_decode +from pyasn1.codec.der.encoder import encode as pyasn1_der_encode +from pyasn1.codec.native.decoder import decode as pyasn1_native_decode +from pyasn1.codec.native.encoder import encode as pyasn1_native_encode + +from pyasn1.codec.ber.encoder import BitStringEncoder + +from samba.credentials import Credentials +from samba.dcerpc import security import samba.tests -from samba.credentials import Credentials from samba.tests import TestCaseInTempDir -from samba.dcerpc import security + import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 from samba.tests.krb5.rfc4120_constants import ( KDC_ERR_ETYPE_NOSUPP, @@ -53,13 +61,6 @@ from samba.tests.krb5.rfc4120_constants import ( ) import samba.tests.krb5.kcrypto as kcrypto -from pyasn1.codec.der.decoder import decode as pyasn1_der_decode -from pyasn1.codec.der.encoder import encode as pyasn1_der_encode -from pyasn1.codec.native.decoder import decode as pyasn1_native_decode -from pyasn1.codec.native.encoder import encode as pyasn1_native_encode - -from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder - def BitStringEncoder_encodeValue32( self, value, asn1Spec, encodeFun, **options): @@ -217,6 +218,7 @@ class Krb5EncryptionKey(object): } return EncryptionKey_obj + class KerberosCredentials(Credentials): def __init__(self): super(KerberosCredentials, self).__init__() @@ -293,6 +295,7 @@ class KerberosCredentials(Credentials): def get_forced_salt(self): return self.forced_salt + class KerberosTicketCreds(object): def __init__(self, ticket, session_key, crealm=None, cname=None, @@ -311,14 +314,15 @@ class KerberosTicketCreds(object): self.encpart_private = encpart_private return + class RawKerberosTest(TestCaseInTempDir): """A raw Kerberos Test case.""" etypes_to_test = ( - { "value": -1111, "name": "dummy", }, - { "value": kcrypto.Enctype.AES256, "name": "aes128", }, - { "value": kcrypto.Enctype.AES128, "name": "aes256", }, - { "value": kcrypto.Enctype.RC4, "name": "rc4", }, + {"value": -1111, "name": "dummy", }, + {"value": kcrypto.Enctype.AES256, "name": "aes128", }, + {"value": kcrypto.Enctype.AES128, "name": "aes256", }, + {"value": kcrypto.Enctype.RC4, "name": "rc4", }, ) setup_etype_test_permutations_done = False @@ -332,7 +336,7 @@ class RawKerberosTest(TestCaseInTempDir): num_idxs = len(cls.etypes_to_test) permutations = [] - for num in range(1, num_idxs+1): + for num in range(1, num_idxs + 1): chunk = list(itertools.permutations(range(num_idxs), num)) for e in chunk: el = list(e) @@ -349,7 +353,7 @@ class RawKerberosTest(TestCaseInTempDir): name += "_%s" % n etypes += (cls.etypes_to_test[idx]["value"],) - r = { "name": name, "etypes": etypes, } + r = {"name": name, "etypes": etypes, } res.append(r) cls.etype_test_permutations = res @@ -386,7 +390,8 @@ class RawKerberosTest(TestCaseInTempDir): self.do_asn1_print = False self.do_hexdump = False - strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True) + strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', + allow_missing=True) if strict_checking is None: strict_checking = '1' self.strict_checking = bool(int(strict_checking)) @@ -440,8 +445,9 @@ class RawKerberosTest(TestCaseInTempDir): val = None if prefix is not None: allow_missing_prefix = allow_missing or fallback_default - val = samba.tests.env_get_var_value('%s_%s' % (prefix, varname), - allow_missing=allow_missing_prefix) + val = samba.tests.env_get_var_value( + '%s_%s' % (prefix, varname), + allow_missing=allow_missing_prefix) else: fallback_default = True if val is None and fallback_default: @@ -506,7 +512,8 @@ class RawKerberosTest(TestCaseInTempDir): if aes256_key is not None: c.set_forced_key(kcrypto.Enctype.AES256, aes256_key) aes128_key = self.env_get_var('AES128_KEY_HEX', prefix, - fallback_default=False, allow_missing=True) + fallback_default=False, + allow_missing=True) if aes128_key is not None: c.set_forced_key(kcrypto.Enctype.AES128, aes128_key) rc4_key = self.env_get_var('RC4_KEY_HEX', prefix, @@ -536,11 +543,12 @@ class RawKerberosTest(TestCaseInTempDir): env_err = None try: # Try to obtain them from the environment - creds = self._get_krb5_creds_from_env(prefix, - default_username=default_username, - allow_missing_password=allow_missing_password, - allow_missing_keys=allow_missing_keys, - require_strongest_key=require_strongest_key) + creds = self._get_krb5_creds_from_env( + prefix, + default_username=default_username, + allow_missing_password=allow_missing_password, + allow_missing_keys=allow_missing_keys, + require_strongest_key=require_strongest_key) except Exception as err: # An error occurred, so save it for later env_err = err @@ -886,8 +894,8 @@ class RawKerberosTest(TestCaseInTempDir): return s def get_Nonce(self): - nonce_min=0x7f000000 - nonce_max=0x7fffffff + nonce_min = 0x7f000000 + nonce_max = 0x7fffffff v = random.randint(nonce_min, nonce_max) return v @@ -936,15 +944,20 @@ class RawKerberosTest(TestCaseInTempDir): if etype == kcrypto.Enctype.RC4: nthash = creds.get_nt_hash() self.assertIsNotNone(nthash, msg=fail_msg) - return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) + return self.SessionKey_create(etype=etype, + contents=nthash, + kvno=kvno) password = creds.get_password() self.assertIsNotNone(password, msg=fail_msg) salt = creds.get_forced_salt() if salt is None: salt = bytes("%s%s" % (creds.get_realm(), creds.get_username()), - encoding='utf-8') - return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno) + encoding='utf-8') + return self.PasswordKey_create(etype=etype, + pwd=password, + salt=salt, + kvno=kvno) def RandomKey(self, etype): e = kcrypto._get_enctype_profile(etype) @@ -1020,10 +1033,12 @@ class RawKerberosTest(TestCaseInTempDir): return PA_ENC_TS_ENC_obj def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True): - #KERB-PA-PAC-REQUEST ::= SEQUENCE { - # include-pac[0] BOOLEAN --If TRUE, and no pac present, include PAC. - # --If FALSE, and PAC present, remove PAC - #} + # KERB-PA-PAC-REQUEST ::= SEQUENCE { + # include-pac[0] BOOLEAN --If TRUE, and no pac present, + # -- include PAC. + # --If FALSE, and PAC present, + # -- remove PAC. + # } KERB_PA_PAC_REQUEST_obj = { 'include-pac': include_pac, } @@ -1031,7 +1046,7 @@ class RawKerberosTest(TestCaseInTempDir): return KERB_PA_PAC_REQUEST_obj pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj, asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST()) - pa_data = self.PA_DATA_create(128, pa_pac) # PA-PAC-REQUEST + pa_data = self.PA_DATA_create(128, pa_pac) # PA-PAC-REQUEST return pa_data def KDC_REQ_BODY_create(self, @@ -1327,11 +1342,14 @@ class RawKerberosTest(TestCaseInTempDir): EncAuthorizationData=EncAuthorizationData, EncAuthorizationData_key=EncAuthorizationData_key, additional_tickets=additional_tickets) - req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(), + req_body_blob = self.der_encode(req_body, + asn1Spec=krb5_asn1.KDC_REQ_BODY(), asn1_print=asn1_print, hexdump=hexdump) - req_body_checksum = self.Checksum_create( - ticket_session_key, 6, req_body_blob, ctype=body_checksum_type) + req_body_checksum = self.Checksum_create(ticket_session_key, + 6, + req_body_blob, + ctype=body_checksum_type) subkey_obj = None if authenticator_subkey is not None: @@ -1390,7 +1408,10 @@ class RawKerberosTest(TestCaseInTempDir): cksum_data += n.encode() cksum_data += realm.encode() cksum_data += "Kerberos".encode() - cksum = self.Checksum_create(tgt_session_key, 17, cksum_data, ctype) + cksum = self.Checksum_create(tgt_session_key, + 17, + cksum_data, + ctype) PA_S4U2Self_obj = { 'name': name, @@ -1403,20 +1424,20 @@ class RawKerberosTest(TestCaseInTempDir): return self.PA_DATA_create(129, pa_s4u2self) def _generic_kdc_exchange(self, - kdc_exchange_dict, # required - kdc_options=None, # required - cname=None, # optional - realm=None, # required - sname=None, # optional - from_time=None, # optional - till_time=None, # required - renew_time=None, # optional - nonce=None, # required - etypes=None, # required - addresses=None, # optional - EncAuthorizationData=None, # optional - EncAuthorizationData_key=None, # optional - additional_tickets=None): # optional + kdc_exchange_dict, # required + kdc_options=None, # required + cname=None, # optional + realm=None, # required + sname=None, # optional + from_time=None, # optional + till_time=None, # required + renew_time=None, # optional + nonce=None, # required + etypes=None, # required + addresses=None, # optional + EncAuthorizationData=None, # optional + EncAuthorizationData_key=None, # optional + additional_tickets=None): # optional check_error_fn = kdc_exchange_dict['check_error_fn'] check_rep_fn = kdc_exchange_dict['check_rep_fn'] @@ -1431,19 +1452,20 @@ class RawKerberosTest(TestCaseInTempDir): if nonce is None: nonce = self.get_Nonce() - req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options, - cname=cname, - realm=realm, - sname=sname, - from_time=from_time, - till_time=till_time, - renew_time=renew_time, - nonce=nonce, - etypes=etypes, - addresses=addresses, - EncAuthorizationData=EncAuthorizationData, - EncAuthorizationData_key=EncAuthorizationData_key, - additional_tickets=additional_tickets) + req_body = self.KDC_REQ_BODY_create( + kdc_options=kdc_options, + cname=cname, + realm=realm, + sname=sname, + from_time=from_time, + till_time=till_time, + renew_time=renew_time, + nonce=nonce, + etypes=etypes, + addresses=addresses, + EncAuthorizationData=EncAuthorizationData, + EncAuthorizationData_key=EncAuthorizationData_key, + additional_tickets=additional_tickets) if generate_padata_fn is not None: # This can alter req_body... padata, req_body = generate_padata_fn(kdc_exchange_dict, @@ -1455,10 +1477,10 @@ class RawKerberosTest(TestCaseInTempDir): kdc_exchange_dict['req_padata'] = padata kdc_exchange_dict['req_body'] = req_body - req_obj,req_decoded = self.KDC_REQ_create(msg_type=req_msg_type, - padata=padata, - req_body=req_body, - asn1Spec=req_asn1Spec()) + req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type, + padata=padata, + req_body=req_body, + asn1Spec=req_asn1Spec()) rep = self.send_recv_transaction(req_decoded) self.assertIsNotNone(rep) @@ -1571,7 +1593,7 @@ class RawKerberosTest(TestCaseInTempDir): rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec'] msg_type = kdc_exchange_dict['rep_msg_type'] - self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP + self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP padata = self.getElementValue(rep, 'padata') self.assertElementEqualUTF8(rep, 'crealm', expected_crealm) self.assertElementEqualPrincipal(rep, 'cname', expected_cname) @@ -1579,22 +1601,23 @@ class RawKerberosTest(TestCaseInTempDir): ticket = self.getElementValue(rep, 'ticket') ticket_encpart = None ticket_cipher = None - if ticket is not None: # Never None, but gives indentation + if ticket is not None: # Never None, but gives indentation self.assertElementPresent(ticket, 'tkt-vno') self.assertElementEqualUTF8(ticket, 'realm', expected_srealm) self.assertElementEqualPrincipal(ticket, 'sname', expected_sname) self.assertElementPresent(ticket, 'enc-part') ticket_encpart = self.getElementValue(ticket, 'enc-part') - if ticket_encpart is not None: # Never None, but gives indentation + if ticket_encpart is not None: # Never None, but gives indentation self.assertElementPresent(ticket_encpart, 'etype') # 'unspecified' means present, with any value != 0 - self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno) + self.assertElementKVNO(ticket_encpart, 'kvno', + self.unspecified_kvno) self.assertElementPresent(ticket_encpart, 'cipher') ticket_cipher = self.getElementValue(ticket_encpart, 'cipher') self.assertElementPresent(rep, 'enc-part') encpart = self.getElementValue(rep, 'enc-part') encpart_cipher = None - if encpart is not None: # Never None, but gives indentation + if encpart is not None: # Never None, but gives indentation self.assertElementPresent(encpart, 'etype') self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect') self.assertElementPresent(encpart, 'cipher') @@ -1602,24 +1625,35 @@ class RawKerberosTest(TestCaseInTempDir): encpart_decryption_key = None if check_padata_fn is not None: - # See if get the decryption key from the preauth phase - encpart_decryption_key,encpart_decryption_usage = \ - check_padata_fn(kdc_exchange_dict, callback_dict, - rep, padata) + # See if we can get the decryption key from the preauth phase + encpart_decryption_key, encpart_decryption_usage = ( + check_padata_fn(kdc_exchange_dict, callback_dict, + rep, padata)) ticket_private = None if ticket_decryption_key is not None: - self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype) - self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno) - ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher) - ticket_private = self.der_decode(ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart()) + self.assertElementEqual(ticket_encpart, 'etype', + ticket_decryption_key.etype) + self.assertElementKVNO(ticket_encpart, 'kvno', + ticket_decryption_key.kvno) + ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, + ticket_cipher) + ticket_private = self.der_decode( + ticket_decpart, + asn1Spec=krb5_asn1.EncTicketPart()) encpart_private = None if encpart_decryption_key is not None: - self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype) - self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno) - rep_decpart = encpart_decryption_key.decrypt(encpart_decryption_usage, encpart_cipher) - encpart_private = self.der_decode(rep_decpart, asn1Spec=rep_encpart_asn1Spec()) + self.assertElementEqual(encpart, 'etype', + encpart_decryption_key.etype) + self.assertElementKVNO(encpart, 'kvno', + encpart_decryption_key.kvno) + rep_decpart = encpart_decryption_key.decrypt( + encpart_decryption_usage, + encpart_cipher) + encpart_private = self.der_decode( + rep_decpart, + asn1Spec=rep_encpart_asn1Spec()) if check_kdc_private_fn is not None: check_kdc_private_fn(kdc_exchange_dict, callback_dict, @@ -1647,12 +1681,14 @@ class RawKerberosTest(TestCaseInTempDir): self.assertElementPresent(ticket_private, 'flags') self.assertElementPresent(ticket_private, 'key') ticket_key = self.getElementValue(ticket_private, 'key') - if ticket_key is not None: # Never None, but gives indentation + if ticket_key is not None: # Never None, but gives indentation self.assertElementPresent(ticket_key, 'keytype') self.assertElementPresent(ticket_key, 'keyvalue') ticket_session_key = self.EncryptionKey_import(ticket_key) - self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm) - self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname) + self.assertElementEqualUTF8(ticket_private, 'crealm', + expected_crealm) + self.assertElementEqualPrincipal(ticket_private, 'cname', + expected_cname) self.assertElementPresent(ticket_private, 'transited') self.assertElementPresent(ticket_private, 'authtime') if self.strict_checking: @@ -1666,39 +1702,45 @@ class RawKerberosTest(TestCaseInTempDir): if encpart_private is not None: self.assertElementPresent(encpart_private, 'key') encpart_key = self.getElementValue(encpart_private, 'key') - if encpart_key is not None: # Never None, but gives indentation + if encpart_key is not None: # Never None, but gives indentation self.assertElementPresent(encpart_key, 'keytype') self.assertElementPresent(encpart_key, 'keyvalue') encpart_session_key = self.EncryptionKey_import(encpart_key) self.assertElementPresent(encpart_private, 'last-req') self.assertElementPresent(encpart_private, 'nonce') - # TODO self.assertElementPresent(encpart_private, 'key-expiration') + # TODO self.assertElementPresent(encpart_private, + # 'key-expiration') self.assertElementPresent(encpart_private, 'flags') self.assertElementPresent(encpart_private, 'authtime') if self.strict_checking: self.assertElementPresent(encpart_private, 'starttime') self.assertElementPresent(encpart_private, 'endtime') # TODO self.assertElementPresent(encpart_private, 'renew-till') - self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm) - self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname) + self.assertElementEqualUTF8(encpart_private, 'srealm', + expected_srealm) + self.assertElementEqualPrincipal(encpart_private, 'sname', + expected_sname) # TODO self.assertElementMissing(encpart_private, 'caddr') if ticket_session_key is not None and encpart_session_key is not None: - self.assertEqual(ticket_session_key.etype, encpart_session_key.etype) - self.assertEqual(ticket_session_key.key.contents, encpart_session_key.key.contents) + self.assertEqual(ticket_session_key.etype, + encpart_session_key.etype) + self.assertEqual(ticket_session_key.key.contents, + encpart_session_key.key.contents) if encpart_session_key is not None: session_key = encpart_session_key else: session_key = ticket_session_key - ticket_creds = KerberosTicketCreds(ticket, - session_key, - crealm=expected_crealm, - cname=expected_cname, - srealm=expected_srealm, - sname=expected_sname, - decryption_key=ticket_decryption_key, - ticket_private=ticket_private, - encpart_private=encpart_private) + ticket_creds = KerberosTicketCreds( + ticket, + session_key, + crealm=expected_crealm, + cname=expected_cname, + srealm=expected_srealm, + sname=expected_sname, + decryption_key=ticket_decryption_key, + ticket_private=ticket_private, + encpart_private=encpart_private) kdc_exchange_dict['rep_ticket_creds'] = ticket_creds return @@ -1728,11 +1770,11 @@ class RawKerberosTest(TestCaseInTempDir): if kcrypto.Enctype.RC4 in proposed_etypes: expect_etype_info = True for etype in proposed_etypes: - if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128): + if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128): expect_etype_info = False if etype not in client_as_etypes: continue - if etype in (kcrypto.Enctype.AES256,kcrypto.Enctype.AES128): + if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128): if etype > expected_aes_type: expected_aes_type = etype if etype in (kcrypto.Enctype.RC4,): @@ -1779,14 +1821,17 @@ class RawKerberosTest(TestCaseInTempDir): if self.strict_checking: self.assertIsNotNone(edata) if edata is not None: - rep_padata = self.der_decode(edata, asn1Spec=krb5_asn1.METHOD_DATA()) + rep_padata = self.der_decode(edata, + asn1Spec=krb5_asn1.METHOD_DATA()) self.assertGreater(len(rep_padata), 0) else: rep_padata = [] if self.strict_checking: for i in range(0, len(expected_patypes)): - self.assertElementEqual(rep_padata[i], 'padata-type', expected_patypes[i]) + self.assertElementEqual(rep_padata[i], + 'padata-type', + expected_patypes[i]) self.assertEqual(len(rep_padata), len(expected_patypes)) etype_info2 = None @@ -1799,11 +1844,13 @@ class RawKerberosTest(TestCaseInTempDir): pavalue = self.getElementValue(pa, 'padata-value') if patype == PADATA_ETYPE_INFO2: self.assertIsNone(etype_info2) - etype_info2 = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO2()) + etype_info2 = self.der_decode(pavalue, + asn1Spec=krb5_asn1.ETYPE_INFO2()) continue if patype == PADATA_ETYPE_INFO: self.assertIsNone(etype_info) - etype_info = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO()) + etype_info = self.der_decode(pavalue, + asn1Spec=krb5_asn1.ETYPE_INFO()) continue if patype == PADATA_ENC_TIMESTAMP: self.assertIsNone(enc_timestamp) @@ -1881,7 +1928,8 @@ class RawKerberosTest(TestCaseInTempDir): authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] body_checksum_type = kdc_exchange_dict['body_checksum_type'] - req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) + req_body_blob = self.der_encode(req_body, + asn1Spec=krb5_asn1.KDC_REQ_BODY()) req_body_checksum = self.Checksum_create(tgt.session_key, KU_TGS_REQ_AUTH_CKSUM, @@ -1893,15 +1941,18 @@ class RawKerberosTest(TestCaseInTempDir): subkey_obj = authenticator_subkey.export_obj() seq_number = random.randint(0, 0xfffffffe) (ctime, cusec) = self.get_KerberosTimeWithUsec() - authenticator_obj = self.Authenticator_create(crealm=tgt.crealm, - cname=tgt.cname, - cksum=req_body_checksum, - cusec=cusec, - ctime=ctime, - subkey=subkey_obj, - seq_number=seq_number, - authorization_data=None) - authenticator_blob = self.der_encode(authenticator_obj, asn1Spec=krb5_asn1.Authenticator()) + authenticator_obj = self.Authenticator_create( + crealm=tgt.crealm, + cname=tgt.cname, + cksum=req_body_checksum, + cusec=cusec, + ctime=ctime, + subkey=subkey_obj, + seq_number=seq_number, + authorization_data=None) + authenticator_blob = self.der_encode( + authenticator_obj, + asn1Spec=krb5_asn1.Authenticator()) authenticator = self.EncryptedData_create(tgt.session_key, KU_TGS_REQ_AUTH, @@ -1909,8 +1960,8 @@ class RawKerberosTest(TestCaseInTempDir): ap_options = krb5_asn1.APOptions('0') ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options), - ticket=tgt.ticket, - authenticator=authenticator) + ticket=tgt.ticket, + authenticator=authenticator) ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ()) pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) padata = [pa_tgs_req] @@ -1964,19 +2015,19 @@ class RawKerberosTest(TestCaseInTempDir): return preauth_key, as_rep_usage kdc_exchange_dict = self.as_exchange_dict( - expected_crealm=expected_crealm, - expected_cname=expected_cname, - expected_srealm=expected_srealm, - expected_sname=expected_sname, - ticket_decryption_key=ticket_decryption_key, - generate_padata_fn=_generate_padata_copy, - check_error_fn=self.generic_check_as_error, - check_rep_fn=self.generic_check_kdc_rep, - check_padata_fn=_check_padata_preauth_key, - check_kdc_private_fn=self.generic_check_kdc_private, - expected_error_mode=expected_error_mode, - client_as_etypes=client_as_etypes, - expected_salt=expected_salt) + expected_crealm=expected_crealm, + expected_cname=expected_cname, + expected_srealm=expected_srealm, + expected_sname=expected_sname, + ticket_decryption_key=ticket_decryption_key, + generate_padata_fn=_generate_padata_copy, + check_error_fn=self.generic_check_as_error, + check_rep_fn=self.generic_check_kdc_rep, + check_padata_fn=_check_padata_preauth_key, + check_kdc_private_fn=self.generic_check_kdc_private, + expected_error_mode=expected_error_mode, + client_as_etypes=client_as_etypes, + expected_salt=expected_salt) rep = self._generic_kdc_exchange(kdc_exchange_dict, kdc_options=str(kdc_options), @@ -1986,7 +2037,7 @@ class RawKerberosTest(TestCaseInTempDir): till_time=till, etypes=etypes) - if expected_error_mode == 0: # AS-REP + if expected_error_mode == 0: # AS-REP return rep return kdc_exchange_dict['preauth_etype_info2']