1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Fixed Serialization ui.

From old serialization, data fields where not correctly translated.
Also the test was bad designed, as the data of original was not randomized to ensure data loaded was right.
This commit is contained in:
Adolfo Gómez García 2024-04-18 18:42:02 +02:00
parent d84e5ac38c
commit d93ed52884
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 206 additions and 106 deletions

View File

@ -69,23 +69,21 @@ class PublicationOldMachinesCleaner(DelayedTask):
def run(self) -> None:
try:
servicePoolPub: ServicePoolPublication = ServicePoolPublication.objects.get(
pk=self._id
)
servicePoolPub: ServicePoolPublication = ServicePoolPublication.objects.get(pk=self._id)
if servicePoolPub.state != State.REMOVABLE:
logger.info('Already removed')
now = sql_datetime()
activePub: typing.Optional[
ServicePoolPublication
] = servicePoolPub.deployed_service.active_publication()
servicePoolPub.deployed_service.userServices.filter(in_use=True).update(
in_use=False, state_date=now
current_publication: typing.Optional[ServicePoolPublication] = (
servicePoolPub.deployed_service.active_publication()
)
servicePoolPub.deployed_service.mark_old_userservices_as_removable(activePub)
except (
Exception
): # nosec: Removed publication, no problem at all, just continue
if current_publication:
servicePoolPub.deployed_service.userServices.filter(in_use=True).exclude(
publication=current_publication
).update(in_use=False, state_date=now)
servicePoolPub.deployed_service.mark_old_userservices_as_removable(current_publication)
except Exception: # nosec: Removed publication, no problem at all, just continue
pass
@ -104,9 +102,7 @@ class PublicationLauncher(DelayedTask):
try:
now = sql_datetime()
with transaction.atomic():
servicePoolPub = ServicePoolPublication.objects.select_for_update().get(
pk=self._publicationId
)
servicePoolPub = ServicePoolPublication.objects.select_for_update().get(pk=self._publicationId)
if not servicePoolPub:
raise ServicePool.DoesNotExist()
if (
@ -121,12 +117,7 @@ class PublicationLauncher(DelayedTask):
servicePool.current_pub_revision += 1
servicePool.set_value(
'toBeReplacedIn',
serialize(
now
+ datetime.timedelta(
hours=GlobalConfig.SESSION_EXPIRE_TIME.as_int(True)
)
),
serialize(now + datetime.timedelta(hours=GlobalConfig.SESSION_EXPIRE_TIME.as_int(True))),
)
servicePool.save()
PublicationFinishChecker.state_updater(servicePoolPub, pi, state)
@ -172,9 +163,7 @@ class PublicationFinishChecker(DelayedTask):
# Now we mark, if it exists, the previous usable publication as "Removable"
if publication_state.is_preparing():
old: ServicePoolPublication
for old in publication.deployed_service.publications.filter(
state=State.USABLE
):
for old in publication.deployed_service.publications.filter(state=State.USABLE):
old.set_state(State.REMOVABLE)
osm = publication.deployed_service.osmanager
@ -186,13 +175,9 @@ class PublicationFinishChecker(DelayedTask):
'pclean-' + str(old.id),
True,
)
publication.deployed_service.mark_old_userservices_as_removable(
publication
)
publication.deployed_service.mark_old_userservices_as_removable(publication)
else: # Remove only cache services, not assigned
publication.deployed_service.mark_old_userservices_as_removable(
publication, True
)
publication.deployed_service.mark_old_userservices_as_removable(publication, True)
publication.set_state(State.USABLE)
elif publication_state.is_removing():
@ -217,9 +202,7 @@ class PublicationFinishChecker(DelayedTask):
PublicationFinishChecker.check_later(publication, publication_instance)
@staticmethod
def check_later(
publication: ServicePoolPublication, publicationInstance: 'services.Publication'
) -> None:
def check_later(publication: ServicePoolPublication, publicationInstance: 'services.Publication') -> None:
"""
Inserts a task in the delayedTaskRunner so we can check the state of this publication
@param dps: Database object for ServicePoolPublication
@ -234,23 +217,17 @@ class PublicationFinishChecker(DelayedTask):
def run(self) -> None:
logger.debug('Checking publication finished %s', self._publishId)
try:
publication: ServicePoolPublication = ServicePoolPublication.objects.get(
pk=self._publishId
)
publication: ServicePoolPublication = ServicePoolPublication.objects.get(pk=self._publishId)
if publication.state != self._state:
logger.debug('Task overrided by another task (state of item changed)')
else:
publicationInstance = publication.get_instance()
logger.debug(
"publication instance class: %s", publicationInstance.__class__
)
logger.debug("publication instance class: %s", publicationInstance.__class__)
try:
state = publicationInstance.check_state()
except Exception:
state = types.states.TaskState.ERROR
PublicationFinishChecker.state_updater(
publication, publicationInstance, state
)
PublicationFinishChecker.state_updater(publication, publicationInstance, state)
except Exception as e:
logger.debug(
'Deployed service not found (erased from database) %s : %s',
@ -272,13 +249,9 @@ class PublicationManager(metaclass=singleton.Singleton):
"""
Returns the singleton to this manager
"""
return (
PublicationManager()
) # Singleton pattern will return always the same instance
return PublicationManager() # Singleton pattern will return always the same instance
def publish(
self, servicepool: ServicePool, changeLog: typing.Optional[str] = None
) -> None:
def publish(self, servicepool: ServicePool, changeLog: typing.Optional[str] = None) -> None:
"""
Initiates the publication of a service pool, or raises an exception if this cannot be done
:param servicePool: Service pool object (db object)
@ -286,15 +259,11 @@ class PublicationManager(metaclass=singleton.Singleton):
"""
if servicepool.publications.filter(state__in=State.PUBLISH_STATES).count() > 0:
raise PublishException(
_(
'Already publishing. Wait for previous publication to finish and try again'
)
_('Already publishing. Wait for previous publication to finish and try again')
)
if servicepool.is_in_maintenance():
raise PublishException(
_('Service is in maintenance mode and new publications are not allowed')
)
raise PublishException(_('Service is in maintenance mode and new publications are not allowed'))
publication: typing.Optional[ServicePoolPublication] = None
try:
@ -322,17 +291,13 @@ class PublicationManager(metaclass=singleton.Singleton):
logger.info('Could not delete %s', publication)
raise PublishException(str(e)) from e
def cancel(
self, publication: ServicePoolPublication
) -> ServicePoolPublication:
def cancel(self, publication: ServicePoolPublication) -> ServicePoolPublication:
"""
Invoked to cancel a publication.
Double invokation (i.e. invokation over a "cancelling" item) will lead to a "forced" cancellation (unclean)
:param servicePoolPub: Service pool publication (db object for a publication)
"""
publication = ServicePoolPublication.objects.get(
pk=publication.id
) # Reloads publication from db
publication = ServicePoolPublication.objects.get(pk=publication.id) # Reloads publication from db
if publication.state not in State.PUBLISH_STATES:
if publication.state == State.CANCELING: # Double cancel
logger.info('Double cancel invoked for a publication')
@ -357,16 +322,12 @@ class PublicationManager(metaclass=singleton.Singleton):
pubInstance = publication.get_instance()
state = pubInstance.cancel()
publication.set_state(State.CANCELING)
PublicationFinishChecker.state_updater(
publication, pubInstance, state
)
PublicationFinishChecker.state_updater(publication, pubInstance, state)
return publication
except Exception as e:
raise PublishException(str(e)) from e
def unpublish(
self, servicepool_publication: ServicePoolPublication
) -> None:
def unpublish(self, servicepool_publication: ServicePoolPublication) -> None:
"""
Unpublishes an active (usable) or removable publication
:param servicePoolPub: Publication to unpublish
@ -377,15 +338,11 @@ class PublicationManager(metaclass=singleton.Singleton):
):
raise PublishException(_('Can\'t unpublish non usable publication'))
if servicepool_publication.userServices.exclude(state__in=State.INFO_STATES).count() > 0:
raise PublishException(
_('Can\'t unpublish publications with services in process')
)
raise PublishException(_('Can\'t unpublish publications with services in process'))
try:
pubInstance = servicepool_publication.get_instance()
state = pubInstance.destroy()
servicepool_publication.set_state(State.REMOVING)
PublicationFinishChecker.state_updater(
servicepool_publication, pubInstance, state
)
PublicationFinishChecker.state_updater(servicepool_publication, pubInstance, state)
except Exception as e:
raise PublishException(str(e)) from e

