From d347a06e3d74439c65944b77896b532b4802a734 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 29 Nov 2018 13:29:16 -0500 Subject: [PATCH] do not deny existing workflow node relationships --- awx/api/views/__init__.py | 14 +++++--- .../functional/api/test_workflow_node.py | 34 +++++++++---------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index d566dfd477..48340a1011 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -2957,12 +2957,16 @@ class WorkflowJobTemplateNodeChildrenBaseList(WorkflowsEnforcementMixin, Enforce if parent.id == sub.id: return {"Error": _("Cycle detected.")} - if parent.id == sub.id + ''' + Look for parent->child connection in all relationships except the relationship that is + attempting to be added; because it's ok to re-add the relationship + ''' + relationships = ['success_nodes', 'failure_nodes', 'always_nodes'] + relationships.remove(self.relationship) + qs = reduce(lambda x, y: (x | y), + (Q(**{'{}__in'.format(rel): [sub.id]}) for rel in relationships)) - if WorkflowJobTemplateNode.objects.filter(Q(pk=parent.id) & - Q(success_nodes__in=[sub.id]) | - Q(failure_nodes__in=[sub.id]) | - Q(always_nodes__in=[sub.id])).exists(): + if WorkflowJobTemplateNode.objects.filter(Q(pk=parent.id) & qs).exists(): return {"Error": _("Relationship not allowed.")} parent_node_type_relationship = getattr(parent, self.relationship) diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index 003beea9bf..3e8a756e48 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -77,7 +77,7 @@ def test_node_accepts_prompted_fields(inventory, project, workflow_job_template, @pytest.mark.django_db -class TestExclusivePathEnforcement(): +class TestExclusiveRelationshipEnforcement(): @pytest.fixture def n1(self, workflow_job_template): return WorkflowJobTemplateNode.objects.create(workflow_job_template=workflow_job_template) @@ -86,11 +86,11 @@ class TestExclusivePathEnforcement(): def n2(self, workflow_job_template): return WorkflowJobTemplateNode.objects.create(workflow_job_template=workflow_job_template) - def generate_url(self, path, id): - return reverse('api:workflow_job_template_node_{}_nodes_list'.format(path), + def generate_url(self, relationship, id): + return reverse('api:workflow_job_template_node_{}_nodes_list'.format(relationship), kwargs={'pk': id}) - path_permutations = [ + relationship_permutations = [ ['success', 'failure', 'always'], ['success', 'always', 'failure'], ['failure', 'always', 'success'], @@ -99,10 +99,10 @@ class TestExclusivePathEnforcement(): ['always', 'failure', 'success'], ] - @pytest.mark.parametrize("paths", path_permutations, ids=["-".join(item) for item in path_permutations]) - def test_multi_connections_same_parent_disallowed(self, post, admin_user, n1, n2, paths): - for index, path in enumerate(paths): - r = post(self.generate_url(path, n1.id), + @pytest.mark.parametrize("relationships", relationship_permutations, ids=["-".join(item) for item in relationship_permutations]) + def test_multi_connections_same_parent_disallowed(self, post, admin_user, n1, n2, relationships): + for index, relationship in enumerate(relationships): + r = post(self.generate_url(relationship, n1.id), data={'associate': True, 'id': n2.id}, user=admin_user, expect=204 if index == 0 else 400) @@ -110,16 +110,16 @@ class TestExclusivePathEnforcement(): if index != 0: assert {'Error': 'Relationship not allowed.'} == json.loads(r.content) - @pytest.mark.parametrize("relationship", ['success', 'failure', 'always']): + @pytest.mark.parametrize("relationship", ['success', 'failure', 'always']) def test_existing_relationship_allowed(self, post, admin_user, n1, n2, relationship): - r = post(self.generate_url(path, n1.id), - data={'associate': True, 'id': n2.id}, - user=admin_user, - expect=204) - r = post(self.generate_url(path, n1.id), - data={'associate': True, 'id': n2.id}, - user=admin_user, - expect=200) + post(self.generate_url(relationship, n1.id), + data={'associate': True, 'id': n2.id}, + user=admin_user, + expect=204) + post(self.generate_url(relationship, n1.id), + data={'associate': True, 'id': n2.id}, + user=admin_user, + expect=204) @pytest.mark.django_db