1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

update encryption migration checks and tests

This commit is contained in:
Wayne Witzel III 2017-06-15 12:04:54 -04:00
parent 9014e8c612
commit f9b412419c
4 changed files with 55 additions and 30 deletions

View File

@ -9,7 +9,7 @@ from awx.conf import settings_registry
__all__ = ['replace_aesecb_fernet', 'get_encryption_key', 'encrypt_field',
'decrypt_value', 'decrypt_value']
'decrypt_value', 'decrypt_value', 'should_decrypt_field']
def replace_aesecb_fernet(apps, schema_editor):
@ -17,8 +17,7 @@ def replace_aesecb_fernet(apps, schema_editor):
for setting in Setting.objects.filter().order_by('pk'):
if settings_registry.is_setting_encrypted(setting.key):
if setting.value.startswith('$encrypted$AESCBC$'):
continue
if should_decrypt_field(setting.value):
setting.value = decrypt_field(setting, 'value')
setting.save()
@ -100,3 +99,7 @@ def encrypt_field(instance, field_name, ask=False, subfield=None, skip_utf8=Fals
# know to decode the data when it's decrypted later
tokens.insert(1, 'UTF8')
return '$'.join(tokens)
def should_decrypt_field(value):
return value.startswith('$encrypted$') and '$AESCBC$' not in value

View File

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
import pytest
import mock
@ -12,17 +16,21 @@ from awx.main.utils import decrypt_field as new_decrypt_field
@pytest.mark.django_db
def test_settings():
@pytest.mark.parametrize("old_enc, new_enc, value", [
('$encrypted$UTF8$AES', '$encrypted$UTF8$AESCBC$', u'Iñtërnâtiônàlizætiøn'),
('$encrypted$AES$', '$encrypted$AESCBC$', 'test'),
])
def test_settings(old_enc, new_enc, value):
with mock.patch('awx.conf.models.encrypt_field', encrypt_field):
with mock.patch('awx.conf.settings.decrypt_field', decrypt_field):
setting = Setting.objects.create(key='SOCIAL_AUTH_GITHUB_SECRET', value='test')
assert setting.value.startswith('$encrypted$AES$')
setting = Setting.objects.create(key='SOCIAL_AUTH_GITHUB_SECRET', value=value)
assert setting.value.startswith(old_enc)
replace_aesecb_fernet(apps, None)
setting.refresh_from_db()
assert setting.value.startswith('$encrypted$AESCBC$')
assert new_decrypt_field(setting, 'value') == 'test'
assert setting.value.startswith(new_enc)
assert new_decrypt_field(setting, 'value') == value
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures

View File

@ -1,5 +1,8 @@
from awx.main import utils
from awx.conf.migrations._reencrypt import decrypt_field
from awx.conf.migrations._reencrypt import (
decrypt_field,
should_decrypt_field,
)
__all__ = ['replace_aesecb_fernet']
@ -16,8 +19,7 @@ def _notification_templates(apps):
for nt in NotificationTemplate.objects.all():
for field in filter(lambda x: nt.notification_class.init_parameters[x]['type'] == "password",
nt.notification_class.init_parameters):
if nt.notification_configuration[field].startswith('$encrypted$AESCBC$'):
continue
if should_decrypt_field(nt.notification_configuration[field]):
value = decrypt_field(nt, 'notification_configuration', subfield=field)
nt.notification_configuration[field] = value
nt.save()
@ -32,7 +34,7 @@ def _credentials(apps):
utils.get_current_apps = lambda: apps
for credential in apps.get_model('main', 'Credential').objects.all():
for field_name, value in credential.inputs.items():
if value.startswith('$encrypted$AES$'):
if should_decrypt_field(value):
value = decrypt_field(credential, field_name)
credential.inputs[field_name] = value
credential.save()
@ -45,8 +47,7 @@ def _unified_jobs(apps):
UnifiedJob = apps.get_model('main', 'UnifiedJob')
for uj in UnifiedJob.objects.all():
if uj.start_args is not None:
if uj.start_args.startswith('$encrypted$AESCBC$'):
continue
if should_decrypt_field(uj.start_args):
start_args = decrypt_field(uj, 'start_args')
uj.start_args = start_args
uj.save()

View File

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
import json
import pytest
import mock
@ -23,6 +27,7 @@ from awx.main.utils import decrypt_field
@pytest.mark.django_db
def test_notification_template_migration():
# Doesn't get tagged as UTF8 because the the internal save call explicitly sets skip_utf8=True
with mock.patch('awx.main.models.notifications.encrypt_field', encrypt_field):
nt = NotificationTemplate.objects.create(notification_type='slack', notification_configuration=dict(token='test'))
@ -42,20 +47,24 @@ def test_notification_template_migration():
@pytest.mark.django_db
def test_credential_migration():
@pytest.mark.parametrize("old_enc, new_enc, value", [
('$encrypted$UTF8$AES', '$encrypted$UTF8$AESCBC$', u'Iñtërnâtiônàlizætiøn'),
('$encrypted$AES$', '$encrypted$AESCBC$', 'test'),
])
def test_credential_migration(old_enc, new_enc, value):
with mock.patch('awx.main.models.credential.encrypt_field', encrypt_field):
cred_type = ssh()
cred_type.save()
cred = Credential.objects.create(credential_type=cred_type, inputs=dict(password='test'))
cred = Credential.objects.create(credential_type=cred_type, inputs=dict(password=value))
assert cred.password.startswith('$encrypted$AES$')
assert cred.password.startswith(old_enc)
_credentials(apps)
cred.refresh_from_db()
assert cred.password.startswith('$encrypted$AESCBC$')
assert decrypt_field(cred, 'password') == 'test'
assert cred.password.startswith(new_enc)
assert decrypt_field(cred, 'password') == value
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures
@ -64,17 +73,21 @@ def test_credential_migration():
@pytest.mark.django_db
def test_unified_job_migration():
@pytest.mark.parametrize("old_enc, new_enc, value", [
('$encrypted$AES$', '$encrypted$AESCBC$', u'Iñtërnâtiônàlizætiøn'),
('$encrypted$AES$', '$encrypted$AESCBC$', 'test'),
])
def test_unified_job_migration(old_enc, new_enc, value):
with mock.patch('awx.main.models.base.encrypt_field', encrypt_field):
uj = UnifiedJob.objects.create(launch_type='manual', start_args=json.dumps({'test':'value'}))
uj = UnifiedJob.objects.create(launch_type='manual', start_args=json.dumps({'test':value}))
assert uj.start_args.startswith('$encrypted$AES$')
assert uj.start_args.startswith(old_enc)
_unified_jobs(apps)
uj.refresh_from_db()
assert uj.start_args.startswith('$encrypted$AESCBC$')
assert json.loads(decrypt_field(uj, 'start_args')) == {'test':'value'}
assert uj.start_args.startswith(new_enc)
assert json.loads(decrypt_field(uj, 'start_args')) == {'test':value}
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures