1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

MASSIVE type checking fixes for even more strict checkings

This commit is contained in:
Adolfo Gómez García 2024-02-29 01:33:45 +01:00
parent 05d26c732e
commit a2fc3130cd
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
381 changed files with 835 additions and 1129 deletions

View File

@ -6,7 +6,8 @@
exclude = .*/transports/.*/scripts/.*
mypy_path = $MYPY_CONFIG_FILE_DIR/src
disable_error_code = import, no-any-return, misc, redundant-cast
# Call overload because ForeignKey fields are not being recognized with django-types
disable_error_code = import, no-any-return, misc, redundant-cast, call-overload
strict = True
implicit_reexport = true

13
server/pyrightconfig.json Normal file
View File

@ -0,0 +1,13 @@
{
"include": [
"src"
],
"exclude": [
"**/scripts",
"**/__pycache__",
],
"typeCheckingMode": "strict",
"reportPrivateUsage": false,
"reportUnusedImport": true,
"reportMissingTypeStubs": false,
}

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -27,15 +27,9 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
logger = logging.getLogger(__name__)
# pyright: reportUnusedImport=false
# Convenience imports, must be present before initializing handlers
from .handlers import Handler
from .dispatcher import Dispatcher

View File

@ -82,7 +82,7 @@ class HandlerNode:
return ret
class Dispatcher(View): # type: ignore
class Dispatcher(View):
"""
This class is responsible of dispatching REST requests
"""

View File

@ -30,7 +30,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import logging
import codecs
@ -54,27 +53,24 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Handler:
"""
REST requests handler base class
"""
raw: typing.ClassVar[
bool
] = False # If true, Handler will return directly an HttpResponse Object
name: typing.ClassVar[
typing.Optional[str]
] = None # If name is not used, name will be the class name in lower case
path: typing.ClassVar[
typing.Optional[str]
] = None # Path for this method, so we can do /auth/login, /auth/logout, /auth/auths in a simple way
authenticated: typing.ClassVar[
bool
] = True # By default, all handlers needs authentication. Will be overwriten if needs_admin or needs_staff,
needs_admin: typing.ClassVar[
bool
] = False # By default, the methods will be accessible by anyone if nothing else indicated
raw: typing.ClassVar[bool] = False # If true, Handler will return directly an HttpResponse Object
name: typing.ClassVar[typing.Optional[str]] = (
None # If name is not used, name will be the class name in lower case
)
path: typing.ClassVar[typing.Optional[str]] = (
None # Path for this method, so we can do /auth/login, /auth/logout, /auth/auths in a simple way
)
authenticated: typing.ClassVar[bool] = (
True # By default, all handlers needs authentication. Will be overwriten if needs_admin or needs_staff,
)
needs_admin: typing.ClassVar[bool] = (
False # By default, the methods will be accessible by anyone if nothing else indicated
)
needs_staff: typing.ClassVar[bool] = False # By default, staff
# For implementing help
@ -85,10 +81,11 @@ class Handler:
_request: 'ExtendedHttpRequestWithUser' # It's a modified HttpRequest
_path: str
_operation: str
_params: dict[str, typing.Any] # This is a deserliazied object from request. Can be anything as 'a' or {'a': 1} or ....
_args: tuple[
str, ...
] # This are the "path" split by /, that is, the REST invocation arguments
_params: dict[
str, typing.Any
] # This is a deserliazied object from request. Can be anything as 'a' or {'a': 1} or ....
# These are the "path" split by /, that is, the REST invocation arguments
_args: list[str]
_kwargs: dict[str, typing.Any] # This are the "path" split by /, that is, the REST invocation arguments
_headers: dict[str, str]
_session: typing.Optional[SessionStore]
@ -108,9 +105,7 @@ class Handler:
*args: str,
**kwargs: typing.Any,
):
logger.debug(
'Data: %s %s %s', self.__class__, self.needs_admin, self.authenticated
)
logger.debug('Data: %s %s %s', self.__class__, self.needs_admin, self.authenticated)
if (
self.needs_admin or self.needs_staff
) and not self.authenticated: # If needs_admin, must also be authenticated
@ -122,13 +117,11 @@ class Handler:
self._path = path
self._operation = method
self._params = params
self._args = args
self._args = list(args) # copy of args
self._kwargs = kwargs
self._headers = {}
self._auth_token = None
if (
self.authenticated
): # Only retrieve auth related data on authenticated handlers
if self.authenticated: # Only retrieve auth related data on authenticated handlers
try:
self._auth_token = self._request.headers.get(consts.auth.AUTH_TOKEN_HEADER, '')
self._session = SessionStore(session_key=self._auth_token)
@ -200,12 +193,12 @@ class Handler:
return self._params
@property
def args(self) -> tuple[str, ...]:
def args(self) -> list[str]:
"""
Returns the args object
"""
return self._args
@property
def session(self) -> 'SessionStore':
if self._session is None:
@ -244,9 +237,7 @@ class Handler:
staff_member = True # Make admins also staff members :-)
# crypt password and convert to base64
passwd = codecs.encode(
CryptoManager().symmetric_encrypt(password, scrambler), 'base64'
).decode()
passwd = codecs.encode(CryptoManager().symmetric_encrypt(password, scrambler), 'base64').decode()
session['REST'] = {
'auth': id_auth,
@ -334,15 +325,11 @@ class Handler:
self._session.accessed = True
self._session.save()
except Exception:
logger.exception(
'Got an exception setting session value %s to %s', key, value
)
logger.exception('Got an exception setting session value %s to %s', key, value)
def is_ip_allowed(self) -> bool:
try:
return net.contains(
GlobalConfig.ADMIN_TRUSTED_SOURCES.get(True), self._request.ip
)
return net.contains(GlobalConfig.ADMIN_TRUSTED_SOURCES.get(True), self._request.ip)
except Exception:
logger.warning(
'Error checking truted ADMIN source: "%s" does not seems to be a valid network string. Using Unrestricted access.',

View File

@ -30,7 +30,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
from uds import models
from uds.core import consts
@ -47,7 +46,9 @@ if typing.TYPE_CHECKING:
# If path has ".../services/[uuid]/..." we will replace uuid with "service name" sourrounded by []
# If path has ".../users/[uuid]/..." we will replace uuid with "user name" sourrounded by []
# If path has ".../groups/[uuid]/..." we will replace uuid with "group name" sourrounded by []
UUID_REPLACER = (
UUID_REPLACER: tuple[
tuple[str, type[models.Provider | models.Service | models.ServicePool | models.User | models.Group]], ...
] = (
('providers', models.Provider),
('services', models.Service),
('servicespools', models.ServicePool),

View File

@ -33,7 +33,6 @@
import datetime
import logging
import typing
import collections.abc
from django.utils.translation import gettext_lazy as _

View File

@ -32,7 +32,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext as _

View File

@ -32,12 +32,12 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_lazy as _
from uds.core import types, consts
from uds.core.types import permissions
from uds.core.util import ensure
from uds.core.util.log import LogLevel
from uds.models import Server
from uds.core.exceptions.rest import NotFound, RequestError
@ -55,7 +55,7 @@ class ActorTokens(ModelHandler):
model = Server
model_filter = {'type': types.servers.ServerType.ACTOR}
table_title = typing.cast(str, _('Actor tokens'))
table_title = _('Actor tokens')
table_fields = [
# {'token': {'title': _('Token')}},
{'stamp': {'title': _('Date'), 'type': 'datetime'}},
@ -68,9 +68,9 @@ class ActorTokens(ModelHandler):
{'log_level': {'title': _('Log level')}},
]
def item_as_dict(self, item_: 'Model') -> dict[str, typing.Any]:
item = typing.cast(Server, item_)
data = item.data or {}
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
item = ensure.is_instance(item, Server)
data: dict[str, typing.Any] = item.data or {}
log_level_int = data.get('log_level', 2)
if log_level_int < 10000: # Old log level
log_level = LogLevel.from_actor_level(log_level_int).name

View File

@ -401,7 +401,7 @@ class Initialize(ActorV3Action):
# If not found an alias, try to locate on service table
# Not on alias token, try to locate on Service table
if not service:
service = typing.cast('Service', Service.objects.get(token=token))
service = Service.objects.get(token=token)
# If exists, create and alias for it
# Get first mac and, if not exists, get first ip
unique_id = self._params['id'][0].get('mac', self._params['id'][0].get('ip', ''))
@ -835,7 +835,7 @@ class Notify(ActorV3Action):
logger.debug('Args: %s, Params: %s', self._args, self._params)
try:
action = NotifyActionType(self._params['action'])
token = self._params['token'] # pylint: disable=unused-variable # Just to check it exists
_token = self._params['token'] # Just to check it exists
except Exception as e:
# Requested login, logout or whatever
raise exceptions.rest.RequestError('Invalid parameters') from e

View File

@ -38,7 +38,6 @@ import typing
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from netaddr import N
from uds.core import auths, consts, exceptions, types
from uds.core.environment import Environment
@ -210,12 +209,14 @@ class Authenticators(ModelHandler):
auth = item.get_instance()
canDoSearch = (
# Cast to Any because we want to compare with the default method or if it's overriden
# Cast is neccesary to avoid mypy errors, for example
search_supported = (
type_ == 'user'
and (auth.search_users != auths.Authenticator.search_users)
or (auth.search_groups != auths.Authenticator.search_groups)
and (typing.cast(typing.Any, auth.search_users) != typing.cast(typing.Any, auths.Authenticator.search_users))
or (typing.cast(typing.Any, auth.search_groups) != typing.cast(typing.Any, auths.Authenticator.search_groups))
)
if canDoSearch is False:
if search_supported is False:
raise self.not_supported_response()
if type_ == 'user':

View File

@ -30,7 +30,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import datetime
import logging
import typing

View File

@ -32,7 +32,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_lazy as _
from uds.models import Calendar
@ -60,7 +59,7 @@ class Calendars(ModelHandler):
save_fields = ['name', 'comments', 'tags']
table_title = typing.cast(str, _('Calendars'))
table_title = _('Calendars')
table_fields = [
{
'name': {

View File

@ -28,7 +28,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import logging
import typing

View File

@ -31,7 +31,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import logging
from uds.core.util.config import Config as CfgConfig

View File

@ -30,7 +30,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import datetime
import logging
import typing
@ -39,7 +38,6 @@ from uds.core import exceptions, types
from uds.core.managers.crypto import CryptoManager
from uds.core.managers.userservice import UserServiceManager
from uds.core.services.exceptions import ServiceNotReadyError
from uds.core.types.requests import ExtendedHttpRequestWithUser
from uds.core.util.rest.tools import match
from uds.REST import Handler
from uds.web.util import services

View File

@ -30,9 +30,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import logging
import typing
from uds.core import exceptions, types
from uds.core.ui import gui

View File

@ -32,12 +32,10 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_lazy as _, gettext
from uds.models import Image
from uds.core import types
from uds.core.ui import gui
from uds.core.util import ensure
from uds.REST.model import ModelHandler
@ -59,7 +57,7 @@ class Images(ModelHandler):
model = Image
save_fields = ['name', 'data']
table_title = typing.cast(str, _('Image Gallery'))
table_title = _('Image Gallery')
table_fields = [
{
'thumb': {

View File

@ -35,7 +35,7 @@ import logging
import time
import typing
from uds.core import consts, exceptions, types
from uds.core import consts, exceptions
from uds.core.auths.auth import authenticate
from uds.core.managers.crypto import CryptoManager
from uds.core.util.cache import Cache

View File

@ -32,7 +32,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
@ -83,7 +82,7 @@ class MetaPools(ModelHandler):
'transport_grouping',
]
table_title = typing.cast(str, _('Meta Pools'))
table_title = _('Meta Pools')
table_fields = [
{'name': {'title': _('Name')}},
{'comments': {'title': _('Comments')}},
@ -116,7 +115,7 @@ class MetaPools(ModelHandler):
# if item does not have an associated service, hide it (the case, for example, for a removed service)
# Access from dict will raise an exception, and item will be skipped
poolGroupId = None
poolGroupName = typing.cast(str, _('Default'))
poolGroupName = _('Default')
poolGroupThumb = DEFAULT_THUMB_BASE64
if item.servicesPoolGroup is not None:
poolGroupId = item.servicesPoolGroup.uuid
@ -204,7 +203,7 @@ class MetaPools(ModelHandler):
},
{
'name': 'servicesPoolGroup_id',
'choices': [gui.choice_image(-1, typing.cast(str, _('Default')), DEFAULT_THUMB_BASE64)]
'choices': [gui.choice_image(-1, _('Default'), DEFAULT_THUMB_BASE64)]
+ gui.sorted_choices(
[
gui.choice_image(v.uuid, v.name, v.thumb64)

View File

@ -31,7 +31,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext as _
@ -147,7 +146,7 @@ class MetaAssignedService(DetailHandler):
element['pool_name'] = item.deployed_service.name
return element
def _getAssignedService(self, metaPool: models.MetaPool, userServiceId: str) -> models.UserService:
def _get_assigned_userservice(self, metaPool: models.MetaPool, userServiceId: str) -> models.UserService:
"""
Gets an assigned service and checks that it belongs to this metapool
If not found, raises InvalidItemException
@ -185,7 +184,7 @@ class MetaAssignedService(DetailHandler):
try:
if not item: # All items
result = {}
result: dict[str, typing.Any] = {}
for k, props in assignedUserServicesForPools():
result[k.uuid] = MetaAssignedService.item_as_dict(parent, k, props)
@ -193,7 +192,7 @@ class MetaAssignedService(DetailHandler):
return MetaAssignedService.item_as_dict(
parent,
self._getAssignedService(parent, item),
self._get_assigned_userservice(parent, item),
props={
k: v
for k, v in models.Properties.objects.filter(
@ -237,7 +236,7 @@ class MetaAssignedService(DetailHandler):
def get_logs(self, parent: 'Model', item: str) -> list[typing.Any]:
parent = ensure.is_instance(parent, models.MetaPool)
try:
asignedService = self._getAssignedService(parent, item)
asignedService = self._get_assigned_userservice(parent, item)
logger.debug('Getting logs for %s', asignedService)
return log.get_logs(asignedService)
except Exception:
@ -245,7 +244,7 @@ class MetaAssignedService(DetailHandler):
def delete_item(self, parent: 'Model', item: str) -> None:
parent = ensure.is_instance(parent, models.MetaPool)
userService = self._getAssignedService(parent, item)
userService = self._get_assigned_userservice(parent, item)
if userService.user:
logStr = 'Deleted assigned service {} to user {} by {}'.format(
@ -274,17 +273,17 @@ class MetaAssignedService(DetailHandler):
raise self.invalid_item_response()
fields = self.fields_from_params(['auth_id', 'user_id'])
service = self._getAssignedService(parent, item)
userservice = self._get_assigned_userservice(parent, item)
user = models.User.objects.get(uuid=process_uuid(fields['user_id']))
logStr = 'Changing ownership of service from {} to {} by {}'.format(
service.user.pretty_name if service.user else 'unknown', user.pretty_name, self._user.pretty_name
userservice.user.pretty_name if userservice.user else 'unknown', user.pretty_name, self._user.pretty_name
)
# If there is another service that has this same owner, raise an exception
if (
service.deployed_service.userServices.filter(user=user)
.exclude(uuid=service.uuid)
userservice.deployed_service.userServices.filter(user=user)
.exclude(uuid=userservice.uuid)
.exclude(state__in=State.INFO_STATES)
.count()
> 0
@ -293,8 +292,8 @@ class MetaAssignedService(DetailHandler):
'There is already another user service assigned to {}'.format(user.pretty_name)
)
service.user = user
service.save()
userservice.user = user
userservice.save()
# Log change
log.log(parent, log.LogLevel.INFO, logStr, log.LogSource.ADMIN)

View File

@ -55,7 +55,7 @@ class MFA(ModelHandler):
model = models.MFA
save_fields = ['name', 'comments', 'tags', 'remember_device', 'validity']
table_title = typing.cast(str, _('Multi Factor Authentication'))
table_title = _('Multi Factor Authentication')
table_fields = [
{'name': {'title': _('Name'), 'visible': True, 'type': 'iconType'}},
{'type_name': {'title': _('Type')}},

View File

@ -32,7 +32,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_lazy as _, gettext
@ -59,7 +58,7 @@ class Networks(ModelHandler):
model = Network
save_fields = ['name', 'net_string', 'tags']
table_title = typing.cast(str, _('Networks'))
table_title = _('Networks')
table_fields = [
{
'name': {

View File

@ -36,7 +36,6 @@ import collections.abc
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from traitlets import default
from uds.core import messaging, types
from uds.core.environment import Environment
@ -65,7 +64,7 @@ class Notifiers(ModelHandler):
'enabled',
]
table_title = typing.cast(str, _('Notifiers'))
table_title = _('Notifiers')
table_fields = [
{'name': {'title': _('Name'), 'visible': True, 'type': 'iconType'}},
{'type_name': {'title': _('Type')}},

View File

@ -33,7 +33,6 @@
import json
import logging
import typing
import collections.abc
from django.utils.translation import gettext as _

View File

@ -54,7 +54,7 @@ class OsManagers(ModelHandler):
model = OSManager
save_fields = ['name', 'comments', 'tags']
table_title = typing.cast(str, _('OS Managers'))
table_title = _('OS Managers')
table_fields = [
{'name': {'title': _('Name'), 'visible': True, 'type': 'iconType'}},
{'type_name': {'title': _('Type')}},

View File

@ -81,7 +81,7 @@ class Permissions(Handler):
def as_dict(
perms: collections.abc.Iterable[models.Permissions],
) -> list[dict[str, str]]:
res = []
res: list[dict[str, typing.Any]] = []
entity: typing.Optional[typing.Union[models.User, models.Group]]
for perm in perms:
if perm.user is None:
@ -130,8 +130,6 @@ class Permissions(Handler):
"""
logger.debug('Put args: %s', self._args)
la = len(self._args)
perm = uds.core.types.permissions.PermissionType.from_str(self._params.get('perm', '0'))
def add_user_permission(cls_param: str, obj_param: str, user_param: str) -> list[dict[str, str]]:

View File

@ -66,7 +66,7 @@ class Providers(ModelHandler):
save_fields = ['name', 'comments', 'tags']
table_title = typing.cast(str, _('Service providers'))
table_title = _('Service providers')
# Table info fields
table_fields = [
@ -81,7 +81,8 @@ class Providers(ModelHandler):
# Field from where to get "class" and prefix for that class, so this will generate "row-state-A, row-state-X, ....
table_row_style = types.ui.RowStyleInfo(prefix='row-maintenance-', field='maintenance_mode')
def item_as_dict(self, item: 'Provider') -> types.rest.ItemDictType:
def item_as_dict(self, item: 'Model') -> types.rest.ItemDictType:
item = ensure.is_instance(item, Provider)
type_ = item.get_type()
# Icon can have a lot of data (1-2 Kbytes), but it's not expected to have a lot of services providers, and even so, this will work fine

View File

@ -32,8 +32,6 @@
"""
import logging
import typing
import collections.abc
from click import style
from django.utils.translation import gettext_lazy as _

View File

@ -69,7 +69,7 @@ class ServerRegisterBase(Handler):
# Validate parameters
try:
try:
t = types.servers.ServerType(type)
types.servers.ServerType(type) # try to convert
except ValueError:
raise ValueError(_('Invalid type. Type must be an integer.'))
if len(subtype) > 16:

View File

@ -31,7 +31,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _

View File

@ -32,7 +32,6 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
@ -65,7 +64,7 @@ class ServicesPoolGroups(ModelHandler):
model = ServicePoolGroup
save_fields = ['name', 'comments', 'image_id', 'priority']
table_title = typing.cast(str, _('Services Pool Groups'))
table_title = _('Services Pool Groups')
table_fields = [
{'priority': {'title': _('Priority'), 'type': 'numeric', 'width': '6em'}},
{

View File

@ -33,7 +33,6 @@
import datetime
import logging
import typing
import collections.abc
from django.db.models import Count, Q
from django.utils.translation import gettext
@ -101,7 +100,7 @@ class ServicesPools(ModelHandler):
remove_fields = ['osmanager_id', 'service_id']
table_title = typing.cast(str, _('Service Pools'))
table_title = _('Service Pools')
table_fields = [
{'name': {'title': _('Name')}},
{'state': {'title': _('Status'), 'type': 'dict', 'dict': State.literals_dict()}},
@ -194,7 +193,7 @@ class ServicesPools(ModelHandler):
# This needs a lot of queries, and really does not apport anything important to the report
# elif UserServiceManager().canInitiateServiceFromDeployedService(item) is False:
# state = State.SLOWED_DOWN
val = {
val: dict[str, typing.Any] = {
'id': item.uuid,
'name': item.name,
'short_name': item.short_name,
@ -614,39 +613,39 @@ class ServicesPools(ModelHandler):
return item.fallbackAccess
# Returns the action list based on current element, for calendar
def actionsList(self, item: 'Model') -> typing.Any:
def actionsList(self, item: 'Model') -> list[types.calendar.CalendarAction]:
item = ensure.is_instance(item, ServicePool)
validActions: tuple[types.calendar.CalendarAction, ...] = ()
valid_actions: list[types.calendar.CalendarAction] = []
itemInfo = item.service.get_type()
if itemInfo.uses_cache is True:
validActions += (
valid_actions += [
consts.calendar.CALENDAR_ACTION_INITIAL,
consts.calendar.CALENDAR_ACTION_CACHE_L1,
consts.calendar.CALENDAR_ACTION_MAX,
)
]
if itemInfo.uses_cache_l2 is True:
validActions += (consts.calendar.CALENDAR_ACTION_CACHE_L2,)
valid_actions += [consts.calendar.CALENDAR_ACTION_CACHE_L2,]
if itemInfo.publication_type is not None:
validActions += (consts.calendar.CALENDAR_ACTION_PUBLISH,)
valid_actions += [consts.calendar.CALENDAR_ACTION_PUBLISH,]
# Transport & groups actions
validActions += (
valid_actions += [
consts.calendar.CALENDAR_ACTION_ADD_TRANSPORT,
consts.calendar.CALENDAR_ACTION_DEL_TRANSPORT,
consts.calendar.CALENDAR_ACTION_DEL_ALL_TRANSPORTS,
consts.calendar.CALENDAR_ACTION_ADD_GROUP,
consts.calendar.CALENDAR_ACTION_DEL_GROUP,
consts.calendar.CALENDAR_ACTION_DEL_ALL_GROUPS,
)
]
# Advanced actions
validActions += (
valid_actions += [
consts.calendar.CALENDAR_ACTION_IGNORE_UNUSED,
consts.calendar.CALENDAR_ACTION_REMOVE_USERSERVICES,
consts.calendar.CALENDAR_ACTION_REMOVE_STUCK_USERSERVICES,
)
return validActions
]
return valid_actions
# Deprecated, use list_assignables
def listAssignables(self, item: 'Model') -> typing.Any:

View File

@ -33,7 +33,6 @@
import logging
import typing
import collections.abc
from django.utils.translation import gettext as _
from uds.core import types

View File

@ -170,10 +170,10 @@ class System(Handler):
'restrained_services_pools': restrained_services_pools,
}
if len(self._args) in (2, 3):
if len(self.args) in (2, 3):
# Extract pool if provided
pool: typing.Optional[models.ServicePool] = None
if len(self._args) == 3:
if len(self.args) == 3:
try:
pool = models.ServicePool.objects.get(uuid=process_uuid(self._args[2]))
except Exception:
@ -186,14 +186,14 @@ class System(Handler):
self._user, typing.cast('Model', pool), types.permissions.PermissionType.READ
):
raise exceptions.rest.AccessDenied()
if self._args[0] == 'stats':
if self._args[1] == 'assigned':
if self.args[0] == 'stats':
if self.args[1] == 'assigned':
return get_servicepools_counters(pool, counters.types.stats.CounterType.ASSIGNED)
elif self._args[1] == 'inuse':
elif self.args[1] == 'inuse':
return get_servicepools_counters(pool, counters.types.stats.CounterType.INUSE)
elif self._args[1] == 'cached':
elif self.args[1] == 'cached':
return get_servicepools_counters(pool, counters.types.stats.CounterType.CACHED)
elif self._args[1] == 'complete':
elif self.args[1] == 'complete':
return {
'assigned': get_servicepools_counters(
pool, counters.types.stats.CounterType.ASSIGNED, since_days=7

View File

@ -33,7 +33,6 @@
import datetime
import logging
import typing
import collections.abc
from uds.REST import Handler
@ -93,9 +92,7 @@ class Tickets(Handler):
needs_admin = True # By default, staff is lower level needed
@staticmethod
def result(
result: str = '', error: typing.Optional[str] = None
) -> dict[str, typing.Any]:
def result(result: str = '', error: typing.Optional[str] = None) -> dict[str, typing.Any]:
"""
Returns a result for a Ticket request
"""
@ -166,9 +163,7 @@ class Tickets(Handler):
# Will raise an exception if no auth found
if authId:
auth = models.Authenticator.objects.get(
uuid=process_uuid(authId.lower())
)
auth = models.Authenticator.objects.get(uuid=process_uuid(authId.lower()))
elif authName:
auth = models.Authenticator.objects.get(name=authName)
else:
@ -217,42 +212,30 @@ class Tickets(Handler):
pool: typing.Union[models.ServicePool, models.MetaPool]
try:
pool = typing.cast(
models.MetaPool, models.MetaPool.objects.get(uuid=poolUuid)
pool = models.MetaPool.objects.get(
uuid=poolUuid
) # If not an metapool uuid, will process it as a servicePool
if force:
# First, add groups to metapool
for addGrp in set(groupIds) - set(
pool.assignedGroups.values_list('uuid', flat=True)
):
for addGrp in set(groupIds) - set(pool.assignedGroups.values_list('uuid', flat=True)):
pool.assignedGroups.add(auth.groups.get(uuid=addGrp))
# And now, to ALL metapool members
for metaMember in pool.members.all():
# Now add groups to pools
for addGrp in set(groupIds) - set(
metaMember.pool.assignedGroups.values_list(
'uuid', flat=True
)
metaMember.pool.assignedGroups.values_list('uuid', flat=True)
):
metaMember.pool.assignedGroups.add(
auth.groups.get(uuid=addGrp)
)
metaMember.pool.assignedGroups.add(auth.groups.get(uuid=addGrp))
# For metapool, transport is ignored..
servicePoolId = 'M' + pool.uuid
except models.MetaPool.DoesNotExist:
pool = typing.cast(
models.ServicePool,
models.ServicePool.objects.get(uuid=poolUuid),
)
pool = models.ServicePool.objects.get(uuid=poolUuid)
# If forced that servicePool must honor groups
if force:
for addGrp in set(groupIds) - set(
pool.assignedGroups.values_list('uuid', flat=True)
):
for addGrp in set(groupIds) - set(pool.assignedGroups.values_list('uuid', flat=True)):
pool.assignedGroups.add(auth.groups.get(uuid=addGrp))
servicePoolId = 'F' + pool.uuid

View File

@ -64,7 +64,7 @@ class Transports(ModelHandler):
'label',
]
table_title = typing.cast(str, _('Transports'))
table_title = _('Transports')
table_fields = [
{'priority': {'title': _('Priority'), 'type': 'numeric', 'width': '6em'}},
{'name': {'title': _('Name'), 'visible': True, 'type': 'iconType'}},

View File

@ -35,9 +35,9 @@ import typing
from uds import models
from uds.core import exceptions, types
from uds.core.auths.auth import is_source_trusted
from uds.core.auths.auth import is_trusted_source
from uds.core.util import log, net
from uds.core.util.model import sql_datetime, sql_stamp_seconds
from uds.core.util.model import sql_stamp_seconds
from uds.core.util.stats import events
from uds.REST import Handler
@ -71,7 +71,7 @@ class TunnelTicket(Handler):
)
if (
not is_source_trusted(self._request.ip)
not is_trusted_source(self._request.ip)
or len(self._args) != 3
or len(self._args[0]) != 48
):

View File

@ -31,13 +31,12 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
import uds.core.types.permissions
from uds.core import types, consts, ui
from uds.core import types, consts
from uds.core.util import permissions, validators, ensure
from uds.core.util.model import process_uuid
from uds import models
@ -45,7 +44,6 @@ from uds.REST.model import DetailHandler, ModelHandler
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.core.module import Module
from django.db.models import Model
logger = logging.getLogger(__name__)
@ -64,7 +62,7 @@ class TunnelServers(DetailHandler):
q = parent.servers.all().order_by('hostname')
else:
q = parent.servers.filter(uuid=process_uuid(item))
res = []
res: list[dict[str, typing.Any]] = []
i = None
for i in q:
val = {
@ -150,7 +148,7 @@ class Tunnels(ModelHandler):
detail = {'servers': TunnelServers}
save_fields = ['name', 'comments', 'host:', 'port:0']
table_title = typing.cast(str, _('Tunnels'))
table_title = _('Tunnels')
table_fields = [
{'name': {'title': _('Name'), 'visible': True, 'type': 'iconType'}},
{'comments': {'title': _('Comments')}},
@ -216,7 +214,7 @@ class Tunnels(ModelHandler):
item = self._args[-1]
if item is None:
if not item:
raise self.invalid_item_response('No server specified')
try:
@ -226,7 +224,6 @@ class Tunnels(ModelHandler):
except Exception:
raise self.invalid_item_response() from None
# TODO: implement this
return 'ok'
def tunnels(self, parent: 'Model') -> typing.Any:

View File

@ -322,7 +322,6 @@ class Groups(DetailHandler):
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
parent = ensure.is_instance(parent, models.ServicePool)
group: models.Group
return [
{
'id': group.uuid,
@ -334,7 +333,7 @@ class Groups(DetailHandler):
'type': 'meta' if group.is_meta else 'group',
'auth_name': group.manager.name,
}
for group in parent.assignedGroups.all()
for group in typing.cast(collections.abc.Iterable[models.Group], parent.assignedGroups.all())
]
def get_title(self, parent: 'Model') -> str:

View File

@ -29,6 +29,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .base import BaseModelHandler
from .detail import DetailHandler
from .model import ModelHandler

View File

@ -31,32 +31,25 @@
"""
# pylint: disable=too-many-public-methods
import abc
import fnmatch
import inspect
import logging
import re
import typing
import collections.abc
from types import GeneratorType
from django.db import IntegrityError, models
from django.db import models
from django.utils.translation import gettext as _
from uds.core import consts
from uds.core import exceptions
from uds.core import types
from uds.core.module import Module
from uds.core.util import log, permissions
from uds.core.util.model import process_uuid
from uds.models import ManagedObjectModel, Network, Tag, TaggingMixin
from uds.REST.utils import rest_result
from uds.core.util import permissions
from uds.models import ManagedObjectModel, Network
from ..handlers import Handler
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.models import User
pass
logger = logging.getLogger(__name__)

View File

@ -31,25 +31,16 @@
"""
# pylint: disable=too-many-public-methods
import abc
import fnmatch
import inspect
import logging
import re
import typing
import collections.abc
from types import GeneratorType
from django.db import IntegrityError, models
from django.db import models
from django.utils.translation import gettext as _
from uds.core import consts
from uds.core import exceptions
from uds.core import types
from uds.core.module import Module
from uds.core.util import log, permissions
from uds.core.util.model import process_uuid
from uds.models import ManagedObjectModel, Network, Tag, TaggingMixin
from uds.REST.utils import rest_result
from .base import BaseModelHandler
@ -62,9 +53,6 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Details do not have types at all
# so, right now, we only process details petitions for Handling & tables info
# noinspection PyMissingConstructor
@ -94,7 +82,7 @@ class DetailHandler(BaseModelHandler):
_parent: typing.Optional['ModelHandler']
_path: str
_params: typing.Any # _params is deserialized object from request
_args: tuple[str, ...]
_args: list[str]
_kwargs: dict[str, typing.Any]
_user: 'User'
@ -114,7 +102,7 @@ class DetailHandler(BaseModelHandler):
self._parent = parent_handler
self._path = path
self._params = params
self._args = args
self._args = list(args)
self._kwargs = kwargs
self._user = kwargs.get('user', None)
@ -349,4 +337,3 @@ class DetailHandler(BaseModelHandler):
:return: a list of log elements (normally got using "uds.core.util.log.get_logs" method)
"""
raise self.invalid_method_response()

View File

@ -31,14 +31,11 @@
"""
# pylint: disable=too-many-public-methods
import abc
import fnmatch
import inspect
import logging
import re
import typing
import collections.abc
from types import GeneratorType
from django.db import IntegrityError, models
from django.utils.translation import gettext as _
@ -48,15 +45,12 @@ from uds.core import exceptions
from uds.core import types
from uds.core.module import Module
from uds.core.util import log, permissions
from uds.core.util.model import process_uuid
from uds.models import ManagedObjectModel, Network, Tag, TaggingMixin
from uds.REST.utils import rest_result
from uds.models import ManagedObjectModel, Tag, TaggingMixin
from .base import BaseModelHandler
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.models import User
from .detail import DetailHandler
logger = logging.getLogger(__name__)

View File

@ -32,11 +32,9 @@
"""
import collections.abc
import datetime
from gc import collect
import json
import logging
import time
import types
import typing
from django.http import HttpResponse

View File

@ -29,7 +29,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
import collections.abc
from uds.core.consts.system import VERSION
from uds.core.util.model import sql_stamp_seconds

View File

@ -30,6 +30,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
# Make sure that all services are "available" at service startup
import logging

View File

@ -50,8 +50,6 @@ if typing.TYPE_CHECKING:
def index(request: 'HttpRequest') -> HttpResponse:
# Gets csrf token
csrf_token = csrf.get_token(request)
if csrf_token is not None:
csrf_token = str(csrf_token)
return render(
request,

View File

@ -32,5 +32,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from uds.core.util.config import Config
from .authenticator import IPAuth

View File

@ -33,11 +33,10 @@
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_noop as _
from uds.core import auths, types, consts
from uds.core import auths, types
from uds.core.ui import gui
from uds.core.util import net
@ -88,13 +87,13 @@ class IPAuth(auths.Authenticator):
ip = ip.split(':')[-1]
return ip
def get_groups(self, username: str, groupsManager: 'auths.GroupsManager') -> None:
def get_groups(self, username: str, groups_manager: 'auths.GroupsManager') -> None:
# these groups are a bit special. They are in fact ip-ranges, and we must check that the ip is in betwen
# The ranges are stored in group names
for g in groupsManager.enumerate_groups_name():
for g in groups_manager.enumerate_groups_name():
try:
if net.contains(g, username):
groupsManager.validate(g)
groups_manager.validate(g)
except Exception as e:
logger.error('Invalid network for IP auth: %s', e)
@ -102,12 +101,12 @@ class IPAuth(auths.Authenticator):
self,
username: str,
credentials: str, # pylint: disable=unused-argument
groupsManager: 'auths.GroupsManager',
groups_manager: 'auths.GroupsManager',
request: 'types.requests.ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
# If credentials is a dict, that can't be sent directly from web interface, we allow entering
if username == self.getIp(request):
self.get_groups(username, groupsManager)
self.get_groups(username, groups_manager)
return types.auth.SUCCESS_AUTH
return types.auth.FAILED_AUTH

View File

@ -27,9 +27,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .authenticator import InternalDBAuth

View File

@ -32,7 +32,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
import logging
import typing
@ -40,7 +39,7 @@ import dns.resolver
import dns.reversename
from django.utils.translation import gettext_noop as _
from uds.core import auths, consts, exceptions, types
from uds.core import auths, types
from uds.core.auths.auth import log_login
from uds.core.managers.crypto import CryptoManager
from uds.core.types.states import State
@ -100,7 +99,7 @@ class InternalDBAuth(auths.Authenticator):
ip = request.ip_proxy if self.accepts_proxy.as_bool() else request.ip # pylint: disable=maybe-no-member
if self.reverse_dns.as_bool():
try:
return str(dns.resolver.query(dns.reversename.from_address(ip).to_text(), 'PTR')[0])
return str(dns.resolver.query(dns.reversename.from_address(ip).to_text(), 'PTR')[0]) # pyright: ignore[reportUnknownArgumentType]
except Exception:
# if we can't get the reverse, we will use the ip
pass
@ -175,7 +174,7 @@ class InternalDBAuth(auths.Authenticator):
log_login(request, self.db_obj(), username, 'Invalid password')
return types.auth.FAILED_AUTH
def get_groups(self, username: str, groupsManager: 'auths.GroupsManager') -> None:
def get_groups(self, username: str, groups_manager: 'auths.GroupsManager') -> None:
dbAuth = self.db_obj()
try:
user: 'models.User' = dbAuth.users.get(name=username.lower(), state=State.ACTIVE)
@ -188,7 +187,7 @@ class InternalDBAuth(auths.Authenticator):
grps.extend([g.name for g in parent.groups.all()])
except Exception:
pass
groupsManager.validate(grps)
groups_manager.validate(grps)
def get_real_name(self, username: str) -> str:
# Return the real name of the user, if it is set
@ -198,7 +197,7 @@ class InternalDBAuth(auths.Authenticator):
except Exception:
return super().get_real_name(username)
def create_user(self, usrData: dict[str, typing.Any]) -> None:
def create_user(self, user_data: dict[str, typing.Any]) -> None:
pass
@staticmethod

View File

@ -34,4 +34,5 @@ take care of registering it as provider
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .authenticator import OAuth2Authenticator

View File

@ -41,22 +41,18 @@ import datetime
import urllib.parse
from base64 import b64decode
import defusedxml.ElementTree as etree
import jwt
import requests
from django.utils.translation import gettext
from django.utils.translation import gettext_noop as _
from uds.core import auths, consts, exceptions, types
from uds.core.managers.crypto import CryptoManager
from uds.core.ui import gui
from uds.core.util import fields, model, auth as auth_utils
if typing.TYPE_CHECKING:
from django.http import HttpRequest
from cryptography.x509 import Certificate
logger = logging.getLogger(__name__)
# Alphabet used for PKCE
@ -64,6 +60,7 @@ PKCE_ALPHABET: typing.Final[str] = string.ascii_letters + string.digits + '-._~'
# Length of the State parameter
STATE_LENGTH: typing.Final[int] = 16
@dataclasses.dataclass
class TokenInfo:
access_token: str
@ -450,7 +447,9 @@ class OAuth2Authenticator(auths.Authenticator):
if self.responseType.value in ('code', 'pkce', 'openid+code'):
if self.commonGroups.value.strip() == '':
raise exceptions.ui.ValidationError(gettext('Common groups is required for "code" response types'))
raise exceptions.ui.ValidationError(
gettext('Common groups is required for "code" response types')
)
if self.tokenEndpoint.value.strip() == '':
raise exceptions.ui.ValidationError(
gettext('Token endpoint is required for "code" response types')

View File

@ -26,12 +26,11 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Sample authenticator. We import here the module, and uds.auths module will
take care of registering it as provider
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .authenticator import RadiusAuth

View File

@ -32,11 +32,10 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
from django.utils.translation import gettext_noop as _
from uds.core import auths, environment, types, consts
from uds.core import auths, environment, types
from uds.core.auths.auth import log_login
from uds.core.managers.crypto import CryptoManager
from uds.core.ui import gui
@ -45,8 +44,7 @@ from . import client
if typing.TYPE_CHECKING:
from uds.core.types.requests import ExtendedHttpRequest
from uds.core.environment import Environment
logger = logging.getLogger(__name__)
@ -146,7 +144,7 @@ class RadiusAuth(auths.Authenticator):
self,
username: str,
credentials: str,
groupsManager: 'auths.GroupsManager',
groups_manager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
try:
@ -181,15 +179,15 @@ class RadiusAuth(auths.Authenticator):
storage[username] = groups
# Validate groups
groupsManager.validate(groups)
groups_manager.validate(groups)
return types.auth.SUCCESS_AUTH
def get_groups(self, username: str, groupsManager: 'auths.GroupsManager') -> None:
def get_groups(self, username: str, groups_manager: 'auths.GroupsManager') -> None:
with self.storage.as_dict() as storage:
groupsManager.validate(storage.get(username, []))
groups_manager.validate(storage.get(username, []))
def create_user(self, usrData: dict[str, str]) -> None:
def create_user(self, user_data: dict[str, str]) -> None:
pass
def remove_user(self, username: str) -> None:

View File

@ -1,3 +1,36 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnknownMemberType=false
import dataclasses
import io
import logging

View File

@ -26,10 +26,8 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .authenticator import RegexLdap

View File

@ -32,14 +32,12 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import re
import typing
import collections.abc
import ldap
from django.utils.translation import gettext_noop as _
from uds.core import auths, environment, exceptions, types, consts
from uds.core import auths, environment, exceptions, types
from uds.core.auths.auth import log_login
from uds.core.ui import gui
from uds.core.util import ensure, ldaputil, auth as auth_utils, fields
@ -52,8 +50,6 @@ except Exception:
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds import models
from uds.core.environment import Environment
from uds.core.types.requests import ExtendedHttpRequest
logger = logging.getLogger(__name__)
@ -368,7 +364,7 @@ class RegexLdap(auths.Authenticator):
self,
username: str,
credentials: str,
groupsManager: 'auths.GroupsManager',
groups_manager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
"""
@ -404,7 +400,7 @@ class RegexLdap(auths.Authenticator):
usr[self.mfa_attribute.value][0],
)
groupsManager.validate(self._get_groups(usr))
groups_manager.validate(self._get_groups(usr))
return types.auth.SUCCESS_AUTH

View File

@ -29,5 +29,6 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from uds.core import managers
from .saml import SAMLAuthenticator # import for registration on space,

View File

@ -44,16 +44,15 @@ from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from uds.core import auths, exceptions, types, consts
from uds.core import auths, exceptions, types
from uds.core.managers.crypto import CryptoManager
from uds.core.types.requests import ExtendedHttpRequest
from uds.core.ui import gui
from uds.core.util import security, decorators, ensure, auth as auth_utils
from uds.core.util import security, decorators, auth as auth_utils
from uds.core.util.model import sql_datetime
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest
from urllib.parse import ParseResult
from uds.core.types.requests import ExtendedHttpRequestWithUser
@ -822,4 +821,4 @@ class SAMLAuthenticator(auths.Authenticator):
Clean ups storage data
"""
self.storage.remove(username)
self.mfa_clean(username)
self.mfa_clean(username)

View File

@ -34,6 +34,5 @@ take care of registering it as provider
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .sample_auth import SampleAuth
__updated__ = '2014-02-19'

View File

@ -37,13 +37,12 @@ import collections.abc
from django.utils.translation import gettext_noop as _
from uds.core.types.requests import ExtendedHttpRequest
from uds.core.ui import gui
from uds.core import auths, exceptions, types, consts
from uds.core import auths, exceptions, types
if typing.TYPE_CHECKING:
from django.http import (
HttpRequest,
) # pylint: disable=ungrouped-imports
from uds.core.types.requests import ExtendedHttpRequestWithUser
from uds.core.auths.groups_manager import GroupsManager
@ -161,7 +160,7 @@ class SampleAuth(auths.Authenticator):
self,
username: str,
credentials: str,
groupsManager: 'GroupsManager',
groups_manager: 'GroupsManager',
request: 'ExtendedHttpRequest', # pylint: disable=unused-argument
) -> types.auth.AuthenticationResult:
"""
@ -212,13 +211,13 @@ class SampleAuth(auths.Authenticator):
# two letters equals to the groups names known by UDS
# For this, we will ask the groups manager for the groups names, and will check that and,
# if the user match this criteria, will mark that group as valid
for g in groupsManager.enumerate_groups_name():
for g in groups_manager.enumerate_groups_name():
if len(set(g.lower()).intersection(username.lower())) >= 2:
groupsManager.validate(g)
groups_manager.validate(g)
return types.auth.SUCCESS_AUTH
def get_groups(self, username: str, groupsManager: 'auths.GroupsManager') -> None:
def get_groups(self, username: str, groups_manager: 'auths.GroupsManager') -> None:
"""
As with authenticator part related to groupsManager, this
method will fill the groups to which the specified username belongs to.
@ -229,9 +228,9 @@ class SampleAuth(auths.Authenticator):
In our case, we simply repeat the process that we also do at authenticate
"""
for g in groupsManager.enumerate_groups_name():
for g in groups_manager.enumerate_groups_name():
if len(set(g.lower()).intersection(username.lower())) >= 2:
groupsManager.validate(g)
groups_manager.validate(g)
def get_javascript(self, request: 'HttpRequest') -> typing.Optional[str]: # pylint: disable=unused-argument
"""
@ -263,7 +262,7 @@ class SampleAuth(auths.Authenticator):
def auth_callback(
self,
parameters: 'types.auth.AuthCallbackParams',
gm: 'GroupsManager',
groups_manager: 'GroupsManager',
request: 'types.requests.ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
"""
@ -283,7 +282,7 @@ class SampleAuth(auths.Authenticator):
return types.auth.AuthenticationResult(types.auth.AuthenticationState.SUCCESS, username=user)
def create_user(self, usrData: dict[str, str]) -> None:
def create_user(self, user_data: dict[str, str]) -> None:
"""
This method provides a "check oportunity" to authenticators for users created
manually at administration interface.
@ -301,10 +300,10 @@ class SampleAuth(auths.Authenticator):
"""
from uds.core.types.states import State # pylint: disable=import-outside-toplevel
usrData['real_name'] = usrData['name'] + ' ' + usrData['name']
usrData['state'] = State.INACTIVE
user_data['real_name'] = user_data['name'] + ' ' + user_data['name']
user_data['state'] = State.INACTIVE
def modify_user(self, usrData: dict[str, str]) -> None:
def modify_user(self, user_data: dict[str, str]) -> None:
"""
This method provides a "check opportunity" to authenticator for users modified
at administration interface.
@ -321,5 +320,5 @@ class SampleAuth(auths.Authenticator):
Here, we will simply update the realName of the user, and (we have to take care
this this kind of things) modify the userName to a new one, the original plus '-1'
"""
usrData['real_name'] = usrData['name'] + ' ' + usrData['name']
usrData['name'] += '-1'
user_data['real_name'] = user_data['name'] + ' ' + user_data['name']
user_data['name'] += '-1'

View File

@ -26,10 +26,8 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pyright: reportUnusedImport=false
from .authenticator import SimpleLDAPAuthenticator

View File

@ -33,11 +33,11 @@ import logging
import typing
import collections.abc
import ldap
import ldap # pyright: ignore # Needed to import ldap.filter without errors
import ldap.filter
from django.utils.translation import gettext_noop as _
from uds.core import auths, environment, types, consts, exceptions
from uds.core import auths, environment, types, exceptions
from uds.core.auths.auth import log_login
from uds.core.ui import gui
from uds.core.util import ensure, fields, ldaputil, validators
@ -351,7 +351,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
self,
username: str,
credentials: str,
groupsManager: 'auths.GroupsManager',
groups_manager: 'auths.GroupsManager',
request: 'ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
'''
@ -385,7 +385,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
user[self.mfa_attribute.as_str()][0],
)
groupsManager.validate(self._get_groups(user))
groups_manager.validate(self._get_groups(user))
return types.auth.SUCCESS_AUTH

View File

@ -32,6 +32,7 @@ UDS authentication related interfaces and classes
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .authenticator import (
Authenticator,
)

View File

@ -180,7 +180,7 @@ def web_login_required(
# Helper for checking if requests is from trusted source
def is_source_trusted(ip: str) -> bool:
def is_trusted_source(ip: str) -> bool:
return net.contains(GlobalConfig.TRUSTED_SOURCES.get(True), ip)
@ -198,7 +198,7 @@ def needs_trusted_source(
Wrapped function for decorator
"""
try:
if not is_source_trusted(request.ip):
if not is_trusted_source(request.ip):
return HttpResponseForbidden()
except Exception:
logger.warning(
@ -240,7 +240,7 @@ def register_user(
usr = authenticator.get_or_create_user(username, username)
usr.real_name = authInstance.get_real_name(username)
usr.save()
if usr is not None and State.from_str(usr.state).is_active():
if usr and State.from_str(usr.state).is_active():
# Now we update database groups for this user
usr.get_manager().recreate_groups(usr)
# And add an login event
@ -380,10 +380,8 @@ def authenticate_info_url(authenticator: typing.Union[str, bytes, models.Authent
name = authenticator
elif isinstance(authenticator, bytes):
name = authenticator.decode('utf8')
elif isinstance(authenticator, models.Authenticator):
name = authenticator.small_name
else:
raise ValueError('Invalid authenticator type')
name = authenticator.small_name
return reverse('page.auth.info', kwargs={'authenticator_name': name})
@ -540,7 +538,7 @@ def log_login(
def log_logout(request: 'ExtendedHttpRequest') -> None:
if request.user:
if request.user.manager.id is not None:
if request.user.manager.id:
log.log(
request.user.manager,
log.LogLevel.INFO,

View File

@ -322,7 +322,7 @@ class Authenticator(Module):
self,
username: str,
credentials: str,
groupsManager: 'GroupsManager',
groups_manager: 'GroupsManager',
request: 'types.requests.ExtendedHttpRequest',
) -> types.auth.AuthenticationResult:
"""
@ -364,7 +364,7 @@ class Authenticator(Module):
"""
return types.auth.FAILED_AUTH
def is_ip_allowed(self, request: 'HttpRequest') -> bool:
def is_ip_allowed(self, request: 'types.requests.ExtendedHttpRequest') -> bool:
"""
Used by the login interface to determine if the authenticator is visible on the login page.
"""
@ -372,7 +372,7 @@ class Authenticator(Module):
if not self.db_obj().id:
return True
return self.db_obj().state != consts.auth.DISABLED and self.db_obj().is_ip_allowed(
typing.cast('types.requests.ExtendedHttpRequest', request).ip
request.ip
)
def transformed_username(

View File

@ -31,7 +31,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
from uds.core.util import factory

View File

@ -33,7 +33,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
# Imports for type checking
if typing.TYPE_CHECKING:

View File

@ -32,7 +32,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
from .group import Group
from .groups_manager import GroupsManager

View File

@ -30,7 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import collections.abc
# pyright: reportUnusedImport=false
import time
import typing
from datetime import datetime

View File

@ -31,7 +31,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
MANAGED: typing.Final[str] = 'managed'
UNMANAGED: typing.Final[str] = 'unmanaged' # matches the definition of UDS Actors OFC

View File

@ -30,9 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from re import X
import typing
import collections.abc
# Constants for Visibility
VISIBLE: typing.Final[str] = 'v'

View File

@ -30,9 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from pickle import LONG
import typing
import collections.abc
# Default timeouts, in seconds

View File

@ -31,7 +31,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
# Request related timeouts, etc..
DEFAULT_REQUEST_TIMEOUT: typing.Final[int] = 20 # In seconds

View File

@ -32,7 +32,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import re
import typing
import collections.abc
from uds.core import types

View File

@ -31,7 +31,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
# REST API requests (..../REST/.../overview, ..../REST/.../types, etc)
OVERVIEW: typing.Final[str] = 'overview'

View File

@ -32,18 +32,17 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import time
import typing
import collections.abc
from django.conf import settings
if typing.TYPE_CHECKING:
from uds.core.util.unique_id_generator import UniqueGenerator
pass
# UDS Version related
VERSION = '4.0.0'
VERSION_STAMP = f'{time.strftime("%Y%m%d")}'
VERSION: typing.Final[str] = '4.0.0'
VERSION_STAMP: typing.Final[str] = f'{time.strftime("%Y%m%d")}'
# Minimal uds client version required to connect to this server
REQUIRED_CLIENT_VERSION = '3.6.0'
REQUIRED_CLIENT_VERSION: typing.Final[str] = '3.6.0'
# Max size of a rest request body
MAX_REQUEST_SIZE: typing.Final[int] = int(

View File

@ -31,7 +31,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
# Old encryption key, log ago deprecated, but still here for reading old data
UDSB: typing.Final[bytes] = b'udsprotect'

View File

@ -30,6 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from . import actor
from . import auth
from . import service

View File

@ -32,8 +32,8 @@ UDS jobs related modules
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
import typing
import collections.abc
from .job import Job
from .delayed_task import DelayedTask

View File

@ -31,7 +31,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import logging
from uds.core.environment import Environmentable, Environment

View File

@ -36,7 +36,6 @@ from socket import gethostname
from datetime import timedelta
import logging
import typing
import collections.abc
from django.db import connections
from django.db import transaction, OperationalError

View File

@ -30,7 +30,6 @@
"""
import logging
import typing
import collections.abc
from uds.core.environment import Environmentable
from uds.core.util.config import Config

View File

@ -33,7 +33,6 @@
import datetime
import logging
import typing
import collections.abc
from uds.core.util import factory

View File

@ -31,7 +31,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import platform
import threading
import time

View File

@ -33,11 +33,9 @@ UDS managers (downloads, users publications, ...)
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
# Imports for type checking only (not on runtime), we have later to get rid of false "redefined outer names" for pylint
if typing.TYPE_CHECKING:
from .crypto import CryptoManager
from .task import TaskManager
from .downloads import DownloadsManager
from .log import LogManager
@ -62,11 +60,13 @@ def log_manager() -> 'LogManager':
return LogManager()
def publication_manager() -> 'PublicationManager':
from .publication import PublicationManager
return PublicationManager()
def notifications_manager() -> 'NotificationsManager':
from .notifications import NotificationsManager

View File

@ -39,7 +39,6 @@ import re
import string
import logging
import typing
import collections.abc
import secrets
# For password secrets

View File

@ -33,8 +33,6 @@
import os
import logging
import typing
import collections.abc
from wsgiref.util import FileWrapper
from django.http import HttpResponse, Http404, HttpRequest

View File

@ -25,5 +25,8 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .manager import LogManager

View File

@ -31,7 +31,6 @@
"""
# import traceback
import typing
import collections.abc
import logging
from uds.core.util import singleton
@ -44,7 +43,6 @@ from uds.core.types.log import LogObjectType
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.db.models import Model
from uds import models
logger = logging.getLogger(__name__)
@ -71,8 +69,6 @@ class LogManager(metaclass=singleton.Singleton):
# Ensure message fits on space
message = str(message)[:4096]
qs = Log.objects.filter(owner_id=owner_id, owner_type=owner_type.value)
# now, we add new log
try:
Log.objects.create(

View File

@ -31,7 +31,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
from django.apps import apps
from django.db import connections
@ -40,7 +39,7 @@ from uds.core.util import singleton
from uds.core.util.log import LogLevel
if typing.TYPE_CHECKING:
from ..messaging import provider
pass
logger = logging.getLogger(__name__)

View File

@ -30,7 +30,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import logging
import datetime

View File

@ -31,7 +31,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
import datetime
import logging
import typing
import collections.abc
from concurrent.futures import ThreadPoolExecutor
from django.db import transaction

View File

@ -34,7 +34,6 @@ import time
import signal
import logging
import typing
import collections.abc
from django.db import connection
from uds.core.jobs.scheduler import Scheduler
@ -105,7 +104,7 @@ class TaskManager(metaclass=singleton.Singleton):
logger.info("Registering sheduled tasks")
# Simply import this to make workers "register" themselves
from uds.core import workers # pylint: disable=unused-import, import-outside-toplevel
from uds.core import workers # pyright: ignore[reportUnusedImport]
def add_other_tasks(self) -> None:
logger.info("Registering other tasks")

View File

@ -34,7 +34,6 @@ import logging
import operator
import random
import typing
import collections.abc
from django.db import transaction
from django.db.models import Q
@ -460,7 +459,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
# Now find if there is a preparing one
with transaction.atomic():
caches = (
caches = list(
service_pool.cached_users_services()
.select_for_update()
.filter(cache_level=services.UserService.L1_CACHE, state=State.PREPARING)[:1]

View File

@ -28,7 +28,6 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import base64
import json
import logging
import os
@ -97,9 +96,9 @@ def _execute_actor_request(
verify=verify,
timeout=TIMEOUT,
)
if verify:
if not(isinstance(verify, bool)):
try:
os.remove(typing.cast(str, verify))
os.remove(verify)
except Exception:
logger.exception('removing verify')
js = r.json()

View File

@ -33,7 +33,6 @@
import abc
import logging
import typing
import collections.abc
from uds.core import services, types
from uds.core.jobs.delayed_task import DelayedTask

View File

@ -29,6 +29,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from .provider import Notifier, LogLevel
from .msgfactory import NotifierFactory

Some files were not shown because too many files have changed in this diff Show More