1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Some renames

This commit is contained in:
Adolfo Gómez García 2024-03-31 20:23:19 +02:00
parent fc230d7205
commit 2290b4f235
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
12 changed files with 58 additions and 28 deletions

View File

@ -55,7 +55,7 @@ class ServerEventsLogTest(rest.test.RESTTestCase):
# 'level': 'debug|info'|'warning'|'error',
# 'message': 'message',
# }
server = servers_fixtures.createServer()
server = servers_fixtures.create_server()
userService = self.user_service_managed
with mock.patch('uds.core.managers.log.manager.LogManager.log') as the_log:
@ -91,7 +91,7 @@ class ServerEventsLogTest(rest.test.RESTTestCase):
)
def test_event_log_fail(self) -> None:
server = servers_fixtures.createServer()
server = servers_fixtures.create_server()
data = {
'token': server.token,
'type': 'log',

View File

@ -55,7 +55,7 @@ class ServerEventsLoginLogoutTest(rest.test.RESTTestCase):
def setUp(self) -> None:
super().setUp()
self.server = servers_fixtures.createServer()
self.server = servers_fixtures.create_server()
def test_login(self) -> None:
# REST path: servers/notify (/uds/rest/...)

View File

@ -58,7 +58,7 @@ class ServerEventsPingTest(rest.test.RESTTestCase):
def setUp(self) -> None:
super().setUp()
self.server = servers_fixtures.createServer()
self.server = servers_fixtures.create_server()
def test_event_ping_with_stats(self) -> None:
# Ping event

View File

@ -56,7 +56,7 @@ class ServerTestTest(rest.test.RESTTestCase):
"""
Test server rest api registration
"""
server = servers_fixtures.createServer()
server = servers_fixtures.create_server()
response = self.client.rest_post(
'servers/test',
data={

View File

@ -78,7 +78,7 @@ class ServerManagerManagedServersTest(UDSTestCase):
# So we have 8 userservices, each one with a different user
self.user_services.extend(services_fixtures.create_db_cache_userservices())
self.registered_servers_group = servers_fixtures.createServerGroup(
self.registered_servers_group = servers_fixtures.create_server_group(
type=types.servers.ServerType.SERVER, subtype='test', num_servers=NUM_REGISTEREDSERVERS
)
# commodity call to assign

View File

@ -40,9 +40,8 @@ import functools
import logging
from uds import models
from uds.core import types, exceptions
from uds.core import types
from uds.core.managers import servers
from uds.core.util import storage
from ...fixtures import servers as servers_fixtures
from ...fixtures import services as services_fixtures
@ -68,11 +67,11 @@ class ServerManagerUnmanagedServersTest(UDSTestCase):
# Manager is a singleton, clear counters
# self.manager.clearCounters()
for i in range(NUM_USERSERVICES):
for _ in range(NUM_USERSERVICES):
# So we have 8 userservices, each one with a different user
self.user_services.extend(services_fixtures.create_db_cache_userservices())
self.registered_servers_group = servers_fixtures.createServerGroup(
self.registered_servers_group = servers_fixtures.create_server_group(
type=types.servers.ServerType.UNMANAGED, subtype='test', num_servers=NUM_REGISTEREDSERVERS
)
# commodity call to assign
@ -168,7 +167,7 @@ class ServerManagerUnmanagedServersTest(UDSTestCase):
# # Remove it, should decrement counter
for i in range(32, -1, -1): # Deletes 33 times
res = self.manager.release(userService, self.registered_servers_group)
_res = self.manager.release(userService, self.registered_servers_group)
self.assertEqual(len(self.registered_servers_group.properties), 0)

View File

@ -100,6 +100,48 @@ class DynamicPublicationTest(UDSTestCase):
publication = fixtures.create_dynamic_publication_queue(service)
self.check_iterations(service, publication, EXPECTED_DEPLOY_ITERATIONS_INFO)
def test_publication_fails_on_initialize(self) -> None:
service = fixtures.create_dynamic_service()
publication = fixtures.create_dynamic_publication(service)
# Mock op_initialize and make it fail with an exception
with mock.patch.object(publication, 'op_initialize', side_effect=Exception('Test')):
state = publication.publish()
self.assertEqual(state, types.states.TaskState.ERROR)
# Check that the reason is the exception
self.assertEqual(publication._reason, 'Test')
# Check that the queue is empty (only ERROR operation)
self.assertEqual(publication._queue, [types.services.Operation.ERROR])
def test_publication_fails_on_create(self) -> None:
service = fixtures.create_dynamic_service()
publication = fixtures.create_dynamic_publication(service)
# Mock op_create and make it fail with an exception
with mock.patch.object(publication, 'op_create', side_effect=Exception('Test')):
state = publication.publish() # Firt iteration is INITIALIZE
self.assertEqual(state, types.states.TaskState.RUNNING) # Should work
state = publication.check_state() # Second iteration is CREATE
self.assertEqual(state, types.states.TaskState.ERROR)
# Check that the reason is the exception
self.assertEqual(publication._reason, 'Test')
# Check that the queue is empty (only ERROR operation)
self.assertEqual(publication._queue, [types.services.Operation.ERROR])
def test_publication_fails_on_create_completed(self) -> None:
service = fixtures.create_dynamic_service()
publication = fixtures.create_dynamic_publication(service)
# Mock op_create_completed and make it fail with an exception
with mock.patch.object(publication, 'op_create_completed', side_effect=Exception('Test')):
state = publication.publish()
self.assertEqual(state, types.states.TaskState.RUNNING) # Should work
state = publication.check_state()
self.assertEqual(state, types.states.TaskState.RUNNING) # Should work
state = publication.check_state()
self.assertEqual(state, types.states.TaskState.ERROR)
# Check that the reason is the exception
self.assertEqual(publication._reason, 'Test')
# Check that the queue is empty (only ERROR operation)
self.assertEqual(publication._queue, [types.services.Operation.ERROR])
EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicPublicationIterationInfo]] = [
# Initial state for queue

View File

@ -81,7 +81,7 @@ class PermissionsTest(UDSTestCase):
self.service = self.servicePool.service
self.provider = self.service.provider
self.network = network_fixtures.createNetwork()
self.network = network_fixtures.create_network()
def doTestUserPermissions(self, obj: 'Model', user: models.User) -> None:
permissions.add_user_permission(user, obj, uds.core.types.permissions.PermissionType.NONE)

View File

@ -29,7 +29,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
import random # nosec: testing only
from uds import models

View File

@ -28,9 +28,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
from uds import models
@ -38,7 +35,7 @@ from uds import models
glob = {'network_id': 1}
def createNetwork() -> models.Network:
def create_network() -> models.Network:
return models.Network.create(
'Network %d' % glob['network_id'],
'192.168.{n}.0-192.168.{n}.255'.format(n=glob['network_id']),

View File

@ -29,7 +29,6 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import collections.abc
from uds import models
from uds.core import types
@ -38,7 +37,7 @@ import datetime
from ..utils import generators
def createServer(
def create_server(
type: 'types.servers.ServerType' = types.servers.ServerType.SERVER,
subtype: typing.Optional[str] = None,
version: typing.Optional[str] = None,
@ -62,7 +61,7 @@ def createServer(
)
def createServerGroup(
def create_server_group(
type: 'types.servers.ServerType' = types.servers.ServerType.SERVER,
subtype: typing.Optional[str] = None,
version: typing.Optional[str] = None,
@ -80,8 +79,8 @@ def createServerGroup(
host=host or '',
port=port,
)
for i in range(num_servers):
server = createServer(type, subtype=subtype, version=version, ip=ip, listen_port=listen_port)
for _ in range(num_servers):
server = create_server(type, subtype=subtype, version=version, ip=ip, listen_port=listen_port)
rsg.servers.add(server)
return rsg

View File

@ -30,7 +30,6 @@
"""
import datetime
import typing
import collections.abc
from uds import models
from uds.core import environment, types
@ -122,8 +121,6 @@ def create_db_servicepool(
transports: typing.Optional[list[models.Transport]] = None,
servicePoolGroup: typing.Optional[models.ServicePoolGroup] = None,
) -> models.ServicePool:
from uds.services.Test.service import TestServiceCache, TestServiceNoCache
from uds.osmanagers.Test import TestOSManager
service_pool: 'models.ServicePool' = service.deployedServices.create(
name='Service pool %d' % (glob['service_pool_id']),
@ -229,9 +226,6 @@ def create_db_one_cache_userservice(
groups: list['models.Group'],
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
) -> 'models.UserService':
from uds.services.Test.service import TestServiceCache, TestServiceNoCache
from uds.osmanagers.Test import TestOSManager
from uds.transports.Test import TestTransport
service = create_db_service(provider)