1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-22 14:50:29 +03:00

Refactorized ldap and added "ignores" to non recognized correct values

This commit is contained in:
Adolfo Gómez García 2021-06-03 11:43:56 +02:00
parent 21f6df36b0
commit f184fa778d
2 changed files with 46 additions and 32 deletions

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Virtual Cable S.L.
# Copyright (c) 2016-2021 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -42,6 +42,7 @@ logger = logging.getLogger(__name__)
LDAPResultType = typing.MutableMapping[str, typing.Any]
class LDAPError(Exception):
@staticmethod
def reraise(e: typing.Any):
@ -60,7 +61,15 @@ def escape(value: str):
return ldap.filter.escape_filter_chars(value)
def connection(username: str, passwd: typing.Union[str, bytes], host: str, port: int = -1, ssl: bool = False, timeout: int = 3, debug: bool = False) -> typing.Any:
def connection(
username: str,
passwd: typing.Union[str, bytes],
host: str,
port: int = -1,
ssl: bool = False,
timeout: int = 3,
debug: bool = False,
) -> typing.Any:
"""
Tries to connect to ldap. If username is None, it tries to connect using user provided credentials.
@param username: Username for connection validation
@ -74,24 +83,24 @@ def connection(username: str, passwd: typing.Union[str, bytes], host: str, port:
try:
if debug:
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 9)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 9) # type: ignore
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) # type: ignore
schema = 'ldaps' if ssl else 'ldap'
if port == -1:
port = 636 if ssl else 389
uri = "{}://{}:{}".format(schema, host, port)
logger.debug('Ldap uri: %s', uri)
l = ldap.initialize(uri=uri)
l.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_TIMEOUT, int(timeout))
l = ldap.initialize(uri=uri) # type: ignore
l.set_option(ldap.OPT_REFERRALS, 0) # type: ignore
l.set_option(ldap.OPT_TIMEOUT, int(timeout)) # type: ignore
l.network_timeout = int(timeout)
l.protocol_version = ldap.VERSION3
l.protocol_version = ldap.VERSION3 # type: ignore
l.simple_bind_s(who=username, cred=password)
except ldap.SERVER_DOWN:
except ldap.SERVER_DOWN: # type: ignore
raise LDAPError(_('Can\'t contact LDAP server'))
except ldap.LDAPError as e:
except ldap.LDAPError as e: # type: ignore
LDAPError.reraise(e)
except Exception as e:
logger.exception('Exception connection:')
@ -102,13 +111,13 @@ def connection(username: str, passwd: typing.Union[str, bytes], host: str, port:
def getAsDict(
con: typing.Any,
base: str,
ldapFilter: str,
attrList: typing.Optional[typing.Iterable[str]],
sizeLimit: int,
scope=ldap.SCOPE_SUBTREE
) -> typing.Generator[LDAPResultType, None, None]:
con: typing.Any,
base: str,
ldapFilter: str,
attrList: typing.Optional[typing.Iterable[str]],
sizeLimit: int,
scope=ldap.SCOPE_SUBTREE, # type: ignore
) -> typing.Generator[LDAPResultType, None, None]:
"""
Makes a search on LDAP, adjusting string to required type (ascii on python2, str on python3).
returns an generator with the results, where each result is a dictionary where it values are always a list of strings
@ -126,9 +135,9 @@ def getAsDict(
scope=scope,
filterstr=ldapFilter,
attrlist=attrList,
sizelimit=sizeLimit
sizelimit=sizeLimit,
)
except ldap.LDAPError as e:
except ldap.LDAPError as e: # type: ignore
LDAPError.reraise(e)
except Exception as e:
logger.exception('Exception connection:')
@ -142,7 +151,11 @@ def getAsDict(
continue # Skip None entities
# Convert back attritutes to test_type ONLY on python2
dct = tools.CaseInsensitiveDict((k, ['']) for k in attrList) if attrList is not None else tools.CaseInsensitiveDict()
dct = (
tools.CaseInsensitiveDict((k, ['']) for k in attrList)
if attrList is not None
else tools.CaseInsensitiveDict()
)
# Convert back result fields to str
for k, v in r[1].items():
@ -152,15 +165,16 @@ def getAsDict(
yield dct
def getFirst(
con: typing.Any,
base: str,
objectClass: str,
field: str,
value: str,
attributes: typing.Optional[typing.Iterable[str]] = None,
sizeLimit: int = 50
) -> typing.Optional[LDAPResultType]:
con: typing.Any,
base: str,
objectClass: str,
field: str,
value: str,
attributes: typing.Optional[typing.Iterable[str]] = None,
sizeLimit: int = 50,
) -> typing.Optional[LDAPResultType]:
"""
Searchs for the username and returns its LDAP entry
@param username: username to search, using user provided parameters at configuration to map search entries.
@ -185,7 +199,7 @@ def getFirst(
# Recursive delete
def recursive_delete(con: typing.Any, base_dn: str) -> None:
search = con.search_s(base_dn, ldap.SCOPE_ONELEVEL)
search = con.search_s(base_dn, ldap.SCOPE_ONELEVEL) # type: ignore
for dn, _ in search:
# recursive_delete(conn, dn)

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# All rights reserved.
#
#
@ -13,7 +13,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#