1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

a lot of more linting

This commit is contained in:
Adolfo Gómez García 2023-04-16 13:51:30 +02:00
parent fa8e77c750
commit 030e3785e9
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
40 changed files with 404 additions and 324 deletions

View File

@ -74,7 +74,7 @@ class Environment:
from uds.core.util.storage import Storage # pylint: disable=import-outside-toplevel
if idGenerators is None:
idGenerators = dict()
idGenerators = {}
self._key = uniqueKey
self._cache = Cache(uniqueKey)
self._storage = Storage(uniqueKey)

View File

@ -28,17 +28,14 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
UDS os managers related interfaces and classes
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from .mfa import MFA
from .mfa import MFA, LoginAllowed
from .mfafactory import MFAsFactory
def factory():
def factory() -> MFAsFactory:
"""
Returns factory for register/access to authenticators
"""
from .mfafactory import MFAsFactory
return MFAsFactory.factory()
return MFAsFactory()

View File

@ -37,10 +37,11 @@ import hashlib
import logging
import typing
from django.utils.translation import gettext_noop as _
from django.utils.translation import gettext_noop as _, gettext
from uds.core.module import Module
from uds.models.util import getSqlDatetime
from uds.core.auths import exceptions
from uds.models.network import Network
if typing.TYPE_CHECKING:
from uds.core.environment import Environment
@ -50,6 +51,54 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class LoginAllowed(enum.StrEnum):
"""
This enum is used to know if the MFA code was sent or not.
"""
ALLOWED = '0'
DENIED = '1'
ALLOWED_IF_IN_NETWORKS = '2'
DENIED_IF_IN_NETWORKS = '3'
@staticmethod
def checkAction(
action: 'LoginAllowed|str',
request: 'ExtendedHttpRequest',
networks: typing.Optional[typing.Iterable[str]] = None,
) -> bool:
def checkIp() -> bool:
if networks is None:
return True # No network restrictions, so we allow
return any(
i.contains(request.ip)
for i in Network.objects.filter(uuid__in=list(networks))
)
if isinstance(action, str):
action = LoginAllowed(action)
return {
LoginAllowed.ALLOWED: True,
LoginAllowed.DENIED: False,
LoginAllowed.ALLOWED_IF_IN_NETWORKS: checkIp(),
LoginAllowed.DENIED_IF_IN_NETWORKS: not checkIp(),
}.get(action, False)
@staticmethod
def valuesForSelect() -> typing.Mapping[str, str]:
return {
LoginAllowed.ALLOWED.value: gettext('Allow user login'),
LoginAllowed.DENIED.value: gettext('Deny user login'),
LoginAllowed.ALLOWED_IF_IN_NETWORKS.value: gettext(
'Allow user to login if it IP is in the networks list'
),
LoginAllowed.DENIED_IF_IN_NETWORKS.value: gettext(
'Deny user to login if it IP is in the networks list'
),
}
class MFA(Module):
"""
this class provides an abstraction of a Multi Factor Authentication
@ -155,8 +204,8 @@ class MFA(Module):
If returns MFA.RESULT.ALLOW, the MFA code was not sent, the user does not need to enter the MFA code.
If raises an error, the MFA code was not sent, and the user needs to enter the MFA code.
"""
raise NotImplementedError('sendCode method not implemented')
logger.error('MFA.sendCode not implemented')
raise exceptions.MFAError('MFA.sendCode not implemented')
def _getData(
self, request: 'ExtendedHttpRequest', userId: str
@ -216,10 +265,7 @@ class MFA(Module):
try:
if data and validity:
# if we have a stored code, check if it's still valid
if (
data[0] + datetime.timedelta(seconds=validity)
> getSqlDatetime()
):
if data[0] + datetime.timedelta(seconds=validity) > getSqlDatetime():
# if it's still valid, just return without sending a new one
return MFA.RESULT.OK
except Exception:
@ -232,6 +278,7 @@ class MFA(Module):
# Send the code to the user
# May raise an exception if the code was not sent and is required to be sent
# pylint: disable=assignment-from-no-return
result = self.sendCode(request, userId, username, identifier, code)
# Store the code in the database, own storage space, if no exception was raised

View File

@ -30,32 +30,13 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from uds.core.util import singleton
from uds.core.util import factory
if typing.TYPE_CHECKING:
from .mfa import MFA
logger = logging.getLogger(__name__)
class MFAsFactory(metaclass=singleton.Singleton):
_factory: typing.Optional['MFAsFactory'] = None
_mfas: typing.MutableMapping[str, typing.Type['MFA']] = {}
@staticmethod
def factory() -> 'MFAsFactory':
return MFAsFactory()
def providers(self) -> typing.Mapping[str, typing.Type['MFA']]:
return self._mfas
def insert(self, type_: typing.Type['MFA']) -> None:
logger.debug('Adding Multi Factor Auth %s as %s', type_.type(), type_)
typeName = type_.type().lower()
self._mfas[typeName] = type_
def lookup(self, typeName: str) -> typing.Optional[typing.Type['MFA']]:
return self._mfas.get(typeName.lower(), None)
class MFAsFactory(factory.ModuleFactory['MFA']):
pass

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,10 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
UDS os managers related interfaces and classes
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=unused-import
from .osmanager import OSManager
from .osmfactory import OSManagersFactory

View File

@ -99,7 +99,7 @@ class OSManager(Module):
# These methods must be overriden
def actorData(
self, userService: 'UserService'
self, userService: 'UserService' # pylint: disable=unused-argument
) -> typing.MutableMapping[str, typing.Any]:
"""
This method provides information to actor, so actor can complete os configuration.
@ -129,7 +129,9 @@ class OSManager(Module):
"""
return {}
def checkState(self, userService: 'UserService') -> str:
def checkState(
self, userService: 'UserService' # pylint: disable=unused-argument
) -> str:
"""
This method must be overriden so your os manager can respond to requests from system to the current state of the service
This method will be invoked when:
@ -148,7 +150,9 @@ class OSManager(Module):
This function can update userService values. Normal operation will be remove machines if this state is not valid
"""
def isRemovableOnLogout(self, userService: 'UserService') -> bool:
def isRemovableOnLogout(
self, userService: 'UserService' # pylint: disable=unused-argument
) -> bool:
"""
If returns true, when actor notifies "logout", UDS will mark service for removal
can be overriden
@ -174,7 +178,10 @@ class OSManager(Module):
return cls.processUserPassword != OSManager.processUserPassword
def processUserPassword(
self, userService: 'UserService', username: str, password: str
self,
userService: 'UserService', # pylint: disable=unused-argument
username: str,
password: str,
) -> typing.Tuple[str, str]:
"""
This will be invoked prior to passsing username/password to Transport.
@ -245,7 +252,7 @@ class OSManager(Module):
log.doLog(
userService,
log.INFO,
"User {0} has logged in".format(userName),
f'User {userName} has logged in',
log.OSMANAGER,
)
@ -310,7 +317,7 @@ class OSManager(Module):
log.doLog(
userService,
log.INFO,
"User {0} has logged out".format(userName),
f'User {userName} has logged out',
log.OSMANAGER,
)

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2020 Virtual Cable S.L.U.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,6 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from .report import Report

View File

@ -47,7 +47,12 @@ CONTENT_TYPE = 'text/plain'
@auth.trustedSourceRequired
def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResponse:
def opengnsys(
request: HttpRequest, # pylint: disable=unused-argument
msg: str,
token: str,
uuid: str,
) -> HttpResponse:
logger.debug('Received opengnsys message %s, token %s, uuid %s', msg, token, uuid)
def getUserService() -> typing.Optional[UserService]:
@ -63,7 +68,7 @@ def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResp
uuid,
)
# Sleep a while in case of error?
except Exception as e:
except Exception:
# Any exception will stop process
logger.warning(
'OpenGnsys: invalid userService %s:%s. (Ignored)', token, uuid
@ -106,7 +111,9 @@ def opengnsys(request: HttpRequest, msg: str, token: str, uuid: str) -> HttpResp
logger.debug(
'Processing logout from OpenGnsys %s', userService.friendly_name
)
actor_v3.Logout.process_logout(userService, 'OpenGnsys', '') # Close all sessions
actor_v3.Logout.process_logout(
userService, 'OpenGnsys', ''
) # Close all sessions
fnc: typing.Optional[typing.Callable[[], None]] = {
'login': login,

View File

@ -30,7 +30,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
from django.core.management.base import BaseCommand
@ -63,5 +62,5 @@ class Command(BaseCommand):
): # If not exists, try to store value without any special parameters
Config.section(mod).value(name, value).get()
except Exception as e:
self.stderr.write('The command could not be processed: {}'.format(e))
self.stderr.write(f'The command could not be processed: {e}')
logger.exception('Exception processing %s', args)

View File

@ -198,6 +198,7 @@ def osmanager_exporter(osmanager: models.OSManager) -> typing.Dict[str, typing.A
o = managed_object_exporter(osmanager)
return o
def calendar_exporter(calendar: models.Calendar) -> typing.Dict[str, typing.Any]:
"""
Exports a calendar to a dict
@ -212,7 +213,10 @@ def calendar_exporter(calendar: models.Calendar) -> typing.Dict[str, typing.Any]
)
return c
def calendar_rule_exporter(calendar_rule: models.CalendarRule) -> typing.Dict[str, typing.Any]:
def calendar_rule_exporter(
calendar_rule: models.CalendarRule,
) -> typing.Dict[str, typing.Any]:
"""
Exports a calendar rule to a dict
"""
@ -232,6 +236,7 @@ def calendar_rule_exporter(calendar_rule: models.CalendarRule) -> typing.Dict[st
)
return c
class Command(BaseCommand):
help = 'Export entities from UDS to be imported in another UDS instance'
@ -315,7 +320,7 @@ class Command(BaseCommand):
entities = self.remove_reduntant_entities(options['entities'])
# For each entity, export it as yaml to output file
with open(options['output'], 'w') as f:
with open(options['output'], 'w', encoding='utf8') as f:
for entity in entities:
self.stderr.write(f'Exporting {entity}')
f.write(self.VALID_ENTITIES[entity]())
@ -324,9 +329,7 @@ class Command(BaseCommand):
if self.verbose:
self.stderr.write(f'Exported to {options["output"]}')
def apply_filter(
self, model: typing.Type[ModelType]
) -> typing.Iterable[ModelType]:
def apply_filter(self, model: typing.Type[ModelType]) -> typing.Iterable[ModelType]:
"""
Applies a filter to a model
"""
@ -337,7 +340,9 @@ class Command(BaseCommand):
self.stderr.write("\n ".join(values))
# Generate "OR" filter with all kwargs
if self.filter_args:
return model.objects.filter(reduce(operator.or_, (Q(**{k: v}) for k, v in self.filter_args)))
return model.objects.filter(
reduce(operator.or_, (Q(**{k: v}) for k, v in self.filter_args))
)
return model.objects.all()
def output_count(
@ -367,16 +372,11 @@ class Command(BaseCommand):
def export_services(self) -> str:
# First, locate providers for services with the filter
services_list = list(
self.output_count(
'Filtering services', self.apply_filter(models.Service)
)
)
providers_list = set(
[
s.provider
for s in self.output_count('Filtering providers', services_list)
]
self.output_count('Filtering services', self.apply_filter(models.Service))
)
providers_list = {
s.provider for s in self.output_count('Filtering providers', services_list)
}
# Now, export those providers
providers = [
provider_exporter(p)
@ -416,16 +416,11 @@ class Command(BaseCommand):
"""
# first, locate authenticators for users with the filter
users_list = list(
self.output_count(
'Filtering users', self.apply_filter(models.User)
)
)
authenticators_list = set(
[
u.manager
for u in self.output_count('Filtering authenticators', users_list)
]
self.output_count('Filtering users', self.apply_filter(models.User))
)
authenticators_list = {
u.manager for u in self.output_count('Filtering authenticators', users_list)
}
# Now, groups that contains those users
groups_list = set()
for u in self.output_count('Filtering groups', users_list):
@ -461,16 +456,12 @@ class Command(BaseCommand):
"""
# First export authenticators for groups with the filter
groups_list = list(
self.output_count(
'Filtering groups', self.apply_filter(models.Group)
)
)
authenticators_list = set(
[
g.manager
for g in self.output_count('Filtering authenticators', groups_list)
]
self.output_count('Filtering groups', self.apply_filter(models.Group))
)
authenticators_list = {
g.manager
for g in self.output_count('Filtering authenticators', groups_list)
}
authenticators = [
authenticator_exporter(a)
for a in self.output_count('Saving authenticators', authenticators_list)

View File

@ -1,2 +1,4 @@
# Placeholder, import the command from udsfs
# pylint: disable=unused-import
from .udsfs import Command

View File

@ -87,6 +87,6 @@ class Command(BaseCommand):
if options['yaml']:
self.stdout.write(yaml.safe_dump(writer, default_flow_style=False))
except Exception as e:
self.stdout.write('The command could not be processed: {}'.format(e))
self.stdout.write(f'The command could not be processed: {e}')
self.stdout.flush()
logger.exception('Exception processing %s', args)

View File

@ -65,7 +65,7 @@ def become_daemon(
if os.fork() > 0:
sys.exit(0) # kill off parent
except OSError as e:
sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
sys.stderr.write(f'fork #1 failed: ({e.errno}) {e.strerror}')
sys.exit(1)
os.setsid()
os.chdir(our_home_dir)
@ -76,12 +76,12 @@ def become_daemon(
if os.fork() > 0:
os._exit(0) # pylint: disable=protected-access
except OSError as e:
sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
sys.stderr.write(f'fork #2 failed: ({e.errno}) {e.strerror}')
os._exit(1) # pylint: disable=protected-access
si = open('/dev/null', 'r')
so = open(out_log, 'a+', 1)
se = open(err_log, 'a+', 1)
si = open('/dev/null', 'r', encoding='utf-8') # pylint: disable=consider-using-with
so = open(out_log, 'a+', 1, encoding='utf-8') # pylint: disable=consider-using-with
se = open(err_log, 'a+', 1, encoding='utf-8') # pylint: disable=consider-using-with
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
@ -136,7 +136,7 @@ class Command(BaseCommand):
pid: int = 0
try:
pid = int(open(getPidFile(), 'r').readline())
pid = int(open(getPidFile(), 'r', encoding='utf8').readline()) # pylint: disable=consider-using-with
except Exception:
pid = 0
@ -161,7 +161,8 @@ class Command(BaseCommand):
)
pid = os.getpid()
open(getPidFile(), 'w+').write('{}\n'.format(pid))
with open(getPidFile(), 'w+', encoding='utf8') as f:
f.write(f'{pid}\n')
manager = taskManager()
manager.run()
@ -169,7 +170,7 @@ class Command(BaseCommand):
if not start and not stop:
if pid:
self.stdout.write(
"Task manager found running (pid file exists: {0})\n".format(pid)
f'Task manager found running (pid file exists: {pid})\n'
)
else:
self.stdout.write("Task manager not foud (pid file do not exits)\n")

View File

@ -32,10 +32,11 @@
"""
import logging
import typing
import yaml
import collections
from django.core.management.base import BaseCommand
from uds.core.util import log
from uds import models
from uds.core.util.state import State
@ -85,7 +86,7 @@ def getSerializedFromModel(
removableFields = removableFields or []
passwordFields = passwordFields or []
try:
values = mod._meta.managers[0].filter(pk=mod.pk).values()[0] # type: ignore
values = mod._meta.managers[0].filter(pk=mod.pk).values()[0] # type: ignore # pylint: disable=protected-access
for i in ['uuid', 'id'] + removableFields:
if i in values:
del values[i]
@ -118,6 +119,7 @@ class Command(BaseCommand):
help='Maximum elements exported for groups and user services',
)
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def handle(self, *args, **options) -> None:
logger.debug("Show Tree")
# firt, genertate Provider-service-servicepool tree
@ -134,14 +136,13 @@ class Command(BaseCommand):
try:
providers = {}
for provider in models.Provider.objects.all():
services = {}
totalServices = 0
totalServicePools = 0
totalUserServices = 0
for service in provider.services.all():
servicePools = {}
numberOfServicePools = 0
numberOfServicePools = 0
numberOfUserServices = 0
for servicePool in service.deployedServices.all():
# get assigned user services with ERROR status
@ -151,12 +152,7 @@ class Command(BaseCommand):
fltr = fltr.filter(state=State.ERROR)
for item in fltr[:max_items]: # at most max_items items
logs = [
'{}: {} [{}] - {}'.format(
l['date'],
log.logStrFromLevel(l['level']),
l['source'],
l['message'],
)
f'{l["date"]}: {log.logStrFromLevel(l["level"])} [{l["source"]}] - {l["message"]}'
for l in log.getLogs(item)
]
userServices[item.friendly_name] = {
@ -197,9 +193,9 @@ class Command(BaseCommand):
except Exception:
changelogs = []
publications[str(publication.revision)] = getSerializedFromModel(
publication, ['data']
)
publications[
str(publication.revision)
] = getSerializedFromModel(publication, ['data'])
publications[str(publication.revision)][
'changelogs'
] = changelogs
@ -242,7 +238,9 @@ class Command(BaseCommand):
numberOfServicePools = len(servicePools)
totalServicePools += numberOfServicePools
services[f'{service.name} ({numberOfServicePools}, {numberOfUserServices})'] = {
services[
f'{service.name} ({numberOfServicePools}, {numberOfUserServices})'
] = {
'_': getSerializedFromManagedObject(service),
'servicePools': servicePools,
}
@ -262,8 +260,12 @@ class Command(BaseCommand):
for authenticator in models.Authenticator.objects.all():
# Groups
grps: typing.Dict[str, typing.Any] = {}
for group in authenticator.groups.all()[:max_items]: # at most max_items items
grps[group.name] = getSerializedFromModel(group, ['manager_id', 'name'])
for group in authenticator.groups.all()[
:max_items
]: # at most max_items items
grps[group.name] = getSerializedFromModel(
group, ['manager_id', 'name']
)
authenticators[authenticator.name] = {
'_': getSerializedFromManagedObject(authenticator),
'groups': grps,
@ -375,6 +377,6 @@ class Command(BaseCommand):
self.stdout.write(yaml.safe_dump(tree, default_flow_style=False))
except Exception as e:
self.stdout.write('The command could not be processed: {}'.format(e))
self.stdout.write(f'The command could not be processed: {e}')
self.stdout.flush()
logger.exception('Exception processing %s', args)

View File

@ -95,7 +95,7 @@ class EventFS(types.UDSFSInterface):
): # Return days of month as indicated on path
month = int(path[1])
return ['.', '..'] + [
'{:02d}'.format(x)
f'{x:02d}'
for x in range(1, EventFS.number_of_days(year, month) + 1)
]

View File

@ -6,13 +6,13 @@ import logging
from uds import models
from uds.core.util.cache import Cache
from uds.core.util.stats import events, counters
from . import types
logger = logging.getLogger(__name__)
# Custom types
class StatInterval(typing.NamedTuple):
start: datetime.datetime
@ -87,13 +87,13 @@ class StatsFS(types.UDSFSInterface):
self, filename: typing.List[str]
) -> typing.Tuple[DispatcherType, StatInterval, str]:
if len(filename) != 1:
raise FileNotFoundError
raise FileNotFoundError()
# Extract components
try:
dispatcher, interval, extension = (filename[0].split('.') + [''])[:3]
except ValueError:
raise FileNotFoundError
raise FileNotFoundError() from None
logger.debug(
'Dispatcher: %s, interval: %s, extension: %s',
@ -103,16 +103,16 @@ class StatsFS(types.UDSFSInterface):
)
if dispatcher not in self._dispatchers:
raise FileNotFoundError
raise FileNotFoundError()
fnc, requiresInterval = self._dispatchers[dispatcher]
if extension == '' and requiresInterval is True:
raise FileNotFoundError
raise FileNotFoundError()
if requiresInterval:
if interval not in self._interval:
raise FileNotFoundError
raise FileNotFoundError()
range = self._interval[interval]
else:
@ -122,7 +122,7 @@ class StatsFS(types.UDSFSInterface):
extension = interval
if extension != 'csv':
raise FileNotFoundError
raise FileNotFoundError()
todayStart = datetime.datetime.utcnow().replace(
hour=0, minute=0, second=0, microsecond=0
@ -174,14 +174,14 @@ class StatsFS(types.UDSFSInterface):
cacheTime = 60
# Check if the file info is cached
cached = self._cache.get(path[0]+extension)
cached = self._cache.get(path[0] + extension)
if cached is not None:
logger.debug('Cache hit for %s', path[0])
data = cached
else:
logger.debug('Cache miss for %s', path[0])
data = dispatcher(interval, extension, 0, 0)
self._cache.put(path[0]+extension, data, cacheTime)
self._cache.put(path[0] + extension, data, cacheTime)
# Calculate the size of the file
size = len(data)
@ -206,14 +206,14 @@ class StatsFS(types.UDSFSInterface):
cacheTime = 60
# Check if the file info is cached
cached = self._cache.get(path[0]+extension)
cached = self._cache.get(path[0] + extension)
if cached is not None:
logger.debug('Cache hit for %s', path[0])
data = cached
else:
logger.debug('Cache miss for %s', path[0])
data = dispatcher(interval, extension, 0, 0)
self._cache.put(path[0]+extension, data, cacheTime)
self._cache.put(path[0] + extension, data, cacheTime)
# Dispatch the read to the dispatcher
data = dispatcher(interval, extension, size, offset)

View File

@ -27,7 +27,7 @@ class StatType(typing.NamedTuple):
'st_ctime': self.st_ctime,
'st_mtime': self.st_mtime,
'st_atime': self.st_atime,
'st_nlink': self.st_nlink
'st_nlink': self.st_nlink,
}
# Append optional fields
if self.st_dev != -1:
@ -46,7 +46,7 @@ class StatType(typing.NamedTuple):
rst['st_blksize'] = self.st_blksize
if self.st_blocks != -1:
rst['st_blocks'] = self.st_blocks
return rst
@ -54,6 +54,7 @@ class UDSFSInterface:
"""
Base Class for UDS Info File system
"""
def getattr(self, path: typing.List[str]) -> StatType:
"""
Get file attributes. Path is the full path to the file, already splitted.
@ -65,12 +66,12 @@ class UDSFSInterface:
Get a list of files in the directory. Path is the full path to the directory, already splitted.
"""
raise NotImplementedError
def read(self, path: typing.List[str], size: int, offset: int) -> bytes:
"""
Read a file. Path is the full path to the file, already splitted.
"""
raise NotImplementedError
def flush(self, path: typing.List[str]) -> None:
def flush(self, path: typing.List[str]) -> None: # pylint: disable=unused-argument
return

View File

@ -26,4 +26,5 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=unused-import
from . import mfa

View File

@ -133,12 +133,7 @@ class EmailMFA(mfas.MFA):
defaultValue='0',
tooltip=_('Action for MFA response error'),
required=True,
values={
'0': _('Allow user login'),
'1': _('Deny user login'),
'2': _('Allow user to login if it IP is in the networks list'),
'3': _('Deny user to login if it IP is in the networks list'),
},
values=mfas.LoginAllowed.valuesForSelect(),
tab=_('Config'),
)
@ -157,7 +152,10 @@ class EmailMFA(mfas.MFA):
label=_('Mail text'),
order=33,
multiline=4,
tooltip=_('Text of the email. If empty, a default text will be used') + '\n' + _('Allowed variables are: ') + '{code}, {username}, {justUsername}. {ip}',
tooltip=_('Text of the email. If empty, a default text will be used')
+ '\n'
+ _('Allowed variables are: ')
+ '{code}, {username}, {justUsername}. {ip}',
required=True,
defvalue='',
tab=_('Config'),
@ -168,7 +166,11 @@ class EmailMFA(mfas.MFA):
label=_('Mail HTML'),
order=34,
multiline=4,
tooltip=_('HTML of the email. If empty, a default HTML will be used')+ '\n' + _('Allowed variables are: ') + '{code}, {username}, {justUsername}, {ip}',
tooltip=_('HTML of the email. If empty, a default HTML will be used')
+ '\n'
+ _('Allowed variables are: ')
+ '\n'
+ '{code}, {username}, {justUsername}, {ip}',
required=False,
defvalue='',
tab=_('Config'),
@ -191,7 +193,7 @@ class EmailMFA(mfas.MFA):
# Now check is valid format
if ':' in hostname:
host, port = validators.validateHostPortPair(hostname)
self.hostname.value = '{}:{}'.format(host, port)
self.hostname.value = f'{host}:{port}'
else:
host = self.hostname.cleanStr()
self.hostname.value = validators.validateFqdn(host)
@ -203,7 +205,7 @@ class EmailMFA(mfas.MFA):
return gettext(
'Check your mail. You will receive an email with the verification code'
)
def initGui(self) -> None:
# Populate the networks list
self.networks.setValues(
@ -214,28 +216,10 @@ class EmailMFA(mfas.MFA):
]
)
def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool:
def checkIp() -> bool:
return any(
i.contains(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if action == '0':
return True
elif action == '1':
return False
elif action == '2':
return checkIp()
elif action == '3':
return not checkIp()
else:
return False
def emptyIndentifierAllowedToLogin(
self, request: 'ExtendedHttpRequest'
) -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
return mfas.LoginAllowed.checkAction(self.allowLoginWithoutMFA.value, request, self.networks.value)
def label(self) -> str:
return 'OTP received via email'
@ -270,7 +254,7 @@ class EmailMFA(mfas.MFA):
smtp.sendmail(self.fromEmail.value, identifier, msg.as_string())
except smtplib.SMTPException as e:
logger.error('Error sending email: {}'.format(e))
logger.error('Error sending email: %s', e)
raise
def sendCode(

View File

@ -1 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=unused-import
from . import mfa

View File

@ -40,7 +40,13 @@ from uds.core import mfas
from uds.core.ui import gui
from uds.auths.Radius import client
from uds.auths.Radius.client import NOT_CHECKED, INCORRECT, CORRECT, NOT_NEEDED, NEEDED
from uds.auths.Radius.client import (
# NOT_CHECKED,
INCORRECT,
CORRECT,
NOT_NEEDED,
# NEEDED
)
from uds.core.auths.auth import webPassword
from uds.core.auths import exceptions
@ -108,12 +114,7 @@ class RadiusOTP(mfas.MFA):
defaultValue='0',
tooltip=_('Action for OTP server communication error'),
required=True,
values={
'0': _('Allow user login'),
'1': _('Deny user login'),
'2': _('Allow user to login if it IP is in the networks list'),
'3': _('Deny user to login if it IP is in the networks list'),
},
values=mfas.LoginAllowed.valuesForSelect(),
tab=_('Config'),
)
@ -133,12 +134,7 @@ class RadiusOTP(mfas.MFA):
defaultValue='0',
tooltip=_('Action for user without defined Radius Challenge'),
required=True,
values={
'0': _('Allow user login'),
'1': _('Deny user login'),
'2': _('Allow user to login if it IP is in the networks list'),
'3': _('Deny user to login if it IP is in the networks list'),
},
values=mfas.LoginAllowed.valuesForSelect(),
tab=_('Config'),
)
@ -163,30 +159,16 @@ class RadiusOTP(mfas.MFA):
nasIdentifier=self.nasIdentifier.value,
)
def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool:
def checkIp() -> bool:
return any(
i.contains(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if action == '0':
return True
elif action == '1':
return False
elif action == '2':
return checkIp()
elif action == '3':
return not checkIp()
else:
return False
def checkResult(self, action: str, request: 'ExtendedHttpRequest') -> mfas.MFA.RESULT:
if self.checkAction(action, request):
def checkResult(
self, action: str, request: 'ExtendedHttpRequest'
) -> mfas.MFA.RESULT:
if mfas.LoginAllowed.checkAction(action, request, self.networks.value):
return mfas.MFA.RESULT.OK
raise Exception('User not allowed to login')
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
def emptyIndentifierAllowedToLogin(
self, request: 'ExtendedHttpRequest'
) -> typing.Optional[bool]:
return None
def label(self) -> str:
@ -225,8 +207,8 @@ class RadiusOTP(mfas.MFA):
logger.error(
"Exception found connecting to Radius OTP %s: %s", e.__class__, e
)
if not self.checkAction(self.responseErrorAction.value, request):
raise Exception(_('Radius OTP connection error'))
if not mfas.LoginAllowed.checkAction(self.responseErrorAction.value, request, self.networks.value):
raise Exception(_('Radius OTP connection error')) from e
logger.warning(
"Radius OTP connection error: Allowing access to user [%s] from IP [%s] without OTP",
username,
@ -287,8 +269,8 @@ class RadiusOTP(mfas.MFA):
logger.error(
"Exception found connecting to Radius OTP %s: %s", e.__class__, e
)
if not self.checkAction(self.responseErrorAction.value, request):
raise Exception(_('Radius OTP connection error'))
if mfas.LoginAllowed.checkAction(self.responseErrorAction.value, request, self.networks.value):
raise Exception(_('Radius OTP connection error')) from e
logger.warning(
"Radius OTP connection error: Allowing access to user [%s] from IP [%s] without OTP",
username,

View File

@ -26,7 +26,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=unused-import
from . import mfa

View File

@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
@ -94,6 +94,7 @@ class SMSMFA(mfas.MFA):
"""
typeName = _('SMS via HTTP')
typeType = 'smsHttpMFA'
typeDescription = _('Simple SMS sending MFA using HTTP/HTTPS')
@ -243,12 +244,7 @@ class SMSMFA(mfas.MFA):
defaultValue='0',
tooltip=_('Action for SMS response error'),
required=True,
values={
'0': _('Allow user login'),
'1': _('Deny user login'),
'2': _('Allow user to login if its IP is in the networks list'),
'3': _('Deny user to login if its IP is in the networks list'),
},
values=mfas.LoginAllowed.valuesForSelect(),
tab=_('Config'),
)
@ -275,7 +271,13 @@ class SMSMFA(mfas.MFA):
]
)
def composeSmsUrl(self, userId: str, userName: str, code: str, phone: str) -> str:
def composeSmsUrl(
self,
userId: str, # pylint: disable=unused-argument
userName: str,
code: str,
phone: str,
) -> str:
url = self.sendingUrl.value
url = url.replace('{code}', code)
url = url.replace('{phone}', phone.replace('+', ''))
@ -285,7 +287,9 @@ class SMSMFA(mfas.MFA):
return url
def getSession(self) -> requests.Session:
session = security.secureRequestsSession(verify=self.ignoreCertificateErrors.isTrue())
session = security.secureRequestsSession(
verify=self.ignoreCertificateErrors.isTrue()
)
# 0 means no authentication
if self.authenticationMethod.value == '1':
session.auth = requests.auth.HTTPBasicAuth(
@ -307,37 +311,29 @@ class SMSMFA(mfas.MFA):
session.headers[headerName.strip()] = headerValue.strip()
return session
def checkAction(self, action: str, request: 'ExtendedHttpRequest') -> bool:
def checkIp() -> bool:
return any(
i.contains(request.ip)
for i in models.Network.objects.filter(uuid__in=self.networks.value)
)
if action == '0':
return True
elif action == '1':
return False
elif action == '2':
return checkIp()
elif action == '3':
return not checkIp()
else:
return False
def emptyIndentifierAllowedToLogin(self, request: 'ExtendedHttpRequest') -> typing.Optional[bool]:
return self.checkAction(self.allowLoginWithoutMFA.value, request)
def emptyIndentifierAllowedToLogin(
self, request: 'ExtendedHttpRequest'
) -> typing.Optional[bool]:
return mfas.LoginAllowed.checkAction(
self.allowLoginWithoutMFA.value, request, self.networks.value
)
def processResponse(
self, request: 'ExtendedHttpRequest', response: requests.Response
) -> mfas.MFA.RESULT:
logger.debug('Response: %s', response)
if not response.ok:
logger.warning('SMS sending failed: code: %s, content: %s', response.status_code, response.text)
if not self.checkAction(self.responseErrorAction.value, request):
logger.warning(
'SMS sending failed: code: %s, content: %s',
response.status_code,
response.text,
)
if not mfas.LoginAllowed.checkAction(
self.responseErrorAction.value, request, self.networks.value
):
raise Exception(_('SMS sending failed'))
return mfas.MFA.RESULT.ALLOWED # Allow login, NO MFA code was sent
elif self.responseOkRegex.value.strip():
if self.responseOkRegex.value.strip():
logger.debug(
'Checking response OK regex: %s: (%s)',
self.responseOkRegex.value,
@ -348,17 +344,19 @@ class SMSMFA(mfas.MFA):
'SMS response error: %s',
response.text,
)
if not self.checkAction(self.responseErrorAction.value, request):
if not mfas.LoginAllowed.checkAction(
self.responseErrorAction.value, request, self.networks.value
):
raise Exception(_('SMS response error'))
return mfas.MFA.RESULT.ALLOWED
return mfas.MFA.RESULT.OK
def getData(
self,
request: 'ExtendedHttpRequest',
userId: str,
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
userId: str, # pylint: disable=unused-argument
username: str,
url: str,
url: str, # pylint: disable=unused-argument
code: str,
phone: str,
) -> bytes:
@ -374,7 +372,11 @@ class SMSMFA(mfas.MFA):
return data.encode(self.encoding.value)
def sendSMS_GET(
self, request: 'ExtendedHttpRequest', userId: str, username: str, url: str
self,
request: 'ExtendedHttpRequest',
userId: str, # pylint: disable=unused-argument
username: str, # pylint: disable=unused-argument
url: str,
) -> mfas.MFA.RESULT:
return self.processResponse(request, self.getSession().get(url))
@ -405,7 +407,6 @@ class SMSMFA(mfas.MFA):
phone: str,
) -> mfas.MFA.RESULT:
# Compose POST data
data = ''
bdata = self.getData(request, userId, username, url, code, phone)
return self.processResponse(request, self.getSession().put(url, data=bdata))
@ -420,12 +421,11 @@ class SMSMFA(mfas.MFA):
url = self.composeSmsUrl(userId, username, code, phone)
if self.sendingMethod.value == 'GET':
return self.sendSMS_GET(request, userId, username, url)
elif self.sendingMethod.value == 'POST':
if self.sendingMethod.value == 'POST':
return self.sendSMS_POST(request, userId, username, url, code, phone)
elif self.sendingMethod.value == 'PUT':
if self.sendingMethod.value == 'PUT':
return self.sendSMS_PUT(request, userId, username, url, code, phone)
else:
raise Exception('Unknown SMS sending method')
raise Exception('Unknown SMS sending method')
def label(self) -> str:
return gettext('MFA Code')

View File

@ -26,7 +26,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=unused-import
from . import mfa

View File

@ -1 +1,33 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=unused-import
from . import mfa

View File

@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
@ -155,7 +155,7 @@ class TOTP_MFA(mfas.MFA):
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
# Get data from storage related to this user
secret, qrShown = self._userData(userId)
qrShown = self._userData(userId)[1]
if qrShown:
return _('Enter your authentication code')
# Compose the QR code from provisioning URI

View File

@ -45,24 +45,22 @@ import importlib
import sys
import typing
def __init__():
import logging
from uds.core.util import modfinder
logger = logging.getLogger(__name__)
def __loadModules():
"""
This imports all packages that are descendant of this package, and, after that,
it register all subclases of authenticator as
it register all subclases of mfas.MFA
"""
from uds.core import mfas
from uds.core import mfas # pylint: disable=import-outside-toplevel
# Dinamycally import children of this package. The __init__.py files must declare mfs as subclasses of mfas.MFA
pkgpath = os.path.dirname(typing.cast(str, sys.modules[__name__].__file__))
for _, name, _ in pkgutil.iter_modules([pkgpath]):
# __import__(name, globals(), locals(), [], 1)
importlib.import_module('.' + name, __name__) # import module
importlib.invalidate_caches()
a = mfas.MFA
for cls in a.__subclasses__():
mfas.factory().insert(cls)
modfinder.dynamicLoadAndRegisterModules(mfas.factory(), mfas.MFA, __name__)
__init__()
__loadModules()

View File

@ -77,12 +77,12 @@ def build_middleware(
)
return async_middleware
else:
def sync_middleware(request: 'ExtendedHttpRequest') -> 'HttpResponse':
response = request_processor(request)
return response_processor(request, response or get_response(request))
# Sync middleware
def sync_middleware(request: 'ExtendedHttpRequest') -> 'HttpResponse':
response = request_processor(request)
return response_processor(request, response or get_response(request))
return sync_middleware
return sync_middleware
return middleware

View File

@ -34,6 +34,7 @@ import typing
from django.urls import reverse
from django.http import HttpResponsePermanentRedirect
from django.conf import settings
from . import builder
@ -43,7 +44,7 @@ if typing.TYPE_CHECKING:
from django.http import HttpRequest, HttpResponse
def _check_redirectable(request: 'HttpRequest') -> typing.Optional['HttpResponse']:
if request.is_secure():
if request.is_secure() or settings.DEBUG:
return None
return HttpResponsePermanentRedirect(request.build_absolute_uri(reverse('page.index')).replace('http://', 'https://', 1))

View File

@ -47,7 +47,7 @@ from uds.core.auths.auth import (
)
from uds.models import User
from . import builder
from . import builder
if typing.TYPE_CHECKING:
from django.http import HttpResponse

View File

@ -33,16 +33,16 @@ import re
import logging
import typing
logger = logging.getLogger(__name__)
from django.http import HttpResponseForbidden
from uds.core.util.config import GlobalConfig
from uds.core.auths.auth import isTrustedSource, IP_KEY
from . import builder
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from django.http import HttpResponse
from uds.core.util.request import ExtendedHttpRequest
@ -66,7 +66,7 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes
request.path,
)
return HttpResponseForbidden(content='Forbbiden', content_type='text/plain')
if GlobalConfig.ENHANCED_SECURITY.getBool():
# Check that ip stored in session is the same as the one that is requesting if user is logged in
session_ip = request.session.get(IP_KEY, None)
@ -79,12 +79,13 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes
request.session.get('ip', None),
)
return HttpResponseForbidden(content='Forbbiden', content_type='text/plain')
return None
def _process_response(
request: 'ExtendedHttpRequest', response: 'HttpResponse'
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
response: 'HttpResponse',
) -> 'HttpResponse':
if GlobalConfig.ENHANCED_SECURITY.getBool():
# Legacy browser support for X-XSS-Protection

View File

@ -39,13 +39,16 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _process_response(
request: 'ExtendedHttpRequest', response: 'HttpResponse'
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
response: 'HttpResponse',
) -> 'HttpResponse':
if response.get('content-type', '').startswith('text/html'):
response['X-UA-Compatible'] = 'IE=edge'
return response
# Add a X-UA-Compatible header to the response
# This header tells to Internet Explorer to render page with latest
# possible version or to use chrome frame if it is installed. TO BE REMOVED SOON (edge does not need it)

View File

@ -9,33 +9,37 @@ import uds.models.util
# Remove ServicePools with null service field
def remove_null_service_pools(apps, schema_editor):
def remove_null_service_pools(apps, schema_editor): # pylint: disable=unused-argument
ServicePool = apps.get_model('uds', 'ServicePool')
ServicePool.objects.filter(service__isnull=True).delete()
# No-Op backwards migration
def nop(apps, schema_editor): # pragma: no cover
def nop(apps, schema_editor): # pylint: disable=unused-argument
pass
# Python update network fields to allow ipv6
# We will
def update_network_model(apps, schema_editor):
import uds.models.network
# We will
def update_network_model(apps, schema_editor): # pylint: disable=unused-argument
import uds.models.network # pylint: disable=import-outside-toplevel,redefined-outer-name
Network = apps.get_model('uds', 'Network')
try:
for net in Network.objects.all():
# Store the net_start and net_end on new fields "start" and "end", that are strings
# to allow us to store ipv6 addresses
# pylint: disable=protected-access
net.start = uds.models.network.Network._hexlify(net.net_start)
# pylint: disable=protected-access
net.end = uds.models.network.Network._hexlify(net.net_end)
net.version = 4 # Previous versions only supported ipv4
net.save()
except Exception as e:
print('Error updating network model: {}'.format(e))
print(f'Error updating network model: {e}')
class Migration(migrations.Migration):
dependencies = [
("uds", "0043_auto_20220704_2120"),
]
@ -133,7 +137,7 @@ class Migration(migrations.Migration):
models.CharField(
blank=True,
db_index=True,
default=uds.models.user_service_session._session_id_generator,
default=uds.models.user_service_session._session_id_generator, # pylint: disable=protected-access
max_length=128,
),
),

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-import
#
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,7 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging

View File

@ -92,7 +92,7 @@ class Account(UUIDModel, TaggingMixin):
tmp.save()
return tmp
class Meta:
class Meta: # pylint: disable=too-few-public-methods
"""
Meta class to declare the name of the table at database
"""
@ -101,4 +101,4 @@ class Account(UUIDModel, TaggingMixin):
app_label = 'uds'
def __str__(self):
return 'Account id {}, name {}'.format(self.id, self.name)
return f'Account id {self.id}, name {self.name}'

View File

@ -46,7 +46,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
# pylint: disable=no-member # pylint complais a lot about members of models...
class AccountUsage(UUIDModel):
"""
AccountUsage storing on DB model
@ -74,7 +74,7 @@ class AccountUsage(UUIDModel):
Account, related_name='usages', on_delete=models.CASCADE
)
class Meta:
class Meta: # pylint: disable=too-few-public-methods
"""
Meta class to declare the name of the table at database
"""
@ -111,6 +111,4 @@ class AccountUsage(UUIDModel):
return secondsToTimeString(self.elapsed_seconds_timemark)
def __str__(self):
return 'AccountUsage id {}, pool {}, name {}, start {}, end {}'.format(
self.id, self.pool_name, self.user_name, self.start, self.end
)
return f'AccountUsage id {self.id}, pool {self.pool_name}, name {self.user_name}, start {self.start}, end {self.end}'

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -26,12 +26,13 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
from django.db import models
from .util import MAX_IPV6_LENGTH, MAX_DNS_NAME_LENGTH
class ActorToken(models.Model):
"""
UDS Actors tokens on DB
@ -41,7 +42,7 @@ class ActorToken(models.Model):
ip_from = models.CharField(max_length=MAX_IPV6_LENGTH)
ip = models.CharField(max_length=MAX_IPV6_LENGTH)
ip_version = models.IntegerField(default=4) # Version of ip fields
hostname = models.CharField(max_length=MAX_DNS_NAME_LENGTH)
mac = models.CharField(max_length=128, db_index=True, unique=True)
pre_command = models.CharField(max_length=255, blank=True, default='')
@ -52,13 +53,8 @@ class ActorToken(models.Model):
token = models.CharField(max_length=48, db_index=True, unique=True)
stamp = models.DateTimeField() # Date creation or validation of this entry
# "fake" declarations for type checking
# objects: 'models.manager.Manager[ActorToken]'
class Meta:
class Meta: # pylint: disable=too-few-public-methods
app_label = 'uds'
def __str__(self):
return '<ActorToken {} created on {} by {} from {}/{}>'.format(
self.token, self.stamp, self.username, self.hostname, self.ip_from
)
return f'<ActorToken {self.token} created on {self.stamp} by {self.username} from {self.hostname}/{self.ip_from}>'

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# Copyright (c) 2012-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -27,13 +27,12 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from django.db import models
from django.db.models import signals
from uds.core import auths
from uds.core import environment
@ -57,6 +56,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
This class represents an Authenticator inside the platform.
Sample authenticators are LDAP, Active Directory, SAML, ...
"""
# Constants for Visibility
VISIBLE = 'v'
HIDDEN = 'h'
@ -82,7 +82,13 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
networks: 'models.manager.RelatedManager[Network]'
# MFA associated to this authenticator. Can be null
mfa = models.ForeignKey('MFA', on_delete=models.SET_NULL, null=True, blank=True, related_name='authenticators')
mfa = models.ForeignKey(
'MFA',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='authenticators',
)
class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods
"""
@ -227,9 +233,13 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
ip, version = net.ipToLong(ipStr)
# Allow
if self.net_filtering == Authenticator.ALLOW:
return self.networks.filter(net_start__lte=ip, net_end__gte=ip, version=version).exists()
return self.networks.filter(
net_start__lte=ip, net_end__gte=ip, version=version
).exists()
# Deny, must not be in any network
return self.networks.filter(net_start__lte=ip, net_end__gte=ip).exists() is False
return (
self.networks.filter(net_start__lte=ip, net_end__gte=ip).exists() is False
)
@staticmethod
def all() -> 'models.QuerySet[Authenticator]':
@ -244,7 +254,10 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
Gets authenticator by tag name.
Special tag name "disabled" is used to exclude customAuth
"""
from uds.core.util.config import GlobalConfig
# pylint: disable=import-outside-toplevel
from uds.core.util.config import (
GlobalConfig,
)
if tag is not None:
authsList = Authenticator.objects.filter(small_name=tag).order_by(
@ -264,7 +277,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
yield auth
@staticmethod
def beforeDelete(sender, **kwargs) -> None:
def beforeDelete(sender, **kwargs) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Service class "Destroy" before deleting it from database.
@ -273,7 +286,10 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
:note: If destroy raises an exception, the deletion is not taken.
"""
from uds.core.util.permissions import clean
# pylint: disable=import-outside-toplevel
from uds.core.util.permissions import (
clean,
)
toDelete = kwargs['instance']
@ -291,7 +307,6 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
# Clears related permissions
clean(toDelete)
# returns CSV header
@staticmethod
def getCSVHeader(sep: str = ',') -> str:
@ -315,9 +330,8 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
]
)
def __str__(self):
return u"{0} of type {1} (id:{2})".format(self.name, self.data_type, self.id)
return f'{self.name} of type {self.data_type} (id:{self.id})'
# Connects a pre deletion signal to Authenticator

View File

@ -41,6 +41,7 @@ from .util import getSqlDatetime
logger = logging.getLogger(__name__)
def _session_id_generator() -> str:
"""
Generates a new session id
@ -54,7 +55,9 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho
The value field is a Text field, so we can put whatever we want in it
"""
session_id = models.CharField(max_length=128, db_index=True, default=_session_id_generator, blank=True)
session_id = models.CharField(
max_length=128, db_index=True, default=_session_id_generator, blank=True
)
start = models.DateTimeField(default=getSqlDatetime)
end = models.DateTimeField(null=True, blank=True)
@ -65,7 +68,7 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho
# "fake" declarations for type checking
# objects: 'models.manager.Manager["UserServiceSession"]'
class Meta:
class Meta: # pylint: disable=too-few-public-methods
"""
Meta class to declare default order and unique multiple field index
"""
@ -79,10 +82,8 @@ class UserServiceSession(models.Model): # pylint: disable=too-many-public-metho
]
def __str__(self) -> str:
return 'Session {}. ({} to {}'.format(
self.session_id, self.start, self.end
)
return f'Session {self.session_id} ({self.start} to {self.end})'
def close(self) -> None:
"""
Ends the session

View File

@ -43,7 +43,7 @@ The registration of modules is done locating subclases of :py:class:`uds.core.au
import logging
from uds.core.util import modfinder
from uds.core.util import modfinder
logger = logging.getLogger(__name__)
@ -53,12 +53,11 @@ def __loadModules():
This imports all packages that are descendant of this package, and, after that,
it register all subclases of service provider as
"""
from uds.core import services
from uds.core import services # pylint: disable=import-outside-toplevel
modfinder.dynamicLoadAndRegisterModules(
services.factory(),
services.ServiceProvider,
__name__
services.factory(), services.ServiceProvider, __name__
)
__loadModules()