1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-24 21:34:41 +03:00

Tested correct working of generic SMS sending using HTTP

This commit is contained in:
Adolfo Gómez García 2022-06-29 23:14:26 +02:00
parent 76e67b1f63
commit 11d9c77a79
6 changed files with 92 additions and 57 deletions

View File

@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
class MFA(ModelHandler):
model = models.MFA
save_fields = ['name', 'comments', 'tags', 'remember_device']
save_fields = ['name', 'comments', 'tags', 'remember_device', 'validity']
table_title = _('Multi Factor Authentication')
table_fields = [

View File

@ -32,6 +32,7 @@
"""
import datetime
import random
import enum
import logging
import typing
@ -83,6 +84,13 @@ class MFA(Module):
# : override it in your own implementation.
cacheTime: typing.ClassVar[int] = 5
class RESULT(enum.Enum):
"""
This enum is used to know if the MFA code was sent or not.
"""
OK = 1
ALLOWED = 2
def __init__(self, environment: 'Environment', values: Module.ValuesType):
super().__init__(environment, values)
self.initialize(values)
@ -118,24 +126,30 @@ class MFA(Module):
"""
return self.cacheTime
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
def emptyIndentifierAllowedToLogin(self) -> bool:
"""
If this method returns True, an user that has no "identifier" is allowed to login without MFA
"""
return True
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT':
"""
This method will be invoked from "process" method, to send the MFA code to the user.
If returns True, the MFA code was sent.
If returns False, the MFA code was not sent, the user does not need to enter the MFA code.
If returns MFA.RESULT.VALID, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
raise NotImplementedError('sendCode method not implemented')
def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> bool:
def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT':
"""
This method will be invoked from the MFA form, to send the MFA code to the user.
The identifier where to send the code, will be obtained from "mfaIdentifier" method.
Default implementation generates a random code and sends invokes "sendCode" method.
If returns True, the MFA code was sent.
If returns False, the MFA code was not sent, the user does not need to enter the MFA code.
If returns MFA.RESULT.VALID, the MFA code was sent.
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
# try to get the stored code
@ -146,7 +160,7 @@ class MFA(Module):
# if we have a stored code, check if it's still valid
if data[0] + datetime.timedelta(seconds=validity) < getSqlDatetime():
# if it's still valid, just return without sending a new one
return True
return MFA.RESULT.OK
except Exception:
# if we have a problem, just remove the stored code
self.storage.remove(userId)

View File

@ -148,9 +148,9 @@ class EmailMFA(mfas.MFA):
logger.error('Error sending email: {}'.format(e))
raise
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
self.doSendCode(identifier, code)
return True
return mfas.MFA.RESULT.OK
def login(self) -> smtplib.SMTP:
"""

View File

@ -67,6 +67,7 @@ class SMSMFA(mfas.MFA):
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
'* {justUsername} - the username without @....\n'
'Headers are in the form of "Header: Value". (without the quotes)'
),
required=False,
@ -84,6 +85,7 @@ class SMSMFA(mfas.MFA):
'* {code} - the code to send\n'
'* {phone/+phone} - the phone number\n'
'* {username} - the username\n'
'* {justUsername} - the username without @....\n'
),
required=False,
tab=_('HTTP Server'),
@ -121,7 +123,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Authentication'),
)
smsAuthenticationPassword = gui.TextField(
smsAuthenticationPassword = gui.PasswordField(
length=256,
label=_('SMS authentication password'),
order=22,
@ -163,6 +165,7 @@ class SMSMFA(mfas.MFA):
url = url.replace('{phone}', phone.replace('+', ''))
url = url.replace('{+phone}', phone)
url = url.replace('{username}', userName)
url = url.replace('{justUsername}', userName.split('@')[0])
return url
def getSession(self) -> requests.Session:
@ -188,59 +191,62 @@ class SMSMFA(mfas.MFA):
session.headers[headerName.strip()] = headerValue.strip()
return session
def processResponse(self, response: requests.Response) -> None:
def processResponse(self, response: requests.Response) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
if not response.ok:
if self.smsResponseErrorAction.value == '1':
raise Exception(_('SMS sending failed'))
else:
return
if self.smsResponseOkRegex.value.strip():
if not re.search(self.smsResponseOkRegex.value, response.text):
elif self.smsResponseOkRegex.value.strip():
logger.debug('Checking response OK regex: %s: (%s)', self.smsResponseOkRegex.value, re.search(self.smsResponseOkRegex.value, response.text))
if not re.search(self.smsResponseOkRegex.value, response.text or ''):
logger.error(
'SMS response error: %s',
response.text,
)
if self.smsResponseErrorAction.value == '1':
raise Exception('SMS response error')
return mfas.MFA.RESULT.ALLOWED
return mfas.MFA.RESULT.OK
def sendSMS_GET(self, userId: str, username: str, url: str) -> None:
self.processResponse(self.getSession().get(url))
def sendSMS_GET(self, userId: str, username: str, url: str) -> mfas.MFA.RESULT:
return self.processResponse(self.getSession().get(url))
def sendSMS_POST(
def getData(
self, userId: str, username: str, url: str, code: str, phone: str
) -> None:
# Compose POST data
data = ''
if self.smsSendingParameters.value:
data = self.smsSendingParameters.value.replace('{code}', code).replace(
'{phone}',
phone.replace('+', '')
.replace('{+phone}', phone)
.replace('{username}', username),
)
bdata = data.encode(self.smsEncoding.value)
session = self.getSession()
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
self.processResponse(session.post(url, data=bdata))
def sendSMS_PUT(
self, userId: str, username: str, url: str, code: str, phone: str
) -> None:
# Compose POST data
) -> bytes:
data = ''
if self.smsSendingParameters.value:
data = (
self.smsSendingParameters.value.replace('{code}', code)
.replace('{phone}', phone)
.replace('{phone}', phone.replace('+', ''))
.replace('{+phone}', phone)
.replace('{username}', username)
.replace('{justUsername}', username.split('@')[0])
)
self.processResponse(self.getSession().put(url, data=data.encode()))
return data.encode(self.smsEncoding.value)
def sendSMS(self, userId: str, username: str, code: str, phone: str) -> None:
def sendSMS_POST(
self, userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
session = self.getSession()
bdata = self.getData(userId, username, url, code, phone)
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
return self.processResponse(session.post(url, data=bdata))
def sendSMS_PUT(
self, userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
data = ''
bdata = self.getData(userId, username, url, code, phone)
return self.processResponse(self.getSession().put(url, data=bdata))
def sendSMS(
self, userId: str, username: str, code: str, phone: str
) -> mfas.MFA.RESULT:
url = self.composeSmsUrl(userId, username, code, phone)
if self.smsSendingMethod.value == 'GET':
return self.sendSMS_GET(userId, username, url)
@ -254,6 +260,14 @@ class SMSMFA(mfas.MFA):
def label(self) -> str:
return gettext('MFA Code')
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
logger.debug('Sending code: %s', code)
return True
def sendCode(
self, userId: str, username: str, identifier: str, code: str
) -> mfas.MFA.RESULT:
logger.debug(
'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")',
code,
username,
userId,
identifier,
)
return self.sendSMS(userId, username, code, identifier)

View File

@ -34,7 +34,7 @@ class SampleMFA(mfas.MFA):
def label(self) -> str:
return 'Code is in log'
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> bool:
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
logger.debug('Sending code: %s', code)
return True
return mfas.MFA.RESULT.OK

View File

@ -28,7 +28,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import time
import logging
import hashlib
@ -44,6 +43,7 @@ from django.utils.translation import gettext as _
from uds.core.util.request import ExtendedHttpRequest, ExtendedHttpRequestWithUser
from django.views.decorators.cache import never_cache
from uds.core import mfas
from uds.core.auths import auth, exceptions
from uds.web.util import errors
from uds.web.forms.LoginForm import LoginForm
@ -118,10 +118,8 @@ def login(
request.authorized = True
if user.manager.getType().providesMfa() and user.manager.mfa:
authInstance = user.manager.getInstance()
if authInstance.mfaIdentifier(user.name):
# We can ask for MFA so first disauthorize user
request.authorized = False
response = HttpResponseRedirect(reverse('page.mfa'))
request.authorized = False
response = HttpResponseRedirect(reverse('page.mfa'))
else:
# If error is numeric, redirect...
@ -206,6 +204,15 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
mfaIdentifier = authInstance.mfaIdentifier(request.user.name)
label = mfaInstance.label()
if not mfaIdentifier:
if mfaInstance.emptyIndentifierAllowedToLogin():
# Allow login
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
# Not allowed to login, redirect to login error page
logger.warning('MFA identifier not found for user %s on authenticator %s. It is required by MFA %s', request.user.name, request.user.manager.name, mfaProvider.name)
return errors.errorView(request, errors.ACCESS_DENIED)
if request.method == 'POST': # User has provided MFA code
form = MFAForm(request.POST)
if form.is_valid():
@ -241,7 +248,7 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
# Make MFA send a code
try:
result = mfaInstance.process(userHashValue, request.user.name, mfaIdentifier, validity=validity)
if not result:
if result == mfas.MFA.RESULT.ALLOWED:
# MFA not needed, redirect to index after authorization of the user
request.authorized = True
return HttpResponseRedirect(reverse('page.index'))
@ -249,8 +256,8 @@ def mfa(request: ExtendedHttpRequest) -> HttpResponse:
# store on session the start time of the MFA process if not already stored
if 'mfa_start_time' not in request.session:
request.session['mfa_start_time'] = time.time()
except Exception:
logger.exception('Error processing MFA')
except Exception as e:
logger.error('Error processing MFA: %s', e)
return errors.errorView(request, errors.UNKNOWN_ERROR)
# Compose a nice "XX years, XX months, XX days, XX hours, XX minutes" string from mfaProvider.remember_device