Refactoring Authenticators related and minor type checking fixes

This commit is contained in:
Adolfo Gómez García 2022-10-31 19:24:14 +01:00
parent 0cf33501b6
commit 937240a9fc
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
31 changed files with 197 additions and 321 deletions

View File

@ -36,6 +36,7 @@ import typing
from django.utils.translation import gettext, gettext_lazy as _
from uds.models import Authenticator, Network, MFA
from uds.core import auths
from uds.core.environment import Environment
from uds.REST import NotFound
from uds.REST.model import ModelHandler
@ -70,7 +71,11 @@ class Authenticators(ModelHandler):
{'priority': {'title': _('Priority'), 'type': 'numeric', 'width': '5em'}},
{'small_name': {'title': _('Label')}},
{'users_count': {'title': _('Users'), 'type': 'numeric', 'width': '5em'}},
{'mfa_name': {'title': _('MFA'),}},
{
'mfa_name': {
'title': _('MFA'),
}
},
{'tags': {'title': _('tags'), 'visible': False}},
]
@ -97,8 +102,10 @@ class Authenticators(ModelHandler):
try:
authType = auths.factory().lookup(type_)
if authType:
# Create a new instance of the authenticator to access to its GUI
authInstance = authType(Environment.getTempEnv(), None)
field = self.addDefaultFields(
authType.guiDescription(),
authInstance.guiDescription(),
['name', 'comments', 'tags', 'priority', 'small_name', 'networks'],
)
self.addField(
@ -220,10 +227,9 @@ class Authenticators(ModelHandler):
if not authType:
raise self.invalidRequestException('Invalid type: {}'.format(type_))
tmpEnvironment = Environment.getTempEnv()
dct = self._params.copy()
dct['_request'] = self._request
res = authType.test(tmpEnvironment, dct)
res = authType.test(Environment.getTempEnv(), dct)
if res[0]:
return self.success()
return res[1]
@ -234,9 +240,7 @@ class Authenticators(ModelHandler):
logger.debug(self._params)
if fields.get('mfa_id'):
try:
mfa = MFA.objects.get(
uuid=processUuid(fields['mfa_id'])
)
mfa = MFA.objects.get(uuid=processUuid(fields['mfa_id']))
fields['mfa_id'] = mfa.id
return
except MFA.DoesNotExist:
@ -244,8 +248,6 @@ class Authenticators(ModelHandler):
fields['mfa_id'] = None
def deleteItem(self, item: Authenticator):
# For every user, remove assigned services (mark them for removal)

View File

@ -36,6 +36,7 @@ import typing
from django.utils.translation import gettext_lazy as _, gettext
from uds import models
from uds.core import mfas
from uds.core.environment import Environment
from uds.core.ui import gui
from uds.core.util import permissions
@ -63,11 +64,14 @@ class MFA(ModelHandler):
return mfas.factory().providers().values()
def getGui(self, type_: str) -> typing.List[typing.Any]:
mfa = mfas.factory().lookup(type_)
mfaType = mfas.factory().lookup(type_)
if not mfa:
if not mfaType:
raise self.invalidItemException()
# Create a temporal instance to get the gui
mfa = mfaType(Environment.getTempEnv(), None)
localGui = self.addDefaultFields(
mfa.guiDescription(), ['name', 'comments', 'tags']
)

View File

@ -34,6 +34,7 @@ import logging
import typing
from django.utils.translation import gettext_lazy as _, gettext
from uds.core.environment import Environment
from uds.models import Notifier, NotificationLevel
from uds.core import messaging
from uds.core.ui import gui
@ -71,11 +72,13 @@ class Notifiers(ModelHandler):
return messaging.factory().providers().values()
def getGui(self, type_: str) -> typing.List[typing.Any]:
notifier = messaging.factory().lookup(type_)
notifierType = messaging.factory().lookup(type_)
if not notifier:
if not notifierType:
raise self.invalidItemException()
notifier = notifierType(Environment.getTempEnv(), None)
localGui = self.addDefaultFields(
notifier.guiDescription(), ['name', 'comments', 'tags']
)

View File

@ -36,6 +36,7 @@ import typing
from django.utils.translation import gettext, gettext_lazy as _
from uds.core import osmanagers
from uds.core.environment import Environment
from uds.core.util import permissions
from uds.models import OSManager
from uds.REST import NotFound, RequestError
@ -90,8 +91,15 @@ class OsManagers(ModelHandler):
# Gui related
def getGui(self, type_: str) -> typing.List[typing.Any]:
try:
osmanagerType = osmanagers.factory().lookup(type_)
if not osmanagerType:
raise NotFound('OS Manager type not found')
osmanager = osmanagerType(Environment.getTempEnv(), None)
return self.addDefaultFields(
osmanagers.factory().lookup(type_).guiDescription(), # type: ignore # may raise an exception if lookup fails
osmanager.guiDescription(), # type: ignore # may raise an exception if lookup fails
['name', 'comments', 'tags'],
)
except:

View File

@ -34,6 +34,7 @@ import logging
import typing
from django.utils.translation import gettext, gettext_lazy as _
from uds.core.environment import Environment
from uds.models import Provider, Service, UserService
from uds.core import services
@ -123,10 +124,11 @@ class Providers(ModelHandler):
# Gui related
def getGui(self, type_: str) -> typing.List[typing.Any]:
clsType = services.factory().lookup(type_)
if clsType:
providerType = services.factory().lookup(type_)
if providerType:
provider = providerType(Environment.getTempEnv(), None)
return self.addDefaultFields(
clsType.guiDescription(), ['name', 'comments', 'tags']
provider.guiDescription(), ['name', 'comments', 'tags']
)
raise NotFound('Type not found!')

View File

@ -35,9 +35,12 @@ import typing
from django.utils.translation import gettext_lazy as _
from uds.core.environment import Environment
from uds.REST import model
from uds import reports
if typing.TYPE_CHECKING:
from uds.core.reports.report import Report
logger = logging.getLogger(__name__)
@ -78,7 +81,7 @@ class Reports(model.BaseModelHandler):
# Field from where to get "class" and prefix for that class, so this will generate "row-state-A, row-state-X, ....
table_row_style = {'field': 'state', 'prefix': 'row-state-'}
def _findReport(self, uuid, values=None):
def _findReport(self, uuid: str, values=None) -> 'Report':
found = None
logger.debug('Looking for report %s', uuid)
for i in reports.availableReports:
@ -147,7 +150,7 @@ class Reports(model.BaseModelHandler):
# Gui related
def getGui(self, type_: str) -> typing.List[typing.Any]:
report = self._findReport(type_)
return sorted(report.guiDescription(report), key=lambda f: f['gui']['order'])
return sorted(report.guiDescription(), key=lambda f: f['gui']['order'])
# Returns the list of
def getItems(

View File

@ -143,7 +143,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
if service:
try:
service.delete()
except Exception:
except Exception: # nosec: This is a delete, we don't care about exceptions
pass
def saveItem(self, parent: 'Provider', item: typing.Optional[str]) -> None:
@ -288,7 +288,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
Environment.getTempEnv(), parentInstance
) # Instantiate it so it has the opportunity to alter gui description based on parent
localGui = self.addDefaultFields(
service.guiDescription(service), ['name', 'comments', 'tags']
service.guiDescription(), ['name', 'comments', 'tags']
)
self.addField(
localGui,

View File

@ -34,6 +34,7 @@ import logging
import typing
from django.utils.translation import gettext_lazy as _, gettext
from uds.core.environment import Environment
from uds.models import Transport, Network, ServicePool
from uds.core import transports
from uds.core.ui import gui
@ -81,11 +82,13 @@ class Transports(ModelHandler):
return transports.factory().providers().values()
def getGui(self, type_: str) -> typing.List[typing.Any]:
transport = transports.factory().lookup(type_)
transportType = transports.factory().lookup(type_)
if not transport:
if not transportType:
raise self.invalidItemException()
transport = transportType(Environment.getTempEnv(), None)
field = self.addDefaultFields(
transport.guiDescription(), ['name', 'comments', 'tags', 'priority', 'networks']
)
@ -118,7 +121,7 @@ class Transports(ModelHandler):
'values': [
{'id': x.uuid, 'text': x.name}
for x in ServicePool.objects.all().order_by('name')
if x.service and transport.protocol in x.service.getType().allowedProtocols
if x.service and transportType.protocol in x.service.getType().allowedProtocols
],
'label': gettext('Service Pools'),
'tooltip': gettext('Currently assigned services pools'),

View File

@ -108,7 +108,7 @@ class InternalDBAuth(auths.Authenticator):
def mfaIdentifier(self, username: str) -> str:
try:
self.dbAuthenticator().users.get(name=username, state=State.ACTIVE).mfaData
self.dbAuthenticator().users.get(name=username, state=State.ACTIVE).mfa_data
except Exception: # nosec: This is e controled pickle loading
pass
return ''

View File

@ -131,7 +131,7 @@ class RadiusClient:
# Second element of return value is the mfa code from field
def authenticate(
self, username: str, password: str, mfaField: str
self, username: str, password: str, mfaField: str = ''
) -> typing.Tuple[typing.List[str], str]:
reply = self.sendAccessRequest(username, password)

View File

@ -216,13 +216,7 @@ class RegexLdap(auths.Authenticator):
_altClass: str = ''
_mfaAttr: str = ''
def __init__(
self,
dbAuth: 'models.Authenticator',
environment: 'Environment',
values: typing.Optional[typing.Dict[str, str]],
):
super().__init__(dbAuth, environment, values)
def initialize(self, values: typing.Optional[typing.Dict[str, typing.Any]]) -> None:
if values:
self.__validateField(values['userNameAttr'], str(self.userNameAttr.label))
self.__validateField(values['userIdAttr'], str(self.userIdAttr.label))
@ -243,6 +237,7 @@ class RegexLdap(auths.Authenticator):
self._altClass = values['altClass']
self._mfaAttr = values['mfaAttr']
def __validateField(self, field: str, fieldLabel: str) -> None:
"""
Validates the multi line fields refering to attributes
@ -302,7 +297,7 @@ class RegexLdap(auths.Authenticator):
continue
logger.debug("Found against %s: %s ", v, searchResult.groups())
res.append(''.join(searchResult.groups()))
except Exception:
except Exception: # nosec
pass # Ignore exceptions here
logger.debug('Res: %s', res)
return res
@ -689,7 +684,7 @@ class RegexLdap(auths.Authenticator):
'Ldap user class seems to be incorrect (no user found by that class)'
),
]
except Exception:
except Exception: # nosec: Control flow
# If found 1 or more, all right
pass
@ -714,7 +709,7 @@ class RegexLdap(auths.Authenticator):
'Ldap user id attr is probably wrong (can\'t find any user with both conditions)'
),
]
except Exception:
except Exception: # nosec: Control flow
# If found 1 or more, all right
pass
@ -735,7 +730,7 @@ class RegexLdap(auths.Authenticator):
== 1
):
continue
except Exception:
except Exception: # nosec: Control flow
continue
return [
False,
@ -750,7 +745,7 @@ class RegexLdap(auths.Authenticator):
# Check validity of regular expression (try to compile it)
# this only right now
pass
except Exception:
except Exception: # nosec: Control flow
pass
return [

View File

@ -34,6 +34,7 @@ import logging
import typing
from django.utils.translation import gettext_noop as _
from uds.core.auths.authenticator import AuthenticationResult, AuthenticationSuccess
from uds.core.ui import gui
from uds.core import auths
@ -278,7 +279,7 @@ class SampleAuth(auths.Authenticator):
def authCallback(
self, parameters: typing.Dict[str, typing.Any], gm: 'auths.GroupsManager', request: 'ExtendedHttpRequestWithUser'
) -> typing.Optional[str]:
) -> AuthenticationResult:
"""
We provide this as a sample of callback for an user.
We will accept all petitions that has "user" parameter
@ -294,7 +295,7 @@ class SampleAuth(auths.Authenticator):
"""
user = parameters.get('user', None)
return user
return AuthenticationResult(AuthenticationSuccess.OK, username=user)
def createUser(self, usrData: typing.Dict[str, str]) -> None:
"""

View File

@ -555,7 +555,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user class seems to be incorrect (no user found by that class)'
),
]
except Exception as e:
except Exception as e: # nosec: Flow control
# If found 1 or more, all right
pass
@ -578,7 +578,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap group class seems to be incorrect (no group found by that class)'
),
]
except Exception as e:
except Exception as e: # nosec: Flow control
# If found 1 or more, all right
pass
@ -601,7 +601,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user id attribute seems to be incorrect (no user found by that attribute)'
),
]
except Exception as e:
except Exception as e: # nosec: Flow control
# If found 1 or more, all right
pass
@ -624,7 +624,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap group id attribute seems to be incorrect (no group found by that attribute)'
),
]
except Exception as e:
except Exception as e: # nosec: Flow control
# If found 1 or more, all right
pass
@ -649,7 +649,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user class or user id attr is probably wrong (can\'t find any user with both conditions)'
),
]
except Exception as e:
except Exception as e: # nosec: Flow control
# If found 1 or more, all right
pass

View File

@ -418,7 +418,7 @@ def webLogin(
)
# If Enabled zero trust, do not cache credentials
if GlobalConfig.ENFORCE_ZERO_TRUST.getBool(False):
password = ''
password = '' # nosec: clear password if zero trust is enabled
request.session[USER_KEY] = user.id
request.session[PASS_KEY] = codecs.encode(cryptoManager().symCrypt(password, cookie), "base64").decode() # as str

View File

@ -100,7 +100,7 @@ class Authenticator(Module):
As always, if you override __init__, do not forget to invoke base __init__ as this::
super(self.__class__, self).__init__(self, dbAuth, environment, values)
super(self.__class__, self).__init__(self, environment, values, dbAuth)
This is a MUST, so internal structured gets filled correctly, so don't forget it!.
@ -203,9 +203,9 @@ class Authenticator(Module):
def __init__(
self,
dbAuth: 'models.Authenticator',
environment: 'Environment',
values: typing.Optional[typing.Dict[str, str]],
dbAuth: typing.Optional['models.Authenticator'] = None,
):
"""
Instantiathes the authenticator.
@ -213,7 +213,7 @@ class Authenticator(Module):
@param environment: Environment for the authenticator
@param values: Values passed to element
"""
self._dbAuth = dbAuth
self._dbAuth = dbAuth or models.Authenticator() # Fake dbAuth if not provided
super(Authenticator, self).__init__(environment, values)
self.initialize(values)

