mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
Prevent nested encrypted field leak in activity stream.
This commit is contained in:
parent
7c2e5df659
commit
7f1f68ee28
@ -1944,9 +1944,8 @@ class CredentialSerializer(BaseSerializer):
|
||||
if field in value and force_text(value[field]).startswith('$encrypted$'):
|
||||
value[field] = '$encrypted$'
|
||||
|
||||
for k, v in value.get('inputs', {}).items():
|
||||
if force_text(v).startswith('$encrypted$'):
|
||||
value['inputs'][k] = '$encrypted$'
|
||||
if 'inputs' in value:
|
||||
value['inputs'] = data.display_inputs()
|
||||
return value
|
||||
|
||||
def get_related(self, obj):
|
||||
@ -3203,11 +3202,8 @@ class NotificationTemplateSerializer(BaseSerializer):
|
||||
|
||||
def to_representation(self, obj):
|
||||
ret = super(NotificationTemplateSerializer, self).to_representation(obj)
|
||||
for field in obj.notification_class.init_parameters:
|
||||
config = obj.notification_configuration
|
||||
if field in config and force_text(config[field]).startswith('$encrypted$'):
|
||||
config[field] = '$encrypted$'
|
||||
ret['notification_configuration'] = config
|
||||
if 'notification_configuration' in ret:
|
||||
ret['notification_configuration'] = obj.display_notification_configuration()
|
||||
return ret
|
||||
|
||||
def get_related(self, obj):
|
||||
|
@ -15,6 +15,7 @@ from jinja2 import Template
|
||||
from django.db import models
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.encoding import force_text
|
||||
|
||||
# AWX
|
||||
from awx.api.versioning import reverse
|
||||
@ -370,6 +371,13 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
field = 'inputs'
|
||||
super(Credential, self).mark_field_for_save(update_fields, field)
|
||||
|
||||
def display_inputs(self):
|
||||
field_val = self.inputs.copy()
|
||||
for k, v in field_val.items():
|
||||
if force_text(v).startswith('$encrypted$'):
|
||||
field_val[k] = '$encrypted$'
|
||||
return field_val
|
||||
|
||||
|
||||
class CredentialType(CommonModelNameNotUnique):
|
||||
'''
|
||||
|
@ -6,7 +6,7 @@ import logging
|
||||
from django.db import models
|
||||
from django.core.mail.message import EmailMessage
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.encoding import smart_str
|
||||
from django.utils.encoding import smart_str, force_text
|
||||
|
||||
# AWX
|
||||
from awx.api.versioning import reverse
|
||||
@ -119,6 +119,13 @@ class NotificationTemplate(CommonModelNameNotUnique):
|
||||
notification_obj = EmailMessage(subject, backend_obj.format_body(body), sender, recipients)
|
||||
return backend_obj.send_messages([notification_obj])
|
||||
|
||||
def display_notification_configuration(self):
|
||||
field_val = self.notification_configuration.copy()
|
||||
for field in self.notification_class.init_parameters:
|
||||
if field in field_val and force_text(field_val[field]).startswith('$encrypted$'):
|
||||
field_val[field] = '$encrypted$'
|
||||
return field_val
|
||||
|
||||
|
||||
class Notification(CreatedModifiedModel):
|
||||
'''
|
||||
|
@ -292,6 +292,15 @@ def notification_template(organization):
|
||||
headers={"Test": "Header"}))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def notification_template_with_encrypt(organization):
|
||||
return NotificationTemplate.objects.create(name='test-notification_template_with_encrypt',
|
||||
organization=organization,
|
||||
notification_type="slack",
|
||||
notification_configuration=dict(channels=["Foo", "Bar"],
|
||||
token="token"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def notification(notification_template):
|
||||
return Notification.objects.create(notification_template=notification_template,
|
||||
|
60
awx/main/tests/functional/utils/test_common.py
Normal file
60
awx/main/tests/functional/utils/test_common.py
Normal file
@ -0,0 +1,60 @@
|
||||
import pytest
|
||||
|
||||
import copy
|
||||
import json
|
||||
|
||||
from awx.main.utils.common import (
|
||||
model_instance_diff,
|
||||
model_to_dict,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_model_to_dict_user(alice):
|
||||
username = copy.copy(alice.username)
|
||||
password = copy.copy(alice.password)
|
||||
output_dict = model_to_dict(alice)
|
||||
assert output_dict['username'] == username
|
||||
assert output_dict['password'] == 'hidden'
|
||||
assert alice.username == password
|
||||
assert alice.password == password
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_model_to_dict_credential(credential):
|
||||
name = copy.copy(credential.name)
|
||||
inputs = copy.copy(credential.inputs)
|
||||
output_dict = model_to_dict(credential)
|
||||
assert output_dict['name'] == name
|
||||
assert output_dict['inputs'] == 'hidden'
|
||||
assert credential.name == name
|
||||
assert credential.inputs == inputs
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_model_to_dict_notification_template(notification_template_with_encrypt):
|
||||
old_configuration = copy.deepcopy(notification_template_with_encrypt.notification_configuration)
|
||||
output_dict = model_to_dict(notification_template_with_encrypt)
|
||||
new_configuration = json.loads(output_dict['notification_configuration'])
|
||||
assert notification_template_with_encrypt.notification_configuration == old_configuration
|
||||
assert new_configuration['token'] == '$encrypted$'
|
||||
assert new_configuration['channels'] == old_configuration['channels']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_model_instance_diff(alice, bob):
|
||||
alice_name = copy.copy(alice.username)
|
||||
alice_pass = copy.copy(alice.password)
|
||||
bob_name = copy.copy(bob.username)
|
||||
bob_pass = copy.copy(bob.password)
|
||||
output_dict = model_instance_diff(alice, bob)
|
||||
assert alice_name == alice.username
|
||||
assert alice_pass == alice.password
|
||||
assert bob_name == bob.username
|
||||
assert bob_pass == bob.password
|
||||
assert output_dict['username'][0] == alice_name
|
||||
assert output_dict['username'][1] == bob_name
|
||||
assert output_dict['password'] == ('hidden', 'hidden')
|
||||
assert hasattr(alice, 'is_superuser')
|
||||
assert hasattr(bob, 'is_superuser')
|
||||
assert 'is_superuser' not in output_dict
|
@ -372,6 +372,26 @@ def get_allowed_fields(obj, serializer_mapping):
|
||||
return allowed_fields
|
||||
|
||||
|
||||
def _convert_model_field_for_display(obj, field_name, password_fields=None):
|
||||
# NOTE: Careful modifying the value of field_val, as it could modify
|
||||
# underlying model object field value also.
|
||||
field_val = getattr(obj, field_name, None)
|
||||
if password_fields is None:
|
||||
password_fields = set(getattr(type(obj), 'PASSWORD_FIELDS', [])) | set(['password'])
|
||||
if field_name in password_fields:
|
||||
return u'hidden'
|
||||
if hasattr(obj, 'display_%s' % field_name):
|
||||
field_val = getattr(obj, 'display_%s' % field_name)()
|
||||
if isinstance(field_val, (list, dict)):
|
||||
try:
|
||||
field_val = json.dumps(field_val, ensure_ascii=False)
|
||||
except Exception:
|
||||
pass
|
||||
if type(field_val) not in (bool, int, type(None)):
|
||||
field_val = smart_str(field_val)
|
||||
return field_val
|
||||
|
||||
|
||||
def model_instance_diff(old, new, serializer_mapping=None):
|
||||
"""
|
||||
Calculate the differences between two model instances. One of the instances may be None (i.e., a newly
|
||||
@ -380,13 +400,13 @@ def model_instance_diff(old, new, serializer_mapping=None):
|
||||
When provided, read-only fields will not be included in the resulting dictionary
|
||||
"""
|
||||
from django.db.models import Model
|
||||
from awx.main.models.credential import Credential
|
||||
PASSWORD_FIELDS = ['password'] + Credential.PASSWORD_FIELDS
|
||||
|
||||
if not(old is None or isinstance(old, Model)):
|
||||
raise TypeError('The supplied old instance is not a valid model instance.')
|
||||
if not(new is None or isinstance(new, Model)):
|
||||
raise TypeError('The supplied new instance is not a valid model instance.')
|
||||
old_password_fields = set(getattr(type(old), 'PASSWORD_FIELDS', [])) | set(['password'])
|
||||
new_password_fields = set(getattr(type(new), 'PASSWORD_FIELDS', [])) | set(['password'])
|
||||
|
||||
diff = {}
|
||||
|
||||
@ -395,15 +415,11 @@ def model_instance_diff(old, new, serializer_mapping=None):
|
||||
for field in allowed_fields:
|
||||
old_value = getattr(old, field, None)
|
||||
new_value = getattr(new, field, None)
|
||||
|
||||
if old_value != new_value and field not in PASSWORD_FIELDS:
|
||||
if type(old_value) not in (bool, int, type(None)):
|
||||
old_value = smart_str(old_value)
|
||||
if type(new_value) not in (bool, int, type(None)):
|
||||
new_value = smart_str(new_value)
|
||||
diff[field] = (old_value, new_value)
|
||||
elif old_value != new_value and field in PASSWORD_FIELDS:
|
||||
diff[field] = (u"hidden", u"hidden")
|
||||
if old_value != new_value:
|
||||
diff[field] = (
|
||||
_convert_model_field_for_display(old, field, password_fields=old_password_fields),
|
||||
_convert_model_field_for_display(new, field, password_fields=new_password_fields),
|
||||
)
|
||||
|
||||
if len(diff) == 0:
|
||||
diff = None
|
||||
@ -417,8 +433,7 @@ def model_to_dict(obj, serializer_mapping=None):
|
||||
serializer_mapping are used to determine read-only fields.
|
||||
When provided, read-only fields will not be included in the resulting dictionary
|
||||
"""
|
||||
from awx.main.models.credential import Credential
|
||||
PASSWORD_FIELDS = ['password'] + Credential.PASSWORD_FIELDS
|
||||
password_fields = set(getattr(type(obj), 'PASSWORD_FIELDS', [])) | set(['password'])
|
||||
attr_d = {}
|
||||
|
||||
allowed_fields = get_allowed_fields(obj, serializer_mapping)
|
||||
@ -426,14 +441,8 @@ def model_to_dict(obj, serializer_mapping=None):
|
||||
for field in obj._meta.fields:
|
||||
if field.name not in allowed_fields:
|
||||
continue
|
||||
if field.name not in PASSWORD_FIELDS:
|
||||
field_val = getattr(obj, field.name, None)
|
||||
if type(field_val) not in (bool, int, type(None)):
|
||||
attr_d[field.name] = smart_str(field_val)
|
||||
else:
|
||||
attr_d[field.name] = field_val
|
||||
else:
|
||||
attr_d[field.name] = "hidden"
|
||||
attr_d[field.name] = _convert_model_field_for_display(obj, field.name, password_fields=password_fields)
|
||||
|
||||
return attr_d
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user