1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-24 21:34:41 +03:00

Fixing up MFA

This commit is contained in:
Adolfo Gómez García 2022-06-29 22:05:45 +02:00
parent 64fc61a2d6
commit 76e67b1f63
5 changed files with 125 additions and 41 deletions

View File

@ -118,17 +118,25 @@ class MFA(Module):
"""
return self.cacheTime
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
"""
This method will be invoked from "process" method, to send the MFA code to the user.
If returns True, the MFA code was sent.
If returns False, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
raise NotImplementedError('sendCode method not implemented')
def process(self, userId: str, identifier: str, validity: typing.Optional[int] = None) -> None:
def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> bool:
"""
This method will be invoked from the MFA form, to send the MFA code to the user.
The identifier where to send the code, will be obtained from "mfaIdentifier" method.
Default implementation generates a random code and sends invokes "sendCode" method.
If returns True, the MFA code was sent.
If returns False, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
# try to get the stored code
data: typing.Any = self.storage.getPickle(userId)
@ -138,7 +146,7 @@ class MFA(Module):
# if we have a stored code, check if it's still valid
if data[0] + datetime.timedelta(seconds=validity) < getSqlDatetime():
# if it's still valid, just return without sending a new one
return
return True
except Exception:
# if we have a problem, just remove the stored code
self.storage.remove(userId)
@ -149,9 +157,10 @@ class MFA(Module):
# Store the code in the database, own storage space
self.storage.putPickle(userId, (getSqlDatetime(), code))
# Send the code to the user
self.sendCode(userId, identifier, code)
return self.sendCode(userId, username, identifier, code)
def validate(self, userId: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:
def validate(self, userId: str, username: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:
"""
If this method is provided by an authenticator, the user will be allowed to enter a MFA code
You must raise an "exceptions.MFAError" if the code is not valid.

View File

