1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-11-19 16:28:01 +03:00

5 Commits

Author SHA1 Message Date
aschumann-virtualcable
01d70fc510 changes in sanitized_name 2025-11-11 16:56:31 +01:00
aschumann-virtualcable
e38c7a367f Merge remote-tracking branch 'origin/master' into dev/andres/master 2025-11-11 16:48:31 +01:00
aschumann-virtualcable
06ef1ed201 Merge remote-tracking branch 'origin/master' into dev/andres/master 2025-11-06 15:57:04 +01:00
aschumann-virtualcable
2d2a6fe69c Merge branch 'dev/andres/master' of github.com:VirtualCable/openuds into dev/andres/master 2025-11-05 13:42:22 +01:00
aschumann-virtualcable
b534278280 refactor: Enhance VM name sanitization in OpenshiftProvider and update tests 2025-11-05 13:42:13 +01:00
8 changed files with 29 additions and 49 deletions

View File

@@ -37,8 +37,10 @@ import collections.abc
import traceback
from django import http
import django
import django.db
import django.db.models
from django.utils.decorators import method_decorator
from django.core.exceptions import ObjectDoesNotExist
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
@@ -207,9 +209,9 @@ class Dispatcher(View):
except exceptions.services.generics.Error as e:
log.log_operation(handler, 503, types.log.LogLevel.ERROR)
return http.HttpResponseServerError(
f'{{"error": "{e}"}}'.encode(), content_type="application/json", status=503
f'{{"error": "{e}"}}'.encode(), content_type="application/json", code=503
)
except ObjectDoesNotExist as e: # All DoesNotExist exceptions are not found
except django.db.models.Model.DoesNotExist as e: # All DoesNotExist exceptions are not found
log.log_operation(handler, 404, types.log.LogLevel.ERROR)
return http.HttpResponseNotFound(f'{{"error": "{e}"}}'.encode(), content_type="application/json")
except Exception as e:

View File

