1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-22 22:03:54 +03:00

Moving constants and types from deferred deletion

This commit is contained in:
Adolfo Gómez García 2024-08-30 16:26:50 +02:00
parent c69d880c40
commit bf6e1674d2
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
4 changed files with 259 additions and 151 deletions

View File

@ -40,6 +40,7 @@ from uds.core import services
from uds.core.util.model import sql_now
from uds.workers import deferred_deletion
from uds.core.services.generics import exceptions as gen_exceptions
from uds.core.consts import defered_deletion as deferred_consts
from ....utils.test import UDSTransactionTestCase
from ....utils import helpers
@ -54,10 +55,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
def set_last_check_expired(self) -> None:
for group in [
deferred_deletion.TO_DELETE_GROUP,
deferred_deletion.DELETING_GROUP,
deferred_deletion.TO_STOP_GROUP,
deferred_deletion.STOPPING_GROUP,
deferred_consts.TO_DELETE_GROUP,
deferred_consts.DELETING_GROUP,
deferred_consts.TO_STOP_GROUP,
deferred_consts.STOPPING_GROUP,
]:
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
@ -153,13 +154,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
fixtures.DynamicTestingServiceForDeferredDeletion.mock.reset_mock()
# No entries into to_delete, nor TO_STOP nor STOPPING
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
# Storage db should have 16 entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
) as deleting:
self.assertEqual(len(deleting), 16)
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
@ -173,13 +174,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Instantiate the Job
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
# Should be empty, both services and infos
self.assertEqual(len(to_delete[0]), 0)
self.assertEqual(len(to_delete[1]), 0)
# Now, get from deleting
deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.DELETING_GROUP)
deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.DELETING_GROUP)
# Should have o services and infos also, because last_check has been too soon
self.assertEqual(len(deleting[0]), 0)
self.assertEqual(len(deleting[1]), 0)
@ -189,33 +190,33 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Now, get from deleting again, should have all services and infos
# OVerride MAX_DELETIONS_AT_ONCE to get only 1 entries
deferred_deletion.MAX_DELETIONS_AT_ONCE = 1
deferred_consts.MAX_DELETIONS_AT_ONCE = 1
services_1, key_info_1 = deferred_deletion.DeletionInfo.get_from_storage(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
)
self.assertEqual(len(services_1), 1)
self.assertEqual(len(key_info_1), 1)
# And should rest only 15 on storage
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
) as deleting:
self.assertEqual(len(deleting), 15)
deferred_deletion.MAX_DELETIONS_AT_ONCE = 16
deferred_consts.MAX_DELETIONS_AT_ONCE = 16
services_2, key_info_2 = deferred_deletion.DeletionInfo.get_from_storage(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
)
self.assertEqual(len(services_2), 8) # 8 services must be returned
self.assertEqual(len(key_info_2), 15) # And 15 entries
# Re-store all DELETING_GROUP entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
) as deleting:
for info in itertools.chain(key_info_1, key_info_2):
deleting[info[0]] = info[1]
# set MAX_DELETIONS_AT_ONCE to a value bigger than 16
deferred_deletion.MAX_DELETIONS_AT_ONCE = 100
deferred_consts.MAX_DELETIONS_AT_ONCE = 100
# Now, process all entries normally
job.run()
@ -223,8 +224,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Should have called is_deleted 16 times
self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.is_deleted.call_count, 16)
# And should have removed all entries from deleting, because is_deleted returns True
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
def test_delete_delayed_full(self) -> None:
service = fixtures.create_dynamic_service_for_deferred_deletion()
@ -251,12 +252,12 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
instance.mock.execute_delete.assert_not_called()
# No entries deleting
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
# to_delete should contain one entry
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
# Should be empty, both services and infos
self.assertEqual(len(to_delete[0]), 0)
self.assertEqual(len(to_delete[1]), 0)
@ -265,15 +266,15 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.set_last_check_expired()
# Now, get from deleting again, should have all services and infos
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_deletion.TO_DELETE_GROUP)
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
self.assertEqual(len(services), 1)
self.assertEqual(len(key_info), 1)
# now, db should be empty
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
# Re store the entry
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_deletion.TO_DELETE_GROUP
deferred_consts.TO_DELETE_GROUP
) as to_delete:
for info in key_info:
to_delete[info[0]] = info[1]
@ -284,13 +285,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Should have called execute_delete once
instance.mock.execute_delete.assert_called_once_with('vmid_1')
# And should have removed all entries from to_delete
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
instance.mock.reset_mock()
# And should have one entry in deleting
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_deletion.DELETING_GROUP
deferred_consts.DELETING_GROUP
) as deleting:
self.assertEqual(len(deleting), 1)
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
@ -317,8 +318,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Now should have called is_deleted once, and no entries in deleting nor to_delete
instance.mock.is_deleted.assert_called_once_with('vmid_1')
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
def test_deletion_is_deleted(self) -> None:
for is_deleted in (True, False):
@ -328,11 +329,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
# No entries in TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
# One entry in DELETING_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
info = next(iter(dct[deferred_deletion.DELETING_GROUP].values()))
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
# Fix last_check
self.set_last_check_expired()
@ -344,11 +345,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
instance.is_deleted.assert_called_once_with('vmid1')
# if is_deleted returns True, should have removed the entry
if is_deleted:
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
else:
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
# Also, info should have been updated
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
@ -366,14 +367,14 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Not found should remove the entry and nothing more
if isinstance(error, gen_exceptions.NotFoundError):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
continue
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
info = next(iter(dct[deferred_deletion.TO_DELETE_GROUP].values())) # Get first element
info = next(iter(dct[deferred_consts.TO_DELETE_GROUP].values())) # Get first element
self.assertEqual(info.vmid, 'vmid1')
self.assertEqual(info.service_uuid, instance.db_obj().uuid)
self.assertEqual(info.fatal_retries, 0)
@ -382,8 +383,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
job.run()
# due to check_interval, no retries are done
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
# Fix last_check
self.set_last_check_expired()
@ -394,27 +395,27 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
if isinstance(error, gen_exceptions.RetryableError):
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
# Test that MAX_TOTAL_RETRIES works fine
deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 2
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
else:
self.assertEqual(info.fatal_retries, 1)
self.assertEqual(info.total_retries, 1)
# test that MAX_FATAL_RETRIES works fine
deferred_deletion.MAX_FATAL_ERROR_RETRIES = 2
deferred_consts.MAX_FATAL_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
def test_deletion_fails_is_deleted(self) -> None:
for error in (
@ -428,11 +429,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
# No entries in TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
# One entry in DELETING_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
info = next(iter(dct[deferred_deletion.DELETING_GROUP].values()))
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
# Fix last_check
self.set_last_check_expired()
@ -445,30 +446,30 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
if isinstance(error, gen_exceptions.RetryableError):
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
# Test that MAX_TOTAL_RETRIES works fine
deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 2
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
elif isinstance(error, gen_exceptions.NotFoundError):
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
else:
self.assertEqual(info.fatal_retries, 1)
self.assertEqual(info.total_retries, 1)
# test that MAX_FATAL_RETRIES works fine
deferred_deletion.MAX_FATAL_ERROR_RETRIES = 2
deferred_consts.MAX_FATAL_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
def test_stop(self) -> None:
@ -541,10 +542,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(
COUNTERS_ADD[(running, execute_later, should_try_soft_shutdown)],
(
self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_deletion.DELETING_GROUP),
self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_consts.DELETING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_consts.STOPPING_GROUP),
instance.is_running.call_count,
instance.stop.call_count,
instance.shutdown.call_count,
@ -562,10 +563,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(
COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)],
(
self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_deletion.DELETING_GROUP),
self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_consts.DELETING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_consts.STOPPING_GROUP),
instance.is_running.call_count,
instance.stop.call_count,
instance.shutdown.call_count,
@ -574,8 +575,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
)
def test_stop_retry_stop(self) -> None:
deferred_deletion.RETRIES_TO_RETRY = 2
deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 4
deferred_consts.RETRIES_TO_RETRY = 2
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 4
with self.patch_for_worker(
is_running=helpers.returns_true,
@ -584,7 +585,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
) as (instance, dct):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
info = next(iter(dct[deferred_deletion.STOPPING_GROUP].values()))
info = next(iter(dct[deferred_consts.STOPPING_GROUP].values()))
self.assertEqual(info.total_retries, 0)
self.assertEqual(info.fatal_retries, 0)
@ -631,7 +632,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(info.retries, 3)
# should be on TO_STOP_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 1)
# On next call, again is_running will be called, and stop this time
self.set_last_check_expired()
@ -651,15 +652,15 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP
# the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
def test_delete_retry_delete(self) -> None:
deferred_deletion.RETRIES_TO_RETRY = 2
deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 4
deferred_consts.RETRIES_TO_RETRY = 2
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 4
with self.patch_for_worker(
is_running=helpers.returns_true,
@ -667,7 +668,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
) as (instance, dct):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
info = next(iter(dct[deferred_deletion.DELETING_GROUP].values()))
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
self.assertEqual(info.total_retries, 0)
self.assertEqual(info.fatal_retries, 0)
@ -714,7 +715,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(info.retries, 3)
# should be on TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
# On next call, again is_running will be called, and stop this time
self.set_last_check_expired()
@ -733,7 +734,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP
# the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)

