1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-31 15:21:13 +03:00

fixup unit tests for tasks

This commit is contained in:
chris meyers 2019-03-18 12:51:32 -04:00
parent 8a04c22b2b
commit 5135b8a969

View File

@ -18,7 +18,6 @@ import yaml
from django.conf import settings
from awx.main.models import (
AdHocCommand,
Credential,
@ -41,6 +40,7 @@ from awx.main import tasks
from awx.main.queue import CallbackQueueDispatcher
from awx.main.utils import encrypt_field, encrypt_value, OutputEventFilter
from awx.main.utils.safe_yaml import SafeLoader
from awx.main.exceptions import AwxTaskError
class TestJobExecution(object):
@ -65,7 +65,12 @@ def patch_Job():
@pytest.fixture
def job():
return Job(pk=1, id=1, project=Project(), inventory=Inventory())
return Job(pk=1, id=1, project=Project(), inventory=Inventory(), job_template=JobTemplate(id=1, name='foo'))
@pytest.fixture
def adhoc_job():
return AdHocCommand(pk=1, id=1, inventory=Inventory())
@pytest.fixture
@ -77,6 +82,15 @@ def update_model_wrapper(job):
return fn
@pytest.fixture
def adhoc_update_model_wrapper(adhoc_job):
def fn(pk, **kwargs):
for k, v in kwargs.items():
setattr(adhoc_job, k, v)
return adhoc_job
return fn
@pytest.fixture
def patch_CallbackQueueDispatcher():
with mock.patch('awx.main.tasks.CallbackQueueDispatcher') as m:
@ -103,23 +117,21 @@ def test_work_success_callback_missing_job():
assert tasks.handle_work_success(task_data) is None
def test_send_notifications_list(mocker):
patches = list()
@mock.patch('awx.main.models.UnifiedJob.objects.get')
@mock.patch('awx.main.models.Notification.objects.filter')
def test_send_notifications_list(mock_notifications_filter, mock_job_get, mocker):
mock_job = mocker.MagicMock(spec=UnifiedJob)
patches.append(mocker.patch('awx.main.models.UnifiedJob.objects.get', return_value=mock_job))
mock_job_get.return_value = mock_job
mock_notifications = [mocker.MagicMock(spec=Notification, subject="test", body={'hello': 'world'})]
patches.append(mocker.patch('awx.main.models.Notification.objects.filter', return_value=mock_notifications))
mock_notifications_filter.return_value = mock_notifications
with apply_patches(patches):
tasks.send_notifications([1,2], job_id=1)
assert Notification.objects.filter.call_count == 1
assert mock_notifications[0].status == "successful"
assert mock_notifications[0].save.called
tasks.send_notifications([1,2], job_id=1)
assert Notification.objects.filter.call_count == 1
assert mock_notifications[0].status == "successful"
assert mock_notifications[0].save.called
assert mock_job.notifications.add.called
assert mock_job.notifications.add.called_with(*mock_notifications)
assert mock_job.notifications.add.called
assert mock_job.notifications.add.called_with(*mock_notifications)
@pytest.mark.parametrize("key,value", [
@ -142,7 +154,7 @@ def test_safe_env_returns_new_copy():
@pytest.mark.parametrize("source,expected", [
(None, True), (False, False), (True, True)
])
def test_openstack_client_config_generation(mocker, source, expected):
def test_openstack_client_config_generation(mocker, source, expected, private_data_dir):
update = tasks.RunInventoryUpdate()
credential_type = CredentialType.defaults['openstack']()
inputs = {
@ -162,7 +174,7 @@ def test_openstack_client_config_generation(mocker, source, expected):
'source_vars_dict': {},
'get_cloud_credential': cred_method
})
cloud_config = update.build_private_data(inventory_update)
cloud_config = update.build_private_data(inventory_update, private_data_dir)
cloud_credential = yaml.load(
cloud_config.get('credentials')[credential]
)
@ -184,7 +196,7 @@ def test_openstack_client_config_generation(mocker, source, expected):
@pytest.mark.parametrize("source,expected", [
(False, False), (True, True)
])
def test_openstack_client_config_generation_with_private_source_vars(mocker, source, expected):
def test_openstack_client_config_generation_with_private_source_vars(mocker, source, expected, private_data_dir):
update = tasks.RunInventoryUpdate()
credential_type = CredentialType.defaults['openstack']()
inputs = {
@ -203,7 +215,7 @@ def test_openstack_client_config_generation_with_private_source_vars(mocker, sou
'source_vars_dict': {'private': source},
'get_cloud_credential': cred_method
})
cloud_config = update.build_private_data(inventory_update)
cloud_config = update.build_private_data(inventory_update, private_data_dir)
cloud_credential = yaml.load(
cloud_config.get('credentials')[credential]
)
@ -251,124 +263,109 @@ class TestExtraVarSanitation(TestJobExecution):
UNSAFE = '{{ lookup(''pipe'',''ls -la'') }}'
def test_vars_unsafe_by_default(self):
self.instance.created_by = User(pk=123, username='angry-spud')
def test_vars_unsafe_by_default(self, job, private_data_dir):
job.created_by = User(pk=123, username='angry-spud')
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
task = tasks.RunJob()
task.build_extra_vars_file(job, private_data_dir, {})
# ensure that strings are marked as unsafe
for unsafe in ['awx_job_template_name', 'tower_job_template_name',
'awx_user_name', 'tower_job_launch_type',
'awx_project_revision',
'tower_project_revision', 'tower_user_name',
'awx_job_launch_type']:
assert hasattr(extra_vars[unsafe], '__UNSAFE__')
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
# ensure that non-strings are marked as safe
for safe in ['awx_job_template_id', 'awx_job_id', 'awx_user_id',
'tower_user_id', 'tower_job_template_id',
'tower_job_id']:
assert not hasattr(extra_vars[safe], '__UNSAFE__')
return ['successful', 0]
# ensure that strings are marked as unsafe
for unsafe in ['awx_job_template_name', 'tower_job_template_name',
'awx_user_name', 'tower_job_launch_type',
'awx_project_revision',
'tower_project_revision', 'tower_user_name',
'awx_job_launch_type']:
assert hasattr(extra_vars[unsafe], '__UNSAFE__')
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
# ensure that non-strings are marked as safe
for safe in ['awx_job_template_id', 'awx_job_id', 'awx_user_id',
'tower_user_id', 'tower_job_template_id',
'tower_job_id']:
assert not hasattr(extra_vars[safe], '__UNSAFE__')
def test_launchtime_vars_unsafe(self):
self.instance.extra_vars = json.dumps({'msg': self.UNSAFE})
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['msg'] == self.UNSAFE
assert hasattr(extra_vars['msg'], '__UNSAFE__')
return ['successful', 0]
def test_launchtime_vars_unsafe(self, job, private_data_dir):
job.extra_vars = json.dumps({'msg': self.UNSAFE})
task = tasks.RunJob()
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
task.build_extra_vars_file(job, private_data_dir, {})
def test_nested_launchtime_vars_unsafe(self):
self.instance.extra_vars = json.dumps({'msg': {'a': [self.UNSAFE]}})
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == self.UNSAFE
assert hasattr(extra_vars['msg'], '__UNSAFE__')
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['msg'] == {'a': [self.UNSAFE]}
assert hasattr(extra_vars['msg']['a'][0], '__UNSAFE__')
return ['successful', 0]
def test_nested_launchtime_vars_unsafe(self, job, private_data_dir):
job.extra_vars = json.dumps({'msg': {'a': [self.UNSAFE]}})
task = tasks.RunJob()
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
task.build_extra_vars_file(job, private_data_dir, {})
def test_whitelisted_jt_extra_vars(self):
self.instance.job_template.extra_vars = self.instance.extra_vars = json.dumps({'msg': self.UNSAFE})
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == {'a': [self.UNSAFE]}
assert hasattr(extra_vars['msg']['a'][0], '__UNSAFE__')
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['msg'] == self.UNSAFE
assert not hasattr(extra_vars['msg'], '__UNSAFE__')
return ['successful', 0]
def test_whitelisted_jt_extra_vars(self, job, private_data_dir):
job.job_template.extra_vars = job.extra_vars = json.dumps({'msg': self.UNSAFE})
task = tasks.RunJob()
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
task.build_extra_vars_file(job, private_data_dir, {})
def test_nested_whitelisted_vars(self):
self.instance.extra_vars = json.dumps({'msg': {'a': {'b': [self.UNSAFE]}}})
self.instance.job_template.extra_vars = self.instance.extra_vars
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == self.UNSAFE
assert not hasattr(extra_vars['msg'], '__UNSAFE__')
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['msg'] == {'a': {'b': [self.UNSAFE]}}
assert not hasattr(extra_vars['msg']['a']['b'][0], '__UNSAFE__')
return ['successful', 0]
def test_nested_whitelisted_vars(self, job, private_data_dir):
job.extra_vars = json.dumps({'msg': {'a': {'b': [self.UNSAFE]}}})
job.job_template.extra_vars = job.extra_vars
task = tasks.RunJob()
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
task.build_extra_vars_file(job, private_data_dir, {})
def test_sensitive_values_dont_leak(self):
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == {'a': {'b': [self.UNSAFE]}}
assert not hasattr(extra_vars['msg']['a']['b'][0], '__UNSAFE__')
def test_sensitive_values_dont_leak(self, job, private_data_dir):
# JT defines `msg=SENSITIVE`, the job *should not* be able to do
# `other_var=SENSITIVE`
self.instance.job_template.extra_vars = json.dumps({'msg': self.UNSAFE})
self.instance.extra_vars = json.dumps({
job.job_template.extra_vars = json.dumps({'msg': self.UNSAFE})
job.extra_vars = json.dumps({
'msg': 'other-value',
'other_var': self.UNSAFE
})
task = tasks.RunJob()
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
task.build_extra_vars_file(job, private_data_dir, {})
assert extra_vars['msg'] == 'other-value'
assert hasattr(extra_vars['msg'], '__UNSAFE__')
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == 'other-value'
assert hasattr(extra_vars['msg'], '__UNSAFE__')
assert extra_vars['other_var'] == self.UNSAFE
assert hasattr(extra_vars['other_var'], '__UNSAFE__')
assert extra_vars['other_var'] == self.UNSAFE
assert hasattr(extra_vars['other_var'], '__UNSAFE__')
return ['successful', 0]
def test_overwritten_jt_extra_vars(self, job, private_data_dir):
job.job_template.extra_vars = json.dumps({'msg': 'SAFE'})
job.extra_vars = json.dumps({'msg': self.UNSAFE})
task = tasks.RunJob()
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
task.build_extra_vars_file(job, private_data_dir, {})
def test_overwritten_jt_extra_vars(self):
self.instance.job_template.extra_vars = json.dumps({'msg': 'SAFE'})
self.instance.extra_vars = json.dumps({'msg': self.UNSAFE})
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['msg'] == self.UNSAFE
assert hasattr(extra_vars['msg'], '__UNSAFE__')
return ['successful', 0]
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
fd = open(os.path.join(private_data_dir, 'env', 'extravars'))
extra_vars = yaml.load(fd, SafeLoader)
assert extra_vars['msg'] == self.UNSAFE
assert hasattr(extra_vars['msg'], '__UNSAFE__')
class TestGenericRun(TestJobExecution):
class TestGenericRun():
def test_generic_failure(self, patch_Job):
job = Job(status='running', inventory=Inventory())
@ -554,40 +551,59 @@ class TestGenericRun(TestJobExecution):
class TestAdhocRun(TestJobExecution):
TASK_CLS = tasks.RunAdHocCommand
def test_options_jinja_usage(self, adhoc_job, adhoc_update_model_wrapper):
adhoc_job.module_args = '{{ ansible_ssh_pass }}'
adhoc_job.websocket_emit_status = mock.Mock()
def get_instance(self):
return AdHocCommand(
pk=1,
created=datetime.utcnow(),
status='new',
cancel_flag=False,
verbosity=3,
extra_vars={'awx_foo': 'awx-bar'}
)
task = tasks.RunAdHocCommand()
task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
task.build_inventory = mock.Mock()
def test_options_jinja_usage(self):
self.instance.module_args = '{{ ansible_ssh_pass }}'
with pytest.raises(Exception):
self.task.run(self.pk)
update_model_call = self.task.update_model.call_args[1]
task.run(adhoc_job.pk)
call_args, _ = task.update_model.call_args_list[0]
update_model_call = task.update_model.call_args[1]
assert 'Jinja variables are not allowed' in update_model_call['result_traceback']
'''
TODO: The jinja action is in _write_extra_vars_file. The extra vars should
be wrapped in unsafe
'''
'''
def test_extra_vars_jinja_usage(self, adhoc_job, adhoc_update_model_wrapper):
adhoc_job.module_args = 'ls'
adhoc_job.extra_vars = json.dumps({
'foo': '{{ bar }}'
})
#adhoc_job.websocket_emit_status = mock.Mock()
task = tasks.RunAdHocCommand()
#task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
#task.build_inventory = mock.Mock(return_value='/tmp/something.inventory')
task._write_extra_vars_file = mock.Mock()
task.build_extra_vars_file(adhoc_job, 'ignore')
call_args, _ = task._write_extra_vars_file.call_args_list[0]
private_data_dir, extra_vars = call_args
assert extra_vars['foo'] == '{{ bar }}'
'''
def test_created_by_extra_vars(self):
self.instance.created_by = User(pk=123, username='angry-spud')
adhoc_job = AdHocCommand(created_by=User(pk=123, username='angry-spud'))
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['tower_user_id'] == 123
assert extra_vars['tower_user_name'] == "angry-spud"
assert extra_vars['awx_user_id'] == 123
assert extra_vars['awx_user_name'] == "angry-spud"
assert extra_vars['awx_foo'] == "awx-bar"
return ['successful', 0]
task = tasks.RunAdHocCommand()
task._write_extra_vars_file = mock.Mock()
task.build_extra_vars_file(adhoc_job, None, dict())
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
call_args, _ = task._write_extra_vars_file.call_args_list[0]
private_data_dir, extra_vars = call_args
assert extra_vars['tower_user_id'] == 123
assert extra_vars['tower_user_name'] == "angry-spud"
assert extra_vars['awx_user_id'] == 123
assert extra_vars['awx_user_name'] == "angry-spud"
class TestIsolatedExecution(TestJobExecution):