1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-30 22:21:13 +03:00

Add test for approve node denial

This commit is contained in:
beeankha 2019-08-23 08:32:08 -04:00 committed by Ryan Petrello
parent 8b23ff71b4
commit b5c0f58137
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777

View File

@ -6,6 +6,7 @@ from awx.api.versioning import reverse
from awx.main.models.activity_stream import ActivityStream from awx.main.models.activity_stream import ActivityStream
from awx.main.models.jobs import JobTemplate from awx.main.models.jobs import JobTemplate
from awx.main.models.workflow import ( from awx.main.models.workflow import (
WorkflowApproval,
WorkflowApprovalTemplate, WorkflowApprovalTemplate,
WorkflowJob, WorkflowJob,
WorkflowJobTemplate, WorkflowJobTemplate,
@ -143,15 +144,15 @@ class TestApprovalNodes():
] ]
@pytest.mark.django_db @pytest.mark.django_db
def test_approval_node_actions(self, post, admin_user, job_template): def test_approval_node_approve(self, post, admin_user, job_template):
# This test ensures that a user (with permissions to do so) can approve/deny # This test ensures that a user (with permissions to do so) can APPROVE
# workflow approvals. Also asserts that trying to approve/deny approvals # workflow approvals. Also asserts that trying to APPROVE approvals
# that have already been dealt with will throw an error. # that have already been dealt with will throw an error.
wfjt = WorkflowJobTemplate.objects.create(name='foobar') wfjt = WorkflowJobTemplate.objects.create(name='foobar')
node = wfjt.workflow_nodes.create(unified_job_template=job_template) node = wfjt.workflow_nodes.create(unified_job_template=job_template)
url = reverse('api:workflow_job_template_node_create_approval', url = reverse('api:workflow_job_template_node_create_approval',
kwargs={'pk': node.pk, 'version': 'v2'}) kwargs={'pk': node.pk, 'version': 'v2'})
post(url, {'name': 'Approve/Deny Test', 'description': '', 'timeout': 0}, post(url, {'name': 'Approve/Deny Test1', 'description': '', 'timeout': 0},
user=admin_user, expect=200) user=admin_user, expect=200)
post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}),
user=admin_user, expect=201) user=admin_user, expect=201)
@ -160,17 +161,48 @@ class TestApprovalNodes():
TaskManager().schedule() TaskManager().schedule()
wfj_node = wf_job.workflow_nodes.first() wfj_node = wf_job.workflow_nodes.first()
approval = wfj_node.job approval = wfj_node.job
assert approval.name == 'Approve/Deny Test' assert approval.name == 'Approve/Deny Test1'
post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}), post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}),
user=admin_user, expect=204) user=admin_user, expect=204)
# Test that there is an activity stream entry that was created for the "approve" action. # Test that there is an activity stream entry that was created for the "approve" action.
qs = ActivityStream.objects.order_by('-timestamp').first() qs = ActivityStream.objects.order_by('-timestamp').first()
assert qs.object1 == 'workflow_approval' assert qs.object1 == 'workflow_approval'
assert qs.changes == '{"status": ["pending", "successful"]}' assert qs.changes == '{"status": ["pending", "successful"]}'
assert WorkflowApproval.objects.get(pk=approval.pk).status == 'successful'
assert qs.operation == 'update' assert qs.operation == 'update'
post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}), post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}),
user=admin_user, expect=403) user=admin_user, expect=403)
@pytest.mark.django_db
def test_approval_node_deny(self, post, admin_user, job_template):
# This test ensures that a user (with permissions to do so) can DENY
# workflow approvals. Also asserts that trying to DENY approvals
# that have already been dealt with will throw an error.
wfjt = WorkflowJobTemplate.objects.create(name='foobar')
node = wfjt.workflow_nodes.create(unified_job_template=job_template)
url = reverse('api:workflow_job_template_node_create_approval',
kwargs={'pk': node.pk, 'version': 'v2'})
post(url, {'name': 'Approve/Deny Test2', 'description': '', 'timeout': 0},
user=admin_user, expect=200)
post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}),
user=admin_user, expect=201)
wf_job = WorkflowJob.objects.first()
TaskManager().schedule()
TaskManager().schedule()
wfj_node = wf_job.workflow_nodes.first()
approval = wfj_node.job
assert approval.name == 'Approve/Deny Test2'
post(reverse('api:workflow_approval_deny', kwargs={'pk': approval.pk}),
user=admin_user, expect=204)
# Test that there is an activity stream entry that was created for the "approve" action.
qs = ActivityStream.objects.order_by('-timestamp').first()
assert qs.object1 == 'workflow_approval'
assert qs.changes == '{"status": ["pending", "failed"]}'
assert WorkflowApproval.objects.get(pk=approval.pk).status == 'failed'
assert qs.operation == 'update'
post(reverse('api:workflow_approval_deny', kwargs={'pk': approval.pk}),
user=admin_user, expect=403)
def test_approval_node_cleanup(self, post, approval_node, admin_user, get): def test_approval_node_cleanup(self, post, approval_node, admin_user, get):
workflow_job_template = WorkflowJobTemplate.objects.create() workflow_job_template = WorkflowJobTemplate.objects.create()
approval_node = WorkflowJobTemplateNode.objects.create( approval_node = WorkflowJobTemplateNode.objects.create(