1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Refactoring using pylint

This commit is contained in:
Adolfo Gómez García 2023-04-15 03:23:17 +02:00
parent 8c0f3b68ed
commit 54efa40eef
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
16 changed files with 152 additions and 173 deletions

View File

@ -25,7 +25,7 @@ clear-cache-post-run=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-allow-list=
extension-pkg-allow-list=lxml.etree
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
@ -108,7 +108,7 @@ unsafe-load-any-extension=no
[BASIC]
# Naming style matching correct argument names.
argument-naming-style=snake_case
argument-naming-style=camelCase
# Regular expression matching correct argument names. Overrides argument-
# naming-style. If left empty, argument names will be checked with the set
@ -116,7 +116,7 @@ argument-naming-style=snake_case
#argument-rgx=
# Naming style matching correct attribute names.
attr-naming-style=snake_case
attr-naming-style=camelCase
# Regular expression matching correct attribute names. Overrides attr-naming-
# style. If left empty, attribute names will be checked with the set naming
@ -171,7 +171,7 @@ const-naming-style=UPPER_CASE
docstring-min-length=-1
# Naming style matching correct function names.
function-naming-style=snake_case
function-naming-style=camelCase
# Regular expression matching correct function names. Overrides function-
# naming-style. If left empty, function names will be checked with the set
@ -202,14 +202,14 @@ inlinevar-naming-style=any
#inlinevar-rgx=
# Naming style matching correct method names.
method-naming-style=snake_case
method-naming-style=camelCase
# Regular expression matching correct method names. Overrides method-naming-
# style. If left empty, method names will be checked with the set naming style.
#method-rgx=
# Naming style matching correct module names.
module-naming-style=snake_case
module-naming-style=camelCase
# Regular expression matching correct module names. Overrides module-naming-
# style. If left empty, module names will be checked with the set naming style.
@ -233,7 +233,7 @@ property-classes=abc.abstractproperty
#typevar-rgx=
# Naming style matching correct variable names.
variable-naming-style=snake_case
variable-naming-style=camelCase
# Regular expression matching correct variable names. Overrides variable-
# naming-style. If left empty, variable names will be checked with the set
@ -287,7 +287,7 @@ max-attributes=7
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=12
max-branches=20
# Maximum number of locals for function / method body.
max-locals=15
@ -425,7 +425,10 @@ disable=raw-checker-failed,
R0022,
broad-exception-raised,
invalid-name,
broad-except
broad-except,
no-name-in-module, # Too many false positives... :(
import-error,
too-many-lines
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
@ -455,7 +458,7 @@ notes-rgx=
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
max-nested-blocks=8
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then

View File

