Updated crypto manager to use cryptography instead of pyCrypto

This commit is contained in:
Adolfo Gómez García 2020-02-18 01:46:24 +01:00
parent 2b4a0113a2
commit 3bc08d6b86
3 changed files with 71 additions and 36 deletions

View File

@ -308,8 +308,6 @@ def webPassword(request: HttpRequest) -> str:
The password is stored at session using a simple scramble algorithm that keeps the password splited at
session (db) and client browser cookies. This method uses this two values to recompose the user password
so we can provide it to remote sessions.
@param request: DJango Request
@return: Unscrambled user password
return cryptoManager().symDecrpyt(request.session.get(PASS_KEY, ''), getUDSCookie(request)) # recover as original unicode string

View File

@ -130,7 +130,7 @@ class DelayedTaskRunner:
now = getSqlDatetime()
exec_time = now + timedelta(seconds=delay)
cls = instance.__class__
instanceDump = encoders.encode(pickle.dumps(instance), 'base64', asText=True)
instanceDump = encoders.encodeAsStr(pickle.dumps(instance), 'base64')
typeName = str(cls.__module__ + '.' + cls.__name__)
logger.debug('Inserting delayed task %s with %s bytes (%s)', typeName, len(instanceDump), exec_time)

View File

@ -30,40 +30,41 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
import typing
import hashlib
import array
import uuid
import struct
import random
import string
import logging
import typing
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Deprecating these. On future versions will only use
# cryptography libraries. Keep here for backwards compat with
# 1.x 2.x encriptions methods
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES
from Crypto.Random import atfork # type: ignore
from OpenSSL import crypto
from django.conf import settings
from uds.core.util import encoders
logger = logging.getLogger(__name__)
# To generate an rsa key, first we need the crypt module
# next, we do:
# from Crypto.PublicKey import RSA
# import os
# RSA.generate(1024, os.urandom).exportKey()
class CryptoManager:
instance = None
def __init__(self):
self._rsa = RSA.importKey(settings.RSA_KEY)
self._rsa = serialization.load_pem_private_key( settings.RSA_KEY.encode(), password=None, backend=default_backend())
self._oldRsa = RSA.importKey(settings.RSA_KEY)
self._namespace = uuid.UUID('627a37a5-e8db-431a-b783-73f7d20b4934')
self._counter = 0
@ -94,30 +95,61 @@ class CryptoManager:
if isinstance(value, str):
value = value.encode('utf-8')
return typing.cast(str, encoders.encode((self._rsa.encrypt(value, b'')[0]), 'base64', asText=True))
return encoders.encodeAsStr(
# atfork()
# return typing.cast(str, encoders.encode((self._rsa.encrypt(value, b'')[0]), 'base64', asText=True))
def decrypt(self, value: typing.Union[str, bytes]) -> str:
if isinstance(value, str):
value = value.encode('utf-8')
# import inspect
data: bytes = typing.cast(bytes, encoders.decode(value, 'base64'))
decrypted: bytes
# First, try new "cryptografy" decrpypting
decrypted = self._rsa.decrypt(
except Exception: # If fails, try old method
return str(self._rsa.decrypt(encoders.decode(value, 'base64')).decode('utf-8'))
decrypted = self._oldRsa.decrypt(encoders.decode(value, 'base64'))
except Exception:
logger.exception('Decripting: %s', value)
# logger.error(inspect.stack())
return 'decript error'
return decrypted.decode()
def AESCrypt(self, text: bytes, key: bytes, base64: bool = False) -> bytes:
# First, match key to 16 bytes. If key is over 16, create a new one based on key of 16 bytes length
cipher = AES.new(CryptoManager.AESKey(key, 16), AES.MODE_CBC, 'udsinitvectoruds')
rndStr = self.randomString(cipher.block_size).encode('utf8')
cipher = Cipher(algorithms.AES(CryptoManager.AESKey(key, 16)), modes.CBC(b'udsinitvectoruds'), backend=default_backend())
rndStr = self.randomString(16).encode() # Same as block size of CBC (that is 16 here)
paddedLength = ((len(text) + 4 + 15) // 16) * 16
toEncode = struct.pack('>i', len(text)) + text + rndStr[:paddedLength - len(text) - 4]
encoded = cipher.encrypt(toEncode)
encryptor = cipher.encryptor()
encoded = encryptor.update(toEncode) + encryptor.finalize()
if base64:
return typing.cast(bytes, encoders.encode(encoded, 'base64', asText=False)) # Return as binary
return typing.cast(bytes, encoders.encode(encoded, 'base64')) # Return as binary
return encoded
@ -125,8 +157,10 @@ class CryptoManager:
if base64:
text = typing.cast(bytes, encoders.decode(text, 'base64'))
cipher = AES.new(CryptoManager.AESKey(key, 16), AES.MODE_CBC, 'udsinitvectoruds')
toDecode = cipher.decrypt(text)
cipher = Cipher(algorithms.AES(CryptoManager.AESKey(key, 16)), modes.CBC(b'udsinitvectoruds'), backend=default_backend())
decryptor = cipher.decryptor()
toDecode = decryptor.update(text) + decryptor.finalize()
return toDecode[4:4 + struct.unpack('>i', toDecode[:4])[0]]
def xor(self, s1: typing.Union[str, bytes], s2: typing.Union[str, bytes]) -> bytes:
@ -153,12 +187,15 @@ class CryptoManager:
raise e
return pk
def loadCertificate(self, certificate: str):
def loadCertificate(self, certificate: typing.Union[str, bytes]):
if isinstance(certificate, str):
certificate = certificate.encode()
# If invalid certificate, will raise an exception
cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
except crypto.Error as e:
raise Exception(e.message[0][2])
return cert
return x509.load_pem_x509_certificate(certificate, default_backend())
except Exception:
raise Exception('Invalid certificate')
def certificateString(self, certificate: str) -> str:
return certificate.replace('-----BEGIN CERTIFICATE-----', '').replace('-----END CERTIFICATE-----', '').replace('\n', '')