1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 15:21:13 +03:00

Merge pull request #6001 from ryanpetrello/new_credential_model

Introduce a new CredentialTemplate model
This commit is contained in:
Ryan Petrello 2017-04-24 14:54:27 -04:00 committed by GitHub
commit e65ef35acf
37 changed files with 4204 additions and 473 deletions

View File

@ -1,13 +1,11 @@
# Copyright (c) 2016 Ansible, Inc. # Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved. # All Rights Reserved.
# Django
from django.utils.encoding import force_text
# Django REST Framework # Django REST Framework
from rest_framework import serializers from rest_framework import serializers
__all__ = ['BooleanNullField', 'CharNullField', 'ChoiceNullField', 'EncryptedPasswordField', 'VerbatimField'] __all__ = ['BooleanNullField', 'CharNullField', 'ChoiceNullField', 'VerbatimField']
class NullFieldMixin(object): class NullFieldMixin(object):
@ -58,25 +56,6 @@ class ChoiceNullField(NullFieldMixin, serializers.ChoiceField):
return super(ChoiceNullField, self).to_internal_value(data or u'') return super(ChoiceNullField, self).to_internal_value(data or u'')
class EncryptedPasswordField(CharNullField):
'''
Custom field to handle encrypted password values (on credentials).
'''
def to_internal_value(self, data):
value = super(EncryptedPasswordField, self).to_internal_value(data or u'')
# If user submits a value starting with $encrypted$, ignore it.
if force_text(value).startswith('$encrypted$'):
raise serializers.SkipField
return value
def to_representation(self, value):
# Replace the actual encrypted value with the string $encrypted$.
if force_text(value).startswith('$encrypted$'):
return '$encrypted$'
return value
class VerbatimField(serializers.Field): class VerbatimField(serializers.Field):
''' '''
Custom field that passes the value through without changes. Custom field that passes the value through without changes.

View File

@ -6,7 +6,7 @@ import re
import json import json
# Django # Django
from django.core.exceptions import FieldError, ValidationError from django.core.exceptions import FieldError, ValidationError, ObjectDoesNotExist
from django.db import models from django.db import models
from django.db.models import Q from django.db.models import Q
from django.db.models.fields import FieldDoesNotExist from django.db.models.fields import FieldDoesNotExist
@ -22,6 +22,7 @@ from rest_framework.filters import BaseFilterBackend
# Ansible Tower # Ansible Tower
from awx.main.utils import get_type_for_model, to_python_boolean from awx.main.utils import get_type_for_model, to_python_boolean
from awx.main.models.credential import CredentialType
from awx.main.models.rbac import RoleAncestorEntry from awx.main.models.rbac import RoleAncestorEntry
@ -161,6 +162,18 @@ class FieldLookupBackend(BaseFilterBackend):
except UnicodeEncodeError: except UnicodeEncodeError:
raise ValueError("%r is not an allowed field name. Must be ascii encodable." % lookup) raise ValueError("%r is not an allowed field name. Must be ascii encodable." % lookup)
# Make legacy v1 Credential fields work for backwards compatability
# TODO: remove after API v1 deprecation period
if model._meta.object_name == 'Credential' and lookup == 'kind':
try:
type_ = CredentialType.from_v1_kind(value)
if type_ is None:
raise ParseError(_('cannot filter on kind %s') % value)
value = type_.pk
lookup = 'credential_type'
except ObjectDoesNotExist as e:
raise ParseError(_('cannot filter on kind %s') % value)
field, new_lookup = self.get_field_from_lookup(model, lookup) field, new_lookup = self.get_field_from_lookup(model, lookup)
# Type names are stored without underscores internally, but are presented and # Type names are stored without underscores internally, but are presented and

View File

@ -165,6 +165,7 @@ class APIView(views.APIView):
'new_in_300': getattr(self, 'new_in_300', False), 'new_in_300': getattr(self, 'new_in_300', False),
'new_in_310': getattr(self, 'new_in_310', False), 'new_in_310': getattr(self, 'new_in_310', False),
'new_in_320': getattr(self, 'new_in_320', False), 'new_in_320': getattr(self, 'new_in_320', False),
'new_in_api_v2': getattr(self, 'new_in_api_v2', False),
'deprecated': getattr(self, 'deprecated', False), 'deprecated': getattr(self, 'deprecated', False),
} }

View File

@ -186,6 +186,14 @@ class Metadata(metadata.SimpleMetadata):
break break
metadata['added_in_version'] = added_in_version metadata['added_in_version'] = added_in_version
# Add API version number in which view was added to Tower.
added_in_api_version = 'v1'
for version in ('v2',):
if getattr(view, 'new_in_api_%s' % version, False):
added_in_api_version = version
break
metadata['added_in_api_version'] = added_in_api_version
# Add type(s) handled by this view/serializer. # Add type(s) handled by this view/serializer.
if hasattr(view, 'get_serializer'): if hasattr(view, 'get_serializer'):
serializer = view.get_serializer() serializer = view.get_serializer()

View File

@ -48,8 +48,8 @@ from awx.main.utils import (
from awx.main.validators import vars_validate_or_raise from awx.main.validators import vars_validate_or_raise
from awx.conf.license import feature_enabled from awx.conf.license import feature_enabled
from awx.api.versioning import reverse from awx.api.versioning import reverse, get_request_version
from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, EncryptedPasswordField, VerbatimField from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, VerbatimField
logger = logging.getLogger('awx.api.serializers') logger = logging.getLogger('awx.api.serializers')
@ -243,6 +243,12 @@ class BaseSerializer(serializers.ModelSerializer):
created = serializers.SerializerMethodField() created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField() modified = serializers.SerializerMethodField()
@property
def version(self):
"""
The request version component of the URL as an integer i.e., 1 or 2
"""
return get_request_version(self.context.get('request'))
def get_type(self, obj): def get_type(self, obj):
return get_type_for_model(self.Meta.model) return get_type_for_model(self.Meta.model)
@ -309,7 +315,18 @@ class BaseSerializer(serializers.ModelSerializer):
continue continue
summary_fields[fk] = OrderedDict() summary_fields[fk] = OrderedDict()
for field in related_fields: for field in related_fields:
fval = getattr(fkval, field, None) fval = getattr(fkval, field, None)
# TODO: remove when API v1 is removed
if all([
self.version == 1,
'credential' in fk,
field == 'kind',
fval == 'machine'
]):
fval = 'ssh'
if fval is None and field == 'type': if fval is None and field == 'type':
if isinstance(fkval, PolymorphicModel): if isinstance(fkval, PolymorphicModel):
fkval = fkval.get_real_instance() fkval = fkval.get_real_instance()
@ -1819,25 +1836,76 @@ class ResourceAccessListElementSerializer(UserSerializer):
return ret return ret
class CredentialSerializer(BaseSerializer): class CredentialTypeSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete'] show_capabilities = ['edit', 'delete']
class Meta:
model = CredentialType
fields = ('*', 'kind', 'name', 'managed_by_tower', 'inputs',
'injectors')
# TODO: remove when API v1 is removed
@six.add_metaclass(BaseSerializerMetaclass)
class V1CredentialFields(BaseSerializer):
class Meta: class Meta:
model = Credential model = Credential
fields = ('*', 'kind', 'cloud', 'host', 'username', fields = ('*', 'kind', 'cloud', 'host', 'username',
'password', 'security_token', 'project', 'domain', 'password', 'security_token', 'project', 'domain',
'ssh_key_data', 'ssh_key_unlock', 'organization', 'ssh_key_data', 'ssh_key_unlock', 'become_method',
'become_method', 'become_username', 'become_password', 'become_username', 'become_password', 'vault_password',
'vault_password', 'subscription', 'tenant', 'secret', 'client', 'subscription', 'tenant', 'secret', 'client', 'authorize',
'authorize', 'authorize_password') 'authorize_password')
def build_standard_field(self, field_name, model_field): def build_field(self, field_name, info, model_class, nested_depth):
field_class, field_kwargs = super(CredentialSerializer, self).build_standard_field(field_name, model_field) if field_name in V1Credential.FIELDS:
if field_name in Credential.PASSWORD_FIELDS: return self.build_standard_field(field_name,
field_class = EncryptedPasswordField V1Credential.FIELDS[field_name])
field_kwargs['required'] = False return super(V1CredentialFields, self).build_field(field_name, info, model_class, nested_depth)
field_kwargs['default'] = ''
return field_class, field_kwargs
@six.add_metaclass(BaseSerializerMetaclass)
class V2CredentialFields(BaseSerializer):
class Meta:
model = Credential
fields = ('*', 'credential_type', 'inputs')
class CredentialSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']
class Meta:
model = Credential
fields = ('*', 'organization')
def get_fields(self):
fields = super(CredentialSerializer, self).get_fields()
# TODO: remove when API v1 is removed
if self.version == 1:
fields.update(V1CredentialFields().get_fields())
else:
fields.update(V2CredentialFields().get_fields())
return fields
def to_representation(self, data):
value = super(CredentialSerializer, self).to_representation(data)
# TODO: remove when API v1 is removed
if self.version == 1:
if value.get('kind') == 'machine':
value['kind'] = 'ssh'
for field in V1Credential.PASSWORD_FIELDS:
if field in value and force_text(value[field]).startswith('$encrypted$'):
value[field] = '$encrypted$'
for k, v in value.get('inputs', {}).items():
if force_text(v).startswith('$encrypted$'):
value['inputs'][k] = '$encrypted$'
return value
def get_related(self, obj): def get_related(self, obj):
res = super(CredentialSerializer, self).get_related(obj) res = super(CredentialSerializer, self).get_related(obj)
@ -1853,6 +1921,12 @@ class CredentialSerializer(BaseSerializer):
owner_teams = self.reverse('api:credential_owner_teams_list', kwargs={'pk': obj.pk}), owner_teams = self.reverse('api:credential_owner_teams_list', kwargs={'pk': obj.pk}),
)) ))
# TODO: remove when API v1 is removed
if self.version > 1:
res.update(dict(
credential_type = self.reverse('api:credential_type_detail', kwargs={'pk': obj.credential_type.pk}),
))
parents = [role for role in obj.admin_role.parents.all() if role.object_id is not None] parents = [role for role in obj.admin_role.parents.all() if role.object_id is not None]
if parents: if parents:
res.update({parents[0].content_type.name:parents[0].content_object.get_absolute_url(self.context.get('request'))}) res.update({parents[0].content_type.name:parents[0].content_object.get_absolute_url(self.context.get('request'))})
@ -1886,6 +1960,35 @@ class CredentialSerializer(BaseSerializer):
return summary_dict return summary_dict
def get_validation_exclusions(self, obj=None):
# CredentialType is now part of validation; legacy v1 fields (e.g.,
# 'username', 'password') in JSON POST payloads use the
# CredentialType's inputs definition to determine their validity
ret = super(CredentialSerializer, self).get_validation_exclusions(obj)
for field in ('credential_type', 'inputs'):
if field in ret:
ret.remove(field)
return ret
def to_internal_value(self, data):
if 'credential_type' not in data:
# If `credential_type` is not provided, assume the payload is a
# v1 credential payload that specifies a `kind` and a flat list
# of field values
#
# In this scenario, we should automatically detect the proper
# CredentialType based on the provided values
kind = data.get('kind', 'ssh')
credential_type = CredentialType.from_v1_kind(kind, data)
data['credential_type'] = credential_type.pk
value = OrderedDict(
{'credential_type': credential_type}.items() +
super(CredentialSerializer, self).to_internal_value(data).items()
)
value.pop('kind', None)
return value
return super(CredentialSerializer, self).to_internal_value(data)
class CredentialSerializerCreate(CredentialSerializer): class CredentialSerializerCreate(CredentialSerializer):
@ -1926,7 +2029,20 @@ class CredentialSerializerCreate(CredentialSerializer):
team = validated_data.pop('team', None) team = validated_data.pop('team', None)
if team: if team:
validated_data['organization'] = team.organization validated_data['organization'] = team.organization
# If our payload contains v1 credential fields, translate to the new
# model
# TODO: remove when API v1 is removed
if self.version == 1:
for attr in (
set(V1Credential.FIELDS) & set(validated_data.keys()) # set intersection
):
validated_data.setdefault('inputs', {})
value = validated_data.pop(attr)
if value:
validated_data['inputs'][attr] = value
credential = super(CredentialSerializerCreate, self).create(validated_data) credential = super(CredentialSerializerCreate, self).create(validated_data)
if user: if user:
credential.admin_role.members.add(user) credential.admin_role.members.add(user)
if team: if team:

View File

@ -0,0 +1,4 @@
Version 2 of the Ansible Tower REST API.
Make a GET request to this resource to obtain a list of all child resources
available via the API.

View File

@ -164,6 +164,11 @@ inventory_script_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/object_roles/$', 'inventory_script_object_roles_list'), url(r'^(?P<pk>[0-9]+)/object_roles/$', 'inventory_script_object_roles_list'),
) )
credential_type_urls = patterns('awx.api.views',
url(r'^$', 'credential_type_list'),
url(r'^(?P<pk>[0-9]+)/$', 'credential_type_detail'),
)
credential_urls = patterns('awx.api.views', credential_urls = patterns('awx.api.views',
url(r'^$', 'credential_list'), url(r'^$', 'credential_list'),
url(r'^(?P<pk>[0-9]+)/activity_stream/$', 'credential_activity_stream_list'), url(r'^(?P<pk>[0-9]+)/activity_stream/$', 'credential_activity_stream_list'),
@ -335,7 +340,7 @@ activity_stream_urls = patterns('awx.api.views',
) )
v1_urls = patterns('awx.api.views', v1_urls = patterns('awx.api.views',
url(r'^$', 'api_version_root_view'), url(r'^$', 'api_v1_root_view'),
url(r'^ping/$', 'api_v1_ping_view'), url(r'^ping/$', 'api_v1_ping_view'),
url(r'^config/$', 'api_v1_config_view'), url(r'^config/$', 'api_v1_config_view'),
url(r'^auth/$', 'auth_view'), url(r'^auth/$', 'auth_view'),
@ -378,7 +383,13 @@ v1_urls = patterns('awx.api.views',
url(r'^activity_stream/', include(activity_stream_urls)), url(r'^activity_stream/', include(activity_stream_urls)),
) )
v2_urls = patterns('awx.api.views',
url(r'^$', 'api_v2_root_view'),
url(r'^credential_types/', include(credential_type_urls)),
)
urlpatterns = patterns('awx.api.views', urlpatterns = patterns('awx.api.views',
url(r'^$', 'api_root_view'), url(r'^$', 'api_root_view'),
url(r'^(?P<version>(v2))/', include(v2_urls)),
url(r'^(?P<version>(v1|v2))/', include(v1_urls)) url(r'^(?P<version>(v1|v2))/', include(v1_urls))
) )

View File

@ -1,10 +1,22 @@
# Copyright (c) 2017 Ansible by Red Hat # Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved. # All Rights Reserved.
from django.conf import settings
from rest_framework.reverse import reverse as drf_reverse from rest_framework.reverse import reverse as drf_reverse
from rest_framework.versioning import URLPathVersioning as BaseVersioning from rest_framework.versioning import URLPathVersioning as BaseVersioning
def get_request_version(request):
"""
The API version of a request as an integer i.e., 1 or 2
"""
version = settings.REST_FRAMEWORK['DEFAULT_VERSION']
if request and hasattr(request, 'version'):
version = request.version
return int(version.lstrip('v'))
def reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra): def reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra):
if request is None or getattr(request, 'version', None) is None: if request is None or getattr(request, 'version', None) is None:
# We need the "current request" to determine the correct version to # We need the "current request" to determine the correct version to
@ -13,7 +25,7 @@ def reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra
if kwargs is None: if kwargs is None:
kwargs = {} kwargs = {}
if 'version' not in kwargs: if 'version' not in kwargs:
kwargs['version'] = 'v2' kwargs['version'] = settings.REST_FRAMEWORK['DEFAULT_VERSION']
return drf_reverse(viewname, args, kwargs, request, format, **extra) return drf_reverse(viewname, args, kwargs, request, format, **extra)

View File

