1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-27 09:57:46 +03:00

Adding support for putting back userservices into cache instead of removing them (if possible)

This commit is contained in:
Adolfo Gómez García 2024-09-02 19:11:49 +02:00
parent 7c2a8168fa
commit 207a784b8f
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
17 changed files with 105 additions and 54 deletions

View File

@ -515,7 +515,7 @@ class BaseReadyChange(ActorV3Action):
if osmanager:
osmanager.to_ready(userService)
UserServiceManager().notify_ready_from_os_manager(userService, '') # Currently, no data is received for os manager
UserServiceManager.manager().notify_ready_from_os_manager(userService, '') # Currently, no data is received for os manager
# Generates a certificate and send it to client.
privateKey, cert, password = security.create_self_signed_cert(self._params['ip'])

View File

@ -142,7 +142,7 @@ class Client(Handler):
userServiceInstance,
transport,
transportInstance,
) = UserServiceManager().get_user_service_info(
) = UserServiceManager.manager().get_user_service_info(
self._request.user,
self._request.os,
self._request.ip,

View File

@ -103,7 +103,7 @@ class Connection(Handler):
_, # iads,
_, # trans,
itrans,
) = UserServiceManager().get_user_service_info( # pylint: disable=unused-variable
) = UserServiceManager.manager().get_user_service_info( # pylint: disable=unused-variable
self._user,
self._request.os,
self._request.ip,
@ -134,7 +134,7 @@ class Connection(Handler):
def script(self, idService: str, idTransport: str, scrambler: str, hostname: str) -> dict[str, typing.Any]:
try:
res = UserServiceManager().get_user_service_info(
res = UserServiceManager.manager().get_user_service_info(
self._user, self._request.os, self._request.ip, idService, idTransport
)
userService: 'models.UserService'

View File

@ -194,7 +194,7 @@ class ServicesPools(ModelHandler):
if item.is_in_maintenance():
state = State.MAINTENANCE
# This needs a lot of queries, and really does not apport anything important to the report
# elif UserServiceManager().canInitiateServiceFromDeployedService(item) is False:
# elif UserServiceManager.manager().canInitiateServiceFromDeployedService(item) is False:
# state = State.SLOWED_DOWN
val: dict[str, typing.Any] = {
'id': item.uuid,
@ -679,7 +679,7 @@ class ServicesPools(ModelHandler):
return self.invalid_request_response('Invalid parameters')
logger.debug('Creating from assignable: %s', self._params)
UserServiceManager().create_from_assignable(
UserServiceManager.manager().create_from_assignable(
item,
User.objects.get(uuid__iexact=process_uuid(self._params['user_id'])),
self._params['assignable_id'],

View File

@ -257,7 +257,7 @@ class AssignedService(DetailHandler):
def reset(self, parent: 'models.ServicePool', item: str) -> typing.Any:
userService = parent.userServices.get(uuid=process_uuid(item))
UserServiceManager().reset(userService)
UserServiceManager.manager().reset(userService)
class CachedService(AssignedService):

View File

@ -318,7 +318,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
# Save the new element, for reference
user_service_copy.save()
# Now, move the original to cache, but do it "hard" way, so we do not need to check for state
user_service.state = State.USABLE
user_service.os_state = State.USABLE
@ -381,13 +381,13 @@ class UserServiceManager(metaclass=singleton.Singleton):
# to create new items over the limit stablisshed, so we will not remove them anymore
l1_cache_count: int = (
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
.filter(UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
.count()
)
l2_cache_count: int = (
(
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2))
.filter(UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2))
.count()
)
if service_instance.uses_cache_l2
@ -395,7 +395,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
)
assigned_count: int = (
servicepool.assigned_user_services()
.filter(UserServiceManager().get_state_filter(servicepool.service))
.filter(UserServiceManager.manager().get_state_filter(servicepool.service))
.count()
)
pool_stat = types.services.ServicePoolStats(servicepool, l1_cache_count, l2_cache_count, assigned_count)
@ -497,10 +497,21 @@ class UserServiceManager(metaclass=singleton.Singleton):
This method will take care of removing the service if no cache is desired of cache already full (on servicepool)
"""
if userservice.deployed_service.service.get_instance().allows_put_back_to_cache() is False:
if userservice.allow_putting_back_to_cache() is False:
userservice.release() # Normal release
return
# Some sanity checks, should never happen
if userservice.cache_level != types.services.CacheLevel.NONE:
logger.error('Cache level is not NONE for userservice %s on release_on_logout', userservice)
userservice.release()
return
if userservice.is_usable() is False:
logger.error('State is not USABLE for userservice %s on release_on_logout', userservice)
userservice.release()
return
stats = self.get_cache_servicepool_stats(userservice.deployed_service)
# Note that only moves to cache L1
# Also, we can get values for L2 cache, thats why we check L1 for overflow and needed

View File

@ -58,7 +58,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
uses_cache_l2 = False # L2 Cache are running machines in suspended state
needs_osmanager = False # If the service needs a s.o. manager (managers are related to agents provided by services, i.e. virtual machines with agent)
must_stop_before_deletion = True # If the service must be stopped before deletion
must_stop_before_deletion = True # If the service must be stopped before deletion
# Gui remplates, to be "incorporated" by inherited classes if needed
base_machine = gui.ChoiceField(
@ -85,6 +85,16 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
order=104,
tab=types.ui.Tab.ADVANCED,
)
put_back_to_cache = gui.ChoiceField(
order=105,
label=_('Put back to cache'),
tooltip=_('On machine releasy by logout, put it back to cache instead of deleting if possible.'),
choices=[
{'id': 'no', 'text': _('No. Never put it back to cache')},
{'id': 'yes', 'text': _('Yes, try to put it back to cache')},
],
tab=types.ui.Tab.ADVANCED,
)
def initialize(self, values: 'types.core.ValuesType') -> None:
"""
@ -96,6 +106,13 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
if values:
validators.validate_basename(self.basename.value, self.lenname.value)
def allow_putting_back_to_cache(self) -> bool:
if self.has_field('put_back_to_cache') and isinstance(
getattr(self, 'put_back_to_cache', None), gui.ChoiceField
):
return self.put_back_to_cache.value == 'yes'
return False
def get_basename(self) -> str:
return self.basename.value
@ -127,7 +144,9 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
This method must be be provided if the field remove_duplicates is used
If not, will raise a NotImplementedError
"""
raise NotImplementedError(f'{self.__class__}: find_duplicates must be implemented if remove_duplicates is used!')
raise NotImplementedError(
f'{self.__class__}: find_duplicates must be implemented if remove_duplicates is used!'
)
@typing.final
def perform_find_duplicates(self, name: str, mac: str) -> collections.abc.Iterable[str]:
@ -152,7 +171,9 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
return []
@abc.abstractmethod
def get_ip(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
def get_ip(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> str:
"""
Returns the ip of the machine
If cannot be obtained, MUST raise an exception
@ -160,7 +181,9 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
...
@abc.abstractmethod
def get_mac(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> str:
def get_mac(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> str:
"""
Returns the mac of the machine
If cannot be obtained, MUST raise an exception
@ -171,14 +194,18 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
...
@abc.abstractmethod
def is_running(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
def is_running(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> bool:
"""
Returns if the machine is ready and running
"""
...
@abc.abstractmethod
def start(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
def start(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None:
"""
Starts the machine
Returns None. If a task is needed for anything, use the caller_instance to notify
@ -186,36 +213,44 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
...
@abc.abstractmethod
def stop(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
def stop(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None:
"""
Stops the machine
Returns None. If a task is needed for anything, use the caller_instance to notify
"""
...
def shutdown(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
def shutdown(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None:
"""
Shutdowns the machine. Defaults to stop
Returns None. If a task is needed for anything, use the caller_instance to notify
"""
return self.stop(caller_instance, vmid)
def reset(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
def reset(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None:
"""
Resets the machine
Returns None. If a task is needed for anything, use the caller_instance to notify
"""
# Default is to stop "hard"
return self.stop(caller_instance, vmid)
def delete(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> None:
def delete(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> None:
"""
Removes the machine, or queues it for removal, or whatever :)
Use the caller_instance to notify anything if needed, or to identify caller
"""
# Store the deletion has started for future reference
self.set_deleting_state(vmid)
DeferredDeletionWorker.add(self, vmid)
# Method for deferred deletion
@ -229,14 +264,14 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
you provided a custom delete method, but better to have it implemented
"""
...
def is_deleted(self, vmid: str) -> bool:
"""
Checks if the deferred deletion of a machine is done
Default implementation is return True always
"""
return True
def notify_deleted(self, vmid: str) -> None:
"""
Called when the deferred deletion has been done
@ -244,7 +279,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
# Remove the deletion started flag
with self.storage.as_dict() as storage:
del storage[f'deleting_{vmid}']
def set_deleting_state(self, vmid: str) -> None:
"""
Marks a machine as deleting
@ -252,8 +287,10 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
with self.storage.as_dict() as storage:
# Store deleting vmid
storage[f'deleting_{vmid}'] = True
def is_deletion_in_progress(self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str) -> bool:
def is_deletion_in_progress(
self, caller_instance: typing.Optional['DynamicUserService | DynamicPublication'], vmid: str
) -> bool:
"""
Checks if the deferred deletion of a machine is running
Default implementation is return False always

View File

@ -269,7 +269,7 @@ class Service(Module):
"""
return True
def allows_put_back_to_cache(self) -> bool:
def allow_putting_back_to_cache(self) -> bool:
"""
Returns if this service can be put back to cache. This is used to check if a service can be put back to cache
when the user logouts instead of being removed. By default, this method returns False.

View File

@ -191,7 +191,7 @@ class CalendarAction(UUIDModel):
def _clear_cache() -> None:
# 4.- Remove all cache_l1_srvs
for i in self.service_pool.cached_users_services().filter(
UserServiceManager().get_cache_state_filter(
UserServiceManager.manager().get_cache_state_filter(
self.service_pool,
(
types.services.CacheLevel.L1

View File

@ -365,6 +365,9 @@ class UserService(UUIDModel, properties.PropertiesMixin):
Returns True if this User Service needs an os manager (i.e. parent services pools is marked to use an os manager)
"""
return bool(self.get_osmanager())
def allow_putting_back_to_cache(self) -> bool:
return self.deployed_service.service.get_instance().allow_putting_back_to_cache()
def transforms_user_or_password_for_service(self) -> bool:
"""
@ -466,7 +469,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
if not inUse: # Service released, check y we should mark it for removal
# If our publication is not current, mark this for removal
UserServiceManager().check_for_removal(self)
UserServiceManager.manager().check_for_removal(self)
def start_accounting(self) -> None:
# 1.- If do not have any account associated, do nothing
@ -577,7 +580,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
# pylint: disable=import-outside-toplevel
from uds.core.managers.userservice import UserServiceManager
UserServiceManager().move_to_level(self, cacheLevel)
UserServiceManager.manager().move_to_level(self, cacheLevel)
def set_comms_endpoint(self, commsUrl: typing.Optional[str] = None) -> None:
self.properties['comms_url'] = commsUrl

View File

@ -282,7 +282,7 @@ if sys.platform == 'win32':
# operations.writeToPipe("\\\\.\\pipe\\VDSMDPipe", packet, True)
dbUserService = self.db_obj()
if dbUserService:
UserServiceManager().send_script(dbUserService, script)
UserServiceManager.manager().send_script(dbUserService, script)
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
# Here we will check for suspending the VM (when full ready)

View File

@ -296,5 +296,5 @@ class BaseX2GOTransport(transports.Transport):
def getAndPushKey(self, userName: str, userService: 'models.UserService') -> tuple[str, str]:
priv, pub = self.genKeyPairForSsh()
authScript = self.getAuthorizeScript(userName, pub)
UserServiceManager().send_script(userService, authScript)
UserServiceManager.manager().send_script(userService, authScript)
return priv, pub

View File

@ -428,7 +428,7 @@ def enable_service(
# If meta service, process and rebuild idService & idTransport
try:
res = UserServiceManager().get_user_service_info(
res = UserServiceManager.manager().get_user_service_info(
request.user, request.os, request.ip, idService, idTransport, validate_with_test=False
)
scrambler = CryptoManager().random_string(32)

View File

@ -254,7 +254,7 @@ def ticket_auth(
# Check if servicePool is part of the ticket
if poolUuid:
# Request service, with transport = None so it is automatic
res = UserServiceManager().get_user_service_info(
res = UserServiceManager.manager().get_user_service_info(
request.user, request.os, request.ip, poolUuid, None, False
)
_, userservice, _, transport, _ = res

View File

@ -370,7 +370,7 @@ def update_transport_ticket(
userService = models.UserService.objects.get(
uuid=data['ticket-info'].get('userService', None)
)
UserServiceManager().notify_preconnect(
UserServiceManager.manager().notify_preconnect(
userService,
types.connections.ConnectionData(
username=username,

View File

@ -66,7 +66,7 @@ def transport_own_link(
# For type checkers to "be happy"
try:
res = UserServiceManager().get_user_service_info(
res = UserServiceManager.manager().get_user_service_info(
request.user, request.os, request.ip, service_id, transport_id
)
ip, userService, _iads, trans, itrans = res
@ -159,10 +159,10 @@ def user_service_status(
userservice: typing.Optional['UserService'] = None
status = 'running'
# If service exists (meta or not)
if UserServiceManager().is_meta_service(service_id):
userservice = UserServiceManager().locate_meta_service(user=request.user, id_metapool=service_id)
if UserServiceManager.manager().is_meta_service(service_id):
userservice = UserServiceManager.manager().locate_meta_service(user=request.user, id_metapool=service_id)
else:
userservice = UserServiceManager().locate_user_service(
userservice = UserServiceManager.manager().locate_user_service(
user=request.user, id_service=service_id, create=False
)
if userservice:
@ -191,7 +191,7 @@ def user_service_status(
def action(request: 'ExtendedHttpRequestWithUser', service_id: str, action_string: str) -> HttpResponse:
userService = UserServiceManager.manager().locate_meta_service(request.user, service_id)
if not userService:
userService = UserServiceManager().locate_user_service(request.user, service_id, create=False)
userService = UserServiceManager.manager().locate_user_service(request.user, service_id, create=False)
response: typing.Any = None
rebuild: bool = False
@ -206,7 +206,7 @@ def action(request: 'ExtendedHttpRequestWithUser', service_id: str, action_strin
),
types.log.LogSource.WEB,
)
UserServiceManager().request_logoff(userService)
UserServiceManager.manager().request_logoff(userService)
userService.release()
elif (
action_string == 'reset'
@ -222,8 +222,8 @@ def action(request: 'ExtendedHttpRequestWithUser', service_id: str, action_strin
),
types.log.LogSource.WEB,
)
# UserServiceManager().requestLogoff(userService)
UserServiceManager().reset(userService)
# UserServiceManager.manager().requestLogoff(userService)
UserServiceManager.manager().reset(userService)
if rebuild:
# Rebuild services data, but return only "this" service

View File

@ -142,14 +142,14 @@ class ServiceCacheUpdater(Job):
# # to create new items over the limit stablisshed, so we will not remove them anymore
# l1_cache_count: int = (
# servicepool.cached_users_services()
# .filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
# .filter(UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
# .count()
# )
# l2_cache_count: int = (
# (
# servicepool.cached_users_services()
# .filter(
# UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
# UserServiceManager.manager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
# )
# .count()
# )
@ -158,7 +158,7 @@ class ServiceCacheUpdater(Job):
# )
# assigned_count: int = (
# servicepool.assigned_user_services()
# .filter(UserServiceManager().get_state_filter(servicepool.service))
# .filter(UserServiceManager.manager().get_state_filter(servicepool.service))
# .count()
# )
# pool_stat = types.services.ServicePoolStats(
@ -191,7 +191,7 @@ class ServiceCacheUpdater(Job):
# continue
# # If this service don't allows more starting user services, continue
# if not UserServiceManager().can_grow_service_pool(servicepool):
# if not UserServiceManager.manager().can_grow_service_pool(servicepool):
# logger.debug(
# 'This pool cannot grow rithg now: %s',
# servicepool,
@ -263,7 +263,7 @@ class ServiceCacheUpdater(Job):
servicepool_stats.servicepool.cached_users_services()
.select_for_update()
.filter(
UserServiceManager().get_cache_state_filter(
UserServiceManager.manager().get_cache_state_filter(
servicepool_stats.servicepool, types.services.CacheLevel.L2
)
)
@ -346,7 +346,7 @@ class ServiceCacheUpdater(Job):
i
for i in servicepool_stats.servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(
UserServiceManager.manager().get_cache_state_filter(
servicepool_stats.servicepool, types.services.CacheLevel.L1
)
)
@ -390,7 +390,7 @@ class ServiceCacheUpdater(Job):
cacheItems = (
servicepool_stats.servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(
UserServiceManager.manager().get_cache_state_filter(
servicepool_stats.servicepool, types.services.CacheLevel.L2
)
)