mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-11 00:58:39 +03:00
Improving generic services
This commit is contained in:
parent
05c434eb8a
commit
0554089ecc
@ -108,7 +108,7 @@ class LogManager(metaclass=singleton.Singleton):
|
||||
level: int,
|
||||
message: str,
|
||||
source: str,
|
||||
logName: typing.Optional[str] = None,
|
||||
log_name: typing.Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Do the logging for the requested object.
|
||||
@ -121,12 +121,12 @@ class LogManager(metaclass=singleton.Singleton):
|
||||
else LogObjectType.SYSLOG
|
||||
)
|
||||
objectId = getattr(db_object, 'id', -1)
|
||||
logName = logName or ''
|
||||
log_name = log_name or ''
|
||||
|
||||
if owner_type is not None:
|
||||
try:
|
||||
self._log(
|
||||
owner_type, objectId, level, message, source, logName
|
||||
owner_type, objectId, level, message, source, log_name
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error('Error logging %s.%s-%s %s: %s (%s)', db_object.__class__, objectId, source, level, message, e)
|
||||
|
@ -20,6 +20,8 @@ from uds.core import services, types
|
||||
from uds.core.types.services import Operation
|
||||
from uds.core.util import autoserializable
|
||||
|
||||
from .. import exceptions
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from .service import DynamicService
|
||||
|
||||
@ -32,7 +34,7 @@ def must_have_vmid(fnc: typing.Callable[[typing.Any], None]) -> typing.Callable[
|
||||
@functools.wraps(fnc)
|
||||
def wrapper(self: 'DynamicPublication') -> None:
|
||||
if self._vmid == '':
|
||||
raise Exception(f'No machine id on {self._name} for {fnc}')
|
||||
raise exceptions.FatalError(f'No machine id on {self._name} for {fnc}')
|
||||
return fnc(self)
|
||||
|
||||
return wrapper
|
||||
@ -113,14 +115,14 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
self._reason = reason
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
def service(self) -> 'DynamicService':
|
||||
return typing.cast('DynamicService', super().service())
|
||||
|
||||
def check_space(self) -> bool:
|
||||
def _retry_later(self) -> types.states.TaskState:
|
||||
"""
|
||||
If the service needs to check space before publication, it should override this method
|
||||
Retries the current operation
|
||||
For this, we insert a NOP that will be consumed instead of the current operationç
|
||||
by the queue runner
|
||||
"""
|
||||
return True
|
||||
self._queue.insert(0, Operation.NOP)
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('execute_queue')
|
||||
@ -145,10 +147,22 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
return self._retry_later()
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected FixedUserService exception: %s', e)
|
||||
return self._error(str(e))
|
||||
|
||||
def service(self) -> 'DynamicService':
|
||||
return typing.cast('DynamicService', super().service())
|
||||
|
||||
def check_space(self) -> bool:
|
||||
"""
|
||||
If the service needs to check space before publication, it should override this method
|
||||
"""
|
||||
return True
|
||||
|
||||
@typing.final
|
||||
def publish(self) -> types.states.TaskState:
|
||||
""" """
|
||||
@ -194,6 +208,11 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable
|
||||
return self._execute_queue()
|
||||
|
||||
return state
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
# We don not need to push a NOP here, as we will retry the same operation checking again
|
||||
# And it has not been removed from the queue
|
||||
return types.states.TaskState.RUNNING
|
||||
except Exception as e:
|
||||
return self._error(e)
|
||||
|
||||
|
@ -41,6 +41,8 @@ from uds.core.types.services import Operation
|
||||
from uds.core.util import log, autoserializable
|
||||
from uds.core.util.model import sql_stamp_seconds
|
||||
|
||||
from .. import exceptions
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from uds import models
|
||||
@ -55,7 +57,7 @@ def must_have_vmid(fnc: typing.Callable[[typing.Any], None]) -> typing.Callable[
|
||||
@functools.wraps(fnc)
|
||||
def wrapper(self: 'DynamicUserService') -> None:
|
||||
if self._vmid == '':
|
||||
raise Exception(f'No machine id on {self._name} for {fnc}')
|
||||
raise exceptions.FatalError(f'No machine id on {self._name} for {fnc}')
|
||||
return fnc(self)
|
||||
|
||||
return wrapper
|
||||
@ -172,7 +174,7 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
self._queue = queue
|
||||
self._queue_has_new_format = True
|
||||
|
||||
def _retry_again(self) -> types.states.TaskState:
|
||||
def _retry_later(self) -> types.states.TaskState:
|
||||
"""
|
||||
Retries the current operation
|
||||
For this, we insert a NOP that will be consumed instead of the current operationç
|
||||
@ -222,6 +224,38 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
self._reason = reason
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
@typing.final
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('execute_queue')
|
||||
op = self._current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
self._reset_checks_counter() # Reset checks counter
|
||||
|
||||
# For custom operations, we will call the only one method
|
||||
if op.is_custom():
|
||||
self.op_custom(op)
|
||||
else:
|
||||
operation_runner = _EXECUTORS[op]
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
return self._retry_later()
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected FixedUserService exception: %s', e)
|
||||
return self._error(e)
|
||||
|
||||
# Utility overrides for type checking...
|
||||
# Probably, overriden again on child classes
|
||||
def service(self) -> 'service.DynamicService':
|
||||
@ -314,35 +348,6 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
@typing.final
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('execute_queue')
|
||||
op = self._current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
self._reset_checks_counter() # Reset checks counter
|
||||
|
||||
# For custom operations, we will call the only one method
|
||||
if op.is_custom():
|
||||
self.op_custom(op)
|
||||
else:
|
||||
operation_runner = _EXECUTORS[op]
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected FixedUserService exception: %s', e)
|
||||
return self._error(e)
|
||||
|
||||
@typing.final
|
||||
def check_state(self) -> types.states.TaskState:
|
||||
"""
|
||||
@ -382,6 +387,11 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable
|
||||
return self._execute_queue()
|
||||
|
||||
return state
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
# We don not need to push a NOP here, as we will retry the same operation checking again
|
||||
# And it has not been removed from the queue
|
||||
return types.states.TaskState.RUNNING
|
||||
except Exception as e:
|
||||
return self._error(e)
|
||||
|
||||
|
28
server/src/uds/core/services/generics/exceptions.py
Normal file
28
server/src/uds/core/services/generics/exceptions.py
Normal file
@ -0,0 +1,28 @@
|
||||
from uds.core import exceptions as core_exceptions
|
||||
|
||||
|
||||
class RetryableError(core_exceptions.UDSException):
|
||||
"""
|
||||
Exception that is raised when an error is detected that can be retried
|
||||
"""
|
||||
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class FatalError(core_exceptions.UDSException):
|
||||
"""
|
||||
Exception that is raised when an error is detected that can't be retried
|
||||
"""
|
||||
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class NotFoundError(core_exceptions.UDSException):
|
||||
"""
|
||||
Exception that is raised when an object is not found
|
||||
"""
|
||||
|
||||
def __init__(self, message: str):
|
||||
super().__init__(message)
|
@ -39,6 +39,8 @@ from uds.core import consts, services, types
|
||||
from uds.core.types.services import Operation
|
||||
from uds.core.util import log, autoserializable
|
||||
|
||||
from .. import exceptions
|
||||
|
||||
# Not imported at runtime, just for type checking
|
||||
if typing.TYPE_CHECKING:
|
||||
from uds import models
|
||||
@ -106,7 +108,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
return None
|
||||
|
||||
@typing.final
|
||||
def _retry_later(self) -> str:
|
||||
def _retry_later(self) -> types.states.TaskState:
|
||||
self._queue.insert(0, Operation.NOP)
|
||||
return types.states.TaskState.RUNNING
|
||||
|
||||
@ -139,6 +141,34 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
self._reason = reason
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
@typing.final
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('executeQueue')
|
||||
op = self._current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
self._reset_checks_counter() # Reset checks counter
|
||||
|
||||
operation_runner = _EXECUTORS[op]
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
return self._retry_later()
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected FixedUserService exception: %s', e)
|
||||
return self._error(str(e))
|
||||
|
||||
# Utility overrides for type checking...
|
||||
# Probably, overriden again on child classes
|
||||
def service(self) -> 'service.FixedService':
|
||||
@ -161,6 +191,9 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
try:
|
||||
if self._vmid:
|
||||
return self.service().get_guest_ip_address(self._vmid)
|
||||
except exceptions.NotFoundError:
|
||||
self.do_log(log.LogLevel.ERROR, f'Machine not found: {self._vmid}::{self._name}')
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
return ''
|
||||
@ -193,6 +226,8 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
self._queue = [Operation.FINISH]
|
||||
else:
|
||||
self._queue = [Operation.START, Operation.START_COMPLETED, Operation.FINISH]
|
||||
except exceptions.NotFoundError:
|
||||
return self._error('Machine not found')
|
||||
except Exception as e:
|
||||
return self._error(f'Error on setReady: {e}')
|
||||
return self._execute_queue()
|
||||
@ -204,31 +239,6 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
self._queue = FixedUserService._assign_queue.copy() # copy is needed to avoid modifying class var
|
||||
return self._execute_queue()
|
||||
|
||||
@typing.final
|
||||
def _execute_queue(self) -> types.states.TaskState:
|
||||
self._debug('executeQueue')
|
||||
op = self._current_op()
|
||||
|
||||
if op == Operation.ERROR:
|
||||
return types.states.TaskState.ERROR
|
||||
|
||||
if op == Operation.FINISH:
|
||||
return types.states.TaskState.FINISHED
|
||||
|
||||
try:
|
||||
self._reset_checks_counter() # Reset checks counter
|
||||
|
||||
operation_runner = _EXECUTORS[op]
|
||||
|
||||
# Invoke using instance, we have overrided methods
|
||||
# and we want to use the overrided ones
|
||||
getattr(self, operation_runner.__name__)()
|
||||
|
||||
return types.states.TaskState.RUNNING
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected FixedUserService exception: %s', e)
|
||||
return self._error(str(e))
|
||||
|
||||
@typing.final
|
||||
def check_state(self) -> types.states.TaskState:
|
||||
"""
|
||||
@ -262,6 +272,13 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
return self._execute_queue()
|
||||
|
||||
return state
|
||||
except exceptions.RetryableError as e:
|
||||
# This is a retryable error, so we will retry later
|
||||
# We don not need to push a NOP here, as we will retry the same operation checking again
|
||||
# And it has not been removed from the queue
|
||||
return types.states.TaskState.RUNNING
|
||||
except exceptions.NotFoundError as e:
|
||||
return self._error(f'Machine not found ({e})')
|
||||
except Exception as e:
|
||||
logger.exception('Unexpected UserService check exception: %s', e)
|
||||
return self._error(str(e))
|
||||
@ -269,7 +286,7 @@ class FixedUserService(services.UserService, autoserializable.AutoSerializable,
|
||||
@typing.final
|
||||
def op_nop(self) -> None:
|
||||
"""
|
||||
Executes opWait, it simply waits something "external" to end
|
||||
Does nothing
|
||||
"""
|
||||
pass
|
||||
|
||||
|
@ -179,16 +179,16 @@ def log_use(
|
||||
|
||||
|
||||
def log(
|
||||
wichObject: typing.Optional['Model'],
|
||||
db_object: typing.Optional['Model'],
|
||||
level: LogLevel,
|
||||
message: str,
|
||||
source: LogSource = LogSource.UNKNOWN,
|
||||
logName: typing.Optional[str] = None,
|
||||
log_name: typing.Optional[str] = None,
|
||||
) -> None:
|
||||
# pylint: disable=import-outside-toplevel
|
||||
from uds.core.managers.log import LogManager
|
||||
|
||||
LogManager.manager().log(wichObject, level, message, source, logName)
|
||||
LogManager.manager().log(db_object, level, message, source, log_name)
|
||||
|
||||
|
||||
def get_logs(wichObject: typing.Optional['Model'], limit: int = -1) -> list[dict[str, typing.Any]]:
|
||||
|
@ -207,7 +207,7 @@ class ProxmoxUserserviceLinked(DynamicUserService):
|
||||
# Set vm mac address now on first interface
|
||||
self.service().provider().set_machine_mac(int(self._vmid), self.get_unique_id())
|
||||
except client.ProxmoxConnectionError:
|
||||
self._retry_again() # Push nop to front of queue, so it is consumed instead of this one
|
||||
self._retry_later() # Push nop to front of queue, so it is consumed instead of this one
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception('Setting HA and MAC on proxmox')
|
||||
|
Loading…
x
Reference in New Issue
Block a user