@ -64,7 +64,7 @@ from awx.main.ha import is_ha_environment
from awx.api.authentication import TaskAuthentication, TokenGetAuthentication from awx.api.authentication import TaskAuthentication, TokenGetAuthentication
from awx.api.generics import get_view_name from awx.api.generics import get_view_name
from awx.api.generics import * # noqa from awx.api.generics import * # noqa
from awx.api.versioning import reverse from awx.api.versioning import reverse, get_request_version
from awx.conf.license import get_license, feature_enabled, feature_exists, LicenseForbids from awx.conf.license import get_license, feature_enabled, feature_exists, LicenseForbids
from awx.main.models import * # noqa from awx.main.models import * # noqa
from awx.main.utils import * # noqa from awx.main.utils import * # noqa
@ -134,8 +134,8 @@ class ApiRootView(APIView):
def get(self, request, format=None): def get(self, request, format=None):
''' list supported API versions ''' ''' list supported API versions '''
v1 = reverse('api:api_version_root_view', kwargs={'version': 'v1'}) v1 = reverse('api:api_v1_root_view', kwargs={'version': 'v1'})
v2 = reverse('api:api_version_root_view', kwargs={'version': 'v2'}) v2 = reverse('api:api_v2_root_view', kwargs={'version': 'v2'})
data = dict( data = dict(
description = _('Ansible Tower REST API'), description = _('Ansible Tower REST API'),
current_version = v2, current_version = v2,
@ -150,12 +150,10 @@ class ApiRootView(APIView):
class ApiVersionRootView(APIView): class ApiVersionRootView(APIView):
authentication_classes = [] authentication_classes = []
view_name = _('Version')
permission_classes = (AllowAny,) permission_classes = (AllowAny,)
def get(self, request, format=None): def get(self, request, format=None):
''' list top level resources ''' ''' list top level resources '''
data = OrderedDict() data = OrderedDict()
data['authtoken'] = reverse('api:auth_token_view', request=request) data['authtoken'] = reverse('api:auth_token_view', request=request)
data['ping'] = reverse('api:api_v1_ping_view', request=request) data['ping'] = reverse('api:api_v1_ping_view', request=request)
@ -169,6 +167,8 @@ class ApiVersionRootView(APIView):
data['project_updates'] = reverse('api:project_update_list', request=request) data['project_updates'] = reverse('api:project_update_list', request=request)
data['teams'] = reverse('api:team_list', request=request) data['teams'] = reverse('api:team_list', request=request)
data['credentials'] = reverse('api:credential_list', request=request) data['credentials'] = reverse('api:credential_list', request=request)
if get_request_version(request) > 1:
data['credential_types'] = reverse('api:credential_type_list', request=request)
data['inventory'] = reverse('api:inventory_list', request=request) data['inventory'] = reverse('api:inventory_list', request=request)
data['inventory_scripts'] = reverse('api:inventory_script_list', request=request) data['inventory_scripts'] = reverse('api:inventory_script_list', request=request)
data['inventory_sources'] = reverse('api:inventory_source_list', request=request) data['inventory_sources'] = reverse('api:inventory_source_list', request=request)
@ -196,6 +196,16 @@ class ApiVersionRootView(APIView):
return Response(data) return Response(data)
class ApiV1RootView(ApiVersionRootView):
view_name = _('Version 1')
class ApiV2RootView(ApiVersionRootView):
view_name = _('Version 2')
new_in_320 = True
new_in_api_v2 = True
class ApiV1PingView(APIView): class ApiV1PingView(APIView):
"""A simple view that reports very basic information about this Tower """A simple view that reports very basic information about this Tower
instance, which is acceptable to be public information. instance, which is acceptable to be public information.
@ -1476,6 +1486,22 @@ class UserAccessList(ResourceAccessList):
new_in_300 = True new_in_300 = True
class CredentialTypeList(ListCreateAPIView):
model = CredentialType
serializer_class = CredentialTypeSerializer
new_in_320 = True
new_in_api_v2 = True
class CredentialTypeDetail(RetrieveUpdateDestroyAPIView):
model = CredentialType
serializer_class = CredentialTypeSerializer
new_in_320 = True
new_in_api_v2 = True
class CredentialList(ListCreateAPIView): class CredentialList(ListCreateAPIView):
model = Credential model = Credential

View File

@ -824,6 +824,36 @@ class InventoryUpdateAccess(BaseAccess):
return self.user in obj.inventory_source.inventory.admin_role return self.user in obj.inventory_source.inventory.admin_role
class CredentialTypeAccess(BaseAccess):
'''
I can see credentials types when:
- I'm authenticated
I can create when:
- I'm a superuser:
I can change when:
- I'm a superuser and the type is not "managed by Tower"
I can change/delete when:
- I'm a superuser and the type is not "managed by Tower"
'''
model = CredentialType
def can_read(self, obj):
return True
def can_use(self, obj):
return True
def can_add(self, data):
return self.user.is_superuser
def can_change(self, obj, data):
return self.user.is_superuser and not obj.managed_by_tower
def can_delete(self, obj):
return self.user.is_superuser and not obj.managed_by_tower
class CredentialAccess(BaseAccess): class CredentialAccess(BaseAccess):
''' '''
I can see credentials when: I can see credentials when:
@ -2282,6 +2312,7 @@ register_access(Group, GroupAccess)
register_access(InventorySource, InventorySourceAccess) register_access(InventorySource, InventorySourceAccess)
register_access(InventoryUpdate, InventoryUpdateAccess) register_access(InventoryUpdate, InventoryUpdateAccess)
register_access(Credential, CredentialAccess) register_access(Credential, CredentialAccess)
register_access(CredentialType, CredentialTypeAccess)
register_access(Team, TeamAccess) register_access(Team, TeamAccess)
register_access(Project, ProjectAccess) register_access(Project, ProjectAccess)
register_access(ProjectUpdate, ProjectUpdateAccess) register_access(ProjectUpdate, ProjectUpdateAccess)

View File

@ -2,13 +2,18 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import copy
import json import json
import re import re
import sys import sys
import six import six
from pyparsing import infixNotation, opAssoc, Optional, Literal, CharsNotIn from pyparsing import infixNotation, opAssoc, Optional, Literal, CharsNotIn
from jinja2 import Environment, StrictUndefined
from jinja2.exceptions import UndefinedError
# Django # Django
from django.core import exceptions as django_exceptions
from django.db.models.signals import ( from django.db.models.signals import (
post_save, post_save,
post_delete, post_delete,
@ -24,6 +29,10 @@ from django.db.models.fields.related import (
) )
from django.utils.encoding import smart_text from django.utils.encoding import smart_text
from django.db.models import Q from django.db.models import Q
from django.utils.translation import ugettext_lazy as _
# jsonschema
from jsonschema import Draft4Validator
# Django-JSONField # Django-JSONField
from jsonfield import JSONField as upstream_JSONField from jsonfield import JSONField as upstream_JSONField
@ -526,3 +535,236 @@ class DynamicFilterField(models.TextField):
raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string) raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string)
class JSONSchemaField(JSONBField):
"""
A JSONB field that self-validates against a defined JSON schema
(http://json-schema.org). This base class is intended to be overwritten by
defining `self.schema`.
"""
# If an empty {} is provided, we still want to perform this schema
# validation
empty_values=(None, '')
def get_default(self):
return copy.deepcopy(super(JSONBField, self).get_default())
def schema(self, model_instance):
raise NotImplementedError()
def validate(self, value, model_instance):
super(JSONSchemaField, self).validate(value, model_instance)
errors = []
for error in Draft4Validator(self.schema(model_instance)).iter_errors(value):
errors.append(error)
if errors:
raise django_exceptions.ValidationError(
[e.message for e in errors],
code='invalid',
params={'value': value},
)
def get_db_prep_value(self, value, connection, prepared=False):
if connection.vendor == 'sqlite':
# sqlite (which we use for tests) does not support jsonb;
return json.dumps(value)
return super(JSONSchemaField, self).get_db_prep_value(
value, connection, prepared
)
def from_db_value(self, value, expression, connection, context):
# Work around a bug in django-jsonfield
# https://bitbucket.org/schinckel/django-jsonfield/issues/57/cannot-use-in-the-same-project-as-djangos
if isinstance(value, six.string_types):
return json.loads(value)
return value
class CredentialInputField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:Credential().inputs`.
Input data for credentials is represented as a dictionary e.g.,
{'api_token': 'abc123', 'api_secret': 'SECRET'}
For the data to be valid, the keys of this dictionary should correspond
with the field names (and datatypes) defined in the associated
CredentialType e.g.,
{
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string'
}, {
'id': 'api_secret',
'label': 'API Secret',
'type': 'string'
}]
}
"""
def schema(self, model_instance):
# determine the defined fields for the associated credential type
properties = {}
for field in model_instance.credential_type.inputs.get('fields', []):
field = field.copy()
properties[field.pop('id')] = field
return {
'type': 'object',
'properties': properties,
'additionalProperties': False,
}
def validate(self, value, model_instance):
super(CredentialInputField, self).validate(
value, model_instance
)
errors = []
inputs = model_instance.credential_type.inputs
for field in inputs.get('required', []):
if not value.get(field, None):
errors.append(
_('%s required for %s credential.') % (
field, model_instance.credential_type.name
)
)
if errors:
raise django_exceptions.ValidationError(
errors,
code='invalid',
params={'value': value},
)
class CredentialTypeInputField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:CredentialType().inputs`.
"""
def schema(self, model_instance):
return {
'type': 'object',
'additionalProperties': False,
'properties': {
'fields': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'type': {'enum': ['string', 'number', 'ssh_private_key']},
'choices': {
'type': 'array',
'minItems': 1,
'items': {'type': 'string'},
'uniqueItems': True
},
'id': {'type': 'string'},
'label': {'type': 'string'},
'help_text': {'type': 'string'},
'multiline': {'type': 'boolean'},
'secret': {'type': 'boolean'},
'ask_at_runtime': {'type': 'boolean'},
},
'additionalProperties': False,
'required': ['id', 'label'],
}
}
}
}
class CredentialTypeInjectorField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:CredentialType().injectors`.
"""
def schema(self, model_instance):
return {
'type': 'object',
'additionalProperties': False,
'properties': {
'file': {
'type': 'object',
'properties': {
'template': {'type': 'string'},
},
'additionalProperties': False,
'required': ['template'],
},
'ssh': {
'type': 'object',
'properties': {
'private': {'type': 'string'},
'public': {'type': 'string'},
},
'additionalProperties': False,
'required': ['public', 'private'],
},
'password': {
'type': 'object',
'properties': {
'key': {'type': 'string'},
'value': {'type': 'string'},
},
'additionalProperties': False,
'required': ['key', 'value'],
},
'env': {
'type': 'object',
'patternProperties': {
# http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html
# In the shell command language, a word consisting solely
# of underscores, digits, and alphabetics from the portable
# character set. The first character of a name is not
# a digit.
'^[a-zA-Z_]+[a-zA-Z0-9_]*$': {'type': 'string'},
},
'additionalProperties': False,
},
'extra_vars': {
'type': 'object',
'patternProperties': {
# http://docs.ansible.com/ansible/playbooks_variables.html#what-makes-a-valid-variable-name
'^[a-zA-Z_]+[a-zA-Z0-9_]*$': {'type': 'string'},
},
'additionalProperties': False,
},
},
'additionalProperties': False
}
def validate(self, value, model_instance):
super(CredentialTypeInjectorField, self).validate(
value, model_instance
)
# make sure the inputs are clean first
CredentialTypeInputField().validate(model_instance.inputs, model_instance)
# In addition to basic schema validation, search the injector fields
# for template variables and make sure they match the fields defined in
# the inputs
valid_namespace = dict(
(field, 'EXAMPLE')
for field in model_instance.defined_fields
)
for type_, injector in value.items():
for key, tmpl in injector.items():
try:
Environment(
undefined=StrictUndefined
).from_string(tmpl).render(valid_namespace)
except UndefinedError as e:
raise django_exceptions.ValidationError(
_('%s uses an undefined field (%s)') % (key, e),
code='invalid',
params={'value': value},
)

View File

@ -3,7 +3,7 @@
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from crum import impersonate from crum import impersonate
from awx.main.models import User, Organization, Project, Inventory, Credential, Host, JobTemplate from awx.main.models import User, Organization, Project, Inventory, CredentialType, Credential, Host, JobTemplate
class Command(BaseCommand): class Command(BaseCommand):
@ -30,8 +30,12 @@ class Command(BaseCommand):
scm_update_cache_timeout=0, scm_update_cache_timeout=0,
organization=o) organization=o)
p.save(skip_update=True) p.save(skip_update=True)
c = Credential.objects.create(name='Demo Credential', ssh_type = CredentialType.from_v1_kind('ssh')
username=superuser.username, c = Credential.objects.create(credential_type=ssh_type,
name='Demo Credential',
inputs={
'username': superuser.username
},
created_by=superuser) created_by=superuser)
c.admin_role.members.add(superuser) c.admin_role.members.add(superuser)
i = Inventory.objects.create(name='Demo Inventory', i = Inventory.objects.create(name='Demo Inventory',

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
from django.conf import settings
import taggit.managers
import awx.main.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0038_v320_data_migrations'),
]
operations = [
migrations.CreateModel(
name='CredentialType',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('description', models.TextField(default=b'', blank=True)),
('name', models.CharField(max_length=512)),
('kind', models.CharField(max_length=32, choices=[(b'machine', 'Machine'), (b'net', 'Network'), (b'scm', 'Source Control'), (b'cloud', 'Cloud')])),
('managed_by_tower', models.BooleanField(default=False, editable=False)),
('inputs', awx.main.fields.CredentialTypeInputField(default={}, blank=True)),
('injectors', awx.main.fields.CredentialTypeInjectorField(default={}, blank=True)),
('created_by', models.ForeignKey(related_name="{u'class': 'credentialtype', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
('modified_by', models.ForeignKey(related_name="{u'class': 'credentialtype', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
],
options={
'ordering': ('kind', 'name'),
},
),
migrations.AlterModelOptions(
name='credential',
options={'ordering': ('name',)},
),
migrations.AddField(
model_name='credential',
name='inputs',
field=awx.main.fields.CredentialInputField(default={}, blank=True),
),
migrations.AddField(
model_name='credential',
name='credential_type',
field=models.ForeignKey(related_name='credentials', to='main.CredentialType', null=True),
preserve_default=False,
),
migrations.AlterUniqueTogether(
name='credential',
unique_together=set([('organization', 'name', 'credential_type')]),
),
]

View File

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
from awx.main.migrations import _credentialtypes as credentialtypes
class Migration(migrations.Migration):
dependencies = [
('main', '0039_v320_add_credentialtype_model'),
]
operations = [
migrations.RunPython(credentialtypes.migrate_to_v2_credentials),
]

View File

@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0040_v320_migrate_v1_credentials'),
]
operations = [
migrations.RemoveField(
model_name='credential',
name='authorize',
),
migrations.RemoveField(
model_name='credential',
name='authorize_password',
),
migrations.RemoveField(
model_name='credential',
name='become_method',
),
migrations.RemoveField(
model_name='credential',
name='become_password',
),
migrations.RemoveField(
model_name='credential',
name='become_username',
),
migrations.RemoveField(
model_name='credential',
name='client',
),
migrations.RemoveField(
model_name='credential',
name='cloud',
),
migrations.RemoveField(
model_name='credential',
name='domain',
),
migrations.RemoveField(
model_name='credential',
name='host',
),
migrations.RemoveField(
model_name='credential',
name='kind',
),
migrations.RemoveField(
model_name='credential',
name='password',
),
migrations.RemoveField(
model_name='credential',
name='project',
),
migrations.RemoveField(
model_name='credential',
name='secret',
),
migrations.RemoveField(
model_name='credential',
name='security_token',
),
migrations.RemoveField(
model_name='credential',
name='ssh_key_data',
),
migrations.RemoveField(
model_name='credential',
name='ssh_key_unlock',
),
migrations.RemoveField(
model_name='credential',
name='subscription',
),
migrations.RemoveField(
model_name='credential',
name='tenant',
),
migrations.RemoveField(
model_name='credential',
name='username',
),
migrations.RemoveField(
model_name='credential',
name='vault_password',
),
migrations.AlterUniqueTogether(
name='credentialtype',
unique_together=set([('name', 'kind')]),
),
]

View File

@ -0,0 +1,49 @@
from django.db.models.signals import post_save
from awx.main.models import Credential, CredentialType
def migrate_to_v2_credentials(apps, schema_editor):
CredentialType.setup_tower_managed_defaults()
for cred in apps.get_model('main', 'Credential').objects.all():
data = {}
if getattr(cred, 'vault_password', None):
data['vault_password'] = cred.vault_password
credential_type = CredentialType.from_v1_kind(cred.kind, data)
defined_fields = credential_type.defined_fields
cred.credential_type = apps.get_model('main', 'CredentialType').objects.get(pk=credential_type.pk)
# temporarily disable implicit role signals; the class we're working on
# is the "pre-migration" credential model; our signals don't like that
# it differs from the "post-migration" credential model
for field in cred.__class__.__implicit_role_fields:
post_save.disconnect(field, cred.__class__, dispatch_uid='implicit-role-post-save')
for field in defined_fields:
if getattr(cred, field, None):
cred.inputs[field] = getattr(cred, field)
cred.save()
#
# If the credential contains a vault password, create a new
# *additional* credential with the proper CredentialType; this needs to
# perform a deep copy of the Credential that considers:
#
if cred.vault_password:
new_fields = {}
for field in CredentialType.from_v1_kind('ssh').defined_fields:
if getattr(cred, field, None):
new_fields[field] = getattr(cred, field)
if new_fields:
# We need to make an ssh credential, too
new_cred = Credential(credential_type=CredentialType.from_v1_kind('ssh'))
for field, value in new_fields.items():
new_cred.inputs[field] = value
# TODO: copy RBAC and Job Template assignments
new_cred.save()
# re-enable implicit role signals
for field in cred.__class__.__implicit_role_fields:
post_save.connect(field._post_save, cred.__class__, True, dispatch_uid='implicit-role-post-save')

View File

@ -99,7 +99,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def clean_credential(self): def clean_credential(self):
cred = self.credential cred = self.credential
if cred and cred.kind != 'ssh': if cred and cred.kind != 'machine':
raise ValidationError( raise ValidationError(
_('You must provide a machine / SSH credential.'), _('You must provide a machine / SSH credential.'),
) )

View File

@ -229,10 +229,8 @@ class PasswordFieldsModel(BaseModel):
setattr(self, field, '') setattr(self, field, '')
else: else:
ask = self._password_field_allows_ask(field) ask = self._password_field_allows_ask(field)
encrypted = encrypt_field(self, field, ask) self.encrypt_field(field, ask)
setattr(self, field, encrypted) self.mark_field_for_save(update_fields, field)
if field not in update_fields:
update_fields.append(field)
super(PasswordFieldsModel, self).save(*args, **kwargs) super(PasswordFieldsModel, self).save(*args, **kwargs)
# After saving a new instance for the first time, set the password # After saving a new instance for the first time, set the password
# fields and save again. # fields and save again.
@ -241,9 +239,17 @@ class PasswordFieldsModel(BaseModel):
for field in self.PASSWORD_FIELDS: for field in self.PASSWORD_FIELDS:
saved_value = getattr(self, '_saved_%s' % field, '') saved_value = getattr(self, '_saved_%s' % field, '')
setattr(self, field, saved_value) setattr(self, field, saved_value)
update_fields.append(field) self.mark_field_for_save(update_fields, field)
self.save(update_fields=update_fields) self.save(update_fields=update_fields)
def encrypt_field(self, field, ask):
encrypted = encrypt_field(self, field, ask)
setattr(self, field, encrypted)
def mark_field_for_save(self, update_fields, field):
if field not in update_fields:
update_fields.append(field)
class PrimordialModel(CreatedModifiedModel): class PrimordialModel(CreatedModifiedModel):
''' '''

File diff suppressed because it is too large Load Diff

View File

@ -156,7 +156,7 @@ class JobOptions(BaseModel):
def clean_credential(self): def clean_credential(self):
cred = self.credential cred = self.credential
if cred and cred.kind != 'ssh': if cred and cred.kind != 'machine':
raise ValidationError( raise ValidationError(
_('You must provide a machine / SSH credential.'), _('You must provide a machine / SSH credential.'),
) )

View File

@ -10,6 +10,7 @@ from awx.main.models import (
JobTemplate, JobTemplate,
Job, Job,
NotificationTemplate, NotificationTemplate,
CredentialType,
Credential, Credential,
Inventory, Inventory,
Label, Label,
@ -84,8 +85,14 @@ def mk_project(name, organization=None, description=None, persisted=True):
return project return project
def mk_credential(name, cloud=False, kind='ssh', persisted=True): def mk_credential(name, credential_type='ssh', persisted=True):
cred = Credential(name=name, cloud=cloud, kind=kind) type_ = CredentialType.defaults[credential_type]()
if persisted:
type_.save()
cred = Credential(
credential_type=type_,
name=name
)
if persisted: if persisted:
cred.save() cred.save()
return cred return cred

View File

@ -213,12 +213,12 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
if 'cloud_credential' in kwargs: if 'cloud_credential' in kwargs:
cloud_cred = kwargs['cloud_credential'] cloud_cred = kwargs['cloud_credential']
if type(cloud_cred) is not Credential: if type(cloud_cred) is not Credential:
cloud_cred = mk_credential(cloud_cred, kind='aws', persisted=persisted) cloud_cred = mk_credential(cloud_cred, credential_type='aws', persisted=persisted)
if 'network_credential' in kwargs: if 'network_credential' in kwargs:
net_cred = kwargs['network_credential'] net_cred = kwargs['network_credential']
if type(net_cred) is not Credential: if type(net_cred) is not Credential:
net_cred = mk_credential(net_cred, kind='net', persisted=persisted) net_cred = mk_credential(net_cred, credential_type='net', persisted=persisted)
if 'project' in kwargs: if 'project' in kwargs:
proj = kwargs['project'] proj = kwargs['project']

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,205 @@
import json
import pytest
from awx.main.models.credential import CredentialType
from awx.api.versioning import reverse
@pytest.mark.django_db
def test_list_as_unauthorized_xfail(get):
response = get(reverse('api:credential_type_list'))
assert response.status_code == 401
@pytest.mark.django_db
def test_list_as_normal_user(get, alice):
response = get(reverse('api:credential_type_list'), alice)
assert response.status_code == 200
assert response.data['count'] == 0
@pytest.mark.django_db
def test_list_as_admin(get, admin):
response = get(reverse('api:credential_type_list'), admin)
assert response.status_code == 200
assert response.data['count'] == 0
@pytest.mark.django_db
def test_create_as_unauthorized_xfail(get, post):
response = post(reverse('api:credential_type_list'), {
'name': 'Custom Credential Type',
})
assert response.status_code == 401
@pytest.mark.django_db
def test_update_as_unauthorized_xfail(patch):
ssh = CredentialType.defaults['ssh']()
ssh.save()
response = patch(
reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
{
'name': 'Some Other Name'
}
)
assert response.status_code == 401
@pytest.mark.django_db
def test_delete_as_unauthorized_xfail(delete):
ssh = CredentialType.defaults['ssh']()
ssh.save()
response = delete(
reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
)
assert response.status_code == 401
@pytest.mark.django_db
def test_create_as_normal_user_xfail(get, post, alice):
response = post(reverse('api:credential_type_list'), {
'name': 'Custom Credential Type',
}, alice)
assert response.status_code == 403
assert get(reverse('api:credential_type_list'), alice).data['count'] == 0
@pytest.mark.django_db
def test_create_as_admin(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'Custom Credential Type',
'inputs': {},
'injectors': {}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
assert response.data['results'][0]['name'] == 'Custom Credential Type'
assert response.data['results'][0]['inputs'] == {}
assert response.data['results'][0]['injectors'] == {}
assert response.data['results'][0]['managed_by_tower'] is False
@pytest.mark.django_db
def test_create_managed_by_tower_readonly(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'Custom Credential Type',
'inputs': {},
'injectors': {},
'managed_by_tower': True
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
assert response.data['results'][0]['managed_by_tower'] is False
@pytest.mark.django_db
def test_create_with_valid_inputs(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
fields = response.data['results'][0]['inputs']['fields']
assert len(fields) == 1
assert fields[0]['id'] == 'api_token'
assert fields[0]['label'] == 'API Token'
assert fields[0]['ask_at_runtime'] is True
assert fields[0]['secret'] is True
assert fields[0]['type'] == 'string'
@pytest.mark.django_db
def test_create_with_invalid_inputs_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {'feeelds': {},},
'injectors': {}
}, admin)
assert response.status_code == 400
assert "'feeelds' was unexpected" in json.dumps(response.data)
@pytest.mark.django_db
def test_create_with_valid_injectors(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {
'env': {
'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
}
}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
injectors = response.data['results'][0]['injectors']
assert len(injectors) == 1
assert injectors['env'] == {
'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
}
@pytest.mark.django_db
def test_create_with_invalid_injectors_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {},
'injectors': {'nonsense': 123}
}, admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_create_with_undefined_template_variable_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {
'env': {'ANSIBLE_MY_CLOUD_TOKEN': '{{api_tolkien}}'}
}
}, admin)
assert response.status_code == 400
assert "'api_tolkien' is undefined" in json.dumps(response.data)

View File

@ -10,8 +10,15 @@ from awx.api.versioning import reverse
@pytest.fixture @pytest.fixture
def runtime_data(organization): def runtime_data(organization, credentialtype_ssh):
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2') cred_obj = Credential.objects.create(
name='runtime-cred',
credential_type=credentialtype_ssh,
inputs={
'username': 'test_user2',
'password': 'pas4word2'
}
)
inv_obj = organization.inventories.create(name="runtime-inv") inv_obj = organization.inventories.create(name="runtime-inv")
return dict( return dict(
extra_vars='{"job_launch_var": 4}', extra_vars='{"job_launch_var": 4}',

View File

@ -28,7 +28,7 @@ from rest_framework.test import (
force_authenticate, force_authenticate,
) )
from awx.main.models.credential import Credential from awx.main.models.credential import CredentialType, Credential
from awx.main.models.jobs import JobTemplate from awx.main.models.jobs import JobTemplate
from awx.main.models.inventory import ( from awx.main.models.inventory import (
Group, Group,
@ -191,18 +191,43 @@ def organization(instance):
@pytest.fixture @pytest.fixture
def credential(): def credentialtype_ssh():
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret') ssh = CredentialType.defaults['ssh']()
ssh.save()
return ssh
@pytest.fixture @pytest.fixture
def machine_credential(): def credentialtype_aws():
return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word') aws = CredentialType.defaults['aws']()
aws.save()
return aws
@pytest.fixture @pytest.fixture
def org_credential(organization): def credentialtype_net():
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret', organization=organization) net = CredentialType.defaults['net']()
net.save()
return net
@pytest.fixture
def credential(credentialtype_aws):
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
inputs={'username': 'something', 'password': 'secret'})
@pytest.fixture
def machine_credential(credentialtype_ssh):
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred',
inputs={'username': 'test_user', 'password': 'pas4word'})
@pytest.fixture
def org_credential(organization, credentialtype_aws):
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
inputs={'username': 'something', 'password': 'secret'},
organization=organization)
@pytest.fixture @pytest.fixture

View File

@ -0,0 +1,289 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
import pytest
from django.core.exceptions import ValidationError
from awx.main.utils.common import decrypt_field
from awx.main.models import Credential, CredentialType
@pytest.mark.django_db
def test_default_cred_types():
assert sorted(CredentialType.defaults.keys()) == [
'aws',
'azure',
'azure_rm',
'cloudforms',
'gce',
'net',
'openstack',
'rackspace',
'satellite6',
'scm',
'ssh',
'vault',
'vmware',
]
for type_ in CredentialType.defaults.values():
assert type_().managed_by_tower is True
@pytest.mark.django_db
@pytest.mark.parametrize('kind', ['net', 'scm', 'ssh', 'vault'])
def test_cred_type_kind_uniqueness(kind):
"""
non-cloud credential types are exclusive_on_kind (you can only use *one* of
them at a time)
"""
assert CredentialType.defaults[kind]().unique_by_kind is True
@pytest.mark.django_db
def test_cloud_kind_uniqueness():
"""
you can specify more than one cloud credential type (as long as they have
different names so you don't e.g., use ec2 twice")
"""
assert CredentialType.defaults['aws']().unique_by_kind is False
@pytest.mark.django_db
@pytest.mark.parametrize('input_, valid', [
({}, True),
({'fields': []}, True),
({'fields': {}}, False),
({'fields': 123}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'foo': 'bar'}]}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'type': 'string'}]}, True),
({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 1}]}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 'Help Text'}]}, True), # noqa
({'fields': [{'id': 'password', 'label': 'Password', 'type': 'number'}]}, True),
({'fields': [{'id': 'ssh_key', 'label': 'SSH Key', 'type': 'ssh_private_key'}]}, True), # noqa
({'fields': [{'id': 'other', 'label': 'Other', 'type': 'boolean'}]}, False),
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': True}]}, True),
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': 'bad'}]}, False), # noqa
({'fields': [{'id': 'token', 'label': 'Token', 'secret': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'secret': 'bad'}]}, False),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': 'bad'}]}, False), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': 'not-a-list'}]}, False), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': []}]}, False),
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['su', 'sudo']}]}, True), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['dup', 'dup']}]}, False), # noqa
])
def test_cred_type_input_schema_validity(input_, valid):
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs=input_
)
if valid is False:
with pytest.raises(ValidationError):
type_.full_clean()
else:
type_.full_clean()
@pytest.mark.django_db
@pytest.mark.parametrize('injectors, valid', [
({}, True),
({'invalid-injector': {}}, False),
({'file': 123}, False),
({'file': {}}, False),
({'file': {'template': '{{username}}'}}, True),
({'file': {'foo': 'bar'}}, False),
({'ssh': 123}, False),
({'ssh': {}}, False),
({'ssh': {'public': 'PUB'}}, False),
({'ssh': {'private': 'PRIV'}}, False),
({'ssh': {'public': 'PUB', 'private': 'PRIV'}}, True),
({'ssh': {'public': 'PUB', 'private': 'PRIV', 'a': 'b'}}, False),
({'password': {}}, False),
({'password': {'key': 'Password:'}}, False),
({'password': {'value': '{{pass}}'}}, False),
({'password': {'key': 'Password:', 'value': '{{pass}}'}}, True),
({'password': {'key': 'Password:', 'value': '{{pass}}', 'a': 'b'}}, False),
({'env': 123}, False),
({'env': {}}, True),
({'env': {'AWX_SECRET': '{{awx_secret}}'}}, True),
({'env': {'AWX_SECRET_99': '{{awx_secret}}'}}, True),
({'env': {'99': '{{awx_secret}}'}}, False),
({'env': {'AWX_SECRET=': '{{awx_secret}}'}}, False),
({'extra_vars': 123}, False),
({'extra_vars': {}}, True),
({'extra_vars': {'hostname': '{{host}}'}}, True),
({'extra_vars': {'hostname_99': '{{host}}'}}, True),
({'extra_vars': {'99': '{{host}}'}}, False),
({'extra_vars': {'99=': '{{host}}'}}, False),
])
def test_cred_type_injectors_schema(injectors, valid):
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [
{'id': 'username', 'type': 'string', 'label': '_'},
{'id': 'pass', 'type': 'string', 'label': '_'},
{'id': 'awx_secret', 'type': 'string', 'label': '_'},
{'id': 'host', 'type': 'string', 'label': '_'},
]
},
injectors=injectors
)
if valid is False:
with pytest.raises(ValidationError):
type_.full_clean()
else:
type_.full_clean()
@pytest.mark.django_db
def test_credential_creation(organization_factory):
org = organization_factory('test').organization
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [{
'id': 'username',
'label': 'Username for SomeCloud',
'type': 'string'
}]
}
)
type_.save()
cred = Credential(credential_type=type_, name="Bob's Credential",
inputs={'username': 'bob'}, organization=org)
cred.save()
cred.full_clean()
assert isinstance(cred, Credential)
assert cred.name == "Bob's Credential"
assert cred.inputs['username'] == cred.username == 'bob'
@pytest.mark.django_db
def test_credential_creation_validation_failure(organization_factory):
org = organization_factory('test').organization
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [{
'id': 'username',
'label': 'Username for SomeCloud',
'type': 'string'
}]
}
)
type_.save()
with pytest.raises(ValidationError):
cred = Credential(credential_type=type_, name="Bob's Credential",
inputs={'user': 'wrong-key'}, organization=org)
cred.save()
cred.full_clean()
@pytest.mark.django_db
def test_credential_encryption(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'testing123'
@pytest.mark.django_db
def test_credential_encryption_with_ask(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'ASK'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'] == 'ASK'
@pytest.mark.django_db
def test_credential_with_multiple_secrets(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'ssh_key_data': 'SOMEKEY', 'ssh_key_unlock': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == 'SOMEKEY'
assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_unlock') == 'testing123'
@pytest.mark.django_db
def test_credential_update(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
cred.inputs['password'] = 'newpassword'
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'newpassword'
@pytest.mark.django_db
def test_credential_update_with_prior(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
cred.inputs['username'] = 'joe'
cred.inputs['password'] = '$encrypted$'
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['username'] == 'joe'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'testing123'

View File

@ -0,0 +1,313 @@
import mock
import pytest
from contextlib import contextmanager
from django.apps import apps
from awx.main.models import Credential, CredentialType
from awx.main.migrations._credentialtypes import migrate_to_v2_credentials
from awx.main.utils.common import decrypt_field
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
# TODO: remove this set of tests when API v1 is removed
@contextmanager
def migrate(credential, kind):
with mock.patch.object(Credential, 'kind', kind), \
mock.patch.object(Credential, 'objects', mock.Mock(
get=lambda **kw: credential,
all=lambda: [credential]
)):
class Apps(apps.__class__):
def get_model(self, app, model):
if model == 'Credential':
return Credential
return apps.get_model(app, model)
yield
migrate_to_v2_credentials(Apps(), None)
@pytest.mark.django_db
def test_ssh_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'ssh'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
'ssh_key_unlock': 'keypass',
'become_method': 'sudo',
'become_username': 'superuser',
'become_password': 'superpassword',
})
assert cred.credential_type.name == 'SSH'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_unlock') == 'keypass'
assert cred.inputs['become_method'] == 'sudo'
assert cred.inputs['become_username'] == 'superuser'
assert cred.inputs['become_password'].startswith('$encrypted$')
assert decrypt_field(cred, 'become_password') == 'superpassword'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_scm_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'scm'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
'ssh_key_unlock': 'keypass',
})
assert cred.credential_type.name == 'Source Control'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_unlock') == 'keypass'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_vault_only_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'ssh'):
cred.__dict__.update({
'vault_password': 'vault',
})
assert cred.credential_type.name == 'Vault'
assert cred.inputs['vault_password'].startswith('$encrypted$')
assert decrypt_field(cred, 'vault_password') == 'vault'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_vault_with_ssh_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'ssh'):
cred.__dict__.update({
'vault_password': 'vault',
'username': 'bob',
'password': 'secret',
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
'ssh_key_unlock': 'keypass',
'become_method': 'sudo',
'become_username': 'superuser',
'become_password': 'superpassword',
})
assert Credential.objects.count() == 2
assert Credential.objects.filter(credential_type__name='Vault').get() == cred
assert cred.inputs.keys() == ['vault_password']
assert cred.inputs['vault_password'].startswith('$encrypted$')
assert decrypt_field(cred, 'vault_password') == 'vault'
ssh_cred = Credential.objects.filter(credential_type__name='SSH').get()
assert sorted(ssh_cred.inputs.keys()) == sorted(CredentialType.from_v1_kind('ssh').defined_fields)
assert ssh_cred.credential_type.name == 'SSH'
assert ssh_cred.inputs['username'] == 'bob'
assert ssh_cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(ssh_cred, 'password') == 'secret'
assert ssh_cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(ssh_cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert ssh_cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(ssh_cred, 'ssh_key_unlock') == 'keypass'
assert ssh_cred.inputs['become_method'] == 'sudo'
assert ssh_cred.inputs['become_username'] == 'superuser'
assert ssh_cred.inputs['become_password'].startswith('$encrypted$')
assert decrypt_field(ssh_cred, 'become_password') == 'superpassword'
@pytest.mark.django_db
def test_net_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'net'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
'ssh_key_unlock': 'keypass',
'authorize_password': 'authorize-secret',
})
assert cred.credential_type.name == 'Network'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_unlock') == 'keypass'
assert cred.inputs['authorize_password'].startswith('$encrypted$')
assert decrypt_field(cred, 'authorize_password') == 'authorize-secret'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_aws_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'aws'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'security_token': 'secret-token'
})
assert cred.credential_type.name == 'Amazon Web Services'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['security_token'].startswith('$encrypted$')
assert decrypt_field(cred, 'security_token') == 'secret-token'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_openstack_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'openstack'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'host': 'https://keystone.example.org/',
'project': 'TENANT_ID',
})
assert cred.credential_type.name == 'OpenStack'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['host'] == 'https://keystone.example.org/'
assert cred.inputs['project'] == 'TENANT_ID'
assert Credential.objects.count() == 1
@pytest.mark.skip(reason="TODO: rackspace should be a custom type (we're removing official support)")
def test_rackspace():
pass
@pytest.mark.django_db
def test_vmware_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'vmware'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'host': 'https://example.org/',
})
assert cred.credential_type.name == 'VMware vCenter'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['host'] == 'https://example.org/'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_satellite6_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'satellite6'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'host': 'https://example.org/',
})
assert cred.credential_type.name == 'Red Hat Satellite 6'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['host'] == 'https://example.org/'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_cloudforms_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'cloudforms'):
cred.__dict__.update({
'username': 'bob',
'password': 'secret',
'host': 'https://example.org/',
})
assert cred.credential_type.name == 'Red Hat CloudForms'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'secret'
assert cred.inputs['host'] == 'https://example.org/'
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_gce_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'gce'):
cred.__dict__.update({
'username': 'bob',
'project': 'PROJECT-123',
'ssh_key_data': EXAMPLE_PRIVATE_KEY
})
assert cred.credential_type.name == 'Google Compute Engine'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['project'] == 'PROJECT-123'
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_azure_classic_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'azure'):
cred.__dict__.update({
'username': 'bob',
'ssh_key_data': EXAMPLE_PRIVATE_KEY
})
assert cred.credential_type.name == 'Microsoft Azure Classic (deprecated)'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == EXAMPLE_PRIVATE_KEY
assert Credential.objects.count() == 1
@pytest.mark.django_db
def test_azure_rm_migration():
cred = Credential(name='My Credential')
with migrate(cred, 'azure_rm'):
cred.__dict__.update({
'subscription': 'some-subscription',
'username': 'bob',
'password': 'some-password',
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
})
assert cred.credential_type.name == 'Microsoft Azure Resource Manager'
assert cred.inputs['subscription'] == 'some-subscription'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'some-password'
assert cred.inputs['client'] == 'some-client'
assert cred.inputs['secret'].startswith('$encrypted$')
assert decrypt_field(cred, 'secret') == 'some-secret'
assert cred.inputs['tenant'] == 'some-tenant'
assert Credential.objects.count() == 1

