1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-11 05:17:55 +03:00

A lot of refactorization

This commit is contained in:
Adolfo Gómez García 2024-01-11 23:19:58 +01:00
parent 27d91154b6
commit e4d2d2a843
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
94 changed files with 563 additions and 433 deletions

View File

@ -96,7 +96,7 @@ def checkBlockedIp(request: 'ExtendedHttpRequest') -> None:
logger.info(
'Access to actor from %s is blocked for %s seconds since last fail',
request.ip,
GlobalConfig.LOGIN_BLOCK.getInt(),
GlobalConfig.LOGIN_BLOCK.as_int(),
)
# Sleep a while to try to minimize brute force attacks somehow
time.sleep(3) # 3 seconds should be enough
@ -105,7 +105,7 @@ def checkBlockedIp(request: 'ExtendedHttpRequest') -> None:
def incFailedIp(request: 'ExtendedHttpRequest') -> None:
fails = cache.get(request.ip, 0) + 1
cache.put(request.ip, fails, GlobalConfig.LOGIN_BLOCK.getInt())
cache.put(request.ip, fails, GlobalConfig.LOGIN_BLOCK.as_int())
# Decorator that clears failed counter for the IP if succeeds
@ -509,7 +509,7 @@ class BaseReadyChange(ActorV3Action):
UserServiceManager().notify_ready_from_os_manager(userService, '')
# Generates a certificate and send it to client.
privateKey, cert, password = security.selfSignedCert(self._params['ip'])
privateKey, cert, password = security.create_self_signed_cert(self._params['ip'])
# Store certificate with userService
userService.properties['cert'] = cert
userService.properties['priv'] = privateKey
@ -800,7 +800,7 @@ class Unmanaged(ActorV3Action):
ip = self._params['id'][0]['ip'] # Get first IP if no valid ip found
# Generates a certificate and send it to client (actor).
privateKey, certificate, password = security.selfSignedCert(ip)
privateKey, certificate, password = security.create_self_signed_cert(ip)
if validId:
# If id is assigned to an user service, notify "logout" to it

View File

@ -45,7 +45,7 @@ class Config(Handler):
needs_admin = True # By default, staff is lower level needed
def get(self) -> typing.Any:
return CfgConfig.getConfigValues(self.is_admin())
return CfgConfig.get_config_values(self.is_admin())
def put(self) -> typing.Any:

View File

@ -93,7 +93,7 @@ class Connection(Handler):
self._request.user = self._user
return Connection.result(
result=services.getServicesData(typing.cast(ExtendedHttpRequestWithUser, self._request))
result=services.get_services_data(typing.cast(ExtendedHttpRequestWithUser, self._request))
)
def connection(self, idService: str, idTransport: str, skip: str = '') -> dict[str, typing.Any]:
@ -183,7 +183,7 @@ class Connection(Handler):
self._request.user = self._user # type: ignore
setattr(self._request, '_cryptedpass', self._session['REST']['password']) # type: ignore # pylint: disable=protected-access
setattr(self._request, '_scrambler', self._request.META['HTTP_SCRAMBLER']) # type: ignore # pylint: disable=protected-access
linkInfo = services.enableService(self._request, idService=idService, idTransport=idTransport)
linkInfo = services.enable_service(self._request, idService=idService, idTransport=idTransport)
if linkInfo['error']:
return Connection.result(error=linkInfo['error'])
return Connection.result(result=linkInfo['url'])

View File

