1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-12-07 04:24:19 +03:00

3 Commits

Author SHA1 Message Date
Adolfo Gómez García
67a58d57cb Remove redundant exception handling in ModelHandler 2025-11-11 19:09:20 +01:00
Adolfo Gómez García
39a046bb23 Refactor type hinting and clean up whitespace in sorting methods 2025-11-11 18:44:08 +01:00
Adolfo Gómez García
ae16e78a4a Refactor error handling and improve sorting methods in REST handlers 2025-11-11 18:40:07 +01:00
8 changed files with 49 additions and 29 deletions

View File

@@ -37,10 +37,8 @@ import collections.abc
import traceback
from django import http
import django
import django.db
import django.db.models
from django.utils.decorators import method_decorator
from django.core.exceptions import ObjectDoesNotExist
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
@@ -209,9 +207,9 @@ class Dispatcher(View):
except exceptions.services.generics.Error as e:
log.log_operation(handler, 503, types.log.LogLevel.ERROR)
return http.HttpResponseServerError(
f'{{"error": "{e}"}}'.encode(), content_type="application/json", code=503
f'{{"error": "{e}"}}'.encode(), content_type="application/json", status=503
)
except django.db.models.Model.DoesNotExist as e: # All DoesNotExist exceptions are not found
except ObjectDoesNotExist as e: # All DoesNotExist exceptions are not found
log.log_operation(handler, 404, types.log.LogLevel.ERROR)
return http.HttpResponseNotFound(f'{{"error": "{e}"}}'.encode(), content_type="application/json")
except Exception as e:

View File

