1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

Adding resilience to generics

This commit is contained in:
Adolfo Gómez García 2024-04-09 02:22:31 +02:00
parent 3dedd9b7e0
commit fca36ead57
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
14 changed files with 479 additions and 85 deletions

View File

@ -35,7 +35,7 @@ import time
import typing
from datetime import datetime
from . import actor, auth, cache, calendar, images, net, os, system, ticket, rest
from . import actor, auth, cache, calendar, images, net, os, system, ticket, rest, services
# Date related constants
NEVER: typing.Final[datetime] = datetime(1972, 7, 1)

View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
# Default timing/retry limits for user services.
SUGGESTED_CHECK_INTERVAL: typing.Final[int] = 10  # In seconds
# Retry budget expressed as "number of checks that fit in 7 days".
# The period must be in seconds (7 * 24 * 60 * 60) because the interval is in seconds;
# using minutes here would shrink the budget to under 3 hours.
MAX_RETRIES: typing.Final[int] = 7 * 24 * 60 * 60 // SUGGESTED_CHECK_INTERVAL  # 7 days
MAX_STATE_CHECKS: typing.Final[int] = 32  # Max number of state checks before giving up

# Default timing/retry limits for publications (checked less frequently).
PUB_SUGGESTED_CHECK_INTERVAL: typing.Final[int] = 20  # In seconds
PUB_MAX_RETRIES: typing.Final[int] = 7 * 24 * 60 * 60 // PUB_SUGGESTED_CHECK_INTERVAL  # 7 days
PUB_MAX_STATE_CHECKS: typing.Final[int] = 7200 // PUB_SUGGESTED_CHECK_INTERVAL  # 2 hours for a single state at most

View File

@ -16,7 +16,7 @@ import time
import typing
from django.utils.translation import gettext as _
from uds.core import services, types
from uds.core import services, types, consts
from uds.core.types.services import Operation
from uds.core.util import autoserializable
@ -42,15 +42,17 @@ def must_have_vmid(fnc: typing.Callable[[typing.Any], None]) -> typing.Callable[
class DynamicPublication(services.Publication, autoserializable.AutoSerializable, abc.ABC):
# Very similar to DynamicUserService, but with some differences
suggested_delay = 20 # For publications, we can check every 20 seconds
suggested_delay = consts.services.PUB_SUGGESTED_CHECK_INTERVAL
# Some customization fields
# How many times we will check for a state before giving up
# Publications can take a long time, so we will check it for a long time
# as long as a couple of hours by default with our suggested delay
max_state_checks: typing.ClassVar[int] = 7200 // suggested_delay
# If must wait untill finish queue for destroying the machine
max_state_checks: typing.ClassVar[int] = consts.services.PUB_MAX_STATE_CHECKS
# How many "retries" operation on same state will be allowed before giving up
max_retries: typing.ClassVar[int] = consts.services.MAX_RETRIES
# If must wait until finish queue for destroying the machine
wait_until_finish_to_destroy: typing.ClassVar[bool] = False
_name = autoserializable.StringField(default='')
@ -75,10 +77,12 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
]
# Utility overrides for type checking...
@typing.final
def _reset_checks_counter(self) -> None:
with self.storage.as_dict() as data:
data['exec_count'] = 0
@typing.final
def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
count = data.get('exec_count', 0) + 1
@ -87,12 +91,38 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
return self._error(f'Max checks reached on {info or "unknown"}')
return None
@typing.final
def _reset_retries_counter(self) -> None:
with self.storage.as_dict() as data:
data['retries'] = 0
@typing.final
def _inc_retries_counter(self) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
retries = data.get('retries', 0) + 1
data['retries'] = retries
if retries > self.max_retries: # Use self to access class variables, so we can override them on subclasses
return self._error(f'Max retries reached')
return None
@typing.final
def _current_op(self) -> Operation:
if not self._queue:
return Operation.FINISH
return self._queue[0]
@typing.final
def _set_queue(self, queue: list[Operation]) -> None:
"""
Sets the queue of tasks to be executed
Ensures that we mark it as new format
"""
self._queue = queue
@typing.final
def _error(self, reason: typing.Union[str, Exception]) -> types.states.TaskState:
"""
Internal method to set object as error state
@ -115,15 +145,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
self._reason = reason
return types.states.TaskState.ERROR
def _retry_later(self) -> types.states.TaskState:
"""
Retries the current operation
For this, we insert a NOP that will be consumed instead of the current operationç
by the queue runner
"""
self._queue.insert(0, Operation.NOP)
return types.states.TaskState.RUNNING
@typing.final
def _execute_queue(self) -> types.states.TaskState:
self._debug('execute_queue')
op = self._current_op()
@ -149,11 +171,26 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
return types.states.TaskState.RUNNING
except exceptions.RetryableError as e:
# This is a retryable error, so we will retry later
return self._retry_later()
return self.retry_later()
except Exception as e:
logger.exception('Unexpected FixedUserService exception: %s', e)
return self._error(str(e))
@typing.final
def retry_later(self) -> types.states.TaskState:
    """
    Schedules a retry of the current operation.

    For this, a RETRY operation is inserted at the front of the queue, that will be:
      - If used from an "executor" method, handled by the "retry_checker" method
      - If used from a "checker" method, consumed, and the operation will be retried

    Returns:
        The state produced by the error handler if the retries counter reports that
        the maximum number of retries has been exceeded, TaskState.FINISHED otherwise.
    """
    # _inc_retries_counter already invokes _error when the limit is exceeded,
    # so propagate its result instead of setting the error a second time
    error_state = self._inc_retries_counter()
    if error_state is not None:
        return error_state
    self._queue.insert(0, Operation.RETRY)
    return types.states.TaskState.FINISHED
def service(self) -> 'DynamicService':
return typing.cast('DynamicService', super().service())
@ -204,7 +241,10 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
state = getattr(self, operation_checker.__name__)()
if state == types.states.TaskState.FINISHED:
# Remove running op
self._queue.pop(0)
top_op = self._queue.pop(0) # May have inserted a RETRY, check it
# And reset retries counter
if top_op != Operation.RETRY:
self._reset_retries_counter()
return self._execute_queue()
return state
@ -445,6 +485,13 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
"""
return types.states.TaskState.FINISHED
@typing.final
def op_retry_checker(self) -> types.states.TaskState:
# If max retries has been reached, error should already have been set
if self._queue[0] == Operation.ERROR:
return types.states.TaskState.ERROR
return types.states.TaskState.FINISHED
def op_destroy_validator_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is validating the destroy operation
@ -507,6 +554,7 @@ _EXECUTORS: typing.Final[
Operation.WAIT: DynamicPublication.op_unsupported,
Operation.NOP: DynamicPublication.op_nop,
Operation.DESTROY_VALIDATOR: DynamicPublication.op_destroy_validator,
# Retry operation has no executor, look "retry_later" method
}
# Same as before, but for check methods
@ -531,4 +579,6 @@ _CHECKERS: typing.Final[
Operation.WAIT: DynamicPublication.op_unsupported_checker,
Operation.NOP: DynamicPublication.op_nop_checker,
Operation.DESTROY_VALIDATOR: DynamicPublication.op_destroy_validator_checker,
# Retry operation can be inserted by an executor, so it will need a checker
Operation.RETRY: DynamicPublication.op_retry_checker,
}

