1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-24 22:50:28 +03:00

fixing minor possible issues with staff members && reformating code, adapting to type checking, etc...

This commit is contained in:
Adolfo Gómez García 2020-12-17 13:56:15 +01:00
parent 6b3d222a12
commit dcdea31061
19 changed files with 269 additions and 142 deletions

View File

@ -46,6 +46,7 @@ from .users_groups import Users, Groups
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.db import models
from uds.core import Module
logger = logging.getLogger(__name__)
@ -78,13 +79,13 @@ class Authenticators(ModelHandler):
def typeInfo(self, type_: typing.Type['Module']) -> typing.Dict[str, typing.Any]:
if issubclass(type_, auths.Authenticator):
return {
'canSearchUsers': type_.searchUsers != auths.Authenticator.searchUsers,
'canSearchGroups': type_.searchGroups != auths.Authenticator.searchGroups,
'canSearchUsers': type_.searchUsers != auths.Authenticator.searchUsers, # type: ignore
'canSearchGroups': type_.searchGroups != auths.Authenticator.searchGroups, # type: ignore
'needsPassword': type_.needsPassword,
'userNameLabel': _(type_.userNameLabel),
'groupNameLabel': _(type_.groupNameLabel),
'passwordLabel': _(type_.passwordLabel),
'canCreateUsers': type_.createUser != auths.Authenticator.createUser,
'canCreateUsers': type_.createUser != auths.Authenticator.createUser, # type: ignore
'isExternal': type_.isExternalSource,
}
# Not of my type
@ -94,16 +95,24 @@ class Authenticators(ModelHandler):
try:
tgui = auths.factory().lookup(type_)
if tgui:
g = self.addDefaultFields(tgui.guiDescription(), ['name', 'comments', 'tags', 'priority', 'small_name'])
self.addField(g, {
'name': 'visible',
'value': True,
'label': ugettext('Visible'),
'tooltip': ugettext('If active, transport will be visible for users'),
'type': gui.InputField.CHECKBOX_TYPE,
'order': 107,
'tab': ugettext('Display'),
})
g = self.addDefaultFields(
tgui.guiDescription(),
['name', 'comments', 'tags', 'priority', 'small_name'],
)
self.addField(
g,
{
'name': 'visible',
'value': True,
'label': ugettext('Visible'),
'tooltip': ugettext(
'If active, transport will be visible for users'
),
'type': gui.InputField.CHECKBOX_TYPE,
'order': 107,
'tab': ugettext('Display'),
},
)
return g
raise Exception() # Not found
except Exception:
@ -124,7 +133,7 @@ class Authenticators(ModelHandler):
'type': type_.type(),
'type_name': type_.name(),
'type_info': self.typeInfo(type_),
'permission': permissions.getEffectivePermission(self._user, item)
'permission': permissions.getEffectivePermission(self._user, item),
}
# Custom "search" method
@ -141,7 +150,11 @@ class Authenticators(ModelHandler):
auth = item.getInstance()
canDoSearch = type_ == 'user' and (auth.searchUsers != auths.Authenticator.searchUsers) or (auth.searchGroups != auths.Authenticator.searchGroups)
canDoSearch = (
type_ == 'user'
and (auth.searchUsers != auths.Authenticator.searchUsers) # type: ignore
or (auth.searchGroups != auths.Authenticator.searchGroups) # type: ignore
)
if canDoSearch is False:
raise self.notSupported()
@ -161,11 +174,10 @@ class Authenticators(ModelHandler):
if not authType:
raise self.invalidRequestException('Invalid type: {}'.format(type_))
self.ensureAccess(authType, permissions.PERMISSION_MANAGEMENT, root=True)
tmpEnvironment = Environment.getTempEnv()
dct = self._params.copy()
dct['_request'] = self._request
res = authType.test(Environment.getTempEnv(), dct)
res = authType.test(tmpEnvironment, dct)
if res[0]:
return self.success()
return res[1]
@ -175,7 +187,7 @@ class Authenticators(ModelHandler):
for user in item.users.all():
for userService in user.userServices.all():
userService.user = None
userService.user = None # type: ignore
userService.removeOrCancel()
item.delete()

View File

