1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-18 06:03:54 +03:00

Moving types of deferred deletion to a better place, and fixing up left-begind not needed behaviors

This commit is contained in:
Adolfo Gómez García 2024-08-30 19:39:40 +02:00
parent eb62997bde
commit 3f60e7509e
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
3 changed files with 75 additions and 37 deletions

View File

@ -57,13 +57,13 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
def set_last_check_expired(self) -> None:
for group in deferred_types.DeferredStorageGroup:
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage:
for key, info in typing.cast(dict[str, deferred_deletion.DeletionInfo], storage).items():
info.next_check = sql_now() - datetime.timedelta(seconds=1)
storage[key] = info
def count_entries_on_storage(self, group: str) -> int:
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(group) as storage:
return len(storage)
@contextlib.contextmanager
@ -99,7 +99,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
with mock.patch('uds.models.Service.objects') as objs:
objs.get.return_value = instance.db_obj()
with mock.patch(
'uds.workers.deferred_deletion.DeferredDeletionWorker.deferred_storage'
'uds.workers.deferred_deletion.DeletionInfo.deferred_storage'
) as storage:
@contextlib.contextmanager
@ -156,7 +156,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.STOPPING), 0)
# Storage db should have 16 entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 16)
@ -194,7 +194,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(len(services_1), 1)
self.assertEqual(len(key_info_1), 1)
# And should rest only 15 on storage
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 15)
@ -206,11 +206,11 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.assertEqual(len(key_info_2), 15) # And 15 entries
# Re-store all DELETING_GROUP entries
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
for info in itertools.chain(key_info_1, key_info_2):
deleting[info[0]] = info[1]
deleting[info.key] = info
# set MAX_DELETIONS_AT_ONCE to a value bigger than 16
deferred_consts.MAX_DELETIONS_AT_ONCE = 100
@ -263,18 +263,18 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
self.set_last_check_expired()
# Now, get from deleting again, should have all services and infos
services, key_info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
services, info = deferred_deletion.DeletionInfo.get_from_storage(deferred_types.DeferredStorageGroup.TO_DELETE)
self.assertEqual(len(services), 1)
self.assertEqual(len(key_info), 1)
self.assertEqual(len(info), 1)
# now, db should be empty
self.assertEqual(self.count_entries_on_storage(deferred_types.DeferredStorageGroup.TO_DELETE), 0)
# Re store the entry
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
deferred_types.DeferredStorageGroup.TO_DELETE
) as to_delete:
for info in key_info:
to_delete[info[0]] = info[1]
for info in info:
to_delete[info.key] = info
# Process should move from to_delete to deleting
job.run() # process_to_delete and process_deleting
@ -287,7 +287,7 @@ class DynamicDeferredDeleteTest(UDSTransactionTestCase):
instance.mock.reset_mock()
# And should have one entry in deleting
with deferred_deletion.DeferredDeletionWorker.deferred_storage.as_dict(
with deferred_deletion.DeletionInfo.deferred_storage.as_dict(
deferred_types.DeferredStorageGroup.DELETING
) as deleting:
self.assertEqual(len(deleting), 1)

View File

@ -181,6 +181,21 @@ class ExecutionTimer:
_max_delay_rate: float
def __init__(self, delay_threshold: float, *, max_delay_rate: float = 4.0) -> None:
"""
Creates a new ExecutionTimer
Arguments:
delay_threshold {float} -- Threshold for the delay rate, in seconds.
max_delay_rate {float} -- Maximum delay rate, defaults to 4.0
Note:
- delay_threshold is the time in seconds that we consider an operation is taking too long
- max_delay_rate is the maximum delay rate, if the operation is taking longer than the threshold, we will
multiply the delay by the delay rate, but at most by the max delay rate
- The delay will be calculated as the elapsed time divided by the threshold, at most the max delay rate
- A value of <= 0.0 will not delay at all, a value of 1.0 will delay as much as the elapsed time, a value of 2.0
will delay twice the elapsed time, and so on
"""
self._start = datetime.datetime.now()
self._end = self._start
self._running = False
@ -205,7 +220,22 @@ class ExecutionTimer:
@property
def delay_rate(self) -> float:
if self.elapsed.total_seconds() > self._delay_threshold:
"""
Returns the delay rate based on the elapsed time
Delay rate is a multiplier for the delay time based on the elapsed time
I.e:
- If the elapsed time is 0, the delay rate is 1.0
- If the delay_threshold is lower or equal to 0, the delay rate is 1.0
- If the elapsed time is greater than the threshold, the delay rate is the elapsed time divided by the threshold
for example:
* threshold = 2, elapsed = 4, delay rate = 2.0
* threshold = 2, elapsed = 8, delay rate = 4.0
- If the delay rate is greater than the max delay rate, the delay rate is the max delay rate
This allows us to increase the delay for next check based on how long the operation is taking
(the longer it takes, the longer we wait for the next check)
"""
if self._delay_threshold > 0 and self.elapsed.total_seconds() > self._delay_threshold:
# Ensure we do not delay too much, at most MAX_DELAY_RATE times
return min(self.elapsed.total_seconds() / self._delay_threshold, self._max_delay_rate)
return 1.0

View File

@ -51,6 +51,10 @@ logger = logging.getLogger(__name__)
def execution_timer() -> 'utils.ExecutionTimer':
"""
Generates an execution timer for deletion operations
This allows to delay the next check based on how long the operation took
"""
return utils.ExecutionTimer(
delay_threshold=consts.OPERATION_DELAY_THRESHOLD, max_delay_rate=consts.MAX_DELAY_RATE
)
@ -75,24 +79,32 @@ class DeletionInfo:
total_retries: int = 0 # Total retries
retries: int = 0 # Retries to stop again or to delete again in STOPPING_GROUP or DELETING_GROUP
deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
@property
def key(self) -> str:
return DeletionInfo.generate_key(self.service_uuid, self.vmid)
def sync_to_storage(self, group: types.DeferredStorageGroup) -> None:
"""
Ensures that this object is stored on the storage
If exists, it will be updated, if not, it will be created
"""
unique_key = f'{self.service_uuid}_{self.vmid}'
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
storage_dict[unique_key] = self
with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
storage_dict[self.key] = self
# For reporting
def as_csv(self) -> str:
return f'{self.vmid},{self.created},{self.next_check},{self.service_uuid},{self.fatal_retries},{self.total_retries},{self.retries}'
@staticmethod
def generate_key(service_uuid: str, vmid: str) -> str:
return f'{service_uuid}_{vmid}'
@staticmethod
def create_on_storage(group: str, vmid: str, service_uuid: str, delay_rate: float = 1.0) -> None:
unique_key = f'{service_uuid}_{vmid}'
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
storage_dict[unique_key] = DeletionInfo(
with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
storage_dict[DeletionInfo.generate_key(service_uuid, vmid)] = DeletionInfo(
vmid=vmid,
created=sql_now(),
next_check=next_execution_calculator(delay_rate=delay_rate),
@ -103,7 +115,7 @@ class DeletionInfo:
@staticmethod
def get_from_storage(
group: types.DeferredStorageGroup,
) -> tuple[dict[str, 'DynamicService'], list[tuple[str, 'DeletionInfo']]]:
) -> tuple[dict[str, 'DynamicService'], list['DeletionInfo']]:
"""
Get a list of objects to be processed from storage
@ -112,14 +124,14 @@ class DeletionInfo:
This is so we can release locks as soon as possible
"""
count = 0
infos: list[tuple[str, DeletionInfo]] = []
infos: list[DeletionInfo] = []
services: dict[str, 'DynamicService'] = {}
# First, get ownership of to_delete objects to be processed
# We do this way to release db locks as soon as possible
now = sql_now()
with DeferredDeletionWorker.deferred_storage.as_dict(group, atomic=True) as storage_dict:
with DeletionInfo.deferred_storage.as_dict(group, atomic=True) as storage_dict:
for key, info in sorted(
typing.cast(collections.abc.Iterable[tuple[str, DeletionInfo]], storage_dict.items()),
key=lambda x: x[1].next_check,
@ -152,13 +164,13 @@ class DeletionInfo:
del storage_dict[key] # Remove from storage, being processed
# Only add if not too many retries already
infos.append((key, info))
infos.append(info)
return services, infos
@staticmethod
def count_from_storage(group: types.DeferredStorageGroup) -> int:
# Counts the total number of objects in storage
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage_dict:
with DeletionInfo.deferred_storage.as_dict(group) as storage_dict:
return len(storage_dict)
@staticmethod
@ -170,8 +182,6 @@ class DeferredDeletionWorker(Job):
frecuency = 7 # Frequency for this job, in seconds
friendly_name = 'Deferred deletion runner'
deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker')
@staticmethod
def add(service: 'DynamicService', vmid: str, execute_later: bool = False) -> None:
logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name)
@ -223,7 +233,6 @@ class DeferredDeletionWorker(Job):
def _process_exception(
self,
key: str,
info: DeletionInfo,
to_group: types.DeferredStorageGroup,
services: dict[str, 'DynamicService'],
@ -269,7 +278,7 @@ class DeferredDeletionWorker(Job):
logger.debug('Processing %s to stop', to_stop)
# Now process waiting stops
for key, info in to_stop:
for info in to_stop: # Key not used
exec_time = execution_timer()
try:
service = services[info.service_uuid]
@ -294,7 +303,7 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
except Exception as e:
self._process_exception(
key, info, types.DeferredStorageGroup.TO_STOP, services, e, delay_rate=exec_time.delay_rate
info, types.DeferredStorageGroup.TO_STOP, services, e, delay_rate=exec_time.delay_rate
)
def process_stopping(self) -> None:
@ -302,7 +311,7 @@ class DeferredDeletionWorker(Job):
logger.debug('Processing %s stopping', stopping)
# Now process waiting for finishing stops
for key, info in stopping:
for info in stopping:
exec_time = execution_timer()
try:
info.retries += 1
@ -323,7 +332,7 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
except Exception as e:
self._process_exception(
key, info, types.DeferredStorageGroup.STOPPING, services, e, delay_rate=exec_time.delay_rate
info, types.DeferredStorageGroup.STOPPING, services, e, delay_rate=exec_time.delay_rate
)
def process_to_delete(self) -> None:
@ -331,7 +340,7 @@ class DeferredDeletionWorker(Job):
logger.debug('Processing %s to delete', to_delete)
# Now process waiting deletions
for key, info in to_delete:
for info in to_delete:
service = services[info.service_uuid]
exec_time = execution_timer()
try:
@ -349,7 +358,6 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(types.DeferredStorageGroup.DELETING)
except Exception as e:
self._process_exception(
key,
info,
types.DeferredStorageGroup.TO_DELETE,
services,
@ -367,7 +375,7 @@ class DeferredDeletionWorker(Job):
logger.debug('Processing %s deleting', deleting)
# Now process waiting for finishing deletions
for key, info in deleting:
for info in deleting: # Key not used
exec_time = execution_timer()
try:
info.retries += 1
@ -385,7 +393,7 @@ class DeferredDeletionWorker(Job):
info.sync_to_storage(types.DeferredStorageGroup.DELETING)
except Exception as e:
self._process_exception(
key, info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate
info, types.DeferredStorageGroup.DELETING, services, e, delay_rate=exec_time.delay_rate
)
def run(self) -> None:
@ -399,7 +407,7 @@ class DeferredDeletionWorker(Job):
def report(out: typing.TextIO) -> None:
out.write(DeletionInfo.csv_header() + '\n')
for group in types.DeferredStorageGroup:
with DeferredDeletionWorker.deferred_storage.as_dict(group) as storage:
with DeletionInfo.deferred_storage.as_dict(group) as storage:
for _key, info in typing.cast(dict[str, DeletionInfo], storage).items():
out.write(info.as_csv() + '\n')
out.write('\n')