View File

@ -211,12 +211,9 @@ class DynamicService(services.Service, abc.ABC): # pylint: disable=too-many-pub
def allows_errored_userservice_cleanup(self) -> bool:
"""
Returns if this service can clean errored services. This is used to check if a service can be cleaned
from the stuck cleaner job, for example. By default, this method returns True.
from the stuck cleaner job, for example.
"""
if self.has_field('maintain_on_error'):
return not self.maintain_on_error.value
return True
return not self.should_maintain_on_error()
def try_graceful_shutdown(self) -> bool:
if self.has_field('try_soft_shutdown'):

View File

@ -69,13 +69,15 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
and that will be always the from a "fixed" machine, that is, a machine that is not created.
"""
suggested_delay = 8
suggested_delay = consts.services.SUGGESTED_CHECK_INTERVAL
# Some customization fields
# If ip can be manually overriden, normally True... (set by actor, for example)
can_set_ip: typing.ClassVar[bool] = True
# How many times we will check for a state before giving up
max_state_checks: typing.ClassVar[int] = 20
# How many "retries" operation on same state will be allowed before giving up
max_retries: typing.ClassVar[int] = consts.services.MAX_RETRIES
# If keep_state_sets_error is true, and an error occurs, the machine is set to FINISHED instead of ERROR
keep_state_sets_error: typing.ClassVar[bool] = False
# If must wait until finish queue for destroying the machine
@ -131,10 +133,12 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
Operation.FINISH,
]
@typing.final
def _reset_checks_counter(self) -> None:
with self.storage.as_dict() as data:
data['exec_count'] = 0
@typing.final
def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
count = data.get('exec_count', 0) + 1
@ -143,6 +147,23 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return self._error(f'Max checks reached on {info or "unknown"}')
return None
@typing.final
def _reset_retries_counter(self) -> None:
with self.storage.as_dict() as data:
data['retries'] = 0
@typing.final
def _inc_retries_counter(self) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
retries = data.get('retries', 0) + 1
data['retries'] = retries
if retries > self.max_retries: # get "own class" max retries
return self._error(f'Max retries reached')
return None
@typing.final
def _current_op(self) -> Operation:
"""
Get the current operation from the queue
@ -163,6 +184,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return self._queue[0]
@typing.final
def _set_queue(self, queue: list[Operation]) -> None:
"""
Sets the queue of tasks to be executed
@ -171,15 +193,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
self._queue = queue
self._queue_has_new_format = True
def _retry_later(self) -> types.states.TaskState:
"""
Retries the current operation
For this, we insert a NOP that will be consumed instead of the current operationç
by the queue runner
"""
self._queue.insert(0, Operation.NOP)
return types.states.TaskState.RUNNING
@typing.final
def _generate_name(self) -> str:
"""
Can be overriden. Generates a unique name for the machine.
@ -248,11 +262,26 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
return types.states.TaskState.RUNNING
except exceptions.RetryableError as e:
# This is a retryable error, so we will retry later
return self._retry_later()
return self.retry_later()
except Exception as e:
logger.exception('Unexpected FixedUserService exception: %s', e)
return self._error(e)
@typing.final
def retry_later(self) -> types.states.TaskState:
    """
    Schedules a retry of the current operation.

    For this, a RETRY operation is inserted at the front of the queue, that will be:
      - If used from an "executor" method, handled by the "retry_checker" method
      - If used from a "checker" method, consumed, and the operation will be retried

    Returns:
        The state produced by the error handler if the retries counter reports that
        the maximum number of retries has been exceeded, TaskState.FINISHED otherwise.
    """
    # _inc_retries_counter already invokes _error when the limit is exceeded,
    # so propagate its result instead of setting the error a second time
    error_state = self._inc_retries_counter()
    if error_state is not None:
        return error_state
    self._queue.insert(0, Operation.RETRY)
    return types.states.TaskState.FINISHED
