Merge remote-tracking branch 'origin/v3.5-mfa'

This commit is contained in:
Adolfo Gómez García 2022-07-01 20:23:27 +02:00
commit 8b8bf7a321
4 changed files with 100 additions and 52 deletions

View File

@ -43,6 +43,7 @@ from uds.core.auths import exceptions
if typing.TYPE_CHECKING:
from uds.core.environment import Environment
from uds.core.util.request import ExtendedHttpRequest
logger = logging.getLogger(__name__)
@ -132,7 +133,7 @@ class MFA(Module):
"""
return True
def sendCode(self, userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT':
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT':
"""
This method will be invoked from "process" method, to send the MFA code to the user.
If returns MFA.RESULT.VALID, the MFA code was sent.
@ -142,7 +143,7 @@ class MFA(Module):
raise NotImplementedError('sendCode method not implemented')
def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT':
def process(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT':
"""
This method will be invoked from the MFA form, to send the MFA code to the user.
The identifier where to send the code, will be obtained from "mfaIdentifier" method.
@ -171,7 +172,7 @@ class MFA(Module):
# Store the code in the database, own storage space
self.storage.putPickle(userId, (getSqlDatetime(), code))
# Send the code to the user
return self.sendCode(userId, username, identifier, code)
return self.sendCode(request, userId, username, identifier, code)
def validate(self, userId: str, username: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None:

View File

@ -128,6 +128,8 @@ class gui:
Helper to convert from array of strings to the same dict used in choice,
multichoice, ..
"""
if not vals:
return []
if isinstance(vals, (list, tuple)):
return [{'id': v, 'text': v} for v in vals]
@ -1003,6 +1005,12 @@ class UserInterface(metaclass=UserInterfaceType):
of this posibility in a near version...
"""
@classmethod
def initClassGui(cls) -> None:
"""
This method is used to initialize the gui fields of the class.
"""
def valuesDict(self) -> gui.ValuesDictType:
"""
Returns own data needed for user interaction as a dict of key-names ->
@ -1163,6 +1171,8 @@ class UserInterface(metaclass=UserInterfaceType):
if obj:
obj.initGui() # We give the "oportunity" to fill necesary theGui data before providing it to client
theGui = obj
else:
cls.initClassGui() # We give the "oportunity" to fill necesary theGui data before providing it to client
res: typing.List[typing.MutableMapping[str, typing.Any]] = [
{'name': key, 'gui': val.guiDescription(), 'value': ''}

View File

@ -6,11 +6,14 @@ from django.utils.translation import gettext_noop as _, gettext
import requests
import requests.auth
from uds import models
from uds.core import mfas
from uds.core.ui import gui
from uds.core.util import net
if typing.TYPE_CHECKING:
from uds.core.module import Module
from uds.core.util.request import ExtendedHttpRequest
logger = logging.getLogger(__name__)
@ -21,7 +24,7 @@ class SMSMFA(mfas.MFA):
typeDescription = _('Simple SMS sending MFA using HTTP')
iconFile = 'sms.png'
smsSendingUrl = gui.TextField(
sendingUrl = gui.TextField(
length=128,
label=_('URL pattern for SMS sending'),
order=1,
@ -48,7 +51,7 @@ class SMSMFA(mfas.MFA):
),
)
smsSendingMethod = gui.ChoiceField(
sendingMethod = gui.ChoiceField(
label=_('SMS sending method'),
order=3,
tooltip=_('Method for sending SMS'),
@ -57,7 +60,7 @@ class SMSMFA(mfas.MFA):
values=('GET', 'POST', 'PUT'),
)
smsHeadersParameters = gui.TextField(
headersParameters = gui.TextField(
length=4096,
multiline=4,
label=_('Headers for SMS requests'),
@ -75,7 +78,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Server'),
)
smsSendingParameters = gui.TextField(
sendingParameters = gui.TextField(
length=4096,
multiline=5,
label=_('Parameters for SMS POST/PUT sending'),
@ -92,7 +95,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Server'),
)
smsEncoding = gui.ChoiceField(
encoding = gui.ChoiceField(
label=_('SMS encoding'),
defaultValue='utf-8',
order=5,
@ -102,7 +105,7 @@ class SMSMFA(mfas.MFA):
values=('utf-8', 'iso-8859-1'),
)
smsAuthenticationMethod = gui.ChoiceField(
authenticationMethod = gui.ChoiceField(
label=_('SMS authentication method'),
order=20,
tooltip=_('Method for sending SMS'),
@ -115,7 +118,7 @@ class SMSMFA(mfas.MFA):
},
)
smsAuthenticationUserOrToken = gui.TextField(
authenticationUserOrToken = gui.TextField(
length=256,
label=_('SMS authentication user or token'),
order=21,
@ -124,7 +127,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Authentication'),
)
smsAuthenticationPassword = gui.PasswordField(
authenticationPassword = gui.PasswordField(
length=256,
label=_('SMS authentication password'),
order=22,
@ -133,7 +136,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Authentication'),
)
smsResponseOkRegex = gui.TextField(
responseOkRegex = gui.TextField(
length=256,
label=_('SMS response OK regex'),
order=30,
@ -144,7 +147,7 @@ class SMSMFA(mfas.MFA):
tab=_('HTTP Response'),
)
smsResponseErrorAction = gui.ChoiceField(
responseErrorAction = gui.ChoiceField(
label=_('SMS response error action'),
order=31,
defaultValue='0',
@ -154,14 +157,34 @@ class SMSMFA(mfas.MFA):
values={
'0': _('Allow user log in without MFA'),
'1': _('Deny user log in'),
'2': _('Allow user to log in if it IP is in the networks list'),
'3': _('Deny user to log in if it IP is in the networks list'),
},
)
networks = gui.MultiChoiceField(
label=_('SMS networks'),
rdonly=False,
rows=5,
order=32,
tooltip=_('Networks for SMS authentication'),
required=True,
tab=_('HTTP Response'),
)
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
@classmethod
def initClassGui(cls) -> None:
# Populate the networks list
cls.networks.setValues([
gui.choiceItem(v.uuid, v.name)
for v in models.Network.objects.all().order_by('name')
])
def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str:
url = self.smsSendingUrl.value
url = self.sendingUrl.value
url = url.replace('{code}', code)
url = url.replace('{phone}', phone.replace('+', ''))
url = url.replace('{+phone}', phone)
@ -172,99 +195,107 @@ class SMSMFA(mfas.MFA):
def getSession(self) -> requests.Session:
session = requests.Session()
# 0 means no authentication
if self.smsAuthenticationMethod.value == '1':
if self.authenticationMethod.value == '1':
session.auth = requests.auth.HTTPBasicAuth(
username=self.smsAuthenticationUserOrToken.value,
password=self.smsAuthenticationPassword.value,
username=self.authenticationUserOrToken.value,
password=self.authenticationPassword.value,
)
elif self.smsAuthenticationMethod.value == '2':
elif self.authenticationMethod.value == '2':
session.auth = requests.auth.HTTPDigestAuth(
self.smsAuthenticationUserOrToken.value,
self.smsAuthenticationPassword.value,
self.authenticationUserOrToken.value,
self.authenticationPassword.value,
)
# Any other value means no authentication
# Add headers. Headers are in the form of "Header: Value". (without the quotes)
if self.smsHeadersParameters.value.strip():
for header in self.smsHeadersParameters.value.split('\n'):
if self.headersParameters.value.strip():
for header in self.headersParameters.value.split('\n'):
if header.strip():
headerName, headerValue = header.split(':', 1)
session.headers[headerName.strip()] = headerValue.strip()
return session
def processResponse(self, response: requests.Response) -> mfas.MFA.RESULT:
def processResponse(self, request: 'ExtendedHttpRequest', response: requests.Response) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
if not response.ok:
if self.smsResponseErrorAction.value == '1':
if self.responseErrorAction.value == '1':
raise Exception(_('SMS sending failed'))
elif self.smsResponseOkRegex.value.strip():
logger.debug('Checking response OK regex: %s: (%s)', self.smsResponseOkRegex.value, re.search(self.smsResponseOkRegex.value, response.text))
if not re.search(self.smsResponseOkRegex.value, response.text or ''):
elif self.responseOkRegex.value.strip():
logger.debug('Checking response OK regex: %s: (%s)', self.responseOkRegex.value, re.search(self.responseOkRegex.value, response.text))
if not re.search(self.responseOkRegex.value, response.text or ''):
logger.error(
'SMS response error: %s',
response.text,
)
if self.smsResponseErrorAction.value == '1':
if self.responseErrorAction.value == '0':
return mfas.MFA.RESULT.ALLOWED
elif self.responseErrorAction.value == '1':
raise Exception('SMS response error')
else:
isInNetwork = any(i.ipInNetwork(request.ip) for i in models.Network.objects.filter(uuid__in = self.networks.value))
if self.responseErrorAction.value == '2':
# Allow user to log in if it IP is in the networks list
if isInNetwork:
return mfas.MFA.RESULT.ALLOWED
elif self.responseErrorAction.value == '3':
if isInNetwork:
raise Exception('SMS response error')
return mfas.MFA.RESULT.ALLOWED
return mfas.MFA.RESULT.OK
def sendSMS_GET(self, userId: str, username: str, url: str) -> mfas.MFA.RESULT:
return self.processResponse(self.getSession().get(url))
def getData(
self, userId: str, username: str, url: str, code: str, phone: str
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
) -> bytes:
data = ''
if self.smsSendingParameters.value:
if self.sendingParameters.value:
data = (
self.smsSendingParameters.value.replace('{code}', code)
self.sendingParameters.value.replace('{code}', code)
.replace('{phone}', phone.replace('+', ''))
.replace('{+phone}', phone)
.replace('{username}', username)
.replace('{justUsername}', username.split('@')[0])
)
return data.encode(self.smsEncoding.value)
return data.encode(self.encoding.value)
def sendSMS_GET(self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str) -> mfas.MFA.RESULT:
return self.processResponse(request, self.getSession().get(url))
def sendSMS_POST(
self, userId: str, username: str, url: str, code: str, phone: str
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
session = self.getSession()
bdata = self.getData(userId, username, url, code, phone)
bdata = self.getData(request, userId, username, url, code, phone)
# Add content-length header
session.headers['Content-Length'] = str(len(bdata))
return self.processResponse(session.post(url, data=bdata))
return self.processResponse(request, session.post(url, data=bdata))
def sendSMS_PUT(
self, userId: str, username: str, url: str, code: str, phone: str
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
) -> mfas.MFA.RESULT:
# Compose POST data
data = ''
bdata = self.getData(userId, username, url, code, phone)
return self.processResponse(self.getSession().put(url, data=bdata))
bdata = self.getData(request, userId, username, url, code, phone)
return self.processResponse(request, self.getSession().put(url, data=bdata))
def sendSMS(
self, userId: str, username: str, code: str, phone: str
self, request: 'ExtendedHttpRequest', userId: str, username: str, code: str, phone: str
) -> mfas.MFA.RESULT:
url = self.composeSmsUrl(userId, username, code, phone)
if self.smsSendingMethod.value == 'GET':
return self.sendSMS_GET(userId, username, url)
elif self.smsSendingMethod.value == 'POST':
return self.sendSMS_POST(userId, username, url, code, phone)
elif self.smsSendingMethod.value == 'PUT':
return self.sendSMS_PUT(userId, username, url, code, phone)
if self.sendingMethod.value == 'GET':
return self.sendSMS_GET(request, userId, username, url)
elif self.sendingMethod.value == 'POST':
return self.sendSMS_POST(request, userId, username, url, code, phone)
elif self.sendingMethod.value == 'PUT':
return self.sendSMS_PUT(request, userId, username, url, code, phone)
else:
raise Exception('Unknown SMS sending method')
def label(self) -> str:
return gettext('MFA Code')
def sendCode(
self, userId: str, username: str, identifier: str, code: str
) -> mfas.MFA.RESULT:
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
logger.debug(
'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")',
code,
@ -272,4 +303,4 @@ class SMSMFA(mfas.MFA):
userId,
identifier,
)
return self.sendSMS(userId, username, code, identifier)
return self.sendSMS(request, userId, username, code, identifier)

View File

@ -116,6 +116,12 @@ class Network(UUIDModel, TaggingMixin): # type: ignore
"""
return net.longToIp(self.net_end)
def ipInNetwork(self, ip: str) -> bool:
"""
Returns true if the specified ip is in this network
"""
return net.ipToLong(ip) >= self.net_start and net.ipToLong(ip) <= self.net_end
def update(self, name: str, netRange: str):
"""
Updated this network with provided values