mirror of
https://github.com/dkmstr/openuds.git
synced 2025-03-11 00:58:39 +03:00
Added basic service data "composer" for user services, normal pools and meta pools
This commit is contained in:
parent
597d72a609
commit
aca3073eea
161
server/src/tests/fixtures/services.py
vendored
161
server/src/tests/fixtures/services.py
vendored
@ -45,6 +45,8 @@ glob = {
|
||||
'transport_id': 1,
|
||||
'service_pool_id': 1,
|
||||
'user_service_id': 1,
|
||||
'meta_pool_id': 1,
|
||||
'service_pool_group_id': 1,
|
||||
}
|
||||
|
||||
|
||||
@ -62,18 +64,10 @@ def createProvider() -> models.Provider:
|
||||
return provider
|
||||
|
||||
|
||||
def createOneCacheTestingUserService(
|
||||
provider: 'models.Provider',
|
||||
user: 'models.User',
|
||||
groups: typing.List['models.Group'],
|
||||
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
|
||||
) -> 'models.UserService':
|
||||
|
||||
def createService(provider: models.Provider) -> models.Service:
|
||||
from uds.services.Test.service import TestServiceCache, TestServiceNoCache
|
||||
from uds.osmanagers.Test import TestOSManager
|
||||
from uds.transports.Test import TestTransport
|
||||
|
||||
service: 'models.Service' = provider.services.create(
|
||||
service = provider.services.create(
|
||||
name='Service {}'.format(glob['service_id']),
|
||||
data_type=TestServiceCache.typeType,
|
||||
data=TestServiceCache(
|
||||
@ -83,25 +77,53 @@ def createOneCacheTestingUserService(
|
||||
)
|
||||
glob['service_id'] += 1 # In case we generate a some more services elsewhere
|
||||
|
||||
"""
|
||||
Creates several testing OS Managers
|
||||
"""
|
||||
return service
|
||||
|
||||
|
||||
def createOsManager() -> models.OSManager:
|
||||
from uds.osmanagers.Test import TestOSManager
|
||||
|
||||
values: typing.Dict[str, typing.Any] = {
|
||||
'onLogout': 'remove',
|
||||
'idle': 300,
|
||||
}
|
||||
osmanager: typing.Optional['models.OSManager'] = None
|
||||
if type_ == 'managed':
|
||||
osmanager = models.OSManager.objects.create(
|
||||
name='OS Manager %d' % (glob['osmanager_id']),
|
||||
comments='Comment for OS Manager %d' % (glob['osmanager_id']),
|
||||
data_type=TestOSManager.typeType,
|
||||
data=TestOSManager(
|
||||
environment.Environment(str(glob['osmanager_id'])), values
|
||||
).serialize(),
|
||||
osmanager = models.OSManager.objects.create(
|
||||
name='OS Manager %d' % (glob['osmanager_id']),
|
||||
comments='Comment for OS Manager %d' % (glob['osmanager_id']),
|
||||
data_type=TestOSManager.typeType,
|
||||
data=TestOSManager(
|
||||
environment.Environment(str(glob['osmanager_id'])), values
|
||||
).serialize(),
|
||||
)
|
||||
glob['osmanager_id'] += 1
|
||||
|
||||
return osmanager
|
||||
|
||||
|
||||
def createServicePoolGroup(
|
||||
image: typing.Optional[models.Image] = None,
|
||||
) -> models.ServicePoolGroup:
|
||||
service_pool_group: 'models.ServicePoolGroup' = (
|
||||
models.ServicePoolGroup.objects.create(
|
||||
name='Service pool group %d' % (glob['service_pool_group_id']),
|
||||
comments=f'Comment for service pool group {glob["service_pool_group_id"]}',
|
||||
image=image,
|
||||
)
|
||||
glob['osmanager_id'] += 1
|
||||
)
|
||||
glob['service_pool_group_id'] += 1
|
||||
|
||||
return service_pool_group
|
||||
|
||||
|
||||
def createServicePool(
|
||||
service: models.Service,
|
||||
osmanager: typing.Optional[models.OSManager] = None,
|
||||
groups: typing.Optional[typing.List[models.Group]] = None,
|
||||
transports: typing.Optional[typing.List[models.Transport]] = None,
|
||||
servicePoolGroup: typing.Optional[models.ServicePoolGroup] = None,
|
||||
) -> models.ServicePool:
|
||||
from uds.services.Test.service import TestServiceCache, TestServiceNoCache
|
||||
from uds.osmanagers.Test import TestOSManager
|
||||
|
||||
service_pool: 'models.ServicePool' = service.deployedServices.create(
|
||||
name='Service pool %d' % (glob['service_pool_id']),
|
||||
@ -109,14 +131,36 @@ def createOneCacheTestingUserService(
|
||||
comments='Comment for service pool %d' % (glob['service_pool_id']),
|
||||
osmanager=osmanager,
|
||||
)
|
||||
glob['service_pool_id'] += 1
|
||||
|
||||
for g in groups or []:
|
||||
service_pool.assignedGroups.add(g)
|
||||
|
||||
for t in transports or []:
|
||||
service_pool.transports.add(t)
|
||||
|
||||
if servicePoolGroup is not None:
|
||||
service_pool.servicesPoolGroup = servicePoolGroup
|
||||
|
||||
return service_pool
|
||||
|
||||
|
||||
def createPublication(
|
||||
service_pool: models.ServicePool,
|
||||
) -> models.ServicePoolPublication:
|
||||
publication: 'models.ServicePoolPublication' = service_pool.publications.create(
|
||||
publish_date=datetime.datetime.now(),
|
||||
state=states.publication.USABLE,
|
||||
state_date=datetime.datetime.now(),
|
||||
# Rest of fields are left as default
|
||||
)
|
||||
glob['service_pool_id'] += 1
|
||||
service_pool.publications.add(publication)
|
||||
|
||||
return publication
|
||||
|
||||
|
||||
def createTransport() -> models.Transport:
|
||||
from uds.transports.Test import TestTransport
|
||||
|
||||
values = {
|
||||
'testURL': 'http://www.udsenterprise.com',
|
||||
@ -131,14 +175,14 @@ def createOneCacheTestingUserService(
|
||||
).serialize(),
|
||||
)
|
||||
glob['transport_id'] += 1
|
||||
return transport
|
||||
|
||||
service_pool.publications.add(publication)
|
||||
for g in groups:
|
||||
service_pool.assignedGroups.add(g)
|
||||
service_pool.transports.add(transport)
|
||||
|
||||
service_pool.transports.add(transport)
|
||||
|
||||
def createUserService(
|
||||
service_pool: models.ServicePool,
|
||||
publication: models.ServicePoolPublication,
|
||||
user: models.User,
|
||||
) -> models.UserService:
|
||||
user_service: 'models.UserService' = service_pool.userServices.create(
|
||||
friendly_name='user-service-{}'.format(glob['user_service_id']),
|
||||
publication=publication,
|
||||
@ -151,10 +195,65 @@ def createOneCacheTestingUserService(
|
||||
src_hostname=generators.random_string(32),
|
||||
src_ip=generators.random_ip(),
|
||||
)
|
||||
|
||||
glob['user_service_id'] += 1
|
||||
return user_service
|
||||
|
||||
|
||||
def createMetaPool(
|
||||
service_pools: typing.List[models.ServicePool],
|
||||
groups: typing.List[models.Group],
|
||||
round_policy: int = models.MetaPool.ROUND_ROBIN_POOL,
|
||||
transport_grouping: int = models.MetaPool.AUTO_TRANSPORT_SELECT,
|
||||
ha_policy: int = models.MetaPool.HA_POLICY_ENABLED,
|
||||
) -> models.MetaPool:
|
||||
meta_pool: 'models.MetaPool' = models.MetaPool.objects.create(
|
||||
name='Meta pool %d' % (glob['meta_pool_id']),
|
||||
short_name='meta%d' % (glob['meta_pool_id']),
|
||||
comments='Comment for meta pool %d' % (glob['meta_pool_id']),
|
||||
policy=round_policy,
|
||||
transport_grouping=transport_grouping,
|
||||
ha_policy=ha_policy,
|
||||
)
|
||||
glob['meta_pool_id'] += 1
|
||||
|
||||
for g in groups:
|
||||
meta_pool.assignedGroups.add(g)
|
||||
|
||||
for priority, pool in enumerate(service_pools):
|
||||
meta_pool.members.create(pool=pool, priority=priority, enabled=True)
|
||||
|
||||
return meta_pool
|
||||
|
||||
|
||||
def createOneCacheTestingUserService(
|
||||
provider: 'models.Provider',
|
||||
user: 'models.User',
|
||||
groups: typing.List['models.Group'],
|
||||
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
|
||||
) -> 'models.UserService':
|
||||
|
||||
from uds.services.Test.service import TestServiceCache, TestServiceNoCache
|
||||
from uds.osmanagers.Test import TestOSManager
|
||||
from uds.transports.Test import TestTransport
|
||||
|
||||
service = createService(provider)
|
||||
|
||||
"""
|
||||
Creates several testing OS Managers
|
||||
"""
|
||||
|
||||
osmanager: typing.Optional['models.OSManager'] = (
|
||||
None if type_ == 'unmanaged' else createOsManager()
|
||||
)
|
||||
transport: 'models.Transport' = createTransport()
|
||||
service_pool: 'models.ServicePool' = createServicePool(
|
||||
service, osmanager, groups, [transport]
|
||||
)
|
||||
publication: 'models.ServicePoolPublication' = createPublication(service_pool)
|
||||
|
||||
return createUserService(service_pool, publication, user)
|
||||
|
||||
|
||||
def createCacheTestingUserServices(
|
||||
count: int = 1,
|
||||
type_: typing.Literal['managed', 'unmanaged'] = 'managed',
|
||||
|
@ -32,9 +32,11 @@
|
||||
@author: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
# We use commit/rollback
|
||||
import datetime
|
||||
import typing
|
||||
from unittest import mock
|
||||
|
||||
from uds import models
|
||||
from uds.web.util import services
|
||||
import uds.core.util.os_detector as osd
|
||||
|
||||
@ -44,26 +46,44 @@ from ...fixtures import services as fixtures_services
|
||||
|
||||
|
||||
class TestGetServicesData(UDSTransactionTestCase):
|
||||
def test_cache(self):
|
||||
request: mock.Mock
|
||||
auth: models.Authenticator
|
||||
groups: typing.List[models.Group]
|
||||
user: models.User
|
||||
|
||||
def setUp(self) -> None:
|
||||
# We need to create a user with some services
|
||||
auth = fixtures_authenticators.createAuthenticator()
|
||||
groups = fixtures_authenticators.createGroups(auth, 3)
|
||||
user = fixtures_authenticators.createUsers(auth, 1, groups=groups)[0]
|
||||
self.auth = fixtures_authenticators.createAuthenticator()
|
||||
self.groups = fixtures_authenticators.createGroups(self.auth, 3)
|
||||
self.user = fixtures_authenticators.createUsers(
|
||||
self.auth, 1, groups=self.groups
|
||||
)[0]
|
||||
|
||||
self.request = mock.Mock()
|
||||
self.request.user = self.user
|
||||
self.request.authorized = True
|
||||
self.request.session = {}
|
||||
self.request.ip = '127.0.0.1'
|
||||
self.request.ip_version = 4
|
||||
self.request.ip_proxy = '127.0.0.1'
|
||||
self.request.os = osd.DetectedOsInfo(
|
||||
osd.KnownOS.Linux, osd.KnownBrowser.Firefox, 'Windows 10'
|
||||
)
|
||||
|
||||
return super().setUp()
|
||||
|
||||
def test_get_services_data(self) -> None:
|
||||
# Create 10 services, for this user
|
||||
user_services: typing.List[models.ServicePool] = []
|
||||
for i in range(10):
|
||||
fixtures_services.createCacheTestingUserServices(count=1, user=user, groups=groups)
|
||||
user_services.append(
|
||||
fixtures_services.createCacheTestingUserServices(
|
||||
count=1, user=self.user, groups=self.groups
|
||||
)[0].deployed_service
|
||||
)
|
||||
|
||||
request = mock.Mock()
|
||||
request.user = user
|
||||
request.authorized = True
|
||||
request.session = {}
|
||||
request.ip = '127.0.0.1'
|
||||
request.ip_version = 4
|
||||
request.ip_proxy = '127.0.0.1'
|
||||
request.os = osd.DetectedOsInfo(osd.KnownOS.Linux, osd.KnownBrowser.Firefox, 'Windows 10')
|
||||
|
||||
data = services.getServicesData(request)
|
||||
data = services.getServicesData(self.request)
|
||||
now = datetime.datetime.now()
|
||||
# Will return this:
|
||||
# return {
|
||||
# 'services': services,
|
||||
@ -72,7 +92,10 @@ class TestGetServicesData(UDSTransactionTestCase):
|
||||
# 'transports': validTrans,
|
||||
# 'autorun': autorun,
|
||||
# }
|
||||
self.assertEqual(len(data['services']), 10)
|
||||
result_services: typing.Final[
|
||||
typing.List[typing.Mapping[str, typing.Any]]
|
||||
] = data['services']
|
||||
self.assertEqual(len(result_services), 10)
|
||||
self.assertEqual(data['ip'], '127.0.0.1')
|
||||
self.assertEqual(len(data['nets']), 0)
|
||||
self.assertEqual(len(data['transports']), 0)
|
||||
@ -99,6 +122,131 @@ class TestGetServicesData(UDSTransactionTestCase):
|
||||
# 'to_be_replaced_text': to_be_replaced_text,
|
||||
# 'custom_calendar_text': custom_calendar_text,
|
||||
# }
|
||||
for user_service in result_services:
|
||||
# Locate user service in user_services
|
||||
found: models.ServicePool = next(
|
||||
(x for x in user_services if x.uuid == user_service['id'][1:]),
|
||||
models.ServicePool(uuid='x'),
|
||||
)
|
||||
if found.uuid == 'x':
|
||||
self.fail('User service not found in user_services list')
|
||||
|
||||
self.assertEqual(user_service['is_meta'], False)
|
||||
self.assertEqual(user_service['name'], found.name)
|
||||
self.assertEqual(user_service['visual_name'], found.visual_name)
|
||||
self.assertEqual(user_service['description'], found.comments)
|
||||
self.assertEqual(
|
||||
user_service['group'], models.ServicePoolGroup.default().as_dict
|
||||
)
|
||||
self.assertEqual(
|
||||
[(i['id'], i['name']) for i in user_service['transports']],
|
||||
[(t.uuid, t.name) for t in found.transports.all()],
|
||||
)
|
||||
self.assertEqual(
|
||||
user_service['imageId'], found.image and found.image.uuid or 'x'
|
||||
)
|
||||
self.assertEqual(user_service['show_transports'], found.show_transports)
|
||||
self.assertEqual(
|
||||
user_service['allow_users_remove'], found.allow_users_remove
|
||||
)
|
||||
self.assertEqual(user_service['allow_users_reset'], found.allow_users_reset)
|
||||
self.assertEqual(
|
||||
user_service['maintenance'], found.service.provider.maintenance_mode
|
||||
)
|
||||
self.assertEqual(
|
||||
user_service['not_accesible'], not found.isAccessAllowed(now)
|
||||
)
|
||||
self.assertEqual(
|
||||
user_service['in_use'], found.userServices.filter(in_use=True).count()
|
||||
)
|
||||
self.assertEqual(user_service['to_be_replaced'], None)
|
||||
self.assertEqual(user_service['to_be_replaced_text'], '')
|
||||
self.assertEqual(user_service['custom_calendar_text'], '')
|
||||
|
||||
def test_get_meta_services_data(self) -> None:
|
||||
# Create 10 services, for this user
|
||||
user_services: typing.List[models.ServicePool] = []
|
||||
for i in range(100):
|
||||
user_services.append(
|
||||
fixtures_services.createCacheTestingUserServices(
|
||||
count=1, user=self.user, groups=self.groups
|
||||
)[0].deployed_service
|
||||
)
|
||||
|
||||
# Create 10 meta services, for this user
|
||||
meta_services: typing.List[models.MetaPool] = []
|
||||
for i in range(10):
|
||||
meta_services.append(
|
||||
fixtures_services.createMetaPool(
|
||||
service_pools=user_services[i * 10 : (i + 1) * 10], groups=self.groups
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
data = services.getServicesData(self.request)
|
||||
now = datetime.datetime.now()
|
||||
|
||||
result_services: typing.Final[
|
||||
typing.List[typing.Mapping[str, typing.Any]]
|
||||
] = data['services']
|
||||
self.assertEqual(len(result_services), 10)
|
||||
self.assertEqual(data['ip'], '127.0.0.1')
|
||||
self.assertEqual(len(data['nets']), 0)
|
||||
self.assertEqual(len(data['transports']), 0)
|
||||
self.assertEqual(data['autorun'], 0)
|
||||
|
||||
for user_service in result_services:
|
||||
# Locate user service in user_services
|
||||
found: models.MetaPool = next(
|
||||
(x for x in meta_services if x.uuid == user_service['id'][1:]),
|
||||
models.MetaPool(uuid='x'),
|
||||
)
|
||||
if found.uuid == 'x':
|
||||
self.fail('User service not found in user_services list')
|
||||
|
||||
self.assertEqual(user_service['is_meta'], True)
|
||||
self.assertEqual(user_service['name'], found.name)
|
||||
self.assertEqual(user_service['visual_name'], found.visual_name)
|
||||
self.assertEqual(user_service['description'], found.comments)
|
||||
self.assertEqual(
|
||||
user_service['group'], models.ServicePoolGroup.default().as_dict
|
||||
)
|
||||
self.assertEqual(
|
||||
user_service['not_accesible'], not found.isAccessAllowed(now)
|
||||
)
|
||||
self.assertEqual(user_service['to_be_replaced'], None)
|
||||
self.assertEqual(user_service['to_be_replaced_text'], '')
|
||||
self.assertEqual(user_service['custom_calendar_text'], '')
|
||||
|
||||
def test_get_meta_and_not_services_data(self) -> None:
|
||||
# Create 10 services, for this user
|
||||
user_services: typing.List[models.ServicePool] = []
|
||||
for i in range(110):
|
||||
user_services.append(
|
||||
fixtures_services.createCacheTestingUserServices(
|
||||
count=1, user=self.user, groups=self.groups
|
||||
)[0].deployed_service
|
||||
)
|
||||
|
||||
# Create 10 meta services, for this user, last 10 user_services will not be added to meta pools
|
||||
meta_services: typing.List[models.MetaPool] = []
|
||||
for i in range(10):
|
||||
meta_services.append(
|
||||
fixtures_services.createMetaPool(
|
||||
service_pools=user_services[i * 10 : (i + 1) * 10], groups=self.groups
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
data = services.getServicesData(self.request)
|
||||
now = datetime.datetime.now()
|
||||
|
||||
result_services: typing.Final[
|
||||
typing.List[typing.Mapping[str, typing.Any]]
|
||||
] = data['services']
|
||||
self.assertEqual(len(result_services), 20) # 10 metas and 10 normal pools
|
||||
# Some checks are ommited, because are already tested in other tests
|
||||
|
||||
self.assertEqual(len(list(filter(lambda x: x['is_meta'], result_services))), 10)
|
||||
self.assertEqual(len(list(filter(lambda x: not x['is_meta'], result_services))), 10)
|
||||
|
||||
|
@ -693,7 +693,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
|
||||
log.doLog(self, level, message, log.INTERNAL)
|
||||
|
||||
@staticmethod
|
||||
def beforeDelete(sender, **kwargs):
|
||||
def beforeDelete(sender, **kwargs) -> None:
|
||||
"""
|
||||
Used to invoke the Service class "Destroy" before deleting it from database.
|
||||
|
||||
|
@ -65,6 +65,7 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _serviceInfo(
|
||||
uuid: str,
|
||||
is_meta: bool,
|
||||
@ -278,10 +279,10 @@ def getServicesData(
|
||||
uuid=meta.uuid,
|
||||
is_meta=True,
|
||||
name=meta.name,
|
||||
visual_name= meta.visual_name,
|
||||
description= meta.comments,
|
||||
group= group,
|
||||
transports= metaTransports,
|
||||
visual_name=meta.visual_name,
|
||||
description=meta.comments,
|
||||
group=group,
|
||||
transports=metaTransports,
|
||||
image=meta.image,
|
||||
show_transports=len(metaTransports) > 1,
|
||||
allow_users_remove=meta.allow_users_remove,
|
||||
@ -350,7 +351,9 @@ def getServicesData(
|
||||
)
|
||||
# tbr = False
|
||||
if toBeReplacedDate:
|
||||
toBeReplaced = formats.date_format(toBeReplacedDate, 'SHORT_DATETIME_FORMAT')
|
||||
toBeReplaced = formats.date_format(
|
||||
toBeReplacedDate, 'SHORT_DATETIME_FORMAT'
|
||||
)
|
||||
toBeReplacedTxt = gettext(
|
||||
'This service is about to be replaced by a new version. Please, close the session before {} and save all your work to avoid loosing it.'
|
||||
).format(toBeReplacedDate)
|
||||
@ -372,7 +375,7 @@ def getServicesData(
|
||||
)
|
||||
|
||||
services.append(
|
||||
_serviceInfo(
|
||||
_serviceInfo(
|
||||
uuid=sPool.uuid,
|
||||
is_meta=False,
|
||||
name=datator(sPool.name),
|
||||
@ -394,7 +397,7 @@ def getServicesData(
|
||||
to_be_replaced=toBeReplaced,
|
||||
to_be_replaced_text=toBeReplacedTxt,
|
||||
custom_calendar_text=sPool.calendar_message,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# logger.debug('Services: %s', services)
|
||||
|
Loading…
x
Reference in New Issue
Block a user