View File

@ -1684,9 +1684,13 @@ class UserInterface(metaclass=UserInterfaceType):
if not values: # Has nothing
return
field_names_translations: dict[str, str] = self._get_fieldname_translations()
for txt in values.split(FIELD_SEPARATOR):
kb, v = txt.split(NAME_VALUE_SEPARATOR)
k = kb.decode('utf8') # Convert name to string
# convert to new name if needed
k = field_names_translations.get(k, k)
if k in self._gui:
try:
if v.startswith(MULTIVALUE_FIELD):

View File

@ -29,7 +29,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import random
import datetime
from uds.core.ui.user_interface import UserInterface, gui
@ -51,6 +51,92 @@ DEFAULTS: dict[str, typing.Any] = {
}
class TestingOldUserInterface(UserInterface):
strField = gui.TextField(
label='Text Field',
order=0,
tooltip='This is a text field',
required=True,
default=typing.cast(str, DEFAULTS['str_field']),
value=typing.cast(str, DEFAULTS['str_field']),
)
strAutoField = gui.TextAutocompleteField(
label='Text Autocomplete Field',
order=1,
tooltip='This is a text autocomplete field',
required=True,
default=typing.cast(str, DEFAULTS['str_auto_field']),
value=typing.cast(str, DEFAULTS['str_auto_field']),
)
numField = gui.NumericField(
label='Numeric Field',
order=2,
tooltip='This is a numeric field',
required=True,
default=typing.cast(int, DEFAULTS['num_field']),
value=typing.cast(int, DEFAULTS['num_field']),
)
passwordField = gui.PasswordField(
label='Password Field',
order=3,
tooltip='This is a password field',
required=True,
default=typing.cast(str, DEFAULTS['password_field']),
value=typing.cast(str, DEFAULTS['password_field']),
)
hiddenField = gui.HiddenField(
label='Hidden Field',
order=4,
default=DEFAULTS['hidden_field'],
)
choiceField = gui.ChoiceField(
label='Choice Field',
order=5,
tooltip='This is a choice field',
required=False,
default=typing.cast(str, DEFAULTS['choice_field']),
value=typing.cast(str, DEFAULTS['choice_field']),
)
multiChoiceField = gui.MultiChoiceField(
label='Multi Choice Field',
order=6,
tooltip='This is a multi choice field',
default=typing.cast(list[str], DEFAULTS['multi_choice_field']),
value=typing.cast(list[str], DEFAULTS['multi_choice_field']),
)
editableListField = gui.EditableListField(
label='Editable List Field',
order=7,
tooltip='This is a editable list field',
required=False,
default=typing.cast(list[str], DEFAULTS['editable_list_field']),
value=typing.cast(list[str], DEFAULTS['editable_list_field']),
)
checkboxField = gui.CheckBoxField(
label='Checkbox Field',
order=8,
tooltip='This is a checkbox field',
required=True,
default=typing.cast(bool, DEFAULTS['checkbox_field']),
value=typing.cast(bool, DEFAULTS['checkbox_field']),
)
imageChoiceField = gui.ImageChoiceField(
label='Image Choice Field',
order=9,
tooltip='This is a image choice field',
required=True,
default=typing.cast(str, DEFAULTS['image_choice_field']),
value=typing.cast(str, DEFAULTS['image_choice_field']),
)
dateField = gui.DateField(
label='Date Field',
order=10,
tooltip='This is a date field',
required=True,
default=typing.cast(datetime.date, DEFAULTS['date_field']),
value=typing.cast(datetime.date, DEFAULTS['date_field']),
)
class TestingUserInterface(UserInterface):
str_field = gui.TextField(
label='Text Field',
@ -67,6 +153,7 @@ class TestingUserInterface(UserInterface):
required=True,
default=typing.cast(str, DEFAULTS['str_auto_field']),
choices=['Value 1', 'Value 2', 'Value 3'],
old_field_name='strAutoField',
)
num_field = gui.NumericField(
label='Numeric Field',
@ -76,6 +163,7 @@ class TestingUserInterface(UserInterface):
default=typing.cast(int, DEFAULTS['num_field']),
min_value=0,
max_value=100,
old_field_name='numField',
)
password_field = gui.PasswordField(
label='Password Field',
@ -83,11 +171,13 @@ class TestingUserInterface(UserInterface):
tooltip='This is a password field',
required=True,
default=typing.cast(str, DEFAULTS['password_field']),
old_field_name='passwordField',
)
hidden_field = gui.HiddenField(
label='Hidden Field',
order=4,
default=DEFAULTS['hidden_field'],
old_field_name='hiddenField',
)
choice_field = gui.ChoiceField(
label='Choice Field',
@ -96,6 +186,7 @@ class TestingUserInterface(UserInterface):
required=False,
default=typing.cast(str, DEFAULTS['choice_field']),
choices=['Value 1', 'Value 2', 'Value 3'],
old_field_name='choiceField',
)
multi_choice_field = gui.MultiChoiceField(
label='Multi Choice Field',
@ -103,6 +194,7 @@ class TestingUserInterface(UserInterface):
tooltip='This is a multi choice field',
default=typing.cast(list[str], DEFAULTS['multi_choice_field']),
choices=['Value 1', 'Value 2', 'Value 3'],
old_field_name='multiChoiceField',
)
editable_list_field = gui.EditableListField(
label='Editable List Field',
@ -110,6 +202,7 @@ class TestingUserInterface(UserInterface):
tooltip='This is a editable list field',
required=False,
default=typing.cast(list[str], DEFAULTS['editable_list_field']),
old_field_name='editableListField',
)
checkbox_field = gui.CheckBoxField(
label='Checkbox Field',
@ -117,6 +210,7 @@ class TestingUserInterface(UserInterface):
tooltip='This is a checkbox field',
required=True,
default=typing.cast(bool, DEFAULTS['checkbox_field']),
old_field_name='checkboxField',
)
image_choice_field = gui.ImageChoiceField(
label='Image Choice Field',
@ -125,6 +219,7 @@ class TestingUserInterface(UserInterface):
required=True,
default=typing.cast(str, DEFAULTS['image_choice_field']),
choices=['Value 1', 'Value 2', 'Value 3'],
old_field_name='imageChoiceField',
)
date_field = gui.DateField(
label='Date Field',
@ -132,15 +227,33 @@ class TestingUserInterface(UserInterface):
tooltip='This is a date field',
required=True,
default=typing.cast(datetime.date, DEFAULTS['date_field']),
old_field_name='dateField',
)
help_field = gui.HelpField(
label='Info Field',
label='Info Field',
title=DEFAULTS['help_field'][0],
help=DEFAULTS['help_field'][1],
old_field_name='helpField',
)
# Equals operator, to speed up tests writing
def __eq__(self, other: typing.Any) -> bool:
if isinstance(other, TestingOldUserInterface):
return (
self.str_field.value == other.strField.value
and self.str_auto_field.value == other.strAutoField.value
and self.num_field.as_int() == other.numField.as_int()
and self.password_field.value == other.passwordField.value
# Hidden field is not compared, because it is not serialized
and self.choice_field.value == other.choiceField.value
and self.multi_choice_field.value == other.multiChoiceField.value
and self.editable_list_field.value == other.editableListField.value
and self.checkbox_field.value == other.checkboxField.value
and self.image_choice_field.value == other.imageChoiceField.value
and self.date_field.value == other.dateField.value
# Info field is not compared, because it is not serialized
# Nor help field, not present in old version
)
if not isinstance(other, TestingUserInterface):
return False
return (
@ -156,7 +269,34 @@ class TestingUserInterface(UserInterface):
and self.image_choice_field.value == other.image_choice_field.value
and self.date_field.value == other.date_field.value
# Info field is not compared, because it is not serialized
# Nor help field, not present in old version
)
def randomize_values(self) -> None:
self.str_field.default = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=10))
self.str_auto_field.default = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=10))
self.num_field.default = random.randint(0, 100)
self.password_field.default = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=10))
self.choice_field.default = random.choice(['Value 1', 'Value 2', 'Value 3'])
self.multi_choice_field.default = random.sample(['Value 1', 'Value 2', 'Value 3'], 2)
self.editable_list_field.default = random.sample(['Value 1', 'Value 2', 'Value 3'], 2)
self.checkbox_field.default = random.choice([True, False])
self.image_choice_field.default = random.choice(['Value 1', 'Value 2', 'Value 3'])
self.date_field.default = datetime.date(random.randint(2000, 2022), random.randint(1, 12), random.randint(1, 28))
# Ignore HelpField, not present in old version
# Also, randomize values
self.str_field.value = self.str_field.default
self.str_auto_field.value = self.str_auto_field.default
self.num_field.value = self.num_field.default
self.password_field.value = self.password_field.default
self.choice_field.value = self.choice_field.default
self.multi_choice_field.value = self.multi_choice_field.default
self.editable_list_field.value = self.editable_list_field.default
self.checkbox_field.value = self.checkbox_field.default
self.image_choice_field.value = self.image_choice_field.default
self.date_field.value = self.date_field.default
class TestingUserInterfaceFieldNameOrig(UserInterface):
@ -166,6 +306,7 @@ class TestingUserInterfaceFieldNameOrig(UserInterface):
tooltip='This is a text field',
required=True,
default=typing.cast(str, DEFAULTS['str_field']),
value=typing.cast(str, DEFAULTS['str_field']),
)

