1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-02-02 09:47:13 +03:00

Adding more automated tests

This commit is contained in:
Adolfo Gómez García 2023-01-27 17:02:35 +01:00
parent aca3073eea
commit 990a5335d6
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
13 changed files with 649 additions and 50 deletions

View File

@ -4,5 +4,5 @@ python_files = tests.py test_*.py *_tests.py
# Do not look for classes (all test clasess are descendant of unittest.TestCase), some of which are named Test* but are not tests (e.g. TestProvider)
python_classes =
# If coverage is enabled, debug test will not work on vscode
#addopts = --cov --cov-report html --cov-config=coverage.ini -n 12
addopts = --cov --cov-report html --cov-config=coverage.ini -n 12
#addopts = --cov --cov-report html --cov-config=coverage.ini -n 12

View File

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import functools
import logging
from uds import models
from uds.core import VERSION
from uds.core.managers import cryptoManager
from ...utils import rest
logger = logging.getLogger(__name__)
class UsersTest(rest.test.RESTActorTestCase):
"""
Test users group rest api
"""
def setUp(self) -> None:
super().setUp()
self.login()

View File

@ -0,0 +1,239 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import functools
import logging
from uds import models
from uds.REST.methods.users_groups import getPoolsForGroups
from uds.core import VERSION
from uds.core.managers import cryptoManager
from ...utils import rest
logger = logging.getLogger(__name__)
class UsersTest(rest.test.RESTActorTestCase):
"""
Test users group rest api
"""
def setUp(self) -> None:
super().setUp()
self.login()
def test_users(self) -> None:
url = f'authenticators/{self.auth.uuid}/users'
# Now, will work
response = self.client.rest_get(f'{url}/overview')
self.assertEqual(response.status_code, 200)
users = response.json()
self.assertEqual(
len(users), rest.test.NUMBER_OF_ITEMS_TO_CREATE * 3
) # 3 because will create admins, staff and plain users
# Ensure values are correct
user: typing.Mapping[str, typing.Any]
for user in users:
# Locate the user in the auth
number = int(user['name'][4:])
dbusr = self.auth.users.get(name=user['name'])
self.assertEqual(user['real_name'], dbusr.real_name)
self.assertEqual(user['comments'], dbusr.comments)
self.assertEqual(user['is_admin'], dbusr.is_admin)
self.assertEqual(user['staff_member'], dbusr.staff_member)
self.assertEqual(user['state'], dbusr.state)
self.assertEqual(user['id'], dbusr.uuid)
self.assertTrue(len(user['role']) > 0)
def test_users_tableinfo(self) -> None:
url = f'authenticators/{self.auth.uuid}/users/tableinfo'
# Now, will work
response = self.client.rest_get(url)
self.assertEqual(response.status_code, 200)
tableinfo = response.json()
self.assertIn('title', tableinfo)
self.assertIn('subtitle', tableinfo)
self.assertIn('fields', tableinfo)
self.assertIn('row-style', tableinfo)
# Ensure at least name, role, real_name comments, state and last_access are present on tableinfo['fields']
fields: typing.List[typing.Mapping[str, typing.Any]] = tableinfo['fields']
self.assertTrue(
functools.reduce(
lambda x, y: x and y,
map(
lambda f: next(iter(f.keys()))
in (
'name',
'role',
'real_name',
'comments',
'state',
'last_access',
),
fields,
),
)
)
def test_user(self) -> None:
url = f'authenticators/{self.auth.uuid}/users'
# Now, will work
for i in self.users:
response = self.client.rest_get(f'{url}/{i.uuid}')
self.assertEqual(response.status_code, 200)
user = response.json()
self.assertEqual(user['name'], i.name)
self.assertEqual(user['real_name'], i.real_name)
self.assertEqual(user['comments'], i.comments)
self.assertEqual(user['is_admin'], i.is_admin)
self.assertEqual(user['staff_member'], i.staff_member)
self.assertEqual(user['state'], i.state)
self.assertEqual(user['id'], i.uuid)
self.assertTrue(len(user['role']) > 0)
# invalid user
response = self.client.rest_get(f'{url}/invalid')
self.assertEqual(response.status_code, 404)
def test_users_log(self) -> None:
url = f'authenticators/{self.auth.uuid}/users/'
# Now, will work
for user in self.users:
response = self.client.rest_get(url + f'{user.uuid}/log')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 4) # INFO, WARN, ERROR, DEBUG
# invalid user
response = self.client.rest_get(url + 'invalid/log')
self.assertEqual(response.status_code, 404)
def test_user_create_edit(self) -> None:
url = f'authenticators/{self.auth.uuid}/users'
user_dct: typing.Dict[str, typing.Any] = {
'name': 'test',
'real_name': 'test real name',
'comments': 'test comments',
'state': 'A',
'is_admin': True,
'staff_member': True,
'groups': [self.groups[0].uuid, self.groups[1].uuid],
}
# Now, will work
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
)
# Get user from database and ensure values are correct
dbusr = self.auth.users.get(name=user_dct['name'])
self.assertEqual(user_dct['name'], dbusr.name)
self.assertEqual(user_dct['real_name'], dbusr.real_name)
self.assertEqual(user_dct['comments'], dbusr.comments)
self.assertEqual(user_dct['is_admin'], dbusr.is_admin)
self.assertEqual(user_dct['staff_member'], dbusr.staff_member)
self.assertEqual(user_dct['state'], dbusr.state)
self.assertEqual(user_dct['groups'], [i.uuid for i in dbusr.groups.all()])
self.assertEqual(response.status_code, 200)
# Returns nothing
# Now, will fail because name is already in use
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
)
self.assertEqual(response.status_code, 400)
# Modify saved user
user_dct['name'] = 'test2'
user_dct['real_name'] = 'test real name 2'
user_dct['comments'] = 'test comments 2'
user_dct['state'] = 'D'
user_dct['is_admin'] = False
user_dct['staff_member'] = False
user_dct['groups'] = [self.groups[2].uuid]
user_dct['id'] = dbusr.uuid
user_dct['password'] = 'test' # nosec: test password
user_dct['mfa_data'] = 'mfadata'
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
# Get user from database and ensure values are correct
dbusr = self.auth.users.get(name=user_dct['name'])
self.assertEqual(user_dct['name'], dbusr.name)
self.assertEqual(user_dct['real_name'], dbusr.real_name)
self.assertEqual(user_dct['comments'], dbusr.comments)
self.assertEqual(user_dct['is_admin'], dbusr.is_admin)
self.assertEqual(user_dct['staff_member'], dbusr.staff_member)
self.assertEqual(user_dct['state'], dbusr.state)
self.assertEqual(user_dct['groups'], [i.uuid for i in dbusr.groups.all()])
self.assertEqual(cryptoManager().checkHash(user_dct['password'], dbusr.password), True)
def test_user_delete(self) -> None:
url = f'authenticators/{self.auth.uuid}/users'
# Now, will work
response = self.client.rest_delete(url + f'/{self.plain_users[0].uuid}')
self.assertEqual(response.status_code, 200)
# Returns nothing
# Now, will fail because user does not exist
response = self.client.rest_delete(url + f'/{self.plain_users[0].uuid}')
self.assertEqual(response.status_code, 404)
def test_user_userservices_and_servicepools(self) -> None:
url = f'authenticators/{self.auth.uuid}/users/{self.plain_users[0].uuid}/userServices'
# Now, will work
response = self.client.rest_get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 2)
# Same with service pools
url = f'authenticators/{self.auth.uuid}/users/{self.plain_users[0].uuid}/servicesPools'
response = self.client.rest_get(url)
self.assertEqual(response.status_code, 200)
groups = self.plain_users[0].groups.all()
count = len(list(models.ServicePool.getDeployedServicesForGroups(groups)))
self.assertEqual(len(response.json()), count)

View File

@ -45,7 +45,7 @@ logger = logging.getLogger(__name__)
class SystemTest(rest.test.RESTTestCase):
def test_overview(self):
# If not logged in, will fail
response = self.client.get('/uds/rest/system/overview')
response = self.client.rest_get('system/overview')
self.assertEqual(response.status_code, 403)
# Login as admin
@ -59,11 +59,12 @@ class SystemTest(rest.test.RESTTestCase):
# rest.test.NUMBER_OF_ITEMS_TO_CREATE groups
# 2 services (1 managed, 1 unmanaged), 2 service_pools (1 for each service), 2 user_services (1 for each service pool)
# no meta_pools, and no restrained_services_pools
self.assertEqual(json['users'], rest.test.NUMBER_OF_ITEMS_TO_CREATE * 3)
self.assertEqual(json['users'], rest.test.NUMBER_OF_ITEMS_TO_CREATE * 3) # 3 because will create admins, staff and plain users
self.assertEqual(json['groups'], rest.test.NUMBER_OF_ITEMS_TO_CREATE)
self.assertEqual(json['services'], 2)
self.assertEqual(json['service_pools'], 2)
self.assertEqual(json['user_services'], 2)
count = len(self.user_services) + 2
self.assertEqual(json['services'], count)
self.assertEqual(json['service_pools'], count)
self.assertEqual(json['user_services'], count)
self.assertEqual(json['meta_pools'], 0)
self.assertEqual(json['restrained_services_pools'], 0)

View File

@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import io
import typing
import logging
from unittest.mock import Mock
from uds.core.reports import graphs
from ...utils.test import UDSTestCase
logger = logging.getLogger(__name__)
class GraphsTest(UDSTestCase):
data1: typing.Dict[str, typing.Any]
data2: typing.Dict[str, typing.Any]
def setUp(self):
# Data must be a dict with the following keys:
# - x: List of x values
# - y: List of dicts with the following keys:
# - data: List of y values
# - label: Label for the bar
# - title: Title of the chart
# - xlabel: Label for x axis
# - ylabel: Label for y axis
# - allTicks: If True, all x values will be shown as ticks
# - xtickFnc: Function to be used to format x ticks labels
self.data1 = {
'x': [1, 2, 3],
'y': [
{
'data': [1, 2, 3],
'label': 'Test',
},
{
'data': [1, 2, 3],
'label': 'Test2',
},
{
'data': [1, 2, 3],
'label': 'Test3',
}
],
'title': 'Test',
'xlabel': 'X',
'ylabel': 'Y',
'allTicks': True,
'xtickFnc': Mock()
}
# Data must be a dict with the following keys:
# - x: List of x values
# - y: List of y values
# - z: List of z values (must be a list of lists)
# - title: Title of the chart
# - xlabel: Label for x axis
# - ylabel: Label for y axis
# - zlabel: Label for z axis
# - allTicks: If True, all x values will be shown as ticks
# - xtickFnc: Function to be used to format x ticks labels from x ticks
# - ytickFnc: Function to be used to format y ticks labels form y ticks
self.data2 = {
'x': [1, 2, 3],
'y': [1, 2, 3],
'z': [
[1, 2, 3],
[1, 2, 3],
[1, 2, 3]
],
'title': 'Test',
'xlabel': 'X',
'ylabel': 'Y',
'zlabel': 'Z',
'allTicks': True,
'xtickFnc': Mock(),
'ytickFnc': Mock()
}
def test_bar_chart(self):
output = io.BytesIO()
graphs.barChart((10, 8, 96), data=self.data1, output=output)
value = output.getvalue()
self.assertGreater(len(value), 0)
self.assertEqual(self.data1['xtickFnc'].call_count, 3)
# Save to /tmp so we can check it
with open('/tmp/bar.png', 'wb') as f: # nosec: this is a test, we are not using a real file
f.write(value)
def test_line_chart(self):
output = io.BytesIO()
graphs.lineChart((10, 8, 96), data=self.data1, output=output)
value = output.getvalue()
self.assertGreater(len(value), 0)
self.assertEqual(self.data1['xtickFnc'].call_count, 3)
# Save to /tmp so we can check it
with open('/tmp/line.png', 'wb') as f: # nosec: this is a test, we are not using a real file
f.write(value)
def test_surface_chart(self):
output = io.BytesIO()
graphs.surfaceChart((10, 8, 96), data=self.data2, output=output)
value = output.getvalue()
self.assertGreater(len(value), 0)
self.assertEqual(self.data2['xtickFnc'].call_count, 3)
self.assertEqual(self.data2['ytickFnc'].call_count, 3)
# Save to /tmp so we can check it
with open('/tmp/surface.png', 'wb') as f: # nosec: this is a test, we are not using a real file
f.write(value)
def test_surface_chart_wireframe(self):
self.data2['wireframe'] = True
output = io.BytesIO()
graphs.surfaceChart((10, 8, 96), data=self.data2, output=output)
value = output.getvalue()
self.assertGreater(len(value), 0)
self.assertEqual(self.data2['xtickFnc'].call_count, 3)
self.assertEqual(self.data2['ytickFnc'].call_count, 3)
# Save to /tmp so we can check it
with open('/tmp/surface-wireframe.png', 'wb') as f: # nosec: this is a test, we are not using a real file
f.write(value)

View File

@ -41,7 +41,7 @@ logger = logging.getLogger(__name__)
class NetTest(UDSTestCase):
def testNetworkFromStringIPv4(self):
def testNetworkFromStringIPv4(self) -> None:
for n in (
('*', 0, 4294967295),
('192.168.0.1', 3232235521, 3232235521),
@ -91,32 +91,32 @@ class NetTest(UDSTestCase):
'Incorrect network end value for {0}'.format(n[0]),
)
for n in ('192.168.0', '192.168.0.5-192.168.0.3', 'no net'):
for n1 in ('192.168.0', '192.168.0.5-192.168.0.3', 'no net'):
with self.assertRaises(ValueError):
net.networksFromString(n)
net.networksFromString(n1)
self.assertEqual(net.ipToLong('192.168.0.5').ip, 3232235525)
self.assertEqual(net.longToIp(3232235525, 4), '192.168.0.5')
for n in range(0, 255):
self.assertTrue(net.contains('192.168.0.0/24', '192.168.0.{}'.format(n)))
for n2 in range(0, 255):
self.assertTrue(net.contains('192.168.0.0/24', '192.168.0.{}'.format(n2)))
for n in range(4294):
for n3 in range(4294):
self.assertTrue(
net.contains([net.NetworkType(0, 4294967295, 4)], n * 1000)
net.contains([net.NetworkType(0, 4294967295, 4)], n3 * 1000)
)
self.assertTrue(
net.contains(net.NetworkType(0, 4294967295, 4), n * 1000)
net.contains(net.NetworkType(0, 4294967295, 4), n3 * 1000)
)
# Test some ip conversions from long to ip and viceversa
for n in ('172', '192', '10'):
for n2 in range(0, 256, 17):
for n3 in range(0, 256, 13):
for n4 in range(0, 256, 11):
ip = '{0}.{1}.{2}.{3}'.format(n, n2, n3, n4)
for n4 in ('172', '192', '10'):
for n5 in range(0, 256, 17):
for n6 in range(0, 256, 13):
for n7 in range(0, 256, 11):
ip = '{0}.{1}.{2}.{3}'.format(n4, n5, n6, n7)
self.assertEqual(net.longToIp(net.ipToLong(ip).ip, 4), ip)
def testNetworkFromStringIPv6(self):
def testNetworkFromStringIPv6(self) -> None:
# IPv6 only support standard notation, and '*', but not "netmask" or "range"
for n in (
(
@ -189,7 +189,7 @@ class NetTest(UDSTestCase):
)
# iterate some ipv6 addresses
for n in (
for n6 in (
'2001:db8::1',
'2001:1::1',
'2001:2:3::1',
@ -198,4 +198,4 @@ class NetTest(UDSTestCase):
'2001:2:3:4:5:6:0:1',
):
# Ensure converting back to string ips works
self.assertEqual(net.longToIp(net.ipToLong(n).ip, 6), n)
self.assertEqual(net.longToIp(net.ipToLong(n6).ip, 6), n6)

View File

@ -28,11 +28,11 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from uds import models
from uds.core.util import log
from .. import test, generators, rest, constants
from ...fixtures import (
@ -53,9 +53,14 @@ class RESTTestCase(test.UDSTransactionTestCase):
staffs: typing.List[models.User]
plain_users: typing.List[models.User]
users = property(lambda self: self.admins + self.staffs + self.plain_users)
provider: models.Provider
user_service_managed: models.UserService
user_service_unmanaged: models.UserService
user_services: typing.List[models.UserService]
def setUp(self) -> None:
# Set up data for REST Test cases
# First, the authenticator related
@ -80,15 +85,19 @@ class RESTTestCase(test.UDSTransactionTestCase):
self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups
)
for user in self.users:
log.doLog(user, log.DEBUG, f'Debug Log for {user.name}')
log.doLog(user, log.INFO, f'Info Log for {user.name}')
log.doLog(user, log.WARNING, f'Warning Log for {user.name}')
log.doLog(user, log.ERROR, f'Error Log for {user.name}')
self.provider = services_fixtures.createProvider()
self.user_service_managed = (
services_fixtures.createOneCacheTestingUserService(
self.provider,
self.admins[0],
self.groups,
'managed',
)
self.user_service_managed = services_fixtures.createOneCacheTestingUserService(
self.provider,
self.admins[0],
self.groups,
'managed',
)
self.user_service_unmanaged = (
services_fixtures.createOneCacheTestingUserService(
@ -99,6 +108,19 @@ class RESTTestCase(test.UDSTransactionTestCase):
)
)
self.user_services = []
for user in self.users:
self.user_services.append(
services_fixtures.createOneCacheTestingUserService(
self.provider, user, self.groups, 'managed'
)
)
self.user_services.append(
services_fixtures.createOneCacheTestingUserService(
self.provider, user, self.groups, 'unmanaged'
)
)
def login(
self, user: typing.Optional[models.User] = None, as_admin: bool = True
) -> str:
@ -121,7 +143,7 @@ class RESTTestCase(test.UDSTransactionTestCase):
class RESTActorTestCase(RESTTestCase):
# Login as admin or staff and register an actor
# Returns as a tuple the auth token and the actor registration result token:
# - The login auth token
@ -138,7 +160,9 @@ class RESTActorTestCase(RESTTestCase):
self.assertEqual(response.status_code, 200, 'Actor registration failed')
return token, response.json()['result']
def register_data(self, chars: typing.Optional[str] = None) -> typing.Dict[str, str]:
def register_data(
self, chars: typing.Optional[str] = None
) -> typing.Dict[str, str]:
# Data for registration
return {
'username': generators.random_string(size=12, chars=chars)

View File

@ -41,6 +41,8 @@ from uds.core.managers.crypto import CryptoManager
logger = logging.getLogger(__name__)
REST_PATH = '/uds/rest/'
class UDSHttpResponse(HttpResponse):
"""
@ -95,14 +97,17 @@ class UDSClientMixin:
kwargs['REMOTE_ADDR'] = '127.0.0.1'
elif self.ip_version == 6:
kwargs['REMOTE_ADDR'] = '::1'
def compose_rest_url(self, method: str) -> str:
return f'{REST_PATH}/{method}'
class UDSClient(UDSClientMixin, Client):
def __init__(
self,
enforce_csrf_checks: bool = False,
raise_request_exception: bool = True,
**defaults: typing.Any
**defaults: typing.Any,
):
UDSClientMixin.initialize(self)
@ -123,17 +128,41 @@ class UDSClient(UDSClientMixin, Client):
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().get(*args, **kwargs))
def rest_get(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return self.get(self.compose_rest_url(method), *args, **kwargs)
def post(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().post(*args, **kwargs))
def rest_post(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return self.post(self.compose_rest_url(method), *args, **kwargs)
def put(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().put(*args, **kwargs))
def rest_put(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return self.put(self.compose_rest_url(method), *args, **kwargs)
def delete(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', super().delete(*args, **kwargs))
def rest_delete(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return self.delete(self.compose_rest_url(method), *args, **kwargs)
class UDSAsyncClient(UDSClientMixin, AsyncClient):
def __init__(
self,
enforce_csrf_checks: bool = False,
raise_request_exception: bool = True,
**defaults: typing.Any
**defaults: typing.Any,
):
UDSClientMixin.initialize(self)
@ -154,10 +183,35 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().get(*args, **kwargs))
async def rest_get(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return await self.get(self.compose_rest_url(method), *args, **kwargs)
async def post(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().post(*args, **kwargs))
async def rest_post(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return await self.post(self.compose_rest_url(method), *args, **kwargs)
async def put(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().put(*args, **kwargs))
async def rest_put(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return await self.put(self.compose_rest_url(method), *args, **kwargs)
async def delete(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().delete(*args, **kwargs))
async def rest_delete(self, method: str, *args, **kwargs) -> 'UDSHttpResponse':
# compose url
return await self.delete(self.compose_rest_url(method), *args, **kwargs)
class UDSTestCaseMixin:
client_class: typing.Type = UDSClient
async_client_class: typing.Type = UDSAsyncClient
@ -178,8 +232,8 @@ class UDSTestCaseMixin:
except ValueError:
pass # Not present
class UDSTestCase(UDSTestCaseMixin, TestCase):
class UDSTestCase(UDSTestCaseMixin, TestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()
@ -187,7 +241,6 @@ class UDSTestCase(UDSTestCaseMixin, TestCase):
class UDSTransactionTestCase(UDSTestCaseMixin, TransactionTestCase):
@classmethod
def setUpClass(cls) -> None:
super().setUpClass()

View File

@ -129,7 +129,7 @@ class TestGetServicesData(UDSTransactionTestCase):
models.ServicePool(uuid='x'),
)
if found.uuid == 'x':
self.fail('User service not found in user_services list')
self.fail('Pool not found in user_services list')
self.assertEqual(user_service['is_meta'], False)
self.assertEqual(user_service['name'], found.name)
@ -202,7 +202,7 @@ class TestGetServicesData(UDSTransactionTestCase):
models.MetaPool(uuid='x'),
)
if found.uuid == 'x':
self.fail('User service not found in user_services list')
self.fail('Meta pool not found in user_services list')
self.assertEqual(user_service['is_meta'], True)
self.assertEqual(user_service['name'], found.name)

View File

@ -181,8 +181,11 @@ class Handler:
if self.needs_staff and not self.is_staff_member():
raise AccessDenied()
self._user = self.getUser()
try:
self._user = self.getUser()
except Exception as e:
# Maybe the user was deleted, so access is denied
raise AccessDenied() from e
else:
self._user = User() # Empty user for non authenticated handlers

View File

@ -32,6 +32,7 @@
"""
import logging
import io
import typing
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
@ -39,7 +40,7 @@ from matplotlib.figure import Figure
from matplotlib import cm
# This must be imported to allow 3d projections
from mpl_toolkits.mplot3d import Axes3D # pylint: disable=unused-import
from mpl_toolkits.mplot3d.axes3d import Axes3D # pylint: disable=unused-import
import numpy as np
@ -47,8 +48,31 @@ logger = logging.getLogger(__name__)
def barChart(
size: typing.Tuple[float, float, int], data: typing.Mapping, output: typing.BinaryIO
size: typing.Tuple[float, float, int], data: typing.Mapping[str, typing.Any], output: io.BytesIO
) -> None:
"""
Generates a bar chart and stores it on output
Args:
size: Size of the chart (width, height, dpi) in inches
data: Data to be used to generate the chart
output: Output stream to store the chart
Notes:
Data must be a dict with the following keys:
- x: List of x values
- y: List of dicts with the following keys:
- data: List of y values
- label: Label for the bar
- title: Title of the chart
- xlabel: Label for x axis
- ylabel: Label for y axis
- allTicks: If True, all x values will be shown as ticks
- xtickFnc: Function to be used to format x ticks labels
Returns:
None, writes the chart on output as png
"""
d = data['x']
ind = np.arange(len(d))
ys = data['y']
@ -72,7 +96,7 @@ def barChart(
if data.get('allTicks', True):
axis.set_xticks(ind)
if 'xtickFnc' in data:
if 'xtickFnc' in data and data['xtickFnc']:
axis.set_xticklabels([data['xtickFnc'](v) for v in axis.get_xticks()])
axis.legend()
@ -81,8 +105,31 @@ def barChart(
def lineChart(
size: typing.Tuple[float, float, int], data: typing.Mapping, output: typing.BinaryIO
size: typing.Tuple[float, float, int], data: typing.Mapping[str, typing.Any], output: io.BytesIO
) -> None:
"""
Generates a line chart and stores it on output
Args:
size: Size of the chart (width, height, dpi) in inches
data: Data to be used to generate the chart
output: Output stream to store the chart
Notes:
Data must be a dict with the following keys:
- x: List of x valuesç
- y: List of dicts with the following keys:
- data: List of y values
- label: Label for the line
- title: Title of the chart
- xlabel: Label for x axis
- ylabel: Label for y axis
- allTicks: If True, all x values will be shown as ticks
- xtickFnc: Function to be used to format x ticks labels
Returns:
None, writes the chart on output as png
"""
x = data['x']
y = data['y']
@ -104,7 +151,7 @@ def lineChart(
if data.get('allTicks', True):
axis.set_xticks(x)
if 'xtickFnc' in data:
if 'xtickFnc' in data and data['xtickFnc']:
axis.set_xticklabels([data['xtickFnc'](v) for v in axis.get_xticks()])
axis.legend()
@ -113,8 +160,33 @@ def lineChart(
def surfaceChart(
size: typing.Tuple[float, float, int], data: typing.Mapping, output: typing.BinaryIO
size: typing.Tuple[float, float, int], data: typing.Mapping[str, typing.Any], output: io.BytesIO
) -> None:
"""
Generates a surface chart and stores it on output
Args:
size: Size of the chart (width, height, dpi) in inches
data: Data to be used to generate the chart
output: Output stream to store the chart
Notes:
Data must be a dict with the following keys:
- x: List of x values
- y: List of y values
- z: List of z values, must be a bidimensional list
- wireframe: If True, a wireframe will be shown
- title: Title of the chart
- xlabel: Label for x axis
- ylabel: Label for y axis
- zlabel: Label for z axis
- allTicks: If True, all x values will be shown as ticks
- xtickFnc: Function to be used to format x ticks labels from x ticks
- ytickFnc: Function to be used to format y ticks labels form y ticks
Returns:
None, writes the chart on output as png
"""
x = data['x']
y = data['y']
z = data['z']
@ -133,7 +205,7 @@ def surfaceChart(
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2]) # type: ignore
FigureCanvas(fig) # Stores canvas on fig.canvas
axis = fig.add_subplot(111, projection='3d')
axis: typing.Any = fig.add_subplot(111, projection='3d')
# axis.grid(color='r', linestyle='dotted', linewidth=0.1, alpha=0.5)
if data.get('wireframe', False):
@ -154,9 +226,9 @@ def surfaceChart(
axis.set_xticks(data['x'])
axis.set_yticks(data['y'])
if 'xtickFnc' in data:
if 'xtickFnc' in data and data['xtickFnc']:
axis.set_xticklabels([data['xtickFnc'](v) for v in axis.get_xticks()])
if 'ytickFnc' in data:
if 'xtickFnc' in data and data['ytickFnc']:
axis.set_yticklabels([data['ytickFnc'](v) for v in axis.get_yticks()])
fig.savefig(output, format='png', transparent=True)