1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Merge remote-tracking branch 'origin/v3.6'

This commit is contained in:
Adolfo Gómez García 2023-04-06 17:21:07 +02:00
commit 09473d627a
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23

View File

@ -16,14 +16,22 @@ import certifi
import requests import requests
import requests.adapters import requests.adapters
KEY_SIZE = 4096
SECRET_SIZE = 32
def selfSignedCert(ip: str) -> typing.Tuple[str, str, str]: def selfSignedCert(ip: str) -> typing.Tuple[str, str, str]:
"""
Generates a self signed certificate for the given ip.
This method is mainly intended to be used for generating/saving Actor certificates.
UDS will check that actor server certificate is the one generated by this method.
"""
key = rsa.generate_private_key( key = rsa.generate_private_key(
public_exponent=65537, public_exponent=65537,
key_size=4096, key_size=KEY_SIZE,
backend=default_backend(), backend=default_backend(),
) )
# Create a random password for private key # Create a random password for private key
password = secrets.token_urlsafe(32) password = secrets.token_hex(SECRET_SIZE)
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ip)]) name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ip)])
san = x509.SubjectAlternativeName([x509.IPAddress(ipaddress.ip_address(ip))]) san = x509.SubjectAlternativeName([x509.IPAddress(ipaddress.ip_address(ip))])
@ -57,15 +65,23 @@ def selfSignedCert(ip: str) -> typing.Tuple[str, str, str]:
def createClientSslContext(verify: bool = True) -> ssl.SSLContext: def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
if verify: """
sslContext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=certifi.where()) Creates a SSLContext for client connections.
else:
sslContext = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH, check_hostname=False)
Args:
verify: If True, the server certificate will be verified. (Default: True)
# Disable TLS1.0 and TLS1.1 Returns:
# Redundant, only use minimum_version A SSLContext object.
# sslContext.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 """
sslContext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=certifi.where())
if not verify:
sslContext.check_hostname = False
sslContext.verify_mode = ssl.CERT_NONE
# Disable TLS1.0 and TLS1.1, SSLv2 and SSLv3 are disabled by default
# Redundant in fact, i think... :)
sslContext.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
sslContext.minimum_version = ssl.TLSVersion.TLSv1_2 sslContext.minimum_version = ssl.TLSVersion.TLSv1_2
return sslContext return sslContext
@ -73,6 +89,7 @@ def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool: def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool:
""" """
Checks if a certificate and a private key match. Checks if a certificate and a private key match.
All parameters must be keyword arguments.
Borh must be in PEM format. Borh must be in PEM format.
""" """
try: try:
@ -96,6 +113,8 @@ def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool:
) )
return public_cert == public_key return public_cert == public_key
except Exception: except Exception:
# Not intended to show kind of error, just to return False if the certificate does not match the key
# Even if the key or certificate is not valid, we only want a True if they match, False otherwise
return False return False
def secureRequestsSession(verify: bool = True) -> 'requests.Session': def secureRequestsSession(verify: bool = True) -> 'requests.Session':
@ -103,15 +122,13 @@ def secureRequestsSession(verify: bool = True) -> 'requests.Session':
def init_poolmanager(self, *args, **kwargs) -> None: def init_poolmanager(self, *args, **kwargs) -> None:
sslContext = createClientSslContext(verify=verify) sslContext = createClientSslContext(verify=verify)
ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
# See urllib3.poolmanager.SSL_KEYWORDS for all available keys. # See urllib3.poolmanager.SSL_KEYWORDS for all available keys.
kwargs["ssl_context"] = sslContext kwargs["ssl_context"] = sslContext
return super().init_poolmanager(*args, **kwargs) return super().init_poolmanager(*args, **kwargs)
def cert_verify(self, conn, url, _, cert): # pylint: disable=unused-argument def cert_verify(self, conn, url, _, cert):
# Overridden to do nothing # Overridden to disable cert verification if verify is False
return super().cert_verify(conn, url, verify, cert) return super().cert_verify(conn, url, verify, cert)
session = requests.Session() session = requests.Session()