View File

@ -32,17 +32,12 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
import collections.abc
import time
# We use commit/rollback
from ...utils.test import UDSTestCase
from uds.core import types, consts
from uds.core.ui.user_interface import gui
from ...fixtures.user_interface import TestingUserInterface, DEFAULTS
from .fixtures import TestingUserInterface, DEFAULTS
logger = logging.getLogger(__name__)
@ -53,41 +48,41 @@ class UserinterfaceInternalTest(UDSTestCase):
ui = TestingUserInterface()
self.assertEqual(ui.str_field.value, DEFAULTS['str_field'])
self.assertEqual(ui.str_field.as_str(), DEFAULTS['str_field'])
self.assertEqual(ui.str_auto_field.value, DEFAULTS['str_auto_field'])
self.assertEqual(ui.str_auto_field.as_str(), DEFAULTS['str_auto_field'])
self.assertEqual(ui.num_field.value, DEFAULTS['num_field'])
self.assertEqual(ui.num_field.as_int(), DEFAULTS['num_field'])
self.assertEqual(ui.password_field.value, DEFAULTS['password_field'])
self.assertEqual(ui.password_field.as_str(), DEFAULTS['password_field'])
self.assertEqual(ui.hidden_field.value, DEFAULTS['hidden_field'])
# Hidden field has no as_...
self.assertEqual(ui.choice_field.value, DEFAULTS['choice_field'])
self.assertEqual(ui.choice_field.as_str(), DEFAULTS['choice_field'])
self.assertEqual(ui.multi_choice_field.value, DEFAULTS['multi_choice_field'])
self.assertEqual(ui.multi_choice_field.as_list(), DEFAULTS['multi_choice_field'])
self.assertEqual(ui.editable_list_field.value, DEFAULTS['editable_list_field'])
self.assertEqual(ui.editable_list_field.as_list(), DEFAULTS['editable_list_field'])
self.assertEqual(ui.checkbox_field.value, DEFAULTS['checkbox_field'])
self.assertEqual(ui.checkbox_field.as_bool(), DEFAULTS['checkbox_field'])
self.assertEqual(ui.image_choice_field.value, DEFAULTS['image_choice_field'])
self.assertEqual(ui.image_choice_field.as_str(), DEFAULTS['image_choice_field'])
self.assertEqual(ui.date_field.value, DEFAULTS['date_field'])
self.assertEqual(ui.date_field.as_date(), DEFAULTS['date_field'])
self.assertEqual(ui.date_field.as_datetime().date(), DEFAULTS['date_field'])
self.assertEqual(ui.date_field.as_timestamp(), int(time.mktime(DEFAULTS['date_field'].timetuple())))
self.assertEqual(ui.help_field.value, DEFAULTS['help_field'])
def test_default(self) -> None:
ui = TestingUserInterface()
# Now for default values
@ -167,11 +162,11 @@ class UserinterfaceInternalTest(UDSTestCase):
'help_field': DEFAULTS['help_field'],
},
)
def test_labels(self) -> None:
ui = TestingUserInterface()
self.assertEqual(
{ k: v.label for k, v in ui._gui.items() },
{k: v.label for k, v in ui._gui.items()},
{
'str_field': 'Text Field',
'str_auto_field': 'Text Autocomplete Field',
@ -187,11 +182,11 @@ class UserinterfaceInternalTest(UDSTestCase):
'help_field': 'Info Field',
},
)
def test_order(self) -> None:
ui = TestingUserInterface()
self.assertEqual(
{ k: v._fields_info.order for k, v in ui._gui.items() },
{k: v._fields_info.order for k, v in ui._gui.items()},
{
'str_field': 0,
'str_auto_field': 1,
@ -207,11 +202,11 @@ class UserinterfaceInternalTest(UDSTestCase):
'help_field': 0, # Info field is without order, so it's 0
},
)
def test_required(self) -> None:
ui = TestingUserInterface()
self.assertEqual(
{ k: v._fields_info.required for k, v in ui._gui.items() },
{k: v._fields_info.required for k, v in ui._gui.items()},
{
'str_field': True,
'str_auto_field': True,
@ -227,17 +222,17 @@ class UserinterfaceInternalTest(UDSTestCase):
'help_field': None, # Info field is without required, so it's None
},
)
def test_tooltip(self) -> None:
ui = TestingUserInterface()
self.assertEqual(
{ k: v._fields_info.tooltip for k, v in ui._gui.items() },
{k: v._fields_info.tooltip for k, v in ui._gui.items()},
{
'str_field': 'This is a text field',
'str_auto_field': 'This is a text autocomplete field',
'num_field': 'This is a numeric field',
'password_field': 'This is a password field',
'hidden_field': '', # Tooltip is required, so it's ''
'hidden_field': '', # Tooltip is required, so it's ''
'choice_field': 'This is a choice field',
'multi_choice_field': 'This is a multi choice field',
'editable_list_field': 'This is a editable list field',
@ -246,4 +241,4 @@ class UserinterfaceInternalTest(UDSTestCase):
'date_field': 'This is a date field',
'help_field': '', # Info field is without tooltip, so it's '' because it's required
},
)
)

