mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-13 08:58:35 +03:00
Backport of 4.0 crypto manager
For enhacements on user password hashing
This commit is contained in:
parent
dba2526ffb
commit
859f09aa0b
@ -33,11 +33,14 @@ import hashlib
|
||||
import array
|
||||
import uuid
|
||||
import codecs
|
||||
import datetime
|
||||
import struct
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import logging
|
||||
import typing
|
||||
import secrets
|
||||
import time
|
||||
|
||||
|
||||
from cryptography import x509
|
||||
@ -132,9 +135,7 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
modes.CBC(b'udsinitvectoruds'),
|
||||
backend=default_backend(),
|
||||
)
|
||||
rndStr = self.randomString(
|
||||
16
|
||||
).encode() # Same as block size of CBC (that is 16 here)
|
||||
rndStr = secrets.token_bytes(16) # Same as block size of CBC (that is 16 here)
|
||||
paddedLength = ((len(text) + 4 + 15) // 16) * 16
|
||||
toEncode = (
|
||||
struct.pack('>i', len(text)) + text + rndStr[: paddedLength - len(text) - 4]
|
||||
@ -143,7 +144,7 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
encoded = encryptor.update(toEncode) + encryptor.finalize()
|
||||
|
||||
if base64:
|
||||
return codecs.encode(encoded, 'base64') # Return as binary
|
||||
encoded = codecs.encode(encoded, 'base64').strip() # Return as bytes
|
||||
|
||||
return encoded
|
||||
|
||||
@ -161,7 +162,9 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
toDecode = decryptor.update(text) + decryptor.finalize()
|
||||
return toDecode[4 : 4 + struct.unpack('>i', toDecode[:4])[0]]
|
||||
|
||||
def xor(self, value: typing.Union[str, bytes], key: typing.Union[str, bytes]) -> bytes:
|
||||
def xor(
|
||||
self, value: typing.Union[str, bytes], key: typing.Union[str, bytes]
|
||||
) -> bytes:
|
||||
if not key:
|
||||
return b'' # Protect against division by cero
|
||||
|
||||
@ -171,9 +174,13 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
key = key.encode('utf-8')
|
||||
mult = len(value) // len(key) + 1
|
||||
value_array = array.array('B', value)
|
||||
key_array = array.array('B', key * mult) # Ensure key array is at least as long as value_array
|
||||
key_array = array.array(
|
||||
'B', key * mult
|
||||
) # Ensure key array is at least as long as value_array
|
||||
# We must return binary in xor, because result is in fact binary
|
||||
return array.array('B', (value_array[i] ^ key_array[i] for i in range(len(value_array)))).tobytes()
|
||||
return array.array(
|
||||
'B', (value_array[i] ^ key_array[i] for i in range(len(value_array)))
|
||||
).tobytes()
|
||||
|
||||
def symCrypt(
|
||||
self, text: typing.Union[str, bytes], key: typing.Union[str, bytes]
|
||||
@ -227,20 +234,30 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
raise Exception('Invalid certificate')
|
||||
|
||||
def certificateString(self, certificate: str) -> str:
|
||||
return (
|
||||
certificate.replace('-----BEGIN CERTIFICATE-----', '')
|
||||
.replace('-----END CERTIFICATE-----', '')
|
||||
.replace('\n', '')
|
||||
)
|
||||
# Remove -----.*-----\n strings using regex
|
||||
return re.sub(r'(-----.*-----\n)', '', certificate)
|
||||
|
||||
def secret(self, length: int = 16) -> str:
|
||||
"""
|
||||
Get a random secret string from config.SECRET_KEY
|
||||
"""
|
||||
from django.conf import settings
|
||||
return settings.SECRET_KEY[:length]
|
||||
|
||||
def salt(self, length: int = 16) -> str:
|
||||
"""
|
||||
Get a random salt random string
|
||||
"""
|
||||
return secrets.token_hex(length)
|
||||
|
||||
def hash(self, value: typing.Union[str, bytes]) -> str:
|
||||
if isinstance(value, str):
|
||||
value = value.encode()
|
||||
|
||||
if not value:
|
||||
return ''
|
||||
salt = self.salt(8) # 8 bytes = 16 chars
|
||||
value = salt.encode() + value
|
||||
|
||||
return '{SHA256}' + str(hashlib.sha3_256(value).hexdigest())
|
||||
return '{SHA256SALT}' + salt + str(hashlib.sha3_256(value).hexdigest())
|
||||
|
||||
def checkHash(self, value: typing.Union[str, bytes], hash: str) -> bool:
|
||||
if isinstance(value, str):
|
||||
@ -250,9 +267,14 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
return not hash
|
||||
|
||||
if hash[:8] == '{SHA256}':
|
||||
return str(hashlib.sha3_256(value).hexdigest()) == hash[8:]
|
||||
return secrets.compare_digest(hashlib.sha3_256(value).hexdigest(), hash[8:])
|
||||
elif hash[:12] == '{SHA256SALT}':
|
||||
# Extract 16 chars salt and hash
|
||||
salt = hash[12:28].encode()
|
||||
value = salt + value
|
||||
return secrets.compare_digest(hashlib.sha3_256(value).hexdigest(), hash[28:])
|
||||
else: # Old sha1
|
||||
return hash == str(hashlib.sha1(value).hexdigest())
|
||||
return secrets.compare_digest(hash, str(hashlib.sha1(value).hexdigest())) # nosec: Old compatibility SHA1, not used anymore but need to be supported
|
||||
|
||||
def uuid(self, obj: typing.Any = None) -> str:
|
||||
"""
|
||||
@ -273,4 +295,12 @@ class CryptoManager(metaclass=singleton.Singleton):
|
||||
|
||||
def randomString(self, length: int = 40, digits: bool = True) -> str:
|
||||
base = string.ascii_letters + (string.digits if digits else '')
|
||||
return ''.join(random.SystemRandom().choices(base, k=length))
|
||||
return ''.join(secrets.choice(base) for _ in range(length))
|
||||
|
||||
def unique(self) -> str:
|
||||
return hashlib.sha3_256(
|
||||
(
|
||||
self.randomString(24, True)
|
||||
+ datetime.datetime.now().strftime('%H%M%S%f')
|
||||
).encode()
|
||||
).hexdigest()
|
||||
|
Loading…
x
Reference in New Issue
Block a user