1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Advancing on ovirt, some refactoring and fixes

This commit is contained in:
Adolfo Gómez García 2024-03-17 23:22:39 +01:00
parent f497c388ba
commit 188b27eb90
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
22 changed files with 397 additions and 145 deletions

View File

@ -508,7 +508,7 @@ class BaseReadyChange(ActorV3Action):
if osmanager:
osmanager.to_ready(userService)
UserServiceManager().notify_ready_from_os_manager(userService, '')
UserServiceManager().notify_ready_from_os_manager(userService, '') # Currently, no data is received for os manager
# Generates a certificate and send it to client.
privateKey, cert, password = security.create_self_signed_cert(self._params['ip'])

View File

@ -72,7 +72,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
def manager() -> 'UserServiceManager':
return UserServiceManager() # Singleton pattern will return always the same instance
def get_cache_state_filter(self, service_pool: ServicePool, level: int) -> Q:
def get_cache_state_filter(self, service_pool: ServicePool, level: types.services.CacheLevel) -> Q:
return Q(cache_level=level) & self.get_state_filter(service_pool.service)
@staticmethod
@ -179,18 +179,18 @@ class UserServiceManager(metaclass=singleton.Singleton):
in_use=False,
)
def create_cache_for(self, publication: ServicePoolPublication, cacheLevel: int) -> UserService:
def create_cache_for(self, publication: ServicePoolPublication, cache_level: types.services.CacheLevel) -> UserService:
"""
Creates a new cache for the deployed service publication at level indicated
"""
operations_logger.info(
'Creating a new cache element at level %s for publication %s',
cacheLevel,
cache_level,
publication,
)
cache = self._create_cache_user_service_at_db(publication, cacheLevel)
cache = self._create_cache_user_service_at_db(publication, cache_level)
ci = cache.get_instance()
state = ci.deploy_for_cache(cacheLevel)
state = ci.deploy_for_cache(cache_level)
UserServiceOpChecker.state_updater(cache, ci, state)
return cache
@ -277,7 +277,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
return assigned
def move_to_level(self, cache: UserService, cache_level: int) -> None:
def move_to_level(self, cache: UserService, cache_level: types.services.CacheLevel) -> None:
"""
Moves a cache element from one level to another
@return: cache element
@ -394,7 +394,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
service_pool.cached_users_services()
.select_for_update()
.filter(
cache_level=services.UserService.L1_CACHE,
cache_level=types.services.CacheLevel.L1,
state=State.USABLE,
os_state=State.USABLE,
)[:1],
@ -420,7 +420,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
list[UserService],
service_pool.cached_users_services()
.select_for_update()
.filter(cache_level=services.UserService.L1_CACHE, state=State.USABLE)[:1],
.filter(cache_level=types.services.CacheLevel.L1, state=State.USABLE)[:1],
)
if caches: # If there is a cache, we will use it
cache = caches[0]
@ -450,7 +450,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
service_pool,
types.stats.EventType.CACHE_HIT,
fld1=service_pool.cached_users_services()
.filter(cache_level=services.UserService.L1_CACHE, state=State.USABLE)
.filter(cache_level=types.services.CacheLevel.L1, state=State.USABLE)
.count(),
)
return cache
@ -462,7 +462,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
caches = list(
service_pool.cached_users_services()
.select_for_update()
.filter(cache_level=services.UserService.L1_CACHE, state=State.PREPARING)[:1]
.filter(cache_level=types.services.CacheLevel.L1, state=State.PREPARING)[:1]
)
if caches: # If there is a cache, we will use it
cache = caches[0]
@ -491,7 +491,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
service_pool,
events.types.stats.EventType.CACHE_MISS,
fld1=service_pool.cached_users_services()
.filter(cache_level=services.UserService.L1_CACHE, state=State.PREPARING)
.filter(cache_level=types.services.CacheLevel.L1, state=State.PREPARING)
.count(),
)
return cache

View File

@ -109,10 +109,6 @@ class UserService(Environmentable, Serializable):
method error_reason can be called multiple times, including
serializations in middle, so remember to include reason of error in serializations
"""
USER: int = 0 # : Constant for User cache level
L1_CACHE = 1 # : Constant for Cache of level 1
L2_CACHE = 2 # : Constant for Cache of level 2
# : Suggested time for deployment finishing, in seconds
# : This allows the manager to, if deployment is no done in 1 step, re-check
# : the deployment once this time has passed, i.e. KVM COW deployment takes
@ -351,7 +347,7 @@ class UserService(Environmentable, Serializable):
"""
return types.states.TaskState.FINISHED
def deploy_for_cache(self, level: int) -> types.states.TaskState:
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys a user deployment as cache.
@ -465,7 +461,7 @@ class UserService(Environmentable, Serializable):
"""
pass
def move_to_cache(self, level: int) -> types.states.TaskState:
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
This method is invoked whenever the core needs to move from the current
cache level to a new cache level an user deployment.

