1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-13 08:58:35 +03:00

Added Google Auth MFA

This commit is contained in:
Adolfo Gómez García 2023-02-23 16:44:47 +01:00
parent ac49786492
commit 52096b1eff
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
8 changed files with 281 additions and 32 deletions

View File

@ -82,10 +82,12 @@ class MFA(Module):
# : Cache time for the generated MFA code
# : this means that the code will be valid for this time, and will not
# : be resent to the user until the time expires.
# : This value is in minutes
# : This value is in second
# : Note: This value is used by default "process" methos, but you can
# : override it in your own implementation.
cacheTime: typing.ClassVar[int] = 5
# : Note: This value is only used in "validity" method, that is also overridable
# : by your own implementation, so its up to you to use it or not.
cacheTime: typing.ClassVar[int] = 5*60
class RESULT(enum.IntEnum):
"""
@ -122,10 +124,17 @@ class MFA(Module):
"""
return 'MFA Code'
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
"""
This method will be invoked from the MFA form, to know the HTML that will be presented
to the user below the MFA code form.
Args:
userId: Id of the user that is requesting the MFA code
request: Request object, so you can get more information
Returns:
HTML to be presented to the user along with the MFA code form
"""
return ''
@ -137,9 +146,13 @@ class MFA(Module):
"""
return self.cacheTime
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
"""
If this method returns True, an user that has no "identifier" is allowed to login without MFA
Returns:
True: If an user that has no "identifier" is allowed to login without MFA
False: If an user that has no "identifier" is not allowed to login without MFA
None: Process request, let the class decide if the user is allowed to login without MFA
"""
return True
@ -214,7 +227,7 @@ class MFA(Module):
"""
# try to get the stored code
data = self._getData(request, userId)
validity = validity if validity is not None else self.validity() * 60
validity = validity if validity is not None else self.validity()
try:
if data and validity:
# if we have a stored code, check if it's still valid
@ -264,7 +277,7 @@ class MFA(Module):
data = self._getData(request, userId)
if data and len(data) == 2:
validity = validity if validity is not None else self.validity() * 60
validity = validity if validity is not None else self.validity()
if (
validity > 0
and data[0] + datetime.timedelta(seconds=validity)

View File

@ -181,7 +181,8 @@ class EmailMFA(mfas.MFA):
# now check from email and to email
self.fromEmail.value = validators.validateEmail(self.fromEmail.value)
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
return gettext('Check your mail. You will receive an email with the verification code')
@classmethod
@ -208,7 +209,7 @@ class EmailMFA(mfas.MFA):
else:
return False
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def label(self) -> str:

View File

@ -60,7 +60,7 @@ class RadiusOTP(mfas.MFA):
typeType = 'RadiusOTP'
typeDescription = _('Radius OTP Challenge')
iconFile = 'radius.png'
cacheTime = 1 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
cacheTime = 1*60 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
server = gui.TextField(
length=64,
@ -187,13 +187,13 @@ class RadiusOTP(mfas.MFA):
return mfas.MFA.RESULT.OK
raise Exception('User not allowed to login')
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def label(self) -> str:
return gettext('OTP Code')
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
'''
ToDo:
- Maybe create a field in mfa definition to edit from admin panel ?

View File

@ -40,7 +40,6 @@ import requests.auth
from uds import models
from uds.core import mfas
from uds.core.ui import gui
from uds.core.util import net
if typing.TYPE_CHECKING:
from uds.core.module import Module
@ -326,7 +325,7 @@ class SMSMFA(mfas.MFA):
else:
return False
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def processResponse(self, request: 'ExtendedHttpRequest', response: requests.Response) -> mfas.MFA.RESULT:
@ -398,7 +397,8 @@ class SMSMFA(mfas.MFA):
def label(self) -> str:
return gettext('MFA Code')
def html(self, request: 'ExtendedHttpRequest') -> str:
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
return gettext('Check your phone. You will receive an SMS with the verification code')
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:

View File

@ -0,0 +1 @@
from . import mfa

View File

@ -0,0 +1,232 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Daniel Torregrosa
"""
import typing
import logging
import io
import base64
import pyotp
import qrcode
from django.utils.translation import gettext_noop as _, gettext
from uds import models
from uds.core import mfas
from uds.core.ui import gui
from uds.core.auths import exceptions
if typing.TYPE_CHECKING:
from uds.core.module import Module
from uds.core.util.request import ExtendedHttpRequest
logger = logging.getLogger(__name__)
class TOTP_MFA(mfas.MFA):
'''
Validates OTP challenge against a proper configured Radius Server with OTP
using 'Access-Challenge' response from Radius Server [RFC2865, RFC5080]
'''
typeName = _('TOTP Based MFA')
typeType = 'TOTP_MFA'
typeDescription = _('TOTP Based MFA (Google Authenticator, etc)')
iconFile = 'totp.png'
cacheTime = 1 # In this MFA type there are not code generation nor sending... so ? 1 minute or too short ?
issuer = gui.TextField(
length=64,
label=_('Issuer'),
defvalue='UDS Authenticator',
order=1,
tooltip=_('Issuer for OTP. Once it\'s created it can\'t be changed'),
required=True,
rdonly=True, # This is not editable, as it is used to generate the QR code. Once generated, it can't be changed
)
validWindow = gui.NumericField(
length=2,
label=_('Valid Window'),
defvalue=0,
minValue=0,
maxValue=10,
order=31,
tooltip=_('Number of valid codes before and after the current one'),
required=True,
tab=_('Config'),
)
networks = gui.MultiChoiceField(
label=_('TOTP networks'),
rdonly=False,
rows=5,
order=32,
tooltip=_('Networks for TOTP authentication choices'),
required=False,
tab=_('Config'),
)
doNotAskForOTP = gui.ChoiceField(
label=_('Requre HOTP for users within networks'),
order=33,
defaultValue='0',
tooltip=_('Action for user without defined Radius Challenge'),
required=True,
values={
'0': _('Allow user login (no MFA)'),
'1': _('Require user to login with MFA'),
},
tab=_('Config'),
)
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
@classmethod
def initClassGui(cls) -> None:
# Populate the networks list
cls.networks.setValues(
[
gui.choiceItem(v.uuid, v.name) # type: ignore
for v in models.Network.objects.all().order_by('name')
]
)
def emptyIndentifierAllowedToLogin(
self, request: 'ExtendedHttpRequest'
) -> typing.Optional[bool]:
return None
def askForOTP(self, request: 'ExtendedHttpRequest') -> bool:
"""
Check if we need to ask for OTP for a given user
Returns:
True if we need to ask for OTP
"""
def checkIp() -> bool:
return any(
i.ipInNetwork(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if self.doNotAskForOTP.value == '0':
return not checkIp()
return True
def label(self) -> str:
return gettext('Authentication Code')
def _userData(self, userId: str) -> typing.Tuple[str, bool]:
# Get data from storage related to this user
# Data contains the secret and if the user has already logged in already some time
# so we show the QR code only once
data: typing.Optional[typing.Tuple[str, bool]] = self.storage.getPickle(userId)
if data is None:
data = (pyotp.random_base32(), False)
self._saveUserData(userId, data)
return data
def _saveUserData(self, userId: str, data: typing.Tuple[str, bool]) -> None:
self.storage.putPickle(userId, data)
def getTOTP(self, userId: str, username: str) -> pyotp.TOTP:
return pyotp.TOTP(
self._userData(userId)[0], issuer=self.issuer.value, name=username
)
def html(self, userId: str, request: 'ExtendedHttpRequest', username: str) -> str:
# Get data from storage related to this user
secret, qrShown = self._userData(userId)
if qrShown:
return _('Enter your authentication code')
# Compose the QR code from provisioning URI
totp = self.getTOTP(userId, username)
uri = totp.provisioning_uri()
img = qrcode.make(uri)
imgByteStream = io.BytesIO()
img.save(imgByteStream, format='PNG')
# Convert to base64 to be used in html img tag
imgByteArr = imgByteStream.getvalue()
imgData = 'data:image/png;base64,' + base64.b64encode(imgByteArr).decode(
'utf-8'
)
# Return HTML code to be shown to user
return f'''
<div style="text-align: center;">
<img src="{imgData}" alt="QR Code" />
</div>
<div style="text-align: center;">
<p>{_('Please, use your Authenticator to add your account. (i.e. Google Authenticator, Authy, ...)')}</p>
</div>
'''
def process(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
validity: typing.Optional[int] = None,
) -> 'mfas.MFA.RESULT':
if self.askForOTP(request) is False:
return mfas.MFA.RESULT.ALLOWED
# The data is provided by an external source, so we need to process anything on the request
return mfas.MFA.RESULT.OK
def validate(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
validity: typing.Optional[int] = None,
) -> None:
if self.askForOTP(request) is False:
return
# Get data from storage related to this user
secret, qrShown = self._userData(userId)
# Validate code
if not self.getTOTP(userId, username).verify(
code, valid_window=self.validWindow.num()
):
raise exceptions.MFAError(gettext('Invalid code'))
if qrShown is False:
self._saveUserData(
userId, (secret, True)
) # Update user data to show QR code only once

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

View File

@ -182,20 +182,20 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
if not mfaProvider:
return HttpResponseRedirect(reverse('page.index'))
userHashValue = mfas.MFA.getUserId(request.user)
mfaUserId = mfas.MFA.getUserId(request.user)
# Try to get cookie anc check it
mfaCookie = request.COOKIES.get(MFA_COOKIE_NAME, None)
if mfaCookie == userHashValue: # Cookie is valid, skip MFA setting authorization
if mfaCookie == mfaUserId: # Cookie is valid, skip MFA setting authorization
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# Obtain MFA data
authInstance = request.user.manager.getInstance()
mfaInstance = mfaProvider.getInstance()
mfaInstance: 'mfas.MFA' = mfaProvider.getInstance()
# Get validity duration
validity = min(mfaInstance.validity(), mfaProvider.validity) * 60
validity = min(mfaInstance.validity(), mfaProvider.validity*60)
start_time = request.session.get('mfa_start_time', time.time())
# If mfa process timed out, we need to start login again
@ -207,18 +207,20 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
label = mfaInstance.label()
if not mfaIdentifier:
if mfaInstance.emptyIndentifierAllowedToLogin(request):
if mfaInstance.emptyIndentifierAllowedToLogin(request) == True:
# Allow login
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# Not allowed to login, redirect to login error page
logger.warning(
'MFA identifier not found for user %s on authenticator %s. It is required by MFA %s',
request.user.name,
request.user.manager.name,
mfaProvider.name,
)
return errors.errorView(request, errors.ACCESS_DENIED)
elif mfaInstance.emptyIndentifierAllowedToLogin(request) == False:
# Not allowed to login, redirect to login error page
logger.warning(
'MFA identifier not found for user %s on authenticator %s. It is required by MFA %s',
request.user.name,
request.user.manager.name,
mfaProvider.name,
)
return errors.errorView(request, errors.ACCESS_DENIED)
# Default, do nothing, and continue
tries = request.session.get('mfa_tries', 0)
if request.method == 'POST': # User has provided MFA code
@ -228,11 +230,11 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
try:
mfaInstance.validate(
request,
userHashValue,
mfaUserId,
request.user.name,
mfaIdentifier,
code,
validity=validity,
validity=validity*60,
)
request.authorized = True
# Remove mfa_start_time from session
@ -247,7 +249,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
):
response.set_cookie(
MFA_COOKIE_NAME,
userHashValue,
mfaUserId,
max_age=mfaProvider.remember_device * 60 * 60,
)
@ -270,7 +272,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
try:
result = mfaInstance.process(
request,
userHashValue,
mfaUserId,
request.user.name,
mfaIdentifier,
validity=validity,
@ -298,7 +300,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
remember_device = _('{} hours').format(mfaProvider.remember_device)
# Html from MFA provider
mfaHtml = mfaInstance.html(request)
mfaHtml = mfaInstance.html(mfaUserId, request, request.user.name)
# Redirect to index, but with MFA data
request.session['mfa'] = {