View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import datetime
MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 16
MAX_RETRAYABLE_ERROR_RETRIES: typing.Final[int] = 8192 # Max retries before giving up at most 72 hours
# Retries to stop again or to shutdown again in STOPPING_GROUP or DELETING_GROUP
RETRIES_TO_RETRY: typing.Final[int] = 32
MAX_DELETIONS_AT_ONCE: typing.Final[int] = 32
# For every operation that takes more than this time, multiplay CHECK_INTERVAL by (time / TIME_THRESHOLD)
OPERATION_DELAY_THRESHOLD: typing.Final[int] = 2
MAX_DELAY_RATE: typing.Final[float] = 4.0
# This interval is how long will take to check again for deletion, stopping, etc...
# That is, once a machine is deleted, every CHECK_INTERVAL seconds will be check that it has been deleted
CHECK_INTERVAL: typing.Final[datetime.timedelta] = datetime.timedelta(seconds=11) # Check interval
FATAL_ERROR_INTERVAL_MULTIPLIER: typing.Final[int] = 2 # Multiplier for fatal errors
TO_STOP_GROUP: typing.Final[str] = 'to_stop'
STOPPING_GROUP: typing.Final[str] = 'stopping'
TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
DELETING_GROUP: typing.Final[str] = 'deleting'

