1
0
mirror of https://github.com/samba-team/samba.git synced 2025-12-23 00:23:53 +03:00

tests python krb5: PEP8 cleanups

Fix all the PEP8 warnings in samba/tests/krb5. With the exception of
rfc4120_pyasn1.py, which is generated from rfc4120.asn1.

As these tests are new, it makes sense to ensure that they conform to
PEP8. And set an aspirational goal for the rest of our python code.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>

Autobuild-User(master): Gary Lockyer <gary@samba.org>
Autobuild-Date(master): Mon Dec 21 21:29:28 UTC 2020 on sn-devel-184
This commit is contained in:
Gary Lockyer
2020-12-11 11:55:01 +13:00
committed by Gary Lockyer
parent 03676a4a5c
commit c00d537526
10 changed files with 410 additions and 260 deletions

View File

@@ -31,8 +31,6 @@ import samba
from samba.auth import system_session from samba.auth import system_session
from samba.credentials import ( from samba.credentials import (
Credentials, Credentials,
CLI_CRED_NTLMv2_AUTH,
CLI_CRED_NTLM_AUTH,
DONT_USE_KERBEROS) DONT_USE_KERBEROS)
from samba.dcerpc.misc import SEC_CHAN_WKSTA from samba.dcerpc.misc import SEC_CHAN_WKSTA
from samba.dsdb import ( from samba.dsdb import (
@@ -41,7 +39,20 @@ from samba.dsdb import (
UF_NORMAL_ACCOUNT) UF_NORMAL_ACCOUNT)
from samba.samdb import SamDB from samba.samdb import SamDB
from samba.tests import delete_force, DynamicTestCase from samba.tests import delete_force, DynamicTestCase
from samba.tests.krb5.rfc4120_constants import * from samba.tests.krb5.rfc4120_constants import (
AES256_CTS_HMAC_SHA1_96,
AES128_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5,
KDC_ERR_PREAUTH_REQUIRED,
KRB_AS_REP,
KU_AS_REP_ENC_PART,
KRB_ERROR,
KU_PA_ENC_TIMESTAMP,
PADATA_ENC_TIMESTAMP,
NT_ENTERPRISE_PRINCIPAL,
NT_PRINCIPAL,
NT_SRV_INST,
)
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False
@@ -128,6 +139,7 @@ class TestData:
MACHINE_NAME = "tstkrb5cnnmch" MACHINE_NAME = "tstkrb5cnnmch"
USER_NAME = "tstkrb5cnnusr" USER_NAME = "tstkrb5cnnusr"
@DynamicTestCase @DynamicTestCase
class KerberosASCanonicalizationTests(RawKerberosTest): class KerberosASCanonicalizationTests(RawKerberosTest):

View File

@@ -25,7 +25,20 @@ os.environ["PYTHONUNBUFFERED"] = "1"
from samba.tests.krb5.raw_testcase import RawKerberosTest from samba.tests.krb5.raw_testcase import RawKerberosTest
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import * from samba.tests.krb5.rfc4120_constants import (
AES128_CTS_HMAC_SHA1_96,
AES256_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5,
KDC_ERR_PREAUTH_REQUIRED,
KRB_AS_REP,
KRB_ERROR,
KU_AS_REP_ENC_PART,
KU_PA_ENC_TIMESTAMP,
PADATA_ENC_TIMESTAMP,
PADATA_ETYPE_INFO2,
NT_PRINCIPAL,
NT_SRV_INST,
)
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False
@@ -123,7 +136,6 @@ class SimpleKerberosTests(RawKerberosTest):
kdc_options = krb5_asn1.KDCOptions('forwardable') kdc_options = krb5_asn1.KDCOptions('forwardable')
padata = None padata = None
req = self.AS_REQ_create(padata=padata, req = self.AS_REQ_create(padata=padata,
kdc_options=str(kdc_options), kdc_options=str(kdc_options),
cname=cname, cname=cname,

View File

@@ -64,6 +64,7 @@ from samba.credentials import Credentials
from samba import generate_random_bytes as get_random_bytes from samba import generate_random_bytes as get_random_bytes
from samba.common import get_string, get_bytes from samba.common import get_string, get_bytes
class Enctype(object): class Enctype(object):
DES_CRC = 1 DES_CRC = 1
DES_MD4 = 2 DES_MD4 = 2
@@ -112,16 +113,19 @@ def _mac_equal(mac1, mac2):
res |= x ^ y res |= x ^ y
return res == 0 return res == 0
def SIMPLE_HASH(string, algo_cls): def SIMPLE_HASH(string, algo_cls):
hash_ctx = hashes.Hash(algo_cls(), default_backend()) hash_ctx = hashes.Hash(algo_cls(), default_backend())
hash_ctx.update(string) hash_ctx.update(string)
return hash_ctx.finalize() return hash_ctx.finalize()
def HMAC_HASH(key, string, algo_cls): def HMAC_HASH(key, string, algo_cls):
hmac_ctx = hmac.HMAC(key, algo_cls(), default_backend()) hmac_ctx = hmac.HMAC(key, algo_cls(), default_backend())
hmac_ctx.update(string) hmac_ctx.update(string)
return hmac_ctx.finalize() return hmac_ctx.finalize()
def _nfold(str, nbytes): def _nfold(str, nbytes):
# Convert str to a string of length nbytes using the RFC 3961 nfold # Convert str to a string of length nbytes using the RFC 3961 nfold
# operation. # operation.
@@ -129,8 +133,9 @@ def _nfold(str, nbytes):
# Rotate the bytes in str to the right by nbits bits. # Rotate the bytes in str to the right by nbits bits.
def rotate_right(str, nbits): def rotate_right(str, nbits):
nbytes, remain = (nbits // 8) % len(str), nbits % 8 nbytes, remain = (nbits // 8) % len(str), nbits % 8
return bytes([(str[i-nbytes] >> remain) | return bytes([
(str[i-nbytes-1] << (8-remain) & 0xff) (str[i - nbytes] >> remain)
| (str[i - nbytes - 1] << (8 - remain) & 0xff)
for i in range(len(str))]) for i in range(len(str))])
# Add equal-length strings together with end-around carry. # Add equal-length strings together with end-around carry.
@@ -433,7 +438,8 @@ class _RC4(_EnctypeProfile):
cksum = HMAC_HASH(ki, confounder + plaintext, hashes.MD5) cksum = HMAC_HASH(ki, confounder + plaintext, hashes.MD5)
ke = HMAC_HASH(ki, cksum, hashes.MD5) ke = HMAC_HASH(ki, cksum, hashes.MD5)
encryptor = Cipher(ciphers.ARC4(ke), None, default_backend()).encryptor() encryptor = Cipher(
ciphers.ARC4(ke), None, default_backend()).encryptor()
ctext = encryptor.update(confounder + plaintext) ctext = encryptor.update(confounder + plaintext)
return cksum + ctext return cksum + ctext
@@ -446,7 +452,8 @@ class _RC4(_EnctypeProfile):
ki = HMAC_HASH(key.contents, cls.usage_str(keyusage), hashes.MD5) ki = HMAC_HASH(key.contents, cls.usage_str(keyusage), hashes.MD5)
ke = HMAC_HASH(ki, cksum, hashes.MD5) ke = HMAC_HASH(ki, cksum, hashes.MD5)
decryptor = Cipher(ciphers.ARC4(ke), None, default_backend()).decryptor() decryptor = Cipher(
ciphers.ARC4(ke), None, default_backend()).decryptor()
basic_plaintext = decryptor.update(basic_ctext) basic_plaintext = decryptor.update(basic_ctext)
exp_cksum = HMAC_HASH(ki, basic_plaintext, hashes.MD5) exp_cksum = HMAC_HASH(ki, basic_plaintext, hashes.MD5)
@@ -636,14 +643,14 @@ def verify_checksum(cksumtype, key, keyusage, text, cksum):
c.verify(key, keyusage, text, cksum) c.verify(key, keyusage, text, cksum)
def prfplus(key, pepper, l): def prfplus(key, pepper, ln):
# Produce l bytes of output using the RFC 6113 PRF+ function. # Produce ln bytes of output using the RFC 6113 PRF+ function.
out = b'' out = b''
count = 1 count = 1
while len(out) < l: while len(out) < ln:
out += prf(key, bytes([count]) + pepper) out += prf(key, bytes([count]) + pepper)
count += 1 count += 1
return out[:l] return out[:ln]
def cf2(enctype, key1, key2, pepper1, pepper2): def cf2(enctype, key1, key2, pepper1, pepper2):
@@ -653,9 +660,11 @@ def cf2(enctype, key1, key2, pepper1, pepper2):
return e.random_to_key(_xorbytes(prfplus(key1, pepper1, e.seedsize), return e.random_to_key(_xorbytes(prfplus(key1, pepper1, e.seedsize),
prfplus(key2, pepper2, e.seedsize))) prfplus(key2, pepper2, e.seedsize)))
def h(hexstr): def h(hexstr):
return bytes.fromhex(hexstr) return bytes.fromhex(hexstr)
class KcrytoTest(TestCase): class KcrytoTest(TestCase):
"""kcrypto Test case.""" """kcrypto Test case."""
@@ -665,20 +674,21 @@ class KcrytoTest(TestCase):
conf = h('94B491F481485B9A0678CD3C4EA386AD') conf = h('94B491F481485B9A0678CD3C4EA386AD')
keyusage = 2 keyusage = 2
plain = b'9 bytesss' plain = b'9 bytesss'
ctxt = h('68FB9679601F45C78857B2BF820FD6E53ECA8D42FD4B1D7024A09205ABB7CD2E' ctxt = h('68FB9679601F45C78857B2BF820FD6E53ECA8D42FD4B1D7024A09205ABB7'
'C26C355D2F') 'CD2EC26C355D2F')
k = Key(Enctype.AES128, kb) k = Key(Enctype.AES128, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain) self.assertEqual(decrypt(k, keyusage, ctxt), plain)
def test_aes256_crypt(self): def test_aes256_crypt(self):
# AES256 encrypt and decrypt # AES256 encrypt and decrypt
kb = h('F1C795E9248A09338D82C3F8D5B567040B0110736845041347235B1404231398') kb = h('F1C795E9248A09338D82C3F8D5B567040B0110736845041347235B14042313'
'98')
conf = h('E45CA518B42E266AD98E165E706FFB60') conf = h('E45CA518B42E266AD98E165E706FFB60')
keyusage = 4 keyusage = 4
plain = b'30 bytes bytes bytes bytes byt' plain = b'30 bytes bytes bytes bytes byt'
ctxt = h('D1137A4D634CFECE924DBC3BF6790648BD5CFF7DE0E7B99460211D0DAEF3D79A' ctxt = h('D1137A4D634CFECE924DBC3BF6790648BD5CFF7DE0E7B99460211D0DAEF3'
'295C688858F3B34B9CBD6EEBAE81DAF6B734D4D498B6714F1C1D') 'D79A295C688858F3B34B9CBD6EEBAE81DAF6B734D4D498B6714F1C1D')
k = Key(Enctype.AES256, kb) k = Key(Enctype.AES256, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain) self.assertEqual(decrypt(k, keyusage, ctxt), plain)
@@ -694,7 +704,8 @@ class KcrytoTest(TestCase):
def test_aes256_checksum(self): def test_aes256_checksum(self):
# AES256 checksum # AES256 checksum
kb = h('B1AE4CD8462AFF1677053CC9279AAC30B796FB81CE21474DD3DDBCFEA4EC76D7') kb = h('B1AE4CD8462AFF1677053CC9279AAC30B796FB81CE21474DD3DDBC'
'FEA4EC76D7')
keyusage = 4 keyusage = 4
plain = b'fourteen' plain = b'fourteen'
cksum = h('E08739E3279E2903EC8E3836') cksum = h('E08739E3279E2903EC8E3836')
@@ -715,7 +726,8 @@ class KcrytoTest(TestCase):
string = b'X' * 64 string = b'X' * 64
salt = b'pass phrase equals block size' salt = b'pass phrase equals block size'
params = h('000004B0') params = h('000004B0')
kb = h('89ADEE3608DB8BC71F1BFBFE459486B05618B70CBAE22092534E56C553BA4B34') kb = h('89ADEE3608DB8BC71F1BFBFE459486B05618B70CBAE22092534E56'
'C553BA4B34')
k = string_to_key(Enctype.AES256, string, salt, params) k = string_to_key(Enctype.AES256, string, salt, params)
self.assertEqual(k.contents, kb) self.assertEqual(k.contents, kb)
@@ -741,7 +753,8 @@ class KcrytoTest(TestCase):
def test_aes256_cf2(self): def test_aes256_cf2(self):
# AES256 cf2 # AES256 cf2
kb = h('4D6CA4E629785C1F01BAF55E2E548566B9617AE3A96868C337CB93B5E72B1C7B') kb = h('4D6CA4E629785C1F01BAF55E2E548566B9617AE3A96868C337CB93B5'
'E72B1C7B')
k1 = string_to_key(Enctype.AES256, b'key1', b'key1') k1 = string_to_key(Enctype.AES256, b'key1', b'key1')
k2 = string_to_key(Enctype.AES256, b'key2', b'key2') k2 = string_to_key(Enctype.AES256, b'key2', b'key2')
k = cf2(Enctype.AES256, k1, k2, b'a', b'b') k = cf2(Enctype.AES256, k1, k2, b'a', b'b')
@@ -753,8 +766,8 @@ class KcrytoTest(TestCase):
conf = h('94690A17B2DA3C9B') conf = h('94690A17B2DA3C9B')
keyusage = 3 keyusage = 3
plain = b'13 bytes byte' plain = b'13 bytes byte'
ctxt = h('839A17081ECBAFBCDC91B88C6955DD3C4514023CF177B77BF0D0177A16F705E8' ctxt = h('839A17081ECBAFBCDC91B88C6955DD3C4514023CF177B77BF0D0177A16F7'
'49CB7781D76A316B193F8D30') '05E849CB7781D76A316B193F8D30')
k = Key(Enctype.DES3, kb) k = Key(Enctype.DES3, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), _zeropad(plain, 8)) self.assertEqual(decrypt(k, keyusage, ctxt), _zeropad(plain, 8))
@@ -790,8 +803,8 @@ class KcrytoTest(TestCase):
conf = h('37245E73A45FBF72') conf = h('37245E73A45FBF72')
keyusage = 4 keyusage = 4
plain = b'30 bytes bytes bytes bytes byt' plain = b'30 bytes bytes bytes bytes byt'
ctxt = h('95F9047C3AD75891C2E9B04B16566DC8B6EB9CE4231AFB2542EF87A7B5A0F260' ctxt = h('95F9047C3AD75891C2E9B04B16566DC8B6EB9CE4231AFB2542EF87A7B5A0'
'A99F0460508DE0CECC632D07C354124E46C5D2234EB8') 'F260A99F0460508DE0CECC632D07C354124E46C5D2234EB8')
k = Key(Enctype.RC4, kb) k = Key(Enctype.RC4, kb)
self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt) self.assertEqual(encrypt(k, keyusage, plain, conf), ctxt)
self.assertEqual(decrypt(k, keyusage, ctxt), plain) self.assertEqual(decrypt(k, keyusage, ctxt), plain)

View File

@@ -374,8 +374,8 @@ class KDCBaseTest(RawKerberosTest):
account_name = ( account_name = (
pac.info.info.info3.base.account_name) pac.info.info.info3.base.account_name)
user_sid = ( user_sid = (
str(pac.info.info.info3.base.domain_sid) + str(pac.info.info.info3.base.domain_sid)
"-" + str(pac.info.info.info3.base.rid)) + "-" + str(pac.info.info.info3.base.rid))
elif pac.type == self.PAC_LOGON_NAME: elif pac.type == self.PAC_LOGON_NAME:
logon_name = pac.info.account_name logon_name = pac.info.account_name
elif pac.type == self.PAC_UPN_DNS_INFO: elif pac.type == self.PAC_UPN_DNS_INFO:

View File

@@ -25,7 +25,20 @@ os.environ["PYTHONUNBUFFERED"] = "1"
from samba.tests.krb5.raw_testcase import RawKerberosTest from samba.tests.krb5.raw_testcase import RawKerberosTest
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import * from samba.tests.krb5.rfc4120_constants import (
AES256_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5,
KDC_ERR_PREAUTH_FAILED,
KDC_ERR_PREAUTH_REQUIRED,
KDC_ERR_SKEW,
KRB_AS_REP,
KRB_ERROR,
KU_PA_ENC_TIMESTAMP,
PADATA_ENC_TIMESTAMP,
PADATA_ETYPE_INFO2,
NT_PRINCIPAL,
NT_SRV_INST,
)
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False

View File

@@ -35,7 +35,10 @@ from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder
def BitStringEncoder_encodeValue32(self, value, asn1Spec, encodeFun, **options):
def BitStringEncoder_encodeValue32(
self, value, asn1Spec, encodeFun, **options):
# #
# BitStrings like KDCOptions or TicketFlags should at least # BitStrings like KDCOptions or TicketFlags should at least
# be 32-Bit on the wire # be 32-Bit on the wire
@@ -59,8 +62,11 @@ def BitStringEncoder_encodeValue32(self, value, asn1Spec, encodeFun, **options):
padding = 0 padding = 0
ret = b'\x00' + substrate + (b'\x00' * padding) ret = b'\x00' + substrate + (b'\x00' * padding)
return ret, False, True return ret, False, True
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
def BitString_NamedValues_prettyPrint(self, scope=0): def BitString_NamedValues_prettyPrint(self, scope=0):
ret = "%s" % self.asBinary() ret = "%s" % self.asBinary()
bits = [] bits = []
@@ -89,12 +95,21 @@ def BitString_NamedValues_prettyPrint(self, scope=0):
delim = ",\n%s " % indent delim = ",\n%s " % indent
ret += "\n%s)" % indent ret += "\n%s)" % indent
return ret return ret
krb5_asn1.TicketFlags.prettyPrintNamedValues = krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.TicketFlags.namedValues = krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.TicketFlags.prettyPrint = BitString_NamedValues_prettyPrint krb5_asn1.TicketFlags.prettyPrintNamedValues =\
krb5_asn1.KDCOptions.prettyPrintNamedValues = krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.KDCOptions.namedValues = krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.TicketFlags.namedValues =\
krb5_asn1.KDCOptions.prettyPrint = BitString_NamedValues_prettyPrint krb5_asn1.TicketFlagsValues.namedValues
krb5_asn1.TicketFlags.prettyPrint =\
BitString_NamedValues_prettyPrint
krb5_asn1.KDCOptions.prettyPrintNamedValues =\
krb5_asn1.KDCOptionsValues.namedValues
krb5_asn1.KDCOptions.namedValues =\
krb5_asn1.KDCOptionsValues.namedValues
krb5_asn1.KDCOptions.prettyPrint =\
BitString_NamedValues_prettyPrint
def Integer_NamedValues_prettyPrint(self, scope=0): def Integer_NamedValues_prettyPrint(self, scope=0):
intval = int(self) intval = int(self)
@@ -104,16 +119,29 @@ def Integer_NamedValues_prettyPrint(self, scope=0):
name = "<__unknown__>" name = "<__unknown__>"
ret = "%d (0x%x) %s" % (intval, intval, name) ret = "%d (0x%x) %s" % (intval, intval, name)
return ret return ret
krb5_asn1.NameType.prettyPrintNamedValues = krb5_asn1.NameTypeValues.namedValues
krb5_asn1.NameType.prettyPrint = Integer_NamedValues_prettyPrint
krb5_asn1.AuthDataType.prettyPrintNamedValues = krb5_asn1.AuthDataTypeValues.namedValues krb5_asn1.NameType.prettyPrintNamedValues =\
krb5_asn1.AuthDataType.prettyPrint = Integer_NamedValues_prettyPrint krb5_asn1.NameTypeValues.namedValues
krb5_asn1.PADataType.prettyPrintNamedValues = krb5_asn1.PADataTypeValues.namedValues krb5_asn1.NameType.prettyPrint =\
krb5_asn1.PADataType.prettyPrint = Integer_NamedValues_prettyPrint Integer_NamedValues_prettyPrint
krb5_asn1.EncryptionType.prettyPrintNamedValues = krb5_asn1.EncryptionTypeValues.namedValues krb5_asn1.AuthDataType.prettyPrintNamedValues =\
krb5_asn1.EncryptionType.prettyPrint = Integer_NamedValues_prettyPrint krb5_asn1.AuthDataTypeValues.namedValues
krb5_asn1.ChecksumType.prettyPrintNamedValues = krb5_asn1.ChecksumTypeValues.namedValues krb5_asn1.AuthDataType.prettyPrint =\
krb5_asn1.ChecksumType.prettyPrint = Integer_NamedValues_prettyPrint Integer_NamedValues_prettyPrint
krb5_asn1.PADataType.prettyPrintNamedValues =\
krb5_asn1.PADataTypeValues.namedValues
krb5_asn1.PADataType.prettyPrint =\
Integer_NamedValues_prettyPrint
krb5_asn1.EncryptionType.prettyPrintNamedValues =\
krb5_asn1.EncryptionTypeValues.namedValues
krb5_asn1.EncryptionType.prettyPrint =\
Integer_NamedValues_prettyPrint
krb5_asn1.ChecksumType.prettyPrintNamedValues =\
krb5_asn1.ChecksumTypeValues.namedValues
krb5_asn1.ChecksumType.prettyPrint =\
Integer_NamedValues_prettyPrint
class Krb5EncryptionKey(object): class Krb5EncryptionKey(object):
def __init__(self, key, kvno): def __init__(self, key, kvno):
@@ -146,9 +174,10 @@ class Krb5EncryptionKey(object):
EncryptionKey_obj = { EncryptionKey_obj = {
'keytype': self.etype, 'keytype': self.etype,
'keyvalue': self.key.contents, 'keyvalue': self.key.contents,
}; }
return EncryptionKey_obj return EncryptionKey_obj
class RawKerberosTest(TestCase): class RawKerberosTest(TestCase):
"""A raw Kerberos Test case.""" """A raw Kerberos Test case."""
@@ -182,13 +211,13 @@ class RawKerberosTest(TestCase):
self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
self.s.settimeout(10) self.s.settimeout(10)
self.s.connect(self.a[0][4]) self.s.connect(self.a[0][4])
except socket.error as e: except socket.error:
self.s.close() self.s.close()
raise raise
except IOError as e: except IOError:
self.s.close() self.s.close()
raise raise
except Exception as e: except Exception:
raise raise
finally: finally:
pass pass
@@ -219,7 +248,8 @@ class RawKerberosTest(TestCase):
domain = samba.tests.env_get_var_value('DOMAIN') domain = samba.tests.env_get_var_value('DOMAIN')
realm = samba.tests.env_get_var_value('REALM') realm = samba.tests.env_get_var_value('REALM')
username = samba.tests.env_get_var_value('SERVICE_USERNAME') username = samba.tests.env_get_var_value('SERVICE_USERNAME')
password = samba.tests.env_get_var_value('SERVICE_PASSWORD', password = samba.tests.env_get_var_value(
'SERVICE_PASSWORD',
allow_missing=allow_missing_password) allow_missing=allow_missing_password)
c.set_domain(domain) c.set_domain(domain)
c.set_realm(realm) c.set_realm(realm)
@@ -246,9 +276,16 @@ class RawKerberosTest(TestCase):
if hexdump is None: if hexdump is None:
hexdump = self.do_hexdump hexdump = self.do_hexdump
if hexdump: if hexdump:
sys.stderr.write("%s: %d\n%s" % (name, len(blob), self.hexdump(blob))) sys.stderr.write(
"%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
def der_decode(self, blob, asn1Spec=None, native_encode=True, asn1_print=None, hexdump=None): def der_decode(
self,
blob,
asn1Spec=None,
native_encode=True,
asn1_print=None,
hexdump=None):
if asn1Spec is not None: if asn1Spec is not None:
class_name = type(asn1Spec).__name__.split(':')[0] class_name = type(asn1Spec).__name__.split(':')[0]
else: else:
@@ -260,7 +297,13 @@ class RawKerberosTest(TestCase):
obj = pyasn1_native_encode(obj) obj = pyasn1_native_encode(obj)
return obj return obj
def der_encode(self, obj, asn1Spec=None, native_decode=True, asn1_print=None, hexdump=None): def der_encode(
self,
obj,
asn1Spec=None,
native_decode=True,
asn1_print=None,
hexdump=None):
if native_decode: if native_decode:
obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec) obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
class_name = type(obj).__name__.split(':')[0] class_name = type(obj).__name__.split(':')[0]
@@ -273,7 +316,8 @@ class RawKerberosTest(TestCase):
def send_pdu(self, req, asn1_print=None, hexdump=None): def send_pdu(self, req, asn1_print=None, hexdump=None):
try: try:
k5_pdu = self.der_encode(req, native_decode=False, asn1_print=asn1_print, hexdump=False) k5_pdu = self.der_encode(
req, native_decode=False, asn1_print=asn1_print, hexdump=False)
header = struct.pack('>I', len(k5_pdu)) header = struct.pack('>I', len(k5_pdu))
req_pdu = header req_pdu = header
req_pdu += k5_pdu req_pdu += k5_pdu
@@ -304,7 +348,7 @@ class RawKerberosTest(TestCase):
self._disconnect("recv_raw: EOF") self._disconnect("recv_raw: EOF")
return None return None
self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump) self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
except socket.timeout as e: except socket.timeout:
self.s.settimeout(10) self.s.settimeout(10)
sys.stderr.write("recv_raw: TIMEOUT\n") sys.stderr.write("recv_raw: TIMEOUT\n")
pass pass
@@ -322,7 +366,8 @@ class RawKerberosTest(TestCase):
rep_pdu = None rep_pdu = None
rep = None rep = None
try: try:
raw_pdu = self.recv_raw(num_recv=4, hexdump=hexdump, timeout=timeout) raw_pdu = self.recv_raw(
num_recv=4, hexdump=hexdump, timeout=timeout)
if raw_pdu is None: if raw_pdu is None:
return (None, None) return (None, None)
header = struct.unpack(">I", raw_pdu[0:4]) header = struct.unpack(">I", raw_pdu[0:4])
@@ -332,12 +377,17 @@ class RawKerberosTest(TestCase):
missing = k5_len missing = k5_len
rep_pdu = b'' rep_pdu = b''
while missing > 0: while missing > 0:
raw_pdu = self.recv_raw(num_recv=missing, hexdump=hexdump, timeout=timeout) raw_pdu = self.recv_raw(
num_recv=missing, hexdump=hexdump, timeout=timeout)
self.assertGreaterEqual(len(raw_pdu), 1) self.assertGreaterEqual(len(raw_pdu), 1)
rep_pdu += raw_pdu rep_pdu += raw_pdu
missing = k5_len - len(rep_pdu) missing = k5_len - len(rep_pdu)
k5_raw = self.der_decode(rep_pdu, asn1Spec=None, native_encode=False, k5_raw = self.der_decode(
asn1_print=False, hexdump=False) rep_pdu,
asn1Spec=None,
native_encode=False,
asn1_print=False,
hexdump=False)
pvno = k5_raw['field-0'] pvno = k5_raw['field-0']
self.assertEqual(pvno, 5) self.assertEqual(pvno, 5)
msg_type = k5_raw['field-1'] msg_type = k5_raw['field-1']
@@ -368,11 +418,17 @@ class RawKerberosTest(TestCase):
self.assertIsNone(self.s, msg="Is connected") self.assertIsNone(self.s, msg="Is connected")
return return
def send_recv_transaction(self, req, asn1_print=None, hexdump=None, timeout=None): def send_recv_transaction(
self,
req,
asn1_print=None,
hexdump=None,
timeout=None):
self.connect() self.connect()
try: try:
self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump) self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
rep = self.recv_pdu(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) rep = self.recv_pdu(
asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
except Exception: except Exception:
self._disconnect("transaction failed") self._disconnect("transaction failed")
raise raise
@@ -389,10 +445,14 @@ class RawKerberosTest(TestCase):
def assertPrincipalEqual(self, princ1, princ2): def assertPrincipalEqual(self, princ1, princ2):
self.assertEqual(princ1['name-type'], princ2['name-type']) self.assertEqual(princ1['name-type'], princ2['name-type'])
self.assertEqual(len(princ1['name-string']), len(princ2['name-string']), self.assertEqual(
len(princ1['name-string']),
len(princ2['name-string']),
msg="princ1=%s != princ2=%s" % (princ1, princ2)) msg="princ1=%s != princ2=%s" % (princ1, princ2))
for idx in range(len(princ1['name-string'])): for idx in range(len(princ1['name-string'])):
self.assertEqual(princ1['name-string'][idx], princ2['name-string'][idx], self.assertEqual(
princ1['name-string'][idx],
princ2['name-string'][idx],
msg="princ1=%s != princ2=%s" % (princ1, princ2)) msg="princ1=%s != princ2=%s" % (princ1, princ2))
return return
@@ -421,7 +481,7 @@ class RawKerberosTest(TestCase):
salt = None salt = None
try: try:
salt = etype_info2['salt'] salt = etype_info2['salt']
except: except Exception:
pass pass
if e == kcrypto.Enctype.RC4: if e == kcrypto.Enctype.RC4:
@@ -429,7 +489,8 @@ class RawKerberosTest(TestCase):
return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno) return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
password = creds.get_password() password = creds.get_password()
return self.PasswordKey_create(etype=e, pwd=password, salt=salt, kvno=kvno) return self.PasswordKey_create(
etype=e, pwd=password, salt=salt, kvno=kvno)
def RandomKey(self, etype): def RandomKey(self, etype):
e = kcrypto._get_enctype_profile(etype) e = kcrypto._get_enctype_profile(etype)
@@ -532,7 +593,8 @@ class RawKerberosTest(TestCase):
# till [5] KerberosTime, # till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL, # rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32, # nonce [7] UInt32,
# etype [8] SEQUENCE OF Int32 -- EncryptionType # etype [8] SEQUENCE OF Int32
# -- EncryptionType
# -- in preference order --, # -- in preference order --,
# addresses [9] HostAddresses OPTIONAL, # addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL # enc-authorization-data [10] EncryptedData OPTIONAL
@@ -541,11 +603,13 @@ class RawKerberosTest(TestCase):
# -- NOTE: not empty # -- NOTE: not empty
# } # }
if EncAuthorizationData is not None: if EncAuthorizationData is not None:
enc_ad_plain = self.der_encode(EncAuthorizationData, enc_ad_plain = self.der_encode(
EncAuthorizationData,
asn1Spec=krb5_asn1.AuthorizationData(), asn1Spec=krb5_asn1.AuthorizationData(),
asn1_print=asn1_print, asn1_print=asn1_print,
hexdump=hexdump) hexdump=hexdump)
enc_ad = self.EncryptedData_create(EncAuthorizationData_key, enc_ad_plain) enc_ad = self.EncryptedData_create(
EncAuthorizationData_key, enc_ad_plain)
else: else:
enc_ad = None enc_ad = None
KDC_REQ_BODY_obj = { KDC_REQ_BODY_obj = {
@@ -622,7 +686,8 @@ class RawKerberosTest(TestCase):
if padata is not None: if padata is not None:
KDC_REQ_obj['padata'] = padata KDC_REQ_obj['padata'] = padata
if asn1Spec is not None: if asn1Spec is not None:
KDC_REQ_decoded = pyasn1_native_decode(KDC_REQ_obj, asn1Spec=asn1Spec) KDC_REQ_decoded = pyasn1_native_decode(
KDC_REQ_obj, asn1Spec=asn1Spec)
else: else:
KDC_REQ_decoded = None KDC_REQ_decoded = None
return KDC_REQ_obj, KDC_REQ_decoded return KDC_REQ_obj, KDC_REQ_decoded
@@ -666,7 +731,8 @@ class RawKerberosTest(TestCase):
# till [5] KerberosTime, # till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL, # rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32, # nonce [7] UInt32,
# etype [8] SEQUENCE OF Int32 -- EncryptionType # etype [8] SEQUENCE OF Int32
# -- EncryptionType
# -- in preference order --, # -- in preference order --,
# addresses [9] HostAddresses OPTIONAL, # addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL # enc-authorization-data [10] EncryptedData OPTIONAL
@@ -674,7 +740,8 @@ class RawKerberosTest(TestCase):
# additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
# -- NOTE: not empty # -- NOTE: not empty
# } # }
obj,decoded = self.KDC_REQ_create(msg_type=10, obj, decoded = self.KDC_REQ_create(
msg_type=10,
padata=padata, padata=padata,
kdc_options=kdc_options, kdc_options=kdc_options,
cname=cname, cname=cname,
@@ -713,7 +780,8 @@ class RawKerberosTest(TestCase):
} }
return AP_REQ_obj return AP_REQ_obj
def Authenticator_create(self, crealm, cname, cksum, cusec, ctime, subkey, seq_number, def Authenticator_create(
self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
authorization_data): authorization_data):
# -- Unencrypted authenticator # -- Unencrypted authenticator
# Authenticator ::= [APPLICATION 2] SEQUENCE { # Authenticator ::= [APPLICATION 2] SEQUENCE {
@@ -789,7 +857,8 @@ class RawKerberosTest(TestCase):
# till [5] KerberosTime, # till [5] KerberosTime,
# rtime [6] KerberosTime OPTIONAL, # rtime [6] KerberosTime OPTIONAL,
# nonce [7] UInt32, # nonce [7] UInt32,
# etype [8] SEQUENCE OF Int32 -- EncryptionType # etype [8] SEQUENCE OF Int32
# -- EncryptionType
# -- in preference order --, # -- in preference order --,
# addresses [9] HostAddresses OPTIONAL, # addresses [9] HostAddresses OPTIONAL,
# enc-authorization-data [10] EncryptedData OPTIONAL # enc-authorization-data [10] EncryptedData OPTIONAL
@@ -798,7 +867,8 @@ class RawKerberosTest(TestCase):
# -- NOTE: not empty # -- NOTE: not empty
# } # }
req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options, req_body = self.KDC_REQ_BODY_create(
kdc_options=kdc_options,
cname=None, cname=None,
realm=realm, realm=realm,
sname=sname, sname=sname,
@@ -814,14 +884,15 @@ class RawKerberosTest(TestCase):
req_body = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(), req_body = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(),
asn1_print=asn1_print, hexdump=hexdump) asn1_print=asn1_print, hexdump=hexdump)
req_body_checksum = self.Checksum_create(ticket_session_key, 6, req_body, req_body_checksum = self.Checksum_create(
ctype=body_checksum_type) ticket_session_key, 6, req_body, ctype=body_checksum_type)
subkey_obj = None subkey_obj = None
if authenticator_subkey is not None: if authenticator_subkey is not None:
subkey_obj = authenticator_subkey.export_obj() subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe) seq_number = random.randint(0, 0xfffffffe)
authenticator = self.Authenticator_create(crealm=realm, authenticator = self.Authenticator_create(
crealm=realm,
cname=cname, cname=cname,
cksum=req_body_checksum, cksum=req_body_checksum,
cusec=cusec, cusec=cusec,
@@ -829,10 +900,14 @@ class RawKerberosTest(TestCase):
subkey=subkey_obj, subkey=subkey_obj,
seq_number=seq_number, seq_number=seq_number,
authorization_data=None) authorization_data=None)
authenticator = self.der_encode(authenticator, asn1Spec=krb5_asn1.Authenticator(), authenticator = self.der_encode(
asn1_print=asn1_print, hexdump=hexdump) authenticator,
asn1Spec=krb5_asn1.Authenticator(),
asn1_print=asn1_print,
hexdump=hexdump)
authenticator = self.EncryptedData_create(ticket_session_key, 7, authenticator) authenticator = self.EncryptedData_create(
ticket_session_key, 7, authenticator)
ap_options = krb5_asn1.APOptions('0') ap_options = krb5_asn1.APOptions('0')
ap_req = self.AP_REQ_create(ap_options=str(ap_options), ap_req = self.AP_REQ_create(ap_options=str(ap_options),
@@ -846,7 +921,8 @@ class RawKerberosTest(TestCase):
else: else:
padata = [pa_tgs_req] padata = [pa_tgs_req]
obj,decoded = self.KDC_REQ_create(msg_type=12, obj, decoded = self.KDC_REQ_create(
msg_type=12,
padata=padata, padata=padata,
kdc_options=kdc_options, kdc_options=kdc_options,
cname=None, cname=None,
@@ -888,5 +964,6 @@ class RawKerberosTest(TestCase):
'cksum': cksum, 'cksum': cksum,
'auth': "Kerberos", 'auth': "Kerberos",
} }
pa_s4u2self = self.der_encode(PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self()) pa_s4u2self = self.der_encode(
PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
return self.PA_DATA_create(129, pa_s4u2self) return self.PA_DATA_create(129, pa_s4u2self)

View File

@@ -35,6 +35,7 @@ import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False
class S4UKerberosTests(RawKerberosTest): class S4UKerberosTests(RawKerberosTest):
def setUp(self): def setUp(self):
@@ -76,14 +77,16 @@ class S4UKerberosTests(RawKerberosTest):
self.assertEqual(rep['msg-type'], 30) self.assertEqual(rep['msg-type'], 30)
self.assertEqual(rep['error-code'], 25) self.assertEqual(rep['error-code'], 25)
rep_padata = self.der_decode(rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA()) rep_padata = self.der_decode(
rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
for pa in rep_padata: for pa in rep_padata:
if pa['padata-type'] == 19: if pa['padata-type'] == 19:
etype_info2 = pa['padata-value'] etype_info2 = pa['padata-value']
break break
etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) etype_info2 = self.der_decode(
etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0]) key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])
@@ -120,7 +123,8 @@ class S4UKerberosTests(RawKerberosTest):
self.assertEqual(msg_type, 11) self.assertEqual(msg_type, 11)
enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher']) enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncASRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
# S4U2Self Request # S4U2Self Request
sname = cname sname = cname
@@ -167,11 +171,13 @@ class S4UKerberosTests(RawKerberosTest):
if msg_type == 13: if msg_type == 13:
enc_part2 = subkey.decrypt( enc_part2 = subkey.decrypt(
KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher']) KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
return msg_type return msg_type
# Using the checksum type from the tgt_session_key happens to work everywhere # Using the checksum type from the tgt_session_key happens to work
# everywhere
def test_s4u2self(self): def test_s4u2self(self):
msg_type = self._test_s4u2self() msg_type = self._test_s4u2self()
self.assertEqual(msg_type, 13) self.assertEqual(msg_type, 13)
@@ -193,6 +199,7 @@ class S4UKerberosTests(RawKerberosTest):
msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32) msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
self.assertEqual(msg_type, 30) self.assertEqual(msg_type, 30)
if __name__ == "__main__": if __name__ == "__main__":
global_asn1_print = True global_asn1_print = True
global_hexdump = True global_hexdump = True

View File

@@ -33,6 +33,7 @@ import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False
class SimpleKerberosTests(RawKerberosTest): class SimpleKerberosTests(RawKerberosTest):
def setUp(self): def setUp(self):
@@ -74,14 +75,16 @@ class SimpleKerberosTests(RawKerberosTest):
self.assertEqual(rep['msg-type'], 30) self.assertEqual(rep['msg-type'], 30)
self.assertEqual(rep['error-code'], 25) self.assertEqual(rep['error-code'], 25)
rep_padata = self.der_decode(rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA()) rep_padata = self.der_decode(
rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
for pa in rep_padata: for pa in rep_padata:
if pa['padata-type'] == 19: if pa['padata-type'] == 19:
etype_info2 = pa['padata-value'] etype_info2 = pa['padata-value']
break break
etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) etype_info2 = self.der_decode(
etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
key = self.PasswordKey_from_etype_info2(user_creds, etype_info2[0]) key = self.PasswordKey_from_etype_info2(user_creds, etype_info2[0])
@@ -119,17 +122,21 @@ class SimpleKerberosTests(RawKerberosTest):
enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher']) enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
# MIT KDC encodes both EncASRepPart and EncTGSRepPart with application tag 26 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
# application tag 26
try: try:
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncASRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
except Exception: except Exception:
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
# TGS Request # TGS Request
service_creds = self.get_service_creds(allow_missing_password=True) service_creds = self.get_service_creds(allow_missing_password=True)
service_name = service_creds.get_username() service_name = service_creds.get_username()
sname = self.PrincipalName_create(name_type=2, names=["host", service_name]) sname = self.PrincipalName_create(
name_type=2, names=["host", service_name])
kdc_options = krb5_asn1.KDCOptions('forwardable') kdc_options = krb5_asn1.KDCOptions('forwardable')
till = self.get_KerberosTime(offset=36000) till = self.get_KerberosTime(offset=36000)
ticket = rep['ticket'] ticket = rep['ticket']
@@ -167,7 +174,8 @@ class SimpleKerberosTests(RawKerberosTest):
enc_part2 = subkey.decrypt( enc_part2 = subkey.decrypt(
KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher']) KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
return return

View File

@@ -34,6 +34,7 @@ import samba.tests
global_asn1_print = False global_asn1_print = False
global_hexdump = False global_hexdump = False
class XrealmKerberosTests(RawKerberosTest): class XrealmKerberosTests(RawKerberosTest):
def setUp(self): def setUp(self):
@@ -75,14 +76,16 @@ class XrealmKerberosTests(RawKerberosTest):
self.assertEqual(rep['msg-type'], 30) self.assertEqual(rep['msg-type'], 30)
self.assertEqual(rep['error-code'], 25) self.assertEqual(rep['error-code'], 25)
rep_padata = self.der_decode(rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA()) rep_padata = self.der_decode(
rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
for pa in rep_padata: for pa in rep_padata:
if pa['padata-type'] == 19: if pa['padata-type'] == 19:
etype_info2 = pa['padata-value'] etype_info2 = pa['padata-value']
break break
etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) etype_info2 = self.der_decode(
etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
key = self.PasswordKey_from_etype_info2(user_creds, etype_info2[0]) key = self.PasswordKey_from_etype_info2(user_creds, etype_info2[0])
@@ -120,15 +123,19 @@ class XrealmKerberosTests(RawKerberosTest):
enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher']) enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
# MIT KDC encodes both EncASRepPart and EncTGSRepPart with application tag 26 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
# application tag 26
try: try:
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncASRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
except Exception: except Exception:
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
# TGS Request (for cross-realm TGT) # TGS Request (for cross-realm TGT)
trust_realm = samba.tests.env_get_var_value('TRUST_REALM') trust_realm = samba.tests.env_get_var_value('TRUST_REALM')
sname = self.PrincipalName_create(name_type=2, names=["krbtgt", trust_realm]) sname = self.PrincipalName_create(
name_type=2, names=["krbtgt", trust_realm])
kdc_options = krb5_asn1.KDCOptions('forwardable') kdc_options = krb5_asn1.KDCOptions('forwardable')
till = self.get_KerberosTime(offset=36000) till = self.get_KerberosTime(offset=36000)
@@ -167,7 +174,8 @@ class XrealmKerberosTests(RawKerberosTest):
enc_part2 = subkey.decrypt( enc_part2 = subkey.decrypt(
KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher']) KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
enc_part2 = self.der_decode(enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart()) enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
# Check the forwardable flag # Check the forwardable flag
fwd_pos = len(tuple(krb5_asn1.TicketFlags('forwardable'))) - 1 fwd_pos = len(tuple(krb5_asn1.TicketFlags('forwardable'))) - 1