View File

@ -5,12 +5,12 @@ from awx.main.models import Credential
@pytest.mark.django_db @pytest.mark.django_db
def test_cred_unique_org_name_kind(organization_factory): def test_cred_unique_org_name_kind(organization_factory, credentialtype_ssh):
objects = organization_factory("test") objects = organization_factory("test")
cred = Credential(name="test", kind="net", organization=objects.organization) cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save() cred.save()
with pytest.raises(IntegrityError): with pytest.raises(IntegrityError):
cred = Credential(name="test", kind="net", organization=objects.organization) cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save() cred.save()

View File

@ -21,12 +21,12 @@ def test_credential_migration_user(credential, user, permissions):
@pytest.mark.django_db @pytest.mark.django_db
def test_two_teams_same_cred_name(organization_factory): def test_two_teams_same_cred_name(organization_factory, credentialtype_net):
objects = organization_factory("test", objects = organization_factory("test",
teams=["team1", "team2"]) teams=["team1", "team2"])
cred1 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team1) cred1 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team1)
cred2 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team2) cred2 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team2)
rbac.migrate_credential(apps, None) rbac.migrate_credential(apps, None)
@ -119,7 +119,7 @@ def test_credential_access_auditor(credential, organization_factory):
@pytest.mark.django_db @pytest.mark.django_db
def test_credential_access_admin(user, team, credential): def test_credential_access_admin(user, team, credential, credentialtype_aws):
u = user('org-admin', False) u = user('org-admin', False)
team.organization.admin_role.members.add(u) team.organization.admin_role.members.add(u)
@ -137,7 +137,7 @@ def test_credential_access_admin(user, team, credential):
credential.admin_role.parents.add(team.admin_role) credential.admin_role.parents.add(team.admin_role)
credential.save() credential.save()
cred = Credential.objects.create(kind='aws', name='test-cred') cred = Credential.objects.create(credential_type=credentialtype_aws, name='test-cred')
cred.deprecated_team = team cred.deprecated_team = team
cred.save() cred.save()

