1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-11-06 20:24:49 +03:00

Cleaning up repeated code for authenticators with regex fields

This commit is contained in:
Adolfo Gómez García
2023-10-19 19:21:59 +02:00
parent d0da06c492
commit 9419e0f69e
4 changed files with 173 additions and 163 deletions

View File

@@ -41,7 +41,7 @@ from django.utils.translation import gettext_noop as _
from uds.core import auths, exceptions, types
from uds.core.auths.auth import authLogLogin
from uds.core.ui import gui
from uds.core.util import ldaputil
from uds.core.util import ldaputil, auth as auth_utils
try:
# pylint: disable=no-name-in-module
@@ -236,9 +236,9 @@ class RegexLdap(auths.Authenticator):
def initialize(self, values: typing.Optional[typing.Dict[str, typing.Any]]) -> None:
if values:
self.__validateField(values['userNameAttr'], str(self.userNameAttr.label))
self.__validateField(values['userIdAttr'], str(self.userIdAttr.label))
self.__validateField(values['groupNameAttr'], str(self.groupNameAttr.label))
auth_utils.validateRegexField(self.userNameAttr, values['userNameAttr'])
auth_utils.validateRegexField(self.userIdAttr, values['userIdAttr'])
auth_utils.validateRegexField(self.groupNameAttr, values['groupNameAttr'])
self._host = values['host']
self._port = values['port']
@@ -257,22 +257,6 @@ class RegexLdap(auths.Authenticator):
self._verifySsl = gui.toBool(values['verifySsl'])
self._certificate = values['certificate']
def __validateField(self, field: str, fieldLabel: str) -> None:
"""
Validates the multi line fields refering to attributes
"""
for line in field.splitlines():
if line.find('=') != -1:
_, pattern = line.split('=')[0:2]
if pattern.find('(') == -1:
pattern = '(' + pattern + ')'
try:
re.search(pattern, '')
except Exception as e:
raise exceptions.ValidationError(
f'Invalid pattern in {fieldLabel}: {line}'
) from e
def __getAttrsFromField(self, field: str) -> typing.List[str]:
res = []
for line in field.splitlines():
@@ -293,64 +277,6 @@ class RegexLdap(auths.Authenticator):
res.append(attr)
return res
def __processField(
self, field: str, attributes: typing.MutableMapping[str, typing.Any]
) -> typing.List[str]:
res: typing.List[str] = []
def getAttr(attrName: str) -> typing.List[str]:
def asList(val: typing.Any) -> typing.List[str]:
if isinstance(val, list):
return val
return [val]
if '+' in attrName:
attrList = attrName.split('+')
# Check all attributes are present, and has only one value
if not all([len(attributes.get(a, [])) <= 1 for a in attrList]):
logger.warning('Attribute %s do not has exactly one value, skipping %s', attrName, line)
return []
val = [''.join([asList(attributes.get(a, ['']))[0] for a in attrList])]
elif ':' in attrName:
# Prepend the value after : to value before :
attr, prependable = attrName.split(':')
val = [prependable + a for a in asList(attributes.get(attr, []))]
else:
val = asList(attributes.get(attrName, []))
return val
logger.debug('******** Attributes: %s', attributes)
for line in field.splitlines():
equalPos = line.find('=')
if (
equalPos == -1
): # if no attr=pattern form, convert to it (only "attr", so it will be "attr=(.*)")
line += '=(.*)'
equalPos = line.find('=')
attr, pattern = (line[:equalPos].lower(), line[equalPos + 1 :])
# if pattern do not have groups, define one with complete pattern (i.e. id=.* --> id=(.*))
if pattern.find('(') == -1:
pattern = '(' + pattern + ')'
val = getAttr(attr)
logger.debug('Pattern: %s', pattern)
for v in val:
try:
logger.debug('Pattern: %s on value %s', pattern, v)
searchResult = re.search(
pattern, v, re.IGNORECASE
) # @UndefinedVariable
if searchResult is None:
continue
logger.debug("Found against %s: %s ", v, searchResult.groups())
res.append(''.join(searchResult.groups()))
except Exception: # nosec: If not a valid regex, just ignore it
pass # Ignore exceptions here
logger.debug('Res: %s', res)
return res
def mfaStorageKey(self, username: str) -> str:
return 'mfa_' + self.dbObj().uuid + username
@@ -545,13 +471,13 @@ class RegexLdap(auths.Authenticator):
return user
def __getGroups(self, user: ldaputil.LDAPResultType):
grps = self.__processField(self._groupNameAttr, user)
grps = auth_utils.processRegexField(self._groupNameAttr, user)
if extra:
grps += extra.getGroups(self, user)
return grps
def __getUserRealName(self, user: ldaputil.LDAPResultType):
return ' '.join(self.__processField(self._userNameAttr, user))
return ' '.join(auth_utils.processRegexField(self._userNameAttr, user))
def authenticate(
self,