2020-11-30 14:16:28 +13:00
# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
2021-04-28 11:02:47 +12:00
# Copyright (C) 2020-2021 Catalyst.Net Ltd
2020-11-30 14:16:28 +13:00
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import os
2021-05-10 16:43:03 +12:00
from datetime import datetime , timezone
2021-04-28 11:02:47 +12:00
import tempfile
2021-06-15 13:15:10 +12:00
import binascii
2021-08-31 22:38:01 +12:00
import collections
2021-09-03 15:36:24 +12:00
import secrets
2020-11-30 14:16:28 +13:00
from collections import namedtuple
2021-02-17 12:15:50 +13:00
import ldb
2020-11-30 14:16:28 +13:00
from ldb import SCOPE_BASE
from samba import generate_random_password
from samba . auth import system_session
2021-04-28 11:02:47 +12:00
from samba . credentials import Credentials , SPECIFIED , MUST_USE_KERBEROS
2021-06-15 13:15:10 +12:00
from samba . dcerpc import drsblobs , drsuapi , misc , krb5pac , krb5ccache , security
2021-09-13 22:13:24 +12:00
from samba . drs_utils import drs_Replicate , drsuapi_connect
2021-06-15 15:12:38 +12:00
from samba . dsdb import (
2021-09-13 22:13:24 +12:00
DSDB_SYNTAX_BINARY_DN ,
2021-06-15 15:12:38 +12:00
DS_DOMAIN_FUNCTION_2000 ,
DS_DOMAIN_FUNCTION_2008 ,
2021-09-03 09:18:32 +12:00
DS_GUID_COMPUTERS_CONTAINER ,
DS_GUID_USERS_CONTAINER ,
2021-06-15 15:12:38 +12:00
UF_WORKSTATION_TRUST_ACCOUNT ,
2021-09-03 15:36:24 +12:00
UF_NO_AUTH_DATA_REQUIRED ,
UF_NORMAL_ACCOUNT ,
UF_NOT_DELEGATED ,
2021-09-13 21:24:05 +12:00
UF_PARTIAL_SECRETS_ACCOUNT ,
2021-09-03 15:36:24 +12:00
UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
2021-06-15 15:12:38 +12:00
)
2021-09-13 21:24:05 +12:00
from samba . join import DCJoinContext
2021-04-28 11:02:47 +12:00
from samba . ndr import ndr_pack , ndr_unpack
2021-06-15 13:15:10 +12:00
from samba import net
2021-09-13 22:13:24 +12:00
from samba . samdb import SamDB , dsdb_Dn
2020-11-30 14:16:28 +13:00
from samba . tests import delete_force
2021-06-15 16:07:16 +12:00
import samba . tests . krb5 . kcrypto as kcrypto
2021-09-16 11:13:09 +12:00
from samba . tests . krb5 . raw_testcase import (
KerberosCredentials ,
KerberosTicketCreds ,
RawKerberosTest
)
2020-11-30 14:16:28 +13:00
import samba . tests . krb5 . rfc4120_pyasn1 as krb5_asn1
from samba . tests . krb5 . rfc4120_constants import (
AD_IF_RELEVANT ,
AD_WIN2K_PAC ,
2021-04-28 11:02:47 +12:00
AES256_CTS_HMAC_SHA1_96 ,
ARCFOUR_HMAC_MD5 ,
2020-11-30 14:16:28 +13:00
KDC_ERR_PREAUTH_REQUIRED ,
KRB_AS_REP ,
KRB_TGS_REP ,
KRB_ERROR ,
2020-12-10 16:27:17 +13:00
KU_AS_REP_ENC_PART ,
2021-07-06 12:47:18 +12:00
KU_ENC_CHALLENGE_CLIENT ,
2020-12-10 16:27:17 +13:00
KU_PA_ENC_TIMESTAMP ,
KU_TICKET ,
2021-04-28 11:02:47 +12:00
NT_PRINCIPAL ,
NT_SRV_HST ,
2021-07-06 12:47:18 +12:00
PADATA_ENCRYPTED_CHALLENGE ,
2020-11-30 14:16:28 +13:00
PADATA_ENC_TIMESTAMP ,
PADATA_ETYPE_INFO2 ,
)
2021-08-02 17:00:09 +12:00
sys . path . insert ( 0 , " bin/python " )
os . environ [ " PYTHONUNBUFFERED " ] = " 1 "
2020-11-30 14:16:28 +13:00
global_asn1_print = False
global_hexdump = False
class KDCBaseTest ( RawKerberosTest ) :
""" Base class for KDC tests.
"""
@classmethod
def setUpClass ( cls ) :
2021-06-15 17:10:44 +12:00
super ( ) . setUpClass ( )
2021-06-16 11:40:41 +12:00
cls . _lp = None
2020-11-30 14:16:28 +13:00
2021-06-16 11:04:00 +12:00
cls . _ldb = None
2021-09-13 20:20:23 +12:00
cls . _rodc_ldb = None
2020-11-30 14:16:28 +13:00
2021-06-15 15:12:38 +12:00
cls . _functional_level = None
2021-09-03 15:36:24 +12:00
# An identifier to ensure created accounts have unique names. Windows
# caches accounts based on usernames, so account names being different
# across test runs avoids previous test runs affecting the results.
cls . account_base = f ' krb5_ { secrets . token_hex ( 5 ) } _ '
cls . account_id = 0
2021-06-15 15:38:28 +12:00
# A set containing DNs of accounts created as part of testing.
cls . accounts = set ( )
2021-09-03 15:36:24 +12:00
cls . account_cache = { }
2021-09-13 21:24:05 +12:00
cls . _rodc_ctx = None
2021-09-13 22:13:24 +12:00
cls . ldb_cleanups = [ ]
2021-06-15 15:38:28 +12:00
@classmethod
def tearDownClass ( cls ) :
# Clean up any accounts created by create_account. This is
# done in tearDownClass() rather than tearDown(), so that
# accounts need only be created once for permutation tests.
2021-06-16 11:04:00 +12:00
if cls . _ldb is not None :
2021-09-13 22:13:24 +12:00
for cleanup in reversed ( cls . ldb_cleanups ) :
try :
cls . _ldb . modify ( cleanup )
except ldb . LdbError :
pass
2021-06-16 11:04:00 +12:00
for dn in cls . accounts :
delete_force ( cls . _ldb , dn )
2021-09-13 21:24:05 +12:00
if cls . _rodc_ctx is not None :
cls . _rodc_ctx . cleanup_old_join ( force = True )
2021-06-15 15:38:28 +12:00
super ( ) . tearDownClass ( )
2020-11-30 14:16:28 +13:00
def setUp ( self ) :
super ( ) . setUp ( )
self . do_asn1_print = global_asn1_print
self . do_hexdump = global_hexdump
2021-06-16 11:40:41 +12:00
def get_lp ( self ) :
if self . _lp is None :
type ( self ) . _lp = self . get_loadparm ( )
return self . _lp
2021-06-16 11:04:00 +12:00
def get_samdb ( self ) :
if self . _ldb is None :
2021-06-16 12:52:11 +12:00
creds = self . get_admin_creds ( )
2021-06-16 11:40:41 +12:00
lp = self . get_lp ( )
2021-06-16 11:31:26 +12:00
2021-06-16 11:04:00 +12:00
session = system_session ( )
2021-09-01 19:34:20 +12:00
type ( self ) . _ldb = SamDB ( url = " ldap:// %s " % self . dc_host ,
2021-08-02 17:00:09 +12:00
session_info = session ,
credentials = creds ,
lp = lp )
2021-06-16 11:04:00 +12:00
return self . _ldb
2021-09-13 20:20:23 +12:00
def get_rodc_samdb ( self ) :
if self . _rodc_ldb is None :
creds = self . get_admin_creds ( )
lp = self . get_lp ( )
session = system_session ( )
type ( self ) . _rodc_ldb = SamDB ( url = " ldap:// %s " % self . host ,
session_info = session ,
credentials = creds ,
lp = lp ,
am_rodc = True )
return self . _rodc_ldb
def get_server_dn ( self , samdb ) :
server = samdb . get_serverName ( )
res = samdb . search ( base = server ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' serverReference ' ] )
dn = ldb . Dn ( samdb , res [ 0 ] [ ' serverReference ' ] [ 0 ] . decode ( ' utf8 ' ) )
return dn
2021-09-13 21:24:05 +12:00
def get_mock_rodc_ctx ( self ) :
if self . _rodc_ctx is None :
admin_creds = self . get_admin_creds ( )
lp = self . get_lp ( )
rodc_name = ' KRB5RODC '
site_name = ' Default-First-Site-Name '
type ( self ) . _rodc_ctx = DCJoinContext ( server = self . dc_host ,
creds = admin_creds ,
lp = lp ,
site = site_name ,
netbios_name = rodc_name ,
targetdir = None ,
domain = None )
self . create_rodc ( self . _rodc_ctx )
return self . _rodc_ctx
2021-06-15 15:12:38 +12:00
def get_domain_functional_level ( self , ldb ) :
if self . _functional_level is None :
res = ldb . search ( base = ' ' ,
scope = SCOPE_BASE ,
attrs = [ ' domainFunctionality ' ] )
try :
functional_level = int ( res [ 0 ] [ ' domainFunctionality ' ] [ 0 ] )
except KeyError :
functional_level = DS_DOMAIN_FUNCTION_2000
type ( self ) . _functional_level = functional_level
return self . _functional_level
def get_default_enctypes ( self ) :
samdb = self . get_samdb ( )
functional_level = self . get_domain_functional_level ( samdb )
# RC4 should always be supported
default_enctypes = security . KERB_ENCTYPE_RC4_HMAC_MD5
if functional_level > = DS_DOMAIN_FUNCTION_2008 :
# AES is only supported at functional level 2008 or higher
default_enctypes | = security . KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
default_enctypes | = security . KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
return default_enctypes
2021-06-16 11:04:00 +12:00
def create_account ( self , ldb , name , machine_account = False ,
2021-09-01 16:34:02 +12:00
spn = None , upn = None , additional_details = None ,
2021-09-01 16:34:46 +12:00
ou = None , account_control = 0 ) :
2020-11-30 14:16:28 +13:00
''' Create an account for testing.
The dn of the created account is added to self . accounts ,
2021-06-15 15:38:28 +12:00
which is used by tearDownClass to clean up the created accounts .
2020-11-30 14:16:28 +13:00
'''
2021-09-01 16:34:02 +12:00
if ou is None :
2021-09-03 09:18:32 +12:00
guid = ( DS_GUID_COMPUTERS_CONTAINER if machine_account
else DS_GUID_USERS_CONTAINER )
ou = ldb . get_wellknown_dn ( ldb . get_default_basedn ( ) , guid )
2021-09-01 16:34:02 +12:00
dn = " CN= %s , %s " % ( name , ou )
2020-11-30 14:16:28 +13:00
# remove the account if it exists, this will happen if a previous test
# run failed
2021-06-16 11:04:00 +12:00
delete_force ( ldb , dn )
2020-11-30 14:16:28 +13:00
if machine_account :
object_class = " computer "
account_name = " %s $ " % name
2021-09-01 16:34:46 +12:00
account_control | = UF_WORKSTATION_TRUST_ACCOUNT
2020-11-30 14:16:28 +13:00
else :
object_class = " user "
account_name = name
2021-09-01 16:34:46 +12:00
account_control | = UF_NORMAL_ACCOUNT
2020-11-30 14:16:28 +13:00
password = generate_random_password ( 32 , 32 )
utf16pw = ( ' " %s " ' % password ) . encode ( ' utf-16-le ' )
details = {
" dn " : dn ,
" objectclass " : object_class ,
" sAMAccountName " : account_name ,
2021-09-01 16:34:46 +12:00
" userAccountControl " : str ( account_control ) ,
2020-11-30 14:16:28 +13:00
" unicodePwd " : utf16pw }
if spn is not None :
details [ " servicePrincipalName " ] = spn
2021-02-17 12:15:50 +13:00
if upn is not None :
details [ " userPrincipalName " ] = upn
2021-07-06 11:25:55 +12:00
if additional_details is not None :
details . update ( additional_details )
2021-06-16 11:04:00 +12:00
ldb . add ( details )
2020-11-30 14:16:28 +13:00
2021-06-15 16:07:16 +12:00
creds = KerberosCredentials ( )
2021-06-16 11:40:41 +12:00
creds . guess ( self . get_lp ( ) )
2021-06-16 11:04:00 +12:00
creds . set_realm ( ldb . domain_dns_name ( ) . upper ( ) )
creds . set_domain ( ldb . domain_netbios_name ( ) . upper ( ) )
2020-11-30 14:16:28 +13:00
creds . set_password ( password )
creds . set_username ( account_name )
if machine_account :
creds . set_workstation ( name )
2021-07-22 16:22:09 +12:00
else :
creds . set_workstation ( ' ' )
2021-09-01 16:35:58 +12:00
creds . set_dn ( dn )
2020-11-30 14:16:28 +13:00
#
2021-06-15 15:38:28 +12:00
# Save the account name so it can be deleted in tearDownClass
self . accounts . add ( dn )
2020-11-30 14:16:28 +13:00
return ( creds , dn )
2021-09-13 21:24:05 +12:00
def create_rodc ( self , ctx ) :
ctx . nc_list = [ ctx . base_dn , ctx . config_dn , ctx . schema_dn ]
ctx . full_nc_list = [ ctx . base_dn , ctx . config_dn , ctx . schema_dn ]
ctx . krbtgt_dn = f ' CN=krbtgt_ { ctx . myname } ,CN=Users, { ctx . base_dn } '
ctx . never_reveal_sid = [ f ' <SID= { ctx . domsid } - { security . DOMAIN_RID_RODC_DENY } > ' ,
f ' <SID= { security . SID_BUILTIN_ADMINISTRATORS } > ' ,
f ' <SID= { security . SID_BUILTIN_SERVER_OPERATORS } > ' ,
f ' <SID= { security . SID_BUILTIN_BACKUP_OPERATORS } > ' ,
f ' <SID= { security . SID_BUILTIN_ACCOUNT_OPERATORS } > ' ]
ctx . reveal_sid = f ' <SID= { ctx . domsid } - { security . DOMAIN_RID_RODC_ALLOW } > '
mysid = ctx . get_mysid ( )
admin_dn = f ' <SID= { mysid } > '
ctx . managedby = admin_dn
ctx . userAccountControl = ( UF_WORKSTATION_TRUST_ACCOUNT |
UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
UF_PARTIAL_SECRETS_ACCOUNT )
ctx . connection_dn = f ' CN=RODC Connection (FRS), { ctx . ntds_dn } '
ctx . secure_channel_type = misc . SEC_CHAN_RODC
ctx . RODC = True
ctx . replica_flags = ( drsuapi . DRSUAPI_DRS_INIT_SYNC |
drsuapi . DRSUAPI_DRS_PER_SYNC |
drsuapi . DRSUAPI_DRS_GET_ANC |
drsuapi . DRSUAPI_DRS_NEVER_SYNCED |
drsuapi . DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING )
ctx . domain_replica_flags = ctx . replica_flags | drsuapi . DRSUAPI_DRS_CRITICAL_ONLY
ctx . build_nc_lists ( )
ctx . cleanup_old_join ( )
try :
ctx . join_add_objects ( )
except Exception :
# cleanup the failed join (checking we still have a live LDB
# connection to the remote DC first)
ctx . refresh_ldb_connection ( )
ctx . cleanup_old_join ( )
raise
2021-09-13 22:13:24 +12:00
def replicate_account_to_rodc ( self , dn ) :
samdb = self . get_samdb ( )
rodc_samdb = self . get_rodc_samdb ( )
repl_val = f ' { samdb . get_dsServiceName ( ) } : { dn } :SECRETS_ONLY '
msg = ldb . Message ( )
msg . dn = ldb . Dn ( rodc_samdb , ' ' )
msg [ ' replicateSingleObject ' ] = ldb . MessageElement (
repl_val ,
ldb . FLAG_MOD_REPLACE ,
' replicateSingleObject ' )
try :
# Try replication using the replicateSingleObject rootDSE
# operation.
rodc_samdb . modify ( msg )
except ldb . LdbError as err :
enum , estr = err . args
self . assertEqual ( enum , ldb . ERR_UNWILLING_TO_PERFORM )
self . assertIn ( ' rootdse_modify: unknown attribute to change! ' ,
estr )
# If that method wasn't supported, we may be in the rodc:local test
# environment, where we can try replicating to the local database.
lp = self . get_lp ( )
rodc_creds = Credentials ( )
rodc_creds . guess ( lp )
rodc_creds . set_machine_account ( lp )
local_samdb = SamDB ( url = None , session_info = system_session ( ) ,
credentials = rodc_creds , lp = lp )
destination_dsa_guid = misc . GUID ( local_samdb . get_ntds_GUID ( ) )
repl = drs_Replicate ( f ' ncacn_ip_tcp: { self . dc_host } [seal] ' ,
lp , rodc_creds ,
local_samdb , destination_dsa_guid )
source_dsa_invocation_id = misc . GUID ( samdb . invocation_id )
repl . replicate ( dn ,
source_dsa_invocation_id ,
destination_dsa_guid ,
exop = drsuapi . DRSUAPI_EXOP_REPL_SECRET ,
rodc = True )
2021-09-13 21:24:31 +12:00
def reveal_account_to_mock_rodc ( self , dn ) :
samdb = self . get_samdb ( )
rodc_ctx = self . get_mock_rodc_ctx ( )
self . get_secrets (
samdb ,
dn ,
destination_dsa_guid = rodc_ctx . ntds_guid ,
source_dsa_invocation_id = misc . GUID ( samdb . invocation_id ) )
2021-09-13 22:13:24 +12:00
def check_revealed ( self , dn , rodc_dn , revealed = True ) :
samdb = self . get_samdb ( )
res = samdb . search ( base = rodc_dn ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' msDS-RevealedUsers ' ] )
revealed_users = res [ 0 ] . get ( ' msDS-RevealedUsers ' )
if revealed_users is None :
self . assertFalse ( revealed )
return
revealed_dns = set ( str ( dsdb_Dn ( samdb , str ( user ) ,
syntax_oid = DSDB_SYNTAX_BINARY_DN ) . dn )
for user in revealed_users )
if revealed :
self . assertIn ( str ( dn ) , revealed_dns )
else :
self . assertNotIn ( str ( dn ) , revealed_dns )
2021-09-13 20:58:01 +12:00
def get_secrets ( self , samdb , dn ,
destination_dsa_guid ,
source_dsa_invocation_id ) :
2021-06-15 13:15:10 +12:00
admin_creds = self . get_admin_creds ( )
dns_hostname = samdb . host_dns_name ( )
( bind , handle , _ ) = drsuapi_connect ( dns_hostname ,
self . get_lp ( ) ,
admin_creds )
req = drsuapi . DsGetNCChangesRequest8 ( )
req . destination_dsa_guid = destination_dsa_guid
2021-09-13 20:58:01 +12:00
req . source_dsa_invocation_id = source_dsa_invocation_id
2021-06-15 13:15:10 +12:00
naming_context = drsuapi . DsReplicaObjectIdentifier ( )
2021-09-13 20:58:01 +12:00
naming_context . dn = dn
2021-06-15 13:15:10 +12:00
req . naming_context = naming_context
hwm = drsuapi . DsReplicaHighWaterMark ( )
hwm . tmp_highest_usn = 0
hwm . reserved_usn = 0
hwm . highest_usn = 0
req . highwatermark = hwm
req . uptodateness_vector = None
req . replica_flags = 0
req . max_object_count = 1
req . max_ndr_size = 402116
req . extended_op = drsuapi . DRSUAPI_EXOP_REPL_SECRET
attids = [ drsuapi . DRSUAPI_ATTID_supplementalCredentials ,
drsuapi . DRSUAPI_ATTID_unicodePwd ]
partial_attribute_set = drsuapi . DsPartialAttributeSet ( )
partial_attribute_set . version = 1
partial_attribute_set . attids = attids
partial_attribute_set . num_attids = len ( attids )
req . partial_attribute_set = partial_attribute_set
req . partial_attribute_set_ex = None
req . mapping_ctr . num_mappings = 0
req . mapping_ctr . mappings = None
_ , ctr = bind . DsGetNCChanges ( handle , 8 , req )
2021-09-13 20:58:01 +12:00
self . assertEqual ( 1 , ctr . object_count )
2021-06-15 13:15:10 +12:00
identifier = ctr . first_object . object . identifier
attributes = ctr . first_object . object . attribute_ctr . attributes
2021-09-13 20:58:01 +12:00
self . assertEqual ( dn , identifier . dn )
return bind , identifier , attributes
def get_keys ( self , samdb , dn ) :
admin_creds = self . get_admin_creds ( )
bind , identifier , attributes = self . get_secrets (
samdb ,
str ( dn ) ,
destination_dsa_guid = misc . GUID ( samdb . get_ntds_GUID ( ) ) ,
source_dsa_invocation_id = misc . GUID ( ) )
2021-06-15 13:15:10 +12:00
rid = identifier . sid . split ( ) [ 1 ]
net_ctx = net . Net ( admin_creds )
keys = { }
for attr in attributes :
if attr . attid == drsuapi . DRSUAPI_ATTID_supplementalCredentials :
net_ctx . replicate_decrypt ( bind , attr , rid )
attr_val = attr . value_ctr . values [ 0 ] . blob
spl = ndr_unpack ( drsblobs . supplementalCredentialsBlob ,
attr_val )
for pkg in spl . sub . packages :
if pkg . name == ' Primary:Kerberos-Newer-Keys ' :
krb5_new_keys_raw = binascii . a2b_hex ( pkg . data )
krb5_new_keys = ndr_unpack (
drsblobs . package_PrimaryKerberosBlob ,
krb5_new_keys_raw )
for key in krb5_new_keys . ctr . keys :
keytype = key . keytype
if keytype in ( kcrypto . Enctype . AES256 ,
kcrypto . Enctype . AES128 ) :
keys [ keytype ] = key . value . hex ( )
elif attr . attid == drsuapi . DRSUAPI_ATTID_unicodePwd :
net_ctx . replicate_decrypt ( bind , attr , rid )
pwd = attr . value_ctr . values [ 0 ] . blob
keys [ kcrypto . Enctype . RC4 ] = pwd . hex ( )
default_enctypes = self . get_default_enctypes ( )
if default_enctypes & security . KERB_ENCTYPE_RC4_HMAC_MD5 :
self . assertIn ( kcrypto . Enctype . RC4 , keys )
if default_enctypes & security . KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 :
self . assertIn ( kcrypto . Enctype . AES256 , keys )
if default_enctypes & security . KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 :
self . assertIn ( kcrypto . Enctype . AES128 , keys )
return keys
2021-06-15 16:07:16 +12:00
def creds_set_keys ( self , creds , keys ) :
if keys is not None :
for enctype , key in keys . items ( ) :
creds . set_forced_key ( enctype , key )
supported_enctypes = 0
if kcrypto . Enctype . AES256 in keys :
supported_enctypes | = security . KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
if kcrypto . Enctype . AES128 in keys :
supported_enctypes | = security . KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
if kcrypto . Enctype . RC4 in keys :
supported_enctypes | = security . KERB_ENCTYPE_RC4_HMAC_MD5
creds . set_as_supported_enctypes ( supported_enctypes )
creds . set_tgs_supported_enctypes ( supported_enctypes )
creds . set_ap_supported_enctypes ( supported_enctypes )
2021-09-13 22:13:24 +12:00
def add_to_group ( self , account_dn , group_dn , group_attr ) :
samdb = self . get_samdb ( )
res = samdb . search ( base = group_dn ,
scope = ldb . SCOPE_BASE ,
attrs = [ group_attr ] )
orig_msg = res [ 0 ]
members = list ( orig_msg [ group_attr ] )
members . append ( account_dn )
msg = ldb . Message ( )
msg . dn = group_dn
msg [ group_attr ] = ldb . MessageElement ( members ,
ldb . FLAG_MOD_REPLACE ,
group_attr )
cleanup = samdb . msg_diff ( msg , orig_msg )
self . ldb_cleanups . append ( cleanup )
samdb . modify ( msg )
return cleanup
2021-09-03 15:36:24 +12:00
def get_cached_creds ( self , * ,
machine_account ,
opts = None ) :
if opts is None :
opts = { }
opts_default = {
2021-09-13 22:13:24 +12:00
' allowed_replication ' : False ,
2021-09-13 21:24:31 +12:00
' allowed_replication_mock ' : False ,
2021-09-13 22:13:24 +12:00
' denied_replication ' : False ,
2021-09-13 21:24:31 +12:00
' denied_replication_mock ' : False ,
2021-09-13 22:13:24 +12:00
' revealed_to_rodc ' : False ,
2021-09-13 21:24:31 +12:00
' revealed_to_mock_rodc ' : False ,
2021-09-03 15:36:24 +12:00
' no_auth_data_required ' : False ,
' supported_enctypes ' : None ,
' not_delegated ' : False ,
' allowed_to_delegate_to ' : None ,
' trusted_to_auth_for_delegation ' : False ,
' fast_support ' : False
}
account_opts = {
' machine_account ' : machine_account ,
* * opts_default ,
* * opts
}
cache_key = tuple ( sorted ( account_opts . items ( ) ) )
creds = self . account_cache . get ( cache_key )
if creds is None :
creds = self . create_account_opts ( * * account_opts )
self . account_cache [ cache_key ] = creds
return creds
def create_account_opts ( self , * ,
machine_account ,
2021-09-13 22:13:24 +12:00
allowed_replication ,
2021-09-13 21:24:31 +12:00
allowed_replication_mock ,
2021-09-13 22:13:24 +12:00
denied_replication ,
2021-09-13 21:24:31 +12:00
denied_replication_mock ,
2021-09-13 22:13:24 +12:00
revealed_to_rodc ,
2021-09-13 21:24:31 +12:00
revealed_to_mock_rodc ,
2021-09-03 15:36:24 +12:00
no_auth_data_required ,
supported_enctypes ,
not_delegated ,
allowed_to_delegate_to ,
trusted_to_auth_for_delegation ,
fast_support ) :
if machine_account :
self . assertFalse ( not_delegated )
else :
self . assertIsNone ( allowed_to_delegate_to )
self . assertFalse ( trusted_to_auth_for_delegation )
samdb = self . get_samdb ( )
2021-09-13 22:13:24 +12:00
rodc_samdb = self . get_rodc_samdb ( )
rodc_dn = self . get_server_dn ( rodc_samdb )
2021-06-15 16:07:16 +12:00
2021-09-03 15:36:24 +12:00
user_name = self . account_base + str ( self . account_id )
type ( self ) . account_id + = 1
2021-06-15 16:07:16 +12:00
2021-09-03 15:36:24 +12:00
user_account_control = 0
if trusted_to_auth_for_delegation :
user_account_control | = UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
if not_delegated :
user_account_control | = UF_NOT_DELEGATED
if no_auth_data_required :
user_account_control | = UF_NO_AUTH_DATA_REQUIRED
details = { }
enctypes = supported_enctypes
if fast_support :
fast_bits = ( security . KERB_ENCTYPE_FAST_SUPPORTED |
security . KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
security . KERB_ENCTYPE_CLAIMS_SUPPORTED )
enctypes = ( enctypes or 0 ) | fast_bits
if enctypes is not None :
details [ ' msDS-SupportedEncryptionTypes ' ] = str ( enctypes )
if allowed_to_delegate_to :
details [ ' msDS-AllowedToDelegateTo ' ] = allowed_to_delegate_to
if machine_account :
spn = ' host/ ' + user_name
else :
spn = None
creds , dn = self . create_account ( samdb , user_name ,
machine_account = machine_account ,
spn = spn ,
additional_details = details ,
account_control = user_account_control )
2021-06-15 16:07:16 +12:00
2021-09-03 15:36:24 +12:00
res = samdb . search ( base = dn ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' msDS-KeyVersionNumber ' ] )
kvno = int ( res [ 0 ] [ ' msDS-KeyVersionNumber ' ] [ 0 ] )
creds . set_kvno ( kvno )
2021-06-15 16:07:16 +12:00
2021-09-03 15:36:24 +12:00
keys = self . get_keys ( samdb , dn )
self . creds_set_keys ( creds , keys )
if machine_account :
if supported_enctypes is not None :
tgs_enctypes = supported_enctypes
else :
tgs_enctypes = security . KERB_ENCTYPE_RC4_HMAC_MD5
creds . set_tgs_supported_enctypes ( tgs_enctypes )
2021-09-13 22:13:24 +12:00
# Handle secret replication to the RODC.
if allowed_replication or revealed_to_rodc :
# Allow replicating this account's secrets if requested, or allow
# it only temporarily if we're about to replicate them.
allowed_cleanup = self . add_to_group (
dn , rodc_dn ,
' msDS-RevealOnDemandGroup ' )
if revealed_to_rodc :
# Replicate this account's secrets to the RODC.
self . replicate_account_to_rodc ( dn )
if not allowed_replication :
# If we don't want replicating secrets to be allowed for this
# account, disable it again.
samdb . modify ( allowed_cleanup )
self . check_revealed ( dn ,
rodc_dn ,
revealed = revealed_to_rodc )
if denied_replication :
# Deny replicating this account's secrets to the RODC.
self . add_to_group ( dn , rodc_dn , ' msDS-NeverRevealGroup ' )
2021-09-13 21:24:31 +12:00
# Handle secret replication to the mock RODC.
if allowed_replication_mock or revealed_to_mock_rodc :
# Allow replicating this account's secrets if requested, or allow
# it only temporarily if we want to add the account to the mock
# RODC's msDS-RevealedUsers.
rodc_ctx = self . get_mock_rodc_ctx ( )
mock_rodc_dn = ldb . Dn ( samdb , rodc_ctx . acct_dn )
allowed_mock_cleanup = self . add_to_group (
dn , mock_rodc_dn ,
' msDS-RevealOnDemandGroup ' )
if revealed_to_mock_rodc :
# Request replicating this account's secrets to the mock RODC,
# which updates msDS-RevealedUsers.
self . reveal_account_to_mock_rodc ( dn )
if not allowed_replication_mock :
# If we don't want replicating secrets to be allowed for this
# account, disable it again.
samdb . modify ( allowed_mock_cleanup )
self . check_revealed ( dn ,
mock_rodc_dn ,
revealed = revealed_to_mock_rodc )
if denied_replication_mock :
# Deny replicating this account's secrets to the mock RODC.
rodc_ctx = self . get_mock_rodc_ctx ( )
mock_rodc_dn = ldb . Dn ( samdb , rodc_ctx . acct_dn )
self . add_to_group ( dn , mock_rodc_dn , ' msDS-NeverRevealGroup ' )
2021-09-13 22:13:24 +12:00
2021-09-03 15:36:24 +12:00
return creds
def get_client_creds ( self ,
allow_missing_password = False ,
allow_missing_keys = True ) :
def create_client_account ( ) :
return self . get_cached_creds ( machine_account = False )
2021-06-15 16:07:16 +12:00
c = self . _get_krb5_creds ( prefix = ' CLIENT ' ,
allow_missing_password = allow_missing_password ,
allow_missing_keys = allow_missing_keys ,
fallback_creds_fn = create_client_account )
return c
2021-07-06 10:19:57 +12:00
def get_mach_creds ( self ,
allow_missing_password = False ,
allow_missing_keys = True ) :
def create_mach_account ( ) :
2021-09-03 15:36:24 +12:00
return self . get_cached_creds ( machine_account = True ,
opts = { ' fast_support ' : True } )
2021-07-06 10:19:57 +12:00
c = self . _get_krb5_creds ( prefix = ' MAC ' ,
allow_missing_password = allow_missing_password ,
allow_missing_keys = allow_missing_keys ,
fallback_creds_fn = create_mach_account )
return c
def get_service_creds ( self ,
allow_missing_password = False ,
allow_missing_keys = True ) :
def create_service_account ( ) :
2021-09-03 15:36:24 +12:00
return self . get_cached_creds (
machine_account = True ,
opts = {
' trusted_to_auth_for_delegation ' : True ,
' fast_support ' : True
} )
2021-07-06 10:19:57 +12:00
c = self . _get_krb5_creds ( prefix = ' SERVICE ' ,
allow_missing_password = allow_missing_password ,
allow_missing_keys = allow_missing_keys ,
fallback_creds_fn = create_service_account )
return c
2021-09-13 20:20:23 +12:00
def get_rodc_krbtgt_creds ( self ,
require_keys = True ,
require_strongest_key = False ) :
if require_strongest_key :
self . assertTrue ( require_keys )
def download_rodc_krbtgt_creds ( ) :
samdb = self . get_samdb ( )
rodc_samdb = self . get_rodc_samdb ( )
rodc_dn = self . get_server_dn ( rodc_samdb )
res = samdb . search ( rodc_dn ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' msDS-KrbTgtLink ' ] )
krbtgt_dn = res [ 0 ] [ ' msDS-KrbTgtLink ' ] [ 0 ]
res = samdb . search ( krbtgt_dn ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' sAMAccountName ' ,
' msDS-KeyVersionNumber ' ,
' msDS-SecondaryKrbTgtNumber ' ] )
krbtgt_dn = res [ 0 ] . dn
username = str ( res [ 0 ] [ ' sAMAccountName ' ] )
creds = KerberosCredentials ( )
creds . set_domain ( self . env_get_var ( ' DOMAIN ' , ' RODC_KRBTGT ' ) )
creds . set_realm ( self . env_get_var ( ' REALM ' , ' RODC_KRBTGT ' ) )
creds . set_username ( username )
kvno = int ( res [ 0 ] [ ' msDS-KeyVersionNumber ' ] [ 0 ] )
krbtgt_number = int ( res [ 0 ] [ ' msDS-SecondaryKrbTgtNumber ' ] [ 0 ] )
rodc_kvno = krbtgt_number << 16 | kvno
creds . set_kvno ( rodc_kvno )
creds . set_dn ( krbtgt_dn )
keys = self . get_keys ( samdb , krbtgt_dn )
self . creds_set_keys ( creds , keys )
return creds
c = self . _get_krb5_creds ( prefix = ' RODC_KRBTGT ' ,
allow_missing_password = True ,
allow_missing_keys = not require_keys ,
require_strongest_key = require_strongest_key ,
fallback_creds_fn = download_rodc_krbtgt_creds )
return c
2021-09-13 21:24:05 +12:00
def get_mock_rodc_krbtgt_creds ( self ,
require_keys = True ,
require_strongest_key = False ) :
if require_strongest_key :
self . assertTrue ( require_keys )
def create_rodc_krbtgt_account ( ) :
samdb = self . get_samdb ( )
rodc_ctx = self . get_mock_rodc_ctx ( )
krbtgt_dn = rodc_ctx . new_krbtgt_dn
res = samdb . search ( base = ldb . Dn ( samdb , krbtgt_dn ) ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' msDS-KeyVersionNumber ' ,
' msDS-SecondaryKrbTgtNumber ' ] )
dn = res [ 0 ] . dn
username = str ( rodc_ctx . krbtgt_name )
creds = KerberosCredentials ( )
creds . set_domain ( self . env_get_var ( ' DOMAIN ' , ' RODC_KRBTGT ' ) )
creds . set_realm ( self . env_get_var ( ' REALM ' , ' RODC_KRBTGT ' ) )
creds . set_username ( username )
kvno = int ( res [ 0 ] [ ' msDS-KeyVersionNumber ' ] [ 0 ] )
krbtgt_number = int ( res [ 0 ] [ ' msDS-SecondaryKrbTgtNumber ' ] [ 0 ] )
rodc_kvno = krbtgt_number << 16 | kvno
creds . set_kvno ( rodc_kvno )
creds . set_dn ( dn )
keys = self . get_keys ( samdb , dn )
self . creds_set_keys ( creds , keys )
return creds
c = self . _get_krb5_creds ( prefix = ' MOCK_RODC_KRBTGT ' ,
allow_missing_password = True ,
allow_missing_keys = not require_keys ,
require_strongest_key = require_strongest_key ,
fallback_creds_fn = create_rodc_krbtgt_account )
return c
2021-06-15 16:07:16 +12:00
def get_krbtgt_creds ( self ,
require_keys = True ,
require_strongest_key = False ) :
if require_strongest_key :
self . assertTrue ( require_keys )
2021-08-02 17:00:09 +12:00
2021-06-15 16:07:16 +12:00
def download_krbtgt_creds ( ) :
samdb = self . get_samdb ( )
2021-09-01 17:46:02 +12:00
krbtgt_rid = security . DOMAIN_RID_KRBTGT
2021-06-15 16:07:16 +12:00
krbtgt_sid = ' %s - %d ' % ( samdb . get_domain_sid ( ) , krbtgt_rid )
res = samdb . search ( base = ' <SID= %s > ' % krbtgt_sid ,
scope = ldb . SCOPE_BASE ,
attrs = [ ' sAMAccountName ' ,
' msDS-KeyVersionNumber ' ] )
dn = res [ 0 ] . dn
username = str ( res [ 0 ] [ ' sAMAccountName ' ] )
creds = KerberosCredentials ( )
creds . set_domain ( self . env_get_var ( ' DOMAIN ' , ' KRBTGT ' ) )
creds . set_realm ( self . env_get_var ( ' REALM ' , ' KRBTGT ' ) )
creds . set_username ( username )
kvno = int ( res [ 0 ] [ ' msDS-KeyVersionNumber ' ] [ 0 ] )
creds . set_kvno ( kvno )
2021-09-01 16:35:58 +12:00
creds . set_dn ( dn )
2021-06-15 16:07:16 +12:00
keys = self . get_keys ( samdb , dn )
self . creds_set_keys ( creds , keys )
return creds
c = self . _get_krb5_creds ( prefix = ' KRBTGT ' ,
default_username = ' krbtgt ' ,
allow_missing_password = True ,
allow_missing_keys = not require_keys ,
require_strongest_key = require_strongest_key ,
fallback_creds_fn = download_krbtgt_creds )
return c
2020-11-30 14:16:28 +13:00
def as_req ( self , cname , sname , realm , etypes , padata = None ) :
''' Send a Kerberos AS_REQ, returns the undecoded response
'''
till = self . get_KerberosTime ( offset = 36000 )
kdc_options = 0
req = self . AS_REQ_create ( padata = padata ,
kdc_options = str ( kdc_options ) ,
cname = cname ,
realm = realm ,
sname = sname ,
from_time = None ,
till_time = till ,
renew_time = None ,
nonce = 0x7fffffff ,
etypes = etypes ,
addresses = None ,
additional_tickets = None )
rep = self . send_recv_transaction ( req )
return rep
def get_as_rep_key ( self , creds , rep ) :
''' Extract the session key from an AS-REP
'''
rep_padata = self . der_decode (
rep [ ' e-data ' ] ,
asn1Spec = krb5_asn1 . METHOD_DATA ( ) )
for pa in rep_padata :
if pa [ ' padata-type ' ] == PADATA_ETYPE_INFO2 :
padata_value = pa [ ' padata-value ' ]
break
etype_info2 = self . der_decode (
padata_value , asn1Spec = krb5_asn1 . ETYPE_INFO2 ( ) )
2021-07-06 11:28:37 +12:00
key = self . PasswordKey_from_etype_info2 ( creds , etype_info2 [ 0 ] ,
creds . get_kvno ( ) )
2020-11-30 14:16:28 +13:00
return key
2021-07-06 10:16:01 +12:00
def get_enc_timestamp_pa_data ( self , creds , rep , skew = 0 ) :
2020-11-30 14:16:28 +13:00
''' generate the pa_data data element for an AS-REQ
'''
2021-07-26 17:18:38 +12:00
2020-11-30 14:16:28 +13:00
key = self . get_as_rep_key ( creds , rep )
2021-07-26 17:18:38 +12:00
return self . get_enc_timestamp_pa_data_from_key ( key , skew = skew )
def get_enc_timestamp_pa_data_from_key ( self , key , skew = 0 ) :
2020-11-30 14:16:28 +13:00
( patime , pausec ) = self . get_KerberosTimeWithUsec ( offset = skew )
padata = self . PA_ENC_TS_ENC_create ( patime , pausec )
padata = self . der_encode ( padata , asn1Spec = krb5_asn1 . PA_ENC_TS_ENC ( ) )
2020-12-10 16:27:17 +13:00
padata = self . EncryptedData_create ( key , KU_PA_ENC_TIMESTAMP , padata )
2020-11-30 14:16:28 +13:00
padata = self . der_encode ( padata , asn1Spec = krb5_asn1 . EncryptedData ( ) )
padata = self . PA_DATA_create ( PADATA_ENC_TIMESTAMP , padata )
2021-07-06 10:16:01 +12:00
return padata
2020-11-30 14:16:28 +13:00
2021-07-06 12:47:18 +12:00
def get_challenge_pa_data ( self , client_challenge_key , skew = 0 ) :
patime , pausec = self . get_KerberosTimeWithUsec ( offset = skew )
padata = self . PA_ENC_TS_ENC_create ( patime , pausec )
padata = self . der_encode ( padata ,
asn1Spec = krb5_asn1 . PA_ENC_TS_ENC ( ) )
padata = self . EncryptedData_create ( client_challenge_key ,
KU_ENC_CHALLENGE_CLIENT ,
padata )
padata = self . der_encode ( padata ,
asn1Spec = krb5_asn1 . EncryptedData ( ) )
padata = self . PA_DATA_create ( PADATA_ENCRYPTED_CHALLENGE ,
padata )
return padata
2020-11-30 14:16:28 +13:00
def get_as_rep_enc_data ( self , key , rep ) :
''' Decrypt and Decode the encrypted data in an AS-REP
'''
2020-12-10 16:27:17 +13:00
enc_part = key . decrypt ( KU_AS_REP_ENC_PART , rep [ ' enc-part ' ] [ ' cipher ' ] )
2020-11-30 14:16:28 +13:00
# MIT KDC encodes both EncASRepPart and EncTGSRepPart with
# application tag 26
try :
enc_part = self . der_decode (
enc_part , asn1Spec = krb5_asn1 . EncASRepPart ( ) )
except Exception :
enc_part = self . der_decode (
enc_part , asn1Spec = krb5_asn1 . EncTGSRepPart ( ) )
return enc_part
2021-07-06 10:17:52 +12:00
def check_pre_authentication ( self , rep ) :
2020-11-30 14:16:28 +13:00
""" Check that the kdc response was pre-authentication required
"""
self . check_error_rep ( rep , KDC_ERR_PREAUTH_REQUIRED )
def check_as_reply ( self , rep ) :
""" Check that the kdc response is an AS-REP and that the
values for :
msg - type
pvno
tkt - pvno
kvno
match the expected values
"""
2021-07-26 17:19:04 +12:00
self . check_reply ( rep , msg_type = KRB_AS_REP )
2020-11-30 14:16:28 +13:00
def check_tgs_reply ( self , rep ) :
""" Check that the kdc response is an TGS-REP and that the
values for :
msg - type
pvno
tkt - pvno
kvno
match the expected values
"""
2021-07-26 17:19:04 +12:00
self . check_reply ( rep , msg_type = KRB_TGS_REP )
def check_reply ( self , rep , msg_type ) :
2020-11-30 14:16:28 +13:00
# Should have a reply, and it should an TGS-REP message.
self . assertIsNotNone ( rep )
2021-07-26 17:19:04 +12:00
self . assertEqual ( rep [ ' msg-type ' ] , msg_type , " rep = { %s } " % rep )
2020-11-30 14:16:28 +13:00
# Protocol version number should be 5
pvno = int ( rep [ ' pvno ' ] )
self . assertEqual ( 5 , pvno , " rep = { %s } " % rep )
# The ticket version number should be 5
tkt_vno = int ( rep [ ' ticket ' ] [ ' tkt-vno ' ] )
self . assertEqual ( 5 , tkt_vno , " rep = { %s } " % rep )
# Check that the kvno is not an RODC kvno
# MIT kerberos does not provide the kvno, so we treat it as optional.
# This is tested in compatability_test.py
if ' kvno ' in rep [ ' enc-part ' ] :
kvno = int ( rep [ ' enc-part ' ] [ ' kvno ' ] )
# If the high order bits are set this is an RODC kvno.
self . assertEqual ( 0 , kvno & 0xFFFF0000 , " rep = { %s } " % rep )
def check_error_rep ( self , rep , expected ) :
""" Check that the reply is an error message, with the expected
error - code specified .
"""
self . assertIsNotNone ( rep )
self . assertEqual ( rep [ ' msg-type ' ] , KRB_ERROR , " rep = { %s } " % rep )
2021-08-31 22:38:01 +12:00
if isinstance ( expected , collections . abc . Container ) :
self . assertIn ( rep [ ' error-code ' ] , expected , " rep = { %s } " % rep )
else :
self . assertEqual ( rep [ ' error-code ' ] , expected , " rep = { %s } " % rep )
2020-11-30 14:16:28 +13:00
2021-09-16 11:13:09 +12:00
def tgs_req ( self , cname , sname , realm , ticket , key , etypes ,
expected_error_mode = 0 ) :
2020-11-30 14:16:28 +13:00
''' Send a TGS-REQ, returns the response and the decrypted and
decoded enc - part
'''
kdc_options = " 0 "
subkey = self . RandomKey ( key . etype )
( ctime , cusec ) = self . get_KerberosTimeWithUsec ( )
2021-09-16 11:13:09 +12:00
tgt = KerberosTicketCreds ( ticket ,
key ,
crealm = realm ,
cname = cname )
2020-11-30 14:16:28 +13:00
2021-09-16 11:13:09 +12:00
if not expected_error_mode :
check_error_fn = None
check_rep_fn = self . generic_check_kdc_rep
else :
check_error_fn = self . generic_check_kdc_error
check_rep_fn = None
kdc_exchange_dict = self . tgs_exchange_dict (
expected_crealm = realm ,
expected_cname = cname ,
expected_srealm = realm ,
expected_sname = sname ,
expected_error_mode = expected_error_mode ,
check_error_fn = check_error_fn ,
check_rep_fn = check_rep_fn ,
check_kdc_private_fn = self . generic_check_kdc_private ,
tgt = tgt ,
authenticator_subkey = subkey ,
kdc_options = str ( kdc_options ) )
rep = self . _generic_kdc_exchange ( kdc_exchange_dict ,
cname = None ,
realm = realm ,
sname = sname ,
etypes = etypes )
if expected_error_mode :
enc_part = None
else :
ticket_creds = kdc_exchange_dict [ ' rep_ticket_creds ' ]
enc_part = ticket_creds . encpart_private
return rep , enc_part
2020-11-30 14:16:28 +13:00
# Named tuple to contain values of interest when the PAC is decoded.
PacData = namedtuple (
" PacData " ,
" account_name account_sid logon_name upn domain_name " )
PAC_LOGON_INFO = 1
PAC_CREDENTIAL_INFO = 2
PAC_SRV_CHECKSUM = 6
PAC_KDC_CHECKSUM = 7
PAC_LOGON_NAME = 10
PAC_CONSTRAINED_DELEGATION = 11
PAC_UPN_DNS_INFO = 12
def get_pac_data ( self , authorization_data ) :
''' Decode the PAC element contained in the authorization-data element
'''
account_name = None
user_sid = None
logon_name = None
upn = None
domain_name = None
# The PAC data will be wrapped in an AD_IF_RELEVANT element
ad_if_relevant_elements = (
x for x in authorization_data if x [ ' ad-type ' ] == AD_IF_RELEVANT )
for dt in ad_if_relevant_elements :
buf = self . der_decode (
dt [ ' ad-data ' ] , asn1Spec = krb5_asn1 . AD_IF_RELEVANT ( ) )
# The PAC data is further wrapped in a AD_WIN2K_PAC element
for ad in ( x for x in buf if x [ ' ad-type ' ] == AD_WIN2K_PAC ) :
pb = ndr_unpack ( krb5pac . PAC_DATA , ad [ ' ad-data ' ] )
for pac in pb . buffers :
if pac . type == self . PAC_LOGON_INFO :
account_name = (
pac . info . info . info3 . base . account_name )
user_sid = (
2020-12-11 11:55:01 +13:00
str ( pac . info . info . info3 . base . domain_sid )
+ " - " + str ( pac . info . info . info3 . base . rid ) )
2020-11-30 14:16:28 +13:00
elif pac . type == self . PAC_LOGON_NAME :
logon_name = pac . info . account_name
elif pac . type == self . PAC_UPN_DNS_INFO :
upn = pac . info . upn_name
domain_name = pac . info . dns_domain_name
return self . PacData (
account_name ,
user_sid ,
logon_name ,
upn ,
domain_name )
def decode_service_ticket ( self , creds , ticket ) :
''' Decrypt and decode a service ticket
'''
name = creds . get_username ( )
if name . endswith ( ' $ ' ) :
name = name [ : - 1 ]
realm = creds . get_realm ( )
salt = " %s . %s @ %s " % ( name , realm . lower ( ) , realm . upper ( ) )
key = self . PasswordKey_create (
ticket [ ' enc-part ' ] [ ' etype ' ] ,
creds . get_password ( ) ,
salt ,
ticket [ ' enc-part ' ] [ ' kvno ' ] )
2020-12-10 16:27:17 +13:00
enc_part = key . decrypt ( KU_TICKET , ticket [ ' enc-part ' ] [ ' cipher ' ] )
2020-11-30 14:16:28 +13:00
enc_ticket_part = self . der_decode (
enc_part , asn1Spec = krb5_asn1 . EncTicketPart ( ) )
return enc_ticket_part
2021-06-16 11:04:00 +12:00
def get_objectSid ( self , samdb , dn ) :
2020-11-30 14:16:28 +13:00
''' Get the objectSID for a DN
Note : performs an Ldb query .
'''
2021-06-16 11:04:00 +12:00
res = samdb . search ( dn , scope = SCOPE_BASE , attrs = [ " objectSID " ] )
2020-11-30 14:16:28 +13:00
self . assertTrue ( len ( res ) == 1 , " did not get objectSid for %s " % dn )
2021-06-16 11:04:00 +12:00
sid = samdb . schema_format_value ( " objectSID " , res [ 0 ] [ " objectSID " ] [ 0 ] )
2020-11-30 14:16:28 +13:00
return sid . decode ( ' utf8 ' )
2021-02-17 12:15:50 +13:00
2021-06-16 11:04:00 +12:00
def add_attribute ( self , samdb , dn_str , name , value ) :
2021-02-17 12:15:50 +13:00
if isinstance ( value , list ) :
values = value
else :
values = [ value ]
flag = ldb . FLAG_MOD_ADD
2021-06-16 11:04:00 +12:00
dn = ldb . Dn ( samdb , dn_str )
2021-02-17 12:15:50 +13:00
msg = ldb . Message ( dn )
msg [ name ] = ldb . MessageElement ( values , flag , name )
2021-06-16 11:04:00 +12:00
samdb . modify ( msg )
2021-02-17 12:15:50 +13:00
2021-06-16 11:04:00 +12:00
def modify_attribute ( self , samdb , dn_str , name , value ) :
2021-02-17 12:15:50 +13:00
if isinstance ( value , list ) :
values = value
else :
values = [ value ]
flag = ldb . FLAG_MOD_REPLACE
2021-06-16 11:04:00 +12:00
dn = ldb . Dn ( samdb , dn_str )
2021-02-17 12:15:50 +13:00
msg = ldb . Message ( dn )
msg [ name ] = ldb . MessageElement ( values , flag , name )
2021-06-16 11:04:00 +12:00
samdb . modify ( msg )
2021-04-28 11:02:47 +12:00
def create_ccache ( self , cname , ticket , enc_part ) :
""" Lay out a version 4 on-disk credentials cache, to be read using the
FILE : protocol .
"""
field = krb5ccache . DELTATIME_TAG ( )
field . kdc_sec_offset = 0
field . kdc_usec_offset = 0
v4tag = krb5ccache . V4TAG ( )
v4tag . tag = 1
v4tag . field = field
v4tags = krb5ccache . V4TAGS ( )
v4tags . tag = v4tag
v4tags . further_tags = b ' '
optional_header = krb5ccache . V4HEADER ( )
optional_header . v4tags = v4tags
cname_string = cname [ ' name-string ' ]
cprincipal = krb5ccache . PRINCIPAL ( )
cprincipal . name_type = cname [ ' name-type ' ]
cprincipal . component_count = len ( cname_string )
cprincipal . realm = ticket [ ' realm ' ]
cprincipal . components = cname_string
sname = ticket [ ' sname ' ]
sname_string = sname [ ' name-string ' ]
sprincipal = krb5ccache . PRINCIPAL ( )
sprincipal . name_type = sname [ ' name-type ' ]
sprincipal . component_count = len ( sname_string )
sprincipal . realm = ticket [ ' realm ' ]
sprincipal . components = sname_string
key = self . EncryptionKey_import ( enc_part [ ' key ' ] )
key_data = key . export_obj ( )
keyblock = krb5ccache . KEYBLOCK ( )
keyblock . enctype = key_data [ ' keytype ' ]
keyblock . data = key_data [ ' keyvalue ' ]
addresses = krb5ccache . ADDRESSES ( )
addresses . count = 0
addresses . data = [ ]
authdata = krb5ccache . AUTHDATA ( )
authdata . count = 0
authdata . data = [ ]
# Re-encode the ticket, since it was decoded by another layer.
ticket_data = self . der_encode ( ticket , asn1Spec = krb5_asn1 . Ticket ( ) )
authtime = enc_part [ ' authtime ' ]
2021-08-02 17:10:32 +12:00
starttime = enc_part . get ( ' starttime ' , authtime )
2021-04-28 11:02:47 +12:00
endtime = enc_part [ ' endtime ' ]
cred = krb5ccache . CREDENTIAL ( )
cred . client = cprincipal
cred . server = sprincipal
cred . keyblock = keyblock
2021-07-22 16:27:17 +12:00
cred . authtime = self . get_EpochFromKerberosTime ( authtime )
cred . starttime = self . get_EpochFromKerberosTime ( starttime )
cred . endtime = self . get_EpochFromKerberosTime ( endtime )
2021-05-10 16:43:03 +12:00
# Account for clock skew of up to five minutes.
2021-08-02 17:00:09 +12:00
self . assertLess ( cred . authtime - 5 * 60 ,
2021-05-10 16:43:03 +12:00
datetime . now ( timezone . utc ) . timestamp ( ) ,
" Ticket not yet valid - clocks may be out of sync. " )
2021-08-02 17:00:09 +12:00
self . assertLess ( cred . starttime - 5 * 60 ,
2021-05-10 16:43:03 +12:00
datetime . now ( timezone . utc ) . timestamp ( ) ,
" Ticket not yet valid - clocks may be out of sync. " )
2021-08-02 17:00:09 +12:00
self . assertGreater ( cred . endtime - 60 * 60 ,
2021-05-10 16:43:03 +12:00
datetime . now ( timezone . utc ) . timestamp ( ) ,
2021-08-02 17:00:09 +12:00
" Ticket already expired/about to expire - "
" clocks may be out of sync. " )
2021-05-10 16:43:03 +12:00
2021-04-28 11:02:47 +12:00
cred . renew_till = cred . endtime
cred . is_skey = 0
cred . ticket_flags = int ( enc_part [ ' flags ' ] , 2 )
cred . addresses = addresses
cred . authdata = authdata
cred . ticket = ticket_data
cred . second_ticket = b ' '
ccache = krb5ccache . CCACHE ( )
ccache . pvno = 5
ccache . version = 4
ccache . optional_header = optional_header
ccache . principal = cprincipal
ccache . cred = cred
# Serialise the credentials cache structure.
result = ndr_pack ( ccache )
# Create a temporary file and write the credentials.
cachefile = tempfile . NamedTemporaryFile ( dir = self . tempdir , delete = False )
cachefile . write ( result )
cachefile . close ( )
return cachefile
def create_ccache_with_user ( self , user_credentials , mach_name ,
service = " host " ) :
# Obtain a service ticket authorising the user and place it into a
# newly created credentials cache file.
user_name = user_credentials . get_username ( )
realm = user_credentials . get_realm ( )
# Do the initial AS-REQ, should get a pre-authentication required
# response
etype = ( AES256_CTS_HMAC_SHA1_96 , ARCFOUR_HMAC_MD5 )
cname = self . PrincipalName_create ( name_type = NT_PRINCIPAL ,
names = [ user_name ] )
sname = self . PrincipalName_create ( name_type = NT_SRV_HST ,
names = [ " krbtgt " , realm ] )
rep = self . as_req ( cname , sname , realm , etype )
2021-07-06 10:17:52 +12:00
self . check_pre_authentication ( rep )
2021-04-28 11:02:47 +12:00
# Do the next AS-REQ
2021-07-06 10:16:01 +12:00
padata = self . get_enc_timestamp_pa_data ( user_credentials , rep )
2021-04-28 11:02:47 +12:00
key = self . get_as_rep_key ( user_credentials , rep )
2021-07-06 10:16:01 +12:00
rep = self . as_req ( cname , sname , realm , etype , padata = [ padata ] )
2021-04-28 11:02:47 +12:00
self . check_as_reply ( rep )
# Request a ticket to the host service on the machine account
ticket = rep [ ' ticket ' ]
enc_part = self . get_as_rep_enc_data ( key , rep )
key = self . EncryptionKey_import ( enc_part [ ' key ' ] )
cname = self . PrincipalName_create ( name_type = NT_PRINCIPAL ,
names = [ user_name ] )
sname = self . PrincipalName_create ( name_type = NT_SRV_HST ,
names = [ service , mach_name ] )
( rep , enc_part ) = self . tgs_req (
cname , sname , realm , ticket , key , etype )
self . check_tgs_reply ( rep )
key = self . EncryptionKey_import ( enc_part [ ' key ' ] )
# Check the contents of the pac, and the ticket
ticket = rep [ ' ticket ' ]
# Write the ticket into a credentials cache file that can be ingested
# by the main credentials code.
cachefile = self . create_ccache ( cname , ticket , enc_part )
# Create a credentials object to reference the credentials cache.
creds = Credentials ( )
creds . set_kerberos_state ( MUST_USE_KERBEROS )
creds . set_username ( user_name , SPECIFIED )
creds . set_realm ( realm )
2021-06-16 11:40:41 +12:00
creds . set_named_ccache ( cachefile . name , SPECIFIED , self . get_lp ( ) )
2021-04-28 11:02:47 +12:00
# Return the credentials along with the cache file.
return ( creds , cachefile )