@ -1,5 +1,6 @@
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from re import T
import smtplib
import ssl
import typing
@ -127,7 +128,7 @@ class EmailMFA(mfas.MFA):
return 'OTP received via email'
@decorators.threaded
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def doSendCode(self, identifier: str, code: str) -> None:
# Send and email with the notification
with self.login() as smtp:
try:
@ -147,6 +148,10 @@ class EmailMFA(mfas.MFA):
logger.error('Error sending email: {}'.format(e))
raise
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
self.doSendCode(identifier, code)
return True
def login(self) -> smtplib.SMTP:
"""
Login to SMTP server

View File

@ -1,4 +1,5 @@
import typing
import re
import logging
from django.utils.translation import gettext_noop as _, gettext
@ -29,6 +30,7 @@ class SMSMFA(mfas.MFA):
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
),
required=True,
tab=_('HTTP Server'),
@ -64,6 +66,7 @@ class SMSMFA(mfas.MFA):
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
'Headers are in the form of "Header: Value". (without the quotes)'
),
required=False,
@ -80,17 +83,28 @@ class SMSMFA(mfas.MFA):
'variables:\n'
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
),
required=False,
tab=_('HTTP Server'),
)
smsAuthenticationMethod = gui.ChoiceField(
label=_('SMS authentication method'),
order=6,
tooltip=_('Method for sending SMS'),
smsEncoding = gui.ChoiceField(
label=_('SMS encoding'),
defaultValue='utf-8',
order=5,
tooltip=_('Encoding for SMS'),
required=True,
tab=_('HTTP Server'),
values=('utf-8', 'iso-8859-1'),
)
smsAuthenticationMethod = gui.ChoiceField(
label=_('SMS authentication method'),
order=20,
tooltip=_('Method for sending SMS'),
required=True,
tab=_('HTTP Authentication'),
values={
'0': _('None'),
'1': _('HTTP Basic Auth'),
@ -101,29 +115,54 @@ class SMSMFA(mfas.MFA):
smsAuthenticationUserOrToken = gui.TextField(
length=256,
label=_('SMS authentication user or token'),
order=7,
order=21,
tooltip=_('User or token for SMS authentication'),
required=False,
tab=_('HTTP Server'),
tab=_('HTTP Authentication'),
)
smsAuthenticationPassword = gui.TextField(
length=256,
label=_('SMS authentication password'),
order=8,
order=22,
tooltip=_('Password for SMS authentication'),
required=False,
tab=_('HTTP Server'),
tab=_('HTTP Authentication'),
)
smsResponseOkRegex = gui.TextField(
length=256,
label=_('SMS response OK regex'),
order=30,
tooltip=_(
'Regex for SMS response OK. If emty, the response is considered OK if status code is 200.'
),
required=False,
tab=_('HTTP Response'),
)
smsResponseErrorAction = gui.ChoiceField(
label=_('SMS response error action'),
order=31,
defaultValue='0',
tooltip=_('Action for SMS response error'),
required=True,
tab=_('HTTP Response'),
values={
'0': _('Allow user log in without MFA'),
'1': _('Deny user log in'),
},
)
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
def composeSmsUrl(self, code: str, phone: str) -> str:
def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str:
url = self.smsSendingUrl.value
url = url.replace('{code}', code)
url = url.replace('{phone}', phone.replace('+', ''))
url = url.replace('{+phone}', phone)
url = url.replace('{username}', userName)
return url
def getSession(self) -> requests.Session:
@ -149,47 +188,72 @@ class SMSMFA(mfas.MFA):
session.headers[headerName.strip()] = headerValue.strip()
return session
def sendSMS_GET(self, url: str) -> None:
response = self.getSession().get(url)
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
def processResponse(self, response: requests.Response) -> None:
if not response.ok:
if self.smsResponseErrorAction.value == '1':
raise Exception(_('SMS sending failed'))
else:
return
def sendSMS_POST(self, url: str, code: str, phone: str) -> None:
if self.smsResponseOkRegex.value.strip():
if not re.search(self.smsResponseOkRegex.value, response.text):
logger.error(
'SMS response error: %s',
response.text,
)
if self.smsResponseErrorAction.value == '1':
raise Exception('SMS response error')
def sendSMS_GET(self, userId: str, username: str, url: str) -> None:
self.processResponse(self.getSession().get(url))
def sendSMS_POST(
self, userId: str, username: str, url: str, code: str, phone: str
) -> None:
# Compose POST data
data = ''
if self.smsSendingParameters.value:
data = self.smsSendingParameters.value.replace('{code}', code).replace(
'{phone}', phone.replace('+', '').replace('{+phone}', phone)
'{phone}',
phone.replace('+', '')
.replace('{+phone}', phone)
.replace('{username}', username),
)
response = self.getSession().post(url, data=data.encode())
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
bdata = data.encode(self.smsEncoding.value)
session = self.getSession()
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
def sendSMS_PUT(self, url: str, code: str, phone: str) -> None:
self.processResponse(session.post(url, data=bdata))
def sendSMS_PUT(
self, userId: str, username: str, url: str, code: str, phone: str
) -> None:
# Compose POST data
data = ''
if self.smsSendingParameters.value:
data = self.smsSendingParameters.value.replace('{code}', code).replace(
'{phone}', phone
data = (
self.smsSendingParameters.value.replace('{code}', code)
.replace('{phone}', phone)
.replace('{+phone}', phone)
.replace('{username}', username)
)
response = self.getSession().put(url, data=data.encode())
if response.status_code != 200:
raise Exception('Error sending SMS: ' + response.text)
self.processResponse(self.getSession().put(url, data=data.encode()))
def sendSMS(self, code: str, phone: str) -> None:
url = self.composeSmsUrl(code, phone)
def sendSMS(self, userId: str, username: str, code: str, phone: str) -> None:
url = self.composeSmsUrl(userId, username, code, phone)
if self.smsSendingMethod.value == 'GET':
return self.sendSMS_GET(url)
return self.sendSMS_GET(userId, username, url)
elif self.smsSendingMethod.value == 'POST':
return self.sendSMS_POST(url, code, phone)
return self.sendSMS_POST(userId, username, url, code, phone)
elif self.smsSendingMethod.value == 'PUT':
return self.sendSMS_PUT(url, code, phone)
return self.sendSMS_PUT(userId, username, url, code, phone)
else:
raise Exception('Unknown SMS sending method')
def label(self) -> str:
return gettext('MFA Code')
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
logger.debug('Sending code: %s', code)
return
return True

View File

@ -1,3 +1,4 @@
from re import T
import typing
import logging
@ -33,7 +34,7 @@ class SampleMFA(mfas.MFA):
def label(self) -> str:
return 'Code is in log'
def sendCode(self, userId: str, identifier: str, code: str) -> None:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
logger.debug('Sending code: %s', code)
return
return True

View File

@ -212,7 +212,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
code = form.cleaned_data['code']
try:
mfaInstance.validate(
userHashValue, mfaIdentifier, code, validity=validity
userHashValue, request.user.name, mfaIdentifier, code, validity=validity
)
request.authorized = True
# Remove mfa_start_time from session
@ -240,7 +240,12 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
else:
# Make MFA send a code
try:
mfaInstance.process(userHashValue, mfaIdentifier, validity=validity)
result = mfaInstance.process(userHashValue, request.user.name, mfaIdentifier, validity=validity)
if not result:
# MFA not needed, redirect to index after authorization of the user
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# store on session the start time of the MFA process if not already stored
if 'mfa_start_time' not in request.session:
request.session['mfa_start_time'] = time.time()