1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-05 09:17:54 +03:00

Added some fixes to services_info_dict and finished tests for metapools transports grouping

This commit is contained in:
Adolfo Gómez García 2024-02-10 21:28:15 +01:00
parent 3d02a99b42
commit 9800218df9
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
18 changed files with 403 additions and 298 deletions

View File

@ -66,3 +66,6 @@ DEFAULT_MAX_PREPARING_SERVICES: typing.Final[int] = 15
# Default wait time for rechecks, etc... # Default wait time for rechecks, etc...
DEFAULT_WAIT_TIME: typing.Final[int] = 8 # seconds DEFAULT_WAIT_TIME: typing.Final[int] = 8 # seconds
# UDS Action url scheme
UDS_ACTION_SCHEME: typing.Final[str] = 'udsa://'

View File

@ -55,6 +55,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
""" """
Proxmox fixed machines service. Proxmox fixed machines service.
""" """
is_base: typing.ClassVar[bool] = True # This is a base service, not a final one is_base: typing.ClassVar[bool] = True # This is a base service, not a final one
uses_cache = False # Cache are running machine awaiting to be assigned uses_cache = False # Cache are running machine awaiting to be assigned
@ -73,7 +74,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
# allowed_protocols = types.transports.Protocol.generic_vdi(types.transports.Protocol.SPICE) # allowed_protocols = types.transports.Protocol.generic_vdi(types.transports.Protocol.SPICE)
# services_type_provided = types.services.ServiceType.VDI # services_type_provided = types.services.ServiceType.VDI
# Gui # Gui remplates, to be "incorporated" by inherited classes if needed
token = gui.TextField( token = gui.TextField(
order=1, order=1,
label=_('Service Token'), label=_('Service Token'),
@ -86,6 +87,25 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
readonly=False, readonly=False,
) )
use_snapshots = gui.CheckBoxField(
label=_('Use snapshots'),
default=False,
order=22,
tooltip=_('If active, UDS will try to create an snapshot (if one already does not exists) before accessing a machine, and restore it after usage.'),
tab=_('Machines'),
old_field_name='useSnapshots',
)
# Keep name as "machine" so we can use VCHelpers.getMachines
machines = gui.MultiChoiceField(
label=_("Machines"),
order=21,
tooltip=_('Machines for this service'),
required=True,
tab=_('Machines'),
rows=10,
)
def _get_assigned_machines(self) -> typing.Set[str]: def _get_assigned_machines(self) -> typing.Set[str]:
vals = self.storage.get_unpickle('vms') vals = self.storage.get_unpickle('vms')
logger.debug('Got storage VMS: %s', vals) logger.debug('Got storage VMS: %s', vals)

View File

@ -85,6 +85,7 @@ class HighAvailabilityPolicy(enum.IntEnum):
(HighAvailabilityPolicy.ENABLED, _('Enabled')), (HighAvailabilityPolicy.ENABLED, _('Enabled')),
] ]
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class UsageInfo: class UsageInfo:
used: int used: int
@ -93,3 +94,31 @@ class UsageInfo:
@property @property
def percent(self) -> int: def percent(self) -> int:
return (self.used * 100 // self.total) if self.total > 0 else 0 return (self.used * 100 // self.total) if self.total > 0 else 0
class UsageInfoVars:
use_percent: str
use_count: str
left_count: str
max_srvs: str
def __init__(self, pool_usage_info: typing.Optional[UsageInfo] = None):
if pool_usage_info is None:
pool_usage_info = UsageInfo(0, 0)
self.use_percent = str(pool_usage_info.percent) + '%' if pool_usage_info.total > 0 else ''
self.use_count = str(pool_usage_info.used) if pool_usage_info.total > 0 else ''
self.left_count = str(pool_usage_info.total - pool_usage_info.used) if pool_usage_info.total > 0 else ''
self.max_srvs = str(pool_usage_info.total) if pool_usage_info.total > 0 else ''
def replace(self, x: str) -> str:
return (
x.replace('{use}', self.use_percent)
.replace('{total}', self.max_srvs)
.replace('{usec}', self.use_count)
.replace('{left}', self.left_count)
)
@staticmethod
def has_macros(x: str) -> bool:
return any(y in x for y in ('{use}', '{total}', '{usec}', '{left}'))

View File

@ -290,7 +290,8 @@ def concurrent_removal_limit_field(
def remove_duplicates_field( def remove_duplicates_field(
order: int = 102, tab: 'types.ui.Tab|str|None' = types.ui.Tab.ADVANCED order: int = 102, tab: 'types.ui.Tab|str|None' = types.ui.Tab.ADVANCED,
old_field_name: typing.Optional[str] = None,
) -> ui.gui.CheckBoxField: ) -> ui.gui.CheckBoxField:
return ui.gui.CheckBoxField( return ui.gui.CheckBoxField(
label=_('Remove found duplicates'), label=_('Remove found duplicates'),
@ -298,7 +299,7 @@ def remove_duplicates_field(
order=order, order=order,
tooltip=_('If active, found duplicates vApps for this service will be removed'), tooltip=_('If active, found duplicates vApps for this service will be removed'),
tab=tab, tab=tab,
old_field_name='removeDuplicates', old_field_name=old_field_name,
) )

View File

@ -37,6 +37,8 @@ import collections.abc
from django.utils.translation import get_language from django.utils.translation import get_language
from django.utils import formats from django.utils import formats
from uds.core import consts
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports from django.http import HttpRequest # pylint: disable=ungrouped-imports
@ -62,7 +64,7 @@ def uds_access_link(
''' '''
If transportId (uuid) is None, this will be a metaLink If transportId (uuid) is None, this will be a metaLink
''' '''
return f'udsa://{serviceId}/{transportId or "meta"}' return f'{consts.system.UDS_ACTION_SCHEME}{serviceId}/{transportId or "meta"}'
def parse_date(dateToParse) -> datetime.date: def parse_date(dateToParse) -> datetime.date:

View File

@ -78,6 +78,11 @@ class Transport(ManagedObjectModel, TaggingMixin):
deployedServices: 'models.manager.RelatedManager[ServicePool]' deployedServices: 'models.manager.RelatedManager[ServicePool]'
networks: 'models.manager.RelatedManager[Network]' networks: 'models.manager.RelatedManager[Network]'
@property
# Alias for future renaming (start using alias asap)
def service_pools(self) -> 'models.manager.RelatedManager[ServicePool]':
return self.deployedServices
class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods
""" """
Meta class to declare default order Meta class to declare default order

View File

@ -32,6 +32,7 @@ import logging
import typing import typing
from django.utils.translation import gettext_noop as _, gettext from django.utils.translation import gettext_noop as _, gettext
from regex import F
from uds.core import services, types, consts, exceptions from uds.core import services, types, consts, exceptions
from uds.core.services.specializations.fixed_machine.fixed_service import FixedService from uds.core.services.specializations.fixed_machine.fixed_service import FixedService
from uds.core.services.specializations.fixed_machine.fixed_userservice import FixedUserService from uds.core.services.specializations.fixed_machine.fixed_userservice import FixedUserService
@ -92,24 +93,9 @@ class ProxmoxFixedService(FixedService): # pylint: disable=too-many-public-meth
tab=_('Machines'), tab=_('Machines'),
old_field_name='resourcePool', old_field_name='resourcePool',
) )
# Keep name as "machine" so we can use VCHelpers.getMachines
machines = gui.MultiChoiceField(
label=_("Machines"),
order=21,
tooltip=_('Machines for this service'),
required=True,
tab=_('Machines'),
rows=10,
)
use_snapshots = gui.CheckBoxField( machines = FixedService.machines
label=_('Use snapshots'), use_snapshots = FixedService.use_snapshots
default=False,
order=22,
tooltip=_('If active, UDS will try to create an snapshot on VM use and recover if on exit.'),
tab=_('Machines'),
old_field_name='useSnapshots',
)
prov_uuid = gui.HiddenField(value=None) prov_uuid = gui.HiddenField(value=None)

View File

@ -55,7 +55,8 @@ logger = logging.getLogger(__name__)
class XenFixedService(FixedService): # pylint: disable=too-many-public-methods class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
""" """
Proxmox fixed machines service. Represents a Proxmox service based on fixed machines.
This service requires the qemu agent to be installed on the machines.
""" """
type_name = _('Proxmox Fixed Machines') type_name = _('Proxmox Fixed Machines')
@ -86,29 +87,13 @@ class XenFixedService(FixedService): # pylint: disable=too-many-public-methods
'function': helpers.get_machines, 'function': helpers.get_machines,
'parameters': ['prov_uuid', 'folder'], 'parameters': ['prov_uuid', 'folder'],
}, },
tooltip=_('Resource Pool containing base machines'), tooltip=_('Folder containing base machines'),
required=True, required=True,
tab=_('Machines'), tab=_('Machines'),
old_field_name='resourcePool', old_field_name='resourcePool',
) )
# Keep name as "machine" so we can use VCHelpers.getMachines machines = FixedService.machines
machines = gui.MultiChoiceField( use_snapshots = FixedService.use_snapshots
label=_("Machines"),
order=21,
tooltip=_('Machines for this service'),
required=True,
tab=_('Machines'),
rows=10,
)
use_snapshots = gui.CheckBoxField(
label=_('Use snapshots'),
default=False,
order=22,
tooltip=_('If active, UDS will try to create an snapshot on VM use and recover if on exit.'),
tab=_('Machines'),
old_field_name='useSnapshots',
)
prov_uuid = gui.HiddenField(value=None) prov_uuid = gui.HiddenField(value=None)

