diff --git a/server/src/tests/core/services/generics/test_dynamic_deferred_delete.py b/server/src/tests/core/services/generics/test_dynamic_deferred_delete.py index 62176bc56..9e1e81f36 100644 --- a/server/src/tests/core/services/generics/test_dynamic_deferred_delete.py +++ b/server/src/tests/core/services/generics/test_dynamic_deferred_delete.py @@ -57,13 +57,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): def set_last_check_expired(self) -> None: for group in deferred_types.DeferredStorageGroup: - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage: + with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage: for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items(): info.next_check = sql_now() - datetime.timedelta(seconds=1) storage[key] = info def count_entries_on_storage(self, group: str) -> int: - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage: + with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage: return len(storage) @contextlib.contextmanager @@ -99,7 +99,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): with mock.patch('uds.models.Service.objects') as objs: objs.get.return_value = instance.db_obj() with mock.patch( - 'uds.workers.deferred_deletion.DeferredDeletionWorker.deferred_storage' + 'uds.workers.deferred_deletion.DeletionInfo.deferred_storage' ) as storage: @contextlib.contextmanager @@ -156,7 +156,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0) # Storage db should have 16 entries - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict( + with deferred_deletion.DeletionInfo.deferred_storage.as_dict( deferred_types.DeferredStorageGroup.DELETING ) as deleting: self.assertEqual(len(deleting), 16) @@ -194,7 +194,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): self.assertEqual(len(services_1), 1) self.assertEqual(len(key_info_1), 1) # And should rest only 15 on storage - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict( + with deferred_deletion.DeletionInfo.deferred_storage.as_dict( deferred_types.DeferredStorageGroup.DELETING ) as deleting: self.assertEqual(len(deleting), 15) @@ -206,11 +206,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): self.assertEqual(len(key_info_2), 15) # And 15 entries # Re-store all DELETING_GROUP entries - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict( + with deferred_deletion.DeletionInfo.deferred_storage.as_dict( deferred_types.DeferredStorageGroup.DELETING ) as deleting: for info in itertools.chain(key_info_1, key_info_2): - deleting[info[0]] = info[1] + deleting[info.key] = info # set MAX_DELETIONS_AT_ONCE to a value bigger than 16 deferred_consts.MAX_DELETIONS_AT_ONCE = 100 @@ -263,18 +263,18 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): self.set_last_check_expired() # Now, get from deleting again, should have all services and infos - services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE) + services, info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE) self.assertEqual(len(services), 1) - self.assertEqual(len(key_info), 1) + self.assertEqual(len(info), 1) # now, db should be empty self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0) # Re store the entry - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict( + with deferred_deletion.DeletionInfo.deferred_storage.as_dict( deferred_types.DeferredStorageGroup.TO_DELETE ) as to_delete: - for info in key_info: - to_delete[info[0]] = info[1] + for info in info: + to_delete[info.key] = info # Process should move from to_delete to deleting job.run() # process_to_delete and process_deleting @@ -287,7 +287,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase): instance.mock.reset_mock() # And should have one entry in deleting - with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict( + with deferred_deletion.DeletionInfo.deferred_storage.as_dict( deferred_types.DeferredStorageGroup.DELETING ) as deleting: self.assertEqual(len(deleting), 1) diff --git a/server/src/uds/core/util/utils.py b/server/src/uds/core/util/utils.py index 2dc18640d..795be85b0 100644 --- a/server/src/uds/core/util/utils.py +++ b/server/src/uds/core/util/utils.py @@ -181,6 +181,21 @@ class ExecutionTimer: _max_delay_rate: float def __init__(self, delay_threshold: float, *, max_delay_rate: float = 4.0) -> None: + """ + Creates a new ExecutionTimer + + Arguments: + delay_threshold {float} -- Threshold for the delay rate, in seconds. + max_delay_rate {float} -- Maximum delay rate, defaults to 4.0 + + Note: + - delay_threshold is the time in seconds that we consider an operation is taking too long + - max_delay_rate is the maximum delay rate, if the operation is taking longer than the threshold, we will + multiply the delay by the delay rate, but at most by the max delay rate + - The delay will be calculated as the elapsed time divided by the threshold, at most the max delay rate + - A value of <= 0.0 will not delay at all, a value of 1.0 will delay as much as the elapsed time, a value of 2.0 + will delay twice the elapsed time, and so on + """ self._start = datetime.datetime.now() self._end = self._start self._running = False @@ -205,7 +220,22 @@ class ExecutionTimer: @property def delay_rate(self) -> float: - if self.elapsed.total_seconds() > self._delay_threshold: + """ + Returns the delay rate based on the elapsed time + Delay rate is a multiplier for the delay time based on the elapsed time + I.e: + - If the elapsed time is 0, the delay rate is 1.0 + - If the delay_threshold is lower or equal to 0, the delay rate is 1.0 + - If the elapsed time is greater than the threshold, the delay rate is the elapsed time divided by the threshold + for example: + * threshold = 2, elapsed = 4, delay rate = 2.0 + * threshold = 2, elapsed = 8, delay rate = 4.0 + - If the delay rate is greater than the max delay rate, the delay rate is the max delay rate + + This allows us to increase the delay for next check based on how long the operation is taking + (the longer it takes, the longer we wait for the next check) + """ + if self._delay_threshold > 0 and self.elapsed.total_seconds() > self._delay_threshold: # Ensure we do not delay too much, at most MAX_DELAY_RATE times return min(self.elapsed.total_seconds() / self._delay_threshold, self._max_delay_rate) return 1.0 diff --git a/server/src/uds/workers/deferred_deletion.py b/server/src/uds/workers/deferred_deletion.py index c7aea219a..59ab6d785 100644 --- a/server/src/uds/workers/deferred_deletion.py +++ b/server/src/uds/workers/deferred_deletion.py @@ -51,6 +51,10 @@ logger = logging.getLogger(__name__) def execution_timer() -> 'utils.ExecutionTimer': + """ + Generates an execution timer for deletion operations + This allows to delay the next check based on how long the operation took + """ return utils.ExecutionTimer( delay_threshold=consts.OPERATION_DELAY_THRESHOLD, max_delay_rate=consts.MAX_DELAY_RATE ) @@ -75,24 +79,32 @@ class DeletionInfo: total_retries: int = 0 # Total retries retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP + deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker') + + @property + def key(self) -> str: + return DeletionInfo.generate_key(self.service_uuid, self.vmid) + def sync_to_storage(self, group: types.DeferredStorageGroup) -> None: """ Ensures that this object is stored on the storage If exists, it will be updated, if not, it will be created """ - unique_key = f'{self.service_uuid}_{self.vmid}' - with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict: - storage_dict[unique_key] = self + with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict: + storage_dict[self.key] = self # For reporting def as_csv(self) -> str: return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}' + @staticmethod + def generate_key(service_uuid: str, vmid: str) -> str: + return f'{service_uuid}_{vmid}' + @staticmethod def create_on_storage(group: str, vmid: str, service_uuid: str, delay_rate: float = 1.0) -> None: - unique_key = f'{service_uuid}_{vmid}' - with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict: - storage_dict[unique_key] = DeletionInfo( + with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict: + storage_dict[DeletionInfo.generate_key(service_uuid, vmid)] = DeletionInfo( vmid=vmid, created=sql_now(), next_check=next_execution_calculator(delay_rate=delay_rate), @@ -103,7 +115,7 @@ class DeletionInfo: @staticmethod def get_from_storage( group: types.DeferredStorageGroup, - ) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]: + ) -> tuple[dict[str, 'DynamicService'], list['DeletionInfo']]: """ Get a list of objects to be processed from storage @@ -112,14 +124,14 @@ class DeletionInfo: This is so we can release locks as soon as possible """ count = 0 - infos: list[tuple[str, DeletionInfo]] = [] + infos: list[DeletionInfo] = [] services: dict[str, 'DynamicService'] = {} # First, get ownership of to_delete objects to be processed # We do this way to release db locks as soon as possible now = sql_now() - with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict: + with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict: for key, info in sorted( typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()), key=lambda x: x[1].next_check, @@ -152,13 +164,13 @@ class DeletionInfo: del storage_dict[key] # Remove from storage, being processed # Only add if not too many retries already - infos.append((key, info)) + infos.append(info) return services, infos @staticmethod def count_from_storage(group: types.DeferredStorageGroup) -> int: # Counts the total number of objects in storage - with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage_dict: + with DeletionInfo.deferred_storage.as_dict(group) as storage_dict: return len(storage_dict) @staticmethod @@ -170,8 +182,6 @@ class DeferredDeletionWorker(Job): frecuency = 7 # Frequency for this job, in seconds friendly_name = 'Deferred deletion runner' - deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker') - @staticmethod def add(service: 'DynamicService', vmid: str, execute_later: bool = False) -> None: logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name) @@ -223,7 +233,6 @@ class DeferredDeletionWorker(Job): def _process_exception( self, - key: str, info: DeletionInfo, to_group: types.DeferredStorageGroup, services: dict[str, 'DynamicService'], @@ -269,7 +278,7 @@ class DeferredDeletionWorker(Job): logger.debug('Processing %s to stop', to_stop) # Now process waiting stops - for key, info in to_stop: + for info in to_stop: # Key not used exec_time = execution_timer() try: service = services[info.service_uuid] @@ -294,7 +303,7 @@ class DeferredDeletionWorker(Job): info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE) except Exception as e: self._process_exception( - key, info, types.DeferredStorageGroup.TO_STOP, services, e, delay_rate=exec_time.delay_rate + info, types.DeferredStorageGroup.TO_STOP, services, e, delay_rate=exec_time.delay_rate ) def process_stopping(self) -> None: @@ -302,7 +311,7 @@ class DeferredDeletionWorker(Job): logger.debug('Processing %s stopping', stopping) # Now process waiting for finishing stops - for key, info in stopping: + for info in stopping: exec_time = execution_timer() try: info.retries += 1 @@ -323,7 +332,7 @@ class DeferredDeletionWorker(Job): info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE) except Exception as e: self._process_exception( - key, info, types.DeferredStorageGroup.STOPPING, services, e, delay_rate=exec_time.delay_rate + info, types.DeferredStorageGroup.STOPPING, services, e, delay_rate=exec_time.delay_rate ) def process_to_delete(self) -> None: @@ -331,7 +340,7 @@ class DeferredDeletionWorker(Job): logger.debug('Processing %s to delete', to_delete) # Now process waiting deletions - for key, info in to_delete: + for info in to_delete: service = services[info.service_uuid] exec_time = execution_timer() try: @@ -349,7 +358,6 @@ class DeferredDeletionWorker(Job): info.sync_to_storage(types.DeferredStorageGroup.DELETING) except Exception as e: self._process_exception( - key, info, types.DeferredStorageGroup.TO_DELETE, services, @@ -367,7 +375,7 @@ class DeferredDeletionWorker(Job): logger.debug('Processing %s deleting', deleting) # Now process waiting for finishing deletions - for key, info in deleting: + for info in deleting: # Key not used exec_time = execution_timer() try: info.retries += 1 @@ -385,7 +393,7 @@ class DeferredDeletionWorker(Job): info.sync_to_storage(types.DeferredStorageGroup.DELETING) except Exception as e: self._process_exception( - key, info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate + info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate ) def run(self) -> None: @@ -399,7 +407,7 @@ class DeferredDeletionWorker(Job): def report(out: typing.TextIO) -> None: out.write(DeletionInfo.csv_header() + '\n') for group in types.DeferredStorageGroup: - with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage: + with DeletionInfo.deferred_storage.as_dict(group) as storage: for _key, info in typing.cast(dict[str, DeletionInfo], storage).items(): out.write(info.as_csv() + '\n') out.write('\n')