1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-13 08:58:35 +03:00

Some minor MFA fixes

This commit is contained in:
Adolfo Gómez García 2023-02-23 02:57:37 +01:00
parent 0aaa734030
commit 762c0e5392
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
3 changed files with 115 additions and 23 deletions

View File

@ -33,11 +33,12 @@
import datetime
import random
import enum
import hashlib
import logging
import typing
from django.utils.translation import ugettext_noop as _
from uds.models import getSqlDatetime
from uds import models
from uds.core import Module
from uds.core.auths import exceptions
@ -47,6 +48,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class MFA(Module):
"""
this class provides an abstraction of a Multi Factor Authentication
@ -78,7 +80,7 @@ class MFA(Module):
iconFile: typing.ClassVar[str] = 'mfa.png'
# : Cache time for the generated MFA code
# : this means that the code will be valid for this time, and will not
# : this means that the code will be valid for this time, and will not
# : be resent to the user until the time expires.
# : This value is in minutes
# : Note: This value is used by default "process" methos, but you can
@ -89,6 +91,7 @@ class MFA(Module):
"""
This enum is used to know if the MFA code was sent or not.
"""
OK = 1
ALLOWED = 2
@ -140,17 +143,54 @@ class MFA(Module):
"""
return True
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT':
def sendCode(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
) -> 'MFA.RESULT':
"""
This method will be invoked from "process" method, to send the MFA code to the user.
If returns MFA.RESULT.OK, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
raise NotImplementedError('sendCode method not implemented')
def process(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT':
def _getData(
self, request: 'ExtendedHttpRequest', userId: str
) -> typing.Optional[typing.Tuple[datetime.datetime, str]]:
"""
Internal method to get the data from storage
"""
storageKey = request.ip + userId
return self.storage.getPickle(storageKey)
def _removeData(self, request: 'ExtendedHttpRequest', userId: str) -> None:
"""
Internal method to remove the data from storage
"""
storageKey = request.ip + userId
self.storage.remove(storageKey)
def _putData(self, request: 'ExtendedHttpRequest', userId: str, code: str) -> None:
"""
Internal method to put the data into storage
"""
storageKey = request.ip + userId
self.storage.putPickle(storageKey, (models.getSqlDatetime(), code))
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
) -> 'MFA.RESULT':
"""
This method will be invoked from the MFA form, to send the MFA code to the user.
The identifier where to send the code, will be obtained from "mfaIdentifier" method.
@ -159,57 +199,111 @@ class MFA(Module):
If returns MFA.RESULT.OK, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
Args:
request: The request object
userId: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
username: The user name, the one used to login
identifier: The identifier where to send the code (phone, email, etc)
validity: The validity of the code in seconds. If None, the default value will be used.
Returns:
MFA.RESULT.OK if the code was already sent
MFA.RESULT.ALLOW if the user does not need to enter the MFA code (i.e. fail to send the code)
Raises an error if the code was not sent and was required to be sent
"""
# try to get the stored code
storageKey = request.ip + userId
data: typing.Any = self.storage.getPickle(storageKey)
data = self._getData(request, userId)
validity = validity if validity is not None else self.validity() * 60
try:
if data and validity:
# if we have a stored code, check if it's still valid
if data[0] + datetime.timedelta(seconds=validity) > getSqlDatetime():
if data[0] + datetime.timedelta(seconds=validity) > models.getSqlDatetime():
# if it's still valid, just return without sending a new one
return MFA.RESULT.OK
except Exception:
# if we have a problem, just remove the stored code
self.storage.remove(storageKey)
self._removeData(request, userId)
# Generate a 6 digit code (0-9)
code = ''.join(random.SystemRandom().choices('0123456789', k=6))
logger.debug('Generated OTP is %s', code)
# Store the code in the database, own storage space
self.storage.putPickle(storageKey, (getSqlDatetime(), code))
self._putData(request, userId, code)
# Send the code to the user
return self.sendCode(request, userId, username, identifier, code)
def validate(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
validity: typing.Optional[int] = None,
) -> None:
"""
If this method is provided by an authenticator, the user will be allowed to enter a MFA code
You must raise an "exceptions.MFAError" if the code is not valid.
Args:
request: The request object
userId: An unique, non authenticator dependant, id for the user (at this time, it's sha3_256 of user + authenticator)
username: The user name, the one used to login
identifier: The identifier where to send the code (phone, email, etc)
code: The code entered by the user
validity: The validity of the code in seconds. If None, the default value will be used.
Returns:
None if the code is valid
Raises an error if the code is not valid ("exceptions.MFAError")
"""
# Validate the code
try:
err = _('Invalid MFA code')
storageKey = request.ip + userId
data = self.storage.getPickle(storageKey)
data = self._getData(request, userId)
if data and len(data) == 2:
validity = validity if validity is not None else self.validity() * 60
if validity > 0 and data[0] + datetime.timedelta(seconds=validity) < getSqlDatetime():
if (
validity > 0
and data[0] + datetime.timedelta(seconds=validity)
< models.getSqlDatetime()
):
# if it is no more valid, raise an error
# Remove stored code and raise error
self.storage.remove(storageKey)
self._removeData(request, userId)
raise exceptions.MFAError('MFA Code expired')
# Check if the code is valid
if data[1] == code:
# Code is valid, remove it from storage
self.storage.remove(storageKey)
self._removeData(request, userId)
return
except Exception as e:
# Any error means invalid code
err = str(e)
raise exceptions.MFAError(err)
def reset_data(
self,
request: 'ExtendedHttpRequest',
userId: str,
) -> None:
"""
This method allows to reset the MFA state of an user.
Normally, this will do nothing, but for persistent MFA data (as Google Authenticator), this will remove the data.
"""
pass
@staticmethod
def getUserId(user: models.User) -> str:
mfa = user.manager.mfa
if not mfa:
raise exceptions.MFAError('MFA is not enabled')
return hashlib.sha3_256(
(user.name + (user.uuid or '') + mfa.uuid).encode()
).hexdigest()

View File

@ -236,6 +236,7 @@ class EmailMFA(mfas.MFA):
raise
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
# If failed to send the code, we will raise the exception
self.doSendCode(request, identifier, code,)
return mfas.MFA.RESULT.OK

View File

@ -30,7 +30,6 @@
"""
import time
import logging
import hashlib
import typing
import json
@ -182,9 +181,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
if not mfaProvider:
return HttpResponseRedirect(reverse('page.index'))
userHashValue: str = hashlib.sha3_256(
(request.user.name + (request.user.uuid or '') + mfaProvider.uuid).encode()
).hexdigest()
userHashValue = mfas.MFA.getUserId(request.user)
# Try to get cookie anc check it
mfaCookie = request.COOKIES.get(MFA_COOKIE_NAME, None)