1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

Added support for retry delete and shutdown/stop if check takes too long

This commit is contained in:
Adolfo Gómez García 2024-06-25 04:38:28 +02:00
parent d279a44c24
commit e4be7859e0
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
3 changed files with 254 additions and 38 deletions

View File

@ -61,9 +61,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
]:
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
info.last_check = sql_now() - datetime.timedelta(
seconds=deferred_deletion.CHECK_INTERVAL * 2
)
info.last_check = sql_now() - datetime.timedelta(seconds=deferred_deletion.CHECK_INTERVAL)
storage[key] = info
def count_entries_on_storage(self, group: str) -> int:
@ -74,7 +72,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
def patch_for_worker(
self,
*,
execute_side_effect: typing.Union[None, typing.Callable[..., None], Exception] = None,
execute_delete_side_effect: typing.Union[None, typing.Callable[..., None], Exception] = None,
is_deleted_side_effect: typing.Union[None, typing.Callable[..., bool], Exception] = None,
is_running: typing.Union[None, typing.Callable[..., bool]] = None,
must_stop_before_deletion: bool = False,
@ -90,7 +88,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
instance_db_obj = mock.MagicMock(uuid='service1')
instance_db_obj.get_instance.return_value = instance
instance.db_obj.return_value = instance_db_obj
instance.execute_delete.side_effect = execute_side_effect
instance.execute_delete.side_effect = execute_delete_side_effect or helpers.returns_none
instance.is_deleted.side_effect = is_deleted_side_effect or helpers.returns_true
instance.must_stop_before_deletion = must_stop_before_deletion
@ -117,7 +115,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
storage.as_dict.side_effect = _as_dict
yield instance, dct
def test_deferred_delete_full_fine_delete(self) -> None:
def test_delete_full_fine_delete(self) -> None:
# Tests only delete and is_deleted, no stop and stopping
service = fixtures.create_dynamic_service_for_deferred_deletion()
@ -228,7 +226,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
def test_deferred_delete_delayed_full(self) -> None:
def test_delete_delayed_full(self) -> None:
service = fixtures.create_dynamic_service_for_deferred_deletion()
provider = models.Provider.objects.create(
@ -322,7 +320,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
def test_deferred_deletion_is_deleted(self) -> None:
def test_deletion_is_deleted(self) -> None:
for is_deleted in (True, False):
with self.patch_for_worker(
is_deleted_side_effect=lambda *args: is_deleted,
@ -355,14 +353,14 @@ class DynamicServiceTest(UDSTransactionTestCase):
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
def test_deferred_deletion_fails_add(self) -> None:
def test_deletion_fails_add(self) -> None:
for error in (
gen_exceptions.RetryableError('error'),
gen_exceptions.NotFoundError('error'),
gen_exceptions.FatalError('error'),
):
with self.patch_for_worker(
execute_side_effect=error,
execute_delete_side_effect=error,
) as (instance, dct):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
@ -418,7 +416,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
def test_deferred_deletion_fails_is_deleted(self) -> None:
def test_deletion_fails_is_deleted(self) -> None:
for error in (
gen_exceptions.RetryableError('error'),
gen_exceptions.NotFoundError('error'),
@ -472,7 +470,7 @@ class DynamicServiceTest(UDSTransactionTestCase):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
def test_deferred_stop(self) -> None:
def test_stop(self) -> None:
# Explanation:
# key: running, execute_later, should_try_soft_shutdown
@ -574,3 +572,168 @@ class DynamicServiceTest(UDSTransactionTestCase):
),
f'COUNTERS_JOB {running} {execute_later} --> {COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)]}',
)
def test_stop_retry_stop(self) -> None:
    """Exercise the stop-retry path of the deferred deletion worker.

    With RETRIES_TO_RETRY lowered to 2 and MAX_TOTAL_RETRIES to 4, a machine
    that keeps reporting "running" should be re-queued from STOPPING_GROUP
    back to TO_STOP_GROUP, stopped again (hard stop() this time), and finally
    purged from every queue once the total retry limit is reached.
    """
    # Shrink the limits so the retry/purge logic triggers within a few runs
    deferred_deletion.RETRIES_TO_RETRY = 2
    deferred_deletion.MAX_TOTAL_RETRIES = 4

    with self.patch_for_worker(
        is_running=helpers.returns_true,
        must_stop_before_deletion=True,
        should_try_soft_shutdown=True,
    ) as (instance, dct):
        deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)

        # Entry starts in STOPPING_GROUP with all counters at zero
        info = next(iter(dct[deferred_deletion.STOPPING_GROUP].values()))
        self.assertEqual(info.total_retries, 0)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 0)

        # add() already checked the running state and issued a soft shutdown
        instance.is_running.assert_called_once()
        instance.should_try_soft_shutdown.assert_called_once()
        instance.shutdown.assert_called_once()
        instance.reset_mock()

        job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())

        # On first run, will still be running
        self.set_last_check_expired()  # To ensure it's processed
        job.run()
        instance.is_running.assert_called_once()
        instance.reset_mock()
        self.assertEqual(info.total_retries, 1)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 1)

        # On second run, will still be running
        self.set_last_check_expired()
        job.run()
        instance.is_running.assert_called_once()
        instance.reset_mock()
        self.assertEqual(info.total_retries, 2)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 2)

        # On third run (retries now exceed RETRIES_TO_RETRY), will simply be
        # readded to TO_STOP_GROUP
        self.set_last_check_expired()
        job.run()
        # No calls to the service instance this time
        instance.assert_not_called()
        self.assertEqual(info.total_retries, 3)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 3)
        # should be on TO_STOP_GROUP
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 1)

        # On next call, is_running will be called again, and a hard stop() issued this time
        self.set_last_check_expired()
        job.run()
        instance.is_running.assert_called_once()
        instance.stop.assert_called_once()
        instance.reset_mock()
        # retries reset for the new attempt, but total_retries keeps growing
        self.assertEqual(info.total_retries, 4)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 0)

        # But it should have been removed from all queues:
        # MAX_TOTAL_RETRIES is checked on every queue storage access, and
        # STOPPING_GROUP is processed after TO_STOP_GROUP. So, after STOPPING
        # adds it to TO_DELETE_GROUP, the storage access method removes it from
        # TO_DELETE_GROUP due to MAX_TOTAL_RETRIES
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
def test_delete_retry_delete(self) -> None:
    """Exercise the delete-retry path of the deferred deletion worker.

    With RETRIES_TO_RETRY lowered to 2 and MAX_TOTAL_RETRIES to 4, a machine
    that never reports "deleted" should be re-queued from DELETING_GROUP back
    to TO_DELETE_GROUP, have execute_delete retried, and finally be purged
    from every queue once the total retry limit is reached.
    """
    # Shrink the limits so the retry/purge logic triggers within a few runs
    deferred_deletion.RETRIES_TO_RETRY = 2
    deferred_deletion.MAX_TOTAL_RETRIES = 4

    with self.patch_for_worker(
        is_running=helpers.returns_true,
        is_deleted_side_effect=helpers.returns_false,
    ) as (instance, dct):
        deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)

        # No stop needed: entry goes straight to DELETING_GROUP with counters at zero
        info = next(iter(dct[deferred_deletion.DELETING_GROUP].values()))
        self.assertEqual(info.total_retries, 0)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 0)

        # add() skipped the stop checks and invoked execute_delete directly
        instance.is_running.assert_not_called()
        instance.should_try_soft_shutdown.assert_not_called()
        instance.execute_delete.assert_called()
        instance.reset_mock()

        job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())

        # On first run, will still not be deleted
        self.set_last_check_expired()  # To ensure it's processed
        job.run()
        instance.is_deleted.assert_called_once()
        instance.reset_mock()
        self.assertEqual(info.total_retries, 1)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 1)

        # On second run, will still not be deleted
        self.set_last_check_expired()
        job.run()
        instance.is_deleted.assert_called_once()
        instance.reset_mock()
        self.assertEqual(info.total_retries, 2)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 2)

        # On third run (retries now exceed RETRIES_TO_RETRY), will simply be
        # readded to TO_DELETE_GROUP
        self.set_last_check_expired()
        job.run()
        # No calls to the service instance this time
        instance.assert_not_called()
        self.assertEqual(info.total_retries, 3)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 3)
        # should be on TO_DELETE_GROUP
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)

        # On next call, execute_delete will be retried for this entry
        self.set_last_check_expired()
        job.run()
        instance.execute_delete.assert_called_once()
        instance.reset_mock()
        # retries reset for the new attempt, but total_retries keeps growing
        self.assertEqual(info.total_retries, 4)
        self.assertEqual(info.fatal_retries, 0)
        self.assertEqual(info.retries, 0)

        # But it should have been removed from all queues:
        # MAX_TOTAL_RETRIES is checked on every queue storage access, so after
        # TO_DELETE re-queues it to DELETING_GROUP, the storage access method
        # removes it from DELETING_GROUP due to MAX_TOTAL_RETRIES
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)

