diff --git a/server/src/tests/core/services/generics/fixtures.py b/server/src/tests/core/services/generics/fixtures.py
index 5c1236aec..2ba798609 100644
--- a/server/src/tests/core/services/generics/fixtures.py
+++ b/server/src/tests/core/services/generics/fixtures.py
@@ -729,18 +729,22 @@ def create_dynamic_service(
     provider: 'DynamicTestingProvider|None' = None,
     maintain_on_error: bool = False,
     try_soft_shutdown: bool = False,
+    override_mockables: bool = True
 ) -> DynamicTestingService:
     uuid_ = str(uuid.uuid4())
-    service = DynamicTestingService(
+    srvc = DynamicTestingService(
         provider=provider or create_dynamic_provider(),
         environment=environment.Environment.private_environment(uuid),
         uuid=uuid_,
     )
-    service.mock.reset_mock()  # Mock is shared between instances, so we need to reset it
-    service.machine_running_flag = False
-    service.maintain_on_error.value = maintain_on_error
-    service.try_soft_shutdown.value = try_soft_shutdown
-    return service
+    srvc.mock.reset_mock()  # Mock is shared between instances, so we need to reset it
+    srvc.machine_running_flag = False
+    srvc.maintain_on_error.value = maintain_on_error
+    srvc.try_soft_shutdown.value = try_soft_shutdown
+    if override_mockables:
+        srvc.is_deletion_in_progress = mock.MagicMock()
+        srvc.is_deletion_in_progress.return_value = False
+    return srvc


 def create_dynamic_service_for_deferred_deletion(
@@ -805,11 +809,12 @@ def create_dynamic_userservice_queue(

 def create_dynamic_userservice(
-    service: 'DynamicTestingService|None' = None, publication: 'DynamicTestingPublication|None' = None
+    service: 'DynamicTestingService|None' = None, publication: 'DynamicTestingPublication|None' = None,
+    override_mockables: bool = True
 ) -> DynamicTestingUserService:
     uuid_ = str(uuid.uuid4())
     userservice = DynamicTestingUserService(
-        service=service or create_dynamic_service(None),
+        service=service or create_dynamic_service(None, override_mockables=override_mockables),
         publication=create_dynamic_publication(None),
         environment=environment.Environment.private_environment(uuid),
         uuid=uuid_,
diff --git a/server/src/tests/core/services/generics/test_dynamic_publication.py b/server/src/tests/core/services/generics/test_dynamic_publication.py
index 31fd035c1..badfdd415 100644
--- a/server/src/tests/core/services/generics/test_dynamic_publication.py
+++ b/server/src/tests/core/services/generics/test_dynamic_publication.py
@@ -198,7 +198,7 @@ class DynamicPublicationTest(UDSTestCase):
         self.assertEqual(counter, 10)  # 4 retries + 5 retries after reset + 1 of the reset itself

     def test_publication_delete(self) -> None:
-        service = fixtures.create_dynamic_service()
+        service = fixtures.create_dynamic_service(override_mockables=False)
         publication = fixtures.create_dynamic_publication(service)
         publication._queue = [
             types.services.Operation.NOP,  # First check
diff --git a/server/src/tests/core/services/generics/test_dynamic_service.py b/server/src/tests/core/services/generics/test_dynamic_service.py
index 3201861a2..510d05450 100644
--- a/server/src/tests/core/services/generics/test_dynamic_service.py
+++ b/server/src/tests/core/services/generics/test_dynamic_service.py
@@ -321,7 +321,7 @@ class DynamicServiceTest(UDSTestCase):
         self.assertEqual(counter, 10)  # 4 retries + 5 retries after reset + 1 of the reset itself

     def test_userservice_delete(self) -> None:
-        service = fixtures.create_dynamic_service()
+        service = fixtures.create_dynamic_service(override_mockables=False)
         userservice = fixtures.create_dynamic_userservice(service)
         userservice._vmid = 'vmid'
        userservice._queue = [
diff --git a/server/src/tests/core/workers/test_servicepools_cache_updater.py b/server/src/tests/core/workers/test_servicepools_cache_updater.py
index d4e02231a..b497f1cb0 100644
--- a/server/src/tests/core/workers/test_servicepools_cache_updater.py
+++ b/server/src/tests/core/workers/test_servicepools_cache_updater.py
@@ -49,7 +49,7 @@ if typing.TYPE_CHECKING:
 logger = logging.getLogger(__name__)

 class ServiceCacheUpdaterTest(UDSTestCase):
-    servicePool: 'models.ServicePool'
+    servicepool: 'models.ServicePool'

     def setUp(self) -> None:
         # Default values for max
@@ -60,11 +60,11 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         ServiceCacheUpdater.setup()

         userService = services_fixtures.create_db_cache_userservices()[0]
-        self.servicePool = userService.deployed_service
+        self.servicepool = userService.deployed_service
         userService.delete()  # empty all

     def removing_or_canceled_count(self) -> int:
-        return self.servicePool.userServices.filter(
+        return self.servicepool.userServices.filter(
             state__in=[State.REMOVABLE, State.CANCELED]
         ).count()

@@ -73,7 +73,7 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         updater = ServiceCacheUpdater(Environment.testing_environment())
         updater.run()
         # Test user service will cancel automatically so it will not get in "removable" state (on remove start, it will tell it has been removed)
-        return self.servicePool.userServices.count() - self.removing_or_canceled_count()
+        return self.servicepool.userServices.count() - self.removing_or_canceled_count()

     def set_cache(
         self,
@@ -82,52 +82,52 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         cache2: typing.Optional[int] = None,
         max: typing.Optional[int] = None,
     ) -> None:
-        self.servicePool.initial_srvs = (
-            self.servicePool.initial_srvs if initial is None else initial
+        self.servicepool.initial_srvs = (
+            self.servicepool.initial_srvs if initial is None else initial
         )
-        self.servicePool.cache_l1_srvs = (
-            self.servicePool.cache_l1_srvs if cache is None else cache
+        self.servicepool.cache_l1_srvs = (
+            self.servicepool.cache_l1_srvs if cache is None else cache
         )
-        self.servicePool.cache_l2_srvs = (
-            self.servicePool.cache_l2_srvs if cache2 is None else cache2
+        self.servicepool.cache_l2_srvs = (
+            self.servicepool.cache_l2_srvs if cache2 is None else cache2
         )
-        self.servicePool.max_srvs = self.servicePool.max_srvs if max is None else max
-        self.servicePool.save()
+        self.servicepool.max_srvs = self.servicepool.max_srvs if max is None else max
+        self.servicepool.save()

     def test_initial(self) -> None:
         self.set_cache(initial=100, cache=10, max=500)
         self.assertEqual(
-            self.execute_cache_updater(self.servicePool.initial_srvs + 10),
-            self.servicePool.initial_srvs,
+            self.execute_cache_updater(self.servicepool.initial_srvs + 10),
+            self.servicepool.initial_srvs,
         )

     def test_remove(self) -> None:
         self.set_cache(initial=100, cache=110, max=500)
-        self.execute_cache_updater(self.servicePool.cache_l1_srvs)
+        self.execute_cache_updater(self.servicepool.cache_l1_srvs)

         # Now again, decrease cache to original, must remove ten elements
-        mustDelete = self.servicePool.cache_l1_srvs - self.servicePool.initial_srvs
+        must_delete = self.servicepool.cache_l1_srvs - self.servicepool.initial_srvs

         self.set_cache(cache=10)
         self.assertEqual(
-            self.execute_cache_updater(mustDelete*2), self.servicePool.initial_srvs
+            self.execute_cache_updater(must_delete*2), self.servicepool.initial_srvs
         )

-        self.assertEqual(self.removing_or_canceled_count(), mustDelete)
+        self.assertEqual(self.removing_or_canceled_count(), must_delete)

     def test_max(self) -> None:
         self.set_cache(initial=100, cache=10, max=50)
         self.assertEqual(
-            self.execute_cache_updater(self.servicePool.initial_srvs + 10),
-            self.servicePool.max_srvs,
+            self.execute_cache_updater(self.servicepool.initial_srvs + 10),
+            self.servicepool.max_srvs,
         )

         self.set_cache(cache=200)
         self.assertEqual(
-            self.execute_cache_updater(self.servicePool.initial_srvs + 10),
-            self.servicePool.max_srvs,
+            self.execute_cache_updater(self.servicepool.initial_srvs + 10),
+            self.servicepool.max_srvs,
         )

     def test_cache(self) -> None:
@@ -135,8 +135,8 @@ class ServiceCacheUpdaterTest(UDSTestCase):

         # Try to "overcreate" cache elements (must create 100, that is "cache" (bigger than initial))
         self.assertEqual(
-            self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10),
-            self.servicePool.cache_l1_srvs,
+            self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10),
+            self.servicepool.cache_l1_srvs,
         )

     def test_provider_preparing_limits(self) -> None:
@@ -144,14 +144,14 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         self.set_cache(initial=100, cache=10, max=50)

         # Try to "overcreate" cache elements but provider limits it to 10
-        self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 10)
+        self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 10)

         # Delete all userServices
-        self.servicePool.userServices.all().delete()
+        self.servicepool.userServices.all().delete()

         # Now, set provider limit to 0. Minumum aceptable is 1, so 1 will be created
         TestProvider.concurrent_creation_limit = 0
-        self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 1)
+        self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 1)

     def test_provider_no_removing_limits(self) -> None:
         # Removing limits are appliend in fact when EXECUTING removal, not when marking as removable
@@ -159,7 +159,7 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         self.set_cache(initial=0, cache=50, max=50)

         # Try to "overcreate" cache elements but provider limits it to 10
-        self.execute_cache_updater(self.servicePool.cache_l1_srvs)
+        self.execute_cache_updater(self.servicepool.cache_l1_srvs)

         # Now set cache to a lower value
         self.set_cache(cache=10)
@@ -174,12 +174,12 @@ class ServiceCacheUpdaterTest(UDSTestCase):
         self.set_cache(initial=100, cache=100, max=50)

         # Try to "overcreate" cache elements but provider limits it to 10
-        self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), TestServiceCache.userservices_limit)
+        self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), TestServiceCache.userservices_limit)

         # Delete all userServices
-        self.servicePool.userServices.all().delete()
+        self.servicepool.userServices.all().delete()

         # We again allow masUserServices to be zero (meaning that no service will be created)
         # This allows us to "honor" some external providers that, in some cases, will not have services available...
         TestServiceCache.userservices_limit = 0
-        self.assertEqual(self.execute_cache_updater(self.servicePool.cache_l1_srvs + 10), 0)
+        self.assertEqual(self.execute_cache_updater(self.servicepool.cache_l1_srvs + 10), 0)
diff --git a/server/src/tests/services/proxmox/fixtures.py b/server/src/tests/services/proxmox/fixtures.py
index f3aa66db7..cfcbfc3bf 100644
--- a/server/src/tests/services/proxmox/fixtures.py
+++ b/server/src/tests/services/proxmox/fixtures.py
@@ -590,6 +590,8 @@ def create_service_linked(
     service_db_mock.name = 'ServiceName'
     srvc.db_obj = mock.MagicMock()
     srvc.db_obj.return_value = service_db_mock
+    srvc.is_deletion_in_progress = mock.MagicMock()
+    srvc.is_deletion_in_progress.return_value = False
     return srvc

diff --git a/server/src/tests/services/xen/fixtures.py b/server/src/tests/services/xen/fixtures.py
index 458226aeb..f5543f256 100644
--- a/server/src/tests/services/xen/fixtures.py
+++ b/server/src/tests/services/xen/fixtures.py
@@ -490,6 +490,8 @@ def create_service_linked(
     service_db_mock.name = 'ServiceName'
     srvc.db_obj = mock.MagicMock()
     srvc.db_obj.return_value = service_db_mock
+    srvc.is_deletion_in_progress = mock.MagicMock()
+    srvc.is_deletion_in_progress.return_value = False
     return srvc

diff --git a/server/src/uds/REST/methods/actor_v3.py b/server/src/uds/REST/methods/actor_v3.py
index b066c4331..c9fde2ccf 100644
--- a/server/src/uds/REST/methods/actor_v3.py
+++ b/server/src/uds/REST/methods/actor_v3.py
@@ -673,9 +673,9 @@ class Logout(ActorV3Action):
         if osmanager:
             if osmanager.is_removable_on_logout(userservice):
                 logger.debug('Removable on logout: %s', osmanager)
-                userservice.remove()
+                userservice.release()
         else:
-            userservice.remove()
+            userservice.release()

     def action(self) -> dict[str, typing.Any]:
         is_managed = self._params.get('type') != consts.actor.UNMANAGED
diff --git a/server/src/uds/REST/methods/meta_service_pools.py b/server/src/uds/REST/methods/meta_service_pools.py
index f9b8204b1..0ef8b2296 100644
--- a/server/src/uds/REST/methods/meta_service_pools.py
+++ b/server/src/uds/REST/methods/meta_service_pools.py
@@ -256,7 +256,7 @@ class MetaAssignedService(DetailHandler):
             logStr = 'Deleted cached service {} by {}'.format(userService.friendly_name, self._user.pretty_name)

         if userService.state in (State.USABLE, State.REMOVING):
-            userService.remove()
+            userService.release()
         elif userService.state == State.PREPARING:
             userService.cancel()
         elif userService.state == State.REMOVABLE:
diff --git a/server/src/uds/REST/methods/services_usage.py b/server/src/uds/REST/methods/services_usage.py
index 21b9bd89f..4652683aa 100644
--- a/server/src/uds/REST/methods/services_usage.py
+++ b/server/src/uds/REST/methods/services_usage.py
@@ -145,7 +145,7 @@ class ServicesUsage(DetailHandler):
         logger.debug('Deleting user service')

         if userService.state in (State.USABLE, State.REMOVING):
-            userService.remove()
+            userService.release()
         elif userService.state == State.PREPARING:
             userService.cancel()
         elif userService.state == State.REMOVABLE:
diff --git a/server/src/uds/REST/methods/user_services.py b/server/src/uds/REST/methods/user_services.py
index d1ea9d431..6bbc55686 100644
--- a/server/src/uds/REST/methods/user_services.py
+++ b/server/src/uds/REST/methods/user_services.py
@@ -216,7 +216,7 @@ class AssignedService(DetailHandler):
             logStr = f'Deleted cached service {userService.friendly_name} by {self._user.pretty_name}'

         if userService.state in (State.USABLE, State.REMOVING):
-            userService.remove()
+            userService.release()
         elif userService.state == State.PREPARING:
             userService.cancel()
         elif userService.state == State.REMOVABLE:
diff --git a/server/src/uds/core/managers/servers_api/events.py b/server/src/uds/core/managers/servers_api/events.py
index 086bf50a5..8467e4588 100644
--- a/server/src/uds/core/managers/servers_api/events.py
+++ b/server/src/uds/core/managers/servers_api/events.py
@@ -141,7 +141,7 @@ def process_logout(server: 'models.Server', data: dict[str, typing.Any]) -> typi
     osmanager: typing.Optional[osmanagers.OSManager] = userService.get_osmanager_instance()
     if not osmanager or osmanager.is_removable_on_logout(userService):
         logger.debug('Removable on logout: %s', osmanager)
-        userService.remove()
+        userService.release()

     return rest_result(consts.OK)
diff --git a/server/src/uds/core/managers/userservice.py b/server/src/uds/core/managers/userservice.py
index 5ad5c8816..1ff1232b6 100644
--- a/server/src/uds/core/managers/userservice.py
+++ b/server/src/uds/core/managers/userservice.py
@@ -50,7 +50,7 @@ from uds.core.services.exceptions import (
 )
 from uds.core.util import log, singleton
 from uds.core.util.decorators import cached
-from uds.core.util.model import sql_now
+from uds.core.util.model import generate_uuid, sql_now
 from uds.core.types.states import State
 from uds.core.util.stats import events
 from uds.models import MetaPool, ServicePool, ServicePoolPublication, Transport, User, UserService
@@ -302,6 +302,130 @@ class UserServiceManager(metaclass=singleton.Singleton):
         # Data will be serialized on makeUnique process
         UserServiceOpChecker.make_unique(cache, cache_instance, state)

+    def clone_userservice_as_cache(self, user_service: UserService) -> UserService:
+        """
+        Clones the record of a user service, cleaning up some fields so it's a cache element.
+        The uuid will be regenerated, and the pk will be set to None.
+        """
+        # Load as new variable to avoid modifying original
+        user_service = UserService.objects.get(id=user_service.id)
+        user_service.pk = None
+        user_service.uuid = generate_uuid()
+        user_service.user = None
+        user_service.cache_level = types.services.CacheLevel.L1
+        user_service.in_use = False
+        user_service.state = State.USABLE  # We set it to usable so it can be used directly...
+        user_service.os_state = State.USABLE
+
+        # Save the new cache element
+        user_service.save()
+        return user_service
+
+    def get_cache_servicepool_stats(self, servicepool: ServicePool) -> 'types.services.ServicePoolStats':
+        """
+        Returns the stats (for cache purposes) for a service pool.
+        """
+        # State filter for cached and inAssigned objects
+        # First we get all deployed services that could need cache generation
+        # We start filtering out the deployed services that do not need caching at all.
+        if (
+            servicepool.max_srvs == 0
+            or servicepool.state != State.ACTIVE
+            or servicepool.service.provider.maintenance_mode is True
+        ):
+            return types.services.ServicePoolStats.null()  # No cache needed for this servicepool
+
+        servicepool.user_services.update()  # Cleans cached queries
+
+        service_instance = servicepool.service.get_instance()
+
+        if service_instance.uses_cache is False:
+            logger.debug(
+                'Service pool does not use cache: %s',
+                servicepool.name,
+            )
+            return types.services.ServicePoolStats.null()
+
+        # If this deployed service doesn't have an active publication but needs one, ignore it
+        if servicepool.active_publication() is None and service_instance.publication_type is not None:
+            logger.debug(
+                'Service pool needs publication and has none: %s',
+                servicepool.name,
+            )
+            return types.services.ServicePoolStats.null()
+
+        # If it has any running publication, do not generate cache anymore
+        if servicepool.publications.filter(state=State.PREPARING).count() > 0:
+            logger.debug(
+                'Service pool with publication running: %s',
+                servicepool.name,
+            )
+            return types.services.ServicePoolStats.null()
+
+        if servicepool.is_restrained():
+            logger.debug(
+                'Restrained service pool: %s',
+                servicepool.name,
+            )
+            return types.services.ServicePoolStats.null()
+
+        # Get data related to actual state of cache
+        # Before, we were removing the elements marked to be destroyed after creation, but this made us
+        # create new items over the established limit, so we will not remove them anymore
+        l1_cache_count: int = (
+            servicepool.cached_users_services()
+            .filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
+            .count()
+        )
+        l2_cache_count: int = (
+            (
+                servicepool.cached_users_services()
+                .filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2))
+                .count()
+            )
+            if service_instance.uses_cache_l2
+            else 0
+        )
+        assigned_count: int = (
+            servicepool.assigned_user_services()
+            .filter(UserServiceManager().get_state_filter(servicepool.service))
+            .count()
+        )
+        pool_stat = types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
+
+        # If we exceed max cache, we reduce it first, since this frees resources on the service provider
+        logger.debug(
+            "Examining %s with %s in cache L1 and %s in cache L2, %s assigned",
+            servicepool.name,
+            l1_cache_count,
+            l2_cache_count,
+            assigned_count,
+        )
+
+        # Check for cache overflow
+        # We have more than we want
+        if pool_stat.has_l1_cache_overflow() or pool_stat.has_l2_cache_overflow():
+            logger.debug('We have more services than max configured.')
+            return pool_stat
+
+        # Check for cache needed
+        # If this service doesn't allow more starting user services...
+        if not UserServiceManager.manager().can_grow_service_pool(servicepool):
+            logger.debug(
+                'This pool cannot grow right now: %s',
+                servicepool,
+            )
+            return types.services.ServicePoolStats.null()
+
+        if pool_stat.is_l1_cache_growth_required() or pool_stat.is_l2_cache_growth_required():
+            logger.debug('Needs to grow L1 or L2 cache for %s', servicepool)
+            return pool_stat
+
+        # If this point is reached, we do not need any cache
+        return types.services.ServicePoolStats.null()
+
     def cancel(self, user_service: UserService) -> None:
         """
         Cancels an user service creation
@@ -360,6 +484,24 @@ class UserServiceManager(metaclass=singleton.Singleton):
                 _('Can\'t remove nor cancel {} cause its state don\'t allow it').format(user_service.name)
             )

+    def release_on_logout(self, user_service: UserService) -> None:
+        """
+        Takes care of the user service on logout. On logout, the user service
+        may return to the cache if the owner service pool wants it that way.
+
+        This method will remove the service if no cache is desired or the cache is already full (on the servicepool)
+        """
+        stats = self.get_cache_servicepool_stats(user_service.deployed_service)
+        # Note that this only moves services to cache L1
+        # The stats also include L2 cache values; that's why we check L1 only for overflow and growth
+        if stats.has_l1_cache_overflow():
+            user_service.release()  # Mark as removable
+        elif stats.is_l1_cache_growth_required():
+            # Move a clone of the user service to cache, and set ours as REMOVED
+            _cache = self.clone_userservice_as_cache(user_service)
+            user_service.set_state(State.REMOVED)
+
     def get_existing_assignation_for_user(
         self, service_pool: ServicePool, user: User
     ) -> typing.Optional[UserService]:
@@ -664,7 +806,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
                 remove = True

         if remove:
-            user_service.remove()
+            user_service.release()

     def notify_ready_from_os_manager(self, user_service: UserService, data: typing.Any) -> None:
         try:
@@ -955,10 +1097,10 @@ class UserServiceManager(metaclass=singleton.Singleton):
         # Sort pools related to policy now, and xtract only pools, not sort keys
         # split resuult in two lists, 100% full and not 100% full
         # Remove "full" pools (100%) from result and pools in maintenance mode, not ready pools, etc...
-        sortedPools = sorted(sortPools, key=operator.itemgetter(0))  # sort by priority (first element)
+        sorted_pools = sorted(sortPools, key=operator.itemgetter(0))  # sort by priority (first element)
         pools: list[ServicePool] = []
         pool_full: list[ServicePool] = []
-        for p in sortedPools:
+        for p in sorted_pools:
             if not p[1].is_usable():
                 continue
             if p[1].usage().percent == 100:
diff --git a/server/src/uds/core/services/generics/dynamic/publication.py b/server/src/uds/core/services/generics/dynamic/publication.py
index 02788e666..45319a8bd 100644
--- a/server/src/uds/core/services/generics/dynamic/publication.py
+++ b/server/src/uds/core/services/generics/dynamic/publication.py
@@ -490,7 +490,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
         """
         This method is called to check if the service is removed
         """
-        if self.service().check_deleting_status(self, self._vmid) is False:
+        if self.service().is_deletion_in_progress(self, self._vmid) is False:
             return types.states.TaskState.FINISHED
         return types.states.TaskState.RUNNING

diff --git a/server/src/uds/core/services/generics/dynamic/service.py b/server/src/uds/core/services/generics/dynamic/service.py
index 268200eb7..df2fb45f4 100644
--- a/server/src/uds/core/services/generics/dynamic/service.py
+++ b/server/src/uds/core/services/generics/dynamic/service.py
@@ -253,7 +253,7 @@ class DynamicService(services.Service, abc.ABC):  # pylint: disable=too-many-pub
         # Store deleting vmid
         storage[f'deleting_{vmid}'] = True

-    def check_deleting_status(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
+    def is_deletion_in_progress(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
         """
         Checks if the deferred deletion of a machine is running
         Default implementation is return False always
diff --git a/server/src/uds/core/services/generics/dynamic/userservice.py b/server/src/uds/core/services/generics/dynamic/userservice.py
index 7481b56a3..caf681dd7 100644
--- a/server/src/uds/core/services/generics/dynamic/userservice.py
+++ b/server/src/uds/core/services/generics/dynamic/userservice.py
@@ -811,7 +811,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
         """
         This method is called to check if the service is removed
         """
-        if self.service().check_deleting_status(self, self._vmid) is False:
+        if self.service().is_deletion_in_progress(self, self._vmid) is False:
             return types.states.TaskState.FINISHED
         return types.states.TaskState.RUNNING

diff --git a/server/src/uds/core/types/services.py b/server/src/uds/core/types/services.py
index 87056fc25..6efc5e40f 100644
--- a/server/src/uds/core/types/services.py
+++ b/server/src/uds/core/types/services.py
@@ -29,9 +29,13 @@
 """
 Author: Adolfo Gómez, dkmaster at dkmon dot com
 """
+import typing
 import dataclasses
 import enum

+if typing.TYPE_CHECKING:
+    from uds.models.service_pool import ServicePool
+

 class ServiceType(enum.StrEnum):
     VDI = 'VDI'
@@ -116,6 +120,7 @@ class ReadyStatus(enum.IntEnum):
         except ValueError:
             return ReadyStatus.USERSERVICE_NOT_READY

+
 class CacheLevel(enum.IntEnum):
     NONE = 0  # : Constant for User cache level (no cache at all)
     L1 = 1  # : Constant for Cache of level 1
@@ -131,6 +136,7 @@ class Operation(enum.IntEnum):
     * Note that we will need to "translate" old values to new ones on the service,
     * Adapting existing services to this, will probably need a migration
     """
+
     # Standard operations 1000-1999
     INITIALIZE = 1000
     CREATE = 1001
@@ -147,16 +153,16 @@ class Operation(enum.IntEnum):
     RESET_COMPLETED = 1012
     DELETE = 1013
     DELETE_COMPLETED = 1014
-    
+
     WAIT = 1100  # This is a "wait" operation, used to wait for something to happen
     NOP = 1101
     RETRY = 1102  # Do not have executors, inserted to retry operation and recognize it
-    
+
     # Custom validations 2000-2999
     DESTROY_VALIDATOR = 2000  # Check if the userservice has an vmid to stop destroying it if needed

     # Specific operations 3000-3999
-    
+
     # for Fixed User Services
     SNAPSHOT_CREATE = 3000
     SNAPSHOT_RECOVER = 3001
@@ -173,7 +179,7 @@ class Operation(enum.IntEnum):
     # So we will translate, for example SNAPSHOT_CREATE to CUSTOM_1, etc..
     # Fixed user services does not allows custom operations, we use them
     # to alias some fixed operations (like snapshot create, recover, etc..)
-    
+
     # Custom operations 20000-29999
     CUSTOM_1 = 20001
     CUSTOM_2 = 20002
@@ -184,7 +190,7 @@ class Operation(enum.IntEnum):
     CUSTOM_7 = 20007
     CUSTOM_8 = 20008
     CUSTOM_9 = 20009
-    
+
     def is_custom(self) -> bool:
         """
         Returns if the operation is a custom one
@@ -197,7 +203,68 @@ class Operation(enum.IntEnum):
             return Operation(value)
         except ValueError:
             return Operation.UNKNOWN
-    
+
     def as_str(self) -> str:
         return self.name

+
+@dataclasses.dataclass
+class ServicePoolStats:
+    servicepool: typing.Optional['ServicePool']
+    l1_cache_count: int
+    l2_cache_count: int
+    assigned_count: int
+
+    def has_l1_cache_overflow(self) -> bool:
+        """Checks if L1 cache is overflown
+
+        Overflows if:
+            * l1_assigned_count > max_srvs
+              (that is, if we have more than max, we can remove cached l1 items until we reach max)
+            * l1_assigned_count > initial_srvs and l1_cache_count > cache_l1_srvs
+              (that is, if we have more than initial, we can remove cached l1 items until we reach cache_l1_srvs)
+        """
+        if not self.servicepool:
+            return False
+
+        l1_assigned_count = self.l1_cache_count + self.assigned_count
+        return l1_assigned_count > self.servicepool.max_srvs or (
+            l1_assigned_count > self.servicepool.initial_srvs
+            and self.l1_cache_count > self.servicepool.cache_l1_srvs
+        )
+
+    def is_l1_cache_growth_required(self) -> bool:
+        """Checks if L1 cache needs to grow
+
+        Grow L1 cache if:
+            * l1_assigned_count < max_srvs and (l1_assigned_count < initial_srvs or l1_cache_count < cache_l1_srvs)
+              (that is, if we have not reached max, and we have not reached initial or cache_l1_srvs, we need to grow L1 cache)
+        """
+        if not self.servicepool:
+            return False
+
+        l1_assigned_count = self.l1_cache_count + self.assigned_count
+        return l1_assigned_count < self.servicepool.max_srvs and (
+            l1_assigned_count < self.servicepool.initial_srvs
+            or self.l1_cache_count < self.servicepool.cache_l1_srvs
+        )
+
+    def has_l2_cache_overflow(self) -> bool:
+        """Checks if L2 cache is overflown"""
+        if not self.servicepool:
+            return False
+        return self.l2_cache_count > self.servicepool.cache_l2_srvs
+
+    def is_l2_cache_growth_required(self) -> bool:
+        """Checks if L2 cache needs to grow"""
+        if not self.servicepool:
+            return False
+        return self.l2_cache_count < self.servicepool.cache_l2_srvs
+
+    def is_null(self) -> bool:
+        return self.servicepool is None
+
+    @staticmethod
+    def null() -> 'ServicePoolStats':
+        return ServicePoolStats(None, 0, 0, 0)
diff --git a/server/src/uds/core/types/states.py b/server/src/uds/core/types/states.py
index a9fd0bd15..d42a49b2e 100644
--- a/server/src/uds/core/types/states.py
+++ b/server/src/uds/core/types/states.py
@@ -96,8 +96,6 @@ class State(enum.StrEnum):
     @property
     def localized(self) -> str:
         """Returns the literal translation of the state"""
-        print(self)
-        print(_TRANSLATIONS.get(self))
         return _TRANSLATIONS.get(self, _TRANSLATIONS[State.UNKNOWN])

     def is_active(self) -> bool:
diff --git a/server/src/uds/models/calendar_action.py b/server/src/uds/models/calendar_action.py
index 1e9f22414..13e7778ec 100644
--- a/server/src/uds/models/calendar_action.py
+++ b/server/src/uds/models/calendar_action.py
@@ -170,7 +170,7 @@ class CalendarAction(UUIDModel):
             for userService in self.service_pool.assigned_user_services().filter(
                 state=types.states.State.USABLE
             ):
-                userService.remove()
+                userService.release()

         def _remove_stuck_userservice() -> None:
             # 1.- Remove stuck assigned services (Ignore "creating ones", just for created)
@@ -178,7 +178,7 @@ class CalendarAction(UUIDModel):
             for userService in self.service_pool.assigned_user_services().filter(
                 state_date__lt=since, state=types.states.State.USABLE
             ):
-                userService.remove()
+                userService.release()

         def _del_all_transport() -> None:
             # 2.- Remove all transports
@@ -200,7 +200,7 @@ class CalendarAction(UUIDModel):
                 ),
             )
         ):
-            i.remove()
+            i.release()

         def _add_del_transport() -> None:
             try:
diff --git a/server/src/uds/models/user.py b/server/src/uds/models/user.py
index 085292c64..3c4af203a 100644
--- a/server/src/uds/models/user.py
+++ b/server/src/uds/models/user.py
@@ -227,7 +227,7 @@ class User(UUIDModel, properties.PropertiesMixin):
         # Removes all user services assigned to this user (unassign it and mark for removal)
         for us in to_delete.userServices.all():
             us.assign_to(None)
-            us.remove()
+            us.release()

         logger.debug('Deleted user %s', to_delete)
diff --git a/server/src/uds/models/user_service.py b/server/src/uds/models/user_service.py
index e253c2878..7b5bebab6 100644
--- a/server/src/uds/models/user_service.py
+++ b/server/src/uds/models/user_service.py
@@ -524,26 +524,20 @@ class UserService(UUIDModel, properties.PropertiesMixin):
         """
         Returns if this service is ready (not preparing or marked for removal)
        """
-        # pylint: disable=import-outside-toplevel
         from uds.core.managers.userservice import UserServiceManager

         # Call to isReady of the instance
-        return UserServiceManager().is_ready(self)
+        return UserServiceManager.manager().is_ready(self)

     def is_in_maintenance(self) -> bool:
         return self.deployed_service.is_in_maintenance()

-    def remove(self) -> None:
-        """
-        Mark this user deployed service for removal
-        """
-        self.set_state(State.REMOVABLE)
-
     def release(self) -> None:
         """
-        A much more convenient method name that "remove" (i think :) )
+        Mark this user deployed service for removal.
         """
-        self.remove()
+        self.set_state(State.REMOVABLE)

     def cancel(self) -> None:
         """
@@ -560,7 +554,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
         Marks for removal or cancels it, depending on state
         """
         if self.is_usable():
-            self.remove()
+            self.release()
         else:
             self.cancel()
diff --git a/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py b/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py
index 462084e7d..b611cc6c9 100644
--- a/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py
+++ b/server/src/uds/osmanagers/LinuxOsManager/linux_osmanager.py
@@ -123,7 +123,7 @@ class LinuxOsManager(osmanagers.OSManager):
                 'Unused user service for too long. Removing due to OS Manager parameters.',
                 types.log.LogSource.OSMANAGER,
             )
-            userservice.remove()
+            userservice.release()

     def is_persistent(self) -> bool:
         return fields.onlogout_field_is_persistent(self.on_logout)
diff --git a/server/src/uds/osmanagers/Test/testing_osmanager.py b/server/src/uds/osmanagers/Test/testing_osmanager.py
index eee718b9d..c4692f266 100644
--- a/server/src/uds/osmanagers/Test/testing_osmanager.py
+++ b/server/src/uds/osmanagers/Test/testing_osmanager.py
@@ -123,7 +123,7 @@ class TestOSManager(osmanagers.OSManager):
                 'Unused user service for too long. Removing due to OS Manager parameters.',
                 types.log.LogSource.OSMANAGER,
             )
-            userservice.remove()
+            userservice.release()

     def is_persistent(self) -> bool:
         return self.on_logout.value == 'keep-always'
diff --git a/server/src/uds/osmanagers/WindowsOsManager/windows.py b/server/src/uds/osmanagers/WindowsOsManager/windows.py
index 7790de3b7..3fd2ac9aa 100644
--- a/server/src/uds/osmanagers/WindowsOsManager/windows.py
+++ b/server/src/uds/osmanagers/WindowsOsManager/windows.py
@@ -114,7 +114,7 @@ class WindowsOsManager(osmanagers.OSManager):
                 'Unused user service for too long. Removing due to OS Manager parameters.',
                 types.log.LogSource.OSMANAGER,
             )
-            userservice.remove()
+            userservice.release()

     def is_persistent(self) -> bool:
         return fields.onlogout_field_is_persistent(self.on_logout)
diff --git a/server/src/uds/services/OpenGnsys/jobs.py b/server/src/uds/services/OpenGnsys/jobs.py
index 115624779..566ed5e9f 100644
--- a/server/src/uds/services/OpenGnsys/jobs.py
+++ b/server/src/uds/services/OpenGnsys/jobs.py
@@ -80,6 +80,6 @@ class OpenGnsysMaintainer(jobs.Job):
                     'The cached user service %s is about to expire. Removing it so it can be recreated',
                     userService,
                 )
-                userService.remove()
+                userService.release()

         logger.debug('OpenGnsys job finished')
diff --git a/server/src/uds/workers/servicepools_cache_updater.py b/server/src/uds/workers/servicepools_cache_updater.py
index 9d1442640..5e415830d 100644
--- a/server/src/uds/workers/servicepools_cache_updater.py
+++ b/server/src/uds/workers/servicepools_cache_updater.py
@@ -29,7 +29,6 @@
 """
 Author: Adolfo Gómez, dkmaster at dkmon dot com
 """
-import dataclasses
 import logging
 import typing
 import collections.abc
@@ -57,51 +56,6 @@ logger = logging.getLogger(__name__)
 # * l2 will be removed until cache_l2_srvs is reached


-@dataclasses.dataclass(slots=True)
-class ServicePoolStats:
-    servicepool: ServicePool
-    l1_cache_count: int
-    l2_cache_count: int
-    assigned_count: int
-
-    def l1_cache_overflow(self) -> bool:
-        """Checks if L1 cache is overflown
-
-        Overflows if:
-            * l1_assigned_count > max_srvs
-              (this is, if we have more than max, we can remove cached l1 items until we reach max)
-            * l1_assigned_count > initial_srvs and l1_cache_count > cache_l1_srvs
-              (this is, if we have more than initial, we can remove cached l1 items until we reach cache_l1_srvs)
-        """
-        l1_assigned_count = self.l1_cache_count + self.assigned_count
-        return l1_assigned_count > self.servicepool.max_srvs or (
-            l1_assigned_count > self.servicepool.initial_srvs
-            and self.l1_cache_count > self.servicepool.cache_l1_srvs
-        )
-
-    def l1_cache_needed(self) -> bool:
-        """Checks if L1 cache is needed
-
-        Grow L1 cache if:
-            * l1_assigned_count < max_srvs and (l1_assigned_count < initial_srvs or l1_cache_count < cache_l1_srvs)
-              (this is, if we have not reached max, and we have not reached initial or cache_l1_srvs, we need to grow L1 cache)
-
-        """
-        l1_assigned_count = self.l1_cache_count + self.assigned_count
-        return l1_assigned_count < self.servicepool.max_srvs and (
-            l1_assigned_count < self.servicepool.initial_srvs
-            or self.l1_cache_count < self.servicepool.cache_l1_srvs
-        )
-
-    def l2_cache_overflow(self) -> bool:
-        """Checks if L2 cache is overflown"""
-        return self.l2_cache_count > self.servicepool.cache_l2_srvs
-
-    def l2_cache_needed(self) -> bool:
-        """Checks if L2 cache is needed"""
-        return self.l2_cache_count < self.servicepool.cache_l2_srvs
-
-
 class ServiceCacheUpdater(Job):
     """
     Cache updater is responsible of keeping up to date the cache for different deployed services configurations requested
@@ -131,9 +85,9 @@ class ServiceCacheUpdater(Job):
             remaining_restraing_time,
         )

     def service_pools_needing_cache_update(
         self,
-    ) -> list[ServicePoolStats]:
+    ) -> list[types.services.ServicePoolStats]:
         # State filter for cached and inAssigned objects
         # First we get all deployed services that could need cache generation
         # We start filtering out the deployed services that do not need caching at all.
@@ -148,120 +102,18 @@ class ServiceCacheUpdater(Job):
         )

         # We will get the one that proportionally needs more cache
-        servicepools_numbers: list[ServicePoolStats] = []
+        servicepools_numbers: list[types.services.ServicePoolStats] = []
         for servicepool in candidate_servicepools:
-            servicepool.user_services.update()  # Cleans cached queries
-            # If this deployedService don't have a publication active and needs it, ignore it
-            service_instance = servicepool.service.get_instance()
-
-            if service_instance.uses_cache is False:
-                logger.debug(
-                    'Skipping cache generation for service pool that does not uses cache: %s',
-                    servicepool.name,
-                )
-                continue
-
-            if servicepool.active_publication() is None and service_instance.publication_type is not None:
-                logger.debug(
-                    'Skipping. %s Needs publication but do not have one',
-                    servicepool.name,
-                )
-                continue
-            # If it has any running publication, do not generate cache anymore
-            if servicepool.publications.filter(state=State.PREPARING).count() > 0:
-                logger.debug(
-                    'Skipping cache generation for service pool with publication running: %s',
-                    servicepool.name,
-                )
-                continue
-
-            if servicepool.is_restrained():
-                logger.debug(
-                    'StopSkippingped cache generation for restrained service pool: %s',
-                    servicepool.name,
-                )
-                ServiceCacheUpdater._notify_restrain(servicepool)
-                continue
-
-            # Get data related to actual state of cache
-            # Before we were removing the elements marked to be destroyed after creation, but this makes us
-            # to create new items over the limit stablisshed, so we will not remove them anymore
-            l1_cache_count: int = (
-                servicepool.cached_users_services()
-                .filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
-                .count()
-            )
-            l2_cache_count: int = (
-                (
-                    servicepool.cached_users_services()
-                    .filter(
-                        UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
-                    )
-                    .count()
-                )
-                if service_instance.uses_cache_l2
-                else 0
-            )
-            assigned_count: int = (
-                servicepool.assigned_user_services()
-                .filter(UserServiceManager().get_state_filter(servicepool.service))
-                .count()
-            )
-            pool_stat = ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
-            # if we bypasses max cache, we will reduce it in first place. This is so because this will free resources on service provider
-            logger.debug(
-                "Examining %s with %s in cache L1 and %s in cache L2, %s inAssigned",
-                servicepool.name,
-                l1_cache_count,
-                l2_cache_count,
-                assigned_count,
-            )
-
-            # We have more than we want
-            if pool_stat.l1_cache_overflow():
-                logger.debug('We have more services than max configured. Reducing..')
-                servicepools_numbers.append(
-                    ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
-                )
-                continue
-
-            # If we have more in L2 cache than needed, decrease L2 cache, but int this case, we continue checking cause L2 cache removal
-            # has less priority than l1 creations or removals, but higher. In this case, we will simply take last l2 oversized found and reduce it
-            if pool_stat.l2_cache_overflow():
-                logger.debug('We have more services in L2 cache than configured, reducing')
-                servicepools_numbers.append(
-                    ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
-                )
-                continue
-
-            # If this service don't allows more starting user services, continue
-            if not UserServiceManager().can_grow_service_pool(servicepool):
-                logger.debug(
-                    'This pool cannot grow rithg now: %s',
-                    servicepool,
-                )
-                continue
-
-            if pool_stat.l1_cache_needed():
-                logger.debug('Needs to grow L1 cache for %s', servicepool)
-                servicepools_numbers.append(
-                    ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
-                )
-                continue
-
-            if pool_stat.l2_cache_needed():
-                logger.debug('Needs to grow L2 cache for %s', servicepool)
-                servicepools_numbers.append(
-                    ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
-                )
-                continue
+            stats = UserServiceManager.manager().get_cache_servicepool_stats(servicepool)
+            if not stats.is_null():
+                servicepools_numbers.append(stats)

         # We also return calculated values so we can reuse then
         return servicepools_numbers

     def grow_l1_cache(
         self,
-        servicepool_stats: ServicePoolStats,
+        servicepool_stats: types.services.ServicePoolStats,
     ) -> None:
         """
         This method tries to enlarge L1 cache.
@@ -270,6 +252,8 @@ class ServiceCacheUpdater(Job):
         and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
         this method will not grow the L1 cache
         """
+        if servicepool_stats.servicepool is None:
+            return
         logger.debug('Growing L1 cache creating a new service for %s', servicepool_stats.servicepool.name)
         # First, we try to assign from L2 cache
         if servicepool_stats.l2_cache_count > 0:
@@ -301,7 +285,7 @@ class ServiceCacheUpdater(Job):
             return
         try:
             # This has a velid publication, or it will not be here
-            UserServiceManager().create_cache_for(
+            UserServiceManager.manager().create_cache_for(
                 typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
                 types.services.CacheLevel.L1,
             )
@@ -322,7 +306,7 @@ class ServiceCacheUpdater(Job):

     def grow_l2_cache(
         self,
-        servicepool_stats: ServicePoolStats,
+        servicepool_stats: types.services.ServicePoolStats,
     ) -> None:
         """
         Tries to grow L2 cache of service.
@@ -331,10 +315,12 @@ class ServiceCacheUpdater(Job):
         and PREPARING, assigned, L1 and L2) is over max allowed service deployments,
         this method will not grow the L1 cache
         """
+        if servicepool_stats.servicepool is None:
+            return
         logger.debug("Growing L2 cache creating a new service for %s", servicepool_stats.servicepool.name)
         try:
             # This has a velid publication, or it will not be here
-            UserServiceManager().create_cache_for(
+            UserServiceManager.manager().create_cache_for(
                 typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
                 types.services.CacheLevel.L2,
             )
@@ -344,15 +330,18 @@ class ServiceCacheUpdater(Job):
                 servicepool_stats.servicepool.name,
                 servicepool_stats.servicepool.max_srvs,
             )
-            # TODO: When alerts are ready, notify this
+            # Alerts notified through logger

     def reduce_l1_cache(
         self,
-        servicepool_stats: ServicePoolStats,
+        servicepool_stats: types.services.ServicePoolStats,
     ) -> None:
         logger.debug("Reducing L1 cache erasing a service in cache for %s", servicepool_stats.servicepool)
         # We will try to destroy the newest l1_cache_count element that is USABLE if the deployer can't cancel a new service creation
         # Here, we will take into account the "remove_after" marked user services, so we don't try to remove them
+        if servicepool_stats.servicepool is None:
+            return
+
        cacheItems: list[UserService] = [
            i
            for i in servicepool_stats.servicepool.cached_users_services()
@@ -392,8 +381,10 @@ class ServiceCacheUpdater(Job):

     def reduce_l2_cache(
         self,
-        servicepool_stats: ServicePoolStats,
+        servicepool_stats: types.services.ServicePoolStats,
     ) -> None:
+        if servicepool_stats.servicepool is None:
+            return
         logger.debug("Reducing L2 cache erasing a service in cache for %s", servicepool_stats.servicepool.name)
         if servicepool_stats.l2_cache_count > 0:
             cacheItems = (
@@ -418,12 +409,12 @@ class ServiceCacheUpdater(Job):

             # Treat l1 and l2 cache independently
             # first, try to reduce cache and then grow it
-            if servicepool_stat.l1_cache_overflow():
+            if servicepool_stat.has_l1_cache_overflow():
                 self.reduce_l1_cache(servicepool_stat)
-            elif servicepool_stat.l1_cache_needed():  # We need more L1 items
+            elif servicepool_stat.is_l1_cache_growth_required():  # We need more L1 items
                 self.grow_l1_cache(servicepool_stat)
             # Treat l1 and l2 cache independently
-            if servicepool_stat.l2_cache_overflow():
+            if servicepool_stat.has_l2_cache_overflow():
                 self.reduce_l2_cache(servicepool_stat)
-            elif servicepool_stat.l2_cache_needed():  # We need more L2 items
+            elif servicepool_stat.is_l2_cache_growth_required():  # We need more L2 items
                 self.grow_l2_cache(servicepool_stat)
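Reviewer note (illustration, not part of the patch): the predicates this change centralizes in types.services.ServicePoolStats drive everything the cache updater and the new release_on_logout() do. The sketch below is a self-contained, runnable rendering of the two L1 predicates under the field semantics visible in this diff; Pool and Stats are hypothetical stand-ins, not UDS models, and they model only the counters the predicates read. It also shows the steady-state band where neither predicate fires, in which release_on_logout() takes neither branch.

# servicepool_stats_sketch.py -- minimal sketch, assuming only what this diff shows;
# 'Pool' and 'Stats' are hypothetical stand-ins, not uds.models classes.
import dataclasses


@dataclasses.dataclass
class Pool:
    initial_srvs: int   # services pre-created for the pool
    cache_l1_srvs: int  # desired L1 (ready-to-assign) cache size
    max_srvs: int       # hard ceiling for the pool


@dataclasses.dataclass
class Stats:
    pool: Pool
    l1_cache_count: int
    assigned_count: int

    def has_l1_cache_overflow(self) -> bool:
        # Mirrors ServicePoolStats.has_l1_cache_overflow from this patch
        l1_assigned = self.l1_cache_count + self.assigned_count
        return l1_assigned > self.pool.max_srvs or (
            l1_assigned > self.pool.initial_srvs and self.l1_cache_count > self.pool.cache_l1_srvs
        )

    def is_l1_cache_growth_required(self) -> bool:
        # Mirrors ServicePoolStats.is_l1_cache_growth_required from this patch
        l1_assigned = self.l1_cache_count + self.assigned_count
        return l1_assigned < self.pool.max_srvs and (
            l1_assigned < self.pool.initial_srvs or self.l1_cache_count < self.pool.cache_l1_srvs
        )


pool = Pool(initial_srvs=100, cache_l1_srvs=10, max_srvs=500)

# 100 assigned + 30 cached: over initial and over the L1 target -> the reducer runs
print(Stats(pool, l1_cache_count=30, assigned_count=100).has_l1_cache_overflow())      # True

# 50 assigned + 5 cached: below initial -> growth required (release_on_logout clones to cache)
print(Stats(pool, l1_cache_count=5, assigned_count=50).is_l1_cache_growth_required())  # True

# 100 assigned + 10 cached: cache exactly at target, neither predicate fires,
# so release_on_logout() takes neither branch in this band.
s = Stats(pool, l1_cache_count=10, assigned_count=100)
print(s.has_l1_cache_overflow(), s.is_l1_cache_growth_required())                      # False False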