mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-22 22:03:54 +03:00
Minor fixes and finished actor_initialization implementation
Improved deferred deletion to, in process_to_stop, check must_stop_before_deletion also
This commit is contained in:
parent
b01cd25648
commit
6544f194db
@ -467,7 +467,9 @@ class Initialize(ActorV3Action):
|
||||
userservice.actor_version = self._params['version']
|
||||
|
||||
# Give the oportunity to change things to the userservice on initialization
|
||||
userservice.get_instance().actor_initialization(self._params)
|
||||
if userservice.get_instance().actor_initialization(self._params):
|
||||
# Store changes to db
|
||||
userservice.update_data(userservice.get_instance())
|
||||
|
||||
os_data: dict[str, typing.Any] = {}
|
||||
osmanager = userservice.get_osmanager_instance()
|
||||
|
@ -94,33 +94,33 @@ class PublicationLauncher(DelayedTask):
|
||||
|
||||
def __init__(self, publication: ServicePoolPublication):
|
||||
super().__init__()
|
||||
self._publicationId = publication.id
|
||||
self._publication_id = publication.id
|
||||
|
||||
def run(self) -> None:
|
||||
logger.debug('Publishing')
|
||||
servicePoolPub: typing.Optional[ServicePoolPublication] = None
|
||||
servicepool_publication: typing.Optional[ServicePoolPublication] = None
|
||||
try:
|
||||
now = sql_now()
|
||||
with transaction.atomic():
|
||||
servicePoolPub = ServicePoolPublication.objects.select_for_update().get(pk=self._publicationId)
|
||||
if not servicePoolPub:
|
||||
servicepool_publication = ServicePoolPublication.objects.select_for_update().get(pk=self._publication_id)
|
||||
if not servicepool_publication:
|
||||
raise ServicePool.DoesNotExist()
|
||||
if (
|
||||
servicePoolPub.state != State.LAUNCHING
|
||||
servicepool_publication.state != State.LAUNCHING
|
||||
): # If not preparing (may has been canceled by user) just return
|
||||
return
|
||||
servicePoolPub.state = State.PREPARING
|
||||
servicePoolPub.save()
|
||||
pi = servicePoolPub.get_instance()
|
||||
servicepool_publication.state = State.PREPARING
|
||||
servicepool_publication.save()
|
||||
pi = servicepool_publication.get_instance()
|
||||
state = pi.publish()
|
||||
servicePool: ServicePool = servicePoolPub.deployed_service
|
||||
servicePool: ServicePool = servicepool_publication.deployed_service
|
||||
servicePool.current_pub_revision += 1
|
||||
servicePool.set_value(
|
||||
'toBeReplacedIn',
|
||||
serialize(now + datetime.timedelta(hours=GlobalConfig.SESSION_EXPIRE_TIME.as_int(True))),
|
||||
)
|
||||
servicePool.save()
|
||||
PublicationFinishChecker.state_updater(servicePoolPub, pi, state)
|
||||
PublicationFinishChecker.state_updater(servicepool_publication, pi, state)
|
||||
except (
|
||||
ServicePoolPublication.DoesNotExist
|
||||
): # Deployed service publication has been removed from database, this is ok, just ignore it
|
||||
@ -128,11 +128,11 @@ class PublicationLauncher(DelayedTask):
|
||||
except Exception:
|
||||
logger.exception("Exception launching publication")
|
||||
try:
|
||||
if servicePoolPub:
|
||||
servicePoolPub.state = State.ERROR
|
||||
servicePoolPub.save()
|
||||
if servicepool_publication:
|
||||
servicepool_publication.state = State.ERROR
|
||||
servicepool_publication.save()
|
||||
except Exception:
|
||||
logger.error('Error saving ERROR state for pool %s', servicePoolPub)
|
||||
logger.error('Error saving ERROR state for pool %s', servicepool_publication)
|
||||
|
||||
|
||||
# Delayed Task that checks if a publication is done
|
||||
|
@ -532,12 +532,15 @@ class UserService(Environmentable, Serializable, abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def actor_initialization(self, request_params: dict[str, typing.Any]) -> None:
|
||||
def actor_initialization(self, request_params: dict[str, typing.Any]) -> bool:
|
||||
"""
|
||||
This method is invoked by the actor REST API when the actor initialize
|
||||
is called. This is a good place to do things that needs to be once only...
|
||||
|
||||
Should return True if any internal instance data has changed, so it gets
|
||||
stored back to database.
|
||||
"""
|
||||
pass
|
||||
return False
|
||||
|
||||
def error_reason(self) -> str:
|
||||
"""
|
||||
|
@ -159,6 +159,9 @@ class DeferredDeletionWorker(Job):
|
||||
exec_time = execution_timer()
|
||||
try:
|
||||
service = services[info.service_uuid]
|
||||
if service.must_stop_before_deletion is False:
|
||||
info.sync_to_storage(types.DeferredStorageGroup.TO_DELETE)
|
||||
continue
|
||||
with exec_time:
|
||||
if service.is_running(None, info.vmid):
|
||||
# if info.retries < RETRIES_TO_RETRY, means this is the first time we try to stop it
|
||||
|
@ -77,7 +77,7 @@ class UserServiceRemover(Job):
|
||||
# USER_SERVICE_REMOVAL_LIMIT is the maximum number of items to remove at once
|
||||
# This configuration value is cached at startup, so it is not updated until next reload
|
||||
max_to_remove: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.as_int()
|
||||
manager = UserServiceManager()
|
||||
manager = UserServiceManager.manager()
|
||||
|
||||
with transaction.atomic():
|
||||
removeFrom = sql_now() - timedelta(
|
||||
|
Loading…
x
Reference in New Issue
Block a user