mirror of
https://github.com/samba-team/samba.git
synced 2024-12-23 17:34:34 +03:00
tests/krb5: Add Python implementation and tests for Group Key Distribution Service
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Autobuild-User(master): Andrew Bartlett <abartlet@samba.org> Autobuild-Date(master): Thu Dec 21 21:19:30 UTC 2023 on atb-devel-224
This commit is contained in:
parent
f6bb2d4010
commit
080a62bba8
397
python/samba/gkdi.py
Normal file
397
python/samba/gkdi.py
Normal file
@ -0,0 +1,397 @@
|
||||
# Unix SMB/CIFS implementation.
|
||||
# Copyright (C) Catalyst.Net Ltd 2023
|
||||
#
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
"""Group Key Distribution Service module"""
|
||||
|
||||
from enum import Enum
|
||||
from functools import total_ordering
|
||||
from typing import Final, Optional, Tuple
|
||||
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
from samba import _glue
|
||||
from samba.dcerpc import gkdi, misc
|
||||
from samba.ndr import ndr_pack, ndr_unpack
|
||||
from samba.nt_time import NtTime, NtTimeDelta
|
||||
|
||||
|
||||
uint64_max: Final[int] = 2**64 - 1
|
||||
|
||||
L1_KEY_ITERATION: Final[int] = _glue.GKDI_L1_KEY_ITERATION
|
||||
L2_KEY_ITERATION: Final[int] = _glue.GKDI_L2_KEY_ITERATION
|
||||
KEY_CYCLE_DURATION: Final[NtTimeDelta] = _glue.GKDI_KEY_CYCLE_DURATION
|
||||
MAX_CLOCK_SKEW: Final[NtTimeDelta] = _glue.GKDI_MAX_CLOCK_SKEW
|
||||
|
||||
KEY_LEN_BYTES: Final = 64
|
||||
|
||||
|
||||
class Algorithm(Enum):
|
||||
SHA1 = "SHA1"
|
||||
SHA256 = "SHA256"
|
||||
SHA384 = "SHA384"
|
||||
SHA512 = "SHA512"
|
||||
|
||||
def algorithm(self) -> hashes.HashAlgorithm:
|
||||
if self is Algorithm.SHA1:
|
||||
return hashes.SHA1()
|
||||
|
||||
if self is Algorithm.SHA256:
|
||||
return hashes.SHA256()
|
||||
|
||||
if self is Algorithm.SHA384:
|
||||
return hashes.SHA384()
|
||||
|
||||
if self is Algorithm.SHA512:
|
||||
return hashes.SHA512()
|
||||
|
||||
raise RuntimeError("unknown hash algorithm {self}")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return str(self)
|
||||
|
||||
@staticmethod
|
||||
def from_kdf_parameters(kdf_param: Optional[bytes]) -> "Algorithm":
|
||||
if not kdf_param:
|
||||
return Algorithm.SHA256 # the default used by Windows.
|
||||
|
||||
kdf_parameters = ndr_unpack(gkdi.KdfParameters, kdf_param)
|
||||
return Algorithm(kdf_parameters.hash_algorithm)
|
||||
|
||||
|
||||
class GkidType(Enum):
|
||||
DEFAULT = object()
|
||||
L0_SEED_KEY = object()
|
||||
L1_SEED_KEY = object()
|
||||
L2_SEED_KEY = object()
|
||||
|
||||
def description(self) -> str:
|
||||
if self is GkidType.DEFAULT:
|
||||
return "a default GKID"
|
||||
|
||||
if self is GkidType.L0_SEED_KEY:
|
||||
return "an L0 seed key"
|
||||
|
||||
if self is GkidType.L1_SEED_KEY:
|
||||
return "an L1 seed key"
|
||||
|
||||
if self is GkidType.L2_SEED_KEY:
|
||||
return "an L2 seed key"
|
||||
|
||||
raise RuntimeError("unknown GKID type {self}")
|
||||
|
||||
|
||||
class InvalidDerivation(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class UndefinedStartTime(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@total_ordering
|
||||
class Gkid:
|
||||
__slots__ = ["_l0_idx", "_l1_idx", "_l2_idx"]
|
||||
|
||||
max_l0_idx: Final = 0x7FFF_FFFF
|
||||
|
||||
def __init__(self, l0_idx: int, l1_idx: int, l2_idx: int) -> None:
|
||||
if not -1 <= l0_idx <= Gkid.max_l0_idx:
|
||||
raise ValueError(f"L0 index {l0_idx} out of range")
|
||||
|
||||
if not -1 <= l1_idx < L1_KEY_ITERATION:
|
||||
raise ValueError(f"L1 index {l1_idx} out of range")
|
||||
|
||||
if not -1 <= l2_idx < L2_KEY_ITERATION:
|
||||
raise ValueError(f"L2 index {l2_idx} out of range")
|
||||
|
||||
if l0_idx == -1 and l1_idx != -1:
|
||||
raise ValueError("invalid combination of negative and non‐negative indices")
|
||||
|
||||
if l1_idx == -1 and l2_idx != -1:
|
||||
raise ValueError("invalid combination of negative and non‐negative indices")
|
||||
|
||||
self._l0_idx = l0_idx
|
||||
self._l1_idx = l1_idx
|
||||
self._l2_idx = l2_idx
|
||||
|
||||
@property
|
||||
def l0_idx(self) -> int:
|
||||
return self._l0_idx
|
||||
|
||||
@property
|
||||
def l1_idx(self) -> int:
|
||||
return self._l1_idx
|
||||
|
||||
@property
|
||||
def l2_idx(self) -> int:
|
||||
return self._l2_idx
|
||||
|
||||
def gkid_type(self) -> GkidType:
|
||||
if self.l0_idx == -1:
|
||||
return GkidType.DEFAULT
|
||||
|
||||
if self.l1_idx == -1:
|
||||
return GkidType.L0_SEED_KEY
|
||||
|
||||
if self.l2_idx == -1:
|
||||
return GkidType.L1_SEED_KEY
|
||||
|
||||
return GkidType.L2_SEED_KEY
|
||||
|
||||
def wrapped_l1_idx(self) -> int:
|
||||
if self.l1_idx == -1:
|
||||
return L1_KEY_ITERATION
|
||||
|
||||
return self.l1_idx
|
||||
|
||||
def wrapped_l2_idx(self) -> int:
|
||||
if self.l2_idx == -1:
|
||||
return L2_KEY_ITERATION
|
||||
|
||||
return self.l2_idx
|
||||
|
||||
def derive_l1_seed_key(self) -> "Gkid":
|
||||
gkid_type = self.gkid_type()
|
||||
if (
|
||||
gkid_type is not GkidType.L0_SEED_KEY
|
||||
and gkid_type is not GkidType.L1_SEED_KEY
|
||||
):
|
||||
raise InvalidDerivation(
|
||||
"Invalid attempt to derive an L1 seed key from"
|
||||
f" {gkid_type.description()}"
|
||||
)
|
||||
|
||||
if self.l1_idx == 0:
|
||||
raise InvalidDerivation("No further derivation of L1 seed keys is possible")
|
||||
|
||||
return Gkid(self.l0_idx, self.wrapped_l1_idx() - 1, self.l2_idx)
|
||||
|
||||
def derive_l2_seed_key(self) -> "Gkid":
|
||||
gkid_type = self.gkid_type()
|
||||
if (
|
||||
gkid_type is not GkidType.L1_SEED_KEY
|
||||
and gkid_type is not GkidType.L2_SEED_KEY
|
||||
):
|
||||
raise InvalidDerivation(
|
||||
f"Attempt to derive an L2 seed key from {gkid_type.description()}"
|
||||
)
|
||||
|
||||
if self.l2_idx == 0:
|
||||
raise InvalidDerivation("No further derivation of L2 seed keys is possible")
|
||||
|
||||
return Gkid(self.l0_idx, self.l1_idx, self.wrapped_l2_idx() - 1)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Gkid({self.l0_idx}, {self.l1_idx}, {self.l2_idx})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
cls = type(self)
|
||||
return (
|
||||
f"{cls.__qualname__}({repr(self.l0_idx)}, {repr(self.l1_idx)},"
|
||||
f" {repr(self.l2_idx)})"
|
||||
)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Gkid):
|
||||
return NotImplemented
|
||||
|
||||
return (self.l0_idx, self.l1_idx, self.l2_idx) == (
|
||||
other.l0_idx,
|
||||
other.l1_idx,
|
||||
other.l2_idx,
|
||||
)
|
||||
|
||||
def __lt__(self, other: object) -> bool:
|
||||
if not isinstance(other, Gkid):
|
||||
return NotImplemented
|
||||
|
||||
def as_tuple(gkid: Gkid) -> Tuple[int, int, int]:
|
||||
l0_idx, l1_idx, l2_idx = gkid.l0_idx, gkid.l1_idx, gkid.l2_idx
|
||||
|
||||
# DEFAULT is considered less than everything else, so that the
|
||||
# lexical ordering requirement in [MS-GKDI] 3.1.4.1.3 (GetKey) makes
|
||||
# sense.
|
||||
if gkid.gkid_type() is not GkidType.DEFAULT:
|
||||
# Use the wrapped indices so that L1 seed keys are considered
|
||||
# greater than their children L2 seed keys, and L0 seed keys are
|
||||
# considered greater than their children L1 seed keys.
|
||||
l1_idx = gkid.wrapped_l1_idx()
|
||||
l2_idx = gkid.wrapped_l2_idx()
|
||||
|
||||
return l0_idx, l1_idx, l2_idx
|
||||
|
||||
return as_tuple(self) < as_tuple(other)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash((self.l0_idx, self.l1_idx, self.l2_idx))
|
||||
|
||||
@staticmethod
|
||||
def default() -> "Gkid":
|
||||
return Gkid(-1, -1, -1)
|
||||
|
||||
@staticmethod
|
||||
def l0_seed_key(l0_idx: int) -> "Gkid":
|
||||
return Gkid(l0_idx, -1, -1)
|
||||
|
||||
@staticmethod
|
||||
def l1_seed_key(l0_idx: int, l1_idx: int) -> "Gkid":
|
||||
return Gkid(l0_idx, l1_idx, -1)
|
||||
|
||||
@staticmethod
|
||||
def from_nt_time(nt_time: NtTime) -> "Gkid":
|
||||
l0 = nt_time // (L1_KEY_ITERATION * L2_KEY_ITERATION * KEY_CYCLE_DURATION)
|
||||
l1 = (
|
||||
nt_time
|
||||
% (L1_KEY_ITERATION * L2_KEY_ITERATION * KEY_CYCLE_DURATION)
|
||||
// (L2_KEY_ITERATION * KEY_CYCLE_DURATION)
|
||||
)
|
||||
l2 = nt_time % (L2_KEY_ITERATION * KEY_CYCLE_DURATION) // KEY_CYCLE_DURATION
|
||||
|
||||
return Gkid(l0, l1, l2)
|
||||
|
||||
def start_nt_time(self) -> NtTime:
|
||||
gkid_type = self.gkid_type()
|
||||
if gkid_type is not GkidType.L2_SEED_KEY:
|
||||
raise UndefinedStartTime(
|
||||
f"{gkid_type.description()} has no defined start time"
|
||||
)
|
||||
|
||||
start_time = NtTime(
|
||||
(
|
||||
self.l0_idx * L1_KEY_ITERATION * L2_KEY_ITERATION
|
||||
+ self.l1_idx * L2_KEY_ITERATION
|
||||
+ self.l2_idx
|
||||
)
|
||||
* KEY_CYCLE_DURATION
|
||||
)
|
||||
|
||||
if not 0 <= start_time <= uint64_max:
|
||||
raise OverflowError(f"start time {start_time} out of range")
|
||||
|
||||
return start_time
|
||||
|
||||
|
||||
class SeedKeyPair:
|
||||
__slots__ = ["l1_key", "l2_key", "gkid", "hash_algorithm", "root_key_id"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
l1_key: Optional[bytes],
|
||||
l2_key: Optional[bytes],
|
||||
gkid: Gkid,
|
||||
hash_algorithm: Algorithm,
|
||||
root_key_id: misc.GUID,
|
||||
) -> None:
|
||||
if l1_key is not None and len(l1_key) != KEY_LEN_BYTES:
|
||||
raise ValueError(f"L1 key ({repr(l1_key)}) must be {KEY_LEN_BYTES} bytes")
|
||||
if l2_key is not None and len(l2_key) != KEY_LEN_BYTES:
|
||||
raise ValueError(f"L2 key ({repr(l2_key)}) must be {KEY_LEN_BYTES} bytes")
|
||||
|
||||
self.l1_key = l1_key
|
||||
self.l2_key = l2_key
|
||||
self.gkid = gkid
|
||||
self.hash_algorithm = hash_algorithm
|
||||
self.root_key_id = root_key_id
|
||||
|
||||
def __str__(self) -> str:
|
||||
l1_key_hex = None if self.l1_key is None else self.l1_key.hex()
|
||||
l2_key_hex = None if self.l2_key is None else self.l2_key.hex()
|
||||
|
||||
return (
|
||||
f"SeedKeyPair(L1Key({l1_key_hex}), L2Key({l2_key_hex}), {self.gkid},"
|
||||
f" {self.root_key_id}, {self.hash_algorithm})"
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
cls = type(self)
|
||||
return (
|
||||
f"{cls.__qualname__}({repr(self.l1_key)}, {repr(self.l2_key)},"
|
||||
f" {repr(self.gkid)}, {repr(self.hash_algorithm)},"
|
||||
f" {repr(self.root_key_id)})"
|
||||
)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, SeedKeyPair):
|
||||
return NotImplemented
|
||||
|
||||
return (
|
||||
self.l1_key,
|
||||
self.l2_key,
|
||||
self.gkid,
|
||||
self.hash_algorithm,
|
||||
self.root_key_id,
|
||||
) == (
|
||||
other.l1_key,
|
||||
other.l2_key,
|
||||
other.gkid,
|
||||
other.hash_algorithm,
|
||||
other.root_key_id,
|
||||
)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash((
|
||||
self.l1_key,
|
||||
self.l2_key,
|
||||
self.gkid,
|
||||
self.hash_algorithm,
|
||||
ndr_pack(self.root_key_id),
|
||||
))
|
||||
|
||||
|
||||
class GroupKey:
|
||||
__slots__ = ["gkid", "key", "hash_algorithm", "root_key_id"]
|
||||
|
||||
def __init__(
|
||||
self, key: bytes, gkid: Gkid, hash_algorithm: Algorithm, root_key_id: misc.GUID
|
||||
) -> None:
|
||||
if key is not None and len(key) != KEY_LEN_BYTES:
|
||||
raise ValueError(f"Key ({repr(key)}) must be {KEY_LEN_BYTES} bytes")
|
||||
|
||||
self.key = key
|
||||
self.gkid = gkid
|
||||
self.hash_algorithm = hash_algorithm
|
||||
self.root_key_id = root_key_id
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f"GroupKey(Key({self.key.hex()}), {self.gkid}, {self.hash_algorithm},"
|
||||
f" {self.root_key_id})"
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
cls = type(self)
|
||||
return (
|
||||
f"{cls.__qualname__}({repr(self.key)}, {repr(self.gkid)},"
|
||||
f" {repr(self.hash_algorithm)}, {repr(self.root_key_id)})"
|
||||
)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, GroupKey):
|
||||
return NotImplemented
|
||||
|
||||
return (self.key, self.gkid, self.hash_algorithm, self.root_key_id) == (
|
||||
other.key,
|
||||
other.gkid,
|
||||
other.hash_algorithm,
|
||||
other.root_key_id,
|
||||
)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(
|
||||
(self.key, self.gkid, self.hash_algorithm, ndr_pack(self.root_key_id))
|
||||
)
|
644
python/samba/tests/gkdi.py
Normal file
644
python/samba/tests/gkdi.py
Normal file
@ -0,0 +1,644 @@
|
||||
#
|
||||
# Helper classes for testing the Group Key Distribution Service.
|
||||
#
|
||||
# Copyright (C) Catalyst.Net Ltd 2023
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, "bin/python")
|
||||
os.environ["PYTHONUNBUFFERED"] = "1"
|
||||
|
||||
import datetime
|
||||
import secrets
|
||||
from typing import Final, NewType, Optional, Tuple, Union
|
||||
|
||||
import ldb
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode
|
||||
|
||||
from samba import (
|
||||
HRES_E_INVALIDARG,
|
||||
HRES_NTE_BAD_KEY,
|
||||
HRES_NTE_NO_KEY,
|
||||
ntstatus,
|
||||
NTSTATUSError,
|
||||
werror,
|
||||
)
|
||||
from samba.credentials import Credentials
|
||||
from samba.dcerpc import gkdi, misc
|
||||
from samba.gkdi import (
|
||||
Algorithm,
|
||||
Gkid,
|
||||
GkidType,
|
||||
GroupKey,
|
||||
KEY_CYCLE_DURATION,
|
||||
KEY_LEN_BYTES,
|
||||
MAX_CLOCK_SKEW,
|
||||
SeedKeyPair,
|
||||
)
|
||||
from samba.ndr import ndr_pack, ndr_unpack
|
||||
from samba.nt_time import (
|
||||
nt_time_from_datetime,
|
||||
NtTime,
|
||||
NtTimeDelta,
|
||||
timedelta_from_nt_time_delta,
|
||||
)
|
||||
from samba.param import LoadParm
|
||||
from samba.samdb import SamDB
|
||||
|
||||
from samba.tests import delete_force, TestCase
|
||||
|
||||
|
||||
HResult = NewType("HResult", int)
|
||||
RootKey = NewType("RootKey", ldb.Message)
|
||||
|
||||
|
||||
ROOT_KEY_START_TIME: Final = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
|
||||
|
||||
|
||||
class GetKeyError(Exception):
|
||||
def __init__(self, status: HResult, message: str):
|
||||
super().__init__(status, message)
|
||||
|
||||
|
||||
class GkdiBaseTest(TestCase):
|
||||
# This is the NDR‐encoded security descriptor O:SYD:(A;;FRFW;;;S-1-5-9).
|
||||
gmsa_sd = (
|
||||
b"\x01\x00\x04\x800\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
b"\x14\x00\x00\x00\x02\x00\x1c\x00\x01\x00\x00\x00\x00\x00\x14\x00"
|
||||
b"\x9f\x01\x12\x00\x01\x01\x00\x00\x00\x00\x00\x05\t\x00\x00\x00"
|
||||
b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def current_time(offset: Optional[datetime.timedelta] = None) -> datetime.datetime:
|
||||
if offset is None:
|
||||
# Allow for clock skew.
|
||||
offset = timedelta_from_nt_time_delta(MAX_CLOCK_SKEW)
|
||||
|
||||
current_time = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||
return current_time + offset
|
||||
|
||||
def current_nt_time(self, offset: Optional[datetime.timedelta] = None) -> NtTime:
|
||||
return nt_time_from_datetime(self.current_time(offset))
|
||||
|
||||
def current_gkid(self, offset: Optional[datetime.timedelta] = None) -> Gkid:
|
||||
return Gkid.from_nt_time(self.current_nt_time(offset))
|
||||
|
||||
def gkdi_connect(
|
||||
self, host: str, lp: LoadParm, server_creds: Credentials
|
||||
) -> gkdi.gkdi:
|
||||
try:
|
||||
return gkdi.gkdi(f"ncacn_ip_tcp:{host}[seal]", lp, server_creds)
|
||||
except NTSTATUSError as err:
|
||||
if err.args[0] == ntstatus.NT_STATUS_PORT_UNREACHABLE:
|
||||
self.fail(
|
||||
"Try starting the Microsoft Key Distribution Service (KdsSvc).\n"
|
||||
"In PowerShell, run:\n\tStart-Service -Name KdsSvc"
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
def rpc_get_key(
|
||||
self,
|
||||
conn: gkdi.gkdi,
|
||||
target_sd: bytes,
|
||||
root_key_id: Optional[misc.GUID],
|
||||
gkid: Gkid,
|
||||
) -> SeedKeyPair:
|
||||
out_len, out, result = conn.GetKey(
|
||||
list(target_sd), root_key_id, gkid.l0_idx, gkid.l1_idx, gkid.l2_idx
|
||||
)
|
||||
result_code, result_string = result
|
||||
if (
|
||||
root_key_id is None
|
||||
and result_code & 0xFFFF == werror.WERR_TOO_MANY_OPEN_FILES
|
||||
):
|
||||
self.fail(
|
||||
"The server has given up selecting a root key because there are too"
|
||||
" many keys (more than 1000) in the Master Root Keys container. Delete"
|
||||
" some root keys and try again."
|
||||
)
|
||||
if result != (0, None):
|
||||
raise GetKeyError(result_code, result_string)
|
||||
self.assertEqual(len(out), out_len, "output len mismatch")
|
||||
|
||||
envelope = ndr_unpack(gkdi.GroupKeyEnvelope, bytes(out))
|
||||
|
||||
gkid = Gkid(envelope.l0_index, envelope.l1_index, envelope.l2_index)
|
||||
l1_key = bytes(envelope.l1_key) if envelope.l1_key else None
|
||||
l2_key = bytes(envelope.l2_key) if envelope.l2_key else None
|
||||
|
||||
hash_algorithm = Algorithm.from_kdf_parameters(bytes(envelope.kdf_parameters))
|
||||
|
||||
root_key_id = envelope.root_key_id
|
||||
|
||||
return SeedKeyPair(l1_key, l2_key, gkid, hash_algorithm, root_key_id)
|
||||
|
||||
def get_root_key_object(
|
||||
self, samdb: SamDB, root_key_id: Optional[misc.GUID], gkid: Gkid
|
||||
) -> Tuple[RootKey, misc.GUID]:
|
||||
"""Return a root key object and its corresponding GUID.
|
||||
|
||||
*root_key_id* specifies the GUID of the root key object to return. It
|
||||
can be ``None`` to indicate that the selected key should be the most
|
||||
recently created key starting not after the time indicated by *gkid*.
|
||||
|
||||
Bear in mind as that the Microsoft Key Distribution Service caches root
|
||||
keys, the most recently created key might not be the one that Windows
|
||||
chooses."""
|
||||
|
||||
root_key_attrs = [
|
||||
"cn",
|
||||
"msKds-CreateTime",
|
||||
"msKds-KDFAlgorithmID",
|
||||
"msKds-KDFParam",
|
||||
"msKds-RootKeyData",
|
||||
"msKds-UseStartTime",
|
||||
"msKds-Version",
|
||||
]
|
||||
|
||||
gkid_start_nt_time = gkid.start_nt_time()
|
||||
|
||||
exact_key_specified = root_key_id is not None
|
||||
if exact_key_specified:
|
||||
root_key_dn = self.get_root_key_container_dn(samdb)
|
||||
root_key_dn.add_child(f"CN={root_key_id}")
|
||||
|
||||
try:
|
||||
root_key_res = samdb.search(
|
||||
root_key_dn, scope=ldb.SCOPE_BASE, attrs=root_key_attrs
|
||||
)
|
||||
except ldb.LdbError as err:
|
||||
if err.args[0] == ldb.ERR_NO_SUCH_OBJECT:
|
||||
raise GetKeyError(HRES_NTE_NO_KEY, "no such root key exists")
|
||||
|
||||
raise
|
||||
|
||||
root_key_object = root_key_res[0]
|
||||
else:
|
||||
root_keys = samdb.search(
|
||||
self.get_root_key_container_dn(samdb),
|
||||
scope=ldb.SCOPE_SUBTREE,
|
||||
expression=f"(msKds-UseStartTime<={gkid_start_nt_time})",
|
||||
attrs=root_key_attrs,
|
||||
)
|
||||
if not root_keys:
|
||||
raise GetKeyError(
|
||||
HRES_NTE_NO_KEY, "no root keys exist at specified time"
|
||||
)
|
||||
|
||||
def root_key_create_time(key: RootKey) -> NtTime:
|
||||
create_time = key.get("msKds-CreateTime", idx=0)
|
||||
if create_time is None:
|
||||
return NtTime(0)
|
||||
|
||||
return NtTime(int(create_time))
|
||||
|
||||
root_key_object = max(root_keys, key=root_key_create_time)
|
||||
|
||||
root_key_cn = root_key_object.get("cn", idx=0)
|
||||
self.assertIsNotNone(root_key_cn)
|
||||
root_key_id = misc.GUID(root_key_cn)
|
||||
|
||||
use_start_nt_time = NtTime(
|
||||
int(root_key_object.get("msKds-UseStartTime", idx=0))
|
||||
)
|
||||
if use_start_nt_time == 0:
|
||||
raise GetKeyError(HRES_NTE_BAD_KEY, "root key effective time is 0")
|
||||
use_start_nt_time = NtTime(
|
||||
use_start_nt_time - NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
|
||||
)
|
||||
|
||||
if exact_key_specified and not (0 <= use_start_nt_time <= gkid_start_nt_time):
|
||||
raise GetKeyError(HRES_E_INVALIDARG, "root key is not yet valid")
|
||||
|
||||
return root_key_object, root_key_id
|
||||
|
||||
def validate_get_key_request(
|
||||
self, gkid: Gkid, current_gkid: Gkid, root_key_specified: bool
|
||||
) -> None:
|
||||
if gkid > current_gkid:
|
||||
raise GetKeyError(
|
||||
HRES_E_INVALIDARG, "invalid request for a key from the future"
|
||||
)
|
||||
|
||||
gkid_type = gkid.gkid_type()
|
||||
if gkid_type is GkidType.DEFAULT:
|
||||
derived_from = (
|
||||
" derived from the specified root key" if root_key_specified else ""
|
||||
)
|
||||
raise NotImplementedError(
|
||||
f"The latest group key{derived_from} is being requested."
|
||||
)
|
||||
|
||||
if gkid_type is not GkidType.L2_SEED_KEY:
|
||||
raise GetKeyError(
|
||||
HRES_E_INVALIDARG, f"invalid request for {gkid_type.description()}"
|
||||
)
|
||||
|
||||
def get_key(
|
||||
self,
|
||||
samdb: SamDB,
|
||||
target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format.
|
||||
root_key_id: Optional[misc.GUID],
|
||||
gkid: Gkid,
|
||||
*,
|
||||
root_key_id_hint: Optional[misc.GUID] = None,
|
||||
current_gkid: Optional[Gkid] = None,
|
||||
) -> SeedKeyPair:
|
||||
"""Emulate the ISDKey.GetKey() RPC method.
|
||||
|
||||
When passed a NULL root key ID, GetKey() may use a cached root key
|
||||
rather than picking the most recently created applicable key as the
|
||||
documentation implies. If it’s important to arrive at the same result as
|
||||
Windows, pass a GUID in the *root_key_id_hint* parameter to specify a
|
||||
particular root key to use."""
|
||||
|
||||
if current_gkid is None:
|
||||
current_gkid = self.current_gkid()
|
||||
|
||||
root_key_specified = root_key_id is not None
|
||||
if root_key_specified:
|
||||
self.assertIsNone(
|
||||
root_key_id_hint, "don’t provide both root key ID parameters"
|
||||
)
|
||||
|
||||
self.validate_get_key_request(gkid, current_gkid, root_key_specified)
|
||||
|
||||
root_key_object, root_key_id = self.get_root_key_object(
|
||||
samdb, root_key_id if root_key_specified else root_key_id_hint, gkid
|
||||
)
|
||||
|
||||
if root_key_specified:
|
||||
if gkid.l0_idx < current_gkid.l0_idx:
|
||||
# All of the seed keys with an L0 index less than the current L0
|
||||
# index are from the past and thus are safe to return. If the
|
||||
# caller has requested a specific seed key with a past L0 index,
|
||||
# return the L1 seed key (L0, 31, −1), from which any L1 or L2
|
||||
# seed key having that L0 index can be derived.
|
||||
l1_gkid = Gkid(gkid.l0_idx, 31, -1)
|
||||
seed_key = self.compute_seed_key(
|
||||
target_sd, root_key_id, root_key_object, l1_gkid
|
||||
)
|
||||
return SeedKeyPair(
|
||||
seed_key.key,
|
||||
None,
|
||||
Gkid(gkid.l0_idx, 31, 31),
|
||||
seed_key.hash_algorithm,
|
||||
root_key_id,
|
||||
)
|
||||
|
||||
# All of the previous seed keys with an L0 index equal to the
|
||||
# current L0 index can be derived from the current seed key or from
|
||||
# the next older L1 seed key.
|
||||
gkid = current_gkid
|
||||
|
||||
if gkid.l2_idx == 31:
|
||||
# The current seed key, and all previous seed keys with that same L0
|
||||
# index, can be derived from the L1 seed key (L0, L1, 31).
|
||||
l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx, -1)
|
||||
seed_key = self.compute_seed_key(
|
||||
target_sd, root_key_id, root_key_object, l1_gkid
|
||||
)
|
||||
return SeedKeyPair(
|
||||
seed_key.key, None, gkid, seed_key.hash_algorithm, root_key_id
|
||||
)
|
||||
|
||||
# Compute the L2 seed key to return.
|
||||
seed_key = self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
|
||||
|
||||
next_older_seed_key = None
|
||||
if gkid.l1_idx != 0:
|
||||
# From the current seed key can be derived only those seed keys that
|
||||
# share its L1 and L2 indices. To be able to derive previous seed
|
||||
# keys with older L1 indices, the caller must be given the next
|
||||
# older L1 seed key as well.
|
||||
next_older_l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx - 1, -1)
|
||||
next_older_seed_key = self.compute_seed_key(
|
||||
target_sd, root_key_id, root_key_object, next_older_l1_gkid
|
||||
).key
|
||||
|
||||
return SeedKeyPair(
|
||||
next_older_seed_key,
|
||||
seed_key.key,
|
||||
gkid,
|
||||
seed_key.hash_algorithm,
|
||||
root_key_id,
|
||||
)
|
||||
|
||||
def get_key_exact(
|
||||
self,
|
||||
samdb: SamDB,
|
||||
target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format.
|
||||
root_key_id: Optional[misc.GUID],
|
||||
gkid: Gkid,
|
||||
current_gkid: Optional[Gkid] = None,
|
||||
) -> GroupKey:
|
||||
if current_gkid is None:
|
||||
current_gkid = self.current_gkid()
|
||||
|
||||
root_key_specified = root_key_id is not None
|
||||
self.validate_get_key_request(gkid, current_gkid, root_key_specified)
|
||||
|
||||
root_key_object, root_key_id = self.get_root_key_object(
|
||||
samdb, root_key_id, gkid
|
||||
)
|
||||
|
||||
return self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
|
||||
|
||||
def get_root_key_data(self, root_key: RootKey) -> Tuple[bytes, Algorithm]:
|
||||
version = root_key.get("msKds-Version", idx=0)
|
||||
self.assertEqual(b"1", version)
|
||||
|
||||
algorithm_id = root_key.get("msKds-KDFAlgorithmID", idx=0)
|
||||
self.assertEqual(b"SP800_108_CTR_HMAC", algorithm_id)
|
||||
|
||||
hash_algorithm = Algorithm.from_kdf_parameters(
|
||||
root_key.get("msKds-KDFParam", idx=0)
|
||||
)
|
||||
|
||||
root_key_data = root_key.get("msKds-RootKeyData", idx=0)
|
||||
self.assertIsInstance(root_key_data, bytes)
|
||||
|
||||
return root_key_data, hash_algorithm
|
||||
|
||||
def compute_seed_key(
|
||||
self,
|
||||
target_sd: bytes,
|
||||
root_key_id: misc.GUID,
|
||||
root_key: RootKey,
|
||||
target_gkid: Gkid,
|
||||
) -> GroupKey:
|
||||
target_gkid_type = target_gkid.gkid_type()
|
||||
self.assertIn(
|
||||
target_gkid_type,
|
||||
(GkidType.L1_SEED_KEY, GkidType.L2_SEED_KEY),
|
||||
f"unexpected attempt to compute {target_gkid_type.description()}",
|
||||
)
|
||||
|
||||
root_key_data, algorithm = self.get_root_key_data(root_key)
|
||||
root_key_id_bytes = ndr_pack(root_key_id)
|
||||
|
||||
hash_algorithm = algorithm.algorithm()
|
||||
|
||||
# Derive the L0 seed key.
|
||||
gkid = Gkid.l0_seed_key(target_gkid.l0_idx)
|
||||
key = self.derive_key(root_key_data, root_key_id_bytes, hash_algorithm, gkid)
|
||||
|
||||
# Derive the L1 seed key.
|
||||
|
||||
gkid = gkid.derive_l1_seed_key()
|
||||
key = self.derive_key(
|
||||
key, root_key_id_bytes, hash_algorithm, gkid, target_sd=target_sd
|
||||
)
|
||||
|
||||
while gkid.l1_idx != target_gkid.l1_idx:
|
||||
gkid = gkid.derive_l1_seed_key()
|
||||
key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)
|
||||
|
||||
# Derive the L2 seed key.
|
||||
while gkid != target_gkid:
|
||||
gkid = gkid.derive_l2_seed_key()
|
||||
key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)
|
||||
|
||||
return GroupKey(key, gkid, algorithm, root_key_id)
|
||||
|
||||
def derive_key(
|
||||
self,
|
||||
key: bytes,
|
||||
root_key_id_bytes: bytes,
|
||||
hash_algorithm: hashes.HashAlgorithm,
|
||||
gkid: Gkid,
|
||||
*,
|
||||
target_sd: Optional[bytes] = None,
|
||||
) -> bytes:
|
||||
def u32_bytes(n: int) -> bytes:
|
||||
return (n & 0xFFFF_FFFF).to_bytes(length=4, byteorder="little")
|
||||
|
||||
context = (
|
||||
root_key_id_bytes
|
||||
+ u32_bytes(gkid.l0_idx)
|
||||
+ u32_bytes(gkid.l1_idx)
|
||||
+ u32_bytes(gkid.l2_idx)
|
||||
)
|
||||
if target_sd is not None:
|
||||
context += target_sd
|
||||
return self.kdf(hash_algorithm, key, context)
|
||||
|
||||
def kdf(
|
||||
self,
|
||||
hash_algorithm: hashes.HashAlgorithm,
|
||||
key: bytes,
|
||||
context: bytes,
|
||||
*,
|
||||
label="KDS service",
|
||||
len_in_bytes=KEY_LEN_BYTES,
|
||||
) -> bytes:
|
||||
label = label.encode("utf-16-le") + b"\x00\x00"
|
||||
kdf = KBKDFHMAC(
|
||||
algorithm=hash_algorithm,
|
||||
mode=Mode.CounterMode,
|
||||
length=len_in_bytes,
|
||||
rlen=4,
|
||||
llen=4,
|
||||
location=CounterLocation.BeforeFixed,
|
||||
label=label,
|
||||
context=context,
|
||||
fixed=None,
|
||||
backend=default_backend(),
|
||||
)
|
||||
return kdf.derive(key)
|
||||
|
||||
def get_config_dn(self, samdb: SamDB, dn: str) -> ldb.Dn:
|
||||
config_dn = samdb.get_config_basedn()
|
||||
config_dn.add_child(dn)
|
||||
return config_dn
|
||||
|
||||
def get_server_config_dn(self, samdb: SamDB) -> ldb.Dn:
|
||||
# [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution
|
||||
# Service”, and “CN=SID Key Server Configuration” for “CN=Group Key
|
||||
# Distribution Service Server Configuration”.
|
||||
return self.get_config_dn(
|
||||
samdb,
|
||||
"CN=Group Key Distribution Service Server Configuration,"
|
||||
"CN=Server Configuration,"
|
||||
"CN=Group Key Distribution Service,"
|
||||
"CN=Services",
|
||||
)
|
||||
|
||||
def get_root_key_container_dn(self, samdb: SamDB) -> ldb.Dn:
|
||||
# [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution Service”.
|
||||
return self.get_config_dn(
|
||||
samdb,
|
||||
"CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services",
|
||||
)
|
||||
|
||||
def create_root_key(
|
||||
self,
|
||||
samdb: SamDB,
|
||||
domain_dn: ldb.Dn,
|
||||
*,
|
||||
use_start_time: Optional[Union[datetime.datetime, NtTime]] = None,
|
||||
hash_algorithm: Optional[Algorithm] = Algorithm.SHA512,
|
||||
guid: Optional[misc.GUID] = None,
|
||||
data: Optional[bytes] = None,
|
||||
) -> misc.GUID:
|
||||
# [MS-GKDI] 3.1.4.1.1, “Creating a New Root Key”, states that if the
|
||||
# server receives a GetKey request and the root keys container in Active
|
||||
# Directory is empty, the the server must create a new root key object
|
||||
# based on the default Server Configuration object. Additional root keys
|
||||
# are to be created based on either the default Server Configuration
|
||||
# object or an updated one specifying optional configuration values.
|
||||
|
||||
guid_specified = guid is not None
|
||||
if not guid_specified:
|
||||
guid = misc.GUID(secrets.token_bytes(16))
|
||||
|
||||
if data is None:
|
||||
data = secrets.token_bytes(KEY_LEN_BYTES)
|
||||
else:
|
||||
self.assertEqual(
|
||||
KEY_LEN_BYTES,
|
||||
len(data),
|
||||
f"root key data must be {KEY_LEN_BYTES} bytes",
|
||||
)
|
||||
|
||||
create_time = current_nt_time = self.current_nt_time()
|
||||
|
||||
if use_start_time is None:
|
||||
# Root keys created by Windows without the ‘-EffectiveImmediately’
|
||||
# parameter have an effective time of exactly ten days in the
|
||||
# future, presumably to allow time for replication.
|
||||
#
|
||||
# Microsoft’s documentation on creating a KDS root key, located at
|
||||
# https://learn.microsoft.com/en-us/windows-server/security/group-managed-service-accounts/create-the-key-distribution-services-kds-root-key,
|
||||
# claims to the contrary that domain controllers will only wait up
|
||||
# to ten hours before allowing Group Managed Service Accounts to be
|
||||
# created.
|
||||
#
|
||||
# The same page includes instructions for creating a root key with
|
||||
# an effective time of ten hours in the past (for testing purposes),
|
||||
# but I’m not sure why — the KDS will consider a key valid for use
|
||||
# immediately after its start time has passed, without bothering to
|
||||
# wait ten hours first. In fact, it will consider a key to be valid
|
||||
# a full ten hours (plus clock skew) *before* its declared start
|
||||
# time — intentional, or (conceivably) the result of an accidental
|
||||
# negation?
|
||||
current_interval_start_nt_time = Gkid.from_nt_time(
|
||||
current_nt_time
|
||||
).start_nt_time()
|
||||
use_start_time = NtTime(
|
||||
current_interval_start_nt_time + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
|
||||
)
|
||||
|
||||
if isinstance(use_start_time, datetime.datetime):
|
||||
use_start_nt_time = nt_time_from_datetime(use_start_time)
|
||||
else:
|
||||
self.assertIsInstance(use_start_time, int)
|
||||
use_start_nt_time = use_start_time
|
||||
|
||||
kdf_parameters = None
|
||||
if hash_algorithm is not None:
|
||||
kdf_parameters = gkdi.KdfParameters()
|
||||
kdf_parameters.hash_algorithm = hash_algorithm.value
|
||||
kdf_parameters = ndr_pack(kdf_parameters)
|
||||
|
||||
# These are the encoded p and g values, respectively, of the “2048‐bit
|
||||
# MODP Group with 256‐bit Prime Order Subgroup” from RFC 5114 section
|
||||
# 2.3.
|
||||
field_order = (
|
||||
b"\x87\xa8\xe6\x1d\xb4\xb6f<\xff\xbb\xd1\x9ce\x19Y\x99\x8c\xee\xf6\x08"
|
||||
b"f\r\xd0\xf2],\xee\xd4C^;\x00\xe0\r\xf8\xf1\xd6\x19W\xd4\xfa\xf7\xdfE"
|
||||
b"a\xb2\xaa0\x16\xc3\xd9\x114\to\xaa;\xf4)m\x83\x0e\x9a|"
|
||||
b" \x9e\x0cd\x97Qz\xbd"
|
||||
b'Z\x8a\x9d0k\xcfg\xed\x91\xf9\xe6r[GX\xc0"\xe0\xb1\xefBu\xbf{l[\xfc\x11'
|
||||
b"\xd4_\x90\x88\xb9A\xf5N\xb1\xe5\x9b\xb8\xbc9\xa0\xbf\x120\x7f\\O\xdbp\xc5"
|
||||
b"\x81\xb2?v\xb6:\xca\xe1\xca\xa6\xb7\x90-RRg5H\x8a\x0e\xf1<m\x9aQ\xbf\xa4\xab"
|
||||
b":\xd84w\x96RM\x8e\xf6\xa1g\xb5\xa4\x18%\xd9g\xe1D\xe5\x14\x05d%"
|
||||
b"\x1c\xca\xcb\x83\xe6\xb4\x86\xf6\xb3\xca?yqP`&\xc0\xb8W\xf6\x89\x96(V"
|
||||
b"\xde\xd4\x01\n\xbd\x0b\xe6!\xc3\xa3\x96\nT\xe7\x10\xc3u\xf2cu\xd7\x01A\x03"
|
||||
b"\xa4\xb5C0\xc1\x98\xaf\x12a\x16\xd2'n\x11q_i8w\xfa\xd7\xef\t\xca\xdb\tJ\xe9"
|
||||
b"\x1e\x1a\x15\x97"
|
||||
)
|
||||
generator = (
|
||||
b"?\xb3,\x9bs\x13M\x0b.wPf`\xed\xbdHL\xa7\xb1\x8f!\xef T\x07\xf4y:"
|
||||
b"\x1a\x0b\xa1%\x10\xdb\xc1Pw\xbeF?\xffO\xedJ\xac\x0b\xb5U\xbe:l\x1b\x0ckG\xb1"
|
||||
b"\xbc7s\xbf~\x8cob\x90\x12(\xf8\xc2\x8c\xbb\x18\xa5Z\xe3\x13A\x00\ne"
|
||||
b"\x01\x96\xf91\xc7zW\xf2\xdd\xf4c\xe5\xe9\xec\x14Kw}\xe6*\xaa\xb8\xa8b"
|
||||
b"\x8a\xc3v\xd2\x82\xd6\xed8d\xe6y\x82B\x8e\xbc\x83\x1d\x144\x8fo/\x91\x93"
|
||||
b"\xb5\x04Z\xf2vqd\xe1\xdf\xc9g\xc1\xfb?.U\xa4\xbd\x1b\xff\xe8;\x9c\x80"
|
||||
b"\xd0R\xb9\x85\xd1\x82\xea\n\xdb*;s\x13\xd3\xfe\x14\xc8HK\x1e\x05%\x88\xb9"
|
||||
b"\xb7\xd2\xbb\xd2\xdf\x01a\x99\xec\xd0n\x15W\xcd\t\x15\xb35;\xbbd\xe0\xec7"
|
||||
b"\x7f\xd0(7\r\xf9+R\xc7\x89\x14(\xcd\xc6~\xb6\x18KR=\x1d\xb2F\xc3/c\x07\x84"
|
||||
b"\x90\xf0\x0e\xf8\xd6G\xd1H\xd4yTQ^#'\xcf\xef\x98\xc5\x82fKL\x0fl\xc4\x16Y"
|
||||
)
|
||||
self.assertEqual(len(field_order), len(generator))
|
||||
key_length = len(field_order)
|
||||
|
||||
ffc_dh_parameters = gkdi.FfcDhParameters()
|
||||
ffc_dh_parameters.field_order = list(field_order)
|
||||
ffc_dh_parameters.generator = list(generator)
|
||||
ffc_dh_parameters.key_length = key_length
|
||||
ffc_dh_parameters = ndr_pack(ffc_dh_parameters)
|
||||
|
||||
root_key_dn = self.get_root_key_container_dn(samdb)
|
||||
root_key_dn.add_child(f"CN={guid}")
|
||||
|
||||
# Avoid deleting root key objects without subsequently restarting the
|
||||
# Microsoft Key Distribution Service. This service will keep its root
|
||||
# key cached even after the corresponding AD object has been deleted,
|
||||
# breaking later tests that try to look up the root key object.
|
||||
|
||||
details = {
|
||||
"dn": root_key_dn,
|
||||
"objectClass": "msKds-ProvRootKey",
|
||||
"msKds-RootKeyData": data,
|
||||
"msKds-CreateTime": str(create_time),
|
||||
"msKds-UseStartTime": str(use_start_nt_time),
|
||||
"msKds-DomainID": str(domain_dn),
|
||||
"msKds-Version": "1", # comes from Server Configuration object.
|
||||
"msKds-KDFAlgorithmID": (
|
||||
"SP800_108_CTR_HMAC"
|
||||
), # comes from Server Configuration.
|
||||
"msKds-SecretAgreementAlgorithmID": (
|
||||
"DH"
|
||||
), # comes from Server Configuration.
|
||||
"msKds-SecretAgreementParam": (
|
||||
ffc_dh_parameters
|
||||
), # comes from Server Configuration.
|
||||
"msKds-PublicKeyLength": "2048", # comes from Server Configuration.
|
||||
"msKds-PrivateKeyLength": (
|
||||
"512"
|
||||
), # comes from Server Configuration. [MS-GKDI] claims this defaults to ‘256’.
|
||||
}
|
||||
if kdf_parameters is not None:
|
||||
details["msKds-KDFParam"] = (
|
||||
kdf_parameters # comes from Server Configuration.
|
||||
)
|
||||
|
||||
if guid_specified:
|
||||
# A test may request that a root key have a specific GUID so that
|
||||
# results may be reproducible. Ensure these keys are cleaned up
|
||||
# afterwards.
|
||||
self.addCleanup(delete_force, samdb, root_key_dn)
|
||||
samdb.add(details)
|
||||
|
||||
return guid
|
716
python/samba/tests/krb5/gkdi_tests.py
Executable file
716
python/samba/tests/krb5/gkdi_tests.py
Executable file
@ -0,0 +1,716 @@
|
||||
#!/usr/bin/env python3
|
||||
# Unix SMB/CIFS implementation.
|
||||
# Copyright (C) Catalyst.Net Ltd 2023
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, "bin/python")
|
||||
os.environ["PYTHONUNBUFFERED"] = "1"
|
||||
|
||||
import secrets
|
||||
|
||||
from typing import ClassVar, Optional
|
||||
|
||||
from samba import HRES_E_INVALIDARG, HRES_NTE_BAD_KEY, HRES_NTE_NO_KEY
|
||||
from samba.dcerpc import gkdi, misc
|
||||
from samba.gkdi import (
|
||||
Algorithm,
|
||||
Gkid,
|
||||
KEY_CYCLE_DURATION,
|
||||
MAX_CLOCK_SKEW,
|
||||
NtTime,
|
||||
NtTimeDelta,
|
||||
SeedKeyPair,
|
||||
)
|
||||
from samba.nt_time import timedelta_from_nt_time_delta
|
||||
|
||||
from samba.tests.gkdi import GetKeyError, GkdiBaseTest, ROOT_KEY_START_TIME
|
||||
from samba.tests.krb5.kdc_base_test import KDCBaseTest
|
||||
|
||||
|
||||
class GkdiKdcBaseTest(GkdiBaseTest, KDCBaseTest):
|
||||
_root_key: ClassVar[misc.GUID]
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
super().setUpClass()
|
||||
|
||||
cls._root_key = None
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
||||
if self._root_key is None:
|
||||
# GKDI requires a root key to operate. Creating a root key here
|
||||
# saves creating one before every test.
|
||||
#
|
||||
# We cannot delete this key after the tests have run, as Windows
|
||||
# might have decided to cache it to be used in subsequent runs. It
|
||||
# will keep a root key cached even if the corresponding AD object
|
||||
# has been deleted, leading to various problems later.
|
||||
cls = type(self)
|
||||
cls._root_key = self.new_root_key(use_start_time=ROOT_KEY_START_TIME)
|
||||
|
||||
def new_root_key(self, *args, **kwargs) -> misc.GUID:
|
||||
samdb = self.get_samdb()
|
||||
domain_dn = self.get_server_dn(samdb)
|
||||
return self.create_root_key(samdb, domain_dn, *args, **kwargs)
|
||||
|
||||
def gkdi_conn(self) -> gkdi.gkdi:
|
||||
# The seed keys used by Group Managed Service Accounts correspond to the
|
||||
# Enterprise DCs SID (S-1-5-9); as such they can be retrieved only by
|
||||
# server accounts.
|
||||
return self.gkdi_connect(
|
||||
self.dc_host,
|
||||
self.get_lp(),
|
||||
self.get_cached_creds(account_type=self.AccountType.SERVER),
|
||||
)
|
||||
|
||||
def check_rpc_get_key(
|
||||
self, root_key_id: Optional[misc.GUID], gkid: Gkid
|
||||
) -> SeedKeyPair:
|
||||
got_key_pair = self.rpc_get_key(
|
||||
self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid
|
||||
)
|
||||
expected_key_pair = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
gkid,
|
||||
root_key_id_hint=got_key_pair.root_key_id if root_key_id is None else None,
|
||||
)
|
||||
self.assertEqual(
|
||||
got_key_pair.root_key_id,
|
||||
expected_key_pair.root_key_id,
|
||||
"root key IDs must match",
|
||||
)
|
||||
self.assertEqual(got_key_pair, expected_key_pair, "key pairs must match")
|
||||
|
||||
return got_key_pair
|
||||
|
||||
|
||||
class GkdiExplicitRootKeyTests(GkdiKdcBaseTest):
|
||||
def test_current_l0_idx(self):
|
||||
"""Request a key with the current L0 index. This index is regularly
|
||||
incremented every 427 days or so."""
|
||||
root_key_id = self.new_root_key()
|
||||
|
||||
# It actually doesn’t matter what we specify for the L1 and L2 indices.
|
||||
# We’ll get the same result regardless — they just cannot specify a key
|
||||
# from the future.
|
||||
self.check_rpc_get_key(root_key_id, self.current_gkid())
|
||||
|
||||
def test_previous_l0_idx(self):
|
||||
"""Request a key with a previous L0 index."""
|
||||
root_key_id = self.new_root_key(use_start_time=ROOT_KEY_START_TIME)
|
||||
|
||||
# It actually doesn’t matter what we specify for the L1 and L2 indices.
|
||||
# We’ll get the same result regardless.
|
||||
previous_l0_idx = self.current_gkid().l0_idx - 1
|
||||
key = self.check_rpc_get_key(root_key_id, Gkid(previous_l0_idx, 0, 0))
|
||||
|
||||
# Expect to get an L1 seed key.
|
||||
self.assertIsNotNone(key.l1_key)
|
||||
self.assertIsNone(key.l2_key)
|
||||
self.assertEqual(Gkid(previous_l0_idx, 31, 31), key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
|
||||
def test_algorithm_sha1(self):
|
||||
"""Test with the SHA1 algorithm."""
|
||||
key = self.check_rpc_get_key(
|
||||
self.new_root_key(hash_algorithm=Algorithm.SHA1),
|
||||
self.current_gkid(),
|
||||
)
|
||||
self.assertIs(Algorithm.SHA1, key.hash_algorithm)
|
||||
|
||||
def test_algorithm_sha256(self):
|
||||
"""Test with the SHA256 algorithm."""
|
||||
key = self.check_rpc_get_key(
|
||||
self.new_root_key(hash_algorithm=Algorithm.SHA256),
|
||||
self.current_gkid(),
|
||||
)
|
||||
self.assertIs(Algorithm.SHA256, key.hash_algorithm)
|
||||
|
||||
def test_algorithm_sha384(self):
|
||||
"""Test with the SHA384 algorithm."""
|
||||
key = self.check_rpc_get_key(
|
||||
self.new_root_key(hash_algorithm=Algorithm.SHA384),
|
||||
self.current_gkid(),
|
||||
)
|
||||
self.assertIs(Algorithm.SHA384, key.hash_algorithm)
|
||||
|
||||
def test_algorithm_sha512(self):
|
||||
"""Test with the SHA512 algorithm."""
|
||||
key = self.check_rpc_get_key(
|
||||
self.new_root_key(hash_algorithm=Algorithm.SHA512),
|
||||
self.current_gkid(),
|
||||
)
|
||||
self.assertIs(Algorithm.SHA512, key.hash_algorithm)
|
||||
|
||||
def test_algorithm_none(self):
|
||||
"""Test without a specified algorithm."""
|
||||
key = self.check_rpc_get_key(
|
||||
self.new_root_key(hash_algorithm=None),
|
||||
self.current_gkid(),
|
||||
)
|
||||
self.assertIs(Algorithm.SHA256, key.hash_algorithm)
|
||||
|
||||
def test_future_key(self):
|
||||
"""Try to request a key from the future."""
|
||||
root_key_id = self.new_root_key(use_start_time=ROOT_KEY_START_TIME)
|
||||
|
||||
future_gkid = self.current_gkid(
|
||||
offset=timedelta_from_nt_time_delta(
|
||||
NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
|
||||
)
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, future_gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
err.exception.args[0],
|
||||
"requesting a key from the future should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, future_gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
rpc_err.exception.args[0],
|
||||
"requesting a key from the future should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
def test_root_key_use_start_time_zero(self):
|
||||
"""Attempt to use a root key with an effective time of zero."""
|
||||
root_key_id = self.new_root_key(use_start_time=NtTime(0))
|
||||
|
||||
gkid = self.current_gkid()
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_NTE_BAD_KEY,
|
||||
err.exception.args[0],
|
||||
"using a root key with an effective time of zero should fail with BAD_KEY",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_NTE_BAD_KEY,
|
||||
rpc_err.exception.args[0],
|
||||
"using a root key with an effective time of zero should fail with BAD_KEY",
|
||||
)
|
||||
|
||||
def test_root_key_use_start_time_too_low(self):
|
||||
"""Attempt to use a root key with an effective time set too low."""
|
||||
root_key_id = self.new_root_key(use_start_time=NtTime(ROOT_KEY_START_TIME - 1))
|
||||
|
||||
gkid = self.current_gkid()
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
err.exception.args[0],
|
||||
"using a root key with too low effective time should fail with"
|
||||
" INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
rpc_err.exception.args[0],
|
||||
"using a root key with too low effective time should fail with"
|
||||
" INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
def test_before_valid(self):
|
||||
"""Attempt to use a key before it is valid."""
|
||||
gkid = self.current_gkid()
|
||||
valid_start_time = NtTime(
|
||||
gkid.start_nt_time() + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
|
||||
)
|
||||
|
||||
# Using a valid root key is allowed.
|
||||
valid_root_key_id = self.new_root_key(use_start_time=valid_start_time)
|
||||
self.check_rpc_get_key(valid_root_key_id, gkid)
|
||||
|
||||
# But attempting to use a root key that is not yet valid will result in
|
||||
# an INVALID_PARAMETER error.
|
||||
invalid_root_key_id = self.new_root_key(use_start_time=valid_start_time + 1)
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, invalid_root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
err.exception.args[0],
|
||||
"using a key before it is valid should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, invalid_root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
rpc_err.exception.args[0],
|
||||
"using a key before it is valid should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
def test_non_existent_root_key(self):
|
||||
"""Attempt to use a non‐existent root key."""
|
||||
root_key_id = misc.GUID(secrets.token_bytes(16))
|
||||
|
||||
gkid = self.current_gkid()
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_NTE_NO_KEY,
|
||||
err.exception.args[0],
|
||||
"using a non‐existent root key should fail with NO_KEY",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, root_key_id, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_NTE_NO_KEY,
|
||||
rpc_err.exception.args[0],
|
||||
"using a non‐existent root key should fail with NO_KEY",
|
||||
)
|
||||
|
||||
|
||||
class GkdiImplicitRootKeyTests(GkdiKdcBaseTest):
|
||||
def test_l1_seed_key(self):
|
||||
"""Request a key and expect to receive an L1 seed key."""
|
||||
gkid = Gkid(300, 0, 31)
|
||||
key = self.check_rpc_get_key(None, gkid)
|
||||
|
||||
# Expect to get an L1 seed key.
|
||||
self.assertIsNotNone(key.l1_key)
|
||||
self.assertIsNone(key.l2_key)
|
||||
self.assertEqual(gkid, key.gkid)
|
||||
|
||||
def test_l2_seed_key(self):
|
||||
"""Request a key and expect to receive an L2 seed key."""
|
||||
gkid = Gkid(300, 0, 0)
|
||||
key = self.check_rpc_get_key(None, gkid)
|
||||
|
||||
# Expect to get an L2 seed key.
|
||||
self.assertIsNone(key.l1_key)
|
||||
self.assertIsNotNone(key.l2_key)
|
||||
self.assertEqual(gkid, key.gkid)
|
||||
|
||||
def test_both_seed_keys(self):
|
||||
"""Request a key and expect to receive L1 and L2 seed keys."""
|
||||
gkid = Gkid(300, 1, 0)
|
||||
key = self.check_rpc_get_key(None, gkid)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertIsNotNone(key.l1_key)
|
||||
self.assertIsNotNone(key.l2_key)
|
||||
self.assertEqual(gkid, key.gkid)
|
||||
|
||||
def test_both_seed_keys_no_hint(self):
|
||||
"""Request a key, but don’t specify ‘root_key_id_hint’."""
|
||||
gkid = Gkid(300, 1, 0)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
None,
|
||||
gkid,
|
||||
)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertIsNotNone(key.l1_key)
|
||||
self.assertIsNotNone(key.l2_key)
|
||||
self.assertEqual(gkid, key.gkid)
|
||||
|
||||
def test_request_l0_seed_key(self):
|
||||
"""Attempt to request an L0 seed key."""
|
||||
gkid = Gkid.l0_seed_key(300)
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, None, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
err.exception.args[0],
|
||||
"requesting an L0 seed key should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
rpc_err.exception.args[0],
|
||||
"requesting an L0 seed key should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
def test_request_l1_seed_key(self):
|
||||
"""Attempt to request an L1 seed key."""
|
||||
gkid = Gkid.l1_seed_key(300, 0)
|
||||
|
||||
with self.assertRaises(GetKeyError) as err:
|
||||
self.get_key(self.get_samdb(), self.gmsa_sd, None, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
err.exception.args[0],
|
||||
"requesting an L1 seed key should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
with self.assertRaises(GetKeyError) as rpc_err:
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid)
|
||||
|
||||
self.assertEqual(
|
||||
HRES_E_INVALIDARG,
|
||||
rpc_err.exception.args[0],
|
||||
"requesting an L1 seed key should fail with INVALID_PARAMETER",
|
||||
)
|
||||
|
||||
def test_request_default_seed_key(self):
|
||||
"""Try to make a request with the default GKID."""
|
||||
gkid = Gkid.default()
|
||||
|
||||
self.assertRaises(
|
||||
NotImplementedError,
|
||||
self.get_key,
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
None,
|
||||
gkid,
|
||||
)
|
||||
|
||||
self.rpc_get_key(self.gkdi_conn(), self.gmsa_sd, None, gkid)
|
||||
|
||||
|
||||
class GkdiSelfTests(GkdiKdcBaseTest):
|
||||
def test_current_l0_idx_l1_seed_key(self):
|
||||
"""Request a key with the current L0 index, expecting to receive an L1
|
||||
seed key."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA512,
|
||||
guid=misc.GUID("89f70521-9d66-441f-c314-1b462f9b1052"),
|
||||
data=bytes.fromhex(
|
||||
"a6ef87dbbbf86b6bbe55750b941f13ca99efe5185e2e2bded5b838d8a0e77647"
|
||||
"0537e68cae45a7a0f4b1d6c9bf5494c3f879e172e326557cdbb6a56e8799a722"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(255, 24, 31)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(255, 2, 5),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get an L1 seed key.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA512, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"bd538a073490f3cf9451c933025de9b22c97eaddaffa94b379e2b919a4bed147"
|
||||
"5bc67f6a9175b139c69204c57d4300a0141ffe34d12ced84614593b1aa13af1c"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertIsNone(key.l2_key)
|
||||
|
||||
def test_current_l0_idx_l2_seed_key(self):
|
||||
"""Request a key with the current L0 index, expecting to receive an L2
|
||||
seed key."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA512,
|
||||
guid=misc.GUID("1a3d6c30-aa81-cb7f-d3fe-80775d135dfe"),
|
||||
data=bytes.fromhex(
|
||||
"dfd95be3153a0805c65694e7d284aace5ab0aa493350025eb8dbc6df0b4e9256"
|
||||
"fb4cbfbe6237ce3732694e2608760076b67082d39abd3c0fedba1b8873645064"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(321, 0, 12)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(321, 0, 1),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get an L2 seed key.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA512, key.hash_algorithm)
|
||||
self.assertIsNone(key.l1_key)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"bbbd9376cd16c247ed40f5912d1908218c08f0915bae02fe02cbfb3753bde406"
|
||||
"f9c553acd95143cf63906a0440e3cf237d2335ae4e4b9cd2d946a71351ebcb7b"
|
||||
),
|
||||
key.l2_key,
|
||||
)
|
||||
|
||||
def test_current_l0_idx_both_seed_keys(self):
|
||||
"""Request a key with the current L0 index, expecting to receive L1 and
|
||||
L2 seed keys."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA512,
|
||||
guid=misc.GUID("09de0b38-c743-7abf-44ea-7a3c3e404314"),
|
||||
data=bytes.fromhex(
|
||||
"d5912d0eb3bd60e1371b1e525dd83be7fc5baf77018b0dba6bd948b7a98ebe5a"
|
||||
"f37674332506a46c52c108a62f2a3e89251ad1bde6d539004679c0658853bb68"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(123, 21, 0)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(123, 2, 1),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA512, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"b1f7c5896e7dc791d9c0aaf8ca7dbab8c172a4f8b873db488a3c4cbd0f559b11"
|
||||
"52ffba39d4aff2d9e8aada90b27a3c94a5af996f4b8f584a4f37ccab4d505d3d"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"133c9bbd20d9227aeb38dfcd3be6bcbfc5983ba37202088ff5c8a70511214506"
|
||||
"a69c195a8807cd844bcb955e9569c8e4d197759f28577cc126d15f16a7da4ee0"
|
||||
),
|
||||
key.l2_key,
|
||||
)
|
||||
|
||||
def test_previous_l0_idx(self):
|
||||
"""Request a key with a previous L0 index."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA512,
|
||||
guid=misc.GUID("27136e8f-e093-6fe3-e57f-1d915b102e1c"),
|
||||
data=bytes.fromhex(
|
||||
"b41118c60a19cafa5ecf858d1a2a2216527b2daedf386e9d599e42a46add6c7d"
|
||||
"c93868619761c880ff3674a77c6e5fbf3434d130a9727bb2cd2a2557bdcfc752"
|
||||
),
|
||||
)
|
||||
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(100, 20, 30),
|
||||
current_gkid=Gkid(101, 2, 3),
|
||||
)
|
||||
|
||||
# Expect to get an L1 seed key.
|
||||
self.assertEqual(Gkid(100, 31, 31), key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA512, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"935cbdc06198eb28fa44b8d8278f51072c4613999236585041ede8e72d02fe95"
|
||||
"e3454f046382cbc0a700779b79474dd7e080509d76302d2937407e96e3d3d022"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertIsNone(key.l2_key)
|
||||
|
||||
def test_sha1(self):
|
||||
"""Request a key derived with SHA1."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA1,
|
||||
guid=misc.GUID("970abad6-fe55-073a-caf1-b801d3f26bd3"),
|
||||
data=bytes.fromhex(
|
||||
"3bed03bf0fb7d4013149154f24ca2d59b98db6d588cb1f54eca083855e25eb28"
|
||||
"d3562a01adc78c4b70e0b72a59515863e7732b853fba02dd7646e63108441211"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(1, 2, 3)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(1, 1, 1),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA1, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"576cb68f2e52eb739f817b488c3590d86f1c2c365f3fc9201d9c7fee7494853d"
|
||||
"58746ee13e48f18aa6fa69f7157de3d07de34e13836792b7c088ffb6914a89c2"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"3ffb825adaf116b6533207d568a30ed3d3f21c68840941c9456684f9afa11b05"
|
||||
"6e0c59391b4d88c495d984c3d680029cc5c594630f34179119c1c5acaae5e90e"
|
||||
),
|
||||
key.l2_key,
|
||||
)
|
||||
|
||||
def test_sha256(self):
|
||||
"""Request a key derived with SHA256."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA256,
|
||||
guid=misc.GUID("45e26207-ed33-dcd5-925a-518a0deef69e"),
|
||||
data=bytes.fromhex(
|
||||
"28b5b6503d3c1d24814de781bb7bfce3ef69eed1ce4809372bee2c506270c5f0"
|
||||
"b5c6df597472623f256c86daa0991e8a11a1705f21b2cfdc0bb9db4ba23246a2"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(222, 22, 22)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(222, 11, 0),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA256, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"57aced6e75f83f3af4f879b38b60f090b42e4bfa022fae3e6fd94280b469b0ec"
|
||||
"15d8b853a870b5fbdf28708cce19273b74a573acbe0deda8ef515db4691e2dcb"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"752a0879ae2424c0504c7493599f13e588e1bbdc252f83325ad5b1fb91c24c89"
|
||||
"01d440f3ff9ffba59fcd65bb975732d9f383dd50b898174bb9393e383d25d540"
|
||||
),
|
||||
key.l2_key,
|
||||
)
|
||||
|
||||
def test_sha384(self):
|
||||
"""Request a key derived with SHA384."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA384,
|
||||
guid=misc.GUID("66e6d9f7-4924-f3fc-fe34-605634d42ebd"),
|
||||
data=bytes.fromhex(
|
||||
"23e5ba86cbd88f7b432ee66dbb03bf4eebf401cbfc3df735d4d728b503c87f84"
|
||||
"3207c6f6153f190dfe85a86cb8d8b74df13b25305981be8d7e29c96ee54c9630"
|
||||
),
|
||||
)
|
||||
|
||||
current_gkid = Gkid(287, 28, 27)
|
||||
key = self.get_key(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
Gkid(287, 8, 7),
|
||||
current_gkid=current_gkid,
|
||||
)
|
||||
|
||||
# Expect to get both L1 and L2 seed keys.
|
||||
self.assertEqual(current_gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA384, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"fabadd7a9a63df57d6832df7a735aebb6e181888b2eaf301a2e4ff9a70246d38"
|
||||
"ab1d2416325bf3eb726a0267bab4bd950c7291f05ea5f17197ece56992af3eb8"
|
||||
),
|
||||
key.l1_key,
|
||||
)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"ec1c65634b5694818e1d341da9996db8f2a1ef6a2c776a7126a7ebd18b37a073"
|
||||
"afdac44c41b167b14e4b872d485bbb6d7b70964215d0e84a2ff142a9d943f205"
|
||||
),
|
||||
key.l2_key,
|
||||
)
|
||||
|
||||
def test_derive_key_exact(self):
|
||||
"""Derive a key at an exact GKID."""
|
||||
root_key_id = self.new_root_key(
|
||||
use_start_time=ROOT_KEY_START_TIME,
|
||||
hash_algorithm=Algorithm.SHA512,
|
||||
guid=misc.GUID("d95fb06f-5a9c-1829-e20d-27f3f2ecfbeb"),
|
||||
data=bytes.fromhex(
|
||||
"489f3531c537774d432d6b97e3bc1f43d2e8c6dc17eb0e4fd9a0870d2f1ebf92"
|
||||
"e2496668a8b5bd11aea2d32d0aab716f48fe569f5c9b50ff3f9bf5deaea572fb"
|
||||
),
|
||||
)
|
||||
|
||||
gkid = Gkid(333, 22, 11)
|
||||
key = self.get_key_exact(
|
||||
self.get_samdb(),
|
||||
self.gmsa_sd,
|
||||
root_key_id,
|
||||
gkid,
|
||||
current_gkid=self.current_gkid(),
|
||||
)
|
||||
|
||||
self.assertEqual(gkid, key.gkid)
|
||||
self.assertEqual(root_key_id, key.root_key_id)
|
||||
self.assertEqual(Algorithm.SHA512, key.hash_algorithm)
|
||||
self.assertEqual(
|
||||
bytes.fromhex(
|
||||
"d6ab3b14f4f4c8908aa3464011b39f10a8bfadb9974af90f7d9a9fede2fdc6e5"
|
||||
"f68a628ec00f9994a3abd8a52ae9e2db4f68e83648311e9d7765f2535515b5e2"
|
||||
),
|
||||
key.key,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import unittest
|
||||
|
||||
unittest.main()
|
18
selftest/knownfail.d/gkdi
Normal file
18
selftest/knownfail.d/gkdi
Normal file
@ -0,0 +1,18 @@
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_none\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha1\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha256\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha384\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_algorithm_sha512\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_before_valid\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_current_l0_idx\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_future_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_non_existent_root_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_previous_l0_idx\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_root_key_use_start_time_too_low\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiExplicitRootKeyTests\.test_root_key_use_start_time_zero\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_both_seed_keys\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_l1_seed_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_l2_seed_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_default_seed_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_l0_seed_key\(ad_dc\)$
|
||||
^samba\.tests\.krb5\.gkdi_tests\.samba\.tests\.krb5\.gkdi_tests\.GkdiImplicitRootKeyTests\.test_request_l1_seed_key\(ad_dc\)$
|
@ -2044,6 +2044,10 @@ planoldpythontestsuite(
|
||||
'ad_dc',
|
||||
'samba.tests.krb5.conditional_ace_tests',
|
||||
environ=krb5_environ)
|
||||
planoldpythontestsuite(
|
||||
'ad_dc',
|
||||
'samba.tests.krb5.gkdi_tests',
|
||||
environ=krb5_environ)
|
||||
|
||||
for env in [
|
||||
'vampire_dc',
|
||||
|
Loading…
Reference in New Issue
Block a user