1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

Refactor config update method to handle non-existing config values and fixed Radius to allow stripping the domain part from the username

This commit is contained in:
Adolfo Gómez García 2024-10-17 18:19:19 +02:00
parent a318e8ca2b
commit 14a58dc423
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
5 changed files with 57 additions and 25 deletions

View File

@ -48,12 +48,17 @@ class Config(Handler):
return CfgConfig.get_config_values(self.is_admin()) return CfgConfig.get_config_values(self.is_admin())
def put(self) -> typing.Any: def put(self) -> typing.Any:
for section, section_dict in typing.cast( for section, section_dict in typing.cast(dict[str, dict[str, dict[str, str]]], self._params).items():
dict[str, dict[str, dict[str, str]]], self._params
).items():
for key, vals in section_dict.items(): for key, vals in section_dict.items():
logger.info( config = CfgConfig.update(CfgConfig.SectionType.from_str(section), key, vals['value'])
'Updating config value %s.%s to %s by %s', section, key, vals['value'], self._user.name if config is not None:
) logger.info(
CfgConfig.update(CfgConfig.SectionType.from_str(section), key, vals['value']) 'Updating config value %s.%s to %s by %s',
section,
key,
vals['value'] if not config.is_password else '********',
self._user.name,
)
else:
logger.error('Non existing config value %s.%s to %s by %s', section, key, vals['value'], self._user.name)
return 'done' return 'done'

View File

@ -233,6 +233,10 @@ class Config:
def get_type(self) -> int: def get_type(self) -> int:
return self._type return self._type
@property
def is_password(self) -> bool:
return self._type == Config.FieldType.PASSWORD
def get_params(self) -> typing.Any: def get_params(self) -> typing.Any:
return Config._config_params.get(self._section.name() + self._key, None) return Config._config_params.get(self._section.name() + self._key, None)
@ -322,7 +326,7 @@ class Config:
yield val yield val
@staticmethod @staticmethod
def update(section: 'Config.SectionType', key: str, value: str, check_type: bool = False) -> bool: def update(section: 'Config.SectionType', key: str, value: str, check_type: bool = False) -> 'None|Config.Value':
# If cfg value does not exists, simply ignore request # If cfg value does not exists, simply ignore request
try: try:
cfg: DBConfig = DBConfig.objects.get(section=section, key=key) cfg: DBConfig = DBConfig.objects.get(section=section, key=key)
@ -330,17 +334,17 @@ class Config:
Config.FieldType.READ, Config.FieldType.READ,
Config.FieldType.HIDDEN, Config.FieldType.HIDDEN,
): ):
return False # Skip non writable elements return None # Skip non writable elements
if cfg.field_type == Config.FieldType.PASSWORD.value: if cfg.field_type == Config.FieldType.PASSWORD.value:
value = CryptoManager().hash(value) value = CryptoManager.manager().hash(value)
cfg.value = value cfg.value = value
cfg.save() cfg.save(update_fields=['value'])
logger.debug('Updated value for %s.%s to %s', section, key, value) logger.debug('Updated value for %s.%s to %s', section, key, value)
return True return Config.section(section).value(key, value, type=Config.FieldType.from_int(cfg.field_type))
except Exception: except Exception:
return False return None
@staticmethod @staticmethod
def removed(section: 'Config.SectionType', key: str) -> None: def removed(section: 'Config.SectionType', key: str) -> None:

View File

@ -61,9 +61,7 @@ class Command(BaseCommand):
mod, name = Config.SectionType.from_str(first[0]), first[1] mod, name = Config.SectionType.from_str(first[0]), first[1]
else: else:
mod, name = Config.SectionType.GLOBAL, first[0] mod, name = Config.SectionType.GLOBAL, first[0]
if ( if Config.update(mod, name, value) is None:
Config.update(mod, name, value) is False
): # If not exists, try to store value without any special parameters
kwargs = {} kwargs = {}
if options['password']: if options['password']:
kwargs['type'] = Config.FieldType.PASSWORD kwargs['type'] = Config.FieldType.PASSWORD

View File

@ -35,7 +35,7 @@ import logging
from django.utils.translation import gettext_noop as _, gettext from django.utils.translation import gettext_noop as _, gettext
from uds.core import mfas, exceptions from uds.core import mfas, exceptions, types
from uds.core.ui import gui from uds.core.ui import gui
from uds.auths.Radius import client from uds.auths.Radius import client
@ -111,6 +111,18 @@ class RadiusOTP(mfas.MFA):
login_without_mfa_policy_networks = fields.login_without_mfa_policy_networks_field() login_without_mfa_policy_networks = fields.login_without_mfa_policy_networks_field()
allow_skip_mfa_from_networks = fields.allow_skip_mfa_from_networks_field() allow_skip_mfa_from_networks = fields.allow_skip_mfa_from_networks_field()
send_just_username = gui.CheckBoxField(
label=_('Send only username (without domain) to radius server'),
order=55,
default=False,
tooltip=_(
'If unchecked, username will be sent as is to radius server. \n'
'If checked, domain part will be removed from username before sending it to radius server.'
),
required=False,
tab=types.ui.Tab.CONFIG,
)
def radius_client(self) -> client.RadiusClient: def radius_client(self) -> client.RadiusClient:
"""Return a new radius client .""" """Return a new radius client ."""
return client.RadiusClient( return client.RadiusClient(
@ -158,9 +170,14 @@ class RadiusOTP(mfas.MFA):
# if we are in a "all-users-otp" policy, avoid this step and go directly to ask for OTP # if we are in a "all-users-otp" policy, avoid this step and go directly to ask for OTP
if self.all_users_otp.value: if self.all_users_otp.value:
return mfas.MFA.RESULT.OK return mfas.MFA.RESULT.OK
# The identifier has preference over username, but normally will be empty
username = identifier or username username = identifier or username
# Remove domain part from username if needed
if self.send_just_username.value:
username = username.strip().split('@')[0].split('\\')[-1]
web_pwd = web_password(request) web_pwd = web_password(request)
try: try:
connection = self.radius_client() connection = self.radius_client()
@ -220,8 +237,14 @@ class RadiusOTP(mfas.MFA):
regenerate a new State after a wrong sent OTP code regenerate a new State after a wrong sent OTP code
slightly less efficient but a lot simpler slightly less efficient but a lot simpler
''' '''
# The identifier has preference over username, but normally will be empty
# This allows derived class to "alter" the username if needed
username = identifier or username username = identifier or username
# Remove domain part from username if needed
if self.send_just_username.value:
username = username.strip().split('@')[0].split('\\')[-1]
try: try:
err = _('Invalid OTP code') err = _('Invalid OTP code')

View File

@ -420,13 +420,15 @@ class SMSMFA(mfas.MFA):
phone: str, phone: str,
) -> mfas.MFA.RESULT: ) -> mfas.MFA.RESULT:
url = self.build_sms_url(userid, username, code, phone) url = self.build_sms_url(userid, username, code, phone)
if self.http_method.value == 'GET': match self.http_method.value:
return self._send_sms_using_get(request, userid, username, url) case 'GET':
if self.http_method.value == 'POST': return self._send_sms_using_get(request, userid, username, url)
return self._send_sms_using_post(request, userid, username, url, code, phone) case 'POST':
if self.http_method.value == 'PUT': return self._send_sms_using_post(request, userid, username, url, code, phone)
return self._send_sms_using_put(request, userid, username, url, code, phone) case 'PUT':
raise Exception('Unknown SMS sending method') return self._send_sms_using_put(request, userid, username, url, code, phone)
case _:
raise Exception('Unknown SMS sending method')
def label(self) -> str: def label(self) -> str:
return gettext('MFA Code') return gettext('MFA Code')