View File

@ -29,6 +29,7 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com @author: Adolfo Gómez, dkmaster at dkmon dot com
''' '''
import collections.abc import collections.abc
from functools import reduce
import logging import logging
import typing import typing
@ -53,7 +54,7 @@ from uds.models import MetaPool, Network, ServicePool, ServicePoolGroup, TicketS
# Not imported at runtime, just for type checking # Not imported at runtime, just for type checking
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from uds.core.types.requests import ExtendedHttpRequestWithUser from uds.core.types.requests import ExtendedHttpRequestWithUser
from uds.models import Image from uds.models import Image, MetaPoolMember
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -120,54 +121,45 @@ def get_services_info_dict(
""" """
# We look for services for this authenticator groups. User is logged in in just 1 authenticator, so his groups must coincide with those assigned to ds # We look for services for this authenticator groups. User is logged in in just 1 authenticator, so his groups must coincide with those assigned to ds
groups = list(request.user.get_groups()) groups = list(request.user.get_groups())
availServicePools = list( available_service_pools = list(
ServicePool.get_pools_for_groups(groups, request.user) ServicePool.get_pools_for_groups(groups, request.user)
) # Pass in user to get "number_assigned" to optimize ) # Pass in user to get "number_assigned" to optimize
availMetaPools = list( available_metapools = list(
MetaPool.metapools_for_groups(groups, request.user) MetaPool.metapools_for_groups(groups, request.user)
) # Pass in user to get "number_assigned" to optimize ) # Pass in user to get "number_assigned" to optimize
now = sql_datetime() now = sql_datetime()
# Information for administrators # Information for administrators
nets = '' nets = ''
validTrans = '' valid_transports = ''
osType: 'types.os.KnownOS' = request.os.os os_type: 'types.os.KnownOS' = request.os.os
logger.debug('OS: %s', osType) logger.debug('OS: %s', os_type)
if request.user.is_staff(): def _is_valid_transport(t: Transport) -> bool:
nets = ','.join([n.name for n in Network.get_networks_for_ip(request.ip)]) transport_type = t.get_type()
tt = [] return (
t: Transport transport_type is not None
for t in Transport.objects.all().prefetch_related('networks'): and t.is_ip_allowed(request.ip)
if t.is_ip_allowed(request.ip): and transport_type.supports_os(os_type)
tt.append(t.name) and t.is_os_allowed(os_type)
validTrans = ','.join(tt) )
logger.debug('Checking meta pools: %s', availMetaPools)
services = []
# Metapool helpers # Metapool helpers
def transportIterator(member) -> collections.abc.Iterable[Transport]: def _valid_transports(member: 'MetaPoolMember') -> collections.abc.Iterable[Transport]:
t: Transport t: Transport
for t in member.pool.transports.all().order_by('priority'): for t in member.pool.transports.all().order_by('priority'):
try: try:
typeTrans = t.get_type() if _is_valid_transport(t):
if (
typeTrans
and t.is_ip_allowed(request.ip)
and typeTrans.supports_os(osType)
and t.is_os_allowed(osType)
):
yield t yield t
except Exception as e: except Exception as e:
logger.warning('Transport %s of %s not found. Ignoring. (%s)', t, member.pool, e) logger.warning('Transport %s of %s not found. Ignoring. (%s)', t, member.pool, e)
def buildMetaTransports( def _build_transports_for_meta(
transports: collections.abc.Iterable[Transport], isLabel: bool, meta: 'MetaPool' transports: collections.abc.Iterable[Transport], is_by_label: bool, meta: 'MetaPool'
) -> list[collections.abc.Mapping[str, typing.Any]]: ) -> list[collections.abc.Mapping[str, typing.Any]]:
def idd(i): def idd(i):
return i.uuid if not isLabel else 'LABEL:' + i.label return i.uuid if not is_by_label else 'LABEL:' + i.label
return [ return [
{ {
@ -176,110 +168,96 @@ def get_services_info_dict(
'link': html.uds_access_link(request, 'M' + meta.uuid, idd(i)), # type: ignore 'link': html.uds_access_link(request, 'M' + meta.uuid, idd(i)), # type: ignore
'priority': i.priority, 'priority': i.priority,
} }
for i in transports for i in sorted(transports, key=lambda x: x.priority) # Sorted by priority
] ]
if request.user.is_staff():
nets = ','.join([n.name for n in Network.get_networks_for_ip(request.ip)])
tt = []
t: Transport
for t in Transport.objects.all().prefetch_related('networks'):
if t.is_ip_allowed(request.ip):
tt.append(t.name)
valid_transports = ','.join(tt)
logger.debug('Checking meta pools: %s', available_metapools)
services = []
# Preload all assigned user services for this user # Preload all assigned user services for this user
# Add meta pools data first # Add meta pools data first
for meta in availMetaPools: for meta in available_metapools:
# Check that we have access to at least one transport on some of its children # Check that we have access to at least one transport on some of its children
metaTransports: list[collections.abc.Mapping[str, typing.Any]] = [] transports_in_meta: list[collections.abc.Mapping[str, typing.Any]] = []
in_use = meta.number_in_use > 0 # type: ignore # anotated value in_use = meta.number_in_use > 0 # type: ignore # anotated value
inAll: typing.Optional[typing.Set[str]] = None # Calculate info variable macros content if needed
tmpSet: typing.Set[str] info_vars = (
types.pools.UsageInfoVars(meta.usage())
# If no macro on names, skip calculation (and set to empty) if (
if '{' in meta.name or '{' in meta.visual_name: types.pools.UsageInfoVars.has_macros(meta.name)
poolUsageInfo = meta.usage() or types.pools.UsageInfoVars.has_macros(meta.visual_name)
use_percent = str(poolUsageInfo.percent) + '%' )
use_count = str(poolUsageInfo.used) else types.pools.UsageInfoVars()
left_count = str(poolUsageInfo.total - poolUsageInfo.used)
max_srvs = str(poolUsageInfo.total)
else:
max_srvs = ''
use_percent = ''
use_count = ''
left_count = ''
# pylint: disable=cell-var-from-loop
def macro_info(x: str) -> str:
return (
x.replace('{use}', use_percent)
.replace('{total}', max_srvs)
.replace('{usec}', use_count)
.replace('{left}', left_count)
) )
if meta.transport_grouping == types.pools.TransportSelectionPolicy.COMMON: if meta.transport_grouping == types.pools.TransportSelectionPolicy.COMMON:
# only keep transports that are in ALL members # Keep only transports that are in all pools
for member in meta.members.all().order_by('priority'): # This will be done by getting all transports from all pools and then intersecting them
tmpSet = set() # using reduce
# if first pool, get all its transports and check that are valid transports_in_all_pools = typing.cast(
for t in transportIterator(member): set[Transport],
if inAll is None: reduce(
tmpSet.add(t.uuid) # type: ignore lambda x, y: x & y,
elif t.uuid in inAll: # For subsequent, reduce... [{t for t in _valid_transports(member)} for member in meta.members.all()],
tmpSet.add(t.uuid) # type: ignore ),
)
inAll = tmpSet transports_in_meta = _build_transports_for_meta(
# tmpSet has ALL common transports transports_in_all_pools,
metaTransports = buildMetaTransports( is_by_label=False,
Transport.objects.filter(uuid__in=inAll or []), isLabel=False, meta=meta meta=meta,
) )
elif meta.transport_grouping == types.pools.TransportSelectionPolicy.LABEL: elif meta.transport_grouping == types.pools.TransportSelectionPolicy.LABEL:
ltrans: collections.abc.MutableMapping[str, Transport] = {} ltrans: collections.abc.MutableMapping[str, Transport] = {}
for member in meta.members.all().order_by('priority'): transports_in_all_pools_by_label: typing.Optional[typing.Set[str]] = None
tmpSet = set() temporary_transport_set_by_label: typing.Set[str]
for member in meta.members.all():
temporary_transport_set_by_label = set()
# if first pool, get all its transports and check that are valid # if first pool, get all its transports and check that are valid
for t in transportIterator(member): for t in _valid_transports(member):
if not t.label: if not t.label:
continue continue
if t.label not in ltrans or ltrans[t.label].priority > t.priority: if t.label not in ltrans or ltrans[t.label].priority > t.priority:
ltrans[t.label] = t ltrans[t.label] = t
if inAll is None: if transports_in_all_pools_by_label is None:
tmpSet.add(t.label) temporary_transport_set_by_label.add(t.label)
elif t.label in inAll: # For subsequent, reduce... elif t.label in transports_in_all_pools_by_label: # For subsequent, reduce...
tmpSet.add(t.label) temporary_transport_set_by_label.add(t.label)
inAll = tmpSet transports_in_all_pools_by_label = temporary_transport_set_by_label
# tmpSet has ALL common transports # tmpSet has ALL common transports
metaTransports = buildMetaTransports( transports_in_meta = _build_transports_for_meta(
(v for k, v in ltrans.items() if k in (inAll or set())), isLabel=True, meta=meta (v for k, v in ltrans.items() if k in (transports_in_all_pools_by_label or set())),
is_by_label=True,
meta=meta,
) )
else: else:
for member in meta.members.all(): # If we have at least one valid transport,
# if pool.is_in_maintenance(): # mark as "meta" transport and add it to the list
# continue transports_in_meta = [
for t in member.pool.transports.all():
typeTrans = t.get_type()
if (
typeTrans
and t.is_ip_allowed(request.ip)
and typeTrans.supports_os(osType)
and t.is_os_allowed(osType)
):
metaTransports = [
{ {
'id': 'meta', 'id': 'meta',
'name': 'meta', 'name': 'meta',
'link': html.uds_access_link(request, 'M' + meta.uuid, None), # type: ignore 'link': html.uds_access_link(request, 'M' + meta.uuid, None), # type: ignore
'priority': 0, 'priority': 0,
} }
if any(_valid_transports(member) for member in meta.members.all())
else {}
] ]
break
# if not in_use and meta.number_in_use: # Only look for assignation on possible used
# assignedUserService = UserServiceManager().getExistingAssignationForUser(pool, request.user)
# if assignedUserService:
# in_use = assignedUserService.in_use
# Stop when 1 usable pool is found (metaTransports is filled)
if metaTransports:
break
# If no usable pools, this is not visible # If no usable pools, this is not visible
if metaTransports: if transports_in_meta:
group: collections.abc.MutableMapping[str, typing.Any] = ( group: collections.abc.MutableMapping[str, typing.Any] = (
meta.servicesPoolGroup.as_dict if meta.servicesPoolGroup else ServicePoolGroup.default().as_dict meta.servicesPoolGroup.as_dict if meta.servicesPoolGroup else ServicePoolGroup.default().as_dict
) )
@ -288,13 +266,13 @@ def get_services_info_dict(
_service_info( _service_info(
uuid=meta.uuid, uuid=meta.uuid,
is_meta=True, is_meta=True,
name=macro_info(meta.name), name=info_vars.replace(meta.name),
visual_name=macro_info(meta.visual_name), visual_name=info_vars.replace(meta.visual_name),
description=meta.comments, description=meta.comments,
group=group, group=group,
transports=metaTransports, transports=transports_in_meta,
image=meta.image, image=meta.image,
show_transports=len(metaTransports) > 1, show_transports=len(transports_in_meta) > 1,
allow_users_remove=meta.allow_users_remove, allow_users_remove=meta.allow_users_remove,
allow_users_reset=meta.allow_users_remove, allow_users_reset=meta.allow_users_remove,
maintenance=meta.is_in_maintenance(), maintenance=meta.is_in_maintenance(),
@ -307,20 +285,20 @@ def get_services_info_dict(
) )
# Now generic user service # Now generic user service
for sPool in availServicePools: for service_pool in available_service_pools:
# Skip pools that are part of meta pools # Skip pools that are part of meta pools
if sPool.owned_by_meta: if service_pool.owned_by_meta:
continue continue
# If no macro on names, skip calculation # If no macro on names, skip calculation
if '{' in sPool.name or '{' in sPool.visual_name: if '{' in service_pool.name or '{' in service_pool.visual_name:
poolUsageInfo = sPool.usage( pool_usage_info = service_pool.usage(
sPool.usage_count, # type: ignore # anotated value service_pool.usage_count, # type: ignore # anotated value
) )
use_percent = str(poolUsageInfo.percent) + '%' use_percent = str(pool_usage_info.percent) + '%'
use_count = str(poolUsageInfo.used) use_count = str(pool_usage_info.used)
left_count = str(poolUsageInfo.total - poolUsageInfo.used) left_count = str(pool_usage_info.total - pool_usage_info.used)
max_srvs = str(poolUsageInfo.total) max_srvs = str(pool_usage_info.total)
else: else:
max_srvs = '' max_srvs = ''
use_percent = '' use_percent = ''
@ -328,7 +306,7 @@ def get_services_info_dict(
left_count = '' left_count = ''
# pylint: disable=cell-var-from-loop # pylint: disable=cell-var-from-loop
def macro_info(x: str) -> str: def _replace_macro_vars(x: str) -> str:
return ( return (
x.replace('{use}', use_percent) x.replace('{use}', use_percent)
.replace('{total}', max_srvs) .replace('{total}', max_srvs)
@ -338,19 +316,14 @@ def get_services_info_dict(
trans: list[collections.abc.Mapping[str, typing.Any]] = [] trans: list[collections.abc.Mapping[str, typing.Any]] = []
for t in sorted( for t in sorted(
sPool.transports.all(), key=lambda x: x.priority service_pool.transports.all(), key=lambda x: x.priority
): # In memory sort, allows reuse prefetched and not too big array ): # In memory sort, allows reuse prefetched and not too big array
typeTrans = t.get_type() transport_type = t.get_type()
if ( if _is_valid_transport(t):
typeTrans if transport_type.own_link:
and t.is_ip_allowed(request.ip) link = reverse('webapi.transport_own_link', args=('F' + service_pool.uuid, t.uuid)) # type: ignore
and typeTrans.supports_os(osType)
and t.is_os_allowed(osType)
):
if typeTrans.own_link:
link = reverse('webapi.transport_own_link', args=('F' + sPool.uuid, t.uuid)) # type: ignore
else: else:
link = html.uds_access_link(request, 'F' + sPool.uuid, t.uuid) # type: ignore link = html.uds_access_link(request, 'F' + service_pool.uuid, t.uuid) # type: ignore
trans.append({'id': t.uuid, 'name': t.name, 'link': link, 'priority': t.priority}) trans.append({'id': t.uuid, 'name': t.name, 'link': link, 'priority': t.priority})
# If empty transports, do not include it on list # If empty transports, do not include it on list
@ -358,52 +331,54 @@ def get_services_info_dict(
continue continue
# Locate if user service has any already assigned user service for this. Use "pre cached" number of assignations in this pool to optimize # Locate if user service has any already assigned user service for this. Use "pre cached" number of assignations in this pool to optimize
in_use = typing.cast(typing.Any, sPool).number_in_use > 0 in_use = typing.cast(typing.Any, service_pool).number_in_use > 0
# if svr.number_in_use: # Anotated value got from getDeployedServicesForGroups(...). If 0, no assignation for this user # if svr.number_in_use: # Anotated value got from getDeployedServicesForGroups(...). If 0, no assignation for this user
# ads = UserServiceManager().getExistingAssignationForUser(svr, request.user) # ads = UserServiceManager().getExistingAssignationForUser(svr, request.user)
# if ads: # if ads:
# in_use = ads.in_use # in_use = ads.in_use
group = ( group = (
sPool.servicesPoolGroup.as_dict if sPool.servicesPoolGroup else ServicePoolGroup.default().as_dict service_pool.servicesPoolGroup.as_dict
if service_pool.servicesPoolGroup
else ServicePoolGroup.default().as_dict
) )
# Only add toBeReplaced info in case we allow it. This will generate some "overload" on the services # Only add toBeReplaced info in case we allow it. This will generate some "overload" on the services
toBeReplacedDate = ( when_will_be_replaced = (
sPool.when_will_be_replaced(request.user) service_pool.when_will_be_replaced(request.user)
if typing.cast(typing.Any, sPool).pubs_active > 0 if typing.cast(typing.Any, service_pool).pubs_active > 0
and GlobalConfig.NOTIFY_REMOVAL_BY_PUB.as_bool(False) and GlobalConfig.NOTIFY_REMOVAL_BY_PUB.as_bool(False)
else None else None
) )
# tbr = False # tbr = False
if toBeReplacedDate: if when_will_be_replaced:
toBeReplaced = formats.date_format(toBeReplacedDate, 'SHORT_DATETIME_FORMAT') replace_date_as_str = formats.date_format(when_will_be_replaced, 'SHORT_DATETIME_FORMAT')
toBeReplacedTxt = gettext( replace_date_info_text = gettext(
'This service is about to be replaced by a new version. Please, close the session before {} and save all your work to avoid loosing it.' 'This service is about to be replaced by a new version. Please, close the session before {} and save all your work to avoid loosing it.'
).format(toBeReplacedDate) ).format(when_will_be_replaced)
else: else:
toBeReplaced = None replace_date_as_str = None
toBeReplacedTxt = '' replace_date_info_text = ''
services.append( services.append(
_service_info( _service_info(
uuid=sPool.uuid, uuid=service_pool.uuid,
is_meta=False, is_meta=False,
name=macro_info(sPool.name), name=_replace_macro_vars(service_pool.name),
visual_name=macro_info(sPool.visual_name), visual_name=_replace_macro_vars(service_pool.visual_name),
description=sPool.comments, description=service_pool.comments,
group=group, group=group,
transports=trans, transports=trans,
image=sPool.image, image=service_pool.image,
show_transports=sPool.show_transports, show_transports=service_pool.show_transports,
allow_users_remove=sPool.allow_users_remove, allow_users_remove=service_pool.allow_users_remove,
allow_users_reset=sPool.allow_users_reset, allow_users_reset=service_pool.allow_users_reset,
maintenance=sPool.is_in_maintenance(), maintenance=service_pool.is_in_maintenance(),
not_accesible=not sPool.is_access_allowed(now), not_accesible=not service_pool.is_access_allowed(now),
in_use=in_use, in_use=in_use,
to_be_replaced=toBeReplaced, to_be_replaced=replace_date_as_str,
to_be_replaced_text=toBeReplacedTxt, to_be_replaced_text=replace_date_info_text,
custom_calendar_text=sPool.calendar_message, custom_calendar_text=service_pool.calendar_message,
) )
) )
@ -427,7 +402,7 @@ def get_services_info_dict(
'services': services, 'services': services,
'ip': request.ip, 'ip': request.ip,
'nets': nets, 'nets': nets,
'transports': validTrans, 'transports': valid_transports,
'autorun': autorun, 'autorun': autorun,
} }

View File

@ -48,18 +48,18 @@ class LoginLogoutTest(test.UDSTestCase):
""" """
Test login and logout Test login and logout
""" """
auth = fixtures_authenticators.createAuthenticator() auth = fixtures_authenticators.create_authenticator()
# Create some ramdom users # Create some ramdom users
admins = fixtures_authenticators.createUsers( admins = fixtures_authenticators.create_users(
auth, number_of_users=8, is_admin=True auth, number_of_users=8, is_admin=True
) )
staffs = fixtures_authenticators.createUsers( staffs = fixtures_authenticators.create_users(
auth, number_of_users=8, is_staff=True auth, number_of_users=8, is_staff=True
) )
users = fixtures_authenticators.createUsers(auth, number_of_users=8) users = fixtures_authenticators.create_users(auth, number_of_users=8)
# Create some groups # Create some groups
groups = fixtures_authenticators.createGroups(auth, number_of_groups=32) groups = fixtures_authenticators.create_groups(auth, number_of_groups=32)
# Add users to some groups, ramdomly # Add users to some groups, ramdomly
for user in users + admins + staffs: for user in users + admins + staffs:

View File

@ -47,9 +47,9 @@ class ModelUUIDTest(UDSTestCase):
def setUp(self) -> None: def setUp(self) -> None:
super().setUp() super().setUp()
self.auth = authenticators_fixtures.createAuthenticator() self.auth = authenticators_fixtures.create_authenticator()
self.group = authenticators_fixtures.createGroups(self.auth, 1)[0] self.group = authenticators_fixtures.create_groups(self.auth, 1)[0]
self.user = authenticators_fixtures.createUsers(self.auth, 1, groups=[self.group])[0] self.user = authenticators_fixtures.create_users(self.auth, 1, groups=[self.group])[0]
def test_uuid_lowercase(self): def test_uuid_lowercase(self):
""" """

View File

@ -60,15 +60,15 @@ class PermissionsTest(UDSTestCase):
network: models.Network network: models.Network
def setUp(self) -> None: def setUp(self) -> None:
self.authenticator = authenticators_fixtures.createAuthenticator() self.authenticator = authenticators_fixtures.create_authenticator()
self.groups = authenticators_fixtures.createGroups(self.authenticator) self.groups = authenticators_fixtures.create_groups(self.authenticator)
self.users = authenticators_fixtures.createUsers( self.users = authenticators_fixtures.create_users(
self.authenticator, groups=self.groups self.authenticator, groups=self.groups
) )
self.admins = authenticators_fixtures.createUsers( self.admins = authenticators_fixtures.create_users(
self.authenticator, is_admin=True, groups=self.groups self.authenticator, is_admin=True, groups=self.groups
) )
self.staffs = authenticators_fixtures.createUsers( self.staffs = authenticators_fixtures.create_users(
self.authenticator, is_staff=True, groups=self.groups self.authenticator, is_staff=True, groups=self.groups
) )
self.userService = services_fixtures.create_one_cache_testing_userservice( self.userService = services_fixtures.create_one_cache_testing_userservice(

View File

@ -40,7 +40,7 @@ from uds.core.managers.crypto import CryptoManager
glob = {'user_id': 0, 'group_id': 0} glob = {'user_id': 0, 'group_id': 0}
def createAuthenticator( def create_authenticator(
authenticator: typing.Optional[models.Authenticator] = None, authenticator: typing.Optional[models.Authenticator] = None,
) -> models.Authenticator: ) -> models.Authenticator:
""" """
@ -61,7 +61,7 @@ def createAuthenticator(
return authenticator return authenticator
def createUsers( def create_users(
authenticator: models.Authenticator, authenticator: models.Authenticator,
number_of_users: int = 1, number_of_users: int = 1,
is_staff: bool = False, is_staff: bool = False,
@ -96,7 +96,7 @@ def createUsers(
return users return users
def createGroups( def create_groups(
authenticator: models.Authenticator, number_of_groups: int = 1 authenticator: models.Authenticator, number_of_groups: int = 1
) -> list[models.Group]: ) -> list[models.Group]:
""" """

View File

@ -159,7 +159,7 @@ def create_test_publication(
return publication return publication
def create_test_transport() -> models.Transport: def create_test_transport(**kwargs) -> models.Transport:
from uds.transports.Test import TestTransport from uds.transports.Test import TestTransport
values = TestTransport( values = TestTransport(
@ -170,6 +170,7 @@ def create_test_transport() -> models.Transport:
comments='Comment for Transport %d' % (glob['transport_id']), comments='Comment for Transport %d' % (glob['transport_id']),
data_type=TestTransport.type_type, data_type=TestTransport.type_type,
data=TestTransport(environment.Environment(str(glob['transport_id'])), values).serialize(), data=TestTransport(environment.Environment(str(glob['transport_id'])), values).serialize(),
**kwargs,
) )
glob['transport_id'] += 1 glob['transport_id'] += 1
return transport return transport
@ -255,9 +256,9 @@ def create_cache_testing_userservices(
from . import authenticators from . import authenticators
if not user or not groups: if not user or not groups:
auth = authenticators.createAuthenticator() auth = authenticators.create_authenticator()
groups = authenticators.createGroups(auth, 3) groups = authenticators.create_groups(auth, 3)
user = authenticators.createUsers(auth, 1, groups=groups)[0] user = authenticators.create_users(auth, 1, groups=groups)[0]
user_services: list[models.UserService] = [] user_services: list[models.UserService] = []
for _ in range(count): for _ in range(count):
user_services.append(create_one_cache_testing_userservice(createProvider(), user, groups, type_)) user_services.append(create_one_cache_testing_userservice(createProvider(), user, groups, type_))

View File

@ -65,23 +65,23 @@ class RESTTestCase(test.UDSTransactionTestCase):
def setUp(self) -> None: def setUp(self) -> None:
# Set up data for REST Test cases # Set up data for REST Test cases
# First, the authenticator related # First, the authenticator related
self.auth = authenticators_fixtures.createAuthenticator() self.auth = authenticators_fixtures.create_authenticator()
self.simple_groups = authenticators_fixtures.createGroups(self.auth, NUMBER_OF_ITEMS_TO_CREATE) self.simple_groups = authenticators_fixtures.create_groups(self.auth, NUMBER_OF_ITEMS_TO_CREATE)
self.meta_groups = authenticators_fixtures.createMetaGroups(self.auth, NUMBER_OF_ITEMS_TO_CREATE) self.meta_groups = authenticators_fixtures.createMetaGroups(self.auth, NUMBER_OF_ITEMS_TO_CREATE)
# Create some users, one admin, one staff and one user # Create some users, one admin, one staff and one user
self.admins = authenticators_fixtures.createUsers( self.admins = authenticators_fixtures.create_users(
self.auth, self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE, number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_admin=True, is_admin=True,
groups=self.groups, groups=self.groups,
) )
self.staffs = authenticators_fixtures.createUsers( self.staffs = authenticators_fixtures.create_users(
self.auth, self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE, number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_staff=True, is_staff=True,
groups=self.groups, groups=self.groups,
) )
self.plain_users = authenticators_fixtures.createUsers( self.plain_users = authenticators_fixtures.create_users(
self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups
) )

View File

@ -52,24 +52,24 @@ class WEBTestCase(test.UDSTransactionTestCase):
def setUp(self) -> None: def setUp(self) -> None:
# Set up data for REST Test cases # Set up data for REST Test cases
# First, the authenticator related # First, the authenticator related
self.auth = authenticators_fixtures.createAuthenticator() self.auth = authenticators_fixtures.create_authenticator()
self.groups = authenticators_fixtures.createGroups( self.groups = authenticators_fixtures.create_groups(
self.auth, NUMBER_OF_ITEMS_TO_CREATE self.auth, NUMBER_OF_ITEMS_TO_CREATE
) )
# Create some users, one admin, one staff and one user # Create some users, one admin, one staff and one user
self.admins = authenticators_fixtures.createUsers( self.admins = authenticators_fixtures.create_users(
self.auth, self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE, number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_admin=True, is_admin=True,
groups=self.groups, groups=self.groups,
) )
self.staffs = authenticators_fixtures.createUsers( self.staffs = authenticators_fixtures.create_users(
self.auth, self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE, number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_staff=True, is_staff=True,
groups=self.groups, groups=self.groups,
) )
self.plain_users = authenticators_fixtures.createUsers( self.plain_users = authenticators_fixtures.create_users(
self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups
) )

View File

@ -60,18 +60,18 @@ class WebLoginLogoutTest(test.WEBTestCase):
""" """
Test login and logout Test login and logout
""" """
auth = fixtures_authenticators.createAuthenticator() auth = fixtures_authenticators.create_authenticator()
# Create some ramdom users # Create some ramdom users
admins = fixtures_authenticators.createUsers( admins = fixtures_authenticators.create_users(
auth, number_of_users=8, is_admin=True auth, number_of_users=8, is_admin=True
) )
stafs = fixtures_authenticators.createUsers( stafs = fixtures_authenticators.create_users(
auth, number_of_users=8, is_staff=True auth, number_of_users=8, is_staff=True
) )
users = fixtures_authenticators.createUsers(auth, number_of_users=8) users = fixtures_authenticators.create_users(auth, number_of_users=8)
# Create some groups # Create some groups
groups = fixtures_authenticators.createGroups(auth, number_of_groups=32) groups = fixtures_authenticators.create_groups(auth, number_of_groups=32)
# Add users to some groups, ramdomly # Add users to some groups, ramdomly
for user in users + admins + stafs: for user in users + admins + stafs:
@ -118,8 +118,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
self.assertInvalidLogin(response) self.assertInvalidLogin(response)
def test_login_valid_user_no_group(self): def test_login_valid_user_no_group(self):
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
)[0] )[0]
response = self.do_login(user.name, user.name, user.manager.uuid) response = self.do_login(user.name, user.name, user.manager.uuid)
@ -127,8 +127,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
self.assertEqual(models.Log.objects.count(), 4) self.assertEqual(models.Log.objects.count(), 4)
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
is_staff=True, is_staff=True,
)[0] )[0]
@ -137,8 +137,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
self.assertEqual(models.Log.objects.count(), 8) self.assertEqual(models.Log.objects.count(), 8)
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
is_admin=True, is_admin=True,
)[0] )[0]
@ -148,8 +148,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
self.assertEqual(models.Log.objects.count(), 12) self.assertEqual(models.Log.objects.count(), 12)
def test_login_invalid_user(self): def test_login_invalid_user(self):
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
)[0] )[0]
response = self.do_login(user.name, 'wrong password', user.manager.uuid) response = self.do_login(user.name, 'wrong password', user.manager.uuid)
@ -159,8 +159,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
# + 2 system logs (auth.log), one for each failed login # + 2 system logs (auth.log), one for each failed login
self.assertEqual(models.Log.objects.count(), 6) self.assertEqual(models.Log.objects.count(), 6)
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
is_staff=True, is_staff=True,
)[0] )[0]
@ -169,8 +169,8 @@ class WebLoginLogoutTest(test.WEBTestCase):
self.assertEqual(models.Log.objects.count(), 12) self.assertEqual(models.Log.objects.count(), 12)
user = fixtures_authenticators.createUsers( user = fixtures_authenticators.create_users(
fixtures_authenticators.createAuthenticator(), fixtures_authenticators.create_authenticator(),
is_admin=True, is_admin=True,
)[0] )[0]

View File

@ -31,19 +31,21 @@
""" """
@author: Adolfo Gómez, dkmaster at dkmon dot com @author: Adolfo Gómez, dkmaster at dkmon dot com
""" """
# We use commit/rollback
import datetime
import typing
import collections.abc import collections.abc
import datetime
import functools
import itertools
import random
import typing
from unittest import mock from unittest import mock
from uds import models from uds import models
from uds.core import types, consts from uds.core import consts, types
from uds.web.util import services from uds.web.util import services
from ...utils.test import UDSTransactionTestCase
from ...fixtures import authenticators as fixtures_authenticators from ...fixtures import authenticators as fixtures_authenticators
from ...fixtures import services as fixtures_services from ...fixtures import services as fixtures_services
from ...utils.test import UDSTransactionTestCase
class TestGetServicesData(UDSTransactionTestCase): class TestGetServicesData(UDSTransactionTestCase):
@ -51,14 +53,14 @@ class TestGetServicesData(UDSTransactionTestCase):
auth: models.Authenticator auth: models.Authenticator
groups: list[models.Group] groups: list[models.Group]
user: models.User user: models.User
transports: list[models.Transport]
def setUp(self) -> None: def setUp(self) -> None:
# We need to create a user with some services # We need to create a user with some services
self.auth = fixtures_authenticators.createAuthenticator() self.auth = fixtures_authenticators.create_authenticator()
self.groups = fixtures_authenticators.createGroups(self.auth, 3) self.groups = fixtures_authenticators.create_groups(self.auth, 3)
self.user = fixtures_authenticators.createUsers( self.user = fixtures_authenticators.create_users(self.auth, 1, groups=self.groups)[0]
self.auth, 1, groups=self.groups self.transports = [fixtures_services.create_test_transport(priority=counter, label=f'label{counter}') for counter in range(10)]
)[0]
self.request = mock.Mock() self.request = mock.Mock()
self.request.user = self.user self.request.user = self.user
@ -73,11 +75,35 @@ class TestGetServicesData(UDSTransactionTestCase):
return super().setUp() return super().setUp()
def _create_metapools(
self,
grouping_method: types.pools.TransportSelectionPolicy,
ha_policy: types.pools.HighAvailabilityPolicy = types.pools.HighAvailabilityPolicy.DISABLED,
) -> None:
# Create 10 services, for this user
service_pools: list[models.ServicePool] = []
for i in range(110):
service_pools.append(
fixtures_services.create_cache_testing_userservices(
count=1, user=self.user, groups=self.groups
)[0].deployed_service
)
# Create 10 meta services, for this user, last 10 user_services will not be added to meta pools
meta_services: list[models.MetaPool] = []
for i in range(10):
service_pool = fixtures_services.create_test_metapool(
service_pools=service_pools[i * 10 : (i + 1) * 10],
groups=self.groups,
transport_grouping=grouping_method,
)
meta_services.append(service_pool)
def test_get_services_data(self) -> None: def test_get_services_data(self) -> None:
# Create 10 services, for this user # Create 10 services, for this user
user_services: list[models.ServicePool] = [] service_pools: list[models.ServicePool] = []
for i in range(10): for i in range(10):
user_services.append( service_pools.append(
fixtures_services.create_cache_testing_userservices( fixtures_services.create_cache_testing_userservices(
count=1, user=self.user, groups=self.groups count=1, user=self.user, groups=self.groups
)[0].deployed_service )[0].deployed_service
@ -93,9 +119,7 @@ class TestGetServicesData(UDSTransactionTestCase):
# 'transports': validTrans, # 'transports': validTrans,
# 'autorun': autorun, # 'autorun': autorun,
# } # }
result_services: typing.Final[ result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
list[collections.abc.Mapping[str, typing.Any]]
] = data['services']
self.assertEqual(len(result_services), 10) self.assertEqual(len(result_services), 10)
self.assertEqual(data['ip'], '127.0.0.1') self.assertEqual(data['ip'], '127.0.0.1')
self.assertEqual(len(data['nets']), 0) self.assertEqual(len(data['nets']), 0)
@ -126,7 +150,7 @@ class TestGetServicesData(UDSTransactionTestCase):
for user_service in result_services: for user_service in result_services:
# Locate user service in user_services # Locate user service in user_services
found: models.ServicePool = next( found: models.ServicePool = next(
(x for x in user_services if x.uuid == user_service['id'][1:]), (x for x in service_pools if x.uuid == user_service['id'][1:]),
models.ServicePool(uuid='x'), models.ServicePool(uuid='x'),
) )
if found.uuid == 'x': if found.uuid == 'x':
@ -136,39 +160,27 @@ class TestGetServicesData(UDSTransactionTestCase):
self.assertEqual(user_service['name'], found.name) self.assertEqual(user_service['name'], found.name)
self.assertEqual(user_service['visual_name'], found.visual_name) self.assertEqual(user_service['visual_name'], found.visual_name)
self.assertEqual(user_service['description'], found.comments) self.assertEqual(user_service['description'], found.comments)
self.assertEqual( self.assertEqual(user_service['group'], models.ServicePoolGroup.default().as_dict)
user_service['group'], models.ServicePoolGroup.default().as_dict
)
self.assertEqual( self.assertEqual(
[(i['id'], i['name']) for i in user_service['transports']], [(i['id'], i['name']) for i in user_service['transports']],
[(t.uuid, t.name) for t in found.transports.all()], [(t.uuid, t.name) for t in found.transports.all()],
) )
self.assertEqual( self.assertEqual(user_service['imageId'], found.image and found.image.uuid or 'x')
user_service['imageId'], found.image and found.image.uuid or 'x'
)
self.assertEqual(user_service['show_transports'], found.show_transports) self.assertEqual(user_service['show_transports'], found.show_transports)
self.assertEqual( self.assertEqual(user_service['allow_users_remove'], found.allow_users_remove)
user_service['allow_users_remove'], found.allow_users_remove
)
self.assertEqual(user_service['allow_users_reset'], found.allow_users_reset) self.assertEqual(user_service['allow_users_reset'], found.allow_users_reset)
self.assertEqual( self.assertEqual(user_service['maintenance'], found.service.provider.maintenance_mode)
user_service['maintenance'], found.service.provider.maintenance_mode self.assertEqual(user_service['not_accesible'], not found.is_access_allowed(now))
) self.assertEqual(user_service['in_use'], found.userServices.filter(in_use=True).count())
self.assertEqual(
user_service['not_accesible'], not found.is_access_allowed(now)
)
self.assertEqual(
user_service['in_use'], found.userServices.filter(in_use=True).count()
)
self.assertEqual(user_service['to_be_replaced'], None) self.assertEqual(user_service['to_be_replaced'], None)
self.assertEqual(user_service['to_be_replaced_text'], '') self.assertEqual(user_service['to_be_replaced_text'], '')
self.assertEqual(user_service['custom_calendar_text'], '') self.assertEqual(user_service['custom_calendar_text'], '')
def test_get_meta_services_data(self) -> None: def test_get_meta_services_data(self) -> None:
# Create 10 services, for this user # Create 10 services, for this user
user_services: list[models.ServicePool] = [] service_pools: list[models.ServicePool] = []
for i in range(100): for i in range(100):
user_services.append( service_pools.append(
fixtures_services.create_cache_testing_userservices( fixtures_services.create_cache_testing_userservices(
count=1, user=self.user, groups=self.groups count=1, user=self.user, groups=self.groups
)[0].deployed_service )[0].deployed_service
@ -179,17 +191,14 @@ class TestGetServicesData(UDSTransactionTestCase):
for i in range(10): for i in range(10):
meta_services.append( meta_services.append(
fixtures_services.create_test_metapool( fixtures_services.create_test_metapool(
service_pools=user_services[i * 10 : (i + 1) * 10], groups=self.groups service_pools=service_pools[i * 10 : (i + 1) * 10], groups=self.groups
) )
) )
data = services.get_services_info_dict(self.request) data = services.get_services_info_dict(self.request)
now = datetime.datetime.now() now = datetime.datetime.now()
result_services: typing.Final[ result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
list[collections.abc.Mapping[str, typing.Any]]
] = data['services']
self.assertEqual(len(result_services), 10) self.assertEqual(len(result_services), 10)
self.assertEqual(data['ip'], '127.0.0.1') self.assertEqual(data['ip'], '127.0.0.1')
self.assertEqual(len(data['nets']), 0) self.assertEqual(len(data['nets']), 0)
@ -209,12 +218,8 @@ class TestGetServicesData(UDSTransactionTestCase):
self.assertEqual(user_service['name'], found.name) self.assertEqual(user_service['name'], found.name)
self.assertEqual(user_service['visual_name'], found.visual_name) self.assertEqual(user_service['visual_name'], found.visual_name)
self.assertEqual(user_service['description'], found.comments) self.assertEqual(user_service['description'], found.comments)
self.assertEqual( self.assertEqual(user_service['group'], models.ServicePoolGroup.default().as_dict)
user_service['group'], models.ServicePoolGroup.default().as_dict self.assertEqual(user_service['not_accesible'], not found.is_access_allowed(now))
)
self.assertEqual(
user_service['not_accesible'], not found.is_access_allowed(now)
)
self.assertEqual(user_service['to_be_replaced'], None) self.assertEqual(user_service['to_be_replaced'], None)
self.assertEqual(user_service['to_be_replaced_text'], '') self.assertEqual(user_service['to_be_replaced_text'], '')
self.assertEqual(user_service['custom_calendar_text'], '') self.assertEqual(user_service['custom_calendar_text'], '')
@ -238,16 +243,109 @@ class TestGetServicesData(UDSTransactionTestCase):
) )
) )
data = services.get_services_info_dict(self.request) data = services.get_services_info_dict(self.request)
now = datetime.datetime.now() now = datetime.datetime.now()
result_services: typing.Final[ result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
list[collections.abc.Mapping[str, typing.Any]]
] = data['services']
self.assertEqual(len(result_services), 20) # 10 metas and 10 normal pools self.assertEqual(len(result_services), 20) # 10 metas and 10 normal pools
# Some checks are ommited, because are already tested in other tests # Some checks are ommited, because are already tested in other tests
self.assertEqual(len(list(filter(lambda x: x['is_meta'], result_services))), 10) self.assertEqual(len(list(filter(lambda x: x['is_meta'], result_services))), 10)
self.assertEqual(len(list(filter(lambda x: not x['is_meta'], result_services))), 10) self.assertEqual(len(list(filter(lambda x: not x['is_meta'], result_services))), 10)
def _generate_metapool_with_transports(
self, count: int, transport_grouping: types.pools.TransportSelectionPolicy, *,
add_random_transports: bool
) -> tuple[list[models.ServicePool], models.MetaPool]:
service_pools: list[models.ServicePool] = []
for i in range(count):
pool = fixtures_services.create_cache_testing_userservices(
count=1, user=self.user, groups=self.groups
)[0].deployed_service
pool.transports.add(*self.transports[:3]) # Add the first 3 transports to all pools
# add some random transports to each pool after the three common ones
if add_random_transports:
pool.transports.add(*random.sample(self.transports[3:], 3))
service_pools.append(pool)
return service_pools, fixtures_services.create_test_metapool(
service_pools=service_pools,
groups=self.groups,
transport_grouping=transport_grouping,
)
def test_meta_common_grouping(self) -> None:
# For this test, we don't mind returned value, we just want to create the pools on db
self._generate_metapool_with_transports(
10, types.pools.TransportSelectionPolicy.COMMON, # Group by common transports
add_random_transports=True
)
# Now, get the data
data = services.get_services_info_dict(self.request)
# Now, check that the meta pool has the same transports as the common ones
result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
# We except 1 result only, a meta pool (is_meta = True)
self.assertEqual(len(result_services), 1)
self.assertEqual(result_services[0]['is_meta'], True)
# Transpors for this meta pool should be the common ones, ordered by priority
# First compose a list of the common transports, ordered by priority
common_transports_ids = [t.uuid for t in sorted(self.transports[:3], key=lambda x: x.priority)]
# Now, check that the transports are the same, and ordered by priority
self.assertEqual([t['id'] for t in result_services[0]['transports']], common_transports_ids)
def test_meta_auto_grouping(self) -> None:
self._generate_metapool_with_transports(
10, types.pools.TransportSelectionPolicy.AUTO, # Group by common transports
add_random_transports=True
)
# Now, get the data
data = services.get_services_info_dict(self.request)
result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
# We except 1 result only, a meta pool (is_meta = True)
self.assertEqual(len(result_services), 1)
self.assertEqual(result_services[0]['is_meta'], True)
# Transport should be {'id': 'meta', 'name: 'meta', 'priority': 0, 'link': (an udsa://... link}, and only one
self.assertEqual(len(result_services[0]['transports']), 1)
transport = result_services[0]['transports'][0]
self.assertEqual(transport['id'], 'meta')
self.assertEqual(transport['name'], 'meta')
self.assertEqual(transport['priority'], 0)
self.assertTrue(transport['link'].startswith(consts.system.UDS_ACTION_SCHEME))
def test_meta_label_grouping(self) -> None:
pools, meta = self._generate_metapool_with_transports(
10, types.pools.TransportSelectionPolicy.LABEL, # Group by common transports
add_random_transports=False
)
# Now we hav to had 2 same labels on the transports, add some ramdon to transports, but ensuring
# that no transport is assigned to more ALL the transports
possible_transports = self.transports[3:]
transport_iterator = itertools.cycle(possible_transports)
for pool in pools:
pool.transports.add(next(transport_iterator))
pool.transports.add(next(transport_iterator))
# Whe know for sure that the only transports valid are the first 3 ones, because the rest are not present
# in ALL the transports
# Now, get the data
data = services.get_services_info_dict(self.request)
result_services: typing.Final[list[dict[str, typing.Any]]] = data['services']
# We except 1 result only, a meta pool (is_meta = True)
self.assertEqual(len(result_services), 1)
self.assertEqual(result_services[0]['is_meta'], True)
# should have 3 transports, the first 3 ones
self.assertEqual(len(result_services[0]['transports']), 3)
# id should be "LABEL:[the label]" for each transport. We added trasnports label "label0", "label1" and "label2", same as priority
self.assertEqual([t['id'] for t in result_services[0]['transports']], ['LABEL:label0', 'LABEL:label1', 'LABEL:label2'])
# And priority should be 0, 1 and 2
self.assertEqual([t['priority'] for t in result_services[0]['transports']], [0, 1, 2])