1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

More linting and more managers refactoring

This commit is contained in:
Adolfo Gómez García 2023-04-19 23:57:36 +02:00
parent 60fd7edc7b
commit b582a26be2
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
26 changed files with 174 additions and 264 deletions

View File

@ -33,7 +33,6 @@ import functools
import logging
from uds import models
from uds.core.managers import cryptoManager
from ...utils import rest
from ...fixtures import rest as rest_fixtures
@ -46,6 +45,7 @@ class GroupsTest(rest.test.RESTActorTestCase):
"""
Test users group rest api
"""
def setUp(self) -> None:
# Override number of items to create
rest.test.NUMBER_OF_ITEMS_TO_CREATE = 16
@ -66,7 +66,9 @@ class GroupsTest(rest.test.RESTActorTestCase):
for group in groups:
# Locate the group in the auth
dbgrp = self.auth.groups.get(name=group['name'])
self.assertTrue(rest.assertions.assertGroupIs(dbgrp, group, compare_uuid=True))
self.assertTrue(
rest.assertions.assertGroupIs(dbgrp, group, compare_uuid=True)
)
def test_groups_tableinfo(self) -> None:
url = f'authenticators/{self.auth.uuid}/groups/tableinfo'
@ -110,7 +112,6 @@ class GroupsTest(rest.test.RESTActorTestCase):
response = self.client.rest_get(f'{url}/invalid')
self.assertEqual(response.status_code, 404)
def test_group_create_edit(self) -> None:
url = f'authenticators/{self.auth.uuid}/groups'
# Normal group
@ -133,75 +134,12 @@ class GroupsTest(rest.test.RESTActorTestCase):
self.assertEqual(response.status_code, 400)
# Now a meta group, with some groups inside
groups = [self.simple_groups[0].uuid]
return
url = f'authenticators/{self.auth.uuid}/users'
user_dct: typing.Dict[str, typing.Any] = {
'name': 'test',
'real_name': 'test real name',
'comments': 'test comments',
'state': 'A',
'is_admin': True,
'staff_member': True,
'groups': [self.groups[0].uuid, self.groups[1].uuid],
}
# Now, will work
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
# groups = [self.simple_groups[0].uuid]
group_dct = rest_fixtures.createGroup(
meta=True, groups=[self.simple_groups[0].uuid, self.simple_groups[1].uuid]
)
# Get user from database and ensure values are correct
dbusr = self.auth.users.get(name=user_dct['name'])
self.assertEqual(user_dct['name'], dbusr.name)
self.assertEqual(user_dct['real_name'], dbusr.real_name)
self.assertEqual(user_dct['comments'], dbusr.comments)
self.assertEqual(user_dct['is_admin'], dbusr.is_admin)
self.assertEqual(user_dct['staff_member'], dbusr.staff_member)
self.assertEqual(user_dct['state'], dbusr.state)
self.assertEqual(user_dct['groups'], [i.uuid for i in dbusr.groups.all()])
self.assertEqual(response.status_code, 200)
# Returns nothing
# Now, will fail because name is already in use
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
)
self.assertEqual(response.status_code, 400)
# Modify saved user
user_dct['name'] = 'test2'
user_dct['real_name'] = 'test real name 2'
user_dct['comments'] = 'test comments 2'
user_dct['state'] = 'D'
user_dct['is_admin'] = False
user_dct['staff_member'] = False
user_dct['groups'] = [self.groups[2].uuid]
user_dct['id'] = dbusr.uuid
user_dct['password'] = 'test' # nosec: test password
user_dct['mfa_data'] = 'mfadata'
response = self.client.rest_put(
url,
user_dct,
content_type='application/json',
group_dct,
)
self.assertEqual(response.status_code, 200)
# Get user from database and ensure values are correct
dbusr = self.auth.users.get(name=user_dct['name'])
self.assertEqual(user_dct['name'], dbusr.name)
self.assertEqual(user_dct['real_name'], dbusr.real_name)
self.assertEqual(user_dct['comments'], dbusr.comments)
self.assertEqual(user_dct['is_admin'], dbusr.is_admin)
self.assertEqual(user_dct['staff_member'], dbusr.staff_member)
self.assertEqual(user_dct['state'], dbusr.state)
self.assertEqual(user_dct['groups'], [i.uuid for i in dbusr.groups.all()])
self.assertEqual(cryptoManager().checkHash(user_dct['password'], dbusr.password), True)

View File

@ -36,10 +36,8 @@ from uds.core.util import states
from uds.core.managers.crypto import CryptoManager
# Counters so we can reinvoke the same method and generate new data
glob = {
'user_id': 0,
'group_id': 0
}
glob = {'user_id': 0, 'group_id': 0}
def createAuthenticator(
authenticator: typing.Optional[models.Authenticator] = None,
@ -48,6 +46,7 @@ def createAuthenticator(
Creates a testing authenticator
"""
if authenticator is None:
# pylint: disable=import-outside-toplevel
from uds.auths.InternalDB.authenticator import InternalDBAuth
authenticator = models.Authenticator()
@ -74,10 +73,10 @@ def createUsers(
"""
users = [
authenticator.users.create(
name='user{}'.format(i),
password=CryptoManager().hash('user{}'.format(i)),
real_name='Real name {}'.format(i),
comments='User {}'.format(i),
name=f'user{i}',
password=CryptoManager().hash(f'user{i}'),
real_name=f'Real name {i}',
comments=f'User {i}',
staff_member=is_staff or is_admin,
is_admin=is_admin,
state=states.common.ACTIVE if enabled else states.common.BLOCKED,
@ -103,8 +102,8 @@ def createGroups(
"""
groups = [
authenticator.groups.create(
name='group{}'.format(i),
comments='Group {}'.format(i),
name=f'group{i}',
comments=f'Group {i}',
is_meta=False,
)
for i in range(glob['group_id'], glob['group_id'] + number_of_groups)
@ -123,8 +122,8 @@ def createMetaGroups(
"""
meta_groups = [
authenticator.groups.create(
name='meta-group{}'.format(i),
comments='Meta group {}'.format(i),
name=f'meta-group{i}',
comments=f'Meta group {i}',
is_meta=True,
meta_if_any=i % 2 == 0,
)
@ -136,9 +135,11 @@ def createMetaGroups(
groups = list(authenticator.groups.all())
if groups:
for meta in meta_groups:
for group in random.sample(groups, random.randint(1, len(groups)//2)): # nosec: testing only
for group in random.sample(
groups, random.randint(1, len(groups) // 2) # nosec: testing only
):
meta.groups.add(group)
glob['group_id'] += number_of_meta
return meta_groups

View File

@ -36,13 +36,14 @@ import typing
from django.test import SimpleTestCase
from django.test.client import Client
# Not used, alloes "rest.test" or "rest.assertions"
from . import test
from . import assertions
from uds.REST.handlers import AUTH_TOKEN_HEADER
# Not used, allows "rest.test" or "rest.assertions"
from . import test # pylint: disable=unused-import
from . import assertions # pylint: disable=unused-import
from .. import generators
from uds.REST.handlers import AUTH_TOKEN_HEADER
# Calls REST login
def login(
@ -67,7 +68,7 @@ def login(
caller.assertEqual(
response.status_code,
expectedResponseCode,
'Login from {}'.format(errorMessage or caller.__class__.__name__),
f'Login from {errorMessage or caller.__class__.__name__}',
)
if response.status_code == 200:
@ -83,16 +84,18 @@ def logout(caller: SimpleTestCase, client: Client, auth_token: str) -> None:
**{AUTH_TOKEN_HEADER: auth_token}
)
caller.assertEqual(
response.status_code, 200, 'Logout Result: {}'.format(response.content)
response.status_code, 200, f'Logout Result: {response.content}'
)
caller.assertEqual(
response.json(), {'result': 'ok'}, 'Logout Result: {}'.format(response.content)
response.json(), {'result': 'ok'}, 'Logout Result: {response.content}'
)
# Rest related utils for fixtures
# Just a holder for a type, to indentify uuids
# pylint: disable=too-few-public-methods
class uuid_type:
pass
@ -100,7 +103,7 @@ class uuid_type:
RestFieldType = typing.Tuple[str, typing.Union[typing.Type, typing.Tuple[str, ...]]]
RestFieldReference = typing.Final[typing.List[RestFieldType]]
# pylint: disable=too-many-return-statements
def random_value(
field_type: typing.Union[typing.Type, typing.Tuple[str, ...]],
value: typing.Any = None,
@ -125,10 +128,15 @@ def random_value(
if field_type == typing.List[int]:
return [generators.random_int() for _ in range(generators.random_int(1, 10))]
if field_type == typing.List[bool]:
return [random.choice([True, False]) for _ in range(generators.random_int(1, 10))] # nosec
return [
random.choice([True, False]) for _ in range(generators.random_int(1, 10)) # nosec: test values
]
if field_type == typing.List[typing.Tuple[str, str]]:
return [(generators.random_utf8_string(), generators.random_utf8_string()) for _ in range(generators.random_int(1, 10))]
return [
(generators.random_utf8_string(), generators.random_utf8_string())
for _ in range(generators.random_int(1, 10))
]
return None
@ -139,13 +147,13 @@ class RestStruct:
def as_dict(self, **kwargs) -> typing.Dict[str, typing.Any]:
# Use kwargs to override values
res = {k: kwargs.get(k, getattr(self, k)) for k in self.__annotations__}
res = {k: kwargs.get(k, getattr(self, k)) for k in self.__annotations__} # pylint: disable=no-member
# Remove None values for optional fields
return {
k: v
for k, v in res.items()
if v is not None
or self.__annotations__[k]
or self.__annotations__[k] # pylint: disable=no-member
not in (
typing.Optional[str],
typing.Optional[bool],

View File

@ -32,8 +32,7 @@ import logging
import typing
from uds import models
from uds.core.auths.user import User as aUser
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from .. import ensure_data
@ -86,7 +85,7 @@ def assertUserIs(
# Compare password
if compare_password:
if not cryptoManager().checkHash(compare_to['password'], user.password):
if not CryptoManager().checkHash(compare_to['password'], user.password):
logger.info(
'User password do not match: %s != %s',
user.password,

View File

@ -28,19 +28,19 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from uds import models
from uds.core.util import log
from uds.REST.handlers import AUTH_TOKEN_HEADER
from .. import test, generators, rest, constants
from ...fixtures import (
authenticators as authenticators_fixtures,
services as services_fixtures,
)
from uds.REST.handlers import AUTH_TOKEN_HEADER
NUMBER_OF_ITEMS_TO_CREATE = 4

View File

@ -40,7 +40,7 @@ from uds.core.util.config import GlobalConfig
from uds.core.auths.auth import getRootUser
from uds.core.util import net
from uds.models import Authenticator, User
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from .exceptions import AccessDenied
@ -237,7 +237,7 @@ class Handler:
# crypt password and convert to base64
passwd = codecs.encode(
cryptoManager().symCrypt(password, scrambler), 'base64'
CryptoManager().symCrypt(password, scrambler), 'base64'
).decode()
session['REST'] = {

View File

@ -38,7 +38,7 @@ import typing
from uds.REST import Handler
from uds.REST import RequestError
from uds import models
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util.model import processUuid
from uds.core.util import tools
@ -121,17 +121,25 @@ class Tickets(Handler):
raise RequestError('Invalid method')
try:
for i in ('authId', 'auth_id', 'authTag', 'auth_tag', 'auth', 'auth_name', 'authSmallName'):
for i in (
'authId',
'auth_id',
'authTag',
'auth_tag',
'auth',
'auth_name',
'authSmallName',
):
if i in self._params:
raise StopIteration
if 'username' in self._params and 'groups' in self._params:
raise StopIteration()
raise RequestError('Invalid parameters (no auth or username/groups)')
except StopIteration:
pass # All ok
pass # All ok
# Must be invoked as '/rest/ticket/create, with "username", ("authId" or "auth_id") or ("auth_tag" or "authSmallName" or "authTag"), "groups" (array) and optionally "time" (in seconds) as paramteres
def put(
self,
@ -258,7 +266,7 @@ class Tickets(Handler):
data = {
'username': username,
'password': cryptoManager().encrypt(password),
'password': CryptoManager().encrypt(password),
'realname': realname,
'groups': groupIds,
'auth': auth.uuid,

View File

@ -44,7 +44,7 @@ from uds.core.auths.user import User as aUser
from uds.core.util import log
from uds.core.util.model import processUuid
from uds.models import Authenticator, User, Group, ServicePool
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.REST import RequestError
from uds.core.ui.images import DEFAULT_THUMB_BASE64
@ -208,7 +208,7 @@ class Users(DetailHandler):
if 'password' in self._params:
valid_fields.append('password')
self._params['password'] = cryptoManager().hash(self._params['password'])
self._params['password'] = CryptoManager().hash(self._params['password'])
if 'mfa_data' in self._params:
valid_fields.append('mfa_data')
@ -252,8 +252,8 @@ class Users(DetailHandler):
raise RequestError(str(e)) from e
except ValidationError as e:
raise RequestError(str(e.message)) from e
except RequestError:
raise
except RequestError: # pylint: disable=try-except-raise
raise # Re-raise
except Exception as e:
logger.exception('Saving user')
raise self.invalidRequestException() from e
@ -484,8 +484,8 @@ class Groups(DetailHandler):
raise RequestError(_('User already exists (duplicate key error)')) from None
except AuthenticatorException as e:
raise RequestError(str(e)) from e
except RequestError:
raise
except RequestError: # pylint: disable=try-except-raise
raise # Re-raise
except Exception as e:
logger.exception('Saving group')
raise self.invalidRequestException() from e

View File

@ -41,7 +41,7 @@ import dns.reversename
from django.utils.translation import gettext_noop as _
from uds.core import auths
from uds.core.ui import gui
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util.state import State
from uds.core.auths.auth import authLogLogin
@ -159,7 +159,7 @@ class InternalDBAuth(auths.Authenticator):
return auths.FAILED_AUTH
# Internal Db Auth has its own groups. (That is, no external source). If a group is active it is valid
if cryptoManager().checkHash(credentials, user.password):
if CryptoManager().checkHash(credentials, user.password):
groupsManager.validate([g.name for g in user.groups.all()])
return auths.SUCCESS_AUTH

View File

@ -37,7 +37,7 @@ from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core import auths
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.auths.auth import authLogLogin
from . import client
@ -207,7 +207,7 @@ class RadiusAuth(auths.Authenticator):
connection = self.radiusClient()
# Reply is not important...
connection.authenticate(
cryptoManager().randomString(10), cryptoManager().randomString(10)
CryptoManager().randomString(10), CryptoManager().randomString(10)
)
except client.RadiusAuthenticationError:
pass

View File

@ -48,7 +48,7 @@ from django.utils.translation import gettext_noop as _, gettext
from uds.models import getSqlDatetime
from uds.core.ui import gui
from uds.core import auths, exceptions
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util.decorators import allowCache
from uds.core.util import security
@ -354,7 +354,7 @@ class SAMLAuthenticator(auths.Authenticator):
)
try:
cryptoManager().loadCertificate(self.serverCertificate.value)
CryptoManager().loadCertificate(self.serverCertificate.value)
except Exception as e:
raise exceptions.ValidationError(
gettext('Invalid server certificate. ') + str(e)
@ -373,7 +373,7 @@ class SAMLAuthenticator(auths.Authenticator):
)
try:
cryptoManager().loadPrivateKey(self.privateKey.value)
CryptoManager().loadPrivateKey(self.privateKey.value)
except Exception as e:
raise exceptions.ValidationError(gettext('Invalid private key. ') + str(e))
@ -555,8 +555,7 @@ class SAMLAuthenticator(auths.Authenticator):
)
if isinstance(metadata, str):
return metadata
else:
return typing.cast(bytes, metadata).decode()
return typing.cast(bytes, metadata).decode()
def validateField(self, field: gui.TextField):
"""

View File

@ -54,7 +54,7 @@ from uds.core.util import net
from uds.core.util.config import GlobalConfig
from uds.core.util.stats import events
from uds.core.util.state import State
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.auths import Authenticator as AuthenticatorInstance, SUCCESS_AUTH
from uds import models
@ -92,7 +92,7 @@ def getUDSCookie(
Generates a random cookie for uds, used, for example, to encript things
"""
if 'uds' not in request.COOKIES:
cookie = cryptoManager().randomString(UDS_COOKIE_LENGTH)
cookie = CryptoManager().randomString(UDS_COOKIE_LENGTH)
if response is not None:
response.set_cookie(
'uds',
@ -429,7 +429,7 @@ def webLogin(
request.session[USER_KEY] = user.id
request.session[PASS_KEY] = codecs.encode(
cryptoManager().symCrypt(password, cookie), "base64"
CryptoManager().symCrypt(password, cookie), "base64"
).decode() # as str
# Ensures that this user will have access through REST api if logged in through web interface
@ -456,11 +456,11 @@ def webPassword(request: HttpRequest) -> str:
"""
if hasattr(request, 'session'):
passkey = codecs.decode(request.session.get(PASS_KEY, '').encode(), 'base64')
return cryptoManager().symDecrpyt(
return CryptoManager().symDecrpyt(
passkey, getUDSCookie(request)
) # recover as original unicode string
# No session, get from _session instead, this is an "client" REST request
return cryptoManager().symDecrpyt(
return CryptoManager().symDecrpyt(
getattr(request, '_cryptedpass'), getattr(request, '_scrambler')
)

View File

@ -40,17 +40,10 @@ if typing.TYPE_CHECKING:
from .task import TaskManager
from .downloads import DownloadsManager
from .log import LogManager
from .user_service import UserServiceManager
from .publication import PublicationManager
from .notifications import NotificationsManager
def cryptoManager() -> 'CryptoManager':
from .crypto import CryptoManager # pylint: disable=import-outside-toplevel
return CryptoManager.manager()
def taskManager() -> 'TaskManager':
from .task import TaskManager # pylint: disable=import-outside-toplevel
@ -68,13 +61,6 @@ def logManager() -> 'LogManager':
return LogManager()
def userServiceManager() -> 'UserServiceManager':
from .user_service import UserServiceManager # pylint: disable=import-outside-toplevel
return UserServiceManager()
def publicationManager() -> 'PublicationManager':
from .publication import PublicationManager # pylint: disable=import-outside-toplevel

View File

@ -38,7 +38,7 @@ import typing
from wsgiref.util import FileWrapper
from django.http import HttpResponse, Http404
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util import singleton
logger = logging.getLogger(__name__)
@ -76,7 +76,7 @@ class DownloadsManager(metaclass=singleton.Singleton):
@param path: path to file
@params zip: If download as zip
"""
_id = cryptoManager().uuid(name)
_id = CryptoManager().uuid(name)
self._downloadables[_id] = {
'name': name,
'comment': comment,

View File

@ -37,7 +37,7 @@ import enum
from django.apps import apps
from django.utils.translation import gettext_lazy as _, gettext
from uds.models.config import Config as DBConfig
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
logger = logging.getLogger(__name__)
@ -126,7 +126,7 @@ class Config:
if crypt is False or not default:
self._default = default
else:
self._default = cryptoManager().encrypt(default)
self._default = CryptoManager().encrypt(default)
self._help = kwargs.get('help', '')
@ -163,7 +163,7 @@ class Config:
except DBConfig.DoesNotExist:
# Not found, so we create it
if self._default and self._crypt:
self.set(cryptoManager().decrypt(self._default))
self.set(CryptoManager().decrypt(self._default))
elif not self._crypt:
self.set(self._default)
self._data = self._default
@ -175,7 +175,7 @@ class Config:
self._data = self._default
if self._crypt:
return cryptoManager().decrypt(typing.cast(str, self._data))
return CryptoManager().decrypt(typing.cast(str, self._data))
return typing.cast(str, self._data)
def setParams(self, params: typing.Any) -> None:
@ -238,7 +238,7 @@ class Config:
value = str(value)
if self._crypt:
value = cryptoManager().encrypt(value)
value = CryptoManager().encrypt(value)
logger.debug(
'Saving config %s.%s as %s', self._section.name(), self._key, value
@ -319,7 +319,7 @@ class Config:
return False # Skip non writable elements
if cfg.crypt:
value = cryptoManager().encrypt(value)
value = CryptoManager().encrypt(value)
cfg.value = value
cfg.save()
logger.debug('Updated value for %s.%s to %s', section, key, value)
@ -696,7 +696,7 @@ class GlobalConfig:
# Global UDS ID (common for all servers on the same cluster)
UDS_ID: Config.Value = Config.section(GLOBAL_SECTION).value(
'UDS ID',
cryptoManager().uuid(),
CryptoManager().uuid(),
type=Config.FieldType.READ,
help=_('Global UDS ID (common for all servers on the same cluster)'),
)

View File

@ -28,14 +28,14 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import typing
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
def generateUuid(obj: typing.Any = None) -> str:
"""
Generates a ramdom uuid for models default
"""
return cryptoManager().uuid(obj=obj).lower()
return CryptoManager().uuid(obj=obj).lower()
def processUuid(uuid: str) -> str:

View File

@ -37,7 +37,7 @@ from django.http import HttpResponse
from uds.models import TicketStore, UserService, TunnelToken
from uds.core.auths import auth
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util import log
from uds.core.util.stats import events
from uds.core.util.request import ExtendedHttpRequestWithUser
@ -91,7 +91,7 @@ def guacamole(
raise Exception()
# Log message and event
protocol = 'RDS' if 'remote-app' in val else val['protocol'].upper()
host = val.get('hostname', '0.0.0.0')
host = val.get('hostname', '0.0.0.0') # nosec: Not a bind, just a placeholder for "no host"
msg = f'User {user.name} started HTML5 {protocol} tunnel to {host}.'
log.doLog(user.manager, log.INFO, msg)
log.doLog(userService, log.INFO, msg)
@ -113,7 +113,7 @@ def guacamole(
raise # Let it be handled by the upper layers
if 'password' in val:
val['password'] = cryptoManager().symDecrpyt(val['password'], scrambler)
val['password'] = CryptoManager().symDecrpyt(val['password'], scrambler)
response = dict2resp(val)
except Exception:

View File

@ -35,7 +35,7 @@ import typing
from django.db import models
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from .uuid_model import UUIDModel
from .util import getSqlDatetime
@ -90,7 +90,7 @@ class TicketStore(UUIDModel):
@staticmethod
def generateUuid() -> str:
return (
cryptoManager().randomString(40).lower()
CryptoManager().randomString(40).lower()
) # Temporary fix lower() for compat with 3.0
@staticmethod
@ -111,7 +111,7 @@ class TicketStore(UUIDModel):
if secure:
if not owner:
raise ValueError('Tried to use a secure ticket without owner')
data = cryptoManager().AESCrypt(data, owner.encode())
data = CryptoManager().AESCrypt(data, owner.encode())
owner = SECURED # So data is REALLY encrypted
return (
@ -150,7 +150,7 @@ class TicketStore(UUIDModel):
data: bytes = t.data
if secure: # Owner has already been tested and it's not emtpy
data = cryptoManager().AESDecrypt(
data = CryptoManager().AESDecrypt(
data, typing.cast(str, owner).encode()
)
@ -191,7 +191,7 @@ class TicketStore(UUIDModel):
if secure: # Owner has already been tested and it's not emtpy
if not owner:
raise ValueError('Tried to use a secure ticket without owner')
data = cryptoManager().AESDecrypt(
data = CryptoManager().AESDecrypt(
data, typing.cast(str, owner).encode()
)
@ -212,7 +212,7 @@ class TicketStore(UUIDModel):
if secure:
if not owner:
raise ValueError('Tried to use a secure ticket without owner')
data = cryptoManager().AESCrypt(data, owner.encode())
data = CryptoManager().AESCrypt(data, owner.encode())
t.data = data
t.save(update_fields=['data'])
except TicketStore.DoesNotExist:
@ -242,7 +242,7 @@ class TicketStore(UUIDModel):
extra: typing.Optional[typing.Mapping[str, typing.Any]] = None,
validity: int = 60 * 60 * 24, # 24 Hours default validity for tunnel tickets
) -> str:
owner = cryptoManager().randomString(length=8)
owner = CryptoManager().randomString(length=8)
if not userService.user:
raise ValueError('User is not set in userService')
data = {

View File

@ -34,7 +34,7 @@ import logging
from django.db import models
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from .user_service import UserService
from .util import getSqlDatetime
@ -46,7 +46,7 @@ def _session_id_generator() -> str:
"""
Generates a new session id
"""
return cryptoManager().unique()
return CryptoManager().unique()
class UserServiceSession(models.Model): # pylint: disable=too-many-public-methods

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=no-member
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
@ -40,7 +40,7 @@ import ldap
from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core import exceptions
from uds.core.util import log
from uds.core.util import ldaputil
@ -234,7 +234,7 @@ class WinDomainOsManager(WindowsOsManager):
debug=False,
)
except Exception as e:
_str = 'Error: {}'.format(e)
_str = f'Error: {e}'
raise ldaputil.LDAPError(_str)
@ -247,9 +247,7 @@ class WinDomainOsManager(WindowsOsManager):
ldaputil.getAsDict(
ldapConnection,
base,
"(&(objectClass=group)(|(cn={0})(sAMAccountName={0})))".format(
group
),
f'(&(objectClass=group)(|(cn={group})(sAMAccountName={group})))',
['dn'],
sizeLimit=50,
)
@ -268,9 +266,7 @@ class WinDomainOsManager(WindowsOsManager):
# else:
base = ','.join(['DC=' + i for i in self._domain.split('.')])
fltr = '(&(objectClass=computer)(sAMAccountName={}$))'.format(
ldaputil.escape(machineName)
)
fltr = f'(&(objectClass=computer)(sAMAccountName={ldaputil.escape(machineName)}$))'
obj: typing.Optional[typing.MutableMapping[str, typing.Any]]
try:
obj = next(
@ -314,9 +310,7 @@ class WinDomainOsManager(WindowsOsManager):
log.doLog(
userService,
log.WARN,
"Could not remove machine from domain (_ldap._tcp.{0} not found)".format(
self._domain
),
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
log.OSMANAGER,
)
except ldap.ALREADY_EXISTS: # type: ignore # (valid)
@ -325,13 +319,9 @@ class WinDomainOsManager(WindowsOsManager):
break
except ldaputil.LDAPError:
logger.exception('Ldap Exception caught')
error = "Could not add machine (invalid credentials? for {0})".format(
self._account
)
error = f'Could not add machine (invalid credentials? for {self._account})'
except Exception as e:
error = "Could not add machine {} to group {}: {}".format(
userService.friendly_name, self._group, e
)
error = f'Could not add machine {userService.friendly_name} to group {self._group}: {e}'
# logger.exception('Ldap Exception caught')
if error:
@ -362,9 +352,7 @@ class WinDomainOsManager(WindowsOsManager):
log.doLog(
userService,
log.WARN,
"Could not remove machine from domain (_ldap._tcp.{} not found)".format(
self._domain
),
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
log.OSMANAGER,
)
return
@ -373,7 +361,7 @@ class WinDomainOsManager(WindowsOsManager):
log.doLog(
userService,
log.WARN,
"Could not remove machine from domain ({})".format(e),
f'Could not remove machine from domain ({e})',
log.OSMANAGER,
)
return
@ -382,7 +370,7 @@ class WinDomainOsManager(WindowsOsManager):
log.doLog(
userService,
log.WARN,
"Could not remove machine from domain ({})".format(e),
f'Could not remove machine from domain ({e})',
log.OSMANAGER,
)
return
@ -391,9 +379,7 @@ class WinDomainOsManager(WindowsOsManager):
res = self.__getMachine(ldapConnection, userService.friendly_name)
if res is None:
raise Exception(
'Machine {} not found on AD (permissions?)'.format(
userService.friendly_name
)
f'Machine {userService.friendly_name} not found on AD (permissions?)'
)
ldaputil.recursive_delete(ldapConnection, res)
except IndexError:
@ -497,7 +483,7 @@ class WinDomainOsManager(WindowsOsManager):
self._domain,
self._ou,
self._account,
cryptoManager().encrypt(self._password),
CryptoManager().encrypt(self._password),
base,
self._group,
self._serverHint,
@ -512,7 +498,7 @@ class WinDomainOsManager(WindowsOsManager):
self._domain = values[1]
self._ou = values[2]
self._account = values[3]
self._password = cryptoManager().decrypt(values[4])
self._password = CryptoManager().decrypt(values[4])
if values[0] in ('v2', 'v3', 'v4'):
self._group = values[6]

View File

@ -39,7 +39,7 @@ import typing
from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core import exceptions
from uds.core.util import log
@ -99,7 +99,7 @@ class WinRandomPassManager(WindowsOsManager):
self._password = values['password']
else:
self._userAccount = ''
self._password = ""
self._password = '' # nosec: not a password (empty)
def processUserPassword(
self, userService: 'UserService', username: str, password: str
@ -129,7 +129,7 @@ class WinRandomPassManager(WindowsOsManager):
log.doLog(
userService,
log.INFO,
"Password set to \"{}\"".format(randomPass),
f'Password set to "{randomPass}"',
log.OSMANAGER,
)
return randomPass
@ -151,14 +151,14 @@ class WinRandomPassManager(WindowsOsManager):
'''
base = codecs.encode(super().marshal(), 'hex').decode()
return '\t'.join(
['v1', self._userAccount, cryptoManager().encrypt(self._password), base]
['v1', self._userAccount, CryptoManager().encrypt(self._password), base]
).encode('utf8')
def unmarshal(self, data: bytes) -> None:
values = data.decode('utf8').split('\t')
if values[0] == 'v1':
self._userAccount = values[1]
self._password = cryptoManager().decrypt(values[2])
self._password = CryptoManager().decrypt(values[2])
super().unmarshal(codecs.decode(values[3].encode(), 'hex'))
def valuesDict(self) -> gui.ValuesDictType:

View File

@ -34,7 +34,7 @@ import logging
import typing
from uds.core.services import UserDeployment
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.core.util.state import State
from uds.core.util import log
from uds.models.util import getSqlDatetimeAsUnix
@ -121,7 +121,9 @@ class OGDeployment(UserDeployment):
self._machineId = vals[4].decode('utf8')
self._reason = vals[5].decode('utf8')
self._stamp = int(vals[6].decode('utf8'))
self._queue = pickle.loads(vals[7]) # nosec: not insecure, we are loading our own data
self._queue = pickle.loads(
vals[7]
) # nosec: not insecure, we are loading our own data
def getName(self) -> str:
return self._name
@ -166,7 +168,7 @@ class OGDeployment(UserDeployment):
return self.__executeQueue()
except Exception as e:
return self.__error('Error setting ready state: {}'.format(e))
return self.__error(f'Error setting ready state: {e}')
def deployForUser(self, user: 'models.User') -> str:
"""
@ -197,7 +199,7 @@ class OGDeployment(UserDeployment):
status = self.service().status(self._machineId)
except Exception as e:
logger.exception('Exception at checkMachineReady')
return self.__error('Error checking machine: {}'.format(e))
return self.__error(f'Error checking machine: {e}')
# possible status are ("off", "oglive", "busy", "linux", "windows", "macos" o "unknown").
if status['status'] in ("linux", "windows", "macos"):
@ -218,12 +220,6 @@ class OGDeployment(UserDeployment):
res = self._queue.pop(0)
return res
def __pushFrontOp(self, op: int) -> None:
self._queue.insert(0, op)
def __pushBackOp(self, op: int) -> None:
self._queue.append(op)
def __error(self, reason: typing.Any) -> str:
"""
Internal method to set object as error state
@ -238,7 +234,7 @@ class OGDeployment(UserDeployment):
try:
self.service().unreserve(self._machineId)
except Exception as e:
logger.warning('Error unreserving machine: $s', e)
logger.warning('Error unreserving machine: %s', e)
self._queue = [opError]
self._reason = str(reason)
@ -266,7 +262,7 @@ class OGDeployment(UserDeployment):
if execFnc is None:
return self.__error(
'Unknown operation found at execution queue ({0})'.format(op)
f'Unknown operation found at execution queue ({op})'
)
execFnc()
@ -292,7 +288,7 @@ class OGDeployment(UserDeployment):
Deploys a machine from template for user/cache
"""
r: typing.Any = None
token = cryptoManager().randomString(32)
token = CryptoManager().randomString(32)
try:
r = self.service().reserve()
self.service().notifyEvents(r['id'], token, self._uuid)
@ -306,7 +302,7 @@ class OGDeployment(UserDeployment):
# Error unreserving reserved machine on creation
logger.error('Error unreserving errored machine: %s', ei)
raise Exception('Error creating reservation: {}'.format(e))
raise Exception(f'Error creating reservation: {e}') from e
self._machineId = r['id']
self._name = r['name']
@ -389,7 +385,7 @@ class OGDeployment(UserDeployment):
if chkFnc is None:
return self.__error(
'Unknown operation found at check queue ({0})'.format(op)
f'Unknown operation found at check queue ({op})'
)
state = chkFnc()

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=no-member # For some reason, pylint does not detect the Tab member of gui
#
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
@ -39,7 +39,7 @@ from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core import transports, exceptions
from uds.core.util import os_detector as OsDetector
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds import models
# Not imported at runtime, just for type checking
@ -319,10 +319,8 @@ class HTML5RDPTransport(transports.Transport):
# Strip spaces and all trailing '/'
self.guacamoleServer.value = self.guacamoleServer.value.strip().rstrip('/')
if self.guacamoleServer.value[0:4] != 'http':
raise exceptions.ValidationError(
_('The server must be http or https')
)
#if self.useEmptyCreds.isTrue() and self.security.value != 'rdp':
raise exceptions.ValidationError(_('The server must be http or https'))
# if self.useEmptyCreds.isTrue() and self.security.value != 'rdp':
# raise exceptions.ValidationException(
# _(
# 'Empty credentials (on Credentials tab) is only allowed with Security level (on Parameters tab) set to "RDP"'
@ -379,8 +377,6 @@ class HTML5RDPTransport(transports.Transport):
domain = ''
username = proc[0]
azureAd = False
if self.fixedDomain.value != '':
domain = self.fixedDomain.value
@ -409,10 +405,10 @@ class HTML5RDPTransport(transports.Transport):
userService: 'models.UserService',
transport: 'models.Transport',
ip: str,
os: 'DetectedOsInfo',
os: 'DetectedOsInfo', # pylint: disable=unused-argument
user: 'models.User',
password: str,
request: 'ExtendedHttpRequestWithUser',
request: 'ExtendedHttpRequestWithUser', # pylint: disable=unused-argument
) -> str:
credsInfo = self.getConnectionInfo(userService, user, password)
username, password, domain = (
@ -421,10 +417,11 @@ class HTML5RDPTransport(transports.Transport):
credsInfo['domain'],
)
scrambler = cryptoManager().randomString(32)
passwordCrypted = cryptoManager().symCrypt(password, scrambler)
scrambler = CryptoManager().randomString(32)
passwordCrypted = CryptoManager().symCrypt(password, scrambler)
as_txt = lambda x: 'true' if x else 'false'
def as_txt(txt: typing.Any) -> str:
return 'true' if txt else 'false'
# Build params dict
params = {
@ -442,7 +439,7 @@ class HTML5RDPTransport(transports.Transport):
'disable-upload': as_txt(
self.enableFileSharing.value not in ('true', 'up')
),
'drive-path': '/share/{}'.format(user.uuid),
'drive-path': f'/share/{user.uuid}',
'drive-name': 'UDSfs',
'disable-copy': as_txt(
self.enableClipboard.value in ('dis-copy', 'disabled')
@ -457,13 +454,19 @@ class HTML5RDPTransport(transports.Transport):
},
}
if not password and self.security.value != 'rdp': # No password, but not rdp, so we need to use creds popup
extra_params=f'&creds={username}@{domain}'
if (
not password and self.security.value != 'rdp'
): # No password, but not rdp, so we need to use creds popup
extra_params = f'&creds={username}@{domain}'
else:
extra_params=''
extra_params = ''
# pylint: disable=using-constant-test
if False: # Future imp
sanitize = lambda x: re.sub("[^a-zA-Z0-9_-]", "_", x)
# sanitize = lambda x: re.sub("[^a-zA-Z0-9_-]", "_", x)
def sanitize(text: str) -> str:
return re.sub("[^a-zA-Z0-9_-]", "_", text)
params['recording-path'] = (
'/share/recording/'
+ sanitize(user.manager.name)
@ -502,9 +505,9 @@ class HTML5RDPTransport(transports.Transport):
ticket = models.TicketStore.create(params, validity=self.ticketValidity.num())
onw = '&o_n_w={}'.format(transport.uuid)
onw = f'&o_n_w={transport.uuid}'
if self.forceNewWindow.value == gui.TRUE:
onw = '&o_n_w={}'.format(userService.deployed_service.uuid)
onw = f'&o_n_w={userService.deployed_service.uuid}'
elif self.forceNewWindow.value == 'overwrite':
onw = '&o_s_w=yes'
path = (
@ -517,12 +520,5 @@ class HTML5RDPTransport(transports.Transport):
path = path[:-1]
return str(
"{server}{path}/#/?data={ticket}.{scrambler}{onw}{extra_params}".format(
server=self.guacamoleServer.value,
path=path,
ticket=ticket,
scrambler=scrambler,
onw=onw,
extra_params=extra_params,
)
f'{self.guacamoleServer.value}{path}/#/?data={ticket}.{scrambler}{onw}{extra_params}'
)

View File

@ -40,7 +40,7 @@ from uds.core.ui import gui
from uds.core import transports, exceptions
from uds.core.util import os_detector as OsDetector
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds import models
# Not imported at runtime, just for type checking
@ -66,6 +66,7 @@ class HTML5SSHTransport(transports.Transport):
ownLink = True
supportedOss = OsDetector.allOss
# pylint: disable=no-member # ??? SSH is there, but pylint does not see it ???
protocol = transports.protocols.SSH
group = transports.TUNNELED_GROUP
@ -211,9 +212,7 @@ class HTML5SSHTransport(transports.Transport):
# Remove trailing / (one or more) from url if it exists from "guacamoleServer" field
self.guacamoleServer.value = self.guacamoleServer.value.strip().rstrip('/')
if self.guacamoleServer.value[0:4] != 'http':
raise exceptions.ValidationError(
_('The server must be http or https')
)
raise exceptions.ValidationError(_('The server must be http or https'))
def isAvailableFor(self, userService: 'models.UserService', ip: str) -> bool:
"""
@ -266,19 +265,19 @@ class HTML5SSHTransport(transports.Transport):
# Filesharing using guacamole sftp
if self.enableFileSharing.value != 'false':
params['enable-sftp'] = 'true'
if self.fileSharingRoot.value.strip():
params['sftp-root-directory'] = self.fileSharingRoot.value.strip()
if self.enableFileSharing.value not in ('down', 'true'):
params['sftp-disable-download'] = 'true'
params['sftp-disable-download'] = 'true'
if self.enableFileSharing.value not in ('up', 'true'):
params['sftp-disable-upload'] = 'true'
params['sftp-disable-upload'] = 'true'
logger.debug('SSH Params: %s', params)
scrambler = cryptoManager().randomString(32)
scrambler = CryptoManager().randomString(32)
ticket = models.TicketStore.create(params, validity=self.ticketValidity.num())
onw = ''
@ -289,7 +288,5 @@ class HTML5SSHTransport(transports.Transport):
onw = onw.format(hash(transport.name))
return str(
"{}/guacamole/#/?data={}.{}{}".format(
self.guacamoleServer.value, ticket, scrambler, onw
)
f'{self.guacamoleServer.value}/guacamole/#/?data={ticket}.{scrambler}{onw}'
)

View File

@ -38,7 +38,7 @@ from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core import transports, exceptions
from uds.core.util import os_detector as OsDetector
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds import models
# Not imported at runtime, just for type checking
@ -236,7 +236,7 @@ class HTML5VNCTransport(transports.Transport):
logger.debug('VNC Params: %s', params)
scrambler = cryptoManager().randomString(32)
scrambler = CryptoManager().randomString(32)
ticket = models.TicketStore.create(params, validity=self.ticketValidity.num())
onw = ''
@ -246,8 +246,4 @@ class HTML5VNCTransport(transports.Transport):
onw = 'o_s_w=yes'
onw = onw.format(hash(transport.name))
return str(
"{}/guacamole/#/?data={}.{}{}".format(
self.guacamoleServer.value, ticket, scrambler, onw
)
)
return f'{self.guacamoleServer.value}/guacamole/#/?data={ticket}.{scrambler}{onw}'

View File

@ -47,7 +47,7 @@ from django.views.decorators.cache import never_cache
from uds.core.util.request import ExtendedHttpRequest, ExtendedHttpRequestWithUser
from uds.core.auths import auth, exceptions
from uds.core.util.config import GlobalConfig
from uds.core.managers import cryptoManager
from uds.core.managers.crypto import CryptoManager
from uds.web.util import errors
from uds.web.forms.LoginForm import LoginForm
from uds.web.forms.MFAForm import MFAForm
@ -338,7 +338,7 @@ def update_transport_ticket(
domain = data.get('domain', None) or None # If empty string, set to None
if password:
password = cryptoManager().symCrypt(password, scrambler)
password = CryptoManager().symCrypt(password, scrambler)
def checkValidTicket(data: typing.Mapping[str, typing.Any]) -> bool:
if 'ticket-info' not in data: