diff --git a/server/src/uds/core/managers/userservice_helpers/opchecker.py b/server/src/uds/core/managers/userservice_helpers/opchecker.py index cf87ed5b0..afcda99e5 100644 --- a/server/src/uds/core/managers/userservice_helpers/opchecker.py +++ b/server/src/uds/core/managers/userservice_helpers/opchecker.py @@ -67,9 +67,9 @@ class StateUpdater(abc.ABC): if msg is not None: log.log(self.user_service, types.log.LogLevel.ERROR, msg, types.log.LogSource.INTERNAL) - def save(self, newState: typing.Optional[str] = None) -> None: - if newState: - self.user_service.set_state(newState) + def save(self, new_state: typing.Optional[str] = None) -> None: + if new_state: + self.user_service.set_state(new_state) self.user_service.update_data(self.user_service_instance) diff --git a/server/src/uds/core/util/utils.py b/server/src/uds/core/util/utils.py index b0687e42c..2dc18640d 100644 --- a/server/src/uds/core/util/utils.py +++ b/server/src/uds/core/util/utils.py @@ -171,3 +171,41 @@ def ignore_exceptions(log: bool = False) -> typing.Iterator[None]: if log: logger.error('Ignoring exception: %s', e) pass + +class ExecutionTimer: + _start: datetime.datetime + _end: datetime.datetime + _running: bool + + _delay_threshold: float + _max_delay_rate: float + + def __init__(self, delay_threshold: float, *, max_delay_rate: float = 4.0) -> None: + self._start = datetime.datetime.now() + self._end = self._start + self._running = False + + self._delay_threshold = delay_threshold + self._max_delay_rate = max_delay_rate + + def __enter__(self) -> 'ExecutionTimer': + self._start = self._end = datetime.datetime.now() + self._running = True + return self + + def __exit__(self, exc_type: typing.Any, exc_value: typing.Any, traceback: typing.Any) -> None: + self._running = False + self._end = datetime.datetime.now() + + @property + def elapsed(self) -> datetime.timedelta: + if self._running: + return datetime.datetime.now() - self._start + return self._end - self._start + + @property + def delay_rate(self) -> float: + if self.elapsed.total_seconds() > self._delay_threshold: + # Ensure we do not delay too much, at most MAX_DELAY_RATE times + return min(self.elapsed.total_seconds() / self._delay_threshold, self._max_delay_rate) + return 1.0 diff --git a/server/src/uds/core/workers/deferred_deletion.py b/server/src/uds/core/workers/deferred_deletion.py index b6a564a8b..e2190a9ad 100644 --- a/server/src/uds/core/workers/deferred_deletion.py +++ b/server/src/uds/core/workers/deferred_deletion.py @@ -38,7 +38,7 @@ import logging from uds.models import Service from uds.core.util.model import sql_now from uds.core.jobs import Job -from uds.core.util import storage +from uds.core.util import storage, utils from uds.core.services.generics import exceptions as gen_exceptions @@ -69,40 +69,11 @@ TO_DELETE_GROUP: typing.Final[str] = 'to_delete' DELETING_GROUP: typing.Final[str] = 'deleting' -class ExecutionTimer: - _start: datetime.datetime - _end: datetime.datetime - _running: bool - - def __init__(self) -> None: - self._start = datetime.datetime.now() - self._end = self._start - self._running = False - - def __enter__(self) -> 'ExecutionTimer': - self._start = self._end = datetime.datetime.now() - self._running = True - return self - - def __exit__(self, exc_type: typing.Any, exc_value: typing.Any, traceback: typing.Any) -> None: - self._running = False - self._end = datetime.datetime.now() - - @property - def elapsed(self) -> datetime.timedelta: - if self._running: - return datetime.datetime.now() - self._start - return self._end - self._start - - @property - def delay_rate(self) -> float: - if self.elapsed.total_seconds() > OPERATION_DELAY_THRESHOLD: - # Ensure we do not delay too much, at most MAX_DELAY_RATE times - return min(self.elapsed.total_seconds() / OPERATION_DELAY_THRESHOLD, MAX_DELAY_RATE) - return 1.0 +def execution_timer() -> 'utils.ExecutionTimer': + return utils.ExecutionTimer(delay_threshold=OPERATION_DELAY_THRESHOLD, max_delay_rate=MAX_DELAY_RATE) -def calc_next(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime: +def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime: """ Returns the next check time for a deletion operation """ @@ -135,7 +106,7 @@ class DeletionInfo: storage_dict[unique_key] = DeletionInfo( vmid=vmid, created=sql_now(), - next_check=calc_next(delay_rate=delay_rate), + next_check=next_execution_calculator(delay_rate=delay_rate), service_uuid=service_uuid, # fatal, total an retries are 0 by default ) @@ -207,7 +178,7 @@ class DeferredDeletionWorker(Job): logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name) # If sync, execute now if not execute_later: - exec_time = ExecutionTimer() + exec_time = execution_timer() try: with exec_time: if service.must_stop_before_deletion: @@ -221,7 +192,9 @@ class DeferredDeletionWorker(Job): service.execute_delete(vmid) # If this takes too long, we will delay the next check a bit - DeletionInfo.create_on_storage(DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate) + DeletionInfo.create_on_storage( + DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate + ) except gen_exceptions.NotFoundError: return # Already removed except Exception as e: @@ -265,7 +238,7 @@ class DeferredDeletionWorker(Job): ) if not is_retryable: - info.next_check = calc_next(fatal=True, delay_rate=delay_rate) + info.next_check = next_execution_calculator(fatal=True, delay_rate=delay_rate) info.fatal_retries += 1 if info.fatal_retries >= MAX_FATAL_ERROR_RETRIES: logger.error( @@ -274,7 +247,7 @@ class DeferredDeletionWorker(Job): services[info.service_uuid].db_obj().name, ) return # Do not readd it - info.next_check = calc_next(delay_rate=delay_rate) + info.next_check = next_execution_calculator(delay_rate=delay_rate) info.total_retries += 1 if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES: logger.error( @@ -291,7 +264,7 @@ class DeferredDeletionWorker(Job): # Now process waiting stops for key, info in to_stop: - exec_time = ExecutionTimer() + exec_time = execution_timer() try: service = services[info.service_uuid] with exec_time: @@ -308,7 +281,7 @@ class DeferredDeletionWorker(Job): info.retries = 0 # Reset retries service.stop(None, info.vmid) # Always try to stop it if we have tried before - info.next_check = calc_next(delay_rate=exec_time.delay_rate) + info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate) info.sync_to_storage(STOPPING_GROUP) else: # Do not update last_check to shutdown it asap, was not running after all @@ -322,22 +295,22 @@ class DeferredDeletionWorker(Job): # Now process waiting for finishing stops for key, info in stopping: - exec_time = ExecutionTimer() + exec_time = execution_timer() try: info.retries += 1 if info.retries > RETRIES_TO_RETRY: # If we have tried to stop it, and it has not stopped, add to stop again - info.next_check = calc_next() + info.next_check = next_execution_calculator() info.total_retries += 1 info.sync_to_storage(TO_STOP_GROUP) continue with exec_time: if services[info.service_uuid].is_running(None, info.vmid): - info.next_check = calc_next(delay_rate=exec_time.delay_rate) + info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate) info.total_retries += 1 info.sync_to_storage(STOPPING_GROUP) else: - info.next_check = calc_next(delay_rate=exec_time.delay_rate) + info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate) info.fatal_retries = info.total_retries = 0 info.sync_to_storage(TO_DELETE_GROUP) except Exception as e: @@ -350,7 +323,7 @@ class DeferredDeletionWorker(Job): # Now process waiting deletions for key, info in to_delete: service = services[info.service_uuid] - exec_time = ExecutionTimer() + exec_time = execution_timer() try: with exec_time: # If must be stopped before deletion, and is running, put it on to_stop @@ -360,12 +333,14 @@ class DeferredDeletionWorker(Job): service.execute_delete(info.vmid) # And store it for checking later if it has been deleted, reseting counters - info.next_check = calc_next(delay_rate=exec_time.delay_rate) + info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate) info.retries = 0 info.total_retries += 1 info.sync_to_storage(DELETING_GROUP) except Exception as e: - self._process_exception(key, info, TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate) + self._process_exception( + key, info, TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate + ) def process_deleting(self) -> None: """ @@ -378,19 +353,19 @@ class DeferredDeletionWorker(Job): # Now process waiting for finishing deletions for key, info in deleting: - exec_time = ExecutionTimer() + exec_time = execution_timer() try: info.retries += 1 if info.retries > RETRIES_TO_RETRY: # If we have tried to delete it, and it has not been deleted, add to delete again - info.next_check = calc_next() + info.next_check = next_execution_calculator() info.total_retries += 1 info.sync_to_storage(TO_DELETE_GROUP) continue with exec_time: # If not finished, readd it for later check if not services[info.service_uuid].is_deleted(info.vmid): - info.next_check = calc_next(delay_rate=exec_time.delay_rate) + info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate) info.total_retries += 1 info.sync_to_storage(DELETING_GROUP) except Exception as e: