1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 01:21:21 +03:00

Remove Instance Group concept/usage from WorkflowJobs

This also relaxes some of the task manager rules on Instance Groups
down the full stack such that workflow jobs tend to shortcut the
processing or omit it altogether.

This lets the workflow job spawning logic exist outside of the
instance group queues, which it doesn't need to participate in in the
first place.
This commit is contained in:
Matthew Jones 2018-04-24 16:31:08 -04:00 committed by Matthew Jones
parent 7c0fdd5f05
commit 4af8a53232
No known key found for this signature in database
GPG Key ID: 76A4C17A97590C1C
4 changed files with 21 additions and 7 deletions

View File

@ -178,8 +178,6 @@ class InstanceGroupManager(models.Manager):
if t.status == 'waiting' or not t.execution_node:
# Subtract capacity from any peer groups that share instances
if not t.instance_group:
logger.warning('Excluded %s from capacity algorithm '
'(missing instance_group).', t.log_format)
impacted_groups = []
elif t.instance_group.name not in ig_ig_mapping:
# Waiting job in group with 0 capacity has no collateral impact

View File

@ -474,7 +474,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
@property
def preferred_instance_groups(self):
return self.global_instance_groups
return []
'''
A WorkflowJob is a virtual job. It doesn't result in a celery task.

View File

@ -259,7 +259,7 @@ class TaskManager():
else:
if type(task) is WorkflowJob:
task.status = 'running'
if not task.supports_isolation() and rampart_group.controller_id:
elif not task.supports_isolation() and rampart_group.controller_id:
# non-Ansible jobs on isolated instances run on controller
task.instance_group = rampart_group.controller
logger.info('Submitting isolated %s to queue %s via %s.',
@ -271,7 +271,8 @@ class TaskManager():
task.celery_task_id = str(uuid.uuid4())
task.save()
self.consume_capacity(task, rampart_group.name)
if rampart_group is not None:
self.consume_capacity(task, rampart_group.name)
def post_commit():
task.websocket_emit_status(task.status)
@ -281,7 +282,7 @@ class TaskManager():
connection.on_commit(post_commit)
def process_running_tasks(self, running_tasks):
map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task), running_tasks)
map(lambda task: self.graph[task.instance_group.name]['graph'].add_job(task) if task.instance_group else None, running_tasks)
def create_project_update(self, task):
project_task = Project.objects.get(id=task.project_id).create_project_update(
@ -447,6 +448,9 @@ class TaskManager():
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
if isinstance(task, WorkflowJob):
self.start_task(task, None, task.get_jobs_fail_chain())
continue
for rampart_group in preferred_instance_groups:
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if remaining_capacity <= 0:

View File

@ -2,7 +2,7 @@ import pytest
import mock
from datetime import timedelta
from awx.main.scheduler import TaskManager
from awx.main.models import InstanceGroup
from awx.main.models import InstanceGroup, WorkflowJob
from awx.main.tasks import apply_cluster_membership_policies
@ -77,6 +77,18 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
assert TaskManager.start_task.call_count == 2
@pytest.mark.django_db
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj.status = "pending"
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(wfj, None, [])
assert wfj.instance_group is None
@pytest.mark.django_db
def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker,
instance_group_factory, job_template_factory):