# Utility overrides for type checking...
# Probably, overriden again on child classes
def service(self) -> 'service.DynamicService':
@ -380,7 +409,9 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
if state == types.states.TaskState.FINISHED:
# Remove finished operation from queue
self._queue.pop(0)
top_op = self._queue.pop(0)
if top_op != Operation.RETRY:
self._reset_retries_counter()
return self._execute_queue()
return state
@ -553,7 +584,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
This does nothing, as it's a NOP operation
"""
pass
def op_destroy_validator(self) -> None:
"""
This method is called to check if the userservice has an vmid to stop destroying it if needed
@ -706,11 +737,19 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
"""
return types.states.TaskState.RUNNING
@typing.final
def op_nop_checker(self) -> types.states.TaskState:
"""
This method is called to check if the service is doing nothing
"""
return types.states.TaskState.FINISHED
@typing.final
def op_retry_checker(self) -> types.states.TaskState:
# If max retries has been reached, error should already have been set
if self._queue[0] == Operation.ERROR:
return types.states.TaskState.ERROR
return types.states.TaskState.FINISHED
def op_destroy_validator_checker(self) -> types.states.TaskState:
"""
@ -767,6 +806,7 @@ _EXECUTORS: typing.Final[
Operation.WAIT: DynamicUserService.op_wait,
Operation.NOP: DynamicUserService.op_nop,
Operation.DESTROY_VALIDATOR: DynamicUserService.op_destroy_validator,
# Retry operation has no executor, look "retry_later" method
}
# Same as before, but for check methods
@ -791,4 +831,6 @@ _CHECKERS: typing.Final[
Operation.WAIT: DynamicUserService.op_wait_checker,
Operation.NOP: DynamicUserService.op_nop_checker,
Operation.DESTROY_VALIDATOR: DynamicUserService.op_destroy_validator_checker,
# Retry operation can be inserted by an executor, so it will need a checker
Operation.RETRY: DynamicUserService.op_retry_checker,
}

View File

