mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Adding support for "return user service to cache"

This commit is contained in:
Adolfo Gómez García 2024-09-02 18:04:16 +02:00
parent 629d4622a7
commit 0a39939659
25 changed files with 446 additions and 245 deletions

View File

@ -729,18 +729,22 @@ def create_dynamic_service(
provider: 'DynamicTestingProvider|None' = None,
maintain_on_error: bool = False,
try_soft_shutdown: bool = False,
override_mockables: bool = True
) -> DynamicTestingService:
uuid_ = str(uuid.uuid4())
service = DynamicTestingService(
srvc = DynamicTestingService(
provider=provider or create_dynamic_provider(),
environment=environment.Environment.private_environment(uuid_),
uuid=uuid_,
)
service.mock.reset_mock() # Mock is shared between instances, so we need to reset it
service.machine_running_flag = False
service.maintain_on_error.value = maintain_on_error
service.try_soft_shutdown.value = try_soft_shutdown
return service
srvc.mock.reset_mock() # Mock is shared between instances, so we need to reset it
srvc.machine_running_flag = False
srvc.maintain_on_error.value = maintain_on_error
srvc.try_soft_shutdown.value = try_soft_shutdown
if override_mockables:
srvc.is_deletion_in_progress = mock.MagicMock()
srvc.is_deletion_in_progress.return_value = False
return srvc
def create_dynamic_service_for_deferred_deletion(
@ -805,11 +809,12 @@ def create_dynamic_userservice_queue(
def create_dynamic_userservice(
service: 'DynamicTestingService|None' = None, publication: 'DynamicTestingPublication|None' = None
service: 'DynamicTestingService|None' = None, publication: 'DynamicTestingPublication|None' = None,
override_mockables: bool = True
) -> DynamicTestingUserService:
uuid_ = str(uuid.uuid4())
userservice = DynamicTestingUserService(
service=service or create_dynamic_service(None),
service=service or create_dynamic_service(None, override_mockables=override_mockables),
publication=create_dynamic_publication(None),
environment=environment.Environment.private_environment(uuid_),
uuid=uuid_,
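
With override_mockables at its default of True, the fixture replaces is_deletion_in_progress with a MagicMock returning False, so unrelated tests never wander into the deferred-deletion path; tests that exercise the real deletion logic pass override_mockables=False, as the publication and service delete tests below do. A minimal sketch of the difference (assuming the fixtures module above):

from unittest import mock

mocked = fixtures.create_dynamic_service()  # is_deletion_in_progress is a MagicMock -> False
real = fixtures.create_dynamic_service(override_mockables=False)  # real implementation kept
assert isinstance(mocked.is_deletion_in_progress, mock.MagicMock)
assert not isinstance(real.is_deletion_in_progress, mock.MagicMock)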

View File

@ -198,7 +198,7 @@ class DynamicPublicationTest(UDSTestCase):
self.assertEqual(counter, 10) # 4 retries + 5 retries after reset + 1 of the reset itself
def test_publication_delete(self) -> None:
service = fixtures.create_dynamic_service()
service = fixtures.create_dynamic_service(override_mockables=False)
publication = fixtures.create_dynamic_publication(service)
publication._queue = [
types.services.Operation.NOP, # First check

View File

@ -321,7 +321,7 @@ class DynamicServiceTest(UDSTestCase):
self.assertEqual(counter, 10) # 4 retries + 5 retries after reset + 1 of the reset itself
def test_userservice_delete(self) -> None:
service = fixtures.create_dynamic_service()
service = fixtures.create_dynamic_service(override_mockables=False)
userservice = fixtures.create_dynamic_userservice(service)
userservice._vmid = 'vmid'
userservice._queue = [

View File

@ -49,7 +49,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class ServiceCacheUpdaterTest(UDSTestCase):
servicePool: 'models.ServicePool'
servicepool: 'models.ServicePool'
def setUp(self) -> None:
# Default values for max
@ -60,11 +60,11 @@ class ServiceCacheUpdaterTest(UDSTestCase):
ServiceCacheUpdater.setup()
userService = services_fixtures.create_db_cache_userservices()[0]
self.servicePool = userService.deployed_service
self.servicepool = userService.deployed_service
userService.delete() # empty all
def removing_or_canceled_count(self) -> int:
return self.servicePool.userServices.filter(
return self.servicepool.userServices.filter(
state__in=[State.REMOVABLE, State.CANCELED]
).count()
@ -73,7 +73,7 @@ class ServiceCacheUpdaterTest(UDSTestCase):
updater = ServiceCacheUpdater(Environment.testing_environment())
updater.run()
# Test user service will cancel automatically, so it will not enter the "removable" state (on remove start, it reports it has been removed)
return self.servicePool.userServices.count() - self.removing_or_canceled_count()
return self.servicepool.userServices.count() - self.removing_or_canceled_count()
def set_cache(
self,
@ -82,52 +82,52 @@ class ServiceCacheUpdaterTest(UDSTestCase):
cache2: typing.Optional[int] = None,
max: typing.Optional[int] = None,
) -> None:
self.servicePool.initial_srvs = (
self.servicePool.initial_srvs if initial is None else initial
self.servicepool.initial_srvs = (
self.servicepool.initial_srvs if initial is None else initial
)
self.servicePool.cache_l1_srvs = (
self.servicePool.cache_l1_srvs if cache is None else cache
self.servicepool.cache_l1_srvs = (
self.servicepool.cache_l1_srvs if cache is None else cache
)
self.servicePool.cache_l2_srvs = (
self.servicePool.cache_l2_srvs if cache2 is None else cache2
self.servicepool.cache_l2_srvs = (
self.servicepool.cache_l2_srvs if cache2 is None else cache2
)
self.servicePool.max_srvs = self.servicePool.max_srvs if max is None else max
self.servicePool.save()
self.servicepool.max_srvs = self.servicepool.max_srvs if max is None else max
self.servicepool.save()
def test_initial(self) -> None:
self.set_cache(initial=100, cache=10, max=500)
self.assertEqual(
self.execute_cache_updater(self.servicePool.initial_srvs + 10),
self.servicePool.initial_srvs,
self.execute_cache_updater(self.servicepool.initial_srvs + 10),
self.servicepool.initial_srvs,
)
def test_remove(self) -> None:
self.set_cache(initial=100, cache=110, max=500)
self.execute_cache_updater(self.servicePool.cache_l1_srvs)
self.execute_cache_updater(self.servicepool.cache_l1_srvs)
# Now again, decrease cache to original, must remove ten elements
mustDelete = self.servicePool.cache_l1_srvs - self.servicePool.initial_srvs
must_delete = self.servicepool.cache_l1_srvs - self.servicepool.initial_srvs
self.set_cache(cache=10)
self.assertEqual(
self.execute_cache_updater(mustDelete*2), self.servicePool.initial_srvs
self.execute_cache_updater(must_delete*2), self.servicepool.initial_srvs
)
self.assertEqual(self.removing_or_canceled_count(), mustDelete)
self.assertEqual(self.removing_or_canceled_count(), must_delete)
def test_max(self) -> None:
self.set_cache(initial=100, cache=10, max=50)
self.assertEqual(
self.execute_cache_updater(self.servicePool.initial_srvs + 10),
self.servicePool.max_srvs,
self.execute_cache_updater(self.servicepool.initial_srvs + 10),
self.servicepool.max_srvs,
)
self.set_cache(cache=200)
self.assertEqual(
self.execute_cache_updater(self.servicePool.initial_srvs + 10),
self.servicePool.max_srvs,
self.execute_cache_updater(self.servicepool.initial_srvs + 10),
self.servicepool.max_srvs,
)
def test_cache(self) -> None:
@ -135,8 +135,8 @@ class ServiceCacheUpdaterTest(UDSTestCase):
# Try to "overcreate" cache elements (must create 100, that is "cache" (bigger than initial))
self.assertEqual(
self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10),
self.servicePool.cache_l1_srvs,
self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10),
self.servicepool.cache_l1_srvs,
)
def test_provider_preparing_limits(self) -> None:
@ -144,14 +144,14 @@ class ServiceCacheUpdaterTest(UDSTestCase):
self.set_cache(initial=100, cache=10, max=50)
# Try to "overcreate" cache elements but provider limits it to 10
self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 10)
self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 10)
# Delete all userServices
self.servicePool.userServices.all().delete()
self.servicepool.userServices.all().delete()
# Now, set provider limit to 0. Minimum acceptable is 1, so 1 will be created
TestProvider.concurrent_creation_limit = 0
self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 1)
self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 1)
def test_provider_no_removing_limits(self) -> None:
# Removal limits are in fact applied when EXECUTING the removal, not when marking as removable
@ -159,7 +159,7 @@ class ServiceCacheUpdaterTest(UDSTestCase):
self.set_cache(initial=0, cache=50, max=50)
# Try to "overcreate" cache elements but provider limits it to 10
self.execute_cache_updater(self.servicePool.cache_l1_srvs)
self.execute_cache_updater(self.servicepool.cache_l1_srvs)
# Now set cache to a lower value
self.set_cache(cache=10)
@ -174,12 +174,12 @@ class ServiceCacheUpdaterTest(UDSTestCase):
self.set_cache(initial=100, cache=100, max=50)
# Try to "overcreate" cache elements but provider limits it to 10
self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), TestServiceCache.userservices_limit)
self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), TestServiceCache.userservices_limit)
# Delete all userServices
self.servicePool.userServices.all().delete()
self.servicepool.userServices.all().delete()
# We again allow the userservices limit to be zero (meaning that no service will be created)
# This allows us to "honor" some external providers that, in some cases, will not have services available...
TestServiceCache.userservices_limit = 0
self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 0)
self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 0)

View File

@ -590,6 +590,8 @@ def create_service_linked(
service_db_mock.name = 'ServiceName'
srvc.db_obj = mock.MagicMock()
srvc.db_obj.return_value = service_db_mock
srvc.is_deletion_in_progress = mock.MagicMock()
srvc.is_deletion_in_progress.return_value = False
return srvc

View File

@ -490,6 +490,8 @@ def create_service_linked(
service_db_mock.name = 'ServiceName'
srvc.db_obj = mock.MagicMock()
srvc.db_obj.return_value = service_db_mock
srvc.is_deletion_in_progress = mock.MagicMock()
srvc.is_deletion_in_progress.return_value = False
return srvc

View File

@ -673,9 +673,9 @@ class Logout(ActorV3Action):
if osmanager:
if osmanager.is_removable_on_logout(userservice):
logger.debug('Removable on logout: %s', osmanager)
userservice.remove()
userservice.release()
else:
userservice.remove()
userservice.release()
def action(self) -> dict[str, typing.Any]:
is_managed = self._params.get('type') != consts.actor.UNMANAGED

View File

@ -256,7 +256,7 @@ class MetaAssignedService(DetailHandler):
logStr = 'Deleted cached service {} by {}'.format(userService.friendly_name, self._user.pretty_name)
if userService.state in (State.USABLE, State.REMOVING):
userService.remove()
userService.release()
elif userService.state == State.PREPARING:
userService.cancel()
elif userService.state == State.REMOVABLE:

View File

@ -145,7 +145,7 @@ class ServicesUsage(DetailHandler):
logger.debug('Deleting user service')
if userService.state in (State.USABLE, State.REMOVING):
userService.remove()
userService.release()
elif userService.state == State.PREPARING:
userService.cancel()
elif userService.state == State.REMOVABLE:

View File

@ -216,7 +216,7 @@ class AssignedService(DetailHandler):
logStr = f'Deleted cached service {userService.friendly_name} by {self._user.pretty_name}'
if userService.state in (State.USABLE, State.REMOVING):
userService.remove()
userService.release()
elif userService.state == State.PREPARING:
userService.cancel()
elif userService.state == State.REMOVABLE:

View File

@ -141,7 +141,7 @@ def process_logout(server: 'models.Server', data: dict[str, typing.Any]) -> typi
osmanager: typing.Optional[osmanagers.OSManager] = userService.get_osmanager_instance()
if not osmanager or osmanager.is_removable_on_logout(userService):
logger.debug('Removable on logout: %s', osmanager)
userService.remove()
userService.release()
return rest_result(consts.OK)

View File

@ -50,7 +50,7 @@ from uds.core.services.exceptions import (
)
from uds.core.util import log, singleton
from uds.core.util.decorators import cached
from uds.core.util.model import sql_now
from uds.core.util.model import generate_uuid, sql_now
from uds.core.types.states import State
from uds.core.util.stats import events
from uds.models import MetaPool, ServicePool, ServicePoolPublication, Transport, User, UserService
@ -302,6 +302,130 @@ class UserServiceManager(metaclass=singleton.Singleton):
# Data will be serialized on makeUnique process
UserServiceOpChecker.make_unique(cache, cache_instance, state)
def clone_userservice_as_cache(self, user_service: UserService) -> UserService:
"""
Clones the record of a user service, cleaning up some fields so it's a cache element
The uuid will be regenerated, and the pk will be set to None
"""
# Load as new variable to avoid modifying original
user_service = UserService.objects.get(id=user_service.id)
user_service.pk = None
user_service.uuid = generate_uuid()
user_service.user = None
user_service.cache_level = types.services.CacheLevel.L1
user_service.in_use = False
user_service.state = State.USABLE # We set it to usable so it can be used directly...
user_service.os_state = State.USABLE
# Save the new cache element
user_service.save()
return user_service
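
The clone leans on the standard Django idiom: re-fetch the row, blank the primary key, and save, which issues an INSERT and leaves the original row untouched. A generic, standalone illustration of the idiom (toy model for illustration, not UDS code):

from django.db import models

class Item(models.Model):
    name = models.CharField(max_length=64)

def clone_item(item: 'Item') -> 'Item':
    clone = Item.objects.get(pk=item.pk)  # fresh instance; the original stays untouched
    clone.pk = None  # a None primary key forces an INSERT on save()
    clone.save()
    return clone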
def get_cache_servicepool_stats(self, servicepool: ServicePool) -> 'types.services.ServicePoolStats':
"""
Returns the stats (for cache purposes) of a service pool.
"""
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
# We start filtering out the deployed services that do not need caching at all.
if (
servicepool.max_srvs == 0
or servicepool.state != State.ACTIVE
or servicepool.service.provider.maintenance_mode is True
):
return types.services.ServicePoolStats.null() # No cache needed for this servicepool
service_instance = servicepool.service.get_instance()
servicepool.user_services.update() # Cleans cached queries
# If this deployed service needs a publication and doesn't have an active one, ignore it
if service_instance.uses_cache is False:
logger.debug(
'Service pool does not uses cache: %s',
servicepool.name,
)
return types.services.ServicePoolStats.null()
if servicepool.active_publication() is None and service_instance.publication_type is not None:
logger.debug(
'Service pool needs publication and has none: %s',
servicepool.name,
)
return types.services.ServicePoolStats.null()
# If it has any running publication, do not generate cache anymore
if servicepool.publications.filter(state=State.PREPARING).count() > 0:
logger.debug(
'Service pool with publication running: %s',
servicepool.name,
)
return types.services.ServicePoolStats.null()
if servicepool.is_restrained():
logger.debug(
'Restrained service pool: %s',
servicepool.name,
)
return types.services.ServicePoolStats.null()
# Get data related to actual state of cache
# Before, we removed the elements marked to be destroyed after creation, but this made us
# create new items over the established limit, so we do not remove them anymore
l1_cache_count: int = (
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
.count()
)
l2_cache_count: int = (
(
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2))
.count()
)
if service_instance.uses_cache_l2
else 0
)
assigned_count: int = (
servicepool.assigned_user_services()
.filter(UserServiceManager().get_state_filter(servicepool.service))
.count()
)
pool_stat = types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# If we exceed the max cache, we reduce it first, because this frees resources on the service provider
logger.debug(
"Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
servicepool.name,
l1_cache_count,
l2_cache_count,
assigned_count,
)
# Check for cache overflow
# We have more than we want
if pool_stat.has_l1_cache_overflow() or pool_stat.has_l2_cache_overflow():
logger.debug('We have more services than max configured.')
return pool_stat
# Check for cache needed
# If this service doesn't allow more starting user services...
if not UserServiceManager.manager().can_grow_service_pool(servicepool):
logger.debug(
'This pool cannot grow right now: %s',
servicepool,
)
return types.services.ServicePoolStats.null()
if pool_stat.is_l1_cache_growth_required() or pool_stat.is_l2_cache_growth_required():
logger.debug('Needs to grow cache for %s', servicepool)
return pool_stat
# If this point is reached, no cache action is needed
return types.services.ServicePoolStats.null()
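
Every "no action" case above funnels through the ServicePoolStats.null() sentinel, so callers need a single check instead of handling None. A caller-side sketch (it mirrors the updated ServiceCacheUpdater further down in this commit):

stats = UserServiceManager.manager().get_cache_servicepool_stats(servicepool)
if not stats.is_null():
    if stats.has_l1_cache_overflow():
        ...  # reduce L1 cache
    elif stats.is_l1_cache_growth_required():
        ...  # grow L1 cache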
def cancel(self, user_service: UserService) -> None:
"""
Cancels a user service creation
@ -360,6 +484,24 @@ class UserServiceManager(metaclass=singleton.Singleton):
_('Can\'t remove nor cancel {} because its state doesn\'t allow it').format(user_service.name)
)
def release_on_logout(self, user_service: UserService) -> None:
"""
On logout, this method takes care of the user service.
The user service may be returned to cache if the owner service wants it that way.
If no cache is desired, or the cache is already full (on the servicepool), the service is removed.
"""
stats = self.get_cache_servicepool_stats(user_service.deployed_service)
# Note that this only moves services to L1 cache
# The stats also include L2 values; that's why we explicitly check L1 for overflow and growth
if stats.has_l1_cache_overflow():
user_service.release() # Mark as removable
elif stats.is_l1_cache_growth_required():
# Move a clone of the user service to cache, and set ours as REMOVED
_cache = self.clone_userservice_as_cache(user_service)
user_service.set_state(State.REMOVED)
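
Branch order matters here: overflow is checked before growth, and when the stats are null both predicates return False, so the user service is left untouched. A condensed restatement of the same decision (hypothetical standalone helper, not part of this commit):

def logout_outcome(stats: 'types.services.ServicePoolStats') -> str:
    # Mirrors release_on_logout: overflow wins, then growth, else keep
    if stats.has_l1_cache_overflow():
        return 'release'  # marked REMOVABLE
    if stats.is_l1_cache_growth_required():
        return 'return-to-cache'  # cloned into L1, original set to REMOVED
    return 'keep'  # cache already satisfied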
def get_existing_assignation_for_user(
self, service_pool: ServicePool, user: User
) -> typing.Optional[UserService]:
@ -664,7 +806,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
remove = True
if remove:
user_service.remove()
user_service.release()
def notify_ready_from_os_manager(self, user_service: UserService, data: typing.Any) -> None:
try:
@ -955,10 +1097,10 @@ class UserServiceManager(metaclass=singleton.Singleton):
# Sort pools by policy now, and extract only pools, not sort keys
# split result in two lists, 100% full and not 100% full
# Remove "full" pools (100%) from result and pools in maintenance mode, not ready pools, etc...
sortedPools = sorted(sortPools, key=operator.itemgetter(0)) # sort by priority (first element)
sorted_pools = sorted(sortPools, key=operator.itemgetter(0)) # sort by priority (first element)
pools: list[ServicePool] = []
pool_full: list[ServicePool] = []
for p in sortedPools:
for p in sorted_pools:
if not p[1].is_usable():
continue
if p[1].usage().percent == 100:

View File

@ -490,7 +490,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
"""
This method is called to check if the service is removed
"""
if self.service().check_deleting_status(self, self._vmid) is False:
if self.service().is_deletion_in_progress(self, self._vmid) is False:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING

View File

@ -253,7 +253,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
# Store deleting vmid
storage[f'deleting_{vmid}'] = True
def check_deleting_status(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
def is_deletion_in_progress(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
"""
Checks if the deferred deletion of a machine is running
The default implementation always returns False
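
Providers whose hypervisors delete machines asynchronously are expected to override this hook. A hypothetical override sketch (the api client and its vm_exists call are assumptions for illustration, not part of this commit):

class MyHypervisorService(DynamicService):
    def is_deletion_in_progress(
        self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
    ) -> bool:
        # Deletion is still in progress while the hypervisor keeps reporting the VM
        return self.provider().api.vm_exists(vmid)  # hypothetical API client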

View File

@ -811,7 +811,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
This method is called to check if the service is removed
"""
if self.service().check_deleting_status(self, self._vmid) is False:
if self.service().is_deletion_in_progress(self, self._vmid) is False:
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING

View File

@ -29,9 +29,13 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import dataclasses
import enum
if typing.TYPE_CHECKING:
from uds.models.service_pool import ServicePool
class ServiceType(enum.StrEnum):
VDI = 'VDI'
@ -116,6 +120,7 @@ class ReadyStatus(enum.IntEnum):
except ValueError:
return ReadyStatus.USERSERVICE_NOT_READY
class CacheLevel(enum.IntEnum):
NONE = 0 # : Constant for User cache level (no cache at all)
L1 = 1 # : Constant for Cache of level 1
@ -131,6 +136,7 @@ class Operation(enum.IntEnum):
* Note that we will need to "translate" old values to new ones on the service,
* Adapting existing services to this, will probably need a migration
"""
# Standard operations 1000-1999
INITIALIZE = 1000
CREATE = 1001
@ -147,16 +153,16 @@ class Operation(enum.IntEnum):
RESET_COMPLETED = 1012
DELETE = 1013
DELETE_COMPLETED = 1014
WAIT = 1100 # This is a "wait" operation, used to wait for something to happen
NOP = 1101
RETRY = 1102 # Do not have executors, inserted to retry operation and recognize it
# Custom validations 2000-2999
DESTROY_VALIDATOR = 2000 # Check if the userservice has a vmid, to stop destroying it if needed
# Specific operations 3000-3999
# for Fixed User Services
SNAPSHOT_CREATE = 3000
SNAPSHOT_RECOVER = 3001
@ -173,7 +179,7 @@ class Operation(enum.IntEnum):
# So we will translate, for example SNAPSHOT_CREATE to CUSTOM_1, etc..
# Fixed user services do not allow custom operations; we use them
# to alias some fixed operations (like snapshot create, recover, etc..)
# Custom operations 20000-29999
CUSTOM_1 = 20001
CUSTOM_2 = 20002
@ -184,7 +190,7 @@ class Operation(enum.IntEnum):
CUSTOM_7 = 20007
CUSTOM_8 = 20008
CUSTOM_9 = 20009
def is_custom(self) -> bool:
"""
Returns if the operation is a custom one
@ -197,7 +203,68 @@ class Operation(enum.IntEnum):
return Operation(value)
except ValueError:
return Operation.UNKNOWN
def as_str(self) -> str:
return self.name
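
The value ranges double as namespaces (1000-1999 standard, 2000-2999 validations, 3000-3999 fixed-service specifics, 20000-29999 custom), and from_int degrades unmapped values to UNKNOWN instead of raising. A few hedged sanity checks, assuming the enum as defined above:

from uds.core import types

Operation = types.services.Operation
assert Operation.from_int(1001) is Operation.CREATE  # standard range: 1000-1999
assert Operation.CUSTOM_1.is_custom()                # custom range: 20000-29999
assert not Operation.DELETE.is_custom()
assert Operation.from_int(999999) is Operation.UNKNOWN  # unmapped values degrade safely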
@dataclasses.dataclass
class ServicePoolStats:
servicepool: typing.Optional['ServicePool']
l1_cache_count: int
l2_cache_count: int
assigned_count: int
def has_l1_cache_overflow(self) -> bool:
"""Checks if the L1 cache has overflowed
Overflows if:
* l1_assigned_count > max_srvs
(that is, if we have more than max, we can remove cached L1 items until we reach max)
* l1_assigned_count > initial_srvs and l1_cache_count > cache_l1_srvs
(that is, if we have more than initial, we can remove cached L1 items until we reach cache_l1_srvs)
"""
if not self.servicepool:
return False
l1_assigned_count = self.l1_cache_count + self.assigned_count
return l1_assigned_count > self.servicepool.max_srvs or (
l1_assigned_count > self.servicepool.initial_srvs
and self.l1_cache_count > self.servicepool.cache_l1_srvs
)
def is_l1_cache_growth_required(self) -> bool:
"""Checks if L1 cache growth is needed
Grow L1 cache if:
* l1_assigned_count < max_srvs and (l1_assigned_count < initial_srvs or l1_cache_count < cache_l1_srvs)
(that is, if we have not reached max, and we have not reached initial or cache_l1_srvs, we need to grow the L1 cache)
"""
if not self.servicepool:
return False
l1_assigned_count = self.l1_cache_count + self.assigned_count
return l1_assigned_count < self.servicepool.max_srvs and (
l1_assigned_count < self.servicepool.initial_srvs
or self.l1_cache_count < self.servicepool.cache_l1_srvs
)
def has_l2_cache_overflow(self) -> bool:
"""Checks if the L2 cache has overflowed"""
if not self.servicepool:
return False
return self.l2_cache_count > self.servicepool.cache_l2_srvs
def is_l2_cache_growth_required(self) -> bool:
"""Checks if L2 cache growth is needed"""
if not self.servicepool:
return False
return self.l2_cache_count < self.servicepool.cache_l2_srvs
def is_null(self) -> bool:
return self.servicepool is None
@staticmethod
def null() -> 'ServicePoolStats':
return ServicePoolStats(None, 0, 0, 0)
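
A worked example of the predicates with concrete numbers; the stand-in pool below only carries the fields the dataclass reads (a duck-typed SimpleNamespace, an assumption for illustration; a real caller passes a ServicePool):

from types import SimpleNamespace
from uds.core import types

pool = SimpleNamespace(max_srvs=10, initial_srvs=2, cache_l1_srvs=3, cache_l2_srvs=0)
stats = types.services.ServicePoolStats(pool, l1_cache_count=1, l2_cache_count=0, assigned_count=4)
# l1_assigned_count = 1 + 4 = 5
assert not stats.has_l1_cache_overflow()    # 5 <= 10, and L1 cache 1 <= 3
assert stats.is_l1_cache_growth_required()  # 5 < 10 and L1 cache 1 < 3
assert types.services.ServicePoolStats.null().is_null()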

View File

@ -96,8 +96,6 @@ class State(enum.StrEnum):
@property
def localized(self) -> str:
"""Returns the literal translation of the state"""
print(self)
print(_TRANSLATIONS.get(self))
return _TRANSLATIONS.get(self, _TRANSLATIONS[State.UNKNOWN])
def is_active(self) -> bool:

View File

@ -170,7 +170,7 @@ class CalendarAction(UUIDModel):
for userService in self.service_pool.assigned_user_services().filter(
state=types.states.State.USABLE
):
userService.remove()
userService.release()
def _remove_stuck_userservice() -> None:
# 1.- Remove stuck assigned services (Ignore "creating ones", just for created)
@ -178,7 +178,7 @@ class CalendarAction(UUIDModel):
for userService in self.service_pool.assigned_user_services().filter(
state_date__lt=since, state=types.states.State.USABLE
):
userService.remove()
userService.release()
def _del_all_transport() -> None:
# 2.- Remove all transports
@ -200,7 +200,7 @@ class CalendarAction(UUIDModel):
),
)
):
i.remove()
i.release()
def _add_del_transport() -> None:
try:

View File

@ -227,7 +227,7 @@ class User(UUIDModel, properties.PropertiesMixin):
# Removes all user services assigned to this user (unassign it and mark for removal)
for us in to_delete.userServices.all():
us.assign_to(None)
us.remove()
us.release()
logger.debug('Deleted user %s', to_delete)

View File

@ -524,26 +524,20 @@ class UserService(UUIDModel, properties.PropertiesMixin):
"""
Returns if this service is ready (not preparing or marked for removal)
"""
# pylint: disable=import-outside-toplevel
from uds.core.managers.userservice import UserServiceManager
# Call to isReady of the instance
return UserServiceManager().is_ready(self)
return UserServiceManager.manager().is_ready(self)
def is_in_maintenance(self) -> bool:
return self.deployed_service.is_in_maintenance()
def remove(self) -> None:
"""
Mark this user deployed service for removal
"""
self.set_state(State.REMOVABLE)
def release(self) -> None:
"""
A much more convenient method name that "remove" (i think :) )
Mark this user deployed service for removal.
(Returning the service to cache on logout is handled by UserServiceManager.release_on_logout)
"""
self.remove()
self.set_state(State.REMOVABLE)
def cancel(self) -> None:
"""
@ -560,7 +554,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
Marks for removal or cancels it, depending on state
"""
if self.is_usable():
self.remove()
self.release()
else:
self.cancel()

View File

@ -123,7 +123,7 @@ class LinuxOsManager(osmanagers.OSManager):
'Unused user service for too long. Removing due to OS Manager parameters.',
types.log.LogSource.OSMANAGER,
)
userservice.remove()
userservice.release()
def is_persistent(self) -> bool:
return fields.onlogout_field_is_persistent(self.on_logout)

View File

@ -123,7 +123,7 @@ class TestOSManager(osmanagers.OSManager):
'Unused user service for too long. Removing due to OS Manager parameters.',
types.log.LogSource.OSMANAGER,
)
userservice.remove()
userservice.release()
def is_persistent(self) -> bool:
return self.on_logout.value == 'keep-always'

View File

@ -114,7 +114,7 @@ class WindowsOsManager(osmanagers.OSManager):
'Unused user service for too long. Removing due to OS Manager parameters.',
types.log.LogSource.OSMANAGER,
)
userservice.remove()
userservice.release()
def is_persistent(self) -> bool:
return fields.onlogout_field_is_persistent(self.on_logout)

View File

@ -80,6 +80,6 @@ class OpenGnsysMaintainer(jobs.Job):
'The cached user service %s is about to expire. Removing it so it can be recreated',
userService,
)
userService.remove()
userService.release()
logger.debug('OpenGnsys job finished')

View File

@ -29,7 +29,6 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import dataclasses
import logging
import typing
import collections.abc
@ -57,51 +56,6 @@ logger = logging.getLogger(__name__)
# * l2 will be removed until cache_l2_srvs is reached
@dataclasses.dataclass(slots=True)
class ServicePoolStats:
servicepool: ServicePool
l1_cache_count: int
l2_cache_count: int
assigned_count: int
def l1_cache_overflow(self) -> bool:
"""Checks if L1 cache is overflown
Overflows if:
* l1_assigned_count > max_srvs
(this is, if we have more than max, we can remove cached l1 items until we reach max)
* l1_assigned_count > initial_srvs and l1_cache_count > cache_l1_srvs
(this is, if we have more than initial, we can remove cached l1 items until we reach cache_l1_srvs)
"""
l1_assigned_count = self.l1_cache_count + self.assigned_count
return l1_assigned_count > self.servicepool.max_srvs or (
l1_assigned_count > self.servicepool.initial_srvs
and self.l1_cache_count > self.servicepool.cache_l1_srvs
)
def l1_cache_needed(self) -> bool:
"""Checks if L1 cache is needed
Grow L1 cache if:
* l1_assigned_count < max_srvs and (l1_assigned_count < initial_srvs or l1_cache_count < cache_l1_srvs)
(this is, if we have not reached max, and we have not reached initial or cache_l1_srvs, we need to grow L1 cache)
"""
l1_assigned_count = self.l1_cache_count + self.assigned_count
return l1_assigned_count < self.servicepool.max_srvs and (
l1_assigned_count < self.servicepool.initial_srvs
or self.l1_cache_count < self.servicepool.cache_l1_srvs
)
def l2_cache_overflow(self) -> bool:
"""Checks if L2 cache is overflown"""
return self.l2_cache_count > self.servicepool.cache_l2_srvs
def l2_cache_needed(self) -> bool:
"""Checks if L2 cache is needed"""
return self.l2_cache_count < self.servicepool.cache_l2_srvs
class ServiceCacheUpdater(Job):
"""
Cache updater is responsible of keeping up to date the cache for different deployed services configurations requested
@ -131,9 +85,139 @@ class ServiceCacheUpdater(Job):
remaining_restraing_time,
)
# def service_pools_needing_cache_update(
# self,
# ) -> list[types.services.ServicePoolStats]:
# # State filter for cached and inAssigned objects
# # First we get all deployed services that could need cache generation
# # We start filtering out the deployed services that do not need caching at all.
# candidate_servicepools: collections.abc.Iterable[ServicePool] = (
# ServicePool.objects.filter(Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0))
# .filter(
# max_srvs__gt=0,
# state=State.ACTIVE,
# service__provider__maintenance_mode=False,
# )
# .iterator()
# )
# # We will get the one that proportionally needs more cache
# servicepools_numbers: list[types.services.ServicePoolStats] = []
# for servicepool in candidate_servicepools:
# servicepool.user_services.update() # Cleans cached queries
# # If this deployedService don't have a publication active and needs it, ignore it
# service_instance = servicepool.service.get_instance()
# if service_instance.uses_cache is False:
# logger.debug(
# 'Skipping cache generation for service pool that does not uses cache: %s',
# servicepool.name,
# )
# continue
# if servicepool.active_publication() is None and service_instance.publication_type is not None:
# logger.debug(
# 'Skipping. %s Needs publication but do not have one',
# servicepool.name,
# )
# continue
# # If it has any running publication, do not generate cache anymore
# if servicepool.publications.filter(state=State.PREPARING).count() > 0:
# logger.debug(
# 'Skipping cache generation for service pool with publication running: %s',
# servicepool.name,
# )
# continue
# if servicepool.is_restrained():
# logger.debug(
# 'Skipping cache generation for restrained service pool: %s',
# servicepool.name,
# )
# ServiceCacheUpdater._notify_restrain(servicepool)
# continue
# # Get data related to actual state of cache
# # Before we were removing the elements marked to be destroyed after creation, but this makes us
# # to create new items over the limit stablisshed, so we will not remove them anymore
# l1_cache_count: int = (
# servicepool.cached_users_services()
# .filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
# .count()
# )
# l2_cache_count: int = (
# (
# servicepool.cached_users_services()
# .filter(
# UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
# )
# .count()
# )
# if service_instance.uses_cache_l2
# else 0
# )
# assigned_count: int = (
# servicepool.assigned_user_services()
# .filter(UserServiceManager().get_state_filter(servicepool.service))
# .count()
# )
# pool_stat = types.services.ServicePoolStats(
# servicepool, l1_cache_count, l2_cache_count, assigned_count
# )
# # if we bypasses max cache, we will reduce it in first place. This is so because this will free resources on service provider
# logger.debug(
# "Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
# servicepool.name,
# l1_cache_count,
# l2_cache_count,
# assigned_count,
# )
# # We have more than we want
# if pool_stat.l1_cache_overflow():
# logger.debug('We have more services than max configured. Reducing..')
# servicepools_numbers.append(
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# )
# continue
# # If we have more in L2 cache than needed, decrease L2 cache, but int this case, we continue checking cause L2 cache removal
# # has less priority than l1 creations or removals, but higher. In this case, we will simply take last l2 oversized found and reduce it
# if pool_stat.l2_cache_overflow():
# logger.debug('We have more services in L2 cache than configured, reducing')
# servicepools_numbers.append(
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# )
# continue
# # If this service don't allows more starting user services, continue
# if not UserServiceManager().can_grow_service_pool(servicepool):
# logger.debug(
# 'This pool cannot grow right now: %s',
# servicepool,
# )
# continue
# if pool_stat.l1_cache_needed():
# logger.debug('Needs to grow L1 cache for %s', servicepool)
# servicepools_numbers.append(
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# )
# continue
# if pool_stat.l2_cache_needed():
# logger.debug('Needs to grow L2 cache for %s', servicepool)
# servicepools_numbers.append(
# types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# )
# continue
# # We also return calculated values so we can reuse then
# return servicepools_numbers
def service_pools_needing_cache_update(
self,
) -> list[ServicePoolStats]:
) -> list[types.services.ServicePoolStats]:
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
# We start filtering out the deployed services that do not need caching at all.
@ -148,120 +232,18 @@ class ServiceCacheUpdater(Job):
)
# We will get the one that proportionally needs more cache
servicepools_numbers: list[ServicePoolStats] = []
servicepools_numbers: list[types.services.ServicePoolStats] = []
for servicepool in candidate_servicepools:
servicepool.user_services.update() # Cleans cached queries
# If this deployedService don't have a publication active and needs it, ignore it
service_instance = servicepool.service.get_instance()
if service_instance.uses_cache is False:
logger.debug(
'Skipping cache generation for service pool that does not uses cache: %s',
servicepool.name,
)
continue
if servicepool.active_publication() is None and service_instance.publication_type is not None:
logger.debug(
'Skipping. %s Needs publication but do not have one',
servicepool.name,
)
continue
# If it has any running publication, do not generate cache anymore
if servicepool.publications.filter(state=State.PREPARING).count() > 0:
logger.debug(
'Skipping cache generation for service pool with publication running: %s',
servicepool.name,
)
continue
if servicepool.is_restrained():
logger.debug(
'StopSkippingped cache generation for restrained service pool: %s',
servicepool.name,
)
ServiceCacheUpdater._notify_restrain(servicepool)
continue
# Get data related to actual state of cache
# Before we were removing the elements marked to be destroyed after creation, but this makes us
# to create new items over the limit stablisshed, so we will not remove them anymore
l1_cache_count: int = (
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
.count()
)
l2_cache_count: int = (
(
servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
)
.count()
)
if service_instance.uses_cache_l2
else 0
)
assigned_count: int = (
servicepool.assigned_user_services()
.filter(UserServiceManager().get_state_filter(servicepool.service))
.count()
)
pool_stat = ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
# if we bypasses max cache, we will reduce it in first place. This is so because this will free resources on service provider
logger.debug(
"Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
servicepool.name,
l1_cache_count,
l2_cache_count,
assigned_count,
)
# We have more than we want
if pool_stat.l1_cache_overflow():
logger.debug('We have more services than max configured. Reducing..')
servicepools_numbers.append(
ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
)
continue
# If we have more in L2 cache than needed, decrease L2 cache, but int this case, we continue checking cause L2 cache removal
# has less priority than l1 creations or removals, but higher. In this case, we will simply take last l2 oversized found and reduce it
if pool_stat.l2_cache_overflow():
logger.debug('We have more services in L2 cache than configured, reducing')
servicepools_numbers.append(
ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
)
continue
# If this service don't allows more starting user services, continue
if not UserServiceManager().can_grow_service_pool(servicepool):
logger.debug(
'This pool cannot grow rithg now: %s',
servicepool,
)
continue
if pool_stat.l1_cache_needed():
logger.debug('Needs to grow L1 cache for %s', servicepool)
servicepools_numbers.append(
ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
)
continue
if pool_stat.l2_cache_needed():
logger.debug('Needs to grow L2 cache for %s', servicepool)
servicepools_numbers.append(
ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
)
continue
stats = UserServiceManager.manager().get_cache_servicepool_stats(servicepool)
if not stats.is_null():
servicepools_numbers.append(stats)
# We also return calculated values so we can reuse them
return servicepools_numbers
def grow_l1_cache(
self,
servicepool_stats: ServicePoolStats,
servicepool_stats: types.services.ServicePoolStats,
) -> None:
"""
This method tries to enlarge L1 cache.
@ -270,6 +252,8 @@ class ServiceCacheUpdater(Job):
and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
this method will not grow the L1 cache
"""
if servicepool_stats.servicepool is None:
return
logger.debug('Growing L1 cache creating a new service for %s', servicepool_stats.servicepool.name)
# First, we try to assign from L2 cache
if servicepool_stats.l2_cache_count > 0:
@ -301,7 +285,7 @@ class ServiceCacheUpdater(Job):
return
try:
# This has a valid publication, or it would not be here
UserServiceManager().create_cache_for(
UserServiceManager.manager().create_cache_for(
typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
types.services.CacheLevel.L1,
)
@ -322,7 +306,7 @@ class ServiceCacheUpdater(Job):
def grow_l2_cache(
self,
servicepool_stats: ServicePoolStats,
servicepool_stats: types.services.ServicePoolStats,
) -> None:
"""
Tries to grow L2 cache of service.
@ -331,10 +315,12 @@ class ServiceCacheUpdater(Job):
and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
this method will not grow the L2 cache
"""
if servicepool_stats.servicepool is None:
return
logger.debug("Growing L2 cache creating a new service for %s", servicepool_stats.servicepool.name)
try:
# This has a valid publication, or it would not be here
UserServiceManager().create_cache_for(
UserServiceManager.manager().create_cache_for(
typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
types.services.CacheLevel.L2,
)
@ -344,15 +330,18 @@ class ServiceCacheUpdater(Job):
servicepool_stats.servicepool.name,
servicepool_stats.servicepool.max_srvs,
)
# TODO: When alerts are ready, notify this
# Alerts notified through logger
def reduce_l1_cache(
self,
servicepool_stats: ServicePoolStats,
servicepool_stats: types.services.ServicePoolStats,
) -> None:
if servicepool_stats.servicepool is None:
return
logger.debug("Reducing L1 cache erasing a service in cache for %s", servicepool_stats.servicepool)
# We will try to destroy the newest USABLE cached element, if the deployer can't cancel a new service creation
# Here, we take into account the user services marked "remove_after", so we don't try to remove them
cacheItems: list[UserService] = [
i
for i in servicepool_stats.servicepool.cached_users_services()
@ -392,8 +381,10 @@ class ServiceCacheUpdater(Job):
def reduce_l2_cache(
self,
servicepool_stats: ServicePoolStats,
servicepool_stats: types.services.ServicePoolStats,
) -> None:
if servicepool_stats.servicepool is None:
return
logger.debug("Reducing L2 cache erasing a service in cache for %s", servicepool_stats.servicepool.name)
if servicepool_stats.l2_cache_count > 0:
cacheItems = (
@ -418,12 +409,12 @@ class ServiceCacheUpdater(Job):
# Treat l1 and l2 cache independently
# first, try to reduce cache and then grow it
if servicepool_stat.l1_cache_overflow():
if servicepool_stat.has_l1_cache_overflow():
self.reduce_l1_cache(servicepool_stat)
elif servicepool_stat.l1_cache_needed(): # We need more L1 items
elif servicepool_stat.is_l1_cache_growth_required(): # We need more L1 items
self.grow_l1_cache(servicepool_stat)
# Treat l1 and l2 cache independently
if servicepool_stat.l2_cache_overflow():
if servicepool_stat.has_l2_cache_overflow():
self.reduce_l2_cache(servicepool_stat)
elif servicepool_stat.l2_cache_needed(): # We need more L2 items
elif servicepool_stat.is_l2_cache_growth_required(): # We need more L2 items
self.grow_l2_cache(servicepool_stat)