mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-10 01:17:59 +03:00
Moved ExecutionTimer to be used in other places
This commit is contained in:
parent
a25184af52
commit
1e64b957a1
@ -67,9 +67,9 @@ class StateUpdater(abc.ABC):
|
|||||||
if msg is not None:
|
if msg is not None:
|
||||||
log.log(self.user_service, types.log.LogLevel.ERROR, msg, types.log.LogSource.INTERNAL)
|
log.log(self.user_service, types.log.LogLevel.ERROR, msg, types.log.LogSource.INTERNAL)
|
||||||
|
|
||||||
def save(self, newState: typing.Optional[str] = None) -> None:
|
def save(self, new_state: typing.Optional[str] = None) -> None:
|
||||||
if newState:
|
if new_state:
|
||||||
self.user_service.set_state(newState)
|
self.user_service.set_state(new_state)
|
||||||
|
|
||||||
self.user_service.update_data(self.user_service_instance)
|
self.user_service.update_data(self.user_service_instance)
|
||||||
|
|
||||||
|
@ -171,3 +171,41 @@ def ignore_exceptions(log: bool = False) -> typing.Iterator[None]:
|
|||||||
if log:
|
if log:
|
||||||
logger.error('Ignoring exception: %s', e)
|
logger.error('Ignoring exception: %s', e)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class ExecutionTimer:
|
||||||
|
_start: datetime.datetime
|
||||||
|
_end: datetime.datetime
|
||||||
|
_running: bool
|
||||||
|
|
||||||
|
_delay_threshold: float
|
||||||
|
_max_delay_rate: float
|
||||||
|
|
||||||
|
def __init__(self, delay_threshold: float, *, max_delay_rate: float = 4.0) -> None:
|
||||||
|
self._start = datetime.datetime.now()
|
||||||
|
self._end = self._start
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
self._delay_threshold = delay_threshold
|
||||||
|
self._max_delay_rate = max_delay_rate
|
||||||
|
|
||||||
|
def __enter__(self) -> 'ExecutionTimer':
|
||||||
|
self._start = self._end = datetime.datetime.now()
|
||||||
|
self._running = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type: typing.Any, exc_value: typing.Any, traceback: typing.Any) -> None:
|
||||||
|
self._running = False
|
||||||
|
self._end = datetime.datetime.now()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def elapsed(self) -> datetime.timedelta:
|
||||||
|
if self._running:
|
||||||
|
return datetime.datetime.now() - self._start
|
||||||
|
return self._end - self._start
|
||||||
|
|
||||||
|
@property
|
||||||
|
def delay_rate(self) -> float:
|
||||||
|
if self.elapsed.total_seconds() > self._delay_threshold:
|
||||||
|
# Ensure we do not delay too much, at most MAX_DELAY_RATE times
|
||||||
|
return min(self.elapsed.total_seconds() / self._delay_threshold, self._max_delay_rate)
|
||||||
|
return 1.0
|
||||||
|
@ -38,7 +38,7 @@ import logging
|
|||||||
from uds.models import Service
|
from uds.models import Service
|
||||||
from uds.core.util.model import sql_now
|
from uds.core.util.model import sql_now
|
||||||
from uds.core.jobs import Job
|
from uds.core.jobs import Job
|
||||||
from uds.core.util import storage
|
from uds.core.util import storage, utils
|
||||||
|
|
||||||
from uds.core.services.generics import exceptions as gen_exceptions
|
from uds.core.services.generics import exceptions as gen_exceptions
|
||||||
|
|
||||||
@ -69,40 +69,11 @@ TO_DELETE_GROUP: typing.Final[str] = 'to_delete'
|
|||||||
DELETING_GROUP: typing.Final[str] = 'deleting'
|
DELETING_GROUP: typing.Final[str] = 'deleting'
|
||||||
|
|
||||||
|
|
||||||
class ExecutionTimer:
|
def execution_timer() -> 'utils.ExecutionTimer':
|
||||||
_start: datetime.datetime
|
return utils.ExecutionTimer(delay_threshold=OPERATION_DELAY_THRESHOLD, max_delay_rate=MAX_DELAY_RATE)
|
||||||
_end: datetime.datetime
|
|
||||||
_running: bool
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self._start = datetime.datetime.now()
|
|
||||||
self._end = self._start
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def __enter__(self) -> 'ExecutionTimer':
|
|
||||||
self._start = self._end = datetime.datetime.now()
|
|
||||||
self._running = True
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type: typing.Any, exc_value: typing.Any, traceback: typing.Any) -> None:
|
|
||||||
self._running = False
|
|
||||||
self._end = datetime.datetime.now()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def elapsed(self) -> datetime.timedelta:
|
|
||||||
if self._running:
|
|
||||||
return datetime.datetime.now() - self._start
|
|
||||||
return self._end - self._start
|
|
||||||
|
|
||||||
@property
|
|
||||||
def delay_rate(self) -> float:
|
|
||||||
if self.elapsed.total_seconds() > OPERATION_DELAY_THRESHOLD:
|
|
||||||
# Ensure we do not delay too much, at most MAX_DELAY_RATE times
|
|
||||||
return min(self.elapsed.total_seconds() / OPERATION_DELAY_THRESHOLD, MAX_DELAY_RATE)
|
|
||||||
return 1.0
|
|
||||||
|
|
||||||
|
|
||||||
def calc_next(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
|
def next_execution_calculator(*, fatal: bool = False, delay_rate: float = 1.0) -> datetime.datetime:
|
||||||
"""
|
"""
|
||||||
Returns the next check time for a deletion operation
|
Returns the next check time for a deletion operation
|
||||||
"""
|
"""
|
||||||
@ -135,7 +106,7 @@ class DeletionInfo:
|
|||||||
storage_dict[unique_key] = DeletionInfo(
|
storage_dict[unique_key] = DeletionInfo(
|
||||||
vmid=vmid,
|
vmid=vmid,
|
||||||
created=sql_now(),
|
created=sql_now(),
|
||||||
next_check=calc_next(delay_rate=delay_rate),
|
next_check=next_execution_calculator(delay_rate=delay_rate),
|
||||||
service_uuid=service_uuid,
|
service_uuid=service_uuid,
|
||||||
# fatal, total an retries are 0 by default
|
# fatal, total an retries are 0 by default
|
||||||
)
|
)
|
||||||
@ -207,7 +178,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name)
|
logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name)
|
||||||
# If sync, execute now
|
# If sync, execute now
|
||||||
if not execute_later:
|
if not execute_later:
|
||||||
exec_time = ExecutionTimer()
|
exec_time = execution_timer()
|
||||||
try:
|
try:
|
||||||
with exec_time:
|
with exec_time:
|
||||||
if service.must_stop_before_deletion:
|
if service.must_stop_before_deletion:
|
||||||
@ -221,7 +192,9 @@ class DeferredDeletionWorker(Job):
|
|||||||
|
|
||||||
service.execute_delete(vmid)
|
service.execute_delete(vmid)
|
||||||
# If this takes too long, we will delay the next check a bit
|
# If this takes too long, we will delay the next check a bit
|
||||||
DeletionInfo.create_on_storage(DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate)
|
DeletionInfo.create_on_storage(
|
||||||
|
DELETING_GROUP, vmid, service.db_obj().uuid, delay_rate=exec_time.delay_rate
|
||||||
|
)
|
||||||
except gen_exceptions.NotFoundError:
|
except gen_exceptions.NotFoundError:
|
||||||
return # Already removed
|
return # Already removed
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -265,7 +238,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not is_retryable:
|
if not is_retryable:
|
||||||
info.next_check = calc_next(fatal=True, delay_rate=delay_rate)
|
info.next_check = next_execution_calculator(fatal=True, delay_rate=delay_rate)
|
||||||
info.fatal_retries += 1
|
info.fatal_retries += 1
|
||||||
if info.fatal_retries >= MAX_FATAL_ERROR_RETRIES:
|
if info.fatal_retries >= MAX_FATAL_ERROR_RETRIES:
|
||||||
logger.error(
|
logger.error(
|
||||||
@ -274,7 +247,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
services[info.service_uuid].db_obj().name,
|
services[info.service_uuid].db_obj().name,
|
||||||
)
|
)
|
||||||
return # Do not readd it
|
return # Do not readd it
|
||||||
info.next_check = calc_next(delay_rate=delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=delay_rate)
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES:
|
if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES:
|
||||||
logger.error(
|
logger.error(
|
||||||
@ -291,7 +264,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
|
|
||||||
# Now process waiting stops
|
# Now process waiting stops
|
||||||
for key, info in to_stop:
|
for key, info in to_stop:
|
||||||
exec_time = ExecutionTimer()
|
exec_time = execution_timer()
|
||||||
try:
|
try:
|
||||||
service = services[info.service_uuid]
|
service = services[info.service_uuid]
|
||||||
with exec_time:
|
with exec_time:
|
||||||
@ -308,7 +281,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
info.retries = 0 # Reset retries
|
info.retries = 0 # Reset retries
|
||||||
service.stop(None, info.vmid) # Always try to stop it if we have tried before
|
service.stop(None, info.vmid) # Always try to stop it if we have tried before
|
||||||
|
|
||||||
info.next_check = calc_next(delay_rate=exec_time.delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
|
||||||
info.sync_to_storage(STOPPING_GROUP)
|
info.sync_to_storage(STOPPING_GROUP)
|
||||||
else:
|
else:
|
||||||
# Do not update last_check to shutdown it asap, was not running after all
|
# Do not update last_check to shutdown it asap, was not running after all
|
||||||
@ -322,22 +295,22 @@ class DeferredDeletionWorker(Job):
|
|||||||
|
|
||||||
# Now process waiting for finishing stops
|
# Now process waiting for finishing stops
|
||||||
for key, info in stopping:
|
for key, info in stopping:
|
||||||
exec_time = ExecutionTimer()
|
exec_time = execution_timer()
|
||||||
try:
|
try:
|
||||||
info.retries += 1
|
info.retries += 1
|
||||||
if info.retries > RETRIES_TO_RETRY:
|
if info.retries > RETRIES_TO_RETRY:
|
||||||
# If we have tried to stop it, and it has not stopped, add to stop again
|
# If we have tried to stop it, and it has not stopped, add to stop again
|
||||||
info.next_check = calc_next()
|
info.next_check = next_execution_calculator()
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
info.sync_to_storage(TO_STOP_GROUP)
|
info.sync_to_storage(TO_STOP_GROUP)
|
||||||
continue
|
continue
|
||||||
with exec_time:
|
with exec_time:
|
||||||
if services[info.service_uuid].is_running(None, info.vmid):
|
if services[info.service_uuid].is_running(None, info.vmid):
|
||||||
info.next_check = calc_next(delay_rate=exec_time.delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
info.sync_to_storage(STOPPING_GROUP)
|
info.sync_to_storage(STOPPING_GROUP)
|
||||||
else:
|
else:
|
||||||
info.next_check = calc_next(delay_rate=exec_time.delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
|
||||||
info.fatal_retries = info.total_retries = 0
|
info.fatal_retries = info.total_retries = 0
|
||||||
info.sync_to_storage(TO_DELETE_GROUP)
|
info.sync_to_storage(TO_DELETE_GROUP)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -350,7 +323,7 @@ class DeferredDeletionWorker(Job):
|
|||||||
# Now process waiting deletions
|
# Now process waiting deletions
|
||||||
for key, info in to_delete:
|
for key, info in to_delete:
|
||||||
service = services[info.service_uuid]
|
service = services[info.service_uuid]
|
||||||
exec_time = ExecutionTimer()
|
exec_time = execution_timer()
|
||||||
try:
|
try:
|
||||||
with exec_time:
|
with exec_time:
|
||||||
# If must be stopped before deletion, and is running, put it on to_stop
|
# If must be stopped before deletion, and is running, put it on to_stop
|
||||||
@ -360,12 +333,14 @@ class DeferredDeletionWorker(Job):
|
|||||||
|
|
||||||
service.execute_delete(info.vmid)
|
service.execute_delete(info.vmid)
|
||||||
# And store it for checking later if it has been deleted, reseting counters
|
# And store it for checking later if it has been deleted, reseting counters
|
||||||
info.next_check = calc_next(delay_rate=exec_time.delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
|
||||||
info.retries = 0
|
info.retries = 0
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
info.sync_to_storage(DELETING_GROUP)
|
info.sync_to_storage(DELETING_GROUP)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._process_exception(key, info, TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate)
|
self._process_exception(
|
||||||
|
key, info, TO_DELETE_GROUP, services, e, delay_rate=exec_time.delay_rate
|
||||||
|
)
|
||||||
|
|
||||||
def process_deleting(self) -> None:
|
def process_deleting(self) -> None:
|
||||||
"""
|
"""
|
||||||
@ -378,19 +353,19 @@ class DeferredDeletionWorker(Job):
|
|||||||
|
|
||||||
# Now process waiting for finishing deletions
|
# Now process waiting for finishing deletions
|
||||||
for key, info in deleting:
|
for key, info in deleting:
|
||||||
exec_time = ExecutionTimer()
|
exec_time = execution_timer()
|
||||||
try:
|
try:
|
||||||
info.retries += 1
|
info.retries += 1
|
||||||
if info.retries > RETRIES_TO_RETRY:
|
if info.retries > RETRIES_TO_RETRY:
|
||||||
# If we have tried to delete it, and it has not been deleted, add to delete again
|
# If we have tried to delete it, and it has not been deleted, add to delete again
|
||||||
info.next_check = calc_next()
|
info.next_check = next_execution_calculator()
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
info.sync_to_storage(TO_DELETE_GROUP)
|
info.sync_to_storage(TO_DELETE_GROUP)
|
||||||
continue
|
continue
|
||||||
with exec_time:
|
with exec_time:
|
||||||
# If not finished, readd it for later check
|
# If not finished, readd it for later check
|
||||||
if not services[info.service_uuid].is_deleted(info.vmid):
|
if not services[info.service_uuid].is_deleted(info.vmid):
|
||||||
info.next_check = calc_next(delay_rate=exec_time.delay_rate)
|
info.next_check = next_execution_calculator(delay_rate=exec_time.delay_rate)
|
||||||
info.total_retries += 1
|
info.total_retries += 1
|
||||||
info.sync_to_storage(DELETING_GROUP)
|
info.sync_to_storage(DELETING_GROUP)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
Loading…
Reference in New Issue
Block a user