View File

@ -35,189 +35,11 @@
import logging
import typing
from django import forms
from django.utils.translation import gettext as _, gettext_lazy
from uds.core.util import singleton
if typing.TYPE_CHECKING:
from uds.models import User
from django.utils.translation import gettext as _
logger = logging.getLogger(__name__)
# UserPrefs is DEPRECATED
# Currently not used anywhere
class UserPrefsManager(metaclass=singleton.Singleton):
_prefs: typing.Dict[str, typing.Dict]
def __init__(self):
self._prefs = {}
@staticmethod
def manager() -> 'UserPrefsManager':
return UserPrefsManager()
def __nameFor(self, module, name):
return module + "_" + name
def registerPrefs(
self, modName: str, friendlyModName: str, prefs: typing.Any
) -> None:
"""
Register an array of preferences for a module
"""
self._prefs[modName] = {'friendlyName': friendlyModName, 'prefs': prefs}
def getPreferencesForUser(self, modName: str, user: 'User'):
"""
Gets the preferences for an specified module for the user
"""
# logger.debug('Self prefs: %s', self._prefs)
prefs = {}
for up in user.preferences.filter(module=modName): # type: ignore
prefs[up.name] = up.value
for p in self._prefs[modName]['prefs']:
if p.getName() not in prefs:
prefs[p.getName()] = p.getDefValue()
logger.debug('Preferences: %s', prefs)
return prefs
def setPreferenceForUser(
self, user: 'User', modName: str, prefName: str, value: str
):
try:
user.preferences.create(module=modName, name=prefName, value=value) # type: ignore
except Exception: # Already exits, update it
user.preferences.filter(module=modName, name=prefName).update(value=value) # type: ignore
def getHtmlForUserPreferences(self, user: 'User'):
# First fill data for all preferences
data = {}
for up in user.preferences.all().order_by('module'): # type: ignore
data[self.__nameFor(up.module, up.name)] = up.value
res = ''
for mod, v in sorted(self._prefs.items()):
form = forms.Form()
for p in v['prefs']:
name = self.__nameFor(mod, p.getName())
val = data[name] if name in data else p.getDefValue()
form.fields[name] = p.formField(val)
res += (
'<fieldset class="prefset"><legend>'
+ v['friendlyName']
+ '</legend>'
+ form.as_p()
+ '</fieldset>'
)
return res
def getGuiForUserPreferences(self, user=None):
data = {}
if user is not None:
for up in user.preferences.all():
data[self.__nameFor(up.module, up.name)] = up.value
res = []
for mod, v in self._prefs.items():
grp = []
for p in v['prefs']:
name = self.__nameFor(mod, p.getName())
val = data[name] if name in data else p.getDefValue()
grp.append(
{
'name': name,
'gui': p.guiField(val).guiDescription(),
'value': val,
}
)
res.append({'moduleLabel': v['friendlyName'], 'prefs': grp})
return res
def processRequestForUserPreferences(self, user, data):
"""
Returns a list of errors in case of error, else return None
"""
# First, read fields form every single "section"
logger.debug('Processing %s', self._prefs)
prefs = []
for mod, v in self._prefs.items():
logger.debug(mod)
form = forms.Form(data)
for p in v['prefs']:
name = self.__nameFor(mod, p.getName())
form.fields[name] = p.formField(None)
if form.is_valid() is False:
logger.debug("errors")
return form.errors
for p in v['prefs']:
name = self.__nameFor(mod, p.getName())
logger.debug(name)
prefs.append(
{
'module': mod,
'name': p.getName(),
'value': form.cleaned_data[name],
}
)
user.preferences.all().delete()
try:
for p in prefs:
user.preferences.create(
module=p['module'], name=p['name'], value=p['value']
)
except Exception: # User does not exists
logger.info('Trying to dave user preferences failed (probably root user?)')
return None
def processGuiForUserPreferences(self, user, data):
"""
Processes the preferences got from user
"""
logger.debug('Processing data %s', data)
prefs = []
for mod, v in self._prefs.items():
logger.debug(mod)
for p in v['prefs']:
name = self.__nameFor(mod, p.getName())
if name in data:
prefs.append(
{'module': mod, 'name': p.getName(), 'value': data[name]}
)
user.preferences.all().delete()
for p in prefs:
user.preferences.create(
module=p['module'], name=p['name'], value=p['value']
)
class UserPreference(object):
TYPE = 'abstract'
def __init__(self, **kwargs):
self._name = kwargs['name']
self._label = kwargs['label']
self._defValue = kwargs.get('defvalue', None)
self._css = 'form-control'
def getName(self):
return self._name
def getDefValue(self):
return self._defValue
def formField(self, value):
"""
Returns a form field to add to the preferences form
"""
raise NotImplementedError('Can\'t create an abstract preference!!!')
def guiField(self, value):
"""
returns a gui field
"""
return None
class CommonPrefs(object):
class CommonPrefs:
SZ_PREF = 'screenSize'
SZ_640x480 = '1'
SZ_800x600 = '2'

View File

@ -30,7 +30,7 @@
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
from .provider import Notifier, NotificationLevel
from .factory import NotifierFactory
from .msgfactory import NotifierFactory
from . import config

View File

@ -92,7 +92,7 @@ class Report(UserInterface):
"""
# url fetcher for weasyprint
def report_fetcher(url: str) -> typing.Dict:
def report_fetcher(url: str, timeout=10, ssl_context=None) -> typing.Dict:
logger.debug('Getting url for weasyprint %s', url)
if url.startswith('stock://'):
imagePath = stock.getStockImagePath(url[8:])

View File

@ -127,7 +127,7 @@ class gui:
return str(self.value)
# : For backward compatibility, will be removed in future versions
# For now, will log an warning if used
# For now, will log a warning if used
@deprecatedClassValue('gui.Tab.ADVANCED')
def ADVANCED_TAB(cls) -> str:
return str(gui.Tab.ADVANCED)
@ -928,7 +928,7 @@ class gui:
"""
Set the values for this multi choice field
"""
self._data['values'] = values
self._data['values'] = gui.convertToChoices(values)
class EditableList(InputField):
"""
@ -1013,7 +1013,7 @@ class UserInterfaceType(type):
if isinstance(attr, gui.InputField):
_gui[attrName] = attr
newClassDict[attrName] = attr
newClassDict['_gui'] = _gui
newClassDict['_base_gui'] = _gui
return typing.cast(
'UserInterfaceType', type.__new__(cls, classname, bases, newClassDict)
)
@ -1030,7 +1030,11 @@ class UserInterface(metaclass=UserInterfaceType):
By default, the values passed to this class constructor are used to fill
the gui form fields values.
"""
# Class variable that will hold the gui fields description
_base_gui: typing.ClassVar[typing.Dict[str, gui.InputField]]
# instance variable that will hold the gui fields description
# this allows us to modify the gui fields values at runtime without affecting other instances
_gui: typing.Dict[str, gui.InputField]
def __init__(self, values: gui.ValuesType = None) -> None:
@ -1045,9 +1049,9 @@ class UserInterface(metaclass=UserInterfaceType):
# Ensure "gui" points to a copy of original gui, not the original one
# this is done to avoid modifying the original gui description
self._gui = copy.deepcopy(self._gui)
self._gui = copy.deepcopy(self._base_gui)
for key, val in self._gui.items(): # And refresh self references to them
setattr(self, key, val)
setattr(self, key, val) # val is an InputField instance, so it is a reference to self._gui[key]
if values is not None:
for k, v in self._gui.items():
@ -1076,12 +1080,6 @@ class UserInterface(metaclass=UserInterfaceType):
of this posibility in a near version...
"""
@classmethod
def initClassGui(cls) -> None:
"""
This method is used to initialize the gui fields of the class.
"""
def valuesDict(self) -> gui.ValuesDictType:
"""
Returns own data needed for user interaction as a dict of key-names ->
@ -1222,9 +1220,8 @@ class UserInterface(metaclass=UserInterfaceType):
# Values can contain invalid characters, so we log every single char
# logger.info('Invalid serialization data on {0} {1}'.format(self, values.encode('hex')))
@classmethod
def guiDescription(
cls, obj: typing.Optional['UserInterface'] = None
self
) -> typing.List[typing.MutableMapping[str, typing.Any]]:
"""
This simple method generates the theGui description needed by the
@ -1236,16 +1233,11 @@ class UserInterface(metaclass=UserInterfaceType):
This will only happen (not to be None) in Services.
"""
logger.debug('Active language for theGui translation: %s', get_language())
theGui: typing.Union[typing.Type['UserInterface'], 'UserInterface'] = cls
if obj:
obj.initGui() # We give the "oportunity" to fill necesary theGui data before providing it to client
theGui = obj
else:
cls.initClassGui() # We give the "oportunity" to fill necesary theGui data before providing it to client
self.initGui() # We give the "oportunity" to fill necesary theGui data before providing it to client
res: typing.List[typing.MutableMapping[str, typing.Any]] = [
{'name': key, 'gui': val.guiDescription(), 'value': ''}
for key, val in theGui._gui.items()
for key, val in self._gui.items()
]
logger.debug('theGui description: %s', res)
return res

View File

@ -85,7 +85,7 @@ def getSerializedFromModel(
removableFields = removableFields or []
passwordFields = passwordFields or []
try:
values = mod._meta.managers[0].filter(pk=mod.pk).values()[0]
values = mod._meta.managers[0].filter(pk=mod.pk).values()[0] # type: ignore
for i in ['uuid', 'id'] + removableFields:
if i in values:
del values[i]

View File

@ -128,7 +128,6 @@ class EmailMFA(mfas.MFA):
tab=_('Config'),
)
allowLoginWithoutMFA = gui.ChoiceField(
label=_('Policy for users without MFA support'),
order=31,
@ -182,20 +181,26 @@ class EmailMFA(mfas.MFA):
self.fromEmail.value = validators.validateEmail(self.fromEmail.value)
def html(self, request: 'ExtendedHttpRequest') -> str:
return gettext('Check your mail. You will receive an email with the verification code')
return gettext(
'Check your mail. You will receive an email with the verification code'
)
@classmethod
def initClassGui(cls) -> None:
def initGui(self) -> None:
# Populate the networks list
cls.networks.setValues([
self.networks.setValues(
[
gui.choiceItem(v.uuid, v.name)
for v in models.Network.objects.all().order_by('name') if v.uuid
])
for v in models.Network.objects.all().order_by('name')
if v.uuid
]
)
def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool:
def checkIp() -> bool:
return any(i.ipInNetwork(request.ip) for i in models.Network.objects.filter(uuid__in = self.networks.value))
return any(
i.ipInNetwork(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if action == '0':
return True
@ -215,7 +220,9 @@ class EmailMFA(mfas.MFA):
return 'OTP received via email'
@decorators.threaded
def doSendCode(self, request: 'ExtendedHttpRequest', identifier: str, code: str) -> None:
def doSendCode(
self, request: 'ExtendedHttpRequest', identifier: str, code: str
) -> None:
# Send and email with the notification
with self.login() as smtp:
try:
@ -225,18 +232,39 @@ class EmailMFA(mfas.MFA):
msg['From'] = self.fromEmail.cleanStr()
msg['To'] = identifier
msg.attach(MIMEText(f'A login attemt has been made from {request.ip}.\nTo continue, provide the verification code {code}', 'plain'))
msg.attach(
MIMEText(
f'A login attemt has been made from {request.ip}.\nTo continue, provide the verification code {code}',
'plain',
)
)
if self.enableHTML.value:
msg.attach(MIMEText(f'<p>A login attemt has been made from <b>{request.ip}</b>.</p><p>To continue, provide the verification code <b>{code}</b></p>', 'html'))
msg.attach(
MIMEText(
f'<p>A login attemt has been made from <b>{request.ip}</b>.</p><p>To continue, provide the verification code <b>{code}</b></p>',
'html',
)
)
smtp.sendmail(self.fromEmail.value, identifier, msg.as_string())
except smtplib.SMTPException as e:
logger.error('Error sending email: {}'.format(e))
raise
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
self.doSendCode(request, identifier, code,)
def sendCode(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
) -> mfas.MFA.RESULT:
self.doSendCode(
request,
identifier,
code,
)
return mfas.MFA.RESULT.OK
def login(self) -> smtplib.SMTP:

View File

@ -145,10 +145,9 @@ class RadiusOTP(mfas.MFA):
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
@classmethod
def initClassGui(cls) -> None:
def initGui(self) -> None:
# Populate the networks list
cls.networks.setValues(
self.networks.setValues(
[
gui.choiceItem(v.uuid, v.name)
for v in models.Network.objects.all().order_by('name')

View File

@ -221,14 +221,15 @@ class SMSMFA(mfas.MFA):
def initialize(self, values: 'Module.ValuesType') -> None:
return super().initialize(values)
@classmethod
def initClassGui(cls) -> None:
def initGui(self) -> None:
# Populate the networks list
cls.networks.setValues([
self.networks.setValues(
[
gui.choiceItem(v.uuid, v.name)
for v in models.Network.objects.all().order_by('name')
if v.uuid
])
]
)
def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str:
url = self.sendingUrl.value
@ -266,10 +267,12 @@ class SMSMFA(mfas.MFA):
session.headers[headerName.strip()] = headerValue.strip()
return session
def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool:
def checkIp() -> bool:
return any(i.ipInNetwork(request.ip) for i in models.Network.objects.filter(uuid__in = self.networks.value))
return any(
i.ipInNetwork(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if action == '0':
return True
@ -285,13 +288,19 @@ class SMSMFA(mfas.MFA):
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> bool:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def processResponse(self, request: 'ExtendedHttpRequest', response: requests.Response) -> mfas.MFA.RESULT:
def processResponse(
self, request: 'ExtendedHttpRequest', response: requests.Response
) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
if not response.ok:
if self.responseErrorAction.value == '1':
raise Exception(_('SMS sending failed'))
elif self.responseOkRegex.value.strip():
logger.debug('Checking response OK regex: %s: (%s)', self.responseOkRegex.value, re.search(self.responseOkRegex.value, response.text))
logger.debug(
'Checking response OK regex: %s: (%s)',
self.responseOkRegex.value,
re.search(self.responseOkRegex.value, response.text),
)
if not re.search(self.responseOkRegex.value, response.text or ''):
logger.error(
'SMS response error: %s',
@ -303,7 +312,13 @@ class SMSMFA(mfas.MFA):
return mfas.MFA.RESULT.OK
def getData(
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
url: str,
code: str,
phone: str,
) -> bytes:
data = ''
if self.sendingParameters.value:
@ -316,11 +331,19 @@ class SMSMFA(mfas.MFA):
)
return data.encode(self.encoding.value)
def sendSMS_GET(self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str) -> mfas.MFA.RESULT:
def sendSMS_GET(
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str
) -> mfas.MFA.RESULT:
return self.processResponse(request, self.getSession().get(url))
def sendSMS_POST(
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
url: str,
code: str,
phone: str,
) -> mfas.MFA.RESULT:
# Compose POST data
session = self.getSession()
@ -331,7 +354,13 @@ class SMSMFA(mfas.MFA):
return self.processResponse(request, session.post(url, data=bdata))
def sendSMS_PUT(
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str, code: str, phone: str
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
url: str,
code: str,
phone: str,
) -> mfas.MFA.RESULT:
# Compose POST data
data = ''
@ -339,7 +368,12 @@ class SMSMFA(mfas.MFA):
return self.processResponse(request, self.getSession().put(url, data=bdata))
def sendSMS(
self, request: 'ExtendedHttpRequest', userId: str, username: str, code: str, phone: str
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
code: str,
phone: str,
) -> mfas.MFA.RESULT:
url = self.composeSmsUrl(userId, username, code, phone)
if self.sendingMethod.value == 'GET':
@ -355,9 +389,18 @@ class SMSMFA(mfas.MFA):
return gettext('MFA Code')
def html(self, request: 'ExtendedHttpRequest') -> str:
return gettext('Check your phone. You will receive an SMS with the verification code')
return gettext(
'Check your phone. You will receive an SMS with the verification code'
)
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
def sendCode(
self,
request: 'ExtendedHttpRequest',
userId: str,
username: str,
identifier: str,
code: str,
) -> mfas.MFA.RESULT:
logger.debug(
'Sending SMS code "%s" for user %s (userId="%s", identifier="%s")',
code,

View File

@ -109,12 +109,12 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
"""
if self.id is None:
return auths.Authenticator(
self, environment.Environment.getTempEnv(), values
environment.Environment.getTempEnv(), values, dbAuth=self
)
auType = self.getType()
env = self.getEnvironment()
auth = auType(self, env, values)
auth = auType(env, values, dbAuth=self)
self.deserialize(auth, values)
return auth

View File

@ -37,7 +37,6 @@ from django.db import models
from django.db.models import Count, Q, signals
from uds.core import auths
from uds.core.util import log, storage
from uds.models import permissions
from .authenticator import Authenticator
from .util import NEVER, UnsavedForeignKey, getSqlDatetime
@ -127,27 +126,6 @@ class User(UUIDModel):
"""
return self.staff_member or self.is_admin
def prefs(self, modName) -> typing.Dict:
"""
Returns the preferences for this user for the provided module name.
Usually preferences will be associated with transports, but can be preferences registered by ANY module.
Args:
modName: name of the module to get preferences for
Returns:
The preferences for the module specified as a dictionary (can be empty if module is not found).
If the module exists, the preferences will always contain something, but may be the values are the default ones.
"""
from uds.core.managers.user_preferences import UserPrefsManager
return UserPrefsManager.manager().getPreferencesForUser(modName, self)
def updateLastAccess(self) -> None:
"""
Updates the last access for this user with the current time of the sql server

View File

@ -31,7 +31,5 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from django.utils.translation import gettext_noop as _
from uds.core.managers.user_preferences import UserPrefsManager, CommonPrefs
from .rdp import RDPTransport
from .rdptunnel import TRDPTransport

View File

@ -105,7 +105,6 @@ class RDPTransport(BaseRDPTransport):
request: 'ExtendedHttpRequestWithUser',
) -> 'transports.TransportScript':
# We use helper to keep this clean
# prefs = user.prefs('rdp')
ci = self.getConnectionInfo(userService, user, password)
username, password, domain = ci['username'], ci['password'], ci['domain']

View File

@ -147,7 +147,6 @@ class TRDPTransport(BaseRDPTransport):
request: 'ExtendedHttpRequestWithUser',
) -> 'transports.TransportScript':
# We use helper to keep this clean
# prefs = user.prefs('rdp')
ci = self.getConnectionInfo(userService, user, password)
username, password, domain = ci['username'], ci['password'], ci['domain']

View File

@ -31,7 +31,5 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from django.utils.translation import gettext_noop as _
from uds.core.managers.user_preferences import UserPrefsManager, CommonPrefs
from .x2go import X2GOTransport
from .x2gotunnel import TX2GOTransport

View File

@ -34,7 +34,6 @@ import typing
from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core.managers.user_preferences import CommonPrefs
from uds.core.util import os_detector as OsDetector
from uds.core.util import tools, validators
from uds.core import transports