diff --git a/awx/main/managers.py b/awx/main/managers.py
index 4f53da65ae..155a46fa25 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -58,6 +58,8 @@ def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
         ig_ig_set = set()
         for instance_hostname in ig_instance_mapping[group_name]:
             ig_ig_set |= instance_ig_mapping[instance_hostname]
+        else:
+            ig_ig_set.add(group_name)  # Group contains no instances, return self
         ig_ig_mapping[group_name] = ig_ig_set
     return ig_ig_mapping
 
@@ -94,33 +96,6 @@ class InstanceManager(models.Manager):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"
 
-    def capacity_mapping(self, qs=None):
-        """
-        Returns tuple of two dictionaries that shows mutual connections by name
-        for global accounting of capacity
-
-        instance_ig_mapping: {'instance_name': <set of group names>}
-        ig_ig_mapping: {'group_name': <set of group names sharing instances>}
-        """
-        if qs is None:
-            qs = self.all().prefetch_related('rampart_groups')
-        instance_ig_mapping = {}
-        ig_instance_mapping = {}
-        # Create dictionaries that represent basic m2m memberships
-        for instance in qs:
-            if instance.capacity == 0:
-                continue
-            instance_ig_mapping[instance.hostname] = set(
-                group.name for group in instance.rampart_groups.all()
-            )
-            for group in instance.rampart_groups.all():
-                ig_instance_mapping.setdefault(group.name, set())
-                ig_instance_mapping[group.name].add(instance.hostname)
-        # Get IG capacity overlap mapping
-        ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
-
-        return instance_ig_mapping, ig_ig_mapping
-
 
 class InstanceGroupManager(models.Manager):
     """A custom manager class for the Instance model.
@@ -152,11 +127,20 @@ class InstanceGroupManager(models.Manager):
 
         return instance_ig_mapping, ig_ig_mapping
 
+    @staticmethod
+    def zero_out_group(graph, name, breakdown):
+        if name not in graph:
+            graph[name] = {}
+        graph[name]['consumed_capacity'] = 0
+        if breakdown:
+            graph[name]['committed_capacity'] = 0
+            graph[name]['running_capacity'] = 0
+
     def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
         """
         Returns a dictionary of capacity values for all IGs
         """
-        if qs is None:
+        if qs is None:  # Optionally BYOQS - bring your own queryset
             qs = self.all().prefetch_related('instances')
         instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)
 
@@ -167,22 +151,41 @@ class InstanceGroupManager(models.Manager):
         if graph is None:
             graph = {group.name: {} for group in qs}
         for group_name in graph:
-            graph[group_name]['consumed_capacity'] = 0
-            if breakdown:
-                graph[group_name]['committed_capacity'] = 0
-                graph[group_name]['running_capacity'] = 0
+            self.zero_out_group(graph, group_name, breakdown)
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
             impact = t.task_impact
             if t.status == 'waiting' or not t.execution_node:
                 # Subtract capacity from any peer groups that share instances
-                for group_name in ig_ig_mapping[t.instance_group.name]:
+                if not t.instance_group:
+                    logger.warning('Excluded %s from capacity algorithm '
+                                   '(missing instance_group).', t.log_format)
+                    impacted_groups = []
+                elif t.instance_group.name not in ig_ig_mapping:
+                    # Waiting job in group with 0 capacity has no collateral impact
+                    impacted_groups = [t.instance_group.name]
+                else:
+                    impacted_groups = ig_ig_mapping[t.instance_group.name]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['committed_capacity'] += impact
             elif t.status == 'running':
                 # Subtract capacity from all groups that contain the instance
-                for group_name in instance_ig_mapping[t.execution_node]:
+                if t.execution_node not in instance_ig_mapping:
+                    logger.warning('Detected %s running inside lost instance, '
+                                   'may still be waiting for reaper.', t.log_format)
+                    if t.instance_group:
+                        impacted_groups = [t.instance_group.name]
+                    else:
+                        impacted_groups = []
+                else:
+                    impacted_groups = instance_ig_mapping[t.execution_node]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['running_capacity'] += impact
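Reviewer note on the for/else added to get_ig_ig_mapping above: Python runs the else branch whenever the loop finishes without a break, so group_name is added to its own set unconditionally, not only for empty groups. That is harmless for non-empty groups (an instance's group set already contains the group itself) and is exactly what guarantees an empty group maps to itself. A minimal standalone sketch of the amended helper with hypothetical data:

    def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
        # Reproduction of the helper above, for illustration only
        ig_ig_mapping = {}
        for group_name in ig_instance_mapping.keys():
            ig_ig_set = set()
            for instance_hostname in ig_instance_mapping[group_name]:
                ig_ig_set |= instance_ig_mapping[instance_hostname]
            else:
                ig_ig_set.add(group_name)  # no-break completion: always runs
            ig_ig_mapping[group_name] = ig_ig_set
        return ig_ig_mapping

    # 'empty' has no instances; before this change it would map to an empty set
    print(get_ig_ig_mapping(
        {'shared': {'i1'}, 'empty': set()},
        {'i1': {'shared'}},
    ))
    # -> {'shared': {'shared'}, 'empty': {'empty'}}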
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 6430d007a4..a6f477539c 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -38,7 +38,7 @@ class TaskManager():
 
     def __init__(self):
         self.graph = dict()
-        for rampart_group in InstanceGroup.objects.all():
+        for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
                                                   consumed_capacity=0)
diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py
index 66d2eab7f2..7b7b7d7dc0 100644
--- a/awx/main/tests/functional/task_management/test_capacity.py
+++ b/awx/main/tests/functional/task_management/test_capacity.py
@@ -1,10 +1,8 @@
 import pytest
-import mock
 
 from django.test import TransactionTestCase
 
 from awx.main.models import (
-    Job,
     Instance,
     InstanceGroup,
 )
@@ -28,42 +26,9 @@ class TestCapacityMapping(TransactionTestCase):
     def test_mapping(self):
         self.sample_cluster()
         with self.assertNumQueries(2):
-            inst_map, ig_map = Instance.objects.capacity_mapping()
+            inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
         assert inst_map['i1'] == set(['ig_small'])
         assert inst_map['i2'] == set(['ig_large', 'tower'])
         assert ig_map['ig_small'] == set(['ig_small'])
         assert ig_map['ig_large'] == set(['ig_large', 'tower'])
         assert ig_map['tower'] == set(['ig_large', 'tower'])
-
-    def test_committed_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='waiting', instance_group=tower),
-            Job(status='waiting', instance_group=ig_large),
-            Job(status='waiting', instance_group=ig_small)
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Jobs submitted to either tower or ig_larg must count toward both
-        assert capacities['tower']['committed_capacity'] == 43 * 2
-        assert capacities['ig_large']['committed_capacity'] == 43 * 2
-        assert capacities['ig_small']['committed_capacity'] == 43
-
-    def test_running_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='running', execution_node='i1'),
-            Job(status='running', execution_node='i2'),
-            Job(status='running', execution_node='i3')
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Tower is only given 1 instance
-        assert capacities['tower']['running_capacity'] == 43
-        # Large IG has 2 instances
-        assert capacities['ig_large']['running_capacity'] == 43 * 2
-        assert capacities['ig_small']['running_capacity'] == 43
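The switch to prefetch_related('instances') here, mirrored in capacity_values above, is what keeps the functional test's assertNumQueries(2) honest: one query for the groups, one for the membership join, regardless of cluster size. A hedged sketch of the access pattern the prefetch serves (standard Django ORM behavior, not lifted from this diff):

    from awx.main.models import InstanceGroup

    qs = InstanceGroup.objects.prefetch_related('instances')
    for group in qs:                            # query 1: all instance groups
        for instance in group.instances.all():  # query 2 total, then cache hits
            pass                                # no extra query per group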
diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py
new file mode 100644
index 0000000000..7817521f2e
--- /dev/null
+++ b/awx/main/tests/unit/test_capacity.py
@@ -0,0 +1,136 @@
+import pytest
+
+from awx.main.models import InstanceGroup
+
+
+class FakeObject(object):
+    def __init__(self, **kwargs):
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
+
+class Job(FakeObject):
+    task_impact = 43
+
+    @property
+    def log_format(self):
+        return 'job 382 (fake)'
+
+
+@pytest.fixture
+def sample_cluster():
+    def stand_up_cluster():
+
+        class Instances(FakeObject):
+            def add(self, *args):
+                for instance in args:
+                    self.obj.instance_list.append(instance)
+
+            def all(self):
+                return self.obj.instance_list
+
+        class InstanceGroup(FakeObject):
+
+            def __init__(self, **kwargs):
+                super(InstanceGroup, self).__init__(**kwargs)
+                self.instance_list = []
+
+            @property
+            def instances(self):
+                mgr = Instances(obj=self)
+                return mgr
+
+
+        class Instance(FakeObject):
+            pass
+
+
+        ig_small = InstanceGroup(name='ig_small')
+        ig_large = InstanceGroup(name='ig_large')
+        tower = InstanceGroup(name='tower')
+        i1 = Instance(hostname='i1', capacity=200)
+        i2 = Instance(hostname='i2', capacity=200)
+        i3 = Instance(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+        return [tower, ig_large, ig_small]
+    return stand_up_cluster
+
+
+def test_committed_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='waiting', instance_group=tower),
+        Job(status='waiting', instance_group=ig_large),
+        Job(status='waiting', instance_group=ig_small)
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Jobs submitted to either tower or ig_large must count toward both
+    assert capacities['tower']['committed_capacity'] == 43 * 2
+    assert capacities['ig_large']['committed_capacity'] == 43 * 2
+    assert capacities['ig_small']['committed_capacity'] == 43
+
+
+def test_running_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='running', execution_node='i1'),
+        Job(status='running', execution_node='i2'),
+        Job(status='running', execution_node='i3')
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Tower is only given 1 instance
+    assert capacities['tower']['running_capacity'] == 43
+    # Large IG has 2 instances
+    assert capacities['ig_large']['running_capacity'] == 43 * 2
+    assert capacities['ig_small']['running_capacity'] == 43
+
+
+def test_offline_node_running(sample_cluster):
+    """
+    Assure that the algorithm doesn't explode if a job is marked
+    running on an offline node
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_offline_node_waiting(sample_cluster):
+    """
+    Same but for a waiting job
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='waiting', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_RBAC_reduced_filter(sample_cluster):
+    """
+    User can see jobs that are running in the `ig_small` and `ig_large` IGs,
+    but the user does not have permission to see those actual instance groups.
+    Verify that this does not blow everything up.
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='waiting', instance_group=tower),
+        Job(status='waiting', instance_group=ig_large),
+        Job(status='waiting', instance_group=ig_small)
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower], tasks=tasks, breakdown=True
+    )
+    # Cross-links between groups not visible to current user,
+    # so a naive accounting of capacities is returned instead
+    assert capacities['tower']['committed_capacity'] == 43
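For orientation, the dictionary these unit tests assert against has the per-group shape seeded by zero_out_group and filled in by the accounting loop; with breakdown=True every visible group carries all three counters. A sketch of the result for test_committed_capacity (values follow from the 43-impact jobs above; rendering is illustrative):

    capacities = {
        'tower':    {'consumed_capacity': 86, 'committed_capacity': 86, 'running_capacity': 0},
        'ig_large': {'consumed_capacity': 86, 'committed_capacity': 86, 'running_capacity': 0},
        'ig_small': {'consumed_capacity': 43, 'committed_capacity': 43, 'running_capacity': 0},
    }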
diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py
index f76e77862b..65b7607bb4 100644
--- a/awx/main/tests/unit/test_task_manager.py
+++ b/awx/main/tests/unit/test_task_manager.py
@@ -20,7 +20,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
@@ -36,7 +36,7 @@ class TestCleanupInconsistentCeleryTasks():
 
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(TaskManager, 'get_running_tasks')
     @mock.patch('awx.main.scheduler.logger')
     def test_save_failed(self, logger_mock, get_running_tasks, *args):
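Because TaskManager.__init__ now starts its queryset with InstanceGroup.objects.prefetch_related(...) rather than .all(), these stubs have to patch that method instead; with return_value=[] the manager simply builds an empty graph. A sketch of the pattern (assumes only what the scheduler diff above shows about __init__):

    import mock

    from awx.main.models import InstanceGroup
    from awx.main.scheduler import TaskManager

    with mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]):
        tm = TaskManager()
        assert tm.graph == {}  # no rampart groups were visible during __init__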