1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-12 21:58:10 +03:00

tests/krb5: formatting

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14817
(cherry picked from commit df6623363a7ec1a13af48a09e1d29fa8784e825c)
This commit is contained in:
Joseph Sutton 2021-08-02 17:00:09 +12:00 committed by Jule Anger
parent 27e3155358
commit 0e276e08fb
3 changed files with 209 additions and 156 deletions

View File

@ -21,10 +21,7 @@ import os
from datetime import datetime, timezone
import tempfile
import binascii
import struct
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
from collections import namedtuple
import ldb
from ldb import SCOPE_BASE
@ -66,6 +63,9 @@ from samba.tests.krb5.rfc4120_constants import (
PADATA_ETYPE_INFO2,
)
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
global_asn1_print = False
global_hexdump = False
@ -337,6 +337,7 @@ class KDCBaseTest(RawKerberosTest):
require_strongest_key=False):
if require_strongest_key:
self.assertTrue(require_keys)
def download_krbtgt_creds():
samdb = self.get_samdb()
@ -750,7 +751,8 @@ class KDCBaseTest(RawKerberosTest):
"Ticket not yet valid - clocks may be out of sync.")
self.assertGreater(cred.endtime - 60 * 60,
datetime.now(timezone.utc).timestamp(),
"Ticket already expired/about to expire - clocks may be out of sync.")
"Ticket already expired/about to expire - "
"clocks may be out of sync.")
cred.renew_till = cred.endtime
cred.is_skey = 0

View File

