1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

chore: Move constants and types from deferred deletion

Simple update for storage, now using contextmanager instead of an intermediary class for "as dict"
This commit is contained in:
Adolfo Gómez García 2024-08-30 18:40:32 +02:00
parent bf6e1674d2
commit e19805f081
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
7 changed files with 209 additions and 165 deletions

View File

@ -40,7 +40,9 @@ from uds.core import services
from uds.core.util.model import sql_now
from uds.workers import deferred_deletion
from uds.core.services.generics import exceptions as gen_exceptions
from uds.core.consts import defered_deletion as deferred_consts
from uds.core.consts import deferred_deletion as deferred_consts
from uds.core.types import deferred_deletion as deferred_types
from ....utils.test import UDSTransactionTestCase
from ....utils import helpers
@ -54,12 +56,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
services.factory().insert(fixtures.DynamicTestingProvider)
def set_last_check_expired(self) -> None:
for group in [
deferred_consts.TO_DELETE_GROUP,
deferred_consts.DELETING_GROUP,
deferred_consts.TO_STOP_GROUP,
deferred_consts.STOPPING_GROUP,
]:
for group in deferred_types.DeferredStorageGroup:
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
info.next_check = sql_now() - datetime.timedelta(seconds=1)
@ -154,13 +151,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
fixtures.DynamicTestingServiceForDeferredDeletion.mock.reset_mock()
# No entries into to_delete, nor TO_STOP nor STOPPING
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0)
# Storage db should have 16 entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 16)
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
@ -174,13 +171,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Instantiate the Job
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
# Should be empty, both services and infos
self.assertEqual(len(to_delete[0]), 0)
self.assertEqual(len(to_delete[1]), 0)
# Now, get from deleting
deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.DELETING_GROUP)
deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.DELETING)
# Should have o services and infos also, because last_check has been too soon
self.assertEqual(len(deleting[0]), 0)
self.assertEqual(len(deleting[1]), 0)
@ -192,25 +189,25 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# OVerride MAX_DELETIONS_AT_ONCE to get only 1 entries
deferred_consts.MAX_DELETIONS_AT_ONCE = 1
services_1, key_info_1 = deferred_deletion.DeletionInfo.get_from_storage(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
)
self.assertEqual(len(services_1), 1)
self.assertEqual(len(key_info_1), 1)
# And should rest only 15 on storage
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 15)
deferred_consts.MAX_DELETIONS_AT_ONCE = 16
services_2, key_info_2 = deferred_deletion.DeletionInfo.get_from_storage(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
)
self.assertEqual(len(services_2), 8) # 8 services must be returned
self.assertEqual(len(key_info_2), 15) # And 15 entries
# Re-store all DELETING_GROUP entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
for info in itertools.chain(key_info_1, key_info_2):
deleting[info[0]] = info[1]
@ -224,8 +221,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Should have called is_deleted 16 times
self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.is_deleted.call_count, 16)
# And should have removed all entries from deleting, because is_deleted returns True
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
def test_delete_delayed_full(self) -> None:
service = fixtures.create_dynamic_service_for_deferred_deletion()
@ -252,12 +249,12 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
instance.mock.execute_delete.assert_not_called()
# No entries deleting
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
# to_delete should contain one entry
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
# Should be empty, both services and infos
self.assertEqual(len(to_delete[0]), 0)
self.assertEqual(len(to_delete[1]), 0)
@ -266,15 +263,15 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.set_last_check_expired()
# Now, get from deleting again, should have all services and infos
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_consts.TO_DELETE_GROUP)
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
self.assertEqual(len(services), 1)
self.assertEqual(len(key_info), 1)
# now, db should be empty
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
# Re store the entry
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_consts.TO_DELETE_GROUP
deferred_types.DeferredStorageGroup.TO_DELETE
) as to_delete:
for info in key_info:
to_delete[info[0]] = info[1]
@ -285,13 +282,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Should have called execute_delete once
instance.mock.execute_delete.assert_called_once_with('vmid_1')
# And should have removed all entries from to_delete
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
instance.mock.reset_mock()
# And should have one entry in deleting
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
deferred_consts.DELETING_GROUP
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 1)
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
@ -318,8 +315,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Now should have called is_deleted once, and no entries in deleting nor to_delete
instance.mock.is_deleted.assert_called_once_with('vmid_1')
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
def test_deletion_is_deleted(self) -> None:
for is_deleted in (True, False):
@ -329,11 +326,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
# No entries in TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
# One entry in DELETING_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
info = next(iter(dct[deferred_types.DeferredStorageGroup.DELETING].values()))
# Fix last_check
self.set_last_check_expired()
@ -345,11 +342,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
instance.is_deleted.assert_called_once_with('vmid1')
# if is_deleted returns True, should have removed the entry
if is_deleted:
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
else:
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
# Also, info should have been updated
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
@ -367,14 +364,14 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# Not found should remove the entry and nothing more
if isinstance(error, gen_exceptions.NotFoundError):
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
continue
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
info = next(iter(dct[deferred_consts.TO_DELETE_GROUP].values())) # Get first element
info = next(iter(dct[deferred_types.DeferredStorageGroup.TO_DELETE].values())) # Get first element
self.assertEqual(info.vmid, 'vmid1')
self.assertEqual(info.service_uuid, instance.db_obj().uuid)
self.assertEqual(info.fatal_retries, 0)
@ -383,8 +380,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
job.run()
# due to check_interval, no retries are done
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
# Fix last_check
self.set_last_check_expired()
@ -395,16 +392,16 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
if isinstance(error, gen_exceptions.RetryableError):
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
# Test that MAX_TOTAL_RETRIES works fine
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
else:
self.assertEqual(info.fatal_retries, 1)
self.assertEqual(info.total_retries, 1)
@ -414,8 +411,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
def test_deletion_fails_is_deleted(self) -> None:
for error in (
@ -429,11 +426,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
# No entries in TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
# One entry in DELETING_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
info = next(iter(dct[deferred_types.DeferredStorageGroup.DELETING].values()))
# Fix last_check
self.set_last_check_expired()
@ -446,19 +443,19 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
if isinstance(error, gen_exceptions.RetryableError):
self.assertEqual(info.fatal_retries, 0)
self.assertEqual(info.total_retries, 1)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
# Test that MAX_TOTAL_RETRIES works fine
deferred_consts.MAX_RETRAYABLE_ERROR_RETRIES = 2
# reset last_check, or it will not retry
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
elif isinstance(error, gen_exceptions.NotFoundError):
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
else:
self.assertEqual(info.fatal_retries, 1)
self.assertEqual(info.total_retries, 1)
@ -468,8 +465,8 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.set_last_check_expired()
job.run()
# Should have removed the entry
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
def test_stop(self) -> None:
@ -542,10 +539,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(
COUNTERS_ADD[(running, execute_later, should_try_soft_shutdown)],
(
self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_consts.DELETING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_consts.STOPPING_GROUP),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING),
instance.is_running.call_count,
instance.stop.call_count,
instance.shutdown.call_count,
@ -563,10 +560,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(
COUNTERS_JOB[(running, execute_later, should_try_soft_shutdown)],
(
self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP),
self.count_entries_on_storage(deferred_consts.DELETING_GROUP),
self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP),
self.count_entries_on_storage(deferred_consts.STOPPING_GROUP),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP),
self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING),
instance.is_running.call_count,
instance.stop.call_count,
instance.shutdown.call_count,
@ -585,7 +582,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
) as (instance, dct):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
info = next(iter(dct[deferred_consts.STOPPING_GROUP].values()))
info = next(iter(dct[deferred_types.DeferredStorageGroup.STOPPING].values()))
self.assertEqual(info.total_retries, 0)
self.assertEqual(info.fatal_retries, 0)
@ -632,7 +629,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(info.retries, 3)
# should be on TO_STOP_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP), 1)
# On next call, again is_running will be called, and stop this time
self.set_last_check_expired()
@ -652,10 +649,10 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP
# the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
def test_delete_retry_delete(self) -> None:
@ -668,7 +665,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
) as (instance, dct):
deferred_deletion.DeferredDeletionWorker.add(instance, 'vmid1', execute_later=False)
info = next(iter(dct[deferred_consts.DELETING_GROUP].values()))
info = next(iter(dct[deferred_types.DeferredStorageGroup.DELETING].values()))
self.assertEqual(info.total_retries, 0)
self.assertEqual(info.fatal_retries, 0)
@ -715,7 +712,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(info.retries, 3)
# should be on TO_DELETE_GROUP
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 1)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)
# On next call, again is_running will be called, and stop this time
self.set_last_check_expired()
@ -734,7 +731,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
# and STOPPING_GROUP is after TO_STOP_GROUP. So, after STOPPING adds it to TO_DELETE_GROUP
# the storage access method will remove it from TO_DELETE_GROUP due to MAX_TOTAL_RETRIES
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_STOP_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.STOPPING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.TO_DELETE_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_consts.DELETING_GROUP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_STOP), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)

