1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

Refactor network_from_str function to handle IPv6 addresses

Added sample set proxies list from cloudflare list
This commit is contained in:
Adolfo Gómez García 2024-09-30 20:53:39 +02:00
parent 093c7b3094
commit 81c5429f40
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
4 changed files with 178 additions and 44 deletions

View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
import asyncio
import aiohttp
AUTH_NAME: typing.Final[str] = 'interna'
AUTH_USER: typing.Final[str] = 'admin'
AUTH_PASS: typing.Final[str] = 'temporal'
REST_URL: typing.Final[str] = 'http://172.27.0.1:8000/uds/rest/'
class RESTException(Exception):
pass
class AuthException(RESTException):
pass
class LogoutException(RESTException):
pass
# Hace login con el root, puede usarse cualquier autenticador y cualquier usuario, pero en la 1.5 solo está implementado poder hacer
# este tipo de login con el usuario "root"
async def login(session: aiohttp.ClientSession) -> None:
parameters = {
'auth': AUTH_NAME,
'username': AUTH_USER,
'password': AUTH_PASS,
}
response = await session.post(REST_URL + 'auth/login', json=parameters)
if not response.ok:
raise AuthException('Error logging in')
# resp contiene las cabeceras, content el contenido de la respuesta (que es json), pero aún está en formato texto
res = await response.json()
print(res)
if res['result'] != 'ok': # Authentication error
raise AuthException('Authentication error')
session.headers.update({'X-Auth-Token': res['token']})
session.headers.update({'Scrambler': res['scrambler']})
# Fix user agent, so we indicate we are on Linux
session.headers.update({'User-Agent': 'SampleClient/1.0 (Linux)'})
async def logout(session: aiohttp.ClientSession) -> None:
response = await session.get(REST_URL + 'auth/logout')
if not response.ok:
raise LogoutException('Error logging out')
async def set_config_var(section: str, name: str, value: str, session: aiohttp.ClientSession) -> None:
response = await session.put(
REST_URL + 'config',
json={
section: {
name: {
'value': value,
}
}
},
)
if not response.ok:
raise RESTException('Error setting config var')
async def main():
async with aiohttp.ClientSession() as session:
await login(session) # Will raise an exception if error
# Get ipv4 and ipv6 from cloudflare
ips: typing.List[str] = []
for url in ['https://www.cloudflare.com/ips-v4', 'https://www.cloudflare.com/ips-v6']:
response = await session.get(url)
if not response.ok:
raise RESTException('Error getting cloudflare ips')
ips += (await response.text()).strip().split('\n')
await set_config_var('Security', 'Allowed IP Forwarders', ','.join(ips), session)
await logout(session)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())

View File

@ -127,18 +127,18 @@ class Login(Handler):
):
raise exceptions.rest.RequestError('Invalid parameters (no auth)')
authId: typing.Optional[str] = self._params.get(
auth_id: typing.Optional[str] = self._params.get(
'auth_id',
self._params.get('authId', None), # Old compat, alias
)
authLabel: typing.Optional[str] = self._params.get(
auth_label: typing.Optional[str] = self._params.get(
'auth_label',
self._params.get(
'authSmallName', # Old compat name
self._params.get('authLabel', None), # Old compat name
),
)
authName: typing.Optional[str] = self._params.get('auth', None)
auth_name: typing.Optional[str] = self._params.get('auth', None)
platform: str = self._params.get('platform', self._request.os.os.value[0])
username: str = self._params['username']
@ -148,12 +148,12 @@ class Login(Handler):
# Generate a random scrambler
scrambler: str = CryptoManager.manager().random_string(32)
if (
authName == 'admin'
or authLabel == 'admin'
or authId == '00000000-0000-0000-0000-000000000000'
or (not authId and not authName and not authLabel)
auth_name == 'admin'
or auth_label == 'admin'
or auth_id == '00000000-0000-0000-0000-000000000000'
or (not auth_id and not auth_name and not auth_label)
):
if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager().check_hash(
if GlobalConfig.SUPER_USER_LOGIN.get(True) == username and CryptoManager.manager().check_hash(
password, GlobalConfig.SUPER_USER_PASS.get(True)
):
self.gen_auth_token(-1, username, password, locale, platform, True, True, scrambler)
@ -161,12 +161,12 @@ class Login(Handler):
return Login.result(error='Invalid credentials')
# Will raise an exception if no auth found
if authId:
auth = Authenticator.objects.get(uuid=process_uuid(authId))
elif authName:
auth = Authenticator.objects.get(name=authName)
if auth_id:
auth = Authenticator.objects.get(uuid=process_uuid(auth_id))
elif auth_name:
auth = Authenticator.objects.get(name__iexact=auth_name)
else:
auth = Authenticator.objects.get(small_name=authLabel)
auth = Authenticator.objects.get(small_name__iexact=auth_label)
# No matter in fact the password, just not empty (so it can be encrypted, but will be invalid anyway)
password = password or CryptoManager().random_string(32)

View File

@ -180,7 +180,9 @@ def needs_trusted_source(
"""
@wraps(view_func)
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> HttpResponse:
def _wrapped_view(
request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
) -> HttpResponse:
"""
Wrapped function for decorator
"""
@ -202,7 +204,9 @@ def needs_trusted_source(
# it's designed to be used in ajax calls mainly
def deny_non_authenticated(view_func: collections.abc.Callable[..., RT]) -> collections.abc.Callable[..., RT]:
@wraps(view_func)
def _wrapped_view(request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any) -> RT:
def _wrapped_view(
request: 'types.requests.ExtendedHttpRequest', *args: typing.Any, **kwargs: typing.Any
) -> RT:
if not request.user or not request.authorized:
return HttpResponseForbidden() # type: ignore
return view_func(request, *args, **kwargs)
@ -215,6 +219,7 @@ def register_user(
auth_instance: AuthenticatorInstance,
username: str,
request: 'types.requests.ExtendedHttpRequest',
skip_callbacks: bool = False,
) -> types.auth.LoginResult:
"""
Check if this user already exists on database with this authenticator, if don't, create it with defaults
@ -239,9 +244,9 @@ def register_user(
browser=request.os.browser,
version=request.os.version,
)
# Try to notify callback if needed
callbacks.weblogin(usr)
if not skip_callbacks:
callbacks.weblogin(usr)
return types.auth.LoginResult(user=usr)
return types.auth.LoginResult()
@ -252,19 +257,17 @@ def authenticate(
password: str,
authenticator: models.Authenticator,
request: 'types.requests.ExtendedHttpRequest',
skip_callbacks: bool = False,
) -> types.auth.LoginResult:
"""
Given an username, password and authenticator, try to authenticate user
@param username: username to authenticate
@param password: password to authenticate this user
@param authenticator: Authenticator (database object) used to authenticate with provided credentials
@param request: Request object
@return:
An types.auth.LoginResult indicating:
user if success in logging in field user or None if not
url if not success in logging in field url so instead of error UDS will redirect to this url
Authenticate user with provided credentials
Args:
username (str): username to authenticate
password (str): password to authenticate this user
authenticator (models.Authenticator): Authenticator (database object) used to authenticate with provided credentials
request (ExtendedHttpRequestWithUser): Request object
skip_callbacks (bool, optional): Skip callbacks. Defaults to False.
"""
logger.debug('Authenticating user %s with authenticator %s', username, authenticator)
@ -354,12 +357,11 @@ def authenticate_via_callback(
if result.success == types.auth.AuthenticationState.REDIRECT:
return types.auth.LoginResult(url=result.url)
if result.username:
return register_user(authenticator, auth_instance, result.username or '', request)
else:
if not result.username:
logger.warning('Authenticator %s returned empty username', authenticator.name)
raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS')
raise exceptions.auth.InvalidUserException('User doesn\'t has access to UDS')
return register_user(authenticator, auth_instance, result.username, request)
def authenticate_callback_url(authenticator: models.Authenticator) -> str:
@ -447,7 +449,9 @@ def web_password(request: HttpRequest) -> str:
return CryptoManager().symmetric_decrypt(passkey, uds_cookie(request)) # recover as original unicode string
def web_logout(request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None) -> HttpResponse:
def web_logout(
request: 'types.requests.ExtendedHttpRequest', exit_url: typing.Optional[str] = None
) -> HttpResponse:
"""
Helper function to clear user related data from session. If this method is not used, the session we be cleaned anyway
by django in regular basis.

View File

@ -30,6 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import functools
import ipaddress
import logging
import re
@ -210,29 +211,32 @@ def network_from_str_ipv6(strNets: str) -> NetworkType:
def network_from_str(
strNets: str,
network_str: str,
version: typing.Literal[0, 4, 6] = 0,
) -> NetworkType:
if not ':' in strNets and version != 6:
return network_from_str_ipv4(strNets)
# ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in strNets:
try:
if not ':' in network_str and version != 6:
return network_from_str_ipv4(network_str)
# ':' in strNets or version == 6:
# If is in fact an IPv4 address, return None network, this will not be used
if '.' in network_str:
return NetworkType(0, 0, 0)
return network_from_str_ipv6(network_str)
except ValueError:
return NetworkType(0, 0, 0)
return network_from_str_ipv6(strNets)
@functools.lru_cache(maxsize=32)
def networks_from_str(
nets: str,
networks_str: str,
version: typing.Literal[0, 4, 6] = 0,
) -> list[NetworkType]:
"""
If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network)
Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...]
"""
return [network_from_str(str_net, version) for str_net in re.split('[;,]', nets) if str_net]
return [network_from_str(str_net, version) for str_net in re.split('[;,]', networks_str) if str_net]
@functools.lru_cache(maxsize=32)
def contains(
networks: typing.Union[str, NetworkType, list[NetworkType]],
ip: typing.Union[str, int],