mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-13 13:17:54 +03:00
updated tests and more refactoring
This commit is contained in:
parent
494e24691b
commit
2bd4d4408c
@ -452,7 +452,7 @@ class Initialize(ActorV3Action):
|
||||
|
||||
# Managed by UDS, get initialization data from osmanager and return it
|
||||
# Set last seen actor version
|
||||
userService.setActorVersion(self._params['version'])
|
||||
userService.actor_version = self._params['version']
|
||||
osData: collections.abc.MutableMapping[str, typing.Any] = {}
|
||||
osManager = userService.getOsManagerInstance()
|
||||
if osManager:
|
||||
@ -494,7 +494,7 @@ class BaseReadyChange(ActorV3Action):
|
||||
logger.debug('Args: %s, Params: %s', self._args, self._params)
|
||||
userService = self.getUserService()
|
||||
# Stores known IP and notifies it to deployment
|
||||
userService.logIP(self._params['ip'])
|
||||
userService.log_ip(self._params['ip'])
|
||||
userServiceInstance = userService.get_instance()
|
||||
userServiceInstance.setIp(self._params['ip'])
|
||||
userService.updateData(userServiceInstance)
|
||||
@ -575,8 +575,8 @@ class Version(ActorV3Action):
|
||||
def action(self) -> dict[str, typing.Any]:
|
||||
logger.debug('Version Args: %s, Params: %s', self._args, self._params)
|
||||
userService = self.getUserService()
|
||||
userService.setActorVersion(self._params['version'])
|
||||
userService.logIP(self._params['ip'])
|
||||
userService.actor_version = self._params['version']
|
||||
userService.log_ip(self._params['ip'])
|
||||
|
||||
return ActorV3Action.actorResult()
|
||||
|
||||
@ -707,7 +707,7 @@ class Log(ActorV3Action):
|
||||
def action(self) -> dict[str, typing.Any]:
|
||||
logger.debug('Args: %s, Params: %s', self._args, self._params)
|
||||
userService = self.getUserService()
|
||||
if userService.getActorVersion() < '4.0.0':
|
||||
if userService.actor_version < '4.0.0':
|
||||
# Adjust loglevel to own, we start on 10000 for OTHER, and received is 0 for OTHER
|
||||
level = log.LogLevel.fromInt(int(self._params['level']) + 10000)
|
||||
else:
|
||||
|
@ -44,7 +44,7 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
|
||||
class JobsFactory(factory.Factory['Job']):
|
||||
def ensureJobsInDatabase(self) -> None:
|
||||
def ensure_jobs_registered(self) -> None:
|
||||
"""
|
||||
Ensures that uds core workers are correctly registered in database and in factory
|
||||
"""
|
||||
@ -55,6 +55,8 @@ class JobsFactory(factory.Factory['Job']):
|
||||
|
||||
try:
|
||||
logger.debug('Ensuring that jobs are registered inside database')
|
||||
# Ensure workers are initialized
|
||||
# That is, dynamic load of packages and registration of jobs on manager
|
||||
workers.initialize()
|
||||
for name, type_ in self.objects().items():
|
||||
try:
|
||||
|
@ -61,21 +61,21 @@ class JobThread(threading.Thread):
|
||||
Ensures that the scheduler db entry is released after run
|
||||
"""
|
||||
|
||||
_jobInstance: 'Job'
|
||||
_dbJobId: int
|
||||
_job_instance: 'Job'
|
||||
_db_job_id: int
|
||||
_freq: int
|
||||
|
||||
def __init__(self, jobInstance: 'Job', dbJob: DBScheduler) -> None:
|
||||
super().__init__()
|
||||
self._jobInstance = jobInstance
|
||||
self._dbJobId = dbJob.id
|
||||
self._job_instance = jobInstance
|
||||
self._db_job_id = dbJob.id
|
||||
self._freq = dbJob.frecuency
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self._jobInstance.execute()
|
||||
self._job_instance.execute()
|
||||
except Exception:
|
||||
logger.warning("Exception executing job %s", self._dbJobId)
|
||||
logger.warning("Exception executing job %s", self._db_job_id)
|
||||
finally:
|
||||
self.jobDone()
|
||||
|
||||
@ -86,7 +86,7 @@ class JobThread(threading.Thread):
|
||||
done = False
|
||||
while done is False:
|
||||
try:
|
||||
self.__updateDb()
|
||||
self._update_db_record()
|
||||
done = True
|
||||
except Exception:
|
||||
# Databases locked, maybe because we are on a multitask environment, let's try again in a while
|
||||
@ -100,12 +100,12 @@ class JobThread(threading.Thread):
|
||||
# Ensures DB connection is released after job is done
|
||||
connections['default'].close()
|
||||
|
||||
def __updateDb(self) -> None:
|
||||
def _update_db_record(self) -> None:
|
||||
"""
|
||||
Atomically updates the scheduler db to "release" this job
|
||||
"""
|
||||
with transaction.atomic():
|
||||
DBScheduler.objects.select_for_update().filter(id=self._dbJobId).update(
|
||||
DBScheduler.objects.select_for_update().filter(id=self._db_job_id).update(
|
||||
state=State.FOR_EXECUTE,
|
||||
owner_server='',
|
||||
next_execution=sql_datetime() + timedelta(seconds=self._freq),
|
||||
@ -117,7 +117,8 @@ class Scheduler:
|
||||
Class responsible of maintain/execute scheduled jobs
|
||||
"""
|
||||
|
||||
granularity = 2 # We check for cron jobs every THIS seconds
|
||||
# We check for scheduled operations every THIS seconds
|
||||
granularity: typing.Final[int] = 2
|
||||
|
||||
# to keep singleton Scheduler
|
||||
_scheduler: typing.Optional['Scheduler'] = None
|
||||
@ -194,7 +195,7 @@ class Scheduler:
|
||||
) from e
|
||||
|
||||
@staticmethod
|
||||
def release_own_shedules() -> None:
|
||||
def release_own_schedules() -> None:
|
||||
"""
|
||||
Releases all scheduleds being executed by this server
|
||||
"""
|
||||
@ -222,7 +223,7 @@ class Scheduler:
|
||||
"""
|
||||
# We ensure that the jobs are also in database so we can
|
||||
logger.debug('Run Scheduler thread')
|
||||
JobsFactory().ensureJobsInDatabase()
|
||||
JobsFactory().ensure_jobs_registered()
|
||||
logger.debug("At loop")
|
||||
while self._keep_running:
|
||||
try:
|
||||
@ -240,4 +241,4 @@ class Scheduler:
|
||||
except Exception:
|
||||
logger.exception('Exception clossing connection at delayed task')
|
||||
logger.info('Exiting Scheduler because stop has been requested')
|
||||
self.release_own_shedules()
|
||||
self.release_own_schedules()
|
||||
|
@ -361,7 +361,7 @@ class ServerManager(metaclass=singleton.Singleton):
|
||||
|
||||
return types.servers.ServerCounter(serverCounter.server_uuid, serverCounter.counter - 1)
|
||||
|
||||
def notifyPreconnect(
|
||||
def notify_preconnect(
|
||||
self,
|
||||
serverGroup: 'models.ServerGroup',
|
||||
userService: 'models.UserService',
|
||||
@ -377,7 +377,7 @@ class ServerManager(metaclass=singleton.Singleton):
|
||||
server = self.getServerAssignation(userService, serverGroup)
|
||||
|
||||
if server:
|
||||
requester.ServerApiRequester(server).notifyPreconnect(userService, info)
|
||||
requester.ServerApiRequester(server).notify_preconnect(userService, info)
|
||||
|
||||
def notifyAssign(
|
||||
self,
|
||||
|
@ -55,7 +55,7 @@ AUTH_TOKEN = 'X-TOKEN-AUTH'
|
||||
# If server is restrained, it will return False
|
||||
# If server is not restrained, it will execute the function and return it's result
|
||||
# If exception is raised, it will restrain the server and return False
|
||||
def restrainServer(func: collections.abc.Callable[..., typing.Any]) -> collections.abc.Callable[..., typing.Any]:
|
||||
def restrain_server(func: collections.abc.Callable[..., typing.Any]) -> collections.abc.Callable[..., typing.Any]:
|
||||
def inner(self: 'ServerApiRequester', *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
|
||||
if self.server.isRestrained():
|
||||
return False
|
||||
@ -83,7 +83,7 @@ class ServerApiRequester:
|
||||
self.cache = cache.Cache('serverapi:' + server.uuid)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def setupSession(
|
||||
def setup_session(
|
||||
self, *, minVersion: typing.Optional[str] = None
|
||||
) -> typing.Generator['requests.Session', None, None]:
|
||||
"""
|
||||
@ -124,7 +124,7 @@ class ServerApiRequester:
|
||||
except Exception:
|
||||
logger.error('Error removing temp file %s', verify)
|
||||
|
||||
def getCommsUrl(self, method: str, minVersion: typing.Optional[str]) -> typing.Optional[str]:
|
||||
def get_comms_endpoint(self, method: str, minVersion: typing.Optional[str]) -> typing.Optional[str]:
|
||||
"""
|
||||
Returns the url for a method on the server
|
||||
"""
|
||||
@ -133,14 +133,14 @@ class ServerApiRequester:
|
||||
):
|
||||
return None
|
||||
|
||||
return self.server.getCommsUrl(path=method)
|
||||
return self.server.get_comms_endpoint(path=method)
|
||||
|
||||
def get(self, method: str, *, minVersion: typing.Optional[str] = None) -> typing.Any:
|
||||
url = self.getCommsUrl(method, minVersion)
|
||||
url = self.get_comms_endpoint(method, minVersion)
|
||||
if not url:
|
||||
return None
|
||||
|
||||
with self.setupSession(minVersion=minVersion) as session:
|
||||
with self.setup_session(minVersion=minVersion) as session:
|
||||
response = session.get(url, timeout=(consts.system.DEFAULT_CONNECT_TIMEOUT, consts.system.DEFAULT_REQUEST_TIMEOUT))
|
||||
if not response.ok:
|
||||
logger.error(
|
||||
@ -151,11 +151,11 @@ class ServerApiRequester:
|
||||
return response.json()
|
||||
|
||||
def post(self, method: str, data: typing.Any, *, minVersion: typing.Optional[str] = None) -> typing.Any:
|
||||
url = self.getCommsUrl(method, minVersion)
|
||||
url = self.get_comms_endpoint(method, minVersion)
|
||||
if not url:
|
||||
return None
|
||||
|
||||
with self.setupSession(minVersion=minVersion) as session:
|
||||
with self.setup_session(minVersion=minVersion) as session:
|
||||
response = session.post(url, json=data, timeout=(consts.system.DEFAULT_CONNECT_TIMEOUT, consts.system.DEFAULT_REQUEST_TIMEOUT))
|
||||
if not response.ok:
|
||||
logger.error(
|
||||
@ -165,7 +165,7 @@ class ServerApiRequester:
|
||||
|
||||
return response.json()
|
||||
|
||||
@restrainServer
|
||||
@restrain_server
|
||||
def notifyAssign(
|
||||
self, userService: 'models.UserService', service_type: 'types.services.ServiceType', count: int
|
||||
) -> bool:
|
||||
@ -193,8 +193,8 @@ class ServerApiRequester:
|
||||
)
|
||||
return True
|
||||
|
||||
@restrainServer
|
||||
def notifyPreconnect(
|
||||
@restrain_server
|
||||
def notify_preconnect(
|
||||
self, userService: 'models.UserService', info: 'types.connections.ConnectionData'
|
||||
) -> bool:
|
||||
"""
|
||||
@ -227,7 +227,7 @@ class ServerApiRequester:
|
||||
)
|
||||
return True
|
||||
|
||||
@restrainServer
|
||||
@restrain_server
|
||||
def notifyRelease(self, userService: 'models.UserService') -> bool:
|
||||
"""
|
||||
Notifies removal of user service to server
|
||||
|
@ -120,7 +120,7 @@ class TaskManager(metaclass=singleton.Singleton):
|
||||
connection.close()
|
||||
|
||||
# Releases owned schedules so anyone can access them...
|
||||
Scheduler.release_own_shedules()
|
||||
Scheduler.release_own_schedules()
|
||||
|
||||
self.registerScheduledTasks()
|
||||
|
||||
|
@ -602,11 +602,11 @@ class UserServiceManager(metaclass=singleton.Singleton):
|
||||
except Exception:
|
||||
logger.exception('Reseting service')
|
||||
|
||||
def notifyPreconnect(self, userService: UserService, info: types.connections.ConnectionData) -> None:
|
||||
def notify_preconnect(self, userService: UserService, info: types.connections.ConnectionData) -> None:
|
||||
try:
|
||||
comms.notifyPreconnect(userService, info)
|
||||
comms.notify_preconnect(userService, info)
|
||||
except exceptions.actor.NoActorComms: # If no comms url for userService, try with service
|
||||
userService.deployed_service.service.notifyPreconnect(userService, info)
|
||||
userService.deployed_service.service.notify_preconnect(userService, info)
|
||||
|
||||
def checkUuid(self, userService: UserService) -> bool:
|
||||
return comms.checkUuid(userService)
|
||||
@ -795,7 +795,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
|
||||
# If ready, show transport for this service, if also ready ofc
|
||||
userServiceInstance = userService.get_instance()
|
||||
ip = userServiceInstance.getIp()
|
||||
userService.logIP(ip) # Update known ip
|
||||
userService.log_ip(ip) # Update known ip
|
||||
logger.debug('IP: %s', ip)
|
||||
|
||||
if self.checkUuid(userService) is False: # The service is not the expected one
|
||||
@ -821,7 +821,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
|
||||
transportInstance = transport.get_instance()
|
||||
if transportInstance.isAvailableFor(userService, ip):
|
||||
log.log(userService, log.LogLevel.INFO, "User service ready", log.LogSource.WEB)
|
||||
self.notifyPreconnect(
|
||||
self.notify_preconnect(
|
||||
userService,
|
||||
transportInstance.getConnectionInfo(userService, user, ''),
|
||||
)
|
||||
|
@ -47,7 +47,7 @@ logger = logging.getLogger(__name__)
|
||||
TIMEOUT = 2
|
||||
|
||||
|
||||
def _requestActor(
|
||||
def _execute_actor_request(
|
||||
userService: 'UserService',
|
||||
method: str,
|
||||
data: typing.Optional[collections.abc.MutableMapping[str, typing.Any]] = None,
|
||||
@ -59,7 +59,7 @@ def _requestActor(
|
||||
if no communications url is provided or no min version, raises a "NoActorComms" exception (or OldActorVersion, derived from NoActorComms)
|
||||
Returns request response value interpreted as json
|
||||
"""
|
||||
url = userService.getCommsUrl()
|
||||
url = userService.get_comms_endpoint()
|
||||
if not url:
|
||||
raise exceptions.actor.NoActorComms(f'No notification urls for {userService.friendly_name}')
|
||||
|
||||
@ -119,15 +119,15 @@ def _requestActor(
|
||||
return js
|
||||
|
||||
|
||||
def notifyPreconnect(userService: 'UserService', info: types.connections.ConnectionData) -> None:
|
||||
def notify_preconnect(userService: 'UserService', info: types.connections.ConnectionData) -> None:
|
||||
"""
|
||||
Notifies a preconnect to an user service
|
||||
"""
|
||||
src = userService.getConnectionSource()
|
||||
if userService.deployed_service.service.get_instance().notifyPreconnect(userService, info) is True:
|
||||
if userService.deployed_service.service.get_instance().notify_preconnect(userService, info) is True:
|
||||
return # Ok, service handled it
|
||||
|
||||
_requestActor(
|
||||
_execute_actor_request(
|
||||
userService,
|
||||
'preConnect',
|
||||
types.connections.PreconnectRequest(
|
||||
@ -148,7 +148,7 @@ def checkUuid(userService: 'UserService') -> bool:
|
||||
Checks if the uuid of the service is the same of our known uuid on DB
|
||||
"""
|
||||
try:
|
||||
uuid = _requestActor(userService, 'uuid')
|
||||
uuid = _execute_actor_request(userService, 'uuid')
|
||||
if uuid and uuid != userService.uuid: # Empty UUID means "no check this, fixed pool machine"
|
||||
logger.info(
|
||||
'Machine %s do not have expected uuid %s, instead has %s',
|
||||
@ -177,7 +177,7 @@ def requestScreenshot(userService: 'UserService') -> None:
|
||||
"""
|
||||
try:
|
||||
# Data = {} forces an empty POST
|
||||
_requestActor(
|
||||
_execute_actor_request(
|
||||
userService, 'screenshot', data={}, minVersion='4.0.0'
|
||||
) # First valid version with screenshot is 3.0
|
||||
except exceptions.actor.NoActorComms:
|
||||
@ -194,7 +194,7 @@ def sendScript(userService: 'UserService', script: str, forUser: bool = False) -
|
||||
if forUser:
|
||||
data['user'] = forUser
|
||||
# Data = {} forces an empty POST
|
||||
_requestActor(userService, 'script', data=data)
|
||||
_execute_actor_request(userService, 'script', data=data)
|
||||
except exceptions.actor.NoActorComms:
|
||||
pass
|
||||
|
||||
@ -204,7 +204,7 @@ def requestLogoff(userService: 'UserService') -> None:
|
||||
Ask client to logoff user
|
||||
"""
|
||||
try:
|
||||
_requestActor(userService, 'logout', data={})
|
||||
_execute_actor_request(userService, 'logout', data={})
|
||||
except exceptions.actor.NoActorComms:
|
||||
pass
|
||||
|
||||
@ -214,6 +214,6 @@ def sendMessage(userService: 'UserService', message: str) -> None:
|
||||
Sends an screen message to client
|
||||
"""
|
||||
try:
|
||||
_requestActor(userService, 'message', data={'message': message})
|
||||
_execute_actor_request(userService, 'message', data={'message': message})
|
||||
except exceptions.actor.NoActorComms:
|
||||
pass
|
||||
|
@ -78,7 +78,7 @@ class StateUpdater:
|
||||
ip = self.userServiceInstance.getIp()
|
||||
|
||||
if ip is not None and ip != '':
|
||||
self.userService.logIP(ip)
|
||||
self.userService.log_ip(ip)
|
||||
|
||||
def checkLater(self):
|
||||
UserServiceOpChecker.checkLater(self.userService, self.userServiceInstance)
|
||||
@ -162,7 +162,7 @@ class UpdateFromPreparing(StateUpdater):
|
||||
|
||||
# By default, if not valid publication, service will be marked for removal on preparation finished
|
||||
state = State.REMOVABLE
|
||||
if self.userService.isValidPublication():
|
||||
if self.userService.check_publication_validity():
|
||||
logger.debug('Publication is valid for %s', self.userService.friendly_name)
|
||||
state = self.checkOsManagerRelated()
|
||||
|
||||
|
@ -216,7 +216,7 @@ class OSManager(Module):
|
||||
"""
|
||||
|
||||
def logKnownIp(self, userService: 'UserService', ip: str) -> None:
|
||||
userService.logIP(ip)
|
||||
userService.log_ip(ip)
|
||||
|
||||
def toReady(self, userService: 'UserService') -> None:
|
||||
'''
|
||||
|
@ -451,7 +451,7 @@ class Service(Module):
|
||||
self.storage.delete('__nfo_' + id)
|
||||
return value
|
||||
|
||||
def notifyPreconnect(
|
||||
def notify_preconnect(
|
||||
self, userService: 'models.UserService', info: 'types.connections.ConnectionData'
|
||||
) -> bool:
|
||||
"""
|
||||
|
@ -130,14 +130,14 @@ class Transport(Module):
|
||||
Invoked when Transport is deleted
|
||||
"""
|
||||
|
||||
def testServer(
|
||||
def test_connectivity(
|
||||
self,
|
||||
userService: 'models.UserService',
|
||||
ip: str,
|
||||
port: typing.Union[str, int],
|
||||
timeout: float = 4,
|
||||
) -> bool:
|
||||
return net.testConnection(ip, str(port), timeout)
|
||||
return net.test_connectivity(ip, int(port), timeout)
|
||||
|
||||
def isAvailableFor(self, userService: 'models.UserService', ip: str) -> bool:
|
||||
"""
|
||||
|
@ -33,4 +33,4 @@
|
||||
|
||||
# Import alias
|
||||
# pylint: disable=unused-import
|
||||
from .net import testConnection as testServer
|
||||
from .net import test_connectivity
|
||||
|
@ -281,12 +281,12 @@ def isValidHost(value: str):
|
||||
return isValidIp(value) or isValidFQDN(value)
|
||||
|
||||
|
||||
def testConnection(host: str, port: typing.Union[int, str], timeOut: float = 4) -> bool:
|
||||
def test_connectivity(host: str, port: int, timeOut: float = 4) -> bool:
|
||||
try:
|
||||
logger.debug(
|
||||
'Checking connection to %s:%s with %s seconds timeout', host, port, timeOut
|
||||
)
|
||||
sock = socket.create_connection((host, int(port)), timeOut)
|
||||
sock = socket.create_connection((host, port), timeOut)
|
||||
sock.close()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
|
@ -363,9 +363,9 @@ class Server(UUIDModel, TaggingMixin, properties.PropertiesMixin):
|
||||
|
||||
def setActorVersion(self, userService: 'UserService') -> None:
|
||||
"""Sets the actor version of this server to the userService"""
|
||||
userService.setActorVersion(f'Server {self.version or "unknown"}')
|
||||
userService.actor_version = f'Server {self.version or "unknown"}'
|
||||
|
||||
def getCommsUrl(self, *, path: typing.Optional[str] = None) -> typing.Optional[str]:
|
||||
def get_comms_endpoint(self, *, path: typing.Optional[str] = None) -> typing.Optional[str]:
|
||||
"""
|
||||
Returns the url for a path to this server
|
||||
|
||||
|
@ -169,10 +169,10 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
|
||||
# orphaned services?
|
||||
return self.provider.isInMaintenance() if self.provider else True
|
||||
|
||||
def testServer(self, host: str, port: typing.Union[str, int], timeout: float = 4) -> bool:
|
||||
return net.testConnection(host, port, timeout)
|
||||
def test_connectivity(self, host: str, port: typing.Union[str, int], timeout: float = 4) -> bool:
|
||||
return net.test_connectivity(host, int(port), timeout)
|
||||
|
||||
def notifyPreconnect(self, userService: 'UserService', info: 'types.connections.ConnectionData') -> None:
|
||||
def notify_preconnect(self, userService: 'UserService', info: 'types.connections.ConnectionData') -> None:
|
||||
"""
|
||||
Notify preconnect event to service, so it can do whatever it needs to do before connecting
|
||||
|
||||
@ -182,7 +182,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
|
||||
|
||||
Note:
|
||||
Override this method if you need to do something before connecting to a service
|
||||
(i.e. invoke notifyPreconnect using a Server, or whatever you need to do)
|
||||
(i.e. invoke notify_preconnect using a Server, or whatever you need to do)
|
||||
"""
|
||||
logger.warning('No actor notification available for user service %s', userService.friendly_name)
|
||||
|
||||
|
@ -650,8 +650,8 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
|
||||
|
||||
return types.pools.UsageInfo(cachedValue, maxs)
|
||||
|
||||
def testServer(self, host: str, port: typing.Union[str, int], timeout: float = 4) -> bool:
|
||||
return bool(self.service) and self.service.testServer(host, port, timeout)
|
||||
def test_connectivity(self, host: str, port: typing.Union[str, int], timeout: float = 4) -> bool:
|
||||
return bool(self.service) and self.service.test_connectivity(host, port, timeout)
|
||||
|
||||
# Utility for logging
|
||||
def log(self, message: str, level: log.LogLevel = log.LogLevel.INFO) -> None:
|
||||
|
@ -570,31 +570,33 @@ class UserService(UUIDModel, properties.PropertiesMixin):
|
||||
def setCommsUrl(self, commsUrl: typing.Optional[str] = None) -> None:
|
||||
self.properties['comms_url'] = commsUrl
|
||||
|
||||
def getCommsUrl(
|
||||
def get_comms_endpoint(
|
||||
self, path: typing.Optional[str] = None
|
||||
) -> typing.Optional[str]: # pylint: disable=unused-argument
|
||||
# path is not used, but to keep compat with Server "getCommUrl" method
|
||||
return self.properties.get('comms_url', None)
|
||||
|
||||
def notifyPreconnect(self) -> None:
|
||||
def notify_preconnect(self) -> None:
|
||||
"""
|
||||
Notifies preconnect to userService
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def logIP(self, ip: typing.Optional[str] = None) -> None:
|
||||
def log_ip(self, ip: typing.Optional[str] = None) -> None:
|
||||
self.properties['ip'] = ip
|
||||
|
||||
def getLoggedIP(self) -> str:
|
||||
def get_log_ip(self) -> str:
|
||||
return self.properties.get('ip') or '0.0.0.0' # nosec: no binding address
|
||||
|
||||
def setActorVersion(self, version: typing.Optional[str] = None) -> None:
|
||||
self.properties['actor_version'] = version
|
||||
|
||||
def getActorVersion(self) -> str:
|
||||
@property
|
||||
def actor_version(self) -> str:
|
||||
return self.properties.get('actor_version') or '0.0.0'
|
||||
|
||||
def isValidPublication(self) -> bool:
|
||||
@actor_version.setter
|
||||
def actor_version(self, version: str) -> None:
|
||||
self.properties['actor_version'] = version
|
||||
|
||||
def check_publication_validity(self) -> bool:
|
||||
"""
|
||||
Returns True if this user service does not needs an publication, or if this deployed service publication is the current one
|
||||
"""
|
||||
@ -606,8 +608,8 @@ class UserService(UUIDModel, properties.PropertiesMixin):
|
||||
def log(self, message: str, level: log.LogLevel = log.LogLevel.INFO) -> None:
|
||||
log.log(self, level, message, log.LogSource.INTERNAL)
|
||||
|
||||
def testServer(self, host, port, timeout=4) -> bool:
|
||||
return self.deployed_service.testServer(host, port, timeout)
|
||||
def test_connectivity(self, host, port, timeout=4) -> bool:
|
||||
return self.deployed_service.test_connectivity(host, port, timeout)
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
|
@ -125,7 +125,7 @@ class LinuxOsManager(osmanagers.OSManager):
|
||||
'''
|
||||
if not userService.in_use:
|
||||
if (self._onLogout == 'remove') or (
|
||||
not userService.isValidPublication() and self._onLogout == 'keep'
|
||||
not userService.check_publication_validity() and self._onLogout == 'keep'
|
||||
):
|
||||
return True
|
||||
|
||||
|
@ -97,7 +97,7 @@ class TestOSManager(osmanagers.OSManager):
|
||||
'''
|
||||
if not userService.in_use:
|
||||
if (self.onLogout.value == 'remove') or (
|
||||
not userService.isValidPublication() and self.onLogout.value == 'keep'
|
||||
not userService.check_publication_validity() and self.onLogout.value == 'keep'
|
||||
):
|
||||
return True
|
||||
|
||||
|
@ -128,7 +128,7 @@ class WindowsOsManager(osmanagers.OSManager):
|
||||
"""
|
||||
if not userService.in_use:
|
||||
if (self._onLogout == 'remove') or (
|
||||
not userService.isValidPublication() and self._onLogout == 'keep'
|
||||
not userService.check_publication_validity() and self._onLogout == 'keep'
|
||||
):
|
||||
return True
|
||||
|
||||
|
@ -319,7 +319,7 @@ class OGDeployment(services.UserService):
|
||||
if dbs:
|
||||
dbs.properties['actor_version'] = '1.1-OpenGnsys'
|
||||
dbs.properties['token'] = token
|
||||
dbs.logIP(self._ip)
|
||||
dbs.log_ip(self._ip)
|
||||
|
||||
return State.RUNNING
|
||||
|
||||
|
@ -285,7 +285,7 @@ class IPMachinesService(IPServiceBase):
|
||||
wolENABLED = bool(self.parent().wolURL(theIP, theMAC))
|
||||
# Now, check if it is available on port, if required...
|
||||
if self._port > 0 and not wolENABLED: # If configured WOL, check is a nonsense
|
||||
if net.testConnection(theIP, self._port, timeOut=0.5) is False:
|
||||
if net.test_connectivity(theIP, self._port, timeOut=0.5) is False:
|
||||
# Log into logs of provider, so it can be "shown" on services logs
|
||||
self.parent().do_log(
|
||||
log.LogLevel.WARNING,
|
||||
|
@ -303,7 +303,7 @@ class HTML5RDPTransport(transports.Transport):
|
||||
ready = self.cache.get(ip)
|
||||
if not ready:
|
||||
# Check again for readyness
|
||||
if self.testServer(userService, ip, self.rdpPort.num()) is True:
|
||||
if self.test_connectivity(userService, ip, self.rdpPort.num()) is True:
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
return True
|
||||
self.cache.put(ip, 'N', READY_CACHE_TIMEOUT)
|
||||
|
@ -166,7 +166,7 @@ class HTML5SSHTransport(transports.Transport):
|
||||
ready = self.cache.get(ip)
|
||||
if not ready:
|
||||
# Check again for readyness
|
||||
if self.testServer(userService, ip, self.sshPort.value) is True:
|
||||
if self.test_connectivity(userService, ip, self.sshPort.value) is True:
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
return True
|
||||
self.cache.put(ip, 'N', READY_CACHE_TIMEOUT)
|
||||
|
@ -150,7 +150,7 @@ class HTML5VNCTransport(transports.Transport):
|
||||
ready = self.cache.get(ip)
|
||||
if not ready:
|
||||
# Check again for readyness
|
||||
if self.testServer(userService, ip, self.vncPort.value) is True:
|
||||
if self.test_connectivity(userService, ip, self.vncPort.value) is True:
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
return True
|
||||
self.cache.put(ip, 'N', READY_CACHE_TIMEOUT)
|
||||
|
@ -340,7 +340,7 @@ class BaseRDPTransport(transports.Transport):
|
||||
ready = self.cache.get(ip)
|
||||
if ready is None:
|
||||
# Check again for ready
|
||||
if self.testServer(userService, ip, self.rdpPort.num()) is True:
|
||||
if self.test_connectivity(userService, ip, self.rdpPort.num()) is True:
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
return True
|
||||
self.cache.put(ip, 'N', READY_CACHE_TIMEOUT)
|
||||
|
@ -179,7 +179,7 @@ class BaseSpiceTransport(transports.Transport):
|
||||
120,
|
||||
)
|
||||
|
||||
if self.testServer(userService, con['address'], port_to_test) is True:
|
||||
if self.test_connectivity(userService, con['address'], port_to_test) is True:
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
ready = 'Y'
|
||||
|
||||
|
@ -210,7 +210,7 @@ class BaseX2GOTransport(transports.Transport):
|
||||
ready = self.cache.get(ip)
|
||||
if ready is None:
|
||||
# Check again for ready
|
||||
if connection.testServer(ip, 22):
|
||||
if connection.test_connectivity(ip, 22):
|
||||
self.cache.put(ip, 'Y', READY_CACHE_TIMEOUT)
|
||||
return True
|
||||
self.cache.put(ip, 'N', READY_CACHE_TIMEOUT)
|
||||
|
@ -346,7 +346,7 @@ def update_transport_ticket(
|
||||
userService = models.UserService.objects.get(
|
||||
uuid=data['ticket-info'].get('userService', None)
|
||||
)
|
||||
UserServiceManager().notifyPreconnect(
|
||||
UserServiceManager().notify_preconnect(
|
||||
userService, types.connections.ConnectionData(
|
||||
username=username,
|
||||
protocol=data.get('protocol', ''),
|
||||
|
@ -165,7 +165,7 @@ def userServiceStatus(request: 'ExtendedHttpRequestWithUser', idService: str, id
|
||||
try:
|
||||
userServiceInstance = userService.get_instance()
|
||||
ip = userServiceInstance.getIp()
|
||||
userService.logIP(ip)
|
||||
userService.log_ip(ip)
|
||||
# logger.debug('Res: %s %s %s %s %s', ip, userService, userServiceInstance, transport, transportInstance)
|
||||
except ServiceNotReadyError:
|
||||
ip = None
|
||||
|
30
server/tests/core/jobs/__init__.py
Normal file
30
server/tests/core/jobs/__init__.py
Normal file
@ -0,0 +1,30 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2024 Virtual Cable S.L.U.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
# are permitted provided that the following conditions are met:
|
||||
#
|
||||
# * Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
# * Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
|
||||
# may be used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
"""
|
||||
Author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
89
server/tests/core/jobs/test_scheduler.py
Normal file
89
server/tests/core/jobs/test_scheduler.py
Normal file
@ -0,0 +1,89 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2022 Virtual Cable S.L.U.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
# are permitted provided that the following conditions are met:
|
||||
#
|
||||
# * Redistributions of source code must retain the above copyright notice,
|
||||
# this list of conditions and the following disclaimer.
|
||||
# * Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
|
||||
# may be used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
"""
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
import time
|
||||
import threading
|
||||
from tracemalloc import stop
|
||||
from unittest import mock
|
||||
|
||||
from django.test import TransactionTestCase
|
||||
|
||||
from uds.core.jobs import scheduler, jobs_factory
|
||||
|
||||
|
||||
class SchedulerTest(TransactionTestCase):
|
||||
def setUp(self) -> None:
|
||||
scheduler.Scheduler.granularity = 0.1 # type: ignore # Speed up tests
|
||||
|
||||
def test_init_execute_and_shutdown(self) -> None:
|
||||
sch = scheduler.Scheduler()
|
||||
# Patch:
|
||||
# * execute_job to call notify_termination
|
||||
# * release_own_shedules to do nothing
|
||||
# * jobs_factory.JobsFactory().ensure_jobs_registered to do nothing (JobsFactory is a singleton)
|
||||
with mock.patch.object(sch, 'execute_job') as mock_execute_job, mock.patch.object(
|
||||
sch, 'release_own_schedules'
|
||||
) as mock_release_own_schedules, mock.patch.object(
|
||||
jobs_factory.JobsFactory(), 'ensure_jobs_registered'
|
||||
) as mock_ensure_jobs_registered:
|
||||
left = 4
|
||||
|
||||
def our_execute_job(*args, **kwargs) -> None:
|
||||
nonlocal left
|
||||
left -= 1
|
||||
if left == 0:
|
||||
sch.notify_termination()
|
||||
|
||||
mock_execute_job.side_effect = our_execute_job
|
||||
|
||||
# Execute run, but if it does not call execute_job, it will hang
|
||||
# so we execute a thread that will call notify_termination after 1 second
|
||||
stop_event = threading.Event()
|
||||
|
||||
def ensure_stops() -> None:
|
||||
stop_event.wait(scheduler.Scheduler.granularity * 10)
|
||||
if left > 0:
|
||||
sch.notify_termination()
|
||||
|
||||
watchdog = threading.Thread(target=ensure_stops)
|
||||
watchdog.start()
|
||||
|
||||
sch.run()
|
||||
|
||||
# If watchdog is alive, it means that notify_termination was not called
|
||||
if watchdog.is_alive():
|
||||
stop_event.set()
|
||||
watchdog.join()
|
||||
|
||||
self.assertEqual(left, 0) # If left is 0, it means that execute_job was called 4 times
|
||||
mock_release_own_schedules.assert_called_once()
|
||||
mock_ensure_jobs_registered.assert_called_once()
|
Loading…
Reference in New Issue
Block a user