@ -168,7 +168,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
Creates a snapshot for the machine
"""
return
def snapshot_recovery(self, userservice_instance: 'FixedUserService') -> None:
"""
Removes the snapshot for the machine
@ -205,7 +205,7 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
except Exception as e:
logger.error('Error processing remove and free: %s', e)
raise Exception(f'Error processing remove and free: {e} on {vmid}') from e
def is_ready(self, vmid: str) -> bool:
"""
Returns if the machine is ready for usage
@ -259,6 +259,13 @@ class FixedService(services.Service, abc.ABC): # pylint: disable=too-many-publi
return machines.as_list()
def allows_errored_userservice_cleanup(self) -> bool:
"""
Returns if this service can clean errored services. This is used to check if a service can be cleaned
from the stuck cleaner job, for example.
"""
return not self.should_maintain_on_error()
def should_maintain_on_error(self) -> bool:
if self.has_field('maintain_on_error'): # If has been defined on own class...
return self.maintain_on_error.value

View File

@ -58,6 +58,8 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
suggested_delay = 8
# How many times we will check for a state before giving up
max_state_checks: typing.ClassVar[int] = 20
# How many "retries" operation on same state will be allowed before giving up
max_retries: typing.ClassVar[int] = consts.services.MAX_RETRIES
_name = autoserializable.StringField(default='')
_mac = autoserializable.StringField(default='')
@ -95,10 +97,20 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
return self._queue[0]
@typing.final
def _set_queue(self, queue: list[Operation]) -> None:
"""
Sets the queue of tasks to be executed
Ensures that we mark it as new format
"""
self._queue = queue
@typing.final
def _reset_checks_counter(self) -> None:
with self.storage.as_dict() as data:
data['exec_count'] = 0
@typing.final
def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
count = data.get('exec_count', 0) + 1
@ -108,9 +120,20 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
return None
@typing.final
def _retry_later(self) -> types.states.TaskState:
self._queue.insert(0, Operation.NOP)
return types.states.TaskState.RUNNING
def _reset_retries_counter(self) -> None:
with self.storage.as_dict() as data:
data['retries'] = 0
@typing.final
def _inc_retries_counter(self) -> typing.Optional[types.states.TaskState]:
with self.storage.as_dict() as data:
retries = data.get('retries', 0) + 1
data['retries'] = retries
if retries > self.max_retries: # get "own class" max retries
return self._error(f'Max retries reached')
return None
@typing.final
def _error(self, reason: typing.Union[str, Exception]) -> types.states.TaskState:
@ -164,11 +187,25 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
return types.states.TaskState.RUNNING
except exceptions.RetryableError as e:
# This is a retryable error, so we will retry later
return self._retry_later()
return self.retry_later()
except Exception as e:
logger.exception('Unexpected FixedUserService exception: %s', e)
return self._error(str(e))
@typing.final
def retry_later(self) -> types.states.TaskState:
    """
    Schedules a retry of the current operation.

    For this, a RETRY operation is inserted at the front of the queue, that will be:
      - If used from an "executor" method, handled by the "retry_checker" method
      - If used from a "checker" method, consumed, and the operation will be retried

    Returns:
        The state produced by the error handler if the retries counter reports that
        the maximum number of retries has been exceeded, TaskState.FINISHED otherwise.
    """
    # _inc_retries_counter already invokes _error when the limit is exceeded,
    # so propagate its result instead of setting the error a second time
    error_state = self._inc_retries_counter()
    if error_state is not None:
        return error_state
    self._queue.insert(0, Operation.RETRY)
    return types.states.TaskState.FINISHED
# Utility overrides for type checking...
# Probably, overriden again on child classes
def service(self) -> 'service.FixedService':
@ -193,7 +230,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
return self.service().get_guest_ip_address(self._vmid)
except exceptions.NotFoundError:
self.do_log(log.LogLevel.ERROR, f'Machine not found: {self._vmid}::{self._name}')
except Exception:
pass
return ''
@ -268,7 +305,10 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
state = typing.cast(types.states.TaskState, getattr(self, check_function.__name__)())
if state == types.states.TaskState.FINISHED:
self._queue.pop(0) # Remove finished op
top_op = self._queue.pop(0) # Remove finished op
# And reset retries counter, if needed
if top_op != Operation.RETRY:
self._reset_retries_counter()
return self._execute_queue()
return state
@ -355,6 +395,13 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
def op_nop_checker(self) -> types.states.TaskState:
return types.states.TaskState.FINISHED
@typing.final
def op_retry_checker(self) -> types.states.TaskState:
# If max retries has been reached, error should already have been set
if self._queue[0] == Operation.ERROR:
return types.states.TaskState.ERROR
return types.states.TaskState.FINISHED
def op_start(self) -> None:
"""
Override this method to start the machine if needed
@ -464,13 +511,13 @@ _EXECUTORS: typing.Final[
Operation.CREATE: FixedUserService.op_create,
Operation.START: FixedUserService.op_start,
Operation.STOP: FixedUserService.op_stop,
Operation.WAIT: FixedUserService.op_nop, # Fixed assigned services has no cache 2, so no need to wait
Operation.REMOVE: FixedUserService.op_remove,
Operation.SNAPSHOT_CREATE: FixedUserService.op_snapshot_create,
Operation.SNAPSHOT_RECOVER: FixedUserService.op_snapshot_recover,
Operation.PROCESS_TOKEN: FixedUserService.op_process_tocken,
Operation.SHUTDOWN: FixedUserService.op_shutdown,
Operation.NOP: FixedUserService.op_nop,
# Retry operation has no executor, look "retry_later" method
}
# Same as before, but for check methods
@ -478,7 +525,6 @@ _CHECKERS: typing.Final[
collections.abc.Mapping[Operation, collections.abc.Callable[[FixedUserService], types.states.TaskState]]
] = {
Operation.CREATE: FixedUserService.op_create_checker,
Operation.WAIT: FixedUserService.op_nop_checker, # Fixed assigned services has no cache 2, so no need to wait
Operation.START: FixedUserService.op_start_checker,
Operation.STOP: FixedUserService.op_stop_checker,
Operation.REMOVE: FixedUserService.op_removed_checker,
@ -487,4 +533,6 @@ _CHECKERS: typing.Final[
Operation.PROCESS_TOKEN: FixedUserService.op_process_token_checker,
Operation.SHUTDOWN: FixedUserService.op_shutdown_checker,
Operation.NOP: FixedUserService.op_nop_checker,
# Retry operation can be inserted by an executor, so it will need a checker
Operation.RETRY: FixedUserService.op_retry_checker,
}

View File

@ -125,7 +125,7 @@ class Operation(enum.IntEnum):
* Note that we will need to "translate" old values to new ones on the service,
* Adapting existing services to this, will probably need a migration
"""
# Standard operations 1000-1999
INITIALIZE = 1000
CREATE = 1001
CREATE_COMPLETED = 1002
@ -142,13 +142,21 @@ class Operation(enum.IntEnum):
REMOVE = 1013
REMOVE_COMPLETED = 1014
WAIT = 1100
WAIT = 1100 # This is a "wait" operation, used to wait for something to happen
NOP = 1101
RETRY = 1102 # Do not have executors, inserted to retry operation and recognize it
# Custom validations
DESTROY_VALIDATOR = 1102 # Check if the userservice has an vmid to stop destroying it if needed
# Custom validations 2000-2999
DESTROY_VALIDATOR = 2000 # Check if the userservice has an vmid to stop destroying it if needed
# Final operations
# Specific operations 3000-3999
# for Fixed User Services
SNAPSHOT_CREATE = 3000
SNAPSHOT_RECOVER = 3001
PROCESS_TOKEN = 3002
# Final operations 9000-9999
ERROR = 9000
FINISH = 9900
UNKNOWN = 9999
@ -159,6 +167,8 @@ class Operation(enum.IntEnum):
# So we will translate, for example SNAPSHOT_CREATE to CUSTOM_1, etc..
# Fixed user services do not allow custom operations, we use them
# to alias some fixed operations (like snapshot create, recover, etc..)
# Custom operations 20000-29999
CUSTOM_1 = 20001
CUSTOM_2 = 20002
CUSTOM_3 = 20003
@ -169,11 +179,6 @@ class Operation(enum.IntEnum):
CUSTOM_8 = 20008
CUSTOM_9 = 20009
# Some alias for Fixed Services
SNAPSHOT_CREATE = CUSTOM_7
SNAPSHOT_RECOVER = CUSTOM_8
PROCESS_TOKEN = CUSTOM_9
def is_custom(self) -> bool:
"""
Returns if the operation is a custom one

View File

@ -112,7 +112,7 @@ class ProxmoxUserServiceFixed(FixedUserService, autoserializable.AutoSerializabl
try:
vminfo = self.service().get_machine_info(int(self._vmid))
except client.ProxmoxConnectionError:
self._retry_later()
self.retry_later()
return
except Exception as e:
raise Exception('Machine not found on start machine') from e

View File

@ -207,7 +207,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
# Set vm mac address now on first interface
self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
except client.ProxmoxConnectionError:
self._retry_later() # Push nop to front of queue, so it is consumed instead of this one
self.retry_later() # Push nop to front of queue, so it is consumed instead of this one
return
except Exception as e:
logger.exception('Setting HA and MAC on proxmox')

View File

@ -65,6 +65,23 @@ class FixedTestingUserService(fixed_userservice.FixedUserService):
self.mock.op_stop_checker()
return types.states.TaskState.FINISHED
# Exception raiser for tests
def op_custom(self, operation: types.services.Operation) -> None:
if operation == types.services.Operation.CUSTOM_1:
raise Exception('CUSTOM_1')
if operation == types.services.Operation.CUSTOM_3:
self.retry_later() # In this case, will not return it, but should work fine
def op_custom_checker(self, operation: dynamic_userservice.Operation) -> types.states.TaskState:
if operation == types.services.Operation.CUSTOM_1:
raise Exception('CUSTOM_1')
# custom 2 will be for testing retry_later
if operation == types.services.Operation.CUSTOM_2:
return self.retry_later()
return types.states.TaskState.FINISHED
def db_obj(self) -> typing.Any:
self.mock.db_obj()
return None
@ -394,7 +411,7 @@ class DynamicTestingUserServiceQueue(dynamic_userservice.DynamicUserService):
self.mock.wait_checker()
return types.states.TaskState.FINISHED # Ensure we finish right now for wait
def op_nop_checker(self) -> types.states.TaskState:
def op_nop_checker(self) -> types.states.TaskState: # type: ignore # overriding a final method
self.mock.nop_checker()
return types.states.TaskState.FINISHED
@ -427,10 +444,17 @@ class DynamicTestingUserService(dynamic_userservice.DynamicUserService):
def op_custom(self, operation: types.services.Operation) -> None:
if operation == types.services.Operation.CUSTOM_1:
raise Exception('CUSTOM_1')
if operation == types.services.Operation.CUSTOM_3:
self.retry_later() # In this case, will not return it, but should work fine
def op_custom_checker(self, operation: dynamic_userservice.Operation) -> types.states.TaskState:
if operation == types.services.Operation.CUSTOM_1:
raise Exception('CUSTOM_1')
# custom 2 will be for testing retry_later
if operation == types.services.Operation.CUSTOM_2:
return self.retry_later()
return types.states.TaskState.FINISHED
@ -528,6 +552,20 @@ class DynamicTestingPublication(dynamic_publication.DynamicPublication):
def op_create(self) -> None:
self.mock.op_create()
# Exception raiser for tests
def op_custom(self, operation: types.services.Operation) -> None:
self.mock.custom(operation)
if operation == types.services.Operation.CUSTOM_3:
self.retry_later() # In this case, will not return it, but should work fine
def op_custom_checker(self, operation: dynamic_userservice.Operation) -> types.states.TaskState:
self.mock.custom_checker(operation)
# custom 2 will be for testing retry_later
if operation == types.services.Operation.CUSTOM_2:
return self.retry_later()
return types.states.TaskState.FINISHED
class DynamicTestingPublicationQueue(dynamic_publication.DynamicPublication):
mock: 'mock.Mock' = mock.MagicMock()
@ -566,6 +604,8 @@ class DynamicTestingPublicationQueue(dynamic_publication.DynamicPublication):
def op_custom(self, operation: types.services.Operation) -> None:
self.mock.custom(operation)
def op_initialize_checker(self) -> types.states.TaskState:
self.mock.initialize_checker()
return TaskState.FINISHED

View File

@ -35,6 +35,8 @@ from unittest import mock
from unittest.mock import call
from uds.core import types
from ....utils.generators import limited_iterator
from ....utils.test import UDSTestCase
from ....utils import MustBeOfType
@ -142,6 +144,53 @@ class DynamicPublicationTest(UDSTestCase):
# Check that the queue is empty (only ERROR operation)
self.assertEqual(publication._queue, [types.services.Operation.ERROR])
def test_publication_max_retries_executor(self) -> None:
service = fixtures.create_dynamic_service()
publication = fixtures.create_dynamic_publication(service)
publication._queue = [
types.services.Operation.NOP,
types.services.Operation.CUSTOM_3,
types.services.Operation.CUSTOM_3,
types.services.Operation.FINISH,
]
fixtures.DynamicTestingPublication.max_retries = 5
state = types.states.TaskState.RUNNING
counter = 0
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
if counter == 5:
# Replace the first item in queue to NOP, so next check will fail
publication._queue[0] = types.services.Operation.NOP
state = publication.check_state()
self.assertEqual(publication.check_state(), types.states.TaskState.ERROR)
self.assertEqual(publication.error_reason(), 'Max retries reached')
self.assertEqual(counter, 11) # 4 retries + 5 retries after reset + 1 of the reset itself + 1 of initial NOP
def test_publication_max_retries_checker(self) -> None:
service = fixtures.create_dynamic_service()
publication = fixtures.create_dynamic_publication(service)
publication._queue = [
types.services.Operation.CUSTOM_3,
types.services.Operation.CUSTOM_3,
types.services.Operation.FINISH,
]
fixtures.DynamicTestingPublication.max_retries = 5
state = types.states.TaskState.RUNNING
counter = 0
for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
if counter == 4:
# Replace the first item in queue to NOP, so next check will fail
publication._queue[0] = types.services.Operation.NOP
state = publication.check_state()
self.assertEqual(publication.check_state(), types.states.TaskState.ERROR)
self.assertEqual(publication.error_reason(), 'Max retries reached')
self.assertEqual(counter, 10) # 4 retries + 5 retries after reset + 1 of the reset itself
EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicPublicationIterationInfo]] = [
# Initial state for queue

View File

@ -156,14 +156,14 @@ class DynamicServiceTest(UDSTestCase):
def test_userservice_removal(self) -> None:
service = fixtures.create_dynamic_service()
userservice = fixtures.create_dynamic_userservice(service)
userservice._vmid = ''
# If no vmid, will stop after first step
state = userservice.destroy()
self.assertEqual(state, types.states.TaskState.RUNNING)
state = userservice.check_state()
self.assertEqual(state, types.states.TaskState.FINISHED)
# With vmid, will go through all the steps
userservice._vmid = 'vmid'
service.machine_running_flag = True
@ -181,67 +181,67 @@ class DynamicServiceTest(UDSTestCase):
service.mock.stop.assert_called_once_with(userservice, userservice._vmid)
service.mock.is_running.assert_called_once_with(userservice, userservice._vmid)
service.mock.remove.assert_called_once_with(userservice, userservice._vmid)
def test_userservice_maintain_on_error_no_created(self) -> None:
service = fixtures.create_dynamic_service(maintain_on_error=True)
userservice = fixtures.create_dynamic_userservice(service)
self.assertFalse(service.allows_errored_userservice_cleanup())
self.assertTrue(service.should_maintain_on_error())
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
# Force failure
userservice._queue = [types.services.Operation.CUSTOM_1]
self.assertEqual(userservice.check_state(), types.states.TaskState.ERROR)
self.assertEqual(userservice.error_reason(), 'CUSTOM_1')
def test_userservice_maintain_on_error_created(self) -> None:
    """
    With maintain_on_error enabled and a vmid already assigned, a forced
    failure must not surface as ERROR: check_state is expected to return
    FINISHED and the error debug info must be filled in.
    """
    service = fixtures.create_dynamic_service(maintain_on_error=True)
    userservice = fixtures.create_dynamic_userservice(service)
    self.assertFalse(service.allows_errored_userservice_cleanup())
    self.assertTrue(service.should_maintain_on_error())

    state = userservice.deploy_for_user(models.User())
    self.assertEqual(state, types.states.TaskState.RUNNING)

    # Again, to execute "CREATE"
    state = userservice.check_state()
    self.assertEqual(state, types.states.TaskState.RUNNING)
    # assertNotEqual reports the offending value on failure, unlike assertTrue(a != b)
    self.assertNotEqual(userservice._vmid, '')

    # Now, force failure (will be raised by op_custom_1_checker)
    userservice._queue = [types.services.Operation.CUSTOM_1]

    # Now, no error should be returned, but finish
    self.assertEqual(userservice.check_state(), types.states.TaskState.FINISHED)
    self.assertNotEqual(userservice._error_debug_info, '')
def test_userservice_try_soft_shutdown(self) -> None:
    """
    With try_soft_shutdown enabled, destroying a fully deployed userservice
    must call the service's shutdown exactly once with the userservice and
    its vmid.
    """
    service = fixtures.create_dynamic_service(try_soft_shutdown=True)
    userservice = fixtures.create_dynamic_userservice(service)
    self.assertTrue(service.try_graceful_shutdown())

    # full deploy
    state = userservice.deploy_for_user(models.User())
    self.assertEqual(state, types.states.TaskState.RUNNING)
    for _ in limited_iterator(lambda: state != types.states.TaskState.FINISHED, limit=128):
        state = userservice.check_state()
    # Make sure the deploy really finished before testing the removal path
    self.assertEqual(state, types.states.TaskState.FINISHED)

    # Now, destroy it. Should call shutdown instead of stop
    # Mocks are reset so that only calls made during destroy are counted
    service.mock.reset_mock()
    userservice.mock.reset_mock()

    state = userservice.destroy()
    for _ in limited_iterator(lambda: state != types.states.TaskState.FINISHED, limit=128):
        state = userservice.check_state()

    self.assertEqual(state, types.states.TaskState.FINISHED)
    service.mock.shutdown.assert_called_once_with(userservice, userservice._vmid)
def test_service_set_ready(self) -> None:
def test_userservice_set_ready(self) -> None:
service = fixtures.create_dynamic_service()
userservice = fixtures.create_dynamic_userservice(service)
# full deploy
state = userservice.deploy_for_user(models.User())
self.assertEqual(state, types.states.TaskState.RUNNING)
@ -253,7 +253,53 @@ class DynamicServiceTest(UDSTestCase):
self.assertEqual(userservice.set_ready(), types.states.TaskState.FINISHED)
# is_ready should have been called
service.mock.is_running.assert_called_once()
def test_userservice_max_retries_executor(self) -> None:
    """
    Drives a queue of NOP, CUSTOM_3, CUSTOM_3, FINISH with max_retries set
    to 5 and checks that the task ends in ERROR with reason
    'Max retries reached' after the expected number of iterations.

    NOTE(review): CUSTOM_3 is presumably wired in the fixtures to request
    a retry from its executor; confirm against the fixtures module.
    """
    service = fixtures.create_dynamic_service()
    userservice = fixtures.create_dynamic_userservice(service)
    userservice._queue = [
        types.services.Operation.NOP,
        types.services.Operation.CUSTOM_3,
        types.services.Operation.CUSTOM_3,
        types.services.Operation.FINISH,
    ]

    # max_retries is a class attribute: restore it afterwards so the
    # override does not leak into other tests
    previous_max_retries = fixtures.DynamicTestingUserService.max_retries
    self.addCleanup(setattr, fixtures.DynamicTestingUserService, 'max_retries', previous_max_retries)
    fixtures.DynamicTestingUserService.max_retries = 5

    state = types.states.TaskState.RUNNING
    counter = 0  # Keeps counter defined even if the loop body never runs
    for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
        if counter == 5:
            # Replace the first item in the queue with NOP; the expected
            # count below treats this as the "reset" iteration
            userservice._queue[0] = types.services.Operation.NOP
        state = userservice.check_state()

    # An extra check_state call must keep reporting the error
    self.assertEqual(userservice.check_state(), types.states.TaskState.ERROR)
    self.assertEqual(userservice.error_reason(), 'Max retries reached')
    self.assertEqual(counter, 11)  # 4 retries + 5 retries after reset + 1 of the reset itself + 1 of initial NOP
def test_userservice_max_retries_checker(self) -> None:
    """
    Drives a queue of CUSTOM_2, CUSTOM_2, FINISH with max_retries set to 5
    and checks that the task ends in ERROR with reason
    'Max retries reached' after the expected number of iterations.

    NOTE(review): CUSTOM_2 is presumably wired in the fixtures to request
    a retry from its checker; confirm against the fixtures module.
    """
    service = fixtures.create_dynamic_service()
    userservice = fixtures.create_dynamic_userservice(service)
    userservice._queue = [
        types.services.Operation.CUSTOM_2,
        types.services.Operation.CUSTOM_2,
        types.services.Operation.FINISH,
    ]

    # max_retries is a class attribute: restore it afterwards so the
    # override does not leak into other tests
    previous_max_retries = fixtures.DynamicTestingUserService.max_retries
    self.addCleanup(setattr, fixtures.DynamicTestingUserService, 'max_retries', previous_max_retries)
    fixtures.DynamicTestingUserService.max_retries = 5

    state = types.states.TaskState.RUNNING
    counter = 0  # Keeps counter defined even if the loop body never runs
    for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
        if counter == 4:
            # Replace the first item in the queue with NOP; the expected
            # count below treats this as the "reset" iteration
            userservice._queue[0] = types.services.Operation.NOP
        state = userservice.check_state()

    # An extra check_state call must keep reporting the error
    self.assertEqual(userservice.check_state(), types.states.TaskState.ERROR)
    self.assertEqual(userservice.error_reason(), 'Max retries reached')
    self.assertEqual(counter, 10)  # 4 retries + 5 retries after reset + 1 of the reset itself
EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicServiceIterationInfo]] = [
@ -275,9 +321,7 @@ EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicServiceIterationInfo]]
DynamicServiceIterationInfo( # 4, START
queue=fixtures.ALL_TESTEABLE_OPERATIONS[3:],
user_service_calls=[call.create_completed_checker()],
service_calls=[
call.start(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))
],
service_calls=[call.start(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))],
),
DynamicServiceIterationInfo( # 5, START_COMPLETED
queue=fixtures.ALL_TESTEABLE_OPERATIONS[4:],
@ -289,9 +333,7 @@ EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicServiceIterationInfo]]
DynamicServiceIterationInfo( # 6, STOP
queue=fixtures.ALL_TESTEABLE_OPERATIONS[5:],
user_service_calls=[call.start_completed_checker()],
service_calls=[
call.stop(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))
],
service_calls=[call.stop(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))],
),
DynamicServiceIterationInfo( # 7, STOP_COMPLETED
queue=fixtures.ALL_TESTEABLE_OPERATIONS[6:],
@ -337,9 +379,7 @@ EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicServiceIterationInfo]]
DynamicServiceIterationInfo( # 14, REMOVE
queue=fixtures.ALL_TESTEABLE_OPERATIONS[13:],
user_service_calls=[call.reset_completed_checker()],
service_calls=[
call.remove(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))
],
service_calls=[call.remove(MustBeOfType(fixtures.DynamicTestingUserServiceQueue), MustBeOfType(str))],
),
DynamicServiceIterationInfo( # 15, REMOVE_COMPLETED
queue=fixtures.ALL_TESTEABLE_OPERATIONS[14:],

View File

@ -39,6 +39,7 @@ from uds.core.services.generics.fixed import (
userservice,
)
from ....utils.test import UDSTestCase
from ....utils.generators import limited_iterator
from . import fixtures
@ -283,4 +284,78 @@ class FixedServiceTest(UDSTestCase):
else:
self.assertEqual(userservice.set_ready(), types.states.TaskState.FINISHED)
def test_userservice_maintain_on_error_no_created(self) -> None:
    """
    With maintain_on_error enabled but an empty vmid, an exception raised
    by op_start_checker must be reported as ERROR with the exception text
    as the error reason.
    """
    _, svc, usvc = self.create_elements()
    svc.maintain_on_error.value = True

    # Flags exposed by the service when maintain_on_error is set
    self.assertFalse(svc.allows_errored_userservice_cleanup())
    self.assertTrue(svc.should_maintain_on_error())

    # Force failure: op_start_checker is replaced by a mock that raises
    failing_checker = mock.patch.object(usvc, 'op_start_checker', side_effect=Exception('Error'))
    with failing_checker:
        usvc._queue = [types.services.Operation.START]
        usvc._vmid = ''
        result = usvc.check_state()
        self.assertEqual(result, types.states.TaskState.ERROR)
        self.assertEqual(usvc.error_reason(), 'Error')
def test_userservice_maintain_on_error_created(self) -> None:
    """
    With maintain_on_error enabled and a non-empty vmid, an exception
    raised by op_start_checker must make check_state return FINISHED
    instead of ERROR.
    """
    _, svc, usvc = self.create_elements()
    svc.maintain_on_error.value = True

    # Flags exposed by the service when maintain_on_error is set
    self.assertFalse(svc.allows_errored_userservice_cleanup())
    self.assertTrue(svc.should_maintain_on_error())

    # Force failure: op_start_checker raises, but this time a vmid is set
    failing_checker = mock.patch.object(usvc, 'op_start_checker', side_effect=Exception('Error'))
    with failing_checker:
        usvc._queue = [types.services.Operation.START]
        usvc._vmid = 'vmid'
        result = usvc.check_state()
        self.assertEqual(result, types.states.TaskState.FINISHED)
def test_userservice_max_retries_executor(self) -> None:
    """
    Patches op_start so that it calls retry_later, drives a queue of
    NOP, START, START, FINISH with max_retries set to 5, and checks that
    the task ends in ERROR with reason 'Max retries reached' after the
    expected number of iterations.
    """
    _prov, _service, userservice = self.create_elements()

    # max_retries is a class attribute: patch it so it is restored on exit
    # and the override does not leak into other tests
    with mock.patch.object(fixtures.FixedTestingUserService, 'max_retries', 5):
        # Patch userservice op_start to call retry_later()
        with mock.patch.object(userservice, 'op_start', side_effect=userservice.retry_later):
            userservice._queue = [
                types.services.Operation.NOP,
                types.services.Operation.START,
                types.services.Operation.START,
                types.services.Operation.FINISH,
            ]

            state = types.states.TaskState.RUNNING
            counter = 0  # Keeps counter defined even if the loop body never runs
            for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
                if counter == 5:
                    # Replace the first item in the queue with NOP; the expected
                    # count below treats this as the "reset" iteration
                    userservice._queue[0] = types.services.Operation.NOP
                state = userservice.check_state()

            # An extra check_state call must keep reporting the error
            self.assertEqual(userservice.check_state(), types.states.TaskState.ERROR)
            self.assertEqual(userservice.error_reason(), 'Max retries reached')
            self.assertEqual(counter, 11)  # 4 retries + 5 retries after reset + 1 of the reset itself + 1 of initial NOP
def test_userservice_max_retries_checker(self) -> None:
    """
    Patches op_start_checker so that it calls retry_later, drives a queue
    of START, START, FINISH with max_retries set to 5, and checks that the
    task ends in ERROR after the expected number of iterations.
    """
    _prov, _service, userservice = self.create_elements()

    # max_retries is a class attribute: patch it so it is restored on exit
    # and the override does not leak into other tests
    with mock.patch.object(fixtures.FixedTestingUserService, 'max_retries', 5):
        # Patch userservice op_start_checker to call retry_later()
        with mock.patch.object(userservice, 'op_start_checker', side_effect=userservice.retry_later):
            userservice._queue = [
                types.services.Operation.START,
                types.services.Operation.START,
                types.services.Operation.FINISH,
            ]

            state = types.states.TaskState.RUNNING
            counter = 0  # Keeps counter defined even if the loop body never runs
            for counter in limited_iterator(lambda: state == types.states.TaskState.RUNNING, limit=128):
                if counter == 4:
                    # Replace the first item in the queue with NOP; the expected
                    # count below treats this as the "reset" iteration
                    userservice._queue[0] = types.services.Operation.NOP
                state = userservice.check_state()

            # An extra check_state call must keep reporting the error
            self.assertEqual(userservice.check_state(), types.states.TaskState.ERROR)
            # Only the type of the error reason is checked here, not its text
            self.assertIsInstance(userservice.error_reason(), str)
            self.assertEqual(counter, 10)  # 4 retries + 5 retries after reset + 1 of the reset itself