1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Fixing OTP with MFA on some cases...

This commit is contained in:
Adolfo Gómez García 2023-10-11 21:58:38 +02:00
parent 941d07b4e8
commit d21c371bf7
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
2 changed files with 32 additions and 22 deletions

View File

@ -2,6 +2,7 @@ import io
import logging
import enum
import typing
import string
from pyrad.client import Client
from pyrad.dictionary import Dictionary
@ -105,16 +106,12 @@ class RadiusClient:
def extractAccessChallenge(self, reply: pyrad.packet.AuthPacket) -> RadiusResult:
return RadiusResult(
pwd=RadiusStates.CORRECT,
replyMessage=typing.cast(
typing.List[bytes], reply.get('Reply-Message') or ['']
)[0],
replyMessage=typing.cast(typing.List[bytes], reply.get('Reply-Message') or [''])[0],
state=typing.cast(typing.List[bytes], reply.get('State') or [b''])[0],
otp_needed=RadiusStates.NEEDED,
)
def sendAccessRequest(
self, username: str, password: str, **kwargs
) -> pyrad.packet.AuthPacket:
def sendAccessRequest(self, username: str, password: str, **kwargs) -> pyrad.packet.AuthPacket:
req: pyrad.packet.AuthPacket = self.radiusServer.CreateAuthPacket(
code=pyrad.packet.AccessRequest,
User_Name=username,
@ -135,7 +132,7 @@ class RadiusClient:
) -> typing.Tuple[typing.List[str], str]:
reply = self.sendAccessRequest(username, password)
if reply.code not in (pyrad.packet.AccessAccept, pyrad.packet.AccessChallenge):
if reply.code not in (pyrad.packet.AccessAccept, pyrad.packet.AccessChallenge):
raise RadiusAuthenticationError('Access denied')
# User accepted, extract groups...
@ -178,17 +175,19 @@ class RadiusClient:
# user/pwd rejected
return RadiusResult(
pwd=RadiusStates.INCORRECT,
state=typing.cast(typing.List[bytes], reply.get('State') or [b''])[0],
)
def challenge_only(
self, username: str, otp: str, state: bytes = b'0000000000000000'
) -> RadiusResult:
def challenge_only(self, username: str, otp: str, state: bytes = b'0000000000000000') -> RadiusResult:
# clean otp code
otp = ''.join([x for x in otp if x in '0123456789'])
otp = ''.join([x for x in otp if x in string.digits])
logger.debug('Sending AccessChallenge request wit otp [%s]', otp)
reply = self.sendAccessRequest(username, otp, State=state)
logger.debug('Received AccessChallenge reply: %s', reply)
# correct OTP challenge
if reply.code == pyrad.packet.AccessAccept:
return RadiusResult(
@ -198,26 +197,25 @@ class RadiusClient:
# incorrect OTP challenge
return RadiusResult(
otp=RadiusStates.INCORRECT,
state=typing.cast(typing.List[bytes], reply.get('State') or [b''])[0],
)
def authenticate_and_challenge(
self, username: str, password: str, otp: str
) -> RadiusResult:
def authenticate_and_challenge(self, username: str, password: str, otp: str) -> RadiusResult:
reply = self.sendAccessRequest(username, password)
if reply.code == pyrad.packet.AccessChallenge:
state = typing.cast(typing.List[bytes], reply.get('State') or [b''])[0]
# replyMessage = typing.cast(typing.List[bytes], reply.get('Reply-Message') or [''])[0]
return self.challenge_only(username, otp, state=state)
# user/pwd accepted: but this user does not have challenge data
# we should not be here...
if reply.code == pyrad.packet.AccessAccept:
logger.warning(
"Radius OTP error: cheking for OTP for not needed user [%s]", username
)
logger.warning("Radius OTP error: cheking for OTP for not needed user [%s]", username)
return RadiusResult(
pwd=RadiusStates.CORRECT,
otp_needed=RadiusStates.NOT_NEEDED,
state=typing.cast(typing.List[bytes], reply.get('State') or [b''])[0],
)
# TODO: accept more AccessChallenge authentications (as RFC says)
@ -226,14 +224,16 @@ class RadiusClient:
return RadiusResult()
def authenticate_challenge(
self, username: str, password: str = '', otp: str = '', state: bytes = b'' # nosec: not a password, just an empty string
self, username: str, password: str = '', otp: str = '', state: typing.Optional[bytes] = None
) -> RadiusResult:
'''
wrapper for above 3 functions: authenticate_only, challenge_only, authenticate_and_challenge
calls wrapped functions based on passed input values: (pwd/otp/state)
'''
# clean input data
otp = ''.join([x for x in otp if x in '0123456789'])
# Keep only numbers in otp
state = state or b'0000000000000000'
otp = ''.join([x for x in otp if x in string.digits])
username = username.strip()
password = password.strip()
state = state.strip()
@ -245,6 +245,6 @@ class RadiusClient:
return self.authenticate_only(username, password)
if otp and not password:
# check only otp with static/invented state. allow this ?
return self.challenge_only(username, otp, state=b'0000000000000000')
return self.challenge_only(username, otp, state=state)
# otp and password
return self.authenticate_and_challenge(username, password, otp)

View File

@ -223,6 +223,9 @@ class RadiusOTP(mfas.MFA):
)
return self.checkResult(self.allowLoginWithoutMFA.value, request)
# Store state for later use, related to this user
request.session['radius_state'] = auth_reply.state or b''
# correct password and otp_needed
return mfas.MFA.RESULT.OK
@ -251,7 +254,14 @@ class RadiusOTP(mfas.MFA):
web_pwd = webPassword(request)
try:
connection = self.radiusClient()
auth_reply = connection.authenticate_challenge(username, password=web_pwd, otp=code)
state = request.session.get('radius_state', b'')
if state:
# Remove state from session
del request.session['radius_state']
# Use state to validate
auth_reply = connection.authenticate_challenge(username, otp=code, state=state)
else: # No state, so full authentication
auth_reply = connection.authenticate_challenge(username, password=web_pwd, otp=code)
except Exception as e:
logger.error("Exception found connecting to Radius OTP %s: %s", e.__class__, e)
if mfas.LoginAllowed.checkAction(self.responseErrorAction.value, request, self.networks.value):