Handle capacity algorithm corner cases
Corner cases handled:

- An instance has gone lost, and jobs are still either running or waiting
  inside of its instance group
- RBAC: the user does not have permission to see some of the groups that
  would be used in the capacity calculation

For some of these cases a naive capacity dictionary is returned; the main
goal is to not throw errors and to avoid unpredictable behavior.

Detailed capacity tests are moved into a new unit test file.
parent 5327a4c622
commit d54eb93f26
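To make the two corner cases concrete, the sketch below (illustrative data only, not AWX code) shows the situations the capacity algorithm must now survive without raising KeyError:

# Hypothetical data shapes for the two corner cases described above.

# Corner case 1: a lost instance. A job still reports execution_node 'i9',
# but 'i9' no longer appears in the mapping built from live instances.
instance_ig_mapping = {'i1': {'ig_small'}}
assert 'i9' not in instance_ig_mapping  # lookup must be guarded, not indexed

# Corner case 2: RBAC-reduced visibility. A waiting job belongs to a group
# the capacity query cannot see, so the group-to-peers mapping has no entry.
ig_ig_mapping = {'tower': {'tower'}}
assert 'ig_hidden' not in ig_ig_mapping  # fall back to naive, self-only accounting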
@@ -58,6 +58,8 @@ def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
         ig_ig_set = set()
         for instance_hostname in ig_instance_mapping[group_name]:
             ig_ig_set |= instance_ig_mapping[instance_hostname]
+        else:
+            ig_ig_set.add(group_name)  # Group contains no instances, return self
         ig_ig_mapping[group_name] = ig_ig_set
     return ig_ig_mapping
 
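Driven with plain dictionaries, the new branch gives an instance-less group a mapping to itself. A standalone sketch of the function, assuming the enclosing loop and the ig_ig_mapping initialization from the unchanged parts of the file:

def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
    ig_ig_mapping = {}
    for group_name in ig_instance_mapping:
        ig_ig_set = set()
        for instance_hostname in ig_instance_mapping[group_name]:
            ig_ig_set |= instance_ig_mapping[instance_hostname]
        else:
            ig_ig_set.add(group_name)  # runs after the loop; covers empty groups
        ig_ig_mapping[group_name] = ig_ig_set
    return ig_ig_mapping

# 'tower' shares instance 'i2' with 'ig_large'; 'ig_empty' has no instances.
instance_ig_mapping = {'i1': {'ig_small'}, 'i2': {'ig_large', 'tower'}}
ig_instance_mapping = {'ig_small': {'i1'}, 'ig_large': {'i2'},
                       'tower': {'i2'}, 'ig_empty': set()}
mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
assert mapping['tower'] == {'ig_large', 'tower'}
assert mapping['ig_empty'] == {'ig_empty'}  # the new else branch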
@@ -94,33 +96,6 @@ class InstanceManager(models.Manager):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"
 
-    def capacity_mapping(self, qs=None):
-        """
-        Returns tuple of two dictionaries that shows mutual connections by name
-        for global accounting of capacity
-
-        instance_ig_mapping: {'instance_name': <set of group names instance is member of>}
-        ig_ig_mapping: {'group_name': <set of group names that share instances>}
-        """
-        if qs is None:
-            qs = self.all().prefetch_related('rampart_groups')
-        instance_ig_mapping = {}
-        ig_instance_mapping = {}
-        # Create dictionaries that represent basic m2m memberships
-        for instance in qs:
-            if instance.capacity == 0:
-                continue
-            instance_ig_mapping[instance.hostname] = set(
-                group.name for group in instance.rampart_groups.all()
-            )
-            for group in instance.rampart_groups.all():
-                ig_instance_mapping.setdefault(group.name, set())
-                ig_instance_mapping[group.name].add(instance.hostname)
-        # Get IG capacity overlap mapping
-        ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
-
-        return instance_ig_mapping, ig_ig_mapping
-
 
 class InstanceGroupManager(models.Manager):
     """A custom manager class for the Instance model.
@@ -152,11 +127,20 @@ class InstanceGroupManager(models.Manager):
 
         return instance_ig_mapping, ig_ig_mapping
 
+    @staticmethod
+    def zero_out_group(graph, name, breakdown):
+        if name not in graph:
+            graph[name] = {}
+        graph[name]['consumed_capacity'] = 0
+        if breakdown:
+            graph[name]['committed_capacity'] = 0
+            graph[name]['running_capacity'] = 0
+
     def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
         """
         Returns a dictionary of capacity values for all IGs
         """
-        if qs is None:
+        if qs is None:  # Optionally BYOQS - bring your own queryset
             qs = self.all().prefetch_related('instances')
         instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)
 
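Factoring the zeroing into zero_out_group lets later code initialize a group lazily when a task references one that is not yet in the graph (the RBAC and lost-instance paths in the next hunk). A standalone sketch of the helper's effect, shown outside the manager class:

def zero_out_group(graph, name, breakdown):
    # Mirrors the new staticmethod above.
    if name not in graph:
        graph[name] = {}
    graph[name]['consumed_capacity'] = 0
    if breakdown:
        graph[name]['committed_capacity'] = 0
        graph[name]['running_capacity'] = 0

graph = {}
zero_out_group(graph, 'ig_small', breakdown=True)
assert graph['ig_small'] == {'consumed_capacity': 0,
                             'committed_capacity': 0,
                             'running_capacity': 0}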
@@ -167,22 +151,41 @@ class InstanceGroupManager(models.Manager):
         if graph is None:
             graph = {group.name: {} for group in qs}
         for group_name in graph:
-            graph[group_name]['consumed_capacity'] = 0
-            if breakdown:
-                graph[group_name]['committed_capacity'] = 0
-                graph[group_name]['running_capacity'] = 0
+            self.zero_out_group(graph, group_name, breakdown)
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
             impact = t.task_impact
             if t.status == 'waiting' or not t.execution_node:
                 # Subtract capacity from any peer groups that share instances
-                for group_name in ig_ig_mapping[t.instance_group.name]:
+                if not t.instance_group:
+                    logger.warning('Excluded %s from capacity algorithm '
+                                   '(missing instance_group).', t.log_format)
+                    impacted_groups = []
+                elif t.instance_group.name not in ig_ig_mapping:
+                    # Waiting job in group with 0 capacity has no collateral impact
+                    impacted_groups = [t.instance_group.name]
+                else:
+                    impacted_groups = ig_ig_mapping[t.instance_group.name]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['committed_capacity'] += impact
             elif t.status == 'running':
                 # Subtract capacity from all groups that contain the instance
-                for group_name in instance_ig_mapping[t.execution_node]:
+                if t.execution_node not in instance_ig_mapping:
+                    logger.warning('Detected %s running inside lost instance, '
+                                   'may still be waiting for reaper.', t.log_format)
+                    if t.instance_group:
+                        impacted_groups = [t.instance_group.name]
+                    else:
+                        impacted_groups = []
+                else:
+                    impacted_groups = instance_ig_mapping[t.execution_node]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['running_capacity'] += impact
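The waiting-task branch now chooses which groups a job's impact lands on instead of indexing the mapping directly. A distilled, self-contained restatement of that selection, with fake classes standing in for the AWX models:

import logging

logger = logging.getLogger(__name__)

class FakeGroup(object):
    def __init__(self, name):
        self.name = name

class FakeTask(object):
    def __init__(self, instance_group):
        self.instance_group = instance_group

def impacted_groups_for_waiting(task, ig_ig_mapping):
    if not task.instance_group:
        logger.warning('Excluded task from capacity algorithm (missing instance_group).')
        return []  # orphaned task: no capacity impact at all
    if task.instance_group.name not in ig_ig_mapping:
        return [task.instance_group.name]  # 0-capacity group: no collateral impact
    return ig_ig_mapping[task.instance_group.name]  # normal case: all peer groups

ig_ig_mapping = {'tower': {'tower', 'ig_large'}}
assert impacted_groups_for_waiting(FakeTask(FakeGroup('tower')), ig_ig_mapping) == {'tower', 'ig_large'}
assert impacted_groups_for_waiting(FakeTask(FakeGroup('ig_zero')), ig_ig_mapping) == ['ig_zero']
assert impacted_groups_for_waiting(FakeTask(None), ig_ig_mapping) == []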
@@ -38,7 +38,7 @@ class TaskManager():
 
     def __init__(self):
         self.graph = dict()
-        for rampart_group in InstanceGroup.objects.all():
+        for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
                                                   consumed_capacity=0)
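The scheduler change swaps .all() for .prefetch_related('instances') so that per-group instance lookups are served from the prefetch cache instead of issuing one query per group; the same pattern is why test_mapping below can assert that building the whole mapping takes only two queries. Schematic usage only, as it requires AWX's Django environment to actually run:

from awx.main.models import InstanceGroup

# Two queries total: one for the groups, one batched query for the m2m.
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
    # Served from the prefetch cache; no extra query per group.
    for instance in rampart_group.instances.all():
        print(rampart_group.name, instance.hostname)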
@@ -1,10 +1,8 @@
 import pytest
-import mock
 
 from django.test import TransactionTestCase
 
 from awx.main.models import (
-    Job,
     Instance,
     InstanceGroup,
 )
@@ -28,42 +26,9 @@ class TestCapacityMapping(TransactionTestCase):
     def test_mapping(self):
         self.sample_cluster()
         with self.assertNumQueries(2):
-            inst_map, ig_map = Instance.objects.capacity_mapping()
+            inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
         assert inst_map['i1'] == set(['ig_small'])
         assert inst_map['i2'] == set(['ig_large', 'tower'])
         assert ig_map['ig_small'] == set(['ig_small'])
         assert ig_map['ig_large'] == set(['ig_large', 'tower'])
         assert ig_map['tower'] == set(['ig_large', 'tower'])
-
-    def test_committed_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='waiting', instance_group=tower),
-            Job(status='waiting', instance_group=ig_large),
-            Job(status='waiting', instance_group=ig_small)
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Jobs submitted to either tower or ig_large must count toward both
-        assert capacities['tower']['committed_capacity'] == 43 * 2
-        assert capacities['ig_large']['committed_capacity'] == 43 * 2
-        assert capacities['ig_small']['committed_capacity'] == 43
-
-    def test_running_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='running', execution_node='i1'),
-            Job(status='running', execution_node='i2'),
-            Job(status='running', execution_node='i3')
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Tower is only given 1 instance
-        assert capacities['tower']['running_capacity'] == 43
-        # Large IG has 2 instances
-        assert capacities['ig_large']['running_capacity'] == 43 * 2
-        assert capacities['ig_small']['running_capacity'] == 43
awx/main/tests/unit/test_capacity.py (new file, 135 lines)
@@ -0,0 +1,135 @@
+import pytest
+
+from awx.main.models import InstanceGroup
+
+
+class FakeObject(object):
+    def __init__(self, **kwargs):
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
+
+class Job(FakeObject):
+    task_impact = 43
+
+    def log_format(self):
+        return 'job 382 (fake)'
+
+
+@pytest.fixture
+def sample_cluster():
+    def stand_up_cluster():
+
+        class Instances(FakeObject):
+            def add(self, *args):
+                for instance in args:
+                    self.obj.instance_list.append(instance)
+
+            def all(self):
+                return self.obj.instance_list
+
+        class InstanceGroup(FakeObject):
+
+            def __init__(self, **kwargs):
+                super(InstanceGroup, self).__init__(**kwargs)
+                self.instance_list = []
+
+            @property
+            def instances(self):
+                mgr = Instances(obj=self)
+                return mgr
+
+        class Instance(FakeObject):
+            pass
+
+        ig_small = InstanceGroup(name='ig_small')
+        ig_large = InstanceGroup(name='ig_large')
+        tower = InstanceGroup(name='tower')
+        i1 = Instance(hostname='i1', capacity=200)
+        i2 = Instance(hostname='i2', capacity=200)
+        i3 = Instance(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+        return [tower, ig_large, ig_small]
+    return stand_up_cluster
+
+
+def test_committed_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='waiting', instance_group=tower),
+        Job(status='waiting', instance_group=ig_large),
+        Job(status='waiting', instance_group=ig_small)
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Jobs submitted to either tower or ig_large must count toward both
+    assert capacities['tower']['committed_capacity'] == 43 * 2
+    assert capacities['ig_large']['committed_capacity'] == 43 * 2
+    assert capacities['ig_small']['committed_capacity'] == 43
+
+
+def test_running_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='running', execution_node='i1'),
+        Job(status='running', execution_node='i2'),
+        Job(status='running', execution_node='i3')
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Tower is only given 1 instance
+    assert capacities['tower']['running_capacity'] == 43
+    # Large IG has 2 instances
+    assert capacities['ig_large']['running_capacity'] == 43 * 2
+    assert capacities['ig_small']['running_capacity'] == 43
+
+
+def test_offline_node_running(sample_cluster):
+    """
+    Assure that algorithm doesn't explode if a job is marked running
+    in an offline node
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_offline_node_waiting(sample_cluster):
+    """
+    Same but for a waiting job
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='waiting', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_RBAC_reduced_filter(sample_cluster):
+    """
+    User can see jobs that are running in `ig_small` and `ig_large` IGs,
+    but user does not have permission to see those actual instance groups.
+    Verify that this does not blow everything up.
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='waiting', instance_group=tower),
+        Job(status='waiting', instance_group=ig_large),
+        Job(status='waiting', instance_group=ig_small)
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower], tasks=tasks, breakdown=True
+    )
+    # Cross-links between groups not visible to current user,
+    # so a naive accounting of capacities is returned instead
+    assert capacities['tower']['committed_capacity'] == 43
@@ -20,7 +20,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
@@ -36,7 +36,7 @@ class TestCleanupInconsistentCeleryTasks():
 
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(TaskManager, 'get_running_tasks')
    @mock.patch('awx.main.scheduler.logger')
     def test_save_failed(self, logger_mock, get_running_tasks, *args):