mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-22 22:03:54 +03:00
Adapting OpenStack to generics
This commit is contained in:
parent
7db960ca92
commit
57e5412553
@ -61,10 +61,10 @@ EXPECTED_FIELDS: typing.Final[set[str]] = {
|
||||
'_check_count'
|
||||
}
|
||||
|
||||
TEST_QUEUE: typing.Final[list[deployment.Operation]] = [
|
||||
deployment.Operation.CREATE,
|
||||
deployment.Operation.REMOVE,
|
||||
deployment.Operation.RETRY,
|
||||
TEST_QUEUE: typing.Final[list[deployment.OldOperation]] = [
|
||||
deployment.OldOperation.CREATE,
|
||||
deployment.OldOperation.REMOVE,
|
||||
deployment.OldOperation.RETRY,
|
||||
]
|
||||
|
||||
SERIALIZED_DEPLOYMENT_DATA: typing.Final[typing.Mapping[str, bytes]] = {
|
||||
@ -128,26 +128,26 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
|
||||
self.assertEqual(instance._queue, TEST_QUEUE)
|
||||
|
||||
instance._queue = [
|
||||
deployment.Operation.CREATE,
|
||||
deployment.Operation.FINISH,
|
||||
deployment.OldOperation.CREATE,
|
||||
deployment.OldOperation.FINISH,
|
||||
]
|
||||
marshaled_data = instance.marshal()
|
||||
|
||||
instance = _create_instance(marshaled_data)
|
||||
self.assertEqual(
|
||||
instance._queue,
|
||||
[deployment.Operation.CREATE, deployment.Operation.FINISH],
|
||||
[deployment.OldOperation.CREATE, deployment.OldOperation.FINISH],
|
||||
)
|
||||
# Append something remarshall and check
|
||||
instance._queue.insert(0, deployment.Operation.RETRY)
|
||||
instance._queue.insert(0, deployment.OldOperation.RETRY)
|
||||
marshaled_data = instance.marshal()
|
||||
instance = _create_instance(marshaled_data)
|
||||
self.assertEqual(
|
||||
instance._queue,
|
||||
[
|
||||
deployment.Operation.RETRY,
|
||||
deployment.Operation.CREATE,
|
||||
deployment.Operation.FINISH,
|
||||
deployment.OldOperation.RETRY,
|
||||
deployment.OldOperation.CREATE,
|
||||
deployment.OldOperation.FINISH,
|
||||
],
|
||||
)
|
||||
# Remove something remarshall and check
|
||||
@ -156,7 +156,7 @@ class OpenStackDeploymentSerializationTest(UDSTransactionTestCase):
|
||||
instance = _create_instance(marshaled_data)
|
||||
self.assertEqual(
|
||||
instance._queue,
|
||||
[deployment.Operation.CREATE, deployment.Operation.FINISH],
|
||||
[deployment.OldOperation.CREATE, deployment.OldOperation.FINISH],
|
||||
)
|
||||
|
||||
def test_autoserialization_fields(self) -> None:
|
||||
|
@ -36,7 +36,7 @@ from uds.core import types
|
||||
from unittest import mock
|
||||
|
||||
from uds.services.OpenStack.openstack import types as openstack_types
|
||||
from uds.services.OpenStack.deployment import Operation
|
||||
from uds.services.OpenStack.deployment import OldOperation
|
||||
|
||||
from . import fixtures
|
||||
|
||||
@ -114,7 +114,7 @@ class TestOpenstackLiveDeployment(UDSTransactionTestCase):
|
||||
self.assertNotEqual(userservice._ip, '', f'Error on {to_test} deployment')
|
||||
|
||||
# And queue must be finished
|
||||
self.assertEqual(userservice._queue, [Operation.FINISH], f'Error on {to_test} deployment')
|
||||
self.assertEqual(userservice._queue, [OldOperation.FINISH], f'Error on {to_test} deployment')
|
||||
|
||||
def test_userservice_cancel(self) -> None:
|
||||
"""
|
||||
@ -143,7 +143,7 @@ class TestOpenstackLiveDeployment(UDSTransactionTestCase):
|
||||
|
||||
self.assertEqual(
|
||||
userservice._queue,
|
||||
[current_op] + [Operation.STOP, Operation.REMOVE, Operation.FINISH],
|
||||
[current_op] + [OldOperation.STOP, OldOperation.REMOVE, OldOperation.FINISH],
|
||||
)
|
||||
|
||||
counter = 0
|
||||
|
@ -154,7 +154,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
) -> None:
|
||||
"""
|
||||
Starts the machine
|
||||
Can return a task, or None if no task is returned
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
...
|
||||
|
||||
@ -162,7 +162,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
"""
|
||||
Stops the machine
|
||||
Can return a task, or None if no task is returned
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
...
|
||||
|
||||
@ -170,9 +170,8 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str
|
||||
) -> None:
|
||||
"""
|
||||
Shutdowns the machine
|
||||
Defaults to stop_machine
|
||||
Can return a task, or None if no task is returned
|
||||
Shutdowns the machine. Defaults to stop
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
return self.stop(caller_instance, vmid)
|
||||
|
||||
@ -181,7 +180,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
) -> None:
|
||||
"""
|
||||
Resets the machine
|
||||
Can return a task, or None if no task is returned
|
||||
Returns None. If a task is needed for anything, use the caller_instance to notify
|
||||
"""
|
||||
# Default is to stop "hard"
|
||||
return self.stop(caller_instance, vmid)
|
||||
@ -192,6 +191,7 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
|
||||
) -> None:
|
||||
"""
|
||||
Removes the machine, or queues it for removal, or whatever :)
|
||||
Use the caller_instance to notify anything if needed, or to identify caller
|
||||
"""
|
||||
...
|
||||
|
||||
|
@ -30,20 +30,17 @@
|
||||
"""
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import collections.abc
|
||||
import enum
|
||||
import logging
|
||||
import pickle # nosec: not insecure, we are loading our own data
|
||||
import typing
|
||||
|
||||
from uds.core import consts, services, types
|
||||
from uds.core import consts, types
|
||||
from uds.core.services.generics.dynamic.userservice import DynamicUserService
|
||||
from uds.core.util import autoserializable
|
||||
|
||||
from .openstack import types as openstack_types
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from uds import models
|
||||
|
||||
from .publication import OpenStackLivePublication
|
||||
from .service import OpenStackLiveService
|
||||
@ -56,7 +53,7 @@ logger = logging.getLogger(__name__)
|
||||
CHECK_COUNT_BEFORE_FAILURE: typing.Final[int] = 25
|
||||
|
||||
|
||||
class Operation(enum.IntEnum):
|
||||
class OldOperation(enum.IntEnum):
|
||||
CREATE = 0
|
||||
START = 1
|
||||
SUSPEND = 2
|
||||
@ -70,16 +67,28 @@ class Operation(enum.IntEnum):
|
||||
UNKNOWN = 99
|
||||
|
||||
@staticmethod
|
||||
def from_int(value: int) -> 'Operation':
|
||||
def from_int(value: int) -> 'OldOperation':
|
||||
try:
|
||||
return Operation(value)
|
||||
return OldOperation(value)
|
||||
except ValueError:
|
||||
return Operation.UNKNOWN
|
||||
return OldOperation.UNKNOWN
|
||||
|
||||
def to_operation(self) -> types.services.Operation:
|
||||
return {
|
||||
OldOperation.CREATE: types.services.Operation.CREATE,
|
||||
OldOperation.START: types.services.Operation.START,
|
||||
OldOperation.SUSPEND: types.services.Operation.SUSPEND,
|
||||
OldOperation.REMOVE: types.services.Operation.DELETE,
|
||||
OldOperation.WAIT: types.services.Operation.WAIT,
|
||||
OldOperation.ERROR: types.services.Operation.ERROR,
|
||||
OldOperation.FINISH: types.services.Operation.FINISH,
|
||||
OldOperation.RETRY: types.services.Operation.RETRY,
|
||||
OldOperation.STOP: types.services.Operation.STOP,
|
||||
OldOperation.UNKNOWN: types.services.Operation.UNKNOWN,
|
||||
}.get(self, types.services.Operation.UNKNOWN)
|
||||
|
||||
|
||||
class OpenStackLiveUserService(
|
||||
services.UserService, autoserializable.AutoSerializable
|
||||
): # pylint: disable=too-many-public-methods
|
||||
class OpenStackLiveUserService(DynamicUserService, autoserializable.AutoSerializable):
|
||||
"""
|
||||
This class generates the user consumable elements of the service tree.
|
||||
|
||||
@ -90,14 +99,6 @@ class OpenStackLiveUserService(
|
||||
The logic for managing ovirt deployments (user machines in this case) is here.
|
||||
"""
|
||||
|
||||
_name = autoserializable.StringField(default='')
|
||||
_ip = autoserializable.StringField(default='')
|
||||
_mac = autoserializable.StringField(default='')
|
||||
_vmid = autoserializable.StringField(default='')
|
||||
_reason = autoserializable.StringField(default='')
|
||||
_check_count = autoserializable.IntegerField(default=CHECK_COUNT_BEFORE_FAILURE)
|
||||
_queue = autoserializable.ListField[Operation]()
|
||||
|
||||
# _name: str = ''
|
||||
# _ip: str = ''
|
||||
# _mac: str = ''
|
||||
@ -105,8 +106,23 @@ class OpenStackLiveUserService(
|
||||
# _reason: str = ''
|
||||
# _queue: list[int] = []
|
||||
|
||||
# : Recheck every this seconds by default (for task methods)
|
||||
suggested_delay = 5
|
||||
|
||||
# Custom queue
|
||||
_create_queue = [
|
||||
types.services.Operation.CREATE,
|
||||
types.services.Operation.FINISH,
|
||||
]
|
||||
_create_queue_l1_cache = [
|
||||
types.services.Operation.CREATE,
|
||||
types.services.Operation.FINISH,
|
||||
]
|
||||
# Note that openstack does not implements L2 cache
|
||||
_create_queue_l2_cache = [
|
||||
types.services.Operation.CREATE,
|
||||
types.services.Operation.WAIT,
|
||||
types.services.Operation.STOP,
|
||||
types.services.Operation.FINISH,
|
||||
]
|
||||
|
||||
# For typing check only...
|
||||
def service(self) -> 'OpenStackLiveService':
|
||||
@ -127,207 +143,16 @@ class OpenStackLiveUserService(
|
||||
self._mac = vals[3].decode('utf8')
|
||||
self._vmid = vals[4].decode('utf8')
|
||||
self._reason = vals[5].decode('utf8')
|
||||
self._queue = [Operation.from_int(i) for i in pickle.loads(vals[6])] # nosec
|
||||
self._queue = [OldOperation.from_int(i).to_operation() for i in pickle.loads(vals[6])] # nosec
|
||||
|
||||
self.mark_for_upgrade() # Flag so manager can save it again with new format
|
||||
|
||||
def get_name(self) -> str:
|
||||
if self._name == '':
|
||||
try:
|
||||
self._name = 'UDS-U-' + self.name_generator().get(
|
||||
self.service().get_basename(), self.service().get_lenname()
|
||||
)
|
||||
except KeyError:
|
||||
return consts.NO_MORE_NAMES
|
||||
return self._name
|
||||
|
||||
def set_ip(self, ip: str) -> None:
|
||||
self._ip = ip
|
||||
|
||||
def get_unique_id(self) -> str:
|
||||
return self._mac
|
||||
|
||||
def get_ip(self) -> str:
|
||||
return self._ip
|
||||
|
||||
def set_ready(self) -> types.states.TaskState:
|
||||
"""
|
||||
The method is invoked whenever a machine is provided to an user, right
|
||||
before presenting it (via transport rendering) to the user.
|
||||
"""
|
||||
if self.cache.get('ready') == '1':
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
status = self.service().get_machine_status(self._vmid)
|
||||
|
||||
if status.is_lost():
|
||||
return self._error('Machine is not available anymore')
|
||||
|
||||
power_state = self.service().get_machine_power_state(self._vmid)
|
||||
|
||||
if power_state.is_paused():
|
||||
self.service().resume_machine(self._vmid)
|
||||
elif power_state.is_stopped():
|
||||
self.service().start_machine(self._vmid)
|
||||
|
||||
# Right now, we suppose the machine is ready
|
||||
|
||||
self.cache.put('ready', '1')
|
||||
except Exception as e:
|
||||
self.do_log(types.log.LogLevel.ERROR, 'Error on setReady: {}'.format(e))
|
||||
# Treat as operation done, maybe the machine is ready and we can continue
|
||||
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
def reset(self) -> types.states.TaskState:
|
||||
if self._vmid != '':
|
||||
self.service().reset_machine(self._vmid)
|
||||
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
def process_ready_from_os_manager(self, data: typing.Any) -> types.states.TaskState:
|
||||
# Here we will check for suspending the VM (when full ready)
|
||||
logger.debug('Checking if cache 2 for %s', self._name)
|
||||
if self._get_current_op() == Operation.WAIT:
|
||||
logger.debug('Machine is ready. Moving to level 2')
|
||||
self._pop_current_op() # Remove current state
|
||||
return self._execute_queue()
|
||||
# Do not need to go to level 2 (opWait is in fact "waiting for moving machine to cache level 2)
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
def deploy_for_user(self, user: 'models.User') -> types.states.TaskState:
|
||||
"""
|
||||
Deploys an service instance for an user.
|
||||
"""
|
||||
logger.debug('Deploying for user')
|
||||
self._init_queue_for_deploy(types.services.CacheLevel.NONE)
|
||||
return self._execute_queue()
|
||||
|
||||
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
|
||||
"""
|
||||
Deploys an service instance for cache
|
||||
"""
|
||||
self._init_queue_for_deploy(level)
|
||||
return self._execute_queue()
|
||||
|
||||
def _init_queue_for_deploy(self, level: types.services.CacheLevel) -> None:
|
||||
if level in (types.services.CacheLevel.NONE, types.services.CacheLevel.L1):
|
||||
self._queue = [Operation.CREATE, Operation.FINISH]
|
||||
else:
|
||||
self._queue = [Operation.CREATE, Operation.WAIT, Operation.SUSPEND, Operation.FINISH]
|
||||
|
||||
def _check_machine_power_state(self, *check_state: 'openstack_types.PowerState') -> types.states.TaskState:
|
||||
self._check_count -= 1
|
||||
if self._check_count < 0:
|
||||
return self._error('Machine is not {str(check_state)} after {CHECK_COUNT_BEFORE_FAILURE} checks')
|
||||
|
||||
logger.debug(
|
||||
'Checking that state of machine %s (%s) is %s (remaining checks: %s)',
|
||||
self._vmid,
|
||||
self._name,
|
||||
check_state,
|
||||
self._check_count,
|
||||
)
|
||||
power_state = self.service().get_machine_power_state(self._vmid)
|
||||
|
||||
ret = types.states.TaskState.RUNNING
|
||||
|
||||
if power_state in check_state:
|
||||
ret = types.states.TaskState.FINISHED
|
||||
|
||||
return ret
|
||||
|
||||
def _get_current_op(self) -> Operation:
|
||||
if not self._queue:
|
||||
return Operation.FINISH
|
||||
|
||||
return self._queue[0]
|
||||
|
||||
def _pop_current_op(self) -> Operation:
|
||||
if not self._queue:
|
||||
return Operation.FINISH
|
||||
|
||||
return self._queue.pop(0)
|
||||
|
||||
def _reset_check_count(self) -> None:
|
||||
# Check the maximum number of checks before failure
|
||||
# So we dont stuck forever on check state to CHECK_COUNT_BEFORE_FAILURE
|
||||
self._check_count = CHECK_COUNT_BEFORE_FAILURE
|
||||
|
||||
def _error(self, reason: typing.Any) -> types.states.TaskState:
|
||||
"""
|
||||
Internal method to set object as error state
|
||||
|
||||
Returns:
|
||||
types.states.DeployState.ERROR, so we can do "return self.__error(reason)"
|
||||
"""
|
||||
logger.debug('Setting error state, reason: %s', reason)
|
||||
is_creation = self._get_current_op() == Operation.CREATE
|
||||
self._queue = [Operation.ERROR]
|
||||
self._reason = str(reason)
|
||||
|
||||
self.do_log(types.log.LogLevel.ERROR, self._reason)
|
||||
|
||||
if self._vmid:
|
||||
# Creating machines should be deleted on error
|
||||
if is_creation or self.service().keep_on_error() is False: # Powers off & delete it
|
||||
try:
|
||||
self.service().delete_machine(self._vmid)
|
||||
except Exception:
|
||||
logger.warning('Can\t set machine %s state to stopped', self._vmid)
|
||||
else:
|
||||
self.do_log(
|
||||
types.log.LogLevel.INFO, 'Keep on error is enabled, machine will not be marked for deletion'
|
||||
)
|
||||
# Fix queue to FINISH and return it
|
||||
self._queue = [Operation.FINISH]
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('executeQueue')
|
||||
op = self._get_current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
if op not in _EXECUTE_FNCS:
|
||||
return self._error('Unknown operation found at execution queue ({0})'.format(op))
|
||||
|
||||
_EXECUTE_FNCS[op](self)
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except Exception as e:
|
||||
return self._error(e)
|
||||
|
||||
# Queue execution methods
|
||||
def _retry(self) -> None:
|
||||
"""
|
||||
Used to retry an operation
|
||||
In fact, this will not be never invoked, unless we push it twice, because
|
||||
check_state method will "pop" first item when a check operation returns types.states.DeployState.FINISHED
|
||||
|
||||
At executeQueue this return value will be ignored, and it will only be used at check_state
|
||||
"""
|
||||
pass
|
||||
|
||||
def _wait(self) -> None:
|
||||
"""
|
||||
Executes opWait, it simply waits something "external" to end
|
||||
"""
|
||||
pass
|
||||
|
||||
def _create(self) -> None:
|
||||
def op_create(self) -> None:
|
||||
"""
|
||||
Deploys a machine from template for user/cache
|
||||
"""
|
||||
templateId = self.publication().get_template_id()
|
||||
template_id = self.publication().get_template_id()
|
||||
name = self.get_name()
|
||||
if name == consts.NO_MORE_NAMES:
|
||||
raise Exception(
|
||||
@ -336,227 +161,24 @@ class OpenStackLiveUserService(
|
||||
|
||||
name = self.service().sanitized_name(name)
|
||||
|
||||
self._vmid = self.service().deploy_from_template(name, templateId).id
|
||||
self._vmid = self.service().deploy_from_template(name, template_id).id
|
||||
if not self._vmid:
|
||||
raise Exception('Can\'t create machine')
|
||||
|
||||
self._reset_check_count()
|
||||
|
||||
return None
|
||||
|
||||
def _remove(self) -> None:
|
||||
"""
|
||||
Removes a machine from system
|
||||
"""
|
||||
status = self.service().get_machine_status(self._vmid)
|
||||
|
||||
if status.is_lost():
|
||||
raise Exception('Machine not found. (Status {})'.format(status))
|
||||
|
||||
self.service().delete_machine(self._vmid)
|
||||
|
||||
def _start_machine(self) -> None:
|
||||
"""
|
||||
Powers on the machine
|
||||
"""
|
||||
self.service().start_machine(self._vmid)
|
||||
|
||||
self._reset_check_count()
|
||||
|
||||
def _stop_machine(self) -> None:
|
||||
"""
|
||||
Powers off the machine
|
||||
"""
|
||||
self.service().stop_machine(self._vmid)
|
||||
|
||||
self._reset_check_count()
|
||||
|
||||
def _suspend_machine(self) -> None:
|
||||
"""
|
||||
Suspends the machine
|
||||
"""
|
||||
self.service().suspend_machine(self._vmid)
|
||||
|
||||
self._reset_check_count()
|
||||
|
||||
def _check_retry(self) -> types.states.TaskState:
|
||||
"""
|
||||
This method is invoked when a task has been retried.
|
||||
"""
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
def _check_wait(self) -> types.states.TaskState:
|
||||
"""
|
||||
This method is invoked when a task is waiting for something.
|
||||
"""
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
|
||||
# Check methods
|
||||
def _check_create(self) -> types.states.TaskState:
|
||||
def op_create_checker(self) -> types.states.TaskState:
|
||||
"""
|
||||
Checks the state of a deploy for an user or cache
|
||||
"""
|
||||
# Checks if machine has been created
|
||||
ret = self._check_machine_power_state(openstack_types.PowerState.RUNNING)
|
||||
if ret == types.states.TaskState.FINISHED:
|
||||
# If machine is requested to not be removed never, we may end with
|
||||
# an empty mac and ip, but no problem. Next time we will get it
|
||||
# Get IP & MAC (early stage)
|
||||
addr = self.service().get_server_address(self._vmid)
|
||||
self._mac, self._ip = addr.mac, addr.ip
|
||||
|
||||
return ret
|
||||
|
||||
def _check_start(self) -> types.states.TaskState:
|
||||
"""
|
||||
Checks if machine has started
|
||||
"""
|
||||
return self._check_machine_power_state(openstack_types.PowerState.RUNNING)
|
||||
|
||||
def _check_stop(self) -> types.states.TaskState:
|
||||
"""
|
||||
Checks if machine has stopped
|
||||
"""
|
||||
return self._check_machine_power_state(
|
||||
openstack_types.PowerState.SHUTDOWN,
|
||||
openstack_types.PowerState.CRASHED,
|
||||
openstack_types.PowerState.SUSPENDED,
|
||||
)
|
||||
|
||||
def _check_suspend(self) -> types.states.TaskState:
|
||||
"""
|
||||
Check if the machine has suspended
|
||||
"""
|
||||
return self._check_machine_power_state(openstack_types.PowerState.SUSPENDED)
|
||||
|
||||
def _check_removed(self) -> types.states.TaskState:
|
||||
"""
|
||||
Checks if a machine has been removed
|
||||
"""
|
||||
return types.states.TaskState.FINISHED # No check at all, always true
|
||||
|
||||
def check_state(self) -> types.states.TaskState:
|
||||
"""
|
||||
Check what operation is going on, and acts acordly to it
|
||||
"""
|
||||
self._debug('check_state')
|
||||
op = self._get_current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
if self.service().is_running(self, self._vmid):
|
||||
server_info = self.service().api.get_server(self._vmid).validated()
|
||||
self._mac = server_info.addresses[0].mac
|
||||
self._ip = server_info.addresses[0].ip
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
if op not in _CHECK_FNCS:
|
||||
return self._error('Unknown operation found at execution queue ({0})'.format(op))
|
||||
|
||||
state = _CHECK_FNCS[op](self)
|
||||
if state == types.states.TaskState.FINISHED:
|
||||
self._pop_current_op() # Remove runing op
|
||||
return self._execute_queue()
|
||||
|
||||
return state
|
||||
except Exception as e:
|
||||
return self._error(e)
|
||||
|
||||
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
|
||||
"""
|
||||
Moves machines between cache levels
|
||||
"""
|
||||
if Operation.REMOVE in self._queue:
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
if level == types.services.CacheLevel.L1:
|
||||
self._queue = [Operation.START, Operation.FINISH]
|
||||
else: # Currently L2 is not supported
|
||||
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]
|
||||
|
||||
return self._execute_queue()
|
||||
|
||||
def error_reason(self) -> str:
|
||||
return self._reason
|
||||
|
||||
def destroy(self) -> types.states.TaskState:
|
||||
"""
|
||||
Invoked for destroying a deployed service
|
||||
"""
|
||||
self._debug('destroy')
|
||||
# If executing something, wait until finished to remove it
|
||||
# We simply replace the execution queue
|
||||
op = self._get_current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return self._error('Machine is already in error state!')
|
||||
|
||||
ops = [Operation.STOP, Operation.REMOVE, Operation.FINISH]
|
||||
|
||||
if op == Operation.FINISH or op == Operation.WAIT:
|
||||
self._queue = ops
|
||||
return self._execute_queue() # Run it right now
|
||||
|
||||
# If an operation is pending, maybe checking, so we will wait until it finishes
|
||||
self._queue = [op] + ops
|
||||
|
||||
# Do not execute anything.here, just continue normally
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
def cancel(self) -> types.states.TaskState:
|
||||
"""
|
||||
This is a task method. As that, the excepted return values are
|
||||
types.states.DeployState.values RUNNING, FINISHED or ERROR.
|
||||
|
||||
This can be invoked directly by an administration or by the clean up
|
||||
of the deployed service (indirectly).
|
||||
When administrator requests it, the cancel is "delayed" and not
|
||||
invoked directly.
|
||||
"""
|
||||
return self.destroy()
|
||||
|
||||
@staticmethod
|
||||
def _op2str(op: Operation) -> str:
|
||||
return {
|
||||
Operation.CREATE: 'create',
|
||||
Operation.START: 'start',
|
||||
Operation.SUSPEND: 'suspend',
|
||||
Operation.REMOVE: 'remove',
|
||||
Operation.WAIT: 'wait',
|
||||
Operation.ERROR: 'error',
|
||||
Operation.FINISH: 'finish',
|
||||
Operation.RETRY: 'retry',
|
||||
}.get(op, '????')
|
||||
|
||||
def _debug(self, txt: str) -> None:
|
||||
logger.debug(
|
||||
'types.states.DeployState.at %s: name: %s, ip: %s, mac: %s, vmid:%s, queue: %s',
|
||||
txt,
|
||||
self._name,
|
||||
self._ip,
|
||||
self._mac,
|
||||
self._vmid,
|
||||
[OpenStackLiveUserService._op2str(op) for op in self._queue],
|
||||
)
|
||||
|
||||
|
||||
# Execution methods
|
||||
_EXECUTE_FNCS: dict[int, collections.abc.Callable[[OpenStackLiveUserService], None]] = {
|
||||
Operation.CREATE: OpenStackLiveUserService._create,
|
||||
Operation.RETRY: OpenStackLiveUserService._retry,
|
||||
Operation.START: OpenStackLiveUserService._start_machine,
|
||||
Operation.STOP: OpenStackLiveUserService._stop_machine,
|
||||
Operation.SUSPEND: OpenStackLiveUserService._suspend_machine,
|
||||
Operation.WAIT: OpenStackLiveUserService._wait,
|
||||
Operation.REMOVE: OpenStackLiveUserService._remove,
|
||||
}
|
||||
|
||||
# Check methods
|
||||
_CHECK_FNCS: dict[int, collections.abc.Callable[[OpenStackLiveUserService], types.states.TaskState]] = {
|
||||
Operation.CREATE: OpenStackLiveUserService._check_create,
|
||||
Operation.RETRY: OpenStackLiveUserService._check_retry,
|
||||
Operation.WAIT: OpenStackLiveUserService._check_wait,
|
||||
Operation.START: OpenStackLiveUserService._check_start,
|
||||
Operation.STOP: OpenStackLiveUserService._check_stop,
|
||||
Operation.SUSPEND: OpenStackLiveUserService._check_suspend,
|
||||
Operation.REMOVE: OpenStackLiveUserService._check_removed,
|
||||
}
|
||||
|
@ -213,16 +213,16 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
def _get_compute_endpoint(self) -> str:
|
||||
return self._get_endpoint_for('compute', 'compute_legacy')
|
||||
|
||||
def _get_endpoints_iterable(self, cache_key: str, *type_: str) -> list[str]:
|
||||
def _get_endpoints_iterable(self, cache_key: str, *types: str) -> list[str]:
|
||||
# If endpoint is cached, use it as first endpoint
|
||||
found_endpoints = list(self._get_endpoints_for(*type_))
|
||||
found_endpoints = list(self._get_endpoints_for(*types))
|
||||
if self.cache.get(cache_key) in found_endpoints:
|
||||
# If cached endpoint is in the list, use it as first endpoint
|
||||
found_endpoints = [self.cache.get(cache_key)] + list(
|
||||
set(found_endpoints) - {self.cache.get(cache_key)}
|
||||
)
|
||||
|
||||
logger.debug('Endpoints for %s: %s', type_, found_endpoints)
|
||||
logger.debug('Endpoints for %s: %s', types, found_endpoints)
|
||||
|
||||
return found_endpoints
|
||||
|
||||
@ -239,8 +239,6 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
cache_key = ''.join(endpoints_types)
|
||||
found_endpoints = self._get_endpoints_iterable(cache_key, *endpoints_types)
|
||||
|
||||
retrayable = False
|
||||
|
||||
for i, endpoint in enumerate(found_endpoints):
|
||||
try:
|
||||
logger.debug(
|
||||
@ -258,11 +256,9 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
logger.debug('Result: %s', r.content)
|
||||
return r
|
||||
except Exception as e:
|
||||
if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)):
|
||||
retrayable = True # Endpoint is down, can retry if none is working
|
||||
|
||||
if i == len(found_endpoints) - 1:
|
||||
if retrayable:
|
||||
# Endpoint is down, can retry if none is working
|
||||
if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)):
|
||||
raise exceptions.RetryableError('All endpoints failed') from e # With last exception
|
||||
raise e
|
||||
logger.warning('Error requesting %s: %s', endpoint + path, e)
|
||||
@ -303,6 +299,9 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
except Exception as e:
|
||||
# If last endpoint, raise exception
|
||||
if i == len(found_endpoints) - 1:
|
||||
# Endpoint is down, can retry if none is working
|
||||
if isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError)):
|
||||
raise exceptions.RetryableError('All endpoints failed') from e # With last exception
|
||||
raise e
|
||||
logger.warning('Error requesting %s: %s (%s)', endpoint + path, e, error_message)
|
||||
self.cache.remove(cache_key)
|
||||
@ -570,7 +569,7 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
# Very small timeout, so repeated operations will use same data
|
||||
# Any cache time less than 5 seconds will be fine, beceuse checks on
|
||||
# openstack are done every 5 seconds
|
||||
@decorators.cached(prefix='svr', timeout=4, key_helper=cache_key_helper)
|
||||
@decorators.cached(prefix='svr', timeout=consts.cache.SHORTEST_CACHE_TIMEOUT, key_helper=cache_key_helper)
|
||||
def get_server(self, server_id: str) -> openstack_types.ServerInfo:
|
||||
r = self._request_from_endpoint(
|
||||
'get',
|
||||
@ -753,6 +752,7 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
|
||||
def stop_server(self, server_id: str) -> None:
|
||||
# this does not returns anything
|
||||
# {"os-resetState": {"state": "error"}}
|
||||
self._request_from_endpoint(
|
||||
'post',
|
||||
endpoints_types=COMPUTE_ENDPOINT_TYPES,
|
||||
@ -761,6 +761,21 @@ class OpenstackClient: # pylint: disable=too-many-public-methods
|
||||
error_message='Stoping server',
|
||||
expects_json=False,
|
||||
)
|
||||
|
||||
def reboot_server(self, server_id: str, hard: bool = True) -> None:
|
||||
# Does not need return value
|
||||
try:
|
||||
type_reboot = 'HARD' if hard else 'SOFT'
|
||||
self._request_from_endpoint(
|
||||
'post',
|
||||
endpoints_types=COMPUTE_ENDPOINT_TYPES,
|
||||
path=f'/servers/{server_id}/action',
|
||||
data=f'{"reboot":{"type":"{type_reboot}"}}',
|
||||
error_message='Rebooting server',
|
||||
expects_json=False,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def suspend_server(self, server_id: str) -> None:
|
||||
# this does not returns anything
|
||||
|
@ -35,6 +35,8 @@ import typing
|
||||
import dataclasses
|
||||
import enum
|
||||
|
||||
from uds.core.services.generics import exceptions
|
||||
|
||||
|
||||
class ServerStatus(enum.StrEnum):
|
||||
ACTIVE = 'ACTIVE' # The server is active.
|
||||
@ -212,6 +214,17 @@ class ServerInfo:
|
||||
access_addr_ipv6: str
|
||||
fault: typing.Optional[str]
|
||||
admin_pass: str
|
||||
|
||||
def validated(self) -> 'ServerInfo':
|
||||
"""
|
||||
Raises NotFoundError if server is lost
|
||||
|
||||
Returns:
|
||||
self
|
||||
"""
|
||||
if self.status.is_lost():
|
||||
raise exceptions.NotFoundError(f'Server {self.id} is lost')
|
||||
return self
|
||||
|
||||
@staticmethod
|
||||
def from_dict(d: dict[str, typing.Any]) -> 'ServerInfo':
|
||||
|
@ -35,8 +35,9 @@ import typing
|
||||
|
||||
from django.utils.translation import gettext_noop as _
|
||||
|
||||
from uds.core import services, types
|
||||
from uds.core.util import fields, validators
|
||||
from uds.core import types
|
||||
from uds.core.services.generics.dynamic.service import DynamicService
|
||||
from uds.core.util import validators
|
||||
from uds.core.ui import gui
|
||||
|
||||
from .publication import OpenStackLivePublication
|
||||
@ -54,8 +55,11 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
AnyOpenStackProvider: typing.TypeAlias = typing.Union[OpenStackProvider, OpenStackProviderLegacy]
|
||||
|
||||
from uds.core.services.generics.dynamic.userservice import DynamicUserService
|
||||
from uds.core.services.generics.dynamic.publication import DynamicPublication
|
||||
|
||||
class OpenStackLiveService(services.Service):
|
||||
|
||||
class OpenStackLiveService(DynamicService):
|
||||
"""
|
||||
OpenStack Live Service
|
||||
"""
|
||||
@ -169,15 +173,15 @@ class OpenStackLiveService(services.Service):
|
||||
old_field_name='securityGroups',
|
||||
)
|
||||
|
||||
basename = fields.basename_field(order=9, tab=types.ui.Tab.MACHINE)
|
||||
lenname = fields.lenname_field(order=10, tab=types.ui.Tab.MACHINE)
|
||||
basename = DynamicService.basename
|
||||
lenname = DynamicService.lenname
|
||||
|
||||
maintain_on_error = fields.maintain_on_error_field(order=11, tab=types.ui.Tab.MACHINE)
|
||||
maintain_on_error = DynamicService.maintain_on_error
|
||||
|
||||
prov_uuid = gui.HiddenField()
|
||||
|
||||
_api: typing.Optional['openstack_client.OpenstackClient'] = None
|
||||
|
||||
cached_api: typing.Optional['openstack_client.OpenstackClient'] = None
|
||||
|
||||
# Note: currently, Openstack does not provides a way of specifying how to stop the server
|
||||
# At least, i have not found it on the documentation
|
||||
|
||||
@ -191,9 +195,6 @@ class OpenStackLiveService(services.Service):
|
||||
if values:
|
||||
validators.validate_basename(self.basename.value, self.lenname.as_int())
|
||||
|
||||
# self.ov.value = self.provider().serialize()
|
||||
# self.ev.value = self.provider().env.key
|
||||
|
||||
def provider(self) -> 'AnyOpenStackProvider':
|
||||
return typing.cast('AnyOpenStackProvider', super().provider())
|
||||
|
||||
@ -223,14 +224,50 @@ class OpenStackLiveService(services.Service):
|
||||
|
||||
@property
|
||||
def api(self) -> 'openstack_client.OpenstackClient':
|
||||
if not self._api:
|
||||
self._api = self.provider().api(projectid=self.project.value, region=self.region.value)
|
||||
if not self.cached_api:
|
||||
self.cached_api = self.provider().api(projectid=self.project.value, region=self.region.value)
|
||||
|
||||
return self._api
|
||||
return self.cached_api
|
||||
|
||||
def sanitized_name(self, name: str) -> str:
|
||||
return self.provider().sanitized_name(name)
|
||||
|
||||
def get_ip(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
return self.api.get_server(vmid).validated().addresses[0].ip
|
||||
|
||||
def get_mac(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> str:
|
||||
return self.api.get_server(vmid).validated().addresses[0].mac
|
||||
|
||||
def is_running(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> bool:
|
||||
return self.api.get_server(vmid).validated().power_state.is_running()
|
||||
|
||||
def start(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
if self.api.get_server(vmid).validated().power_state.is_running():
|
||||
return
|
||||
self.api.start_server(vmid)
|
||||
|
||||
def stop(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
if self.api.get_server(vmid).validated().power_state.is_stopped():
|
||||
return
|
||||
self.api.stop_server(vmid)
|
||||
|
||||
# Default shutdown is stop
|
||||
# Note that on openstack, stop is "soft", but may fail to stop if no agent is installed or not responding
|
||||
# We can anyway delete de machine even if it is not stopped
|
||||
|
||||
def reset(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
# Default is to stop "hard"
|
||||
return self.stop(caller_instance, vmid)
|
||||
|
||||
def delete(self, caller_instance: 'DynamicUserService | DynamicPublication', vmid: str) -> None:
|
||||
"""
|
||||
Removes the machine, or queues it for removal, or whatever :)
|
||||
"""
|
||||
if isinstance(caller_instance, OpenStackLiveUserService):
|
||||
self.api.delete_server(vmid)
|
||||
else:
|
||||
self.api.delete_snapshot(vmid)
|
||||
|
||||
def make_template(
|
||||
self, template_name: str, description: typing.Optional[str] = None
|
||||
) -> openstack_types.VolumeSnapshotInfo:
|
||||
@ -270,133 +307,5 @@ class OpenStackLiveService(services.Service):
|
||||
security_groups_ids=self.security_groups.value,
|
||||
)
|
||||
|
||||
def remove_template(self, templateId: str) -> None:
|
||||
"""
|
||||
invokes removeTemplate from parent provider
|
||||
"""
|
||||
self.api.delete_snapshot(templateId)
|
||||
|
||||
def get_machine_status(self, vmid: str) -> openstack_types.ServerStatus:
|
||||
vminfo = self.api.get_server(vmid)
|
||||
if vminfo.status in (openstack_types.ServerStatus.ERROR, openstack_types.ServerStatus.DELETED):
|
||||
logger.warning(
|
||||
'Got server status %s for %s: %s',
|
||||
vminfo.status,
|
||||
vmid,
|
||||
vminfo.fault,
|
||||
)
|
||||
return vminfo.status
|
||||
|
||||
def get_machine_power_state(self, vmid: str) -> openstack_types.PowerState:
|
||||
vminfo = self.api.get_server(vmid)
|
||||
return vminfo.power_state
|
||||
|
||||
def start_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to OpenStack.
|
||||
|
||||
This start also "resume" suspended/paused machines
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
# if already running, do nothing
|
||||
if self.get_machine_power_state(vmid) == openstack_types.PowerState.RUNNING:
|
||||
return
|
||||
|
||||
self.api.start_server(vmid)
|
||||
|
||||
def stop_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to stop a machine. No check is done, it is simply requested to OpenStack
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
# If already stopped, do nothing
|
||||
if self.get_machine_power_state(vmid) == openstack_types.PowerState.SHUTDOWN:
|
||||
return
|
||||
|
||||
self.api.stop_server(vmid)
|
||||
|
||||
def reset_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to stop a machine. No check is done, it is simply requested to OpenStack
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.api.reset_server(vmid)
|
||||
|
||||
def suspend_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to suspend a machine. No check is done, it is simply requested to OpenStack
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
# If not running, do nothing
|
||||
if self.get_machine_power_state(vmid) != openstack_types.PowerState.RUNNING:
|
||||
return
|
||||
self.api.suspend_server(vmid)
|
||||
|
||||
def resume_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to start a machine. No check is done, it is simply requested to OpenStack
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
# If not suspended, do nothing
|
||||
if self.get_machine_power_state(vmid) != openstack_types.PowerState.SUSPENDED:
|
||||
return
|
||||
self.api.resume_server(vmid)
|
||||
|
||||
def delete_machine(self, vmid: str) -> None:
|
||||
"""
|
||||
Tries to delete a machine. No check is done, it is simply requested to OpenStack
|
||||
|
||||
Args:
|
||||
machineId: Id of the machine
|
||||
|
||||
Returns:
|
||||
"""
|
||||
self.api.delete_server(vmid)
|
||||
|
||||
def get_server_address(self, vmid: str) -> openstack_types.ServerInfo.AddresInfo:
|
||||
"""
|
||||
Gets the mac address of first nic of the machine
|
||||
"""
|
||||
vminfo = self.api.get_server(vmid)
|
||||
return vminfo.addresses[0]
|
||||
|
||||
def get_basename(self) -> str:
|
||||
"""
|
||||
Returns the base name
|
||||
"""
|
||||
return self.basename.value
|
||||
|
||||
def get_lenname(self) -> int:
|
||||
"""
|
||||
Returns the length of numbers part
|
||||
"""
|
||||
return int(self.lenname.value)
|
||||
|
||||
def is_avaliable(self) -> bool:
|
||||
return self.provider().is_available()
|
||||
|
||||
def allows_errored_userservice_cleanup(self) -> bool:
|
||||
return not self.maintain_on_error.value
|
||||
|
||||
def keep_on_error(self) -> bool:
|
||||
return self.maintain_on_error.as_bool()
|
||||
|
Loading…
x
Reference in New Issue
Block a user