mirror of
https://github.com/samba-team/samba.git
synced 2025-03-20 22:50:26 +03:00
tests/krb5/raw_testcase.py: Add get_{client,server,krbtgt}_creds()
These helpful functions allow us to build the various credentials that we will use in validating the KDC responses in this test. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org> BUG: https://bugzilla.samba.org/show_bug.cgi?id=14817 (cherry picked from commit c3222870b92db7f867557c2896b7bf39915d469a)
This commit is contained in:
parent
e63908db36
commit
d48196e12f
@ -22,10 +22,12 @@ import struct
|
||||
import time
|
||||
import datetime
|
||||
import random
|
||||
import binascii
|
||||
|
||||
import samba.tests
|
||||
from samba.credentials import Credentials
|
||||
from samba.tests import TestCaseInTempDir
|
||||
from samba.dcerpc import security
|
||||
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
|
||||
import samba.tests.krb5.kcrypto as kcrypto
|
||||
|
||||
@ -177,6 +179,81 @@ class Krb5EncryptionKey(object):
|
||||
}
|
||||
return EncryptionKey_obj
|
||||
|
||||
class KerberosCredentials(Credentials):
|
||||
def __init__(self):
|
||||
super(KerberosCredentials, self).__init__()
|
||||
all_enc_types = 0
|
||||
all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
|
||||
all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
|
||||
all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
|
||||
|
||||
self.as_supported_enctypes = all_enc_types
|
||||
self.tgs_supported_enctypes = all_enc_types
|
||||
self.ap_supported_enctypes = all_enc_types
|
||||
|
||||
self.kvno = None
|
||||
self.forced_keys = {}
|
||||
|
||||
self.forced_salt = None
|
||||
return
|
||||
|
||||
def set_as_supported_enctypes(self, value):
|
||||
self.as_supported_enctypes = int(value)
|
||||
return
|
||||
|
||||
def set_tgs_supported_enctypes(self, value):
|
||||
self.tgs_supported_enctypes = int(value)
|
||||
return
|
||||
|
||||
def set_ap_supported_enctypes(self, value):
|
||||
self.ap_supported_enctypes = int(value)
|
||||
return
|
||||
|
||||
def _get_krb5_etypes(self, supported_enctypes):
|
||||
etypes = ()
|
||||
|
||||
if supported_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
|
||||
etypes += (kcrypto.Enctype.AES256,)
|
||||
if supported_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
|
||||
etypes += (kcrypto.Enctype.AES128,)
|
||||
if supported_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
|
||||
etypes += (kcrypto.Enctype.RC4,)
|
||||
|
||||
return etypes
|
||||
|
||||
def get_as_krb5_etypes(self):
|
||||
return self._get_krb5_etypes(self.as_supported_enctypes)
|
||||
|
||||
def get_tgs_krb5_etypes(self):
|
||||
return self._get_krb5_etypes(self.tgs_supported_enctypes)
|
||||
|
||||
def get_ap_krb5_etypes(self):
|
||||
return self._get_krb5_etypes(self.ap_supported_enctypes)
|
||||
|
||||
def set_kvno(self, kvno):
|
||||
self.kvno = kvno
|
||||
|
||||
def get_kvno(self):
|
||||
return self.kvno
|
||||
|
||||
def set_forced_key(self, etype, hexkey):
|
||||
etype = int(etype)
|
||||
contents = binascii.a2b_hex(hexkey)
|
||||
key = kcrypto.Key(etype, contents)
|
||||
self.forced_keys[etype] = Krb5EncryptionKey(key, self.kvno)
|
||||
|
||||
def get_forced_key(self, etype):
|
||||
etype = int(etype)
|
||||
if etype in self.forced_keys:
|
||||
return self.forced_keys[etype]
|
||||
return None
|
||||
|
||||
def set_forced_salt(self, salt):
|
||||
self.forced_salt = bytes(salt)
|
||||
return
|
||||
|
||||
def get_forced_salt(self):
|
||||
return self.forced_salt
|
||||
|
||||
class RawKerberosTest(TestCaseInTempDir):
|
||||
"""A raw Kerberos Test case."""
|
||||
@ -229,33 +306,113 @@ class RawKerberosTest(TestCaseInTempDir):
|
||||
sys.stderr.write("connected[%s]\n" % self.host)
|
||||
return
|
||||
|
||||
def get_user_creds(self):
|
||||
c = Credentials()
|
||||
def _get_krb5_creds(self, prefix,
|
||||
default_username=None,
|
||||
allow_missing_password=False,
|
||||
require_strongest_key=False):
|
||||
c = KerberosCredentials()
|
||||
c.guess()
|
||||
domain = samba.tests.env_get_var_value('DOMAIN')
|
||||
realm = samba.tests.env_get_var_value('REALM')
|
||||
username = samba.tests.env_get_var_value('USERNAME')
|
||||
password = samba.tests.env_get_var_value('PASSWORD')
|
||||
c.set_domain(domain)
|
||||
c.set_realm(realm)
|
||||
c.set_username(username)
|
||||
c.set_password(password)
|
||||
return c
|
||||
|
||||
def get_service_creds(self, allow_missing_password=False):
|
||||
c = Credentials()
|
||||
c.guess()
|
||||
domain = samba.tests.env_get_var_value('DOMAIN')
|
||||
realm = samba.tests.env_get_var_value('REALM')
|
||||
username = samba.tests.env_get_var_value('SERVICE_USERNAME')
|
||||
password = samba.tests.env_get_var_value(
|
||||
'SERVICE_PASSWORD',
|
||||
allow_missing=allow_missing_password)
|
||||
def env_get_var(varname, prefix, fallback_default=True, allow_missing=False):
|
||||
val = None
|
||||
if prefix is not None:
|
||||
allow_missing_prefix = allow_missing
|
||||
if fallback_default:
|
||||
allow_missing_prefix = True
|
||||
val = samba.tests.env_get_var_value('%s_%s' % (prefix, varname),
|
||||
allow_missing=allow_missing_prefix)
|
||||
else:
|
||||
fallback_default = True
|
||||
if val is None and fallback_default:
|
||||
val = samba.tests.env_get_var_value(varname,
|
||||
allow_missing=allow_missing)
|
||||
return val
|
||||
|
||||
domain = env_get_var('DOMAIN', prefix)
|
||||
realm = env_get_var('REALM', prefix)
|
||||
allow_missing_username = False
|
||||
if default_username is not None:
|
||||
allow_missing_username = True
|
||||
username = env_get_var('USERNAME', prefix,
|
||||
fallback_default=False,
|
||||
allow_missing=allow_missing_username)
|
||||
if username is None:
|
||||
username = default_username
|
||||
password = env_get_var('PASSWORD', prefix,
|
||||
fallback_default=False,
|
||||
allow_missing=allow_missing_password)
|
||||
c.set_domain(domain)
|
||||
c.set_realm(realm)
|
||||
c.set_username(username)
|
||||
if password is not None:
|
||||
c.set_password(password)
|
||||
as_supported_enctypes = env_get_var('AS_SUPPORTED_ENCTYPES',
|
||||
prefix, allow_missing=True)
|
||||
if as_supported_enctypes is not None:
|
||||
c.set_as_supported_enctypes(as_supported_enctypes)
|
||||
tgs_supported_enctypes = env_get_var('TGS_SUPPORTED_ENCTYPES',
|
||||
prefix, allow_missing=True)
|
||||
if tgs_supported_enctypes is not None:
|
||||
c.set_tgs_supported_enctypes(tgs_supported_enctypes)
|
||||
ap_supported_enctypes = env_get_var('AP_SUPPORTED_ENCTYPES',
|
||||
prefix, allow_missing=True)
|
||||
if ap_supported_enctypes is not None:
|
||||
c.set_ap_supported_enctypes(ap_supported_enctypes)
|
||||
|
||||
if require_strongest_key:
|
||||
kvno_allow_missing = False
|
||||
if password is None:
|
||||
aes256_allow_missing = False
|
||||
else:
|
||||
aes256_allow_missing = True
|
||||
else:
|
||||
kvno_allow_missing = True
|
||||
aes256_allow_missing = True
|
||||
kvno = env_get_var('KVNO', prefix,
|
||||
fallback_default=False,
|
||||
allow_missing=kvno_allow_missing)
|
||||
if kvno is not None:
|
||||
c.set_kvno(kvno)
|
||||
aes256_key = env_get_var('AES256_KEY_HEX', prefix,
|
||||
fallback_default=False,
|
||||
allow_missing=aes256_allow_missing)
|
||||
if aes256_key is not None:
|
||||
c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
|
||||
aes128_key = env_get_var('AES128_KEY_HEX', prefix,
|
||||
fallback_default=False, allow_missing=True)
|
||||
if aes128_key is not None:
|
||||
c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
|
||||
rc4_key = env_get_var('RC4_KEY_HEX', prefix,
|
||||
fallback_default=False, allow_missing=True)
|
||||
if rc4_key is not None:
|
||||
c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
|
||||
return c
|
||||
|
||||
def get_user_creds(self, allow_missing_password=False):
|
||||
c = self._get_krb5_creds(prefix=None,
|
||||
allow_missing_password=allow_missing_password)
|
||||
return c
|
||||
|
||||
def get_service_creds(self, allow_missing_password=False):
|
||||
c = self._get_krb5_creds(prefix='SERVICE',
|
||||
allow_missing_password=allow_missing_password)
|
||||
return c
|
||||
|
||||
def get_client_creds(self, allow_missing_password=False):
|
||||
c = self._get_krb5_creds(prefix='CLIENT',
|
||||
allow_missing_password=allow_missing_password)
|
||||
return c
|
||||
|
||||
def get_server_creds(self, allow_missing_password=False):
|
||||
c = self._get_krb5_creds(prefix='SERVER',
|
||||
allow_missing_password=allow_missing_password)
|
||||
return c
|
||||
|
||||
def get_krbtgt_creds(self, require_strongest_key=False):
|
||||
c = self._get_krb5_creds(prefix='KRBTGT',
|
||||
default_username='krbtgt',
|
||||
allow_missing_password=True,
|
||||
require_strongest_key=require_strongest_key)
|
||||
return c
|
||||
|
||||
def get_anon_creds(self):
|
||||
@ -473,6 +630,8 @@ class RawKerberosTest(TestCaseInTempDir):
|
||||
return Krb5EncryptionKey(key, kvno)
|
||||
|
||||
def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None):
|
||||
self.assertIsNotNone(pwd)
|
||||
self.assertIsNotNone(salt)
|
||||
key = kcrypto.string_to_key(etype, pwd, salt)
|
||||
return Krb5EncryptionKey(key, kvno)
|
||||
|
||||
|
@ -44,10 +44,12 @@ class SimpleKerberosTests(RawKerberosTest):
|
||||
def test_simple(self):
|
||||
user_creds = self.get_user_creds()
|
||||
user = user_creds.get_username()
|
||||
realm = user_creds.get_realm()
|
||||
krbtgt_creds = self.get_krbtgt_creds()
|
||||
krbtgt_account = krbtgt_creds.get_username()
|
||||
realm = krbtgt_creds.get_realm()
|
||||
|
||||
cname = self.PrincipalName_create(name_type=1, names=[user])
|
||||
sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])
|
||||
sname = self.PrincipalName_create(name_type=2, names=[krbtgt_account, realm])
|
||||
|
||||
till = self.get_KerberosTime(offset=36000)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user