diff --git a/server/src/uds/core/environment.py b/server/src/uds/core/environment.py index 7eec1124e..9e7820bea 100644 --- a/server/src/uds/core/environment.py +++ b/server/src/uds/core/environment.py @@ -74,7 +74,7 @@ class Environment: from uds.core.util.storage import Storage # pylint: disable=import-outside-toplevel if idGenerators is None: - idGenerators = dict() + idGenerators = {} self._key = uniqueKey self._cache = Cache(uniqueKey) self._storage = Storage(uniqueKey) diff --git a/server/src/uds/core/mfas/__init__.py b/server/src/uds/core/mfas/__init__.py index d8dbee789..19c09271c 100644 --- a/server/src/uds/core/mfas/__init__.py +++ b/server/src/uds/core/mfas/__init__.py @@ -28,17 +28,14 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -UDS os managers related interfaces and classes - -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ -from .mfa import MFA +from .mfa import MFA, LoginAllowed +from .mfafactory import MFAsFactory -def factory(): +def factory() -> MFAsFactory: """ Returns factory for register/access to authenticators """ - from .mfafactory import MFAsFactory - - return MFAsFactory.factory() + return MFAsFactory() diff --git a/server/src/uds/core/mfas/mfa.py b/server/src/uds/core/mfas/mfa.py index 5f39ba49f..da1b1a0f1 100644 --- a/server/src/uds/core/mfas/mfa.py +++ b/server/src/uds/core/mfas/mfa.py @@ -37,10 +37,11 @@ import hashlib import logging import typing -from django.utils.translation import gettext_noop as _ +from django.utils.translation import gettext_noop as _, gettext from uds.core.module import Module from uds.models.util import getSqlDatetime from uds.core.auths import exceptions +from uds.models.network import Network if typing.TYPE_CHECKING: from uds.core.environment import Environment @@ -50,6 +51,54 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) +class LoginAllowed(enum.StrEnum): + """ + This enum is used to know if the MFA code was sent or not. + """ + + ALLOWED = '0' + DENIED = '1' + ALLOWED_IF_IN_NETWORKS = '2' + DENIED_IF_IN_NETWORKS = '3' + + @staticmethod + def checkAction( + action: 'LoginAllowed|str', + request: 'ExtendedHttpRequest', + networks: typing.Optional[typing.Iterable[str]] = None, + ) -> bool: + def checkIp() -> bool: + if networks is None: + return True # No network restrictions, so we allow + return any( + i.contains(request.ip) + for i in Network.objects.filter(uuid__in=list(networks)) + ) + + if isinstance(action, str): + action = LoginAllowed(action) + + return { + LoginAllowed.ALLOWED: True, + LoginAllowed.DENIED: False, + LoginAllowed.ALLOWED_IF_IN_NETWORKS: checkIp(), + LoginAllowed.DENIED_IF_IN_NETWORKS: not checkIp(), + }.get(action, False) + + @staticmethod + def valuesForSelect() -> typing.Mapping[str, str]: + return { + LoginAllowed.ALLOWED.value: gettext('Allow user login'), + LoginAllowed.DENIED.value: gettext('Deny user login'), + LoginAllowed.ALLOWED_IF_IN_NETWORKS.value: gettext( + 'Allow user to login if it IP is in the networks list' + ), + LoginAllowed.DENIED_IF_IN_NETWORKS.value: gettext( + 'Deny user to login if it IP is in the networks list' + ), + } + + class MFA(Module): """ this class provides an abstraction of a Multi Factor Authentication @@ -155,8 +204,8 @@ class MFA(Module): If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code. If raises an error, the MFA code was not sent, and the user needs to enter the MFA code. """ - - raise NotImplementedError('sendCode method not implemented') + logger.error('MFA.sendCode not implemented') + raise exceptions.MFAError('MFA.sendCode not implemented') def _getData( self, request: 'ExtendedHttpRequest', userId: str @@ -216,10 +265,7 @@ class MFA(Module): try: if data and validity: # if we have a stored code, check if it's still valid - if ( - data[0] + datetime.timedelta(seconds=validity) - > getSqlDatetime() - ): + if data[0] + datetime.timedelta(seconds=validity) > getSqlDatetime(): # if it's still valid, just return without sending a new one return MFA.RESULT.OK except Exception: @@ -232,6 +278,7 @@ class MFA(Module): # Send the code to the user # May raise an exception if the code was not sent and is required to be sent + # pylint: disable=assignment-from-no-return result = self.sendCode(request, userId, username, identifier, code) # Store the code in the database, own storage space, if no exception was raised diff --git a/server/src/uds/core/mfas/mfafactory.py b/server/src/uds/core/mfas/mfafactory.py index 074be2dca..f73ac3959 100644 --- a/server/src/uds/core/mfas/mfafactory.py +++ b/server/src/uds/core/mfas/mfafactory.py @@ -30,32 +30,13 @@ """ @author: Adolfo Gómez, dkmaster at dkmon dot com """ -import logging import typing -from uds.core.util import singleton +from uds.core.util import factory if typing.TYPE_CHECKING: from .mfa import MFA -logger = logging.getLogger(__name__) - -class MFAsFactory(metaclass=singleton.Singleton): - _factory: typing.Optional['MFAsFactory'] = None - _mfas: typing.MutableMapping[str, typing.Type['MFA']] = {} - - @staticmethod - def factory() -> 'MFAsFactory': - return MFAsFactory() - - def providers(self) -> typing.Mapping[str, typing.Type['MFA']]: - return self._mfas - - def insert(self, type_: typing.Type['MFA']) -> None: - logger.debug('Adding Multi Factor Auth %s as %s', type_.type(), type_) - typeName = type_.type().lower() - self._mfas[typeName] = type_ - - def lookup(self, typeName: str) -> typing.Optional[typing.Type['MFA']]: - return self._mfas.get(typeName.lower(), None) +class MFAsFactory(factory.ModuleFactory['MFA']): + pass diff --git a/server/src/uds/core/osmanagers/__init__.py b/server/src/uds/core/osmanagers/__init__.py index c0628a1e8..cd8d118d0 100644 --- a/server/src/uds/core/osmanagers/__init__.py +++ b/server/src/uds/core/osmanagers/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2012-2021 Virtual Cable S.L.U. +# Copyright (c) 2012-2023 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -28,10 +28,10 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -UDS os managers related interfaces and classes - -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ + +# pylint: disable=unused-import from .osmanager import OSManager from .osmfactory import OSManagersFactory diff --git a/server/src/uds/core/osmanagers/osmanager.py b/server/src/uds/core/osmanagers/osmanager.py index ac54fd889..a81adb0a3 100644 --- a/server/src/uds/core/osmanagers/osmanager.py +++ b/server/src/uds/core/osmanagers/osmanager.py @@ -99,7 +99,7 @@ class OSManager(Module): # These methods must be overriden def actorData( - self, userService: 'UserService' + self, userService: 'UserService' # pylint: disable=unused-argument ) -> typing.MutableMapping[str, typing.Any]: """ This method provides information to actor, so actor can complete os configuration. @@ -129,7 +129,9 @@ class OSManager(Module): """ return {} - def checkState(self, userService: 'UserService') -> str: + def checkState( + self, userService: 'UserService' # pylint: disable=unused-argument + ) -> str: """ This method must be overriden so your os manager can respond to requests from system to the current state of the service This method will be invoked when: @@ -148,7 +150,9 @@ class OSManager(Module): This function can update userService values. Normal operation will be remove machines if this state is not valid """ - def isRemovableOnLogout(self, userService: 'UserService') -> bool: + def isRemovableOnLogout( + self, userService: 'UserService' # pylint: disable=unused-argument + ) -> bool: """ If returns true, when actor notifies "logout", UDS will mark service for removal can be overriden @@ -174,7 +178,10 @@ class OSManager(Module): return cls.processUserPassword != OSManager.processUserPassword def processUserPassword( - self, userService: 'UserService', username: str, password: str + self, + userService: 'UserService', # pylint: disable=unused-argument + username: str, + password: str, ) -> typing.Tuple[str, str]: """ This will be invoked prior to passsing username/password to Transport. @@ -245,7 +252,7 @@ class OSManager(Module): log.doLog( userService, log.INFO, - "User {0} has logged in".format(userName), + f'User {userName} has logged in', log.OSMANAGER, ) @@ -310,7 +317,7 @@ class OSManager(Module): log.doLog( userService, log.INFO, - "User {0} has logged out".format(userName), + f'User {userName} has logged out', log.OSMANAGER, ) diff --git a/server/src/uds/core/reports/__init__.py b/server/src/uds/core/reports/__init__.py index 93a84d33c..ed0ce411a 100644 --- a/server/src/uds/core/reports/__init__.py +++ b/server/src/uds/core/reports/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2015-2020 Virtual Cable S.L.U. +# Copyright (c) 2015-2023 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -28,6 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ from .report import Report diff --git a/server/src/uds/dispatchers/opengnsys/views.py b/server/src/uds/dispatchers/opengnsys/views.py index a4d1d839a..b3b91b185 100644 --- a/server/src/uds/dispatchers/opengnsys/views.py +++ b/server/src/uds/dispatchers/opengnsys/views.py @@ -47,7 +47,12 @@ CONTENT_TYPE = 'text/plain' @auth.trustedSourceRequired -def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResponse: +def opengnsys( + request: HttpRequest, # pylint: disable=unused-argument + msg: str, + token: str, + uuid: str, +) -> HttpResponse: logger.debug('Received opengnsys message %s, token %s, uuid %s', msg, token, uuid) def getUserService() -> typing.Optional[UserService]: @@ -63,7 +68,7 @@ def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResp uuid, ) # Sleep a while in case of error? - except Exception as e: + except Exception: # Any exception will stop process logger.warning( 'OpenGnsys: invalid userService %s:%s. (Ignored)', token, uuid @@ -106,7 +111,9 @@ def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResp logger.debug( 'Processing logout from OpenGnsys %s', userService.friendly_name ) - actor_v3.Logout.process_logout(userService, 'OpenGnsys', '') # Close all sessions + actor_v3.Logout.process_logout( + userService, 'OpenGnsys', '' + ) # Close all sessions fnc: typing.Optional[typing.Callable[[], None]] = { 'login': login, diff --git a/server/src/uds/management/commands/config.py b/server/src/uds/management/commands/config.py index 885748049..c424a94d8 100644 --- a/server/src/uds/management/commands/config.py +++ b/server/src/uds/management/commands/config.py @@ -30,7 +30,6 @@ """ @author: Adolfo Gómez, dkmaster at dkmon dot com """ -import typing import logging from django.core.management.base import BaseCommand @@ -63,5 +62,5 @@ class Command(BaseCommand): ): # If not exists, try to store value without any special parameters Config.section(mod).value(name, value).get() except Exception as e: - self.stderr.write('The command could not be processed: {}'.format(e)) + self.stderr.write(f'The command could not be processed: {e}') logger.exception('Exception processing %s', args) diff --git a/server/src/uds/management/commands/export.py b/server/src/uds/management/commands/export.py index 5b570f3d1..d3453e479 100644 --- a/server/src/uds/management/commands/export.py +++ b/server/src/uds/management/commands/export.py @@ -198,6 +198,7 @@ def osmanager_exporter(osmanager: models.OSManager) -> typing.Dict[str, typing.A o = managed_object_exporter(osmanager) return o + def calendar_exporter(calendar: models.Calendar) -> typing.Dict[str, typing.Any]: """ Exports a calendar to a dict @@ -212,7 +213,10 @@ def calendar_exporter(calendar: models.Calendar) -> typing.Dict[str, typing.Any] ) return c -def calendar_rule_exporter(calendar_rule: models.CalendarRule) -> typing.Dict[str, typing.Any]: + +def calendar_rule_exporter( + calendar_rule: models.CalendarRule, +) -> typing.Dict[str, typing.Any]: """ Exports a calendar rule to a dict """ @@ -232,6 +236,7 @@ def calendar_rule_exporter(calendar_rule: models.CalendarRule) -> typing.Dict[st ) return c + class Command(BaseCommand): help = 'Export entities from UDS to be imported in another UDS instance' @@ -315,7 +320,7 @@ class Command(BaseCommand): entities = self.remove_reduntant_entities(options['entities']) # For each entity, export it as yaml to output file - with open(options['output'], 'w') as f: + with open(options['output'], 'w', encoding='utf8') as f: for entity in entities: self.stderr.write(f'Exporting {entity}') f.write(self.VALID_ENTITIES[entity]()) @@ -324,9 +329,7 @@ class Command(BaseCommand): if self.verbose: self.stderr.write(f'Exported to {options["output"]}') - def apply_filter( - self, model: typing.Type[ModelType] - ) -> typing.Iterable[ModelType]: + def apply_filter(self, model: typing.Type[ModelType]) -> typing.Iterable[ModelType]: """ Applies a filter to a model """ @@ -337,7 +340,9 @@ class Command(BaseCommand): self.stderr.write("\n ".join(values)) # Generate "OR" filter with all kwargs if self.filter_args: - return model.objects.filter(reduce(operator.or_, (Q(**{k: v}) for k, v in self.filter_args))) + return model.objects.filter( + reduce(operator.or_, (Q(**{k: v}) for k, v in self.filter_args)) + ) return model.objects.all() def output_count( @@ -367,16 +372,11 @@ class Command(BaseCommand): def export_services(self) -> str: # First, locate providers for services with the filter services_list = list( - self.output_count( - 'Filtering services', self.apply_filter(models.Service) - ) - ) - providers_list = set( - [ - s.provider - for s in self.output_count('Filtering providers', services_list) - ] + self.output_count('Filtering services', self.apply_filter(models.Service)) ) + providers_list = { + s.provider for s in self.output_count('Filtering providers', services_list) + } # Now, export those providers providers = [ provider_exporter(p) @@ -416,16 +416,11 @@ class Command(BaseCommand): """ # first, locate authenticators for users with the filter users_list = list( - self.output_count( - 'Filtering users', self.apply_filter(models.User) - ) - ) - authenticators_list = set( - [ - u.manager - for u in self.output_count('Filtering authenticators', users_list) - ] + self.output_count('Filtering users', self.apply_filter(models.User)) ) + authenticators_list = { + u.manager for u in self.output_count('Filtering authenticators', users_list) + } # Now, groups that contains those users groups_list = set() for u in self.output_count('Filtering groups', users_list): @@ -461,16 +456,12 @@ class Command(BaseCommand): """ # First export authenticators for groups with the filter groups_list = list( - self.output_count( - 'Filtering groups', self.apply_filter(models.Group) - ) - ) - authenticators_list = set( - [ - g.manager - for g in self.output_count('Filtering authenticators', groups_list) - ] + self.output_count('Filtering groups', self.apply_filter(models.Group)) ) + authenticators_list = { + g.manager + for g in self.output_count('Filtering authenticators', groups_list) + } authenticators = [ authenticator_exporter(a) for a in self.output_count('Saving authenticators', authenticators_list) diff --git a/server/src/uds/management/commands/fs.py b/server/src/uds/management/commands/fs.py index f835d4f65..af6c10741 100644 --- a/server/src/uds/management/commands/fs.py +++ b/server/src/uds/management/commands/fs.py @@ -1,2 +1,4 @@ # Placeholder, import the command from udsfs + +# pylint: disable=unused-import from .udsfs import Command diff --git a/server/src/uds/management/commands/showconfig.py b/server/src/uds/management/commands/showconfig.py index 53f27116d..6db1b8a0f 100644 --- a/server/src/uds/management/commands/showconfig.py +++ b/server/src/uds/management/commands/showconfig.py @@ -87,6 +87,6 @@ class Command(BaseCommand): if options['yaml']: self.stdout.write(yaml.safe_dump(writer, default_flow_style=False)) except Exception as e: - self.stdout.write('The command could not be processed: {}'.format(e)) + self.stdout.write(f'The command could not be processed: {e}') self.stdout.flush() logger.exception('Exception processing %s', args) diff --git a/server/src/uds/management/commands/taskManager.py b/server/src/uds/management/commands/taskManager.py index dd214b1db..e29b07fa5 100644 --- a/server/src/uds/management/commands/taskManager.py +++ b/server/src/uds/management/commands/taskManager.py @@ -65,7 +65,7 @@ def become_daemon( if os.fork() > 0: sys.exit(0) # kill off parent except OSError as e: - sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror)) + sys.stderr.write(f'fork #1 failed: ({e.errno}) {e.strerror}') sys.exit(1) os.setsid() os.chdir(our_home_dir) @@ -76,12 +76,12 @@ def become_daemon( if os.fork() > 0: os._exit(0) # pylint: disable=protected-access except OSError as e: - sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror)) + sys.stderr.write(f'fork #2 failed: ({e.errno}) {e.strerror}') os._exit(1) # pylint: disable=protected-access - si = open('/dev/null', 'r') - so = open(out_log, 'a+', 1) - se = open(err_log, 'a+', 1) + si = open('/dev/null', 'r', encoding='utf-8') # pylint: disable=consider-using-with + so = open(out_log, 'a+', 1, encoding='utf-8') # pylint: disable=consider-using-with + se = open(err_log, 'a+', 1, encoding='utf-8') # pylint: disable=consider-using-with os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) @@ -136,7 +136,7 @@ class Command(BaseCommand): pid: int = 0 try: - pid = int(open(getPidFile(), 'r').readline()) + pid = int(open(getPidFile(), 'r', encoding='utf8').readline()) # pylint: disable=consider-using-with except Exception: pid = 0 @@ -161,7 +161,8 @@ class Command(BaseCommand): ) pid = os.getpid() - open(getPidFile(), 'w+').write('{}\n'.format(pid)) + with open(getPidFile(), 'w+', encoding='utf8') as f: + f.write(f'{pid}\n') manager = taskManager() manager.run() @@ -169,7 +170,7 @@ class Command(BaseCommand): if not start and not stop: if pid: self.stdout.write( - "Task manager found running (pid file exists: {0})\n".format(pid) + f'Task manager found running (pid file exists: {pid})\n' ) else: self.stdout.write("Task manager not foud (pid file do not exits)\n") diff --git a/server/src/uds/management/commands/tree.py b/server/src/uds/management/commands/tree.py index c6c9d4b62..04bb1f49e 100644 --- a/server/src/uds/management/commands/tree.py +++ b/server/src/uds/management/commands/tree.py @@ -32,10 +32,11 @@ """ import logging import typing + import yaml -import collections from django.core.management.base import BaseCommand + from uds.core.util import log from uds import models from uds.core.util.state import State @@ -85,7 +86,7 @@ def getSerializedFromModel( removableFields = removableFields or [] passwordFields = passwordFields or [] try: - values = mod._meta.managers[0].filter(pk=mod.pk).values()[0] # type: ignore + values = mod._meta.managers[0].filter(pk=mod.pk).values()[0] # type: ignore # pylint: disable=protected-access for i in ['uuid', 'id'] + removableFields: if i in values: del values[i] @@ -118,6 +119,7 @@ class Command(BaseCommand): help='Maximum elements exported for groups and user services', ) + # pylint: disable=too-many-locals,too-many-branches,too-many-statements def handle(self, *args, **options) -> None: logger.debug("Show Tree") # firt, genertate Provider-service-servicepool tree @@ -134,14 +136,13 @@ class Command(BaseCommand): try: providers = {} for provider in models.Provider.objects.all(): - services = {} totalServices = 0 totalServicePools = 0 totalUserServices = 0 for service in provider.services.all(): servicePools = {} - numberOfServicePools = 0 + numberOfServicePools = 0 numberOfUserServices = 0 for servicePool in service.deployedServices.all(): # get assigned user services with ERROR status @@ -151,12 +152,7 @@ class Command(BaseCommand): fltr = fltr.filter(state=State.ERROR) for item in fltr[:max_items]: # at most max_items items logs = [ - '{}: {} [{}] - {}'.format( - l['date'], - log.logStrFromLevel(l['level']), - l['source'], - l['message'], - ) + f'{l["date"]}: {log.logStrFromLevel(l["level"])} [{l["source"]}] - {l["message"]}' for l in log.getLogs(item) ] userServices[item.friendly_name] = { @@ -197,9 +193,9 @@ class Command(BaseCommand): except Exception: changelogs = [] - publications[str(publication.revision)] = getSerializedFromModel( - publication, ['data'] - ) + publications[ + str(publication.revision) + ] = getSerializedFromModel(publication, ['data']) publications[str(publication.revision)][ 'changelogs' ] = changelogs @@ -242,7 +238,9 @@ class Command(BaseCommand): numberOfServicePools = len(servicePools) totalServicePools += numberOfServicePools - services[f'{service.name} ({numberOfServicePools}, {numberOfUserServices})'] = { + services[ + f'{service.name} ({numberOfServicePools}, {numberOfUserServices})' + ] = { '_': getSerializedFromManagedObject(service), 'servicePools': servicePools, } @@ -262,8 +260,12 @@ class Command(BaseCommand): for authenticator in models.Authenticator.objects.all(): # Groups grps: typing.Dict[str, typing.Any] = {} - for group in authenticator.groups.all()[:max_items]: # at most max_items items - grps[group.name] = getSerializedFromModel(group, ['manager_id', 'name']) + for group in authenticator.groups.all()[ + :max_items + ]: # at most max_items items + grps[group.name] = getSerializedFromModel( + group, ['manager_id', 'name'] + ) authenticators[authenticator.name] = { '_': getSerializedFromManagedObject(authenticator), 'groups': grps, @@ -375,6 +377,6 @@ class Command(BaseCommand): self.stdout.write(yaml.safe_dump(tree, default_flow_style=False)) except Exception as e: - self.stdout.write('The command could not be processed: {}'.format(e)) + self.stdout.write(f'The command could not be processed: {e}') self.stdout.flush() logger.exception('Exception processing %s', args) diff --git a/server/src/uds/management/commands/udsfs/events.py b/server/src/uds/management/commands/udsfs/events.py index c626765a7..385274f37 100644 --- a/server/src/uds/management/commands/udsfs/events.py +++ b/server/src/uds/management/commands/udsfs/events.py @@ -95,7 +95,7 @@ class EventFS(types.UDSFSInterface): ): # Return days of month as indicated on path month = int(path[1]) return ['.', '..'] + [ - '{:02d}'.format(x) + f'{x:02d}' for x in range(1, EventFS.number_of_days(year, month) + 1) ] diff --git a/server/src/uds/management/commands/udsfs/stats.py b/server/src/uds/management/commands/udsfs/stats.py index bcd32da77..efa658715 100644 --- a/server/src/uds/management/commands/udsfs/stats.py +++ b/server/src/uds/management/commands/udsfs/stats.py @@ -6,13 +6,13 @@ import logging from uds import models from uds.core.util.cache import Cache -from uds.core.util.stats import events, counters from . import types logger = logging.getLogger(__name__) + # Custom types class StatInterval(typing.NamedTuple): start: datetime.datetime @@ -87,13 +87,13 @@ class StatsFS(types.UDSFSInterface): self, filename: typing.List[str] ) -> typing.Tuple[DispatcherType, StatInterval, str]: if len(filename) != 1: - raise FileNotFoundError + raise FileNotFoundError() # Extract components try: dispatcher, interval, extension = (filename[0].split('.') + [''])[:3] except ValueError: - raise FileNotFoundError + raise FileNotFoundError() from None logger.debug( 'Dispatcher: %s, interval: %s, extension: %s', @@ -103,16 +103,16 @@ class StatsFS(types.UDSFSInterface): ) if dispatcher not in self._dispatchers: - raise FileNotFoundError + raise FileNotFoundError() fnc, requiresInterval = self._dispatchers[dispatcher] if extension == '' and requiresInterval is True: - raise FileNotFoundError + raise FileNotFoundError() if requiresInterval: if interval not in self._interval: - raise FileNotFoundError + raise FileNotFoundError() range = self._interval[interval] else: @@ -122,7 +122,7 @@ class StatsFS(types.UDSFSInterface): extension = interval if extension != 'csv': - raise FileNotFoundError + raise FileNotFoundError() todayStart = datetime.datetime.utcnow().replace( hour=0, minute=0, second=0, microsecond=0 @@ -174,14 +174,14 @@ class StatsFS(types.UDSFSInterface): cacheTime = 60 # Check if the file info is cached - cached = self._cache.get(path[0]+extension) + cached = self._cache.get(path[0] + extension) if cached is not None: logger.debug('Cache hit for %s', path[0]) data = cached else: logger.debug('Cache miss for %s', path[0]) data = dispatcher(interval, extension, 0, 0) - self._cache.put(path[0]+extension, data, cacheTime) + self._cache.put(path[0] + extension, data, cacheTime) # Calculate the size of the file size = len(data) @@ -206,14 +206,14 @@ class StatsFS(types.UDSFSInterface): cacheTime = 60 # Check if the file info is cached - cached = self._cache.get(path[0]+extension) + cached = self._cache.get(path[0] + extension) if cached is not None: logger.debug('Cache hit for %s', path[0]) data = cached else: logger.debug('Cache miss for %s', path[0]) data = dispatcher(interval, extension, 0, 0) - self._cache.put(path[0]+extension, data, cacheTime) + self._cache.put(path[0] + extension, data, cacheTime) # Dispatch the read to the dispatcher data = dispatcher(interval, extension, size, offset) diff --git a/server/src/uds/management/commands/udsfs/types.py b/server/src/uds/management/commands/udsfs/types.py index a5d7beeb0..d639946a1 100644 --- a/server/src/uds/management/commands/udsfs/types.py +++ b/server/src/uds/management/commands/udsfs/types.py @@ -27,7 +27,7 @@ class StatType(typing.NamedTuple): 'st_ctime': self.st_ctime, 'st_mtime': self.st_mtime, 'st_atime': self.st_atime, - 'st_nlink': self.st_nlink + 'st_nlink': self.st_nlink, } # Append optional fields if self.st_dev != -1: @@ -46,7 +46,7 @@ class StatType(typing.NamedTuple): rst['st_blksize'] = self.st_blksize if self.st_blocks != -1: rst['st_blocks'] = self.st_blocks - + return rst @@ -54,6 +54,7 @@ class UDSFSInterface: """ Base Class for UDS Info File system """ + def getattr(self, path: typing.List[str]) -> StatType: """ Get file attributes. Path is the full path to the file, already splitted. @@ -65,12 +66,12 @@ class UDSFSInterface: Get a list of files in the directory. Path is the full path to the directory, already splitted. """ raise NotImplementedError - + def read(self, path: typing.List[str], size: int, offset: int) -> bytes: """ Read a file. Path is the full path to the file, already splitted. """ raise NotImplementedError - def flush(self, path: typing.List[str]) -> None: + def flush(self, path: typing.List[str]) -> None: # pylint: disable=unused-argument return diff --git a/server/src/uds/mfas/Email/__init__.py b/server/src/uds/mfas/Email/__init__.py index d14dc4590..661dc7736 100644 --- a/server/src/uds/mfas/Email/__init__.py +++ b/server/src/uds/mfas/Email/__init__.py @@ -26,4 +26,5 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# pylint: disable=unused-import from . import mfa diff --git a/server/src/uds/mfas/Email/mfa.py b/server/src/uds/mfas/Email/mfa.py index 7dabdbcc3..a725b5685 100644 --- a/server/src/uds/mfas/Email/mfa.py +++ b/server/src/uds/mfas/Email/mfa.py @@ -133,12 +133,7 @@ class EmailMFA(mfas.MFA): defaultValue='0', tooltip=_('Action for MFA response error'), required=True, - values={ - '0': _('Allow user login'), - '1': _('Deny user login'), - '2': _('Allow user to login if it IP is in the networks list'), - '3': _('Deny user to login if it IP is in the networks list'), - }, + values=mfas.LoginAllowed.valuesForSelect(), tab=_('Config'), ) @@ -157,7 +152,10 @@ class EmailMFA(mfas.MFA): label=_('Mail text'), order=33, multiline=4, - tooltip=_('Text of the email. If empty, a default text will be used') + '\n' + _('Allowed variables are: ') + '{code}, {username}, {justUsername}. {ip}', + tooltip=_('Text of the email. If empty, a default text will be used') + + '\n' + + _('Allowed variables are: ') + + '{code}, {username}, {justUsername}. {ip}', required=True, defvalue='', tab=_('Config'), @@ -168,7 +166,11 @@ class EmailMFA(mfas.MFA): label=_('Mail HTML'), order=34, multiline=4, - tooltip=_('HTML of the email. If empty, a default HTML will be used')+ '\n' + _('Allowed variables are: ') + '{code}, {username}, {justUsername}, {ip}', + tooltip=_('HTML of the email. If empty, a default HTML will be used') + + '\n' + + _('Allowed variables are: ') + + '\n' + + '{code}, {username}, {justUsername}, {ip}', required=False, defvalue='', tab=_('Config'), @@ -191,7 +193,7 @@ class EmailMFA(mfas.MFA): # Now check is valid format if ':' in hostname: host, port = validators.validateHostPortPair(hostname) - self.hostname.value = '{}:{}'.format(host, port) + self.hostname.value = f'{host}:{port}' else: host = self.hostname.cleanStr() self.hostname.value = validators.validateFqdn(host) @@ -203,7 +205,7 @@ class EmailMFA(mfas.MFA): return gettext( 'Check your mail. You will receive an email with the verification code' ) - + def initGui(self) -> None: # Populate the networks list self.networks.setValues( @@ -214,28 +216,10 @@ class EmailMFA(mfas.MFA): ] ) - def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool: - def checkIp() -> bool: - return any( - i.contains(request.ip) - for i in models.Network.objects.filter(uuid__in=self.networks.value) - ) - - if action == '0': - return True - elif action == '1': - return False - elif action == '2': - return checkIp() - elif action == '3': - return not checkIp() - else: - return False - def emptyIndentifierAllowedToLogin( self, request: 'ExtendedHttpRequest' ) -> typing.Optional[bool]: - return self.checkAction(self.allowLoginWithoutMFA.value, request) + return mfas.LoginAllowed.checkAction(self.allowLoginWithoutMFA.value, request, self.networks.value) def label(self) -> str: return 'OTP received via email' @@ -270,7 +254,7 @@ class EmailMFA(mfas.MFA): smtp.sendmail(self.fromEmail.value, identifier, msg.as_string()) except smtplib.SMTPException as e: - logger.error('Error sending email: {}'.format(e)) + logger.error('Error sending email: %s', e) raise def sendCode( diff --git a/server/src/uds/mfas/Radius/__init__.py b/server/src/uds/mfas/Radius/__init__.py index f963c676e..661dc7736 100644 --- a/server/src/uds/mfas/Radius/__init__.py +++ b/server/src/uds/mfas/Radius/__init__.py @@ -1 +1,30 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2022 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# pylint: disable=unused-import from . import mfa diff --git a/server/src/uds/mfas/Radius/mfa.py b/server/src/uds/mfas/Radius/mfa.py index db8c92511..4b13dd65a 100644 --- a/server/src/uds/mfas/Radius/mfa.py +++ b/server/src/uds/mfas/Radius/mfa.py @@ -40,7 +40,13 @@ from uds.core import mfas from uds.core.ui import gui from uds.auths.Radius import client -from uds.auths.Radius.client import NOT_CHECKED, INCORRECT, CORRECT, NOT_NEEDED, NEEDED +from uds.auths.Radius.client import ( + # NOT_CHECKED, + INCORRECT, + CORRECT, + NOT_NEEDED, + # NEEDED +) from uds.core.auths.auth import webPassword from uds.core.auths import exceptions @@ -108,12 +114,7 @@ class RadiusOTP(mfas.MFA): defaultValue='0', tooltip=_('Action for OTP server communication error'), required=True, - values={ - '0': _('Allow user login'), - '1': _('Deny user login'), - '2': _('Allow user to login if it IP is in the networks list'), - '3': _('Deny user to login if it IP is in the networks list'), - }, + values=mfas.LoginAllowed.valuesForSelect(), tab=_('Config'), ) @@ -133,12 +134,7 @@ class RadiusOTP(mfas.MFA): defaultValue='0', tooltip=_('Action for user without defined Radius Challenge'), required=True, - values={ - '0': _('Allow user login'), - '1': _('Deny user login'), - '2': _('Allow user to login if it IP is in the networks list'), - '3': _('Deny user to login if it IP is in the networks list'), - }, + values=mfas.LoginAllowed.valuesForSelect(), tab=_('Config'), ) @@ -163,30 +159,16 @@ class RadiusOTP(mfas.MFA): nasIdentifier=self.nasIdentifier.value, ) - def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool: - def checkIp() -> bool: - return any( - i.contains(request.ip) - for i in models.Network.objects.filter(uuid__in=self.networks.value) - ) - - if action == '0': - return True - elif action == '1': - return False - elif action == '2': - return checkIp() - elif action == '3': - return not checkIp() - else: - return False - - def checkResult(self, action: str, request: 'ExtendedHttpRequest') -> mfas.MFA.RESULT: - if self.checkAction(action, request): + def checkResult( + self, action: str, request: 'ExtendedHttpRequest' + ) -> mfas.MFA.RESULT: + if mfas.LoginAllowed.checkAction(action, request, self.networks.value): return mfas.MFA.RESULT.OK raise Exception('User not allowed to login') - def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]: + def emptyIndentifierAllowedToLogin( + self, request: 'ExtendedHttpRequest' + ) -> typing.Optional[bool]: return None def label(self) -> str: @@ -225,8 +207,8 @@ class RadiusOTP(mfas.MFA): logger.error( "Exception found connecting to Radius OTP %s: %s", e.__class__, e ) - if not self.checkAction(self.responseErrorAction.value, request): - raise Exception(_('Radius OTP connection error')) + if not mfas.LoginAllowed.checkAction(self.responseErrorAction.value, request, self.networks.value): + raise Exception(_('Radius OTP connection error')) from e logger.warning( "Radius OTP connection error: Allowing access to user [%s] from IP [%s] without OTP", username, @@ -287,8 +269,8 @@ class RadiusOTP(mfas.MFA): logger.error( "Exception found connecting to Radius OTP %s: %s", e.__class__, e ) - if not self.checkAction(self.responseErrorAction.value, request): - raise Exception(_('Radius OTP connection error')) + if mfas.LoginAllowed.checkAction(self.responseErrorAction.value, request, self.networks.value): + raise Exception(_('Radius OTP connection error')) from e logger.warning( "Radius OTP connection error: Allowing access to user [%s] from IP [%s] without OTP", username, diff --git a/server/src/uds/mfas/SMS/__init__.py b/server/src/uds/mfas/SMS/__init__.py index dab63b648..bcd92ee13 100644 --- a/server/src/uds/mfas/SMS/__init__.py +++ b/server/src/uds/mfas/SMS/__init__.py @@ -26,7 +26,8 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ +# pylint: disable=unused-import from . import mfa diff --git a/server/src/uds/mfas/SMS/mfa.py b/server/src/uds/mfas/SMS/mfa.py index a0e7c47d2..1bf77b783 100644 --- a/server/src/uds/mfas/SMS/mfa.py +++ b/server/src/uds/mfas/SMS/mfa.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ import typing @@ -94,6 +94,7 @@ class SMSMFA(mfas.MFA): """ + typeName = _('SMS via HTTP') typeType = 'smsHttpMFA' typeDescription = _('Simple SMS sending MFA using HTTP/HTTPS') @@ -243,12 +244,7 @@ class SMSMFA(mfas.MFA): defaultValue='0', tooltip=_('Action for SMS response error'), required=True, - values={ - '0': _('Allow user login'), - '1': _('Deny user login'), - '2': _('Allow user to login if its IP is in the networks list'), - '3': _('Deny user to login if its IP is in the networks list'), - }, + values=mfas.LoginAllowed.valuesForSelect(), tab=_('Config'), ) @@ -275,7 +271,13 @@ class SMSMFA(mfas.MFA): ] ) - def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str: + def composeSmsUrl( + self, + userId: str, # pylint: disable=unused-argument + userName: str, + code: str, + phone: str, + ) -> str: url = self.sendingUrl.value url = url.replace('{code}', code) url = url.replace('{phone}', phone.replace('+', '')) @@ -285,7 +287,9 @@ class SMSMFA(mfas.MFA): return url def getSession(self) -> requests.Session: - session = security.secureRequestsSession(verify=self.ignoreCertificateErrors.isTrue()) + session = security.secureRequestsSession( + verify=self.ignoreCertificateErrors.isTrue() + ) # 0 means no authentication if self.authenticationMethod.value == '1': session.auth = requests.auth.HTTPBasicAuth( @@ -307,37 +311,29 @@ class SMSMFA(mfas.MFA): session.headers[headerName.strip()] = headerValue.strip() return session - def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool: - def checkIp() -> bool: - return any( - i.contains(request.ip) - for i in models.Network.objects.filter(uuid__in=self.networks.value) - ) - - if action == '0': - return True - elif action == '1': - return False - elif action == '2': - return checkIp() - elif action == '3': - return not checkIp() - else: - return False - - def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]: - return self.checkAction(self.allowLoginWithoutMFA.value, request) + def emptyIndentifierAllowedToLogin( + self, request: 'ExtendedHttpRequest' + ) -> typing.Optional[bool]: + return mfas.LoginAllowed.checkAction( + self.allowLoginWithoutMFA.value, request, self.networks.value + ) def processResponse( self, request: 'ExtendedHttpRequest', response: requests.Response ) -> mfas.MFA.RESULT: logger.debug('Response: %s', response) if not response.ok: - logger.warning('SMS sending failed: code: %s, content: %s', response.status_code, response.text) - if not self.checkAction(self.responseErrorAction.value, request): + logger.warning( + 'SMS sending failed: code: %s, content: %s', + response.status_code, + response.text, + ) + if not mfas.LoginAllowed.checkAction( + self.responseErrorAction.value, request, self.networks.value + ): raise Exception(_('SMS sending failed')) return mfas.MFA.RESULT.ALLOWED # Allow login, NO MFA code was sent - elif self.responseOkRegex.value.strip(): + if self.responseOkRegex.value.strip(): logger.debug( 'Checking response OK regex: %s: (%s)', self.responseOkRegex.value, @@ -348,17 +344,19 @@ class SMSMFA(mfas.MFA): 'SMS response error: %s', response.text, ) - if not self.checkAction(self.responseErrorAction.value, request): + if not mfas.LoginAllowed.checkAction( + self.responseErrorAction.value, request, self.networks.value + ): raise Exception(_('SMS response error')) return mfas.MFA.RESULT.ALLOWED return mfas.MFA.RESULT.OK def getData( self, - request: 'ExtendedHttpRequest', - userId: str, + request: 'ExtendedHttpRequest', # pylint: disable=unused-argument + userId: str, # pylint: disable=unused-argument username: str, - url: str, + url: str, # pylint: disable=unused-argument code: str, phone: str, ) -> bytes: @@ -374,7 +372,11 @@ class SMSMFA(mfas.MFA): return data.encode(self.encoding.value) def sendSMS_GET( - self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str + self, + request: 'ExtendedHttpRequest', + userId: str, # pylint: disable=unused-argument + username: str, # pylint: disable=unused-argument + url: str, ) -> mfas.MFA.RESULT: return self.processResponse(request, self.getSession().get(url)) @@ -405,7 +407,6 @@ class SMSMFA(mfas.MFA): phone: str, ) -> mfas.MFA.RESULT: # Compose POST data - data = '' bdata = self.getData(request, userId, username, url, code, phone) return self.processResponse(request, self.getSession().put(url, data=bdata)) @@ -420,12 +421,11 @@ class SMSMFA(mfas.MFA): url = self.composeSmsUrl(userId, username, code, phone) if self.sendingMethod.value == 'GET': return self.sendSMS_GET(request, userId, username, url) - elif self.sendingMethod.value == 'POST': + if self.sendingMethod.value == 'POST': return self.sendSMS_POST(request, userId, username, url, code, phone) - elif self.sendingMethod.value == 'PUT': + if self.sendingMethod.value == 'PUT': return self.sendSMS_PUT(request, userId, username, url, code, phone) - else: - raise Exception('Unknown SMS sending method') + raise Exception('Unknown SMS sending method') def label(self) -> str: return gettext('MFA Code') diff --git a/server/src/uds/mfas/Sample/__init__.py b/server/src/uds/mfas/Sample/__init__.py index dab63b648..bcd92ee13 100644 --- a/server/src/uds/mfas/Sample/__init__.py +++ b/server/src/uds/mfas/Sample/__init__.py @@ -26,7 +26,8 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ +# pylint: disable=unused-import from . import mfa diff --git a/server/src/uds/mfas/TOTP/__init__.py b/server/src/uds/mfas/TOTP/__init__.py index f963c676e..bcd92ee13 100644 --- a/server/src/uds/mfas/TOTP/__init__.py +++ b/server/src/uds/mfas/TOTP/__init__.py @@ -1 +1,33 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2022 Virtual Cable S.L.U. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" + +# pylint: disable=unused-import from . import mfa diff --git a/server/src/uds/mfas/TOTP/mfa.py b/server/src/uds/mfas/TOTP/mfa.py index 82b0100a8..336c5b896 100644 --- a/server/src/uds/mfas/TOTP/mfa.py +++ b/server/src/uds/mfas/TOTP/mfa.py @@ -26,7 +26,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ import typing import logging @@ -155,7 +155,7 @@ class TOTP_MFA(mfas.MFA): def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str: # Get data from storage related to this user - secret, qrShown = self._userData(userId) + qrShown = self._userData(userId)[1] if qrShown: return _('Enter your authentication code') # Compose the QR code from provisioning URI diff --git a/server/src/uds/mfas/__init__.py b/server/src/uds/mfas/__init__.py index b19381323..d65960cd5 100644 --- a/server/src/uds/mfas/__init__.py +++ b/server/src/uds/mfas/__init__.py @@ -45,24 +45,22 @@ import importlib import sys import typing -def __init__(): + +import logging + +from uds.core.util import modfinder + +logger = logging.getLogger(__name__) + + +def __loadModules(): """ This imports all packages that are descendant of this package, and, after that, - it register all subclases of authenticator as + it register all subclases of mfas.MFA """ - from uds.core import mfas + from uds.core import mfas # pylint: disable=import-outside-toplevel - # Dinamycally import children of this package. The __init__.py files must declare mfs as subclasses of mfas.MFA - pkgpath = os.path.dirname(typing.cast(str, sys.modules[__name__].__file__)) - for _, name, _ in pkgutil.iter_modules([pkgpath]): - # __import__(name, globals(), locals(), [], 1) - importlib.import_module('.' + name, __name__) # import module - - importlib.invalidate_caches() - - a = mfas.MFA - for cls in a.__subclasses__(): - mfas.factory().insert(cls) + modfinder.dynamicLoadAndRegisterModules(mfas.factory(), mfas.MFA, __name__) -__init__() +__loadModules() diff --git a/server/src/uds/middleware/builder.py b/server/src/uds/middleware/builder.py index 0c42627d2..938f4a41e 100644 --- a/server/src/uds/middleware/builder.py +++ b/server/src/uds/middleware/builder.py @@ -77,12 +77,12 @@ def build_middleware( ) return async_middleware - else: - def sync_middleware(request: 'ExtendedHttpRequest') -> 'HttpResponse': - response = request_processor(request) - return response_processor(request, response or get_response(request)) + # Sync middleware + def sync_middleware(request: 'ExtendedHttpRequest') -> 'HttpResponse': + response = request_processor(request) + return response_processor(request, response or get_response(request)) - return sync_middleware + return sync_middleware return middleware diff --git a/server/src/uds/middleware/redirect.py b/server/src/uds/middleware/redirect.py index e806184a1..4add5dfa0 100644 --- a/server/src/uds/middleware/redirect.py +++ b/server/src/uds/middleware/redirect.py @@ -34,6 +34,7 @@ import typing from django.urls import reverse from django.http import HttpResponsePermanentRedirect +from django.conf import settings from . import builder @@ -43,7 +44,7 @@ if typing.TYPE_CHECKING: from django.http import HttpRequest, HttpResponse def _check_redirectable(request: 'HttpRequest') -> typing.Optional['HttpResponse']: - if request.is_secure(): + if request.is_secure() or settings.DEBUG: return None return HttpResponsePermanentRedirect(request.build_absolute_uri(reverse('page.index')).replace('http://', 'https://', 1)) diff --git a/server/src/uds/middleware/request.py b/server/src/uds/middleware/request.py index 070d58d77..396b4ffbd 100644 --- a/server/src/uds/middleware/request.py +++ b/server/src/uds/middleware/request.py @@ -47,7 +47,7 @@ from uds.core.auths.auth import ( ) from uds.models import User -from . import builder +from . import builder if typing.TYPE_CHECKING: from django.http import HttpResponse diff --git a/server/src/uds/middleware/security.py b/server/src/uds/middleware/security.py index a3b0f7c50..9c16bc7fc 100644 --- a/server/src/uds/middleware/security.py +++ b/server/src/uds/middleware/security.py @@ -33,16 +33,16 @@ import re import logging import typing -logger = logging.getLogger(__name__) from django.http import HttpResponseForbidden from uds.core.util.config import GlobalConfig from uds.core.auths.auth import isTrustedSource, IP_KEY - from . import builder +logger = logging.getLogger(__name__) + if typing.TYPE_CHECKING: from django.http import HttpResponse from uds.core.util.request import ExtendedHttpRequest @@ -66,7 +66,7 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes request.path, ) return HttpResponseForbidden(content='Forbbiden', content_type='text/plain') - + if GlobalConfig.ENHANCED_SECURITY.getBool(): # Check that ip stored in session is the same as the one that is requesting if user is logged in session_ip = request.session.get(IP_KEY, None) @@ -79,12 +79,13 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes request.session.get('ip', None), ) return HttpResponseForbidden(content='Forbbiden', content_type='text/plain') - + return None def _process_response( - request: 'ExtendedHttpRequest', response: 'HttpResponse' + request: 'ExtendedHttpRequest', # pylint: disable=unused-argument + response: 'HttpResponse', ) -> 'HttpResponse': if GlobalConfig.ENHANCED_SECURITY.getBool(): # Legacy browser support for X-XSS-Protection diff --git a/server/src/uds/middleware/xua.py b/server/src/uds/middleware/xua.py index b76755083..30ed23079 100644 --- a/server/src/uds/middleware/xua.py +++ b/server/src/uds/middleware/xua.py @@ -39,13 +39,16 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) + def _process_response( - request: 'ExtendedHttpRequest', response: 'HttpResponse' + request: 'ExtendedHttpRequest', # pylint: disable=unused-argument + response: 'HttpResponse', ) -> 'HttpResponse': if response.get('content-type', '').startswith('text/html'): response['X-UA-Compatible'] = 'IE=edge' return response + # Add a X-UA-Compatible header to the response # This header tells to Internet Explorer to render page with latest # possible version or to use chrome frame if it is installed. TO BE REMOVED SOON (edge does not need it) diff --git a/server/src/uds/migrations/0044_notification_notifier_servicetokenalias_and_more.py b/server/src/uds/migrations/0044_notification_notifier_servicetokenalias_and_more.py index 02d6febf8..e812e9a27 100644 --- a/server/src/uds/migrations/0044_notification_notifier_servicetokenalias_and_more.py +++ b/server/src/uds/migrations/0044_notification_notifier_servicetokenalias_and_more.py @@ -9,33 +9,37 @@ import uds.models.util # Remove ServicePools with null service field -def remove_null_service_pools(apps, schema_editor): +def remove_null_service_pools(apps, schema_editor): # pylint: disable=unused-argument ServicePool = apps.get_model('uds', 'ServicePool') ServicePool.objects.filter(service__isnull=True).delete() + # No-Op backwards migration -def nop(apps, schema_editor): # pragma: no cover +def nop(apps, schema_editor): # pylint: disable=unused-argument pass # Python update network fields to allow ipv6 -# We will -def update_network_model(apps, schema_editor): - import uds.models.network +# We will +def update_network_model(apps, schema_editor): # pylint: disable=unused-argument + import uds.models.network # pylint: disable=import-outside-toplevel,redefined-outer-name + Network = apps.get_model('uds', 'Network') try: for net in Network.objects.all(): # Store the net_start and net_end on new fields "start" and "end", that are strings # to allow us to store ipv6 addresses + # pylint: disable=protected-access net.start = uds.models.network.Network._hexlify(net.net_start) + # pylint: disable=protected-access net.end = uds.models.network.Network._hexlify(net.net_end) net.version = 4 # Previous versions only supported ipv4 net.save() except Exception as e: - print('Error updating network model: {}'.format(e)) + print(f'Error updating network model: {e}') + class Migration(migrations.Migration): - dependencies = [ ("uds", "0043_auto_20220704_2120"), ] @@ -133,7 +137,7 @@ class Migration(migrations.Migration): models.CharField( blank=True, db_index=True, - default=uds.models.user_service_session._session_id_generator, + default=uds.models.user_service_session._session_id_generator, # pylint: disable=protected-access max_length=128, ), ), diff --git a/server/src/uds/models/__init__.py b/server/src/uds/models/__init__.py index 91e43a873..df50837e2 100644 --- a/server/src/uds/models/__init__.py +++ b/server/src/uds/models/__init__.py @@ -1,7 +1,7 @@ -# -*- coding: utf-8 -*- +# pylint: disable=unused-import # -# Copyright (c) 2012-2020 Virtual Cable S.L.U. +# Copyright (c) 2012-2023 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -28,7 +28,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ import logging diff --git a/server/src/uds/models/account.py b/server/src/uds/models/account.py index 0d136d4be..9284b2c7a 100644 --- a/server/src/uds/models/account.py +++ b/server/src/uds/models/account.py @@ -92,7 +92,7 @@ class Account(UUIDModel, TaggingMixin): tmp.save() return tmp - class Meta: + class Meta: # pylint: disable=too-few-public-methods """ Meta class to declare the name of the table at database """ @@ -101,4 +101,4 @@ class Account(UUIDModel, TaggingMixin): app_label = 'uds' def __str__(self): - return 'Account id {}, name {}'.format(self.id, self.name) + return f'Account id {self.id}, name {self.name}' diff --git a/server/src/uds/models/account_usage.py b/server/src/uds/models/account_usage.py index 4869ee3f8..728ae7f05 100644 --- a/server/src/uds/models/account_usage.py +++ b/server/src/uds/models/account_usage.py @@ -46,7 +46,7 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) - +# pylint: disable=no-member # pylint complais a lot about members of models... class AccountUsage(UUIDModel): """ AccountUsage storing on DB model @@ -74,7 +74,7 @@ class AccountUsage(UUIDModel): Account, related_name='usages', on_delete=models.CASCADE ) - class Meta: + class Meta: # pylint: disable=too-few-public-methods """ Meta class to declare the name of the table at database """ @@ -111,6 +111,4 @@ class AccountUsage(UUIDModel): return secondsToTimeString(self.elapsed_seconds_timemark) def __str__(self): - return 'AccountUsage id {}, pool {}, name {}, start {}, end {}'.format( - self.id, self.pool_name, self.user_name, self.start, self.end - ) + return f'AccountUsage id {self.id}, pool {self.pool_name}, name {self.user_name}, start {self.start}, end {self.end}' diff --git a/server/src/uds/models/actor_token.py b/server/src/uds/models/actor_token.py index 9ed89d7fa..00ffa1a81 100644 --- a/server/src/uds/models/actor_token.py +++ b/server/src/uds/models/actor_token.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2012-2020 Virtual Cable S.L.U. +# Copyright (c) 2012-2023 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -26,12 +26,13 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ''' -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com ''' from django.db import models from .util import MAX_IPV6_LENGTH, MAX_DNS_NAME_LENGTH + class ActorToken(models.Model): """ UDS Actors tokens on DB @@ -41,7 +42,7 @@ class ActorToken(models.Model): ip_from = models.CharField(max_length=MAX_IPV6_LENGTH) ip = models.CharField(max_length=MAX_IPV6_LENGTH) ip_version = models.IntegerField(default=4) # Version of ip fields - + hostname = models.CharField(max_length=MAX_DNS_NAME_LENGTH) mac = models.CharField(max_length=128, db_index=True, unique=True) pre_command = models.CharField(max_length=255, blank=True, default='') @@ -52,13 +53,8 @@ class ActorToken(models.Model): token = models.CharField(max_length=48, db_index=True, unique=True) stamp = models.DateTimeField() # Date creation or validation of this entry - # "fake" declarations for type checking - # objects: 'models.manager.Manager[ActorToken]' - - class Meta: + class Meta: # pylint: disable=too-few-public-methods app_label = 'uds' def __str__(self): - return ''.format( - self.token, self.stamp, self.username, self.hostname, self.ip_from - ) + return f'' diff --git a/server/src/uds/models/authenticator.py b/server/src/uds/models/authenticator.py index c7130be11..ebbefacaf 100644 --- a/server/src/uds/models/authenticator.py +++ b/server/src/uds/models/authenticator.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2012-2021 Virtual Cable S.L.U. +# Copyright (c) 2012-2023 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -27,13 +27,12 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ import logging import typing from django.db import models -from django.db.models import signals from uds.core import auths from uds.core import environment @@ -57,6 +56,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin): This class represents an Authenticator inside the platform. Sample authenticators are LDAP, Active Directory, SAML, ... """ + # Constants for Visibility VISIBLE = 'v' HIDDEN = 'h' @@ -82,7 +82,13 @@ class Authenticator(ManagedObjectModel, TaggingMixin): networks: 'models.manager.RelatedManager[Network]' # MFA associated to this authenticator. Can be null - mfa = models.ForeignKey('MFA', on_delete=models.SET_NULL, null=True, blank=True, related_name='authenticators') + mfa = models.ForeignKey( + 'MFA', + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name='authenticators', + ) class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods """ @@ -227,9 +233,13 @@ class Authenticator(ManagedObjectModel, TaggingMixin): ip, version = net.ipToLong(ipStr) # Allow if self.net_filtering == Authenticator.ALLOW: - return self.networks.filter(net_start__lte=ip, net_end__gte=ip, version=version).exists() + return self.networks.filter( + net_start__lte=ip, net_end__gte=ip, version=version + ).exists() # Deny, must not be in any network - return self.networks.filter(net_start__lte=ip, net_end__gte=ip).exists() is False + return ( + self.networks.filter(net_start__lte=ip, net_end__gte=ip).exists() is False + ) @staticmethod def all() -> 'models.QuerySet[Authenticator]': @@ -244,7 +254,10 @@ class Authenticator(ManagedObjectModel, TaggingMixin): Gets authenticator by tag name. Special tag name "disabled" is used to exclude customAuth """ - from uds.core.util.config import GlobalConfig + # pylint: disable=import-outside-toplevel + from uds.core.util.config import ( + GlobalConfig, + ) if tag is not None: authsList = Authenticator.objects.filter(small_name=tag).order_by( @@ -264,7 +277,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin): yield auth @staticmethod - def beforeDelete(sender, **kwargs) -> None: + def beforeDelete(sender, **kwargs) -> None: # pylint: disable=unused-argument """ Used to invoke the Service class "Destroy" before deleting it from database. @@ -273,7 +286,10 @@ class Authenticator(ManagedObjectModel, TaggingMixin): :note: If destroy raises an exception, the deletion is not taken. """ - from uds.core.util.permissions import clean + # pylint: disable=import-outside-toplevel + from uds.core.util.permissions import ( + clean, + ) toDelete = kwargs['instance'] @@ -291,7 +307,6 @@ class Authenticator(ManagedObjectModel, TaggingMixin): # Clears related permissions clean(toDelete) - # returns CSV header @staticmethod def getCSVHeader(sep: str = ',') -> str: @@ -315,9 +330,8 @@ class Authenticator(ManagedObjectModel, TaggingMixin): ] ) - def __str__(self): - return u"{0} of type {1} (id:{2})".format(self.name, self.data_type, self.id) + return f'{self.name} of type {self.data_type} (id:{self.id})' # Connects a pre deletion signal to Authenticator diff --git a/server/src/uds/models/user_service_session.py b/server/src/uds/models/user_service_session.py index d4d002b4f..e7c9ae09f 100644 --- a/server/src/uds/models/user_service_session.py +++ b/server/src/uds/models/user_service_session.py @@ -41,6 +41,7 @@ from .util import getSqlDatetime logger = logging.getLogger(__name__) + def _session_id_generator() -> str: """ Generates a new session id @@ -54,7 +55,9 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho The value field is a Text field, so we can put whatever we want in it """ - session_id = models.CharField(max_length=128, db_index=True, default=_session_id_generator, blank=True) + session_id = models.CharField( + max_length=128, db_index=True, default=_session_id_generator, blank=True + ) start = models.DateTimeField(default=getSqlDatetime) end = models.DateTimeField(null=True, blank=True) @@ -65,7 +68,7 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho # "fake" declarations for type checking # objects: 'models.manager.Manager["UserServiceSession"]' - class Meta: + class Meta: # pylint: disable=too-few-public-methods """ Meta class to declare default order and unique multiple field index """ @@ -79,10 +82,8 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho ] def __str__(self) -> str: - return 'Session {}. ({} to {}'.format( - self.session_id, self.start, self.end - ) - + return f'Session {self.session_id} ({self.start} to {self.end})' + def close(self) -> None: """ Ends the session diff --git a/server/src/uds/services/__init__.py b/server/src/uds/services/__init__.py index 6cd45f868..466846fa4 100644 --- a/server/src/uds/services/__init__.py +++ b/server/src/uds/services/__init__.py @@ -43,7 +43,7 @@ The registration of modules is done locating subclases of :py:class:`uds.core.au import logging -from uds.core.util import modfinder +from uds.core.util import modfinder logger = logging.getLogger(__name__) @@ -53,12 +53,11 @@ def __loadModules(): This imports all packages that are descendant of this package, and, after that, it register all subclases of service provider as """ - from uds.core import services - + from uds.core import services # pylint: disable=import-outside-toplevel + modfinder.dynamicLoadAndRegisterModules( - services.factory(), - services.ServiceProvider, - __name__ + services.factory(), services.ServiceProvider, __name__ ) + __loadModules()