1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-12 04:58:34 +03:00

Fixed publication return types and advancing on custom dynamic services base

This commit is contained in:
Adolfo Gómez García 2024-03-18 22:34:55 +01:00
parent 42556d9e33
commit 4fb1da1554
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
28 changed files with 732 additions and 355 deletions

View File

@ -41,6 +41,7 @@ from uds.core.jobs.delayed_task import DelayedTask
from uds.core.jobs.delayed_task_runner import DelayedTaskRunner
from uds.core.util.config import GlobalConfig
from uds.core.services.exceptions import PublishException
from uds.core import types
from uds.core.types.states import State
from uds.core.util import log
@ -157,19 +158,19 @@ class PublicationFinishChecker(DelayedTask):
@staticmethod
def state_updater(
publication: ServicePoolPublication,
publicationInstance: 'services.Publication',
state: str,
publication_instance: 'services.Publication',
exec_result: types.states.TaskState,
) -> None:
"""
Checks the value returned from invocation to publish or checkPublishingState, updating the servicePoolPub database object
Return True if it has to continue checking, False if finished
"""
try:
prevState: str = publication.state
checkLater: bool = False
if State.from_str(state).is_finished():
publication_state = types.states.State.from_str(publication.state)
check_later: bool = False
if exec_result.is_finished():
# Now we mark, if it exists, the previous usable publication as "Removable"
if State.from_str(prevState).is_preparing():
if publication_state.is_preparing():
old: ServicePoolPublication
for old in publication.deployed_service.publications.filter(
state=State.USABLE
@ -178,13 +179,7 @@ class PublicationFinishChecker(DelayedTask):
osm = publication.deployed_service.osmanager
# If os manager says "machine is persistent", do not tray to delete "previous version" assigned machines
doPublicationCleanup = (
True
if osm is None
else not osm.get_instance().is_persistent()
)
if doPublicationCleanup:
if osm is None or osm.get_instance().is_persistent() is False:
pc = PublicationOldMachinesCleaner(old.id)
pc.register(
GlobalConfig.SESSION_EXPIRE_TIME.as_int(True) * 3600,
@ -200,26 +195,26 @@ class PublicationFinishChecker(DelayedTask):
)
publication.set_state(State.USABLE)
elif State.from_str(prevState).is_removing():
elif publication_state.is_removing():
publication.set_state(State.REMOVED)
else: # State is canceling
publication.set_state(State.CANCELED)
# Mark all previous publications deployed services as removables
# and make this usable
publicationInstance.finish()
publication.update_data(publicationInstance)
elif State.from_str(state).is_errored():
publication.update_data(publicationInstance)
publication_instance.finish()
publication.update_data(publication_instance)
elif exec_result.is_errored():
publication.update_data(publication_instance)
publication.set_state(State.ERROR)
else:
checkLater = True # The task is running
publication.update_data(publicationInstance)
check_later = True # The task is running
publication.update_data(publication_instance)
if checkLater:
PublicationFinishChecker.check_later(publication, publicationInstance)
if check_later:
PublicationFinishChecker.check_later(publication, publication_instance)
except Exception:
logger.exception('At checkAndUpdate for publication')
PublicationFinishChecker.check_later(publication, publicationInstance)
PublicationFinishChecker.check_later(publication, publication_instance)
@staticmethod
def check_later(
@ -252,7 +247,7 @@ class PublicationFinishChecker(DelayedTask):
try:
state = publicationInstance.check_state()
except Exception:
state = State.ERROR
state = types.states.TaskState.ERROR
PublicationFinishChecker.state_updater(
publication, publicationInstance, state
)
@ -282,21 +277,21 @@ class PublicationManager(metaclass=singleton.Singleton):
) # Singleton pattern will return always the same instance
def publish(
self, servicePool: ServicePool, changeLog: typing.Optional[str] = None
self, servicepool: ServicePool, changeLog: typing.Optional[str] = None
) -> None:
"""
Initiates the publication of a service pool, or raises an exception if this cannot be done
:param servicePool: Service pool object (db object)
:param changeLog: if not None, store change log string on "change log" table
"""
if servicePool.publications.filter(state__in=State.PUBLISH_STATES).count() > 0:
if servicepool.publications.filter(state__in=State.PUBLISH_STATES).count() > 0:
raise PublishException(
_(
'Already publishing. Wait for previous publication to finish and try again'
)
)
if servicePool.is_in_maintenance():
if servicepool.is_in_maintenance():
raise PublishException(
_('Service is in maintenance mode and new publications are not allowed')
)
@ -304,15 +299,15 @@ class PublicationManager(metaclass=singleton.Singleton):
publication: typing.Optional[ServicePoolPublication] = None
try:
now = sql_datetime()
publication = servicePool.publications.create(
publication = servicepool.publications.create(
state=State.LAUNCHING,
state_date=now,
publish_date=now,
revision=servicePool.current_pub_revision,
revision=servicepool.current_pub_revision,
)
if changeLog:
servicePool.changelog.create(
revision=servicePool.current_pub_revision, log=changeLog, stamp=now
servicepool.changelog.create(
revision=servicepool.current_pub_revision, log=changeLog, stamp=now
)
if publication:
DelayedTaskRunner.runner().insert(

View File

@ -600,11 +600,20 @@ class UserServiceManager(metaclass=singleton.Singleton):
operations_logger.info('Reseting %s', user_service)
userServiceInstance = user_service.get_instance()
userservice_instance = user_service.get_instance()
try:
userServiceInstance.reset()
state = userservice_instance.reset()
except Exception:
logger.exception('Reseting service')
return
logger.debug('State: %s', state)
if state == types.states.TaskState.FINISHED:
user_service.update_data(userservice_instance)
return
UserServiceOpChecker.make_unique(user_service, userservice_instance, state)
def notify_preconnect(self, user_service: UserService, info: types.connections.ConnectionData) -> None:
try:

View File

@ -186,7 +186,7 @@ class Publication(Environmentable, Serializable):
return self._uuid
@abc.abstractmethod
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
This method is invoked whenever the administrator requests a new publication.
@ -222,7 +222,7 @@ class Publication(Environmentable, Serializable):
raise NotImplementedError(f'publish method for class {self.__class__.__name__} not provided! ')
@abc.abstractmethod
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
This is a task method. As that, the expected return values are
State values RUNNING, FINISHED or ERROR.
@ -275,7 +275,7 @@ class Publication(Environmentable, Serializable):
return 'unknown'
@abc.abstractmethod
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
"""
This is a task method. As that, the expected return values are
State values RUNNING, FINISHED or ERROR.
@ -296,7 +296,7 @@ class Publication(Environmentable, Serializable):
raise NotImplementedError(f'destroy method for class {self.__class__.__name__} not provided!')
@abc.abstractmethod
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
"""
This is a task method. As that, the expected return values are
State values RUNNING, FINISHED or ERROR.

View File

@ -0,0 +1,218 @@
# # -*- coding: utf-8 -*-
# #
# # Copyright (c) 2012-2019 Virtual Cable S.L.
# # All rights reserved.
# #
# """
# Author: Adolfo Gómez, dkmaster at dkmon dot com
# """
# import abc
# from datetime import datetime
# import logging
# import time
# import typing
# from django.utils.translation import gettext as _
# from uds.core import services, types
# from uds.core.types.services import Operation
# from uds.core.util import autoserializable
# if typing.TYPE_CHECKING:
# from .dynamic_service import DynamicService
# class DynamicPublication(services.Publication, autoserializable.AutoSerializable, abc.ABC):
# suggested_delay = 20 # For publications, we can check every 20 seconds
# _name = autoserializable.StringField(default='')
# _vm = autoserializable.StringField(default='')
# _queue = autoserializable.ListField[Operation]()
# # Utility overrides for type checking...
# def _current_op(self) -> Operation:
# if not self._queue:
# return Operation.FINISH
# return self._queue[0]
# def service(self) -> 'DynamicService':
# return typing.cast('DynamicService', super().service())
# def check_space(self) -> bool:
# """
# If the service needs to check space before publication, it should override this method
# """
# return True
# def publish(self) -> types.states.TaskState:
# """
# """
# try:
# # First we should create a full clone, so base machine do not get fullfilled with "garbage" delta disks...
# self._name = self.service().sanitize_machine_name(
# 'UDS Pub'
# + ' '
# + self.servicepool_name()
# + "-"
# + str(self.revision()) # plus current time, to avoid name collisions
# + "-"
# + f'{int(time.time())%256:2X}'
# )
# comments = _('UDS Publication for {0} created at {1}').format(
# self.servicepool_name(), str(datetime.now()).split('.')[0]
# )
# self._state = types.states.State.RUNNING
# self._operation = 'p' # Publishing
# self._destroy_after = False
# return types.states.TaskState.RUNNING
# except Exception as e:
# logger.exception('Caught exception %s', e)
# self._reason = str(e)
# return types.states.TaskState.ERROR
# def _execute_queue(self) -> types.states.TaskState:
# op = self._current_op()
# if op == Operation.ERROR:
# return types.states.TaskState.ERROR
# if op == Operation.FINISH:
# return types.states.TaskState.FINISHED
# def check_state(
# self,
# ) -> types.states.State: # pylint: disable = too-many-branches,too-many-return-statements
# if self._state != types.states.State.RUNNING:
# return types.states.State.from_str(self._state)
# task = self.service().get_task_info(self._task)
# trans: typing.Dict[str, str] = {
# VMWTask.ERROR: types.states.State.ERROR,
# VMWTask.RUNNING: types.states.State.RUNNING,
# VMWTask.FINISHED: types.states.State.FINISHED,
# VMWTask.UNKNOWN_TASK: types.states.State.ERROR,
# }
# reasons: typing.Dict[str, str] = {
# VMWTask.ERROR: 'Error',
# VMWTask.RUNNING: 'Already running',
# VMWTask.FINISHED: 'Finished',
# VMWTask.UNKNOWN_TASK: 'Task not known by VC',
# }
# try:
# st = task.state() or VMWTask.UNKNOWN_TASK
# except TypeError as e:
# logger.exception(
# 'Catch exception invoking vmware, delaying request: %s %s',
# e.__class__,
# e,
# )
# return types.states.State.from_str(self._state)
# except Exception as e:
# logger.exception('Catch exception invoking vmware: %s %s', e.__class__, e)
# self._state = types.states.State.ERROR
# self._reason = str(e)
# return self._state
# self._reason = reasons[st]
# self._state = trans[st]
# if self._state == types.states.State.ERROR:
# self._reason = task.result() or 'Publication not found!'
# elif self._state == types.states.State.FINISHED:
# if self._operation == 'x': # Destroying snapshot
# return self._remove_machine()
# if self._operation == 'p':
# self._vm = str(task.result() or '')
# # Now make snapshot
# if self._destroy_after:
# return self.destroy()
# if self.isFullCloner() is True: # If full cloner is our father
# self._snapshot = ''
# self._task = ''
# self._state = types.states.State.FINISHED
# return self._state
# try:
# comments = 'UDS Snapshot created at ' + str(datetime.now())
# self._task = self.service().create_snapshot(self._vm, SNAPNAME, comments)
# self._state = types.states.State.RUNNING
# self._operation = 's' # Snapshoting
# except Exception as e:
# self._state = types.states.State.ERROR
# self._reason = str(e)
# logger.exception('Exception caught creating snapshot')
# elif self._operation == 's':
# self._snapshot = task.result() or ''
# if (
# self._destroy_after
# ): # If publishing and was canceled or destroyed before finishing, do it now
# return self.destroy()
# else:
# self._snapshot = ''
# return types.states.State.from_str(self._state)
# def finish(self) -> None:
# self._task = ''
# self._destroy_after = False
# def destroy(self) -> types.states.State:
# if (
# self._state == types.states.State.RUNNING and self._destroy_after is False
# ): # If called destroy twice, will BREAK STOP publication
# self._destroy_after = True
# return types.states.State.RUNNING
# self._destroy_after = False
# # if self.snapshot != '':
# # return self.__removeSnapshot()
# return self._remove_machine()
# def cancel(self) -> types.states.State:
# return self.destroy()
# def error_reason(self) -> str:
# return self._reason
# def snapshot_reference(self) -> str:
# return self.service().provider().get_current_snapshot(self._vm) or 'invalid-snapshot'
# # return self.snapshot
# def machine_reference(self) -> str:
# return self._vm
# def _remove_machine(self) -> types.states.State:
# if not self._vm:
# logger.error("Machine reference not found!!")
# return types.states.State.ERROR
# try:
# self._task = self.service().remove_machine(self._vm)
# self._state = types.states.State.RUNNING
# self._operation = 'd'
# self._destroy_after = False
# return types.states.State.RUNNING
# except Exception as e:
# # logger.exception("Caught exception at __removeMachine %s:%s", e.__class__, e)
# logger.error('Error removing machine: %s', e)
# self._reason = str(e)
# return types.states.State.ERROR
# def unmarshal(self, data: bytes) -> None:
# if autoserializable.is_autoserializable_data(data):
# return super().unmarshal(data)
# _auto_data = OldSerialData()
# _auto_data.unmarshal(data)
# # Fill own data from restored data
# self._name = _auto_data._name
# self._vm = _auto_data._vm
# self._snapshot = _auto_data._snapshot
# self._task = _auto_data._task
# self._state = _auto_data._state
# self._operation = _auto_data._operation
# self._destroy_after = _auto_data._destroyAfter
# self._reason = _auto_data._reason
# # Flag for upgrade
# self.mark_for_upgrade(True)

View File

@ -39,7 +39,8 @@ from uds.core.util import fields, validators
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
# from .dynamic_userservice import DynamicUserService
from .dynamic_userservice import DynamicUserService
pass
logger = logging.getLogger(__name__)
@ -110,8 +111,15 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
def get_lenname(self) -> int:
return self.lenname.value
def sanitize_machine_name(self, name: str) -> str:
"""
Sanitize machine name
Override it to provide a custom name sanitizer
"""
return name
@abc.abstractmethod
def get_machine_ip(self, machine_id: str) -> str:
def get_machine_ip(self, userservice_instance: 'DynamicUserService', machine_id: str) -> str:
"""
Returns the ip of the machine
If cannot be obtained, MUST raise an exception
@ -119,12 +127,83 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
...
@abc.abstractmethod
def get_machine_mac(self, machine_id: str) -> str:
def get_machine_mac(self, userservice_instance: 'DynamicUserService', machine_id: str) -> str:
"""
Returns the mac of the machine
If cannot be obtained, MUST raise an exception
"""
...
@abc.abstractmethod
def is_machine_running(self, userservice_instance: 'DynamicUserService', machine_id: str) -> bool:
"""
Returns if the machine is running
"""
...
def is_machine_stopped(self, userservice_instance: 'DynamicUserService', machine_id: str) -> bool:
"""
Returns if the machine is stopped
"""
return not self.is_machine_running(userservice_instance, machine_id)
def is_machine_suspended(self, userservice_instance: 'DynamicUserService', machine_id: str) -> bool:
"""
Returns if the machine is suspended
"""
return self.is_machine_stopped(userservice_instance, machine_id)
@abc.abstractmethod
def create_machine(self, userservice_instance: 'DynamicUserService') -> str:
"""
Creates a new machine
Note that this must, in instance, or invoke somthing of the userservice
or operate by itself on userservice_instance
"""
...
@abc.abstractmethod
def start_machine(self, userservice_instance: 'DynamicUserService', machine_id: str) -> None:
"""
Starts the machine
"""
...
@abc.abstractmethod
def stop_machine(self, userservice_instance: 'DynamicUserService', machine_id: str) -> None:
"""
Stops the machine
"""
...
def shutdown_machine(self, userservice_instance: 'DynamicUserService', machine_id: str) -> None:
"""
Shutdowns the machine
Defaults to stop_machine
"""
self.stop_machine(userservice_instance, machine_id)
@abc.abstractmethod
def reset_machine(self, userservice_instance: 'DynamicUserService', machine_id: str) -> None:
"""
Resets the machine
"""
...
def suspend_machine(self, userservice_instance: 'DynamicUserService', machine_id: str) -> None:
"""
Suspends the machine
Defaults to shutdown_machine.
Can be overriden if the service supports suspending.
"""
self.shutdown_machine(userservice_instance, machine_id)
@abc.abstractmethod
def remove_machine(self, muserservice_instance: 'DynamicUserService', achine_id: str) -> None:
"""
Removes the machine
"""
...
def keep_on_error(self) -> bool:
return self.maintain_on_error.value

View File

@ -31,12 +31,12 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import abc
import enum
import logging
import typing
import collections.abc
from uds.core import services, types, consts
from uds.core.types.services import Operation
from uds.core.util import log, autoserializable
# Not imported at runtime, just for type checking
@ -47,36 +47,6 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Operation(enum.IntEnum):
INITIALIZE = 1000
CREATE = 1001
CREATE_COMPLETED = 1002
START = 1003
START_COMPLETED = 1004
STOP = 1005 # This is a "hard" shutdown, likes a power off
STOP_COMPLETED = 1006
SHUTDOWN = 1007 # This is a "soft" shutdown
SHUTDOWN_COMPLETED = 1008
SUSPEND = 1009 # If not provided, Suppend is a "soft" shutdown
SUSPEND_COMPLETED = 1010
REMOVE = 1011
REMOVE_COMPLETED = 1012
RETRY = 1016
WAIT = 1013
NOP = 1014
ERROR = 9000
FINISH = 9900
UNKNOWN = 9999
@staticmethod
def from_int(value: int) -> 'Operation':
try:
return Operation(value)
except ValueError:
return Operation.UNKNOWN
class DynamicUserService(services.UserService, autoserializable.AutoSerializable, abc.ABC):
"""
This class represents a fixed user service, that is, a service that is assigned to an user
@ -86,14 +56,17 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
suggested_delay = 5
# Some customization fields
# If ip can be manually overriden
can_set_ip: typing.ClassVar[bool] = False
max_state_checks: typing.ClassVar[int] = 20 # By default, with a delay of 5 seconds, this is 100 seconds
# How many times we will check for a state before giving up
max_state_checks: typing.ClassVar[int] = 20
# If keep_state_sets_error is true, and an error occurs, the machine is set to FINISHED instead of ERROR
keep_state_sets_error: typing.ClassVar[bool] = False
_name = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='')
_vmid = autoserializable.StringField(default='')
_reason = autoserializable.StringField(default='')
_task = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]() # Default is empty list
# Note that even if SNAPHSHOT operations are in middel
@ -178,12 +151,18 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
logger.debug('Setting error state, reason: %s', reason)
self.do_log(log.LogLevel.ERROR, reason)
if self._vmid and self.service().keep_on_error() is False:
try:
# TODO: Remove VM using service or put it on a "to be removed" queue for a parallel job
self._vmid = ''
except Exception as e:
logger.exception('Exception removing machine: %s', e)
if self._vmid:
if self.service().keep_on_error() is False:
try:
# TODO: Remove VM using service or put it on a "to be removed" queue for a parallel job
self._vmid = ''
except Exception as e:
logger.exception('Exception removing machine: %s', e)
else:
logger.debug('Keep on error is enabled, not removing machine')
if self.keep_state_sets_error is False:
self._queue = [Operation.FINISH]
return types.states.TaskState.FINISHED
self._queue = [Operation.ERROR]
self._reason = reason
@ -213,15 +192,18 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
@typing.final
def get_unique_id(self) -> str:
# Provide self to the service, so it can some of our methods to generate the unique id
# (for example, own mac generator, that will autorelease the mac as soon as the machine is removed)
if not self._mac:
self._mac = self.service().get_machine_mac(self._vmid) or ''
self._mac = self.service().get_machine_mac(self, self._vmid) or ''
return self._mac
@typing.final
def get_ip(self) -> str:
# Provide self to the service, so it can some of our methods to generate the unique id
try:
if self._vmid:
return self.service().get_machine_ip(self._vmid)
return self.service().get_machine_ip(self, self._vmid)
except Exception:
logger.warning('Error obtaining IP for %s: %s', self.__class__.__name__, self._vmid, exc_info=True)
pass
@ -244,6 +226,28 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
self._queue = self._create_queue_l2_cache.copy()
return self._execute_queue()
@typing.final
def set_ready(self) -> types.states.TaskState:
# If already ready, return finished
if self.cache.get('ready') == '1':
return types.states.TaskState.FINISHED
try:
if self.service().is_machine_running(self, self._vmid):
self.cache.put('ready', '1')
return types.states.TaskState.FINISHED
self._queue = [Operation.START, Operation.START_COMPLETED, Operation.FINISH]
return self._execute_queue()
except Exception as e:
return self._error(f'Error on setReady: {e}')
def reset(self) -> types.states.TaskState:
if self._vmid != '':
self._queue = [Operation.RESET, Operation.RESET_COMPLETED, Operation.FINISH]
return types.states.TaskState.FINISHED
@typing.final
def _execute_queue(self) -> types.states.TaskState:
self._debug('execute_queue')
@ -257,17 +261,56 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
try:
self._reset_checks_counter() # Reset checks counter
operation_runner = _EXEC_FNCS[op]
# Invoke using instance, we have overrided methods
# and we want to use the overrided ones
getattr(self, operation_runner.__name__)()
# For custom operations, we will call the only one method
if op.is_custom():
self.op_custom(op)
else:
operation_runner = _EXECUTORS[op]
# Invoke using instance, we have overrided methods
# and we want to use the overrided ones
getattr(self, operation_runner.__name__)()
return types.states.TaskState.RUNNING
except Exception as e:
logger.exception('Unexpected FixedUserService exception: %s', e)
return self._error(str(e))
def check_state(self) -> types.states.TaskState:
"""
Check what operation is going on, and acts acordly to it
"""
self._debug('check_state')
op = self._current_op()
if op == Operation.ERROR:
return types.states.TaskState.ERROR
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
if op != Operation.WAIT:
# All operations except WAIT will check against checks counter
state = self._inc_checks_counter(self._op2str(op))
if state is not None:
return state # Error, Finished or None
try:
if op.is_custom():
state = self.op_custom_checker(op)
else:
state = _CHECKERS[op](self)
if state == types.states.TaskState.FINISHED:
# Remove runing op
self._queue.pop(0)
return self._execute_queue()
return state
except Exception as e:
return self._error(e)
# Execution methods
# Every Operation has an execution method and a check method
def op_initialize(self) -> None:
@ -292,7 +335,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
This method is called when the service is started
"""
pass
self.service().start_machine(self, self._vmid)
def op_start_completed(self) -> None:
"""
@ -302,9 +345,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def op_stop(self) -> None:
"""
This method is called when the service is stopped
This method is called for stopping the service
"""
pass
self.service().stop_machine(self, self._vmid)
def op_stop_completed(self) -> None:
"""
@ -314,34 +357,46 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def op_shutdown(self) -> None:
"""
This method is called when the service is shutdown
This method is called for shutdown the service
"""
self.op_stop() # By default, shutdown is a stop
self.service().shutdown_machine(self, self._vmid)
def op_shutdown_completed(self) -> None:
"""
This method is called when the service shutdown is completed
"""
self.op_stop_completed()
pass
def op_suspend(self) -> None:
"""
This method is called when the service is suspended
This method is called for suspend the service
"""
# Note that by default suspend is "shutdown" and not "stop" because we
self.op_shutdown()
self.service().suspend_machine(self, self._vmid)
def op_suspend_completed(self) -> None:
"""
This method is called when the service suspension is completed
"""
self.op_shutdown_completed()
pass
def op_reset(self) -> None:
"""
This method is called when the service is reset
"""
pass
def op_reset_completed(self) -> None:
"""
This method is called when the service reset is completed
"""
self.service().reset_machine(self, self._vmid)
def op_remove(self) -> None:
"""
This method is called when the service is removed
"""
pass
self.service().remove_machine(self, self._vmid)
def op_remove_completed(self) -> None:
"""
@ -349,21 +404,24 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
pass
def op_retry(self) -> None:
"""
This method is called when the service is retried
"""
pass
def op_wait(self) -> None:
"""
This method is called when the service is waiting
Basically, will stop the execution of the queue until something external changes it (i.e. poping from the queue)
Executor does nothing
"""
pass
def op_nop(self) -> None:
"""
This method is called when the service is doing nothing
This does nothing, as it's a NOP operation
"""
pass
def op_custom(self, operation: Operation) -> None:
"""
This method is called when the service is doing a custom operation
"""
pass
@ -435,6 +493,12 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
return types.states.TaskState.FINISHED
def op_reset_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is reset
"""
return types.states.TaskState.FINISHED
def op_remove_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is removed
@ -447,18 +511,24 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
return types.states.TaskState.FINISHED
def op_retry_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is retried
"""
return types.states.TaskState.FINISHED
def op_wait_checker(self) -> types.states.TaskState:
"""
Wait will remain in the same state until something external changes it (i.e. poping from the queue)
"""
return types.states.TaskState.RUNNING
def op_nop_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is doing nothing
"""
return types.states.TaskState.FINISHED
def op_custom_checker(self, operation: Operation) -> types.states.TaskState:
"""
This method is called to check if the service is doing a custom operation
"""
return types.states.TaskState.FINISHED
# ERROR, FINISH and UNKNOWN are not here, as they are final states not needing to be checked
@staticmethod
@ -467,13 +537,12 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
def _debug(self, txt: str) -> None:
logger.debug(
'Queue at %s for %s: %s, mac:%s, vmId:%s, task:%s',
'Queue at %s for %s: %s, mac:%s, vmId:%s',
txt,
self._name,
[DynamicUserService._op2str(op) for op in self._queue],
self._mac,
self._vmid,
self._task,
)
@ -482,7 +551,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
# Basically, all methods starting with _ are final, and all other are overridable
# We use __name__ later to use them, so we can use type checking and invoke them via instance
# Note that ERROR and FINISH are not here, as they final states not needing to be executed
_EXEC_FNCS: typing.Final[
_EXECUTORS: typing.Final[
collections.abc.Mapping[Operation, collections.abc.Callable[[DynamicUserService], None]]
] = {
Operation.INITIALIZE: DynamicUserService.op_initialize,
@ -498,13 +567,12 @@ _EXEC_FNCS: typing.Final[
Operation.SUSPEND_COMPLETED: DynamicUserService.op_suspend_completed,
Operation.REMOVE: DynamicUserService.op_remove,
Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed,
Operation.RETRY: DynamicUserService.op_retry,
Operation.WAIT: DynamicUserService.op_wait,
Operation.NOP: DynamicUserService.op_nop,
}
# Same af before, but for check methods
_CHECK_FNCS: typing.Final[
_CHECKERS: typing.Final[
collections.abc.Mapping[Operation, collections.abc.Callable[[DynamicUserService], types.states.TaskState]]
] = {
Operation.INITIALIZE: DynamicUserService.op_initialize_checker,
@ -520,6 +588,6 @@ _CHECK_FNCS: typing.Final[
Operation.SUSPEND_COMPLETED: DynamicUserService.op_suspend_completed_checker,
Operation.REMOVE: DynamicUserService.op_remove_checker,
Operation.REMOVE_COMPLETED: DynamicUserService.op_remove_completed_checker,
Operation.RETRY: DynamicUserService.op_retry_checker,
Operation.WAIT: DynamicUserService.op_wait_checker,
Operation.NOP: DynamicUserService.op_nop_checker,
}

View File

@ -31,12 +31,12 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import abc
import enum
import logging
import typing
import collections.abc
from uds.core import services, types
from uds.core.types.services import FixedOperation as Operation
from uds.core.util import log, autoserializable
# Not imported at runtime, just for type checking
@ -47,30 +47,6 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Operation(enum.IntEnum):
CREATE = 0
START = 1
STOP = 2
REMOVE = 3
WAIT = 4
ERROR = 5
FINISH = 6
RETRY = 7
SNAPSHOT_CREATE = 8 # to recall process_snapshot
SNAPSHOT_RECOVER = 9 # to recall process_snapshot
PROCESS_TOKEN = 10
SOFT_SHUTDOWN = 11
NOP = 98
UNKNOWN = 99
@staticmethod
def from_int(value: int) -> 'Operation':
try:
return Operation(value)
except ValueError:
return Operation.UNKNOWN
class FixedUserService(services.UserService, autoserializable.AutoSerializable, abc.ABC):
"""

View File

@ -581,12 +581,13 @@ class UserService(Environmentable, Serializable):
"""
return cls.cancel != UserService.cancel
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
"""
This method is invoked for "reset" an user service
This method is not intended to be a task right now, (so its one step method)
base method does nothing
"""
return types.states.TaskState.FINISHED
def get_connection_data(self) -> typing.Optional[types.services.ConnectionData]:
"""

View File

@ -114,3 +114,97 @@ class CacheLevel(enum.IntEnum):
NONE = 0 # : Constant for User cache level (no cache at all)
L1 = 1 # : Constant for Cache of level 1
L2 = 2 # : Constant for Cache of level 2
class Operation(enum.IntEnum):
"""
Generic Operation type, to be used as a "status" for operations on userservices
Notes:
* We set all numbers to VERY HIGH, so we can use the same class for all services
* Note that we will need to "translate" old values to new ones on the service,
* Adapting existing services to this, will probably need a migration
"""
INITIALIZE = 1000
CREATE = 1001
CREATE_COMPLETED = 1002
START = 1003
START_COMPLETED = 1004
STOP = 1005 # This is a "hard" shutdown, likes a power off
STOP_COMPLETED = 1006
SHUTDOWN = 1007 # This is a "soft" shutdown
SHUTDOWN_COMPLETED = 1008
SUSPEND = 1009 # If not provided, Suppend is a "soft" shutdown
SUSPEND_COMPLETED = 1010
RESET = 1011
RESET_COMPLETED = 1012
REMOVE = 1013
REMOVE_COMPLETED = 1014
WAIT = 1100
NOP = 1101
ERROR = 9000
FINISH = 9900
UNKNOWN = 9999
# Some custom values, jut in case anyone needs them
# For example, on a future, all fixed userservice will be moved
# to this model, and we will need to "translate" the old values to the new ones
# So we will translate, for example SNAPSHOT_CREATE to CUSTOM_1, etc..
CUSTOM_1 = 20001
CUSTOM_2 = 20002
CUSTOM_3 = 20003
CUSTOM_4 = 20004
CUSTOM_5 = 20005
CUSTOM_6 = 20006
CUSTOM_7 = 20007
CUSTOM_8 = 20008
CUSTOM_9 = 20009
def is_custom(self) -> bool:
"""
Returns if the operation is a custom one
"""
return self.value >= Operation.CUSTOM_1.value
@staticmethod
def from_int(value: int) -> 'Operation':
try:
return Operation(value)
except ValueError:
return Operation.UNKNOWN
def as_str(self) -> str:
return self.name
# FixedOperation, currently used on FixedServices
# Will evolve to Operation on future
class FixedOperation(enum.IntEnum):
CREATE = 0
START = 1
STOP = 2
REMOVE = 3
WAIT = 4
ERROR = 5
FINISH = 6
RETRY = 7
SNAPSHOT_CREATE = 8 # to recall process_snapshot
SNAPSHOT_RECOVER = 9 # to recall process_snapshot
PROCESS_TOKEN = 10
SOFT_SHUTDOWN = 11
NOP = 98
UNKNOWN = 99
@staticmethod
def from_int(value: int) -> 'FixedOperation':
try:
return FixedOperation(value)
except ValueError:
return FixedOperation.UNKNOWN
def as_str(self) -> str:
return self.name

View File

@ -460,9 +460,10 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter):
def _autoserializable_fields(self) -> collections.abc.Iterator[tuple[str, _SerializableField[typing.Any]]]:
"""Returns an iterator over all fields in the class, including inherited ones
(that is, all fields that are instances of _SerializableField in the class and its bases)
Returns:
Tuple(name, field) for each field in the class and its bases
Tuple(name, _SerializableField) for each field in the class and its bases
"""
cls = self.__class__
while True:

View File

@ -1,86 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
class StateQueue:
_queue: list[typing.Any]
_current: typing.Optional[typing.Any]
def __init__(self) -> None:
self._queue = []
self._current = None
def __str__(self) -> str:
return f'<StateQueue Current: {self._current}, Queue: ({",".join(state for state in self._queue)})>'
def clear(self) -> None:
self._queue.clear()
def reset(self) -> None:
self._queue.clear()
self._current = None
def current(self) -> typing.Any:
return self._current
def set_current(self, newState: typing.Any) -> typing.Any:
self._current = newState
return self._current
def contains(self, state: typing.Any) -> bool:
# if self._queue.co
return state in self._queue
# for s in self._queue:
# if s == state:
# return True
# return False
__contains__ = contains
def push_back(self, state: typing.Any) -> None:
self._queue.append(state)
def push_front(self, state: typing.Any) -> None:
self._queue.insert(0, state)
def pop_front(self) -> typing.Optional[typing.Any]:
if self._queue:
return self._queue.pop(0)
return None
def remove(self, state: typing.Any) -> None:
try:
self._queue.remove(state)
except Exception: # nosec: Fine to ignore exception here
pass # If state not in queue, nothing happens

View File

@ -127,13 +127,14 @@ class OVirtLinkedUserService(services.UserService, autoserializable.AutoSerializ
with self.storage.as_dict() as data:
return data.get('exec_count', 0)
def _set_checks_counter(self, value: int) -> None:
def _reset_checks_counter(self) -> None:
with self.storage.as_dict() as data:
data['exec_count'] = value
data['exec_count'] = 0
def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]:
count = self._get_checks_counter() + 1
self._set_checks_counter(count)
with self.storage.as_dict() as data:
count = data.get('exec_count', 0)
data['exec_count'] = count + 1
if count > MAX_CHECK_COUNT:
return self._error(f'Max checks reached on {info or "unknown"}')
return None
@ -252,13 +253,15 @@ class OVirtLinkedUserService(services.UserService, autoserializable.AutoSerializ
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
"""
o oVirt, reset operation just shutdowns it until v3 support is removed
"""
if self._vmid != '':
self.service().provider().api.stop_machine(self._vmid)
return types.states.TaskState.FINISHED
def get_console_connection(
self,
) -> typing.Optional[types.services.ConsoleConnectionInfo]:
@ -329,9 +332,6 @@ if sys.platform == 'win32':
check_states,
)
if (check_result := self._inc_checks_counter('check_machine_state')) is not None:
return check_result
vm_info = self.service().provider().api.get_machine_info(self._vmid)
if vm_info.status == ov_types.VMStatus.UNKNOWN and ov_types.VMStatus.UNKNOWN not in check_states:
@ -387,7 +387,7 @@ if sys.platform == 'win32':
return types.states.TaskState.FINISHED
# Reset checking count (for checks)
self._set_checks_counter(0)
self._reset_checks_counter()
try:
operation_runner = _EXECUTORS[op]
@ -628,6 +628,12 @@ if sys.platform == 'win32':
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
if op != Operation.WAIT:
# If not waiting, check if we are in a loop
ret = self._inc_checks_counter(str(op))
if ret is not None:
return ret
try:
operation_checker = _CHECKERS[op]

View File

@ -88,7 +88,7 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
self._destroy_after = destroy_after == 't'
self.mark_for_upgrade() # Mark so manager knows it has to be saved again
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service
"""
@ -107,19 +107,19 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
if self._state == 'ok':
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
if self._state == 'error':
return types.states.State.ERROR
return types.states.TaskState.ERROR
try:
status = self.service().provider().api.get_template_info(self._template_id).status
@ -132,26 +132,26 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
if self._destroy_after:
self._destroy_after = False
return self.destroy()
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def error_reason(self) -> str:
"""
If a publication produces an error, here we must notify the reason why
it happened. This will be called just after publish or check_state
if they return types.states.State.ERROR
if they return types.states.TaskState.ERROR
Returns an string, in our case, set at check_state
"""
return self._reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
"""
This is called once a publication is no more needed.
@ -159,24 +159,24 @@ class OVirtPublication(Publication, autoserializable.AutoSerializable):
removing created "external" data (environment gets cleaned by core),
etc..
The retunred value is the same as when publishing, types.states.State.RUNNING,
types.states.State.FINISHED or types.states.State.ERROR.
The retunred value is the same as when publishing, types.states.TaskState.RUNNING,
types.states.TaskState.FINISHED or types.states.TaskState.ERROR.
"""
# We do not do anything else to destroy this instance of publication
if self._state == 'locked':
self._destroy_after = True
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
try:
self.service().provider().api.remove_template(self._template_id)
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
"""
Do same thing as destroy
"""

View File

@ -55,23 +55,23 @@ class OpenGnsysPublication(Publication, autoserializable.AutoSerializable):
def service(self) -> 'OGService':
return typing.cast('OGService', super().service())
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service, on OpenGnsys, does nothing
"""
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def error_reason(self) -> str:
return 'No error possible :)'
def destroy(self) -> types.states.State:
return types.states.State.FINISHED
def destroy(self) -> types.states.TaskState:
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
return self.destroy()

View File

@ -152,10 +152,12 @@ class OpenNebulaLiveDeployment(services.UserService, autoserializable.AutoSerial
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
if self._vmid != '':
self.service().resetMachine(self._vmid)
return types.states.TaskState.FINISHED
def get_console_connection(self) -> typing.Optional[types.services.ConsoleConnectionInfo]:
return self.service().get_console_connection(self._vmid)

View File

@ -80,7 +80,7 @@ class OpenNebulaLivePublication(Publication, autoserializable.AutoSerializable):
self.mark_for_upgrade() # Flag so manager can save it again with new format
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service
"""
@ -95,47 +95,47 @@ class OpenNebulaLivePublication(Publication, autoserializable.AutoSerializable):
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
if self._state == 'running':
try:
if self.service().check_template_published(self._template_id) is False:
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
self._state = 'ok'
except Exception as e:
self._state = 'error'
self._reason = str(e)
if self._state == 'error':
return types.states.State.ERROR
return types.states.TaskState.ERROR
if self._state == 'ok':
if self._destroy_after: # If we must destroy after publication, do it now
self._destroy_after = False
return self.destroy()
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
self._state = 'ok'
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def error_reason(self) -> str:
"""
If a publication produces an error, here we must notify the reason why
it happened. This will be called just after publish or check_state
if they return types.states.State.ERROR
if they return types.states.TaskState.ERROR
Returns an string, in our case, set at check_state
"""
return self._reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
"""
This is called once a publication is no more needed.
@ -143,15 +143,15 @@ class OpenNebulaLivePublication(Publication, autoserializable.AutoSerializable):
removing created "external" data (environment gets cleaned by core),
etc..
The retunred value is the same as when publishing, types.states.State.RUNNING,
types.states.State.FINISHED or types.states.State.ERROR.
The retunred value is the same as when publishing, types.states.TaskState.RUNNING,
types.states.TaskState.FINISHED or types.states.TaskState.ERROR.
"""
if self._state == 'error':
return types.states.State.ERROR
return types.states.TaskState.ERROR
if self._state == 'running':
self._destroy_after = True
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
# We do not do anything else to destroy this instance of publication
try:
@ -159,11 +159,11 @@ class OpenNebulaLivePublication(Publication, autoserializable.AutoSerializable):
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
"""
Do same thing as destroy
"""

View File

@ -180,10 +180,12 @@ class OpenStackLiveUserService(
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
if self._vmid != '':
self.service().reset_machine(self._vmid)
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
# Here we will check for suspending the VM (when full ready)
logger.debug('Checking if cache 2 for %s', self._name)

View File

@ -84,7 +84,7 @@ class OpenStackUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
self.cache.put('ready', '1')
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
"""
OpenStack, reset operation
"""
@ -94,6 +94,8 @@ class OpenStackUserServiceFixed(FixedUserService, autoserializable.AutoSerializa
except Exception: # nosec: if cannot reset, ignore it
pass # If could not reset, ignore it...
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
return types.states.TaskState.FINISHED

View File

@ -85,7 +85,7 @@ class OpenStackLivePublication(Publication, autoserializable.AutoSerializable):
self.mark_for_upgrade() # This will force remarshalling
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service
"""
@ -104,19 +104,19 @@ class OpenStackLivePublication(Publication, autoserializable.AutoSerializable):
logger.exception('Got exception')
self._status = 'error'
self._reason = 'Got error {}'.format(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
if self._status == openstack_types.SnapshotStatus.ERROR:
return types.states.State.ERROR
return types.states.TaskState.ERROR
if self._status == openstack_types.SnapshotStatus.AVAILABLE:
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
try:
self._status = self.service().get_template(self._template_id).status # For next check
@ -125,34 +125,34 @@ class OpenStackLivePublication(Publication, autoserializable.AutoSerializable):
self._destroy_after = False
return self.destroy()
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
except Exception as e:
self._status = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
def error_reason(self) -> str:
return self._reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
# We do not do anything else to destroy this instance of publication
if self._status == 'error':
return types.states.State.ERROR # Nothing to cancel
return types.states.TaskState.ERROR # Nothing to cancel
if self._status == 'creating':
self._destroy_after = True
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
try:
self.service().remove_template(self._template_id)
except Exception as e:
self._status = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
return self.destroy()
# Here ends the publication needed methods.

View File

@ -90,7 +90,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
self.cache.put('ready', '1')
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
"""
o Proxmox, reset operation just shutdowns it until v3 support is removed
"""
@ -100,6 +100,8 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
except Exception: # nosec: if cannot reset, ignore it
pass # If could not reset, ignore it...
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
return types.states.TaskState.FINISHED

View File

@ -200,7 +200,7 @@ class ProxmoxUserserviceLinked(services.UserService, autoserializable.AutoSerial
self.cache.put('ready', '1')
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
"""
o Proxmox, reset operation just shutdowns it until v3 support is removed
"""
@ -210,6 +210,8 @@ class ProxmoxUserserviceLinked(services.UserService, autoserializable.AutoSerial
except Exception: # nosec: if cannot reset, ignore it
pass # If could not reset, ignore it...
return types.states.TaskState.FINISHED
def get_console_connection(
self,
) -> typing.Optional[types.services.ConsoleConnectionInfo]:

View File

@ -86,7 +86,7 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
self.mark_for_upgrade() # Flag so manager can save it again with new format
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
If no space is available, publication will fail with an error
"""
@ -99,36 +99,36 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
task = self.service().clone_machine(self._name, comments)
self._vmid = str(task.vmid)
self._task = ','.join((task.upid.node, task.upid.upid))
self._state = types.states.State.RUNNING
self._state = types.states.TaskState.RUNNING
self._operation = 'p' # Publishing
self._destroy_after = False
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
except Exception as e:
logger.exception('Caught exception %s', e)
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
def check_state(self) -> types.states.State:
if self._state != types.states.State.RUNNING:
return types.states.State.from_str(self._state)
def check_state(self) -> types.states.TaskState:
if self._state != types.states.TaskState.RUNNING:
return types.states.TaskState.from_str(self._state)
node, upid = self._task.split(',')
try:
task = self.service().provider().get_task_info(node, upid)
if task.is_running():
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
except Exception as e:
logger.exception('Proxmox publication')
self._state = types.states.State.ERROR
self._state = types.states.TaskState.ERROR
self._reason = str(e)
return self._state
if task.is_errored():
self._reason = task.exitstatus
self._state = types.states.State.ERROR
self._state = types.states.TaskState.ERROR
else: # Finished
if self._destroy_after:
return self.destroy()
self._state = types.states.State.FINISHED
self._state = types.states.TaskState.FINISHED
if self._operation == 'p': # not Destroying
# Disable Protection (removal)
self.service().provider().set_protection(int(self._vmid), protection=False)
@ -149,20 +149,20 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
self._task = ''
self._destroy_after = False
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
if (
self._state == types.states.State.RUNNING and self._destroy_after is False
self._state == types.states.TaskState.RUNNING and self._destroy_after is False
): # If called destroy twice, will BREAK STOP publication
self._destroy_after = True
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
self._state = types.states.State.RUNNING
self._state = types.states.TaskState.RUNNING
self._operation = 'd'
self._destroy_after = False
try:
task = self.service().remove_machine(self.machine())
self._task = ','.join((task.node, task.upid))
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
except Exception as e:
self._reason = str(e) # Store reason of error
logger.warning(
@ -170,9 +170,9 @@ class ProxmoxPublication(services.Publication, autoserializable.AutoSerializable
self.machine(),
e,
)
return types.states.State.ERROR
return types.states.TaskState.ERROR
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
return self.destroy()
def error_reason(self) -> str:

View File

@ -117,7 +117,7 @@ class SamplePublication(services.Publication):
self._reason = vals[1]
self._number = int(vals[2])
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
This method is invoked whenever the administrator requests a new publication.
@ -129,7 +129,7 @@ class SamplePublication(services.Publication):
All publications can be synchronous or asynchronous.
The main difference between both is that first do whatever needed, (the
action must be fast enough to do not block core), returning types.states.State.FINISHED.
action must be fast enough to do not block core), returning types.states.TaskState.FINISHED.
The second (asynchronous) are publications that could block the core, so
it have to be done in more than one step.
@ -138,7 +138,7 @@ class SamplePublication(services.Publication):
* First we invoke the copy operation to virtualization provider
* Second, we kept needed values inside instance so we can serialize
them whenever requested
* Returns an types.states.State.RUNNING, indicating the core that the publication
* Returns an types.states.TaskState.RUNNING, indicating the core that the publication
has started but has to finish sometime later. (We do no check
again the state and keep waiting here, because we will block the
core untill this operation is finished).
@ -151,7 +151,7 @@ class SamplePublication(services.Publication):
finish at publish call but a later check_state call
Take care with instantiating threads from here. Whenever a publish returns
"types.states.State.RUNNING", the core will recheck it later, but not using this instance
"types.states.TaskState.RUNNING", the core will recheck it later, but not using this instance
and maybe that even do not use this server.
If you want to use threadings or somethin likt it, use DelayedTasks and
@ -172,17 +172,17 @@ class SamplePublication(services.Publication):
"""
self._number = 5
self._reason = ''
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Our publish method will initiate publication, but will not finish it.
So in our sample, wi will only check if _number reaches 0, and if so
return that we have finished, else we will return that we are working
on it.
One publish returns types.states.State.RUNNING, this task will get called untill
check_state returns types.states.State.FINISHED.
One publish returns types.states.TaskState.RUNNING, this task will get called untill
check_state returns types.states.TaskState.FINISHED.
Also, wi will make the publication fail one of every 10 calls to this
method.
@ -198,11 +198,11 @@ class SamplePublication(services.Publication):
# One of every 10 calls
if random.randint(0, 9) == 9:
self._reason = _('Random integer was 9!!! :-)')
return types.states.State.ERROR
return types.states.TaskState.ERROR
if self._number <= 0:
return types.states.State.FINISHED
return types.states.State.RUNNING
return types.states.TaskState.FINISHED
return types.states.TaskState.RUNNING
def finish(self) -> None:
"""
@ -222,13 +222,13 @@ class SamplePublication(services.Publication):
"""
If a publication produces an error, here we must notify the reason why
it happened. This will be called just after publish or check_state
if they return types.states.State.ERROR
if they return types.states.TaskState.ERROR
Returns an string, in our case, set at check_state
"""
return self._reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
"""
This is called once a publication is no more needed.
@ -236,16 +236,16 @@ class SamplePublication(services.Publication):
removing created "external" data (environment gets cleaned by core),
etc..
The retunred value is the same as when publishing, types.states.State.RUNNING,
types.states.State.FINISHED or types.states.State.ERROR.
The retunred value is the same as when publishing, types.states.TaskState.RUNNING,
types.states.TaskState.FINISHED or types.states.TaskState.ERROR.
"""
self._name = ''
self._reason = '' # In fact, this is not needed, but cleaning up things... :-)
# We do not do anything else to destroy this instance of publication
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
"""
Invoked for canceling the current operation.
This can be invoked directly by an administration or by the clean up

View File

@ -78,32 +78,32 @@ class TestPublication(services.Publication):
# We do not check anything at marshal method, so we ensure that
# default values are correctly handled by marshal.
self.data.name = ''.join(random.choices(string.ascii_letters, k=8))
self.data.state = types.states.State.RUNNING
self.data.state = types.states.TaskState.RUNNING
self.data.reason = 'none'
self.data.number = 10
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
logger.info('Publishing publication %s: %s remaining',self.data.name, self.data.number)
self.data.number -= 1
if self.data.number <= 0:
self.data.state = types.states.State.FINISHED
return types.states.State.from_str(self.data.state)
self.data.state = types.states.TaskState.FINISHED
return types.states.TaskState.from_str(self.data.state)
def finish(self) -> None:
# Make simply a random string
logger.info('Finishing publication %s', self.data.name)
self.data.number = 0
self.data.state = types.states.State.FINISHED
self.data.state = types.states.TaskState.FINISHED
def error_reason(self) -> str:
return self.data.reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
logger.info('Destroying publication %s', self.data.name)
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
logger.info('Canceling publication %s', self.data.name)
return self.destroy()

View File

@ -159,10 +159,12 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
if self._vmid:
self.service().reset_machine(self._vmid) # Reset in sync
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
# Here we will check for suspending the VM (when full ready)
logger.debug('Checking if cache 2 for %s', self._name)

View File

@ -84,10 +84,12 @@ class XenFixedUserService(FixedUserService, autoserializable.AutoSerializable):
return types.states.TaskState.FINISHED
def reset(self) -> None:
def reset(self) -> types.states.TaskState:
if self._vmid:
self.service().reset_machine(self._vmid) # Reset in sync
return types.states.TaskState.FINISHED
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
return types.states.TaskState.FINISHED

View File

@ -85,7 +85,7 @@ class XenPublication(Publication, autoserializable.AutoSerializable):
self.mark_for_upgrade() # Force upgrade asap
def publish(self) -> types.states.State:
def publish(self) -> types.states.TaskState:
"""
Realizes the publication of the service
"""
@ -104,19 +104,19 @@ class XenPublication(Publication, autoserializable.AutoSerializable):
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def check_state(self) -> types.states.State:
def check_state(self) -> types.states.TaskState:
"""
Checks state of publication creation
"""
if self._state == 'finished':
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
if self._state == 'error':
return types.states.State.ERROR
return types.states.TaskState.ERROR
try:
state, result = self.service().check_task_finished(self._task)
@ -128,33 +128,33 @@ class XenPublication(Publication, autoserializable.AutoSerializable):
return self.destroy()
self.service().convert_to_template(self._template_id)
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
def error_reason(self) -> str:
return self._reason
def destroy(self) -> types.states.State:
def destroy(self) -> types.states.TaskState:
# We do not do anything else to destroy this instance of publication
if self._state == 'ok':
self._destroy_after = True
return types.states.State.RUNNING
return types.states.TaskState.RUNNING
try:
self.service().remove_template(self._template_id)
except Exception as e:
self._state = 'error'
self._reason = str(e)
return types.states.State.ERROR
return types.states.TaskState.ERROR
return types.states.State.FINISHED
return types.states.TaskState.FINISHED
def cancel(self) -> types.states.State:
def cancel(self) -> types.states.TaskState:
return self.destroy()
# Here ends the publication needed methods.

View File

@ -59,6 +59,8 @@ class TestOVirtLinkedService(UDSTransactionTestCase):
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
# Ensure machine status is always DOWN, so the test does not end
utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid).status = ov_types.VMStatus.DOWN
self.assertEqual(state, types.states.TaskState.ERROR)
self.assertGreater(