diff --git a/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py b/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py index 5c3ad0f18..1a3741bcf 100644 --- a/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py +++ b/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py @@ -61,9 +61,7 @@ class DynamicServiceTest(UDSTransactionTestCase): ]: with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage: for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items(): - info.last_check = sql_now() - datetime.timedelta( - seconds=deferred_deletion.CHECK_INTERVAL * 2 - ) + info.last_check = sql_now() - datetime.timedelta(seconds=deferred_deletion.CHECK_INTERVAL) storage[key] = info def count_entries_on_storage(self, group: str) -> int: @@ -74,7 +72,7 @@ class DynamicServiceTest(UDSTransactionTestCase): def patch_for_worker( self, *, - execute_side_effect: typing.Union[None, typing.Callable[..., None], Exception] = None, + execute_delete_side_effect: typing.Union[None, typing.Callable[..., None], Exception] = None, is_deleted_side_effect: typing.Union[None, typing.Callable[..., bool], Exception] = None, is_running: typing.Union[None, typing.Callable[..., bool]] = None, must_stop_before_deletion: bool = False, @@ -90,7 +88,7 @@ class DynamicServiceTest(UDSTransactionTestCase): instance_db_obj = mock.MagicMock(uuid='service1') instance_db_obj.get_instance.return_value = instance instance.db_obj.return_value = instance_db_obj - instance.execute_delete.side_effect = execute_side_effect + instance.execute_delete.side_effect = execute_delete_side_effect or helpers.returns_none instance.is_deleted.side_effect = is_deleted_side_effect or helpers.returns_true instance.must_stop_before_deletion = must_stop_before_deletion @@ -117,7 +115,7 @@ class DynamicServiceTest(UDSTransactionTestCase): storage.as_dict.side_effect = _as_dict yield instance, dct - def test_deferred_delete_full_fine_delete(self) -> None: + def test_delete_full_fine_delete(self) -> None: # Tests only delete and is_deleted, no stop and stopping service = fixtures.create_dynamic_service_for_deferred_deletion() @@ -228,7 +226,7 @@ class DynamicServiceTest(UDSTransactionTestCase): self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) - def test_deferred_delete_delayed_full(self) -> None: + def test_delete_delayed_full(self) -> None: service = fixtures.create_dynamic_service_for_deferred_deletion() provider = models.Provider.objects.create( @@ -322,7 +320,7 @@ class DynamicServiceTest(UDSTransactionTestCase): self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) - def test_deferred_deletion_is_deleted(self) -> None: + def test_deletion_is_deleted(self) -> None: for is_deleted in (True, False): with self.patch_for_worker( is_deleted_side_effect=lambda *args: is_deleted, @@ -355,14 +353,14 @@ class DynamicServiceTest(UDSTransactionTestCase): self.assertEqual(info.fatal_retries, 0) self.assertEqual(info.total_retries, 1) - def test_deferred_deletion_fails_add(self) -> None: + def test_deletion_fails_add(self) -> None: for error in ( gen_exceptions.RetryableError('error'), gen_exceptions.NotFoundError('error'), gen_exceptions.FatalError('error'), ): with self.patch_for_worker( - execute_side_effect=error, + execute_delete_side_effect=error, ) as (instance, dct): deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False) @@ -418,7 +416,7 @@ class DynamicServiceTest(UDSTransactionTestCase): self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) - def test_deferred_deletion_fails_is_deleted(self) -> None: + def test_deletion_fails_is_deleted(self) -> None: for error in ( gen_exceptions.RetryableError('error'), gen_exceptions.NotFoundError('error'), @@ -472,7 +470,7 @@ class DynamicServiceTest(UDSTransactionTestCase): self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) - def test_deferred_stop(self) -> None: + def test_stop(self) -> None: # Explanation: # key: running, execute_later, should_try_soft_shutdown @@ -487,13 +485,13 @@ class DynamicServiceTest(UDSTransactionTestCase): # * shutdown is called if running and not execute_later and should_try_soft_shutdown (#2) COUNTERS_ADD: typing.Final[dict[tuple[bool, bool, bool], tuple[int, int, int, int, int, int, int]]] = { # run exec try TD DE TS ST is st sh - (True, True, True): (0, 0, 1, 0, 0, 0, 0), # 1 - (True, False, True): (0, 0, 0, 1, 1, 0, 1), # 2 - (True, True, False): (0, 0, 1, 0, 0, 0, 0), # 3 - (True, False, False): (0, 0, 0, 1, 1, 1, 0), # 4 - (False, True, True): (0, 0, 1, 0, 0, 0, 0), # 5 - (False, False, True): (0, 1, 0, 0, 1, 0, 0), # 6 - (False, True, False): (0, 0, 1, 0, 0, 0, 0), # 7 + (True, True, True): (0, 0, 1, 0, 0, 0, 0), # 1 + (True, False, True): (0, 0, 0, 1, 1, 0, 1), # 2 + (True, True, False): (0, 0, 1, 0, 0, 0, 0), # 3 + (True, False, False): (0, 0, 0, 1, 1, 1, 0), # 4 + (False, True, True): (0, 0, 1, 0, 0, 0, 0), # 5 + (False, False, True): (0, 1, 0, 0, 1, 0, 0), # 6 + (False, True, False): (0, 0, 1, 0, 0, 0, 0), # 7 (False, False, False): (0, 1, 0, 0, 1, 0, 0), # 8 } @@ -514,13 +512,13 @@ class DynamicServiceTest(UDSTransactionTestCase): # * shutdown if running and execute later and should_try_soft_shutdown (#2) COUNTERS_JOB: dict[tuple[bool, bool, bool], tuple[int, int, int, int, int, int, int]] = { # run exec try TD DE TS ST is st sh - (True, True, True): (0, 0, 0, 1, 1, 0, 1), # 1 - (True, False, True): (0, 0, 0, 1, 1, 0, 0), # 2 - (True, True, False): (0, 0, 0, 1, 1, 1, 0), # 3 - (True, False, False): (0, 0, 0, 1, 1, 0, 0), # 4 - (False, True, True): (0, 1, 0, 0, 1, 0, 0), # 5 - (False, False, True): (0, 0, 0, 0, 0, 0, 0), # 6 - (False, True, False): (0, 1, 0, 0, 1, 0, 0), # 7 + (True, True, True): (0, 0, 0, 1, 1, 0, 1), # 1 + (True, False, True): (0, 0, 0, 1, 1, 0, 0), # 2 + (True, True, False): (0, 0, 0, 1, 1, 1, 0), # 3 + (True, False, False): (0, 0, 0, 1, 1, 0, 0), # 4 + (False, True, True): (0, 1, 0, 0, 1, 0, 0), # 5 + (False, False, True): (0, 0, 0, 0, 0, 0, 0), # 6 + (False, True, False): (0, 1, 0, 0, 1, 0, 0), # 7 (False, False, False): (0, 0, 0, 0, 0, 0, 0), # 8 } @@ -574,3 +572,168 @@ class DynamicServiceTest(UDSTransactionTestCase): ), f'COUNTERS_JOB {running} {execute_later} --> {COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)]}', ) + + def test_stop_retry_stop(self) -> None: + deferred_deletion.RETRIES_TO_RETRY = 2 + deferred_deletion.MAX_TOTAL_RETRIES = 4 + + with self.patch_for_worker( + is_running=helpers.returns_true, + must_stop_before_deletion=True, + should_try_soft_shutdown=True, + ) as (instance, dct): + deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False) + + info = next(iter(dct[deferred_deletion.STOPPING_GROUP].values())) + + self.assertEqual(info.total_retries, 0) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 0) + + instance.is_running.assert_called_once() + instance.should_try_soft_shutdown.assert_called_once() + instance.shutdown.assert_called_once() + + instance.reset_mock() + + job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock()) + # On fist run, will already be running + self.set_last_check_expired() # To ensure it's processed + job.run() + + instance.is_running.assert_called_once() + instance.reset_mock() + + self.assertEqual(info.total_retries, 1) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 1) + + # On second run, will already be running + self.set_last_check_expired() + job.run() + + instance.is_running.assert_called_once() + instance.reset_mock() + + self.assertEqual(info.total_retries, 2) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 2) + + # On third run, will simply be readded to TO_STOP_GROUP + self.set_last_check_expired() + job.run() + + # No calls + instance.assert_not_called() + + self.assertEqual(info.total_retries, 3) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 3) + + # should be on TO_STOP_GROUP + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 1) + + # On next call, again is_running will be called, and stop this time + self.set_last_check_expired() + job.run() + + instance.is_running.assert_called_once() + instance.stop.assert_called_once() + instance.reset_mock() + + # Reseted retries, but no total_retries (as it's a new try) + self.assertEqual(info.total_retries, 4) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 0) + + # But sould have been removed from all queues + # due to MAX_TOTAL_RETRIES, this is checked for every queue on storage access, + # and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP + # the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES + + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) + + + def test_delete_retry_delete(self) -> None: + deferred_deletion.RETRIES_TO_RETRY = 2 + deferred_deletion.MAX_TOTAL_RETRIES = 4 + + with self.patch_for_worker( + is_running=helpers.returns_true, + is_deleted_side_effect=helpers.returns_false, + ) as (instance, dct): + deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False) + + info = next(iter(dct[deferred_deletion.DELETING_GROUP].values())) + + self.assertEqual(info.total_retries, 0) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 0) + + instance.is_running.assert_not_called() + instance.should_try_soft_shutdown.assert_not_called() + instance.execute_delete.assert_called() + + instance.reset_mock() + + job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock()) + # On fist run, will already be running + self.set_last_check_expired() # To ensure it's processed + job.run() + + instance.is_deleted.assert_called_once() + instance.reset_mock() + + self.assertEqual(info.total_retries, 1) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 1) + + # On second run, will already be running + self.set_last_check_expired() + job.run() + + instance.is_deleted.assert_called_once() + instance.reset_mock() + + self.assertEqual(info.total_retries, 2) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 2) + + # On third run, will simply be readded to TO_DELETE_GROUP + self.set_last_check_expired() + job.run() + + # No calls + instance.assert_not_called() + + self.assertEqual(info.total_retries, 3) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 3) + + # should be on TO_DELETE_GROUP + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1) + + # On next call, again is_running will be called, and stop this time + self.set_last_check_expired() + job.run() + + instance.execute_delete.assert_called_once() + instance.reset_mock() + + # Reseted retries, but no total_retries (as it's a new try) + self.assertEqual(info.total_retries, 4) + self.assertEqual(info.fatal_retries, 0) + self.assertEqual(info.retries, 0) + + # But sould have been removed from all queues + # due to MAX_TOTAL_RETRIES, this is checked for every queue on storage access, + # and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP + # the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES + + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) + self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) diff --git a/server/src/tests/utils/helpers.py b/server/src/tests/utils/helpers.py index c3e6f955b..33b5677ac 100644 --- a/server/src/tests/utils/helpers.py +++ b/server/src/tests/utils/helpers.py @@ -113,4 +113,7 @@ def returns_true(*args: typing.Any, **kwargs: typing.Any) -> bool: return True def returns_false(*args: typing.Any, **kwargs: typing.Any) -> bool: - return False \ No newline at end of file + return False + +def returns_none(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: + return None \ No newline at end of file diff --git a/server/src/uds/core/workers/deferred_deletion.py b/server/src/uds/core/workers/deferred_deletion.py index 1bcbf0220..b0a3b0b3f 100644 --- a/server/src/uds/core/workers/deferred_deletion.py +++ b/server/src/uds/core/workers/deferred_deletion.py @@ -50,9 +50,14 @@ logger = logging.getLogger(__name__) MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 8 MAX_TOTAL_RETRIES: typing.Final[int] = 1024 +RETRIES_TO_RETRY: typing.Final[int] = ( + 16 # Retries to stop again or to shutdown again in STOPPING_GROUP or DELETING_GROUP +) MAX_DELETIONS_AT_ONCE: typing.Final[int] = 16 MAX_DELETIONS_CHECKED_AT_ONCE: typing.Final[int] = MAX_DELETIONS_AT_ONCE * 2 +# This interval is how long will take to check again for deletion, stopping, etc... +# That is, once a machine is deleted, every 32 seconds will be check that it has been deleted CHECK_INTERVAL: typing.Final[int] = 32 # Check interval, in seconds TO_STOP_GROUP: typing.Final[str] = 'to_stop' @@ -69,6 +74,7 @@ class DeletionInfo: service_uuid: str fatal_retries: int = 0 # Fatal error retries total_retries: int = 0 # Total retries + retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP def sync_to_storage(self, group: str) -> None: """ @@ -88,13 +94,20 @@ class DeletionInfo: created=sql_now(), last_check=sql_now(), service_uuid=service_uuid, + # fatal, total an retries are 0 by default ) @staticmethod def get_from_storage( storage_name: str, ) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]: - # Get all wating deletion, and try it + """ + Get a list of objects to be processed from storage + + Note: + This method will remove the objects from storage, so if needed, has to be readded + This is so we can release locks as soon as possible + """ count = 0 infos: list[tuple[str, DeletionInfo]] = [] @@ -107,6 +120,16 @@ class DeletionInfo: typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()), key=lambda x: x[1].last_check, ): + # if max retries reached, remove it + if info.total_retries >= MAX_TOTAL_RETRIES: + logger.error( + 'Too many retries deleting %s from service %s, removing from deferred deletion', + info.vmid, + info.service_uuid, + ) + del storage_dict[key] + continue + if info.last_check + datetime.timedelta(seconds=CHECK_INTERVAL) > sql_now(): continue try: @@ -122,13 +145,15 @@ class DeletionInfo: if (count := count + 1) > MAX_DELETIONS_AT_ONCE: break - infos.append((key, info)) del storage_dict[key] # Remove from storage, being processed + + # Only add if not too many retries already + infos.append((key, info)) return services, infos class DeferredDeletionWorker(Job): - frecuency = 32 # Frecuncy for this job, in seconds + frecuency = 19 # Frequency for this job, in seconds friendly_name = 'Deferred deletion runner' deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker') @@ -211,13 +236,21 @@ class DeferredDeletionWorker(Job): # Now process waiting stops for key, info in to_stop: try: - if services[info.service_uuid].is_running(None, info.vmid): - if services[info.service_uuid].should_try_soft_shutdown(): - services[info.service_uuid].shutdown(None, info.vmid) + service = services[info.service_uuid] + if service.is_running(None, info.vmid): + # if info.retries < RETRIES_TO_RETRY, means this is the first time we try to stop it + if info.retries < RETRIES_TO_RETRY: + if service.should_try_soft_shutdown(): + service.shutdown(None, info.vmid) + else: + service.stop(None, info.vmid) + info.fatal_retries = info.total_retries = 0 else: - services[info.service_uuid].stop(None, info.vmid) + info.total_retries += 1 # Count this as a general retry + info.retries = 0 # Reset retries + service.stop(None, info.vmid) # Always try to stop it if we have tried before + info.last_check = sql_now() - info.fatal_retries = info.total_retries = 0 info.sync_to_storage(STOPPING_GROUP) else: # Do not update last_check to shutdown it asap, was not running after all @@ -228,9 +261,17 @@ class DeferredDeletionWorker(Job): def process_stopping(self) -> None: services, stopping = DeletionInfo.get_from_storage(STOPPING_GROUP) - # Now process waiting stops + # Now process waiting for finishing stops for key, info in stopping: try: + info.retries += 1 + if info.retries > RETRIES_TO_RETRY: + # If we have tried to stop it, and it has not stopped, add to stop again + info.last_check = sql_now() + info.total_retries += 1 + info.sync_to_storage(TO_STOP_GROUP) + continue + if services[info.service_uuid].is_running(None, info.vmid): info.last_check = sql_now() info.total_retries += 1 @@ -251,7 +292,8 @@ class DeferredDeletionWorker(Job): services[info.service_uuid].execute_delete(info.vmid) # And store it for checking later if it has been deleted, reseting counters info.last_check = sql_now() - info.fatal_retries = info.total_retries = 0 + info.retries = 0 + info.total_retries += 1 info.sync_to_storage(DELETING_GROUP) except Exception as e: self._process_exception(key, info, TO_DELETE_GROUP, services, e) @@ -264,9 +306,17 @@ class DeferredDeletionWorker(Job): """ services, deleting = DeletionInfo.get_from_storage(DELETING_GROUP) - # Now process waiting deletions checks + # Now process waiting for finishing deletions for key, info in deleting: try: + info.retries += 1 + if info.retries > RETRIES_TO_RETRY: + # If we have tried to delete it, and it has not been deleted, add to delete again + info.last_check = sql_now() + info.total_retries += 1 + info.sync_to_storage(TO_DELETE_GROUP) + continue + # If not finished, readd it for later check if not services[info.service_uuid].is_deleted(info.vmid): info.last_check = sql_now()