1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-03-11 00:58:39 +03:00

added secure request session for weblogin callback, and used constant for timeout

This commit is contained in:
Adolfo Gómez García 2024-09-27 16:27:50 +02:00
parent 69087f46fe
commit f53c9bb793
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
6 changed files with 29 additions and 35 deletions

View File

@ -59,32 +59,32 @@ class AuthCallbackTest(UDSTestCase):
self.user = authenticators_fixtures.create_db_users(self.auth, number_of_users=1, groups=self.groups[:3])[0]
def test_no_callback(self) -> None:
config.GlobalConfig.LOGIN_CALLBACK_URL.set('') # Clean callback url
config.GlobalConfig.NOTIFY_CALLBACK_URL.set('') # Clean callback url
with mock.patch('requests.post') as mock_post:
callbacks.perform_login_callback(self.user)
callbacks.weblogin(self.user)
mock_post.assert_not_called()
def test_callback_failed_url(self) -> None:
config.GlobalConfig.LOGIN_CALLBACK_URL.set('http://localhost:1234') # Sample non existent url
config.GlobalConfig.NOTIFY_CALLBACK_URL.set('http://localhost:1234') # Sample non existent url
callbacks.FAILURE_CACHE.set('notify_failure', 3) # Already failed 3 times
with mock.patch('requests.post') as mock_post:
callbacks.perform_login_callback(self.user)
callbacks.weblogin(self.user)
mock_post.assert_not_called()
def test_callback_fails_reteleadly(self) -> None:
config.GlobalConfig.LOGIN_CALLBACK_URL.set('https://localhost:1234')
def test_callback_fails_repeteadly(self) -> None:
config.GlobalConfig.NOTIFY_CALLBACK_URL.set('https://localhost:1234')
with mock.patch('requests.post') as mock_post:
mock_post.side_effect = Exception('Error')
for _i in range(4):
callbacks.perform_login_callback(self.user)
for _i in range(16):
callbacks.weblogin(self.user)
self.assertEqual(mock_post.call_count, 3)
def test_callback_change_groups(self) -> None:
config.GlobalConfig.LOGIN_CALLBACK_URL.set('https://localhost:1234')
config.GlobalConfig.NOTIFY_CALLBACK_URL.set('https://localhost:1234')
all_groups = {group.name for group in self.groups}
current_groups = {group.name for group in self.user.groups.all()}
@ -97,7 +97,7 @@ class AuthCallbackTest(UDSTestCase):
'removed_groups': list(current_groups),
}
callbacks.perform_login_callback(self.user)
callbacks.weblogin(self.user)
self.assertEqual(mock_post.call_count, 1)
self.assertEqual({group.name for group in self.user.groups.all()}, diff_groups)

View File

@ -240,7 +240,7 @@ def register_user(
version=request.os.version,
)
# Try to notify callback if needed
callbacks.perform_login_callback(usr)
callbacks.weblogin(usr)
return types.auth.LoginResult(user=usr)

View File

@ -35,11 +35,10 @@ import re
import typing
import collections.abc
import requests
from uds import models
from uds.core import types
from uds.core.util import config, cache, log
from uds.core import types, consts
from uds.core.util import config, cache, log, security
logger = logging.getLogger(__name__)
@ -48,26 +47,29 @@ FAILURE_CACHE: typing.Final[cache.Cache] = cache.Cache('callback_auth_failure',
RE_GROUPS: typing.Final[typing.Pattern[str]] = re.compile(r'^[A-Za-z0-9_-]+$')
def perform_login_callback(user: models.User) -> None:
def weblogin(user: models.User) -> None:
"""
This method is called when a user logs in. It can be used to perform any action needed when a user logs in.
"""
notify_url = config.GlobalConfig.LOGIN_CALLBACK_URL.as_str()
notify_url = config.GlobalConfig.NOTIFY_CALLBACK_URL.as_str()
if not notify_url.startswith('https') or (fail_count := FAILURE_CACHE.get('notify_failure', 0)) >= 3:
return
# We are going to notify the login to the callback URL
# This is a POST with a JSON payload
try:
response = requests.post(
response = security.secure_requests_session().post(
notify_url,
json={
'authenticator_uuid': user.manager.uuid,
'user_uuid': user.uuid,
'username': user.name,
'groups': [group.name for group in user.groups.all()],
'type': 'weblogin',
'info': {
'authenticator_uuid': user.manager.uuid,
'user_uuid': user.uuid,
'username': user.name,
'groups': [group.name for group in user.groups.all()],
},
},
timeout=3,
timeout=consts.net.URGENT_REQUEST_TIMEOUT,
)
response.raise_for_status()
FAILURE_CACHE.delete('notify_failure')
@ -99,7 +101,6 @@ def perform_login_callback(user: models.User) -> None:
continue
changed_grps += [f'-{group_name}']
user.groups.remove(group)
# Log if groups were changed to keep track of changes
if changed_grps:
@ -112,4 +113,4 @@ def perform_login_callback(user: models.User) -> None:
except Exception as e:
logger.error('Error notifying login to callback URL: %s', e)
FAILURE_CACHE.set('notify_failure', fail_count+1)
FAILURE_CACHE.set('notify_failure', fail_count + 1)

View File

@ -35,6 +35,7 @@ import typing
# Request related timeouts, etc..
DEFAULT_REQUEST_TIMEOUT: typing.Final[int] = 20 # In seconds
DEFAULT_CONNECT_TIMEOUT: typing.Final[int] = 4 # In seconds
URGENT_REQUEST_TIMEOUT: typing.Final[int] = 3 # In seconds
# Default UDS Registerd Server listen port
SERVER_DEFAULT_LISTEN_PORT: typing.Final[int] = 43910

View File

@ -796,8 +796,8 @@ class GlobalConfig:
type=Config.FieldType.BOOLEAN,
help=_('Enable VNC menu for user services'),
)
LOGIN_CALLBACK_URL: Config.Value = Config.section(Config.SectionType.GLOBAL).value(
'loginCallbackURL',
NOTIFY_CALLBACK_URL: Config.Value = Config.section(Config.SectionType.GLOBAL).value(
'notifyCallbackURL',
'',
type=Config.FieldType.HIDDEN,
help=''

View File

@ -58,15 +58,6 @@ logger = logging.getLogger(__name__)
# Disable warnings from urllib for
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
# Ensure that we do not get warnings about self signed certificates and so
import requests.packages.urllib3 # type: ignore
requests.packages.urllib3.disable_warnings() # pyright: ignore
except Exception: # nosec: simple check for disabling warnings,
# Igonre if we cannot disable warnings
pass
def create_self_signed_cert(ip: str) -> tuple[str, str, str]:
"""
@ -194,6 +185,7 @@ def secure_requests_session(*, verify: 'str|bool' = True) -> 'requests.Session':
# Disable warnings from urllib for insecure requests
# Note that although this is done globaly, on some circunstances, may be overriden later
# by some other code that uses urllib3 directly...
# This will ensure that we do not get warnings about self signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)