1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-11 05:17:55 +03:00

Merge remote-tracking branch 'origin/v3.6'

This commit is contained in:
Adolfo Gómez García 2023-02-23 16:48:54 +01:00
commit 7e84d183ca
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
9 changed files with 278 additions and 28 deletions

View File

@ -82,10 +82,12 @@ class MFA(Module):
# : Cache time for the generated MFA code
# : this means that the code will be valid for this time, and will not
# : be resent to the user until the time expires.
# : This value is in minutes
# : This value is in second
# : Note: This value is used by default "process" methos, but you can
# : override it in your own implementation.
cacheTime: typing.ClassVar[int] = 5
# : Note: This value is only used in "validity" method, that is also overridable
# : by your own implementation, so its up to you to use it or not.
cacheTime: typing.ClassVar[int] = 5*60
class RESULT(enum.IntEnum):
"""
@ -122,10 +124,17 @@ class MFA(Module):
"""
return 'MFA Code'
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
"""
This method will be invoked from the MFA form, to know the HTML that will be presented
to the user below the MFA code form.
Args:
userId: Id of the user that is requesting the MFA code
request: Request object, so you can get more information
Returns:
HTML to be presented to the user along with the MFA code form
"""
return ''
@ -137,9 +146,13 @@ class MFA(Module):
"""
return self.cacheTime
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
"""
If this method returns True, an user that has no "identifier" is allowed to login without MFA
Returns:
True: If an user that has no "identifier" is allowed to login without MFA
False: If an user that has no "identifier" is not allowed to login without MFA
None: Process request, let the class decide if the user is allowed to login without MFA
"""
return True
@ -214,7 +227,7 @@ class MFA(Module):
"""
# try to get the stored code
data = self._getData(request, userId)
validity = validity if validity is not None else self.validity() * 60
validity = validity if validity is not None else self.validity()
try:
if data and validity:
# if we have a stored code, check if it's still valid
@ -264,7 +277,7 @@ class MFA(Module):
data = self._getData(request, userId)
if data and len(data) == 2:
validity = validity if validity is not None else self.validity() * 60
validity = validity if validity is not None else self.validity()
if (
validity > 0
and data[0] + datetime.timedelta(seconds=validity)
@ -286,16 +299,14 @@ class MFA(Module):
raise exceptions.MFAError(err)
def reset_data(
def resetData(
self,
request: 'ExtendedHttpRequest',
userId: str,
) -> None:
"""
This method allows to reset the MFA state of an user.
Normally, this will do nothing, but for persistent MFA data (as Google Authenticator), this will remove the data.
"""
pass
@staticmethod

View File

@ -211,7 +211,7 @@ class EmailMFA(mfas.MFA):
else:
return False
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def label(self) -> str:

View File

@ -60,7 +60,7 @@ class RadiusOTP(mfas.MFA):
typeType = 'RadiusOTP'
typeDescription = _('Radius OTP Challenge')
iconFile = 'radius.png'
cacheTime = 1 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
cacheTime = 1*60 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
server = gui.TextField(
length=64,
@ -186,13 +186,13 @@ class RadiusOTP(mfas.MFA):
return mfas.MFA.RESULT.OK
raise Exception('User not allowed to login')
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def label(self) -> str:
return gettext('OTP Code')
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
'''
ToDo:
- Maybe create a field in mfa definition to edit from admin panel ?

View File

@ -40,7 +40,6 @@ import requests.auth
from uds import models
from uds.core import mfas
from uds.core.ui import gui
from uds.core.util import net
if typing.TYPE_CHECKING:
from uds.core.module import Module
@ -329,7 +328,7 @@ class SMSMFA(mfas.MFA):
else:
return False
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def processResponse(

View File

@ -0,0 +1 @@
from . import mfa

View File

@ -0,0 +1,232 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Daniel Torregrosa
"""
import typing
import logging
import io
import base64
import pyotp
import qrcode
from django.utils.translation import gettext_noop as _, gettext
from uds import models
from uds.core import mfas
from uds.core.ui import gui
from uds.core.auths import exceptions
if typing.TYPE_CHECKING:
from uds.core.module import Module
from uds.core.util.request import ExtendedHttpRequest
logger = logging.getLogger(__name__)
class TOTP_MFA(mfas.MFA):
'''
Validates OTP challenge against a proper configured Radius Server with OTP
using 'Access-Challenge' response from Radius Server [RFC2865, RFC5080]
'''
typeName = _('TOTP Based MFA')
typeType = 'TOTP_MFA'
typeDescription = _('TOTP Based MFA (Google Authenticator, etc)')
iconFile = 'totp.png'
cacheTime = 1 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
issuer = gui.TextField(
length=64,
label=_('Issuer'),
defvalue='UDS Authenticator',
order=1,
tooltip=_('Issuer for OTP. Once it\'s created it can\'t be changed'),
required=True,
rdonly=True, # This is not editable, as it is used to generate the QR code. Once generated, it can't be changed
)
validWindow = gui.NumericField(
length=2,
label=_('Valid Window'),
defvalue=0,
minValue=0,
maxValue=10,
order=31,
tooltip=_('Number of valid codes before and after the current one'),
required=True,
tab=_('Config'),
)
networks = gui.MultiChoiceField(
label=_('TOTP networks'),
rdonly=False,
rows=5,
order=32,
tooltip=_('Networks for TOTP authentication choices'),
required=False,
tab=_('Config'),
)
doNotAskForOTP = gui.ChoiceField(
label=_('Requre HOTP for users within networks'),
order=33,
defaultValue='0',
tooltip=_('Action for user without defined Radius Challenge'),
required=True,
values={
'0': _('Allow user login (no MFA)'),
'1': _('Require user to login with MFA'),
},
tab=_('Config'),
)
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
@classmethod
def initClassGui(cls) -> None:
# Populate the networks list
cls.networks.setValues(
[
gui.choiceItem(v.uuid, v.name) # type: ignore
for v in models.Network.objects.all().order_by('name')
]
)
def emptyIndentifierAllowedToLogin(
self, request: 'ExtendedHttpRequest'
) -> typing.Optional[bool]:
return None
def askForOTP(self, request: 'ExtendedHttpRequest') -> bool:
"""
Check if we need to ask for OTP for a given user
Returns:
True if we need to ask for OTP
"""
def checkIp() -> bool:
return any(
i.ipInNetwork(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if self.doNotAskForOTP.value == '0':
return not checkIp()
return True
def label(self) -> str:
return gettext('Authentication Code')
def _userData(self, userId: str) -> typing.Tuple[str, bool]:
# Get data from storage related to this user
# Data contains the secret and if the user has already logged in already some time
# so we show the QR code only once
data: typing.Optional[typing.Tuple[str, bool]] = self.storage.getPickle(userId)
if data is None:
data = (pyotp.random_base32(), False)
self._saveUserData(userId, data)
return data
def _saveUserData(self, userId: str, data: typing.Tuple[str, bool]) -> None:
self.storage.putPickle(userId, data)
def getTOTP(self, userId: str, username: str) -> pyotp.TOTP:
return pyotp.TOTP(
self._userData(userId)[0], issuer=self.issuer.value, name=username
)
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
# Get data from storage related to this user
secret, qrShown = self._userData(userId)
if qrShown:
return _('Enter your authentication code')
# Compose the QR code from provisioning URI
totp = self.getTOTP(userId, username)
uri = totp.provisioning_uri()
img = qrcode.make(uri)
imgByteStream = io.BytesIO()
img.save(imgByteStream, format='PNG')
# Convert to base64 to be used in html img tag
imgByteArr = imgByteStream.getvalue()
imgData = 'data:image/png;base64,' + base64.b64encode(imgByteArr).decode(
'utf-8'
)
# Return HTML code to be shown to user
return f'''
<div style="text-align: center;">
<img src="{imgData}" alt="QR Code" />
</div>
<div style="text-align: center;">
<p>{_('Please, use your Authenticator to add your account. (i.e. Google Authenticator, Authy, ...)')}</p>
</div>
'''
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
) -> 'mfas.MFA.RESULT':
if self.askForOTP(request) is False:
return mfas.MFA.RESULT.ALLOWED
# The data is provided by an external source, so we need to process anything on the request
return mfas.MFA.RESULT.OK
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
validity: typing.Optional[int] = None,
) -> None:
if self.askForOTP(request) is False:
return
# Get data from storage related to this user
secret, qrShown = self._userData(userId)
# Validate code
if not self.getTOTP(userId, username).verify(
code, valid_window=self.validWindow.num()
):
raise exceptions.MFAError(gettext('Invalid code'))
if qrShown is False:
self._saveUserData(
userId, (secret, True)
) # Update user data to show QR code only once

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

View File

@ -223,6 +223,11 @@ class User(UUIDModel):
# first, we invoke removeUser. If this raises an exception, user will not
# be removed
toDelete.getManager().removeUser(toDelete.name)
# If has mfa, remove related data
if toDelete.manager.mfa:
toDelete.manager.mfa.getInstance().resetData(toDelete)
# Remove related stored values
with storage.StorageAccess('manager' + str(toDelete.manager.uuid)) as store:
for key in store.keys():

View File

@ -191,10 +191,10 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
# Obtain MFA data
authInstance = request.user.manager.getInstance()
mfaInstance = mfaProvider.getInstance()
mfaInstance: 'mfas.MFA' = mfaProvider.getInstance()
# Get validity duration
validity = min(mfaInstance.validity(), mfaProvider.validity) * 60
validity = min(mfaInstance.validity(), mfaProvider.validity*60)
start_time = request.session.get('mfa_start_time', time.time())
# If mfa process timed out, we need to start login again
@ -206,10 +206,11 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
label = mfaInstance.label()
if not mfaIdentifier:
if mfaInstance.emptyIndentifierAllowedToLogin(request):
if mfaInstance.emptyIndentifierAllowedToLogin(request) == True:
# Allow login
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
elif mfaInstance.emptyIndentifierAllowedToLogin(request) == False:
# Not allowed to login, redirect to login error page
logger.warning(
'MFA identifier not found for user %s on authenticator %s. It is required by MFA %s',
@ -218,6 +219,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
mfaProvider.name,
)
return errors.errorView(request, errors.ACCESS_DENIED)
# Default, do nothing, and continue
tries = request.session.get('mfa_tries', 0)
if request.method == 'POST': # User has provided MFA code
@ -231,7 +233,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
request.user.name,
mfaIdentifier,
code,
validity=validity,
validity=validity*60,
)
request.authorized = True
# Remove mfa_start_time from session
@ -297,7 +299,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
remember_device = _('{} hours').format(mfaProvider.remember_device)
# Html from MFA provider
mfaHtml = mfaInstance.html(request)
mfaHtml = mfaInstance.html(mfaUserId, request, request.user.name)
# Redirect to index, but with MFA data
request.session['mfa'] = {