1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-04 17:47:26 +03:00

tests/krb5/raw_testcase.py: Add method to obtain Kerberos keys over DRS

This requires admin credentials, and removes the need to pass these keys
as environment variables.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Joseph Sutton 2021-06-15 13:15:10 +12:00 committed by Stefan Metzmacher
parent 7d4a0ed21b
commit 1f2ddd3c97

View File

@ -20,6 +20,8 @@ import sys
import os
from datetime import datetime, timezone
import tempfile
import binascii
import struct
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
@ -29,7 +31,8 @@ from ldb import SCOPE_BASE
from samba import generate_random_password
from samba.auth import system_session
from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
from samba.dcerpc import krb5pac, krb5ccache, security
from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
from samba.drs_utils import drsuapi_connect
from samba.dsdb import (
DS_DOMAIN_FUNCTION_2000,
DS_DOMAIN_FUNCTION_2008,
@ -37,6 +40,7 @@ from samba.dsdb import (
UF_NORMAL_ACCOUNT
)
from samba.ndr import ndr_pack, ndr_unpack
from samba import net
from samba.samdb import SamDB
from samba.tests import delete_force
@ -191,6 +195,100 @@ class KDCBaseTest(RawKerberosTest):
return (creds, dn)
def get_keys(self, samdb, dn):
admin_creds = self.get_admin_creds()
dns_hostname = samdb.host_dns_name()
(bind, handle, _) = drsuapi_connect(dns_hostname,
self.get_lp(),
admin_creds)
destination_dsa_guid = misc.GUID(samdb.get_ntds_GUID())
req = drsuapi.DsGetNCChangesRequest8()
req.destination_dsa_guid = destination_dsa_guid
req.source_dsa_invocation_id = misc.GUID()
naming_context = drsuapi.DsReplicaObjectIdentifier()
naming_context.dn = str(dn)
req.naming_context = naming_context
hwm = drsuapi.DsReplicaHighWaterMark()
hwm.tmp_highest_usn = 0
hwm.reserved_usn = 0
hwm.highest_usn = 0
req.highwatermark = hwm
req.uptodateness_vector = None
req.replica_flags = 0
req.max_object_count = 1
req.max_ndr_size = 402116
req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
drsuapi.DRSUAPI_ATTID_unicodePwd]
partial_attribute_set = drsuapi.DsPartialAttributeSet()
partial_attribute_set.version = 1
partial_attribute_set.attids = attids
partial_attribute_set.num_attids = len(attids)
req.partial_attribute_set = partial_attribute_set
req.partial_attribute_set_ex = None
req.mapping_ctr.num_mappings = 0
req.mapping_ctr.mappings = None
_, ctr = bind.DsGetNCChanges(handle, 8, req)
identifier = ctr.first_object.object.identifier
attributes = ctr.first_object.object.attribute_ctr.attributes
rid = identifier.sid.split()[1]
forced_keys = dict()
net_ctx = net.Net(admin_creds)
keys = {}
for attr in attributes:
if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
net_ctx.replicate_decrypt(bind, attr, rid)
attr_val = attr.value_ctr.values[0].blob
spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
attr_val)
for pkg in spl.sub.packages:
if pkg.name == 'Primary:Kerberos-Newer-Keys':
krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
krb5_new_keys = ndr_unpack(
drsblobs.package_PrimaryKerberosBlob,
krb5_new_keys_raw)
for key in krb5_new_keys.ctr.keys:
keytype = key.keytype
if keytype in (kcrypto.Enctype.AES256,
kcrypto.Enctype.AES128):
keys[keytype] = key.value.hex()
elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
net_ctx.replicate_decrypt(bind, attr, rid)
pwd = attr.value_ctr.values[0].blob
keys[kcrypto.Enctype.RC4] = pwd.hex()
default_enctypes = self.get_default_enctypes()
if default_enctypes & security.KERB_ENCTYPE_RC4_HMAC_MD5:
self.assertIn(kcrypto.Enctype.RC4, keys)
if default_enctypes & security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96:
self.assertIn(kcrypto.Enctype.AES256, keys)
if default_enctypes & security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96:
self.assertIn(kcrypto.Enctype.AES128, keys)
return keys
def as_req(self, cname, sname, realm, etypes, padata=None):
'''Send a Kerberos AS_REQ, returns the undecoded response
'''