View File

@ -114,3 +114,6 @@ def returns_true(*args: typing.Any, **kwargs: typing.Any) -> bool:
def returns_false(*args: typing.Any, **kwargs: typing.Any) -> bool:
    """Mock side-effect helper: accept any arguments and always return False."""
    del args, kwargs  # intentionally ignored
    result: bool = False
    return result
def returns_none(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
    """Mock side-effect helper: accept any arguments and always return None."""
    del args, kwargs  # intentionally ignored
    return None

View File

@ -50,9 +50,14 @@ logger = logging.getLogger(__name__)
MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 8
MAX_TOTAL_RETRIES: typing.Final[int] = 1024
RETRIES_TO_RETRY: typing.Final[int] = (
16 # Retries to stop again or to shutdown again in STOPPING_GROUP or DELETING_GROUP
)
MAX_DELETIONS_AT_ONCE: typing.Final[int] = 16
MAX_DELETIONS_CHECKED_AT_ONCE: typing.Final[int] = MAX_DELETIONS_AT_ONCE * 2
# This interval is how long will take to check again for deletion, stopping, etc...
# That is, once a machine is deleted, every 32 seconds it will be checked that it has actually been deleted
CHECK_INTERVAL: typing.Final[int] = 32 # Check interval, in seconds
TO_STOP_GROUP: typing.Final[str] = 'to_stop'
@ -69,6 +74,7 @@ class DeletionInfo:
service_uuid: str
fatal_retries: int = 0 # Fatal error retries
total_retries: int = 0 # Total retries
retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
def sync_to_storage(self, group: str) -> None:
"""
@ -88,13 +94,20 @@ class DeletionInfo:
created=sql_now(),
last_check=sql_now(),
service_uuid=service_uuid,
# fatal_retries, total_retries and retries default to 0
)
@staticmethod
def get_from_storage(
storage_name: str,
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]:
# Get all wating deletion, and try it
"""
Get a list of objects to be processed from storage
Note:
This method will remove the objects from storage, so if needed, has to be readded
This is so we can release locks as soon as possible
"""
count = 0
infos: list[tuple[str, DeletionInfo]] = []
@ -107,6 +120,16 @@ class DeletionInfo:
typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
key=lambda x: x[1].last_check,
):
# if max retries reached, remove it
if info.total_retries >= MAX_TOTAL_RETRIES:
logger.error(
'Too many retries deleting %s from service %s, removing from deferred deletion',
info.vmid,
info.service_uuid,
)
del storage_dict[key]
continue
if info.last_check + datetime.timedelta(seconds=CHECK_INTERVAL) > sql_now():
continue
try:
@ -122,13 +145,15 @@ class DeletionInfo:
if (count := count + 1) > MAX_DELETIONS_AT_ONCE:
break
infos.append((key, info))
del storage_dict[key] # Remove from storage, being processed
# Only add if not too many retries already
infos.append((key, info))
return services, infos
class DeferredDeletionWorker(Job):
frecuency = 32 # Frecuncy for this job, in seconds
frecuency = 19 # Frequency for this job, in seconds
friendly_name = 'Deferred deletion runner'
deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
@ -211,13 +236,21 @@ class DeferredDeletionWorker(Job):
# Now process waiting stops
for key, info in to_stop:
try:
if services[info.service_uuid].is_running(None, info.vmid):
if services[info.service_uuid].should_try_soft_shutdown():
services[info.service_uuid].shutdown(None, info.vmid)
service = services[info.service_uuid]
if service.is_running(None, info.vmid):
# if info.retries < RETRIES_TO_RETRY, means this is the first time we try to stop it
if info.retries < RETRIES_TO_RETRY:
if service.should_try_soft_shutdown():
service.shutdown(None, info.vmid)
else:
services[info.service_uuid].stop(None, info.vmid)
info.last_check = sql_now()
service.stop(None, info.vmid)
info.fatal_retries = info.total_retries = 0
else:
info.total_retries += 1 # Count this as a general retry
info.retries = 0 # Reset retries
service.stop(None, info.vmid) # Always try to stop it if we have tried before
info.last_check = sql_now()
info.sync_to_storage(STOPPING_GROUP)
else:
# Do not update last_check to shutdown it asap, was not running after all
@ -228,9 +261,17 @@ class DeferredDeletionWorker(Job):
def process_stopping(self) -> None:
services, stopping = DeletionInfo.get_from_storage(STOPPING_GROUP)
# Now process waiting stops
# Now process waiting for finishing stops
for key, info in stopping:
try:
info.retries += 1
if info.retries > RETRIES_TO_RETRY:
# If we have tried to stop it, and it has not stopped, add to stop again
info.last_check = sql_now()
info.total_retries += 1
info.sync_to_storage(TO_STOP_GROUP)
continue
if services[info.service_uuid].is_running(None, info.vmid):
info.last_check = sql_now()
info.total_retries += 1
@ -251,7 +292,8 @@ class DeferredDeletionWorker(Job):
services[info.service_uuid].execute_delete(info.vmid)
# And store it for checking later if it has been deleted, reseting counters
info.last_check = sql_now()
info.fatal_retries = info.total_retries = 0
info.retries = 0
info.total_retries += 1
info.sync_to_storage(DELETING_GROUP)
except Exception as e:
self._process_exception(key, info, TO_DELETE_GROUP, services, e)
@ -264,9 +306,17 @@ class DeferredDeletionWorker(Job):
"""
services, deleting = DeletionInfo.get_from_storage(DELETING_GROUP)
# Now process waiting deletions checks
# Now process waiting for finishing deletions
for key, info in deleting:
try:
info.retries += 1
if info.retries > RETRIES_TO_RETRY:
# If we have tried to delete it, and it has not been deleted, add to delete again
info.last_check = sql_now()
info.total_retries += 1
info.sync_to_storage(TO_DELETE_GROUP)
continue
# If not finished, readd it for later check
if not services[info.service_uuid].is_deleted(info.vmid):
info.last_check = sql_now()