1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Fixes on crypto typing and refactorized userId on mfas, and added support for using identifier instead of username in RADIUS MFA

This commit is contained in:
Adolfo Gómez García 2024-07-19 18:07:37 +02:00
parent abedf042da
commit 0e097e3a12
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
9 changed files with 83 additions and 74 deletions

View File

@ -74,8 +74,11 @@ class CryptoManager(metaclass=singleton.Singleton):
_namespace: uuid.UUID
def __init__(self) -> None:
self._rsa = serialization.load_pem_private_key(
self._rsa = typing.cast(
'RSAPrivateKey',
serialization.load_pem_private_key(
settings.RSA_KEY.encode(), password=None, backend=default_backend()
),
)
self._namespace = uuid.UUID('627a37a5-e8db-431a-b783-73f7d20b4934')
@ -228,7 +231,10 @@ class CryptoManager(metaclass=singleton.Singleton):
self, rsaKey: str
) -> typing.Union['RSAPrivateKey', 'DSAPrivateKey', 'DHPrivateKey', 'EllipticCurvePrivateKey']:
try:
return serialization.load_pem_private_key(rsaKey.encode(), password=None, backend=default_backend())
return typing.cast(
typing.Union['RSAPrivateKey', 'DSAPrivateKey', 'DHPrivateKey', 'EllipticCurvePrivateKey'],
serialization.load_pem_private_key(rsaKey.encode(), password=None, backend=default_backend()),
)
except Exception as e:
raise e

View File

