1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-20 06:50:23 +03:00

Refactor REST methods to use specific ItemDictType for item serialization and improve type handling

This commit is contained in:
Adolfo Gómez García 2025-02-09 15:15:23 +01:00
parent 10076bf46a
commit 803c8ba7b2
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
32 changed files with 747 additions and 431 deletions

View File

@ -55,7 +55,7 @@ class HelpMethodInfo:
method: str
text: str
methods: list[types.rest.HelpNode.Methods]
methods: list[types.rest.doc.HelpNode.Methods]
def __str__(self) -> str:
return f'{self.method}: {self.text}'
@ -69,45 +69,45 @@ class HelpMethod(enum.Enum):
'<uuid>',
'Retrieves an item by its UUID',
[
types.rest.HelpNode.Methods.GET,
types.rest.HelpNode.Methods.PUT,
types.rest.HelpNode.Methods.DELETE,
types.rest.doc.HelpNode.Methods.GET,
types.rest.doc.HelpNode.Methods.PUT,
types.rest.doc.HelpNode.Methods.DELETE,
],
)
LOG = HelpMethodInfo(
f'<uuid>/{consts.rest.LOG}',
'Retrieves the logs of an element by its UUID',
[
types.rest.HelpNode.Methods.GET,
types.rest.doc.HelpNode.Methods.GET,
],
)
OVERVIEW = HelpMethodInfo(
consts.rest.OVERVIEW, 'General Overview of all items (a list', [types.rest.HelpNode.Methods.GET]
consts.rest.OVERVIEW, 'General Overview of all items (a list', [types.rest.doc.HelpNode.Methods.GET]
)
TABLEINFO = HelpMethodInfo(
consts.rest.TABLEINFO,
'Table visualization information (types, etc..)',
[
types.rest.HelpNode.Methods.GET,
types.rest.doc.HelpNode.Methods.GET,
],
)
TYPES = HelpMethodInfo(
consts.rest.TYPES,
'Retrieves a list of types available',
[
types.rest.HelpNode.Methods.GET,
types.rest.doc.HelpNode.Methods.GET,
],
)
TYPES_TYPE = HelpMethodInfo(
f'{consts.rest.TYPES}/<type>',
'Retrieves a type information',
[
types.rest.HelpNode.Methods.GET,
types.rest.doc.HelpNode.Methods.GET,
],
)
GUI = HelpMethodInfo(consts.rest.GUI, 'GUI information', [types.rest.HelpNode.Methods.GET])
GUI = HelpMethodInfo(consts.rest.GUI, 'GUI information', [types.rest.doc.HelpNode.Methods.GET])
GUI_TYPES = HelpMethodInfo(
f'{consts.rest.GUI}/<type>', 'GUI Types information', [types.rest.HelpNode.Methods.GET]
f'{consts.rest.GUI}/<type>', 'GUI Types information', [types.rest.doc.HelpNode.Methods.GET]
)
@ -115,7 +115,7 @@ class HelpMethod(enum.Enum):
class HelpInfo:
path: str
text: str
method: types.rest.HelpNode.Methods = types.rest.HelpNode.Methods.GET
method: types.rest.doc.HelpNode.Methods = types.rest.doc.HelpNode.Methods.GET
@property
def is_empty(self) -> bool:
@ -143,11 +143,11 @@ class Documentation(View):
help_data: list[HelpInfo] = []
def _process_node(node: 'types.rest.HelpNode', path: str) -> None:
def _process_node(node: 'types.rest.doc.HelpNode', path: str) -> None:
match node.kind:
case types.rest.HelpNode.Type.PATH:
case types.rest.doc.HelpNode.Type.PATH:
pass
case types.rest.HelpNode.Type.MODEL | types.rest.HelpNode.Type.DETAIL:
case types.rest.doc.HelpNode.Type.MODEL | types.rest.doc.HelpNode.Type.DETAIL:
for func in [
HelpMethod.OVERVIEW,
HelpMethod.GUI,

View File

@ -72,7 +72,7 @@ class Handler(abc.ABC):
# For implementing help
# A list of pairs of (path, help) for subpaths on this handler
help_paths: typing.ClassVar[list[types.rest.HelpDoc]] = []
help_paths: typing.ClassVar[list[types.rest.doc.HelpDoc]] = []
help_text: typing.ClassVar[str] = 'No help available'
_request: 'ExtendedHttpRequestWithUser' # It's a modified HttpRequest

View File

@ -55,6 +55,13 @@ class Accounts(ModelHandler):
"""
Processes REST requests about accounts
"""
class AccountItem(types.rest.ItemDictType):
id: str
name: str
tags: typing.List[str]
comments: str
time_mark: typing.Optional[datetime.datetime]
permission: int
model = Account
detail = {'usage': AccountsUsage}
@ -74,7 +81,7 @@ class Accounts(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def item_as_dict(self, item: 'Model') -> types.rest.ItemDictType:
def item_as_dict(self, item: 'Model') -> AccountItem:
item = ensure.is_instance(item, Account)
return {
'id': item.uuid,

View File

@ -30,6 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import logging
import typing
@ -52,6 +53,21 @@ logger = logging.getLogger(__name__)
class ActorTokens(ModelHandler):
class ActorTokenItem(types.rest.ItemDictType):
id: str
name: str
stamp: datetime.datetime
username: str
ip: str
host: str
hostname: str
version: str
pre_command: str
post_command: str
run_once_command: str
log_level: str
os: str
model = Server
model_filter = {'type': types.servers.ServerType.ACTOR}
@ -70,7 +86,7 @@ class ActorTokens(ModelHandler):
{'os': {'title': _('OS')}},
]
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> ActorTokenItem:
item = ensure.is_instance(item, Server)
data: dict[str, typing.Any] = item.data or {}
if item.log_level < 10000: # Old log level, from actor, etc..
@ -79,7 +95,9 @@ class ActorTokens(ModelHandler):
log_level = LogLevel(item.log_level).name
return {
'id': item.token,
'name': str(_('Token isued by {} from {}')).format(item.register_username, item.hostname or item.ip),
'name': str(_('Token isued by {} from {}')).format(
item.register_username, item.hostname or item.ip
),
'stamp': item.stamp,
'username': item.register_username,
'ip': item.ip,

View File

@ -60,6 +60,26 @@ logger = logging.getLogger(__name__)
# Enclosed methods under /auth path
class Authenticators(ModelHandler):
class PartialAuthItem(types.rest.ItemDictType):
numeric_id: int
id: str
name: str
priority: int
class FullAuthItem(PartialAuthItem):
tags: list[str]
comments: str
net_filtering: str
networks: list[dict[str, str]]
state: str
mfa_id: str
small_name: str
users_count: int
type: str
type_name: str
type_info: types.rest.TypeInfoDict
permission: int
model = Authenticator
# Custom get method "search" that requires authenticator id
custom_methods = [types.rest.ModelCustomMethod('search', True)]
@ -155,35 +175,37 @@ class Authenticators(ModelHandler):
logger.info('Type not found: %s', e)
raise exceptions.rest.NotFound('type not found') from e
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> PartialAuthItem | FullAuthItem:
summary = 'summarize' in self._params
item = ensure.is_instance(item, Authenticator)
v: dict[str, typing.Any] = {
if summary:
return {
'numeric_id': item.id,
'id': item.uuid,
'name': item.name,
'priority': item.priority,
}
type_ = item.get_type()
return {
'numeric_id': item.id,
'id': item.uuid,
'name': item.name,
'priority': item.priority,
'tags': [tag.tag for tag in typing.cast(collections.abc.Iterable[Tag], item.tags.all())],
'comments': item.comments,
'net_filtering': item.net_filtering,
'networks': [{'id': n.uuid} for n in item.networks.all()],
'state': item.state,
'mfa_id': item.mfa.uuid if item.mfa else '',
'small_name': item.small_name,
'users_count': item.users.count(),
'type': type_.mod_type(),
'type_name': type_.mod_name(),
'type_info': self.type_as_dict(type_),
'permission': permissions.effective_permissions(self._user, item),
}
if not summary:
type_ = item.get_type()
v.update(
{
'tags': [tag.tag for tag in typing.cast(collections.abc.Iterable[Tag], item.tags.all())],
'comments': item.comments,
'net_filtering': item.net_filtering,
'networks': [{'id': n.uuid} for n in item.networks.all()],
'state': item.state,
'mfa_id': item.mfa.uuid if item.mfa else '',
'small_name': item.small_name,
'users_count': item.users.count(),
'type': type_.mod_type(),
'type_name': type_.mod_name(),
'type_info': self.type_as_dict(type_),
'permission': permissions.effective_permissions(self._user, item),
}
)
return v
def post_save(self, item: 'Model') -> None:
item = ensure.is_instance(item, Authenticator)
@ -198,7 +220,7 @@ class Authenticators(ModelHandler):
item.networks.set(Network.objects.filter(uuid__in=networks))
# Custom "search" method
def search(self, item: 'Model') -> list[types.rest.ItemDictType]:
def search(self, item: 'Model') -> list[types.auth.SearchResultItem.ItemDict]:
"""
API:
Search for users or groups in this authenticator
@ -240,7 +262,9 @@ class Authenticators(ModelHandler):
return [i.as_dict() for i in itertools.islice(iterable, limit)]
except Exception as e:
logger.exception('Too many results: %s', e)
return [{'id': _('Too many results...'), 'name': _('Refine your query')}]
return [
types.auth.SearchResultItem(id=_('Too many results...'), name=_('Refine your query')).as_dict()
]
# self.invalidResponseException('{}'.format(e))
def test(self, type_: str) -> typing.Any:

View File

@ -30,10 +30,12 @@
"""
@Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import logging
import typing
from django.utils.translation import gettext_lazy as _
from uds.core import types
from uds.models import Calendar
from uds.core.util import permissions, ensure
@ -53,6 +55,16 @@ class Calendars(ModelHandler):
"""
Processes REST requests about calendars
"""
class CalendarItem(types.rest.ItemDictType):
id: str
name: str
tags: list[str]
comments: str
modified: datetime.datetime
number_rules: int
number_access: int
number_actions: int
permission: types.permissions.PermissionType
model = Calendar
detail = {'rules': CalendarRules}
@ -77,7 +89,7 @@ class Calendars(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> CalendarItem:
item = ensure.is_instance(item, Calendar)
return {
'id': item.uuid,

View File

@ -42,7 +42,7 @@ from uds.core.exceptions.services import ServiceNotReadyError
from uds.core.types.log import LogLevel, LogSource
from uds.core.util.config import GlobalConfig
from uds.core.util.model import sql_stamp_seconds
from uds.core.util.rest.tools import match
from uds.core.util.rest.tools import match_args
from uds.models import TicketStore, User
from uds.REST import Handler
@ -282,7 +282,7 @@ class Client(Handler):
}
)
return match(
return match_args(
self._args,
_error, # In case of error, raises RequestError
((), _noargs), # No args, return version

View File

@ -38,7 +38,7 @@ from uds.core import exceptions, types, consts
from uds.core.managers.crypto import CryptoManager
from uds.core.managers.userservice import UserServiceManager
from uds.core.exceptions.services import ServiceNotReadyError
from uds.core.util.rest.tools import match
from uds.core.util.rest.tools import match_args
from uds.REST import Handler
from uds.web.util import services
@ -177,7 +177,7 @@ class Connection(Handler):
def error() -> dict[str, typing.Any]:
raise exceptions.rest.RequestError('Invalid Request')
return match(
return match_args(
self._args,
error,
((), self.service_list),

View File

@ -52,6 +52,16 @@ class Images(ModelHandler):
"""
Handles the gallery REST interface
"""
class ImageItem(types.rest.ItemDictType):
id: str
name: str
data: str
class ImageItemOverview(types.rest.ItemDictType):
id: str
name: str
size: str
thumb: str
path = 'gallery'
model = Image
@ -96,7 +106,7 @@ class Images(ModelHandler):
},
)
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> ImageItem:
item = ensure.is_instance(item, Image)
return {
'id': item.uuid,
@ -104,7 +114,7 @@ class Images(ModelHandler):
'data': item.data64,
}
def item_as_dict_overview(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict_overview(self, item: 'Model') -> ImageItemOverview:
item = ensure.is_instance(item, Image)
return {
'id': item.uuid,

View File

@ -60,6 +60,27 @@ class MetaPools(ModelHandler):
Handles Services Pools REST requests
"""
class MetaPoolItem(types.rest.ItemDictType):
id: str
name: str
short_name: str
tags: list[str]
comments: str
thumb: str
image_id: str | None
servicesPoolGroup_id: str | None
pool_group_name: str | None
pool_group_thumb: str | None
user_services_count: int
user_services_in_preparation: int
visible: bool
policy: str
fallbackAccess: str
permission: int
calendar_message: str
transport_grouping: int
ha_policy: str
model = MetaPool
detail = {
'pools': MetaServicesPool,
@ -113,7 +134,7 @@ class MetaPools(ModelHandler):
types.rest.ModelCustomMethod('get_fallback_access', True),
]
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> MetaPoolItem:
item = ensure.is_instance(item, MetaPool)
# if item does not have an associated service, hide it (the case, for example, for a removed service)
# Access from dict will raise an exception, and item will be skipped
@ -134,7 +155,7 @@ class MetaPools(ModelHandler):
(i.pool.userServices.filter(state=State.PREPARING).count()) for i in all_pools
)
val = {
return {
'id': item.uuid,
'name': item.name,
'short_name': item.short_name,
@ -156,8 +177,6 @@ class MetaPools(ModelHandler):
'ha_policy': str(item.ha_policy),
}
return val
# Gui related
def get_gui(self, type_: str) -> list[typing.Any]:
local_gui = self.add_default_fields([], ['name', 'comments', 'tags'])

View File

@ -52,6 +52,18 @@ logger = logging.getLogger(__name__)
class MFA(ModelHandler):
class MFAItem(types.rest.ItemDictType):
id: str
name: str
remember_device: int
validity: int
tags: list[str]
comments: str
type: str
type_name: str
permission: int
model = models.MFA
save_fields = ['name', 'comments', 'tags', 'remember_device', 'validity']
@ -103,8 +115,8 @@ class MFA(ModelHandler):
)
return local_gui
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> MFAItem:
item = ensure.is_instance(item, models.MFA)
type_ = item.get_type()
return {

View File

@ -54,6 +54,15 @@ class Networks(ModelHandler):
Processes REST requests about networks
Implements specific handling for network related requests using GUI
"""
class NetworkItem(types.rest.ItemDictType):
id: str
name: str
tags: list[str]
net_string: str
transports_count: int
authenticators_count: int
permission: types.permissions.PermissionType
model = Network
save_fields = ['name', 'net_string', 'tags']
@ -100,8 +109,8 @@ class Networks(ModelHandler):
'order': 100, # At end
},
)
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> NetworkItem:
item = ensure.is_instance(item, Network)
return {
'id': item.uuid,

View File

@ -54,6 +54,18 @@ logger = logging.getLogger(__name__)
class Notifiers(ModelHandler):
class NotifierItem(types.rest.ItemDictType):
id: str
name: str
level: str
enabled: bool
tags: list[str]
comments: str
type: str
type_name: str
permission: types.permissions.PermissionType
path = 'messaging'
model = Notifier
save_fields = [
@ -113,7 +125,7 @@ class Notifiers(ModelHandler):
return local_gui
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> NotifierItem:
item = ensure.is_instance(item, Notifier)
type_ = item.get_type()
return {

View File

@ -51,6 +51,17 @@ logger = logging.getLogger(__name__)
class OsManagers(ModelHandler):
class OsManagerItem(types.rest.ItemDictType):
id: str
name: str
tags: list[str]
deployed_count: int
type: str
type_name: str
servicesTypes: list[str]
comments: str
permission: types.permissions.PermissionType
model = OSManager
save_fields = ['name', 'comments', 'tags']
@ -63,7 +74,7 @@ class OsManagers(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def os_manager_as_dict(self, osm: OSManager) -> dict[str, typing.Any]:
def os_manager_as_dict(self, osm: OSManager) -> OsManagerItem:
type_ = osm.get_type()
return {
'id': osm.uuid,
@ -79,7 +90,7 @@ class OsManagers(ModelHandler):
'permission': permissions.effective_permissions(self._user, osm),
}
def item_as_dict(self, item: 'Model') -> types.rest.ItemDictType:
def item_as_dict(self, item: 'Model') -> OsManagerItem:
item = ensure.is_instance(item, OSManager)
return self.os_manager_as_dict(item)

View File

@ -38,7 +38,7 @@ import uds.core.types.permissions
from uds import models
from uds.core import consts, exceptions
from uds.core.util import permissions
from uds.core.util.rest.tools import match
from uds.core.util.rest.tools import match_args
from uds.REST import Handler
# Not imported at runtime, just for type checking
@ -168,7 +168,7 @@ class Permissions(Handler):
raise exceptions.rest.RequestError('Invalid request')
# match is a helper function that will match the args with the given patterns
return match(self._args,
return match_args(self._args,
no_match,
(('<cls>', '<obj>', 'users', 'add', '<user>'), add_user_permission),
(('<cls>', '<obj>', 'groups', 'add', '<group>'), add_group_permission),

View File

@ -54,10 +54,27 @@ if typing.TYPE_CHECKING:
from django.db.models import Model
# Helper class for Provider offers
class OfferItem(types.rest.ItemDictType):
name: str
type: str
description: str
icon: str
class Providers(ModelHandler):
"""
Providers REST handler
"""
class ProviderItem(types.rest.ItemDictType):
id: str
name: str
tags: list[str]
services_count: int
user_services_count: int
maintenance_mode: bool
offers: list[OfferItem]
type: str
type_name: str
comments: str
permission: types.permissions.PermissionType
model = Provider
detail = {'services': DetailServices, 'usage': ServicesUsage}
@ -85,12 +102,12 @@ class Providers(ModelHandler):
# Field from where to get "class" and prefix for that class, so this will generate "row-state-A, row-state-X, ....
table_row_style = types.ui.RowStyleInfo(prefix='row-maintenance-', field='maintenance_mode')
def item_as_dict(self, item: 'Model') -> types.rest.ItemDictType:
def item_as_dict(self, item: 'Model') -> ProviderItem:
item = ensure.is_instance(item, Provider)
type_ = item.get_type()
# Icon can have a lot of data (1-2 Kbytes), but it's not expected to have a lot of services providers, and even so, this will work fine
offers = [
offers: list[OfferItem] = [
{
'name': gettext(t.mod_name()),
'type': t.mod_type(),

View File

@ -36,7 +36,7 @@ import typing
from django.utils.translation import gettext_lazy as _
from uds.core import types, consts
from uds.core.util.rest.tools import match
from uds.core.util.rest.tools import match_args
from uds.REST import model
from uds import reports
@ -97,7 +97,7 @@ class Reports(model.BaseModelHandler):
def report_gui(report_id: str) -> typing.Any:
return self.get_gui(report_id)
return match(
return match_args(
self._args,
error,
((), lambda: list(self.get_items())),

View File

@ -29,6 +29,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import datetime
import logging
import typing
@ -50,6 +51,19 @@ logger = logging.getLogger(__name__)
# REST API for Server Tokens management (for admin interface)
class ServersTokens(ModelHandler):
class TokenItem(types.rest.ItemDictType):
id: str
name: str
stamp: datetime.datetime
username: str
ip: str
hostname: str
listen_port: int
mac: str
token: str
type: str
os: str
# servers/groups/[id]/servers
model = models.Server
model_exclude = {
@ -71,7 +85,7 @@ class ServersTokens(ModelHandler):
{'stamp': {'title': _('Date'), 'type': 'datetime'}},
]
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> TokenItem:
item = typing.cast('models.Server', item) # We will receive for sure
return {
'id': item.uuid,
@ -108,6 +122,14 @@ class ServersTokens(ModelHandler):
# REST API For servers (except tunnel servers nor actors)
class ServersServers(DetailHandler):
class ServerItem(types.rest.ItemDictType):
id: str
hostname: str
ip: str
listen_port: int
mac: str
maintenance_mode: bool
custom_methods = ['maintenance', 'importcsv']
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
@ -117,23 +139,24 @@ class ServersServers(DetailHandler):
q = parent.servers.all()
else:
q = parent.servers.filter(uuid=process_uuid(item))
res: types.rest.ItemListType = []
res: list[ServersServers.ServerItem] = []
i = None
for i in q:
val = {
'id': i.uuid,
'hostname': i.hostname,
'ip': i.ip,
'listen_port': i.listen_port,
'mac': i.mac if i.mac != consts.MAC_UNKNOWN else '',
'maintenance_mode': i.maintenance_mode,
}
res.append(val)
res.append(
{
'id': i.uuid,
'hostname': i.hostname,
'ip': i.ip,
'listen_port': i.listen_port,
'mac': i.mac if i.mac != consts.MAC_UNKNOWN else '',
'maintenance_mode': i.maintenance_mode,
}
)
if item is None:
return res
return typing.cast(types.rest.ManyItemsDictType, res)
if not i:
raise Exception('Item not found')
return res[0]
return typing.cast(types.rest.ManyItemsDictType, res[0])
except Exception as e:
logger.exception('REST servers')
raise self.invalid_item_response() from e
@ -403,6 +426,17 @@ class ServersServers(DetailHandler):
class ServersGroups(ModelHandler):
class GroupItem(types.rest.ItemDictType):
id: str
name: str
comments: str
type: str
subtype: str
type_name: str
tags: list[str]
servers_count: int
permission: types.permissions.PermissionType
custom_methods = [
types.rest.ModelCustomMethod('stats', True),
]
@ -478,7 +512,7 @@ class ServersGroups(ModelHandler):
fields['subtype'] = subtype
return super().pre_save(fields)
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> GroupItem:
item = ensure.is_instance(item, models.ServerGroup)
return {
'id': item.uuid,

View File

@ -54,9 +54,20 @@ from uds.core.ui import gui
class ServicesPoolGroups(ModelHandler):
"""
Handles the gallery REST interface
"""
class ServicePoolGroupItem(types.rest.ItemDictType):
id: str
name: str
comments: str
priority: int
image_id: str|None
class ServicePoolGroupItemOverview(types.rest.ItemDictType):
id: str
name: str
priority: int
comments: str
thumb: str
path = 'gallery'
model = ServicePoolGroup
@ -111,20 +122,20 @@ class ServicesPoolGroups(ModelHandler):
self.add_field(local_gui, field)
return local_gui
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> ServicePoolGroupItem:
item = ensure.is_instance(item, ServicePoolGroup)
return {
'id': item.uuid,
'priority': item.priority,
'name': item.name,
'comments': item.comments,
'priority': item.priority,
'image_id': item.image.uuid if item.image else None,
}
def item_as_dict_overview(
self, item: 'Model'
) -> dict[str, typing.Any]:
) -> ServicePoolGroupItemOverview:
item = ensure.is_instance(item, ServicePoolGroup)
return {
'id': item.uuid,

View File

@ -175,7 +175,49 @@ class ServicesPools(ModelHandler):
# return super().get_items(overview=kwargs.get('overview', True), prefetch=['service', 'service__provider', 'servicesPoolGroup', 'image', 'tags'])
# return super(ServicesPools, self).get_items(*args, **kwargs)
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
class SummaryItem(types.rest.ItemDictType):
id: str
name: str
short_name: str
tags: typing.List[str]
parent: str
parent_type: str
comments: str
state: str
thumb: str
account: str
account_id: str | None
service_id: str
provider_id: str
image_id: str | None
initial_srvs: int
cache_l1_srvs: int
cache_l2_srvs: int
max_srvs: int
show_transports: bool
visible: bool
allow_users_remove: bool
allow_users_reset: bool
ignores_unused: bool
fallbackAccess: str
meta_member: list[dict[str, str]]
calendar_message: str
custom_message: str
display_custom_message: bool
osmanager_id: str | None
class FullItem(SummaryItem):
user_services_count: int
user_services_in_preparation: int
restrained: bool
permission: int
info: dict[str, typing.Any]
pool_group_id: str | None
pool_group_name: str
pool_group_thumb: str
usage: str
def item_as_dict(self, item: 'Model') -> SummaryItem | FullItem:
item = ensure.is_instance(item, ServicePool)
summary = 'summarize' in self._params
# if item does not have an associated service, hide it (the case, for example, for a removed service)
@ -196,7 +238,7 @@ class ServicesPools(ModelHandler):
# This needs a lot of queries, and really does not apport anything important to the report
# elif UserServiceManager.manager().canInitiateServiceFromDeployedService(item) is False:
# state = State.SLOWED_DOWN
val: dict[str, typing.Any] = {
val: ServicesPools.SummaryItem = {
'id': item.uuid,
'name': item.name,
'short_name': item.short_name,
@ -227,45 +269,46 @@ class ServicesPools(ModelHandler):
'calendar_message': item.calendar_message,
'custom_message': item.custom_message,
'display_custom_message': item.display_custom_message,
'osmanager_id': item.osmanager.uuid if item.osmanager else None,
}
if summary:
return val
# Recast to complete data
val = typing.cast(ServicesPools.FullItem, val)
# Extended info
if not summary:
if hasattr(item, 'valid_count'):
valid_count = getattr(item, 'valid_count')
preparing_count = getattr(item, 'preparing_count')
restrained = getattr(item, 'error_count') >= GlobalConfig.RESTRAINT_COUNT.as_int()
usage_count = getattr(item, 'usage_count')
else:
valid_count = item.userServices.exclude(state__in=State.INFO_STATES).count()
preparing_count = item.userServices.filter(state=State.PREPARING).count()
restrained = item.is_restrained()
usage_count = -1
if hasattr(item, 'valid_count'):
valid_count = getattr(item, 'valid_count')
preparing_count = getattr(item, 'preparing_count')
restrained = getattr(item, 'error_count') >= GlobalConfig.RESTRAINT_COUNT.as_int()
usage_count = getattr(item, 'usage_count')
else:
valid_count = item.userServices.exclude(state__in=State.INFO_STATES).count()
preparing_count = item.userServices.filter(state=State.PREPARING).count()
restrained = item.is_restrained()
usage_count = -1
poolgroup_id = None
poolgroup_name = _('Default')
poolgroup_thumb = DEFAULT_THUMB_BASE64
if item.servicesPoolGroup is not None:
poolgroup_id = item.servicesPoolGroup.uuid
poolgroup_name = item.servicesPoolGroup.name
if item.servicesPoolGroup.image is not None:
poolgroup_thumb = item.servicesPoolGroup.image.thumb64
poolgroup_id = None
poolgroup_name = _('Default')
poolgroup_thumb = DEFAULT_THUMB_BASE64
if item.servicesPoolGroup is not None:
poolgroup_id = item.servicesPoolGroup.uuid
poolgroup_name = item.servicesPoolGroup.name
if item.servicesPoolGroup.image is not None:
poolgroup_thumb = item.servicesPoolGroup.image.thumb64
val['state'] = state
val['thumb'] = item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64
val['user_services_count'] = valid_count
val['user_services_in_preparation'] = preparing_count
val['tags'] = [tag.tag for tag in item.tags.all()]
val['restrained'] = restrained
val['permission'] = permissions.effective_permissions(self._user, item)
val['info'] = Services.service_info(item.service)
val['pool_group_id'] = poolgroup_id
val['pool_group_name'] = poolgroup_name
val['pool_group_thumb'] = poolgroup_thumb
val['usage'] = str(item.usage(usage_count).percent) + '%'
if item.osmanager:
val['osmanager_id'] = item.osmanager.uuid
val['state'] = state
val['thumb'] = item.image.thumb64 if item.image is not None else DEFAULT_THUMB_BASE64
val['user_services_count'] = valid_count
val['user_services_in_preparation'] = preparing_count
val['tags'] = [tag.tag for tag in item.tags.all()]
val['restrained'] = restrained
val['permission'] = permissions.effective_permissions(self._user, item)
val['info'] = Services.service_info(item.service)
val['pool_group_id'] = poolgroup_id
val['pool_group_name'] = poolgroup_name
val['pool_group_thumb'] = poolgroup_thumb
val['usage'] = str(item.usage(usage_count).percent) + '%'
return val

View File

@ -146,15 +146,15 @@ class System(Handler):
min_access_role = consts.UserRole.STAFF
help_paths = [
types.rest.HelpDoc('', ''),
types.rest.HelpDoc('stats/assigned', ''),
types.rest.HelpDoc('stats/inuse', ''),
types.rest.HelpDoc('stats/cached', ''),
types.rest.HelpDoc('stats/complete', ''),
types.rest.HelpDoc('stats/assigned/<uuuid>', 'Get service pool assigned stats'),
types.rest.HelpDoc('stats/inuse/<uuid>', 'Get service pool in use stats'),
types.rest.HelpDoc('stats/cached/<uuid>', 'Get service pool cached stats'),
types.rest.HelpDoc('stats/complete/<uuid>', 'Get service pool complete stats'),
types.rest.doc.HelpDoc('', ''),
types.rest.doc.HelpDoc('stats/assigned', ''),
types.rest.doc.HelpDoc('stats/inuse', ''),
types.rest.doc.HelpDoc('stats/cached', ''),
types.rest.doc.HelpDoc('stats/complete', ''),
types.rest.doc.HelpDoc('stats/assigned/<uuuid>', 'Get service pool assigned stats'),
types.rest.doc.HelpDoc('stats/inuse/<uuid>', 'Get service pool in use stats'),
types.rest.doc.HelpDoc('stats/cached/<uuid>', 'Get service pool cached stats'),
types.rest.doc.HelpDoc('stats/complete/<uuid>', 'Get service pool complete stats'),
]
help_text = 'Provides system information. Must be admin to access this'

View File

@ -53,6 +53,24 @@ logger = logging.getLogger(__name__)
class Transports(ModelHandler):
class TransportItem(types.rest.ItemDictType):
id: str
name: str
tags: list[str]
comments: str
priority: int
label: str
net_filtering: str
networks: list[str]
allowed_oss: list[str]
pools: list[str]
pools_count: int
deployed_count: int
type: str
type_name: str
protocol: str
permission: int
model = Transport
save_fields = [
'name',
@ -102,7 +120,10 @@ class Transports(ModelHandler):
'name': 'allowed_oss',
'value': [],
'choices': sorted(
[ui.gui.choice_item(x.db_value(), x.os_name().title()) for x in consts.os.KNOWN_OS_LIST],
[
ui.gui.choice_item(x.db_value(), x.os_name().title())
for x in consts.os.KNOWN_OS_LIST
],
key=lambda x: x['text'].lower(),
),
'label': gettext('Allowed Devices'),
@ -148,7 +169,7 @@ class Transports(ModelHandler):
return field
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> TransportItem:
item = ensure.is_instance(item, Transport)
type_ = item.get_type()
pools = list(item.deployedServices.all().values_list('uuid', flat=True))

View File

@ -53,6 +53,13 @@ class TunnelServers(DetailHandler):
# tunnels/[id]/servers
custom_methods = ['maintenance']
class ServerItem(types.rest.ItemDictType):
id: str
hostname: str
ip: str
mac: str
maintenance: bool
def get_items(self, parent: 'Model', item: typing.Optional[str]) -> types.rest.ManyItemsDictType:
parent = ensure.is_instance(parent, models.ServerGroup)
try:
@ -62,22 +69,23 @@ class TunnelServers(DetailHandler):
q = parent.servers.all().order_by('hostname')
else:
q = parent.servers.filter(uuid=process_uuid(item))
res: list[dict[str, typing.Any]] = []
res: list[TunnelServers.ServerItem] = []
i = None
for i in q:
val = {
'id': i.uuid,
'hostname': i.hostname,
'ip': i.ip,
'mac': i.mac if not multi or i.mac != consts.MAC_UNKNOWN else '',
'maintenance': i.maintenance_mode,
}
res.append(val)
res.append(
{
'id': i.uuid,
'hostname': i.hostname,
'ip': i.ip,
'mac': i.mac if not multi or i.mac != consts.MAC_UNKNOWN else '',
'maintenance': i.maintenance_mode,
}
)
if multi:
return res
return typing.cast(types.rest.ManyItemsDictType, res)
if not i:
raise Exception('Item not found')
return res[0]
return typing.cast(types.rest.ManyItemsDictType, res[0])
except Exception as e:
logger.exception('REST groups')
raise self.invalid_item_response() from e
@ -125,7 +133,7 @@ class TunnelServers(DetailHandler):
"""
API:
Custom method that swaps maintenance mode state for a tunnel server
"""
parent = ensure.is_instance(parent, models.ServerGroup)
item = models.Server.objects.get(uuid=process_uuid(id))
@ -137,6 +145,17 @@ class TunnelServers(DetailHandler):
# Enclosed methods under /auth path
class Tunnels(ModelHandler):
class TunnelItem(types.rest.ItemDictType):
id: str
name: str
comments: str
host: str
port: int
tags: list[str]
transports_count: int
servers_count: int
permission: uds.core.types.permissions.PermissionType
path = 'tunnels'
name = 'tunnels'
model = models.ServerGroup
@ -187,7 +206,7 @@ class Tunnels(ModelHandler):
],
)
def item_as_dict(self, item: 'Model') -> dict[str, typing.Any]:
def item_as_dict(self, item: 'Model') -> TunnelItem:
item = ensure.is_instance(item, models.ServerGroup)
return {
'id': item.uuid,

View File

@ -329,18 +329,23 @@ class BaseModelHandler(Handler):
return args
def fill_instance_fields(self, item: 'models.Model', res: dict[str, typing.Any]) -> dict[str, typing.Any]:
def fill_instance_fields(self, item: 'models.Model', dct: types.rest.ItemDictType) -> None:
"""
For Managed Objects (db element that contains a serialized object), fills a dictionary with the "field" parameters values.
For non managed objects, it does nothing
:param item: Item to extract fields
:param res: Dictionary to "extend" with instance key-values pairs
Args:
item: Item to fill fields
dct: Dictionary to fill with fields
"""
# Cast to allow override typing
res = typing.cast(dict[str, typing.Any], dct)
if isinstance(item, ManagedObjectModel):
i = item.get_instance()
i.init_gui() # Defaults & stuff
res.update(i.get_fields_as_dict())
return res
# Exceptions
def invalid_request_response(self, message: typing.Optional[str] = None) -> exceptions.rest.HandlerError:

View File

@ -118,7 +118,7 @@ class ModelHandler(BaseModelHandler):
"""
return {}
def item_as_dict_overview(self, item: models.Model) -> dict[str, typing.Any]:
def item_as_dict_overview(self, item: models.Model) -> types.rest.ItemDictType:
"""
Invoked when request is an "overview"
default behavior is return item_as_dict

View File

@ -40,7 +40,7 @@ import typing
from django.http import HttpResponse
from django.utils.functional import Promise as DjangoPromise
from uds.core import consts, types
from uds.core import consts
from .utils import to_incremental_json
@ -109,38 +109,35 @@ class ContentProcessor:
"""
Helper for renderers. Alters some types so they can be serialized correctly (as we want them to be)
"""
if obj is None or isinstance(obj, (bool, int, float, str)):
return obj
match obj:
case None | bool() | int() | float() | str():
return obj
case dict():
return {
k: ContentProcessor.process_for_render(v)
for k, v in typing.cast(dict[str, typing.Any], obj).items()
}
# If we are a typed response...
if isinstance(obj, types.rest.TypedResponse):
return ContentProcessor.process_for_render(obj.as_dict())
case DjangoPromise():
return str(obj) # This is for translations
if isinstance(obj, DjangoPromise):
return str(obj) # This is for translations
case bytes():
return obj.decode('utf-8')
if isinstance(obj, dict):
return {
k: ContentProcessor.process_for_render(v)
for k, v in typing.cast(dict[str, typing.Any], obj).items()
}
case collections.abc.Iterable():
return [
ContentProcessor.process_for_render(v)
for v in typing.cast(collections.abc.Iterable[typing.Any], obj)
]
if isinstance(obj, bytes):
return obj.decode('utf-8')
case datetime.datetime():
return int(time.mktime(obj.timetuple()))
if isinstance(obj, collections.abc.Iterable):
return [
ContentProcessor.process_for_render(v)
for v in typing.cast(collections.abc.Iterable[typing.Any], obj)
]
case datetime.date():
return '{}-{:02d}-{:02d}'.format(obj.year, obj.month, obj.day)
if isinstance(obj, (datetime.datetime,)): # Datetime as timestamp
return int(time.mktime(obj.timetuple()))
if isinstance(obj, (datetime.date,)): # Date as string
return '{}-{:02d}-{:02d}'.format(obj.year, obj.month, obj.day)
return str(obj)
case _:
return str(obj)
class MarshallerProcessor(ContentProcessor):

View File

@ -30,6 +30,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from . import rest
from . import (
auth,
calendar,
@ -41,7 +42,6 @@ from . import (
permissions,
pools,
requests,
rest,
servers,
services,
states,

View File

@ -119,8 +119,12 @@ class LoginResult:
@dataclasses.dataclass
class SearchResultItem:
class ItemDict(typing.TypedDict):
id: str
name: str
id: str
name: str
def as_dict(self) -> typing.Dict[str, str]:
return dataclasses.asdict(self)
def as_dict(self) -> 'SearchResultItem.ItemDict':
return typing.cast(SearchResultItem.ItemDict, dataclasses.asdict(self))

View File

@ -30,60 +30,16 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import abc
import enum
import re
import typing
import dataclasses
import collections.abc
from . import doc
if typing.TYPE_CHECKING:
from uds.REST.handlers import Handler
# TypedResponse related.
# Typed responses are used to define the type of the response that a method will return.
# This allow us to "describe" it later on the documentation, and also to check that the
# response is correct (and also to generate the response in the correct format)
class TypedResponse(abc.ABC):
def as_dict(self) -> dict[str, typing.Any]:
# If we are a dataclass
if dataclasses.is_dataclass(self):
return dataclasses.asdict(self)
# If we are a dict
if isinstance(self, dict):
return self
raise Exception(f'Cannot convert {self} to dict')
@classmethod
def as_help(cls: type) -> dict[str, typing.Any]:
"""
Returns a representation, as json, of the response type to be used on documentation
For this, we build a dict of "name": "<type>" for each field of the response and returns it
Note that we support nested dataclasses and dicts, but not lists
"""
CLASS_REPR: dict[typing.Any, str] = {
str: '<string>',
int: '<integer>',
float: '<float>',
bool: '<boolean>',
dict: '<dict>',
list: '<list>',
typing.Any: '<any>',
}
def _as_help(obj: typing.Any) -> typing.Union[str, dict[str, typing.Any]]:
if dataclasses.is_dataclass(obj):
return {field.name: _as_help(field.type) for field in dataclasses.fields(obj)}
if isinstance(obj, dict):
return {k: str(_as_help(v)) for k, v in typing.cast(dict[str, typing.Any], obj).items()}
return CLASS_REPR.get(obj, str(obj))
return {field.name: _as_help(field.type) for field in dataclasses.fields(cls)}
# Type related definitions
TypeInfoDict = dict[str, typing.Any] # Alias for type info dict
@ -148,163 +104,22 @@ class ModelCustomMethod:
name: str
needs_parent: bool = True
# Note that for this item to work with documentation
# no forward references can be used (that is, do not use quotes around the inner field types)
class ItemDictType(typing.TypedDict):
pass
# Alias for item type
ItemDictType = dict[str, typing.Any]
# ItemDictType = dict[str, typing.Any]
ItemListType = list[ItemDictType]
ItemGeneratorType = typing.Generator[ItemDictType, None, None]
# Alias for get_items return type
ManyItemsDictType = typing.Union[ItemListType, ItemDictType, ItemGeneratorType]
ManyItemsDictType: typing.TypeAlias = ItemListType|ItemDictType|ItemGeneratorType
#
FieldType = collections.abc.Mapping[str, typing.Any]
# Regular expression to match the API: part of the docstring
# should be a multi line string, with a line containing only "API:" (with leading and trailing \s)
API_RE = re.compile(r'(?ms)^\s*API:\s*$')
@dataclasses.dataclass(eq=False)
class HelpDoc:
"""
Help helper class
"""
@dataclasses.dataclass
class ArgumentInfo:
name: str
type: str
description: str
path: str
description: str
arguments: list[ArgumentInfo] = dataclasses.field(default_factory=list)
# Result is always a json ressponse, so we can describe it as a dict
# Note that this dict can be nested
returns: typing.Any = None
def __init__(
self,
path: str,
help: str,
*,
arguments: typing.Optional[list[ArgumentInfo]] = None,
returns: typing.Optional[dict[str, typing.Any]] = None,
) -> None:
self.path = path
self.description = help
self.arguments = arguments or []
self.returns = returns or {}
def __hash__(self) -> int:
return hash(self.path)
def __eq__(self, other: object) -> bool:
if not isinstance(other, HelpDoc):
return False
return self.path == other.path
def as_str(self) -> str:
return f'{self.path} - {self.description}'
@property
def is_empty(self) -> bool:
return self.path == '' and self.description == ''
def _process_help(self, help: str, annotations: typing.Optional[dict[str, typing.Any]] = None) -> None:
"""
Processes the help string, removing leading and trailing spaces
"""
self.description = ''
self.arguments = []
self.returns = None
match = API_RE.search(help)
if match:
self.description = help[: match.start()].strip()
if annotations:
if 'return' in annotations:
t = annotations['return']
if isinstance(t, collections.abc.Iterable):
pass
# if issubclass(annotations['return'], TypedResponse):
# self.returns = annotations['return'].as_help()
@staticmethod
def from_typed_response(path: str, help: str, TR: type[TypedResponse]) -> 'HelpDoc':
"""
Returns a HelpDoc from a TypedResponse class
"""
return HelpDoc(
path=path,
help=help,
returns=TR.as_help(),
)
@staticmethod
def from_fnc(path: str, help: str, fnc: typing.Callable[..., typing.Any]) -> 'HelpDoc|None':
"""
Returns a HelpDoc from a function that returns a list of TypedResponses
"""
return_type: typing.Any = fnc.__annotations__.get('return')
if isinstance(return_type, TypedResponse):
return HelpDoc.from_typed_response(path, help, typing.cast(type[TypedResponse], return_type))
elif (
isinstance(return_type, collections.abc.Iterable)
and len(typing.cast(typing.Any, return_type).__args__) == 1
and issubclass(typing.cast(typing.Any, return_type).__args__[0], TypedResponse)
):
hd = HelpDoc.from_typed_response(
path, help, typing.cast(type[TypedResponse], typing.cast(typing.Any, return_type).__args__[0])
)
hd.returns = [hd.returns] # We need to return a list of returns
return hd
return None
@dataclasses.dataclass(frozen=True)
class HelpNode:
class Type(enum.StrEnum):
MODEL = 'model'
DETAIL = 'detail'
CUSTOM = 'custom'
PATH = 'path'
class Methods(enum.StrEnum):
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
DELETE = 'DELETE'
PATCH = 'PATCH'
help: HelpDoc
children: list['HelpNode'] # Children nodes
kind: Type
methods: set[Methods] = dataclasses.field(default_factory=lambda: {HelpNode.Methods.GET})
def __hash__(self) -> int:
return hash(self.help.path + ''.join(method for method in self.methods))
def __eq__(self, other: object) -> bool:
if isinstance(other, HelpNode):
return self.help.path == other.help.path and self.methods == other.methods
if not isinstance(other, HelpDoc):
return False
return self.help.path == other.path
@property
def is_empty(self) -> bool:
return self.help.is_empty and not self.children
def __str__(self) -> str:
return f'HelpNode({self.help}, {self.children})'
@dataclasses.dataclass(frozen=True)
class HandlerNode:
"""
@ -346,35 +161,35 @@ class HandlerNode:
return ret + ''.join(child.tree(level + 1) for child in self.children.values())
def help_node(self) -> HelpNode:
def help_node(self) -> doc.HelpNode:
"""
Returns a HelpNode for this node (and children recursively)
"""
from uds.REST.model import ModelHandler
custom_help: set[HelpNode] = set()
custom_help: set[doc.HelpNode] = set()
help_node_type = HelpNode.Type.PATH
help_node_type = doc.HelpNode.Type.PATH
if self.handler:
help_node_type = HelpNode.Type.CUSTOM
help_node_type = doc.HelpNode.Type.CUSTOM
if issubclass(self.handler, ModelHandler):
help_node_type = HelpNode.Type.MODEL
help_node_type = doc.HelpNode.Type.MODEL
# Add custom_methods
for method in self.handler.custom_methods:
# Method is a Me CustomModelMethod,
# We access the __doc__ of the function inside the handler with method.name
doc = getattr(self.handler, method.name).__doc__ or ''
doc_attr = getattr(self.handler, method.name).__doc__ or ''
path = (
f'{self.full_path()}/{method.name}'
if not method.needs_parent
else f'{self.full_path()}/<uuid>/{method.name}'
)
custom_help.add(
HelpNode(
HelpDoc(path=path, help=doc),
doc.HelpNode(
doc.HelpDoc(path=path, help=doc_attr),
[],
HelpNode.Type.CUSTOM,
doc.HelpNode.Type.CUSTOM,
)
)
@ -382,35 +197,35 @@ class HandlerNode:
if self.handler.detail:
for method_name, method_class in self.handler.detail.items():
custom_help.add(
HelpNode(
HelpDoc(path=self.full_path() + '/' + method_name, help=''),
doc.HelpNode(
doc.HelpDoc(path=self.full_path() + '/' + method_name, help=''),
[],
HelpNode.Type.DETAIL,
doc.HelpNode.Type.DETAIL,
)
)
# Add custom_methods
for detail_method in method_class.custom_methods:
# Method is a Me CustomModelMethod,
# We access the __doc__ of the function inside the handler with method.name
doc = getattr(method_class, detail_method).__doc__ or ''
doc_attr = getattr(method_class, detail_method).__doc__ or ''
custom_help.add(
HelpNode(
HelpDoc(
doc.HelpNode(
doc.HelpDoc(
path=self.full_path()
+ '/<uuid>/'
+ method_name
+ '/<uuid>/'
+ detail_method,
help=doc,
help=doc_attr,
),
[],
HelpNode.Type.CUSTOM,
doc.HelpNode.Type.CUSTOM,
)
)
custom_help |= {
HelpNode(
HelpDoc(
doc.HelpNode(
doc.HelpDoc(
path=self.full_path() + '/' + help_info.path,
help=help_info.description,
),
@ -422,8 +237,8 @@ class HandlerNode:
custom_help |= {child.help_node() for child in self.children.values()}
return HelpNode(
help=HelpDoc(path=self.full_path(), help=self.handler.__doc__ or ''),
return doc.HelpNode(
help=doc.HelpDoc(path=self.full_path(), help=self.handler.__doc__ or ''),
children=list(custom_help),
kind=help_node_type,
)

View File

@ -0,0 +1,220 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import enum
import re
import typing
import dataclasses
import collections.abc
# TypedResponse related.
# Typed responses are used to define the type of the response that a method will return.
# This allow us to "describe" it later on the documentation, and also to check that the
# response is correct (and also to generate the response in the correct format)
class TypedResponse(typing.TypedDict):
pass
def extract_doc(response: type[TypedResponse]) -> dict[str, typing.Any]:
"""
Returns a representation, as json, of the response type to be used on documentation
For this, we build a dict of "name": "<type>" for each field of the response and returns it
Note that we support nested dataclasses and dicts, but not lists
"""
CLASS_REPR: dict[typing.Any, str] = {
str: '<string>',
int: '<integer>',
float: '<float>',
bool: '<boolean>',
dict: '<dict>',
list: '<list>',
typing.Any: '<any>',
}
def _as_help(obj: typing.Any) -> typing.Union[str, dict[str, typing.Any]]:
if hasattr(obj, '__annotations__'):
return {name: _as_help(field) for name, field in obj.__annotations__.items()}
return CLASS_REPR.get(obj, str(obj))
# For sure, first level is a dict
return typing.cast(dict[str, typing.Any], _as_help(response))
def is_typed_response(t: type[TypedResponse]) -> bool:
return hasattr(t, '__orig_bases__') and TypedResponse in t.__orig_bases__
# Regular expression to match the API: part of the docstring
# should be a multi line string, with a line containing only "API:" (with leading and trailing \s)
API_RE = re.compile(r'(?ms)^\s*API:\s*$')
@dataclasses.dataclass(eq=False)
class HelpDoc:
"""
Help helper class
"""
@dataclasses.dataclass
class ArgumentInfo:
name: str
type: str
description: str
path: str
description: str
arguments: list[ArgumentInfo] = dataclasses.field(default_factory=list)
# Result is always a json ressponse, so we can describe it as a dict
# Note that this dict can be nested
returns: typing.Any = None
def __init__(
self,
path: str,
help: str,
*,
arguments: typing.Optional[list[ArgumentInfo]] = None,
returns: typing.Optional[dict[str, typing.Any]] = None,
) -> None:
self.path = path
self.description = help
self.arguments = arguments or []
self.returns = returns or {}
def __hash__(self) -> int:
return hash(self.path)
def __eq__(self, other: object) -> bool:
if not isinstance(other, HelpDoc):
return False
return self.path == other.path
def as_str(self) -> str:
return f'{self.path} - {self.description}'
@property
def is_empty(self) -> bool:
return self.path == '' and self.description == ''
def _process_help(self, help: str, annotations: typing.Optional[dict[str, typing.Any]] = None) -> None:
"""
Processes the help string, removing leading and trailing spaces
"""
self.description = ''
self.arguments = []
self.returns = None
match = API_RE.search(help)
if match:
self.description = help[: match.start()].strip()
if annotations:
if 'return' in annotations:
t = annotations['return']
if isinstance(t, collections.abc.Iterable):
pass
# if issubclass(annotations['return'], TypedResponse):
# self.returns = annotations['return'].as_help()
@staticmethod
def from_typed_response(path: str, help: str, TR: type[TypedResponse]) -> 'HelpDoc':
"""
Returns a HelpDoc from a TypedResponse class
"""
return HelpDoc(
path=path,
help=help,
returns=extract_doc(TR),
)
@staticmethod
def from_fnc(path: str, help: str, fnc: typing.Callable[..., typing.Any]) -> 'HelpDoc|None':
"""
Returns a HelpDoc from a function that returns a list of TypedResponses
"""
return_type: typing.Any = fnc.__annotations__.get('return')
if is_typed_response(return_type):
return HelpDoc.from_typed_response(path, help, typing.cast(type[TypedResponse], return_type))
elif (
isinstance(return_type, collections.abc.Iterable)
and len(typing.get_args(return_type)) == 1
and is_typed_response(typing.get_args(return_type)[0])
):
hd = HelpDoc.from_typed_response(
path, help, typing.cast(type[TypedResponse], typing.cast(typing.Any, return_type).__args__[0])
)
hd.returns = [hd.returns] # We need to return a list of returns
return hd
return None
@dataclasses.dataclass(frozen=True)
class HelpNode:
class Type(enum.StrEnum):
MODEL = 'model'
DETAIL = 'detail'
CUSTOM = 'custom'
PATH = 'path'
class Methods(enum.StrEnum):
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
DELETE = 'DELETE'
PATCH = 'PATCH'
help: HelpDoc
children: list['HelpNode'] # Children nodes
kind: Type
methods: set[Methods] = dataclasses.field(default_factory=lambda: {HelpNode.Methods.GET})
def __hash__(self) -> int:
return hash(self.help.path + ''.join(method for method in self.methods))
def __eq__(self, other: object) -> bool:
if isinstance(other, HelpNode):
return self.help.path == other.help.path and self.methods == other.methods
if not isinstance(other, HelpDoc):
return False
return self.help.path == other.path
@property
def is_empty(self) -> bool:
return self.help.is_empty and not self.children
def __str__(self) -> str:
return f'HelpNode({self.help}, {self.children})'

View File

@ -48,7 +48,7 @@ T = typing.TypeVar('T', bound=typing.Any)
# The callback will be called with the arguments in the order they are in the tuple, so:
# callback(sample, arg_2, argument)
# And the literals will be ignored
def match(
def match_args(
arg_list: collections.abc.Iterable[str],
error: collections.abc.Callable[..., typing.Any],
*args: tuple[tuple[str, ...], collections.abc.Callable[..., T]],

View File

@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
class TestHelpDoc(TestCase):
def test_helpdoc_basic(self) -> None:
h = rest.HelpDoc('/path', 'help_text')
h = rest.doc.HelpDoc('/path', 'help_text')
self.assertEqual(h.path, '/path')
self.assertEqual(h.description, 'help_text')
@ -51,10 +51,10 @@ class TestHelpDoc(TestCase):
def test_helpdoc_with_args(self) -> None:
arguments = [
rest.HelpDoc.ArgumentInfo('arg1', 'arg1_type', 'arg1_description'),
rest.HelpDoc.ArgumentInfo('arg2', 'arg2_type', 'arg2_description'),
rest.doc.HelpDoc.ArgumentInfo('arg1', 'arg1_type', 'arg1_description'),
rest.doc.HelpDoc.ArgumentInfo('arg2', 'arg2_type', 'arg2_description'),
]
h = rest.HelpDoc(
h = rest.doc.HelpDoc(
'/path',
'help_text',
arguments=arguments,
@ -66,13 +66,13 @@ class TestHelpDoc(TestCase):
def test_helpdoc_with_args_and_return(self) -> None:
arguments = [
rest.HelpDoc.ArgumentInfo('arg1', 'arg1_type', 'arg1_description'),
rest.HelpDoc.ArgumentInfo('arg2', 'arg2_type', 'arg2_description'),
rest.doc.HelpDoc.ArgumentInfo('arg1', 'arg1_type', 'arg1_description'),
rest.doc.HelpDoc.ArgumentInfo('arg2', 'arg2_type', 'arg2_description'),
]
returns = {
'name': 'return_name',
}
h = rest.HelpDoc(
h = rest.doc.HelpDoc(
'/path',
'help_text',
arguments=arguments,
@ -85,13 +85,12 @@ class TestHelpDoc(TestCase):
self.assertEqual(h.returns, returns)
def test_help_doc_from_typed_response(self) -> None:
@dataclasses.dataclass
class TestResponse(rest.TypedResponse):
name: str = 'test_name'
age: int = 0
money: float = 0.0
class TestResponse(rest.doc.TypedResponse):
name: str
age: int
money: float
h = rest.HelpDoc.from_typed_response('path', 'help', TestResponse)
h = rest.doc.HelpDoc.from_typed_response('path', 'help', TestResponse)
self.assertEqual(h.path, 'path')
self.assertEqual(h.description, 'help')
@ -106,20 +105,18 @@ class TestHelpDoc(TestCase):
)
def test_help_doc_from_typed_response_nested_dataclass(self) -> None:
@dataclasses.dataclass
class TestResponse:
name: str = 'test_name'
age: int = 0
money: float = 0.0
@dataclasses.dataclass
class TestResponse2(rest.TypedResponse):
class TestResponse2(rest.doc.TypedResponse):
name: str
age: int
money: float
nested: TestResponse
h = rest.HelpDoc.from_typed_response('path', 'help', TestResponse2)
h = rest.doc.HelpDoc.from_typed_response('path', 'help', TestResponse2)
self.assertEqual(h.path, 'path')
self.assertEqual(h.description, 'help')
@ -139,19 +136,18 @@ class TestHelpDoc(TestCase):
)
def test_help_doc_from_fnc(self) -> None:
@dataclasses.dataclass
class TestResponse(rest.TypedResponse):
name: str = 'test_name'
age: int = 0
money: float = 0.0
class TestResponse(rest.doc.TypedResponse):
name: str
age: int
money: float
def testing_fnc() -> TestResponse:
"""
This is a test function
"""
return TestResponse()
return TestResponse(name='test_name', age=0, money=0.0)
h = rest.HelpDoc.from_fnc('path', 'help', testing_fnc)
h = rest.doc.HelpDoc.from_fnc('path', 'help', testing_fnc)
if h is None:
self.fail('HelpDoc is None')
@ -175,17 +171,17 @@ class TestHelpDoc(TestCase):
"""
return {}
h = rest.HelpDoc.from_fnc('path', 'help', testing_fnc)
h = rest.doc.HelpDoc.from_fnc('path', 'help', testing_fnc)
self.assertIsNone(h)
def test_help_doc_from_fnc_list(self) -> None:
@dataclasses.dataclass
class TestResponse(rest.TypedResponse):
name: str = 'test_name'
age: int = 0
money: float = 0.0
class TestResponse(rest.doc.TypedResponse):
name: str
age: int
money: float
def testing_fnc() -> list[TestResponse]:
"""
@ -193,7 +189,7 @@ class TestHelpDoc(TestCase):
"""
return []
h = rest.HelpDoc.from_fnc('path', 'help', testing_fnc)
h = rest.doc.HelpDoc.from_fnc('path', 'help', testing_fnc)
if h is None:
self.fail('HelpDoc is None')