mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
refactor credential injection for builtin types
this cleanups up a _lot_ of code duplication that we have for builtin credential types. it will allow customers to setup custom inventory sources that utilize builtin credential types (e.g., a custom inventory script that could use an AzureRM credential) see: https://github.com/ansible/ansible-tower/issues/7852
This commit is contained in:
parent
1a98cedc0f
commit
dbb4d2b011
@ -6,6 +6,7 @@ import json
|
||||
import logging
|
||||
import operator
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
import tempfile
|
||||
|
||||
@ -33,11 +34,34 @@ from awx.main.models.rbac import (
|
||||
ROLE_SINGLETON_SYSTEM_AUDITOR,
|
||||
)
|
||||
from awx.main.utils import encrypt_field
|
||||
from . import injectors as builtin_injectors
|
||||
|
||||
__all__ = ['Credential', 'CredentialType', 'V1Credential']
|
||||
__all__ = ['Credential', 'CredentialType', 'V1Credential', 'build_safe_env']
|
||||
|
||||
logger = logging.getLogger('awx.main.models.credential')
|
||||
|
||||
HIDDEN_PASSWORD = '**********'
|
||||
|
||||
|
||||
def build_safe_env(env):
|
||||
'''
|
||||
Build environment dictionary, hiding potentially sensitive information
|
||||
such as passwords or keys.
|
||||
'''
|
||||
hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS', re.I)
|
||||
urlpass_re = re.compile(r'^.*?://[^:]+:(.*?)@.*?$')
|
||||
safe_env = dict(env)
|
||||
for k, v in safe_env.items():
|
||||
if k == 'AWS_ACCESS_KEY_ID':
|
||||
continue
|
||||
elif k.startswith('ANSIBLE_') and not k.startswith('ANSIBLE_NET'):
|
||||
continue
|
||||
elif hidden_re.search(k):
|
||||
safe_env[k] = HIDDEN_PASSWORD
|
||||
elif type(v) == str and urlpass_re.match(v):
|
||||
safe_env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v)
|
||||
return safe_env
|
||||
|
||||
|
||||
class V1Credential(object):
|
||||
|
||||
@ -562,6 +586,11 @@ class CredentialType(CommonModelNameNotUnique):
|
||||
files)
|
||||
"""
|
||||
if not self.injectors:
|
||||
if self.managed_by_tower and credential.kind in dir(builtin_injectors):
|
||||
injected_env = {}
|
||||
getattr(builtin_injectors, credential.kind)(credential, injected_env)
|
||||
env.update(injected_env)
|
||||
safe_env.update(build_safe_env(injected_env))
|
||||
return
|
||||
|
||||
class TowerNamespace:
|
35
awx/main/models/credential/injectors.py
Normal file
35
awx/main/models/credential/injectors.py
Normal file
@ -0,0 +1,35 @@
|
||||
from awx.main.utils import decrypt_field
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
def aws(cred, env):
|
||||
env['AWS_ACCESS_KEY_ID'] = cred.username
|
||||
env['AWS_SECRET_ACCESS_KEY'] = decrypt_field(cred, 'password')
|
||||
if len(cred.security_token) > 0:
|
||||
env['AWS_SECURITY_TOKEN'] = decrypt_field(cred, 'security_token')
|
||||
|
||||
|
||||
def gce(cred, env):
|
||||
env['GCE_EMAIL'] = cred.username
|
||||
env['GCE_PROJECT'] = cred.project
|
||||
|
||||
|
||||
def azure_rm(cred, env):
|
||||
if len(cred.client) and len(cred.tenant):
|
||||
env['AZURE_CLIENT_ID'] = cred.client
|
||||
env['AZURE_SECRET'] = decrypt_field(cred, 'secret')
|
||||
env['AZURE_TENANT'] = cred.tenant
|
||||
env['AZURE_SUBSCRIPTION_ID'] = cred.subscription
|
||||
else:
|
||||
env['AZURE_SUBSCRIPTION_ID'] = cred.subscription
|
||||
env['AZURE_AD_USER'] = cred.username
|
||||
env['AZURE_PASSWORD'] = decrypt_field(cred, 'password')
|
||||
if cred.inputs.get('cloud_environment', None):
|
||||
env['AZURE_CLOUD_ENVIRONMENT'] = cred.inputs['cloud_environment']
|
||||
|
||||
|
||||
def vmware(cred, env):
|
||||
env['VMWARE_USER'] = cred.username
|
||||
env['VMWARE_PASSWORD'] = decrypt_field(cred, 'password')
|
||||
env['VMWARE_HOST'] = cred.host
|
||||
env['VMWARE_VALIDATE_CERTS'] = str(settings.VMWARE_VALIDATE_CERTS)
|
@ -670,25 +670,6 @@ class BaseTask(LogErrorsTask):
|
||||
env['PROOT_TMP_DIR'] = settings.AWX_PROOT_BASE_PATH
|
||||
return env
|
||||
|
||||
def build_safe_env(self, env, **kwargs):
|
||||
'''
|
||||
Build environment dictionary, hiding potentially sensitive information
|
||||
such as passwords or keys.
|
||||
'''
|
||||
hidden_re = re.compile(r'API|TOKEN|KEY|SECRET|PASS', re.I)
|
||||
urlpass_re = re.compile(r'^.*?://[^:]+:(.*?)@.*?$')
|
||||
safe_env = dict(env)
|
||||
for k,v in safe_env.items():
|
||||
if k == 'AWS_ACCESS_KEY_ID':
|
||||
continue
|
||||
elif k.startswith('ANSIBLE_') and not k.startswith('ANSIBLE_NET'):
|
||||
continue
|
||||
elif hidden_re.search(k):
|
||||
safe_env[k] = HIDDEN_PASSWORD
|
||||
elif type(v) == str and urlpass_re.match(v):
|
||||
safe_env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v)
|
||||
return safe_env
|
||||
|
||||
def should_use_proot(self, instance, **kwargs):
|
||||
'''
|
||||
Return whether this task should use proot.
|
||||
@ -815,7 +796,7 @@ class BaseTask(LogErrorsTask):
|
||||
output_replacements = self.build_output_replacements(instance, **kwargs)
|
||||
cwd = self.build_cwd(instance, **kwargs)
|
||||
env = self.build_env(instance, **kwargs)
|
||||
safe_env = self.build_safe_env(env, **kwargs)
|
||||
safe_env = build_safe_env(env)
|
||||
|
||||
# handle custom injectors specified on the CredentialType
|
||||
credentials = []
|
||||
@ -1064,33 +1045,8 @@ class RunJob(BaseTask):
|
||||
# Set environment variables for cloud credentials.
|
||||
cred_files = kwargs.get('private_data_files', {}).get('credentials', {})
|
||||
for cloud_cred in job.cloud_credentials:
|
||||
if cloud_cred and cloud_cred.kind == 'aws':
|
||||
env['AWS_ACCESS_KEY_ID'] = cloud_cred.username
|
||||
env['AWS_SECRET_ACCESS_KEY'] = decrypt_field(cloud_cred, 'password')
|
||||
if len(cloud_cred.security_token) > 0:
|
||||
env['AWS_SECURITY_TOKEN'] = decrypt_field(cloud_cred, 'security_token')
|
||||
# FIXME: Add EC2_URL, maybe EC2_REGION!
|
||||
elif cloud_cred and cloud_cred.kind == 'gce':
|
||||
env['GCE_EMAIL'] = cloud_cred.username
|
||||
env['GCE_PROJECT'] = cloud_cred.project
|
||||
if cloud_cred and cloud_cred.kind == 'gce':
|
||||
env['GCE_PEM_FILE_PATH'] = cred_files.get(cloud_cred, '')
|
||||
elif cloud_cred and cloud_cred.kind == 'azure_rm':
|
||||
if len(cloud_cred.client) and len(cloud_cred.tenant):
|
||||
env['AZURE_CLIENT_ID'] = cloud_cred.client
|
||||
env['AZURE_SECRET'] = decrypt_field(cloud_cred, 'secret')
|
||||
env['AZURE_TENANT'] = cloud_cred.tenant
|
||||
env['AZURE_SUBSCRIPTION_ID'] = cloud_cred.subscription
|
||||
else:
|
||||
env['AZURE_SUBSCRIPTION_ID'] = cloud_cred.subscription
|
||||
env['AZURE_AD_USER'] = cloud_cred.username
|
||||
env['AZURE_PASSWORD'] = decrypt_field(cloud_cred, 'password')
|
||||
if cloud_cred.inputs.get('cloud_environment', None):
|
||||
env['AZURE_CLOUD_ENVIRONMENT'] = cloud_cred.inputs['cloud_environment']
|
||||
elif cloud_cred and cloud_cred.kind == 'vmware':
|
||||
env['VMWARE_USER'] = cloud_cred.username
|
||||
env['VMWARE_PASSWORD'] = decrypt_field(cloud_cred, 'password')
|
||||
env['VMWARE_HOST'] = cloud_cred.host
|
||||
env['VMWARE_VALIDATE_CERTS'] = str(settings.VMWARE_VALIDATE_CERTS)
|
||||
elif cloud_cred and cloud_cred.kind == 'openstack':
|
||||
env['OS_CLIENT_CONFIG_FILE'] = cred_files.get(cloud_cred, '')
|
||||
|
||||
@ -1839,38 +1795,22 @@ class RunInventoryUpdate(BaseTask):
|
||||
# The inventory modules are vendored in AWX in the
|
||||
# `awx/plugins/inventory` directory; those files should be kept in
|
||||
# sync with those in Ansible core at all times.
|
||||
passwords = kwargs.get('passwords', {})
|
||||
cred_data = kwargs.get('private_data_files', {}).get('credentials', '')
|
||||
cloud_credential = cred_data.get(inventory_update.credential, '')
|
||||
if inventory_update.source == 'ec2':
|
||||
if passwords.get('source_username', '') and passwords.get('source_password', ''):
|
||||
env['AWS_ACCESS_KEY_ID'] = passwords['source_username']
|
||||
env['AWS_SECRET_ACCESS_KEY'] = passwords['source_password']
|
||||
if len(passwords['source_security_token']) > 0:
|
||||
env['AWS_SECURITY_TOKEN'] = passwords['source_security_token']
|
||||
env['EC2_INI_PATH'] = cloud_credential
|
||||
elif inventory_update.source == 'vmware':
|
||||
env['VMWARE_INI_PATH'] = cloud_credential
|
||||
elif inventory_update.source == 'azure_rm':
|
||||
if len(passwords.get('source_client', '')) and \
|
||||
len(passwords.get('source_tenant', '')):
|
||||
env['AZURE_CLIENT_ID'] = passwords.get('source_client', '')
|
||||
env['AZURE_SECRET'] = passwords.get('source_secret', '')
|
||||
env['AZURE_TENANT'] = passwords.get('source_tenant', '')
|
||||
env['AZURE_SUBSCRIPTION_ID'] = passwords.get('source_subscription', '')
|
||||
else:
|
||||
env['AZURE_SUBSCRIPTION_ID'] = passwords.get('source_subscription', '')
|
||||
env['AZURE_AD_USER'] = passwords.get('source_username', '')
|
||||
env['AZURE_PASSWORD'] = passwords.get('source_password', '')
|
||||
env['AZURE_INI_PATH'] = cloud_credential
|
||||
if inventory_update.credential and \
|
||||
inventory_update.credential.inputs.get('cloud_environment', None):
|
||||
env['AZURE_CLOUD_ENVIRONMENT'] = inventory_update.credential.inputs['cloud_environment']
|
||||
elif inventory_update.source == 'gce':
|
||||
env['GCE_EMAIL'] = passwords.get('source_username', '')
|
||||
env['GCE_PROJECT'] = passwords.get('source_project', '')
|
||||
env['GCE_PEM_FILE_PATH'] = cloud_credential
|
||||
env['GCE_ZONE'] = inventory_update.source_regions if inventory_update.source_regions != 'all' else ''
|
||||
|
||||
ini_mapping = {
|
||||
'ec2': 'EC2_INI_PATH',
|
||||
'vmware': 'VMWARE_INI_PATH',
|
||||
'azure_rm': 'AZURE_INI_PATH',
|
||||
'gce': 'GCE_PEM_FILE_PATH',
|
||||
'openstack': 'OS_CLIENT_CONFIG_FILE',
|
||||
'satellite6': 'FOREMAN_INI_PATH',
|
||||
'cloudforms': 'CLOUDFORMS_INI_PATH'
|
||||
}
|
||||
if inventory_update.source in ini_mapping:
|
||||
cred_data = kwargs.get('private_data_files', {}).get('credentials', '')
|
||||
env[ini_mapping[inventory_update.source]] = cred_data.get(inventory_update.credential, '')
|
||||
|
||||
if inventory_update.source == 'gce':
|
||||
env['GCE_ZONE'] = inventory_update.source_regions if inventory_update.source_regions != 'all' else '' # noqa
|
||||
|
||||
# by default, the GCE inventory source caches results on disk for
|
||||
# 5 minutes; disable this behavior
|
||||
@ -1881,12 +1821,6 @@ class RunInventoryUpdate(BaseTask):
|
||||
cp.write(os.fdopen(handle, 'w'))
|
||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
|
||||
env['GCE_INI_PATH'] = path
|
||||
elif inventory_update.source == 'openstack':
|
||||
env['OS_CLIENT_CONFIG_FILE'] = cloud_credential
|
||||
elif inventory_update.source == 'satellite6':
|
||||
env['FOREMAN_INI_PATH'] = cloud_credential
|
||||
elif inventory_update.source == 'cloudforms':
|
||||
env['CLOUDFORMS_INI_PATH'] = cloud_credential
|
||||
elif inventory_update.source in ['scm', 'custom']:
|
||||
for env_k in inventory_update.source_vars_dict:
|
||||
if str(env_k) not in env and str(env_k) not in settings.INV_ENV_VARIABLE_BLACKLIST:
|
||||
|
@ -28,7 +28,8 @@ from awx.main.models import (
|
||||
Project,
|
||||
ProjectUpdate,
|
||||
UnifiedJob,
|
||||
User
|
||||
User,
|
||||
build_safe_env
|
||||
)
|
||||
|
||||
from awx.main import tasks
|
||||
@ -91,14 +92,12 @@ def test_send_notifications_list(mocker):
|
||||
('CALLBACK_CONNECTION', 'amqp://tower:password@localhost:5672/tower'),
|
||||
])
|
||||
def test_safe_env_filtering(key, value):
|
||||
task = tasks.RunJob()
|
||||
assert task.build_safe_env({key: value})[key] == tasks.HIDDEN_PASSWORD
|
||||
assert build_safe_env({key: value})[key] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
|
||||
def test_safe_env_returns_new_copy():
|
||||
task = tasks.RunJob()
|
||||
env = {'foo': 'bar'}
|
||||
assert task.build_safe_env(env) is not env
|
||||
assert build_safe_env(env) is not env
|
||||
|
||||
|
||||
def test_openstack_client_config_generation(mocker):
|
||||
@ -227,6 +226,8 @@ class TestJobExecution:
|
||||
# the update we care about for testing purposes
|
||||
if 'status' in kwargs:
|
||||
self.instance.status = kwargs['status']
|
||||
if 'job_env' in kwargs:
|
||||
self.instance.job_env = kwargs['job_env']
|
||||
return self.instance
|
||||
|
||||
self.task = self.TASK_CLS()
|
||||
@ -682,6 +683,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['AWS_ACCESS_KEY_ID'] == 'bob'
|
||||
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
|
||||
assert 'AWS_SECURITY_TOKEN' not in env
|
||||
assert self.instance.job_env['AWS_SECRET_ACCESS_KEY'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_aws_cloud_credential_with_sts_token(self):
|
||||
aws = CredentialType.defaults['aws']()
|
||||
@ -702,6 +704,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['AWS_ACCESS_KEY_ID'] == 'bob'
|
||||
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
|
||||
assert env['AWS_SECURITY_TOKEN'] == 'token'
|
||||
assert self.instance.job_env['AWS_SECRET_ACCESS_KEY'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_gce_credentials(self):
|
||||
gce = CredentialType.defaults['gce']()
|
||||
@ -753,6 +756,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['AZURE_SECRET'] == 'some-secret'
|
||||
assert env['AZURE_TENANT'] == 'some-tenant'
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert self.instance.job_env['AZURE_SECRET'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_azure_rm_with_password(self):
|
||||
azure = CredentialType.defaults['azure_rm']()
|
||||
@ -779,6 +783,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['AZURE_AD_USER'] == 'bob'
|
||||
assert env['AZURE_PASSWORD'] == 'secret'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
assert self.instance.job_env['AZURE_PASSWORD'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_vmware_credentials(self):
|
||||
vmware = CredentialType.defaults['vmware']()
|
||||
@ -798,6 +803,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['VMWARE_USER'] == 'bob'
|
||||
assert env['VMWARE_PASSWORD'] == 'secret'
|
||||
assert env['VMWARE_HOST'] == 'https://example.org'
|
||||
assert self.instance.job_env['VMWARE_PASSWORD'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_openstack_credentials(self):
|
||||
openstack = CredentialType.defaults['openstack']()
|
||||
@ -895,6 +901,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
assert self.instance.job_env['ANSIBLE_NET_PASSWORD'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_custom_environment_injectors_with_jinja_syntax_error(self):
|
||||
some_cloud = CredentialType(
|
||||
@ -1052,6 +1059,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123'
|
||||
assert 'SUPER-SECRET-123' not in json.dumps(self.task.update_model.call_args_list)
|
||||
assert self.instance.job_env['MY_CLOUD_PRIVATE_VAR'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_custom_environment_injectors_with_extra_vars(self):
|
||||
some_cloud = CredentialType(
|
||||
@ -1260,6 +1268,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
assert self.instance.job_env['AZURE_PASSWORD'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_awx_task_env(self):
|
||||
patch = mock.patch('awx.main.tasks.settings.AWX_TASK_ENV', {'FOO': 'BAR'})
|
||||
@ -1420,6 +1429,46 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
|
||||
@pytest.mark.parametrize('with_credential', [True, False])
|
||||
def test_custom_source(self, with_credential):
|
||||
self.instance.source = 'custom'
|
||||
self.instance.source_vars = '{"FOO": "BAR"}'
|
||||
patch = mock.patch.object(InventoryUpdate, 'source_script', mock.Mock(
|
||||
script='#!/bin/sh\necho "Hello, World!"')
|
||||
)
|
||||
self.patches.append(patch)
|
||||
patch.start()
|
||||
|
||||
if with_credential:
|
||||
azure_rm = CredentialType.defaults['azure_rm']()
|
||||
self.instance.credential = Credential(
|
||||
pk=1,
|
||||
credential_type=azure_rm,
|
||||
inputs = {
|
||||
'client': 'some-client',
|
||||
'secret': 'some-secret',
|
||||
'tenant': 'some-tenant',
|
||||
'subscription': 'some-subscription',
|
||||
}
|
||||
)
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert '--custom' in ' '.join(args)
|
||||
script = args[args.index('--source') + 1]
|
||||
with open(script, 'r') as f:
|
||||
assert f.read() == self.instance.source_script.script
|
||||
assert env['FOO'] == 'BAR'
|
||||
if with_credential:
|
||||
assert env['AZURE_CLIENT_ID'] == 'some-client'
|
||||
assert env['AZURE_SECRET'] == 'some-secret'
|
||||
assert env['AZURE_TENANT'] == 'some-tenant'
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
|
||||
def test_ec2_source(self):
|
||||
aws = CredentialType.defaults['aws']()
|
||||
self.instance.source = 'ec2'
|
||||
@ -1446,6 +1495,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
assert self.instance.job_env['AWS_SECRET_ACCESS_KEY'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_vmware_source(self):
|
||||
vmware = CredentialType.defaults['vmware']()
|
||||
@ -1472,6 +1522,78 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
|
||||
def test_azure_rm_source_with_tenant(self):
|
||||
azure_rm = CredentialType.defaults['azure_rm']()
|
||||
self.instance.source = 'azure_rm'
|
||||
self.instance.source_regions = 'north, south, east, west'
|
||||
self.instance.credential = Credential(
|
||||
pk=1,
|
||||
credential_type=azure_rm,
|
||||
inputs = {
|
||||
'client': 'some-client',
|
||||
'secret': 'some-secret',
|
||||
'tenant': 'some-tenant',
|
||||
'subscription': 'some-subscription',
|
||||
'cloud_environment': 'foobar'
|
||||
}
|
||||
)
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert env['AZURE_CLIENT_ID'] == 'some-client'
|
||||
assert env['AZURE_SECRET'] == 'some-secret'
|
||||
assert env['AZURE_TENANT'] == 'some-tenant'
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(env['AZURE_INI_PATH'])
|
||||
assert config.get('azure', 'include_powerstate') == 'yes'
|
||||
assert config.get('azure', 'group_by_resource_group') == 'yes'
|
||||
assert config.get('azure', 'group_by_location') == 'yes'
|
||||
assert config.get('azure', 'group_by_tag') == 'yes'
|
||||
assert config.get('azure', 'locations') == 'north,south,east,west'
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
assert self.instance.job_env['AZURE_SECRET'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_azure_rm_source_with_password(self):
|
||||
azure_rm = CredentialType.defaults['azure_rm']()
|
||||
self.instance.source = 'azure_rm'
|
||||
self.instance.source_regions = 'all'
|
||||
self.instance.credential = Credential(
|
||||
pk=1,
|
||||
credential_type=azure_rm,
|
||||
inputs = {
|
||||
'subscription': 'some-subscription',
|
||||
'username': 'bob',
|
||||
'password': 'secret',
|
||||
'cloud_environment': 'foobar'
|
||||
}
|
||||
)
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert env['AZURE_AD_USER'] == 'bob'
|
||||
assert env['AZURE_PASSWORD'] == 'secret'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(env['AZURE_INI_PATH'])
|
||||
assert config.get('azure', 'include_powerstate') == 'yes'
|
||||
assert config.get('azure', 'group_by_resource_group') == 'yes'
|
||||
assert config.get('azure', 'group_by_location') == 'yes'
|
||||
assert config.get('azure', 'group_by_tag') == 'yes'
|
||||
assert 'locations' not in config.items('azure')
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
self.task.run(self.pk)
|
||||
assert self.instance.job_env['AZURE_PASSWORD'] == tasks.HIDDEN_PASSWORD
|
||||
|
||||
def test_gce_source(self):
|
||||
gce = CredentialType.defaults['gce']()
|
||||
self.instance.source = 'gce'
|
||||
|
Loading…
Reference in New Issue
Block a user