mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-11 00:58:39 +03:00
Migrated defered deletions to common worker, more versatile and common for all
This commit is contained in:
parent
dec744a0e2
commit
14694cdaf8
@ -11,4 +11,4 @@ python_classes =
|
||||
filterwarnings =
|
||||
error
|
||||
ignore:The --rsyncdir command line argument and rsyncdirs config variable are deprecated.:DeprecationWarning
|
||||
|
||||
ignore::matplotlib._api.deprecation.MatplotlibDeprecationWarning:pydev
|
@ -475,7 +475,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def get_ip(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> str:
|
||||
self.mock.get_ip(caller_instance, vmid)
|
||||
@ -483,7 +483,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def get_mac(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> str:
|
||||
self.mock.get_mac(caller_instance, vmid)
|
||||
@ -491,7 +491,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def is_running(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> bool:
|
||||
self.mock.is_running(caller_instance, vmid)
|
||||
@ -499,7 +499,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def start(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.start(caller_instance, vmid)
|
||||
@ -507,7 +507,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def stop(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.stop(caller_instance, vmid)
|
||||
@ -515,7 +515,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def shutdown(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.shutdown(caller_instance, vmid)
|
||||
@ -523,15 +523,18 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def delete(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.remove(caller_instance, vmid)
|
||||
self.machine_running_flag = False
|
||||
|
||||
def execute_delete(self, vmid: str) -> None:
|
||||
self.mock.execute_delete(vmid)
|
||||
|
||||
def reset(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.reset(caller_instance, vmid)
|
||||
@ -539,7 +542,7 @@ class DynamicTestingService(dynamic_service.DynamicService):
|
||||
|
||||
def suspend(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.suspend(caller_instance, vmid)
|
||||
@ -653,6 +656,8 @@ class DynamicTestingServiceForDeferredDeletion(dynamic_service.DynamicService):
|
||||
type_name = 'Dynamic Deferred Deletion Testing'
|
||||
type_type = 'DynamicDeferredServiceTesting'
|
||||
type_description = 'Dynamic Service Testing description'
|
||||
|
||||
must_stop_before_deletion = False
|
||||
|
||||
mock: 'mock.Mock' = mock.MagicMock() # Remember, shared between instances
|
||||
|
||||
@ -666,21 +671,21 @@ class DynamicTestingServiceForDeferredDeletion(dynamic_service.DynamicService):
|
||||
# Not used, but needed to be implemented due to bein abstract
|
||||
def get_ip(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> str:
|
||||
return ''
|
||||
|
||||
def get_mac(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> str:
|
||||
return ''
|
||||
|
||||
def is_running(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> bool:
|
||||
self.mock.is_running(vmid)
|
||||
@ -688,20 +693,19 @@ class DynamicTestingServiceForDeferredDeletion(dynamic_service.DynamicService):
|
||||
|
||||
def start(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.start(vmid)
|
||||
|
||||
def stop(
|
||||
self,
|
||||
caller_instance: dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication,
|
||||
caller_instance: typing.Optional[dynamic_userservice.DynamicUserService | dynamic_publication.DynamicPublication],
|
||||
vmid: str,
|
||||
) -> None:
|
||||
self.mock.stop(vmid)
|
||||
|
||||
|
||||
|
||||
class DynamicTestingProvider(services.provider.ServiceProvider):
|
||||
type_name = 'Dynamic Provider'
|
||||
type_type = 'DynamicProvider'
|
||||
|
@ -42,6 +42,7 @@ from uds.core.workers import deferred_deletion
|
||||
from uds.core.services.generics import exceptions as gen_exceptions
|
||||
|
||||
from ....utils.test import UDSTransactionTestCase
|
||||
from ....utils import helpers
|
||||
from . import fixtures
|
||||
|
||||
|
||||
@ -52,11 +53,14 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
services.factory().insert(fixtures.DynamicTestingProvider)
|
||||
|
||||
def set_last_check_expired(self) -> None:
|
||||
for group in [deferred_deletion.TO_DELETE_GROUP, deferred_deletion.DELETING_GROUP]:
|
||||
for group in [
|
||||
deferred_deletion.TO_DELETE_GROUP,
|
||||
deferred_deletion.DELETING_GROUP,
|
||||
deferred_deletion.TO_STOP_GROUP,
|
||||
deferred_deletion.STOPPING_GROUP,
|
||||
]:
|
||||
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
|
||||
for key, info in typing.cast(
|
||||
dict[str, deferred_deletion.DeferredDeletionInfo], storage
|
||||
).items():
|
||||
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
|
||||
info.last_check = sql_now() - datetime.timedelta(
|
||||
seconds=deferred_deletion.CHECK_INTERVAL * 2
|
||||
)
|
||||
@ -69,25 +73,31 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
@contextlib.contextmanager
|
||||
def patch_for_worker(
|
||||
self,
|
||||
group: str,
|
||||
*,
|
||||
execute_side_effect: typing.Union[None, typing.Callable[..., None], Exception] = None,
|
||||
is_deleted_side_effect: typing.Union[None, typing.Callable[..., bool], Exception] = None,
|
||||
) -> typing.Iterator[tuple[mock.MagicMock, dict[str, dict[str, deferred_deletion.DeferredDeletionInfo]]]]:
|
||||
is_running: typing.Union[None, typing.Callable[..., bool]] = None,
|
||||
must_stop_before_deletion: bool = False,
|
||||
should_try_soft_shutdown: bool = False,
|
||||
) -> typing.Iterator[tuple[mock.MagicMock, dict[str, dict[str, deferred_deletion.DeletionInfo]]]]:
|
||||
"""
|
||||
Patch the storage to use a dict instead of the real storage
|
||||
|
||||
This is useful to test the worker without touching the real storage
|
||||
"""
|
||||
dct: dict[str, dict[str, deferred_deletion.DeferredDeletionInfo]] = {}
|
||||
dct: dict[str, dict[str, deferred_deletion.DeletionInfo]] = {}
|
||||
instance = mock.MagicMock()
|
||||
instance_db_obj = mock.MagicMock(uuid='service1')
|
||||
instance_db_obj.get_instance.return_value = instance
|
||||
instance.db_obj.return_value = instance_db_obj
|
||||
instance.execute_delete.side_effect = execute_side_effect
|
||||
if is_deleted_side_effect == None:
|
||||
instance.is_deleted.return_value = True
|
||||
else:
|
||||
instance.is_deleted.side_effect = is_deleted_side_effect
|
||||
instance.is_deleted.side_effect = is_deleted_side_effect or helpers.returns_true
|
||||
|
||||
instance.must_stop_before_deletion = must_stop_before_deletion
|
||||
instance.should_try_soft_shutdown.return_value = should_try_soft_shutdown
|
||||
instance.is_running.side_effect = is_running or helpers.returns_false
|
||||
instance.stop.return_value = None
|
||||
instance.shutdown.return_value = None
|
||||
|
||||
# Patchs uds.models.Service also for get_instance to work
|
||||
with mock.patch('uds.models.Service.objects') as objs:
|
||||
@ -99,7 +109,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
@contextlib.contextmanager
|
||||
def _as_dict(
|
||||
group: str, *args: typing.Any, **kwargs: typing.Any
|
||||
) -> typing.Iterator[dict[str, deferred_deletion.DeferredDeletionInfo]]:
|
||||
) -> typing.Iterator[dict[str, deferred_deletion.DeletionInfo]]:
|
||||
if group not in dct:
|
||||
dct[group] = {}
|
||||
yield dct[group]
|
||||
@ -108,6 +118,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
yield instance, dct
|
||||
|
||||
def test_deferred_delete_full_fine_delete(self) -> None:
|
||||
# Tests only delete and is_deleted, no stop and stopping
|
||||
|
||||
service = fixtures.create_dynamic_service_for_deferred_deletion()
|
||||
|
||||
@ -143,17 +154,19 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
# Reset mock
|
||||
fixtures.DynamicTestingServiceForDeferredDeletion.mock.reset_mock()
|
||||
|
||||
# No entries into to_delete
|
||||
# No entries into to_delete, nor TO_STOP nor STOPPING
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
|
||||
|
||||
# Storage db should have 16 entries
|
||||
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
|
||||
deferred_deletion.DELETING_GROUP
|
||||
) as deleting:
|
||||
self.assertEqual(len(deleting), 16)
|
||||
for key, info in typing.cast(dict[str, deferred_deletion.DeferredDeletionInfo], deleting).items():
|
||||
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
|
||||
now = sql_now()
|
||||
self.assertIsInstance(info, deferred_deletion.DeferredDeletionInfo)
|
||||
self.assertIsInstance(info, deferred_deletion.DeletionInfo)
|
||||
self.assertEqual(key, f'{info.service_uuid}_{info.vmid}')
|
||||
self.assertLessEqual(info.created, now)
|
||||
self.assertLessEqual(info.last_check, now)
|
||||
@ -162,13 +175,13 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
|
||||
# Instantiate the Job
|
||||
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
|
||||
to_delete = job._get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
# Should be empty, both services and infos
|
||||
self.assertEqual(len(to_delete[0]), 0)
|
||||
self.assertEqual(len(to_delete[1]), 0)
|
||||
|
||||
# Now, get from deleting
|
||||
deleting = job._get_from_storage(deferred_deletion.DELETING_GROUP)
|
||||
deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.DELETING_GROUP)
|
||||
# Should have o services and infos also, because last_check has been too soon
|
||||
self.assertEqual(len(deleting[0]), 0)
|
||||
self.assertEqual(len(deleting[1]), 0)
|
||||
@ -179,7 +192,9 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
# Now, get from deleting again, should have all services and infos
|
||||
# OVerride MAX_DELETIONS_AT_ONCE to get only 1 entries
|
||||
deferred_deletion.MAX_DELETIONS_AT_ONCE = 1
|
||||
services_1, key_info_1 = job._get_from_storage(deferred_deletion.DELETING_GROUP)
|
||||
services_1, key_info_1 = deferred_deletion.DeletionInfo.get_from_storage(
|
||||
deferred_deletion.DELETING_GROUP
|
||||
)
|
||||
self.assertEqual(len(services_1), 1)
|
||||
self.assertEqual(len(key_info_1), 1)
|
||||
# And should rest only 15 on storage
|
||||
@ -188,9 +203,11 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
) as deleting:
|
||||
self.assertEqual(len(deleting), 15)
|
||||
deferred_deletion.MAX_DELETIONS_AT_ONCE = 16
|
||||
services_2, key_info_2 = job._get_from_storage(deferred_deletion.DELETING_GROUP)
|
||||
self.assertEqual(len(services_2), 8) # 8 services must be returned
|
||||
self.assertEqual(len(key_info_2), 15) # And 15 entries
|
||||
services_2, key_info_2 = deferred_deletion.DeletionInfo.get_from_storage(
|
||||
deferred_deletion.DELETING_GROUP
|
||||
)
|
||||
self.assertEqual(len(services_2), 8) # 8 services must be returned
|
||||
self.assertEqual(len(key_info_2), 15) # And 15 entries
|
||||
|
||||
# Re-store all DELETING_GROUP entries
|
||||
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
|
||||
@ -241,7 +258,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
|
||||
|
||||
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
|
||||
to_delete = job._get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
# Should be empty, both services and infos
|
||||
self.assertEqual(len(to_delete[0]), 0)
|
||||
self.assertEqual(len(to_delete[1]), 0)
|
||||
@ -250,7 +267,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
self.set_last_check_expired()
|
||||
|
||||
# Now, get from deleting again, should have all services and infos
|
||||
services, key_info = job._get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
|
||||
self.assertEqual(len(services), 1)
|
||||
self.assertEqual(len(key_info), 1)
|
||||
# now, db should be empty
|
||||
@ -278,9 +295,9 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
deferred_deletion.DELETING_GROUP
|
||||
) as deleting:
|
||||
self.assertEqual(len(deleting), 1)
|
||||
for key, info in typing.cast(dict[str, deferred_deletion.DeferredDeletionInfo], deleting).items():
|
||||
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
|
||||
now = sql_now()
|
||||
self.assertIsInstance(info, deferred_deletion.DeferredDeletionInfo)
|
||||
self.assertIsInstance(info, deferred_deletion.DeletionInfo)
|
||||
self.assertEqual(key, f'{info.service_uuid}_{info.vmid}')
|
||||
self.assertLessEqual(info.created, now)
|
||||
self.assertLessEqual(info.last_check, now)
|
||||
@ -305,6 +322,39 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
|
||||
def test_deferred_deletion_is_deleted(self) -> None:
|
||||
for is_deleted in (True, False):
|
||||
with self.patch_for_worker(
|
||||
is_deleted_side_effect=lambda *args: is_deleted,
|
||||
) as (instance, dct):
|
||||
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
|
||||
|
||||
# No entries in TO_DELETE_GROUP
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
# One entry in DELETING_GROUP
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
|
||||
|
||||
info = next(iter(dct[deferred_deletion.DELETING_GROUP].values()))
|
||||
|
||||
# Fix last_check
|
||||
self.set_last_check_expired()
|
||||
|
||||
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
|
||||
job.run()
|
||||
|
||||
# Should have called is_deleted once
|
||||
instance.is_deleted.assert_called_once_with('vmid1')
|
||||
# if is_deleted returns True, should have removed the entry
|
||||
if is_deleted:
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
|
||||
else:
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
|
||||
# Also, info should have been updated
|
||||
self.assertEqual(info.fatal_retries, 0)
|
||||
self.assertEqual(info.total_retries, 1)
|
||||
|
||||
def test_deferred_deletion_fails_add(self) -> None:
|
||||
for error in (
|
||||
gen_exceptions.RetryableError('error'),
|
||||
@ -312,7 +362,6 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
gen_exceptions.FatalError('error'),
|
||||
):
|
||||
with self.patch_for_worker(
|
||||
deferred_deletion.TO_DELETE_GROUP,
|
||||
execute_side_effect=error,
|
||||
) as (instance, dct):
|
||||
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
|
||||
@ -376,7 +425,6 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
gen_exceptions.FatalError('error'),
|
||||
):
|
||||
with self.patch_for_worker(
|
||||
deferred_deletion.DELETING_GROUP,
|
||||
is_deleted_side_effect=error,
|
||||
) as (instance, dct):
|
||||
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
|
||||
@ -423,3 +471,106 @@ class DynamicServiceTest(UDSTransactionTestCase):
|
||||
# Should have removed the entry
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
|
||||
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
|
||||
|
||||
def test_deferred_stop(self) -> None:
|
||||
|
||||
# Explanation:
|
||||
# key: running, execute_later, should_try_soft_shutdown
|
||||
# TODELETE, DELETING, TO_STOP, STOPPING, is_running calls, stop calls, shutdown calls
|
||||
# group assignation:
|
||||
# * to TO_STOP if execute_later is true (#1, #3, #5, #7), because is requested so (and service requires stop)
|
||||
# * to STOPPING if running and execute_later is false (#2, #4). If not running, no need to stop
|
||||
# * to DELETING if not running and execute_later is false (#6, #8). If running, just proceed to delete and store in the deleting group
|
||||
# calls:
|
||||
# * is running is called if no execute_later (#2, #4, #6, #8)
|
||||
# * stop is called if running and not execute_later and not should_try_soft_shutdown (#4)
|
||||
# * shutdown is called if running and not execute_later and should_try_soft_shutdown (#2)
|
||||
COUNTERS_ADD: typing.Final[dict[tuple[bool, bool, bool], tuple[int, int, int, int, int, int, int]]] = {
|
||||
# run exec try TD DE TS ST is st sh
|
||||
(True, True, True): (0, 0, 1, 0, 0, 0, 0), # 1
|
||||
(True, False, True): (0, 0, 0, 1, 1, 0, 1), # 2
|
||||
(True, True, False): (0, 0, 1, 0, 0, 0, 0), # 3
|
||||
(True, False, False): (0, 0, 0, 1, 1, 1, 0), # 4
|
||||
(False, True, True): (0, 0, 1, 0, 0, 0, 0), # 5
|
||||
(False, False, True): (0, 1, 0, 0, 1, 0, 0), # 6
|
||||
(False, True, False): (0, 0, 1, 0, 0, 0, 0), # 7
|
||||
(False, False, False): (0, 1, 0, 0, 1, 0, 0), # 8
|
||||
}
|
||||
|
||||
# Explanation:
|
||||
# key: running, execute_later, should_try_soft_shutdown
|
||||
# TODELETE, DELETING, TO_STOP, STOPPING, is_running calls, stop calls, shutdown calls
|
||||
# group assignation:
|
||||
# * to TO_DELETE is not used in this flow, because as soon as the vm is stopped, is added to this group and PROCESSED
|
||||
# so, before exiting, it's already in the DELETING group
|
||||
# * to DELETING if not runing and execute_later is false (#6, #8). This is so because if not running. is moved to
|
||||
# the TODELETE group, and processed inmediately before returning from job run
|
||||
# * to TO_STOP is never moved, because it's the first step and only assigned on "add" method
|
||||
# * to STOPPING will contain 1 item as long as the vm is running (#1, #2, #3, #4)
|
||||
# Note that #6 and #8 are all 0, because the procedure has been completed (remember comes from ADD #6 and #8, that was in DELETING)
|
||||
# calls:
|
||||
# * is running if comes from TO_STOP or STOPPING (#1, #2, #3, #4, #5 and #7)
|
||||
# * stop if running and execute later and not should_try_soft_shutdown (#1, #3, #5)
|
||||
# * shutdown if running and execute later and should_try_soft_shutdown (#2)
|
||||
COUNTERS_JOB: dict[tuple[bool, bool, bool], tuple[int, int, int, int, int, int, int]] = {
|
||||
# run exec try TD DE TS ST is st sh
|
||||
(True, True, True): (0, 0, 0, 1, 1, 0, 1), # 1
|
||||
(True, False, True): (0, 0, 0, 1, 1, 0, 0), # 2
|
||||
(True, True, False): (0, 0, 0, 1, 1, 1, 0), # 3
|
||||
(True, False, False): (0, 0, 0, 1, 1, 0, 0), # 4
|
||||
(False, True, True): (0, 1, 0, 0, 1, 0, 0), # 5
|
||||
(False, False, True): (0, 0, 0, 0, 0, 0, 0), # 6
|
||||
(False, True, False): (0, 1, 0, 0, 1, 0, 0), # 7
|
||||
(False, False, False): (0, 0, 0, 0, 0, 0, 0), # 8
|
||||
}
|
||||
|
||||
for running in (True, False):
|
||||
|
||||
def _running(*args: typing.Any, **kwargs: typing.Any) -> bool:
|
||||
return running
|
||||
|
||||
for should_try_soft_shutdown in (True, False):
|
||||
for execute_later in (True, False):
|
||||
with self.patch_for_worker(
|
||||
is_running=_running,
|
||||
must_stop_before_deletion=True,
|
||||
should_try_soft_shutdown=should_try_soft_shutdown,
|
||||
) as (instance, _dct):
|
||||
deferred_deletion.DeferredDeletionWorker.add(
|
||||
instance, 'vmid1', execute_later=execute_later
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
COUNTERS_ADD[(running, execute_later, should_try_soft_shutdown)],
|
||||
(
|
||||
self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.DELETING_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP),
|
||||
instance.is_running.call_count,
|
||||
instance.stop.call_count,
|
||||
instance.shutdown.call_count,
|
||||
),
|
||||
f'COUNTERS_ADD {running} {execute_later} --> {COUNTERS_ADD[(running, execute_later, should_try_soft_shutdown)]}',
|
||||
)
|
||||
|
||||
# Fix last_check
|
||||
self.set_last_check_expired()
|
||||
|
||||
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
|
||||
instance.reset_mock()
|
||||
job.run()
|
||||
|
||||
self.assertEqual(
|
||||
COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)],
|
||||
(
|
||||
self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.DELETING_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP),
|
||||
self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP),
|
||||
instance.is_running.call_count,
|
||||
instance.stop.call_count,
|
||||
instance.shutdown.call_count,
|
||||
),
|
||||
f'COUNTERS_JOB {running} {execute_later} --> {COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)]}',
|
||||
)
|
||||
|
@ -224,7 +224,7 @@ class DynamicServiceTest(UDSTestCase):
|
||||
def test_userservice_try_soft_shutdown(self) -> None:
|
||||
service = fixtures.create_dynamic_service(try_soft_shutdown=True)
|
||||
userservice = fixtures.create_dynamic_userservice(service)
|
||||
self.assertTrue(service.try_graceful_shutdown())
|
||||
self.assertTrue(service.should_try_soft_shutdown())
|
||||
|
||||
# full deploy
|
||||
state = userservice.deploy_for_user(models.User())
|
||||
|
@ -144,7 +144,7 @@ class TestProxmovLinkedService(UDSTestCase):
|
||||
self.assertEqual(service.is_ha_enabled(), service.is_ha_enabled())
|
||||
|
||||
# Try graceful shutdown
|
||||
self.assertEqual(service.try_graceful_shutdown(), service.try_soft_shutdown.value)
|
||||
self.assertEqual(service.should_try_soft_shutdown(), service.try_soft_shutdown.value)
|
||||
|
||||
# Get console connection
|
||||
self.assertEqual(service.get_console_connection('1'), fixtures.CONSOLE_CONNECTION_INFO)
|
||||
|
@ -108,3 +108,9 @@ def waiter(finish_checker: typing.Callable[[], bool], timeout: int = 64, msg: ty
|
||||
|
||||
if msg:
|
||||
logger.info('%s. Elapsed time: %s', msg, time.time() - start_time)
|
||||
|
||||
def returns_true(*args: typing.Any, **kwargs: typing.Any) -> bool:
|
||||
return True
|
||||
|
||||
def returns_false(*args: typing.Any, **kwargs: typing.Any) -> bool:
|
||||
return False
|
@ -57,18 +57,8 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
uses_cache = False # Cache are running machine awaiting to be assigned
|
||||
uses_cache_l2 = False # L2 Cache are running machines in suspended state
|
||||
needs_osmanager = False # If the service needs a s.o. manager (managers are related to agents provided by services, i.e. virtual machines with agent)
|
||||
# can_reset = True
|
||||
|
||||
# : Types of publications (preparated data for deploys)
|
||||
# : In our case, we do no need a publication, so this is None
|
||||
# publication_type = None
|
||||
# : Types of deploys (services in cache and/or assigned to users)
|
||||
# Every service must have overrided FixedUserService with it's own implementation
|
||||
# so this needs to be overrided
|
||||
# user_service_type = DynamicUserService
|
||||
|
||||
# allowed_protocols = types.transports.Protocol.generic_vdi(types.transports.Protocol.SPICE)
|
||||
# services_type_provided = types.services.ServiceType.VDI
|
||||
must_stop_before_deletion = True # If the service must be stopped before deletion
|
||||
|
||||
# Gui remplates, to be "incorporated" by inherited classes if needed
|
||||
base_machine = gui.ChoiceField(
|
||||
@ -161,7 +151,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
return []
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_ip(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
"""
|
||||
Returns the ip of the machine
|
||||
If cannot be obtained, MUST raise an exception
|
||||
@ -169,7 +159,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_mac(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
"""
|
||||
Returns the mac of the machine
|
||||
If cannot be obtained, MUST raise an exception
|
||||
@ -180,14 +170,14 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
|
||||
def is_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
|
||||
"""
|
||||
Returns if the machine is ready and running
|
||||
"""
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def start(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Starts the machine
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
@ -195,21 +185,21 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
...
|
||||
|
||||
@abc.abstractmethod
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def stop(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Stops the machine
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
...
|
||||
|
||||
def shutdown(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def shutdown(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Shutdowns the machine. Defaults to stop
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
return self.stop(caller_instance, vmid)
|
||||
|
||||
def reset(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def reset(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Resets the machine
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
@ -217,19 +207,25 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
# Default is to stop "hard"
|
||||
return self.stop(caller_instance, vmid)
|
||||
|
||||
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def delete(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Removes the machine, or queues it for removal, or whatever :)
|
||||
Use the caller_instance to notify anything if needed, or to identify caller
|
||||
"""
|
||||
DeferredDeletionWorker.add(self, vmid)
|
||||
|
||||
# Method for deferred deletion
|
||||
# Note the lack of "caller_instance" parameter, this is because the deletion is deferred and
|
||||
# the caller instance could be already deleted
|
||||
@abc.abstractmethod
|
||||
def execute_delete(self, vmid: str) -> None:
|
||||
"""
|
||||
Executes the deferred deletion of a machine (normally, call the delete method of the api)
|
||||
This is make abstract to force the implementation of this method, even if not used because
|
||||
you provided a custom delete method, but better to have it implemented
|
||||
"""
|
||||
raise NotImplementedError('deferred_delete must be implemented')
|
||||
|
||||
...
|
||||
|
||||
def is_deleted(self, vmid: str) -> bool:
|
||||
"""
|
||||
Checks if the deferred deletion of a machine is done
|
||||
@ -249,7 +245,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
"""
|
||||
return not self.should_maintain_on_error()
|
||||
|
||||
def try_graceful_shutdown(self) -> bool:
|
||||
def should_try_soft_shutdown(self) -> bool:
|
||||
if self.has_field('try_soft_shutdown'):
|
||||
return self.try_soft_shutdown.value
|
||||
return False
|
||||
|
@ -489,7 +489,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
|
||||
shutdown_operations: list[types.services.Operation] = (
|
||||
[]
|
||||
if not self.service().try_graceful_shutdown()
|
||||
if not self.service().should_try_soft_shutdown()
|
||||
else [types.services.Operation.SHUTDOWN, types.services.Operation.SHUTDOWN_COMPLETED]
|
||||
)
|
||||
destroy_operations = (
|
||||
|
@ -55,12 +55,14 @@ MAX_DELETIONS_CHECKED_AT_ONCE: typing.Final[int] = MAX_DELETIONS_AT_ONCE * 2
|
||||
|
||||
CHECK_INTERVAL: typing.Final[int] = 32 # Check interval, in seconds
|
||||
|
||||
TO_STOP_GROUP: typing.Final[str] = 'to_stop'
|
||||
STOPPING_GROUP: typing.Final[str] = 'stopping'
|
||||
TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
|
||||
DELETING_GROUP: typing.Final[str] = 'deleting'
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DeferredDeletionInfo:
|
||||
class DeletionInfo:
|
||||
vmid: str
|
||||
created: datetime.datetime
|
||||
last_check: datetime.datetime
|
||||
@ -68,57 +70,33 @@ class DeferredDeletionInfo:
|
||||
fatal_retries: int = 0 # Fatal error retries
|
||||
total_retries: int = 0 # Total retries
|
||||
|
||||
|
||||
class DeferredDeletionWorker(Job):
|
||||
frecuency = 32 # Frecuncy for this job, in seconds
|
||||
friendly_name = 'Deferred deletion runner'
|
||||
|
||||
deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
|
||||
def sync_to_storage(self, group: str) -> None:
|
||||
"""
|
||||
Ensures that this object is stored on the storage
|
||||
If exists, it will be updated, if not, it will be created
|
||||
"""
|
||||
unique_key = f'{self.service_uuid}_{self.vmid}'
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
|
||||
storage_dict[unique_key] = self
|
||||
|
||||
@staticmethod
|
||||
def add(service: 'DynamicService', vmid: str, execute_later: bool = False) -> None:
|
||||
# First, try sync deletion
|
||||
unique_key = f'{service.db_obj().uuid}_{vmid}'
|
||||
|
||||
def _add_for_later() -> None:
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(TO_DELETE_GROUP, atomic=True) as storage_dict:
|
||||
storage_dict[unique_key] = DeferredDeletionInfo(
|
||||
vmid=vmid,
|
||||
created=sql_now(),
|
||||
last_check=sql_now(),
|
||||
service_uuid=service.db_obj().uuid,
|
||||
)
|
||||
|
||||
if not execute_later:
|
||||
try:
|
||||
service.execute_delete(vmid)
|
||||
except gen_exceptions.NotFoundError:
|
||||
return # Already removed
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
'Could not delete %s from service %s: %s. Retrying later.', vmid, service.db_obj().name, e
|
||||
)
|
||||
_add_for_later()
|
||||
return
|
||||
else:
|
||||
_add_for_later()
|
||||
return
|
||||
|
||||
# Has not been deleted, so we will defer deletion
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(DELETING_GROUP, atomic=True) as storage_dict:
|
||||
storage_dict[unique_key] = DeferredDeletionInfo(
|
||||
def create_on_storage(group: str, vmid: str, service_uuid: str) -> None:
|
||||
unique_key = f'{service_uuid}_{vmid}'
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
|
||||
storage_dict[unique_key] = DeletionInfo(
|
||||
vmid=vmid,
|
||||
created=sql_now(),
|
||||
last_check=sql_now(),
|
||||
service_uuid=service.db_obj().uuid,
|
||||
service_uuid=service_uuid,
|
||||
)
|
||||
|
||||
def _get_from_storage(
|
||||
self, storage_name: str
|
||||
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, DeferredDeletionInfo]]]:
|
||||
@staticmethod
|
||||
def get_from_storage(
|
||||
storage_name: str,
|
||||
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]:
|
||||
# Get all wating deletion, and try it
|
||||
count = 0
|
||||
infos: list[tuple[str, DeferredDeletionInfo]] = []
|
||||
infos: list[tuple[str, DeletionInfo]] = []
|
||||
|
||||
services: dict[str, 'DynamicService'] = {}
|
||||
|
||||
@ -126,7 +104,7 @@ class DeferredDeletionWorker(Job):
|
||||
# We do this way to release db locks as soon as possible
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(storage_name, atomic=True) as storage_dict:
|
||||
for key, info in sorted(
|
||||
typing.cast(collections.abc.Iterable[tuple[str, DeferredDeletionInfo]], storage_dict.items()),
|
||||
typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
|
||||
key=lambda x: x[1].last_check,
|
||||
):
|
||||
if info.last_check + datetime.timedelta(seconds=CHECK_INTERVAL) > sql_now():
|
||||
@ -148,11 +126,51 @@ class DeferredDeletionWorker(Job):
|
||||
del storage_dict[key] # Remove from storage, being processed
|
||||
return services, infos
|
||||
|
||||
|
||||
class DeferredDeletionWorker(Job):
|
||||
frecuency = 32 # Frecuncy for this job, in seconds
|
||||
friendly_name = 'Deferred deletion runner'
|
||||
|
||||
deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
|
||||
|
||||
@staticmethod
|
||||
def add(service: 'DynamicService', vmid: str, execute_later: bool = False) -> None:
|
||||
# If sync, execute now
|
||||
if not execute_later:
|
||||
try:
|
||||
if service.must_stop_before_deletion:
|
||||
if service.is_running(None, vmid):
|
||||
if service.should_try_soft_shutdown():
|
||||
service.shutdown(None, vmid)
|
||||
else:
|
||||
service.stop(None, vmid)
|
||||
DeletionInfo.create_on_storage(STOPPING_GROUP, vmid, service.db_obj().uuid)
|
||||
return
|
||||
|
||||
service.execute_delete(vmid)
|
||||
except gen_exceptions.NotFoundError:
|
||||
return # Already removed
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
'Could not delete %s from service %s: %s. Retrying later.', vmid, service.db_obj().name, e
|
||||
)
|
||||
DeletionInfo.create_on_storage(TO_DELETE_GROUP, vmid, service.db_obj().uuid)
|
||||
return
|
||||
else:
|
||||
if service.must_stop_before_deletion:
|
||||
DeletionInfo.create_on_storage(TO_STOP_GROUP, vmid, service.db_obj().uuid)
|
||||
else:
|
||||
DeletionInfo.create_on_storage(TO_DELETE_GROUP, vmid, service.db_obj().uuid)
|
||||
return
|
||||
|
||||
# Has not been deleted, so we will defer deletion
|
||||
DeletionInfo.create_on_storage(DELETING_GROUP, vmid, service.db_obj().uuid)
|
||||
|
||||
def _process_exception(
|
||||
self,
|
||||
key: str,
|
||||
info: DeferredDeletionInfo,
|
||||
group: str,
|
||||
info: DeletionInfo,
|
||||
to_group: str,
|
||||
services: dict[str, 'DynamicService'],
|
||||
e: Exception,
|
||||
) -> None:
|
||||
@ -185,24 +203,56 @@ class DeferredDeletionWorker(Job):
|
||||
services[info.service_uuid].db_obj().name,
|
||||
)
|
||||
return # Do not readd it
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
|
||||
storage_dict[key] = info
|
||||
info.sync_to_storage(to_group)
|
||||
|
||||
def process_to_stop(self) -> None:
|
||||
services, to_stop = DeletionInfo.get_from_storage(TO_STOP_GROUP)
|
||||
|
||||
# Now process waiting stops
|
||||
for key, info in to_stop:
|
||||
try:
|
||||
if services[info.service_uuid].is_running(None, info.vmid):
|
||||
if services[info.service_uuid].should_try_soft_shutdown():
|
||||
services[info.service_uuid].shutdown(None, info.vmid)
|
||||
else:
|
||||
services[info.service_uuid].stop(None, info.vmid)
|
||||
info.last_check = sql_now()
|
||||
info.fatal_retries = info.total_retries = 0
|
||||
info.sync_to_storage(STOPPING_GROUP)
|
||||
else:
|
||||
# Do not update last_check to shutdown it asap, was not running after all
|
||||
info.sync_to_storage(TO_DELETE_GROUP)
|
||||
except Exception as e:
|
||||
self._process_exception(key, info, TO_STOP_GROUP, services, e)
|
||||
|
||||
def process_stopping(self) -> None:
|
||||
services, stopping = DeletionInfo.get_from_storage(STOPPING_GROUP)
|
||||
|
||||
# Now process waiting stops
|
||||
for key, info in stopping:
|
||||
try:
|
||||
if services[info.service_uuid].is_running(None, info.vmid):
|
||||
info.last_check = sql_now()
|
||||
info.total_retries += 1
|
||||
info.sync_to_storage(STOPPING_GROUP)
|
||||
else:
|
||||
info.last_check = sql_now()
|
||||
info.fatal_retries = info.total_retries = 0
|
||||
info.sync_to_storage(TO_DELETE_GROUP)
|
||||
except Exception as e:
|
||||
self._process_exception(key, info, STOPPING_GROUP, services, e)
|
||||
|
||||
def process_to_delete(self) -> None:
|
||||
services, to_delete = self._get_from_storage(TO_DELETE_GROUP)
|
||||
services, to_delete = DeletionInfo.get_from_storage(TO_DELETE_GROUP)
|
||||
|
||||
# Now process waiting deletions
|
||||
for key, info in to_delete:
|
||||
try:
|
||||
services[info.service_uuid].execute_delete(info.vmid)
|
||||
# And store it for checking later if it has been deleted, reseting counters
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(
|
||||
DELETING_GROUP, atomic=True
|
||||
) as storage_dict:
|
||||
info.last_check = sql_now()
|
||||
info.fatal_retries = 0
|
||||
info.total_retries = 0
|
||||
storage_dict[key] = info
|
||||
info.last_check = sql_now()
|
||||
info.fatal_retries = info.total_retries = 0
|
||||
info.sync_to_storage(DELETING_GROUP)
|
||||
except Exception as e:
|
||||
self._process_exception(key, info, TO_DELETE_GROUP, services, e)
|
||||
|
||||
@ -212,7 +262,7 @@ class DeferredDeletionWorker(Job):
|
||||
|
||||
Note: Very similar to process_to_delete, but this one is for objects that are already being deleted
|
||||
"""
|
||||
services, deleting = self._get_from_storage(DELETING_GROUP)
|
||||
services, deleting = DeletionInfo.get_from_storage(DELETING_GROUP)
|
||||
|
||||
# Now process waiting deletions checks
|
||||
for key, info in deleting:
|
||||
@ -221,13 +271,12 @@ class DeferredDeletionWorker(Job):
|
||||
if not services[info.service_uuid].is_deleted(info.vmid):
|
||||
info.last_check = sql_now()
|
||||
info.total_retries += 1
|
||||
with DeferredDeletionWorker.deferred_storage.as_dict(
|
||||
DELETING_GROUP, atomic=True
|
||||
) as storage_dict:
|
||||
storage_dict[key] = info
|
||||
info.sync_to_storage(DELETING_GROUP)
|
||||
except Exception as e:
|
||||
self._process_exception(key, info, DELETING_GROUP, services, e)
|
||||
|
||||
def run(self) -> None:
|
||||
self.process_to_stop()
|
||||
self.process_stopping()
|
||||
self.process_to_delete()
|
||||
self.process_deleting()
|
||||
|
@ -232,21 +232,21 @@ class OpenStackLiveService(DynamicService):
|
||||
def sanitized_name(self, name: str) -> str:
|
||||
return self.provider().sanitized_name(name)
|
||||
|
||||
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_ip(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
return self.api.get_server(vmid).validated().addresses[0].ip
|
||||
|
||||
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_mac(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
return self.api.get_server(vmid).validated().addresses[0].mac
|
||||
|
||||
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
|
||||
def is_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
|
||||
return self.api.get_server(vmid).validated().power_state.is_running()
|
||||
|
||||
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def start(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
if self.api.get_server(vmid).validated().power_state.is_running():
|
||||
return
|
||||
self.api.start_server(vmid)
|
||||
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def stop(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
if self.api.get_server(vmid).validated().power_state.is_stopped():
|
||||
return
|
||||
self.api.stop_server(vmid)
|
||||
@ -255,18 +255,29 @@ class OpenStackLiveService(DynamicService):
|
||||
# Note that on openstack, stop is "soft", but may fail to stop if no agent is installed or not responding
|
||||
# We can anyway delete de machine even if it is not stopped
|
||||
|
||||
def reset(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def reset(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
# Default is to stop "hard"
|
||||
return self.stop(caller_instance, vmid)
|
||||
|
||||
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
|
||||
def delete(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Removes the machine, or queues it for removal, or whatever :)
|
||||
"""
|
||||
if isinstance(caller_instance, OpenStackLiveUserService):
|
||||
vmid = f'VM:{vmid}'
|
||||
super().delete(caller_instance, vmid)
|
||||
else:
|
||||
vmid = f'SS:{vmid}'
|
||||
super().delete(caller_instance, vmid)
|
||||
|
||||
def execute_delete(self, vmid: str) -> None:
|
||||
kind, vmid = vmid.split(':')
|
||||
if kind == 'VM':
|
||||
self.api.delete_server(vmid)
|
||||
else:
|
||||
self.api.delete_snapshot(vmid)
|
||||
|
||||
# default is_deleted is fine, returns True always
|
||||
|
||||
def make_template(
|
||||
self, template_name: str, description: typing.Optional[str] = None
|
||||
|
@ -46,6 +46,10 @@ MAX_VMID_LIFE_SECS: typing.Final[int] = 365 * 24 * 60 * 60 * 3 # 3 years for "r
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Job will be here for 4.0, but will be removed in a future
|
||||
# The idea is allow using old Removal Job for existing installations
|
||||
# but use new DeferredDeletionWorker for new installations
|
||||
|
||||
|
||||
class ProxmoxDeferredRemoval(jobs.Job):
|
||||
frecuency = 60 * 3 # Once every NN minutes
|
||||
@ -63,39 +67,39 @@ class ProxmoxDeferredRemoval(jobs.Job):
|
||||
return int(vmid), try_graceful_shutdown
|
||||
|
||||
|
||||
@staticmethod
|
||||
def remove(provider_instance: 'provider.ProxmoxProvider', vmid: int, try_graceful_shutdown: bool) -> None:
|
||||
def store_for_deferred_removal() -> None:
|
||||
provider_instance.storage.save_to_db('tr' + str(vmid), f'{vmid}:{"y" if try_graceful_shutdown else "n"}', attr1='tRm')
|
||||
ProxmoxDeferredRemoval.counter += 1
|
||||
logger.debug('Adding %s from %s to defeffed removal process', vmid, provider_instance)
|
||||
try:
|
||||
# First check state & stop machine if needed
|
||||
vminfo = provider_instance.get_machine_info(vmid)
|
||||
if vminfo.status == 'running':
|
||||
if try_graceful_shutdown:
|
||||
# If running vm, simply try to shutdown
|
||||
provider_instance.shutdown_machine(vmid)
|
||||
# Store for later removal
|
||||
else:
|
||||
# If running vm, simply stops it and wait for next
|
||||
provider_instance.stop_machine(vmid)
|
||||
# @staticmethod
|
||||
# def remove(provider_instance: 'provider.ProxmoxProvider', vmid: int, try_graceful_shutdown: bool) -> None:
|
||||
# def store_for_deferred_removal() -> None:
|
||||
# provider_instance.storage.save_to_db('tr' + str(vmid), f'{vmid}:{"y" if try_graceful_shutdown else "n"}', attr1='tRm')
|
||||
# ProxmoxDeferredRemoval.counter += 1
|
||||
# logger.debug('Adding %s from %s to defeffed removal process', vmid, provider_instance)
|
||||
# try:
|
||||
# # First check state & stop machine if needed
|
||||
# vminfo = provider_instance.get_machine_info(vmid)
|
||||
# if vminfo.status == 'running':
|
||||
# if try_graceful_shutdown:
|
||||
# # If running vm, simply try to shutdown
|
||||
# provider_instance.shutdown_machine(vmid)
|
||||
# # Store for later removal
|
||||
# else:
|
||||
# # If running vm, simply stops it and wait for next
|
||||
# provider_instance.stop_machine(vmid)
|
||||
|
||||
store_for_deferred_removal()
|
||||
return
|
||||
# store_for_deferred_removal()
|
||||
# return
|
||||
|
||||
provider_instance.remove_machine(vmid) # Try to remove, launch removal, but check later
|
||||
store_for_deferred_removal()
|
||||
# provider_instance.remove_machine(vmid) # Try to remove, launch removal, but check later
|
||||
# store_for_deferred_removal()
|
||||
|
||||
except client.ProxmoxNotFound:
|
||||
return # Machine does not exists
|
||||
except Exception as e:
|
||||
store_for_deferred_removal()
|
||||
logger.info(
|
||||
'Machine %s could not be removed right now, queued for later: %s',
|
||||
vmid,
|
||||
e,
|
||||
)
|
||||
# except client.ProxmoxNotFound:
|
||||
# return # Machine does not exists
|
||||
# except Exception as e:
|
||||
# store_for_deferred_removal()
|
||||
# logger.info(
|
||||
# 'Machine %s could not be removed right now, queued for later: %s',
|
||||
# vmid,
|
||||
# e,
|
||||
# )
|
||||
|
||||
@staticmethod
|
||||
def waitForTaskFinish(
|
||||
|
@ -33,6 +33,7 @@ import re
|
||||
import typing
|
||||
|
||||
from django.utils.translation import gettext_noop as _
|
||||
|
||||
from uds.core import types
|
||||
from uds.core.services.generics.dynamic.publication import DynamicPublication
|
||||
from uds.core.services.generics.dynamic.service import DynamicService
|
||||
@ -40,7 +41,7 @@ from uds.core.services.generics.dynamic.userservice import DynamicUserService
|
||||
from uds.core.ui import gui
|
||||
from uds.core.util import validators, fields
|
||||
|
||||
from . import helpers, jobs
|
||||
from . import helpers
|
||||
from .deployment_linked import ProxmoxUserserviceLinked
|
||||
from .publication import ProxmoxPublication
|
||||
|
||||
@ -272,7 +273,7 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
def is_ha_enabled(self) -> bool:
|
||||
return self.ha.value != '__'
|
||||
|
||||
def try_graceful_shutdown(self) -> bool:
|
||||
def should_try_soft_shutdown(self) -> bool:
|
||||
return self.try_soft_shutdown.as_bool()
|
||||
|
||||
def get_console_connection(self, vmid: str) -> typing.Optional[types.services.ConsoleConnectionInfo]:
|
||||
@ -281,16 +282,16 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
def is_avaliable(self) -> bool:
|
||||
return self.provider().is_available()
|
||||
|
||||
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_ip(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
return self.provider().get_guest_ip_address(int(vmid))
|
||||
|
||||
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_mac(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
# If vmid is empty, we are requesting a new mac
|
||||
if not vmid:
|
||||
return self.mac_generator().get(self.get_macs_range())
|
||||
return self.get_nic_mac(int(vmid))
|
||||
|
||||
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def start(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
if isinstance(caller_instance, ProxmoxUserserviceLinked):
|
||||
if self.is_running(caller_instance, vmid): # If running, skip
|
||||
caller_instance._task = ''
|
||||
@ -299,7 +300,7 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
else:
|
||||
raise Exception('Invalid caller instance (publication) for start_machine()')
|
||||
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def stop(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
if isinstance(caller_instance, ProxmoxUserserviceLinked):
|
||||
if self.is_running(caller_instance, vmid):
|
||||
caller_instance._store_task(self.provider().stop_machine(int(vmid)))
|
||||
@ -308,7 +309,7 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
else:
|
||||
raise Exception('Invalid caller instance (publication) for stop_machine()')
|
||||
|
||||
def shutdown(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def shutdown(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
if isinstance(caller_instance, ProxmoxUserserviceLinked):
|
||||
if self.is_running(caller_instance, vmid):
|
||||
caller_instance._store_task(self.provider().shutdown_machine(int(vmid)))
|
||||
@ -317,13 +318,13 @@ class ProxmoxServiceLinked(DynamicService):
|
||||
else:
|
||||
raise Exception('Invalid caller instance (publication) for shutdown_machine()')
|
||||
|
||||
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
|
||||
def is_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
|
||||
# Raise an exception if fails to get machine info
|
||||
vminfo = self.get_machine_info(int(vmid))
|
||||
|
||||
return vminfo.status != 'stopped'
|
||||
|
||||
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def execute_delete(self, vmid: str) -> None:
|
||||
# All removals are deferred, so we can do it async
|
||||
# Try to stop it if already running... Hard stop
|
||||
jobs.ProxmoxDeferredRemoval.remove(self.provider(), int(vmid), self.try_graceful_shutdown())
|
||||
self.provider().remove_machine(int(vmid))
|
||||
|
@ -288,20 +288,20 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
|
||||
vm_opaque_ref, mac_info={'network': self.network.value, 'mac': mac}, memory=self.memory.value
|
||||
)
|
||||
|
||||
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_ip(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
"""
|
||||
Returns the ip of the machine
|
||||
If cannot be obtained, MUST raise an exception
|
||||
"""
|
||||
return '' # No ip will be get, UDS will assign one (from actor)
|
||||
|
||||
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
def get_mac(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
|
||||
"""
|
||||
For
|
||||
"""
|
||||
return self.mac_generator().get(self.provider().get_macs_range())
|
||||
|
||||
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
|
||||
def is_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
|
||||
"""
|
||||
Returns if the machine is ready and running
|
||||
"""
|
||||
@ -311,7 +311,7 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
|
||||
return True
|
||||
return False
|
||||
|
||||
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def start(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Starts the machine
|
||||
Can return a task, or None if no task is returned
|
||||
@ -319,7 +319,7 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
|
||||
with self.provider().get_connection() as api:
|
||||
api.start_vm(vmid)
|
||||
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def stop(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
"""
|
||||
Stops the machine
|
||||
Can return a task, or None if no task is returned
|
||||
@ -327,13 +327,15 @@ class XenLinkedService(DynamicService): # pylint: disable=too-many-public-metho
|
||||
with self.provider().get_connection() as api:
|
||||
api.stop_vm(vmid)
|
||||
|
||||
def shutdown(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def shutdown(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
|
||||
with self.provider().get_connection() as api:
|
||||
api.shutdown_vm(vmid)
|
||||
|
||||
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
def execute_delete(self, vmid: str) -> None:
|
||||
"""
|
||||
Removes the machine, or queues it for removal, or whatever :)
|
||||
"""
|
||||
with self.provider().get_connection() as api:
|
||||
api.delete_vm(vmid)
|
||||
|
||||
# default is_deleted is enough for us, returns always True
|
||||
|
Loading…
x
Reference in New Issue
Block a user