1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-22 22:03:54 +03:00

REST fixes.

Fixed several bugs arised with refactorization (Most related to type checkin on runtime), and added support for graduan moving custom methods to snake_case keeping backwards camelCase compat
This commit is contained in:
Adolfo Gómez García 2024-03-15 01:32:58 +01:00
parent cf8660aa78
commit 4ee8697827
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
9 changed files with 169 additions and 98 deletions

View File

@ -39,7 +39,7 @@ from django.utils.translation import gettext as _
from uds.core import types, consts
from uds.core.util import log, ensure
from uds.core.util.model import process_uuid
from uds.models import Calendar, CalendarAction, CalendarAccess, ServicePool
from uds import models
from uds.REST.model import DetailHandler
# Not imported at runtime, just for type checking
@ -54,7 +54,7 @@ DENY = 'DENY'
class AccessCalendars(DetailHandler):
@staticmethod
def as_dict(item: 'CalendarAccess') -> types.rest.ItemDictType:
def as_dict(item: 'models.CalendarAccess|models.CalendarAccessMeta') -> types.rest.ItemDictType:
return {
'id': item.uuid,
'calendar_id': item.calendar.uuid,
@ -64,13 +64,13 @@ class AccessCalendars(DetailHandler):
}
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
parent = ensure.is_instance(parent, ServicePool)
# parent can be a ServicePool or a metaPool
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
try:
if not item:
return [AccessCalendars.as_dict(i) for i in parent.calendarAccess.all()]
return AccessCalendars.as_dict(
parent.calendarAccess.get(uuid=process_uuid(item))
)
return AccessCalendars.as_dict(parent.calendarAccess.get(uuid=process_uuid(item)))
except Exception as e:
logger.exception('err: %s', item)
raise self.invalid_item_response() from e
@ -86,34 +86,27 @@ class AccessCalendars(DetailHandler):
]
def save_item(self, parent: 'Model', item: typing.Optional[str]) -> None:
parent = ensure.is_instance(parent, ServicePool)
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
# If already exists
uuid = process_uuid(item) if item is not None else None
try:
calendar: Calendar = Calendar.objects.get(
uuid=process_uuid(self._params['calendar_id'])
)
calendar: models.Calendar = models.Calendar.objects.get(uuid=process_uuid(self._params['calendar_id']))
access: str = self._params['access'].upper()
if access not in (ALLOW, DENY):
raise Exception()
except Exception as e:
raise self.invalid_request_response(
_('Invalid parameters on request')
) from e
raise self.invalid_request_response(_('Invalid parameters on request')) from e
priority = int(self._params['priority'])
if uuid is not None:
calAccess: 'CalendarAccess' = parent.calendarAccess.get(uuid=uuid)
calAccess = parent.calendarAccess.get(uuid=uuid)
calAccess.calendar = calendar
calAccess.service_pool = parent
calAccess.access = access
calAccess.priority = priority
calAccess.save()
else:
parent.calendarAccess.create(
calendar=calendar, access=access, priority=priority
)
parent.calendarAccess.create(calendar=calendar, access=access, priority=priority)
log.log(
parent,
@ -123,7 +116,7 @@ class AccessCalendars(DetailHandler):
)
def delete_item(self, parent: 'Model', item: str) -> None:
parent = ensure.is_instance(parent, ServicePool)
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
calendarAccess = parent.calendarAccess.get(uuid=process_uuid(self._args[0]))
logStr = f'Removed access calendar {calendarAccess.calendar.name} by {self._user.pretty_name}'
calendarAccess.delete()
@ -141,7 +134,7 @@ class ActionsCalendars(DetailHandler):
]
@staticmethod
def as_dict(item: 'CalendarAction') -> dict[str, typing.Any]:
def as_dict(item: 'models.CalendarAction') -> dict[str, typing.Any]:
action = consts.calendar.CALENDAR_ACTION_DICT.get(item.action)
descrption = action.get('description') if action is not None else ''
params = json.loads(item.params)
@ -160,12 +153,10 @@ class ActionsCalendars(DetailHandler):
}
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
parent = ensure.is_instance(parent, ServicePool)
parent = ensure.is_instance(parent, models.ServicePool)
try:
if item is None:
return [
ActionsCalendars.as_dict(i) for i in parent.calendaraction_set.all()
]
return [ActionsCalendars.as_dict(i) for i in parent.calendaraction_set.all()]
i = parent.calendaraction_set.get(uuid=process_uuid(item))
return ActionsCalendars.as_dict(i)
except Exception as e:
@ -179,18 +170,25 @@ class ActionsCalendars(DetailHandler):
{'calendar': {'title': _('Calendar')}},
{'description': {'title': _('Action')}},
{'pretty_params': {'title': _('Parameters')}},
{'at_start': {'title': _('Relative to')}},
{
'at_start': {
'title': _('Relative to'),
'type': 'dict',
'dict': {True: _('Start'), False: _('End')},
}
},
# {'at_start': {'title': _('At start')}},
{'events_offset': {'title': _('Time offset')}},
{'next_execution': {'title': _('Next execution'), 'type': 'datetime'}},
{'last_execution': {'title': _('Last execution'), 'type': 'datetime'}},
]
def save_item(self, parent: 'Model', item: typing.Optional[str]) -> None:
parent = ensure.is_instance(parent, ServicePool)
parent = ensure.is_instance(parent, models.ServicePool)
# If already exists
uuid = process_uuid(item) if item is not None else None
calendar = Calendar.objects.get(uuid=process_uuid(self._params['calendar_id']))
calendar = models.Calendar.objects.get(uuid=process_uuid(self._params['calendar_id']))
action = self._params['action'].upper()
if action not in consts.calendar.CALENDAR_ACTION_DICT:
raise self.invalid_request_response()
@ -199,14 +197,14 @@ class ActionsCalendars(DetailHandler):
params = json.dumps(self._params['params'])
# logger.debug('Got parameters: {} {} {} {} ----> {}'.format(calendar, action, events_offset, at_start, params))
logStr = (
log_string = (
f'{"Added" if uuid is None else "Updated"} scheduled action '
f'{calendar.name},{action},{events_offset},{"start" if at_start else "end"},{params} '
f'by {self._user.pretty_name}'
)
if uuid is not None:
calAction = CalendarAction.objects.get(uuid=uuid)
calAction = models.CalendarAction.objects.get(uuid=uuid)
calAction.calendar = calendar
calAction.service_pool = parent
calAction.action = action
@ -215,7 +213,7 @@ class ActionsCalendars(DetailHandler):
calAction.params = params
calAction.save()
else:
CalendarAction.objects.create(
models.CalendarAction.objects.create(
calendar=calendar,
service_pool=parent,
action=action,
@ -224,11 +222,11 @@ class ActionsCalendars(DetailHandler):
params=params,
)
log.log(parent, log.LogLevel.INFO, logStr, log.LogSource.ADMIN)
log.log(parent, log.LogLevel.INFO, log_string, log.LogSource.ADMIN)
def delete_item(self, parent: 'Model', item: str) -> None:
parent = ensure.is_instance(parent, ServicePool)
calendarAction = CalendarAction.objects.get(uuid=process_uuid(self._args[0]))
parent = ensure.is_instance(parent, models.ServicePool)
calendarAction = models.CalendarAction.objects.get(uuid=process_uuid(self._args[0]))
logStr = (
f'Removed scheduled action "{calendarAction.calendar.name},'
f'{calendarAction.action},{calendarAction.events_offset},'
@ -241,10 +239,10 @@ class ActionsCalendars(DetailHandler):
log.log(parent, log.LogLevel.INFO, logStr, log.LogSource.ADMIN)
def execute(self, parent: 'Model', item: str) -> typing.Any:
parent = ensure.is_instance(parent, ServicePool)
parent = ensure.is_instance(parent, models.ServicePool)
logger.debug('Launching action')
uuid = process_uuid(item)
calendarAction: CalendarAction = CalendarAction.objects.get(uuid=uuid)
calendarAction: models.CalendarAction = models.CalendarAction.objects.get(uuid=uuid)
self.ensure_has_access(calendarAction, types.permissions.PermissionType.MANAGEMENT)
logStr = (

View File

@ -96,6 +96,8 @@ class ServicesPools(ModelHandler):
'ignores_unused',
'account_id',
'calendar_message',
'custom_message',
'display_custom_message',
]
remove_fields = ['osmanager_id', 'service_id']
@ -223,6 +225,8 @@ class ServicesPools(ModelHandler):
{'id': i.meta_pool.uuid, 'name': i.meta_pool.name} for i in item.memberOfMeta.all()
],
'calendar_message': item.calendar_message,
'custom_message': item.custom_message,
'display_custom_message': item.display_custom_message,
}
# Extended info
@ -383,6 +387,26 @@ class ServicesPools(ModelHandler):
'order': 122,
'tab': gettext('Display'),
},
{
'name': 'custom_message',
'value': '',
'label': gettext('Custom launch message text'),
'tooltip': gettext(
'Custom message to be shown to users, if active, when trying to start a service from this pool.'
),
'type': types.ui.FieldType.TEXT,
'order': 123,
'tab': gettext('Display'),
},
{
'name': 'display_custom_message',
'value': False,
'label': gettext('Enable custom launch message'),
'tooltip': gettext('If active, the custom launch message will be shown to users'),
'type': types.ui.FieldType.CHECKBOX,
'order': 124,
'tab': gettext('Display'),
},
{
'name': 'initial_srvs',
'value': '0',
@ -475,10 +499,10 @@ class ServicesPools(ModelHandler):
serviceType = service.get_type()
if serviceType.publication_type is None:
self._params['publish_on_save'] = False
fields['publish_on_save'] = False
if serviceType.can_reset is False:
self._params['allow_users_reset'] = False
fields['allow_users_reset'] = False
if serviceType.needs_osmanager is True:
osmanager = OSManager.objects.get(uuid=process_uuid(fields['osmanager_id']))
@ -592,7 +616,7 @@ class ServicesPools(ModelHandler):
item = ensure.is_instance(item, ServicePool)
self.ensure_has_access(item, types.permissions.PermissionType.MANAGEMENT)
fallback = self._params.get('fallbackAccess')
fallback = self._params.get('fallbackAccess', self.params.get('fallback', None))
if fallback:
logger.debug('Setting fallback of %s to %s', item.name, fallback)
item.fallbackAccess = fallback
@ -639,6 +663,7 @@ class ServicesPools(ModelHandler):
consts.calendar.CALENDAR_ACTION_IGNORE_UNUSED,
consts.calendar.CALENDAR_ACTION_REMOVE_USERSERVICES,
consts.calendar.CALENDAR_ACTION_REMOVE_STUCK_USERSERVICES,
consts.calendar.CALENDAR_ACTION_DISPLAY_CUSTOM_MESSAGE,
]
return valid_actions