@ -104,7 +104,7 @@ class Login(Handler):
logger.info(
'Access to REST API %s is blocked for %s seconds since last fail',
self._request.ip,
GlobalConfig.LOGIN_BLOCK.getInt(),
GlobalConfig.LOGIN_BLOCK.as_int(),
)
raise exceptions.rest.AccessDenied('Too many fails')
@ -177,7 +177,7 @@ class Login(Handler):
# Sleep a while here to "protect"
time.sleep(3) # Wait 3 seconds if credentials fails for "protection"
# And store in cache for blocking for a while if fails
fail_cache.put(self._request.ip, fails + 1, GlobalConfig.LOGIN_BLOCK.getInt())
fail_cache.put(self._request.ip, fails + 1, GlobalConfig.LOGIN_BLOCK.as_int())
return Login.result(error='Invalid credentials')
return Login.result(

View File

@ -80,11 +80,11 @@ class ServerRegisterBase(Handler):
raise ValueError(_('Invalid data. Max length is 2048.'))
if port < 1 or port > 65535:
raise ValueError(_('Invalid port. Must be between 1 and 65535'))
validators.validateIpv4OrIpv6(ip) # Will raise "validation error"
validators.validateFqdn(hostname)
validators.validateMac(mac)
validators.validateJson(data)
validators.validateServerCertificate(certificate)
validators.validate_ip(ip) # Will raise "validation error"
validators.validate_fqdn(hostname)
validators.validate_mac(mac)
validators.validate_json(data)
validators.validate_server_certificate(certificate)
except Exception as e:
raise rest_exceptions.RequestError(str(e)) from e

View File

@ -138,7 +138,7 @@ class ServicesPools(ModelHandler):
def get_items(self, *args, **kwargs) -> typing.Generator[typing.Any, None, None]:
# Optimized query, since there is a lot of info needed for these
d = sql_datetime() - datetime.timedelta(seconds=GlobalConfig.RESTRAINT_TIME.getInt())
d = sql_datetime() - datetime.timedelta(seconds=GlobalConfig.RESTRAINT_TIME.as_int())
return super().get_items(
overview=kwargs.get('overview', True),
query=(
@ -240,7 +240,7 @@ class ServicesPools(ModelHandler):
if hasattr(item, 'valid_count'):
valid_count = item.valid_count # type: ignore
preparing_count = item.preparing_count # type: ignore
restrained = item.error_count >= GlobalConfig.RESTRAINT_COUNT.getInt() # type: ignore
restrained = item.error_count >= GlobalConfig.RESTRAINT_COUNT.as_int() # type: ignore
usage_count = item.usage_count # type: ignore
else:
valid_count = item.userServices.exclude(state__in=State.INFO_STATES).count()

View File

@ -207,7 +207,7 @@ class Tunnels(ModelHandler):
fields['type'] = types.servers.ServerType.TUNNEL.value
fields['port'] = int(fields['port'])
# Ensure host is a valid IP(4 or 6) or hostname
validators.validateHost(fields['host'])
validators.validate_host(fields['host'])
def assign(self, parent: 'Model') -> typing.Any:
parent = ensure.is_instance(parent, models.ServerGroup)

View File

@ -174,7 +174,7 @@ class RadiusAuth(auths.Authenticator):
groups.append(self.globalGroup.value.strip())
# Cache groups for "getGroups", because radius will not send us those
with self.storage.map() as storage:
with self.storage.as_dict() as storage:
storage[username] = groups
# Validate groups
@ -183,14 +183,14 @@ class RadiusAuth(auths.Authenticator):
return types.auth.SUCCESS_AUTH
def get_groups(self, username: str, groupsManager: 'auths.GroupsManager') -> None:
with self.storage.map() as storage:
with self.storage.as_dict() as storage:
groupsManager.validate(storage.get(username, []))
def create_user(self, usrData: dict[str, str]) -> None:
pass
def remove_user(self, username: str) -> None:
with self.storage.map() as storage:
with self.storage.as_dict() as storage:
if username in storage:
del storage[username]
return super().remove_user(username)

View File

@ -370,7 +370,7 @@ class SAMLAuthenticator(auths.Authenticator):
except Exception as e:
raise exceptions.validation.ValidationError(gettext('Invalid private key. ') + str(e))
if not security.checkCertificateMatchPrivateKey(
if not security.check_certificate_matches_private_key(
cert=self.serverCertificate.value, key=self.privateKey.value
):
raise exceptions.validation.ValidationError(gettext('Certificate and private key do not match'))

View File

@ -51,17 +51,17 @@ class Environment:
not stored with main module data.
The environment is composed of a "cache" and a "storage". First are volatile data, while second are persistent data.
"""
__slots__ = ['_key', '_cache', '_storage', '_idGenerators']
__slots__ = ['_key', '_cache', '_storage', '_id_generators']
_key: str
_cache: 'Cache'
_storage: 'Storage'
_idGenerators: dict[str, 'UniqueIDGenerator']
_id_generators: dict[str, 'UniqueIDGenerator']
def __init__(
self,
uniqueKey: str,
idGenerators: typing.Optional[dict[str, 'UniqueIDGenerator']] = None,
id_generators: typing.Optional[dict[str, 'UniqueIDGenerator']] = None,
):
"""
Initialized the Environment for the specified id
@ -77,7 +77,7 @@ class Environment:
self._key = uniqueKey
self._cache = Cache(uniqueKey)
self._storage = Storage(uniqueKey)
self._idGenerators = idGenerators or {}
self._id_generators = id_generators or {}
@property
def cache(self) -> 'Cache':
@ -95,7 +95,7 @@ class Environment:
"""
return self._storage
def idGenerators(self, generatorId: str) -> 'UniqueIDGenerator':
def id_generator(self, generator_id: str) -> 'UniqueIDGenerator':
"""
The idea of generator of id is to obtain at some moment Ids with a proper generator.
If the environment does not contain generators of id, this method will return None.
@ -103,9 +103,9 @@ class Environment:
@param generatorId: Id of the generator to obtain
@return: Generator for that id, or None if no generator for that id is found
"""
if not self._idGenerators or generatorId not in self._idGenerators:
raise Exception(f'No generator found for {generatorId}')
return self._idGenerators[generatorId]
if not self._id_generators or generator_id not in self._id_generators:
raise Exception(f'No generator found for {generator_id}')
return self._id_generators[generator_id]
@property
def key(self) -> str:
@ -114,17 +114,17 @@ class Environment:
"""
return self._key
def clearRelatedData(self):
def clean_related_data(self):
"""
Removes all related information from database for this environment.
"""
self._cache.clear()
self._storage.clear()
for _, v in self._idGenerators.items():
for _, v in self._id_generators.items():
v.release()
@staticmethod
def getEnvForTableElement(
def get_environment_for_table(
tblName,
id_,
idGeneratorsTypes: typing.Optional[dict[str, typing.Any]] = None,
@ -147,7 +147,7 @@ class Environment:
return Environment(name, idGenerators)
@staticmethod
def getEnvForType(type_) -> 'Environment':
def get_environment_for_type(type_) -> 'Environment':
"""
Obtains an environment associated with a type instead of a record
@param type_: Type
@ -167,7 +167,7 @@ class Environment:
return env
@staticmethod
def getGlobalEnv() -> 'Environment':
def get_common_environment() -> 'Environment':
"""
Provides global environment
"""
@ -236,7 +236,7 @@ class Environmentable:
"""
return self._env.storage
def id_generators(self, generatorId: str) -> 'UniqueIDGenerator':
def id_generator(self, generatorId: str) -> 'UniqueIDGenerator':
"""
Utility method to access the id generator of the environment contained by this object
@ -246,4 +246,4 @@ class Environmentable:
Returns:
Generator for the object and the id specified
"""
return self._env.idGenerators(generatorId)
return self._env.id_generator(generatorId)

View File

@ -50,7 +50,7 @@ class DelayedTask(Environmentable):
"""
Remember to invoke parent init in derived classes using super(myClass,self).__init__() to let this initialize its own variables
"""
super().__init__(environment or Environment.getEnvForType(self.__class__))
super().__init__(environment or Environment.get_environment_for_type(self.__class__))
def execute(self) -> None:
"""

View File

@ -137,7 +137,7 @@ class DelayedTaskRunner(metaclass=singleton.Singleton):
if taskInstance:
logger.debug('Executing delayedTask:>%s<', task)
# Re-create environment data
taskInstance.env = Environment.getEnvForType(taskInstance.__class__)
taskInstance.env = Environment.get_environment_for_type(taskInstance.__class__)
DelayedTaskThread(taskInstance).start()
def _insert(self, instance: DelayedTask, delay: int, tag: str) -> None:

View File

@ -58,7 +58,7 @@ class Job(Environmentable):
"""
if cls.frecuency_cfg:
try:
cls.frecuency = cls.frecuency_cfg.getInt(force=True)
cls.frecuency = cls.frecuency_cfg.as_int(force=True)
logger.debug(
'Setting frequency from DB setting for %s to %s', cls, cls.frecuency
)

View File

@ -124,7 +124,7 @@ class PublicationLauncher(DelayedTask):
serialize(
now
+ datetime.timedelta(
hours=GlobalConfig.SESSION_EXPIRE_TIME.getInt(True)
hours=GlobalConfig.SESSION_EXPIRE_TIME.as_int(True)
)
),
)
@ -188,7 +188,7 @@ class PublicationFinishChecker(DelayedTask):
if doPublicationCleanup:
pc = PublicationOldMachinesCleaner(old.id)
pc.register(
GlobalConfig.SESSION_EXPIRE_TIME.getInt(True) * 3600,
GlobalConfig.SESSION_EXPIRE_TIME.as_int(True) * 3600,
'pclean-' + str(old.id),
True,
)
@ -371,27 +371,27 @@ class PublicationManager(metaclass=singleton.Singleton):
raise PublishException(str(e)) from e
def unpublish(
self, servicePoolPub: ServicePoolPublication
self, servicepool_publication: ServicePoolPublication
): # pylint: disable=no-self-use
"""
Unpublishes an active (usable) or removable publication
:param servicePoolPub: Publication to unpublish
"""
if (
State.from_str(servicePoolPub.state).is_usable() is False
and State.from_str(servicePoolPub.state).is_removable() is False
State.from_str(servicepool_publication.state).is_usable() is False
and State.from_str(servicepool_publication.state).is_removable() is False
):
raise PublishException(_('Can\'t unpublish non usable publication'))
if servicePoolPub.userServices.exclude(state__in=State.INFO_STATES).count() > 0:
if servicepool_publication.userServices.exclude(state__in=State.INFO_STATES).count() > 0:
raise PublishException(
_('Can\'t unpublish publications with services in process')
)
try:
pubInstance = servicePoolPub.get_instance()
pubInstance = servicepool_publication.get_instance()
state = pubInstance.destroy()
servicePoolPub.set_state(State.REMOVING)
servicepool_publication.set_state(State.REMOVING)
PublicationFinishChecker.state_updater(
servicePoolPub, pubInstance, state
servicepool_publication, pubInstance, state
)
except Exception as e:
raise PublishException(str(e)) from e

View File

@ -73,7 +73,7 @@ class ServerManager(metaclass=singleton.Singleton):
# If counters are too old, restart them
if datetime.datetime.now() - self.last_counters_clean > self.MAX_COUNTERS_AGE:
self.clear_unmanaged_usage()
return Storage(self.STORAGE_NAME).map(atomic=True, group='counters')
return Storage(self.STORAGE_NAME).as_dict(atomic=True, group='counters')
def property_name(self, user: typing.Optional[typing.Union[str, 'models.User']]) -> str:
"""Returns the property name for a user"""

View File

@ -98,7 +98,7 @@ class ServerApiRequester:
with tempfile.NamedTemporaryFile('w', delete=False) as f:
f.write(self.server.certificate) # Save cert
verify = f.name
session = security.secureRequestsSession(verify=verify)
session = security.secure_requests_session(verify=verify)
# Setup headers
session.headers.update(
{

View File

@ -89,7 +89,7 @@ class StatsManager(metaclass=singleton.Singleton):
model: type[typing.Union['StatsCounters', 'StatsEvents', 'StatsCountersAccum']],
) -> None:
minTime = time.mktime(
(sql_datetime() - datetime.timedelta(days=GlobalConfig.STATS_DURATION.getInt())).timetuple()
(sql_datetime() - datetime.timedelta(days=GlobalConfig.STATS_DURATION.as_int())).timetuple()
)
model.objects.filter(stamp__lt=minTime).delete()

View File

@ -68,13 +68,13 @@ class DelayedTaskThread(BaseThread):
class TaskManager(metaclass=singleton.Singleton):
__slots__ = ('threads', 'keepRunning')
__slots__ = ('threads', 'keep_running')
keepRunning: bool
keep_running: bool
threads: list[BaseThread]
def __init__(self):
self.keepRunning = True
self.keep_running = True
self.threads = []
@staticmethod
@ -82,7 +82,7 @@ class TaskManager(metaclass=singleton.Singleton):
return TaskManager()
@staticmethod
def sig_term(sigNum, frame): # pylint: disable=unused-argument
def sig_term(sigNum: int, frame: typing.Any) -> None:
"""
This method will ensure that we finish correctly current running task before exiting.
If we need to stop cause something went wrong (that should not happen), we must send sigterm, wait a while (10-20 secs) and after that send sigkill
@ -92,7 +92,7 @@ class TaskManager(metaclass=singleton.Singleton):
Take a look at killTaskManager.sh :-)
"""
logger.info("Caught term signal, finishing task manager")
TaskManager.manager().keepRunning = False
TaskManager.manager().keep_running = False
def register_job(self, jobType: type[jobs.Job]) -> None:
jobName = jobType.friendly_name
@ -114,7 +114,7 @@ class TaskManager(metaclass=singleton.Singleton):
self.threads.append(thread)
def run(self) -> None:
self.keepRunning = True
self.keep_running = True
# Don't know why, but with django 1.8, must "reset" connections so them do not fail on first access...
# It is similar to https://code.djangoproject.com/ticket/21597#comment:29
connection.close()
@ -124,8 +124,8 @@ class TaskManager(metaclass=singleton.Singleton):
self.register_scheduled_tasks()
noSchedulers: int = GlobalConfig.SCHEDULER_THREADS.getInt()
noDelayedTasks: int = GlobalConfig.DELAYED_TASKS_THREADS.getInt()
noSchedulers: int = GlobalConfig.SCHEDULER_THREADS.as_int()
noDelayedTasks: int = GlobalConfig.DELAYED_TASKS_THREADS.as_int()
logger.info(
'Starting %s schedulers and %s task executors', noSchedulers, noDelayedTasks
@ -156,7 +156,7 @@ class TaskManager(metaclass=singleton.Singleton):
# Remote.on()
# gc.set_debug(gc.DEBUG_LEAK)
while self.keepRunning:
while self.keep_running:
time.sleep(1)
for thread in self.threads:

View File

@ -327,27 +327,27 @@ class UserServiceManager(metaclass=singleton.Singleton):
return user_service
def remove(self, user_service: UserService) -> UserService:
def remove(self, userservice: UserService) -> UserService:
"""
Removes a uService element
"""
with transaction.atomic():
user_service = UserService.objects.select_for_update().get(id=user_service.id)
operationsLogger.info('Removing userService %a', user_service.name)
if user_service.is_usable() is False and State.from_str(user_service.state).is_removable() is False:
userservice = UserService.objects.select_for_update().get(id=userservice.id)
operationsLogger.info('Removing userService %a', userservice.name)
if userservice.is_usable() is False and State.from_str(userservice.state).is_removable() is False:
raise OperationException(_('Can\'t remove a non active element'))
user_service.set_state(State.REMOVING)
logger.debug("***** The state now is %s *****", State.from_str(user_service.state).literal)
user_service.setInUse(False) # For accounting, ensure that it is not in use right now
user_service.save()
userservice.set_state(State.REMOVING)
logger.debug("***** The state now is %s *****", State.from_str(userservice.state).literal)
userservice.setInUse(False) # For accounting, ensure that it is not in use right now
userservice.save()
userServiceInstance = user_service.get_instance()
userServiceInstance = userservice.get_instance()
state = userServiceInstance.destroy()
# Data will be serialized on makeUnique process
UserServiceOpChecker.make_unique(user_service, userServiceInstance, state)
UserServiceOpChecker.make_unique(userservice, userServiceInstance, state)
return user_service
return userservice
def remove_or_cancel(self, user_service: UserService):
if user_service.is_usable() or State.from_str(user_service.state).is_removable():

View File

@ -37,7 +37,7 @@ import typing
import collections.abc
from uds.core import exceptions, types
from uds.core.util.security import secureRequestsSession
from uds.core.util.security import secure_requests_session
if typing.TYPE_CHECKING:
from uds.models import UserService
@ -86,7 +86,7 @@ def _execute_actor_request(
verify = f.name
else:
verify = False
session = secureRequestsSession(verify=cert)
session = secure_requests_session(verify=cert)
if data is None:
r = session.get(url, verify=verify, timeout=TIMEOUT)
else:

View File

@ -45,4 +45,4 @@ DO_NOT_REPEAT: typing.Final[cfg.Config.Value] = cfg.Config.section(cfg.Config.Se
)
# Ensure that we have a default value for this on startup
DO_NOT_REPEAT.getInt()
DO_NOT_REPEAT.as_int()

View File

@ -78,7 +78,7 @@ class MessageProcessorThread(BaseThread):
# Locate all notifications from "persistent" and try to process them
# If no notification can be fully resolved, it will be kept in the database
not_before = sql_datetime() - datetime.timedelta(
seconds=DO_NOT_REPEAT.getInt()
seconds=DO_NOT_REPEAT.as_int()
)
for n in Notification.get_persistent_queryset().all():
# If there is any other notification similar to this on the default db, skip it

View File

@ -71,7 +71,7 @@ class ServiceProviderFactory(factory.ModuleFactory[ServiceProvider]):
offers = []
for s in type_.offers:
if s.uses_cache_l2:
s.uses_cache = True
s.uses_cache = True # Ensures uses cache is true
if s.publication_type is None:
logger.error(
'Provider %s offers %s, but %s needs cache and do not have publication_type defined',
@ -93,9 +93,16 @@ class ServiceProviderFactory(factory.ModuleFactory[ServiceProvider]):
Returns a list of all service providers registered that do not need
to be published
"""
res = []
for p in self.providers().values():
for s in p.offers:
if s.publication_type is None and s.must_assign_manually is False:
res.append(s)
return res
return [
s
for p in self.providers().values()
for s in p.offers
if s.publication_type is None and s.must_assign_manually is False
]
# old code :-)
# res = []
# for p in self.providers().values():
# for s in p.offers:
# if s.publication_type is None and s.must_assign_manually is False:
# res.append(s)
# return res

View File

@ -318,7 +318,7 @@ class Service(Module):
Returns the environment unique mac addresses generator
"""
return typing.cast('UniqueMacGenerator', self.id_generators('mac'))
return typing.cast('UniqueMacGenerator', self.id_generator('mac'))
def name_generator(self) -> typing.Optional['UniqueNameGenerator']:
"""
@ -326,7 +326,7 @@ class Service(Module):
Returns the environment unique name generator
"""
return typing.cast('UniqueNameGenerator', self.id_generators('name'))
return typing.cast('UniqueNameGenerator', self.id_generator('name'))
def enumerate_assignables(self) -> collections.abc.Iterable[tuple[str, str]]:
"""

View File

@ -238,7 +238,7 @@ class UserService(Environmentable, Serializable):
Returns the environment unique mac addresses generator
"""
return typing.cast('UniqueMacGenerator', self.id_generators('mac'))
return typing.cast('UniqueMacGenerator', self.id_generator('mac'))
def name_generator(self) -> 'UniqueNameGenerator':
"""
@ -246,7 +246,7 @@ class UserService(Environmentable, Serializable):
Returns the environment unique name generator
"""
return typing.cast('UniqueNameGenerator', self.id_generators('name'))
return typing.cast('UniqueNameGenerator', self.id_generator('name'))
def gid_generator(self) -> 'UniqueGIDGenerator':
"""
@ -254,7 +254,7 @@ class UserService(Environmentable, Serializable):
Returns the environment unique global id generator
"""
return typing.cast('UniqueGIDGenerator', self.id_generators('id'))
return typing.cast('UniqueGIDGenerator', self.id_generator('id'))
def get_unique_id(self) -> str:
"""

View File

@ -32,8 +32,8 @@ class LogObjectType(enum.IntEnum):
from uds.core.util.config import GlobalConfig # pylint: disable=import-outside-toplevel
if self == LogObjectType.SYSLOG:
return GlobalConfig.GENERAL_LOG_MAX_ELEMENTS.getInt()
return GlobalConfig.INDIVIDIAL_LOG_MAX_ELEMENTS.getInt()
return GlobalConfig.GENERAL_LOG_MAX_ELEMENTS.as_int()
return GlobalConfig.INDIVIDIAL_LOG_MAX_ELEMENTS.as_int()
@staticmethod
def get_type_from_model(model: 'Model') -> 'LogObjectType|None':

View File

@ -125,6 +125,7 @@ class FieldInfo:
tooltip: str
order: int
type: FieldType
stored_field_name: typing.Optional[str] = None
readonly: typing.Optional[bool] = None
value: typing.Union[collections.abc.Callable[[], typing.Any], typing.Any] = None
default: typing.Optional[typing.Union[collections.abc.Callable[[], str], str]] = None

View File

@ -255,35 +255,14 @@ class gui:
_fields_info: types.ui.FieldInfo
def __init__(self, label: str, type: types.ui.FieldType, **kwargs) -> None:
# if defvalue or defaultValue or defValue in kwargs, emit a warning
# with the new name (that is "default"), but use the old one
for new_name, old_names in (
('default', ('defvalue', 'defaultValue', 'defValue')),
('readonly', ('rdonly, readOnly')),
):
for i in old_names:
if i in kwargs:
try:
caller = inspect.stack()[
2
] # bypass this method and the caller (that is a derived class)
except IndexError:
caller = inspect.stack()[1] # bypass only this method
logger.warning(
'Field %s: %s parameter is deprecated, use "%s" instead. Called from %s:%s',
label,
i,
new_name,
caller.filename,
caller.lineno,
)
kwargs[new_name] = kwargs[i]
break
def __init__(
self, label: str, type: types.ui.FieldType, stored_field_name: typing.Optional[str], **kwargs
) -> None:
default = kwargs.get('default')
# Length is not used on some kinds of fields, but present in all anyway
# This property only affects in "modify" operations
self._fields_info = types.ui.FieldInfo(
stored_field_name=stored_field_name,
order=kwargs.get('order') or 0,
label=label,
tooltip=kwargs.get('tooltip') or '',
@ -318,6 +297,12 @@ class gui:
def is_serializable(self) -> bool:
return True
def stored_field_name(self) -> typing.Optional[str]:
"""
Returns the name of the field
"""
return self._fields_info.stored_field_name
def num(self) -> int:
try:
@ -469,8 +454,10 @@ class gui:
value: typing.Optional[str] = None,
pattern: typing.Union[str, types.ui.FieldPatternType] = types.ui.FieldPatternType.NONE,
lines: int = 0,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
length=length,
readonly=readonly,
@ -515,28 +502,28 @@ class gui:
if isinstance(pattern, types.ui.FieldPatternType):
try:
if pattern == types.ui.FieldPatternType.IPV4:
validators.validateIpv4(self.value)
validators.validate_ipv4(self.value)
elif pattern == types.ui.FieldPatternType.IPV6:
validators.validateIpv6(self.value)
validators.validate_ipv6(self.value)
elif pattern == types.ui.FieldPatternType.IP:
validators.validateIpv4OrIpv6(self.value)
validators.validate_ip(self.value)
elif pattern == types.ui.FieldPatternType.MAC:
validators.validateMac(self.value)
validators.validate_mac(self.value)
elif pattern == types.ui.FieldPatternType.URL:
validators.validateUrl(self.value)
elif pattern == types.ui.FieldPatternType.EMAIL:
validators.validateEmail(self.value)
validators.validate_email(self.value)
elif pattern == types.ui.FieldPatternType.FQDN:
validators.validateFqdn(self.value)
validators.validate_fqdn(self.value)
elif pattern == types.ui.FieldPatternType.HOSTNAME:
validators.validateHostname(self.value)
validators.validate_hostname(self.value)
elif pattern == types.ui.FieldPatternType.HOST:
try:
validators.validateHostname(self.value, allowDomain=True)
validators.validate_hostname(self.value, allowDomain=True)
except exceptions.validation.ValidationError:
validators.validateIpv4OrIpv6(self.value)
validators.validate_ip(self.value)
elif pattern == types.ui.FieldPatternType.PATH:
validators.validatePath(self.value)
validators.validate_path(self.value)
return True
except exceptions.validation.ValidationError:
return False
@ -628,8 +615,10 @@ class gui:
value: typing.Optional[int] = None,
min_value: typing.Optional[int] = None,
max_value: typing.Optional[int] = None,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
length=length,
readonly=readonly,
@ -681,8 +670,10 @@ class gui:
typing.Union[collections.abc.Callable[[], datetime.date], datetime.date]
] = None,
value: typing.Optional[typing.Union[str, datetime.date]] = None,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
length=length,
readonly=readonly,
@ -766,8 +757,10 @@ class gui:
tab: typing.Optional[typing.Union[str, types.ui.Tab]] = None,
default: typing.Union[collections.abc.Callable[[], str], str] = '',
value: typing.Optional[str] = None,
stored_field_name: typing.Optional[str] = None,
):
super().__init__(
stored_field_name=stored_field_name,
label=label,
length=length,
readonly=readonly,
@ -828,8 +821,10 @@ class gui:
default: typing.Any = None, # May be also callable
value: typing.Any = None,
serializable: bool = False,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
order=order,
default=default,
@ -880,8 +875,10 @@ class gui:
tab: typing.Optional[typing.Union[str, types.ui.Tab]] = None,
default: typing.Union[collections.abc.Callable[[], bool], bool] = False,
value: typing.Optional[bool] = None,
stored_field_name: typing.Optional[str] = None,
):
super().__init__(
stored_field_name=stored_field_name,
label=label,
readonly=readonly,
order=order,
@ -1019,8 +1016,10 @@ class gui:
tab: typing.Optional[typing.Union[str, types.ui.Tab]] = None,
default: typing.Union[collections.abc.Callable[[], str], str, None] = None,
value: typing.Optional[str] = None,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
readonly=readonly,
order=order,
@ -1067,8 +1066,10 @@ class gui:
tab: typing.Optional[typing.Union[str, types.ui.Tab]] = None,
default: typing.Union[collections.abc.Callable[[], str], str, None] = None,
value: typing.Optional[str] = None,
stored_field_name: typing.Optional[str] = None,
):
super().__init__(
stored_field_name=stored_field_name,
label=label,
readonly=readonly,
order=order,
@ -1141,8 +1142,10 @@ class gui:
collections.abc.Callable[[], str], collections.abc.Callable[[], list[str]], list[str], str, None
] = None,
value: typing.Optional[collections.abc.Iterable[str]] = None,
stored_field_name: typing.Optional[str] = None,
):
super().__init__(
stored_field_name=stored_field_name,
label=label,
readonly=readonly,
order=order,
@ -1202,8 +1205,10 @@ class gui:
collections.abc.Callable[[], str], collections.abc.Callable[[], list[str]], list[str], str, None
] = None,
value: typing.Optional[collections.abc.Iterable[str]] = None,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
stored_field_name=stored_field_name,
label=label,
readonly=readonly,
order=order,
@ -1230,8 +1235,15 @@ class gui:
"""
def __init__(self, label: str, default: str) -> None:
super().__init__(label=label, default=default, type=types.ui.FieldType.INFO)
def __init__(
self,
label: str,
default: str,
stored_field_name: typing.Optional[str] = None,
) -> None:
super().__init__(
label=label, default=default, type=types.ui.FieldType.INFO, stored_field_name=stored_field_name
)
class UserInterfaceType(type):
@ -1259,7 +1271,7 @@ class UserInterfaceType(type):
_gui[attrName] = attr
new_class_dict[attrName] = attr
new_class_dict['_base_gui'] = _gui
new_class_dict['_gui_fields_template'] = _gui
return typing.cast('UserInterfaceType', type.__new__(mcs, classname, bases, new_class_dict))
@ -1285,7 +1297,7 @@ class UserInterface(metaclass=UserInterfaceAbstract):
error: str
# Class variable that will hold the gui fields description
_base_gui: typing.ClassVar[dict[str, gui.InputField]]
_gui_fields_template: typing.ClassVar[dict[str, gui.InputField]]
# instance variable that will hold the gui fields description
# this allows us to modify the gui fields values at runtime without affecting other instances
@ -1303,7 +1315,7 @@ class UserInterface(metaclass=UserInterfaceAbstract):
# Ensure "gui" points to a copy of original gui, not the original one
# this is done to avoid modifying the original gui description
self._gui = copy.deepcopy(self._base_gui)
self._gui = copy.deepcopy(self._gui_fields_template)
# If a field has a callable on defined attributes(value, default, choices)
# update the reference to the new copy
@ -1416,9 +1428,9 @@ class UserInterface(metaclass=UserInterfaceAbstract):
}
# Any unexpected type will raise an exception
arr = [
(k, v.type.name, fw_converters[v.type](v))
for k, v in self._gui.items()
if fw_converters[v.type](v) is not None
(field_name, field.stored_field_name() or field.type.name, fw_converters[field.type](field))
for field_name, field in self._gui.items()
if fw_converters[field.type](field) is not None
]
return SERIALIZATION_HEADER + SERIALIZATION_VERSION + serialize(arr)
@ -1461,13 +1473,21 @@ class UserInterface(metaclass=UserInterfaceAbstract):
return
arr = _unserialize(values)
# Dict of translations from stored_field_name to field_name
field_names_translations: dict[str, str] = {}
for fld_name, fld in self._gui.items():
fld_stored_field_name = fld.stored_field_name()
if fld_stored_field_name and fld_stored_field_name != fld_name:
field_names_translations[fld_stored_field_name] = fld_name
# Set all values to defaults ones
for k in self._gui:
if self._gui[k].is_type(types.ui.FieldType.HIDDEN) and self._gui[k].is_serializable() is False:
for fld_name in self._gui:
fld = self._gui[fld_name]
if self._gui[fld_name].is_type(types.ui.FieldType.HIDDEN) and self._gui[fld_name].is_serializable() is False:
# logger.debug('Field {0} is not unserializable'.format(k))
continue
self._gui[k].value = self._gui[k].default
self._gui[fld_name].value = self._gui[fld_name].default
converters: collections.abc.Mapping[types.ui.FieldType, collections.abc.Callable[[str], typing.Any]] = {
types.ui.FieldType.TEXT: lambda x: x,
@ -1486,18 +1506,20 @@ class UserInterface(metaclass=UserInterfaceAbstract):
types.ui.FieldType.INFO: lambda x: None,
}
for k, t, v in arr:
if k not in self._gui:
logger.warning('Field %s not found in form', k)
for fld_name, fld_type, fld_value in arr:
if fld_name in field_names_translations:
fld_name = field_names_translations[fld_name] # Convert stored_field_name to field_name if needed
if fld_name not in self._gui:
logger.warning('Field %s not found in form', fld_name)
continue
field_type = self._gui[k].type
field_type = self._gui[fld_name].type
if field_type not in converters:
logger.warning('Field %s has no converter', k)
logger.warning('Field %s has no converter', fld_name)
continue
if t != field_type.name:
logger.warning('Field %s has different type than expected', k)
if fld_type != field_type.name:
logger.warning('Field %s has different type than expected', fld_name)
continue
self._gui[k].value = converters[field_type](v)
self._gui[fld_name].value = converters[field_type](fld_value)
def deserialize_old_fields(self, values: bytes) -> None:
"""

View File

@ -37,7 +37,6 @@ import logging
from django.db import transaction
from django.db.utils import OperationalError
from uds.models.cache import Cache as DBCache
from uds.core.util.model import sql_datetime
from uds.core.util import serializer

View File

@ -215,6 +215,4 @@ class CalendarChecker:
@staticmethod
def _gen_cache_key(key: str) -> str:
# Returns a valid cache key for all caching backends (memcached, redis, or whatever)
# Simple, fastest algorithm is to use md5
return hashlib.md5(key.encode('utf-8')).hexdigest() # nosec simple fast algorithm for cache keys
return hashlib.sha256(key.encode('utf-8'), usedforsecurity=False).hexdigest()

View File

@ -98,25 +98,25 @@ class Config:
return Config.SectionType
class Section:
_sectionName: 'Config.SectionType'
_section_name: 'Config.SectionType'
def __init__(self, sectionName: 'Config.SectionType') -> None:
self._sectionName = sectionName
self._section_name = sectionName
def value(self, key, default='', **kwargs) -> 'Config.Value':
return Config.value(self, key, default, **kwargs)
def valueCrypt(self, key, default='', **kwargs) -> 'Config.Value':
def value_encrypted(self, key, default='', **kwargs) -> 'Config.Value':
return Config.value(self, key, default, True, **kwargs)
def valueLong(self, key, default='', **kwargs) -> 'Config.Value':
def value_longtext(self, key, default='', **kwargs) -> 'Config.Value':
return Config.value(self, key, default, False, True, **kwargs)
def name(self) -> str:
return self._sectionName
return self._section_name
def __str__(self) -> str:
return self._sectionName
return self._section_name
class Value:
_section: 'Config.Section' # type: ignore # mypy complains??
@ -195,10 +195,10 @@ class Config:
return CryptoManager().decrypt(typing.cast(str, self._data))
return typing.cast(str, self._data)
def setParams(self, params: typing.Any) -> None:
def set_params(self, params: typing.Any) -> None:
_configParams[self._section.name() + self._key] = params
def getInt(self, force: bool = False) -> int:
def as_int(self, force: bool = False) -> int:
try:
return int(self.get(force))
except Exception:
@ -228,19 +228,19 @@ class Config:
def section(self) -> str:
return self._section.name()
def isCrypted(self) -> bool:
def is_encrypted(self) -> bool:
return self._crypt
def isLongText(self) -> bool:
def is_long_text(self) -> bool:
return self._longText
def get_type(self) -> int:
return self._type
def getParams(self) -> typing.Any:
def get_params(self) -> typing.Any:
return _configParams.get(self._section.name() + self._key, None)
def getHelp(self) -> str:
def get_help(self) -> str:
return gettext(self._help)
def set(self, value: typing.Union[str, bool, int]) -> None:
@ -309,7 +309,7 @@ class Config:
continue
logger.debug('%s.%s:%s,%s', cfg.section, cfg.key, cfg.value, cfg.field_type)
if cfg.crypt:
val = Config.section(Config.SectionType.from_str(cfg.section)).valueCrypt(cfg.key)
val = Config.section(Config.SectionType.from_str(cfg.section)).value_encrypted(cfg.key)
else:
val = Config.section(Config.SectionType.from_str(cfg.section)).value(cfg.key)
yield val
@ -342,7 +342,7 @@ class Config:
return False
@staticmethod
def getConfigValues(
def get_config_values(
addCrypt: bool = False,
) -> collections.abc.Mapping[str, collections.abc.Mapping[str, collections.abc.Mapping[str, typing.Any]]]:
"""
@ -355,7 +355,7 @@ class Config:
if cfg.key() in REMOVED_CONFIG_ELEMENTS.get(cfg.section(), ()):
continue
if cfg.isCrypted() is True and addCrypt is False:
if cfg.is_encrypted() is True and addCrypt is False:
continue
if cfg.get_type() == Config.FieldType.PASSWORD and addCrypt is False:
@ -366,11 +366,11 @@ class Config:
res[cfg.section()] = {}
res[cfg.section()][cfg.key()] = {
'value': cfg.get(),
'crypt': cfg.isCrypted(),
'longText': cfg.isLongText(),
'crypt': cfg.is_encrypted(),
'longText': cfg.is_long_text(),
'type': cfg.get_type(),
'params': cfg.getParams(),
'help': cfg.getHelp(),
'params': cfg.get_params(),
'help': cfg.get_help(),
}
logger.debug('Configuration: %s', res)
return res
@ -448,7 +448,7 @@ class GlobalConfig:
'superUser', 'root', type=Config.FieldType.TEXT, help=_('Superuser username')
)
# Superuser password (do not need to be at database!!!)
SUPER_USER_PASS: Config.Value = Config.section(Config.SectionType.SECURITY).valueCrypt(
SUPER_USER_PASS: Config.Value = Config.section(Config.SectionType.SECURITY).value_encrypted(
'rootPass', 'udsmam0', type=Config.FieldType.PASSWORD, help=_('Superuser password')
)
SUPER_USER_ALLOW_WEBACCESS: Config.Value = Config.section(Config.SectionType.SECURITY).value(

View File

@ -393,7 +393,7 @@ def blocker(
result = f(*args, **kwargs)
except uds.core.exceptions.rest.BlockAccess:
# Increment
blockCache.put(ip, failuresCount + 1, GlobalConfig.LOGIN_BLOCK.getInt())
blockCache.put(ip, failuresCount + 1, GlobalConfig.LOGIN_BLOCK.as_int())
raise exceptions.rest.AccessDenied
# Any other exception will be raised
except Exception:

View File

@ -43,7 +43,7 @@ try:
except ImportError:
import hashlib
hasher = hashlib.md5 # nosec: just a hashm not for crypto
hasher = hashlib.sha256 # nosec: just a hashm not for crypto

View File

@ -35,7 +35,7 @@ except Exception: # nosec: simple check for disabling warnings,
pass
def selfSignedCert(ip: str) -> tuple[str, str, str]:
def create_self_signed_cert(ip: str) -> tuple[str, str, str]:
"""
Generates a self signed certificate for the given ip.
This method is mainly intended to be used for generating/saving Actor certificates.
@ -80,7 +80,7 @@ def selfSignedCert(ip: str) -> tuple[str, str, str]:
)
def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
def create_client_sslcontext(verify: bool = True) -> ssl.SSLContext:
"""
Creates a SSLContext for client connections.
@ -115,7 +115,7 @@ def createClientSslContext(verify: bool = True) -> ssl.SSLContext:
return sslContext
def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool:
def check_certificate_matches_private_key(*, cert: str, key: str) -> bool:
"""
Checks if a certificate and a private key match.
All parameters must be keyword arguments.
@ -147,7 +147,7 @@ def checkCertificateMatchPrivateKey(*, cert: str, key: str) -> bool:
return False
def secureRequestsSession(
def secure_requests_session(
*, verify: typing.Union[str, bool] = True
) -> 'requests.Session':
'''
@ -167,7 +167,7 @@ def secureRequestsSession(
class UDSHTTPAdapter(requests.adapters.HTTPAdapter):
def init_poolmanager(self, *args, **kwargs) -> None:
kwargs["ssl_context"] = createClientSslContext(verify=verify is True)
kwargs["ssl_context"] = create_client_sslcontext(verify=verify is True)
# See urllib3.poolmanager.SSL_KEYWORDS for all available keys.
return super().init_poolmanager(*args, **kwargs)
@ -196,7 +196,7 @@ def secureRequestsSession(
return session
def checkServerCertificateIsValid(cert: str) -> bool:
def is_server_certificate_valid(cert: str) -> bool:
"""
Checks if a certificate is valid.
All parameters must be keyword arguments.

View File

@ -29,6 +29,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from email.mime import base
import pickle # nosec: This is e controled pickle use
import base64
import hashlib
@ -46,20 +47,22 @@ logger = logging.getLogger(__name__)
MARK = '_mgb_'
def _calculate_key(owner: bytes, key: bytes, extra: typing.Optional[bytes] = None) -> str:
def _old_calculate_key(owner: bytes, key: bytes) -> str:
h = hashlib.md5(usedforsecurity=False)
h.update(owner)
h.update(key)
if extra:
h.update(extra)
return h.hexdigest()
def _encode_value(key: str, value: typing.Any, compat: bool = False) -> str:
if not compat:
return base64.b64encode(pickle.dumps((MARK, key, value))).decode()
# Compatibility save
return base64.b64encode(pickle.dumps(value)).decode()
def _calculate_key(owner: bytes, key: bytes) -> str:
h = hashlib.sha256(usedforsecurity=False)
h.update(owner)
h.update(key)
return h.hexdigest()
def _encode_value(key: str, value: typing.Any) -> str:
return base64.b64encode(pickle.dumps((MARK, key, value))).decode()
def _decode_value(dbk: str, value: typing.Optional[str]) -> tuple[str, typing.Any]:
@ -83,12 +86,15 @@ class StorageAsDict(MutableMapping):
Accesses storage as a dictionary. Much more convenient than the old method
"""
_group: str
_owner: str
_atomic: bool
def __init__(
self,
owner: str,
group: typing.Optional[str],
atomic: bool = False,
compat: bool = False,
) -> None:
"""Initializes an storage as dict accesor
@ -102,7 +108,6 @@ class StorageAsDict(MutableMapping):
self._group = group or ''
self._owner = owner
self._atomic = atomic # Not used right now, maybe removed
self._compat = compat
@property
def _db(self) -> typing.Union[models.QuerySet, models.Manager]:
@ -117,33 +122,43 @@ class StorageAsDict(MutableMapping):
fltr_params['attr1'] = self._group
return typing.cast('models.QuerySet[DBStorage]', self._db.filter(**fltr_params))
def _key(self, key: str) -> str:
def _key(self, key: str, old_method: bool = False) -> str:
if key[0] == '#':
# Compat with old db key
return key[1:]
return _calculate_key(self._owner.encode(), key.encode())
if not old_method:
return _calculate_key(self._owner.encode(), key.encode())
return _old_calculate_key(self._owner.encode(), key.encode())
def __getitem__(self, key: str) -> typing.Any:
if not isinstance(key, str):
raise TypeError(f'Key must be str, {type(key)} found')
dbk = self._key(key)
try:
c: DBStorage = typing.cast(DBStorage, self._db.get(pk=dbk))
if c.owner != self._owner: # Maybe a key collision,
logger.error('Key collision detected for key %s', key)
return None
okey, value = _decode_value(dbk, c.data)
return _decode_value(dbk, c.data)[1] # Ignores original key
except DBStorage.DoesNotExist:
return None
# First, try new key, and, if needed, old key
# If old key is found, it will be updated to new key
for use_old_method in (False, True):
db_key = self._key(key, old_method=use_old_method)
try:
c: DBStorage = typing.cast(DBStorage, self._db.get(pk=db_key))
if c.owner != self._owner: # Maybe a key collision,
logger.error('Key collision detected for key %s', key)
return None
okey, value = _decode_value(db_key, c.data)
if use_old_method:
# Update key on db
c.delete()
DBStorage.objects.create(key=self._key(key), owner=self._owner, data=c.data, attr1=c.attr1)
return value
except DBStorage.DoesNotExist:
pass
return None
def __setitem__(self, key: str, value: typing.Any) -> None:
if not isinstance(key, str):
raise TypeError(f'Key must be str type, {type(key)} found')
dbk = self._key(key)
data = _encode_value(key, value, self._compat)
data = _encode_value(key, value)
# ignores return value, we don't care if it was created or updated
DBStorage.objects.update_or_create(
key=dbk, defaults={'data': data, 'attr1': self._group, 'owner': self._owner}
@ -195,22 +210,19 @@ class StorageAccess:
Allows the access to the storage as a dict, with atomic transaction if requested
"""
owner: str
group: typing.Optional[str]
atomic: bool
compat: bool
_owner: str
_group: typing.Optional[str]
_atomic: typing.Optional[transaction.Atomic]
def __init__(
self,
owner: str,
group: typing.Optional[str] = None,
atomic: bool = False,
compat: bool = False,
):
self._owner = owner
self._group = group
self._atomic = transaction.atomic() if atomic else None
self._compat = compat
def __enter__(self):
if self._atomic:
@ -219,7 +231,6 @@ class StorageAccess:
owner=self._owner,
group=self._group,
atomic=bool(self._atomic),
compat=self._compat,
)
def __exit__(self, exc_type, exc_value, traceback):
@ -235,8 +246,11 @@ class Storage:
self._owner = typing.cast(str, owner.decode('utf-8') if isinstance(owner, bytes) else owner)
self._bowner = self._owner.encode('utf8')
def get_key(self, key: typing.Union[str, bytes]) -> str:
return _calculate_key(self._bowner, key.encode('utf8') if isinstance(key, str) else key)
def get_key(self, key: typing.Union[str, bytes], old_method: bool = False) -> str:
bkey: bytes = key.encode('utf8') if isinstance(key, str) else key
if not old_method:
return _calculate_key(self._bowner, bkey)
return _old_calculate_key(self._bowner, bkey)
def save_to_db(
self,
@ -252,14 +266,14 @@ class Storage:
key = self.get_key(skey)
if isinstance(data, str):
data = data.encode('utf-8')
data_string = codecs.encode(data, 'base64').decode()
data_encoded = base64.b64encode(data).decode()
attr1 = attr1 or ''
try:
DBStorage.objects.create(owner=self._owner, key=key, data=data_string, attr1=attr1)
DBStorage.objects.create(owner=self._owner, key=key, data=data_encoded, attr1=attr1)
except Exception:
with transaction.atomic():
DBStorage.objects.filter(key=key).select_for_update().update(
owner=self._owner, data=data_string, attr1=attr1
owner=self._owner, data=data_encoded, attr1=attr1
) # @UndefinedVariable
def put(self, skey: typing.Union[str, bytes], data: typing.Any) -> None:
@ -286,20 +300,29 @@ class Storage:
def read_from_db(
self, skey: typing.Union[str, bytes], fromPickle: bool = False
) -> typing.Optional[typing.Union[str, bytes]]:
try:
key = self.get_key(skey)
c: DBStorage = DBStorage.objects.get(pk=key) # @UndefinedVariable
val = codecs.decode(c.data.encode(), 'base64')
if fromPickle:
return val
for use_old_method in (False, True):
try:
return val.decode('utf-8') # Tries to encode in utf-8
except Exception:
return val
except DBStorage.DoesNotExist: # @UndefinedVariable
return None
key = self.get_key(skey, old_method=use_old_method)
c: DBStorage = DBStorage.objects.get(pk=key) # @UndefinedVariable
val = base64.b64decode(c.data.encode())
# if old key is used, update it to new key
if use_old_method:
# Remove and re-create with new key
c.delete()
DBStorage.objects.create(
key=self.get_key(skey), owner=self._owner, data=c.data, attr1=c.attr1
)
if fromPickle:
return val
try:
return val.decode('utf-8') # Tries to encode in utf-8
except Exception:
return val
except DBStorage.DoesNotExist:
pass
return None
def get(self, skey: typing.Union[str, bytes]) -> typing.Optional[typing.Union[str, bytes]]:
return self.read_from_db(skey)
@ -348,13 +371,12 @@ class Storage:
"""
# dbStorage.objects.unlock() # @UndefinedVariable
def map(
def as_dict(
self,
group: typing.Optional[str] = None,
atomic: bool = False,
compat: bool = False,
) -> StorageAccess:
return StorageAccess(self._owner, group=group, atomic=atomic, compat=compat)
return StorageAccess(self._owner, group=group, atomic=atomic)
def search_by_attr1(
self, attr1: typing.Union[collections.abc.Iterable[str], str]
@ -368,14 +390,14 @@ class Storage:
yield codecs.decode(v.data.encode(), 'base64')
def filter(
self, attr1: typing.Optional[str] = None, forUpdate: bool = False
self, attr1: typing.Optional[str] = None, for_update: bool = False
) -> collections.abc.Iterable[tuple[str, bytes, 'str|None']]:
if attr1 is None:
query = DBStorage.objects.filter(owner=self._owner) # @UndefinedVariable
else:
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1) # @UndefinedVariable
if forUpdate:
if for_update:
query = query.select_for_update()
for v in query: # @UndefinedVariable

View File

@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
url_validator = dj_validators.URLValidator(['http', 'https'])
def validateNumeric(
def validate_numeric(
value: typing.Union[str, int],
min_value: typing.Optional[int] = None,
max_value: typing.Optional[int] = None,
@ -87,7 +87,7 @@ def validateNumeric(
return int(value)
def validateHostname(hostname: str, maxLength: int = 64, allowDomain=False) -> str:
def validate_hostname(hostname: str, maxLength: int = 64, allowDomain=False) -> str:
if len(hostname) > maxLength:
raise exceptions.validation.ValidationError(
_('{} is not a valid hostname: maximum host name length exceeded.').format(hostname)
@ -107,8 +107,8 @@ def validateHostname(hostname: str, maxLength: int = 64, allowDomain=False) -> s
return hostname
def validateFqdn(fqdn: str, maxLength: int = 255) -> str:
return validateHostname(fqdn, maxLength, allowDomain=True)
def validate_fqdn(fqdn: str, maxLength: int = 255) -> str:
return validate_hostname(fqdn, maxLength, allowDomain=True)
def validateUrl(url: str, maxLength: int = 1024) -> str:
@ -123,7 +123,7 @@ def validateUrl(url: str, maxLength: int = 1024) -> str:
return url
def validateIpv4(ipv4: str) -> str:
def validate_ipv4(ipv4: str) -> str:
"""
Validates that a ipv4 address is valid
:param ipv4: ipv4 address to validate
@ -137,7 +137,7 @@ def validateIpv4(ipv4: str) -> str:
return ipv4
def validateIpv6(ipv6: str) -> str:
def validate_ipv6(ipv6: str) -> str:
"""
Validates that a ipv6 address is valid
:param ipv6: ipv6 address to validate
@ -151,7 +151,7 @@ def validateIpv6(ipv6: str) -> str:
return ipv6
def validateIpv4OrIpv6(ipv4OrIpv6: str) -> str:
def validate_ip(ipv4_or_ipv6: str) -> str:
"""
Validates that a ipv4 or ipv6 address is valid
:param ipv4OrIpv6: ipv4 or ipv6 address to validate
@ -159,15 +159,15 @@ def validateIpv4OrIpv6(ipv4OrIpv6: str) -> str:
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
try:
dj_validators.validate_ipv46_address(ipv4OrIpv6)
dj_validators.validate_ipv46_address(ipv4_or_ipv6)
except Exception:
raise exceptions.validation.ValidationError(
_('{} is not a valid IPv4 or IPv6 address').format(ipv4OrIpv6)
_('{} is not a valid IPv4 or IPv6 address').format(ipv4_or_ipv6)
) from None
return ipv4OrIpv6
return ipv4_or_ipv6
def validatePath(
def validate_path(
path: str,
maxLength: int = 1024,
mustBeWindows: bool = False,
@ -207,7 +207,7 @@ def validatePath(
return path
def validatePort(port: typing.Union[str, int]) -> int:
def validate_port(port: typing.Union[str, int]) -> int:
"""
Validates that a port number is valid
@ -220,10 +220,10 @@ def validatePort(port: typing.Union[str, int]) -> int:
Raises:
exceptions.ValidationException: if port is not valid
"""
return validateNumeric(port, min_value=1, max_value=65535, fieldName='Port')
return validate_numeric(port, min_value=1, max_value=65535, fieldName='Port')
def validateHost(host: str) -> str:
def validate_host(host: str) -> str:
"""
Validates that a host is valid
:param host: host to validate
@ -233,10 +233,10 @@ def validateHost(host: str) -> str:
dj_validators.validate_ipv46_address(host)
return host
except Exception:
return validateFqdn(host)
return validate_fqdn(host)
def validateHostPortPair(hostPortPair: str) -> tuple[str, int]:
def validate_host_port(host_port_pair: str) -> tuple[str, int]:
"""
Validates that a host:port pair is valid
:param hostPortPair: host:port pair to validate
@ -244,32 +244,32 @@ def validateHostPortPair(hostPortPair: str) -> tuple[str, int]:
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
try:
if '[' in hostPortPair and ']' in hostPortPair: # IPv6
host, port = hostPortPair.split(']:')
if '[' in host_port_pair and ']' in host_port_pair: # IPv6
host, port = host_port_pair.split(']:')
host = host[1:]
else:
host, port = hostPortPair.split(':')
host, port = host_port_pair.split(':')
# if an ip address is used, it must be valid
try:
dj_validators.validate_ipv46_address(host)
return host, validatePort(port)
return host, validate_port(port)
except Exception:
return validateHostname(host, 255, False), validatePort(port)
return validate_hostname(host, 255, False), validate_port(port)
except Exception:
raise exceptions.validation.ValidationError(_('{} is not a valid host:port pair').format(hostPortPair)) from None
raise exceptions.validation.ValidationError(_('{} is not a valid host:port pair').format(host_port_pair)) from None
def validateTimeout(timeOutStr: str) -> int:
def validate_timeout(timeOutStr: str) -> int:
"""
Validates that a timeout value is valid
:param timeOutStr: timeout to validate
:param returnAsInteger: if True, returns value as integer, if not, as string
:return: Raises exceptions.Validation exception if is invalid, else return the value "fixed"
"""
return validateNumeric(timeOutStr, min_value=0, fieldName='Timeout')
return validate_numeric(timeOutStr, min_value=0, fieldName='Timeout')
def validateMac(mac: str) -> str:
def validate_mac(mac: str) -> str:
"""
Validates that a mac address is valid
:param mac: mac address to validate
@ -288,7 +288,7 @@ def validateMac(mac: str) -> str:
return mac
def validateMacRange(macRange: str) -> str:
def validate_mac_range(macRange: str) -> str:
"""
Corrects mac range (uppercase, without spaces), and checks that is range is valid
:param macRange: Range to fix
@ -296,15 +296,15 @@ def validateMacRange(macRange: str) -> str:
"""
try:
macRangeStart, macRangeEnd = macRange.split('-')
validateMac(macRangeStart)
validateMac(macRangeEnd)
validate_mac(macRangeStart)
validate_mac(macRangeEnd)
except Exception:
raise exceptions.validation.ValidationError(_('{} is not a valid MAC range').format(macRange)) from None
return macRange
def validateEmail(email: str) -> str:
def validate_email(email: str) -> str:
"""
Validates that an email is valid
:param email: email to validate
@ -319,7 +319,7 @@ def validateEmail(email: str) -> str:
return email
def validateBasename(baseName: str, length: int = -1) -> str:
def validate_basename(baseName: str, length: int = -1) -> str:
""" "Checks if the basename + length is valid for services. Raises an exception if not valid"
Arguments:
@ -348,7 +348,7 @@ def validateBasename(baseName: str, length: int = -1) -> str:
return baseName
def validateJson(jsonData: typing.Optional[str]) -> typing.Any:
def validate_json(jsonData: typing.Optional[str]) -> typing.Any:
"""
Validates that a json data is valid (or empty)
@ -369,7 +369,7 @@ def validateJson(jsonData: typing.Optional[str]) -> typing.Any:
raise exceptions.validation.ValidationError(_('Invalid JSON data')) from None
def validateServerCertificate(cert: typing.Optional[str]) -> str:
def validate_server_certificate(cert: typing.Optional[str]) -> str:
"""
Validates that a certificate is valid
@ -385,13 +385,13 @@ def validateServerCertificate(cert: typing.Optional[str]) -> str:
if not cert:
return ''
try:
security.checkServerCertificateIsValid(cert)
security.is_server_certificate_valid(cert)
except Exception as e:
raise exceptions.validation.ValidationError(_('Invalid certificate') + f' :{e}') from e
return cert
def validateServerCertificateMulti(value: typing.Optional[str]) -> str:
def validate_server_certificate_multiple(value: typing.Optional[str]) -> str:
"""
Validates the multi-line fields referring to attributes
"""

View File

@ -50,7 +50,7 @@ class AssignedAndUnused(Job):
def run(self) -> None:
since_state = sql_datetime() - timedelta(
seconds=GlobalConfig.CHECK_UNUSED_TIME.getInt()
seconds=GlobalConfig.CHECK_UNUSED_TIME.as_int()
)
# Locate service pools with pending assigned service in use
outdatedServicePools = ServicePool.objects.annotate(

View File

@ -51,9 +51,9 @@ class HangedCleaner(Job):
def run(self) -> None:
now = sql_datetime()
since_state = now - timedelta(
seconds=GlobalConfig.MAX_INITIALIZING_TIME.getInt()
seconds=GlobalConfig.MAX_INITIALIZING_TIME.as_int()
)
since_removing = now - timedelta(seconds=GlobalConfig.MAX_REMOVAL_TIME.getInt())
since_removing = now - timedelta(seconds=GlobalConfig.MAX_REMOVAL_TIME.as_int())
# Filter for locating machine not ready
flt = Q(state_date__lt=since_state, state=State.PREPARING) | Q(
state_date__lt=since_state, state=State.USABLE, os_state=State.PREPARING

View File

@ -54,7 +54,7 @@ class PublicationInfoItemsCleaner(Job):
def run(self) -> None:
removeFrom = sql_datetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True)
seconds=GlobalConfig.KEEP_INFO_TIME.as_int(True)
)
ServicePoolPublication.objects.filter(
state__in=State.INFO_STATES, state_date__lt=removeFrom

View File

@ -55,7 +55,7 @@ class DeployedServiceInfoItemsCleaner(Job):
def run(self) -> None:
removeFrom = sql_datetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt()
seconds=GlobalConfig.KEEP_INFO_TIME.as_int()
)
ServicePool.objects.filter(
state__in=State.INFO_STATES, state_date__lt=removeFrom

View File

@ -171,7 +171,7 @@ class StatsAccumulator(Job):
def run(self):
try:
StatsManager.manager().acummulate(config.GlobalConfig.STATS_ACCUM_MAX_CHUNK_TIME.getInt())
StatsManager.manager().acummulate(config.GlobalConfig.STATS_ACCUM_MAX_CHUNK_TIME.as_int())
except Exception:
logger.exception('Compressing counters')

View File

@ -59,7 +59,7 @@ class UserServiceInfoItemsCleaner(Job):
def run(self) -> None:
removeFrom = sql_datetime() - timedelta(
seconds=GlobalConfig.KEEP_INFO_TIME.getInt(True)
seconds=GlobalConfig.KEEP_INFO_TIME.as_int(True)
)
logger.debug('Removing information user services from %s', removeFrom)
with transaction.atomic():
@ -78,7 +78,7 @@ class UserServiceRemover(Job):
def run(self) -> None:
# USER_SERVICE_REMOVAL_LIMIT is the maximum number of items to remove at once
# This configuration value is cached at startup, so it is not updated until next reload
removeAtOnce: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.getInt()
removeAtOnce: int = GlobalConfig.USER_SERVICE_CLEAN_NUMBER.as_int()
manager = UserServiceManager()
with transaction.atomic():

View File

@ -70,7 +70,7 @@ class Command(BaseCommand):
if options['password']:
kwargs['type'] = Config.FieldType.PASSWORD
if options['force_crypt']:
value = Config.section(mod).valueCrypt(name, value).get()
value = Config.section(mod).value_encrypted(name, value).get()
else:
Config.section(mod).value(name, value).get()
except Exception as e:

View File

@ -73,7 +73,7 @@ class Command(BaseCommand):
elif options['yaml']:
writer = {} # Create a dict to store data, and write at the end
# Get sections, key, value as a list of tuples
for section, data in config.Config.getConfigValues().items():
for section, data in config.Config.get_config_values().items():
for key, value in data.items():
# value is a dict, get 'value' key
if options['csv']:

View File

@ -193,14 +193,14 @@ class EmailMFA(mfas.MFA):
# Now check is valid format
if ':' in hostname:
host, port = validators.validateHostPortPair(hostname)
host, port = validators.validate_host_port(hostname)
self.hostname.value = f'{host}:{port}'
else:
host = self.hostname.as_clean_str()
self.hostname.value = validators.validateFqdn(host)
self.hostname.value = validators.validate_fqdn(host)
# now check from email and to email
self.fromEmail.value = validators.validateEmail(self.fromEmail.value)
self.fromEmail.value = validators.validate_email(self.fromEmail.value)
def html(self, request: 'ExtendedHttpRequest', userId: str, username: str) -> str:
return gettext('Check your mail. You will receive an email with the verification code')

View File

@ -282,7 +282,7 @@ class SMSMFA(mfas.MFA):
return url
def getSession(self) -> requests.Session:
session = security.secureRequestsSession(verify=self.ignoreCertificateErrors.as_bool())
session = security.secure_requests_session(verify=self.ignoreCertificateErrors.as_bool())
# 0 means no authentication
if self.authenticationMethod.value == '1':
session.auth = requests.auth.HTTPBasicAuth(

View File

@ -159,9 +159,9 @@ def _process_request(request: 'ExtendedHttpRequest') -> typing.Optional['HttpRes
request.session[EXPIRY_KEY] = (
now
+ datetime.timedelta(
seconds=GlobalConfig.SESSION_DURATION_ADMIN.getInt()
seconds=GlobalConfig.SESSION_DURATION_ADMIN.as_int()
if request.user.is_staff()
else GlobalConfig.SESSION_DURATION_USER.getInt()
else GlobalConfig.SESSION_DURATION_USER.as_int()
)
).isoformat() # store as ISO format, str, json serializable

View File

@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
def migrate(apps: typing.Any, schema_editor: typing.Any) -> None:
"""
Migrates an old tunnel transport to a new one (with tunnelServer)
Migrates old properties to new ones
"""
try:
UserServiceProperty = apps.get_model('uds', 'UserServiceProperty')
@ -28,7 +28,7 @@ def migrate(apps: typing.Any, schema_editor: typing.Any) -> None:
def rollback(apps: typing.Any, schema_editor: typing.Any) -> None:
"""
Migrates an old tunnel transport to a new one (with tunnelServer)
rollback migration
"""
try:
UserServiceProperty = apps.get_model('uds', 'UserServiceProperty')

View File

@ -72,7 +72,7 @@ def migrate(
server_ip_hostname: list[tuple[str, str]] = []
for server in servers:
try:
validators.validateIpv4OrIpv6(server)
validators.validate_ip(server)
# Is Pure IP, try to get hostname
try:
answers = dns.resolver.resolve(dns.reversename.from_address(server), 'PTR')

View File

@ -292,7 +292,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
if toDelete.data != '':
s = toDelete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
# Clears related logs
log.clear_logs(toDelete)

View File

@ -69,7 +69,7 @@ class ManagedObjectModel(UUIDModel):
"""
Returns an environment valid for the record this object represents
"""
return Environment.getEnvForTableElement(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
return Environment.get_environment_for_table(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
def deserialize(
self, obj: Module, values: typing.Optional[collections.abc.Mapping[str, str]]

View File

@ -101,7 +101,7 @@ class MFA(ManagedObjectModel, TaggingMixin): # type: ignore
try:
s = to_delete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
except Exception as e:
logger.error(
'Error processing deletion of notifier %s: %s (forced deletion)',

View File

@ -140,7 +140,7 @@ class Notifier(ManagedObjectModel, TaggingMixin):
try:
s = to_delete.get_instance()
s.destroy() # Invokes the destruction of "related own data"
s.env.clearRelatedData() # Clears related data, such as storage, cache, etc...
s.env.clean_related_data() # Clears related data, such as storage, cache, etc...
except Exception as e:
logger.error(
'Error processing deletion of notifier %s: %s (forced deletion)',

View File

@ -116,7 +116,7 @@ class OSManager(ManagedObjectModel, TaggingMixin):
if to_delete.data != '':
s = to_delete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
logger.debug('Before delete os manager %s', to_delete)

View File

@ -121,7 +121,7 @@ class Provider(ManagedObjectModel, TaggingMixin): # type: ignore
if to_delete.data != '':
s = to_delete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
# Clears related logs
log.clear_logs(to_delete)

View File

@ -87,7 +87,7 @@ class Scheduler(models.Model):
"""
Returns an environment valid for the record this object represents
"""
return Environment.getEnvForTableElement(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
return Environment.get_environment_for_table(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
def get_instance(self) -> typing.Optional[jobs.Job]:
"""
@ -108,7 +108,7 @@ class Scheduler(models.Model):
"""
toDelete: 'Scheduler' = kwargs['instance']
logger.debug('Deleting sheduled task %s', toDelete)
toDelete.get_environment().clearRelatedData()
toDelete.get_environment().clean_related_data()
def __str__(self) -> str:
return f'Scheduled task {self.name}, every {self.frecuency}, last execution at {self.last_execution}, state = {self.state}'

View File

@ -103,7 +103,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
"""
Returns an environment valid for the record this object represents
"""
return Environment.getEnvForTableElement(
return Environment.get_environment_for_table(
self._meta.verbose_name, # type: ignore
self.id,
{
@ -220,7 +220,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
if to_delete.data != '':
s = to_delete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
# Clears related logs
log.clear_logs(to_delete)

View File

@ -165,7 +165,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
"""
Returns an environment valid for the record this object represents
"""
return Environment.getEnvForTableElement(self._meta.verbose_name, self.id) # type: ignore
return Environment.get_environment_for_table(self._meta.verbose_name, self.id) # type: ignore
def active_publication(self) -> typing.Optional['ServicePoolPublication']:
"""
@ -207,13 +207,13 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
from uds.models.user_service import \
UserService # pylint: disable=import-outside-toplevel
if GlobalConfig.RESTRAINT_TIME.getInt() <= 0:
if GlobalConfig.RESTRAINT_TIME.as_int() <= 0:
return (
ServicePool.objects.none()
) # Do not perform any restraint check if we set the globalconfig to 0 (or less)
date = sql_datetime() - timedelta(seconds=GlobalConfig.RESTRAINT_TIME.getInt())
min_ = GlobalConfig.RESTRAINT_COUNT.getInt()
date = sql_datetime() - timedelta(seconds=GlobalConfig.RESTRAINT_TIME.as_int())
min_ = GlobalConfig.RESTRAINT_COUNT.as_int()
res = []
for v in (
@ -258,13 +258,13 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
from uds.core.util.config import \
GlobalConfig # pylint: disable=import-outside-toplevel
if GlobalConfig.RESTRAINT_TIME.getInt() <= 0:
if GlobalConfig.RESTRAINT_TIME.as_int() <= 0:
return False # Do not perform any restraint check if we set the globalconfig to 0 (or less)
date = typing.cast(datetime, sql_datetime()) - timedelta(seconds=GlobalConfig.RESTRAINT_TIME.getInt())
date = typing.cast(datetime, sql_datetime()) - timedelta(seconds=GlobalConfig.RESTRAINT_TIME.as_int())
if (
self.userServices.filter(state=types.states.State.ERROR, state_date__gt=date).count()
>= GlobalConfig.RESTRAINT_COUNT.getInt()
>= GlobalConfig.RESTRAINT_COUNT.as_int()
):
return True
@ -677,7 +677,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
toDelete: 'ServicePool' = kwargs['instance']
logger.debug('Deleting Service Pool %s', toDelete)
toDelete.get_environment().clearRelatedData()
toDelete.get_environment().clean_related_data()
# Clears related logs
log.clear_logs(toDelete)

View File

@ -113,7 +113,7 @@ class ServicePoolPublication(UUIDModel):
"""
Returns an environment valid for the record this object represents
"""
return Environment.getEnvForTableElement(self._meta.verbose_name, self.id) # type: ignore
return Environment.get_environment_for_table(self._meta.verbose_name, self.id) # type: ignore
def get_instance(self) -> 'services.Publication':
"""
@ -210,7 +210,7 @@ class ServicePoolPublication(UUIDModel):
:note: If destroy raises an exception, the deletion is not taken.
"""
to_delete: ServicePoolPublication = kwargs['instance']
to_delete.get_environment().clearRelatedData()
to_delete.get_environment().clean_related_data()
# Delete method is invoked directly by PublicationManager,
# Destroying a publication is not obligatory an 1 step action.

View File

@ -170,7 +170,7 @@ class Transport(ManagedObjectModel, TaggingMixin):
if toDelete.data != '':
s = toDelete.get_instance()
s.destroy()
s.env.clearRelatedData()
s.env.clean_related_data()
# Clears related permissions
clean(toDelete)

View File

@ -174,7 +174,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
(see related classes uds.core.util.unique_name_generator and uds.core.util.unique_mac_generator)
"""
return Environment.getEnvForTableElement(
return Environment.get_environment_for_table(
self._meta.verbose_name, # type: ignore # pylint: disable=no-member
self.id,
{
@ -636,7 +636,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
"""
to_delete: 'UserService' = kwargs['instance']
# Clear environment
to_delete.get_environment().clearRelatedData()
to_delete.get_environment().clean_related_data()
# Ensure all sessions are closed (invoke with '' to close all sessions)
# In fact, sessions are going to be deleted also, but we give then
# the oportunity to execute some code before deleting them

View File

@ -153,15 +153,15 @@ class EmailNotifier(messaging.Notifier):
# Now check is valid format
if ':' in hostname:
host, port = validators.validateHostPortPair(hostname)
host, port = validators.validate_host_port(hostname)
self.hostname.value = f'{host}:{port}'
else:
host = self.hostname.as_clean_str()
self.hostname.value = validators.validateFqdn(host)
self.hostname.value = validators.validate_fqdn(host)
# now check from email and to email
self.fromEmail.value = validators.validateEmail(self.fromEmail.value)
self.toEmail.value = validators.validateEmail(self.toEmail.value)
self.fromEmail.value = validators.validate_email(self.fromEmail.value)
self.toEmail.value = validators.validate_email(self.toEmail.value)
# Done

View File

@ -209,8 +209,8 @@ class OVirtProvider(
self._api = None
if values is not None:
self.macsRange.value = validators.validateMacRange(self.macsRange.value)
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.macsRange.value = validators.validate_mac_range(self.macsRange.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
logger.debug(self.host.value)
def testConnection(self) -> bool:

View File

@ -223,7 +223,7 @@ class OVirtLinkedService(services.Service): # pylint: disable=too-many-public-m
initialized by __init__ method of base class, before invoking this.
"""
if values:
validators.validateBasename(self.baseName.value, self.lenName.num())
validators.validate_basename(self.baseName.value, self.lenName.num())
if int(self.memory.value) < 256 or int(self.memoryGuaranteed.value) < 256:
raise exceptions.validation.ValidationError(
_('The minimum allowed memory is 256 Mb')

View File

@ -135,7 +135,7 @@ class OpenGnsysClient:
) -> typing.Any:
if not FAKE:
return ensureResponseIsValid(
security.secureRequestsSession(verify=self.verifyCert).post(
security.secure_requests_session(verify=self.verifyCert).post(
self._ogUrl(path),
data=json.dumps(data),
headers=self.headers,
@ -149,7 +149,7 @@ class OpenGnsysClient:
def _get(self, path: str, errMsg: typing.Optional[str] = None) -> typing.Any:
if not FAKE:
return ensureResponseIsValid(
security.secureRequestsSession(verify=self.verifyCert).get(
security.secure_requests_session(verify=self.verifyCert).get(
self._ogUrl(path), headers=self.headers, verify=self.verifyCert,
timeout=TIMEOUT,
),
@ -161,7 +161,7 @@ class OpenGnsysClient:
def _delete(self, path: str, errMsg: typing.Optional[str] = None) -> typing.Any:
if not FAKE:
return ensureResponseIsValid(
security.secureRequestsSession(verify=self.verifyCert).delete(
security.secure_requests_session(verify=self.verifyCert).delete(
self._ogUrl(path),
headers=self.headers,
timeout=TIMEOUT,

View File

@ -186,7 +186,7 @@ class OGProvider(ServiceProvider):
self._api = None
if values:
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
logger.debug('Endpoint: %s', self.endpoint)
try:

View File

@ -158,7 +158,7 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
self._api = None
if values:
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
logger.debug('Endpoint: %s', self.endpoint)
@property

View File

@ -149,7 +149,7 @@ class LiveService(services.Service):
if not values:
return
self.baseName.value = validators.validateBasename(
self.baseName.value = validators.validate_basename(
self.baseName.value, length=self.lenName.num()
)

View File

@ -172,7 +172,7 @@ class Client: # pylint: disable=too-many-public-methods
access: typing.Optional[str] = None,
proxies: typing.Optional[collections.abc.MutableMapping[str, str]] = None,
):
self._session = security.secureRequestsSession(verify=VERIFY_SSL)
self._session = security.secure_requests_session(verify=VERIFY_SSL)
if proxies:
self._session.proxies = proxies

View File

@ -232,7 +232,7 @@ class OpenStackProvider(ServiceProvider):
self._api = None
if values is not None:
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
def api(self, projectId=None, region=None) -> openstack.Client:
projectId = projectId or self.tenant.value or None

View File

@ -220,7 +220,7 @@ class ProviderLegacy(ServiceProvider):
# Just reset _api connection variable
if values is not None:
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
def api(self, projectId=None, region=None) -> openstack.Client:
proxies = None

View File

@ -210,7 +210,7 @@ class LiveService(services.Service):
initialized by __init__ method of base class, before invoking this.
"""
if values:
validators.validateBasename(self.baseName.value, self.lenName.num())
validators.validate_basename(self.baseName.value, self.lenName.num())
# self.ov.value = self.parent().serialize()
# self.ev.value = self.parent().env.key

View File

@ -79,7 +79,7 @@ class IPServiceBase(services.Service):
if wolurl:
logger.info('Launching WOL: %s', wolurl)
try:
security.secureRequestsSession(verify=verify_ssl).get(wolurl)
security.secure_requests_session(verify=verify_ssl).get(wolurl)
# logger.debug('Result: %s', result)
except Exception as e:
logger.error('Error on WOL: %s', e)

View File

@ -160,7 +160,7 @@ class ProxmoxClient:
def _get(self, path: str) -> typing.Any:
try:
result = security.secureRequestsSession(verify=self._validateCert).get(
result = security.secure_requests_session(verify=self._validateCert).get(
self._getPath(path),
headers=self.headers,
cookies={'PVEAuthCookie': self._ticket},
@ -179,7 +179,7 @@ class ProxmoxClient:
data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
) -> typing.Any:
try:
result = security.secureRequestsSession(verify=self._validateCert).post(
result = security.secure_requests_session(verify=self._validateCert).post(
self._getPath(path),
data=data, # type: ignore
headers=self.headers,
@ -199,7 +199,7 @@ class ProxmoxClient:
data: typing.Optional[collections.abc.Iterable[tuple[str, str]]] = None,
) -> typing.Any:
try:
result = security.secureRequestsSession(verify=self._validateCert).delete(
result = security.secure_requests_session(verify=self._validateCert).delete(
self._getPath(path),
data=data, # type: ignore
headers=self.headers,
@ -231,7 +231,7 @@ class ProxmoxClient:
return
try:
result = security.secureRequestsSession(verify=self._validateCert).post(
result = security.secure_requests_session(verify=self._validateCert).post(
url=self._getPath('access/ticket'),
data=self._credentials,
headers=self.headers,

View File

@ -190,7 +190,7 @@ class ProxmoxProvider(
self._api = None
if values is not None:
self.timeout.value = validators.validateTimeout(self.timeout.value)
self.timeout.value = validators.validate_timeout(self.timeout.value)
logger.debug(self.host.value)
# All proxmox use same UniqueId generator

View File

@ -194,7 +194,7 @@ class ProxmoxLinkedService(services.Service): # pylint: disable=too-many-public
def initialize(self, values: 'Module.ValuesType') -> None:
if values:
self.baseName.value = validators.validateBasename(
self.baseName.value = validators.validate_basename(
self.baseName.value, length=self.lenName.num()
)
# if int(self.memory.value) < 128:

View File

@ -188,7 +188,7 @@ class XenLinkedService(services.Service): # pylint: disable=too-many-public-met
initialized by __init__ method of base class, before invoking this.
"""
if values:
validators.validateBasename(self.baseName.value, self.lenName.num())
validators.validate_basename(self.baseName.value, self.lenName.num())
if int(self.memory.value) < 256:
raise exceptions.validation.ValidationError(

View File

@ -118,7 +118,7 @@ class TRDPTransport(BaseRDPTransport):
def initialize(self, values: 'Module.ValuesType'):
if values:
validators.validateHostPortPair(values.get('tunnelServer', ''))
validators.validate_host_port(values.get('tunnelServer', ''))
def get_transport_script( # pylint: disable=too-many-locals
self,

View File

@ -87,7 +87,7 @@ class TSPICETransport(BaseSpiceTransport):
def initialize(self, values: 'Module.ValuesType'):
if values:
validators.validateHostPortPair(values.get('tunnelServer', ''))
validators.validate_host_port(values.get('tunnelServer', ''))
def get_transport_script( # pylint: disable=too-many-locals
self,

View File

@ -64,16 +64,17 @@ class URLCustomTransport(transports.Transport):
protocol = types.transports.Protocol.OTHER
group = types.transports.Grouping.DIRECT
urlPattern = gui.TextField(
url_pattern = gui.TextField(
label=_('URL Pattern'),
order=1,
tooltip=_('URL Pattern to open (i.e. https://_IP_/test?user=_USER_'),
default='https://www.udsenterprise.com',
length=256,
required=True,
stored_field_name='urlPattern', # Allows compat with old versions
)
forceNewWindow = gui.CheckBoxField(
force_new_window = gui.CheckBoxField(
label=_('Force new HTML Window'),
order=91,
tooltip=_(
@ -81,6 +82,7 @@ class URLCustomTransport(transports.Transport):
),
default=False,
tab=types.ui.Tab.ADVANCED,
stored_field_name='forceNewWindow', # Allows compat with old versions
)
def initialize(self, values: 'Module.ValuesType'):
@ -88,8 +90,8 @@ class URLCustomTransport(transports.Transport):
return
# Strip spaces
if not (
self.urlPattern.value.startswith('http://')
or self.urlPattern.value.startswith('https://')
self.url_pattern.value.startswith('http://')
or self.url_pattern.value.startswith('https://')
):
raise exceptions.validation.ValidationError(
_('The url must be http or https')
@ -115,11 +117,11 @@ class URLCustomTransport(transports.Transport):
username: str = user.get_username_for_auth()
username, password = userService.process_user_password(username, password)
url = self.urlPattern.value.replace('_IP_', ip).replace('_USER_', username)
url = self.url_pattern.value.replace('_IP_', ip).replace('_USER_', username)
onw = (
'&o_n_w={}'.format(hash(transport.name))
if self.forceNewWindow.as_bool()
if self.force_new_window.as_bool()
else ''
)
return str("{}{}".format(url, onw))

View File

@ -93,7 +93,7 @@ class TX2GOTransport(BaseX2GOTransport):
def initialize(self, values: 'Module.ValuesType'):
if values:
validators.validateHostPortPair(values.get('tunnelServer', ''))
validators.validate_host_port(values.get('tunnelServer', ''))
def get_transport_script(
self,

View File

@ -156,7 +156,7 @@ urlpatterns = [
# WEB API path (not REST api, frontend)
re_path(
r'^uds/webapi/img/transport/(?P<idTrans>[a-zA-Z0-9:-]+)$',
uds.web.views.transportIcon,
uds.web.views.transport_icon,
name='webapi.transportIcon',
),
re_path(
@ -167,12 +167,12 @@ urlpatterns = [
# Enabler and Status action are first processed, and if not match, execute the generic "action" handler
re_path(
r'^uds/webapi/action/(?P<idService>[a-zA-Z0-9:-]+)/enable/(?P<idTransport>[a-zA-Z0-9:-]+)$',
uds.web.views.userServiceEnabler,
uds.web.views.user_service_enabler,
name='webapi.enabler',
),
re_path(
r'^uds/webapi/action/(?P<idService>[a-zA-Z0-9:-]+)/status/(?P<idTransport>[a-zA-Z0-9:-]+)$',
uds.web.views.userServiceStatus,
uds.web.views.user_service_status,
name='webapi.status',
),
re_path(
@ -189,7 +189,7 @@ urlpatterns = [
# Transport own link processor
re_path(
r'^uds/webapi/trans/(?P<idService>[a-zA-Z0-9:-]+)/(?P<idTransport>[a-zA-Z0-9:-]+)$',
uds.web.views.transportOwnLink,
uds.web.views.transport_own_link,
name='TransportOwnLink',
),
# Transport ticket update (for username/password on html5)
@ -207,7 +207,7 @@ urlpatterns = [
# Error message
re_path(
r'^uds/webapi/error/(?P<err>[0-9]+)$',
uds.web.views.errorMessage,
uds.web.views.error_message,
name='webapi.error',
),
# END WEB API

View File

@ -32,7 +32,6 @@ import collections.abc
import logging
import typing
from django.http import HttpResponseRedirect
from django.utils.translation import gettext as _
from uds.core import types
@ -44,8 +43,6 @@ from uds.models import Authenticator
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from django.http import HttpRequest # pylint: disable=ungrouped-imports
from uds.core.types.requests import ExtendedHttpRequest
from uds.web.forms.LoginForm import LoginForm
@ -83,9 +80,7 @@ def check_login( # pylint: disable=too-many-branches, too-many-statements
if form.is_valid():
os = request.os
try:
authenticator = Authenticator.objects.get(
uuid=process_uuid(form.cleaned_data['authenticator'])
)
authenticator = Authenticator.objects.get(uuid=process_uuid(form.cleaned_data['authenticator']))
except Exception:
authenticator = Authenticator.null()
userName = form.cleaned_data['user']
@ -95,22 +90,14 @@ def check_login( # pylint: disable=too-many-branches, too-many-statements
cache = Cache('auth')
cacheKey = str(authenticator.id) + userName
tries = cache.get(cacheKey) or 0
triesByIp = (
(cache.get(request.ip) or 0) if GlobalConfig.LOGIN_BLOCK_IP.as_bool() else 0
)
maxTries = GlobalConfig.MAX_LOGIN_TRIES.getInt()
triesByIp = (cache.get(request.ip) or 0) if GlobalConfig.LOGIN_BLOCK_IP.as_bool() else 0
maxTries = GlobalConfig.MAX_LOGIN_TRIES.as_int()
# Get instance..
authInstance = authenticator.get_instance()
# Check if user is locked
if (
authInstance.block_user_on_failures is True
and (tries >= maxTries)
or triesByIp >= maxTries
):
if authInstance.block_user_on_failures is True and (tries >= maxTries) or triesByIp >= maxTries:
log_login(request, authenticator, userName, 'Temporarily blocked')
return types.auth.LoginResult(
errstr=_('Too many authentication errrors. User temporarily blocked')
)
return types.auth.LoginResult(errstr=_('Too many authentication errrors. User temporarily blocked'))
# check if authenticator is visible for this requests
if authInstance.is_ip_allowed(request=request) is False:
log_login(
@ -121,14 +108,16 @@ def check_login( # pylint: disable=too-many-branches, too-many-statements
)
return types.auth.LoginResult(errstr=_('Access tried from an unallowed source'))
password = form.cleaned_data['password'] or 'axd56adhg466jasd6q8sadñ€sáé--v' # Random string, in fact, just a placeholder that will not be used :)
password = (
form.cleaned_data['password'] or 'axd56adhg466jasd6q8sadñ€sáé--v'
) # Random string, in fact, just a placeholder that will not be used :)
authResult = authenticate(userName, password, authenticator, request=request)
logger.debug('User: %s', authResult.user)
if authResult.user is None:
logger.debug("Invalid user %s (access denied)", userName)
cache.put(cacheKey, tries + 1, GlobalConfig.LOGIN_BLOCK.getInt())
cache.put(request.ip, triesByIp + 1, GlobalConfig.LOGIN_BLOCK.getInt())
cache.put(cacheKey, tries + 1, GlobalConfig.LOGIN_BLOCK.as_int())
cache.put(request.ip, triesByIp + 1, GlobalConfig.LOGIN_BLOCK.as_int())
log_login(
request,
authenticator,

View File

@ -164,7 +164,7 @@ def uds_js(request: 'ExtendedHttpRequest') -> str:
'os': request.os.os.name,
'image_size': Image.MAX_IMAGE_SIZE,
'experimental_features': GlobalConfig.EXPERIMENTAL_FEATURES.as_bool(),
'reload_time': GlobalConfig.RELOAD_TIME.getInt(True),
'reload_time': GlobalConfig.RELOAD_TIME.as_int(True),
'site_name': GlobalConfig.SITE_NAME.get(),
'site_copyright_info': GlobalConfig.SITE_COPYRIGHT.get(),
'site_copyright_link': GlobalConfig.SITE_COPYRIGHT_LINK.get(),
@ -210,7 +210,7 @@ def uds_js(request: 'ExtendedHttpRequest') -> str:
'launch': request.session.get('launch', ''),
'brand': settings.UDSBRAND if hasattr(settings, 'UDSBRAND') else ''
},
'min_for_filter': GlobalConfig.SITE_FILTER_MIN.getInt(True),
'min_for_filter': GlobalConfig.SITE_FILTER_MIN.as_int(True),
}
info: typing.Optional[collections.abc.MutableMapping] = None
@ -308,7 +308,7 @@ def uds_js(request: 'ExtendedHttpRequest') -> str:
config['urls']['admin'] = reverse('uds.admin.views.index')
config['urls']['rest'] = reverse('REST', kwargs={'arguments': ''})
# Admin config
page_size = GlobalConfig.ADMIN_PAGESIZE.getInt(True)
page_size = GlobalConfig.ADMIN_PAGESIZE.as_int(True)
vnc_userservices = GlobalConfig.ADMIN_ENABLE_USERSERVICES_VNC.as_bool(True)
# Fix page size to razonable usable values
page_size = 10 if page_size < 10 else 100 if page_size > 100 else page_size

View File

@ -55,7 +55,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
def errorView(request: 'HttpRequest', errorCode: int) -> HttpResponseRedirect:
def error_view(request: 'HttpRequest', errorCode: int) -> HttpResponseRedirect:
return HttpResponseRedirect(reverse('page.error', kwargs={'err': errorCode}))
@ -66,15 +66,15 @@ def error(request: 'HttpRequest', err: str) -> 'HttpResponse':
return render(request, 'uds/modern/index.html', {})
def exceptionView(request: 'HttpRequest', exception: Exception) -> HttpResponseRedirect:
def exception_view(request: 'HttpRequest', exception: Exception) -> HttpResponseRedirect:
"""
Tries to render an error page with error information
"""
logger.debug(traceback.format_exc())
return errorView(request, types.errors.Error.from_exception(exception))
return error_view(request, types.errors.Error.from_exception(exception))
def errorMessage(request: 'HttpRequest', err: int) -> 'HttpResponse':
def error_message(request: 'HttpRequest', err: int) -> 'HttpResponse':
"""
Error view, responsible of error display
"""

View File

@ -60,7 +60,7 @@ logger = logging.getLogger(__name__)
# pylint: disable=too-many-arguments
def _serviceInfo(
def _service_info(
uuid: str,
is_meta: bool,
name: str,
@ -101,7 +101,7 @@ def _serviceInfo(
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def getServicesData(
def get_services_data(
request: 'ExtendedHttpRequestWithUser',
) -> dict[str, typing.Any]: # pylint: disable=too-many-locals, too-many-branches, too-many-statements
"""Obtains the service data dictionary will all available services for this request
@ -285,7 +285,7 @@ def getServicesData(
)
services.append(
_serviceInfo(
_service_info(
uuid=meta.uuid,
is_meta=True,
name=macro_info(meta.name),
@ -386,7 +386,7 @@ def getServicesData(
toBeReplacedTxt = ''
services.append(
_serviceInfo(
_service_info(
uuid=sPool.uuid,
is_meta=False,
name=macro_info(sPool.name),
@ -432,7 +432,7 @@ def getServicesData(
}
def enableService(
def enable_service(
request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str
) -> collections.abc.Mapping[str, typing.Any]:
# Maybe we could even protect this even more by limiting referer to own server /? (just a meditation..)

View File

@ -32,13 +32,13 @@
import logging
# from .login import login, logout
from uds.web.util.errors import error, errorMessage
from uds.web.util.errors import error, error_message
from .service import (
transportOwnLink,
transportIcon,
userServiceEnabler,
userServiceStatus,
serviceImage,
transport_own_link,
transport_icon,
user_service_enabler,
user_service_status,
service_image,
action,
)
from .auth import auth_callback, auth_callback_stage2, auth_info, ticket_auth, custom_auth

View File

@ -95,7 +95,7 @@ def auth_callback(request: HttpRequest, authName: str) -> HttpResponse:
return HttpResponseRedirect(reverse('page.auth.callback_stage2', args=[ticket]))
except Exception as e:
# No authenticator found...
return errors.exceptionView(request, e)
return errors.exception_view(request, e)
def auth_callback_stage2(request: 'ExtendedHttpRequestWithUser', ticketId: str) -> HttpResponse:
@ -138,7 +138,7 @@ def auth_callback_stage2(request: 'ExtendedHttpRequestWithUser', ticketId: str)
)
except Exception as e:
logger.exception('authCallback')
return errors.exceptionView(request, e)
return errors.exception_view(request, e)
@csrf_exempt
@ -278,15 +278,15 @@ def ticket_auth(
uds_cookie(request, response, True)
return response
except ServiceNotReadyError:
return errors.errorView(request, types.errors.Error.SERVICE_NOT_READY)
return errors.error_view(request, types.errors.Error.SERVICE_NOT_READY)
except TicketStore.InvalidTicket:
return errors.errorView(request, types.errors.Error.RELOAD_NOT_SUPPORTED)
return errors.error_view(request, types.errors.Error.RELOAD_NOT_SUPPORTED)
except Authenticator.DoesNotExist:
logger.error('Ticket has an non existing authenticator')
return errors.errorView(request, types.errors.Error.ACCESS_DENIED)
return errors.error_view(request, types.errors.Error.ACCESS_DENIED)
except ServicePool.DoesNotExist: # type: ignore # DoesNotExist is different for each model
logger.error('Ticket has an invalid Service Pool')
return errors.errorView(request, types.errors.Error.SERVICE_NOT_FOUND)
return errors.error_view(request, types.errors.Error.SERVICE_NOT_FOUND)
except Exception as e:
logger.exception('Exception')
return errors.exceptionView(request, e)
return errors.exception_view(request, e)

View File

@ -56,7 +56,7 @@ from uds.web.forms.LoginForm import LoginForm
from uds.web.forms.MFAForm import MFAForm
from uds.web.util import configjs, errors
from uds.web.util.authentication import check_login
from uds.web.util.services import getServicesData
from uds.web.util.services import get_services_data
logger = logging.getLogger(__name__)
@ -132,7 +132,7 @@ def login(request: types.requests.ExtendedHttpRequest, tag: typing.Optional[str]
) # On failure, wait a bit if not localhost (random wait)
# If error is numeric, redirect...
if loginResult.errid:
return errors.errorView(request, loginResult.errid)
return errors.error_view(request, loginResult.errid)
# Error, set error on session for process for js
request.session['errors'] = [loginResult.errstr]
@ -162,7 +162,7 @@ def js(request: types.requests.ExtendedHttpRequest) -> HttpResponse:
@never_cache
@auth.deny_non_authenticated # web_login_required not used here because this is not a web page, but js
def services_data_json(request: types.requests.ExtendedHttpRequestWithUser) -> HttpResponse:
return JsonResponse(getServicesData(request))
return JsonResponse(get_services_data(request))
# The MFA page does not needs CSRF token, so we disable it
@ -232,7 +232,7 @@ def mfa(
request.user.manager.name,
mfa_provider.name,
)
return errors.errorView(request, types.errors.Error.ACCESS_DENIED)
return errors.error_view(request, types.errors.Error.ACCESS_DENIED)
# None, the authenticator will decide what to do if mfa_identifier is empty
tries = request.session.get('mfa_tries', 0)
@ -277,12 +277,12 @@ def mfa(
logger.error('MFA error: %s', e)
tries += 1
request.session['mfa_tries'] = tries
if tries >= config.GlobalConfig.MAX_LOGIN_TRIES.getInt():
if tries >= config.GlobalConfig.MAX_LOGIN_TRIES.as_int():
# Clean session
request.session.flush()
# Too many tries, redirect to login error page
return errors.errorView(request, types.errors.Error.ACCESS_DENIED)
return errors.errorView(request, types.errors.Error.INVALID_MFA_CODE)
return errors.error_view(request, types.errors.Error.ACCESS_DENIED)
return errors.error_view(request, types.errors.Error.INVALID_MFA_CODE)
else:
pass # Will render again the page
else:
@ -306,7 +306,7 @@ def mfa(
request.session['mfa_start_time'] = now
except Exception as e:
logger.error('Error processing MFA: %s', e)
return errors.errorView(request, types.errors.Error.UNKNOWN_ERROR)
return errors.error_view(request, types.errors.Error.UNKNOWN_ERROR)
# Compose a nice "XX years, XX months, XX days, XX hours, XX minutes" string from mfaProvider.remember_device
remember_device = ''

View File

@ -56,7 +56,7 @@ logger = logging.getLogger(__name__)
@web_login_required(admin=False)
def transportOwnLink(request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str):
def transport_own_link(request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str):
response: collections.abc.MutableMapping[str, typing.Any] = {}
# If userService is not owned by user, will raise an exception
@ -89,7 +89,7 @@ def transportOwnLink(request: 'ExtendedHttpRequestWithUser', idService: str, idT
# pylint: disable=unused-argument
@cache_page(3600, key_prefix='img', cache='memory')
def transportIcon(request: 'ExtendedHttpRequest', idTrans: str) -> HttpResponse:
def transport_icon(request: 'ExtendedHttpRequest', idTrans: str) -> HttpResponse:
try:
transport: Transport
if idTrans[:6] == 'LABEL:':
@ -105,7 +105,7 @@ def transportIcon(request: 'ExtendedHttpRequest', idTrans: str) -> HttpResponse:
@cache_page(3600, key_prefix='img', cache='memory')
def serviceImage(request: 'ExtendedHttpRequest', idImage: str) -> HttpResponse:
def service_image(request: 'ExtendedHttpRequest', idImage: str) -> HttpResponse:
try:
icon = Image.objects.get(uuid=process_uuid(idImage))
return icon.image_as_response()
@ -121,11 +121,11 @@ def serviceImage(request: 'ExtendedHttpRequest', idImage: str) -> HttpResponse:
@web_login_required(admin=False)
@never_cache
def userServiceEnabler(
def user_service_enabler(
request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str
) -> HttpResponse:
return HttpResponse(
json.dumps(services.enableService(request, idService=idService, idTransport=idTransport)),
json.dumps(services.enable_service(request, idService=idService, idTransport=idTransport)),
content_type='application/json',
)
@ -141,7 +141,7 @@ def closer(request: 'ExtendedHttpRequest') -> HttpResponse:
@web_login_required(admin=False)
@never_cache
def userServiceStatus(request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str) -> HttpResponse:
def user_service_status(request: 'ExtendedHttpRequestWithUser', idService: str, idTransport: str) -> HttpResponse:
'''
Returns;
'running' if not ready
@ -222,7 +222,7 @@ def action(request: 'ExtendedHttpRequestWithUser', idService: str, actionString:
if rebuild:
# Rebuild services data, but return only "this" service
for v in services.getServicesData(request)['services']:
for v in services.get_services_data(request)['services']:
if v['id'] == idService:
response = v
break

View File

@ -41,7 +41,12 @@ from ...utils.test import UDSTestCase
from uds.core import types, consts
from uds.core.ui.user_interface import gui
from ...fixtures.user_interface import TestingUserInterface, DEFAULTS
from ...fixtures.user_interface import (
TestingUserInterface,
DEFAULTS,
TestingUserInterfaceFieldName,
TestingUserInterfaceFieldNameOrig,
)
logger = logging.getLogger(__name__)
@ -168,3 +173,12 @@ class UserinterfaceTest(UDSTestCase):
self.assertEqual(ui, ui2)
self.ensure_values_fine(ui2)
def test_stored_field_name(self):
# This test is to ensure that new serialized data can be loaded
ui = TestingUserInterfaceFieldNameOrig()
data = ui.serialize_fields()
ui2 = TestingUserInterfaceFieldName()
ui2.unserialize_fields(data)
self.assertEqual(ui.strField.value, ui2.str_field.value)

View File

@ -30,8 +30,10 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import base64
from ...utils.test import UDSTestCase
from uds.core.util.storage import Storage
from uds.core.util import storage
from uds import models
UNICODE_CHARS = 'ñöçóá^(pípè)'
UNICODE_CHARS_2 = 'ñöçóá^(€íöè)'
@ -40,32 +42,84 @@ VALUE_1 = ['unicode', b'string', {'a': 1, 'b': 2.0}]
class StorageTest(UDSTestCase):
def test_storage(self):
storage = Storage(UNICODE_CHARS)
strg = storage.Storage(UNICODE_CHARS)
storage.put(UNICODE_CHARS, b'chars')
storage.save_to_db('saveData', UNICODE_CHARS, UNICODE_CHARS)
storage.save_to_db('saveData2', UNICODE_CHARS_2, UNICODE_CHARS)
storage.save_to_db('saveData3', UNICODE_CHARS, 'attribute')
storage.save_to_db('saveData4', UNICODE_CHARS_2, 'attribute')
storage.put(b'key', UNICODE_CHARS)
storage.put(UNICODE_CHARS_2, UNICODE_CHARS)
strg.put(UNICODE_CHARS, b'chars')
strg.save_to_db('saveData', UNICODE_CHARS, UNICODE_CHARS)
strg.save_to_db('saveData2', UNICODE_CHARS_2, UNICODE_CHARS)
strg.save_to_db('saveData3', UNICODE_CHARS, 'attribute')
strg.save_to_db('saveData4', UNICODE_CHARS_2, 'attribute')
strg.put(b'key', UNICODE_CHARS)
strg.put(UNICODE_CHARS_2, UNICODE_CHARS)
storage.put_pickle('pickle', VALUE_1)
strg.put_pickle('pickle', VALUE_1)
self.assertEqual(storage.get(UNICODE_CHARS), u'chars') # Always returns unicod
self.assertEqual(storage.read_from_db('saveData'), UNICODE_CHARS)
self.assertEqual(storage.read_from_db('saveData2'), UNICODE_CHARS_2)
self.assertEqual(storage.get(b'key'), UNICODE_CHARS)
self.assertEqual(storage.get(UNICODE_CHARS_2), UNICODE_CHARS)
self.assertEqual(storage.get_unpickle('pickle'), VALUE_1)
self.assertEqual(strg.get(UNICODE_CHARS), u'chars') # Always returns unicod
self.assertEqual(strg.read_from_db('saveData'), UNICODE_CHARS)
self.assertEqual(strg.read_from_db('saveData2'), UNICODE_CHARS_2)
self.assertEqual(strg.get(b'key'), UNICODE_CHARS)
self.assertEqual(strg.get(UNICODE_CHARS_2), UNICODE_CHARS)
self.assertEqual(strg.get_unpickle('pickle'), VALUE_1)
self.assertEqual(len(list(storage.search_by_attr1(UNICODE_CHARS))), 2)
self.assertEqual(len(list(storage.search_by_attr1('attribute'))), 2)
self.assertEqual(len(list(strg.search_by_attr1(UNICODE_CHARS))), 2)
self.assertEqual(len(list(strg.search_by_attr1('attribute'))), 2)
storage.remove(UNICODE_CHARS)
storage.remove(b'key')
storage.remove('pickle')
strg.remove(UNICODE_CHARS)
strg.remove(b'key')
strg.remove('pickle')
self.assertIsNone(storage.get(UNICODE_CHARS))
self.assertIsNone(storage.get(b'key'))
self.assertIsNone(storage.get_unpickle('pickle'))
self.assertIsNone(strg.get(UNICODE_CHARS))
self.assertIsNone(strg.get(b'key'))
self.assertIsNone(strg.get_unpickle('pickle'))
def test_storage_as_dict(self):
    """Dict-style access through Storage.as_dict() sees put() values and accepts writes."""
    strg = storage.Storage(UNICODE_CHARS)
    strg.put(UNICODE_CHARS, 'chars')

    with strg.as_dict() as view:
        view['test_key'] = UNICODE_CHARS_2
        # A key stored via put() must be readable through the dict view
        self.assertEqual(view[UNICODE_CHARS], 'chars')
        self.assertEqual(view['test_key'], UNICODE_CHARS_2)

    # The values set inside the "with" are not available "outside"
    # because the format is not compatible: the dict view stores values as a
    # tuple (including the original key), while the old format stores only
    # the value itself.
def test_old_storage_compat(self):
    """A record written with the legacy key derivation is still readable,
    and reading it migrates the record to the new key format."""
    owner_bytes = UNICODE_CHARS.encode()
    # Plant a record directly in the DB using the *old* key derivation
    models.Storage.objects.create(
        owner=UNICODE_CHARS,
        key=storage._old_calculate_key(owner_bytes, owner_bytes),
        data=base64.b64encode((UNICODE_CHARS * 5).encode()).decode(),
    )

    strg = storage.Storage(UNICODE_CHARS)
    # The legacy record must be found through the normal API
    self.assertEqual(strg.get(UNICODE_CHARS), UNICODE_CHARS * 5)

    # After the read, the record must exist under the new key derivation;
    # Django's get() raises DoesNotExist if it was not migrated
    models.Storage.objects.get(
        owner=UNICODE_CHARS,
        key=storage._calculate_key(owner_bytes, owner_bytes),
    )
def test_storage_as_dict_old(self):
    """The dict view of Storage must also read legacy-keyed records and migrate them."""
    owner_bytes = UNICODE_CHARS.encode()
    # Plant a record directly in the DB using the *old* key derivation
    models.Storage.objects.create(
        owner=UNICODE_CHARS,
        key=storage._old_calculate_key(owner_bytes, owner_bytes),
        data=base64.b64encode((UNICODE_CHARS * 5).encode()).decode(),
    )

    strg = storage.Storage(UNICODE_CHARS)
    with strg.as_dict() as view:
        # The legacy record must be visible through the dict view
        self.assertEqual(view[UNICODE_CHARS], UNICODE_CHARS * 5)

    # After the access, the record must exist under the new key derivation;
    # Django's get() raises DoesNotExist if it was not migrated
    models.Storage.objects.get(
        owner=UNICODE_CHARS,
        key=storage._calculate_key(owner_bytes, owner_bytes),
    )

View File

@ -50,6 +50,7 @@ DEFAULTS: dict[str, typing.Any] = {
'info_field': 'Default value info',
}
class TestingUserInterface(UserInterface):
str_field = gui.TextField(
label='Text Field',
@ -57,6 +58,7 @@ class TestingUserInterface(UserInterface):
tooltip='This is a text field',
required=True,
default=typing.cast(str, DEFAULTS['str_field']),
stored_field_name='strField',
)
str_auto_field = gui.TextAutocompleteField(
label='Text Autocomplete Field',
@ -136,7 +138,6 @@ class TestingUserInterface(UserInterface):
default=typing.cast(str, DEFAULTS['info_field']),
)
# Equals operator, to speed up tests writing
def __eq__(self, other: typing.Any) -> bool:
if not isinstance(other, TestingUserInterface):
@ -155,3 +156,24 @@ class TestingUserInterface(UserInterface):
and self.date_field.value == other.date_field.value
# Info field is not compared, because it is not serialized
)
class TestingUserInterfaceFieldNameOrig(UserInterface):
    # Fixture representing the *legacy* layout: the field is declared directly
    # under its original camelCase attribute name ("strField"), the way older
    # serialized data referenced it. Used to produce payloads that the renamed
    # interface (TestingUserInterfaceFieldName) must still be able to load.
    strField = gui.TextField(
        label='Text Field',
        order=0,
        tooltip='This is a text field',
        required=True,
        default=typing.cast(str, DEFAULTS['str_field']),
    )
class TestingUserInterfaceFieldName(UserInterface):
    # Fixture representing the *renamed* layout: the attribute is now
    # "str_field", but stored_field_name keeps it bound to the legacy
    # serialized name ("strField") so old payloads still deserialize into it.
    str_field = gui.TextField(
        label='Text Field',
        order=0,
        tooltip='This is a text field',
        required=True,
        default='',  # Will be loaded from orig
        stored_field_name='strField',
    )

View File

@ -83,7 +83,7 @@ class TestGetServicesData(UDSTransactionTestCase):
)[0].deployed_service
)
data = services.getServicesData(self.request)
data = services.get_services_data(self.request)
now = datetime.datetime.now()
# Will return this:
# return {
@ -184,7 +184,7 @@ class TestGetServicesData(UDSTransactionTestCase):
)
data = services.getServicesData(self.request)
data = services.get_services_data(self.request)
now = datetime.datetime.now()
result_services: typing.Final[
@ -239,7 +239,7 @@ class TestGetServicesData(UDSTransactionTestCase):
)
data = services.getServicesData(self.request)
data = services.get_services_data(self.request)
now = datetime.datetime.now()
result_services: typing.Final[