mirror of
https://github.com/dkmstr/openuds.git
synced 2024-12-25 23:21:41 +03:00
Merge remote-tracking branch 'origin/v3.6'
This commit is contained in:
commit
692c8601bf
@ -62,6 +62,10 @@ if typing.TYPE_CHECKING:
|
|||||||
|
|
||||||
|
|
||||||
class CryptoManager(metaclass=singleton.Singleton):
|
class CryptoManager(metaclass=singleton.Singleton):
|
||||||
|
_rsa: 'RSAPrivateKey'
|
||||||
|
_namespace: uuid.UUID
|
||||||
|
_counter: int
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._rsa = serialization.load_pem_private_key(
|
self._rsa = serialization.load_pem_private_key(
|
||||||
settings.RSA_KEY.encode(), password=None, backend=default_backend()
|
settings.RSA_KEY.encode(), password=None, backend=default_backend()
|
||||||
@ -92,7 +96,7 @@ class CryptoManager(metaclass=singleton.Singleton):
|
|||||||
|
|
||||||
def encrypt(self, value: str) -> str:
|
def encrypt(self, value: str) -> str:
|
||||||
return codecs.encode(
|
return codecs.encode(
|
||||||
self._rsa.public_key().encrypt( # type: ignore
|
self._rsa.public_key().encrypt(
|
||||||
value.encode(),
|
value.encode(),
|
||||||
padding.OAEP(
|
padding.OAEP(
|
||||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||||
@ -108,7 +112,7 @@ class CryptoManager(metaclass=singleton.Singleton):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# First, try new "cryptografy" decrpypting
|
# First, try new "cryptografy" decrpypting
|
||||||
decrypted: bytes = self._rsa.decrypt( # type: ignore
|
decrypted: bytes = self._rsa.decrypt(
|
||||||
data,
|
data,
|
||||||
padding.OAEP(
|
padding.OAEP(
|
||||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||||
@ -158,19 +162,19 @@ class CryptoManager(metaclass=singleton.Singleton):
|
|||||||
toDecode = decryptor.update(text) + decryptor.finalize()
|
toDecode = decryptor.update(text) + decryptor.finalize()
|
||||||
return toDecode[4 : 4 + struct.unpack('>i', toDecode[:4])[0]]
|
return toDecode[4 : 4 + struct.unpack('>i', toDecode[:4])[0]]
|
||||||
|
|
||||||
def xor(self, s1: typing.Union[str, bytes], s2: typing.Union[str, bytes]) -> bytes:
|
def xor(self, value: typing.Union[str, bytes], key: typing.Union[str, bytes]) -> bytes:
|
||||||
if not s2:
|
if not key:
|
||||||
return b'' # Protect against division by cero
|
return b'' # Protect against division by cero
|
||||||
|
|
||||||
if isinstance(s1, str):
|
if isinstance(value, str):
|
||||||
s1 = s1.encode('utf-8')
|
value = value.encode('utf-8')
|
||||||
if isinstance(s2, str):
|
if isinstance(key, str):
|
||||||
s2 = s2.encode('utf-8')
|
key = key.encode('utf-8')
|
||||||
mult = len(s1) // len(s2) + 1
|
mult = len(value) // len(key) + 1
|
||||||
s1a = array.array('B', s1)
|
value_array = array.array('B', value)
|
||||||
s2a = array.array('B', s2 * mult)
|
key_array = array.array('B', key * mult) # Ensure key array is at least as long as value_array
|
||||||
# We must return bynary in xor, because result is in fact binary
|
# We must return binary in xor, because result is in fact binary
|
||||||
return array.array('B', (s1a[i] ^ s2a[i] for i in range(len(s1a)))).tobytes()
|
return array.array('B', (value_array[i] ^ key_array[i] for i in range(len(value_array)))).tobytes()
|
||||||
|
|
||||||
def symCrypt(
|
def symCrypt(
|
||||||
self, text: typing.Union[str, bytes], key: typing.Union[str, bytes]
|
self, text: typing.Union[str, bytes], key: typing.Union[str, bytes]
|
||||||
|
Loading…
Reference in New Issue
Block a user