From d69b4e00ffbdee567e56c6d46c033869b77f795f Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Wed, 21 Jun 2017 15:26:26 -0400 Subject: [PATCH] select isolated node inside of queue based on capacity --- awx/main/models/unified_jobs.py | 4 +-- awx/main/scheduler/__init__.py | 1 + .../functional/models/test_unified_job.py | 35 +++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 55c53f1d8f..0142dd957a 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -961,9 +961,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique args = [self.pk] if ig.controller_id: if self.supports_isolation(): # case of jobs and ad hoc commands - # TODO: dock capacity from controller instance, use capacity to select isolated node - import random - isolated_instance = random.choice(ig.instances.all()) + isolated_instance = ig.instances.order_by('-capacity').first() args.append(isolated_instance.hostname) else: # proj & inv updates, system jobs run on controller queue = ig.controller.name diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index d3ad7de17c..2f844aacd9 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -406,6 +406,7 @@ class TaskManager(): for rampart_group in self.graph: self.graph[rampart_group]['capacity_used'] = 0 for t in tasks: + # TODO: dock capacity for isolated job management tasks running in queue for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'): if group_actual[0] in self.graph: self.graph[group_actual[0]]['capacity_used'] += t.task_impact diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 5b89644457..28131ec385 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -1,10 +1,12 @@ import pytest +import mock # Django from django.contrib.contenttypes.models import ContentType # AWX from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project +from awx.main.models.ha import InstanceGroup @pytest.mark.django_db @@ -65,3 +67,36 @@ class TestCreateUnifiedJob: assert second_job.inventory == job_with_links.inventory assert second_job.limit == 'my_server' assert net_credential in second_job.extra_credentials.all() + + +@pytest.mark.django_db +class TestIsolatedRuns: + + def test_low_capacity_isolated_instance_selected(self): + ig = InstanceGroup.objects.create(name='tower') + iso_ig = InstanceGroup.objects.create(name='thepentagon', controller=ig) + iso_ig.instances.create(hostname='iso1', capacity=50) + i2 = iso_ig.instances.create(hostname='iso2', capacity=200) + job = Job.objects.create( + instance_group=iso_ig + ) + + mock_async = mock.MagicMock() + success_callback = mock.MagicMock() + error_callback = mock.MagicMock() + + class MockTaskClass: + apply_async = mock_async + + with mock.patch.object(job, '_get_task_class') as task_class: + task_class.return_value = MockTaskClass + job.start_celery_task([], error_callback, success_callback, 'thepentagon') + mock_async.assert_called_with([job.id, 'iso2'], [], link_error=error_callback, link=success_callback, queue='thepentagon') + + i2.capacity = 20 + i2.save() + + with mock.patch.object(job, '_get_task_class') as task_class: + task_class.return_value = MockTaskClass + job.start_celery_task([], error_callback, success_callback, 'thepentagon') + mock_async.assert_called_with([job.id, 'iso1'], [], link_error=error_callback, link=success_callback, queue='thepentagon')