View File

@ -48,8 +48,8 @@ MAX_DELAY_RATE: typing.Final[float] = 4.0
CHECK_INTERVAL: typing.Final[datetime.timedelta] = datetime.timedelta(seconds=11) # Check interval
FATAL_ERROR_INTERVAL_MULTIPLIER: typing.Final[int] = 2 # Multiplier for fatal errors
TO_STOP_GROUP: typing.Final[str] = 'to_stop'
STOPPING_GROUP: typing.Final[str] = 'stopping'
TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
DELETING_GROUP: typing.Final[str] = 'deleting'
# TO_STOP_GROUP: typing.Final[str] = 'to_stop'
# STOPPING_GROUP: typing.Final[str] = 'stopping'
# TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
# DELETING_GROUP: typing.Final[str] = 'deleting'

View File

@ -28,6 +28,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import datetime
import logging
import typing
@ -41,7 +42,7 @@ from uds import models
from uds.core import exceptions, types
from uds.core.util import model as model_utils
from uds.core.util import singleton
from uds.core.util.storage import StorageAccess, Storage
from uds.core.util.storage import StorageAsDict, Storage
from .servers_api import events, requester
@ -65,11 +66,13 @@ class ServerManager(metaclass=singleton.Singleton):
def manager() -> 'ServerManager':
return ServerManager() # Singleton pattern will return always the same instance
def counter_storage(self) -> 'StorageAccess':
@contextlib.contextmanager
def counter_storage(self) -> typing.Iterator[StorageAsDict]:
# If counters are too old, restart them
if datetime.datetime.now() - self.last_counters_clean > self.MAX_COUNTERS_AGE:
self.clear_unmanaged_usage()
return Storage(self.STORAGE_NAME).as_dict(atomic=True, group='counters')
with Storage(self.STORAGE_NAME).as_dict(atomic=True, group='counters') as storage:
yield storage
def property_name(self, user: typing.Optional[typing.Union[str, 'models.User']]) -> str:
"""Returns the property name for a user"""
@ -155,7 +158,9 @@ class ServerManager(metaclass=singleton.Singleton):
# To values over threshold, we will add 1, so they are always worse than any value under threshold
# No matter if over threshold is overcalculed, it will be always worse than any value under threshold
# and all values over threshold will be affected in the same way
return weight_threshold - stats.weight() if stats.weight() < weight_threshold else 1 + stats.weight()
return (
weight_threshold - stats.weight() if stats.weight() < weight_threshold else 1 + stats.weight()
)
# Now, cachedStats has a list of tuples (stats, server), use it to find the best server
for stats, server in stats_and_servers:

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import enum
class DeferredStorageGroup(enum.StrEnum):
TO_STOP = 'to_stop'
STOPPING = 'stopping'
TO_DELETE = 'to_delete'
DELETING = 'deleting'
@staticmethod
def from_str(value: str) -> 'DeferredStorageGroup':
return DeferredStorageGroup(value)

