mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
Merge pull request #1116 from bmduffy/bugfix-pem-validation
[bugfix-pem-validation]
This commit is contained in:
commit
7d51b3b6b6
@ -193,3 +193,7 @@ L5Hj+B02+FAiz8zVGumbVykvPtzgTb0E+0rJKNO0/EgGqWsk/oC0
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
TEST_SSH_KEY_DATA_UNLOCK = 'unlockme'
|
TEST_SSH_KEY_DATA_UNLOCK = 'unlockme'
|
||||||
|
|
||||||
|
TEST_CATTED_SSH_KEY_DATA = """
|
||||||
|
-----BEGIN RSA PRIVATE KEY----- MIIEogIBAAKCAQEA1T4za6qBbHxFpN5f9eFvA74MFjrsjcp1uvzOaE23AYKMDEJg hJ6dqQ7GwHLNIeIeumqDFmODauIzrgSDJTT5+NG30Rr+rRi0zDkrkBAj/AtA+SaV hbzqB6ZSd7LaMly9XAc+82OKlNpuWS9hPmFaSShzDTXRu5RRyvm4NDCAOGDu5hyV R2pV/ffKDNfNkChnqzvRRW9laQcVmliZhlTGn7nPZ+JbjpwEy0nwW+4zoAiEvwnT 52N4xTqIcYOnXtGiaf13dh7FkUfYmS0tzF3+h8QRKwtIm4y+sq84R/kr79/0t5aR UpJynNrECajzmArpL4IjXKTPIyUpTKirJgGnCwIDAQABAoIBAC6bbbm2hpsjfkVO pUKkhxMWUqX5MwK6oYjBAIwjkEAwPFPhnh7eXC87H42oidVCCt1LsmMOVQbjcdAz BEb5kTkk/Twi3k8O+1U3maHfJT5NZ2INYNjeNXh+jb/Dw5UGWAzpOIUR2JQ4Oa4c gPCVbppW0O6uOKz6+fWXJv+hKiUoBCC0TiY52iseHJdUOaKNxYRD2IyIzCAxFSd5 tZRaARIYDsugXp3E/TdbsVWA7bmjIBOXq+SquTrlB8x7j3B7+Pi09nAJ2U/uV4PH E+/2Fl009ywfmqancvnhwnz+GQ5jjP+gTfghJfbO+Z6M346rS0Vw+osrPgfyudNH lCswHOECgYEA/Cfq25gDP07wo6+wYWbx6LIzj/SSZy/Ux9P8zghQfoZiPoaq7BQB PAzwLNt7JWST8U11LZA8/wo6ch+HSTMk+m5ieVuru2cHxTDqeNlh94eCrNwPJ5ay A5U6LxAuSCTAzp+rv6KQUx1JcKSEHuh+nRYTKvUDE6iA6YtPLO96lLUCgYEA2H5r OPX2M4w1Q9zjol77lplbPRdczXNd0PIzhy8Z2ID65qvmr1nxBG4f2H96ykW8CKLX NvSXreNZ1BhOXc/3Hv+3mm46iitB33gDX4mlV4Jyo/w5IWhUKRyoW6qXquFFsScx RzTrx/9M+aZeRRLdsBk27HavFEg6jrbQ0SleZL8CgYAaM6Op8d/UgkVrHOR9Go9k mK/W85kK8+NuaE7Ksf57R0eKK8AzC9kc/lMuthfTyOG+n0ff1i8gaVWtai1Ko+/h vfqplacAsDIUgYK70AroB8LCZ5ODj5sr2CPVpB7LDFakod7c6O2KVW6+L7oy5AHU HOkc+5y4PDg5DGrLxo68SQKBgAlGoWF3aG0c/MtDk51JZI43U+lyLs++ua5SMlMA eaMFI7rucpvgxqrh7Qthqukvw7a7A22fXUBeFWM5B2KNnpD9c+hyAKAa6l+gzMQz KZpuRGsyS2BbEAAS8kO7M3Rm4o2MmFfstI2FKs8nibJ79HOvIONQ0n+T+K5Utu2/ UAQRAoGAFB4fiIyQ0nYzCf18Z4Wvi/qeIOW+UoBonIN3y1h4wruBywINHxFMHx4a VImJ6R09hoJ9D3Mxli3xF/8JIjfTG5fBSGrGnuofl14d/XtRDXbT2uhVXrIkeLL/ ojODwwEx0VhxIRUEjPTvEl6AFSRRcBp3KKzQ/cu7vafY6GTlOUI= -----END RSA PRIVATE KEY-----
|
||||||
|
"""
|
||||||
|
@ -12,12 +12,53 @@ from awx.main.tests.data.ssh import (
|
|||||||
TEST_OPENSSH_KEY_DATA,
|
TEST_OPENSSH_KEY_DATA,
|
||||||
TEST_OPENSSH_KEY_DATA_LOCKED,
|
TEST_OPENSSH_KEY_DATA_LOCKED,
|
||||||
TEST_SSH_CERT_KEY,
|
TEST_SSH_CERT_KEY,
|
||||||
|
TEST_CATTED_SSH_KEY_DATA,
|
||||||
)
|
)
|
||||||
from rest_framework.serializers import ValidationError as RestValidationError
|
from rest_framework.serializers import ValidationError as RestValidationError
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_keys():
|
||||||
|
invalid_keys = [
|
||||||
|
"---BEGIN FOO -----foobar-----END FOO----",
|
||||||
|
"-----BEGIN FOO---foobar-----END FOO----",
|
||||||
|
"-----BEGIN FOO-----foobar---END FOO----",
|
||||||
|
"----- BEGIN FOO ----- foobar ----- FAIL FOO ----",
|
||||||
|
"----- FAIL FOO ----- foobar ----- END FOO ----",
|
||||||
|
"----BEGIN FOO----foobar----END BAR----",
|
||||||
|
]
|
||||||
|
for invalid_key in invalid_keys:
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_private_key(invalid_key)
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_certificate(invalid_key)
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_ssh_private_key(invalid_key)
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_rsa_key():
|
||||||
|
invalid_key = TEST_SSH_KEY_DATA.replace('-----END', '----END')
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_private_key(invalid_key)
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_certificate(invalid_key)
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_ssh_private_key(invalid_key)
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_catted_rsa_key():
|
||||||
|
valid_key = TEST_CATTED_SSH_KEY_DATA
|
||||||
|
pem_objects = validate_private_key(valid_key)
|
||||||
|
assert pem_objects[0]['key_type'] == 'rsa'
|
||||||
|
assert not pem_objects[0]['key_enc']
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_certificate(valid_key)
|
||||||
|
pem_objects = validate_ssh_private_key(valid_key)
|
||||||
|
assert pem_objects[0]['key_type'] == 'rsa'
|
||||||
|
assert not pem_objects[0]['key_enc']
|
||||||
|
|
||||||
|
|
||||||
def test_valid_rsa_key():
|
def test_valid_rsa_key():
|
||||||
valid_key = TEST_SSH_KEY_DATA
|
valid_key = TEST_SSH_KEY_DATA
|
||||||
pem_objects = validate_private_key(valid_key)
|
pem_objects = validate_private_key(valid_key)
|
||||||
@ -42,16 +83,6 @@ def test_valid_locked_rsa_key():
|
|||||||
assert pem_objects[0]['key_enc']
|
assert pem_objects[0]['key_enc']
|
||||||
|
|
||||||
|
|
||||||
def test_invalid_rsa_key():
|
|
||||||
invalid_key = TEST_SSH_KEY_DATA.replace('-----END', '----END')
|
|
||||||
with pytest.raises(ValidationError):
|
|
||||||
validate_private_key(invalid_key)
|
|
||||||
with pytest.raises(ValidationError):
|
|
||||||
validate_certificate(invalid_key)
|
|
||||||
with pytest.raises(ValidationError):
|
|
||||||
validate_ssh_private_key(invalid_key)
|
|
||||||
|
|
||||||
|
|
||||||
def test_valid_openssh_key():
|
def test_valid_openssh_key():
|
||||||
valid_key = TEST_OPENSSH_KEY_DATA
|
valid_key = TEST_OPENSSH_KEY_DATA
|
||||||
pem_objects = validate_private_key(valid_key)
|
pem_objects = validate_private_key(valid_key)
|
||||||
@ -126,5 +157,3 @@ def test_valid_vars(var_str):
|
|||||||
def test_invalid_vars(var_str):
|
def test_invalid_vars(var_str):
|
||||||
with pytest.raises(RestValidationError):
|
with pytest.raises(RestValidationError):
|
||||||
vars_validate_or_raise(var_str)
|
vars_validate_or_raise(var_str)
|
||||||
|
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ def validate_pem(data, min_keys=0, max_keys=None, min_certs=0, max_certs=None):
|
|||||||
"""
|
"""
|
||||||
Validate the given PEM data is valid and contains the required numbers of
|
Validate the given PEM data is valid and contains the required numbers of
|
||||||
keys and certificates.
|
keys and certificates.
|
||||||
|
|
||||||
Return a list of PEM objects, where each object is a dict with the following
|
Return a list of PEM objects, where each object is a dict with the following
|
||||||
keys:
|
keys:
|
||||||
- 'all': The entire string for the PEM object including BEGIN/END lines.
|
- 'all': The entire string for the PEM object including BEGIN/END lines.
|
||||||
@ -48,24 +48,31 @@ def validate_pem(data, min_keys=0, max_keys=None, min_certs=0, max_certs=None):
|
|||||||
|
|
||||||
# Build regular expressions for matching each object in the PEM file.
|
# Build regular expressions for matching each object in the PEM file.
|
||||||
pem_obj_re = re.compile(
|
pem_obj_re = re.compile(
|
||||||
r'^(-{4,}) *BEGIN ([A-Z ]+?) *\1[\r\n]+' +
|
r'^(?P<dashes>-{4,}) *BEGIN (?P<type>[A-Z ]+?) *(?P=dashes)' +
|
||||||
r'(.+?)[\r\n]+\1 *END \2 *\1[\r\n]?(.*?)$', re.DOTALL,
|
r'\s*(?P<data>.+?)\s*' +
|
||||||
|
r'(?P=dashes) *END (?P=type) *(?P=dashes)' +
|
||||||
|
r'(?P<next>.*?)$', re.DOTALL
|
||||||
)
|
)
|
||||||
pem_obj_header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')
|
pem_obj_header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')
|
||||||
|
|
||||||
pem_objects = []
|
pem_objects = []
|
||||||
key_count, cert_count = 0, 0
|
key_count, cert_count = 0, 0
|
||||||
|
|
||||||
|
# Strip leading whitespaces at the start of the PEM data
|
||||||
data = data.lstrip()
|
data = data.lstrip()
|
||||||
|
|
||||||
while data:
|
while data:
|
||||||
match = pem_obj_re.match(data)
|
match = pem_obj_re.match(data)
|
||||||
if not match:
|
if not match:
|
||||||
raise ValidationError(_('Invalid certificate or key: %s...') % data[:100])
|
raise ValidationError(_('Invalid certificate or key: %s...') % data[:100])
|
||||||
data = match.group(4).lstrip()
|
|
||||||
|
# The rest of the PEM data to process
|
||||||
|
data = match.group('next').lstrip()
|
||||||
|
|
||||||
# Check PEM object type, check key type if private key.
|
# Check PEM object type, check key type if private key.
|
||||||
pem_obj_info = {}
|
pem_obj_info = {}
|
||||||
pem_obj_info['all'] = match.group(0)
|
pem_obj_info['all'] = match.group(0)
|
||||||
pem_obj_info['type'] = pem_obj_type = match.group(2)
|
pem_obj_info['type'] = pem_obj_type = match.group('type')
|
||||||
if pem_obj_type.endswith('PRIVATE KEY'):
|
if pem_obj_type.endswith('PRIVATE KEY'):
|
||||||
key_count += 1
|
key_count += 1
|
||||||
pem_obj_info['type'] = 'PRIVATE KEY'
|
pem_obj_info['type'] = 'PRIVATE KEY'
|
||||||
@ -80,7 +87,7 @@ def validate_pem(data, min_keys=0, max_keys=None, min_certs=0, max_certs=None):
|
|||||||
raise ValidationError(_('Unsupported PEM object type: "%s"') % pem_obj_type)
|
raise ValidationError(_('Unsupported PEM object type: "%s"') % pem_obj_type)
|
||||||
|
|
||||||
# Ensure that this PEM object is valid base64 data.
|
# Ensure that this PEM object is valid base64 data.
|
||||||
pem_obj_info['data'] = match.group(3)
|
pem_obj_info['data'] = match.group('data')
|
||||||
base64_data = ''
|
base64_data = ''
|
||||||
line_continues = False
|
line_continues = False
|
||||||
for line in pem_obj_info['data'].splitlines():
|
for line in pem_obj_info['data'].splitlines():
|
||||||
@ -161,8 +168,10 @@ def validate_certificate(data):
|
|||||||
Validate that data contains one or more certificates. Adds BEGIN/END lines
|
Validate that data contains one or more certificates. Adds BEGIN/END lines
|
||||||
if necessary.
|
if necessary.
|
||||||
"""
|
"""
|
||||||
if 'BEGIN CERTIFICATE' not in data:
|
if 'BEGIN' not in data:
|
||||||
data = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----\n'.format(data)
|
data = "-----BEGIN CERTIFICATE-----\n{}".format(data)
|
||||||
|
if 'END' not in data:
|
||||||
|
data = "{}\n-----END CERTIFICATE-----\n".format(data)
|
||||||
return validate_pem(data, max_keys=0, min_certs=1)
|
return validate_pem(data, max_keys=0, min_certs=1)
|
||||||
|
|
||||||
|
|
||||||
@ -186,4 +195,3 @@ def vars_validate_or_raise(vars_str):
|
|||||||
return vars_str
|
return vars_str
|
||||||
except ParseError as e:
|
except ParseError as e:
|
||||||
raise RestValidationError(str(e))
|
raise RestValidationError(str(e))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user