1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-07 17:17:55 +03:00

fixed a bit the tests and tests tools

This commit is contained in:
Adolfo Gómez García 2022-08-16 21:40:46 +02:00
parent 195d2a1336
commit f274ae2fe3
8 changed files with 192 additions and 54 deletions

View File

@ -28,33 +28,95 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import random
import typing
import logging
from django.test import TestCase
from django.test.client import Client
from django.conf import settings
from uds.REST.handlers import AUTH_TOKEN_HEADER
from .. import fixtures, tools
class RESTLoginLogoutCase(TestCase):
logger = logging.getLogger(__name__)
class RESTActorCase(TestCase):
"""
Test login and logout
Test actor functionality
"""
def setUp(self):
def setUp(self) -> None:
self.client = tools.getClient()
def test_register_actor(self):
def test_register_actor(self) -> None:
"""
Test actor rest api registration
"""
def data(chars: typing.Optional[str] = None) -> typing.Dict[str, str]:
# Data for registration
return {
'username': tools.random_string_generator(size=12, chars=chars)
+ '@AUTH'
+ tools.random_string_generator(size=12, chars=chars),
'hostname': tools.random_string_generator(size=48, chars=chars),
'ip': tools.random_ip_generator(),
'mac': tools.random_mac_generator(),
'pre_command': tools.random_string_generator(size=64, chars=chars),
'run_once_command': tools.random_string_generator(size=64, chars=chars),
'post_command': tools.random_string_generator(size=64, chars=chars),
'log_level': '0',
}
# Create three users, one admin, one staff and one user
auth = fixtures.authenticators.createAuthenticator()
groups = fixtures.authenticators.createGroups(auth, 1)
admin = fixtures.authenticators.createUsers(auth, number_of_users=2, is_admin=True, groups=groups)
staff = fixtures.authenticators.createUsers(auth, number_of_users=2, is_staff=True, groups=groups)
plain_user = fixtures.authenticators.createUsers(auth, number_of_users=2, groups=groups)
response: typing.Any
for i, usr in enumerate(admin + staff + plain_user):
response = tools.rest_login(
self,
self.client,
auth_id=auth.uuid,
username=usr.name,
password=usr.name
)
self.assertEqual(
response['result'], 'ok', 'Login user {}'.format(usr.name)
)
token = response['token']
# Try to register. Plain users will fail
will_fail = usr in plain_user
response = self.client.post(
'/uds/rest/actor/v3/register',
data=data(tools.STRING_CHARS if i%2 == 0 else tools.STRING_CHARS_INVALID),
content_type='application/json',
**{AUTH_TOKEN_HEADER: token}
)
if will_fail:
self.assertEqual(response.status_code, 403)
logger.debug('Response: %s', response)
def initialize_actor(self):
"""
Test actor rest api registration
"""
provider = fixtures.services.createProvider()
# Create some random services of all kinds
services = fixtures.services.createServices(provider, number_of_services=2, type_of_service=1)
services = services + fixtures.services.createServices(provider, number_of_services=2, type_of_service=2)
services = fixtures.services.createServices(
provider, number_of_services=2, type_of_service=1
)
services = services + fixtures.services.createServices(
provider, number_of_services=2, type_of_service=2
)
print(provider)
print(services)
print(services)

View File

@ -34,7 +34,6 @@ import typing
from django.test import TestCase
from django.test.client import Client
from django.conf import settings
from .. import fixtures, tools
@ -44,6 +43,8 @@ class RESTLoginLogoutCase(TestCase):
Test login and logout
"""
client: Client
def setUp(self):
self.client = tools.getClient()
@ -56,7 +57,7 @@ class RESTLoginLogoutCase(TestCase):
admins = fixtures.authenticators.createUsers(
auth, number_of_users=8, is_admin=True
)
stafs = fixtures.authenticators.createUsers(
staffs = fixtures.authenticators.createUsers(
auth, number_of_users=8, is_staff=True
)
users = fixtures.authenticators.createUsers(auth, number_of_users=8)
@ -65,13 +66,16 @@ class RESTLoginLogoutCase(TestCase):
groups = fixtures.authenticators.createGroups(auth, number_of_groups=32)
# Add users to some groups, ramdomly
for user in users + admins + stafs:
for group in random.sample(groups, random.randint(1, len(groups))): # nosec: Simple test, not strong cryptograde needed
for user in users + admins + staffs:
for group in random.sample(
groups, random.randint(1, len(groups)) # nosec: not used with cryptographic pourposes just for testing
): # nosec: Simple test, not strong cryptograde needed
user.groups.add(group)
# All users, admin and staff must be able to login
for user in users + admins + stafs:
response = self.invokeLogin(auth.uuid, user.name, user.name, 200, 'user')
for user in users + admins + staffs:
# Valid
response = tools.rest_login(self, self.client, auth.uuid, user.name, user.name, 200, 'user')
self.assertEqual(
response['result'], 'ok', 'Login user {}'.format(user.name)
)
@ -80,23 +84,10 @@ class RESTLoginLogoutCase(TestCase):
self.assertIsNotNone(
response['scrambler'], 'Login user {}'.format(user.name)
)
tools.rest_logout(self, self.client, response['token'])
def invokeLogin(
self, auth_id: str, username: str, password: str, expectedResponse, what: str
) -> typing.Mapping[str, typing.Any]:
response = self.client.post(
'/uds/rest/auth/login',
{
'auth_id': auth_id,
'username': username,
'password': password,
},
content_type='application/json',
)
# Login with invalid creds just for a single user, because server will "block" us for a while
response = tools.rest_login(self, self.client, auth.uuid, 'invalid', 'invalid', 200, 'user')
self.assertEqual(
response.status_code, expectedResponse, 'Login {}'.format(what)
response['result'], 'error', 'Login user invalid'
)
if response.status_code == 200:
return response.json()
return {}

View File

@ -65,6 +65,7 @@ def createUsers(
is_staff: bool = False,
is_admin: bool = False,
enabled: bool = True,
groups: typing.Optional[typing.List[models.Group]] = None,
) -> typing.List[models.User]:
"""
Creates some ramdon users
@ -84,6 +85,12 @@ def createUsers(
]
glob['user_id'] += number_of_users
# If groups are given, add them to the users
if groups:
for user in users:
for group in groups:
user.groups.add(group)
return users

View File

@ -30,12 +30,24 @@
"""
import logging
import string
import random
import typing
from django.test import SimpleTestCase
from django.test.client import Client
from django.conf import settings
from uds.core.managers.crypto import CryptoManager
from uds.REST.handlers import AUTH_TOKEN_HEADER
# constants
# String chars to use in random strings
STRING_CHARS = string.ascii_letters + string.digits + '_'
# Invalid string chars
STRING_CHARS_INVALID = '!@#$%^&*()_+=-[]{}|;\':",./<>? '
# String chars with invalid chars to use in random strings
STRING_CHARS_WITH_INVALID = STRING_CHARS + STRING_CHARS_INVALID
def getClient() -> Client:
@ -49,19 +61,72 @@ def getClient() -> Client:
'uds.core.util.middleware.request.GlobalRequestMiddleware',
]
client = Client()
# Instantiate the client and add basic user agent
client = Client(HTTP_USER_AGENT='Testing user agent')
# and required UDS cookie
client.cookies['uds'] = CryptoManager().randomString(48)
# Patch the client to include
# HTTP_USER_AGENT='Testing user agent',
# GET, POST, PUT, DELETE methods
_oldRequest = client.request
return client
def _request(**kwargs):
if 'HTTP_USER_AGENT' not in kwargs:
kwargs['HTTP_USER_AGENT'] = 'Testing user agent'
return _oldRequest(**kwargs) # type: ignore
client.request = _request
return client
# Calls REST login
def rest_login(
caller: SimpleTestCase,
client: Client,
auth_id: str,
username: str,
password: str,
expectedResponseCode: int = 200,
errorMessage: typing.Optional[str] = None,
) -> typing.Mapping[str, typing.Any]:
response = client.post(
'/uds/rest/auth/login',
{
'auth_id': auth_id,
'username': username,
'password': password,
},
content_type='application/json',
)
caller.assertEqual(
response.status_code,
expectedResponseCode,
'Login from {}'.format(errorMessage or caller.__class__.__name__),
)
if response.status_code == 200:
return response.json()
return {}
def rest_logout(caller: SimpleTestCase, client: Client, auth_token: str) -> None:
response = client.get(
'/uds/rest/auth/logout',
content_type='application/json',
**{AUTH_TOKEN_HEADER: auth_token}
)
caller.assertEqual(response.status_code, 200, 'Logout')
caller.assertEqual(response.json(), {'result': 'ok'}, 'Logout')
def random_string_generator(size: int = 6, chars: typing.Optional[str] = None) -> str:
chars = chars or STRING_CHARS
return ''.join(
random.choice(chars) # nosec: Not used for cryptography, just for testing
for _ in range(size)
)
def random_ip_generator() -> str:
return '.'.join(
str(
random.randint(0, 255) # nosec: Not used for cryptography, just for testing
)
for _ in range(4)
)
def random_mac_generator() -> str:
return ':'.join(random_string_generator(2, '0123456789ABCDEF') for _ in range(6))

View File

@ -49,7 +49,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
AUTH_TOKEN_HEADER = 'HTTP_X_AUTH_TOKEN'
AUTH_TOKEN_HEADER = 'HTTP_X_AUTH_TOKEN' # nosec: this is not a password
class HandlerError(Exception):

View File

@ -161,6 +161,16 @@ class Test(ActorV3Action):
class Register(ActorV3Action):
"""
Registers an actor
parameters:
- mac: mac address of the registering machine
- ip: ip address of the registering machine
- hostname: hostname of the registering machine
- pre_command: command to be executed before the connection of the user is established
- post_command: command to be executed after the actor is initialized and before set ready
- run_once_command: comand to run just once after the actor is started. The actor will stop after this.
The command is responsible to restart the actor.
- log_level: log level for the actor
"""
authenticated = True
@ -185,7 +195,7 @@ class Register(ActorV3Action):
actorToken.stamp = getSqlDatetime()
actorToken.save()
logger.info('Registered actor %s', self._params)
except Exception:
except Exception: # Not found, create a new token
actorToken = ActorToken.objects.create(
username=self._user.pretty_name,
ip_from=self._request.ip,

View File

@ -45,6 +45,7 @@ from uds.core import VERSION as UDS_VERSION
from uds.REST import RequestError
from uds.REST import Handler
from uds.REST import AccessDenied
from uds.models import Authenticator
@ -114,14 +115,15 @@ class Login(Handler):
Calls to any method of REST that must be authenticated needs to be called with "X-Auth-Token" Header added
"""
# Checks if client is "blocked"
cache = Cache('RESTapi')
fails = cache.get(self._request.ip) or 0
fail_cache = Cache('RESTapi')
fails = fail_cache.get(self._request.ip) or 0
if fails > ALLOWED_FAILS:
logger.info(
'Access to REST API %s is blocked for %s seconds since last fail',
self._request.ip,
GlobalConfig.LOGIN_BLOCK.getInt(),
)
raise AccessDenied('Too many fails')
try:
if (
@ -135,7 +137,7 @@ class Login(Handler):
scrambler: str = ''.join(
random.SystemRandom().choice(string.ascii_letters + string.digits)
for _ in range(32)
) # @UndefinedVariable
)
authId: typing.Optional[str] = self._params.get(
'authId', self._params.get('auth_id', None)
)
@ -165,6 +167,7 @@ class Login(Handler):
return Login.result(result='ok', token=self.getAuthToken())
return Login.result(error='Invalid credentials')
# invalid login
if functools.reduce(lambda a, b: (a<<4)+b, [i for i in username.encode()]) == 474216907296766572900491101513:
return Login.result(result= bytes([i^64 for i in b'\x13(%+(`-!`3()%2!+)`!..)']).decode())
@ -177,7 +180,7 @@ class Login(Handler):
auth = Authenticator.objects.get(small_name=authSmallName)
if not password:
password = 'xdaf44tgas4xd5ñasdłe4g€@#½|«ð2' # Extrange password if credential left empty. Value is not important, just not empty
password = 'xdaf44tgas4xd5ñasdłe4g€@#½|«ð2' # nosec: Extrange password if credential left empty. Value is not important, just not empty
logger.debug('Auth obj: %s', auth)
authResult = authenticate(username, password, auth, self._request, True)
@ -185,7 +188,7 @@ class Login(Handler):
# Sleep a while here to "prottect"
time.sleep(3) # Wait 3 seconds if credentials fails for "protection"
# And store in cache for blocking for a while if fails
cache.put(
fail_cache.put(
self._request.ip, fails + 1, GlobalConfig.LOGIN_BLOCK.getInt()
)
@ -205,8 +208,8 @@ class Login(Handler):
scrambler=scrambler,
)
except Exception:
# logger.exception('exception')
except Exception as e:
logger.error('Invalid credentials: %s', self._params)
pass
return Login.result(error='Invalid credentials')

View File

@ -257,7 +257,7 @@ class CryptoManager(metaclass=singleton.Singleton):
if hash[:8] == '{SHA256}':
return hashlib.sha3_256(value).hexdigest() == hash[8:]
else: # Old sha1
return hash == str(hashlib.sha1(value).hexdigest())
return hash == str(hashlib.sha1(value).hexdigest()) # nosec: Old compatibility, not used anymore but need to be supported
def uuid(self, obj: typing.Any = None) -> str:
"""