View File

@ -51,13 +51,20 @@ class TestJobRelaunchAccess:
return jt.create_unified_job() return jt.create_unified_job()
@pytest.fixture @pytest.fixture
def job_with_prompts(self, machine_credential, inventory, organization): def job_with_prompts(self, machine_credential, inventory, organization, credentialtype_ssh):
jt = JobTemplate.objects.create( jt = JobTemplate.objects.create(
name='test-job-template-prompts', credential=machine_credential, inventory=inventory, name='test-job-template-prompts', credential=machine_credential, inventory=inventory,
ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True, ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True,
ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True, ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True,
ask_credential_on_launch=True) ask_credential_on_launch=True)
new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word') new_cred = Credential.objects.create(
name='new-cred',
credential_type=credentialtype_ssh,
inputs={
'username': 'test_user',
'password': 'pas4word'
}
)
new_inv = Inventory.objects.create(name='new-inv', organization=organization) new_inv = Inventory.objects.create(name='new-inv', organization=organization)
return jt.create_unified_job(credential=new_cred, inventory=new_inv) return jt.create_unified_job(credential=new_cred, inventory=new_inv)

View File

@ -8,7 +8,7 @@ from awx.main.migrations import _old_access as old_access
@pytest.mark.django_db @pytest.mark.django_db
def test_project_migration(): def test_project_migration(credentialtype_ssh):
''' '''
o1 o2 o3 with o1 -- i1 o2 -- i2 o1 o2 o3 with o1 -- i1 o2 -- i2
@ -59,7 +59,7 @@ def test_project_migration():
o2 = Organization.objects.create(name='o2') o2 = Organization.objects.create(name='o2')
o3 = Organization.objects.create(name='o3') o3 = Organization.objects.create(name='o3')
c1 = Credential.objects.create(name='c1') c1 = Credential.objects.create(name='c1', credential_type=credentialtype_ssh)
project_name = unicode("\xc3\xb4", "utf-8") project_name = unicode("\xc3\xb4", "utf-8")
p1 = Project.objects.create(name=project_name, credential=c1) p1 = Project.objects.create(name=project_name, credential=c1)