@ -24,11 +24,19 @@ import datetime
import random
import binascii
import itertools
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder
from samba.credentials import Credentials
from samba.dcerpc import security
import samba.tests
from samba.credentials import Credentials
from samba.tests import TestCaseInTempDir
from samba.dcerpc import security
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_ETYPE_NOSUPP,
@ -53,13 +61,6 @@ from samba.tests.krb5.rfc4120_constants import (
)
import samba.tests.krb5.kcrypto as kcrypto
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder as BitStringEncoder
def BitStringEncoder_encodeValue32(
self, value, asn1Spec, encodeFun, **options):
@ -217,6 +218,7 @@ class Krb5EncryptionKey(object):
}
return EncryptionKey_obj
class KerberosCredentials(Credentials):
def __init__(self):
super(KerberosCredentials, self).__init__()
@ -293,6 +295,7 @@ class KerberosCredentials(Credentials):
def get_forced_salt(self):
return self.forced_salt
class KerberosTicketCreds(object):
def __init__(self, ticket, session_key,
crealm=None, cname=None,
@ -311,6 +314,7 @@ class KerberosTicketCreds(object):
self.encpart_private = encpart_private
return
class RawKerberosTest(TestCaseInTempDir):
"""A raw Kerberos Test case."""
@ -386,7 +390,8 @@ class RawKerberosTest(TestCaseInTempDir):
self.do_asn1_print = False
self.do_hexdump = False
strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True)
strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
allow_missing=True)
if strict_checking is None:
strict_checking = '1'
self.strict_checking = bool(int(strict_checking))
@ -440,7 +445,8 @@ class RawKerberosTest(TestCaseInTempDir):
val = None
if prefix is not None:
allow_missing_prefix = allow_missing or fallback_default
val = samba.tests.env_get_var_value('%s_%s' % (prefix, varname),
val = samba.tests.env_get_var_value(
'%s_%s' % (prefix, varname),
allow_missing=allow_missing_prefix)
else:
fallback_default = True
@ -506,7 +512,8 @@ class RawKerberosTest(TestCaseInTempDir):
if aes256_key is not None:
c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
fallback_default=False, allow_missing=True)
fallback_default=False,
allow_missing=True)
if aes128_key is not None:
c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
@ -536,7 +543,8 @@ class RawKerberosTest(TestCaseInTempDir):
env_err = None
try:
# Try to obtain them from the environment
creds = self._get_krb5_creds_from_env(prefix,
creds = self._get_krb5_creds_from_env(
prefix,
default_username=default_username,
allow_missing_password=allow_missing_password,
allow_missing_keys=allow_missing_keys,
@ -936,7 +944,9 @@ class RawKerberosTest(TestCaseInTempDir):
if etype == kcrypto.Enctype.RC4:
nthash = creds.get_nt_hash()
self.assertIsNotNone(nthash, msg=fail_msg)
return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno)
return self.SessionKey_create(etype=etype,
contents=nthash,
kvno=kvno)
password = creds.get_password()
self.assertIsNotNone(password, msg=fail_msg)
@ -944,7 +954,10 @@ class RawKerberosTest(TestCaseInTempDir):
if salt is None:
salt = bytes("%s%s" % (creds.get_realm(), creds.get_username()),
encoding='utf-8')
return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno)
return self.PasswordKey_create(etype=etype,
pwd=password,
salt=salt,
kvno=kvno)
def RandomKey(self, etype):
e = kcrypto._get_enctype_profile(etype)
@ -1021,8 +1034,10 @@ class RawKerberosTest(TestCaseInTempDir):
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
# KERB-PA-PAC-REQUEST ::= SEQUENCE {
# include-pac[0] BOOLEAN --If TRUE, and no pac present, include PAC.
# --If FALSE, and PAC present, remove PAC
# include-pac[0] BOOLEAN --If TRUE, and no pac present,
# -- include PAC.
# --If FALSE, and PAC present,
# -- remove PAC.
# }
KERB_PA_PAC_REQUEST_obj = {
'include-pac': include_pac,
@ -1327,11 +1342,14 @@ class RawKerberosTest(TestCaseInTempDir):
EncAuthorizationData=EncAuthorizationData,
EncAuthorizationData_key=EncAuthorizationData_key,
additional_tickets=additional_tickets)
req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(),
req_body_blob = self.der_encode(req_body,
asn1Spec=krb5_asn1.KDC_REQ_BODY(),
asn1_print=asn1_print, hexdump=hexdump)
req_body_checksum = self.Checksum_create(
ticket_session_key, 6, req_body_blob, ctype=body_checksum_type)
req_body_checksum = self.Checksum_create(ticket_session_key,
6,
req_body_blob,
ctype=body_checksum_type)
subkey_obj = None
if authenticator_subkey is not None:
@ -1390,7 +1408,10 @@ class RawKerberosTest(TestCaseInTempDir):
cksum_data += n.encode()
cksum_data += realm.encode()
cksum_data += "Kerberos".encode()
cksum = self.Checksum_create(tgt_session_key, 17, cksum_data, ctype)
cksum = self.Checksum_create(tgt_session_key,
17,
cksum_data,
ctype)
PA_S4U2Self_obj = {
'name': name,
@ -1431,7 +1452,8 @@ class RawKerberosTest(TestCaseInTempDir):
if nonce is None:
nonce = self.get_Nonce()
req_body = self.KDC_REQ_BODY_create(kdc_options=kdc_options,
req_body = self.KDC_REQ_BODY_create(
kdc_options=kdc_options,
cname=cname,
realm=realm,
sname=sname,
@ -1588,7 +1610,8 @@ class RawKerberosTest(TestCaseInTempDir):
if ticket_encpart is not None: # Never None, but gives indentation
self.assertElementPresent(ticket_encpart, 'etype')
# 'unspecified' means present, with any value != 0
self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno)
self.assertElementKVNO(ticket_encpart, 'kvno',
self.unspecified_kvno)
self.assertElementPresent(ticket_encpart, 'cipher')
ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
self.assertElementPresent(rep, 'enc-part')
@ -1602,24 +1625,35 @@ class RawKerberosTest(TestCaseInTempDir):
encpart_decryption_key = None
if check_padata_fn is not None:
# See if get the decryption key from the preauth phase
encpart_decryption_key,encpart_decryption_usage = \
# See if we can get the decryption key from the preauth phase
encpart_decryption_key, encpart_decryption_usage = (
check_padata_fn(kdc_exchange_dict, callback_dict,
rep, padata)
rep, padata))
ticket_private = None
if ticket_decryption_key is not None:
self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype)
self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno)
ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher)
ticket_private = self.der_decode(ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart())
self.assertElementEqual(ticket_encpart, 'etype',
ticket_decryption_key.etype)
self.assertElementKVNO(ticket_encpart, 'kvno',
ticket_decryption_key.kvno)
ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
ticket_cipher)
ticket_private = self.der_decode(
ticket_decpart,
asn1Spec=krb5_asn1.EncTicketPart())
encpart_private = None
if encpart_decryption_key is not None:
self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype)
self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno)
rep_decpart = encpart_decryption_key.decrypt(encpart_decryption_usage, encpart_cipher)
encpart_private = self.der_decode(rep_decpart, asn1Spec=rep_encpart_asn1Spec())
self.assertElementEqual(encpart, 'etype',
encpart_decryption_key.etype)
self.assertElementKVNO(encpart, 'kvno',
encpart_decryption_key.kvno)
rep_decpart = encpart_decryption_key.decrypt(
encpart_decryption_usage,
encpart_cipher)
encpart_private = self.der_decode(
rep_decpart,
asn1Spec=rep_encpart_asn1Spec())
if check_kdc_private_fn is not None:
check_kdc_private_fn(kdc_exchange_dict, callback_dict,
@ -1651,8 +1685,10 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertElementPresent(ticket_key, 'keytype')
self.assertElementPresent(ticket_key, 'keyvalue')
ticket_session_key = self.EncryptionKey_import(ticket_key)
self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm)
self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname)
self.assertElementEqualUTF8(ticket_private, 'crealm',
expected_crealm)
self.assertElementEqualPrincipal(ticket_private, 'cname',
expected_cname)
self.assertElementPresent(ticket_private, 'transited')
self.assertElementPresent(ticket_private, 'authtime')
if self.strict_checking:
@ -1672,25 +1708,31 @@ class RawKerberosTest(TestCaseInTempDir):
encpart_session_key = self.EncryptionKey_import(encpart_key)
self.assertElementPresent(encpart_private, 'last-req')
self.assertElementPresent(encpart_private, 'nonce')
# TODO self.assertElementPresent(encpart_private, 'key-expiration')
# TODO self.assertElementPresent(encpart_private,
# 'key-expiration')
self.assertElementPresent(encpart_private, 'flags')
self.assertElementPresent(encpart_private, 'authtime')
if self.strict_checking:
self.assertElementPresent(encpart_private, 'starttime')
self.assertElementPresent(encpart_private, 'endtime')
# TODO self.assertElementPresent(encpart_private, 'renew-till')
self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm)
self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname)
self.assertElementEqualUTF8(encpart_private, 'srealm',
expected_srealm)
self.assertElementEqualPrincipal(encpart_private, 'sname',
expected_sname)
# TODO self.assertElementMissing(encpart_private, 'caddr')
if ticket_session_key is not None and encpart_session_key is not None:
self.assertEqual(ticket_session_key.etype, encpart_session_key.etype)
self.assertEqual(ticket_session_key.key.contents, encpart_session_key.key.contents)
self.assertEqual(ticket_session_key.etype,
encpart_session_key.etype)
self.assertEqual(ticket_session_key.key.contents,
encpart_session_key.key.contents)
if encpart_session_key is not None:
session_key = encpart_session_key
else:
session_key = ticket_session_key
ticket_creds = KerberosTicketCreds(ticket,
ticket_creds = KerberosTicketCreds(
ticket,
session_key,
crealm=expected_crealm,
cname=expected_cname,
@ -1779,14 +1821,17 @@ class RawKerberosTest(TestCaseInTempDir):
if self.strict_checking:
self.assertIsNotNone(edata)
if edata is not None:
rep_padata = self.der_decode(edata, asn1Spec=krb5_asn1.METHOD_DATA())
rep_padata = self.der_decode(edata,
asn1Spec=krb5_asn1.METHOD_DATA())
self.assertGreater(len(rep_padata), 0)
else:
rep_padata = []
if self.strict_checking:
for i in range(0, len(expected_patypes)):
self.assertElementEqual(rep_padata[i], 'padata-type', expected_patypes[i])
self.assertElementEqual(rep_padata[i],
'padata-type',
expected_patypes[i])
self.assertEqual(len(rep_padata), len(expected_patypes))
etype_info2 = None
@ -1799,11 +1844,13 @@ class RawKerberosTest(TestCaseInTempDir):
pavalue = self.getElementValue(pa, 'padata-value')
if patype == PADATA_ETYPE_INFO2:
self.assertIsNone(etype_info2)
etype_info2 = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO2())
etype_info2 = self.der_decode(pavalue,
asn1Spec=krb5_asn1.ETYPE_INFO2())
continue
if patype == PADATA_ETYPE_INFO:
self.assertIsNone(etype_info)
etype_info = self.der_decode(pavalue, asn1Spec=krb5_asn1.ETYPE_INFO())
etype_info = self.der_decode(pavalue,
asn1Spec=krb5_asn1.ETYPE_INFO())
continue
if patype == PADATA_ENC_TIMESTAMP:
self.assertIsNone(enc_timestamp)
@ -1881,7 +1928,8 @@ class RawKerberosTest(TestCaseInTempDir):
authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
body_checksum_type = kdc_exchange_dict['body_checksum_type']
req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
req_body_blob = self.der_encode(req_body,
asn1Spec=krb5_asn1.KDC_REQ_BODY())
req_body_checksum = self.Checksum_create(tgt.session_key,
KU_TGS_REQ_AUTH_CKSUM,
@ -1893,7 +1941,8 @@ class RawKerberosTest(TestCaseInTempDir):
subkey_obj = authenticator_subkey.export_obj()
seq_number = random.randint(0, 0xfffffffe)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
authenticator_obj = self.Authenticator_create(crealm=tgt.crealm,
authenticator_obj = self.Authenticator_create(
crealm=tgt.crealm,
cname=tgt.cname,
cksum=req_body_checksum,
cusec=cusec,
@ -1901,7 +1950,9 @@ class RawKerberosTest(TestCaseInTempDir):
subkey=subkey_obj,
seq_number=seq_number,
authorization_data=None)
authenticator_blob = self.der_encode(authenticator_obj, asn1Spec=krb5_asn1.Authenticator())
authenticator_blob = self.der_encode(
authenticator_obj,
asn1Spec=krb5_asn1.Authenticator())
authenticator = self.EncryptedData_create(tgt.session_key,
KU_TGS_REQ_AUTH,