diff --git a/server/src/uds/core/util/ldaputil.py b/server/src/uds/core/util/ldaputil.py index 0a91d5018..51d02a864 100644 --- a/server/src/uds/core/util/ldaputil.py +++ b/server/src/uds/core/util/ldaputil.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2016 Virtual Cable S.L. +# Copyright (c) 2016-2021 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -12,7 +12,7 @@ # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # @@ -42,6 +42,7 @@ logger = logging.getLogger(__name__) LDAPResultType = typing.MutableMapping[str, typing.Any] + class LDAPError(Exception): @staticmethod def reraise(e: typing.Any): @@ -60,7 +61,15 @@ def escape(value: str): return ldap.filter.escape_filter_chars(value) -def connection(username: str, passwd: typing.Union[str, bytes], host: str, port: int = -1, ssl: bool = False, timeout: int = 3, debug: bool = False) -> typing.Any: +def connection( + username: str, + passwd: typing.Union[str, bytes], + host: str, + port: int = -1, + ssl: bool = False, + timeout: int = 3, + debug: bool = False, +) -> typing.Any: """ Tries to connect to ldap. If username is None, it tries to connect using user provided credentials. @param username: Username for connection validation @@ -74,24 +83,24 @@ def connection(username: str, passwd: typing.Union[str, bytes], host: str, port: try: if debug: - ldap.set_option(ldap.OPT_DEBUG_LEVEL, 9) - ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + ldap.set_option(ldap.OPT_DEBUG_LEVEL, 9) # type: ignore + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) # type: ignore schema = 'ldaps' if ssl else 'ldap' if port == -1: port = 636 if ssl else 389 uri = "{}://{}:{}".format(schema, host, port) logger.debug('Ldap uri: %s', uri) - l = ldap.initialize(uri=uri) - l.set_option(ldap.OPT_REFERRALS, 0) - l.set_option(ldap.OPT_TIMEOUT, int(timeout)) + l = ldap.initialize(uri=uri) # type: ignore + l.set_option(ldap.OPT_REFERRALS, 0) # type: ignore + l.set_option(ldap.OPT_TIMEOUT, int(timeout)) # type: ignore l.network_timeout = int(timeout) - l.protocol_version = ldap.VERSION3 + l.protocol_version = ldap.VERSION3 # type: ignore l.simple_bind_s(who=username, cred=password) - except ldap.SERVER_DOWN: + except ldap.SERVER_DOWN: # type: ignore raise LDAPError(_('Can\'t contact LDAP server')) - except ldap.LDAPError as e: + except ldap.LDAPError as e: # type: ignore LDAPError.reraise(e) except Exception as e: logger.exception('Exception connection:') @@ -102,13 +111,13 @@ def connection(username: str, passwd: typing.Union[str, bytes], host: str, port: def getAsDict( - con: typing.Any, - base: str, - ldapFilter: str, - attrList: typing.Optional[typing.Iterable[str]], - sizeLimit: int, - scope=ldap.SCOPE_SUBTREE - ) -> typing.Generator[LDAPResultType, None, None]: + con: typing.Any, + base: str, + ldapFilter: str, + attrList: typing.Optional[typing.Iterable[str]], + sizeLimit: int, + scope=ldap.SCOPE_SUBTREE, # type: ignore +) -> typing.Generator[LDAPResultType, None, None]: """ Makes a search on LDAP, adjusting string to required type (ascii on python2, str on python3). returns an generator with the results, where each result is a dictionary where it values are always a list of strings @@ -126,9 +135,9 @@ def getAsDict( scope=scope, filterstr=ldapFilter, attrlist=attrList, - sizelimit=sizeLimit + sizelimit=sizeLimit, ) - except ldap.LDAPError as e: + except ldap.LDAPError as e: # type: ignore LDAPError.reraise(e) except Exception as e: logger.exception('Exception connection:') @@ -142,7 +151,11 @@ def getAsDict( continue # Skip None entities # Convert back attritutes to test_type ONLY on python2 - dct = tools.CaseInsensitiveDict((k, ['']) for k in attrList) if attrList is not None else tools.CaseInsensitiveDict() + dct = ( + tools.CaseInsensitiveDict((k, ['']) for k in attrList) + if attrList is not None + else tools.CaseInsensitiveDict() + ) # Convert back result fields to str for k, v in r[1].items(): @@ -152,15 +165,16 @@ def getAsDict( yield dct + def getFirst( - con: typing.Any, - base: str, - objectClass: str, - field: str, - value: str, - attributes: typing.Optional[typing.Iterable[str]] = None, - sizeLimit: int = 50 - ) -> typing.Optional[LDAPResultType]: + con: typing.Any, + base: str, + objectClass: str, + field: str, + value: str, + attributes: typing.Optional[typing.Iterable[str]] = None, + sizeLimit: int = 50, +) -> typing.Optional[LDAPResultType]: """ Searchs for the username and returns its LDAP entry @param username: username to search, using user provided parameters at configuration to map search entries. @@ -185,7 +199,7 @@ def getFirst( # Recursive delete def recursive_delete(con: typing.Any, base_dn: str) -> None: - search = con.search_s(base_dn, ldap.SCOPE_ONELEVEL) + search = con.search_s(base_dn, ldap.SCOPE_ONELEVEL) # type: ignore for dn, _ in search: # recursive_delete(conn, dn) diff --git a/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py b/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py index be1dbcb28..786fad6ca 100644 --- a/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py +++ b/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2012-2019 Virtual Cable S.L. +# Copyright (c) 2012-2021 Virtual Cable S.L.U. # All rights reserved. # # @@ -13,7 +13,7 @@ # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. #