@ -134,8 +134,6 @@ class CalendarRules(DetailHandler): # pylint: disable=too-many-public-methods
logger.exception('Saving calendar')
raise RequestError('incorrect invocation to PUT: {0}'.format(e))
return self.getItems(parent, calRule.uuid)
def deleteItem(self, parent: 'Calendar', item: str) -> None:
logger.debug('Deleting rule %s from %s', item, parent)
try:

View File

@ -33,6 +33,7 @@
import datetime
import logging
import typing
from uds.core.util.request import ExtendedHttpRequestWithUser
from uds.REST import Handler
from uds.REST import RequestError
@ -50,17 +51,18 @@ class Connection(Handler):
"""
Processes actor requests
"""
authenticated = True # Actor requests are not authenticated
needs_admin = False
needs_staff = False
@staticmethod
def result(
result: typing.Any = None,
error: typing.Optional[typing.Union[str, int]] = None,
errorCode: int = 0,
retryable: bool = False
) -> typing.Dict[str, typing.Any]:
result: typing.Any = None,
error: typing.Optional[typing.Union[str, int]] = None,
errorCode: int = 0,
retryable: bool = False,
) -> typing.Dict[str, typing.Any]:
"""
Helper method to create a "result" set for connection response
:param result: Result value to return (can be None, in which case it is converted to empty string '')
@ -88,28 +90,41 @@ class Connection(Handler):
# Ensure user is present on request, used by web views methods
self._request.user = self._user
return Connection.result(result=getServicesData(self._request))
return Connection.result(result=getServicesData(typing.cast(ExtendedHttpRequestWithUser, self._request)))
def connection(self, doNotCheck: bool = False):
idService = self._args[0]
idTransport = self._args[1]
try:
ip, userService, iads, trans, itrans = userServiceManager().getService( # pylint: disable=unused-variable
self._user, self._request.os, self._request.ip, idService, idTransport, not doNotCheck
(
ip,
userService,
iads,
trans,
itrans,
) = userServiceManager().getService( # pylint: disable=unused-variable
self._user,
self._request.os,
self._request.ip,
idService,
idTransport,
not doNotCheck,
)
ci = {
'username': '',
'password': '',
'domain': '',
'protocol': 'unknown',
'ip': ip
'ip': ip,
}
if itrans: # only will be available id doNotCheck is False
ci.update(itrans.getConnectionInfo(userService, self._user, 'UNKNOWN'))
return Connection.result(result=ci)
except ServiceNotReadyError as e:
# Refresh ticket and make this retrayable
return Connection.result(error=errors.SERVICE_IN_PREPARATION, errorCode=e.code, retryable=True)
return Connection.result(
error=errors.SERVICE_IN_PREPARATION, errorCode=e.code, retryable=True
)
except Exception as e:
logger.exception("Exception")
return Connection.result(error=str(e))
@ -122,23 +137,49 @@ class Connection(Handler):
hostname = self._args[3]
try:
res = userServiceManager().getService(self._user, self._request.os, self._request.ip, idService, idTransport)
res = userServiceManager().getService(
self._user, self._request.os, self._request.ip, idService, idTransport
)
logger.debug('Res: %s', res)
ip, userService, userServiceInstance, transport, transportInstance = res # pylint: disable=unused-variable
(
ip,
userService,
userServiceInstance,
transport,
transportInstance,
) = res # pylint: disable=unused-variable
password = cryptoManager().symDecrpyt(self.getValue('password'), scrambler)
userService.setConnectionSource(self._request.ip, hostname) # Store where we are accessing from so we can notify Service
userService.setConnectionSource(
self._request.ip, hostname
) # Store where we are accessing from so we can notify Service
transportScript = transportInstance.getEncodedTransportScript(userService, transport, ip, self._request.os, self._user, password, self._request)
if not ip:
raise ServiceNotReadyError()
transportScript = transportInstance.getEncodedTransportScript(
userService,
transport,
ip,
self._request.os,
self._user,
password,
self._request,
)
return Connection.result(result=transportScript)
except ServiceNotReadyError as e:
# Refresh ticket and make this retrayable
return Connection.result(error=errors.SERVICE_IN_PREPARATION, errorCode=e.code, retryable=True)
return Connection.result(
error=errors.SERVICE_IN_PREPARATION, errorCode=e.code, retryable=True
)
except Exception as e:
logger.exception("Exception")
return Connection.result(error=str(e))
def getTicketContent(self):
return {} # TODO: use this for something?
def get(self):
"""
Processes get requests

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -63,8 +62,12 @@ class MetaServicesPool(DetailHandler):
'comments': item.pool.comments,
'priority': item.priority,
'enabled': item.enabled,
'user_services_count': item.pool.userServices.exclude(state__in=State.INFO_STATES).count(),
'user_services_in_preparation': item.pool.userServices.filter(state=State.PREPARING).count(),
'user_services_count': item.pool.userServices.exclude(
state__in=State.INFO_STATES
).count(),
'user_services_in_preparation': item.pool.userServices.filter(
state=State.PREPARING
).count(),
}
def getItems(self, parent: MetaPool, item: typing.Optional[str]):
@ -98,20 +101,30 @@ class MetaServicesPool(DetailHandler):
if uuid is not None:
member = parent.members.get(uuid=uuid)
member.pool = pool
member.pool = pool # type: ignore
member.enabled = enabled
member.priority = priority
member.save()
else:
parent.members.create(pool=pool, priority=priority, enabled=enabled)
log.doLog(parent, log.INFO, (uuid is None and "Added" or "Modified") + " meta pool member {}/{}/{} by {}".format(pool.name, priority, enabled, self._user.pretty_name), log.ADMIN)
log.doLog(
parent,
log.INFO,
("Added" if uuid is None else "Modified")
+ " meta pool member {}/{}/{} by {}".format(
pool.name, priority, enabled, self._user.pretty_name
),
log.ADMIN,
)
return self.success()
def deleteItem(self, parent: MetaPool, item: str):
member = parent.members.get(uuid=processUuid(self._args[0]))
logStr = "Removed meta pool member {} by {}".format(member.pool.name, self._user.pretty_name)
logStr = "Removed meta pool member {} by {}".format(
member.pool.name, self._user.pretty_name
)
member.delete()
@ -130,30 +143,33 @@ class MetaAssignedService(DetailHandler):
element['pool_name'] = item.deployed_service.name
return element
def _getAssignedService(self, metaPool: MetaPool, userServiceId: str) -> UserService:
def _getAssignedService(
self, metaPool: MetaPool, userServiceId: str
) -> UserService:
"""
Gets an assigned service and checks that it belongs to this metapool
If not found, raises InvalidItemException
"""
try:
return UserService.objects.filter(uuid=processUuid(userServiceId), cache_level=0, deployed_service__meta=metaPool)[0]
return UserService.objects.filter(
uuid=processUuid(userServiceId),
cache_level=0,
deployed_service__meta=metaPool,
)[0]
except Exception:
pass
raise self.invalidItemException()
def getItems(self, parent: MetaPool, item: typing.Optional[str]):
def assignedUserServicesForPools():
for m in parent.members.filter(enabled=True):
for u in m.pool.assignedUserServices().filter(
state__in=State.VALID_STATES
).prefetch_related(
'properties'
).prefetch_related(
'deployed_service'
).prefetch_related(
'publication'
):
for u in (
m.pool.assignedUserServices()
.filter(state__in=State.VALID_STATES)
.prefetch_related('properties')
.prefetch_related('deployed_service')
.prefetch_related('publication')
):
yield u
try:
@ -163,7 +179,9 @@ class MetaAssignedService(DetailHandler):
result[k.uuid] = MetaAssignedService.itemToDict(parent, k)
return list(result.values())
return MetaAssignedService.itemToDict(parent, self._getAssignedService(parent, item))
return MetaAssignedService.itemToDict(
parent, self._getAssignedService(parent, item)
)
except Exception:
logger.exception('getItems')
raise self.invalidItemException()
@ -178,12 +196,18 @@ class MetaAssignedService(DetailHandler):
{'unique_id': {'title': 'Unique ID'}},
{'ip': {'title': _('IP')}},
{'friendly_name': {'title': _('Friendly name')}},
{'state': {'title': _('status'), 'type': 'dict', 'dict': State.dictionary()}},
{
'state': {
'title': _('status'),
'type': 'dict',
'dict': State.dictionary(),
}
},
{'in_use': {'title': _('In Use')}},
{'source_host': {'title': _('Src Host')}},
{'source_ip': {'title': _('Src Ip')}},
{'owner': {'title': _('Owner')}},
{'actor_version': {'title': _('Actor version')}}
{'actor_version': {'title': _('Actor version')}},
]
def getRowStyle(self, parent: MetaPool) -> typing.Dict[str, typing.Any]:
@ -191,9 +215,9 @@ class MetaAssignedService(DetailHandler):
def getLogs(self, parent: MetaPool, item: str) -> typing.List[typing.Any]:
try:
item = self._getAssignedService(parent, item)
logger.debug('Getting logs for %s', item)
return log.getLogs(item)
asignedService = self._getAssignedService(parent, item)
logger.debug('Getting logs for %s', asignedService)
return log.getLogs(asignedService)
except Exception:
raise self.invalidItemException()
@ -201,9 +225,15 @@ class MetaAssignedService(DetailHandler):
userService = self._getAssignedService(parent, item)
if userService.user:
logStr = 'Deleted assigned service {} to user {} by {}'.format(userService.friendly_name, userService.user.pretty_name, self._user.pretty_name)
logStr = 'Deleted assigned service {} to user {} by {}'.format(
userService.friendly_name,
userService.user.pretty_name,
self._user.pretty_name,
)
else:
logStr = 'Deleted cached service {} by {}'.format(userService.friendly_name, self._user.pretty_name)
logStr = 'Deleted cached service {} by {}'.format(
userService.friendly_name, self._user.pretty_name
)
if userService.state in (State.USABLE, State.REMOVING):
userService.remove()
@ -225,13 +255,25 @@ class MetaAssignedService(DetailHandler):
service = self._getAssignedService(parent, item)
user: User = User.objects.get(uuid=processUuid(fields['user_id']))
logStr = 'Changing ownership of service from {} to {} by {}'.format(service.user.pretty_name, user.pretty_name, self._user.pretty_name)
logStr = 'Changing ownership of service from {} to {} by {}'.format(
service.user.pretty_name, user.pretty_name, self._user.pretty_name
)
# If there is another service that has this same owner, raise an exception
if service.deployed_service.userServices.filter(user=user).exclude(uuid=service.uuid).exclude(state__in=State.INFO_STATES).count() > 0:
raise self.invalidResponseException('There is already another user service assigned to {}'.format(user.pretty_name))
if (
service.deployed_service.userServices.filter(user=user)
.exclude(uuid=service.uuid)
.exclude(state__in=State.INFO_STATES)
.count()
> 0
):
raise self.invalidResponseException(
'There is already another user service assigned to {}'.format(
user.pretty_name
)
)
service.user = user
service.user = user # type: ignore
service.save()
# Log change