View File

@ -108,3 +108,9 @@ class ReadyStatus(enum.IntEnum):
Returns the code as a percentage (0-100)
"""
return 25 * self.value
class CacheLevel(enum.IntEnum):
NONE = 0 # : Constant for User cache level (no cache at all)
L1 = 1 # : Constant for Cache of level 1
L2 = 2 # : Constant for Cache of level 2

View File

@ -41,7 +41,7 @@ from uds.core.types.states import State
from uds.core.managers.userservice import UserServiceManager
from uds.core.services.exceptions import MaxServicesReachedError
from uds.models import ServicePool, ServicePoolPublication, UserService
from uds.core import services
from uds.core import types
from uds.core.util import log
from uds.core.jobs import Job
@ -188,14 +188,14 @@ class ServiceCacheUpdater(Job):
# to create new items over the limit stablisshed, so we will not remove them anymore
l1_cache_count: int = (
servicepool.cached_users_services()
.filter(UserServiceManager().get_cache_state_filter(servicepool, services.UserService.L1_CACHE))
.filter(UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L1))
.count()
)
l2_cache_count: int = (
(
servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(servicepool, services.UserService.L2_CACHE)
UserServiceManager().get_cache_state_filter(servicepool, types.services.CacheLevel.L2)
)
.count()
)
@ -280,7 +280,7 @@ class ServiceCacheUpdater(Job):
.select_for_update()
.filter(
UserServiceManager().get_cache_state_filter(
servicepool_stats.servicepool, services.UserService.L2_CACHE
servicepool_stats.servicepool, types.services.CacheLevel.L2
)
)
.order_by('creation_date')
@ -297,13 +297,13 @@ class ServiceCacheUpdater(Job):
break
if valid is not None:
valid.move_to_level(services.UserService.L1_CACHE)
valid.move_to_level(types.services.CacheLevel.L1)
return
try:
# This has a velid publication, or it will not be here
UserServiceManager().create_cache_for(
typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
services.UserService.L1_CACHE,
types.services.CacheLevel.L1,
)
except MaxServicesReachedError:
log.log(
@ -336,7 +336,7 @@ class ServiceCacheUpdater(Job):
# This has a velid publication, or it will not be here
UserServiceManager().create_cache_for(
typing.cast(ServicePoolPublication, servicepool_stats.servicepool.active_publication()),
services.UserService.L2_CACHE,
types.services.CacheLevel.L2,
)
except MaxServicesReachedError:
logger.warning(
@ -358,7 +358,7 @@ class ServiceCacheUpdater(Job):
for i in servicepool_stats.servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(
servicepool_stats.servicepool, services.UserService.L1_CACHE
servicepool_stats.servicepool, types.services.CacheLevel.L1
)
)
.order_by('-creation_date')
@ -384,7 +384,7 @@ class ServiceCacheUpdater(Job):
break
if valid is not None:
valid.move_to_level(services.UserService.L2_CACHE)
valid.move_to_level(types.services.CacheLevel.L2)
return
cache = cacheItems[0]
@ -400,7 +400,7 @@ class ServiceCacheUpdater(Job):
servicepool_stats.servicepool.cached_users_services()
.filter(
UserServiceManager().get_cache_state_filter(
servicepool_stats.servicepool, services.UserService.L2_CACHE
servicepool_stats.servicepool, types.services.CacheLevel.L2
)
)
.order_by('creation_date')

View File

@ -44,7 +44,7 @@ from django.db import models
from uds.core.util import calendar
from uds.core.util import log
from uds.core.managers.userservice import UserServiceManager
from uds.core import services, types, consts
from uds.core import types, consts
from .calendar import Calendar
from .uuid_model import UUIDModel
@ -195,9 +195,9 @@ class CalendarAction(UUIDModel):
UserServiceManager().get_cache_state_filter(
self.service_pool,
(
services.UserService.L1_CACHE
types.services.CacheLevel.L1
if self.action == consts.calendar.CALENDAR_ACTION_CLEAN_CACHE_L1['id']
else services.UserService.L2_CACHE
else types.services.CacheLevel.L2
),
)
):

View File

@ -648,7 +648,7 @@ class ServicePool(UUIDModel, TaggingMixin):
Returns:
A list of db records (userService) with assinged user services
"""
return self.userServices.filter(cache_level=0)
return self.userServices.filter(cache_level=types.services.CacheLevel.NONE)
def erroneous_user_services(self) -> 'models.QuerySet[UserService]':
"""

View File

@ -567,7 +567,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
"""
self.remove_or_cancel()
def move_to_level(self, cacheLevel: int) -> None:
def move_to_level(self, cacheLevel: types.services.CacheLevel) -> None:
"""
Moves cache items betwen levels, managed directly

View File

@ -52,6 +52,10 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
MAX_CHECK_COUNT: typing.Final[int] = (
20 # If in suggested_delay * MAX_CHECK_COUNT we are still in same state, we will resign and return error
)
class Operation(enum.IntEnum):
"""
@ -86,6 +90,13 @@ UP_STATES: typing.Final[set[ov_types.VMStatus]] = {
ov_types.VMStatus.RESTORING_STATE,
}
DOWN_STATES: typing.Final[set[ov_types.VMStatus]] = {
ov_types.VMStatus.DOWN,
ov_types.VMStatus.POWERING_DOWN,
ov_types.VMStatus.SUSPENDED,
ov_types.VMStatus.PAUSED,
}
class OVirtLinkedUserService(services.UserService, autoserializable.AutoSerializable):
"""
@ -109,6 +120,22 @@ class OVirtLinkedUserService(services.UserService, autoserializable.AutoSerializ
_reason = autoserializable.StringField(default='')
_queue = autoserializable.ListField[Operation]()
# helpers
def _get_checks_counter(self) -> int:
with self.storage.as_dict() as data:
return data.get('exec_count', 0)
def _set_checks_counter(self, value: int) -> None:
with self.storage.as_dict() as data:
data['exec_count'] = value
def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]:
count = self._get_checks_counter() + 1
self._set_checks_counter(count)
if count > MAX_CHECK_COUNT:
return self._error(f'Max checks reached on {info or "unknown"}')
return None
# Utility overrides for type checking...
def service(self) -> 'OVirtLinkedService':
return typing.cast('OVirtLinkedService', super().service())
@ -274,7 +301,7 @@ if sys.platform == 'win32':
"""
Deploys an service instance for cache
"""
self._init_queue_for_deploy(level == self.L2_CACHE)
self._init_queue_for_deploy(level == types.services.CacheLevel.L2)
return self._execute_queue()
def _init_queue_for_deploy(self, for_level_2: bool = False) -> None:
@ -299,6 +326,10 @@ if sys.platform == 'win32':
self._name,
check_state,
)
if (check_result := self._inc_checks_counter('check_machine_state')) is not None:
return check_result
vm_info = self.service().provider().api.get_machine_info(self._vmid)
if vm_info.status == ov_types.VMStatus.UNKNOWN:
return self._error('Machine not found')
@ -355,31 +386,20 @@ if sys.platform == 'win32':
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], str]]] = {
Operation.CREATE: self._create,
Operation.RETRY: self._retry,
Operation.START: self._start_machine,
Operation.STOP: self._stop_machine,
Operation.SUSPEND: self._suspend_machine,
Operation.WAIT: self._wait,
Operation.REMOVE: self._remove,
Operation.CHANGEMAC: self._change_mac,
}
# Reset checking count (for checks)
self._set_checks_counter(0)
try:
operation_runner: typing.Optional[collections.abc.Callable[[], str]] = fncs.get(op, None)
operation_runner = EXECUTORS[op]
if operation_runner is None:
return self._error(f'Unknown operation found at execution queue ({op})')
operation_runner()
operation_runner(self)
return types.states.TaskState.RUNNING
except Exception as e:
return self._error(e)
# Queue execution methods
def _retry(self) -> types.states.TaskState:
def _retry(self) -> None:
"""
Used to retry an operation
In fact, this will not be never invoked, unless we push it twice, because
@ -387,15 +407,15 @@ if sys.platform == 'win32':
At executeQueue this return value will be ignored, and it will only be used at check_state
"""
return types.states.TaskState.FINISHED
return
def _wait(self) -> types.states.TaskState:
def _wait(self) -> None:
"""
Executes opWait, it simply waits something "external" to end
"""
return types.states.TaskState.RUNNING
return
def _create(self) -> str:
def _create(self) -> None:
"""
Deploys a machine from template for user/cache
"""
@ -413,9 +433,7 @@ if sys.platform == 'win32':
self._vmid = self.service().deploy_from_template(name, comments, template_id).id
return types.states.TaskState.RUNNING
def _remove(self) -> str:
def _remove(self) -> None:
"""
Removes a machine from system
"""
@ -429,9 +447,7 @@ if sys.platform == 'win32':
else:
self.service().provider().api.remove_machine(self._vmid)
return types.states.TaskState.RUNNING
def _start_machine(self) -> str:
def _start_machine(self) -> None:
"""
Powers on the machine
"""
@ -440,14 +456,13 @@ if sys.platform == 'win32':
if vminfo.status == ov_types.VMStatus.UNKNOWN:
raise Exception('Machine not found')
if vminfo.status not in UP_STATES:
self._push_front_op(Operation.RETRY)
else:
self.service().provider().api.start_machine(self._vmid)
if vminfo.status in UP_STATES:
# Already started, return
return
return types.states.TaskState.RUNNING
self.service().provider().api.start_machine(self._vmid)
def _stop_machine(self) -> str:
def _stop_machine(self) -> None:
"""
Powers off the machine
"""
@ -456,17 +471,12 @@ if sys.platform == 'win32':
if vminfo.status == ov_types.VMStatus.UNKNOWN:
raise Exception('Machine not found')
if vminfo.status == ov_types.VMStatus.DOWN: # Already stoped, return
return types.states.TaskState.RUNNING
if vminfo.status in DOWN_STATES:
return
if vminfo.status not in UP_STATES:
self._push_front_op(Operation.RETRY)
else:
self.service().provider().api.stop_machine(self._vmid)
self.service().provider().api.stop_machine(self._vmid)
return types.states.TaskState.RUNNING
def _suspend_machine(self) -> str:
def _suspend_machine(self) -> None:
"""
Suspends the machine
"""
@ -475,8 +485,8 @@ if sys.platform == 'win32':
if vminfo.status == ov_types.VMStatus.UNKNOWN:
raise Exception('Machine not found')
if vminfo.status == ov_types.VMStatus.SUSPENDED: # Already suspended, return
return types.states.TaskState.RUNNING
if vminfo.status in DOWN_STATES: # Already in an state that is not up,
return
if vminfo.status not in UP_STATES:
self._push_front_op(
@ -485,9 +495,7 @@ if sys.platform == 'win32':
else:
self.service().provider().api.suspend_machine(self._vmid)
return types.states.TaskState.RUNNING
def _change_mac(self) -> str:
def _change_mac(self) -> None:
"""
Changes the mac of the first nic
"""
@ -495,9 +503,23 @@ if sys.platform == 'win32':
# Fix usb if needed
self.service().fix_usb(self._vmid)
# Check methods
def _retry_checker(self) -> types.states.TaskState:
"""
Used to retry an operation
In fact, this will not be never invoked, unless we push it twice, because
check_state method will "pop" first item when a check operation returns types.states.DeployState.FINISHED
At executeQueue this return value will be ignored, and it will only be used at check_state
"""
return types.states.TaskState.FINISHED
def _wait_checker(self) -> types.states.TaskState:
"""
Executes opWait, it simply waits something "external" to end
"""
return types.states.TaskState.RUNNING
# Check methods
def _create_checker(self) -> types.states.TaskState:
"""
Checks the state of a deploy for an user or cache
@ -514,15 +536,13 @@ if sys.platform == 'win32':
"""
Checks if machine has stoped
"""
return self._check_machine_state([ov_types.VMStatus.DOWN])
return self._check_machine_state(DOWN_STATES)
def _suspend_checker(self) -> types.states.TaskState:
"""
Check if the machine has suspended
Check if the machine has suspended, or something like that
"""
return self._check_machine_state(
[ov_types.VMStatus.SUSPENDED, ov_types.VMStatus.DOWN]
) # Down is also valid for us
return self._check_machine_state(DOWN_STATES)
def _remove_checker(self) -> types.states.TaskState:
"""
@ -551,26 +571,10 @@ if sys.platform == 'win32':
if op == Operation.FINISH:
return types.states.TaskState.FINISHED
fncs: dict[Operation, typing.Optional[collections.abc.Callable[[], types.states.TaskState]]] = {
Operation.CREATE: self._create_checker,
Operation.RETRY: self._retry,
Operation.WAIT: self._wait,
Operation.START: self._start_checker,
Operation.STOP: self._stop_checker,
Operation.SUSPEND: self._suspend_checker,
Operation.REMOVE: self._remove_checker,
Operation.CHANGEMAC: self._mac_checker,
}
try:
operation_checker: typing.Optional[
typing.Optional[collections.abc.Callable[[], types.states.TaskState]]
] = fncs.get(op, None)
operation_checker = CHECKERS[op]
if operation_checker is None:
return self._error(f'Unknown operation found at check queue ({op})')
state = operation_checker()
state = operation_checker(self)
if state == types.states.TaskState.FINISHED:
self._pop_current_op() # Remove runing op
return self._execute_queue()
@ -586,7 +590,7 @@ if sys.platform == 'win32':
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == self.L1_CACHE:
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]
@ -665,3 +669,27 @@ if sys.platform == 'win32':
self._vmid,
[OVirtLinkedUserService._op2str(op) for op in self._queue],
)
EXECUTORS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], None]] = {
Operation.CREATE: OVirtLinkedUserService._create,
Operation.RETRY: OVirtLinkedUserService._retry,
Operation.START: OVirtLinkedUserService._start_machine,
Operation.STOP: OVirtLinkedUserService._stop_machine,
Operation.SUSPEND: OVirtLinkedUserService._suspend_machine,
Operation.WAIT: OVirtLinkedUserService._wait,
Operation.REMOVE: OVirtLinkedUserService._remove,
Operation.CHANGEMAC: OVirtLinkedUserService._change_mac,
}
CHECKERS: dict[Operation, collections.abc.Callable[[OVirtLinkedUserService], types.states.TaskState]] = {
Operation.CREATE: OVirtLinkedUserService._create_checker,
Operation.RETRY: OVirtLinkedUserService._retry_checker,
Operation.WAIT: OVirtLinkedUserService._wait_checker,
Operation.START: OVirtLinkedUserService._start_checker,
Operation.STOP: OVirtLinkedUserService._stop_checker,
Operation.SUSPEND: OVirtLinkedUserService._suspend_checker,
Operation.REMOVE: OVirtLinkedUserService._remove_checker,
Operation.CHANGEMAC: OVirtLinkedUserService._mac_checker,
}

View File

@ -65,10 +65,10 @@ class OVirtDeferredRemoval(jobs.Job):
# Tries to stop machine sync when found, if any error is done, defer removal for a scheduled task
try:
# First check state & stop machine if needed
state = instance.api.get_machine_state(vmid)
if state in (ov_types.VMStatus.UP, ov_types.VMStatus.POWERING_UP, ov_types.VMStatus.SUSPENDED):
status = instance.api.get_machine_info(vmid).status
if status in (ov_types.VMStatus.UP, ov_types.VMStatus.POWERING_UP, ov_types.VMStatus.SUSPENDED):
instance.api.stop_machine(vmid)
elif state != ov_types.VMStatus.UNKNOWN: # Machine exists, remove it later
elif status != ov_types.VMStatus.UNKNOWN: # Machine exists, remove it later
instance.storage.save_to_db('tr' + vmid, vmid, attr1='tRm')
except Exception as e:
@ -100,8 +100,8 @@ class OVirtDeferredRemoval(jobs.Job):
logger.debug('Found %s for removal %s', vmid, i)
# If machine is powered on, tries to stop it
# tries to remove in sync mode
state = instance.api.get_machine_state(vmid)
if state in (
status = instance.api.get_machine_info(vmid).status
if status in (
ov_types.VMStatus.UP,
ov_types.VMStatus.POWERING_UP,
ov_types.VMStatus.SUSPENDED,
@ -109,7 +109,7 @@ class OVirtDeferredRemoval(jobs.Job):
instance.api.stop_machine(vmid)
return
if state != ov_types.VMStatus.UNKNOWN: # Machine exists, try to remove it now
if status != ov_types.VMStatus.UNKNOWN: # Machine exists, try to remove it now
instance.api.remove_machine(vmid)
# It this is reached, remove check

View File

@ -180,11 +180,11 @@ class OpenNebulaLiveDeployment(services.UserService, autoserializable.AutoSerial
self._init_queue_for_deploy(False)
return self._execute_queue()
def deploy_for_cache(self, level: int) -> types.states.TaskState:
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys an service instance for cache
"""
self._init_queue_for_deploy(level == self.L2_CACHE)
self._init_queue_for_deploy(level == types.services.CacheLevel.L2)
return self._execute_queue()
def _init_queue_for_deploy(self, for_level_2: bool = False) -> None:
@ -435,14 +435,14 @@ class OpenNebulaLiveDeployment(services.UserService, autoserializable.AutoSerial
except Exception as e:
return self._error(e)
def move_to_cache(self, level: int) -> types.states.TaskState:
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Moves machines between cache levels
"""
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == self.L1_CACHE:
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [Operation.START, Operation.SHUTDOWN, Operation.FINISH]

View File

@ -199,18 +199,18 @@ class OpenStackLiveUserService(
Deploys an service instance for an user.
"""
logger.debug('Deploying for user')
self._init_queue_for_deploy(False)
self._init_queue_for_deploy(types.services.CacheLevel.NONE)
return self._execute_queue()
def deploy_for_cache(self, level: int) -> types.states.TaskState:
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys an service instance for cache
"""
self._init_queue_for_deploy(level)
return self._execute_queue()
def _init_queue_for_deploy(self, level: int) -> None:
if level in (self.USER, self.L1_CACHE):
def _init_queue_for_deploy(self, level: types.services.CacheLevel) -> None:
if level in (types.services.CacheLevel.NONE, types.services.CacheLevel.L1):
self._queue = [Operation.CREATE, Operation.FINISH]
else:
self._queue = [Operation.CREATE, Operation.WAIT, Operation.SUSPEND, Operation.FINISH]
@ -459,14 +459,14 @@ class OpenStackLiveUserService(
except Exception as e:
return self._error(e)
def move_to_cache(self, level: int) -> types.states.TaskState:
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Moves machines between cache levels
"""
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == self.L1_CACHE:
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else: # Currently L2 is not supported
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]