@@ -392,16 +392,16 @@ class Handler(abc.ABC):
return self._params[name]
return ''
def get_sort_field_info(self, *args: str) -> tuple[str, bool] | None:
def get_sort_field_info(self, *args: str) -> tuple[str, bool]|None:
"""
Returns sorting information for the first sorting if it is contained in the odata orderby list.
Args:
args: The possible name of the field name to check for sorting information.
Returns:
A tuple containing the clean field name found and a boolean indicating if the sorting is descending,
Note:
We only use the first in case of table sort translations, so this only returns info for the first field
"""
@@ -413,7 +413,7 @@ class Handler(abc.ABC):
is_descending = order_field.startswith('-')
return (clean_field, is_descending)
return None
def apply_sort(self, qs: QuerySet[typing.Any]) -> list[typing.Any] | QuerySet[typing.Any]:
"""
Custom sorting function to apply to querysets.
@@ -452,7 +452,7 @@ class Handler(abc.ABC):
result = self.apply_sort(qs)
else:
result = qs
# If odata start/limit are set, apply them
if self.odata.start is not None:
result = result[self.odata.start :]
@@ -464,6 +464,13 @@ class Handler(abc.ABC):
# to avoid issues later
result = list(result)
# Get total items and set it on X-Filtered-Count
try:
total_items = len(result)
self.add_header('X-Filtered-Count', total_items)
except Exception as e:
raise exceptions.rest.RequestError(f'Invalid odata: {e}')
return result
def filter_odata_data(self, data: collections.abc.Iterable[T]) -> list[T]:

View File

@@ -51,7 +51,7 @@ class Config(Handler):
ROLE = consts.UserRole.ADMIN
def get(self) -> typing.Any:
return CfgConfig.get_config_values(self.is_admin())
return self.filter_odata_data(CfgConfig.get_config_values(self.is_admin()))
def put(self) -> typing.Any:
for section, section_dict in typing.cast(dict[str, dict[str, dict[str, str]]], self._params).items():

View File

@@ -36,7 +36,7 @@ import logging
import typing
from django.utils.translation import gettext, gettext_lazy as _
from django.db.models import Model, Count
from django.db.models import Model
import uds.core.types.permissions
from uds.core import exceptions, services, types
@@ -51,9 +51,6 @@ from .services_usage import ServicesUsage
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from django.db.models.query import QuerySet
# Helper class for Provider offers
@dataclasses.dataclass
@@ -99,8 +96,6 @@ class Providers(ModelHandler[ProviderItem]):
.numeric_column(name='user_services_count', title=_('User Services'))
.text_column(name='tags', title=_('Tags'), visible=False)
.row_style(prefix='row-maintenance-', field='maintenance_mode')
.with_field_mappings(type_name='data_type')
.with_filter_fields('name', 'data_type', 'comments', 'maintenance_mode')
).build()
# Rest api related information to complete the auto-generated API
@@ -108,14 +103,6 @@ class Providers(ModelHandler[ProviderItem]):
typed=types.rest.api.RestApiInfoGuiType.MULTIPLE_TYPES,
)
def apply_sort(self, qs: 'QuerySet[typing.Any]') -> 'list[typing.Any] | QuerySet[typing.Any]':
if field_info := self.get_sort_field_info('services_count'):
field_name, is_descending = field_info
order_by_field = f"-{field_name}" if is_descending else field_name
return qs.annotate(services_count=Count('services')).order_by(order_by_field)
return super().apply_sort(qs)
def get_item(self, item: 'Model') -> ProviderItem:
item = ensure.is_instance(item, Provider)
type_ = item.get_type()

View File

@@ -53,8 +53,6 @@ from uds.REST.model import DetailHandler
from .user_services import AssignedUserService, UserServiceItem
if typing.TYPE_CHECKING:
from django.db.models.query import QuerySet
logger = logging.getLogger(__name__)
@@ -118,13 +116,6 @@ class Users(DetailHandler[UserItem]):
groups=[i.uuid for i in user.get_groups()],
role=user.get_role().as_str(),
)
def apply_sort(self, qs: 'QuerySet[typing.Any]') -> 'list[typing.Any] | QuerySet[typing.Any]':
if field_info := self.get_sort_field_info('role'):
descending = '-' if field_info[1] else ''
return qs.order_by(f'{descending}is_admin', f'{descending}staff_member')
return super().apply_sort(qs)
def get_item_position(self, parent: 'Model', item_uuid: str) -> int:
parent = ensure.is_instance(parent, Authenticator)

View File

@@ -198,10 +198,15 @@ class ModelHandler(BaseModelHandler[types.rest.T_Item], abc.ABC):
method = getattr(detail_handler, self._operation)
return method()
except self.MODEL.DoesNotExist:
raise exceptions.rest.NotFound('Item not found on model {self.MODEL.__name__}')
except (KeyError, AttributeError) as e:
raise exceptions.rest.InvalidMethodError(f'Invalid method {self._operation}') from e
except exceptions.rest.HandlerError:
raise
except Exception as e:
logger.error('Exception processing detail: %s', e)
raise exceptions.rest.RequestError(f'Error processing detail: {e}') from e
# Data related
def get_item(self, item: models.Model) -> types.rest.T_Item:

View File

@@ -125,18 +125,6 @@ class OpenshiftProvider(ServiceProvider):
# Utility
def sanitized_name(self, name: str) -> str:
"""
Sanitizes the VM name to comply with RFC 1123:
- Converts to lowercase
- Replaces any character not in [a-z0-9.-] with '-'
- Collapses multiple '-' into one
- Removes leading/trailing non-alphanumeric characters
- Limits length to 63 characters
OpenShift only allows machine names with [a-zA-Z0-9_-]
"""
name = name.lower()
# Replace any character not allowed with '-'
name = re.sub(r'[^a-z0-9.-]', '-', name)
# Collapse multiple '-' into one
name = re.sub(r'-{2,}', '-', name)
# Remove leading/trailing non-alphanumeric characters
name = re.sub(r'^[^a-z0-9]+|[^a-z0-9]+$', '', name)
return name[:63]
return re.sub(r'[^a-zA-Z0-9-]', '-', name).lower()[:63]

View File

@@ -134,9 +134,9 @@ class TestOpenshiftProvider(UDSTransactionTestCase):
test_cases = [
('Test-VM-1', 'test-vm-1'),
('Test_VM@2', 'test-vm-2'),
('My Test VM!!!', 'my-test-vm'),
('Test !!! this is', 'test-this-is'),
('UDS-Pub-Hello World!!--2025065122-v1', 'uds-pub-hello-world-2025065122-v1'),
('My Test VM!!!', 'my-test-vm---'),
('Test !!! this is', 'test-----this-is'),
('UDS-Pub-Hello World!!--2025065122-v1', 'uds-pub-hello-world----2025065122-v1'),
('a' * 100, 'a' * 63), # Test truncation
]
for input_name, expected in test_cases: