1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

* Fixed test error with modified http client

* linting even more, untill all files has been linted
* Tested still works after changes :)
This commit is contained in:
Adolfo Gómez García 2023-04-15 21:21:23 +02:00
parent a058b61276
commit 8adc3ca40d
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
22 changed files with 232 additions and 221 deletions

View File

@ -3,6 +3,7 @@
Settings file for uds server (Django)
'''
import os
import sys
import django
# calculated paths for django and the site
@ -57,20 +58,23 @@ TIME_ZONE = 'Europe/Madrid'
# http://www.i18nguy.com/unicode/language-identifiers.html
LANGUAGE_CODE = 'en'
ugettext = lambda s: s
# Override for gettext so we can use the same syntax as in django
# and we can translate it later with our own function
def gettext(s):
return s
LANGUAGES = (
('es', ugettext('Spanish')),
('en', ugettext('English')),
('fr', ugettext('French')),
('de', ugettext('German')),
('pt', ugettext('Portuguese')),
('it', ugettext('Italian')),
('ar', ugettext('Arabic')),
('eu', ugettext('Basque')),
('ar', ugettext('Arabian')),
('ca', ugettext('Catalan')),
('zh-hans', ugettext('Chinese')),
('es', gettext('Spanish')),
('en', gettext('English')),
('fr', gettext('French')),
('de', gettext('German')),
('pt', gettext('Portuguese')),
('it', gettext('Italian')),
('ar', gettext('Arabic')),
('eu', gettext('Basque')),
('ar', gettext('Arabian')),
('ca', gettext('Catalan')),
('zh-hans', gettext('Chinese')),
)
LANGUAGE_COOKIE_NAME = 'uds_lang'
@ -159,7 +163,7 @@ FILE_UPLOAD_DIRECTORY_PERMISSIONS = 0o750
FILE_UPLOAD_MAX_MEMORY_SIZE = 512 * 1024 # 512 Kb
# Make this unique, and don't share it with anybody.
SECRET_KEY = 's5ky!7b5f#s35!e38xv%e-+iey6yi-#630x)kk3kk5_j8rie2*'
SECRET_KEY = 's5ky!7b5f#s35!e38xv%e-+iey6yi-#630x)kk3kk5_j8rie2*' # nosec: sample key, Remember to change it on production!!
# This is a very long string, an RSA KEY (this can be changed, but if u loose it, all encription will be lost)
RSA_KEY = '-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQC0qe1GlriQbHFYdKYRPBFDSS8Ne/TEKI2mtPKJf36XZTy6rIyH\nvUpT1gMScVjHjOISLNJQqktyv0G+ZGzLDmfkCUBev6JBlFwNeX3Dv/97Q0BsEzJX\noYHiDANUkuB30ukmGvG0sg1v4ccl+xs2Su6pFSc5bGINBcQ5tO0ZI6Q1nQIDAQAB\nAoGBAKA7Octqb+T/mQOX6ZXNjY38wXOXJb44LXHWeGnEnvUNf/Aci0L0epCidfUM\nfG33oKX4BMwwTVxHDrsa/HaXn0FZtbQeBVywZqMqWpkfL/Ho8XJ8Rsq8OfElrwek\nOCPXgxMzQYxoNHw8V97k5qhfupQ+h878BseN367xSyQ8plahAkEAuPgAi6aobwZ5\nFZhx/+6rmQ8sM8FOuzzm6bclrvfuRAUFa9+kMM2K48NAneAtLPphofqI8wDPCYgQ\nTl7O96GXVQJBAPoKtWIMuBHJXKCdUNOISmeEvEzJMPKduvyqnUYv17tM0JTV0uzO\nuDpJoNIwVPq5c3LJaORKeCZnt3dBrdH1FSkCQQC3DK+1hIvhvB0uUvxWlIL7aTmM\nSny47Y9zsc04N6JzbCiuVdeueGs/9eXHl6f9gBgI7eCD48QAocfJVygphqA1AkEA\nrvzZjcIK+9+pJHqUO0XxlFrPkQloaRK77uHUaW9IEjui6dZu4+2T/q7SjubmQgWR\nZy7Pap03UuFZA2wCoqJbaQJAUG0FVrnyUORUnMQvdDjAWps2sXoPvA8sbQY1W8dh\nR2k4TCFl2wD7LutvsdgdkiH0gWdh5tc1c4dRmSX1eQ27nA==\n-----END RSA PRIVATE KEY-----'
@ -252,7 +256,7 @@ AUTHFILE = 'auth.log'
USEFILE = 'use.log'
TRACEFILE = 'trace.log'
OPERATIONSFILE = 'operations.log'
LOGLEVEL = DEBUG and 'DEBUG' or 'INFO'
LOGLEVEL = 'DEBUG' if DEBUG else 'INFO'
ROTATINGSIZE = 32 * 1024 * 1024 # 32 Megabytes before rotating files
LOGGING = {
@ -279,7 +283,6 @@ LOGGING = {
'workers': {
'format': 'uds-w[%(process)-5s]: %(levelname)s %(asctime)s %(name)s:%(funcName)s %(lineno)d %(message)s'
},
'database': {'format': '%(levelname)s %(asctime)s Database %(message)s'},
'auth': {'format': '%(asctime)s %(message)s'},
'use': {'format': '%(asctime)s %(message)s'},
'trace': {'format': '%(levelname)s %(asctime)s %(message)s'},

View File

@ -59,17 +59,17 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
self.client.enable_ipv4()
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
req = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = False, not logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), False)
self.assertEqual(req.session.get(AUTHORIZED_KEY), False)
# Ensure ip, and ip_proxy are set and both are the same, 127.0.0.1
self.assertEqual(request.ip, '127.0.0.1')
self.assertEqual(request.ip_proxy, '127.0.0.1')
self.assertEqual(request.ip_version, 4)
self.assertEqual(req.ip, '127.0.0.1')
self.assertEqual(req.ip_proxy, '127.0.0.1')
self.assertEqual(req.ip_version, 4)
# Ensure user is not set
self.assertEqual(request.user, None)
self.assertEqual(req.user, None)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
@ -81,17 +81,17 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
self.client.enable_ipv6()
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
req = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = False, not logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), False)
self.assertEqual(req.session.get(AUTHORIZED_KEY), False)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '::1')
self.assertEqual(request.ip_proxy, '::1')
self.assertEqual(request.ip_version, 6)
self.assertEqual(req.ip, '::1')
self.assertEqual(req.ip_proxy, '::1')
self.assertEqual(req.ip_version, 6)
# Ensure user is not set
self.assertEqual(request.user, None)
self.assertEqual(req.user, None)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
@ -105,17 +105,17 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
user = self.login(as_admin=False)
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
req = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = True, logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), True)
self.assertEqual(req.session.get(AUTHORIZED_KEY), True)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '127.0.0.1')
self.assertEqual(request.ip_proxy, '127.0.0.1')
self.assertEqual(request.ip_version, 4)
self.assertEqual(req.ip, '127.0.0.1')
self.assertEqual(req.ip_proxy, '127.0.0.1')
self.assertEqual(req.ip_version, 4)
# Ensure user is correct
self.assertEqual(request.user.uuid, user.uuid)
self.assertEqual(req.user.uuid, user.uuid)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
@ -129,44 +129,45 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
user = self.login(as_admin=False)
response = self.client.get('/', secure=False)
request = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
req = typing.cast('ExtendedHttpRequestWithUser', response.wsgi_request)
# session[AUTHORIZED_KEY] = True, logged in
self.assertEqual(request.session.get(AUTHORIZED_KEY), True)
self.assertEqual(req.session.get(AUTHORIZED_KEY), True)
# Ensure ip, and ip_proxy are set and both are the same,
self.assertEqual(request.ip, '::1')
self.assertEqual(request.ip_proxy, '::1')
self.assertEqual(request.ip_version, 6)
self.assertEqual(req.ip, '::1')
self.assertEqual(req.ip_proxy, '::1')
self.assertEqual(req.ip_version, 6)
# Ensure user is correct
self.assertEqual(request.user.uuid, user.uuid)
self.assertEqual(req.user.uuid, user.uuid)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_no_middleware(self) -> None:
# Ensure GlobalRequestMiddleware is not present
GlobalRequestMiddlewareTest.remove_middleware('uds.middleware.request.GlobalRequestMiddleware')
GlobalRequestMiddlewareTest.remove_middleware(
'uds.middleware.request.GlobalRequestMiddleware'
)
self.client.enable_ipv4()
response = self.client.get('/', secure=False)
request = response.wsgi_request
req = response.wsgi_request
# session[AUTHORIZED_KEY] is not present
self.assertEqual(AUTHORIZED_KEY in request.session, False)
self.assertEqual(AUTHORIZED_KEY in req.session, False)
# ip is not present, nor ip_proxy or ip_version
self.assertEqual(hasattr(request, 'ip'), False)
self.assertEqual(hasattr(request, 'ip_proxy'), False)
self.assertEqual(hasattr(request, 'ip_version'), False)
self.assertEqual(hasattr(req, 'ip'), False)
self.assertEqual(hasattr(req, 'ip_proxy'), False)
self.assertEqual(hasattr(req, 'ip_version'), False)
# Also, user is not present
self.assertEqual(hasattr(request, 'user'), False)
self.assertEqual(hasattr(req, 'user'), False)
# And redirects to index
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
def test_detect_ips_no_proxy(self) -> None:
req = mock.Mock()
# Use an ipv4 and an ipv6 address
@ -174,7 +175,7 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
req.META = {
'REMOTE_ADDR': ip,
}
request._fill_ips(req)
request._fill_ips(req) # pylint: disable=protected-access
self.assertEqual(req.ip, ip)
self.assertEqual(req.ip_proxy, ip)
self.assertEqual(req.ip_version, 4 if '.' in ip else 6)
@ -193,22 +194,34 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
'HTTP_X_FORWARDED_FOR': client_ip,
}
else:
req.META = {
'HTTP_X_FORWARDED_FOR': "{},{}".format(client_ip, proxy),
}
req.META = {'HTTP_X_FORWARDED_FOR': f'{client_ip},{proxy}'}
request._fill_ips(req)
self.assertEqual(req.ip, client_ip, "Failed for {}".format(req.META))
self.assertEqual(req.ip_proxy, client_ip, "Failed for {}".format(req.META))
self.assertEqual(req.ip_version, 4 if '.' in client_ip else 6, "Failed for {}".format(req.META))
request._fill_ips(req) # pylint: disable=protected-access
self.assertEqual(
req.ip, client_ip, "Failed for {}".format(req.META)
)
self.assertEqual(
req.ip_proxy, client_ip, "Failed for {}".format(req.META)
)
self.assertEqual(
req.ip_version,
4 if '.' in client_ip else 6,
"Failed for {}".format(req.META),
)
def test_detect_ips_proxy_chained(self) -> None:
config.GlobalConfig.BEHIND_PROXY.set(True)
req = mock.Mock()
# Use an ipv4 and an ipv6 address
for client_ip in ['192.168.128.128', '2001:db8:85a3:8d3:1319:8a2e:370:7348']:
for first_proxy in ['192.168.200.200', '2001:db8:85a3:8d3:1319:8a2e:370:7349']:
for second_proxy in ['192.168.201.201', '2001:db8:85a3:8d3:1319:8a2e:370:7350']:
for first_proxy in [
'192.168.200.200',
'2001:db8:85a3:8d3:1319:8a2e:370:7349',
]:
for second_proxy in [
'192.168.201.201',
'2001:db8:85a3:8d3:1319:8a2e:370:7350',
]:
for with_nginx in [True, False]:
x_forwarded_for = '{}, {}'.format(client_ip, first_proxy)
if with_nginx is False:
@ -218,11 +231,12 @@ class GlobalRequestMiddlewareTest(test.WEBTestCase):
}
else:
req.META = {
'HTTP_X_FORWARDED_FOR': "{}, {}".format(x_forwarded_for, second_proxy),
'HTTP_X_FORWARDED_FOR': "{}, {}".format(
x_forwarded_for, second_proxy
),
}
request._fill_ips(req)
self.assertEqual(req.ip, first_proxy)
self.assertEqual(req.ip_proxy, client_ip)
self.assertEqual(req.ip_version, 4 if '.' in first_proxy else 6)

View File

@ -28,12 +28,10 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
from django.urls import reverse
from uds.core.util import config
from uds.core.managers.crypto import CryptoManager
from ..utils import test
@ -48,10 +46,11 @@ class RedirectMiddlewareTest(test.UDSTransactionTestCase):
"""
def test_redirect(self):
RedirectMiddlewareTest.add_middleware('uds.middleware.redirect.RedirectMiddleware')
page = 'https://testserver' + reverse('page.index')
response = self.client.get('/', secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, 'https://testserver/')
# Try secure, will redirect to index
self.assertEqual(response.status_code, 301)
self.assertEqual(response.url, page)
# Try secure, will redirect to index, not absulute url
response = self.client.get('/', secure=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, reverse('page.index'))
@ -60,7 +59,7 @@ class RedirectMiddlewareTest(test.UDSTransactionTestCase):
for _ in range(32):
url = f'/{CryptoManager().randomString(32)}'
response = self.client.get(url, secure=False)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'https://testserver{url}')
self.assertEqual(response.status_code, 301)
self.assertEqual(response.url, page)
response = self.client.get(url, secure=True)
self.assertEqual(response.status_code, 404) # Not found

View File

@ -53,14 +53,9 @@ class UDSHttpResponse(HttpResponse):
super().__init__(content, *args, **kwargs)
self.content = content
def json(self) -> typing.Any:
return super().json() # type: ignore
class UDSClientMixin:
headers: typing.Dict[str, str] = {
'HTTP_USER_AGENT': 'Testing user agent',
}
uds_headers: typing.Dict[str, str]
ip_version: int = 4
def initialize(self):
@ -73,18 +68,21 @@ class UDSClientMixin:
'django.contrib.messages.middleware.MessageMiddleware',
'uds.middleware.request.GlobalRequestMiddleware',
]
self.uds_headers = {
'HTTP_USER_AGENT': 'Testing user agent',
}
# Update settings security options
settings.RSA_KEY = '-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDcANi/08cnpn04\njKW/o2G1k4SIa6dJks8DmT4MQHOWqYC46YSIIPzqGoBPcvkbDSPSFBnByo3HhMY+\nk4JHc9SUwEmHSJWDCHjt7XSXX/ryqH0QQIJtSjk9Bc+GkOU24mnbITiw7ORjp7VN\nvgFdFhjVsZM/NjX/6Y9DoCPC1mGj0O9Dd4MfCsNwUxRhhR6LdrEnpRUSVW0Ksxbz\ncTfpQjdFr86+1BeUbzqN2HDcEGhvioj+0lGPXcOZoRNYU16H7kjLNP+o+rC7f/q/\nfoOYLzDSkmzePbcG+g0Hv7K7fuLus05ZWjupOmJA9hytB1BIF4p5f4ewl05Fx2Zj\nG2LneO2fAgMBAAECggEBANDimOnh2TkDceeMWx+OsAooC3E/zbEkjBudl3UoiNcn\nD0oCpkxeDeT0zpkgz/ZoTnd7kE0Y1e73WQc3JT5UcyXdQLMLLrIgDDnT+Jx1jB5z\n7XLN3UiJbblL2BOrZYbsCJf/fgU2l08rgBBVdJP+lAvps6YUAcd+6gDKfsnSpRhU\nWBHLZde7l6vUJ2OK9ZmHaghF5E8Xx918OSUKFJfGTYL5JLTb/scdl8vQse1quWC1\nk48PPXK10vOFvYWonQpRb2cOK/PPjPXPNWzcQyQY9D1iOeFvRyLqOXYE/ZY+qDe2\nHdPGrkl67yz01nzepkWWg/ZNbMXeZZyOnZm0aXtOxtkCgYEA/Qz3mescgwrt67yh\nFrbXjUqiVf2IpbNt88CUcbY0r1EdTA9OMtOtPYNvfpyRIRfDaZJ1zAdh3CZ2/hTm\ng+VUtseKnUDCi0xIBKX3V2O8sryWt2KStTnTo6JP0T47yXvmaRu5cutgoaD9SK+r\nN5vg1D2gNLmsT8uJh1Bl/yWGC4sCgYEA3pFGgAmiywsvmsddkI+LujoQVTiqkfFg\nMHHsJFOZlhYO83g49Q11pcQ70ukT6e89Ggwy///+z19p8jJ+wGqQWQLsM6eO1utg\nnJ8wMTwk8tOEm9MnWnnWhtG9KWcgkmwOVQiesJdWa1xOqsBKGchUkugmFycKNsiG\nHUbogbJ0OL0CgYBVLIcuxKdNKGGaxlwGVDbLdQKdJQBYncN1ly2f9K9ZD1loH4K3\nsu4N1W6y1Co5VFFO+KAzs4xp2HyW2xwX6xoPh6yNb53L2zombmKJhKWgF8A3K7Or\n0jH9UwXArUzcbZrJaC6MktNss85tJ8vepNYROkjxVkm8dgrtg89BCTVMLwKBgQCW\nSSh+uoL3cdUyQV63h4ZFOIHg2cOrin52F+bpXJ3/z2NHGa30IqOHTGtM7l+o/geX\nOBeT72tC4d2rUlduXEaeJDAUbRcxnnx9JayoAkG8ygDoK3uOR2kJXkTJ2T4QQPCo\nkIp/GaGcGxdviyo+IJyjGijmR1FJTrvotwG22iZKTQKBgQCIh50Dz0/rqZB4Om5g\nLLdZn1C8/lOR8hdK9WUyPHZfJKpQaDOlNdiy9x6xD6+uIQlbNsJhlDbOudHDurfI\nghGbJ1sy1FUloP+V3JAFS88zIwrddcGEso8YMFMCE1fH2/q35XGwZEnUq7ttDaxx\nHmTQ2w37WASIUgCl2GhM25np0Q==\n-----END PRIVATE KEY-----\n'
settings.CERTIFICATE = '-----BEGIN CERTIFICATE-----\nMIICzTCCAjYCCQCOUQEWpuEa3jANBgkqhkiG9w0BAQUFADCBqjELMAkGA1UEBhMC\nRVMxDzANBgNVBAgMBk1hZHJpZDEUMBIGA1UEBwwLQWxjb3Jjw4PCs24xHTAbBgNV\nBAoMFFZpcnR1YWwgQ2FibGUgUy5MLlUuMRQwEgYDVQQLDAtEZXZlbG9wbWVudDEY\nMBYGA1UEAwwPQWRvbGZvIEfDg8KzbWV6MSUwIwYJKoZIhvcNAQkBFhZhZ29tZXpA\ndmlydHVhbGNhYmxlLmVzMB4XDTEyMDYyNTA0MjM0MloXDTEzMDYyNTA0MjM0Mlow\ngaoxCzAJBgNVBAYTAkVTMQ8wDQYDVQQIDAZNYWRyaWQxFDASBgNVBAcMC0FsY29y\nY8ODwrNuMR0wGwYDVQQKDBRWaXJ0dWFsIENhYmxlIFMuTC5VLjEUMBIGA1UECwwL\nRGV2ZWxvcG1lbnQxGDAWBgNVBAMMD0Fkb2xmbyBHw4PCs21lejElMCMGCSqGSIb3\nDQEJARYWYWdvbWV6QHZpcnR1YWxjYWJsZS5lczCBnzANBgkqhkiG9w0BAQEFAAOB\njQAwgYkCgYEA35iGyHS/GVdWk3n9kQ+wsCLR++jd9Vez/s407/natm8YDteKksA0\nMwIvDAX722blm8PUya2NOlnum8KdyUPDOq825XERDlsIA+sTd6lb1c7w44qZ/pb+\n68mhXoRx2VJsu//+zhBkaQ1/KcugeHa4WLRIH35YLxdQDxrXS1eQWccCAwEAATAN\nBgkqhkiG9w0BAQUFAAOBgQAk+fJPpY+XvUsxR2A4SaQ8TGnE2x4PtpwCrCVzKEU9\nW2ugdXvysxkHbib3+JdA6s+lJjHs5HiMZPo/ak8adEKke+d10EU5YcUaJRRUpStY\nqQHziaqOl5Hgi75Kjskq6+tCU0Iui+s9pBg0V6y1AQsCmH2xFs7t1oEOGRFVarfF\n4Q==\n-----END CERTIFICATE-----'
def add_header(self, name: str, value: str):
self.headers[name] = value
self.uds_headers[name] = value
def set_user_agent(self, user_agent: typing.Optional[str] = None):
user_agent = user_agent or ''
# Add 'HTTP_USER_AGENT' header
self.headers['HTTP_USER_AGENT'] = user_agent
self.uds_headers['HTTP_USER_AGENT'] = user_agent
def enable_ipv4(self):
self.ip_version = 4
@ -121,7 +119,7 @@ class UDSClient(UDSClientMixin, Client):
# Copy request dict
request = request.copy()
# Add headers
request.update(self.headers)
request.update(self.uds_headers)
return super().request(**request)
def get(self, *args, **kwargs) -> 'UDSHttpResponse':
@ -176,9 +174,10 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
# Copy request dict
request = request.copy()
# Add headers
request.update(self.headers)
request.update(self.uds_headers)
return await super().request(**request)
# pylint: disable=invalid-overridden-method
async def get(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().get(*args, **kwargs))
@ -187,6 +186,7 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
# compose url
return await self.get(self.compose_rest_url(method), *args, **kwargs)
# pylint: disable=invalid-overridden-method
async def post(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().post(*args, **kwargs))
@ -195,6 +195,7 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
kwargs['content_type'] = kwargs.get('content_type', 'application/json')
return await self.post(self.compose_rest_url(method), *args, **kwargs)
# pylint: disable=invalid-overridden-method
async def put(self, *args, **kwargs) -> 'UDSHttpResponse':
kwargs['content_type'] = kwargs.get('content_type', 'application/json')
return typing.cast('UDSHttpResponse', await super().put(*args, **kwargs))
@ -203,6 +204,7 @@ class UDSAsyncClient(UDSClientMixin, AsyncClient):
kwargs['content_type'] = kwargs.get('content_type', 'application/json')
return await self.put(self.compose_rest_url(method), *args, **kwargs)
# pylint: disable=invalid-overridden-method
async def delete(self, *args, **kwargs) -> 'UDSHttpResponse':
self.append_remote_addr(kwargs)
return typing.cast('UDSHttpResponse', await super().delete(*args, **kwargs))
@ -247,6 +249,7 @@ class UDSTransactionTestCase(UDSTestCaseMixin, TransactionTestCase):
setupClass(cls)
# pylint: disable=unused-argument
def setupClass(
cls: typing.Union[typing.Type[UDSTestCase], typing.Type[UDSTransactionTestCase]]
) -> None:

View File

@ -33,6 +33,7 @@ import typing
from django.urls import reverse
from uds import models
from uds.core.util.config import GlobalConfig
from ...utils.web import test
@ -41,8 +42,6 @@ from ...fixtures import authenticators as fixtures_authenticators
if typing.TYPE_CHECKING:
from django.http import HttpResponse
from uds import models
class WebLoginLogoutTest(test.WEBTestCase):
"""

View File

@ -65,7 +65,7 @@ def loadModulesUrls() -> typing.List[typing.Any]:
modName = 'uds.dispatchers'
pkgpath = os.path.dirname(typing.cast(str, sys.modules[modName].__file__))
for _, name, _ in pkgutil.iter_modules([pkgpath]):
fullModName = '{}.{}.urls'.format(modName, name)
fullModName = f'{modName}.{name}.urls'
try:
mod = importlib.import_module(fullModName)
urlpatterns: typing.List[typing.Any] = getattr(mod, 'urlpatterns')
@ -98,7 +98,7 @@ def importModules(modName: str, *, packageName: typing.Optional[str] = None) ->
pkgpath = os.path.dirname(typing.cast(str, sys.modules[modName].__file__))
if packageName: # Append package name to path and module name
pkgpath = os.path.join(pkgpath, packageName)
modName = '{}.{}'.format(modName, packageName)
modName = f'{modName}.{packageName}'
logger.info('* Importing modules from %s', pkgpath)
for _, name, _ in pkgutil.iter_modules([pkgpath]):

View File

@ -35,12 +35,13 @@ import socket
import logging
import typing
import ipaddress
import enum
class IpType(typing.NamedTuple):
ip: int
version: typing.Literal[4, 6, 0] # 0 is only used for invalid detected ip
class NetworkType(typing.NamedTuple):
start: int
end: int
@ -73,10 +74,11 @@ def ipToLong(ip: str) -> IpType:
try:
if ':' in ip and '.' not in ip:
return IpType(int(ipaddress.IPv6Address(ip)), 6)
else: # ipv4
if ':' in ip and '.' in ip:
ip = ip.split(':')[-1] # Last part of ipv6 address
return IpType(int(ipaddress.IPv4Address(ip)), 4)
if ':' in ip and '.' in ip:
ip = ip.split(':')[
-1
] # Last part of ipv6 address is ipv4 address (has dots and colons, so we can't use ipaddress)
return IpType(int(ipaddress.IPv4Address(ip)), 4)
except Exception as e:
logger.error('Ivalid value: %s (%s)', ip, e)
return IpType(0, 0) # Invalid values will map to "0.0.0.0" --> 0
@ -88,11 +90,10 @@ def longToIp(n: int, version: typing.Literal[0, 4, 6] = 0) -> str:
"""
if n > 2**32 or version == 6:
return str(ipaddress.IPv6Address(n).compressed)
else:
return str(ipaddress.IPv4Address(n))
return str(ipaddress.IPv4Address(n))
def networkFromStringIPv4(strNets: str, version: typing.Literal[0, 4, 6] = 0) -> NetworkType:
def networkFromStringIPv4(strNets: str) -> NetworkType:
'''
Parses the network from strings in this forms:
- A.* (or A.*.* or A.*.*.*)
@ -185,17 +186,17 @@ def networkFromStringIPv4(strNets: str, version: typing.Literal[0, 4, 6] = 0) ->
raise Exception()
except Exception as e:
logger.error('Invalid network found: %s %s', strNets, e)
raise ValueError(inputString)
raise ValueError(inputString) from e
def networkFromStringIPv6(strNets: str, version: typing.Literal[0, 4, 6] = 0) -> NetworkType:
def networkFromStringIPv6(strNets: str) -> NetworkType:
'''
returns a named tuple with networks start and network end
'''
logger.debug('Getting network from %s', strNets)
# if '*' or '::*', return the whole IPv6 range
if strNets == '*' or strNets == '::*':
if strNets in ('*', '::*'):
return NetworkType(0, 2**128 - 1, 6)
try:
@ -204,7 +205,7 @@ def networkFromStringIPv6(strNets: str, version: typing.Literal[0, 4, 6] = 0) ->
return NetworkType(int(net.network_address), int(net.broadcast_address), 6)
except Exception as e:
logger.error('Invalid network found: %s %s', strNets, e)
raise ValueError(strNets)
raise ValueError(strNets) from e
def networkFromString(
@ -212,12 +213,12 @@ def networkFromString(
version: typing.Literal[0, 4, 6] = 0,
) -> NetworkType:
if not ':' in strNets and version != 6:
return networkFromStringIPv4(strNets, version)
else: # ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in strNets:
return NetworkType(0, 0, 0)
return networkFromStringIPv6(strNets, version)
return networkFromStringIPv4(strNets)
# ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in strNets:
return NetworkType(0, 0, 0)
return networkFromStringIPv6(strNets)
def networksFromString(
@ -260,7 +261,7 @@ def isValidIp(value: str, version: typing.Literal[0, 4, 6] = 0) -> bool:
# Using ipaddress module
try:
addr = ipaddress.ip_address(value)
return version == 0 or addr.version == version
return version in (0, addr.version) # Must be the same version or 0
except ValueError:
return False

View File

@ -42,6 +42,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class ObjectType(enum.Enum):
PROVIDER = (1, models.Provider)
SERVICE = (2, models.Service)
@ -68,15 +69,13 @@ class ObjectType(enum.Enum):
NOTIFICATION = (23, models.Notification)
TICKET_STORE = (24, models.TicketStore)
@property
def model(self) -> typing.Type['Model']:
return self.value[1]
@property
def type(self) -> int:
"""Returns the integer value of this object type
"""
"""Returns the integer value of this object type"""
return self.value[0]
@staticmethod
@ -84,7 +83,7 @@ class ObjectType(enum.Enum):
for objType in ObjectType:
if objType.model == type(model):
return objType
raise ValueError('Invalid model type: {}'.format(model))
raise ValueError(f'Invalid model type: {model}')
def __eq__(self, __o: object) -> bool:
"""Compares with another ObjType, and includes int comparison

View File

@ -33,12 +33,13 @@
import logging
import typing
# from django.utils.translation import gettext as _
from uds import models
from uds.models.permissions import PermissionType
from uds.core.util import objtype
from django.utils.translation import gettext as _
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:

View File

@ -24,12 +24,13 @@ KEY_SIZE = 4096
SECRET_SIZE = 32
try:
# Ensure that we do not get warnings about self signed certificates and so
import requests.packages.urllib3 # type: ignore
requests.packages.urllib3.disable_warnings() # @UndefinedVariable
except:
requests.packages.urllib3.disable_warnings() # type: ignore # pylint: disable=no-member
except Exception: # nosec: simple check for disabling warnings,
# Igonre if we cannot disable warnings
pass
@ -88,7 +89,9 @@ def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
Returns:
A SSLContext object.
"""
sslContext = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=certifi.where())
sslContext = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=certifi.where()
)
if not verify:
sslContext.check_hostname = False
sslContext.verify_mode = ssl.VerifyMode.CERT_NONE
@ -98,7 +101,9 @@ def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
# sslContext.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
if hasattr(settings, 'SECURE_MIN_TLS_VERSION') and settings.SECURE_MIN_TLS_VERSION:
# format is "1.0, 1.1, 1.2 or 1.3", convert to ssl.TLSVersion.TLSv1_0, ssl.TLSVersion.TLSv1_1, ssl.TLSVersion.TLSv1_2 or ssl.TLSVersion.TLSv1_3
sslContext.minimum_version = getattr(ssl.TLSVersion, 'TLSv' + settings.SECURE_MIN_TLS_VERSION.replace('.', '_'))
sslContext.minimum_version = getattr(
ssl.TLSVersion, 'TLSv' + settings.SECURE_MIN_TLS_VERSION.replace('.', '_')
)
else:
sslContext.minimum_version = ssl.TLSVersion.TLSv1_2
@ -140,7 +145,10 @@ def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool:
# Even if the key or certificate is not valid, we only want a True if they match, False otherwise
return False
def secureRequestsSession(*, verify: typing.Union[str, bool] = True) -> 'requests.Session':
def secureRequestsSession(
*, verify: typing.Union[str, bool] = True
) -> 'requests.Session':
'''
Generates a requests.Session object with a custom adapter that uses a custom SSLContext.
This is intended to be used for requests that need to be secure, but not necessarily verified.
@ -152,41 +160,34 @@ def secureRequestsSession(*, verify: typing.Union[str, bool] = True) -> 'request
Returns:
A requests.Session object.
'''
# Copy verify value
lverify = verify
class UDSHTTPAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs) -> None:
kwargs["ssl_context"] = createClientSslContext(verify=verify is True)
# See urllib3.poolmanager.SSL_KEYWORDS for all available keys.
return super().init_poolmanager(*args, **kwargs)
def cert_verify(self, conn, url, lverify, cert) -> None:
def cert_verify(self, conn, url, verify, cert) -> None:
"""Verify a SSL certificate. This method should not be called from user
code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param conn: The urllib3 connection object associated with the cert.
:param url: The requested URL.
:param verify: Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use
:param cert: The SSL certificate to verify.
code, and is only exposed for use when subclassing the HTTPAdapter class
"""
# If lverify is an string, use it even if verify is False
# if not, use verify value
if not isinstance(lverify, str):
lverify = verify
if not isinstance(verify, str):
verify = lverify
# logger.info('Connection info: %s', conn)
# for k, v in conn.__dict__.items():
# logger.info('Connection info: %s = %s', k, v)
super().cert_verify(conn, url, lverify, cert)
super().cert_verify(conn, url, verify, cert)
session = requests.Session()
session.mount("https://", UDSHTTPAdapter())
return session