View File

@ -321,7 +321,8 @@ class Groups(DetailHandler):
"""
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
parent = ensure.is_instance(parent, models.ServicePool)
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
return [
{
'id': group.uuid,
@ -337,7 +338,7 @@ class Groups(DetailHandler):
]
def get_title(self, parent: 'Model') -> str:
parent = ensure.is_instance(parent, models.ServicePool)
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
return _('Assigned groups')
def get_fields(self, parent: 'Model') -> list[typing.Any]:
@ -369,7 +370,8 @@ class Groups(DetailHandler):
return types.ui.RowStyleInfo(prefix='row-state-', field='state')
def save_item(self, parent: 'Model', item: typing.Optional[str]) -> None:
parent = ensure.is_instance(parent, models.ServicePool)
parent = typing.cast(typing.Union['models.ServicePool', 'models.MetaPool'], parent)
group: models.Group = models.Group.objects.get(uuid=process_uuid(self._params['id']))
parent.assignedGroups.add(group)
log.log(

View File

@ -44,6 +44,7 @@ from uds.core.util.model import process_uuid
from uds.REST.utils import rest_result
from .base import BaseModelHandler
from ..utils import camel_and_snake_case_from
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -113,13 +114,14 @@ class DetailHandler(BaseModelHandler):
:param parent: Parent Model Element
:param arg: argument to pass to custom method
"""
logger.debug('Checking custom method %s', check)
if check in self.custom_methods:
operation = getattr(self, check)
if not arg:
return operation(parent)
return operation(parent, arg)
for to_check in self.custom_methods:
camel_case_name, snake_case_name = camel_and_snake_case_from(to_check)
if check in (camel_case_name, snake_case_name):
operation = getattr(self, snake_case_name, None) or getattr(self, camel_case_name, None)
if operation:
if not arg:
return operation(parent)
return operation(parent, arg)
return None

View File

@ -48,6 +48,7 @@ from uds.core.util import log, permissions
from uds.models import ManagedObjectModel, Tag, TaggingMixin
from .base import BaseModelHandler
from ..utils import camel_and_snake_case_from
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
@ -352,15 +353,12 @@ class ModelHandler(BaseModelHandler):
# if has custom methods, look for if this request matches any of them
for cm in self.custom_methods:
# Convert to snake case
snake_case_name = re.sub(r'(?<!^)(?=[A-Z])', '_', cm[0]).lower()
# And snake case to camel case (first letter lower case, rest upper case)
camel_case_name = ''.join(x.capitalize() for x in snake_case_name.split('_'))
camel_case_name = camel_case_name[0].lower() + camel_case_name[1:]
camel_case_name, snake_case_name = camel_and_snake_case_from(cm[0])
if nArgs > 1 and cm[1] is True: # Method needs parent (existing item)
if self._args[1] in (camel_case_name, snake_case_name):
item = None
# Check if operation method exists
operation = getattr(self, snake_case_name) or getattr(self, camel_case_name)
operation = getattr(self, snake_case_name, None) or getattr(self, camel_case_name, None)
try:
if not operation:
raise Exception() # Operation not found

View File

@ -29,6 +29,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
import re
from uds.core.consts.system import VERSION
from uds.core.util.model import sql_stamp_seconds
@ -40,3 +41,15 @@ def rest_result(result: typing.Any, **kwargs: typing.Any) -> dict[str, typing.An
'''
# A common possible value in kwargs is "error"
return {'result': result, 'stamp': sql_stamp_seconds(), 'version': VERSION, **kwargs}
def camel_and_snake_case_from(text: str) -> tuple[str, str]:
'''
Returns a tuple with the camel case and snake case of a text
first value is camel case, second is snake case
'''
snake_case_name = re.sub(r'(?<!^)(?=[A-Z])', '_', text).lower()
# And snake case to camel case (first letter lower case, rest upper case)
camel_case_name = ''.join(x.capitalize() for x in snake_case_name.split('_'))
camel_case_name = camel_case_name[0].lower() + camel_case_name[1:]
return camel_case_name, snake_case_name

View File

@ -180,6 +180,18 @@ CALENDAR_ACTION_CLEAN_CACHE_L2: typing.Final['CalendarAction'] = {
'params': (),
}
CALENDAR_ACTION_DISPLAY_CUSTOM_MESSAGE: typing.Final['CalendarAction'] = {
'id': 'DISPLAY_CUSTOM_MESSAGE',
'description': _('Custom message on launchers'),
'params': (
{
'type': 'bool',
'name': 'visible',
'description': _('Visible'),
'default': False,
},
),
}
CALENDAR_ACTION_DICT: typing.Final[dict[str, 'CalendarAction']] = {
c['id']: c
@ -200,5 +212,6 @@ CALENDAR_ACTION_DICT: typing.Final[dict[str, 'CalendarAction']] = {
CALENDAR_ACTION_REMOVE_STUCK_USERSERVICES,
CALENDAR_ACTION_CLEAN_CACHE_L1,
CALENDAR_ACTION_CLEAN_CACHE_L2,
CALENDAR_ACTION_DISPLAY_CUSTOM_MESSAGE,
)
}

View File

@ -139,57 +139,57 @@ class CalendarAction(UUIDModel):
self.last_execution = sql_datetime()
params = json.loads(self.params)
saveServicePool = save
should_save_servicepool = save
def numVal(field: str) -> int:
def _numeric_value(field: str) -> int:
v = int(params[field])
return v if v >= 0 else 0
# Actions related to calendar actions
def set_l1_cache() -> None:
self.service_pool.cache_l1_srvs = numVal('size')
def _set_l1_cache() -> None:
self.service_pool.cache_l1_srvs = _numeric_value('size')
def set_l2_cache() -> None:
self.service_pool.cache_l2_srvs = numVal('size')
def _set_l2_cache() -> None:
self.service_pool.cache_l2_srvs = _numeric_value('size')
def set_initial() -> None:
self.service_pool.initial_srvs = numVal('size')
def _set_initial() -> None:
self.service_pool.initial_srvs = _numeric_value('size')
def set_max() -> None:
self.service_pool.max_srvs = numVal('size')
def _set_max() -> None:
self.service_pool.max_srvs = _numeric_value('size')
def publish() -> None:
nonlocal saveServicePool
def _publish() -> None:
nonlocal should_save_servicepool
self.service_pool.publish()
saveServicePool = False
should_save_servicepool = False
def ignores_unused() -> None:
def _ignores_unused() -> None:
self.service_pool.ignores_unused = params['state'] in ('true', '1', True)
def remove_userservices() -> None:
def _remove_userservices() -> None:
# 1.- Remove usable assigned services (Ignore "creating ones", just for created)
for userService in self.service_pool.assigned_user_services().filter(
state=types.states.State.USABLE
):
userService.remove()
def remove_stuck_userservice() -> None:
def _remove_stuck_userservice() -> None:
# 1.- Remove stuck assigned services (Ignore "creating ones", just for created)
since = sql_datetime() - datetime.timedelta(hours=numVal('hours'))
since = sql_datetime() - datetime.timedelta(hours=_numeric_value('hours'))
for userService in self.service_pool.assigned_user_services().filter(
state_date__lt=since, state=types.states.State.USABLE
):
userService.remove()
def del_all_transport() -> None:
def _del_all_transport() -> None:
# 2.- Remove all transports
self.service_pool.transports.clear()
def del_all_groups() -> None:
def _del_all_groups() -> None:
# 3.- Remove all groups
self.service_pool.assignedGroups.clear()
def clear_cache() -> None:
def _clear_cache() -> None:
# 4.- Remove all cache_l1_srvs
for i in self.service_pool.cached_users_services().filter(
UserServiceManager().get_cache_state_filter(
@ -203,7 +203,7 @@ class CalendarAction(UUIDModel):
):
i.remove()
def add_del_transport() -> None:
def _add_del_transport() -> None:
try:
t = Transport.objects.get(uuid=params['transport'])
if self.action == consts.calendar.CALENDAR_ACTION_ADD_TRANSPORT['id']:
@ -215,7 +215,7 @@ class CalendarAction(UUIDModel):
'Scheduled action not executed because transport is not available anymore'
)
def add_del_group() -> None:
def _add_del_group() -> None:
try:
auth, grp = params['group'].split('@')
grp = Authenticator.objects.get(uuid=auth).groups.get(uuid=grp)
@ -225,46 +225,50 @@ class CalendarAction(UUIDModel):
self.service_pool.assignedGroups.remove(grp)
except Exception:
self.service_pool.log('Scheduled action not executed because group is not available anymore')
def _set_display_custom_message() -> None:
self.service_pool.display_custom_message = params['visible'] in ('true', '1', True)
actions: collections.abc.Mapping[str, tuple[collections.abc.Callable[[], None], bool]] = {
# Id, actions (lambda), saveServicePool (bool)
consts.calendar.CALENDAR_ACTION_CACHE_L1['id']: (set_l1_cache, True),
consts.calendar.CALENDAR_ACTION_CACHE_L2['id']: (set_l2_cache, True),
consts.calendar.CALENDAR_ACTION_INITIAL['id']: (set_initial, True),
consts.calendar.CALENDAR_ACTION_MAX['id']: (set_max, True),
consts.calendar.CALENDAR_ACTION_PUBLISH['id']: (publish, False),
consts.calendar.CALENDAR_ACTION_IGNORE_UNUSED['id']: (ignores_unused, True),
consts.calendar.CALENDAR_ACTION_REMOVE_USERSERVICES['id']: (remove_userservices, False),
consts.calendar.CALENDAR_ACTION_CACHE_L1['id']: (_set_l1_cache, True),
consts.calendar.CALENDAR_ACTION_CACHE_L2['id']: (_set_l2_cache, True),
consts.calendar.CALENDAR_ACTION_INITIAL['id']: (_set_initial, True),
consts.calendar.CALENDAR_ACTION_MAX['id']: (_set_max, True),
consts.calendar.CALENDAR_ACTION_PUBLISH['id']: (_publish, False),
consts.calendar.CALENDAR_ACTION_IGNORE_UNUSED['id']: (_ignores_unused, True),
consts.calendar.CALENDAR_ACTION_REMOVE_USERSERVICES['id']: (_remove_userservices, False),
consts.calendar.CALENDAR_ACTION_REMOVE_STUCK_USERSERVICES['id']: (
remove_stuck_userservice,
_remove_stuck_userservice,
False,
),
consts.calendar.CALENDAR_ACTION_DEL_ALL_TRANSPORTS['id']: (del_all_transport, False),
consts.calendar.CALENDAR_ACTION_DEL_ALL_GROUPS['id']: (del_all_groups, False),
consts.calendar.CALENDAR_ACTION_CLEAN_CACHE_L1['id']: (clear_cache, False),
consts.calendar.CALENDAR_ACTION_CLEAN_CACHE_L2['id']: (clear_cache, False),
consts.calendar.CALENDAR_ACTION_DEL_ALL_TRANSPORTS['id']: (_del_all_transport, False),
consts.calendar.CALENDAR_ACTION_DEL_ALL_GROUPS['id']: (_del_all_groups, False),
consts.calendar.CALENDAR_ACTION_CLEAN_CACHE_L1['id']: (_clear_cache, False),
consts.calendar.CALENDAR_ACTION_CLEAN_CACHE_L2['id']: (_clear_cache, False),
consts.calendar.CALENDAR_ACTION_ADD_TRANSPORT['id']: (
add_del_transport,
_add_del_transport,
False,
),
consts.calendar.CALENDAR_ACTION_DEL_TRANSPORT['id']: (
add_del_transport,
_add_del_transport,
False,
),
consts.calendar.CALENDAR_ACTION_ADD_GROUP['id']: (add_del_group, False),
consts.calendar.CALENDAR_ACTION_DEL_GROUP['id']: (add_del_group, False),
consts.calendar.CALENDAR_ACTION_ADD_GROUP['id']: (_add_del_group, False),
consts.calendar.CALENDAR_ACTION_DEL_GROUP['id']: (_add_del_group, False),
consts.calendar.CALENDAR_ACTION_DISPLAY_CUSTOM_MESSAGE['id']: (_set_display_custom_message, True),
}
fncAction, saveServicePool = actions.get(self.action, (None, False))
action_executor, should_save_servicepool = actions.get(self.action, (None, False))
action = consts.calendar.CALENDAR_ACTION_DICT.get(self.action)
description = self.action if not action else action.get('description', self.action)
if fncAction:
if action_executor:
try:
fncAction()
action_executor()
if saveServicePool:
if should_save_servicepool:
self.service_pool.save()
self.service_pool.log(

View File

@ -79,6 +79,7 @@ def _service_info(
to_be_replaced: typing.Optional[str],
to_be_replaced_text: str,
custom_calendar_text: str,
custom_message_text: typing.Optional[str],
) -> collections.abc.Mapping[str, typing.Any]:
return {
'id': ('M' if is_meta else 'F') + uuid,
@ -98,6 +99,7 @@ def _service_info(
'to_be_replaced': to_be_replaced,
'to_be_replaced_text': to_be_replaced_text,
'custom_calendar_text': custom_calendar_text,
'custom_message_text': custom_message_text,
}
@ -186,6 +188,17 @@ def get_services_info_dict(
# Check that we have access to at least one transport on some of its children
transports_in_meta: list[collections.abc.Mapping[str, typing.Any]] = []
in_use: bool = typing.cast(typing.Any, meta).number_in_use > 0 # Anotated value
custom_message: typing.Optional[str] = None
# Fist member of the pool that has a custom message, and is enabled, will be used
# Ordered by priority, using internal sort, to take advantage of prefetched data
sorted_members = sorted(meta.members.all(), key=lambda x: x.priority)
# Get first member with custom message visible and enabled for metapools
for member in sorted_members:
if member.pool.display_custom_message and member.pool.visible and member.pool.custom_message.strip():
custom_message = member.pool.custom_message
break
# Calculate info variable macros content if needed
info_vars = (
@ -206,7 +219,7 @@ def get_services_info_dict(
)
transports_in_all_pools = reduce(
reducer,
[{t for t in _valid_transports(member)} for member in meta.members.all()],
[{t for t in _valid_transports(member)} for member in sorted_members],
)
transports_in_meta = _build_transports_for_meta(
transports_in_all_pools,
@ -218,7 +231,7 @@ def get_services_info_dict(
transports_in_all_pools_by_label: typing.Optional[typing.Set[str]] = None
temporary_transport_set_by_label: typing.Set[str]
for member in meta.members.all():
for member in sorted_members:
temporary_transport_set_by_label = set()
# if first pool, get all its transports and check that are valid
for t in _valid_transports(member):
@ -246,7 +259,7 @@ def get_services_info_dict(
{
'id': 'meta',
'name': 'meta',
'link': html.uds_access_link(request, 'M' + meta.uuid, None),
'link': html.uds_access_link(request, 'M' + meta.uuid, None),
'priority': 0,
}
if any(_valid_transports(member) for member in meta.members.all())
@ -279,6 +292,7 @@ def get_services_info_dict(
to_be_replaced=None,
to_be_replaced_text='',
custom_calendar_text=meta.calendar_message,
custom_message_text=custom_message,
)
)
@ -373,6 +387,8 @@ def get_services_info_dict(
to_be_replaced=replace_date_as_str,
to_be_replaced_text=replace_date_info_text,
custom_calendar_text=service_pool.calendar_message,
# Only add custom message if it's enabled and has a message
custom_message_text=service_pool.custom_message if service_pool.display_custom_message and service_pool.custom_message.strip() else None,
)
)