@ -100,7 +100,7 @@ class IPAuth(auths.Authenticator):
def authenticate(
self,
username: str,
credentials: str,
credentials: str, # pylint: disable=unused-argument
groupsManager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> auths.AuthenticationResult:
@ -123,7 +123,7 @@ class IPAuth(auths.Authenticator):
def internalAuthenticate(
self,
username: str,
credentials: str,
credentials: str, # pylint: disable=unused-argument
groupsManager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> auths.AuthenticationResult:
@ -137,8 +137,8 @@ class IPAuth(auths.Authenticator):
return auths.FAILED_AUTH
@staticmethod
def test(env, data):
return _("All seems to be fine.")
def test(env, data): # pylint: disable=unused-argument
return [True, _("Internal structures seems ok")]
def check(self):
return _("All seems to be fine.")
@ -154,11 +154,9 @@ class IPAuth(auths.Authenticator):
return ('function setVal(element, value) {{\n' # nosec: no user input, password is always EMPTY
' document.getElementById(element).value = value;\n'
'}}\n'
'setVal("id_user", "{ip}");\n'
'setVal("id_password", "{passwd}");\n'
'document.getElementById("loginform").submit();\n').format(
ip=ip, passwd=''
)
f'setVal("id_user", "{ip}");\n'
'setVal("id_password", "");\n'
'document.getElementById("loginform").submit();\n')
return 'alert("invalid authhenticator"); window.location.reload();'

View File

@ -187,7 +187,7 @@ class InternalDBAuth(auths.Authenticator):
pass
@staticmethod
def test(env, data):
def test(env, data): # pylint: disable=unused-argument
return [True, _("Internal structures seems ok")]
def check(self):

View File

@ -209,7 +209,7 @@ class RadiusAuth(auths.Authenticator):
connection.authenticate(
cryptoManager().randomString(10), cryptoManager().randomString(10)
)
except client.RadiusAuthenticationError as e:
except client.RadiusAuthenticationError:
pass
except Exception:
logger.exception('Connecting')

View File

@ -207,9 +207,6 @@ class RadiusClient:
if reply.code == pyrad.packet.AccessChallenge:
state = typing.cast(typing.List[bytes], reply.get('State') or [b''])[0]
replyMessage = typing.cast(
typing.List[bytes], reply.get('Reply-Message') or ['']
)[0]
return self.challenge_only(username, otp, state=state)
# user/pwd accepted: but this user does not have challenge data

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=no-member
#
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
@ -269,10 +269,10 @@ class RegexLdap(auths.Authenticator):
pattern = '(' + pattern + ')'
try:
re.search(pattern, '')
except Exception:
except Exception as e:
raise exceptions.ValidationError(
'Invalid pattern in {0}: {1}'.format(fieldLabel, line)
)
f'Invalid pattern in {fieldLabel}: {line}'
) from e
def __getAttrsFromField(self, field: str) -> typing.List[str]:
res = []
@ -486,9 +486,7 @@ class RegexLdap(auths.Authenticator):
for usr in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(objectClass={})({}={}))'.format(
self._altClass, self._userIdAttr, ldaputil.escape(username)
),
ldapFilter=f'(&(objectClass={self._altClass})({self._userIdAttr}={ldaputil.escape(username)}))',
attrList=attributes,
sizeLimit=LDAP_RESULT_LIMIT,
):
@ -555,7 +553,7 @@ class RegexLdap(auths.Authenticator):
self.__connectAs(
usr['dn'], credentials
) # Will raise an exception if it can't connect
except:
except Exception:
authLogLogin(
request, self.dbAuthenticator(), username, 'Invalid password'
)
@ -627,9 +625,7 @@ class RegexLdap(auths.Authenticator):
for r in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(&(objectClass={})({}={}*)))'.format(
self._userClass, self._userIdAttr, ldaputil.escape(pattern)
),
ldapFilter=f'(&(&(objectClass={self._userClass})({self._userIdAttr}={ldaputil.escape(pattern)}*)))',
attrList=None, # All attrs
sizeLimit=LDAP_RESULT_LIMIT,
):
@ -642,11 +638,11 @@ class RegexLdap(auths.Authenticator):
)
logger.debug(res)
return res
except Exception:
except Exception as e:
logger.exception("Exception: ")
raise auths.exceptions.AuthenticatorException(
_('Too many results, be more specific')
)
) from e
@staticmethod
def test(env, data):
@ -676,7 +672,7 @@ class RegexLdap(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # ldap.SCOPE_* not resolved due to dynamic creation?
filterstr='(objectClass=%s)' % self._userClass,
filterstr=f'(objectClass={self._userClass})',
sizelimit=1,
)
)
@ -700,8 +696,7 @@ class RegexLdap(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # ldap.SCOPE_* not resolved due to dynamic creation?
filterstr='(&(objectClass=%s)(%s=*))'
% (self._userClass, self._userIdAttr),
filterstr=f'(&(objectClass={self._userClass})({self._userIdAttr}=*))',
sizelimit=1,
)
)
@ -728,7 +723,7 @@ class RegexLdap(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # ldap.SCOPE_* not resolved due to dynamic creation?
filterstr='(%s=*)' % vals,
filterstr=f'({vals}=*)',
sizelimit=1,
)
)
@ -759,15 +754,8 @@ class RegexLdap(auths.Authenticator):
]
def __str__(self):
return "Ldap Auth: {}:{}@{}:{}, base = {}, userClass = {}, userIdAttr = {}, groupNameAttr = {}, userName attr = {}, altClass={}".format(
self._username,
self._password,
self._host,
self._port,
self._ldapBase,
self._userClass,
self._userIdAttr,
self._groupNameAttr,
self._userNameAttr,
self._altClass,
return (
f'Ldap Auth: {self._username}:{self._password}@{self._host}:{self._port},'
f' base = {self._ldapBase}, userClass = {self._userClass}, userIdAttr = {self._userIdAttr},'
f' groupNameAttr = {self._groupNameAttr}, userName attr = {self._userNameAttr}, altClass={self._altClass}'
)

View File

@ -34,12 +34,12 @@ import re
from urllib.parse import urlparse
import xml.sax # nosec: used to parse trusted xml provided only by administrators
import datetime
import requests
import logging
import typing
import requests
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.utils import OneLogin_Saml2_Utils
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.settings import OneLogin_Saml2_Settings
@ -373,7 +373,7 @@ class SAMLAuthenticator(auths.Authenticator):
)
try:
pk = cryptoManager().loadPrivateKey(self.privateKey.value)
cryptoManager().loadPrivateKey(self.privateKey.value)
except Exception as e:
raise exceptions.ValidationError(gettext('Invalid private key. ') + str(e))
@ -428,14 +428,14 @@ class SAMLAuthenticator(auths.Authenticator):
def getReqFromRequest(
self,
request: 'ExtendedHttpRequest',
params: typing.Dict[str, typing.Any] = {},
params: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> typing.Dict[str, typing.Any]:
manageUrlObj = urlparse(self.manageUrl.value)
script_path = manageUrlObj.path
# If callback parameters are passed, we use them
if params:
# TODO: Remove next 3 lines, just for testing and debugging
# Remove next 3 lines, just for testing and debugging
# params['http_host'] = '172.27.0.1'
# params['server_port'] = '8000'
# params['https'] = False
@ -466,7 +466,7 @@ class SAMLAuthenticator(auths.Authenticator):
cachingKeyFnc=CACHING_KEY_FNC,
cacheTimeout=3600 * 24 * 365, # 1 year
)
def getIdpMetadataDict(self, **kwargs) -> typing.Dict[str, typing.Any]:
def getIdpMetadataDict(self) -> typing.Dict[str, typing.Any]:
if self.idpMetadata.value.startswith('http'):
try:
resp = requests.get(
@ -570,10 +570,10 @@ class SAMLAuthenticator(auths.Authenticator):
pattern = '(' + pattern + ')'
try:
re.search(pattern, '')
except:
except Exception as e:
raise exceptions.ValidationError(
'Invalid pattern at {0}: {1}'.format(field.label, line)
)
f'Invalid pattern at {field.label}: {line}'
) from e
def processField(
self, field: str, attributes: typing.Dict[str, typing.List]
@ -818,7 +818,7 @@ class SAMLAuthenticator(auths.Authenticator):
req = self.getReqFromRequest(request)
auth = OneLogin_Saml2_Auth(req, self.oneLoginSettings())
return 'window.location="{0}";'.format(auth.login())
return f'window.location="{auth.login()}";'
def removeUser(self, username):
"""

View File

@ -118,7 +118,9 @@ class SampleAuth(auths.Authenticator):
# : We will define a simple form where we will use a simple
# : list editor to allow entering a few group names
groups = gui.EditableListField(label=_('Groups'), values=['Gods', 'Daemons', 'Mortals'])
groups = gui.EditableListField(
label=_('Groups'), values=['Gods', 'Daemons', 'Mortals']
)
def initialize(self, values: typing.Optional[typing.Dict[str, typing.Any]]) -> None:
"""
@ -131,9 +133,7 @@ class SampleAuth(auths.Authenticator):
# unserialization, and at this point all will be default values
# so self.groups.value will be []
if values and len(self.groups.value) < 2:
raise exceptions.ValidationError(
_('We need more than two groups!')
)
raise exceptions.ValidationError(_('We need more than two groups!'))
def searchUsers(self, pattern: str) -> typing.Iterable[typing.Dict[str, str]]:
"""
@ -147,8 +147,8 @@ class SampleAuth(auths.Authenticator):
"""
return [
{
'id': '{0}-{1}'.format(pattern, a),
'name': '{0} number {1}'.format(pattern, a),
'id': f'{pattern}-{a}',
'name': f'{pattern} number {a}',
}
for a in range(1, 10)
]
@ -173,7 +173,7 @@ class SampleAuth(auths.Authenticator):
username: str,
credentials: str,
groupsManager: 'GroupsManager',
request: 'ExtendedHttpRequest',
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
) -> auths.AuthenticationResult:
"""
This method is invoked by UDS whenever it needs an user to be authenticated.
@ -246,7 +246,9 @@ class SampleAuth(auths.Authenticator):
if len(set(g.lower()).intersection(username.lower())) >= 2:
groupsManager.validate(g)
def getJavascript(self, request: 'HttpRequest') -> typing.Optional[str]:
def getJavascript(
self, request: 'HttpRequest' # pylint: disable=unused-argument
) -> typing.Optional[str]:
"""
If we override this method from the base one, we are telling UDS
that we want to draw our own authenticator.
@ -278,7 +280,10 @@ class SampleAuth(auths.Authenticator):
return res
def authCallback(
self, parameters: typing.Dict[str, typing.Any], gm: 'auths.GroupsManager', request: 'ExtendedHttpRequestWithUser'
self,
parameters: typing.Dict[str, typing.Any],
gm: 'auths.GroupsManager', # pylint: disable=unused-argument
request: 'ExtendedHttpRequestWithUser', # pylint: disable=unused-argument
) -> AuthenticationResult:
"""
We provide this as a sample of callback for an user.
@ -313,7 +318,7 @@ class SampleAuth(auths.Authenticator):
Here, we will set the state to "Inactive" and realName to the same as username, but twice :-)
"""
from uds.core.util.state import State
from uds.core.util.state import State # pylint: disable=import-outside-toplevel
usrData['real_name'] = usrData['name'] + ' ' + usrData['name']
usrData['state'] = State.INACTIVE

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=no-member # ldap module gives errors to pylint
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# All rights reserved.
@ -53,7 +52,6 @@ LDAP_RESULT_LIMIT = 100
class SimpleLDAPAuthenticator(auths.Authenticator):
host = gui.TextField(
length=64,
label=_('Host'),
@ -195,7 +193,6 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
tab=gui.MFA_TAB,
)
typeName = _('SimpleLDAP (DEPRECATED)')
typeType = 'SimpleLdapAuthenticator'
typeDescription = _('Simple LDAP authenticator (DEPRECATED)')
@ -230,7 +227,6 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
_verifySsl: bool = True
_certificate: str = ''
def initialize(self, values: typing.Optional[typing.Dict[str, typing.Any]]) -> None:
if values:
self._host = values['host']
@ -319,12 +315,8 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
) = vals[1:14]
self._ssl = gui.toBool(ssl)
if vals[0] == 'v2':
(
self._mfaAttr,
verifySsl,
self._certificate
) = vals[14:17]
if vals[0] == 'v2':
(self._mfaAttr, verifySsl, self._certificate) = vals[14:17]
self._verifySsl = gui.toBool(verifySsl)
def mfaStorageKey(self, username: str) -> str:
@ -333,9 +325,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
def mfaIdentifier(self, username: str) -> str:
return self.storage.getPickle(self.mfaStorageKey(username)) or ''
def __connection(
self
):
def __connection(self):
"""
Tries to connect to ldap. If username is None, it tries to connect using user provided credentials.
@return: Connection established
@ -410,13 +400,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
try:
groups: typing.List[str] = []
filter_ = '(&(objectClass=%s)(|(%s=%s)(%s=%s)))' % (
self._groupClass,
self._memberAttr,
user['_id'],
self._memberAttr,
user['dn'],
)
filter_ = f'(&(objectClass={self._groupClass})(|({self._memberAttr}={user["_id"]})({self._memberAttr}={user["dn"]})))'
for d in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
@ -450,7 +434,11 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
).strip()
def authenticate(
self, username: str, credentials: str, groupsManager: 'auths.GroupsManager', request: 'ExtendedHttpRequest'
self,
username: str,
credentials: str,
groupsManager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> auths.AuthenticationResult:
'''
Must authenticate the user.
@ -466,9 +454,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
user = self.__getUser(username)
if user is None:
authLogLogin(
request, self.dbAuthenticator(), username, 'Invalid user'
)
authLogLogin(request, self.dbAuthenticator(), username, 'Invalid user')
return auths.FAILED_AUTH
try:
@ -476,7 +462,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
self.__connectAs(
user['dn'], credentials
) # Will raise an exception if it can't connect
except:
except Exception:
authLogLogin(
request, self.dbAuthenticator(), username, 'Invalid password'
)
@ -556,8 +542,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
for r in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(objectClass=%s)(%s=%s*))'
% (self._userClass, self._userIdAttr, pattern),
ldapFilter=f'(&(objectClass={self._userClass})({self._userIdAttr}={pattern}*))',
attrList=[self._userIdAttr, self._userNameAttr],
sizeLimit=LDAP_RESULT_LIMIT,
):
@ -569,11 +554,11 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
)
return res
except Exception:
except Exception as e:
logger.exception("Exception: ")
raise auths.exceptions.AuthenticatorException(
_('Too many results, be more specific')
)
) from e
def searchGroups(self, pattern: str) -> typing.Iterable[typing.Dict[str, str]]:
try:
@ -581,19 +566,18 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
for r in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(objectClass=%s)(%s=%s*))'
% (self._groupClass, self._groupIdAttr, pattern),
ldapFilter=f'(&(objectClass={self._groupClass})({self._groupIdAttr}={pattern}*))',
attrList=[self._groupIdAttr, 'memberOf', 'description'],
sizeLimit=LDAP_RESULT_LIMIT,
):
res.append({'id': r[self._groupIdAttr][0], 'name': r['description'][0]})
return res
except Exception:
except Exception as e:
logger.exception("Exception: ")
raise auths.exceptions.AuthenticatorException(
_('Too many results, be more specific')
)
) from e
@staticmethod
def test(env, data) -> typing.List[typing.Any]:
@ -620,7 +604,17 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
return [False, _('Ldap search base is incorrect')]
try:
if len(con.search_ext_s(base=self._ldapBase, scope=ldap.SCOPE_SUBTREE, filterstr='(objectClass=%s)' % self._userClass, sizelimit=1)) == 1: # type: ignore # SCOPE.. exists on LDAP after load
if (
len(
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr=f'(objectClass={self._userClass})',
sizelimit=1,
)
)
== 1
):
raise Exception()
return [
False,
@ -628,7 +622,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user class seems to be incorrect (no user found by that class)'
),
]
except Exception as e: # nosec: Flow control
except Exception: # nosec: Flow control
# If found 1 or more, all right
pass
@ -638,7 +632,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr='(objectClass=%s)' % self._groupClass,
filterstr=f'(objectClass={self._groupClass})',
sizelimit=1,
)
)
@ -651,7 +645,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap group class seems to be incorrect (no group found by that class)'
),
]
except Exception as e: # nosec: Flow control
except Exception: # nosec: Flow control
# If found 1 or more, all right
pass
@ -661,7 +655,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr='(%s=*)' % self._userIdAttr,
filterstr=f'({self._userIdAttr}=*)',
sizelimit=1,
)
)
@ -674,7 +668,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user id attribute seems to be incorrect (no user found by that attribute)'
),
]
except Exception as e: # nosec: Flow control
except Exception: # nosec: Flow control
# If found 1 or more, all right
pass
@ -684,7 +678,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr='(%s=*)' % self._groupIdAttr,
filterstr=f'({self._groupIdAttr}=*)',
sizelimit=1,
)
)
@ -697,7 +691,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap group id attribute seems to be incorrect (no group found by that attribute)'
),
]
except Exception as e: # nosec: Flow control
except Exception: # nosec: Flow control
# If found 1 or more, all right
pass
@ -708,8 +702,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr='(&(objectClass=%s)(%s=*))'
% (self._userClass, self._userIdAttr),
filterstr=f'(&(objectClass={self._userClass})({self._userIdAttr}=*))',
sizelimit=1,
)
)
@ -722,7 +715,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
'Ldap user class or user id attr is probably wrong (can\'t find any user with both conditions)'
),
]
except Exception as e: # nosec: Flow control
except Exception: # nosec: Flow control
# If found 1 or more, all right
pass
@ -731,8 +724,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
res = con.search_ext_s(
base=self._ldapBase,
scope=ldap.SCOPE_SUBTREE, # type: ignore # SCOPE.. exists on LDAP after load
filterstr='(&(objectClass=%s)(%s=*))'
% (self._groupClass, self._groupIdAttr),
filterstr=f'(&(objectClass={self._groupClass})({self._groupIdAttr}=*))',
attrlist=[self._memberAttr],
)
if not res:
@ -759,16 +751,9 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
]
def __str__(self):
return "Ldap Auth: {0}:{1}@{2}:{3}, base = {4}, userClass = {5}, groupClass = {6}, userIdAttr = {7}, groupIdAttr = {8}, memberAttr = {9}, userName attr = {10}".format(
self._username,
self._password,
self._host,
self._port,
self._ldapBase,
self._userClass,
self._groupClass,
self._userIdAttr,
self._groupIdAttr,
self._memberAttr,
self._userNameAttr,
return (
f'Ldap Auth: {self._username}:{self._password}@{self._host}:{self._port}, '
f'base = {self._ldapBase}, userClass = {self._userClass}, groupClass = {self._groupClass}, '
f'userIdAttr = {self._userIdAttr}, groupIdAttr = {self._groupIdAttr}, '
f'memberAttr = {self._memberAttr}, userName attr = {self._userNameAttr}'
)

View File

@ -36,6 +36,6 @@ import time
VERSION = '4.x.x-DEVEL'
VERSION_STAMP = '{}-DEVEL'.format(time.strftime("%Y%m%d"))
VERSION_STAMP = f'{time.strftime("%Y%m%d")}-DEVEL'
# Minimal uds client version required to connect to this server
REQUIRED_CLIENT_VERSION = '3.5.0'
REQUIRED_CLIENT_VERSION = '3.6.0'

View File

@ -70,8 +70,8 @@ class Environment:
{'mac' : UniqueMacGenerator, 'name' : UniqueNameGenerator } as argument.
"""
# Avoid circular imports
from uds.core.util.cache import Cache
from uds.core.util.storage import Storage
from uds.core.util.cache import Cache # pylint: disable=import-outside-toplevel
from uds.core.util.storage import Storage # pylint: disable=import-outside-toplevel
if idGenerators is None:
idGenerators = dict()
@ -105,7 +105,7 @@ class Environment:
@return: Generator for that id, or None if no generator for that id is found
"""
if not self._idGenerators or generatorId not in self._idGenerators:
raise Exception('No generator found for {}'.format(generatorId))
raise Exception(f'No generator found for {generatorId}')
return self._idGenerators[generatorId]
@property

View File

@ -31,22 +31,20 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
class UDSException(Exception):
"""
Base class for all UDS exceptions
"""
pass
class ValidationError(UDSException):
"""
Exception used to indicate that the params assigned are invalid
"""
pass
class TransportError(UDSException):
"""
Exception used to indicate that the transport is not available
"""
pass

View File

@ -46,41 +46,41 @@ if typing.TYPE_CHECKING:
def cryptoManager() -> 'CryptoManager':
from .crypto import CryptoManager
from .crypto import CryptoManager # pylint: disable=import-outside-toplevel
return CryptoManager.manager()
def taskManager() -> 'TaskManager':
from .task import TaskManager
from .task import TaskManager # pylint: disable=import-outside-toplevel
return TaskManager()
def downloadsManager() -> 'DownloadsManager':
from .downloads import DownloadsManager
from .downloads import DownloadsManager # pylint: disable=import-outside-toplevel
return DownloadsManager()
def logManager() -> 'LogManager':
from .log import LogManager
from .log import LogManager # pylint: disable=import-outside-toplevel
return LogManager()
def userServiceManager() -> 'UserServiceManager':
from .user_service import UserServiceManager
from .user_service import UserServiceManager # pylint: disable=import-outside-toplevel
return UserServiceManager()
def publicationManager() -> 'PublicationManager':
from .publication import PublicationManager
from .publication import PublicationManager # pylint: disable=import-outside-toplevel
return PublicationManager()
def notificationsManager() -> 'NotificationsManager':
from .notifications import NotificationsManager
from .notifications import NotificationsManager # pylint: disable=import-outside-toplevel
return NotificationsManager()

View File

@ -207,7 +207,7 @@ class Module(UserInterface, Environmentable, Serializable):
return codecs.encode(cls.icon(), 'base64').decode()
@staticmethod
def test(env: Environment, data: typing.Dict[str, str]) -> typing.List[typing.Any]:
def test(env: Environment, data: typing.Dict[str, str]) -> typing.List[typing.Any]: # pylint: disable=unused-argument
"""
Test if the connection data is ok.

View File

@ -32,8 +32,6 @@
"""
import base64
import pickle # nosec: Safe pickle usage
import gzip
import typing
class Serializable:

View File

@ -124,27 +124,27 @@ class gui:
# : For backward compatibility, will be removed in future versions
# For now, will log a warning if used
@deprecatedClassValue('gui.Tab.ADVANCED')
def ADVANCED_TAB(cls) -> str:
def ADVANCED_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.ADVANCED)
@deprecatedClassValue('gui.Tab.PARAMETERS')
def PARAMETERS_TAB(cls) -> str:
def PARAMETERS_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.PARAMETERS)
@deprecatedClassValue('gui.Tab.CREDENTIALS')
def CREDENTIALS_TAB(cls) -> str:
def CREDENTIALS_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.CREDENTIALS)
@deprecatedClassValue('gui.Tab.TUNNEL')
def TUNNEL_TAB(cls) -> str:
def TUNNEL_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.TUNNEL)
@deprecatedClassValue('gui.Tab.DISPLAY')
def DISPLAY_TAB(cls) -> str:
def DISPLAY_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.DISPLAY)
@deprecatedClassValue('gui.Tab.MFA')
def MFA_TAB(cls) -> str:
def MFA_TAB(cls) -> str: # pylint: disable=no-self-argument
return str(gui.Tab.MFA)
# : Static Callbacks simple registry
@ -196,7 +196,7 @@ class gui:
) -> 'gui.ChoiceType':
if isinstance(val, dict):
if 'id' not in val or 'text' not in val:
raise ValueError('Invalid choice dict: {}'.format(val))
raise ValueError(f'Invalid choice dict: {val}')
return gui.choiceItem(val['id'], val['text'])
# If val is not a dict, and it has not 'id' and 'text', raise an exception
return gui.choiceItem(val, val)
@ -214,7 +214,7 @@ class gui:
return [choiceFromValue(v) for v in vals]
# This should never happen
raise RuntimeError('Invalid type for convertToChoices: {}'.format(type(vals)))
raise ValueError(f'Invalid type for convertToChoices: {vals}')
@staticmethod
def convertToList(
@ -588,6 +588,9 @@ class gui:
return re.match(self._data['pattern'], self.value) is not None
return True # No pattern, so it's valid
def __str__(self):
return str(self.value)
class TextAutocompleteField(TextField):
"""
This represents a text field that holds autocomplete values.
@ -667,7 +670,7 @@ class gui:
def __init__(self, **options):
super().__init__(**options, type=gui.InputField.Types.DATE)
def date(self, min: bool = True) -> datetime.date:
def date(self, useMin: bool = True) -> datetime.date:
"""
Returns the date this object represents
@ -682,9 +685,9 @@ class gui:
self.value, '%Y-%m-%d'
).date() # ISO Format
except Exception:
return datetime.date.min if min else datetime.date.max
return datetime.date.min if useMin else datetime.date.max
def datetime(self, min: bool = True) -> datetime.datetime:
def datetime(self, useMin: bool = True) -> datetime.datetime:
"""
Returns the date this object represents
@ -697,7 +700,7 @@ class gui:
try:
return datetime.datetime.strptime(self.value, '%Y-%m-%d') # ISO Format
except Exception:
return datetime.datetime.min if min else datetime.datetime.max
return datetime.datetime.min if useMin else datetime.datetime.max
def stamp(self) -> int:
return int(
@ -1052,8 +1055,9 @@ class UserInterfaceType(type):
better place. This is done this way because we will "deepcopy" these fields
later, and update references on class 'self' to the new copy. (so everyone has a different copy)
"""
def __new__(
cls: typing.Type['UserInterfaceType'],
mcs: typing.Type['UserInterfaceType'],
classname: str,
bases: typing.Tuple[type, ...],
namespace: typing.Dict[str, typing.Any],
@ -1070,7 +1074,7 @@ class UserInterfaceType(type):
newClassDict[attrName] = attr
newClassDict['_base_gui'] = _gui
return typing.cast(
'UserInterfaceType', type.__new__(cls, classname, bases, newClassDict)
'UserInterfaceType', type.__new__(mcs, classname, bases, newClassDict)
)
@ -1085,11 +1089,11 @@ class UserInterface(metaclass=UserInterfaceType):
By default, the values passed to this class constructor are used to fill
the gui form fields values.
"""
class ValidationFieldInfo(typing.NamedTuple):
field: str
error: str
# Class variable that will hold the gui fields description
_base_gui: typing.ClassVar[typing.Dict[str, gui.InputField]]
@ -1258,8 +1262,8 @@ class UserInterface(metaclass=UserInterfaceType):
self.oldDeserializeForm(values)
return
# For future use
version = values[
# For future use, right now we only have one version
version = values[ # pylint: disable=unused-variable
len(SERIALIZATION_HEADER) : len(SERIALIZATION_HEADER)
+ len(SERIALIZATION_VERSION)
]
@ -1286,7 +1290,7 @@ class UserInterface(metaclass=UserInterfaceType):
] = {
gui.InputField.Types.TEXT: lambda x: x,
gui.InputField.Types.TEXT_AUTOCOMPLETE: lambda x: x,
gui.InputField.Types.NUMERIC: lambda x: int(x),
gui.InputField.Types.NUMERIC: int,
gui.InputField.Types.PASSWORD: lambda x: (
CryptoManager().AESDecrypt(x.encode(), UDSK, True).decode()
),
@ -1369,7 +1373,7 @@ class UserInterface(metaclass=UserInterfaceType):
if isinstance(val, bytes):
val = val.decode('utf8')
except Exception:
logger.exception('Pickling {} from {}'.format(k, self))
logger.exception('Pickling %s from %s', k, self)
val = ''
self._gui[k].value = val
# logger.debug('Value for {0}:{1}'.format(k, val))
@ -1401,9 +1405,12 @@ class UserInterface(metaclass=UserInterfaceType):
found_erros: typing.List[UserInterface.ValidationFieldInfo] = []
for key, val in self._gui.items():
if val.required and not val.value:
found_erros.append(UserInterface.ValidationFieldInfo(key, 'Field is required'))
found_erros.append(
UserInterface.ValidationFieldInfo(key, 'Field is required')
)
if not val.validate():
found_erros.append(UserInterface.ValidationFieldInfo(key, 'Field is not valid'))
found_erros.append(
UserInterface.ValidationFieldInfo(key, 'Field is not valid')
)
return found_erros