View File

@ -253,11 +253,11 @@ if sys.platform == 'win32':
self._init_queue_for_deploy(False)
return self._execute_queue()
def deploy_for_cache(self, level: int) -> types.states.TaskState:
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys an service instance for cache
"""
self._init_queue_for_deploy(level == self.L2_CACHE)
self._init_queue_for_deploy(level == types.services.CacheLevel.L2)
return self._execute_queue()
def _init_queue_for_deploy(self, forLevel2: bool = False) -> None:
@ -595,14 +595,14 @@ if sys.platform == 'win32':
except Exception as e:
return self._error(e)
def move_to_cache(self, level: int) -> types.states.TaskState:
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Moves machines between cache levels
"""
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == self.L1_CACHE:
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [Operation.START, Operation.SHUTDOWN, Operation.FINISH]

View File

@ -181,15 +181,15 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
self._init_queue_for_deployment(False)
return self._execute_queue()
def deploy_for_cache(self, level: int) -> types.states.TaskState:
def deploy_for_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Deploys an service instance for cache
"""
self._init_queue_for_deployment(level == self.L2_CACHE)
self._init_queue_for_deployment(level == types.services.CacheLevel.L2)
return self._execute_queue()
def _init_queue_for_deployment(self, forLevel2: bool = False) -> None:
if forLevel2 is False:
def _init_queue_for_deployment(self, cache_l2: bool = False) -> None:
if cache_l2 is False:
self._queue = [
Operation.CREATE,
Operation.CONFIGURE,
@ -498,14 +498,14 @@ class XenLinkedDeployment(services.UserService, autoserializable.AutoSerializabl
except Exception as e:
return self._error(e)
def move_to_cache(self, level: int) -> types.states.TaskState:
def move_to_cache(self, level: types.services.CacheLevel) -> types.states.TaskState:
"""
Moves machines between cache levels
"""
if Operation.REMOVE in self._queue:
return types.states.TaskState.RUNNING
if level == self.L1_CACHE:
if level == types.services.CacheLevel.L1:
self._queue = [Operation.START, Operation.FINISH]
else:
self._queue = [Operation.START, Operation.SUSPEND, Operation.FINISH]

View File

@ -59,7 +59,7 @@ class TestOpenstackLiveService(UDSTransactionTestCase):
# Test Deploy for cache, should raise Exception due
# to the fact fixed services cannot have cached items
with self.assertRaises(Exception):
userservice.deploy_for_cache(level=1)
userservice.deploy_for_cache(level=types.services.CacheLevel.L1)
state = userservice.deploy_for_user(models.User())

View File

@ -79,7 +79,7 @@ STORAGES_INFO: list[ov_types.StorageInfo] = [
id=f'stid-{i}',
name=f'storage-{i}',
type=from_enum(ov_types.StorageType, i),
available=(i+4) * 1024 * 1024 * 1024, # So all storages has enough space
available=(i + 4) * 1024 * 1024 * 1024, # So all storages has enough space
used=i * 1024 * 1024 * 1024 // 2,
status=from_list([ov_types.StorageStatus.ACTIVE, ov_types.StorageStatus.INACTIVE], i),
)
@ -255,7 +255,9 @@ def patch_provider_api(
) -> typing.Generator[mock.Mock, None, None]:
client = create_client_mock()
# api is a property, patch it correctly
with mock.patch('uds.services.OVirt.provider.OVirtProvider.api', new_callable=mock.PropertyMock, **kwargs) as api:
with mock.patch(
'uds.services.OVirt.provider.OVirtProvider.api', new_callable=mock.PropertyMock, **kwargs
) as api:
api.return_value = client
yield client
@ -273,7 +275,9 @@ def create_provider(**kwargs: typing.Any) -> 'provider.OVirtProvider':
)
def create_linked_service(provider: typing.Optional[provider.OVirtProvider] = None, **kwargs: typing.Any) -> 'service_linked.OVirtLinkedService':
def create_linked_service(
provider: typing.Optional[provider.OVirtProvider] = None, **kwargs: typing.Any
) -> 'service_linked.OVirtLinkedService':
"""
Create a service
"""
@ -294,23 +298,26 @@ def create_publication(service: 'service_linked.OVirtLinkedService') -> publicat
Create a publication
"""
uuid_ = str(uuid.uuid4())
return publication.OVirtPublication(
pub = publication.OVirtPublication(
environment=environment.Environment.private_environment(uuid_),
service=service,
revision=1,
servicepool_name='servicepool_name',
uuid=uuid_,
)
pub._template_id = random.choice(TEMPLATES_INFO).id
return pub
def create_linked_userservice(
service: 'service_linked.OVirtLinkedService',
service: typing.Optional['service_linked.OVirtLinkedService'] = None,
publication: typing.Optional[publication.OVirtPublication] = None,
) -> 'deployment_linked.OVirtLinkedUserService':
"""
Create a linked user service
"""
uuid_ = str(uuid.uuid4())
service = service or create_linked_service()
return deployment_linked.OVirtLinkedUserService(
environment=environment.Environment.private_environment(uuid_),
service=service,

View File

@ -38,7 +38,7 @@ from tests.utils.test import UDSTransactionTestCase
from uds.core.environment import Environment
from uds.services.OVirt.deployment import Operation as Operation, OVirtLinkedUserService
from uds.services.OVirt.deployment_linked import Operation as Operation, OVirtLinkedUserService
# if not data.startswith(b'v'):
# return super().unmarshal(data)

View File

@ -30,12 +30,10 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.util import autoserializable
from uds.services.OVirt.publication import OVirtPublication as Publication

View File

@ -40,6 +40,18 @@ from ...utils.test import UDSTestCase
class TestProxmovLinkedService(UDSTestCase):
def test_service_linked_data(self) -> None:
"""
Test the linked service data is loaded correctly from fixture
"""
service = fixtures.create_linked_service()
utils.check_userinterface_values(service, fixtures.SERVICE_VALUES_DICT)
self.assertEqual(service.get_macs_range(), service.provider().get_macs_range())
self.assertEqual(service.get_basename(), service.basename.value)
self.assertEqual(service.get_lenname(), service.lenname.value)
self.assertEqual(service.get_display(), service.display.value)
def test_service_is_available(self) -> None:
"""
Test the provider
@ -64,7 +76,7 @@ class TestProxmovLinkedService(UDSTestCase):
with fixtures.patch_provider_api() as _api:
service = fixtures.create_linked_service()
storage = utils.id_from_list(fixtures.STORAGES_INFO, 'id', service.datastore.value)
storage = utils.find_attr_in_list(fixtures.STORAGES_INFO, 'id', service.datastore.value)
# Ensure available is greater that configured on service
old_available = storage.available # For future tests to restore it
try:
@ -75,7 +87,7 @@ class TestProxmovLinkedService(UDSTestCase):
storage.available = (service.reserved_storage_gb.value - 1) * 1024 * 1024 * 1024
with self.assertRaises(Exception):
service.verify_free_storage()
finally:
finally:
storage.available = old_available
def test_sanitized_name(self) -> None:
@ -99,4 +111,39 @@ class TestProxmovLinkedService(UDSTestCase):
service.datastore.value,
service.display.value,
)
def test_deploy_from_template(self) -> None:
with fixtures.patch_provider_api() as api:
service = fixtures.create_linked_service()
# Ensure that the template is deployed
service.deploy_from_template('test', 'test comments', fixtures.TEMPLATES_INFO[0].id)
api.deploy_from_template.assert_called_with(
'test',
'test comments',
fixtures.TEMPLATES_INFO[0].id,
service.cluster.value,
service.display.value,
service.usb.value,
service.memory.value,
service.guaranteed_memory.value,
)
def test_fix_usb(self) -> None:
with fixtures.patch_provider_api() as api:
service = fixtures.create_linked_service()
# first, with native, should call fix_usb
service.usb.value = 'native'
service.fix_usb(fixtures.VMS_INFO[0].id)
api.fix_usb.assert_called_with(service.machine.value)
# Now, with "disabled" should not call fix_usb
api.fix_usb.reset_mock()
service.usb.value = 'disabled'
service.fix_usb(fixtures.VMS_INFO[0].id)
api.fix_usb.assert_not_called()
def test_get_console_connection(self) -> None:
with fixtures.patch_provider_api() as api:
service = fixtures.create_linked_service()
# Ensure that the console connection is retrieved
service.get_console_connection(service.machine.value)
api.get_console_connection_info.assert_called_with(service.machine.value)

View File

@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from uds.core import types
from uds.services.OVirt.deployment_linked import Operation
from uds.services.OVirt.ovirt import types as ov_types
from . import fixtures
from ... import utils
from ...utils.test import UDSTransactionTestCase
from ...utils.generators import limited_iterator
# We use transactions on some related methods (storage access, etc...)
class TestProxmovLinkedService(UDSTransactionTestCase):
def setUp(self) -> None:
# Set machine state for fixture to
for vm in fixtures.VMS_INFO:
vm.status = ov_types.VMStatus.DOWN
def test_max_check_works(self) -> None:
with fixtures.patch_provider_api() as _api:
userservice = fixtures.create_linked_userservice()
state = userservice.deploy_for_cache(level=1)
for _ in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.ERROR)
self.assertGreater(
userservice._get_checks_counter(), 0
) # Should have any configured value, but greater than 0
def test_userservice_linked_cache_l1(self) -> None:
"""
Test the user service
"""
with fixtures.patch_provider_api() as api:
userservice = fixtures.create_linked_userservice()
service = userservice.service()
service.usb.value = 'native' # With usb
_publication = userservice.publication()
state = userservice.deploy_for_cache(level=1)
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure that in the event of failure, we don't loop forever
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
# this user service expects the machine to be started at some point, so after a few iterations, we set it to started
# note that the user service has a counter for max "recheck" without any success, and if reached, it will fail
if counter == 12:
vm = utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid)
vm.status = ov_types.VMStatus.UP
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
# Assarts has an vmid
self.assertTrue(bool(userservice._vmid))
# Assert several deploy api methods has been called, no matter args
# tested on other tests
api.get_storage_info.assert_called()
api.deploy_from_template.assert_called()
api.get_machine_info.assert_called()
api.update_machine_mac.assert_called()
api.fix_usb.assert_called()
api.start_machine.assert_called()
def test_userservice_linked_cache_l2(self) -> None:
"""
Test the user service
"""
with fixtures.patch_provider_api() as api:
userservice = fixtures.create_linked_userservice()
service = userservice.service()
service.usb.value = 'native' # With usb
_publication = userservice.publication()
state = userservice.deploy_for_cache(level=2)
self.assertEqual(state, types.states.TaskState.RUNNING)
# Ensure that in the event of failure, we don't loop forever
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
# this user service expects the machine to be started at some point, so after a few iterations, we set it to started
# note that the user service has a counter for max "recheck" without any success, and if reached, it will fail
if counter == 12:
vm = utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid)
vm.status = ov_types.VMStatus.UP
# Again, machine will be suspended for L2, so we set it to suspended after a few iterations more
if counter == 24:
vm = utils.find_attr_in_list(fixtures.VMS_INFO, 'id', userservice._vmid)
vm.status = ov_types.VMStatus.SUSPENDED
state = userservice.check_state()
# If first item in queue is WAIT, we must "simulate" the wake up
if userservice._queue[0] == Operation.WAIT:
state = userservice.process_ready_from_os_manager(None)
self.assertEqual(state, types.states.TaskState.FINISHED)
self.assertEqual(userservice._name[: len(service.get_basename())], service.get_basename())
self.assertEqual(len(userservice._name), len(service.get_basename()) + service.get_lenname())
# Assarts has an vmid
self.assertTrue(bool(userservice._vmid))
# Assert several deploy api methods has been called, no matter args
# tested on other tests
api.get_storage_info.assert_called()
api.deploy_from_template.assert_called()
api.get_machine_info.assert_called()
api.update_machine_mac.assert_called()
api.fix_usb.assert_called()
api.start_machine.assert_called()

View File

@ -45,7 +45,7 @@ from ...utils.generators import limited_iterator
# We use transactions on some related methods (storage access, etc...)
class TestProxmovLinkedService(UDSTransactionTestCase):
def setUp(self) -> None:
# Set machine state for fixture to started
# Set machine state for fixture to stopped
fixtures.VMS_INFO = [
fixtures.VMS_INFO[i]._replace(status='stopped') for i in range(len(fixtures.VMS_INFO))
]

View File

@ -31,8 +31,10 @@
import logging
import typing
import collections.abc
from unittest import mock
from django.db import models
from uds.core import ui
logger = logging.getLogger(__name__)
@ -161,7 +163,7 @@ class MustBeOfType:
def __repr__(self) -> str:
return self.__str__()
def id_from_list(lst: list[T], attribute: str, value: typing.Any) -> T:
def find_attr_in_list(lst: list[T], attribute: str, value: typing.Any) -> T:
"""
Returns an item from a list of items
"""
@ -169,3 +171,15 @@ def id_from_list(lst: list[T], attribute: str, value: typing.Any) -> T:
if getattr(item, attribute) == value:
return item
raise ValueError(f'Item with id {value} not found in list')
def check_userinterface_values(obj: ui.UserInterface, values: ui.gui.ValuesDictType) -> None:
"""
Checks that a user interface object has the values specified
"""
for k, v in values.items():
if isinstance(v, MustBeOfType):
assert isinstance(getattr(obj, k), v._kind)
elif v == mock.ANY:
pass
else:
assert getattr(obj, k) == v