1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Refactor code to use validate_certificate instead of validate_server_certificate in server registration

This commit is contained in:
Adolfo Gómez García 2024-09-08 21:45:03 +02:00
parent 6a0244e83d
commit 320e97b85d
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 43 additions and 38 deletions

View File

@ -84,7 +84,7 @@ class ServerRegisterBase(Handler):
validators.validate_fqdn(hostname)
validators.validate_mac(mac)
validators.validate_json(data)
validators.validate_server_certificate(certificate)
validators.validate_certificate(certificate)
except Exception as e:
raise rest_exceptions.RequestError(str(e)) from e

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
# Copyright (c) 2012-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -48,7 +48,7 @@ from uds.core import auths, exceptions, types
from uds.core.managers.crypto import CryptoManager
from uds.core.types.requests import ExtendedHttpRequest
from uds.core.ui import gui
from uds.core.util import security, decorators, auth as auth_utils
from uds.core.util import security, decorators, auth as auth_utils, validators
from uds.core.util.model import sql_now
# Not imported at runtime, just for type checking
@ -389,34 +389,8 @@ class SAMLAuthenticator(auths.Authenticator):
self.cache.remove('idpMetadata')
# First, validate certificates
# This is in fact not needed, but we may say something useful to user if we check this
if self.server_certificate.value.startswith('-----BEGIN CERTIFICATE-----\n') is False:
raise exceptions.ui.ValidationError(
gettext(
'Server certificate should be a valid PEM (PEM certificates starts with -----BEGIN CERTIFICATE-----)'
)
)
try:
CryptoManager().load_certificate(self.server_certificate.value)
except Exception as e:
raise exceptions.ui.ValidationError(gettext('Invalid server certificate. ') + str(e))
if (
self.private_key.value.startswith('-----BEGIN RSA PRIVATE KEY-----\n') is False
and self.private_key.value.startswith('-----BEGIN PRIVATE KEY-----\n') is False
):
raise exceptions.ui.ValidationError(
gettext(
'Private key should be a valid PEM (PEM private keys starts with -----BEGIN RSA PRIVATE KEY-----'
)
)
try:
CryptoManager().load_private_key(self.private_key.value)
except Exception as e:
raise exceptions.ui.ValidationError(gettext('Invalid private key. ') + str(e))
validators.validate_private_key(self.private_key.value)
validators.validate_certificate(self.server_certificate.value)
if not security.check_certificate_matches_private_key(
cert=self.server_certificate.value, key=self.private_key.value

View File

@ -196,7 +196,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
def initialize(self, values: typing.Optional[dict[str, typing.Any]]) -> None:
if values:
self.username_attr.value = self.username_attr.value.replace(' ', '') # Removes white spaces
validators.validate_server_certificate(self.certificate.value)
validators.validate_certificate(self.certificate.value)
def unmarshal(self, data: bytes) -> None:
if not data.startswith(b'v'):

View File

@ -199,6 +199,7 @@ def secure_requests_session(*, verify: typing.Union[str, bool] = True) -> 'reque
class UDSHTTPAdapter(requests.adapters.HTTPAdapter):
_ssl_context: ssl.SSLContext
def init_poolmanager(self, *args: typing.Any, **kwargs: typing.Any) -> None:
self._ssl_context = kwargs["ssl_context"] = create_client_sslcontext(verify=verify is True)
@ -244,3 +245,16 @@ def is_server_certificate_valid(cert: str) -> bool:
return True
except Exception:
return False
def is_private_key_valid(key: str) -> bool:
"""
Checks if a private key is valid.
All parameters must be keyword arguments.
Borh must be in PEM format.
"""
try:
serialization.load_pem_private_key(key.encode(), password=None, backend=default_backend())
return True
except Exception:
return False

View File

@ -380,7 +380,7 @@ def validate_json(json_data: typing.Optional[str]) -> typing.Any:
raise exceptions.ui.ValidationError(_('Invalid JSON data')) from None
def validate_server_certificate(cert: typing.Optional[str]) -> str:
def validate_certificate(cert: typing.Optional[str]) -> str:
"""
Validates that a certificate is valid
@ -394,13 +394,30 @@ def validate_server_certificate(cert: typing.Optional[str]) -> str:
str: Certificate
"""
if not cert:
return ''
try:
security.is_server_certificate_valid(cert)
except Exception as e:
raise exceptions.ui.ValidationError(_('Invalid certificate') + f' :{e}') from e
raise exceptions.ui.ValidationError(_('Certificate is empty'))
if security.is_server_certificate_valid(cert) is False:
raise exceptions.ui.ValidationError(_('Invalid certificate'))
return cert
def validate_private_key(key: typing.Optional[str]) -> str:
"""
Validates that a private key is valid
Args:
key (str): Private key to validate
Raises:
exceptions.ui.ValidationError: If private key is not valid
Returns:
str: Private key
"""
if not key:
raise exceptions.ui.ValidationError(_('Private key is empty'))
if security.is_private_key_valid(key) is False:
raise exceptions.ui.ValidationError(_('Invalid private key'))
return key
def validate_server_certificate_multiple(value: typing.Optional[str]) -> str:
"""