View File

@ -0,0 +1,66 @@
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import argparse
import logging
import sys
import typing
from django.core.management.base import BaseCommand
from uds.core.util import config
from uds.workers.deferred_deletion import DeferredDeletionWorker
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Generate report of deletion queue"
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'--output',
action='store',
dest='output',
default=None,
help='Output file',
)
def handle(self, *args: typing.Any, **options: typing.Any) -> None:
logger.debug("Show settings")
config.GlobalConfig.initialize()
output = options.get('output', None)
if output is None:
logger.debug("Output file: %s", output)
DeferredDeletionWorker.report(sys.stdout)
else:
with open(output, 'w', encoding='utf8') as f:
DeferredDeletionWorker.report(f)

View File

@ -39,6 +39,7 @@ from uds.models import Service
from uds.core.util.model import sql_now
from uds.core.jobs import Job
from uds.core.util import storage, utils
from uds.core.consts import defered_deletion as consts
from uds.core.services.generics import exceptions as gen_exceptions
@ -48,36 +49,15 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 16
MAX_RETRAYABLE_ERROR_RETRIES: typing.Final[int] = 8192 # Max retries before giving up at most 72 hours
# Retries to stop again or to shutdown again in STOPPING_GROUP or DELETING_GROUP
RETRIES_TO_RETRY: typing.Final[int] = 32
MAX_DELETIONS_AT_ONCE: typing.Final[int] = 32
# For every operation that takes more than this time, multiplay CHECK_INTERVAL by (time / TIME_THRESHOLD)
OPERATION_DELAY_THRESHOLD: typing.Final[int] = 2
MAX_DELAY_RATE: typing.Final[float] = 4.0
# This interval is how long will take to check again for deletion, stopping, etc...
# That is, once a machine is deleted, every CHECK_INTERVAL seconds will be check that it has been deleted
CHECK_INTERVAL: typing.Final[datetime.timedelta] = datetime.timedelta(seconds=11) # Check interval
FATAL_ERROR_INTERVAL_MULTIPLIER: typing.Final[int] = 2 # Multiplier for fatal errors
TO_STOP_GROUP: typing.Final[str] = 'to_stop'
STOPPING_GROUP: typing.Final[str] = 'stopping'
TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
DELETING_GROUP: typing.Final[str] = 'deleting'
def execution_timer() -> 'utils.ExecutionTimer':
return utils.ExecutionTimer(delay_threshold=OPERATION_DELAY_THRESHOLD, max_delay_rate=MAX_DELAY_RATE)
return utils.ExecutionTimer(delay_threshold=consts.OPERATION_DELAY_THRESHOLD, max_delay_rate=consts.MAX_DELAY_RATE)
def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
"""
Returns the next check time for a deletion operation
"""
return sql_now() + (CHECK_INTERVAL * (FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate)
return sql_now() + (consts.CHECK_INTERVAL * (consts.FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate)
@dataclasses.dataclass
@ -85,7 +65,7 @@ class DeletionInfo:
vmid: str
created: datetime.datetime
next_check: datetime.datetime
service_uuid: str
service_uuid: str # uuid of the service that owns this vmid (not the pool, but the service)
fatal_retries: int = 0 # Fatal error retries
total_retries: int = 0 # Total retries
retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
@ -99,6 +79,10 @@ class DeletionInfo:
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
storage_dict[unique_key] = self
# For reporting
def as_csv(self) -> str:
return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}'
@staticmethod
def create_on_storage(group: str, vmid: str, service_uuid: str, delay_rate: float = 1.0) -> None:
unique_key = f'{service_uuid}_{vmid}'
@ -113,7 +97,7 @@ class DeletionInfo:
@staticmethod
def get_from_storage(
storage_name: str,
group: str,
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]:
"""
Get a list of objects to be processed from storage
@ -130,13 +114,13 @@ class DeletionInfo:
# First, get ownership of to_delete objects to be processed
# We do this way to release db locks as soon as possible
now = sql_now()
with DeferredDeletionWorker.deferred_storage.as_dict(storage_name, atomic=True) as storage_dict:
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
for key, info in sorted(
typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
key=lambda x: x[1].next_check,
):
# if max retries reached, remove it
if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES:
if info.total_retries >= consts.MAX_RETRAYABLE_ERROR_RETRIES:
logger.error(
'Too many retries deleting %s from service %s, removing from deferred deletion',
info.vmid,
@ -157,7 +141,7 @@ class DeletionInfo:
del storage_dict[key]
continue
if (count := count + 1) > MAX_DELETIONS_AT_ONCE:
if (count := count + 1) > consts.MAX_DELETIONS_AT_ONCE:
break
del storage_dict[key] # Remove from storage, being processed
@ -166,9 +150,11 @@ class DeletionInfo:
infos.append((key, info))
return services, infos
# For reporting
def as_csv(self) -> str:
return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}'
@staticmethod
def count_from_storage(group: str) -> int:
# Counts the total number of objects in storage
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage_dict:
return len(storage_dict)
@staticmethod
def csv_header() -> str:
@ -195,13 +181,13 @@ class DeferredDeletionWorker(Job):
service.shutdown(None, vmid)
else:
service.stop(None, vmid)
DeletionInfo.create_on_storage(STOPPING_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(consts.STOPPING_GROUP, vmid, service.db_obj().uuid)
return
service.execute_delete(vmid)
# If this takes too long, we will delay the next check a bit
DeletionInfo.create_on_storage(
DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate
consts.DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate
)
except gen_exceptions.NotFoundError:
return # Already removed
@ -210,7 +196,7 @@ class DeferredDeletionWorker(Job):
'Could not delete %s from service %s: %s. Retrying later.', vmid, service.db_obj().name, e
)
DeletionInfo.create_on_storage(
TO_DELETE_GROUP,
consts.TO_DELETE_GROUP,
vmid,
service.db_obj().uuid,
delay_rate=exec_time.delay_rate,
@ -218,9 +204,9 @@ class DeferredDeletionWorker(Job):
return
else:
if service.must_stop_before_deletion:
DeletionInfo.create_on_storage(TO_STOP_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(consts.TO_STOP_GROUP, vmid, service.db_obj().uuid)
else:
DeletionInfo.create_on_storage(TO_DELETE_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(consts.TO_DELETE_GROUP, vmid, service.db_obj().uuid)
return
def _process_exception(
@ -248,7 +234,7 @@ class DeferredDeletionWorker(Job):
if not is_retryable:
info.next_check = next_execution_calculator(fatal=True, delay_rate=delay_rate)
info.fatal_retries += 1
if info.fatal_retries >= MAX_FATAL_ERROR_RETRIES:
if info.fatal_retries >= consts.MAX_FATAL_ERROR_RETRIES:
logger.error(
'Fatal error deleting %s from service %s, removing from deferred deletion',
info.vmid,
@ -257,7 +243,7 @@ class DeferredDeletionWorker(Job):
return # Do not readd it
info.next_check = next_execution_calculator(delay_rate=delay_rate)
info.total_retries += 1
if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES:
if info.total_retries >= consts.MAX_RETRAYABLE_ERROR_RETRIES:
logger.error(
'Too many retries deleting %s from service %s, removing from deferred deletion',
info.vmid,
@ -267,7 +253,7 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(to_group)
def process_to_stop(self) -> None:
services, to_stop = DeletionInfo.get_from_storage(TO_STOP_GROUP)
services, to_stop = DeletionInfo.get_from_storage(consts.TO_STOP_GROUP)
logger.debug('Processing %s to stop', to_stop)
# Now process waiting stops
@ -278,7 +264,7 @@ class DeferredDeletionWorker(Job):
with exec_time:
if service.is_running(None, info.vmid):
# if info.retries < RETRIES_TO_RETRY, means this is the first time we try to stop it
if info.retries < RETRIES_TO_RETRY:
if info.retries < consts.RETRIES_TO_RETRY:
if service.should_try_soft_shutdown():
service.shutdown(None, info.vmid)
else:
@ -290,15 +276,15 @@ class DeferredDeletionWorker(Job):
service.stop(None, info.vmid) # Always try to stop it if we have tried before
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.sync_to_storage(STOPPING_GROUP)
info.sync_to_storage(consts.STOPPING_GROUP)
else:
# Do not update last_check to shutdown it asap, was not running after all
info.sync_to_storage(TO_DELETE_GROUP)
info.sync_to_storage(consts.TO_DELETE_GROUP)
except Exception as e:
self._process_exception(key, info, TO_STOP_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(key, info, consts.TO_STOP_GROUP, services, e, delay_rate=exec_time.delay_rate)
def process_stopping(self) -> None:
services, stopping = DeletionInfo.get_from_storage(STOPPING_GROUP)
services, stopping = DeletionInfo.get_from_storage(consts.STOPPING_GROUP)
logger.debug('Processing %s stopping', stopping)
# Now process waiting for finishing stops
@ -306,26 +292,26 @@ class DeferredDeletionWorker(Job):
exec_time = execution_timer()
try:
info.retries += 1
if info.retries > RETRIES_TO_RETRY:
if info.retries > consts.RETRIES_TO_RETRY:
# If we have tried to stop it, and it has not stopped, add to stop again
info.next_check = next_execution_calculator()
info.total_retries += 1
info.sync_to_storage(TO_STOP_GROUP)
info.sync_to_storage(consts.TO_STOP_GROUP)
continue
with exec_time:
if services[info.service_uuid].is_running(None, info.vmid):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.total_retries += 1
info.sync_to_storage(STOPPING_GROUP)
info.sync_to_storage(consts.STOPPING_GROUP)
else:
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.fatal_retries = info.total_retries = 0
info.sync_to_storage(TO_DELETE_GROUP)
info.sync_to_storage(consts.TO_DELETE_GROUP)
except Exception as e:
self._process_exception(key, info, STOPPING_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(key, info, consts.STOPPING_GROUP, services, e, delay_rate=exec_time.delay_rate)
def process_to_delete(self) -> None:
services, to_delete = DeletionInfo.get_from_storage(TO_DELETE_GROUP)
services, to_delete = DeletionInfo.get_from_storage(consts.TO_DELETE_GROUP)
logger.debug('Processing %s to delete', to_delete)
# Now process waiting deletions
@ -336,7 +322,7 @@ class DeferredDeletionWorker(Job):
with exec_time:
# If must be stopped before deletion, and is running, put it on to_stop
if service.must_stop_before_deletion and service.is_running(None, info.vmid):
info.sync_to_storage(TO_STOP_GROUP)
info.sync_to_storage(consts.TO_STOP_GROUP)
continue
service.execute_delete(info.vmid)
@ -344,10 +330,10 @@ class DeferredDeletionWorker(Job):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.retries = 0
info.total_retries += 1
info.sync_to_storage(DELETING_GROUP)
info.sync_to_storage(consts.DELETING_GROUP)
except Exception as e:
self._process_exception(
key, info, TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate
key, info, consts.TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate
)
def process_deleting(self) -> None:
@ -356,7 +342,7 @@ class DeferredDeletionWorker(Job):
Note: Very similar to process_to_delete, but this one is for objects that are already being deleted
"""
services, deleting = DeletionInfo.get_from_storage(DELETING_GROUP)
services, deleting = DeletionInfo.get_from_storage(consts.DELETING_GROUP)
logger.debug('Processing %s deleting', deleting)
# Now process waiting for finishing deletions
@ -364,20 +350,20 @@ class DeferredDeletionWorker(Job):
exec_time = execution_timer()
try:
info.retries += 1
if info.retries > RETRIES_TO_RETRY:
if info.retries > consts.RETRIES_TO_RETRY:
# If we have tried to delete it, and it has not been deleted, add to delete again
info.next_check = next_execution_calculator()
info.total_retries += 1
info.sync_to_storage(TO_DELETE_GROUP)
info.sync_to_storage(consts.TO_DELETE_GROUP)
continue
with exec_time:
# If not finished, readd it for later check
if not services[info.service_uuid].is_deleted(info.vmid):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.total_retries += 1
info.sync_to_storage(DELETING_GROUP)
info.sync_to_storage(consts.DELETING_GROUP)
except Exception as e:
self._process_exception(key, info, DELETING_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(key, info, consts.DELETING_GROUP, services, e, delay_rate=exec_time.delay_rate)
def run(self) -> None:
self.process_to_stop()
@ -390,10 +376,10 @@ class DeferredDeletionWorker(Job):
def report(out: typing.TextIO) -> None:
out.write(DeletionInfo.csv_header() + '\n')
for group in [
TO_DELETE_GROUP,
DELETING_GROUP,
TO_STOP_GROUP,
STOPPING_GROUP,
consts.TO_DELETE_GROUP,
consts.DELETING_GROUP,
consts.TO_STOP_GROUP,
consts.STOPPING_GROUP,
]:
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
for _key, info in typing.cast(dict[str, DeletionInfo], storage).items():