# Unix SMB/CIFS implementation.
# Copyright (C) Catalyst.Net Ltd 2025
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
"""Functions for processing key_credential_link"""
import base64
from hashlib import sha256
import struct
import time
from typing import Optional, Union, Iterable
from cryptography.hazmat.primitives.serialization import (
load_der_public_key,
load_pem_public_key,
PublicFormat,
Encoding)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.x509 import (
load_pem_x509_certificate,
load_der_x509_certificate)
from samba import nttime2unix
from samba.samdb import SamDB, BinaryDn
from samba.ndr import ndr_unpack, ndr_pack
from ldb import Dn
from samba.dcerpc import keycredlink, misc
class KeyCredLinkError(Exception):
"""The key credential link is inconsistent."""
# For bad values handed in, we use ValueError. For internal bad
# values, we use this.
def key_usage_string(i):
# there must be a better way.
for s in ('KEY_USAGE_NGC', 'KEY_USAGE_FIDO', 'KEY_USAGE_FEK',):
if i == getattr(keycredlink, s):
return s
return "unknown"
def nttime_as_date(nt):
secs = nttime2unix(nt)
ts = time.gmtime(secs)
return time.strftime('%Y-%m-%d %H:%M:%S', ts)
class KeyCredentialLinkDn(BinaryDn):
"""KeyCredentialLink attributes are stored as DN+Binary.
The binary part is a KEYCREDENTIALLINK_BLOB, which is basically an
array of KEYCREDENTIALLINK_ENTRY collectively describing a public
key.
Usually the DN refers to the object the KeyCredentialLink was
found on.
"""
# We make .binary a @property, so that BinaryDn's .parse() and
# .prefix just work without knowing that assigning to .binary is
# doing validation checks.
blob = None
@property
def binary(self) -> bytes:
"""The binary is stored as a keycredlink.KEYCREDENTIALLINK_BLOB"""
if self.blob is None:
return None
return ndr_pack(self.blob)
@binary.setter
def binary(self, value:bytes):
try:
self.blob = ndr_unpack(keycredlink.KEYCREDENTIALLINK_BLOB,
value)
except Exception as e:
raise ValueError("Could not parse value as KEYCREDENTIALLINK_BLOB "
f" (internal error: {e})")
def get_entry(self, entry_id):
if self.blob is None:
raise KeyCredLinkError("no key material")
for entry in self.blob.entries:
if entry.identifier == entry_id:
return entry.value
raise KeyCredLinkError(f"Key information entry {entry_id} not found")
def fingerprint(self) -> str:
"""The SHA256 of the key material in DER encoding, formatted
as hex pairs separated by colons ("hh:hh:...")"""
# A competing format is '2048 SHA256:' (ssh style).
# This sha256 value should also be stored in the KeyID field.
data = self.get_entry(keycredlink.KeyMaterial)
hash = sha256(data).digest()
# Python 3.8+ will do this with hash.hex(':')
return ':'.join(f'{_:02X}' for _ in hash)
def description(self, verbosity=2) -> str:
"""Text describing key credential link characteristics.
verbosity is adjustable between 1 and 3.
"""
out = []
def write(msg, verbose_level=0):
if verbosity > verbose_level:
out.append(msg)
write(f'Link target: {self.dn}', 1)
write(f'Binary Dn: {self}', 2)
write(f'Key Credential Link Blob version: {self.blob.version}', 2)
write(f'Number of key entries: {self.blob.count}', 1)
write('Key entries:')
entries = []
longest = 0
for description, verbose_level, fn, attr in [
("key material fingerprint", 0,
lambda x: ':'.join(f"{_:02X}" for _ in x),
'KeyID'),
("key parameters fingerprint", 2,
lambda x: ':'.join(f"{_:02X}" for _ in x),
'KeyHash'),
("key usage", 1, key_usage_string, 'KeyUsage'),
("Device GUID", 1, misc.GUID, 'DeviceId'),
("last logon", 0, nttime_as_date,
'KeyApproximateLastLogonTimeStamp'),
("creation time", 0, nttime_as_date, 'KeyCreationTime'),
# for now we are ignoring KeySource and CustomKeyInformation
# KeyMaterial is decoded separately
]:
if verbosity > 1:
description = f"{description} ({attr})"
i = getattr(keycredlink, attr)
try:
entry = self.get_entry(i)
value = fn(entry)
except KeyCredLinkError:
value = "not found"
if verbosity > verbose_level:
entries.append((description, value))
longest = max(longest, len(description))
for desc, val in entries:
write(f" {desc + ':':{longest + 1}} {val}")
data = self.get_entry(keycredlink.KeyMaterial)
key = get_public_key(data, 'der')
write("RSA public key properties:", 1)
write(f" key size: {key.key_size}", 1)
write(f" fingerprint: {self.fingerprint()}", 1)
return '\n'.join(out)
def key_material(self) -> bytes:
return self.get_entry(keycredlink.KeyMaterial)
def as_pem(self) -> str:
"""Get the key out of the keycredlink blob, and return it in
PEM format as a string.
PEM is the ASCII format that starts '-----BEGIN PUBLIC KEY-----'.
"""
# The key is in DER format in an entry in the blob.
data = self.key_material()
key = get_public_key(data, 'der')
pem = key.public_bytes(Encoding.PEM,
PublicFormat.SubjectPublicKeyInfo)
return pem.decode()
def get_public_key(data:bytes, encoding:Optional[str] = None) -> RSAPublicKey:
"""decode a key in PEM or DER format.
If it turns out to be a certificate or something, we try to get
the public key from that.
So far only RSA keys are supported.
"""
if encoding is None:
if data[:11] == b'-----BEGIN ':
encoding = 'PEM'
else:
encoding = 'DER'
encoding = encoding.upper()
# The cryptography module also supports ssh keys, PKCS1, and other
# formats, as well as non-RSA keys and extracting public keys from
# private. It might not be wise to tolerate all of this, but we
# can do it by adding to key_fns and cert_fns here.
if encoding == 'PEM':
key_fns = [load_pem_public_key]
cert_fns = [load_pem_x509_certificate]
elif encoding == 'DER':
key_fns = [load_der_public_key]
cert_fns = [load_der_x509_certificate]
else:
raise ValueError(f"Public key encoding '{encoding}' not supported "
"(try 'PEM' or 'DER')")
key = None
for fn in key_fns:
try:
key = fn(data)
break
except ValueError:
continue
if key is None:
for fn in cert_fns:
try:
cert = fn(data)
key = cert.public_key()
break
except ValueError:
continue
if key is None:
raise ValueError("could not decode public key")
if not isinstance(key, RSAPublicKey):
raise ValueError("Currently only RSA Public Keys are supported "
f"(not '{key}')")
return key
def kcl_entry_bytes(entry_type:int, data:bytes) -> bytes:
"""helper to pack key credential link entries"""
return struct.pack(' list:
"""Select only the input links that match at least one of the
criteria.
"""
# used in samba-tool X keytrust delete
selected = []
filters = []
if link_target is not None:
target_dn = Dn(samdb, link_target)
filters.append(lambda x: x.dn == target_dn)
if fingerprint is not None:
fingerprint = fingerprint.upper()
filters.append(lambda x: x.fingerprint() == fingerprint)
for x in keycredlinks:
for fn in filters:
if fn(x):
selected.append(x)
break
return selected