View File

@ -29,6 +29,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import contextlib
import pickle # nosec: This is e controled pickle use
import base64
import hashlib
@ -198,6 +199,9 @@ class StorageAsDict(dict[str, typing.Any]):
def delete(self, key: str) -> None:
self.__delitem__(key) # pylint: disable=unnecessary-dunder-call
def clear(self) -> None:
self._filtered.delete() # Removes all keys
# Custom utility methods
@property
def group(self) -> str:
@ -208,39 +212,6 @@ class StorageAsDict(dict[str, typing.Any]):
self._group = value or ''
class StorageAccess:
"""
Allows the access to the storage as a dict, with atomic transaction if requested
"""
_owner: str
_group: typing.Optional[str]
_atomic: typing.Optional[transaction.Atomic]
def __init__(
self,
owner: str,
group: typing.Optional[str] = None,
atomic: bool = False,
):
self._owner = owner
self._group = group
self._atomic = transaction.atomic() if atomic else None
def __enter__(self) -> StorageAsDict:
if self._atomic:
self._atomic.__enter__()
return StorageAsDict(
owner=self._owner,
group=self._group,
atomic=bool(self._atomic),
)
def __exit__(self, exc_type: typing.Any, exc_value: typing.Any, traceback: typing.Any) -> None:
if self._atomic:
self._atomic.__exit__(exc_type, exc_value, traceback)
class Storage:
_owner: str
_bownwer: bytes
@ -382,12 +353,17 @@ class Storage:
except Exception: # nosec: Not interested in processing exceptions, just ignores it
pass
@contextlib.contextmanager
def as_dict(
self,
group: typing.Optional[str] = None,
atomic: bool = False,
) -> StorageAccess:
return StorageAccess(self._owner, group=group, atomic=atomic)
) -> typing.Iterator[StorageAsDict]:
if atomic:
with transaction.atomic():
yield StorageAsDict(self._owner, group=group, atomic=True)
else:
yield StorageAsDict(self._owner, group=group, atomic=False)
def search_by_attr1(
self, attr1: typing.Union[collections.abc.Iterable[str], str]

View File

@ -210,10 +210,14 @@ class User(UUIDModel, properties.PropertiesMixin):
to_delete.clean_related_data()
# Remove related stored values
with storage.StorageAccess('manager' + str(to_delete.manager.uuid)) as store:
for key in store.keys():
store.delete(key)
try:
storage.StorageAsDict(
owner='manager' + str(to_delete.manager.uuid),
group=None,
atomic=False
).clear()
except Exception:
logger.exception('Removing stored data')
# now removes all "child" of this user, if it has children
User.objects.filter(parent=to_delete.id).delete()

View File

@ -39,7 +39,8 @@ from uds.models import Service
from uds.core.util.model import sql_now
from uds.core.jobs import Job
from uds.core.util import storage, utils
from uds.core.consts import defered_deletion as consts
from uds.core.consts import deferred_deletion as consts
from uds.core.types import deferred_deletion as types
from uds.core.services.generics import exceptions as gen_exceptions
@ -50,14 +51,18 @@ logger = logging.getLogger(__name__)
def execution_timer() -> 'utils.ExecutionTimer':
return utils.ExecutionTimer(delay_threshold=consts.OPERATION_DELAY_THRESHOLD, max_delay_rate=consts.MAX_DELAY_RATE)
return utils.ExecutionTimer(
delay_threshold=consts.OPERATION_DELAY_THRESHOLD, max_delay_rate=consts.MAX_DELAY_RATE
)
def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
"""
Returns the next check time for a deletion operation
"""
return sql_now() + (consts.CHECK_INTERVAL * (consts.FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate)
return sql_now() + (
consts.CHECK_INTERVAL * (consts.FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate
)
@dataclasses.dataclass
@ -70,7 +75,7 @@ class DeletionInfo:
total_retries: int = 0 # Total retries
retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
def sync_to_storage(self, group: str) -> None:
def sync_to_storage(self, group: types.DeferredStorageGroup) -> None:
"""
Ensures that this object is stored on the storage
If exists, it will be updated, if not, it will be created
@ -97,7 +102,7 @@ class DeletionInfo:
@staticmethod
def get_from_storage(
group: str,
group: types.DeferredStorageGroup,
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]:
"""
Get a list of objects to be processed from storage
@ -151,7 +156,7 @@ class DeletionInfo:
return services, infos
@staticmethod
def count_from_storage(group: str) -> int:
def count_from_storage(group: types.DeferredStorageGroup) -> int:
# Counts the total number of objects in storage
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage_dict:
return len(storage_dict)
@ -181,13 +186,18 @@ class DeferredDeletionWorker(Job):
service.shutdown(None, vmid)
else:
service.stop(None, vmid)
DeletionInfo.create_on_storage(consts.STOPPING_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(
types.DeferredStorageGroup.STOPPING, vmid, service.db_obj().uuid
)
return
service.execute_delete(vmid)
# If this takes too long, we will delay the next check a bit
DeletionInfo.create_on_storage(
consts.DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate
types.DeferredStorageGroup.DELETING,
vmid,
service.db_obj().uuid,
delay_rate=exec_time.delay_rate,
)
except gen_exceptions.NotFoundError:
return # Already removed
@ -196,7 +206,7 @@ class DeferredDeletionWorker(Job):
'Could not delete %s from service %s: %s. Retrying later.', vmid, service.db_obj().name, e
)
DeletionInfo.create_on_storage(
consts.TO_DELETE_GROUP,
types.DeferredStorageGroup.TO_DELETE,
vmid,
service.db_obj().uuid,
delay_rate=exec_time.delay_rate,
@ -204,16 +214,18 @@ class DeferredDeletionWorker(Job):
return
else:
if service.must_stop_before_deletion:
DeletionInfo.create_on_storage(consts.TO_STOP_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(types.DeferredStorageGroup.TO_STOP, vmid, service.db_obj().uuid)
else:
DeletionInfo.create_on_storage(consts.TO_DELETE_GROUP, vmid, service.db_obj().uuid)
DeletionInfo.create_on_storage(
types.DeferredStorageGroup.TO_DELETE, vmid, service.db_obj().uuid
)
return
def _process_exception(
self,
key: str,
info: DeletionInfo,
to_group: str,
to_group: types.DeferredStorageGroup,
services: dict[str, 'DynamicService'],
e: Exception,
*,
@ -253,7 +265,7 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(to_group)
def process_to_stop(self) -> None:
services, to_stop = DeletionInfo.get_from_storage(consts.TO_STOP_GROUP)
services, to_stop = DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_STOP)
logger.debug('Processing %s to stop', to_stop)
# Now process waiting stops
@ -276,15 +288,17 @@ class DeferredDeletionWorker(Job):
service.stop(None, info.vmid) # Always try to stop it if we have tried before
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.sync_to_storage(consts.STOPPING_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.STOPPING)
else:
# Do not update last_check to shutdown it asap, was not running after all
info.sync_to_storage(consts.TO_DELETE_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
except Exception as e:
self._process_exception(key, info, consts.TO_STOP_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(
key, info, types.DeferredStorageGroup.TO_STOP, services, e, delay_rate=exec_time.delay_rate
)
def process_stopping(self) -> None:
services, stopping = DeletionInfo.get_from_storage(consts.STOPPING_GROUP)
services, stopping = DeletionInfo.get_from_storage(types.DeferredStorageGroup.STOPPING)
logger.debug('Processing %s stopping', stopping)
# Now process waiting for finishing stops
@ -296,22 +310,24 @@ class DeferredDeletionWorker(Job):
# If we have tried to stop it, and it has not stopped, add to stop again
info.next_check = next_execution_calculator()
info.total_retries += 1
info.sync_to_storage(consts.TO_STOP_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.TO_STOP)
continue
with exec_time:
if services[info.service_uuid].is_running(None, info.vmid):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.total_retries += 1
info.sync_to_storage(consts.STOPPING_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.STOPPING)
else:
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.fatal_retries = info.total_retries = 0
info.sync_to_storage(consts.TO_DELETE_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
except Exception as e:
self._process_exception(key, info, consts.STOPPING_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(
key, info, types.DeferredStorageGroup.STOPPING, services, e, delay_rate=exec_time.delay_rate
)
def process_to_delete(self) -> None:
services, to_delete = DeletionInfo.get_from_storage(consts.TO_DELETE_GROUP)
services, to_delete = DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_DELETE)
logger.debug('Processing %s to delete', to_delete)
# Now process waiting deletions
@ -322,7 +338,7 @@ class DeferredDeletionWorker(Job):
with exec_time:
# If must be stopped before deletion, and is running, put it on to_stop
if service.must_stop_before_deletion and service.is_running(None, info.vmid):
info.sync_to_storage(consts.TO_STOP_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.TO_STOP)
continue
service.execute_delete(info.vmid)
@ -330,10 +346,15 @@ class DeferredDeletionWorker(Job):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.retries = 0
info.total_retries += 1
info.sync_to_storage(consts.DELETING_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.DELETING)
except Exception as e:
self._process_exception(
key, info, consts.TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate
key,
info,
types.DeferredStorageGroup.TO_DELETE,
services,
e,
delay_rate=exec_time.delay_rate,
)
def process_deleting(self) -> None:
@ -342,7 +363,7 @@ class DeferredDeletionWorker(Job):
Note: Very similar to process_to_delete, but this one is for objects that are already being deleted
"""
services, deleting = DeletionInfo.get_from_storage(consts.DELETING_GROUP)
services, deleting = DeletionInfo.get_from_storage(types.DeferredStorageGroup.DELETING)
logger.debug('Processing %s deleting', deleting)
# Now process waiting for finishing deletions
@ -354,16 +375,18 @@ class DeferredDeletionWorker(Job):
# If we have tried to delete it, and it has not been deleted, add to delete again
info.next_check = next_execution_calculator()
info.total_retries += 1
info.sync_to_storage(consts.TO_DELETE_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
continue
with exec_time:
# If not finished, readd it for later check
if not services[info.service_uuid].is_deleted(info.vmid):
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
info.total_retries += 1
info.sync_to_storage(consts.DELETING_GROUP)
info.sync_to_storage(types.DeferredStorageGroup.DELETING)
except Exception as e:
self._process_exception(key, info, consts.DELETING_GROUP, services, e, delay_rate=exec_time.delay_rate)
self._process_exception(
key, info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate
)
def run(self) -> None:
self.process_to_stop()
@ -375,12 +398,7 @@ class DeferredDeletionWorker(Job):
@staticmethod
def report(out: typing.TextIO) -> None:
out.write(DeletionInfo.csv_header() + '\n')
for group in [
consts.TO_DELETE_GROUP,
consts.DELETING_GROUP,
consts.TO_STOP_GROUP,
consts.STOPPING_GROUP,
]:
for group in types.DeferredStorageGroup:
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
for _key, info in typing.cast(dict[str, DeletionInfo], storage).items():
out.write(info.as_csv() + '\n')