View File

@ -49,7 +49,6 @@ def serialize(obj: typing.Any) -> bytes:
# generate pickle dump and encrypt it to keep it safe
# Compress data using lzma first
data = CryptoManager().fastCrypt(
lzma.compress(pickle.dumps(obj))
) # With latest available protocol
@ -64,10 +63,11 @@ def deserialize(data: typing.Optional[bytes]) -> typing.Any:
return None
if data[0:2] in DESERIALIZERS:
return pickle.loads(lzma.decompress(DESERIALIZERS[data[0:2]](data[2:]))) # nosec: Secured by encryption
else:
# Old version, try to unpickle it
try:
return pickle.loads(data) # nosec: Backward compatibility
except Exception:
return None
return pickle.loads(
lzma.decompress(DESERIALIZERS[data[0:2]](data[2:]))
) # nosec: Secured by encryption
# Old version, try to unpickle it
try:
return pickle.loads(data) # nosec: Backward compatibility
except Exception:
return None

View File

@ -34,7 +34,6 @@ import typing
import json
from django.contrib.sessions.serializers import JSONSerializer
class SessionSerializer:
"""
Serializer for django sessions.
@ -43,7 +42,8 @@ class SessionSerializer:
"""
Serialize data for storage in a session.
"""
return json.dumps(data).encode()
return json.dumps(data, separators=(',', ':')).encode()
def loads(self, data: bytes) -> typing.Dict[str, typing.Any]:
"""

View File

@ -1,5 +1,6 @@
import typing
class Singleton(type):
'''
Metaclass for singleton pattern
@ -8,14 +9,15 @@ class Singleton(type):
class MyClass(metaclass=Singleton):
...
'''
_instance: typing.Optional[typing.Any]
# We use __init__ so we customise the created class from this metaclass
def __init__(self, *args, **kwargs) -> None:
self._instance = None
def __init__(cls, *args, **kwargs) -> None:
cls._instance = None
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs) -> typing.Any:
if self._instance is None:
self._instance = super().__call__(*args, **kwargs)
return self._instance
def __call__(cls, *args, **kwargs) -> typing.Any:
if cls._instance is None:
cls._instance = super().__call__(*args, **kwargs)
return cls._instance

View File

@ -42,11 +42,7 @@ class StateQueue:
self._current = None
def __str__(self):
res = '<StateQueue Current: %s, Queue: (%s)>' % (
self._current,
','.join(state for state in self._queue),
)
return res
return f'<StateQueue Current: {self._current}, Queue: ({",".join(state for state in self._queue)})>'
def clearQueue(self) -> None:
self._queue.clear()

View File

@ -29,7 +29,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import pickle # nosec: This is e controled pickle use
import pickle # nosec: This is e controled pickle use
import base64
import hashlib
import codecs
@ -66,7 +66,9 @@ def _decodeValue(
) -> typing.Tuple[str, typing.Any]:
if value:
try:
v = pickle.loads(base64.b64decode(value.encode())) # nosec: This is e controled pickle loading
v = pickle.loads(
base64.b64decode(value.encode())
) # nosec: This is e controled pickle loading
if isinstance(v, tuple) and v[0] == MARK:
return typing.cast(typing.Tuple[str, typing.Any], v[1:])
# Fix value so it contains also the "key" (in this case, the original key is lost, we have only the hash value...)
@ -75,7 +77,7 @@ def _decodeValue(
try:
return ('#' + dbk, base64.b64decode(value.encode()).decode())
except Exception as e:
logger.warn('Unknown decodeable value: %s (%s)', value, e)
logger.warning('Unknown decodeable value: %s (%s)', value, e)
return ('', None)
@ -108,9 +110,8 @@ class StorageAsDict(MutableMapping):
@property
def _db(self) -> typing.Union[models.QuerySet, models.Manager]:
if self._atomic:
return DBStorage.objects.select_for_update() # type: ignore
else:
return DBStorage.objects # type: ignore
return DBStorage.objects.select_for_update()
return DBStorage.objects
@property
def _filtered(self) -> 'models.QuerySet[DBStorage]':
@ -127,7 +128,7 @@ class StorageAsDict(MutableMapping):
def __getitem__(self, key: str) -> typing.Any:
if not isinstance(key, str):
raise TypeError('Key must be str, {} found'.format(type(key)))
raise TypeError(f'Key must be str, {type(key)} found')
dbk = self._key(key)
logger.debug('Getitem: %s', dbk)
@ -139,12 +140,13 @@ class StorageAsDict(MutableMapping):
def __setitem__(self, key: str, value: typing.Any) -> None:
if not isinstance(key, str):
raise TypeError('Key must be str type, {} found'.format(type(key)))
raise TypeError(f'Key must be str type, {type(key)} found')
dbk = self._key(key)
logger.debug('Setitem: %s = %s', dbk, value)
data = _encodeValue(key, value, self._compat)
c, created = DBStorage.objects.update_or_create(
# ignores return value, we don't care if it was created or updated
DBStorage.objects.update_or_create(
key=dbk, defaults={'data': data, 'attr1': self._group, 'owner': self._owner}
)
@ -179,7 +181,7 @@ class StorageAsDict(MutableMapping):
return self[key] or default
def delete(self, key: str) -> None:
self.__delitem__(key)
self.__delitem__(key) # pylint: disable=unnecessary-dunder-call
# Custom utility methods
@property
@ -312,7 +314,9 @@ class Storage:
def getPickle(self, skey: typing.Union[str, bytes]) -> typing.Any:
v = self.readData(skey, True)
if v:
return pickle.loads(typing.cast(bytes, v)) # nosec: This is e controled pickle loading
return pickle.loads(
typing.cast(bytes, v)
) # nosec: This is e controled pickle loading
return None
def getPickleByAttr1(self, attr1: str, forUpdate: bool = False):
@ -335,7 +339,9 @@ class Storage:
try:
# Process several keys at once
DBStorage.objects.filter(key__in=[self.getKey(k) for k in keys]).delete()
except Exception: # nosec: Not interested in processing exceptions, just ignores it
except (
Exception
): # nosec: Not interested in processing exceptions, just ignores it
pass
def lock(self):

View File

@ -32,7 +32,6 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import sys
import os
import re
import datetime
import unicodedata
import typing
@ -41,8 +40,6 @@ from django.utils import formats
from django.utils.translation import gettext
import django.template.defaultfilters as filters
from uds.core import exceptions
class CaseInsensitiveDict(dict):
@classmethod
@ -50,46 +47,46 @@ class CaseInsensitiveDict(dict):
return key.lower() if isinstance(key, str) else key
def __init__(self, *args, **kwargs):
super(CaseInsensitiveDict, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self._convert_keys()
def __getitem__(self, key):
return super(CaseInsensitiveDict, self).__getitem__(self.__class__._k(key))
return super().__getitem__(self.__class__._k(key))
def __setitem__(self, key, value):
super(CaseInsensitiveDict, self).__setitem__(self.__class__._k(key), value)
super().__setitem__(self.__class__._k(key), value)
def __delitem__(self, key):
return super(CaseInsensitiveDict, self).__delitem__(self.__class__._k(key))
return super().__delitem__(self.__class__._k(key))
def __contains__(self, key):
return super(CaseInsensitiveDict, self).__contains__(self.__class__._k(key))
return super().__contains__(self.__class__._k(key))
def pop(self, key, *args, **kwargs):
return super(CaseInsensitiveDict, self).pop(
self.__class__._k(key), *args, **kwargs
return super().pop(
self.__class__._k(key), *args, **kwargs # pylint: disable=protected-access
)
def get(self, key, *args, **kwargs):
return super(CaseInsensitiveDict, self).get(
self.__class__._k(key), *args, **kwargs
return super().get(
self.__class__._k(key), *args, **kwargs # pylint: disable=protected-access
)
def setdefault(self, key, *args, **kwargs):
return super(CaseInsensitiveDict, self).setdefault(
self.__class__._k(key), *args, **kwargs
return super().setdefault(
self.__class__._k(key), *args, **kwargs # pylint: disable=protected-access
)
def update(self, E=None, **F):
if E is None:
E = {}
super(CaseInsensitiveDict, self).update(self.__class__(E))
super(CaseInsensitiveDict, self).update(self.__class__(**F))
super().update(self.__class__(E))
super().update(self.__class__(**F))
def _convert_keys(self):
for k in list(self.keys()):
v = super(CaseInsensitiveDict, self).pop(k)
self.__setitem__(k, v)
v = super().pop(k)
self.__setitem__(k, v) # pylint: disable=unnecessary-dunder-call
def as_list(value: typing.Any) -> typing.List[typing.Any]:
@ -104,7 +101,7 @@ def as_list(value: typing.Any) -> typing.List[typing.Any]:
if isinstance(value, (bytes, str, int, float)):
return [value]
try:
return [v for v in value]
return list(value)
except Exception:
return [value]

View File

@ -46,7 +46,7 @@ class UniqueGIDGenerator(UniqueIDGenerator):
def __toName(self, seq: int) -> str:
if seq == -1:
raise KeyError('No more GIDS available.')
return '{:s}{:08d}'.format(self._baseName, seq)
return f'{self._baseName}{seq:08d}'
# return "%s%0*d" % (self._baseName, 8, seq)
def get(self, rangeStart: int = 0, rangeEnd: int = MAX_SEQ) -> str: # type: ignore

View File

@ -148,9 +148,9 @@ class UniqueIDGenerator:
def transfer(self, seq: int, toUidGen: 'UniqueIDGenerator') -> bool:
self.__filter(0, forUpdate=True).filter(owner=self._owner, seq=seq).update(
owner=toUidGen._owner,
basename=toUidGen._baseName,
stamp=getSqlDatetimeAsUnix(), # pylint: disable=protected-access
owner=toUidGen._owner, # pylint: disable=protected-access
basename=toUidGen._baseName, # pylint: disable=protected-access
stamp=getSqlDatetimeAsUnix(),
)
return True

View File

@ -50,17 +50,17 @@ class UniqueMacGenerator(UniqueIDGenerator):
def __toMac(self, seq: int) -> str:
if seq == -1: # No mor macs available
return '00:00:00:00:00:00'
return re.sub(r"(..)", r"\1:", "%0*X" % (12, seq))[:-1]
return re.sub(r"(..)", r"\1:", f'{seq:012X}')[:-1]
# noinspection PyMethodOverriding
def get(self, macRange: str) -> str: # type: ignore
def get(self, macRange: str) -> str: # type: ignore # pylint: disable=arguments-differ
firstMac, lastMac = macRange.split('-')
return self.__toMac(super().get(self.__toInt(firstMac), self.__toInt(lastMac)))
def transfer(self, mac: str, toUMgen: 'UniqueMacGenerator'): # type: ignore
def transfer(self, mac: str, toUMgen: 'UniqueMacGenerator'): # type: ignore # pylint: disable=arguments-renamed
super().transfer(self.__toInt(mac), toUMgen)
def free(self, mac: str) -> None:
def free(self, mac: str) -> None: # type: ignore # pylint: disable=arguments-renamed
super().free(self.__toInt(mac))
# Release is inherited, no mod needed

View File

@ -47,15 +47,15 @@ class UniqueNameGenerator(UniqueIDGenerator):
def __toName(self, seq: int, length: int) -> str:
if seq == -1:
raise KeyError('No more names available. Please, increase service digits.')
return "%s%0*d" % (self._baseName, length, seq)
return f'{self._baseName}{seq:0{length}d}'
def get(self, baseName: str, length: int = 5) -> str: # type: ignore # pylint: disable=arguments-differ
def get(self, baseName: str, length: int = 5) -> str: # type: ignore # pylint: disable=arguments-differ,arguments-renamed
self.setBaseName(baseName)
minVal = 0
maxVal = 10 ** length - 1
maxVal = 10**length - 1
return self.__toName(super().get(minVal, maxVal), length)
def transfer(self, baseName: str, name: str, toUNGen: 'UniqueNameGenerator') -> None: # type: ignore
def transfer(self, baseName: str, name: str, toUNGen: 'UniqueNameGenerator') -> None: # type: ignore # pylint: disable=arguments-differ
self.setBaseName(baseName)
super().transfer(int(name[len(self._baseName) :]), toUNGen)

View File

@ -65,16 +65,14 @@ def validateNumeric(
numeric = int(value)
if minValue is not None and numeric < minValue:
raise exceptions.ValidationError(
_(
'{0} must be greater than or equal to {1}'.format(
fieldName, minValue
)
_('{0} must be greater than or equal to {1}').format(
fieldName, minValue
)
)
if maxValue is not None and numeric > maxValue:
raise exceptions.ValidationError(
_('{0} must be lower than or equal to {1}'.format(fieldName, maxValue))
_('{0} must be lower than or equal to {1}').format(fieldName, maxValue)
)
value = str(numeric)
@ -82,7 +80,7 @@ def validateNumeric(
except ValueError:
raise exceptions.ValidationError(
_('{0} contains invalid characters').format(fieldName)
)
) from None
return int(value)
@ -139,7 +137,7 @@ def validateIpv4(ipv4: str) -> str:
except Exception:
raise exceptions.ValidationError(
_('{} is not a valid IPv4 address').format(ipv4)
)
) from None
return ipv4
@ -155,7 +153,7 @@ def validateIpv6(ipv6: str) -> str:
except Exception:
raise exceptions.ValidationError(
_('{} is not a valid IPv6 address').format(ipv6)
)
) from None
return ipv6
@ -171,7 +169,7 @@ def validateIpv4OrIpv6(ipv4OrIpv6: str) -> str:
except Exception:
raise exceptions.ValidationError(
_('{} is not a valid IPv4 or IPv6 address').format(ipv4OrIpv6)
)
) from None
return ipv4OrIpv6
@ -216,9 +214,7 @@ def validatePath(
)
else:
if not valid_for_windows.match(path) and not valid_for_unix.match(path):
raise exceptions.ValidationError(
_('{} is not a valid path').format(path)
)
raise exceptions.ValidationError(_('{} is not a valid path').format(path))
return path
@ -246,7 +242,7 @@ def validateHostPortPair(hostPortPair: str) -> typing.Tuple[str, int]:
except Exception:
raise exceptions.ValidationError(
_('{} is not a valid host:port pair').format(hostPortPair)
)
) from None
def validateTimeout(timeOutStr: str) -> int:
@ -273,9 +269,7 @@ def validateMac(mac: str) -> str:
) # In fact, it could be XX-XX-XX-XX-XX-XX, but we use - as range separator
if macRE.match(mac) is None:
raise exceptions.ValidationError(
_('{} is not a valid MAC address').format(mac)
)
raise exceptions.ValidationError(_('{} is not a valid MAC address').format(mac))
return mac
@ -293,7 +287,7 @@ def validateMacRange(macRange: str) -> str:
except Exception:
raise exceptions.ValidationError(
_('{} is not a valid MAC range').format(macRange)
)
) from None
return macRange
@ -343,8 +337,6 @@ def validateBasename(baseName: str, length: int = -1) -> str:
)
if baseName.isdigit():
raise exceptions.ValidationError(
_('The machine name can\'t be only numbers')
)
raise exceptions.ValidationError(_('The machine name can\'t be only numbers'))
return baseName

View File

@ -35,8 +35,6 @@ import typing
from django.urls import reverse
from django.http import HttpResponsePermanentRedirect
from uds.core.util.config import GlobalConfig
from . import builder
logger = logging.getLogger(__name__)
@ -46,9 +44,9 @@ if typing.TYPE_CHECKING:
def _check_redirectable(request: 'HttpRequest') -> typing.Optional['HttpResponse']:
if request.is_secure():
return None
return None
return HttpResponsePermanentRedirect(reverse('page.login'))
return HttpResponsePermanentRedirect(request.build_absolute_uri(reverse('page.index')).replace('http://', 'https://', 1))
# Compatibility with old middleware, so we can use it in settings.py as it was