View File

@ -97,8 +97,8 @@ class AccessCalendars(DetailHandler):
if uuid is not None:
calAccess: 'CalendarAccess' = parent.calendarAccess.get(uuid=uuid)
calAccess.calendar = calendar
calAccess.service_pool = parent
calAccess.calendar = calendar # type: ignore
calAccess.service_pool = parent # type: ignore
calAccess.access = access
calAccess.priority = priority
calAccess.save()
@ -183,8 +183,8 @@ class ActionsCalendars(DetailHandler):
if uuid is not None:
calAction = CalendarAction.objects.get(uuid=uuid)
calAction.calendar = calendar
calAction.service_pool = parent
calAction.calendar = calendar # type: ignore
calAction.service_pool = parent # type: ignore
calAction.action = action
calAction.at_start = atStart
calAction.events_offset = eventsOffset
@ -208,10 +208,10 @@ class ActionsCalendars(DetailHandler):
log.doLog(parent, log.INFO, logStr, log.ADMIN)
def execute(self, parent: 'ServicePool', item: str):
self.ensureAccess(item, permissions.PERMISSION_MANAGEMENT)
logger.debug('Launching action')
uuid = processUuid(item)
calendarAction: CalendarAction = CalendarAction.objects.get(uuid=uuid)
self.ensureAccess(calendarAction, permissions.PERMISSION_MANAGEMENT)
logStr = "Launched scheduled action \"{},{},{},{},{}\" by {}".format(
calendarAction.calendar.name, calendarAction.action,
calendarAction.events_offset, calendarAction.at_start and 'Start' or 'End', calendarAction.params,

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2019 Virtual Cable S.L.
# Copyright (c) 2014-2020 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -52,6 +52,7 @@ class Permissions(Handler):
"""
Processes permissions requests
"""
needs_admin = True
@staticmethod
@ -75,7 +76,9 @@ class Permissions(Handler):
return cls
@staticmethod
def permsToDict(perms: typing.Iterable[models.Permissions]) -> typing.List[typing.Dict[str, str]]:
def permsToDict(
perms: typing.Iterable[models.Permissions],
) -> typing.List[typing.Dict[str, str]]:
res = []
for perm in perms:
if perm.user is None:
@ -85,16 +88,18 @@ class Permissions(Handler):
kind = 'user'
entity = perm.user
res.append({
'id': perm.uuid,
'type': kind,
'auth': entity.manager.uuid,
'auth_name': entity.manager.name,
'entity_id': entity.uuid,
'entity_name': entity.name,
'perm': perm.permission,
'perm_name': perm.permission_as_string
})
res.append(
{
'id': perm.uuid,
'type': kind,
'auth': entity.manager.uuid,
'auth_name': entity.manager.name,
'entity_id': entity.uuid,
'entity_name': entity.name,
'perm': perm.permission,
'perm_name': perm.permission_as_string,
}
)
return sorted(res, key=lambda v: v['auth_name'] + v['entity_name'])
@ -125,7 +130,7 @@ class Permissions(Handler):
'0': permissions.PERMISSION_NONE,
'1': permissions.PERMISSION_READ,
'2': permissions.PERMISSION_MANAGEMENT,
'3': permissions.PERMISSION_ALL
'3': permissions.PERMISSION_ALL,
}.get(self._params.get('perm', '0'), permissions.PERMISSION_NONE)
cls = Permissions.getClass(self._args[0])

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2019 Virtual Cable S.L.
# Copyright (c) 2014-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -48,16 +48,17 @@ from .services_usage import ServicesUsage
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from django.db import models
class Providers(ModelHandler):
"""
Providers REST handler
"""
model = Provider
detail = {
'services': DetailServices,
'usage': ServicesUsage
}
detail = {'services': DetailServices, 'usage': ServicesUsage}
custom_methods = [('allservices', False), ('service', False), ('maintenance', True)]
@ -72,7 +73,9 @@ class Providers(ModelHandler):
{'comments': {'title': _('Comments')}},
{'maintenance_state': {'title': _('Status')}},
{'services_count': {'title': _('Services'), 'type': 'numeric'}},
{'user_services_count': {'title': _('User Services'), 'type': 'numeric'}}, # , 'width': '132px'
{
'user_services_count': {'title': _('User Services'), 'type': 'numeric'}
}, # , 'width': '132px'
{'tags': {'title': _('tags'), 'visible': False}},
]
# Field from where to get "class" and prefix for that class, so this will generate "row-state-A, row-state-X, ....
@ -82,25 +85,32 @@ class Providers(ModelHandler):
type_ = item.getType()
# Icon can have a lot of data (1-2 Kbytes), but it's not expected to have a lot of services providers, and even so, this will work fine
offers = [{
'name': ugettext(t.name()),
'type': t.type(),
'description': ugettext(t.description()),
'icon': t.icon64().replace('\n', '')
} for t in type_.getServicesTypes()]
offers = [
{
'name': ugettext(t.name()),
'type': t.type(),
'description': ugettext(t.description()),
'icon': t.icon64().replace('\n', ''),
}
for t in type_.getServicesTypes()
]
return {
'id': item.uuid,
'name': item.name,
'tags': [tag.vtag for tag in item.tags.all()],
'services_count': item.services.count(),
'user_services_count': UserService.objects.filter(deployed_service__service__provider=item).exclude(state__in=(State.REMOVED, State.ERROR)).count(),
'user_services_count': UserService.objects.filter(
deployed_service__service__provider=item
)
.exclude(state__in=(State.REMOVED, State.ERROR))
.count(),
'maintenance_mode': item.maintenance_mode,
'offers': offers,
'type': type_.type(),
'type_name': type_.name(),
'comments': item.comments,
'permission': permissions.getEffectivePermission(self._user, item)
'permission': permissions.getEffectivePermission(self._user, item),
}
def checkDelete(self, item: Provider) -> None:
@ -115,7 +125,9 @@ class Providers(ModelHandler):
def getGui(self, type_: str) -> typing.List[typing.Any]:
clsType = services.factory().lookup(type_)
if clsType:
return self.addDefaultFields(clsType.guiDescription(), ['name', 'comments', 'tags'])
return self.addDefaultFields(
clsType.guiDescription(), ['name', 'comments', 'tags']
)
raise NotFound('Type not found!')
def allservices(self) -> typing.Generator[typing.Dict, None, None]:
@ -136,7 +148,9 @@ class Providers(ModelHandler):
"""
try:
service = Service.objects.get(uuid=self._args[1])
perm = self.ensureAccess(service.provider, permissions.PERMISSION_READ) # Ensures that we can read this item
perm = self.ensureAccess(
service.provider, permissions.PERMISSION_READ
) # Ensures that we can read this item
return DetailServices.serviceToDict(service, perm, True)
except Exception:
# logger.exception('Exception')
@ -161,13 +175,12 @@ class Providers(ModelHandler):
if not spType:
raise NotFound('Type not found!')
self.ensureAccess(spType, permissions.PERMISSION_MANAGEMENT, root=True)
tmpEnvironment = Environment.getTempEnv()
logger.debug('spType: %s', spType)
dct = self._params.copy()
dct['_request'] = self._request
res = spType.test(Environment.getTempEnv(), dct)
res = spType.test(tmpEnvironment, dct)
if res[0]:
return 'ok'

View File

@ -182,7 +182,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
service.tags.set(
[models.Tag.objects.get_or_create(tag=val)[0] for val in tags]
)
service.proxy = proxy
service.proxy = proxy # type: ignore # Valid asignation, but mypy complains :)
serviceInstance = service.getInstance(self._params)

View File

@ -128,6 +128,9 @@ class Tickets(Handler):
userIp: typing.Optional[str] = self._params.get('userIp', None)
spUuid: str = ''
trUuid: str = ''
try:
authId = self._params.get('authId', None)
authName = self._params.get('auth', None)
@ -187,8 +190,8 @@ class Tickets(Handler):
logger.error('Service pool %s does not has valid transports for ip %s', servicePool.name, userIp)
raise Exception('Service pool does not has any valid transports for ip {}'.format(userIp))
servicePool = servicePool.uuid
transport = transport.uuid
spUuid = servicePool.uuid
trUuid = transport.uuid
except models.Authenticator.DoesNotExist:
return Tickets.result(error='Authenticator does not exists')
@ -205,8 +208,8 @@ class Tickets(Handler):
'realname': realname,
'groups': groupIds,
'auth': auth.uuid,
'servicePool': servicePool,
'transport': transport,
'servicePool': spUuid,
'transport': trUuid,
}
ticket = models.TicketStore.create(data)

View File

@ -143,7 +143,7 @@ class Transports(ModelHandler):
if networks is None:
return
logger.debug('Networks: %s', networks)
item.networks.set(Network.objects.filter(uuid__in=networks))
item.networks.set(Network.objects.filter(uuid__in=networks)) # type: ignore # set is not part of "queryset"
try:
pools = self._params['pools']
@ -155,7 +155,7 @@ class Transports(ModelHandler):
return
logger.debug('Pools: %s', pools)
item.deployedServices.set(ServicePool.objects.filter(uuid__in=pools))
item.deployedServices.set(ServicePool.objects.filter(uuid__in=pools)) # type: ignore # set is not part of "queryset"
# try:
# oss = ','.join(self._params['allowed_oss'])

View File

@ -221,7 +221,7 @@ class AssignedService(DetailHandler):
)
)
userService.user = user
userService.user = user # type: ignore
userService.save()
# Log change

View File

@ -193,7 +193,7 @@ class Users(DetailHandler):
assignedUserService: 'UserService'
for assignedUserService in user.userServices.all():
try:
assignedUserService.user = None # Remove assigned user (avoid cascade deletion)
assignedUserService.user = None # type: ignore # Remove assigned user (avoid cascade deletion)
assignedUserService.save(update_fields=['user'])
assignedUserService.removeOrCancel()
except Exception:
@ -423,7 +423,7 @@ class Groups(DetailHandler):
if not tmpSet:
break # If already empty, stop
users = list(tmpSet)
users = list(tmpSet) if tmpSet else list()
tmpSet = None
else:
users = group.users.all()

View File

@ -111,7 +111,7 @@ def getRootUser() -> User:
staff_member=True,
is_admin=True,
)
user.manager = Authenticator()
user.manager = Authenticator() # type: ignore
# Fake overwrite some methods, a bit cheating? maybe? :)
user.getGroups = lambda: [] # type: ignore
user.updateLastAccess = lambda: None # type: ignore
@ -140,13 +140,13 @@ def webLoginRequired(
# if GlobalConfig.REDIRECT_TO_HTTPS.getBool() is True:
# url = url.replace('http://', 'https://')
# logger.debug('No user found, redirecting to %s', url)
return HttpResponseRedirect(reverse('page.login'))
return HttpResponseRedirect(reverse('page.login')) # type: ignore
if admin is True or admin == 'admin':
if request.user.isStaff() is False or (
admin == 'admin' and request.user.is_admin is False
):
return HttpResponseForbidden(_('Forbidden'))
return HttpResponseForbidden(_('Forbidden')) # type: ignore
return view_func(request, *args, **kwargs)
@ -171,7 +171,7 @@ def trustedSourceRequired(
"""
try:
if not net.ipInNetwork(request.ip, GlobalConfig.TRUSTED_SOURCES.get(True)):
return HttpResponseForbidden()
return HttpResponseForbidden() # type: ignore
except Exception as e:
logger.warning(
'Error checking trusted source: "%s" does not seems to be a valid network string. Using Unrestricted access.',
@ -189,7 +189,7 @@ def denyNonAuthenticated(
@wraps(view_func)
def _wrapped_view(request: 'ExtendedHttpRequest', *args, **kwargs) -> RT:
if not request.user:
return HttpResponseForbidden()
return HttpResponseForbidden() # type: ignore
return view_func(request, *args, **kwargs)
return _wrapped_view

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -92,7 +92,7 @@ class LogManager:
if avoidDuplicates is True:
try:
lg = models.Log.objects.filter(owner_id=owner_id, owner_type=owner_type, level=level, source=source).order_by('-created', '-id')[0]
if lg.message == message:
if lg.data == message:
# Do not log again, already logged
return
except Exception: # Do not exists log
@ -127,7 +127,7 @@ class LogManager:
owner_type = transDict.get(type(wichObject), None)
if owner_type is not None:
self.__log(owner_type, wichObject.id, level, message, source, avoidDuplicates)
self.__log(owner_type, wichObject.id, level, message, source, avoidDuplicates) # type: ignore
else:
logger.debug('Requested doLog for a type of object not covered: %s', wichObject)
@ -140,7 +140,7 @@ class LogManager:
logger.debug('Getting log: %s -> %s', wichObject, owner_type)
if owner_type is not None: # 0 is valid owner type
return self.__getLogs(owner_type, wichObject.id, limit)
return self.__getLogs(owner_type, wichObject.id, limit) # type: ignore
logger.debug('Requested getLogs for a type of object not covered: %s', wichObject)
return []
@ -154,6 +154,6 @@ class LogManager:
owner_type = transDict.get(type(wichObject), None)
if owner_type:
self.__clearLogs(owner_type, wichObject.id)
self.__clearLogs(owner_type, wichObject.id) # type: ignore
else:
logger.debug('Requested clearLogs for a type of object not covered: %s', wichObject)

View File

@ -32,6 +32,7 @@
"""
import logging
import typing
from uds.REST.methods.permissions import Permissions
from uds import models
from uds.core.util import ot
@ -56,18 +57,21 @@ def getPermissions(obj: 'Model') -> typing.List[models.Permissions]:
return list(models.Permissions.enumeratePermissions(object_type=ot.getObjectType(obj), object_id=obj.pk))
def getEffectivePermission(user: 'models.User', obj: 'Model', root: bool = False):
if user.is_admin is True:
return PERMISSION_ALL
def getEffectivePermission(user: 'models.User', obj: 'Model', root: bool = False) -> int:
try:
if user.is_admin is True:
return PERMISSION_ALL
if user.staff_member is False:
if user.staff_member is False:
return PERMISSION_NONE
if root is False:
return models.Permissions.getPermissions(user=user, groups=user.groups.all(), object_type=ot.getObjectType(obj), object_id=obj.pk)
return models.Permissions.getPermissions(user=user, groups=user.groups.all(), object_type=ot.getObjectType(obj))
except Exception:
return PERMISSION_NONE
if root is False:
return models.Permissions.getPermissions(user=user, groups=user.groups.all(), object_type=ot.getObjectType(obj), object_id=obj.pk)
return models.Permissions.getPermissions(user=user, groups=user.groups.all(), object_type=ot.getObjectType(obj))
def addUserPermission(user: 'models.User', obj: 'Model', permission: int = PERMISSION_READ):
# Some permissions added to some object types needs at least READ_PERMISSION on parent

View File

@ -159,6 +159,8 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
userServices: 'models.QuerySet[UserService]'
calendarAccess: 'models.QuerySet[CalendarAccess]'
changelog: 'models.QuerySet[ServicePoolPublicationChangelog]'
calendaraction_set: typing.Any
class Meta(UUIDModel.Meta):
"""

View File

@ -44,7 +44,7 @@ from .tag import TaggingMixin
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.models import Network
from uds.models import Network, ServicePool
logger = logging.getLogger(__name__)
@ -56,29 +56,33 @@ class Transport(ManagedObjectModel, TaggingMixin):
Sample of transports are RDP, Spice, Web file uploader, etc...
"""
# pylint: disable=model-missing-unicode
priority = models.IntegerField(default=0, db_index=True)
nets_positive = models.BooleanField(default=False)
# We store allowed oss as a comma-separated list
allowed_oss = models.CharField(max_length=255, default='')
# "fake" relations declarations for type checking
networks: 'models.QuerySet[Network]'
# "fake" declarations for type checking
objects: 'models.BaseManager[Transport]'
deployedServices: 'models.QuerySet[ServicePool]'
networks: 'models.QuerySet[Network]'
class Meta(ManagedObjectModel.Meta):
"""
Meta class to declare default order
"""
ordering = ('name',)
app_label = 'uds'
def getInstance(self, values: typing.Optional[typing.Dict[str, str]] = None) -> 'transports.Transport':
def getInstance(
self, values: typing.Optional[typing.Dict[str, str]] = None
) -> 'transports.Transport':
return typing.cast('transports.Transport', super().getInstance(values=values))
def getType(self) -> 'typing.Type[transports.Transport]':
def getType(self) -> 'typing.Type[transports.Transport]':
"""
Get the type of the object this record represents.
@ -144,6 +148,7 @@ class Transport(ManagedObjectModel, TaggingMixin):
:note: If destroy raises an exception, the deletion is not taken.
"""
from uds.core.util.permissions import clean
toDelete = kwargs['instance']
logger.debug('Before delete transport %s', toDelete)
@ -156,5 +161,6 @@ class Transport(ManagedObjectModel, TaggingMixin):
# Clears related permissions
clean(toDelete)
# : Connects a pre deletion signal to OS Manager
models.signals.pre_delete.connect(Transport.beforeDelete, sender=Transport)

View File

@ -47,7 +47,7 @@ from .uuid_model import UUIDModel
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.core import auths
from uds.models import Group
from uds.models import Group, UserService
logger = logging.getLogger(__name__)
@ -72,6 +72,7 @@ class User(UUIDModel):
# "fake" declarations for type checking
objects: 'models.BaseManager[User]'
groups: 'models.QuerySet[Group]'
userServices: 'models.QuerySet[UserService]'
class Meta(UUIDModel.Meta):
"""

View File

@ -193,7 +193,7 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods
publicationInstance = self.publication.getInstance()
except Exception:
# The publication to witch this item points to, does not exists
self.publication = None
self.publication = None # type: ignore
logger.exception(
'Got exception at getInstance of an userService %s (seems that publication does not exists!)',
self,
@ -401,7 +401,7 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods
"""
self.cache_level = 0
self.state_date = getSqlDatetime()
self.user = user
self.user = user # type: ignore
self.save(update_fields=['cache_level', 'state_date', 'user'])
def setInUse(self, inUse: bool) -> None: