Advancing on tests

This commit is contained in:
Adolfo Gómez García 2022-08-19 00:46:38 +02:00
parent a42857038c
commit 07031850e5
4 changed files with 83 additions and 39 deletions

View File

@ -31,16 +31,10 @@
import typing
import logging
from django.test import TestCase
from django.test.client import Client
from django.conf import settings
from uds import models
from uds.REST.handlers import AUTH_TOKEN_HEADER
from uds.REST.methods.actor_v3 import MANAGED, UNMANAGED, ALLOWED_FAILS
from .. import fixtures
from ..utils import rest, constants
from ..utils import rest
logger = logging.getLogger(__name__)
@ -50,30 +44,41 @@ class TestActorV3(rest.test.RESTTestCase):
Test actor functionality
"""
def test_register(self) -> None:
def test_test_managed(self) -> None:
"""
Test actor rest api registration
Test actorv3 initialization
"""
response: typing.Any
for i, usr in enumerate(self.admins + self.staffs + self.plain_users):
token = self.login(usr)
rest_token, actor_token = self.login_and_register()
# Auth token already set in client headers
# Try to register. Plain users will fail
will_fail = usr in self.plain_users
response = self.client.post(
'/uds/rest/actor/v3/register',
data=self.register_data(
constants.STRING_CHARS if i % 2 == 0 else constants.STRING_CHARS_INVALID
),
content_type='application/json',
**{AUTH_TOKEN_HEADER: token}
)
if will_fail:
self.assertEqual(response.status_code, 403)
continue # Try next user, this one will fail
# No actor token, will fail
response = self.client.post(
'/uds/rest/actor/v3/test',
data={'type': MANAGED},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['result'], 'invalid token')
# Invalid actor token also fails
response = self.client.post(
'/uds/rest/actor/v3/test',
data={'type': MANAGED, 'token': 'invalid'},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
token = response.json()['result']
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['result'], 'invalid token')
# Ensure database contains the registered token
self.assertEqual(models.ActorToken.objects.filter(token=token).count(), 1)
# Without header, test will success because its not authenticated
self.client.add_header(AUTH_TOKEN_HEADER, 'invalid')
response = self.client.post(
'/uds/rest/actor/v3/test',
data={'type': MANAGED, 'token': actor_token},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
# We have 2 attempts failed

View File

@ -97,10 +97,13 @@ class RESTTestCase(test.UDSTestCase):
'log_level': '0',
}
# Login as specified and returns the auth token
def login(
self, user: typing.Optional[models.User] = None, as_admin: bool = True
) -> str:
'''
Login as specified and returns the auth token
The token is inserted on the header of the client, so it can be used in the rest of the tests
'''
user = user or (self.admins[0] if as_admin else self.staffs[0])
response = rest.login(
self,
@ -110,6 +113,8 @@ class RESTTestCase(test.UDSTestCase):
password=user.name,
)
self.assertEqual(response['result'], 'ok', 'Login failed')
# Insert token into headers
self.client.add_header(AUTH_TOKEN_HEADER, response['token'])
return response['token']
# Login as admin or staff and register an actor
@ -117,12 +122,11 @@ class RESTTestCase(test.UDSTestCase):
# - The login auth token
# - The actor token
def login_and_register(self, as_admin: bool = True) -> typing.Tuple[str, str]:
token = self.login(as_admin=as_admin)
token = self.login(as_admin=as_admin) # Token not used, alreade inserted on login
response = self.client.post(
'/uds/rest/actor/v3/register',
data=self.register_data(constants.STRING_CHARS),
content_type='application/json',
**{AUTH_TOKEN_HEADER: token}
)
self.assertEqual(response.status_code, 200, 'Actor registration failed')
return token, response.json()['result']

View File

@ -43,6 +43,10 @@ from uds.core.managers.crypto import CryptoManager
logger = logging.getLogger(__name__)
class UDSClient(Client):
headers: typing.Dict[str, str] = {
'HTTP_USER_AGENT': 'Testing user agent',
}
def __init__(
self, enforce_csrf_checks: bool =False, raise_request_exception: bool=True, **defaults: typing.Any
):
@ -57,13 +61,27 @@ class UDSClient(Client):
]
# Instantiate the client and add basic user agent
super().__init__(enforce_csrf_checks, raise_request_exception, HTTP_USER_AGENT='Testing user agent')
super().__init__(enforce_csrf_checks, raise_request_exception)
# and required UDS cookie
self.cookies['uds'] = CryptoManager().randomString(48)
def add_header(self, name: str, value: str):
self.headers[name] = value
def request(self, **request: typing.Any):
# Copy request dict
request = request.copy()
# Add headers
request.update(self.headers)
return super().request(**request)
class UDSTestCase(TestCase):
client_class: typing.Type = UDSClient
client: UDSClient
class UDSTransactionTestCasse(TransactionTestCase):
client_class: typing.Type = UDSClient
client: UDSClient

View File

@ -29,6 +29,7 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import secrets
import time
import logging
import typing
@ -58,9 +59,13 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
ALLOWED_FAILS = 5
ALLOWED_FAILS = 8 # More than enough for now
MANAGED = 'managed'
UNMANAGED = 'unmanaged' # matches the definition of UDS Actors OFC
# Cache the "failed login attempts" for a given IP
cache = Cache('actorv3')
class BlockAccess(Exception):
pass
@ -70,6 +75,7 @@ class BlockAccess(Exception):
def fixIdsList(idsList: typing.List[str]) -> typing.List[str]:
return [i.upper() for i in idsList] + [i.lower() for i in idsList]
def checkBlockedIp(ip: str) -> None:
if GlobalConfig.BLOCK_ACTOR_FAILURES.getBool() is False:
return
@ -81,15 +87,20 @@ def checkBlockedIp(ip: str) -> None:
ip,
GlobalConfig.LOGIN_BLOCK.getInt(),
)
# Sleep a while to try to minimize brute force attacks somehow
time.sleep(3) # 3 seconds should be enough
raise BlockAccess()
def incFailedIp(ip: str) -> None:
cache = Cache('actorv3')
fails = cache.get(ip, 0) + 1
cache.put(ip, fails, GlobalConfig.LOGIN_BLOCK.getInt())
def clearFailedIp(ip: str) -> None:
cache.remove(ip)
class ActorV3Action(Handler):
authenticated = False # Actor requests are not authenticated normally
path = 'actor/v3'
@ -126,6 +137,8 @@ class ActorV3Action(Handler):
checkBlockedIp(self._request.ip)
result = self.action()
logger.debug('Action result: %s', result)
# Result was ok, clear the failed requests for this ip
clearFailedIp(self._request.ip)
return result
except (BlockAccess, KeyError):
# For blocking attacks
@ -153,8 +166,12 @@ class Test(ActorV3Action):
token=self._params['token']
) # Not assigned, because only needs check
except Exception:
# Increase failed attempts
incFailedIp(self._request.ip)
# And return error
return ActorV3Action.actorResult('invalid token')
clearFailedIp(self._request.ip)
return ActorV3Action.actorResult('ok')
@ -167,7 +184,7 @@ class Register(ActorV3Action):
- hostname: hostname of the registering machine
- pre_command: command to be executed before the connection of the user is established
- post_command: command to be executed after the actor is initialized and before set ready
- run_once_command: comand to run just once after the actor is started. The actor will stop after this.
- run_once_command: comand to run just once after the actor is started. The actor will stop after this.
The command is responsible to restart the actor.
- log_level: log level for the actor
@ -493,7 +510,7 @@ class LoginLogout(ActorV3Action):
service.processLogout(validId, remote_login=is_remote)
# All right, service notified..
except Exception as e :
except Exception as e:
# Log error and continue
logger.error('Error notifying service: %s (%s)', e, self._params)
raise BlockAccess()