View File

@ -1,7 +1,7 @@
import pytest import pytest
from awx.main.models.jobs import JobTemplate from awx.main.models.jobs import JobTemplate
from awx.main.models import Inventory, Credential, Project from awx.main.models import Inventory, CredentialType, Credential, Project
from awx.main.models.workflow import ( from awx.main.models.workflow import (
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobOptions, WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobOptions,
WorkflowJob, WorkflowJobNode WorkflowJob, WorkflowJobNode
@ -125,7 +125,12 @@ def job_node_no_prompts(workflow_job_unit, jt_ask):
def job_node_with_prompts(job_node_no_prompts): def job_node_with_prompts(job_node_no_prompts):
job_node_no_prompts.char_prompts = example_prompts job_node_no_prompts.char_prompts = example_prompts
job_node_no_prompts.inventory = Inventory(name='example-inv') job_node_no_prompts.inventory = Inventory(name='example-inv')
job_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf') ssh_type = CredentialType.defaults['ssh']()
job_node_no_prompts.credential = Credential(
name='example-inv',
credential_type=ssh_type,
inputs={'username': 'asdf', 'password': 'asdf'}
)
return job_node_no_prompts return job_node_no_prompts
@ -138,7 +143,12 @@ def wfjt_node_no_prompts(workflow_job_template_unit, jt_ask):
def wfjt_node_with_prompts(wfjt_node_no_prompts): def wfjt_node_with_prompts(wfjt_node_no_prompts):
wfjt_node_no_prompts.char_prompts = example_prompts wfjt_node_no_prompts.char_prompts = example_prompts
wfjt_node_no_prompts.inventory = Inventory(name='example-inv') wfjt_node_no_prompts.inventory = Inventory(name='example-inv')
wfjt_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf') ssh_type = CredentialType.defaults['ssh']()
wfjt_node_no_prompts.credential = Credential(
name='example-inv',
credential_type=ssh_type,
inputs={'username': 'asdf', 'password': 'asdf'}
)
return wfjt_node_no_prompts return wfjt_node_no_prompts

View File

@ -1,6 +1,6 @@
import pytest import pytest
from awx.main.models.credential import Credential from awx.main.models.credential import CredentialType, Credential
from awx.main.models.jobs import Job from awx.main.models.jobs import Job
from awx.main.models.inventory import Inventory from awx.main.models.inventory import Inventory
from awx.main.tasks import RunJob from awx.main.tasks import RunJob
@ -10,12 +10,15 @@ def test_aws_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'): with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1) job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2) job.inventory = mocker.MagicMock(spec=Inventory, id=2)
aws = CredentialType.defaults['aws']()
options = { options = {
'kind': 'aws', 'credential_type': aws,
'username': 'aws_user', 'inputs': {
'password': 'aws_passwd', 'username': 'aws_user',
'security_token': 'token', 'password': 'aws_passwd',
'security_token': 'token',
}
} }
job.cloud_credential = Credential(**options) job.cloud_credential = Credential(**options)
@ -23,22 +26,26 @@ def test_aws_cred_parse(mocker):
mocker.patch.object(run_job, 'should_use_proot', return_value=False) mocker.patch.object(run_job, 'should_use_proot', return_value=False)
env = run_job.build_env(job, private_data_dir='/tmp') env = run_job.build_env(job, private_data_dir='/tmp')
assert env['AWS_ACCESS_KEY'] == options['username'] assert env['AWS_ACCESS_KEY'] == options['inputs']['username']
assert env['AWS_SECRET_KEY'] == options['password'] assert env['AWS_SECRET_KEY'] == options['inputs']['password']
assert env['AWS_SECURITY_TOKEN'] == options['security_token'] assert env['AWS_SECURITY_TOKEN'] == options['inputs']['security_token']
def test_net_cred_parse(mocker): def test_net_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'): with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1) job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2) job.inventory = mocker.MagicMock(spec=Inventory, id=2)
net = CredentialType.defaults['aws']()
options = { options = {
'username':'test', 'credential_type': net,
'password':'test', 'inputs': {
'authorize': True, 'username':'test',
'authorize_password': 'passwd', 'password':'test',
'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""", 'authorize': True,
'authorize_password': 'passwd',
'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
}
} }
private_data_files = { private_data_files = {
'network_credential': '/tmp/this_file_does_not_exist_during_test_but_the_path_is_real', 'network_credential': '/tmp/this_file_does_not_exist_during_test_but_the_path_is_real',
@ -49,21 +56,25 @@ def test_net_cred_parse(mocker):
mocker.patch.object(run_job, 'should_use_proot', return_value=False) mocker.patch.object(run_job, 'should_use_proot', return_value=False)
env = run_job.build_env(job, private_data_dir='/tmp', private_data_files=private_data_files) env = run_job.build_env(job, private_data_dir='/tmp', private_data_files=private_data_files)
assert env['ANSIBLE_NET_USERNAME'] == options['username'] assert env['ANSIBLE_NET_USERNAME'] == options['inputs']['username']
assert env['ANSIBLE_NET_PASSWORD'] == options['password'] assert env['ANSIBLE_NET_PASSWORD'] == options['inputs']['password']
assert env['ANSIBLE_NET_AUTHORIZE'] == '1' assert env['ANSIBLE_NET_AUTHORIZE'] == '1'
assert env['ANSIBLE_NET_AUTH_PASS'] == options['authorize_password'] assert env['ANSIBLE_NET_AUTH_PASS'] == options['inputs']['authorize_password']
assert env['ANSIBLE_NET_SSH_KEYFILE'] == private_data_files['network_credential'] assert env['ANSIBLE_NET_SSH_KEYFILE'] == private_data_files['network_credential']
@pytest.fixture @pytest.fixture
def mock_job(mocker): def mock_job(mocker):
ssh = CredentialType.defaults['ssh']()
options = { options = {
'username':'test', 'credential_type': ssh,
'password':'test', 'inputs': {
'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""", 'username':'test',
'authorize': True, 'password':'test',
'authorize_password': 'passwd', 'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
'authorize': True,
'authorize_password': 'passwd',
}
} }
mock_job_attrs = {'forks': False, 'id': 1, 'cancel_flag': False, 'status': 'running', 'job_type': 'normal', mock_job_attrs = {'forks': False, 'id': 1, 'cancel_flag': False, 'status': 'running', 'job_type': 'normal',

View File

@ -1,17 +1,29 @@
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime
from functools import partial
import ConfigParser
import tempfile
import pytest import pytest
import yaml import yaml
import mock import mock
from awx.main.models import ( from awx.main.models import (
UnifiedJob, Credential,
CredentialType,
Inventory,
InventorySource,
InventoryUpdate,
Job,
Notification, Notification,
ProjectUpdate Project,
ProjectUpdate,
UnifiedJob,
) )
from awx.main import tasks from awx.main import tasks
from awx.main.task_engine import TaskEnhancer from awx.main.task_engine import TaskEnhancer
from awx.main.utils.common import encrypt_field
@contextmanager @contextmanager
@ -152,3 +164,682 @@ def test_openstack_client_config_generation_with_private_source_vars(mocker, sou
'private': expected 'private': expected
} }
} }
def pytest_generate_tests(metafunc):
# pytest.mark.parametrize doesn't work on unittest.TestCase methods
# see: https://docs.pytest.org/en/latest/example/parametrize.html#parametrizing-test-methods-through-per-class-configuration
if metafunc.cls and hasattr(metafunc.cls, 'parametrize'):
funcarglist = metafunc.cls.parametrize.get(metafunc.function.__name__)
if funcarglist:
argnames = sorted(funcarglist[0])
metafunc.parametrize(
argnames,
[[funcargs[name] for name in argnames] for funcargs in funcarglist]
)
class TestJobExecution:
"""
For job runs, test that `ansible-playbook` is invoked with the proper
arguments, environment variables, and pexpect passwords for a variety of
credential types.
"""
TASK_CLS = tasks.RunJob
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
def setup_method(self, method):
self.patches = [
mock.patch.object(Project, 'get_project_path', lambda *a, **kw: '/tmp/'),
# don't emit websocket statuses; they use the DB and complicate testing
mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()),
mock.patch.object(Job, 'inventory', mock.Mock(pk=1, spec_set=['pk']))
]
for p in self.patches:
p.start()
self.instance = self.get_instance()
def status_side_effect(pk, **kwargs):
# If `Job.update_model` is called, we're not actually persisting
# to the database; just update the status, which is usually
# the update we care about for testing purposes
if 'status' in kwargs:
self.instance.status = kwargs['status']
return self.instance
self.task = self.TASK_CLS()
self.task.update_model = mock.Mock(side_effect=status_side_effect)
# The primary goal of these tests is to mock our `run_pexpect` call
# and make assertions about the arguments and environment passed to it.
self.task.run_pexpect = mock.Mock(return_value=['successful', 0])
# ignore pre-run and post-run hooks, they complicate testing in a variety of ways
self.task.pre_run_hook = self.task.post_run_hook = mock.Mock()
def teardown_method(self, method):
for p in self.patches:
p.stop()
def get_instance(self):
return Job(
pk=1,
created=datetime.utcnow(),
status='new',
job_type='run',
cancel_flag=False,
credential=None,
cloud_credential=None,
network_credential=None,
project=Project()
)
@property
def pk(self):
return self.instance.pk
class TestGenericRun(TestJobExecution):
def test_cancel_flag(self):
self.instance.cancel_flag = True
with pytest.raises(Exception):
self.task.run(self.pk)
for c in [
mock.call(self.pk, celery_task_id='', status='running'),
mock.call(self.pk, output_replacements=[], result_traceback=mock.ANY, status='canceled')
]:
assert c in self.task.update_model.call_args_list
def test_uses_bubblewrap(self):
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert args[0] == 'bwrap'
class TestJobCredentials(TestJobExecution):
parametrize = {
'test_ssh_passwords': [
dict(field='password', password_name='ssh_password', expected_flag='--ask-pass'),
dict(field='ssh_key_unlock', password_name='ssh_key_unlock', expected_flag=None),
dict(field='become_password', password_name='become_password', expected_flag='--ask-become-pass'),
dict(field='vault_password', password_name='vault_password', expected_flag='--ask-vault-pass'),
]
}
def test_ssh_passwords(self, field, password_name, expected_flag):
ssh = CredentialType.defaults['ssh']()
self.instance.credential = Credential(
credential_type=ssh,
inputs = {'username': 'bob', field: 'secret'}
)
self.instance.credential.inputs[field] = encrypt_field(
self.instance.credential, field
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert passwords[password_name] == 'secret'
assert '-u bob' in ' '.join(args)
if expected_flag:
assert expected_flag in ' '.join(args)
def test_ssh_key_with_agent(self):
ssh = CredentialType.defaults['ssh']()
self.instance.credential = Credential(
credential_type=ssh,
inputs = {
'username': 'bob',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def run_pexpect_side_effect(private_data, *args, **kwargs):
job, args, cwd, env, passwords, stdout = args
ssh_key_data_fifo = '/'.join([private_data, 'credential'])
assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY
assert ' '.join(args).startswith(
'ssh-agent -a %s sh -c ssh-add %s && rm -f %s' % (
'/'.join([private_data, 'ssh_auth.sock']),
ssh_key_data_fifo,
ssh_key_data_fifo
)
)
return ['successful', 0]
private_data = tempfile.mkdtemp(prefix='ansible_tower_')
self.task.build_private_data_dir = mock.Mock(return_value=private_data)
self.task.run_pexpect = mock.Mock(
side_effect=partial(run_pexpect_side_effect, private_data)
)
self.task.run(self.pk, private_data_dir=private_data)
def test_aws_cloud_credential(self):
aws = CredentialType.defaults['aws']()
self.instance.cloud_credential = Credential(
credential_type=aws,
inputs = {'username': 'bob', 'password': 'secret'}
)
self.instance.cloud_credential.inputs['password'] = encrypt_field(
self.instance.cloud_credential, 'password'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['AWS_ACCESS_KEY'] == 'bob'
assert env['AWS_SECRET_KEY'] == 'secret'
assert 'AWS_SECURITY_TOKEN' not in env
def test_aws_cloud_credential_with_sts_token(self):
aws = CredentialType.defaults['aws']()
self.instance.cloud_credential = Credential(
credential_type=aws,
inputs = {'username': 'bob', 'password': 'secret', 'security_token': 'token'}
)
for key in ('password', 'security_token'):
self.instance.cloud_credential.inputs[key] = encrypt_field(
self.instance.cloud_credential, key
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['AWS_ACCESS_KEY'] == 'bob'
assert env['AWS_SECRET_KEY'] == 'secret'
assert env['AWS_SECURITY_TOKEN'] == 'token'
def test_rax_credential(self):
rax = CredentialType.defaults['rackspace']()
self.instance.cloud_credential = Credential(
credential_type=rax,
inputs = {'username': 'bob', 'password': 'secret'}
)
self.instance.cloud_credential.inputs['password'] = encrypt_field(
self.instance.cloud_credential, 'password'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['RAX_USERNAME'] == 'bob'
assert env['RAX_API_KEY'] == 'secret'
assert env['CLOUD_VERIFY_SSL'] == 'False'
def test_gce_credentials(self):
gce = CredentialType.defaults['gce']()
self.instance.cloud_credential = Credential(
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.cloud_credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.cloud_credential, 'ssh_key_data'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['GCE_EMAIL'] == 'bob'
assert env['GCE_PROJECT'] == 'some-project'
ssh_key_data = env['GCE_PEM_FILE_PATH']
assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_azure_credentials(self):
azure = CredentialType.defaults['azure']()
self.instance.cloud_credential = Credential(
credential_type=azure,
inputs = {
'username': 'bob',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.cloud_credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.cloud_credential, 'ssh_key_data'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['AZURE_SUBSCRIPTION_ID'] == 'bob'
ssh_key_data = env['AZURE_CERT_PATH']
assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_azure_rm_with_tenant(self):
azure = CredentialType.defaults['azure_rm']()
self.instance.cloud_credential = Credential(
credential_type=azure,
inputs = {
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
'subscription': 'some-subscription'
}
)
self.instance.cloud_credential.inputs['secret'] = encrypt_field(
self.instance.cloud_credential, 'secret'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['AZURE_CLIENT_ID'] == 'some-client'
assert env['AZURE_SECRET'] == 'some-secret'
assert env['AZURE_TENANT'] == 'some-tenant'
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
def test_azure_rm_with_password(self):
azure = CredentialType.defaults['azure_rm']()
self.instance.cloud_credential = Credential(
credential_type=azure,
inputs = {
'subscription': 'some-subscription',
'username': 'bob',
'password': 'secret'
}
)
self.instance.cloud_credential.inputs['password'] = encrypt_field(
self.instance.cloud_credential, 'password'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
assert env['AZURE_AD_USER'] == 'bob'
assert env['AZURE_PASSWORD'] == 'secret'
def test_vmware_credentials(self):
vmware = CredentialType.defaults['vmware']()
self.instance.cloud_credential = Credential(
credential_type=vmware,
inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'}
)
self.instance.cloud_credential.inputs['password'] = encrypt_field(
self.instance.cloud_credential, 'password'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert env['VMWARE_USER'] == 'bob'
assert env['VMWARE_PASSWORD'] == 'secret'
assert env['VMWARE_HOST'] == 'https://example.org'
def test_openstack_credentials(self):
openstack = CredentialType.defaults['openstack']()
self.instance.cloud_credential = Credential(
credential_type=openstack,
inputs = {
'username': 'bob',
'password': 'secret',
'project': 'tenant-name',
'host': 'https://keystone.example.org'
}
)
self.instance.cloud_credential.inputs['password'] = encrypt_field(
self.instance.cloud_credential, 'password'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read()
assert shade_config == '\n'.join([
'clouds:',
' devstack:',
' auth:',
' auth_url: https://keystone.example.org',
' password: secret',
' project_name: tenant-name',
' username: bob',
''
])
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_net_credentials(self):
net = CredentialType.defaults['net']()
self.instance.network_credential = Credential(
credential_type=net,
inputs = {
'username': 'bob',
'password': 'secret',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY,
'authorize': True,
'authorize_password': 'authorizeme'
}
)
for field in ('password', 'ssh_key_data', 'authorize_password'):
self.instance.network_credential.inputs[field] = encrypt_field(
self.instance.network_credential, field
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['ANSIBLE_NET_USERNAME'] == 'bob'
assert env['ANSIBLE_NET_PASSWORD'] == 'secret'
assert env['ANSIBLE_NET_AUTHORIZE'] == '1'
assert env['ANSIBLE_NET_AUTH_PASS'] == 'authorizeme'
assert open(env['ANSIBLE_NET_SSH_KEYFILE'], 'rb').read() == self.EXAMPLE_PRIVATE_KEY
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
class TestProjectUpdateCredentials(TestJobExecution):
TASK_CLS = tasks.RunProjectUpdate
def get_instance(self):
return ProjectUpdate(
pk=1,
project=Project()
)
parametrize = {
'test_username_and_password_auth': [
dict(scm_type='git'),
dict(scm_type='hg'),
dict(scm_type='svn'),
],
'test_ssh_key_auth': [
dict(scm_type='git'),
dict(scm_type='hg'),
dict(scm_type='svn'),
]
}
def test_username_and_password_auth(self, scm_type):
ssh = CredentialType.defaults['ssh']()
self.instance.scm_type = scm_type
self.instance.credential = Credential(
credential_type=ssh,
inputs = {'username': 'bob', 'password': 'secret'}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
self.task.run(self.pk)
assert self.task.run_pexpect.call_count == 1
call_args, _ = self.task.run_pexpect.call_args_list[0]
job, args, cwd, env, passwords, stdout = call_args
assert passwords.get('scm_username') == 'bob'
assert passwords.get('scm_password') == 'secret'
def test_ssh_key_auth(self, scm_type):
ssh = CredentialType.defaults['ssh']()
self.instance.scm_type = scm_type
self.instance.credential = Credential(
credential_type=ssh,
inputs = {
'username': 'bob',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def run_pexpect_side_effect(private_data, *args, **kwargs):
job, args, cwd, env, passwords, stdout = args
ssh_key_data_fifo = '/'.join([private_data, 'scm_credential'])
assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY
assert ' '.join(args).startswith(
'ssh-agent -a %s sh -c ssh-add %s && rm -f %s' % (
'/'.join([private_data, 'ssh_auth.sock']),
ssh_key_data_fifo,
ssh_key_data_fifo
)
)
assert passwords.get('scm_username') == 'bob'
return ['successful', 0]
private_data = tempfile.mkdtemp(prefix='ansible_tower_')
self.task.build_private_data_dir = mock.Mock(return_value=private_data)
self.task.run_pexpect = mock.Mock(
side_effect=partial(run_pexpect_side_effect, private_data)
)
self.task.run(self.pk)
class TestInventoryUpdateCredentials(TestJobExecution):
TASK_CLS = tasks.RunInventoryUpdate
def get_instance(self):
return InventoryUpdate(
pk=1,
inventory_source=InventorySource(
pk=1,
inventory=Inventory(pk=1)
)
)
def test_ec2_source(self):
aws = CredentialType.defaults['aws']()
self.instance.source = 'ec2'
self.instance.credential = Credential(
credential_type=aws,
inputs = {'username': 'bob', 'password': 'secret'}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['AWS_ACCESS_KEY_ID'] == 'bob'
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
assert 'EC2_INI_PATH' in env
config = ConfigParser.ConfigParser()
config.read(env['EC2_INI_PATH'])
assert 'ec2' in config.sections()
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_vmware_source(self):
vmware = CredentialType.defaults['vmware']()
self.instance.source = 'vmware'
self.instance.credential = Credential(
credential_type=vmware,
inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
config = ConfigParser.ConfigParser()
config.read(env['VMWARE_INI_PATH'])
assert config.get('vmware', 'username') == 'bob'
assert config.get('vmware', 'password') == 'secret'
assert config.get('vmware', 'server') == 'https://example.org'
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_azure_source(self):
azure = CredentialType.defaults['azure']()
self.instance.source = 'azure'
self.instance.credential = Credential(
credential_type=azure,
inputs = {
'username': 'bob',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['AZURE_SUBSCRIPTION_ID'] == 'bob'
ssh_key_data = env['AZURE_CERT_PATH']
assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_gce_source(self):
gce = CredentialType.defaults['gce']()
self.instance.source = 'gce'
self.instance.credential = Credential(
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
assert env['GCE_EMAIL'] == 'bob'
assert env['GCE_PROJECT'] == 'some-project'
ssh_key_data = env['GCE_PEM_FILE_PATH']
assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_openstack_source(self):
openstack = CredentialType.defaults['openstack']()
self.instance.source = 'openstack'
self.instance.credential = Credential(
credential_type=openstack,
inputs = {
'username': 'bob',
'password': 'secret',
'project': 'tenant-name',
'host': 'https://keystone.example.org'
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read()
assert '\n'.join([
'clouds:',
' devstack:',
' auth:',
' auth_url: https://keystone.example.org',
' password: secret',
' project_name: tenant-name',
' username: bob',
''
]) in shade_config
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_satellite6_source(self):
satellite6 = CredentialType.defaults['satellite6']()
self.instance.source = 'satellite6'
self.instance.credential = Credential(
credential_type=satellite6,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
config = ConfigParser.ConfigParser()
config.read(env['FOREMAN_INI_PATH'])
assert config.get('foreman', 'url') == 'https://example.org'
assert config.get('foreman', 'user') == 'bob'
assert config.get('foreman', 'password') == 'secret'
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_cloudforms_source(self):
cloudforms = CredentialType.defaults['cloudforms']()
self.instance.source = 'cloudforms'
self.instance.credential = Credential(
credential_type=cloudforms,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def run_pexpect_side_effect(*args, **kwargs):
job, args, cwd, env, passwords, stdout = args
config = ConfigParser.ConfigParser()
config.read(env['CLOUDFORMS_INI_PATH'])
assert config.get('cloudforms', 'url') == 'https://example.org'
assert config.get('cloudforms', 'username') == 'bob'
assert config.get('cloudforms', 'password') == 'secret'
assert config.get('cloudforms', 'ssl_verify') == 'false'
return ['successful', 0]
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)

View File

@ -281,6 +281,7 @@ REST_FRAMEWORK = {
'VIEW_NAME_FUNCTION': 'awx.api.generics.get_view_name', 'VIEW_NAME_FUNCTION': 'awx.api.generics.get_view_name',
'VIEW_DESCRIPTION_FUNCTION': 'awx.api.generics.get_view_description', 'VIEW_DESCRIPTION_FUNCTION': 'awx.api.generics.get_view_description',
'NON_FIELD_ERRORS_KEY': '__all__', 'NON_FIELD_ERRORS_KEY': '__all__',
'DEFAULT_VERSION': 'v2'
} }
AUTHENTICATION_BACKENDS = ( AUTHENTICATION_BACKENDS = (

View File

@ -321,7 +321,7 @@ try:
name='%s Credential %d User %d' % (prefix, credential_id, user_idx), name='%s Credential %d User %d' % (prefix, credential_id, user_idx),
defaults=dict(created_by=next(creator_gen), defaults=dict(created_by=next(creator_gen),
modified_by=next(modifier_gen)), modified_by=next(modifier_gen)),
kind='ssh' credential_type=CredentialType.from_v1_kind('ssh')
) )
credential.admin_role.members.add(user) credential.admin_role.members.add(user)
credentials.append(credential) credentials.append(credential)
@ -344,7 +344,7 @@ try:
name='%s Credential %d team %d' % (prefix, credential_id, team_idx), name='%s Credential %d team %d' % (prefix, credential_id, team_idx),
defaults=dict(created_by=next(creator_gen), defaults=dict(created_by=next(creator_gen),
modified_by=next(modifier_gen)), modified_by=next(modifier_gen)),
kind='ssh' credential_type=CredentialType.from_v1_kind('ssh')
) )
credential.admin_role.parents.add(team.member_role) credential.admin_role.parents.add(team.member_role)
credentials.append(credential) credentials.append(credential)