mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-11 00:58:39 +03:00
Refactor variable names for better readability and consistency
This commit is contained in:
parent
bfa474d3f1
commit
66dcad2d8d
@ -54,11 +54,11 @@ class HangedCleaner(Job):
|
||||
since_state = now - timedelta(
|
||||
seconds=GlobalConfig.MAX_INITIALIZING_TIME.as_int()
|
||||
)
|
||||
since_removing = now - timedelta(seconds=GlobalConfig.MAX_REMOVAL_TIME.as_int())
|
||||
removing_since = now - timedelta(seconds=GlobalConfig.MAX_REMOVAL_TIME.as_int())
|
||||
# Filter for locating machine not ready
|
||||
flt = Q(state_date__lt=since_state, state=types.states.State.PREPARING) | Q(
|
||||
state_date__lt=since_state, state=types.states.State.USABLE, os_state=types.states.State.PREPARING
|
||||
) | Q(state_date__lt=since_removing, state__in=[types.states.State.REMOVING, types.states.State.CANCELING])
|
||||
) | Q(state_date__lt=removing_since, state__in=[types.states.State.REMOVING, types.states.State.CANCELING])
|
||||
|
||||
servicepools_with_hanged = (
|
||||
ServicePool.objects.annotate(
|
||||
@ -75,7 +75,7 @@ class HangedCleaner(Job):
|
||||
userServices__os_state=types.states.State.PREPARING,
|
||||
)
|
||||
| Q(
|
||||
userServices__state_date__lt=since_removing,
|
||||
userServices__state_date__lt=removing_since,
|
||||
userServices__state__in=[types.states.State.REMOVING, types.states.State.CANCELING],
|
||||
),
|
||||
)
|
||||
|
@ -31,7 +31,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
import collections.abc
|
||||
|
||||
from uds.core.managers import publication_manager
|
||||
from uds.core.util.config import GlobalConfig
|
||||
@ -52,11 +51,11 @@ class PublicationInfoItemsCleaner(Job):
|
||||
friendly_name = 'Publications Info Cleaner'
|
||||
|
||||
def run(self) -> None:
|
||||
removeFrom = sql_now() - timedelta(
|
||||
remove_since = sql_now() - timedelta(
|
||||
seconds=GlobalConfig.KEEP_INFO_TIME.as_int(True)
|
||||
)
|
||||
ServicePoolPublication.objects.filter(
|
||||
state__in=State.INFO_STATES, state_date__lt=removeFrom
|
||||
state__in=State.INFO_STATES, state_date__lt=remove_since
|
||||
).delete()
|
||||
|
||||
|
||||
@ -68,9 +67,7 @@ class PublicationCleaner(Job):
|
||||
friendly_name = 'Publication Cleaner'
|
||||
|
||||
def run(self) -> None:
|
||||
removables: collections.abc.Iterable[
|
||||
ServicePoolPublication
|
||||
] = ServicePoolPublication.objects.filter(
|
||||
removables = ServicePoolPublication.objects.filter(
|
||||
state=State.REMOVABLE,
|
||||
deployed_service__service__provider__maintenance_mode=False,
|
||||
)
|
||||
|
@ -53,11 +53,11 @@ class DeployedServiceInfoItemsCleaner(Job):
|
||||
friendly_name = 'Deployed Service Info Cleaner'
|
||||
|
||||
def run(self) -> None:
|
||||
removeFrom = sql_now() - timedelta(
|
||||
remove_since = sql_now() - timedelta(
|
||||
seconds=GlobalConfig.KEEP_INFO_TIME.as_int()
|
||||
)
|
||||
ServicePool.objects.filter(
|
||||
state__in=State.INFO_STATES, state_date__lt=removeFrom
|
||||
state__in=State.INFO_STATES, state_date__lt=remove_since
|
||||
).delete()
|
||||
|
||||
|
||||
@ -95,75 +95,75 @@ class DeployedServiceRemover(Job):
|
||||
service_pool.name += ' (removed)'
|
||||
service_pool.save(update_fields=['state', 'state_date', 'name'])
|
||||
|
||||
def continue_removal_of(self, servicePool: ServicePool) -> None:
|
||||
def continue_removal_of(self, servicepool: ServicePool) -> None:
|
||||
# get current time
|
||||
now = sql_now()
|
||||
|
||||
# Recheck that there is no publication created just after "startRemovalOf"
|
||||
try:
|
||||
for pub in servicePool.publications.filter(state=State.PREPARING):
|
||||
for pub in servicepool.publications.filter(state=State.PREPARING):
|
||||
pub.cancel()
|
||||
except Exception: # nosec: Dont care if we fail here, we will try again later
|
||||
pass
|
||||
|
||||
try:
|
||||
# Now all publications are canceling, let's try to cancel cache and assigned also
|
||||
uServices: collections.abc.Iterable[UserService] = servicePool.userServices.filter(
|
||||
userservices: collections.abc.Iterable[UserService] = servicepool.userServices.filter(
|
||||
state=State.PREPARING
|
||||
)
|
||||
for userService in uServices:
|
||||
logger.debug('Canceling %s', userService)
|
||||
userService.cancel()
|
||||
for userservice in userservices:
|
||||
logger.debug('Canceling %s', userservice)
|
||||
userservice.cancel()
|
||||
except Exception: # nosec: Dont care if we fail here, we will try again later
|
||||
pass
|
||||
|
||||
# First, we remove all publications and user services in "info_state"
|
||||
with transaction.atomic():
|
||||
servicePool.userServices.select_for_update().filter(
|
||||
servicepool.userServices.select_for_update().filter(
|
||||
state__in=State.INFO_STATES
|
||||
).delete()
|
||||
|
||||
# Mark usable user services as removable, as batch
|
||||
with transaction.atomic():
|
||||
servicePool.userServices.select_for_update().filter(
|
||||
servicepool.userServices.select_for_update().filter(
|
||||
state=State.USABLE
|
||||
).update(state=State.REMOVABLE, state_date=now)
|
||||
|
||||
# When no service is at database, we start with publications
|
||||
if servicePool.userServices.all().count() == 0:
|
||||
if servicepool.userServices.all().count() == 0:
|
||||
try:
|
||||
logger.debug('All services removed, checking active publication')
|
||||
if servicePool.active_publication() is not None:
|
||||
if servicepool.active_publication() is not None:
|
||||
logger.debug('Active publication found, unpublishing it')
|
||||
servicePool.unpublish()
|
||||
servicepool.unpublish()
|
||||
else:
|
||||
logger.debug(
|
||||
'No active publication found, removing info states and checking if removal is done'
|
||||
)
|
||||
servicePool.publications.filter(
|
||||
servicepool.publications.filter(
|
||||
state__in=State.INFO_STATES
|
||||
).delete()
|
||||
if servicePool.publications.count() == 0:
|
||||
servicePool.removed() # Mark it as removed, let model decide what to do
|
||||
if servicepool.publications.count() == 0:
|
||||
servicepool.removed() # Mark it as removed, let model decide what to do
|
||||
except Exception:
|
||||
logger.exception('Cought unexpected exception at continueRemovalOf: ')
|
||||
|
||||
def force_removal_of(self, servicePool: ServicePool) -> None:
|
||||
def force_removal_of(self, servicepool: ServicePool) -> None:
|
||||
# Simple remove all publications and user services, without checking anything
|
||||
# Log userServices forcet to remove
|
||||
logger.warning(
|
||||
'Service %s has been in removing state for too long, forcing removal',
|
||||
servicePool.name,
|
||||
servicepool.name,
|
||||
)
|
||||
for userservice in servicePool.userServices.all():
|
||||
for userservice in servicepool.userServices.all():
|
||||
logger.warning('Force removing user service %s', userservice)
|
||||
userservice.delete()
|
||||
servicePool.userServices.all().delete()
|
||||
for publication in servicePool.publications.all():
|
||||
servicepool.userServices.all().delete()
|
||||
for publication in servicepool.publications.all():
|
||||
logger.warning('Force removing %s', publication)
|
||||
publication.delete()
|
||||
|
||||
servicePool.removed() # Mark it as removed, let model decide what to do
|
||||
servicepool.removed() # Mark it as removed, let model decide what to do
|
||||
|
||||
def run(self) -> None:
|
||||
# First check if there is someone in "removable" estate
|
||||
|
@ -85,136 +85,6 @@ class ServiceCacheUpdater(Job):
|
||||
remaining_restraing_time,
|
||||
)
|
||||
|
||||
# def service_pools_needing_cache_update(
|
||||
# self,
|
||||
# ) -> list[types.services.ServicePoolStats]:
|
||||
# # State filter for cached and inAssigned objects
|
||||
# # First we get all deployed services that could need cache generation
|
||||
# # We start filtering out the deployed services that do not need caching at all.
|
||||
# candidate_servicepools: collections.abc.Iterable[ServicePool] = (
|
||||
# ServicePool.objects.filter(Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0))
|
||||
# .filter(
|
||||
# max_srvs__gt=0,
|
||||
# state=State.ACTIVE,
|
||||
# service__provider__maintenance_mode=False,
|
||||
# )
|
||||
# .iterator()
|
||||
# )
|
||||
|
||||
# # We will get the one that proportionally needs more cache
|
||||
# servicepools_numbers: list[types.services.ServicePoolStats] = []
|
||||
# for servicepool in candidate_servicepools:
|
||||
# servicepool.user_services.update() # Cleans cached queries
|
||||
# # If this deployedService don't have a publication active and needs it, ignore it
|
||||
# service_instance = servicepool.service.get_instance()
|
||||
|
||||
# if service_instance.uses_cache is False:
|
||||
# logger.debug(
|
||||
# 'Skipping cache generation for service pool that does not uses cache: %s',
|
||||
# servicepool.name,
|
||||
# )
|
||||
# continue
|
||||
|
||||
# if servicepool.active_publication() is None and service_instance.publication_type is not None:
|
||||
# logger.debug(
|
||||
# 'Skipping. %s Needs publication but do not have one',
|
||||
# servicepool.name,
|
||||
# )
|
||||
# continue
|
||||
# # If it has any running publication, do not generate cache anymore
|
||||
# if servicepool.publications.filter(state=State.PREPARING).count() > 0:
|
||||
# logger.debug(
|
||||
# 'Skipping cache generation for service pool with publication running: %s',
|
||||
# servicepool.name,
|
||||
# )
|
||||
# continue
|
||||
|
||||
# if servicepool.is_restrained():
|
||||
# logger.debug(
|
||||
# 'StopSkippingped cache generation for restrained service pool: %s',
|
||||
# servicepool.name,
|
||||
# )
|
||||
# ServiceCacheUpdater._notify_restrain(servicepool)
|
||||
# continue
|
||||
|
||||
# # Get data related to actual state of cache
|
||||
# # Before we were removing the elements marked to be destroyed after creation, but this makes us
|
||||
# # to create new items over the limit stablisshed, so we will not remove them anymore
|
||||
# l1_cache_count: int = (
|
||||
# servicepool.cached_users_services()
|
||||
# .filter(UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
|
||||
# .count()
|
||||
# )
|
||||
# l2_cache_count: int = (
|
||||
# (
|
||||
# servicepool.cached_users_services()
|
||||
# .filter(
|
||||
# UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
|
||||
# )
|
||||
# .count()
|
||||
# )
|
||||
# if service_instance.uses_cache_l2
|
||||
# else 0
|
||||
# )
|
||||
# assigned_count: int = (
|
||||
# servicepool.assigned_user_services()
|
||||
# .filter(UserServiceManager.manager().get_state_filter(servicepool.service))
|
||||
# .count()
|
||||
# )
|
||||
# pool_stat = types.services.ServicePoolStats(
|
||||
# servicepool, l1_cache_count, l2_cache_count, assigned_count
|
||||
# )
|
||||
# # if we bypasses max cache, we will reduce it in first place. This is so because this will free resources on service provider
|
||||
# logger.debug(
|
||||
# "Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
|
||||
# servicepool.name,
|
||||
# l1_cache_count,
|
||||
# l2_cache_count,
|
||||
# assigned_count,
|
||||
# )
|
||||
|
||||
# # We have more than we want
|
||||
# if pool_stat.l1_cache_overflow():
|
||||
# logger.debug('We have more services than max configured. Reducing..')
|
||||
# servicepools_numbers.append(
|
||||
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
|
||||
# )
|
||||
# continue
|
||||
|
||||
# # If we have more in L2 cache than needed, decrease L2 cache, but int this case, we continue checking cause L2 cache removal
|
||||
# # has less priority than l1 creations or removals, but higher. In this case, we will simply take last l2 oversized found and reduce it
|
||||
# if pool_stat.l2_cache_overflow():
|
||||
# logger.debug('We have more services in L2 cache than configured, reducing')
|
||||
# servicepools_numbers.append(
|
||||
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
|
||||
# )
|
||||
# continue
|
||||
|
||||
# # If this service don't allows more starting user services, continue
|
||||
# if not UserServiceManager.manager().can_grow_service_pool(servicepool):
|
||||
# logger.debug(
|
||||
# 'This pool cannot grow rithg now: %s',
|
||||
# servicepool,
|
||||
# )
|
||||
# continue
|
||||
|
||||
# if pool_stat.l1_cache_needed():
|
||||
# logger.debug('Needs to grow L1 cache for %s', servicepool)
|
||||
# servicepools_numbers.append(
|
||||
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
|
||||
# )
|
||||
# continue
|
||||
|
||||
# if pool_stat.l2_cache_needed():
|
||||
# logger.debug('Needs to grow L2 cache for %s', servicepool)
|
||||
# servicepools_numbers.append(
|
||||
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
|
||||
# )
|
||||
# continue
|
||||
|
||||
# # We also return calculated values so we can reuse then
|
||||
# return servicepools_numbers
|
||||
|
||||
def service_pools_needing_cache_update(
|
||||
self,
|
||||
) -> list[types.services.ServicePoolStats]:
|
||||
|
@ -59,7 +59,7 @@ class StuckCleaner(Job):
|
||||
since_state: datetime = sql_now() - timedelta(seconds=MAX_STUCK_TIME)
|
||||
# Filter for locating machine stuck on removing, cancelling, etc..
|
||||
# Locate service pools with pending assigned service in use
|
||||
servicePoolswithStucks = (
|
||||
servicepools_with_stucks = (
|
||||
ServicePool.objects.annotate(
|
||||
stuckCount=Count(
|
||||
'userServices',
|
||||
@ -84,7 +84,7 @@ class StuckCleaner(Job):
|
||||
yield from q.exclude(state__in=types.states.State.INFO_STATES + types.states.State.VALID_STATES)
|
||||
yield from q.filter(state=types.states.State.PREPARING)
|
||||
|
||||
for servicepool in servicePoolswithStucks:
|
||||
for servicepool in servicepools_with_stucks:
|
||||
if servicepool.service.get_instance().allows_errored_userservice_cleanup() is False:
|
||||
continue
|
||||
# logger.debug('Searching for stuck states for %s', servicePool.name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user