@@ -392,16 +392,16 @@ class Handler(abc.ABC):
return self._params[name]
return ''
def get_sort_field_info(self, *args: str) -> tuple[str, bool]|None:
def get_sort_field_info(self, *args: str) -> tuple[str, bool] | None:
"""
Returns sorting information for the first sorting if it is contained in the odata orderby list.
Args:
args: The possible name of the field name to check for sorting information.
Returns:
A tuple containing the clean field name found and a boolean indicating if the sorting is descending,
Note:
We only use the first in case of table sort translations, so this only returns info for the first field
"""
@@ -413,7 +413,7 @@ class Handler(abc.ABC):
is_descending = order_field.startswith('-')
return (clean_field, is_descending)
return None
def apply_sort(self, qs: QuerySet[typing.Any]) -> list[typing.Any] | QuerySet[typing.Any]:
"""
Custom sorting function to apply to querysets.
@@ -452,7 +452,7 @@ class Handler(abc.ABC):
result = self.apply_sort(qs)
else:
result = qs
# If odata start/limit are set, apply them
if self.odata.start is not None:
result = result[self.odata.start :]
@@ -464,13 +464,6 @@ class Handler(abc.ABC):
# to avoid issues later
result = list(result)
# Get total items and set it on X-Filtered-Count
try:
total_items = len(result)
self.add_header('X-Filtered-Count', total_items)
except Exception as e:
raise exceptions.rest.RequestError(f'Invalid odata: {e}')
return result
def filter_odata_data(self, data: collections.abc.Iterable[T]) -> list[T]:

View File

@@ -51,7 +51,7 @@ class Config(Handler):
ROLE = consts.UserRole.ADMIN
def get(self) -> typing.Any:
return self.filter_odata_data(CfgConfig.get_config_values(self.is_admin()))
return CfgConfig.get_config_values(self.is_admin())
def put(self) -> typing.Any:
for section, section_dict in typing.cast(dict[str, dict[str, dict[str, str]]], self._params).items():

View File

@@ -36,7 +36,7 @@ import logging
import typing
from django.utils.translation import gettext, gettext_lazy as _
from django.db.models import Model
from django.db.models import Model, Count
import uds.core.types.permissions
from uds.core import exceptions, services, types
@@ -51,6 +51,9 @@ from .services_usage import ServicesUsage
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from django.db.models.query import QuerySet
# Helper class for Provider offers
@dataclasses.dataclass
@@ -96,6 +99,8 @@ class Providers(ModelHandler[ProviderItem]):
.numeric_column(name='user_services_count', title=_('User Services'))
.text_column(name='tags', title=_('Tags'), visible=False)
.row_style(prefix='row-maintenance-', field='maintenance_mode')
.with_field_mappings(type_name='data_type')
.with_filter_fields('name', 'data_type', 'comments', 'maintenance_mode')
).build()
# Rest api related information to complete the auto-generated API
@@ -103,6 +108,14 @@ class Providers(ModelHandler[ProviderItem]):
typed=types.rest.api.RestApiInfoGuiType.MULTIPLE_TYPES,
)
def apply_sort(self, qs: 'QuerySet[typing.Any]') -> 'list[typing.Any] | QuerySet[typing.Any]':
if field_info := self.get_sort_field_info('services_count'):
field_name, is_descending = field_info
order_by_field = f"-{field_name}" if is_descending else field_name
return qs.annotate(services_count=Count('services')).order_by(order_by_field)
return super().apply_sort(qs)
def get_item(self, item: 'Model') -> ProviderItem:
item = ensure.is_instance(item, Provider)
type_ = item.get_type()

View File

@@ -53,6 +53,8 @@ from uds.REST.model import DetailHandler
from .user_services import AssignedUserService, UserServiceItem
if typing.TYPE_CHECKING:
from django.db.models.query import QuerySet
logger = logging.getLogger(__name__)
@@ -116,6 +118,13 @@ class Users(DetailHandler[UserItem]):
groups=[i.uuid for i in user.get_groups()],
role=user.get_role().as_str(),
)
def apply_sort(self, qs: 'QuerySet[typing.Any]') -> 'list[typing.Any] | QuerySet[typing.Any]':
if field_info := self.get_sort_field_info('role'):
descending = '-' if field_info[1] else ''
return qs.order_by(f'{descending}is_admin', f'{descending}staff_member')
return super().apply_sort(qs)
def get_item_position(self, parent: 'Model', item_uuid: str) -> int:
parent = ensure.is_instance(parent, Authenticator)

View File

@@ -198,15 +198,10 @@ class ModelHandler(BaseModelHandler[types.rest.T_Item], abc.ABC):
method = getattr(detail_handler, self._operation)
return method()
except self.MODEL.DoesNotExist:
raise exceptions.rest.NotFound('Item not found on model {self.MODEL.__name__}')
except (KeyError, AttributeError) as e:
raise exceptions.rest.InvalidMethodError(f'Invalid method {self._operation}') from e
except exceptions.rest.HandlerError:
raise
except Exception as e:
logger.error('Exception processing detail: %s', e)
raise exceptions.rest.RequestError(f'Error processing detail: {e}') from e
# Data related
def get_item(self, item: models.Model) -> types.rest.T_Item:

View File

@@ -125,6 +125,18 @@ class OpenshiftProvider(ServiceProvider):
# Utility
def sanitized_name(self, name: str) -> str:
"""
OpenShift only allows machine names with [a-zA-Z0-9_-]
Sanitizes the VM name to comply with RFC 1123:
- Converts to lowercase
- Replaces any character not in [a-z0-9.-] with '-'
- Collapses multiple '-' into one
- Removes leading/trailing non-alphanumeric characters
- Limits length to 63 characters
"""
return re.sub(r'[^a-zA-Z0-9-]', '-', name).lower()[:63]
name = name.lower()
# Replace any character not allowed with '-'
name = re.sub(r'[^a-z0-9.-]', '-', name)
# Collapse multiple '-' into one
name = re.sub(r'-{2,}', '-', name)
# Remove leading/trailing non-alphanumeric characters
name = re.sub(r'^[^a-z0-9]+|[^a-z0-9]+$', '', name)
return name[:63]

View File

@@ -134,9 +134,9 @@ class TestOpenshiftProvider(UDSTransactionTestCase):
test_cases = [
('Test-VM-1', 'test-vm-1'),
('Test_VM@2', 'test-vm-2'),
('My Test VM!!!', 'my-test-vm---'),
('Test !!! this is', 'test-----this-is'),
('UDS-Pub-Hello World!!--2025065122-v1', 'uds-pub-hello-world----2025065122-v1'),
('My Test VM!!!', 'my-test-vm'),
('Test !!! this is', 'test-this-is'),
('UDS-Pub-Hello World!!--2025065122-v1', 'uds-pub-hello-world-2025065122-v1'),
('a' * 100, 'a' * 63), # Test truncation
]
for input_name, expected in test_cases: