Merge remote-tracking branch 'origin/v3.5-mfa'

This commit is contained in:
Adolfo Gómez García 2022-06-29 23:22:27 +02:00
commit 091a834074
7 changed files with 224 additions and 91 deletions

View File

@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
class MFA(ModelHandler):
model = models.MFA
save_fields = ['name', 'comments', 'tags', 'remember_device']
save_fields = ['name', 'comments', 'tags', 'remember_device', 'validity']
table_title = _('Multi Factor Authentication')
table_fields = [

View File

@ -32,6 +32,7 @@
"""
import datetime
import random
import enum
import logging
import typing
@ -83,6 +84,13 @@ class MFA(Module):
# : override it in your own implementation.
cacheTime: typing.ClassVar[int] = 5
class RESULT(enum.Enum):
"""
This enum is used to know if the MFA code was sent or not.
"""
OK = 1
ALLOWED = 2
def __init__(self, environment: 'Environment', values: Module.ValuesType):
super().__init__(environment, values)
self.initialize(values)
@ -118,17 +126,31 @@ class MFA(Module):
"""
return self.cacheTime
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def emptyIndentifierAllowedToLogin(self) -> bool:
"""
If this method returns True, an user that has no "identifier" is allowed to login without MFA
"""
return True
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT':
"""
This method will be invoked from "process" method, to send the MFA code to the user.
If returns MFA.RESULT.VALID, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
raise NotImplementedError('sendCode method not implemented')
def process(self, userId: str, identifier: str, validity: typing.Optional[int] = None) -> None:
def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT':
"""
This method will be invoked from the MFA form, to send the MFA code to the user.
The identifier where to send the code, will be obtained from "mfaIdentifier" method.
Default implementation generates a random code and sends invokes "sendCode" method.
If returns MFA.RESULT.VALID, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
# try to get the stored code
data: typing.Any = self.storage.getPickle(userId)
@ -138,7 +160,7 @@ class MFA(Module):
# if we have a stored code, check if it's still valid
if data[0] + datetime.timedelta(seconds=validity) < getSqlDatetime():
# if it's still valid, just return without sending a new one
return
return MFA.RESULT.OK
except Exception:
# if we have a problem, just remove the stored code
self.storage.remove(userId)
@ -149,9 +171,10 @@ class MFA(Module):
# Store the code in the database, own storage space
self.storage.putPickle(userId, (getSqlDatetime(), code))
# Send the code to the user
self.sendCode(userId, identifier, code)
return self.sendCode(userId, username, identifier, code)
def validate(self, userId: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:
def validate(self, userId: str, username: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:
"""
If this method is provided by an authenticator, the user will be allowed to enter a MFA code
You must raise an "exceptions.MFAError" if the code is not valid.

View File

@ -132,7 +132,7 @@ class gui:
return [{'id': v, 'text': v} for v in vals]
# Dictionary
return [{'id': k, 'text': v} for k, v in vals.items()]
return [{'id': str(k), 'text': v} for k, v in vals.items()]
@staticmethod
def convertToList(vals: typing.Iterable[str]) -> typing.List[str]:

View File

@ -1,5 +1,6 @@
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from re import T
import smtplib
import ssl
import typing
@ -127,7 +128,7 @@ class EmailMFA(mfas.MFA):
return 'OTP received via email'
@decorators.threaded
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def doSendCode(self, identifier: str, code: str) -> None:
# Send and email with the notification
with self.login() as smtp:
try:
@ -147,6 +148,10 @@ class EmailMFA(mfas.MFA):
logger.error('Error sending email: {}'.format(e))
raise
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
self.doSendCode(identifier, code)
return mfas.MFA.RESULT.OK
def login(self) -> smtplib.SMTP:
"""
Login to SMTP server

View File

@ -1,4 +1,5 @@
import typing
import re
import logging
from django.utils.translation import gettext_noop as _, gettext
@ -26,11 +27,10 @@ class SMSMFA(mfas.MFA):
order=1,
tooltip=_(
'URL pattern for SMS sending. It can contain the following '
'variables:<br>'
'<ul>'
'<li>{code} - the code to send</li>'
'<li>{phone} - the phone number</li>'
'</ul>'
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
),
required=True,
tab=_('HTTP Server'),
@ -49,131 +49,225 @@ class SMSMFA(mfas.MFA):
smsSendingMethod = gui.ChoiceField(
label=_('SMS sending method'),
order=2,
order=3,
tooltip=_('Method for sending SMS'),
required=True,
tab=_('HTTP Server'),
values=('GET', 'POST', 'PUT'),
)
smsSendingParameters = gui.TextField(
length=128,
label=_('Parameters for SMS POST/PUT sending'),
order=3,
smsHeadersParameters = gui.TextField(
length=4096,
multiline=4,
label=_('Headers for SMS requests'),
order=4,
tooltip=_(
'Parameters for SMS sending via POST/PUT. It can contain the following '
'variables:<br>'
'<ul>'
'<li>{code} - the code to send</li>'
'<li>{phone} - the phone number</li>'
'</ul>'
'Headers for SMS requests. It can contain the following '
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
'* {justUsername} - the username without @....\n'
'Headers are in the form of "Header: Value". (without the quotes)'
),
required=False,
tab=_('HTTP Server'),
)
smsAuthenticationMethod = gui.ChoiceField(
label=_('SMS authentication method'),
order=3,
tooltip=_('Method for sending SMS'),
smsSendingParameters = gui.TextField(
length=4096,
multiline=5,
label=_('Parameters for SMS POST/PUT sending'),
order=4,
tooltip=_(
'Parameters for SMS sending via POST/PUT. It can contain the following '
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
'* {justUsername} - the username without @....\n'
),
required=False,
tab=_('HTTP Server'),
)
smsEncoding = gui.ChoiceField(
label=_('SMS encoding'),
defaultValue='utf-8',
order=5,
tooltip=_('Encoding for SMS'),
required=True,
tab=_('HTTP Server'),
values=[
{'id': 0, 'text': _('None')},
{'id': 1, 'text': _('HTTP Basic Auth')},
{'id': 2, 'text': _('HTTP Digest Auth')},
{'id': 3, 'text': _('HTTP Token Auth')},
],
values=('utf-8', 'iso-8859-1'),
)
smsAuthenticationMethod = gui.ChoiceField(
label=_('SMS authentication method'),
order=20,
tooltip=_('Method for sending SMS'),
required=True,
tab=_('HTTP Authentication'),
values={
'0': _('None'),
'1': _('HTTP Basic Auth'),
'2': _('HTTP Digest Auth'),
},
)
smsAuthenticationUserOrToken = gui.TextField(
length=128,
length=256,
label=_('SMS authentication user or token'),
order=4,
order=21,
tooltip=_('User or token for SMS authentication'),
required=False,
tab=_('HTTP Server'),
tab=_('HTTP Authentication'),
)
smsAuthenticationPassword = gui.TextField(
length=128,
smsAuthenticationPassword = gui.PasswordField(
length=256,
label=_('SMS authentication password'),
order=5,
order=22,
tooltip=_('Password for SMS authentication'),
required=False,
tab=_('HTTP Server'),
tab=_('HTTP Authentication'),
)
smsResponseOkRegex = gui.TextField(
length=256,
label=_('SMS response OK regex'),
order=30,
tooltip=_(
'Regex for SMS response OK. If emty, the response is considered OK if status code is 200.'
),
required=False,
tab=_('HTTP Response'),
)
smsResponseErrorAction = gui.ChoiceField(
label=_('SMS response error action'),
order=31,
defaultValue='0',
tooltip=_('Action for SMS response error'),
required=True,
tab=_('HTTP Response'),
values={
'0': _('Allow user log in without MFA'),
'1': _('Deny user log in'),
},
)
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
def composeSmsUrl(self, code: str, phone: str) -> str:
def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str:
url = self.smsSendingUrl.value
url = url.replace('{code}', code)
url = url.replace('{phone}', phone)
url = url.replace('{phone}', phone.replace('+', ''))
url = url.replace('{+phone}', phone)
url = url.replace('{username}', userName)
url = url.replace('{justUsername}', userName.split('@')[0])
return url
def getSession(self) -> requests.Session:
session = requests.Session()
# 0 means no authentication
if self.smsAuthenticationMethod.value == 1:
if self.smsAuthenticationMethod.value == '1':
session.auth = requests.auth.HTTPBasicAuth(
username=self.smsAuthenticationUserOrToken.value,
password=self.smsAuthenticationPassword.value,
)
elif self.smsAuthenticationMethod.value == 2:
elif self.smsAuthenticationMethod.value == '2':
session.auth = requests.auth.HTTPDigestAuth(
self.smsAuthenticationUserOrToken.value,
self.smsAuthenticationPassword.value,
)
elif self.smsAuthenticationMethod.value == 3:
session.headers['Authorization'] = (
'Token ' + self.smsAuthenticationUserOrToken.value
)
# Any other value means no authentication
# Add headers. Headers are in the form of "Header: Value". (without the quotes)
if self.smsHeadersParameters.value.strip():
for header in self.smsHeadersParameters.value.split('\n'):
if header.strip():
headerName, headerValue = header.split(':', 1)
session.headers[headerName.strip()] = headerValue.strip()
return session
def sendSMS_GET(self, url: str) -> None:
response = self.getSession().get(url)
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
def processResponse(self, response: requests.Response) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
if not response.ok:
if self.smsResponseErrorAction.value == '1':
raise Exception(_('SMS sending failed'))
elif self.smsResponseOkRegex.value.strip():
logger.debug('Checking response OK regex: %s: (%s)', self.smsResponseOkRegex.value, re.search(self.smsResponseOkRegex.value, response.text))
if not re.search(self.smsResponseOkRegex.value, response.text or ''):
logger.error(
'SMS response error: %s',
response.text,
)
if self.smsResponseErrorAction.value == '1':
raise Exception('SMS response error')
return mfas.MFA.RESULT.ALLOWED
return mfas.MFA.RESULT.OK
def sendSMS_POST(self, url: str, code: str, phone: str) -> None:
# Compose POST data
def sendSMS_GET(self, userId: str, username: str, url: str) -> mfas.MFA.RESULT:
return self.processResponse(self.getSession().get(url))
def getData(
self, userId: str, username: str, url: str, code: str, phone: str
) -> bytes:
data = ''
if self.smsSendingParameters.value:
data = self.smsSendingParameters.value.replace('{code}', code).replace(
'{phone}', phone
data = (
self.smsSendingParameters.value.replace('{code}', code)
.replace('{phone}', phone.replace('+', ''))
.replace('{+phone}', phone)
.replace('{username}', username)
.replace('{justUsername}', username.split('@')[0])
)
response = self.getSession().post(url, data=data.encode())
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
return data.encode(self.smsEncoding.value)
def sendSMS_PUT(self, url: str, code: str, phone: str) -> None:
def sendSMS_POST(
self, userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
session = self.getSession()
bdata = self.getData(userId, username, url, code, phone)
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
return self.processResponse(session.post(url, data=bdata))
def sendSMS_PUT(
self, userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
data = ''
if self.smsSendingParameters.value:
data = self.smsSendingParameters.value.replace('{code}', code).replace(
'{phone}', phone
)
response = self.getSession().put(url, data=data.encode())
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
bdata = self.getData(userId, username, url, code, phone)
return self.processResponse(self.getSession().put(url, data=bdata))
def sendSMS(self, code: str, phone: str) -> None:
url = self.composeSmsUrl(code, phone)
def sendSMS(
self, userId: str, username: str, code: str, phone: str
) -> mfas.MFA.RESULT:
url = self.composeSmsUrl(userId, username, code, phone)
if self.smsSendingMethod.value == 'GET':
return self.sendSMS_GET(url)
return self.sendSMS_GET(userId, username, url)
elif self.smsSendingMethod.value == 'POST':
return self.sendSMS_POST(url, code, phone)
return self.sendSMS_POST(userId, username, url, code, phone)
elif self.smsSendingMethod.value == 'PUT':
return self.sendSMS_PUT(url, code, phone)
return self.sendSMS_PUT(userId, username, url, code, phone)
else:
raise Exception('Unknown SMS sending method')
def label(self) -> str:
return gettext('MFA Code')
def sendCode(self, userId: str, identifier: str, code: str) -> None:
logger.debug('Sending code: %s', code)
return
def sendCode(
self, userId: str, username: str, identifier: str, code: str
) -> mfas.MFA.RESULT:
logger.debug(
'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")',
code,
username,
userId,
identifier,
)
return self.sendSMS(userId, username, code, identifier)

View File

@ -1,3 +1,4 @@
from re import T
import typing
import logging
@ -33,7 +34,7 @@ class SampleMFA(mfas.MFA):
def label(self) -> str:
return 'Code is in log'
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
logger.debug('Sending code: %s', code)
return
return mfas.MFA.RESULT.OK

View File

@ -28,7 +28,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import time
import logging
import hashlib
@ -109,19 +108,16 @@ def login(
response = HttpResponseRedirect(reverse('page.index'))
# save tag, weblogin will clear session
tag = request.session.get('tag')
auth.webLogin(request, response, loginResult.user, loginResult.password)
auth.webLogin(request, response, user, data) # data is user password here
# And restore tag
request.session['tag'] = tag
# If MFA is provided, we need to redirect to MFA page
request.authorized = True
if loginResult.user.manager.getType().providesMfa() and loginResult.user.manager.mfa:
authInstance = loginResult.user.manager.getInstance()
if authInstance.mfaIdentifier(loginResult.user.name):
request.authorized = (
False # We can ask for MFA so first disauthorize user
)
response = HttpResponseRedirect(reverse('page.mfa'))
if user.manager.getType().providesMfa() and user.manager.mfa:
authInstance = user.manager.getInstance()
request.authorized = False
response = HttpResponseRedirect(reverse('page.mfa'))
else:
# If redirection on login failure is found, honor it
@ -205,13 +201,22 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
mfaIdentifier = authInstance.mfaIdentifier(request.user.name)
label = mfaInstance.label()
if not mfaIdentifier:
if mfaInstance.emptyIndentifierAllowedToLogin():
# Allow login
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# Not allowed to login, redirect to login error page
logger.warning('MFA identifier not found for user %s on authenticator %s. It is required by MFA %s', request.user.name, request.user.manager.name, mfaProvider.name)
return errors.errorView(request, errors.ACCESS_DENIED)
if request.method == 'POST': # User has provided MFA code
form = MFAForm(request.POST)
if form.is_valid():
code = form.cleaned_data['code']
try:
mfaInstance.validate(
userHashValue, mfaIdentifier, code, validity=validity
userHashValue, request.user.name, mfaIdentifier, code, validity=validity
)
request.authorized = True
# Remove mfa_start_time from session
@ -239,12 +244,17 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
else:
# Make MFA send a code
try:
mfaInstance.process(userHashValue, mfaIdentifier, validity=validity)
result = mfaInstance.process(userHashValue, request.user.name, mfaIdentifier, validity=validity)
if result == mfas.MFA.RESULT.ALLOWED:
# MFA not needed, redirect to index after authorization of the user
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# store on session the start time of the MFA process if not already stored
if 'mfa_start_time' not in request.session:
request.session['mfa_start_time'] = time.time()
except Exception:
logger.exception('Error processing MFA')
except Exception as e:
logger.error('Error processing MFA: %s', e)
return errors.errorView(request, errors.UNKNOWN_ERROR)
# Compose a nice "XX years, XX months, XX days, XX hours, XX minutes" string from mfaProvider.remember_device