Add support for waiting until the deferred deletion has completed before marking the userservice as "deleted", so limits can be correctly respected.
Commit ef4b7e5bac (parent 3f60e7509e)
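In short: DynamicService.delete() now records a per-vmid "deleting" flag in the service's storage, the DeferredDeletionWorker clears it through the new notify_deleted() callback once the machine is really gone, and op_delete_checker() keeps reporting RUNNING while is_delete_running() still sees the flag. A minimal, self-contained sketch of that flow (a plain dict stands in for UDS's Storage, and TaskState is reduced to strings; names otherwise follow the hunks below):

    class ServiceSketch:
        def __init__(self) -> None:
            self.storage: dict[str, bool] = {}

        def delete(self, vmid: str) -> None:
            # delete() now records that a deferred deletion is in flight...
            self.storage[f'deleting_{vmid}'] = True
            # ...and queues the real work (DeferredDeletionWorker.add in UDS)

        def notify_deleted(self, vmid: str) -> None:
            # Called by the worker once is_deleted(vmid) confirms the machine is gone
            del self.storage[f'deleting_{vmid}']

        def is_delete_running(self, vmid: str) -> bool:
            return f'deleting_{vmid}' in self.storage

    def op_delete_checker(service: ServiceSketch, vmid: str) -> str:
        # The userservice/publication checker now waits instead of finishing at once
        return 'FINISHED' if not service.is_delete_running(vmid) else 'RUNNING'

    svc = ServiceSketch()
    svc.delete('vm-1')
    assert op_delete_checker(svc, 'vm-1') == 'RUNNING'   # still counts against limits
    svc.notify_deleted('vm-1')                           # worker confirms deletion
    assert op_delete_checker(svc, 'vm-1') == 'FINISHED'  # slot is freed only now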
@@ -631,10 +631,6 @@ class DynamicTestingPublicationQueue(dynamic_publication.DynamicPublication):
         self.mock.shutdown_completed_checker()
         return TaskState.FINISHED

-    def op_delete_checker(self) -> types.states.TaskState:
-        self.mock.remove_checker()
-        return TaskState.FINISHED
-
     def op_delete_completed_checker(self) -> types.states.TaskState:
         self.mock.remove_completed_checker()
         return TaskState.FINISHED
@@ -667,6 +663,10 @@ class DynamicTestingServiceForDeferredDeletion(dynamic_service.DynamicService):
     def is_deleted(self, vmid: str) -> bool:
         self.mock.is_deleted(vmid)
         return True

+    def notify_deleted(self, vmid: str) -> None:
+        self.mock.notify_deleted(vmid)
+        return
+
     # Not used, but needed to be implemented due to bein abstract
     def get_ip(
@@ -57,13 +57,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

    def set_last_check_expired(self) -> None:
        for group in deferred_types.DeferredStorageGroup:
-            with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage:
-                for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
+            with deferred_types.DeletionInfo.deferred_storage.as_dict(group) as storage:
+                for key, info in typing.cast(dict[str, deferred_types.DeletionInfo], storage).items():
                    info.next_check = sql_now() - datetime.timedelta(seconds=1)
                    storage[key] = info

    def count_entries_on_storage(self, group: str) -> int:
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage:
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(group) as storage:
            return len(storage)

    @contextlib.contextmanager
@@ -75,13 +75,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        is_running: typing.Union[None, typing.Callable[..., bool]] = None,
        must_stop_before_deletion: bool = False,
        should_try_soft_shutdown: bool = False,
-    ) -> typing.Iterator[tuple[mock.MagicMock, dict[str, dict[str, deferred_deletion.DeletionInfo]]]]:
+    ) -> typing.Iterator[tuple[mock.MagicMock, dict[str, dict[str, deferred_types.DeletionInfo]]]]:
        """
        Patch the storage to use a dict instead of the real storage

        This is useful to test the worker without touching the real storage
        """
-        dct: dict[str, dict[str, deferred_deletion.DeletionInfo]] = {}
+        dct: dict[str, dict[str, deferred_types.DeletionInfo]] = {}
        instance = mock.MagicMock()
        instance_db_obj = mock.MagicMock(uuid='service1')
        instance_db_obj.get_instance.return_value = instance
@@ -99,13 +99,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        with mock.patch('uds.models.Service.objects') as objs:
            objs.get.return_value = instance.db_obj()
            with mock.patch(
-                'uds.workers.deferred_deletion.DeletionInfo.deferred_storage'
+                'uds.core.types.deferred_deletion.DeletionInfo.deferred_storage'
            ) as storage:

                @contextlib.contextmanager
                def _as_dict(
                    group: str, *args: typing.Any, **kwargs: typing.Any
-                ) -> typing.Iterator[dict[str, deferred_deletion.DeletionInfo]]:
+                ) -> typing.Iterator[dict[str, deferred_types.DeletionInfo]]:
                    if group not in dct:
                        dct[group] = {}
                    yield dct[group]
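The patch target above reflects the real move in this commit: DeletionInfo now lives in uds.core.types.deferred_deletion rather than in the worker module. Inside the UDS tree, the only change a consumer needs is the import, e.g.:

    # Old location (class was defined in the worker module):
    #   from uds.workers.deferred_deletion import DeletionInfo
    # New location (moved to the types package):
    from uds.core.types.deferred_deletion import DeletionInfo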
@@ -147,6 +147,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

        self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.execute_delete.call_count, 16)
        self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.is_deleted.call_count, 0)
+        self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.notify_deleted.call_count, 0)
        # Reset mock
        fixtures.DynamicTestingServiceForDeferredDeletion.mock.reset_mock()

@@ -156,13 +157,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0)

        # Storage db should have 16 entries
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(
            deferred_types.DeferredStorageGroup.DELETING
        ) as deleting:
            self.assertEqual(len(deleting), 16)
-            for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
+            for key, info in typing.cast(dict[str, deferred_types.DeletionInfo], deleting).items():
                now = sql_now()
-                self.assertIsInstance(info, deferred_deletion.DeletionInfo)
+                self.assertIsInstance(info, deferred_types.DeletionInfo)
                self.assertEqual(key, f'{info.service_uuid}_{info.vmid}')
                self.assertLessEqual(info.created, now)
                self.assertGreaterEqual(info.next_check, now)
@@ -171,13 +172,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

        # Instantiate the Job
        job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
-        to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
+        to_delete = deferred_types.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
        # Should be empty, both services and infos
        self.assertEqual(len(to_delete[0]), 0)
        self.assertEqual(len(to_delete[1]), 0)

        # Now, get from deleting
-        deleting = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.DELETING)
+        deleting = deferred_types.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.DELETING)
        # Should have o services and infos also, because last_check has been too soon
        self.assertEqual(len(deleting[0]), 0)
        self.assertEqual(len(deleting[1]), 0)
@@ -188,25 +189,25 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        # Now, get from deleting again, should have all services and infos
        # OVerride MAX_DELETIONS_AT_ONCE to get only 1 entries
        deferred_consts.MAX_DELETIONS_AT_ONCE = 1
-        services_1, key_info_1 = deferred_deletion.DeletionInfo.get_from_storage(
+        services_1, key_info_1 = deferred_types.DeletionInfo.get_from_storage(
            deferred_types.DeferredStorageGroup.DELETING
        )
        self.assertEqual(len(services_1), 1)
        self.assertEqual(len(key_info_1), 1)
        # And should rest only 15 on storage
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(
            deferred_types.DeferredStorageGroup.DELETING
        ) as deleting:
            self.assertEqual(len(deleting), 15)
        deferred_consts.MAX_DELETIONS_AT_ONCE = 16
-        services_2, key_info_2 = deferred_deletion.DeletionInfo.get_from_storage(
+        services_2, key_info_2 = deferred_types.DeletionInfo.get_from_storage(
            deferred_types.DeferredStorageGroup.DELETING
        )
        self.assertEqual(len(services_2), 8)  # 8 services must be returned
        self.assertEqual(len(key_info_2), 15)  # And 15 entries

        # Re-store all DELETING_GROUP entries
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(
            deferred_types.DeferredStorageGroup.DELETING
        ) as deleting:
            for info in itertools.chain(key_info_1, key_info_2):
@@ -220,6 +221,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

        # Should have called is_deleted 16 times
        self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.is_deleted.call_count, 16)
+        self.assertEqual(fixtures.DynamicTestingServiceForDeferredDeletion.mock.notify_deleted.call_count, 16)
        # And should have removed all entries from deleting, because is_deleted returns True
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
@@ -254,7 +256,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 1)

        job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
-        to_delete = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
+        to_delete = deferred_types.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
        # Should be empty, both services and infos
        self.assertEqual(len(to_delete[0]), 0)
        self.assertEqual(len(to_delete[1]), 0)
@@ -263,14 +265,14 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
        self.set_last_check_expired()

        # Now, get from deleting again, should have all services and infos
-        services, info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
+        services, info = deferred_types.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
        self.assertEqual(len(services), 1)
        self.assertEqual(len(info), 1)
        # now, db should be empty
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)

        # Re store the entry
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(
            deferred_types.DeferredStorageGroup.TO_DELETE
        ) as to_delete:
            for info in info:
@@ -287,27 +289,29 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

        instance.mock.reset_mock()
        # And should have one entry in deleting
-        with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
+        with deferred_types.DeletionInfo.deferred_storage.as_dict(
            deferred_types.DeferredStorageGroup.DELETING
        ) as deleting:
            self.assertEqual(len(deleting), 1)
-            for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], deleting).items():
+            for key, info in typing.cast(dict[str, deferred_types.DeletionInfo], deleting).items():
                now = sql_now()
-                self.assertIsInstance(info, deferred_deletion.DeletionInfo)
+                self.assertIsInstance(info, deferred_types.DeletionInfo)
                self.assertEqual(key, f'{info.service_uuid}_{info.vmid}')
                self.assertLessEqual(info.created, now)
                self.assertGreaterEqual(info.next_check, now)
                self.assertEqual(info.fatal_retries, 0)
                self.assertEqual(info.total_retries, 1)

-        # And no call to is_deleted
+        # And no call to is_deleted nor notify_deleted
        instance.mock.is_deleted.assert_not_called()
+        instance.mock.notify_deleted.assert_not_called()

        # Executing now, should do nothing because last_check is not expired
        job.run()

        # Should have called is_deleted 0 times, due to last_check not expired
        instance.mock.is_deleted.assert_not_called()
+        instance.mock.notify_deleted.assert_not_called()

        self.set_last_check_expired()  # So deleting gets processed

@@ -315,6 +319,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):

        # Now should have called is_deleted once, and no entries in deleting nor to_delete
        instance.mock.is_deleted.assert_called_once_with('vmid_1')
+        instance.mock.notify_deleted.assert_called_once_with('vmid_1')
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
        self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)

@@ -342,9 +347,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
            instance.is_deleted.assert_called_once_with('vmid1')
            # if is_deleted returns True, should have removed the entry
            if is_deleted:
+                instance.notify_deleted.assert_called_once_with('vmid1')
                self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
                self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 0)
            else:
+                instance.notify_deleted.assert_not_called()
                self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
                self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.DELETING), 1)
            # Also, info should have been updated
@@ -438,7 +445,9 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
            job = deferred_deletion.DeferredDeletionWorker(environment=mock.MagicMock())
            job.run()

-            # Should have called is_deleted once
+            # Should have called is_deleted once and notify_deleted not called
            instance.is_deleted.assert_called_once_with('vmid1')
+            instance.notify_deleted.assert_not_called()

            if isinstance(error, gen_exceptions.RetryableError):
                self.assertEqual(info.fatal_retries, 0)
@@ -488,7 +488,9 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
        """
        This method is called to check if the service is removed
        """
-        return types.states.TaskState.FINISHED
+        if self.service().is_delete_running(self, self._vmid) is False:
+            return types.states.TaskState.FINISHED
+        return types.states.TaskState.RUNNING

    def op_delete_completed_checker(self) -> types.states.TaskState:
        """
@@ -213,6 +213,11 @@ class DynamicService(services.Service, abc.ABC):  # pylint: disable=too-many-pub
        Removes the machine, or queues it for removal, or whatever :)
        Use the caller_instance to notify anything if needed, or to identify caller
        """
+        # Store the deletion has started for future reference
+        with self.storage.as_dict() as storage:
+            # Store deleting vmid
+            storage[f'deleting_{vmid}'] = True
+
        DeferredDeletionWorker.add(self, vmid)

    # Method for deferred deletion
@@ -233,6 +238,22 @@ class DynamicService(services.Service, abc.ABC):  # pylint: disable=too-many-pub
        Default implementation is return True always
        """
        return True

+    def notify_deleted(self, vmid: str) -> None:
+        """
+        Called when the deferred deletion has been done
+        """
+        # Remove the deletion started flag
+        with self.storage.as_dict() as storage:
+            del storage[f'deleting_{vmid}']
+
+    def is_delete_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
+        """
+        Checks if the deferred deletion of a machine is running
+        Default implementation is return False always
+        """
+        with self.storage.as_dict() as storage:
+            return f'deleting_{vmid}' in storage
+
    def should_maintain_on_error(self) -> bool:
        if self.has_field('maintain_on_error'):  # If has been defined on own class...
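The defaults above deliberately preserve the old behaviour for providers that override nothing (is_deleted() answers True immediately, so nothing waits). A provider that really wants to track the deferred deletion only needs to override is_deleted(); a sketch with an invented provider API (vm_exists and the api attribute are hypothetical, for illustration only):

    class MyProviderService(DynamicService):  # hypothetical subclass; other abstract methods omitted
        def is_deleted(self, vmid: str) -> bool:
            # vm_exists() stands in for whatever the real provider API offers
            return not self.provider().api.vm_exists(vmid)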
@@ -646,6 +646,10 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
    def op_delete(self) -> None:
        """
        This method is called when the service is removed
+
+        Note:
+            If you override this method, probably you will need to override "op_delete_completed" too
+            Because the op_delete_completed method uses the information generated by service().delete
        """
        self.service().delete(self, self._vmid)

@@ -807,7 +811,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
        """
        This method is called to check if the service is removed
        """
-        return types.states.TaskState.FINISHED
+        if self.service().is_delete_running(self, self._vmid) is False:
+            return types.states.TaskState.FINISHED
+        return types.states.TaskState.RUNNING

    def op_delete_completed_checker(self) -> types.states.TaskState:
        """
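Both DynamicPublication and DynamicUserService now share this wait pattern: RUNNING keeps the delete operation on the queue for another check, and FINISHED only comes back once the service reports the deferred deletion as no longer running. A toy model of that polling loop (the tick mechanics here are a simplification, not UDS's actual scheduler):

    import enum

    class TaskState(enum.Enum):  # stand-in for uds.core.types.states.TaskState
        RUNNING = 'running'
        FINISHED = 'finished'

    def run_until_finished(checker, max_ticks: int = 100) -> int:
        # Re-run a checker the way the operation queue does, counting ticks
        for tick in range(1, max_ticks + 1):
            if checker() is TaskState.FINISHED:
                return tick
        raise TimeoutError('checker never finished')

    # The deletion "finishes" on the third tick, when is_delete_running() turns False
    pending = iter([True, True, False])
    assert run_until_finished(
        lambda: TaskState.RUNNING if next(pending) else TaskState.FINISHED
    ) == 3

(The hunks that follow belong to the types module, uds.core.types.deferred_deletion, per the mock.patch target earlier, and then to the slimmed-down worker, uds.workers.deferred_deletion.)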
@@ -30,8 +30,23 @@
 """
 Author: Adolfo Gómez, dkmaster at dkmon dot com
 """
+import collections.abc
+import dataclasses
+import datetime
 import logging
 import enum

+from uds.core.consts import deferred_deletion as consts
+from uds.core.types import deferred_deletion as types
+from uds.core.util import storage
+from uds.core.util.model import sql_now
+from uds.models import Service
+import typing
+
+if typing.TYPE_CHECKING:
+    from uds.core.services.generics.dynamic.service import DynamicService
+
+logger = logging.getLogger(__name__)
+
 class DeferredStorageGroup(enum.StrEnum):
     TO_STOP = 'to_stop'
@@ -42,3 +57,122 @@ class DeferredStorageGroup(enum.StrEnum):
     @staticmethod
     def from_str(value: str) -> 'DeferredStorageGroup':
         return DeferredStorageGroup(value)
+
+
+@dataclasses.dataclass
+class DeletionInfo:
+    vmid: str
+    created: datetime.datetime
+    next_check: datetime.datetime
+    service_uuid: str  # uuid of the service that owns this vmid (not the pool, but the service)
+    fatal_retries: int = 0  # Fatal error retries
+    total_retries: int = 0  # Total retries
+    retries: int = 0  # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
+
+    deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
+
+    @property
+    def key(self) -> str:
+        return DeletionInfo.generate_key(self.service_uuid, self.vmid)
+
+    def sync_to_storage(self, group: types.DeferredStorageGroup) -> None:
+        """
+        Ensures that this object is stored on the storage
+        If exists, it will be updated, if not, it will be created
+        """
+        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
+            storage_dict[self.key] = self
+
+    # For reporting
+    def as_csv(self) -> str:
+        return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}'
+
+    @staticmethod
+    def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
+        """
+        Returns the next check time for a deletion operation
+        """
+        return sql_now() + (
+            consts.CHECK_INTERVAL * (consts.FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate
+        )
+
+    @staticmethod
+    def generate_key(service_uuid: str, vmid: str) -> str:
+        return f'{service_uuid}_{vmid}'
+
+    @staticmethod
+    def create_on_storage(group: str, vmid: str, service_uuid: str, delay_rate: float = 1.0) -> None:
+        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
+            storage_dict[DeletionInfo.generate_key(service_uuid, vmid)] = DeletionInfo(
+                vmid=vmid,
+                created=sql_now(),
+                next_check=DeletionInfo.next_execution_calculator(delay_rate=delay_rate),
+                service_uuid=service_uuid,
+                # fatal, total an retries are 0 by default
+            )
+
+    @staticmethod
+    def get_from_storage(
+        group: types.DeferredStorageGroup,
+    ) -> tuple[dict[str, 'DynamicService'], list['DeletionInfo']]:
+        """
+        Get a list of objects to be processed from storage
+
+        Note:
+            This method will remove the objects from storage, so if needed, has to be readded
+            This is so we can release locks as soon as possible
+        """
+        count = 0
+        infos: list[DeletionInfo] = []
+
+        services: dict[str, 'DynamicService'] = {}
+
+        # First, get ownership of to_delete objects to be processed
+        # We do this way to release db locks as soon as possible
+        now = sql_now()
+        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
+            for key, info in sorted(
+                typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
+                key=lambda x: x[1].next_check,
+            ):
+                # if max retries reached, remove it
+                if info.total_retries >= consts.MAX_RETRAYABLE_ERROR_RETRIES:
+                    logger.error(
+                        'Too many retries deleting %s from service %s, removing from deferred deletion',
+                        info.vmid,
+                        info.service_uuid,
+                    )
+                    del storage_dict[key]
+                    continue
+
+                if info.next_check > now:  # If not time to process yet, skip
+                    continue
+                try:
+                    if info.service_uuid not in services:
+                        services[info.service_uuid] = typing.cast(
+                            'DynamicService', Service.objects.get(uuid=info.service_uuid).get_instance()
+                        )
+                except Exception as e:
+                    logger.error('Could not get service instance for %s: %s', info.service_uuid, e)
+                    del storage_dict[key]
+                    continue
+
+                if (count := count + 1) > consts.MAX_DELETIONS_AT_ONCE:
+                    break
+
+                del storage_dict[key]  # Remove from storage, being processed
+
+                # Only add if not too many retries already
+                infos.append(info)
+        return services, infos
+
+    @staticmethod
+    def count_from_storage(group: types.DeferredStorageGroup) -> int:
+        # Counts the total number of objects in storage
+        with DeletionInfo.deferred_storage.as_dict(group) as storage_dict:
+            return len(storage_dict)
+
+
+    @staticmethod
+    def csv_header() -> str:
+        return 'vmid,created,next_check,service_uuid,fatal_retries,total_retries,retries'
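The scheduling arithmetic in next_execution_calculator() is easy to sanity-check with concrete numbers. A sketch that returns the offset added to sql_now(), with assumed constants (the real values live in uds.core.consts.deferred_deletion and may differ):

    import datetime

    CHECK_INTERVAL = datetime.timedelta(seconds=30)  # assumed value
    FATAL_ERROR_INTERVAL_MULTIPLIER = 2              # assumed value

    def next_check_offset(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.timedelta:
        # Mirrors the staticmethod above, minus the sql_now() base time
        return CHECK_INTERVAL * (FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate

    assert next_check_offset() == datetime.timedelta(seconds=30)
    assert next_check_offset(fatal=True) == datetime.timedelta(seconds=60)
    assert next_check_offset(fatal=True, delay_rate=1.5) == datetime.timedelta(seconds=90)

The worker below no longer defines this calculation itself; it delegates to types.DeletionInfo.next_execution_calculator().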
@@ -29,16 +29,11 @@
 """
 Author: Adolfo Gómez, dkmaster at dkmon dot com
 """
-import dataclasses
-import collections.abc
-import datetime
 import typing
 import logging

-from uds.models import Service
-from uds.core.util.model import sql_now
 from uds.core.jobs import Job
-from uds.core.util import storage, utils
+from uds.core.util import utils
 from uds.core.consts import deferred_deletion as consts
 from uds.core.types import deferred_deletion as types
@@ -60,124 +55,6 @@ def execution_timer() -> 'utils.ExecutionTimer':
     )


-def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
-    """
-    Returns the next check time for a deletion operation
-    """
-    return sql_now() + (
-        consts.CHECK_INTERVAL * (consts.FATAL_ERROR_INTERVAL_MULTIPLIER if fatal else 1) * delay_rate
-    )
-
-
-@dataclasses.dataclass
-class DeletionInfo:
-    vmid: str
-    created: datetime.datetime
-    next_check: datetime.datetime
-    service_uuid: str  # uuid of the service that owns this vmid (not the pool, but the service)
-    fatal_retries: int = 0  # Fatal error retries
-    total_retries: int = 0  # Total retries
-    retries: int = 0  # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
-
-    deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
-
-    @property
-    def key(self) -> str:
-        return DeletionInfo.generate_key(self.service_uuid, self.vmid)
-
-    def sync_to_storage(self, group: types.DeferredStorageGroup) -> None:
-        """
-        Ensures that this object is stored on the storage
-        If exists, it will be updated, if not, it will be created
-        """
-        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
-            storage_dict[self.key] = self
-
-    # For reporting
-    def as_csv(self) -> str:
-        return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}'
-
-    @staticmethod
-    def generate_key(service_uuid: str, vmid: str) -> str:
-        return f'{service_uuid}_{vmid}'
-
-    @staticmethod
-    def create_on_storage(group: str, vmid: str, service_uuid: str, delay_rate: float = 1.0) -> None:
-        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
-            storage_dict[DeletionInfo.generate_key(service_uuid, vmid)] = DeletionInfo(
-                vmid=vmid,
-                created=sql_now(),
-                next_check=next_execution_calculator(delay_rate=delay_rate),
-                service_uuid=service_uuid,
-                # fatal, total an retries are 0 by default
-            )
-
-    @staticmethod
-    def get_from_storage(
-        group: types.DeferredStorageGroup,
-    ) -> tuple[dict[str, 'DynamicService'], list['DeletionInfo']]:
-        """
-        Get a list of objects to be processed from storage
-
-        Note:
-            This method will remove the objects from storage, so if needed, has to be readded
-            This is so we can release locks as soon as possible
-        """
-        count = 0
-        infos: list[DeletionInfo] = []
-
-        services: dict[str, 'DynamicService'] = {}
-
-        # First, get ownership of to_delete objects to be processed
-        # We do this way to release db locks as soon as possible
-        now = sql_now()
-        with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
-            for key, info in sorted(
-                typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
-                key=lambda x: x[1].next_check,
-            ):
-                # if max retries reached, remove it
-                if info.total_retries >= consts.MAX_RETRAYABLE_ERROR_RETRIES:
-                    logger.error(
-                        'Too many retries deleting %s from service %s, removing from deferred deletion',
-                        info.vmid,
-                        info.service_uuid,
-                    )
-                    del storage_dict[key]
-                    continue
-
-                if info.next_check > now:  # If not time to process yet, skip
-                    continue
-                try:
-                    if info.service_uuid not in services:
-                        services[info.service_uuid] = typing.cast(
-                            'DynamicService', Service.objects.get(uuid=info.service_uuid).get_instance()
-                        )
-                except Exception as e:
-                    logger.error('Could not get service instance for %s: %s', info.service_uuid, e)
-                    del storage_dict[key]
-                    continue
-
-                if (count := count + 1) > consts.MAX_DELETIONS_AT_ONCE:
-                    break
-
-                del storage_dict[key]  # Remove from storage, being processed
-
-                # Only add if not too many retries already
-                infos.append(info)
-        return services, infos
-
-    @staticmethod
-    def count_from_storage(group: types.DeferredStorageGroup) -> int:
-        # Counts the total number of objects in storage
-        with DeletionInfo.deferred_storage.as_dict(group) as storage_dict:
-            return len(storage_dict)
-
-    @staticmethod
-    def csv_header() -> str:
-        return 'vmid,created,next_check,service_uuid,fatal_retries,total_retries,retries'
-
-
 class DeferredDeletionWorker(Job):
     frecuency = 7  # Frequency for this job, in seconds
     friendly_name = 'Deferred deletion runner'
@@ -196,14 +73,14 @@ class DeferredDeletionWorker(Job):
                    service.shutdown(None, vmid)
                else:
                    service.stop(None, vmid)
-                DeletionInfo.create_on_storage(
+                types.DeletionInfo.create_on_storage(
                    types.DeferredStorageGroup.STOPPING, vmid, service.db_obj().uuid
                )
                return

            service.execute_delete(vmid)
            # If this takes too long, we will delay the next check a bit
-            DeletionInfo.create_on_storage(
+            types.DeletionInfo.create_on_storage(
                types.DeferredStorageGroup.DELETING,
                vmid,
                service.db_obj().uuid,
@@ -215,7 +92,7 @@ class DeferredDeletionWorker(Job):
            logger.warning(
                'Could not delete %s from service %s: %s. Retrying later.', vmid, service.db_obj().name, e
            )
-            DeletionInfo.create_on_storage(
+            types.DeletionInfo.create_on_storage(
                types.DeferredStorageGroup.TO_DELETE,
                vmid,
                service.db_obj().uuid,
@@ -224,16 +101,16 @@ class DeferredDeletionWorker(Job):
            return
        else:
            if service.must_stop_before_deletion:
-                DeletionInfo.create_on_storage(types.DeferredStorageGroup.TO_STOP, vmid, service.db_obj().uuid)
+                types.DeletionInfo.create_on_storage(types.DeferredStorageGroup.TO_STOP, vmid, service.db_obj().uuid)
            else:
-                DeletionInfo.create_on_storage(
+                types.DeletionInfo.create_on_storage(
                    types.DeferredStorageGroup.TO_DELETE, vmid, service.db_obj().uuid
                )
            return

    def _process_exception(
        self,
-        info: DeletionInfo,
+        info: types.DeletionInfo,
        to_group: types.DeferredStorageGroup,
        services: dict[str, 'DynamicService'],
        e: Exception,
@@ -253,7 +130,7 @@ class DeferredDeletionWorker(Job):
            )

        if not is_retryable:
-            info.next_check = next_execution_calculator(fatal=True, delay_rate=delay_rate)
+            info.next_check = types.DeletionInfo.next_execution_calculator(fatal=True, delay_rate=delay_rate)
            info.fatal_retries += 1
            if info.fatal_retries >= consts.MAX_FATAL_ERROR_RETRIES:
                logger.error(
@@ -262,7 +139,7 @@ class DeferredDeletionWorker(Job):
                    services[info.service_uuid].db_obj().name,
                )
                return  # Do not readd it
-        info.next_check = next_execution_calculator(delay_rate=delay_rate)
+        info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=delay_rate)
        info.total_retries += 1
        if info.total_retries >= consts.MAX_RETRAYABLE_ERROR_RETRIES:
            logger.error(
@@ -274,7 +151,7 @@ class DeferredDeletionWorker(Job):
        info.sync_to_storage(to_group)

    def process_to_stop(self) -> None:
-        services, to_stop = DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_STOP)
+        services, to_stop = types.DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_STOP)
        logger.debug('Processing %s to stop', to_stop)

        # Now process waiting stops
@@ -296,7 +173,7 @@ class DeferredDeletionWorker(Job):
                    info.retries = 0  # Reset retries
                    service.stop(None, info.vmid)  # Always try to stop it if we have tried before

-                    info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
+                    info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=exec_time.delay_rate)
                    info.sync_to_storage(types.DeferredStorageGroup.STOPPING)
                else:
                    # Do not update last_check to shutdown it asap, was not running after all
@@ -307,7 +184,7 @@ class DeferredDeletionWorker(Job):
                )

    def process_stopping(self) -> None:
-        services, stopping = DeletionInfo.get_from_storage(types.DeferredStorageGroup.STOPPING)
+        services, stopping = types.DeletionInfo.get_from_storage(types.DeferredStorageGroup.STOPPING)
        logger.debug('Processing %s stopping', stopping)

        # Now process waiting for finishing stops
@@ -317,17 +194,17 @@ class DeferredDeletionWorker(Job):
                info.retries += 1
                if info.retries > consts.RETRIES_TO_RETRY:
                    # If we have tried to stop it, and it has not stopped, add to stop again
-                    info.next_check = next_execution_calculator()
+                    info.next_check = types.DeletionInfo.next_execution_calculator()
                    info.total_retries += 1
                    info.sync_to_storage(types.DeferredStorageGroup.TO_STOP)
                    continue
                with exec_time:
                    if services[info.service_uuid].is_running(None, info.vmid):
-                        info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
+                        info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=exec_time.delay_rate)
                        info.total_retries += 1
                        info.sync_to_storage(types.DeferredStorageGroup.STOPPING)
                    else:
-                        info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
+                        info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=exec_time.delay_rate)
                        info.fatal_retries = info.total_retries = 0
                        info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
            except Exception as e:
@@ -336,7 +213,7 @@ class DeferredDeletionWorker(Job):
                )

    def process_to_delete(self) -> None:
-        services, to_delete = DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_DELETE)
+        services, to_delete = types.DeletionInfo.get_from_storage(types.DeferredStorageGroup.TO_DELETE)
        logger.debug('Processing %s to delete', to_delete)

        # Now process waiting deletions
@@ -352,7 +229,7 @@ class DeferredDeletionWorker(Job):

                    service.execute_delete(info.vmid)
                # And store it for checking later if it has been deleted, reseting counters
-                info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
+                info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=exec_time.delay_rate)
                info.retries = 0
                info.total_retries += 1
                info.sync_to_storage(types.DeferredStorageGroup.DELETING)
@@ -371,7 +248,7 @@ class DeferredDeletionWorker(Job):

        Note: Very similar to process_to_delete, but this one is for objects that are already being deleted
        """
-        services, deleting = DeletionInfo.get_from_storage(types.DeferredStorageGroup.DELETING)
+        services, deleting = types.DeletionInfo.get_from_storage(types.DeferredStorageGroup.DELETING)
        logger.debug('Processing %s deleting', deleting)

        # Now process waiting for finishing deletions
@@ -381,16 +258,19 @@ class DeferredDeletionWorker(Job):
                info.retries += 1
                if info.retries > consts.RETRIES_TO_RETRY:
                    # If we have tried to delete it, and it has not been deleted, add to delete again
-                    info.next_check = next_execution_calculator()
+                    info.next_check = types.DeletionInfo.next_execution_calculator()
                    info.total_retries += 1
                    info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
                    continue
                with exec_time:
                    # If not finished, readd it for later check
                    if not services[info.service_uuid].is_deleted(info.vmid):
-                        info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
+                        info.next_check = types.DeletionInfo.next_execution_calculator(delay_rate=exec_time.delay_rate)
                        info.total_retries += 1
                        info.sync_to_storage(types.DeferredStorageGroup.DELETING)
+                    else:
+                        # Deletion is finished, notify to service
+                        services[info.service_uuid].notify_deleted(info.vmid)
            except Exception as e:
                self._process_exception(
                    info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate
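Stripped of timers and exception plumbing, the DELETING pass above reduces to this decision tree (a sketch; names mirror the diff, and the retries_to_retry default stands in for consts.RETRIES_TO_RETRY, whose real value is not shown here):

    def process_deleting_once(services, deleting, retries_to_retry=5):
        # One simplified pass; returns entries to requeue as (to_delete, still_deleting)
        requeue_to_delete, still_deleting = [], []
        for info in deleting:
            info.retries += 1
            if info.retries > retries_to_retry:
                requeue_to_delete.append(info)   # poll budget spent: re-issue the delete
            elif not services[info.service_uuid].is_deleted(info.vmid):
                still_deleting.append(info)      # not gone yet: poll again later
            else:
                services[info.service_uuid].notify_deleted(info.vmid)  # gone: clear the flag
        return requeue_to_delete, still_deleting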
@@ -405,9 +285,9 @@ class DeferredDeletionWorker(Job):
    # To allow reporting what is on the queues
    @staticmethod
    def report(out: typing.TextIO) -> None:
-        out.write(DeletionInfo.csv_header() + '\n')
+        out.write(types.DeletionInfo.csv_header() + '\n')
        for group in types.DeferredStorageGroup:
-            with DeletionInfo.deferred_storage.as_dict(group) as storage:
-                for _key, info in typing.cast(dict[str, DeletionInfo], storage).items():
+            with types.DeletionInfo.deferred_storage.as_dict(group) as storage:
+                for _key, info in typing.cast(dict[str, types.DeletionInfo], storage).items():
                    out.write(info.as_csv() + '\n')
        out.write('\n')
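report() makes the queues inspectable as CSV; usage is simply (inside a running UDS deployment, e.g. from a management shell):

    import sys

    from uds.workers.deferred_deletion import DeferredDeletionWorker

    # Writes a header row, then one CSV line per queued DeletionInfo
    # across all groups (to_stop, stopping, ...)
    DeferredDeletionWorker.report(sys.stdout)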