@ -224,13 +224,13 @@ class MFA(Module):
"""
return 'MFA Code'
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
def html(self, request: 'ExtendedHttpRequest', userid: str, username: str) -> str:
"""
This method will be invoked from the MFA form, to know the HTML that will be presented
to the user below the MFA code form.
Args:
userId: Id of the user that is requesting the MFA code
userid: Id of the user that is requesting the MFA code
request: Request object, so you can get more information
Returns:
@ -251,7 +251,7 @@ class MFA(Module):
def send_code(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
@ -266,32 +266,32 @@ class MFA(Module):
raise exceptions.auth.MFAError('MFA.sendCode not implemented')
def _get_data(
self, request: 'ExtendedHttpRequest', userId: str
self, request: 'ExtendedHttpRequest', userid: str
) -> typing.Optional[tuple[datetime.datetime, str]]:
"""
Internal method to get the data from storage
"""
storageKey = request.ip + userId
storageKey = request.ip + userid
return self.storage.read_pickled(storageKey)
def _remove_data(self, request: 'ExtendedHttpRequest', userId: str) -> None:
def _remove_data(self, request: 'ExtendedHttpRequest', userid: str) -> None:
"""
Internal method to remove the data from storage
"""
storageKey = request.ip + userId
storageKey = request.ip + userid
self.storage.remove(storageKey)
def _put_data(self, request: 'ExtendedHttpRequest', userId: str, code: str) -> None:
def _put_data(self, request: 'ExtendedHttpRequest', userid: str, code: str) -> None:
"""
Internal method to put the data into storage
"""
storageKey = request.ip + userId
storageKey = request.ip + userid
self.storage.save_pickled(storageKey, (sql_now(), code))
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
@ -307,7 +307,7 @@ class MFA(Module):
Args:
request: The request object
userId: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
userid: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
username: The user name, the one used to login
identifier: The identifier where to send the code (phone, email, etc)
validity: The validity of the code in seconds. If None, the default value will be used.
@ -318,7 +318,7 @@ class MFA(Module):
Raises an error if the code was not sent and was required to be sent
"""
# try to get the stored code
data = self._get_data(request, userId)
data = self._get_data(request, userid)
validity = validity if validity is not None else 0
try:
if data and validity:
@ -328,7 +328,7 @@ class MFA(Module):
return MFA.RESULT.OK
except Exception:
# if we have a problem, just remove the stored code
self._remove_data(request, userId)
self._remove_data(request, userid)
# Generate a 6 digit code (0-9)
code = ''.join(random.SystemRandom().choices('0123456789', k=6))
@ -337,17 +337,17 @@ class MFA(Module):
# Send the code to the user
# May raise an exception if the code was not sent and is required to be sent
# pylint: disable=assignment-from-no-return
result = self.send_code(request, userId, username, identifier, code)
result = self.send_code(request, userid, username, identifier, code)
# Store the code in the database, own storage space, if no exception was raised
self._put_data(request, userId, code)
self._put_data(request, userid, code)
return result
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
@ -359,7 +359,7 @@ class MFA(Module):
Args:
request: The request object
userId: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
userid: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
username: The user name, the one used to login
identifier: The identifier where to send the code (phone, email, etc)
code: The code entered by the user
@ -373,19 +373,19 @@ class MFA(Module):
try:
err = _('Invalid MFA code')
data = self._get_data(request, userId)
data = self._get_data(request, userid)
if data and len(data) == 2:
validity = validity if validity is not None else 0
if validity > 0 and data[0] + datetime.timedelta(seconds=validity) < sql_now():
# if it is no more valid, raise an error
# Remove stored code and raise error
self._remove_data(request, userId)
self._remove_data(request, userid)
raise exceptions.auth.MFAError('MFA Code expired')
# Check if the code is valid
if data[1] == code:
# Code is valid, remove it from storage
self._remove_data(request, userId)
self._remove_data(request, userid)
return
except Exception as e:
# Any error means invalid code
@ -395,7 +395,7 @@ class MFA(Module):
def reset_data(
self,
userId: str,
userid: str,
) -> None:
"""
This method allows to reset the MFA state of an user.

View File

@ -190,7 +190,7 @@ class EmailMFA(mfas.MFA):
# now check from email and to email
self.from_email.value = validators.validate_email(self.from_email.value)
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
def html(self, request: 'ExtendedHttpRequest', userid: str, username: str) -> str:
return gettext('Check your mail. You will receive an email with the verification code')
def allow_login_without_identifier(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
@ -235,7 +235,7 @@ class EmailMFA(mfas.MFA):
def send_code(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
@ -290,7 +290,7 @@ class EmailMFA(mfas.MFA):
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
validity: int | None = None,
@ -298,4 +298,4 @@ class EmailMFA(mfas.MFA):
# if ip allowed to skip mfa, return allowed
if mfas.LoginAllowed.check_ip_allowed(request, self.allow_skip_mfa_from_networks.value):
return mfas.MFA.RESULT.ALLOWED
return super().process(request, userId, username, identifier, validity)
return super().process(request, userid, username, identifier, validity)

View File

@ -131,7 +131,7 @@ class RadiusOTP(mfas.MFA):
def label(self) -> str:
return gettext('OTP Code')
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
def html(self, request: 'ExtendedHttpRequest', userid: str, username: str) -> str:
'''
ToDo:
- Maybe create a field in mfa definition to edit from admin panel ?
@ -142,7 +142,7 @@ class RadiusOTP(mfas.MFA):
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
@ -159,6 +159,8 @@ class RadiusOTP(mfas.MFA):
if self.all_users_otp.value:
return mfas.MFA.RESULT.OK
username = identifier or username
web_pwd = web_password(request)
try:
connection = self.radius_client()
@ -203,7 +205,7 @@ class RadiusOTP(mfas.MFA):
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
@ -218,6 +220,7 @@ class RadiusOTP(mfas.MFA):
regenerate a new State after a wrong sent OTP code
slightly less efficient but a lot simpler
'''
username = identifier or username
try:
err = _('Invalid OTP code')

View File

@ -253,7 +253,7 @@ class SMSMFA(mfas.MFA):
def build_sms_url(
self,
userId: str, # pylint: disable=unused-argument
userid: str, # pylint: disable=unused-argument
userName: str,
code: str,
phone: str,
@ -304,7 +304,7 @@ class SMSMFA(mfas.MFA):
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
@ -313,7 +313,7 @@ class SMSMFA(mfas.MFA):
if mfas.LoginAllowed.check_ip_allowed(request, self.allow_skip_mfa_from_networks.value):
return mfas.MFA.RESULT.ALLOWED
return super().process(request, userId, username, identifier, validity)
return super().process(request, userid, username, identifier, validity)
def process_response(self, request: 'ExtendedHttpRequest', response: requests.Response) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
@ -347,7 +347,7 @@ class SMSMFA(mfas.MFA):
def _build_data(
self,
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
userId: str, # pylint: disable=unused-argument
userid: str, # pylint: disable=unused-argument
username: str,
url: str, # pylint: disable=unused-argument
code: str,
@ -367,7 +367,7 @@ class SMSMFA(mfas.MFA):
def _send_sms_using_get(
self,
request: 'ExtendedHttpRequest',
userId: str, # pylint: disable=unused-argument
userid: str, # pylint: disable=unused-argument
username: str, # pylint: disable=unused-argument
url: str,
) -> mfas.MFA.RESULT:
@ -376,7 +376,7 @@ class SMSMFA(mfas.MFA):
def _send_sms_using_post(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
url: str,
code: str,
@ -384,7 +384,7 @@ class SMSMFA(mfas.MFA):
) -> mfas.MFA.RESULT:
# Compose POST data
session = self.get_session()
bdata = self._build_data(request, userId, username, url, code, phone)
bdata = self._build_data(request, userid, username, url, code, phone)
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
@ -393,52 +393,52 @@ class SMSMFA(mfas.MFA):
def _send_sms_using_put(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
url: str,
code: str,
phone: str,
) -> mfas.MFA.RESULT:
# Compose POST data
bdata = self._build_data(request, userId, username, url, code, phone)
bdata = self._build_data(request, userid, username, url, code, phone)
return self.process_response(request, self.get_session().put(url, data=bdata))
def _send_sms(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
code: str,
phone: str,
) -> mfas.MFA.RESULT:
url = self.build_sms_url(userId, username, code, phone)
url = self.build_sms_url(userid, username, code, phone)
if self.http_method.value == 'GET':
return self._send_sms_using_get(request, userId, username, url)
return self._send_sms_using_get(request, userid, username, url)
if self.http_method.value == 'POST':
return self._send_sms_using_post(request, userId, username, url, code, phone)
return self._send_sms_using_post(request, userid, username, url, code, phone)
if self.http_method.value == 'PUT':
return self._send_sms_using_put(request, userId, username, url, code, phone)
return self._send_sms_using_put(request, userid, username, url, code, phone)
raise Exception('Unknown SMS sending method')
def label(self) -> str:
return gettext('MFA Code')
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
def html(self, request: 'ExtendedHttpRequest', userid: str, username: str) -> str:
return gettext('Check your phone. You will receive an SMS with the verification code')
def send_code(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
) -> mfas.MFA.RESULT:
logger.debug(
'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")',
'Sending SMS code "%s" for user %s (userid="%s", identifier="%s")',
code,
username,
userId,
userid,
identifier,
)
return self._send_sms(request, userId, username, code, identifier)
return self._send_sms(request, userid, username, code, identifier)

View File

@ -64,7 +64,7 @@ class SampleMFA(mfas.MFA):
return 'Code is in log'
def send_code(
self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str
self, request: 'ExtendedHttpRequest', userid: str, username: str, identifier: str, code: str
) -> mfas.MFA.RESULT:
logger.debug('Sending code: %s (from %s)', code, request.ip)
return mfas.MFA.RESULT.OK

View File

@ -109,37 +109,37 @@ class TOTP_MFA(mfas.MFA):
def label(self) -> str:
return gettext('Authentication Code')
def _user_data(self, userId: str) -> tuple[str, bool]:
def _user_data(self, userid: str) -> tuple[str, bool]:
# Get data from storage related to this user
# Data contains the secret and if the user has already logged in already some time
# so we show the QR code only once
data: typing.Optional[tuple[str, bool]] = self.storage.read_pickled(userId)
data: typing.Optional[tuple[str, bool]] = self.storage.read_pickled(userid)
if data is None:
data = (pyotp.random_base32(), False)
self._save_user_data(userId, data)
self._save_user_data(userid, data)
return data
def _save_user_data(self, userId: str, data: tuple[str, bool]) -> None:
self.storage.save_pickled(userId, data)
def _save_user_data(self, userid: str, data: tuple[str, bool]) -> None:
self.storage.save_pickled(userid, data)
def _remove_user_data(self, userId: str) -> None:
self.storage.remove(userId)
def _remove_user_data(self, userid: str) -> None:
self.storage.remove(userid)
def get_totp(self, userId: str, username: str) -> pyotp.TOTP:
def get_totp(self, userid: str, username: str) -> pyotp.TOTP:
return pyotp.TOTP(
self._user_data(userId)[0],
self._user_data(userid)[0],
issuer=self.issuer.value,
name=username,
interval=TOTP_INTERVAL,
)
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
def html(self, request: 'ExtendedHttpRequest', userid: str, username: str) -> str:
# Get data from storage related to this user
qrShown = self._user_data(userId)[1]
qrShown = self._user_data(userid)[1]
if qrShown:
return _('Enter your authentication code')
# Compose the QR code from provisioning URI
totp = self.get_totp(userId, username)
totp = self.get_totp(userid, username)
uri = totp.provisioning_uri()
img: bytes = qrcode.make(uri) # pyright: ignore
imgByteStream = io.BytesIO()
@ -161,7 +161,7 @@ class TOTP_MFA(mfas.MFA):
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
@ -175,7 +175,7 @@ class TOTP_MFA(mfas.MFA):
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
userid: str,
username: str,
identifier: str,
code: str,
@ -184,22 +184,22 @@ class TOTP_MFA(mfas.MFA):
if self.ask_for_otp(request) is False:
return
if self.cache.get(userId + code) is not None:
if self.cache.get(userid + code) is not None:
raise exceptions.auth.MFAError(gettext('Code is already used. Wait a minute and try again.'))
# Get data from storage related to this user
secret, qrShown = self._user_data(userId)
secret, qrShown = self._user_data(userid)
# Validate code
if not self.get_totp(userId, username).verify(
if not self.get_totp(userid, username).verify(
code, valid_window=self.valid_window.as_int(), for_time=sql_now()
):
raise exceptions.auth.MFAError(gettext('Invalid code'))
self.cache.put(userId + code, True, self.valid_window.as_int() * (TOTP_INTERVAL + 1))
self.cache.put(userid + code, True, self.valid_window.as_int() * (TOTP_INTERVAL + 1))
if qrShown is False:
self._save_user_data(userId, (secret, True)) # Update user data to show QR code only once
self._save_user_data(userid, (secret, True)) # Update user data to show QR code only once
def reset_data(self, userId: str) -> None:
self._remove_user_data(userId)
def reset_data(self, userid: str) -> None:
self._remove_user_data(userid)

File diff suppressed because one or more lines are too long

View File

@ -109,6 +109,6 @@
</svg>
</div>
</uds-root>
<script src="/uds/res/modern/polyfills.js" type="module" crossorigin="anonymous" integrity="sha384-kneAlH1vIzldsZkT6EK31Eum/UWMQmkQ8TorFG7mTYEkxfYsv0BPE75Ib40rzLtG"></script><script src="/uds/res/modern/scripts.js" defer crossorigin="anonymous" integrity="sha384-gJ6CPuwXlJNL6wOYMLzD98cFi988rpY6Ln6S+UhAkZs84+MOQ+ws+0qgV4WicE5i"></script><script src="/uds/res/modern/main.js" type="module" crossorigin="anonymous" integrity="sha384-mIxKGVvOc7kh1cBRR41a0yUcX0sIXqgkcnNHRmDyX2zV93h5XZtV2fBNGO8F1ntY"></script></body>
<script src="/uds/res/modern/polyfills.js" type="module" crossorigin="anonymous" integrity="sha384-kneAlH1vIzldsZkT6EK31Eum/UWMQmkQ8TorFG7mTYEkxfYsv0BPE75Ib40rzLtG"></script><script src="/uds/res/modern/scripts.js" defer crossorigin="anonymous" integrity="sha384-gJ6CPuwXlJNL6wOYMLzD98cFi988rpY6Ln6S+UhAkZs84+MOQ+ws+0qgV4WicE5i"></script><script src="/uds/res/modern/main.js" type="module" crossorigin="anonymous" integrity="sha384-MUw7BkU/Q+zd/6FAjuNRIWtxrk+O983TyRBForFApkLWJehL3NqOMpTMGS0PrLV0"></script></body>
</html>