diff --git a/server/src/uds/core/services/provider.py b/server/src/uds/core/services/provider.py index 248e18df0..9e35b53a4 100644 --- a/server/src/uds/core/services/provider.py +++ b/server/src/uds/core/services/provider.py @@ -208,7 +208,7 @@ class ServiceProvider(Module): val = getattr(val, 'value', val) return val is True or val == gui.TRUE - def doLog(self, level: int, message: str) -> None: + def doLog(self, level: log.LogLevel, message: str) -> None: """ Logs a message with requested level associated with this service """ diff --git a/server/src/uds/core/util/auto_serializable.py b/server/src/uds/core/util/auto_serializable.py index 4597a0f85..f25efd352 100644 --- a/server/src/uds/core/util/auto_serializable.py +++ b/server/src/uds/core/util/auto_serializable.py @@ -95,7 +95,6 @@ def fernet_key(crypt_key: bytes) -> str: Note: if password is not set, this will raise an exception """ # Generate an URL-Safe base64 encoded 32 bytes key for Fernet - # First, with seed + password, generate a 32 bytes key based on seed + password return base64.b64encode(hashlib.sha256(crypt_key).digest()).decode() # pylint: disable=unnecessary-dunder-call diff --git a/server/src/uds/management/commands/tree.py b/server/src/uds/management/commands/tree.py index 04bb1f49e..ef6504a26 100644 --- a/server/src/uds/management/commands/tree.py +++ b/server/src/uds/management/commands/tree.py @@ -152,7 +152,7 @@ class Command(BaseCommand): fltr = fltr.filter(state=State.ERROR) for item in fltr[:max_items]: # at most max_items items logs = [ - f'{l["date"]}: {log.logStrFromLevel(l["level"])} [{l["source"]}] - {l["message"]}' + f'{l["date"]}: {log.LogLevel.fromStr(l["level"])} [{l["source"]}] - {l["message"]}' for l in log.getLogs(item) ] userServices[item.friendly_name] = { diff --git a/server/src/uds/mfas/Sample/mfa.py b/server/src/uds/mfas/Sample/mfa.py index 2843684cb..eb14f9baf 100644 --- a/server/src/uds/mfas/Sample/mfa.py +++ b/server/src/uds/mfas/Sample/mfa.py @@ -68,4 +68,3 @@ class SampleMFA(mfas.MFA): def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT: logger.debug('Sending code: %s (from %s)', code, request.ip) return mfas.MFA.RESULT.OK - diff --git a/server/src/uds/osmanagers/LinuxOsManager/__init__.py b/server/src/uds/osmanagers/LinuxOsManager/__init__.py index 5e367fc6e..46fa49900 100644 --- a/server/src/uds/osmanagers/LinuxOsManager/__init__.py +++ b/server/src/uds/osmanagers/LinuxOsManager/__init__.py @@ -43,60 +43,50 @@ from .linux_osmanager import LinuxOsManager from .linux_randompass_osmanager import LinuxRandomPassManager downloadsManager().registerDownloadable( - 'udsactor_{version}_all.deb'.format(version=VERSION), - _( - 'UDS Actor for Debian, Ubuntu, ... Linux machines (Requires python >= 3.6)' - ), - os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/udsactor_{version}_all.deb'.format(version=VERSION), + f'udsactor_{VERSION}_all.deb', + _('UDS Actor for Debian, Ubuntu, ... Linux machines (Requires python >= 3.6)'), + os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + f'/files/udsactor_{VERSION}_all.deb', 'application/x-debian-package', ) downloadsManager().registerDownloadable( - 'udsactor-{version}-1.noarch.rpm'.format(version=VERSION), - _( - 'UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines (Requires python >= 3.6)' - ), + f'udsactor-{VERSION}-1.noarch.rpm', + _('UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines (Requires python >= 3.6)'), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/udsactor-{version}-1.noarch.rpm'.format(version=VERSION), + + f'/files/udsactor-{VERSION}-1.noarch.rpm', 'application/x-redhat-package-manager', ) downloadsManager().registerDownloadable( - 'udsactor-unmanaged_{version}_all.deb'.format(version=VERSION), + f'udsactor-unmanaged_{VERSION}_all.deb', _( 'UDS Actor for Debian based Linux machines. Used ONLY for static machines. (Requires python >= 3.6)' ), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/udsactor-unmanaged_{version}_all.deb'.format(version=VERSION), + + f'/files/udsactor-unmanaged_{VERSION}_all.deb', 'application/x-debian-package', ) downloadsManager().registerDownloadable( - 'udsactor-unmanaged-{version}-1.noarch.rpm'.format(version=VERSION), + f'udsactor-unmanaged-{VERSION}-1.noarch.rpm', _( 'UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines. Used ONLY for static machines. (Requires python >= 3.6)' ), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/udsactor-unmanaged-{version}-1.noarch.rpm'.format(version=VERSION), + + f'/files/udsactor-unmanaged-{VERSION}-1.noarch.rpm', 'application/x-redhat-package-manager', ) downloadsManager().registerDownloadable( 'udsactor_2.2.0_legacy.deb', - _( - 'Legacy UDS Actor for Debian, Ubuntu, ... Linux machines (Requires python 2.7)' - ), - os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/udsactor_2.2.0_legacy.deb', + _('Legacy UDS Actor for Debian, Ubuntu, ... Linux machines (Requires python 2.7)'), + os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + '/files/udsactor_2.2.0_legacy.deb', 'application/x-debian-package', ) downloadsManager().registerDownloadable( 'udsactor-legacy-2.2.1-1.noarch.rpm', - _( - 'Legacy UDS Actor for Centos, Fedora, RH, ... Linux machines (Requires python 2.7)' - ), + _('Legacy UDS Actor for Centos, Fedora, RH, ... Linux machines (Requires python 2.7)'), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + '/files/udsactor-legacy-2.2.1-1.noarch.rpm', 'application/x-redhat-package-manager', @@ -104,9 +94,7 @@ downloadsManager().registerDownloadable( downloadsManager().registerDownloadable( 'udsactor-opensuse-legacy-2.2.1-1.noarch.rpm', - _( - 'Legacy UDS Actor for OpenSUSE, ... Linux machines (Requires python 2.7)' - ), + _('Legacy UDS Actor for OpenSUSE, ... Linux machines (Requires python 2.7)'), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + '/files/udsactor-opensuse-legacy-2.2.1-1.noarch.rpm', 'application/x-redhat-package-manager', diff --git a/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py b/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py index 99ffd4941..d56f753a0 100644 --- a/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py +++ b/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py @@ -94,11 +94,11 @@ class LinuxOsManager(osmanagers.OSManager): defvalue=gui.TRUE, ) - def __setProcessUnusedMachines(self): + def __setProcessUnusedMachines(self) -> None: self.processUnusedMachines = self._onLogout == 'remove' - def __init__(self, environment, values): - super(LinuxOsManager, self).__init__(environment, values) + def __init__(self, environment, values) -> None: + super().__init__(environment, values) if values is not None: self._onLogout = values['onLogout'] self._idle = int(values['idle']) @@ -128,31 +128,31 @@ class LinuxOsManager(osmanagers.OSManager): return False - def getName(self, service): + def getName(self, service: 'UserService') -> str: """ gets name from deployed """ return service.getName() - def doLog(self, service, data, origin=log.LogSource.OSMANAGER): + def doLog(self, service: 'UserService', data, origin=log.LogSource.OSMANAGER) -> None: # Stores a log associated with this service try: - msg, level = data.split('\t') + msg, slevel = data.split('\t') try: - level = int(level) + level = log.LogLevel.fromStr(slevel) except Exception: - logger.debug('Do not understand level %s', level) + logger.debug('Do not understand level %s', slevel) level = log.LogLevel.INFO log.doLog(service, level, msg, origin) except Exception: - log.doLog(service, log.LogLevel.ERROR, "do not understand {0}".format(data), origin) + log.doLog(service, log.LogLevel.ERROR, f'do not understand {data}', origin) def actorData( self, userService: 'UserService' ) -> typing.MutableMapping[str, typing.Any]: return {'action': 'rename', 'name': userService.getName()} - def processUnused(self, userService): + def processUnused(self, userService: 'UserService') -> None: """ This will be invoked for every assigned and unused user service that has been in this state at least 1/2 of Globalconfig.CHECK_UNUSED_TIME This function can update userService values. Normal operation will be remove machines if this state is not valid @@ -166,14 +166,14 @@ class LinuxOsManager(osmanagers.OSManager): ) userService.remove() - def isPersistent(self): + def isPersistent(self) -> bool: return self._onLogout == 'keep-always' def checkState(self, userService: 'UserService') -> str: logger.debug('Checking state for service %s', userService) return State.RUNNING - def maxIdle(self): + def maxIdle(self) -> typing.Optional[int]: """ On production environments, will return no idle for non removable machines """ @@ -192,7 +192,7 @@ class LinuxOsManager(osmanagers.OSManager): ['v3', self._onLogout, str(self._idle), gui.fromBool(self._deadLine)] ).encode('utf8') - def unmarshal(self, data: bytes): + def unmarshal(self, data: bytes) -> None: values = data.decode('utf8').split('\t') self._idle = -1 self._deadLine = True diff --git a/server/src/uds/osmanagers/LinuxOsManager/linux_randompass_osmanager.py b/server/src/uds/osmanagers/LinuxOsManager/linux_randompass_osmanager.py index 6a35fac3b..e4e64c9ec 100644 --- a/server/src/uds/osmanagers/LinuxOsManager/linux_randompass_osmanager.py +++ b/server/src/uds/osmanagers/LinuxOsManager/linux_randompass_osmanager.py @@ -74,7 +74,7 @@ class LinuxRandomPassManager(LinuxOsManager): _userAccount: str def __init__(self, environment, values): - super(LinuxRandomPassManager, self).__init__(environment, values) + super().__init__(environment, values) if values is not None: if values['userAccount'] == '': raise exceptions.ValidationError( @@ -91,7 +91,7 @@ class LinuxRandomPassManager(LinuxOsManager): return (username, userService.recoverValue('linOsRandomPass')) return username, password - def genPassword(self, service): + def genPassword(self, service: 'UserService') -> str: randomPass = service.recoverValue('linOsRandomPass') if randomPass is None: randomPass = ''.join( @@ -102,7 +102,7 @@ class LinuxRandomPassManager(LinuxOsManager): log.doLog( service, log.LogLevel.INFO, - "Password set to \"{}\"".format(randomPass), + f'Password set to "{randomPass}"', log.LogSource.OSMANAGER, ) @@ -134,7 +134,7 @@ class LinuxRandomPassManager(LinuxOsManager): self._userAccount = values[1].decode() LinuxOsManager.unmarshal(self, codecs.decode(values[2], 'hex')) - def valuesDict(self): + def valuesDict(self) -> gui.ValuesDictType: dic = LinuxOsManager.valuesDict(self) dic['userAccount'] = self._userAccount return dic diff --git a/server/src/uds/osmanagers/Test/__init__.py b/server/src/uds/osmanagers/Test/__init__.py index d42d76d74..5a6b4ad03 100644 --- a/server/src/uds/osmanagers/Test/__init__.py +++ b/server/src/uds/osmanagers/Test/__init__.py @@ -27,9 +27,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -@author: Adolfo Gómez, dkmaster at dkmon dot com +Author: Adolfo Gómez, dkmaster at dkmon dot com """ -from django.utils.translation import gettext_noop as _ - from .testing_osmanager import TestOSManager - diff --git a/server/src/uds/osmanagers/Test/testing_osmanager.py b/server/src/uds/osmanagers/Test/testing_osmanager.py index e8600fc2d..a97702763 100644 --- a/server/src/uds/osmanagers/Test/testing_osmanager.py +++ b/server/src/uds/osmanagers/Test/testing_osmanager.py @@ -108,18 +108,18 @@ class TestOSManager(osmanagers.OSManager): """ return userService.getName() - def doLog(self, service, data, origin=log.LogSource.OSMANAGER): + def doLog(self, service: 'UserService', data, origin=log.LogSource.OSMANAGER) -> None: # Stores a log associated with this service try: - msg, level = data.split('\t') + msg, slevel = data.split('\t') try: - level = int(level) + level = log.LogLevel.fromStr(slevel) except Exception: - logger.debug('Do not understand level %s', level) + logger.debug('Do not understand level %s', slevel) level = log.LogLevel.INFO log.doLog(service, level, msg, origin) except Exception: - log.doLog(service, log.LogLevel.ERROR, "do not understand {0}".format(data), origin) + log.doLog(service, log.LogLevel.ERROR, f'do not understand {data}', origin) def actorData( self, userService: 'UserService' diff --git a/server/src/uds/osmanagers/WindowsOsManager/__init__.py b/server/src/uds/osmanagers/WindowsOsManager/__init__.py index 8913d998a..8e2f2878a 100644 --- a/server/src/uds/osmanagers/WindowsOsManager/__init__.py +++ b/server/src/uds/osmanagers/WindowsOsManager/__init__.py @@ -45,17 +45,17 @@ from .windows_domain import WinDomainOsManager from .windows_random import WinRandomPassManager managers.downloadsManager().registerDownloadable( - 'UDSActorSetup-{version}.exe'.format(version=VERSION), + f'UDSActorSetup-{VERSION}.exe', _('UDS Actor for windows machines'), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/UDSActorSetup-{version}.exe'.format(version=VERSION), + + f'/files/UDSActorSetup-{VERSION}.exe', 'application/x-msdos-program', ) managers.downloadsManager().registerDownloadable( - 'UDSActorUnmanagedSetup-{version}.exe'.format(version=VERSION), + f'UDSActorUnmanagedSetup-{VERSION}.exe', _('UDS Actor for Unmanaged windows machines. Used ONLY for static machines.'), os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) - + '/files/UDSActorUnmanagedSetup-{version}.exe'.format(version=VERSION), + + f'/files/UDSActorUnmanagedSetup-{VERSION}.exe', 'application/x-msdos-program', ) diff --git a/server/src/uds/osmanagers/WindowsOsManager/windows.py b/server/src/uds/osmanagers/WindowsOsManager/windows.py index 496d25cc6..a77ac074a 100644 --- a/server/src/uds/osmanagers/WindowsOsManager/windows.py +++ b/server/src/uds/osmanagers/WindowsOsManager/windows.py @@ -96,7 +96,7 @@ class WindowsOsManager(osmanagers.OSManager): except Exception: raise exceptions.ValidationError( _('Length must be numeric!!') - ) + ) from None if length > 6 or length < 1: raise exceptions.ValidationError( _('Length must be betwen 1 and 6') @@ -107,7 +107,7 @@ class WindowsOsManager(osmanagers.OSManager): self.processUnusedMachines = self._onLogout == 'remove' def __init__(self, environment, values): - super(WindowsOsManager, self).__init__(environment, values) + super().__init__(environment, values) if values is not None: self._onLogout = values['onLogout'] self._idle = int(values['idle']) @@ -145,7 +145,7 @@ class WindowsOsManager(osmanagers.OSManager): try: msg, levelStr = data.split('\t') try: - level = int(levelStr) + level = log.LogLevel.fromStr(levelStr) except Exception: logger.debug('Do not understand level %s', levelStr) level = log.LogLevel.INFO @@ -154,7 +154,7 @@ class WindowsOsManager(osmanagers.OSManager): except Exception: logger.exception('WindowsOs Manager message log: ') log.doLog( - userService, log.LogLevel.ERROR, "do not understand {0}".format(data), origin + userService, log.LogLevel.ERROR, f'do not understand {data}', origin ) def actorData( diff --git a/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py b/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py index 8c109cfc8..d5c228726 100644 --- a/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py +++ b/server/src/uds/osmanagers/WindowsOsManager/windows_domain.py @@ -68,9 +68,7 @@ class WinDomainOsManager(WindowsOsManager): length=64, label=_('Domain'), order=1, - tooltip=_( - 'Domain to join machines to (use FQDN form, Netbios name not supported for most operations)' - ), + tooltip=_('Domain to join machines to (use FQDN form, Netbios name not supported for most operations)'), required=True, ) account = gui.TextField( @@ -99,9 +97,7 @@ class WinDomainOsManager(WindowsOsManager): length=64, label=_('Machine Group'), order=7, - tooltip=_( - 'Group to which add machines on creation. If empty, no group will be used.' - ), + tooltip=_('Group to which add machines on creation. If empty, no group will be used.'), tab=_('Advanced'), ) removeOnExit = gui.CheckBoxField( @@ -150,17 +146,11 @@ class WinDomainOsManager(WindowsOsManager): # if values['domain'].find('.') == -1: # raise exceptions.ValidationException(_('Must provide domain in FQDN')) if values['account'] == '': - raise exceptions.ValidationError( - _('Must provide an account to add machines to domain!') - ) + raise exceptions.ValidationError(_('Must provide an account to add machines to domain!')) if values['account'].find('\\') != -1: - raise exceptions.ValidationError( - _('DOM\\USER form is not allowed!') - ) + raise exceptions.ValidationError(_('DOM\\USER form is not allowed!')) if values['password'] == '': - raise exceptions.ValidationError( - _('Must provide a password for the account!') - ) + raise exceptions.ValidationError(_('Must provide a password for the account!')) self._domain = values['domain'] self._ou = values['ou'].strip() self._account = values['account'] @@ -269,9 +259,7 @@ class WinDomainOsManager(WindowsOsManager): fltr = f'(&(objectClass=computer)(sAMAccountName={ldaputil.escape(machineName)}$))' obj: typing.Optional[typing.MutableMapping[str, typing.Any]] try: - obj = next( - ldaputil.getAsDict(ldapConnection, base, fltr, ['dn'], sizeLimit=50) - ) + obj = next(ldaputil.getAsDict(ldapConnection, base, fltr, ['dn'], sizeLimit=50)) except StopIteration: obj = None @@ -309,7 +297,7 @@ class WinDomainOsManager(WindowsOsManager): logger.warning('Could not find _ldap._tcp.%s', self._domain) log.doLog( userService, - log.WARN, + log.LogLevel.WARNING, f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)', log.LogSource.OSMANAGER, ) @@ -325,7 +313,7 @@ class WinDomainOsManager(WindowsOsManager): # logger.exception('Ldap Exception caught') if error: - log.doLog(userService, log.WARN, error, log.LogSource.OSMANAGER) + log.doLog(userService, log.LogLevel.WARNING, error, log.LogSource.OSMANAGER) logger.error(error) def release(self, userService: 'UserService') -> None: @@ -351,7 +339,7 @@ class WinDomainOsManager(WindowsOsManager): logger.warning('Could not find _ldap._tcp.%s', self._domain) log.doLog( userService, - log.WARN, + log.LogLevel.WARNING, f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)', log.LogSource.OSMANAGER, ) @@ -360,7 +348,7 @@ class WinDomainOsManager(WindowsOsManager): # logger.exception('Ldap Exception caught') log.doLog( userService, - log.WARN, + log.LogLevel.WARNING, f'Could not remove machine from domain ({e})', log.LogSource.OSMANAGER, ) @@ -369,7 +357,7 @@ class WinDomainOsManager(WindowsOsManager): # logger.exception('Exception caught') log.doLog( userService, - log.WARN, + log.LogLevel.WARNING, f'Could not remove machine from domain ({e})', log.LogSource.OSMANAGER, ) @@ -378,14 +366,10 @@ class WinDomainOsManager(WindowsOsManager): try: res = self.__getMachine(ldapConnection, userService.friendly_name) if res is None: - raise Exception( - f'Machine {userService.friendly_name} not found on AD (permissions?)' - ) + raise Exception(f'Machine {userService.friendly_name} not found on AD (permissions?)') ldaputil.recursive_delete(ldapConnection, res) except IndexError: - logger.error( - 'Error deleting %s from BASE %s', userService.friendly_name, self._ou - ) + logger.error('Error deleting %s from BASE %s', userService.friendly_name, self._ou) except Exception: logger.exception('Deleting from AD: ') @@ -395,9 +379,9 @@ class WinDomainOsManager(WindowsOsManager): except ldaputil.LDAPError as e: return _('Check error: {}').format(e) except dns.resolver.NXDOMAIN: - return _( - 'Could not find server parameters (_ldap._tcp.{0} can\'t be resolved)' - ).format(self._domain) + return _('Could not find server parameters (_ldap._tcp.{0} can\'t be resolved)').format( + self._domain + ) except Exception as e: logger.exception('Exception ') return str(e) @@ -410,17 +394,13 @@ class WinDomainOsManager(WindowsOsManager): # Group if self._group != '': if self.__getGroup(ldapConnection) is None: - return _( - 'Check Error: group "{}" not found (using "cn" to locate it)' - ).format(self._group) + return _('Check Error: group "{}" not found (using "cn" to locate it)').format(self._group) return _('Server check was successful') # pylint: disable=protected-access @staticmethod - def test( - env: 'Environment', data: typing.Dict[str, str] - ) -> typing.List[typing.Any]: + def test(env: 'Environment', data: typing.Dict[str, str]) -> typing.List[typing.Any]: logger.debug('Test invoked') wd = WinDomainOsManager(env, data) logger.debug(wd) @@ -442,17 +422,13 @@ class WinDomainOsManager(WindowsOsManager): if wd and not wd._ou: return [ False, - _('The default path {0} for computers was not found!!!').format( - wd._ou - ), + _('The default path {0} for computers was not found!!!').format(wd._ou), ] return [False, _('The ou path {0} was not found!!!').format(wd._ou)] except dns.resolver.NXDOMAIN: return [ True, - _( - 'Could not check parameters (_ldap._tcp.{0} can\'r be resolved)' - ).format(wd._domain), + _('Could not check parameters (_ldap._tcp.{0} can\'r be resolved)').format(wd._domain), ] except Exception as e: logger.exception('Exception ') @@ -460,9 +436,7 @@ class WinDomainOsManager(WindowsOsManager): return [True, _("All parameters seem to work fine.")] - def actorData( - self, userService: 'UserService' - ) -> typing.MutableMapping[str, typing.Any]: + def actorData(self, userService: 'UserService') -> typing.MutableMapping[str, typing.Any]: return { 'action': 'rename_ad', 'name': userService.getName(), diff --git a/server/src/uds/osmanagers/WindowsOsManager/windows_random.py b/server/src/uds/osmanagers/WindowsOsManager/windows_random.py index eb9a3a315..b77a7a62e 100644 --- a/server/src/uds/osmanagers/WindowsOsManager/windows_random.py +++ b/server/src/uds/osmanagers/WindowsOsManager/windows_random.py @@ -58,9 +58,7 @@ logger = logging.getLogger(__name__) class WinRandomPassManager(WindowsOsManager): typeName = _('Windows Random Password OS Manager') typeType = 'WinRandomPasswordManager' - typeDescription = _( - 'Os Manager to control windows machines, with user password set randomly.' - ) + typeDescription = _('Os Manager to control windows machines, with user password set randomly.') iconFile = 'wosmanager.png' # Apart form data from windows os manager, we need also domain and credentials @@ -88,13 +86,9 @@ class WinRandomPassManager(WindowsOsManager): super().__init__(environment, values) if values: if values['userAccount'] == '': - raise exceptions.ValidationError( - _('Must provide an user account!!!') - ) + raise exceptions.ValidationError(_('Must provide an user account!!!')) if values['password'] == '': - raise exceptions.ValidationError( - _('Must provide a password for the account!!!') - ) + raise exceptions.ValidationError(_('Must provide a password for the account!!!')) self._userAccount = values['userAccount'] self._password = values['password'] else: @@ -107,9 +101,7 @@ class WinRandomPassManager(WindowsOsManager): if username == self._userAccount: password = userService.recoverValue('winOsRandomPass') - return WindowsOsManager.processUserPassword( - self, userService, username, password - ) + return WindowsOsManager.processUserPassword(self, userService, username, password) def genPassword(self, userService: 'UserService'): randomPass = userService.recoverValue('winOsRandomPass') @@ -117,12 +109,9 @@ class WinRandomPassManager(WindowsOsManager): # Generates a password that conforms to complexity rnd = random.SystemRandom() base = ''.join( - rnd.choice(v) - for v in (string.ascii_lowercase, string.ascii_uppercase, string.digits) + rnd.choice(v) for v in (string.ascii_lowercase, string.ascii_uppercase, string.digits) ) + rnd.choice('.+-') - randomPass = ''.join( - rnd.choice(string.ascii_letters + string.digits) for _ in range(12) - ) + randomPass = ''.join(rnd.choice(string.ascii_letters + string.digits) for _ in range(12)) pos = rnd.randrange(0, len(randomPass)) randomPass = randomPass[:pos] + base + randomPass[pos:] userService.storeValue('winOsRandomPass', randomPass) @@ -134,9 +123,7 @@ class WinRandomPassManager(WindowsOsManager): ) return randomPass - def actorData( - self, userService: 'UserService' - ) -> typing.MutableMapping[str, typing.Any]: + def actorData(self, userService: 'UserService') -> typing.MutableMapping[str, typing.Any]: return { 'action': 'rename', 'name': userService.getName(), @@ -150,9 +137,9 @@ class WinRandomPassManager(WindowsOsManager): Serializes the os manager data so we can store it in database ''' base = codecs.encode(super().marshal(), 'hex').decode() - return '\t'.join( - ['v1', self._userAccount, CryptoManager().encrypt(self._password), base] - ).encode('utf8') + return '\t'.join(['v1', self._userAccount, CryptoManager().encrypt(self._password), base]).encode( + 'utf8' + ) def unmarshal(self, data: bytes) -> None: values = data.decode('utf8').split('\t') diff --git a/server/src/uds/reports/auto/__init__.py b/server/src/uds/reports/auto/__init__.py index 18536bf26..11222d760 100644 --- a/server/src/uds/reports/auto/__init__.py +++ b/server/src/uds/reports/auto/__init__.py @@ -60,16 +60,13 @@ reportAutoModelDct: typing.Mapping[str, typing.Type[ReportAutoModel]] = { # typ class ReportAutoType(UserInterfaceType): - def __new__(cls, name, bases, attrs) -> 'ReportAutoType': + def __new__(mcs, name, bases, attrs) -> 'ReportAutoType': # Add gui for elements... order = 1 # Check what source if attrs.get('data_source'): - - attrs['source'] = fields.source_field( - order, attrs['data_source'], attrs['multiple'] - ) + attrs['source'] = fields.source_field(order, attrs['data_source'], attrs['multiple']) order += 1 # Check if date must be added @@ -88,11 +85,9 @@ class ReportAutoType(UserInterfaceType): attrs['interval'] = fields.intervals_field(order) order += 1 - return typing.cast( - 'ReportAutoType', UserInterfaceType.__new__(cls, name, bases, attrs) - ) - + return typing.cast('ReportAutoType', UserInterfaceType.__new__(mcs, name, bases, attrs)) +# pylint: disable=abstract-method class ReportAuto(Report, metaclass=ReportAutoType): # Variables that will be overwriten on new class creation source: typing.ClassVar[typing.Union[gui.MultiChoiceField, gui.ChoiceField]] @@ -115,23 +110,19 @@ class ReportAuto(Report, metaclass=ReportAutoType): multiple: bool = False def getModel(self) -> typing.Type[ReportAutoModel]: - data_source = self.data_source.split('.')[0] + data_source = self.data_source.split('.', maxsplit=1)[0] return reportAutoModelDct[data_source] def initGui(self): # Fills datasource - fields.source_field_data(self.getModel(), self.data_source, self.source) + fields.source_field_data(self.getModel(), self.source) logger.debug('Source field data: %s', self.source) def getModelItems(self) -> typing.Iterable[ReportAutoModel]: model = self.getModel() - filters = ( - [self.source.value] - if isinstance(self.source, gui.ChoiceField) - else self.source.value - ) + filters = [self.source.value] if isinstance(self.source, gui.ChoiceField) else self.source.value if '0-0-0-0' in filters: items = model.objects.all() @@ -141,9 +132,7 @@ class ReportAuto(Report, metaclass=ReportAutoType): return items def getIntervalInHours(self): - return {'hour': 1, 'day': 24, 'week': 24 * 7, 'month': 24 * 30}[ - self.interval.value - ] + return {'hour': 1, 'day': 24, 'week': 24 * 7, 'month': 24 * 30}[self.interval.value] def getIntervalsList(self) -> typing.List[typing.Tuple[datetime.datetime, datetime.datetime]]: intervals: typing.List[typing.Tuple[datetime.datetime, datetime.datetime]] = [] @@ -164,33 +153,29 @@ class ReportAuto(Report, metaclass=ReportAutoType): next = (start + datetime.timedelta(days=32)).replace(day=1) intervals.append((start, next)) start = next - - logger.info('Intervals: {0}'.format(intervals)) - return intervals + logger.debug('Intervals: %s', intervals) + return intervals def adjustDate(self, d: datetime.date, isEndingDate: bool) -> datetime.date: if self.interval.value in ('hour', 'day'): return d - elif self.interval.value == 'week': + if self.interval.value == 'week': return (d - datetime.timedelta(days=d.weekday())).replace() - elif self.interval.value == 'month': + if self.interval.value == 'month': if not isEndingDate: return d.replace(day=1) - else: - return (d + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1) - else: - return d + return (d + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1) + return d def formatDatetimeAsString(self, d: datetime.date) -> str: if self.interval.value in ('hour', 'day'): return d.strftime('%Y-%b-%d %H:%M:%S') - elif self.interval.value == 'week': + if self.interval.value == 'week': return d.strftime('%Y-%b-%d') - elif self.interval.value == 'month': + if self.interval.value == 'month': return d.strftime('%Y-%b') - else: - return d.strftime('%Y-%b-%d %H:%M:%S') + return d.strftime('%Y-%b-%d %H:%M:%S') def startingDate(self) -> datetime.date: return self.adjustDate(self.date_start.date(), False) diff --git a/server/src/uds/reports/auto/fields.py b/server/src/uds/reports/auto/fields.py index 42ea23208..5a5a3f666 100644 --- a/server/src/uds/reports/auto/fields.py +++ b/server/src/uds/reports/auto/fields.py @@ -37,7 +37,6 @@ import typing from django.utils.translation import gettext_noop as _ from uds.core.ui import gui -from uds import models logger = logging.getLogger(__name__) @@ -57,7 +56,7 @@ def single_date_field(order: int) -> gui.DateField: order=order, label=_('Date'), tooltip=_('Date for report'), - defvalue=lambda: datetime.date.today(), + defvalue=datetime.date.today, required=True, ) @@ -67,7 +66,7 @@ def end_date_field(order: int) -> gui.DateField: order=order, label=_('Ending date'), tooltip=_('ending date for report'), - defvalue=lambda: datetime.date.today()+datetime.timedelta(days=1), + defvalue=lambda: datetime.date.today() + datetime.timedelta(days=1), required=True, ) @@ -113,7 +112,6 @@ def source_field( def source_field_data( model: typing.Any, - data_source: str, field: typing.Union[gui.ChoiceField, gui.MultiChoiceField], ) -> None: dataList: typing.List[gui.ChoiceType] = [ diff --git a/server/src/uds/reports/lists/audit.py b/server/src/uds/reports/lists/audit.py index 73755aff2..240b65e41 100644 --- a/server/src/uds/reports/lists/audit.py +++ b/server/src/uds/reports/lists/audit.py @@ -41,6 +41,7 @@ from django.utils.translation import gettext, gettext_lazy as _ from uds.core.ui import gui from uds.core.util import log +from uds.core.managers.log.objects import LogObjectType from uds.models import Log from .base import ListReport @@ -49,7 +50,6 @@ from .base import ListReport logger = logging.getLogger(__name__) - class ListReportAuditCSV(ListReport): name = _('Audit Log list') # Report name description = _('List administration audit logs') # Report description @@ -80,16 +80,21 @@ class ListReportAuditCSV(ListReport): def genData(self) -> typing.Generator[typing.Tuple, None, None]: # Xtract user method, response_code and request from data # the format is "user: [method/response_code] request" - rx = re.compile(r'(?P[^ ]*) (?P.*?): \[(?P[^/]*)/(?P[^\]]*)\] (?P.*)') + rx = re.compile( + r'(?P[^ ]*) (?P.*?): \[(?P[^/]*)/(?P[^\]]*)\] (?P.*)' + ) start = self.startDate.datetime().replace(hour=0, minute=0, second=0, microsecond=0) end = self.endDate.datetime().replace(hour=23, minute=59, second=59, microsecond=999999) for i in Log.objects.filter( - created__gte=start, created__lte=end, owner_id=0, owner_type=log.OWNER_TYPE_AUDIT, + created__gte=start, + created__lte=end, + source=log.LogSource.REST, + owner_type=LogObjectType.SYSLOG, ).order_by('-created'): # extract user, method, response_code and request from data field m = rx.match(i.data) - + if m is not None: # Convert response code to an string if 200, else, to an error response_code = { @@ -117,7 +122,15 @@ class ListReportAuditCSV(ListReport): writer = csv.writer(output) writer.writerow( - [gettext('Date'), gettext('Level'), gettext('IP'), gettext('User'), gettext('Method'), gettext('Response code'), gettext('Request')] + [ + gettext('Date'), + gettext('Level'), + gettext('IP'), + gettext('User'), + gettext('Method'), + gettext('Response code'), + gettext('Request'), + ] ) for l in self.genData(): diff --git a/server/src/uds/reports/lists/base.py b/server/src/uds/reports/lists/base.py index 37640fbe1..250331330 100644 --- a/server/src/uds/reports/lists/base.py +++ b/server/src/uds/reports/lists/base.py @@ -30,8 +30,6 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ -import typing - from django.utils.translation import gettext_noop as _ from uds.core import reports diff --git a/server/src/uds/reports/stats/auth_stats.py b/server/src/uds/reports/stats/auth_stats.py index df711da41..fc1b8e613 100644 --- a/server/src/uds/reports/stats/auth_stats.py +++ b/server/src/uds/reports/stats/auth_stats.py @@ -34,7 +34,6 @@ import typing from django.utils.translation import gettext, gettext_lazy as _ -from uds.core.ui import gui from uds.core.util.stats import counters from .base import StatsReportAuto @@ -68,8 +67,6 @@ class AuthenticatorsStats(StatsReportAuto): # Will show a.name on every change... stats.append({'date': a.name, 'users': None}) - services = 0 - userServices = 0 for i in self.getIntervalsList(): start = i[0] end = i[1] diff --git a/server/src/uds/reports/stats/base.py b/server/src/uds/reports/stats/base.py index bd4d46293..6c319d5ca 100644 --- a/server/src/uds/reports/stats/base.py +++ b/server/src/uds/reports/stats/base.py @@ -30,8 +30,6 @@ """ Author: Adolfo Gómez, dkmaster at dkmon dot com """ -import typing - from django.utils.translation import gettext_noop as _ from uds.core import reports from ..auto import ReportAuto @@ -43,6 +41,6 @@ class StatsReport(reports.Report): def generate(self) -> bytes: raise NotImplementedError('StatsReport generate invoked and not implemented') - +# pylint: disable=abstract-method class StatsReportAuto(ReportAuto, StatsReport): pass diff --git a/server/src/uds/reports/stats/pools_usage_day.py b/server/src/uds/reports/stats/pools_usage_day.py index bf9d58864..eac4185f5 100644 --- a/server/src/uds/reports/stats/pools_usage_day.py +++ b/server/src/uds/reports/stats/pools_usage_day.py @@ -92,7 +92,7 @@ class CountersPoolAssigned(StatsReport): for poolUuid in self.pools.value: try: pool = ServicePool.objects.get(uuid=poolUuid) - except Exception: + except Exception: # nosec: If not found, simple ignore it and go for next continue hours = [0] * 24 @@ -125,9 +125,7 @@ class CountersPoolAssigned(StatsReport): d = { 'title': _('Services by hour'), 'x': X, - 'xtickFnc': lambda xx: '{:02d}'.format( - xx - ), # pylint: disable=unnecessary-lambda + 'xtickFnc': '{:02d}'.format, # Two digits 'xlabel': _('Hour'), 'y': [ {'label': i['name'], 'data': [i['hours'][v] for v in X]} for i in items @@ -172,6 +170,6 @@ class CountersPoolAssignedCSV(CountersPoolAssigned): for i in items: for j in range(24): - writer.writerow([i['name'], '{:02d}'.format(j), i['hours'][j]]) + writer.writerow([i['name'], f'{j:02d}', i['hours'][j]]) return output.getvalue() diff --git a/server/src/uds/reports/stats/pools_usage_summary.py b/server/src/uds/reports/stats/pools_usage_summary.py index f67e9ca41..b628fe651 100644 --- a/server/src/uds/reports/stats/pools_usage_summary.py +++ b/server/src/uds/reports/stats/pools_usage_summary.py @@ -60,7 +60,7 @@ class PoolsUsageSummary(UsageByPool): ) -> typing.Tuple[ typing.ValuesView[typing.MutableMapping[str, typing.Any]], int, int, int ]: - orig, poolNames = super().getData() + orig, poolNames = super().getData() # pylint: disable=unused-variable # Keep name for reference pools: typing.Dict[str, typing.Dict] = {} totalTime: int = 0 @@ -88,8 +88,8 @@ class PoolsUsageSummary(UsageByPool): logger.debug('Pools %s', pools) # Remove unique users, and keep only counts... - for pn in pools: - pools[pn]['users'] = len(pools[pn]['users']) + for _, pn in pools.items(): + pn['users'] = len(pn['users']) return pools.values(), totalTime, totalCount or 1, len(uniqueUsers) diff --git a/server/src/uds/reports/stats/usage.py b/server/src/uds/reports/stats/usage.py index 143a5c180..5e38d6d6c 100644 --- a/server/src/uds/reports/stats/usage.py +++ b/server/src/uds/reports/stats/usage.py @@ -30,16 +30,15 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ -""" from django.utils.translation import gettext_noop as _ +# from django.utils.translation import gettext_noop as _ -from .base import StatsReport +# from .base import StatsReport -class StatsReportUsage(StatsReport): # Disabled from being used - name = _('Usage stats') # Report name - description = _('Statistics of platform use') # Report description - uuid = '9ae54172-ed48-11e4-b8e1-10feed05884b' +# class StatsReportUsage(StatsReport): # Disabled from being used +# name = _('Usage stats') # Report name +# description = _('Statistics of platform use') # Report description +# uuid = '9ae54172-ed48-11e4-b8e1-10feed05884b' - def generate(self): - return '' - """ +# def generate(self): +# return '' diff --git a/server/src/uds/reports/stats/user_access.py b/server/src/uds/reports/stats/user_access.py index 063a3ad3d..adea0431a 100644 --- a/server/src/uds/reports/stats/user_access.py +++ b/server/src/uds/reports/stats/user_access.py @@ -200,7 +200,7 @@ class StatsReportLogin(StatsReport): _('Sunday'), ][l], 'xlabel': _('Day of week'), - 'y': [{'label': 'Users', 'data': [v for v in dataWeek]}], + 'y': [{'label': 'Users', 'data': list(dataWeek)}], 'ylabel': 'Users', } @@ -211,7 +211,7 @@ class StatsReportLogin(StatsReport): 'title': _('Users Access (by hour)'), 'x': X, 'xlabel': _('Hour'), - 'y': [{'label': 'Users', 'data': [v for v in dataHour]}], + 'y': [{'label': 'Users', 'data': list(dataHour)}], 'ylabel': 'Users', }