1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 06:51:10 +03:00

properly migrate vault credentials to the new credentialtype model

This commit is contained in:
Ryan Petrello 2017-04-25 13:15:03 -04:00
parent 1f99a0df85
commit c0add33212
3 changed files with 61 additions and 44 deletions

View File

@ -40,7 +40,7 @@ from jsonbfield.fields import JSONField as upstream_JSONBField
# AWX
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
from awx.main.utils import get_current_apps
from awx.main import utils
__all__ = ['AutoOneToOneField', 'ImplicitRoleField', 'JSONField', 'DynamicFilterField']
@ -120,7 +120,7 @@ def resolve_role_field(obj, field):
return []
if len(field_components) == 1:
role_cls = str(get_current_apps().get_model('main', 'Role'))
role_cls = str(utils.get_current_apps().get_model('main', 'Role'))
if not str(type(obj)) == role_cls:
raise Exception(smart_text('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
@ -255,8 +255,8 @@ class ImplicitRoleField(models.ForeignKey):
def _post_save(self, instance, created, *args, **kwargs):
Role_ = get_current_apps().get_model('main', 'Role')
ContentType_ = get_current_apps().get_model('contenttypes', 'ContentType')
Role_ = utils.get_current_apps().get_model('main', 'Role')
ContentType_ = utils.get_current_apps().get_model('contenttypes', 'ContentType')
ct_id = ContentType_.objects.get_for_model(instance).id
with batch_role_ancestor_rebuilding():
# Create any missing role objects
@ -307,7 +307,7 @@ class ImplicitRoleField(models.ForeignKey):
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = get_current_apps().get_model('main', 'Role')
Role_ = utils.get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
@ -326,7 +326,7 @@ class ImplicitRoleField(models.ForeignKey):
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = get_current_apps().get_model('main', 'Role')
Role_ = utils.get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
Role.rebuild_role_ancestor_list([], child_ids)

View File

@ -1,49 +1,65 @@
from django.db.models.signals import post_save
from awx.main.models import Credential, CredentialType
import mock
from awx.main.models import CredentialType
from awx.main.utils.common import encrypt_field, decrypt_field
def migrate_to_v2_credentials(apps, schema_editor):
CredentialType.setup_tower_managed_defaults()
for cred in apps.get_model('main', 'Credential').objects.all():
data = {}
if getattr(cred, 'vault_password', None):
data['vault_password'] = cred.vault_password
credential_type = CredentialType.from_v1_kind(cred.kind, data)
defined_fields = credential_type.defined_fields
cred.credential_type = apps.get_model('main', 'CredentialType').objects.get(pk=credential_type.pk)
# this mock is necessary to make the implicit role generation save signal
# use the correct Role model (the version active at this point in
# migration, not the one at HEAD)
with mock.patch('awx.main.utils.get_current_apps', lambda: apps):
# temporarily disable implicit role signals; the class we're working on
# is the "pre-migration" credential model; our signals don't like that
# it differs from the "post-migration" credential model
for field in cred.__class__.__implicit_role_fields:
post_save.disconnect(field, cred.__class__, dispatch_uid='implicit-role-post-save')
for cred in apps.get_model('main', 'Credential').objects.all():
data = {}
if getattr(cred, 'vault_password', None):
data['vault_password'] = cred.vault_password
credential_type = CredentialType.from_v1_kind(cred.kind, data)
defined_fields = credential_type.defined_fields
cred.credential_type = apps.get_model('main', 'CredentialType').objects.get(pk=credential_type.pk)
for field in defined_fields:
if getattr(cred, field, None):
cred.inputs[field] = getattr(cred, field)
for field in defined_fields:
if getattr(cred, field, None):
cred.inputs[field] = getattr(cred, field)
cred.save()
#
# If the credential contains a vault password, create a new
# *additional* credential with the proper CredentialType; this needs to
# perform a deep copy of the Credential that considers:
#
if cred.vault_password:
new_fields = {}
for field in CredentialType.from_v1_kind('ssh').defined_fields:
if getattr(cred, field, None):
new_fields[field] = getattr(cred, field)
if new_fields:
#
# If the credential contains a vault password, create a new
# *additional* credential for the ssh details
#
if cred.vault_password:
# We need to make an ssh credential, too
new_cred = Credential(credential_type=CredentialType.from_v1_kind('ssh'))
for field, value in new_fields.items():
new_cred.inputs[field] = value
ssh_type = CredentialType.from_v1_kind('ssh')
new_cred = apps.get_model('main', 'Credential').objects.get(pk=cred.pk)
new_cred.pk = None
new_cred.vault_password = ''
new_cred.credential_type = apps.get_model('main', 'CredentialType').objects.get(pk=ssh_type.pk)
if 'vault_password' in new_cred.inputs:
del new_cred.inputs['vault_password']
# TODO: copy RBAC and Job Template assignments
new_cred.save()
# unset these attributes so that new roles are properly created
# at save time
new_cred.read_role = None
new_cred.admin_role = None
new_cred.use_role = None
# re-enable implicit role signals
for field in cred.__class__.__implicit_role_fields:
post_save.connect(field._post_save, cred.__class__, True, dispatch_uid='implicit-role-post-save')
# TODO: Job Template assignments
if any([getattr(cred, field) for field in ssh_type.defined_fields]):
new_cred.save(force_insert=True)
# passwords must be decrypted and re-encrypted, because
# their encryption is based on the Credential's primary key
# (which has changed)
for field in ssh_type.defined_fields:
if field in ssh_type.secret_fields:
value = decrypt_field(cred, field)
if value:
setattr(new_cred, field, value)
new_cred.inputs[field] = encrypt_field(new_cred, field)
setattr(new_cred, field, '')
else:
new_cred.inputs[field] = getattr(cred, field)
new_cred.save()

View File

@ -1,6 +1,7 @@
import mock
import pytest
from contextlib import contextmanager
from copy import deepcopy
from django.apps import apps
@ -17,7 +18,7 @@ EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-
def migrate(credential, kind):
with mock.patch.object(Credential, 'kind', kind), \
mock.patch.object(Credential, 'objects', mock.Mock(
get=lambda **kw: credential,
get=lambda **kw: deepcopy(credential),
all=lambda: [credential]
)):
class Apps(apps.__class__):