From 4af8a532321d783382f80a59491ae4e9256ebd2c Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 24 Apr 2018 16:31:08 -0400 Subject: [PATCH] Remove Instance Group concept/usage from WorkflowJobs This also relaxes some of the task manager rules on Instance Groups down the full stack such that workflow jobs tend to shortcut the processing or omit it altogether. This lets the workflow job spawning logic exist outside of the instance group queues, in which it doesn't need to participate in the first place. --- awx/main/managers.py | 2 -- awx/main/models/workflow.py | 2 +- awx/main/scheduler/task_manager.py | 10 +++++++--- .../task_management/test_rampart_groups.py | 14 +++++++++++++- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/awx/main/managers.py b/awx/main/managers.py index 1adb75e913..274a0ef774 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -178,8 +178,6 @@ class InstanceGroupManager(models.Manager): if t.status == 'waiting' or not t.execution_node: # Subtract capacity from any peer groups that share instances if not t.instance_group: - logger.warning('Excluded %s from capacity algorithm ' - '(missing instance_group).', t.log_format) impacted_groups = [] elif t.instance_group.name not in ig_ig_mapping: # Waiting job in group with 0 capacity has no collateral impact diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index f43a43cd24..c63bbc6f1f 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -474,7 +474,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio @property def preferred_instance_groups(self): - return self.global_instance_groups + return [] ''' A WorkflowJob is a virtual job. It doesn't result in a celery task. 
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index f988e76fd3..b3d5ed14f1 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -259,7 +259,7 @@ class TaskManager(): else: if type(task) is WorkflowJob: task.status = 'running' - if not task.supports_isolation() and rampart_group.controller_id: + elif not task.supports_isolation() and rampart_group.controller_id: # non-Ansible jobs on isolated instances run on controller task.instance_group = rampart_group.controller logger.info('Submitting isolated %s to queue %s via %s.', @@ -271,7 +271,8 @@ class TaskManager(): task.celery_task_id = str(uuid.uuid4()) task.save() - self.consume_capacity(task, rampart_group.name) + if rampart_group is not None: + self.consume_capacity(task, rampart_group.name) def post_commit(): task.websocket_emit_status(task.status) @@ -281,7 +282,7 @@ class TaskManager(): connection.on_commit(post_commit) def process_running_tasks(self, running_tasks): - map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks) + map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task) if task.instance_group else None, running_tasks) def create_project_update(self, task): project_task = Project.objects.get(id=task.project_id).create_project_update( @@ -447,6 +448,9 @@ class TaskManager(): continue preferred_instance_groups = task.preferred_instance_groups found_acceptable_queue = False + if isinstance(task, WorkflowJob): + self.start_task(task, None, task.get_jobs_fail_chain()) + continue for rampart_group in preferred_instance_groups: remaining_capacity = self.get_remaining_capacity(rampart_group.name) if remaining_capacity <= 0: diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 9b4b3eac44..ce79b78003 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ 
b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -2,7 +2,7 @@ import pytest import mock from datetime import timedelta from awx.main.scheduler import TaskManager -from awx.main.models import InstanceGroup +from awx.main.models import InstanceGroup, WorkflowJob from awx.main.tasks import apply_cluster_membership_policies @@ -77,6 +77,18 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g assert TaskManager.start_task.call_count == 2 +@pytest.mark.django_db +def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker): + wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template + wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) + wfj.status = "pending" + wfj.save() + with mocker.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(wfj, None, []) + assert wfj.instance_group is None + + @pytest.mark.django_db def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory):