Fixing up bandit recomendations & minor typo errors

This commit is contained in:
Adolfo Gómez García 2022-08-15 14:10:17 +02:00
parent 17b040d9b3
commit 76745e8624
7 changed files with 62 additions and 46 deletions

View File

@ -93,23 +93,23 @@ class InternalDBAuth(auths.Authenticator):
tab=gui.ADVANCED_TAB, tab=gui.ADVANCED_TAB,
) )
def getIp(self) -> str: def getIp(self, request: 'ExtendedHttpRequest') -> str:
ip = ( ip = (
getRequest().ip_proxy if self.acceptProxy.isTrue() else getRequest().ip request.ip_proxy if self.acceptProxy.isTrue() else request.ip
) # pylint: disable=maybe-no-member ) # pylint: disable=maybe-no-member
if self.reverseDns.isTrue(): if self.reverseDns.isTrue():
try: try:
return str( return str(
dns.resolver.query(dns.reversename.from_address(ip).to_text(), 'PTR')[0] dns.resolver.query(dns.reversename.from_address(ip).to_text(), 'PTR')[0]
) )
except Exception: except Exception: # nosec: intentionally
pass pass
return ip return ip
def mfaIdentifier(self, username: str) -> str: def mfaIdentifier(self, username: str) -> str:
try: try:
self.dbAuthenticator().users.get(name=username, state=State.ACTIVE).mfaData self.dbAuthenticator().users.get(name=username, state=State.ACTIVE).mfaData
except Exception: except Exception: # nosec: This is e controled pickle loading
pass pass
return '' return ''
@ -134,7 +134,7 @@ class InternalDBAuth(auths.Authenticator):
usr.name = newUsername usr.name = newUsername
usr.parent = parent usr.parent = parent
usr.save() usr.save()
except Exception: except Exception: # nosec: intentionally
pass # User already exists pass # User already exists
username = newUsername username = newUsername

View File

@ -32,7 +32,7 @@
""" """
import re import re
from urllib.parse import urlparse from urllib.parse import urlparse
import xml.sax import xml.sax # nosec: used to parse trusted xml provided only by administrators
import datetime import datetime
import requests import requests
import logging import logging
@ -204,6 +204,13 @@ class SAMLAuthenticator(auths.Authenticator):
tab=gui.ADVANCED_TAB, tab=gui.ADVANCED_TAB,
) )
checkSSLCertificate = gui.CheckBoxField(
label=_('Check SSL certificate'),
defvalue=False, # For compatibility with previous versions
order=23,
tooltip=_('If set, check SSL certificate on requests for IDP Metadata'),
tab=_('Security'),
)
nameIdEncrypted = gui.CheckBoxField( nameIdEncrypted = gui.CheckBoxField(
label=_('Encripted nameID'), label=_('Encripted nameID'),
@ -375,7 +382,7 @@ class SAMLAuthenticator(auths.Authenticator):
if idpMetadata.startswith('http://') or idpMetadata.startswith('https://'): if idpMetadata.startswith('http://') or idpMetadata.startswith('https://'):
logger.debug('idp Metadata is an URL: %s', idpMetadata) logger.debug('idp Metadata is an URL: %s', idpMetadata)
try: try:
resp = requests.get(idpMetadata.split('\n')[0], verify=False) resp = requests.get(idpMetadata.split('\n')[0], verify=self.checkSSLCertificate.isTrue())
idpMetadata = resp.content.decode() idpMetadata = resp.content.decode()
except Exception as e: except Exception as e:
raise auths.Authenticator.ValidationException( raise auths.Authenticator.ValidationException(
@ -388,7 +395,7 @@ class SAMLAuthenticator(auths.Authenticator):
# Try to parse it so we can check it is valid. Right now, it checks just that this is XML, will # Try to parse it so we can check it is valid. Right now, it checks just that this is XML, will
# correct it to check that is is valid idp metadata # correct it to check that is is valid idp metadata
try: try:
xml.sax.parseString(idpMetadata, xml.sax.ContentHandler()) # type: ignore xml.sax.parseString(idpMetadata, xml.sax.ContentHandler()) # type: ignore # nosec: url provided by admin
except Exception as e: except Exception as e:
msg = (gettext(' (obtained from URL)') if fromUrl else '') + str(e) msg = (gettext(' (obtained from URL)') if fromUrl else '') + str(e)
raise auths.Authenticator.ValidationException( raise auths.Authenticator.ValidationException(
@ -439,7 +446,7 @@ class SAMLAuthenticator(auths.Authenticator):
def getIdpMetadataDict(self, **kwargs) -> typing.Dict[str, typing.Any]: def getIdpMetadataDict(self, **kwargs) -> typing.Dict[str, typing.Any]:
if self.idpMetadata.value.startswith('http'): if self.idpMetadata.value.startswith('http'):
try: try:
resp = requests.get(self.idpMetadata.value.split('\n')[0], verify=False) resp = requests.get(self.idpMetadata.value.split('\n')[0], verify=self.checkSSLCertificate.isTrue())
val = resp.content.decode() val = resp.content.decode()
except Exception as e: except Exception as e:
logger.error('Error fetching idp metadata: %s', e) logger.error('Error fetching idp metadata: %s', e)

View File

@ -67,7 +67,7 @@ logger = logging.getLogger(__name__)
authLogger = logging.getLogger('authLog') authLogger = logging.getLogger('authLog')
USER_KEY = 'uk' USER_KEY = 'uk'
PASS_KEY = 'pk' PASS_KEY = 'pk' # nosec: this is not a password but a cookie to store encrypted data
EXPIRY_KEY = 'ek' EXPIRY_KEY = 'ek'
AUTHORIZED_KEY = 'ak' AUTHORIZED_KEY = 'ak'
ROOT_ID = -20091204 # Any negative number will do the trick ROOT_ID = -20091204 # Any negative number will do the trick
@ -456,7 +456,9 @@ def webLogout(
if request.user: if request.user:
authenticator = request.user.manager.getInstance() authenticator = request.user.manager.getInstance()
username = request.user.name username = request.user.name
exit_url = authenticator.logout(username) or exit_url logout = authenticator.logout(request, username)
if logout and logout.success == auths.AuthenticationSuccess.REDIRECT:
exit_url = logout.url
if request.user.id != ROOT_ID: if request.user.id != ROOT_ID:
# Log the event if not root user # Log the event if not root user
events.addEvent( events.addEvent(
@ -524,7 +526,7 @@ def authLogLogin(
), ),
log.WEB, log.WEB,
) )
except Exception: except Exception: # nosec: intentionally ignore exception
pass pass

View File

@ -32,10 +32,13 @@
""" """
import typing import typing
if typing.TYPE_CHECKING:
from uds.core.util.cache import Cache from uds.core.util.cache import Cache
from uds.core.util.storage import Storage from uds.core.util.storage import Storage
from uds.core.util.unique_id_generator import UniqueIDGenerator from uds.core.util.unique_id_generator import UniqueIDGenerator
TEMP_ENV = 'temporary' TEMP_ENV = 'temporary'
GLOBAL_ENV = 'global' GLOBAL_ENV = 'global'
@ -50,14 +53,14 @@ class Environment:
__slots__ = ['_key', '_cache', '_storage', '_idGenerators'] __slots__ = ['_key', '_cache', '_storage', '_idGenerators']
_key: str _key: str
_cache: Cache _cache: 'Cache'
_storage: Storage _storage: 'Storage'
_idGenerators: typing.Dict[str, UniqueIDGenerator] _idGenerators: typing.Dict[str, 'UniqueIDGenerator']
def __init__( def __init__(
self, self,
uniqueKey: str, uniqueKey: str,
idGenerators: typing.Optional[typing.Dict[str, UniqueIDGenerator]] = None, idGenerators: typing.Optional[typing.Dict[str, 'UniqueIDGenerator']] = None,
): ):
""" """
Initialized the Environment for the specified id Initialized the Environment for the specified id
@ -66,6 +69,10 @@ class Environment:
is used basically at User Services to auto-create ids for macs or names, using is used basically at User Services to auto-create ids for macs or names, using
{'mac' : UniqueMacGenerator, 'name' : UniqueNameGenerator } as argument. {'mac' : UniqueMacGenerator, 'name' : UniqueNameGenerator } as argument.
""" """
# Avoid circular imports
from uds.core.util.cache import Cache
from uds.core.util.storage import Storage
if idGenerators is None: if idGenerators is None:
idGenerators = dict() idGenerators = dict()
self._key = uniqueKey self._key = uniqueKey
@ -74,7 +81,7 @@ class Environment:
self._idGenerators = idGenerators self._idGenerators = idGenerators
@property @property
def cache(self) -> Cache: def cache(self) -> 'Cache':
""" """
Method to acces the cache of the environment. Method to acces the cache of the environment.
@return: a referente to a Cache instance @return: a referente to a Cache instance
@ -82,14 +89,14 @@ class Environment:
return self._cache return self._cache
@property @property
def storage(self) -> Storage: def storage(self) -> 'Storage':
""" """
Method to acces the cache of the environment. Method to acces the cache of the environment.
@return: a referente to an Storage Instance @return: a referente to an Storage Instance
""" """
return self._storage return self._storage
def idGenerators(self, generatorId: str) -> UniqueIDGenerator: def idGenerators(self, generatorId: str) -> 'UniqueIDGenerator':
""" """
The idea of generator of id is to obtain at some moment Ids with a proper generator. The idea of generator of id is to obtain at some moment Ids with a proper generator.
If the environment do not contains generators of id, this method will return None. If the environment do not contains generators of id, this method will return None.
@ -112,8 +119,8 @@ class Environment:
""" """
Removes all related information from database for this environment. Removes all related information from database for this environment.
""" """
Cache.delete(self._key) self._cache.clear()
Storage.delete(self._key) self._storage.clear()
for _, v in self._idGenerators.items(): for _, v in self._idGenerators.items():
v.release() v.release()
@ -156,8 +163,8 @@ class Environment:
It will not make environment persistent It will not make environment persistent
""" """
env = Environment(TEMP_ENV) env = Environment(TEMP_ENV)
env.storage.clean() env.storage.clear()
env.cache.clean() env.cache.clear()
return env return env
@staticmethod @staticmethod
@ -178,7 +185,7 @@ class Environmentable:
_env: Environment _env: Environment
def __init__(self, environment: Environment): def __init__(self, environment: 'Environment'):
""" """
Initialized the element Initialized the element
@ -188,7 +195,7 @@ class Environmentable:
self._env = environment self._env = environment
@property @property
def env(self) -> Environment: def env(self) -> 'Environment':
""" """
Utility method to access the envionment contained by this object Utility method to access the envionment contained by this object
@ -198,7 +205,7 @@ class Environmentable:
return self._env return self._env
@env.setter @env.setter
def env(self, environment: Environment): def env(self, environment: 'Environment'):
""" """
Assigns a new environment Assigns a new environment
@ -208,7 +215,7 @@ class Environmentable:
self._env = environment self._env = environment
@property @property
def cache(self) -> Cache: def cache(self) -> 'Cache':
""" """
Utility method to access the cache of the environment containe by this object Utility method to access the cache of the environment containe by this object
@ -219,7 +226,7 @@ class Environmentable:
return self._env.cache return self._env.cache
@property @property
def storage(self) -> Storage: def storage(self) -> 'Storage':
""" """
Utility method to access the storage of the environment containe by this object Utility method to access the storage of the environment containe by this object
@ -230,7 +237,7 @@ class Environmentable:
""" """
return self._env.storage return self._env.storage
def idGenerators(self, generatorId: str) -> UniqueIDGenerator: def idGenerators(self, generatorId: str) -> 'UniqueIDGenerator':
""" """
Utility method to access the id generator of the environment containe by this object Utility method to access the id generator of the environment containe by this object

View File

@ -32,7 +32,7 @@
import datetime import datetime
import hashlib import hashlib
import codecs import codecs
import pickle import pickle # nosec: This is e controled pickle loading
import typing import typing
import logging import logging
@ -61,7 +61,7 @@ class Cache:
self._bowner = self._owner.encode('utf8') self._bowner = self._owner.encode('utf8')
def __getKey(self, key: typing.Union[str, bytes]) -> str: def __getKey(self, key: typing.Union[str, bytes]) -> str:
h = hashlib.md5() h = hashlib.md5() # nosec: not used for cryptography, just for hashing
if isinstance(key, str): if isinstance(key, str):
key = key.encode('utf8') key = key.encode('utf8')
h.update(self._bowner + key) h.update(self._bowner + key)
@ -82,7 +82,7 @@ class Cache:
try: try:
# logger.debug('value: %s', c.value) # logger.debug('value: %s', c.value)
val = pickle.loads( val = pickle.loads( # nosec: This is e controled pickle loading
typing.cast(bytes, codecs.decode(c.value.encode(), 'base64')) typing.cast(bytes, codecs.decode(c.value.encode(), 'base64'))
) )
except Exception: # If invalid, simple do no tuse it except Exception: # If invalid, simple do no tuse it
@ -127,7 +127,7 @@ class Cache:
""" """
self.remove(key) self.remove(key)
def clean(self) -> None: def clear(self) -> None:
Cache.delete(self._owner) Cache.delete(self._owner)
def put( def put(

View File

@ -29,7 +29,7 @@
""" """
@author: Adolfo Gómez, dkmaster at dkmon dot com @author: Adolfo Gómez, dkmaster at dkmon dot com
""" """
import pickle import pickle # nosec: This is e controled pickle use
import base64 import base64
import hashlib import hashlib
import codecs import codecs
@ -46,7 +46,7 @@ MARK = '_mgb_'
def _calcKey(owner: bytes, key: bytes, extra: typing.Optional[bytes] = None) -> str: def _calcKey(owner: bytes, key: bytes, extra: typing.Optional[bytes] = None) -> str:
h = hashlib.md5() h = hashlib.md5() # nosec: not used for cryptography, just for hashing
h.update(owner) h.update(owner)
h.update(key) h.update(key)
if extra: if extra:
@ -66,7 +66,7 @@ def _decodeValue(
) -> typing.Tuple[str, typing.Any]: ) -> typing.Tuple[str, typing.Any]:
if value: if value:
try: try:
v = pickle.loads(base64.b64decode(value.encode())) v = pickle.loads(base64.b64decode(value.encode())) # nosec: This is e controled pickle loading
if isinstance(v, tuple) and v[0] == MARK: if isinstance(v, tuple) and v[0] == MARK:
return typing.cast(typing.Tuple[str, typing.Any], v[1:]) return typing.cast(typing.Tuple[str, typing.Any], v[1:])
# Fix value so it contains also the "key" (in this case, the original key is lost, we have only the hash value...) # Fix value so it contains also the "key" (in this case, the original key is lost, we have only the hash value...)
@ -312,7 +312,7 @@ class Storage:
def getPickle(self, skey: typing.Union[str, bytes]) -> typing.Any: def getPickle(self, skey: typing.Union[str, bytes]) -> typing.Any:
v = self.readData(skey, True) v = self.readData(skey, True)
if v: if v:
return pickle.loads(typing.cast(bytes, v)) return pickle.loads(typing.cast(bytes, v)) # nosec: This is e controled pickle loading
return None return None
def getPickleByAttr1(self, attr1: str, forUpdate: bool = False): def getPickleByAttr1(self, attr1: str, forUpdate: bool = False):
@ -320,7 +320,7 @@ class Storage:
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1) query = DBStorage.objects.filter(owner=self._owner, attr1=attr1)
if forUpdate: if forUpdate:
query = query.select_for_update() query = query.select_for_update()
return pickle.loads( return pickle.loads( # nosec: This is e controled pickle loading
codecs.decode(query[0].data.encode(), 'base64') codecs.decode(query[0].data.encode(), 'base64')
) # @UndefinedVariable ) # @UndefinedVariable
except Exception: except Exception:
@ -335,7 +335,7 @@ class Storage:
try: try:
# Process several keys at once # Process several keys at once
DBStorage.objects.filter(key__in=[self.getKey(k) for k in keys]).delete() DBStorage.objects.filter(key__in=[self.getKey(k) for k in keys]).delete()
except Exception: except Exception: # nosec: Not interested in processing exceptions, just ignores it
pass pass
def lock(self): def lock(self):
@ -393,10 +393,10 @@ class Storage:
self, attr1: typing.Optional[str] = None, forUpdate: bool = False self, attr1: typing.Optional[str] = None, forUpdate: bool = False
) -> typing.Iterable[typing.Tuple[str, typing.Any, str]]: ) -> typing.Iterable[typing.Tuple[str, typing.Any, str]]:
for v in self.filter(attr1, forUpdate): for v in self.filter(attr1, forUpdate):
yield (v[0], pickle.loads(v[1]), v[2]) yield (v[0], pickle.loads(v[1]), v[2]) # nosec: secure pickle load
def clean(self): def clear(self):
self.delete(self._owner) Storage.delete(self._owner)
@staticmethod @staticmethod
def delete(owner: str) -> None: def delete(owner: str) -> None:

View File

@ -69,7 +69,7 @@ class CacheTests(TestCase):
# Checks cache clean # Checks cache clean
cache.put('key', VALUE_1) cache.put('key', VALUE_1)
cache.clean() cache.clear()
self.assertEqual(cache.get('key'), None, 'Get key from cleaned cache') self.assertEqual(cache.get('key'), None, 'Get key from cleaned cache')
# Checks cache purge # Checks cache purge