mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 16:51:11 +03:00
Merge pull request #3052 from jakemcdermott/inputs_enc
explicitly handle .inputs in decrypt_field, encrypt_field Reviewed-by: Ryan Petrello https://github.com/ryanpetrello
This commit is contained in:
commit
be4b3c75b4
@ -32,9 +32,6 @@ from rest_framework.permissions import AllowAny
|
||||
from rest_framework.renderers import StaticHTMLRenderer, JSONRenderer
|
||||
from rest_framework.negotiation import DefaultContentNegotiation
|
||||
|
||||
# cryptography
|
||||
from cryptography.fernet import InvalidToken
|
||||
|
||||
# AWX
|
||||
from awx.api.filters import FieldLookupBackend
|
||||
from awx.main.models import * # noqa
|
||||
@ -858,11 +855,14 @@ class CopyAPIView(GenericAPIView):
|
||||
and isinstance(field_val[sub_field], six.string_types):
|
||||
try:
|
||||
field_val[sub_field] = decrypt_field(obj, field_name, sub_field)
|
||||
except InvalidToken:
|
||||
except AttributeError:
|
||||
# Catching the corner case with v1 credential fields
|
||||
field_val[sub_field] = decrypt_field(obj, sub_field)
|
||||
elif isinstance(field_val, six.string_types):
|
||||
field_val = decrypt_field(obj, field_name)
|
||||
try:
|
||||
field_val = decrypt_field(obj, field_name)
|
||||
except AttributeError:
|
||||
return field_val
|
||||
return field_val
|
||||
|
||||
def _build_create_dict(self, obj):
|
||||
|
@ -325,10 +325,11 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
|
||||
@property
|
||||
def has_encrypted_ssh_key_data(self):
|
||||
if self.pk:
|
||||
try:
|
||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
||||
else:
|
||||
ssh_key_data = self.ssh_key_data
|
||||
except AttributeError:
|
||||
return False
|
||||
|
||||
try:
|
||||
pem_objects = validate_ssh_private_key(ssh_key_data)
|
||||
for pem_object in pem_objects:
|
||||
@ -383,9 +384,8 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
super(Credential, self).save(*args, **kwargs)
|
||||
|
||||
def encrypt_field(self, field, ask):
|
||||
if not hasattr(self, field):
|
||||
if field not in self.inputs:
|
||||
return None
|
||||
|
||||
encrypted = encrypt_field(self, field, ask=ask)
|
||||
if encrypted:
|
||||
self.inputs[field] = encrypted
|
||||
@ -444,7 +444,12 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
:param default(optional[str]): A default return value to use.
|
||||
"""
|
||||
if field_name in self.credential_type.secret_fields:
|
||||
return decrypt_field(self, field_name)
|
||||
try:
|
||||
return decrypt_field(self, field_name)
|
||||
except AttributeError:
|
||||
if 'default' in kwargs:
|
||||
return kwargs['default']
|
||||
raise AttributeError
|
||||
if field_name in self.inputs:
|
||||
return self.inputs[field_name]
|
||||
if 'default' in kwargs:
|
||||
|
@ -345,6 +345,10 @@ def test_credential_get_input(organization_factory):
|
||||
'id': 'vault_id',
|
||||
'type': 'string',
|
||||
'secret': False
|
||||
}, {
|
||||
'id': 'secret',
|
||||
'type': 'string',
|
||||
'secret': True,
|
||||
}]
|
||||
}
|
||||
)
|
||||
@ -372,6 +376,12 @@ def test_credential_get_input(organization_factory):
|
||||
cred.get_input('field_not_on_credential_type')
|
||||
# verify that the provided default is used for undefined inputs
|
||||
assert cred.get_input('field_not_on_credential_type', default='bar') == 'bar'
|
||||
# verify expected exception is raised when attempting to access an unset secret
|
||||
# input without providing a default
|
||||
with pytest.raises(AttributeError):
|
||||
cred.get_input('secret')
|
||||
# verify that the provided default is used for undefined inputs
|
||||
assert cred.get_input('secret', default='fiz') == 'fiz'
|
||||
# verify return values for encrypted secret fields are decrypted
|
||||
assert cred.inputs['vault_password'].startswith('$encrypted$')
|
||||
assert cred.get_input('vault_password') == 'testing321'
|
||||
|
@ -1656,6 +1656,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
'password': 'secret'
|
||||
}
|
||||
)
|
||||
azure_rm_credential.inputs['secret'] = ''
|
||||
azure_rm_credential.inputs['secret'] = encrypt_field(azure_rm_credential, 'secret')
|
||||
self.instance.credentials.add(azure_rm_credential)
|
||||
|
||||
@ -2089,6 +2090,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
'host': 'https://keystone.example.org'
|
||||
}
|
||||
)
|
||||
cred.inputs['ssh_key_data'] = ''
|
||||
cred.inputs['ssh_key_data'] = encrypt_field(
|
||||
cred, 'ssh_key_data'
|
||||
)
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
# Copyright (c) 2017 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
import pytest
|
||||
|
||||
from awx.conf.models import Setting
|
||||
from awx.main.utils import encryption
|
||||
|
||||
@ -45,6 +47,16 @@ def test_encrypt_field_with_empty_value():
|
||||
assert encrypted is None
|
||||
|
||||
|
||||
def test_encrypt_field_with_undefined_attr_raises_expected_exception():
|
||||
with pytest.raises(AttributeError):
|
||||
encryption.encrypt_field({}, 'undefined_attr')
|
||||
|
||||
|
||||
def test_decrypt_field_with_undefined_attr_raises_expected_exception():
|
||||
with pytest.raises(AttributeError):
|
||||
encryption.decrypt_field({}, 'undefined_attr')
|
||||
|
||||
|
||||
class TestSurveyReversibilityValue:
|
||||
'''
|
||||
Tests to enforce the contract with survey password question encrypted values
|
||||
|
@ -63,7 +63,13 @@ def encrypt_field(instance, field_name, ask=False, subfield=None):
|
||||
'''
|
||||
Return content of the given instance and field name encrypted.
|
||||
'''
|
||||
value = getattr(instance, field_name)
|
||||
try:
|
||||
value = instance.inputs[field_name]
|
||||
except (TypeError, AttributeError):
|
||||
value = getattr(instance, field_name)
|
||||
except KeyError:
|
||||
raise AttributeError(field_name)
|
||||
|
||||
if isinstance(value, dict) and subfield is not None:
|
||||
value = value[subfield]
|
||||
if value is None:
|
||||
@ -98,7 +104,13 @@ def decrypt_field(instance, field_name, subfield=None):
|
||||
'''
|
||||
Return content of the given instance and field name decrypted.
|
||||
'''
|
||||
value = getattr(instance, field_name)
|
||||
try:
|
||||
value = instance.inputs[field_name]
|
||||
except (TypeError, AttributeError):
|
||||
value = getattr(instance, field_name)
|
||||
except KeyError:
|
||||
raise AttributeError(field_name)
|
||||
|
||||
if isinstance(value, dict) and subfield is not None:
|
||||
value = value[subfield]
|
||||
value = smart_str(value)
|
||||
|
Loading…
Reference in New Issue
Block a user