# Unix SMB/CIFS implementation.
# Copyright (C) Catalyst.Net Ltd 2025
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

"""Functions for processing key_credential_link"""

import base64
from hashlib import sha256
import struct
import time
from typing import Optional, Union, Iterable

from cryptography.hazmat.primitives.serialization import (
    load_der_public_key,
    load_pem_public_key,
    PublicFormat,
    Encoding)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.x509 import (
    load_pem_x509_certificate,
    load_der_x509_certificate)

from samba import nttime2unix
from samba.samdb import SamDB, BinaryDn
from samba.ndr import ndr_unpack, ndr_pack
from ldb import Dn
from samba.dcerpc import keycredlink, misc


class KeyCredLinkError(Exception):
    """The key credential link is inconsistent."""
    # For bad values handed in, we use ValueError. For internal bad
    # values, we use this.


def _hex_pairs(data) -> str:
    """Format an iterable of byte values as upper-case hex pairs
    separated by colons ("HH:HH:...")."""
    return ':'.join(f'{b:02X}' for b in data)


def key_usage_string(i) -> str:
    """Return the name of the keycredlink KEY_USAGE_* constant equal to
    i, or "unknown" if none of the known constants matches."""
    # there must be a better way.
    for s in ('KEY_USAGE_NGC',
              'KEY_USAGE_FIDO',
              'KEY_USAGE_FEK',):
        if i == getattr(keycredlink, s):
            return s
    return "unknown"


def nttime_as_date(nt) -> str:
    """Convert an NT time value to a 'YYYY-MM-DD HH:MM:SS' string.

    The value is converted with nttime2unix() and rendered via
    time.gmtime(), so the result is in UTC.
    """
    secs = nttime2unix(nt)
    ts = time.gmtime(secs)
    return time.strftime('%Y-%m-%d %H:%M:%S', ts)


class KeyCredentialLinkDn(BinaryDn):
    """KeyCredentialLink attributes are stored as DN+Binary. The binary
    part is a KEYCREDENTIALLINK_BLOB, which is basically an array of
    KEYCREDENTIALLINK_ENTRY collectively describing a public key.

    Usually the DN refers to the object the KeyCredentialLink was
    found on.
    """
    # We make .binary a @property, so that BinaryDn's .parse() and
    # .prefix just work without knowing that assigning to .binary is
    # doing validation checks.
    blob = None

    @property
    def binary(self) -> Optional[bytes]:
        """The binary is stored as a keycredlink.KEYCREDENTIALLINK_BLOB

        Returns the NDR-packed blob, or None if no blob has been set.
        """
        if self.blob is None:
            return None
        return ndr_pack(self.blob)

    @binary.setter
    def binary(self, value: bytes):
        try:
            self.blob = ndr_unpack(keycredlink.KEYCREDENTIALLINK_BLOB, value)
        except Exception as e:
            # Bad values handed in are reported as ValueError; keep the
            # underlying unpack failure chained for debugging.
            raise ValueError("Could not parse value as KEYCREDENTIALLINK_BLOB "
                             f" (internal error: {e})") from e

    def get_entry(self, entry_id):
        """Return the value of the first blob entry whose identifier is
        entry_id.

        Raises KeyCredLinkError if there is no blob, or no such entry.
        """
        if self.blob is None:
            raise KeyCredLinkError("no key material")

        for entry in self.blob.entries:
            if entry.identifier == entry_id:
                return entry.value

        raise KeyCredLinkError(f"Key information entry {entry_id} not found")

    def fingerprint(self) -> str:
        """The SHA256 of the key material in DER encoding, formatted
        as hex pairs separated by colons ("hh:hh:...")"""
        # A competing format is '2048 SHA256:' (ssh style).
        # This sha256 value should also be stored in the KeyID field.
        data = self.get_entry(keycredlink.KeyMaterial)
        return _hex_pairs(sha256(data).digest())

    def description(self, verbosity=2) -> str:
        """Text describing key credential link characteristics.

        verbosity is adjustable between 1 and 3. Each line of output
        has a level, and is included only when verbosity is greater
        than that level.

        Raises KeyCredLinkError if there is no blob or no key material
        entry, and ValueError if the key material cannot be decoded as
        an RSA public key.
        """
        if self.blob is None:
            # Fail the same way get_entry() does, rather than with an
            # AttributeError on self.blob.version below.
            raise KeyCredLinkError("no key material")

        out = []

        def write(msg, verbose_level=0):
            if verbosity > verbose_level:
                out.append(msg)

        write(f'Link target: {self.dn}', 1)
        write(f'Binary Dn: {self}', 2)
        write(f'Key Credential Link Blob version: {self.blob.version}', 2)
        write(f'Number of key entries: {self.blob.count}', 1)
        write('Key entries:')

        # (label, level, formatter, keycredlink attribute name)
        entry_table = [
            ("key material fingerprint", 0, _hex_pairs, 'KeyID'),
            ("key parameters fingerprint", 2, _hex_pairs, 'KeyHash'),
            ("key usage", 1, key_usage_string, 'KeyUsage'),
            ("Device GUID", 1, misc.GUID, 'DeviceId'),
            ("last logon", 0, nttime_as_date,
             'KeyApproximateLastLogonTimeStamp'),
            ("creation time", 0, nttime_as_date, 'KeyCreationTime'),
            # for now we are ignoring KeySource and CustomKeyInformation
            # KeyMaterial is decoded separately
        ]

        entries = []
        longest = 0
        for label, verbose_level, fn, attr in entry_table:
            if verbosity > 1:
                label = f"{label} ({attr})"
            entry_id = getattr(keycredlink, attr)
            try:
                entry = self.get_entry(entry_id)
                value = fn(entry)
            except KeyCredLinkError:
                value = "not found"

            if verbosity > verbose_level:
                entries.append((label, value))
                longest = max(longest, len(label))

        # Pad "label:" to a common width so that the values line up.
        for desc, val in entries:
            write(f" {desc + ':':{longest + 1}} {val}")

        data = self.get_entry(keycredlink.KeyMaterial)
        key = get_public_key(data, 'der')
        write("RSA public key properties:", 1)
        write(f" key size: {key.key_size}", 1)
        write(f" fingerprint: {self.fingerprint()}", 1)

        return '\n'.join(out)

    def key_material(self) -> bytes:
        """Return the value of the KeyMaterial entry of the blob."""
        return self.get_entry(keycredlink.KeyMaterial)

    def as_pem(self) -> str:
        """Get the key out of the keycredlink blob, and return it in
        PEM format as a string.

        PEM is the ASCII format that starts '-----BEGIN PUBLIC KEY-----'.
        """
        # The key is in DER format in an entry in the blob.
        data = self.key_material()
        key = get_public_key(data, 'der')
        pem = key.public_bytes(Encoding.PEM,
                               PublicFormat.SubjectPublicKeyInfo)
        return pem.decode()


def get_public_key(data: bytes,
                   encoding: Optional[str] = None) -> RSAPublicKey:
    """decode a key in PEM or DER format. If it turns out to be a
    certificate or something, we try to get the public key from that.

    So far only RSA keys are supported.

    encoding may be 'PEM' or 'DER' (case-insensitive). If it is None,
    data beginning with '-----BEGIN ' is treated as PEM and anything
    else as DER.

    Raises ValueError if the encoding is not supported, if the data
    can be decoded neither as a public key nor as a certificate, or if
    the key is not an RSA public key.
    """
    if encoding is None:
        if data[:11] == b'-----BEGIN ':
            encoding = 'PEM'
        else:
            encoding = 'DER'

    encoding = encoding.upper()

    # The cryptography module also supports ssh keys, PKCS1, and other
    # formats, as well as non-RSA keys and extracting public keys from
    # private. It might not be wise to tolerate all of this, but we
    # can do it by adding to key_fns and cert_fns here.
    if encoding == 'PEM':
        key_fns = [load_pem_public_key]
        cert_fns = [load_pem_x509_certificate]
    elif encoding == 'DER':
        key_fns = [load_der_public_key]
        cert_fns = [load_der_x509_certificate]
    else:
        raise ValueError(f"Public key encoding '{encoding}' not supported "
                         "(try 'PEM' or 'DER')")

    key = None
    # First try the data as a bare public key...
    for fn in key_fns:
        try:
            key = fn(data)
            break
        except ValueError:
            continue

    # ...then as a certificate that carries one.
    if key is None:
        for fn in cert_fns:
            try:
                cert = fn(data)
                key = cert.public_key()
                break
            except ValueError:
                continue

    if key is None:
        raise ValueError("could not decode public key")

    if not isinstance(key, RSAPublicKey):
        raise ValueError("Currently only RSA Public Keys are supported "
                         f"(not '{key}')")

    return key


def kcl_entry_bytes(entry_type: int, data: bytes) -> bytes:
    """helper to pack key credential link entries"""
    # NOTE(review): the struct format string was truncated in the copy
    # of this file under review. '<HB' is reconstructed from the
    # KEYCREDENTIALLINK_ENTRY layout in MS-ADTS (16-bit little-endian
    # length of the value, 8-bit identifier, then the value itself) --
    # confirm against the upstream file.
    return struct.pack('<HB', len(data), entry_type) + data


# NOTE(review): the text between kcl_entry_bytes() and the function
# below was lost from the copy of this file under review. Any
# definitions that lived here must be restored from the upstream file.


# NOTE(review): the 'def' line of this function was also lost. The
# parameter names are taken from their use in the body; the function
# name, parameter order, defaults and annotations (other than the
# '-> list' return) are presumed -- confirm against the upstream file.
def filter_kcl_list(samdb: SamDB,
                    keycredlinks: Iterable[KeyCredentialLinkDn],
                    link_target: Optional[str] = None,
                    fingerprint: Optional[str] = None) -> list:
    """Select only the input links that match at least one of the
    criteria.

    A link matches if its DN equals link_target, or if its key
    fingerprint equals fingerprint (compared in upper case). A
    criterion that is None is not applied; with no criteria at all the
    result is empty.
    """
    # used in samba-tool X keytrust delete
    filters = []
    if link_target is not None:
        target_dn = Dn(samdb, link_target)
        filters.append(lambda x: x.dn == target_dn)

    if fingerprint is not None:
        # KeyCredentialLinkDn.fingerprint() produces upper-case hex.
        fingerprint = fingerprint.upper()
        filters.append(lambda x: x.fingerprint() == fingerprint)

    return [x for x in keycredlinks if any(fn(x) for fn in filters)]