From 5e618710910e980f258bc13f65ecd4a478e38de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Fri, 1 Jul 2022 20:23:13 +0200 Subject: [PATCH] Added network to MFA and added initGui suppor for "providers" --- server/src/uds/core/mfas/mfa.py | 7 +- server/src/uds/core/ui/user_interface.py | 10 ++ server/src/uds/mfas/SMS/mfa.py | 129 ++++++++++++++--------- server/src/uds/models/network.py | 6 ++ 4 files changed, 100 insertions(+), 52 deletions(-) diff --git a/server/src/uds/core/mfas/mfa.py b/server/src/uds/core/mfas/mfa.py index 10460f8ec..0e306aa7d 100644 --- a/server/src/uds/core/mfas/mfa.py +++ b/server/src/uds/core/mfas/mfa.py @@ -43,6 +43,7 @@ from uds.core.auths import exceptions if typing.TYPE_CHECKING: from uds.core.environment import Environment + from uds.core.util.request import ExtendedHttpRequest logger = logging.getLogger(__name__) @@ -132,7 +133,7 @@ class MFA(Module): """ return True - def sendCode(self, userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT': + def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> 'MFA.RESULT': """ This method will be invoked from "process" method, to send the MFA code to the user. If returns MFA.RESULT.VALID, the MFA code was sent. @@ -142,7 +143,7 @@ class MFA(Module): raise NotImplementedError('sendCode method not implemented') - def process(self, userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT': + def process(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, validity: typing.Optional[int] = None) -> 'MFA.RESULT': """ This method will be invoked from the MFA form, to send the MFA code to the user. The identifier where to send the code, will be obtained from "mfaIdentifier" method. @@ -171,7 +172,7 @@ class MFA(Module): # Store the code in the database, own storage space self.storage.putPickle(userId, (getSqlDatetime(), code)) # Send the code to the user - return self.sendCode(userId, username, identifier, code) + return self.sendCode(request, userId, username, identifier, code) def validate(self, userId: str, username: str, identifier: str, code: str, validity: typing.Optional[int] = None) -> None: diff --git a/server/src/uds/core/ui/user_interface.py b/server/src/uds/core/ui/user_interface.py index 919f22090..0b10642c2 100644 --- a/server/src/uds/core/ui/user_interface.py +++ b/server/src/uds/core/ui/user_interface.py @@ -118,6 +118,8 @@ class gui: Helper to convert from array of strings to the same dict used in choice, multichoice, .. """ + if not vals: + return [] if isinstance(vals, (list, tuple)): return [{'id': v, 'text': v} for v in vals] @@ -990,6 +992,12 @@ class UserInterface(metaclass=UserInterfaceType): of this posibility in a near version... """ + @classmethod + def initClassGui(cls) -> None: + """ + This method is used to initialize the gui fields of the class. + """ + def valuesDict(self) -> gui.ValuesDictType: """ Returns own data needed for user interaction as a dict of key-names -> @@ -1144,6 +1152,8 @@ class UserInterface(metaclass=UserInterfaceType): if obj: obj.initGui() # We give the "oportunity" to fill necesary theGui data before providing it to client theGui = obj + else: + cls.initClassGui() # We give the "oportunity" to fill necesary theGui data before providing it to client res: typing.List[typing.MutableMapping[str, typing.Any]] = [] diff --git a/server/src/uds/mfas/SMS/mfa.py b/server/src/uds/mfas/SMS/mfa.py index 54d6e07c1..56d9381b0 100644 --- a/server/src/uds/mfas/SMS/mfa.py +++ b/server/src/uds/mfas/SMS/mfa.py @@ -6,11 +6,14 @@ from django.utils.translation import gettext_noop as _, gettext import requests import requests.auth +from uds import models from uds.core import mfas from uds.core.ui import gui +from uds.core.util import net if typing.TYPE_CHECKING: from uds.core.module import Module + from uds.core.util.request import ExtendedHttpRequest logger = logging.getLogger(__name__) @@ -21,7 +24,7 @@ class SMSMFA(mfas.MFA): typeDescription = _('Simple SMS sending MFA using HTTP') iconFile = 'sms.png' - smsSendingUrl = gui.TextField( + sendingUrl = gui.TextField( length=128, label=_('URL pattern for SMS sending'), order=1, @@ -48,7 +51,7 @@ class SMSMFA(mfas.MFA): ), ) - smsSendingMethod = gui.ChoiceField( + sendingMethod = gui.ChoiceField( label=_('SMS sending method'), order=3, tooltip=_('Method for sending SMS'), @@ -57,7 +60,7 @@ class SMSMFA(mfas.MFA): values=('GET', 'POST', 'PUT'), ) - smsHeadersParameters = gui.TextField( + headersParameters = gui.TextField( length=4096, multiline=4, label=_('Headers for SMS requests'), @@ -75,7 +78,7 @@ class SMSMFA(mfas.MFA): tab=_('HTTP Server'), ) - smsSendingParameters = gui.TextField( + sendingParameters = gui.TextField( length=4096, multiline=5, label=_('Parameters for SMS POST/PUT sending'), @@ -92,7 +95,7 @@ class SMSMFA(mfas.MFA): tab=_('HTTP Server'), ) - smsEncoding = gui.ChoiceField( + encoding = gui.ChoiceField( label=_('SMS encoding'), defaultValue='utf-8', order=5, @@ -102,7 +105,7 @@ class SMSMFA(mfas.MFA): values=('utf-8', 'iso-8859-1'), ) - smsAuthenticationMethod = gui.ChoiceField( + authenticationMethod = gui.ChoiceField( label=_('SMS authentication method'), order=20, tooltip=_('Method for sending SMS'), @@ -115,7 +118,7 @@ class SMSMFA(mfas.MFA): }, ) - smsAuthenticationUserOrToken = gui.TextField( + authenticationUserOrToken = gui.TextField( length=256, label=_('SMS authentication user or token'), order=21, @@ -124,7 +127,7 @@ class SMSMFA(mfas.MFA): tab=_('HTTP Authentication'), ) - smsAuthenticationPassword = gui.PasswordField( + authenticationPassword = gui.PasswordField( length=256, label=_('SMS authentication password'), order=22, @@ -133,7 +136,7 @@ class SMSMFA(mfas.MFA): tab=_('HTTP Authentication'), ) - smsResponseOkRegex = gui.TextField( + responseOkRegex = gui.TextField( length=256, label=_('SMS response OK regex'), order=30, @@ -144,7 +147,7 @@ class SMSMFA(mfas.MFA): tab=_('HTTP Response'), ) - smsResponseErrorAction = gui.ChoiceField( + responseErrorAction = gui.ChoiceField( label=_('SMS response error action'), order=31, defaultValue='0', @@ -154,14 +157,34 @@ class SMSMFA(mfas.MFA): values={ '0': _('Allow user log in without MFA'), '1': _('Deny user log in'), + '2': _('Allow user to log in if it IP is in the networks list'), + '3': _('Deny user to log in if it IP is in the networks list'), }, ) + networks = gui.MultiChoiceField( + label=_('SMS networks'), + rdonly=False, + rows=5, + order=32, + tooltip=_('Networks for SMS authentication'), + required=True, + tab=_('HTTP Response'), + ) + def initialize(self, values: 'Module.ValuesType') -> None: return super().initialize(values) + @classmethod + def initClassGui(cls) -> None: + # Populate the networks list + cls.networks.setValues([ + gui.choiceItem(v.uuid, v.name) + for v in models.Network.objects.all().order_by('name') + ]) + def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str: - url = self.smsSendingUrl.value + url = self.sendingUrl.value url = url.replace('{code}', code) url = url.replace('{phone}', phone.replace('+', '')) url = url.replace('{+phone}', phone) @@ -172,99 +195,107 @@ class SMSMFA(mfas.MFA): def getSession(self) -> requests.Session: session = requests.Session() # 0 means no authentication - if self.smsAuthenticationMethod.value == '1': + if self.authenticationMethod.value == '1': session.auth = requests.auth.HTTPBasicAuth( - username=self.smsAuthenticationUserOrToken.value, - password=self.smsAuthenticationPassword.value, + username=self.authenticationUserOrToken.value, + password=self.authenticationPassword.value, ) - elif self.smsAuthenticationMethod.value == '2': + elif self.authenticationMethod.value == '2': session.auth = requests.auth.HTTPDigestAuth( - self.smsAuthenticationUserOrToken.value, - self.smsAuthenticationPassword.value, + self.authenticationUserOrToken.value, + self.authenticationPassword.value, ) # Any other value means no authentication # Add headers. Headers are in the form of "Header: Value". (without the quotes) - if self.smsHeadersParameters.value.strip(): - for header in self.smsHeadersParameters.value.split('\n'): + if self.headersParameters.value.strip(): + for header in self.headersParameters.value.split('\n'): if header.strip(): headerName, headerValue = header.split(':', 1) session.headers[headerName.strip()] = headerValue.strip() return session - def processResponse(self, response: requests.Response) -> mfas.MFA.RESULT: + def processResponse(self, request: 'ExtendedHttpRequest', response: requests.Response) -> mfas.MFA.RESULT: logger.debug('Response: %s', response) if not response.ok: - if self.smsResponseErrorAction.value == '1': + if self.responseErrorAction.value == '1': raise Exception(_('SMS sending failed')) - elif self.smsResponseOkRegex.value.strip(): - logger.debug('Checking response OK regex: %s: (%s)', self.smsResponseOkRegex.value, re.search(self.smsResponseOkRegex.value, response.text)) - if not re.search(self.smsResponseOkRegex.value, response.text or ''): + elif self.responseOkRegex.value.strip(): + logger.debug('Checking response OK regex: %s: (%s)', self.responseOkRegex.value, re.search(self.responseOkRegex.value, response.text)) + if not re.search(self.responseOkRegex.value, response.text or ''): logger.error( 'SMS response error: %s', response.text, ) - if self.smsResponseErrorAction.value == '1': + if self.responseErrorAction.value == '0': + return mfas.MFA.RESULT.ALLOWED + elif self.responseErrorAction.value == '1': raise Exception('SMS response error') else: + isInNetwork = any(i.ipInNetwork(request.ip) for i in models.Network.objects.filter(uuid__in = self.networks.value)) + if self.responseErrorAction.value == '2': + # Allow user to log in if it IP is in the networks list + if isInNetwork: + return mfas.MFA.RESULT.ALLOWED + elif self.responseErrorAction.value == '3': + if isInNetwork: + raise Exception('SMS response error') return mfas.MFA.RESULT.ALLOWED return mfas.MFA.RESULT.OK - def sendSMS_GET(self, userId: str, username: str, url: str) -> mfas.MFA.RESULT: - return self.processResponse(self.getSession().get(url)) - def getData( - self, userId: str, username: str, url: str, code: str, phone: str + self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str ) -> bytes: data = '' - if self.smsSendingParameters.value: + if self.sendingParameters.value: data = ( - self.smsSendingParameters.value.replace('{code}', code) + self.sendingParameters.value.replace('{code}', code) .replace('{phone}', phone.replace('+', '')) .replace('{+phone}', phone) .replace('{username}', username) .replace('{justUsername}', username.split('@')[0]) ) - return data.encode(self.smsEncoding.value) + return data.encode(self.encoding.value) + + def sendSMS_GET(self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str) -> mfas.MFA.RESULT: + return self.processResponse(request, self.getSession().get(url)) def sendSMS_POST( - self, userId: str, username: str, url: str, code: str, phone: str + self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str ) -> mfas.MFA.RESULT: # Compose POST data session = self.getSession() - bdata = self.getData(userId, username, url, code, phone) + bdata = self.getData(request, userId, username, url, code, phone) # Add content-length header session.headers['Content-Length'] = str(len(bdata)) - return self.processResponse(session.post(url, data=bdata)) + return self.processResponse(request, session.post(url, data=bdata)) def sendSMS_PUT( - self, userId: str, username: str, url: str, code: str, phone: str + self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str ) -> mfas.MFA.RESULT: # Compose POST data data = '' - bdata = self.getData(userId, username, url, code, phone) - return self.processResponse(self.getSession().put(url, data=bdata)) + bdata = self.getData(request, userId, username, url, code, phone) + return self.processResponse(request, self.getSession().put(url, data=bdata)) def sendSMS( - self, userId: str, username: str, code: str, phone: str + self, request: 'ExtendedHttpRequest', userId: str, username: str, code: str, phone: str ) -> mfas.MFA.RESULT: url = self.composeSmsUrl(userId, username, code, phone) - if self.smsSendingMethod.value == 'GET': - return self.sendSMS_GET(userId, username, url) - elif self.smsSendingMethod.value == 'POST': - return self.sendSMS_POST(userId, username, url, code, phone) - elif self.smsSendingMethod.value == 'PUT': - return self.sendSMS_PUT(userId, username, url, code, phone) + if self.sendingMethod.value == 'GET': + return self.sendSMS_GET(request, userId, username, url) + elif self.sendingMethod.value == 'POST': + return self.sendSMS_POST(request, userId, username, url, code, phone) + elif self.sendingMethod.value == 'PUT': + return self.sendSMS_PUT(request, userId, username, url, code, phone) else: raise Exception('Unknown SMS sending method') def label(self) -> str: return gettext('MFA Code') - def sendCode( - self, userId: str, username: str, identifier: str, code: str - ) -> mfas.MFA.RESULT: + def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT: logger.debug( 'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")', code, @@ -272,4 +303,4 @@ class SMSMFA(mfas.MFA): userId, identifier, ) - return self.sendSMS(userId, username, code, identifier) + return self.sendSMS(request, userId, username, code, identifier) diff --git a/server/src/uds/models/network.py b/server/src/uds/models/network.py index d4d1e59c2..af1f5f77a 100644 --- a/server/src/uds/models/network.py +++ b/server/src/uds/models/network.py @@ -112,6 +112,12 @@ class Network(UUIDModel, TaggingMixin): # type: ignore """ return net.longToIp(self.net_end) + def ipInNetwork(self, ip: str) -> bool: + """ + Returns true if the specified ip is in this network + """ + return net.ipToLong(ip) >= self.net_start and net.ipToLong(ip) <= self.net_end + def update(self, name: str, netRange: str): """ Updated this network with provided values