mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 15:21:13 +03:00
Mock out job start related stuff in runtime tests
This commit is contained in:
parent
718b5019b6
commit
64fc0c8ecb
@ -8,8 +8,6 @@ from awx.main.models.jobs import Job, JobTemplate
|
|||||||
|
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
from copy import copy
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def runtime_data(organization):
|
def runtime_data(organization):
|
||||||
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
|
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
|
||||||
@ -60,24 +58,23 @@ def job_template_prompts_null(project):
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user):
|
def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user, mocker):
|
||||||
job_template = job_template_prompts(False)
|
job_template = job_template_prompts(False)
|
||||||
job_template_saved = copy(job_template)
|
|
||||||
|
|
||||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
mock_job = mocker.MagicMock(spec=Job, id=968, **runtime_data)
|
||||||
runtime_data, user('admin', True))
|
|
||||||
|
|
||||||
assert response.status_code == 201
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job):
|
||||||
|
with mocker.patch('awx.api.serializers.JobSerializer.to_representation'):
|
||||||
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
|
runtime_data, user('admin', True))
|
||||||
|
assert response.status_code == 201
|
||||||
|
|
||||||
|
# Check that job is serialized correctly
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
job_obj = Job.objects.get(pk=job_id)
|
assert job_id == 968
|
||||||
|
|
||||||
# Check that job data matches job_template data
|
# If job is created with no arguments, it will inherit JT attributes
|
||||||
assert len(yaml.load(job_obj.extra_vars)) == 0
|
mock_job.signal_start.assert_called_once_with(extra_vars={})
|
||||||
assert job_obj.limit == job_template_saved.limit
|
|
||||||
assert job_obj.job_type == job_template_saved.job_type
|
|
||||||
assert job_obj.inventory.pk == job_template_saved.inventory.pk
|
|
||||||
assert job_obj.job_tags == job_template_saved.job_tags
|
|
||||||
assert job_obj.credential.pk == job_template_saved.credential.pk
|
|
||||||
|
|
||||||
# Check that response tells us what things were ignored
|
# Check that response tells us what things were ignored
|
||||||
assert 'job_launch_var' in response.data['ignored_fields']['extra_vars']
|
assert 'job_launch_var' in response.data['ignored_fields']['extra_vars']
|
||||||
@ -90,30 +87,25 @@ def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, us
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user):
|
def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user, mocker):
|
||||||
job_template = job_template_prompts(True)
|
job_template = job_template_prompts(True)
|
||||||
admin_user = user('admin', True)
|
|
||||||
|
|
||||||
job_template.inventory.execute_role.members.add(admin_user)
|
mock_job = mocker.MagicMock(spec=Job, id=968, **runtime_data)
|
||||||
|
|
||||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job):
|
||||||
runtime_data, admin_user)
|
with mocker.patch('awx.api.serializers.JobSerializer.to_representation'):
|
||||||
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
|
runtime_data, user('admin', True))
|
||||||
|
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
job_obj = Job.objects.get(pk=job_id)
|
assert job_id == 968
|
||||||
|
|
||||||
# Check that job data matches the given runtime variables
|
mock_job.signal_start.assert_called_once_with(**runtime_data)
|
||||||
assert 'job_launch_var' in yaml.load(job_obj.extra_vars)
|
|
||||||
assert job_obj.limit == runtime_data['limit']
|
|
||||||
assert job_obj.job_type == runtime_data['job_type']
|
|
||||||
assert job_obj.inventory.pk == runtime_data['inventory']
|
|
||||||
assert job_obj.credential.pk == runtime_data['credential']
|
|
||||||
assert job_obj.job_tags == runtime_data['job_tags']
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, user):
|
def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, user, mocker):
|
||||||
job_template = job_template_prompts_null
|
job_template = job_template_prompts_null
|
||||||
common_user = user('not-admin', False)
|
common_user = user('not-admin', False)
|
||||||
|
|
||||||
@ -126,20 +118,17 @@ def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null,
|
|||||||
inventory = Inventory.objects.get(pk=runtime_data['inventory'])
|
inventory = Inventory.objects.get(pk=runtime_data['inventory'])
|
||||||
inventory.use_role.members.add(common_user)
|
inventory.use_role.members.add(common_user)
|
||||||
|
|
||||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
mock_job = mocker.MagicMock(spec=Job, id=968, **runtime_data)
|
||||||
runtime_data, common_user)
|
|
||||||
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job):
|
||||||
|
with mocker.patch('awx.api.serializers.JobSerializer.to_representation'):
|
||||||
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
|
runtime_data, common_user)
|
||||||
|
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
job_obj = Job.objects.get(pk=job_id)
|
assert job_id == 968
|
||||||
|
mock_job.signal_start.assert_called_once_with(**runtime_data)
|
||||||
# Check that job data matches the given runtime variables
|
|
||||||
assert 'job_launch_var' in yaml.load(job_obj.extra_vars)
|
|
||||||
assert job_obj.limit == runtime_data['limit']
|
|
||||||
assert job_obj.job_type == runtime_data['job_type']
|
|
||||||
assert job_obj.inventory.pk == runtime_data['inventory']
|
|
||||||
assert job_obj.credential.pk == runtime_data['credential']
|
|
||||||
assert job_obj.job_tags == runtime_data['job_tags']
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
@ -182,25 +171,23 @@ def test_job_launch_fails_without_inventory(deploy_jobtemplate, post, user):
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_launch_fails_without_inventory_access(deploy_jobtemplate, machine_credential, post, user):
|
def test_job_launch_fails_without_inventory_access(job_template_prompts, runtime_data, machine_credential, post, user, mocker):
|
||||||
deploy_jobtemplate.ask_inventory_on_launch = True
|
job_template = job_template_prompts(True)
|
||||||
deploy_jobtemplate.credential = machine_credential
|
|
||||||
deploy_jobtemplate.save()
|
|
||||||
common_user = user('test-user', False)
|
common_user = user('test-user', False)
|
||||||
deploy_jobtemplate.execute_role.members.add(common_user)
|
job_template.execute_role.members.add(common_user)
|
||||||
deploy_jobtemplate.inventory.use_role.members.add(common_user)
|
|
||||||
deploy_jobtemplate.project.member_role.members.add(common_user)
|
|
||||||
deploy_jobtemplate.credential.use_role.members.add(common_user)
|
|
||||||
|
|
||||||
# Assure that the base job template can be launched to begin with
|
# Assure that the base job template can be launched to begin with
|
||||||
response = post(reverse('api:job_template_launch',
|
mock_job = mocker.MagicMock(spec=Job, id=968, **runtime_data)
|
||||||
args=[deploy_jobtemplate.pk]), {}, common_user)
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job):
|
||||||
|
with mocker.patch('awx.api.serializers.JobSerializer.to_representation'):
|
||||||
|
response = post(reverse('api:job_template_launch',
|
||||||
|
args=[job_template.pk]), {}, common_user)
|
||||||
|
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
|
|
||||||
# Assure that giving an inventory without access to the inventory blocks the launch
|
# Assure that giving an inventory without access to the inventory blocks the launch
|
||||||
new_inv = deploy_jobtemplate.project.organization.inventories.create(name="user-can-not-use")
|
new_inv = job_template.project.organization.inventories.create(name="user-can-not-use")
|
||||||
response = post(reverse('api:job_template_launch', args=[deploy_jobtemplate.pk]),
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
dict(inventory=new_inv.pk), common_user)
|
dict(inventory=new_inv.pk), common_user)
|
||||||
|
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
@ -208,29 +195,23 @@ def test_job_launch_fails_without_inventory_access(deploy_jobtemplate, machine_c
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_relaunch_prompted_vars(runtime_data, job_template_prompts, post, user):
|
def test_job_relaunch_copy_vars(runtime_data, job_template_prompts, project, post, mocker):
|
||||||
job_template = job_template_prompts(True)
|
job_template = job_template_prompts(True)
|
||||||
admin_user = user('admin', True)
|
|
||||||
|
|
||||||
# Launch job, overwriting several JT fields
|
# Create a job with the given data that will be relaunched
|
||||||
first_response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
job_create_kwargs = runtime_data
|
||||||
runtime_data, admin_user)
|
inv_obj = Inventory.objects.get(pk=job_create_kwargs.pop('inventory'))
|
||||||
|
cred_obj = Credential.objects.get(pk=job_create_kwargs.pop('credential'))
|
||||||
|
original_job = Job.objects.create(inventory=inv_obj, credential=cred_obj, job_template=job_template, **job_create_kwargs)
|
||||||
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate._get_unified_job_field_names', return_value=runtime_data.keys()):
|
||||||
|
second_job = original_job.copy()
|
||||||
|
|
||||||
assert first_response.status_code == 201
|
# Check that job data matches the original variables
|
||||||
original_job = Job.objects.get(pk=first_response.data['job'])
|
assert 'job_launch_var' in yaml.load(second_job.extra_vars)
|
||||||
|
assert original_job.limit == second_job.limit
|
||||||
# Launch a second job as a relaunch of the first
|
assert original_job.job_type == second_job.job_type
|
||||||
second_response = post(reverse('api:job_relaunch', args=[original_job.pk]),
|
assert original_job.inventory.pk == second_job.inventory.pk
|
||||||
{}, admin_user)
|
assert original_job.job_tags == second_job.job_tags
|
||||||
relaunched_job = Job.objects.get(pk=second_response.data['job'])
|
|
||||||
|
|
||||||
# Check that job data matches the original runtime variables
|
|
||||||
assert first_response.status_code == 201
|
|
||||||
assert 'job_launch_var' in yaml.load(relaunched_job.extra_vars)
|
|
||||||
assert relaunched_job.limit == runtime_data['limit']
|
|
||||||
assert relaunched_job.job_type == runtime_data['job_type']
|
|
||||||
assert relaunched_job.inventory.pk == runtime_data['inventory']
|
|
||||||
assert relaunched_job.job_tags == runtime_data['job_tags']
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
||||||
@ -246,10 +227,8 @@ def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
|||||||
assert validated
|
assert validated
|
||||||
|
|
||||||
job_obj = deploy_jobtemplate.create_unified_job(**kv)
|
job_obj = deploy_jobtemplate.create_unified_job(**kv)
|
||||||
result = job_obj.signal_start(**kv)
|
|
||||||
|
|
||||||
final_job_extra_vars = yaml.load(job_obj.extra_vars)
|
final_job_extra_vars = yaml.load(job_obj.extra_vars)
|
||||||
assert result
|
|
||||||
assert 'job_template_var' in final_job_extra_vars
|
assert 'job_template_var' in final_job_extra_vars
|
||||||
assert 'job_launch_var' in final_job_extra_vars
|
assert 'job_launch_var' in final_job_extra_vars
|
||||||
assert job_obj.credential.id == machine_credential.id
|
assert job_obj.credential.id == machine_credential.id
|
||||||
@ -280,16 +259,17 @@ def test_job_launch_unprompted_vars_with_survey(mocker, job_template_prompts, po
|
|||||||
}
|
}
|
||||||
job_template.save()
|
job_template.save()
|
||||||
|
|
||||||
response = post(
|
mock_job = mocker.MagicMock(spec=Job, id=968, extra_vars={"job_launch_var": 3, "survey_var": 4})
|
||||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job):
|
||||||
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}),
|
with mocker.patch('awx.api.serializers.JobSerializer.to_representation'):
|
||||||
user('admin', True))
|
response = post(
|
||||||
assert response.status_code == 201
|
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
|
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}),
|
||||||
|
user('admin', True))
|
||||||
|
assert response.status_code == 201
|
||||||
|
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
job_obj = Job.objects.get(pk=job_id)
|
assert job_id == 968
|
||||||
|
|
||||||
# Check that the survey variable is accepted and the job variable isn't
|
# Check that the survey variable is accepted and the job variable isn't
|
||||||
job_extra_vars = yaml.load(job_obj.extra_vars)
|
mock_job.signal_start.assert_called_once_with(extra_vars={"survey_var": 4})
|
||||||
assert 'job_launch_var' not in job_extra_vars
|
|
||||||
assert 'survey_var' in job_extra_vars
|
|
||||||
|
Loading…
Reference in New Issue
Block a user