View File

@ -33,7 +33,6 @@
"""
import logging
import typing
import collections.abc
# We use commit/rollback
from ...utils.test import UDSTestCase
@ -42,8 +41,9 @@ from uds.core import types, consts
from uds.core.ui.user_interface import UserInterface
from unittest import mock
from ...fixtures.user_interface import (
from .fixtures import (
TestingUserInterface,
TestingOldUserInterface,
DEFAULTS,
TestingUserInterfaceFieldName,
TestingUserInterfaceFieldNameOrig,
@ -150,16 +150,18 @@ class UserinterfaceTest(UDSTestCase):
def test_old_serialization(self) -> None:
# This test is to ensure that old serialized data can be loaded
# This data is from a
ui = TestingUserInterface()
ui = TestingOldUserInterface()
data = old_serialize_form(ui)
ui2 = TestingUserInterface()
ui2.randomize_values() # Ensure data is different
ui2.deserialize_from_old_format(data)
self.assertEqual(ui, ui2)
self.assertEqual(ui2, ui) # Important, TestingUserInterface has __eq__ method, but TestingOldUserInterface has not
self.ensure_values_fine(ui2)
# Now deserialize old data with new method, (will internally call oldUnserializeForm)
ui3 = TestingUserInterface()
ui3.randomize_values() # Ensure data is different
self.assertTrue(ui3.deserialize_fields(data)) # Should need upgrade
self.assertEqual(ui, ui3)
@ -171,6 +173,7 @@ class UserinterfaceTest(UDSTestCase):
ui = TestingUserInterface()
data = ui.serialize_fields()
ui2 = TestingUserInterface()
ui2.randomize_values() # Ensure data is different
self.assertFalse(ui2.deserialize_fields(data)) # Should not need upgrade
self.assertEqual(ui, ui2)