diff --git a/server/src/uds/core/managers/crypto.py b/server/src/uds/core/managers/crypto.py index 770523b2f..d981f35e1 100644 --- a/server/src/uds/core/managers/crypto.py +++ b/server/src/uds/core/managers/crypto.py @@ -33,11 +33,14 @@ import hashlib import array import uuid import codecs +import datetime import struct -import random +import re import string import logging import typing +import secrets +import time from cryptography import x509 @@ -132,9 +135,7 @@ class CryptoManager(metaclass=singleton.Singleton): modes.CBC(b'udsinitvectoruds'), backend=default_backend(), ) - rndStr = self.randomString( - 16 - ).encode() # Same as block size of CBC (that is 16 here) + rndStr = secrets.token_bytes(16) # Same as block size of CBC (that is 16 here) paddedLength = ((len(text) + 4 + 15) // 16) * 16 toEncode = ( struct.pack('>i', len(text)) + text + rndStr[: paddedLength - len(text) - 4] @@ -143,7 +144,7 @@ class CryptoManager(metaclass=singleton.Singleton): encoded = encryptor.update(toEncode) + encryptor.finalize() if base64: - return codecs.encode(encoded, 'base64') # Return as binary + encoded = codecs.encode(encoded, 'base64').strip() # Return as bytes return encoded @@ -161,7 +162,9 @@ class CryptoManager(metaclass=singleton.Singleton): toDecode = decryptor.update(text) + decryptor.finalize() return toDecode[4 : 4 + struct.unpack('>i', toDecode[:4])[0]] - def xor(self, value: typing.Union[str, bytes], key: typing.Union[str, bytes]) -> bytes: + def xor( + self, value: typing.Union[str, bytes], key: typing.Union[str, bytes] + ) -> bytes: if not key: return b'' # Protect against division by cero @@ -171,9 +174,13 @@ class CryptoManager(metaclass=singleton.Singleton): key = key.encode('utf-8') mult = len(value) // len(key) + 1 value_array = array.array('B', value) - key_array = array.array('B', key * mult) # Ensure key array is at least as long as value_array + key_array = array.array( + 'B', key * mult + ) # Ensure key array is at least as long as value_array # We must return binary in xor, because result is in fact binary - return array.array('B', (value_array[i] ^ key_array[i] for i in range(len(value_array)))).tobytes() + return array.array( + 'B', (value_array[i] ^ key_array[i] for i in range(len(value_array))) + ).tobytes() def symCrypt( self, text: typing.Union[str, bytes], key: typing.Union[str, bytes] @@ -227,20 +234,30 @@ class CryptoManager(metaclass=singleton.Singleton): raise Exception('Invalid certificate') def certificateString(self, certificate: str) -> str: - return ( - certificate.replace('-----BEGIN CERTIFICATE-----', '') - .replace('-----END CERTIFICATE-----', '') - .replace('\n', '') - ) + # Remove -----.*-----\n strings using regex + return re.sub(r'(-----.*-----\n)', '', certificate) + + def secret(self, length: int = 16) -> str: + """ + Get a random secret string from config.SECRET_KEY + """ + from django.conf import settings + return settings.SECRET_KEY[:length] + + def salt(self, length: int = 16) -> str: + """ + Get a random salt random string + """ + return secrets.token_hex(length) def hash(self, value: typing.Union[str, bytes]) -> str: if isinstance(value, str): value = value.encode() - if not value: - return '' + salt = self.salt(8) # 8 bytes = 16 chars + value = salt.encode() + value - return '{SHA256}' + str(hashlib.sha3_256(value).hexdigest()) + return '{SHA256SALT}' + salt + str(hashlib.sha3_256(value).hexdigest()) def checkHash(self, value: typing.Union[str, bytes], hash: str) -> bool: if isinstance(value, str): @@ -250,9 +267,14 @@ class CryptoManager(metaclass=singleton.Singleton): return not hash if hash[:8] == '{SHA256}': - return str(hashlib.sha3_256(value).hexdigest()) == hash[8:] + return secrets.compare_digest(hashlib.sha3_256(value).hexdigest(), hash[8:]) + elif hash[:12] == '{SHA256SALT}': + # Extract 16 chars salt and hash + salt = hash[12:28].encode() + value = salt + value + return secrets.compare_digest(hashlib.sha3_256(value).hexdigest(), hash[28:]) else: # Old sha1 - return hash == str(hashlib.sha1(value).hexdigest()) + return secrets.compare_digest(hash, str(hashlib.sha1(value).hexdigest())) # nosec: Old compatibility SHA1, not used anymore but need to be supported def uuid(self, obj: typing.Any = None) -> str: """ @@ -273,4 +295,12 @@ class CryptoManager(metaclass=singleton.Singleton): def randomString(self, length: int = 40, digits: bool = True) -> str: base = string.ascii_letters + (string.digits if digits else '') - return ''.join(random.SystemRandom().choices(base, k=length)) + return ''.join(secrets.choice(base) for _ in range(length)) + + def unique(self) -> str: + return hashlib.sha3_256( + ( + self.randomString(24, True) + + datetime.datetime.now().strftime('%H%M%S%f') + ).encode() + ).hexdigest()