mirror of https://github.com/ansible/awx.git synced 2024-10-31 15:21:13 +03:00

Merge pull request #337 from AlanCoding/capacity_fix

correct capacity algorithm for task manager
Alan Rominger authored 2017-08-28 16:49:19 -04:00, committed by GitHub
commit ba8326285e
9 changed files with 352 additions and 38 deletions

View File

@@ -3564,6 +3564,7 @@ class InstanceSerializer(BaseSerializer):
class InstanceGroupSerializer(BaseSerializer):
consumed_capacity = serializers.SerializerMethodField()
percent_capacity_remaining = serializers.SerializerMethodField()
jobs_running = serializers.SerializerMethodField()
instances = serializers.SerializerMethodField()
@@ -3581,17 +3582,37 @@ class InstanceGroupSerializer(BaseSerializer):
             res['controller'] = self.reverse('api:instance_group_detail', kwargs={'pk': obj.controller_id})
         return res

+    def get_jobs_qs(self):
+        # Store running jobs queryset in context, so it will be shared in ListView
+        if 'running_jobs' not in self.context:
+            self.context['running_jobs'] = UnifiedJob.objects.filter(
+                status__in=('running', 'waiting'))
+        return self.context['running_jobs']
+
+    def get_capacity_dict(self):
+        # Store capacity values (globally computed) in the context
+        if 'capacity_map' not in self.context:
+            ig_qs = None
+            if self.parent:  # Is ListView:
+                ig_qs = self.parent.instance
+            self.context['capacity_map'] = InstanceGroup.objects.capacity_values(
+                qs=ig_qs, tasks=self.get_jobs_qs(), breakdown=True)
+        return self.context['capacity_map']
+
     def get_consumed_capacity(self, obj):
-        return obj.consumed_capacity
+        return self.get_capacity_dict()[obj.name]['consumed_capacity']

     def get_percent_capacity_remaining(self, obj):
-        if not obj.capacity or obj.consumed_capacity == obj.capacity:
+        if not obj.capacity:
             return 0.0
         else:
-            return float("{0:.2f}".format(((float(obj.capacity) - float(obj.consumed_capacity)) / (float(obj.capacity))) * 100))
+            return float("{0:.2f}".format(
+                ((float(obj.capacity) - float(self.get_consumed_capacity(obj))) / (float(obj.capacity))) * 100)
+            )

     def get_jobs_running(self, obj):
-        return UnifiedJob.objects.filter(instance_group=obj, status__in=('running', 'waiting',)).count()
+        jobs_qs = self.get_jobs_qs()
+        return sum(1 for job in jobs_qs if job.instance_group_id == obj.id)

     def get_instances(self, obj):
         return obj.instances.count()
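The caching above works because DRF child serializers under many=True share the root serializer's context dict: the first call to get_capacity_dict() pays for the global computation and every sibling reuses it. A minimal sketch of the same pattern outside AWX, with compute_capacity_map() as a hypothetical stand-in for the expensive global call:

    from rest_framework import serializers

    class GroupSummarySerializer(serializers.Serializer):  # illustrative only
        consumed_capacity = serializers.SerializerMethodField()

        def get_consumed_capacity(self, obj):
            if 'capacity_map' not in self.context:
                # Computed once per request, even when serializing a list
                self.context['capacity_map'] = compute_capacity_map()  # hypothetical
            return self.context['capacity_map'][obj.name]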

View File

@@ -377,9 +377,11 @@ class InstanceAccess(BaseAccess):

     def get_queryset(self):
         if self.user.is_superuser or self.user.is_system_auditor:
-            return Instance.objects.all().distinct()
+            qs = Instance.objects.all().distinct()
         else:
-            return Instance.objects.filter(rampart_groups__in=self.user.get_queryset(InstanceGroup)).distinct()
+            qs = Instance.objects.filter(
+                rampart_groups__in=self.user.get_queryset(InstanceGroup)).distinct()
+        return qs.prefetch_related('rampart_groups')

     def can_add(self, data):
         return False
@@ -397,9 +399,11 @@ class InstanceGroupAccess(BaseAccess):

     def get_queryset(self):
         if self.user.is_superuser or self.user.is_system_auditor:
-            return InstanceGroup.objects.all()
+            qs = InstanceGroup.objects.all()
         else:
-            return InstanceGroup.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role'))
+            qs = InstanceGroup.objects.filter(
+                organization__in=Organization.accessible_pk_qs(self.user, 'admin_role'))
+        return qs.prefetch_related('instances')

     def can_add(self, data):
         return False
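The prefetch added here pays off downstream: the queryset an access class returns is what the list view hands to InstanceGroupSerializer as self.parent.instance, and capacity_mapping() iterates group.instances.all() over it, so prefetching keeps that loop from issuing one query per group. Roughly (a sketch; AWX access classes are constructed with the requesting user):

    qs = InstanceGroupAccess(user).get_queryset()    # has prefetch_related('instances')
    InstanceGroup.objects.capacity_values(qs=qs, tasks=tasks)
    # group.instances.all() inside capacity_mapping() now hits the prefetch cache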

View File

@@ -3,6 +3,7 @@
 import sys
 from datetime import timedelta
+import logging

 from django.db import models
 from django.utils.timezone import now
@@ -11,7 +12,9 @@ from django.conf import settings
 from awx.main.utils.filters import SmartFilter

-__all__ = ['HostManager', 'InstanceManager']
+__all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager']
+
+logger = logging.getLogger('awx.main.managers')


class HostManager(models.Manager):
@@ -48,6 +51,19 @@ class HostManager(models.Manager):
return qs
def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
# Create IG mapping by union of all groups their instances are members of
ig_ig_mapping = {}
for group_name in ig_instance_mapping.keys():
ig_ig_set = set()
for instance_hostname in ig_instance_mapping[group_name]:
ig_ig_set |= instance_ig_mapping[instance_hostname]
else:
ig_ig_set.add(group_name) # Group contains no instances, return self
ig_ig_mapping[group_name] = ig_ig_set
return ig_ig_mapping
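A worked example of get_ig_ig_mapping(), using the three-group cluster from the tests added in this commit (i1 only in ig_small; i2 in both ig_large and tower; i3 in ig_large). Note the for/else quirk: the inner loop never breaks, so the else clause always runs and every group also maps to itself, not only when it has no instances:

    instance_ig_mapping = {'i1': {'ig_small'}, 'i2': {'ig_large', 'tower'}, 'i3': {'ig_large'}}
    ig_instance_mapping = {'ig_small': {'i1'}, 'ig_large': {'i2', 'i3'}, 'tower': {'i2'}}
    get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
    # -> {'ig_small': {'ig_small'},
    #     'ig_large': {'ig_large', 'tower'},
    #     'tower': {'tower', 'ig_large'}}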
class InstanceManager(models.Manager):
"""A custom manager class for the Instance model.
@@ -79,3 +95,100 @@ class InstanceManager(models.Manager):
def my_role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "tower"
class InstanceGroupManager(models.Manager):
    """A custom manager class for the InstanceGroup model.

    Used for global capacity calculations.
    """

    def capacity_mapping(self, qs=None):
        """
        Another entry point to the Instance manager method of the same name
        """
        if qs is None:
            qs = self.all().prefetch_related('instances')
instance_ig_mapping = {}
ig_instance_mapping = {}
# Create dictionaries that represent basic m2m memberships
for group in qs:
ig_instance_mapping[group.name] = set(
instance.hostname for instance in group.instances.all() if
instance.capacity != 0
)
for inst in group.instances.all():
if inst.capacity == 0:
continue
instance_ig_mapping.setdefault(inst.hostname, set())
instance_ig_mapping[inst.hostname].add(group.name)
# Get IG capacity overlap mapping
ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
return instance_ig_mapping, ig_ig_mapping
@staticmethod
def zero_out_group(graph, name, breakdown):
if name not in graph:
graph[name] = {}
graph[name]['consumed_capacity'] = 0
if breakdown:
graph[name]['committed_capacity'] = 0
graph[name]['running_capacity'] = 0
def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
"""
Returns a dictionary of capacity values for all IGs
"""
if qs is None: # Optionally BYOQS - bring your own queryset
qs = self.all().prefetch_related('instances')
instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)
if tasks is None:
tasks = self.model.unifiedjob_set.related.related_model.objects.filter(
status__in=('running', 'waiting'))
if graph is None:
graph = {group.name: {} for group in qs}
for group_name in graph:
self.zero_out_group(graph, group_name, breakdown)
for t in tasks:
# TODO: dock capacity for isolated job management tasks running in queue
impact = t.task_impact
if t.status == 'waiting' or not t.execution_node:
# Subtract capacity from any peer groups that share instances
if not t.instance_group:
logger.warning('Excluded %s from capacity algorithm '
'(missing instance_group).', t.log_format)
impacted_groups = []
elif t.instance_group.name not in ig_ig_mapping:
# Waiting job in group with 0 capacity has no collateral impact
impacted_groups = [t.instance_group.name]
else:
impacted_groups = ig_ig_mapping[t.instance_group.name]
for group_name in impacted_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name]['consumed_capacity'] += impact
if breakdown:
graph[group_name]['committed_capacity'] += impact
elif t.status == 'running':
# Subtract capacity from all groups that contain the instance
if t.execution_node not in instance_ig_mapping:
logger.warning('Detected %s running inside lost instance, '
'may still be waiting for reaper.', t.log_format)
if t.instance_group:
impacted_groups = [t.instance_group.name]
else:
impacted_groups = []
else:
impacted_groups = instance_ig_mapping[t.execution_node]
for group_name in impacted_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name]['consumed_capacity'] += impact
if breakdown:
graph[group_name]['running_capacity'] += impact
else:
logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
return graph
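End to end, a caller sees one dict per group. The numbers below mirror the unit tests later in this commit (task_impact of 43, tower sharing instance i2 with ig_large, and one waiting job submitted to each group):

    graph = InstanceGroup.objects.capacity_values(tasks=tasks, breakdown=True)
    # A job waiting in tower could land on i2, so it also docks ig_large (and vice versa):
    graph['tower']['committed_capacity']     # 86 == 43 * 2
    graph['ig_large']['committed_capacity']  # 86 == 43 * 2
    graph['ig_small']['committed_capacity']  # 43, shares no instances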

View File

@@ -11,7 +11,7 @@ from django.utils.timezone import now, timedelta
 from solo.models import SingletonModel

 from awx.api.versioning import reverse
-from awx.main.managers import InstanceManager
+from awx.main.managers import InstanceManager, InstanceGroupManager
 from awx.main.models.inventory import InventoryUpdate
 from awx.main.models.jobs import Job
 from awx.main.models.projects import ProjectUpdate
@@ -66,6 +66,8 @@ class Instance(models.Model):
 class InstanceGroup(models.Model):
     """A model representing a Queue/Group of AWX Instances."""

+    objects = InstanceGroupManager()
+
     name = models.CharField(max_length=250, unique=True)
     created = models.DateTimeField(auto_now_add=True)
     modified = models.DateTimeField(auto_now=True)
@@ -89,12 +91,7 @@ class InstanceGroup(models.Model):

     @property
     def capacity(self):
-        return sum([x[0] for x in self.instances.values_list('capacity')])
-
-    @property
-    def consumed_capacity(self):
-        return sum(x.task_impact for x in UnifiedJob.objects.filter(instance_group=self,
-                                                                    status__in=('running', 'waiting')))
+        return sum([inst.capacity for inst in self.instances.all()])

     class Meta:
         app_label = 'main'
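The rewritten capacity property looks equivalent to the old one, but the difference shows under prefetch_related: iterating self.instances.all() reads Django's prefetch cache, while the old values_list('capacity') built a fresh queryset and queried once per group. A minimal sketch of the access pattern this enables:

    # Two queries total, however many groups exist:
    for group in InstanceGroup.objects.prefetch_related('instances'):
        group.capacity  # sums the cached Instance rows; no per-group query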

View File

@@ -38,10 +38,10 @@ class TaskManager():

     def __init__(self):
         self.graph = dict()
-        for rampart_group in InstanceGroup.objects.all():
+        for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  capacity_used=0)
+                                                  consumed_capacity=0)

     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -403,15 +403,21 @@ class TaskManager():
         preferred_instance_groups = task.preferred_instance_groups
         found_acceptable_queue = False
         for rampart_group in preferred_instance_groups:
-            if self.get_remaining_capacity(rampart_group.name) <= 0:
-                logger.debug("Skipping group %s capacity <= 0", rampart_group.name)
+            remaining_capacity = self.get_remaining_capacity(rampart_group.name)
+            if remaining_capacity <= 0:
+                logger.debug("Skipping group %s, remaining_capacity %s <= 0",
+                             rampart_group.name, remaining_capacity)
                 continue
             if not self.would_exceed_capacity(task, rampart_group.name):
-                logger.debug("Starting %s in group %s", task.log_format, rampart_group.name)
+                logger.debug("Starting %s in group %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)
                 self.graph[rampart_group.name]['graph'].add_job(task)
                 self.start_task(task, rampart_group, task.get_jobs_fail_chain())
                 found_acceptable_queue = True
                 break
+            else:
+                logger.debug("Not enough capacity to run %s on %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)

         if not found_acceptable_queue:
             logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
@@ -486,32 +492,29 @@ class TaskManager():
             self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)

-    def calculate_capacity_used(self, tasks):
-        for rampart_group in self.graph:
-            self.graph[rampart_group]['capacity_used'] = 0
-        for t in tasks:
-            # TODO: dock capacity for isolated job management tasks running in queue
-            for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
-                if group_actual[0] in self.graph:
-                    self.graph[group_actual[0]]['capacity_used'] += t.task_impact
+    def calculate_capacity_consumed(self, tasks):
+        self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)

     def would_exceed_capacity(self, task, instance_group):
-        current_capacity = self.graph[instance_group]['capacity_used']
+        current_capacity = self.graph[instance_group]['consumed_capacity']
         capacity_total = self.graph[instance_group]['capacity_total']
         if current_capacity == 0:
             return False
         return (task.task_impact + current_capacity > capacity_total)

     def consume_capacity(self, task, instance_group):
-        self.graph[instance_group]['capacity_used'] += task.task_impact
+        logger.debug('%s consumed %s capacity units from %s with prior total of %s',
+                     task.log_format, task.task_impact, instance_group,
+                     self.graph[instance_group]['consumed_capacity'])
+        self.graph[instance_group]['consumed_capacity'] += task.task_impact

     def get_remaining_capacity(self, instance_group):
-        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['capacity_used'])
+        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['consumed_capacity'])

     def process_tasks(self, all_sorted_tasks):
         running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)
-        self.calculate_capacity_used(running_tasks)
+        self.calculate_capacity_consumed(running_tasks)

         self.process_running_tasks(running_tasks)
@@ -540,12 +543,13 @@ class TaskManager():
         return finished_wfjs

     def schedule(self):
-        logger.debug("Starting Schedule")
         with transaction.atomic():
             # Lock
             with advisory_lock('task_manager_lock', wait=False) as acquired:
                 if acquired is False:
                     logger.debug("Not running scheduler, another task holds lock")
                     return
+                logger.debug("Starting Scheduler")

                 self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()
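To make the two capacity guards concrete, take a group with capacity_total=200 that already has consumed_capacity=180, and a candidate task with task_impact=43 (the impact the tests use):

    200 - 180       # get_remaining_capacity -> 20 > 0, so the group is not skipped
    43 + 180 > 200  # would_exceed_capacity -> True, so the task waits this cycle
    # Deliberate edge case kept by the new keys: when consumed_capacity == 0,
    # would_exceed_capacity returns False, so a task bigger than the whole
    # group's capacity can still be placed rather than waiting forever.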

View File

@@ -3,7 +3,7 @@
 import logging

 # Celery
-from celery import task
+from celery import Task, task

 # AWX
 from awx.main.scheduler import TaskManager
@@ -15,6 +15,12 @@ logger = logging.getLogger('awx.main.scheduler')
 # updated model, the call to schedule() may get stale data.


+class LogErrorsTask(Task):
+    def on_failure(self, exc, task_id, args, kwargs, einfo):
+        logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
+        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
+
+
 @task
 def run_job_launch(job_id):
     TaskManager().schedule()
@@ -25,7 +31,7 @@ def run_job_complete(job_id):
     TaskManager().schedule()


-@task
+@task(base=LogErrorsTask)
 def run_task_manager():
     logger.debug("Running Tower task manager.")
     TaskManager().schedule()
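on_failure is Celery's standard Task hook, invoked after a task raises, so routing it through this module's logger puts scheduler tracebacks in the AWX log rather than only in Celery's own output. Reusing the base class is one decorator argument (the task below is a hypothetical example, not part of this commit):

    @task(base=LogErrorsTask)
    def some_other_scheduler_task():  # hypothetical
        ...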

View File

@@ -0,0 +1,34 @@
import pytest
from django.test import TransactionTestCase
from awx.main.models import (
Instance,
InstanceGroup,
)
@pytest.mark.django_db
class TestCapacityMapping(TransactionTestCase):
def sample_cluster(self):
ig_small = InstanceGroup.objects.create(name='ig_small')
ig_large = InstanceGroup.objects.create(name='ig_large')
tower = InstanceGroup.objects.create(name='tower')
i1 = Instance.objects.create(hostname='i1', capacity=200)
i2 = Instance.objects.create(hostname='i2', capacity=200)
i3 = Instance.objects.create(hostname='i3', capacity=200)
ig_small.instances.add(i1)
ig_large.instances.add(i2, i3)
tower.instances.add(i2)
return [tower, ig_large, ig_small]
def test_mapping(self):
self.sample_cluster()
with self.assertNumQueries(2):
inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
assert inst_map['i1'] == set(['ig_small'])
assert inst_map['i2'] == set(['ig_large', 'tower'])
assert ig_map['ig_small'] == set(['ig_small'])
assert ig_map['ig_large'] == set(['ig_large', 'tower'])
assert ig_map['tower'] == set(['ig_large', 'tower'])
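The assertNumQueries(2) bound is exactly what the manager's prefetch_related('instances') promises: one query for all the groups, then one batched query for all of their instances, regardless of group count. Roughly, assuming AWX's main app label for the table names:

    # 1. SELECT ... FROM main_instancegroup
    # 2. SELECT ... FROM main_instance
    #        INNER JOIN main_instancegroup_instances ON (...)
    #        WHERE instancegroup_id IN (<group ids from query 1>)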

View File

@@ -0,0 +1,135 @@
import pytest
from awx.main.models import InstanceGroup
class FakeObject(object):
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
class Job(FakeObject):
task_impact = 43
def log_format(self):
return 'job 382 (fake)'
@pytest.fixture
def sample_cluster():
def stand_up_cluster():
class Instances(FakeObject):
def add(self, *args):
for instance in args:
self.obj.instance_list.append(instance)
def all(self):
return self.obj.instance_list
class InstanceGroup(FakeObject):
def __init__(self, **kwargs):
super(InstanceGroup, self).__init__(**kwargs)
self.instance_list = []
@property
def instances(self):
mgr = Instances(obj=self)
return mgr
class Instance(FakeObject):
pass
ig_small = InstanceGroup(name='ig_small')
ig_large = InstanceGroup(name='ig_large')
tower = InstanceGroup(name='tower')
i1 = Instance(hostname='i1', capacity=200)
i2 = Instance(hostname='i2', capacity=200)
i3 = Instance(hostname='i3', capacity=200)
ig_small.instances.add(i1)
ig_large.instances.add(i2, i3)
tower.instances.add(i2)
return [tower, ig_large, ig_small]
return stand_up_cluster
def test_committed_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster()
tasks = [
Job(status='waiting', instance_group=tower),
Job(status='waiting', instance_group=ig_large),
Job(status='waiting', instance_group=ig_small)
]
capacities = InstanceGroup.objects.capacity_values(
qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
)
# Jobs submitted to either tower or ig_large must count toward both
assert capacities['tower']['committed_capacity'] == 43 * 2
assert capacities['ig_large']['committed_capacity'] == 43 * 2
assert capacities['ig_small']['committed_capacity'] == 43
def test_running_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster()
tasks = [
Job(status='running', execution_node='i1'),
Job(status='running', execution_node='i2'),
Job(status='running', execution_node='i3')
]
capacities = InstanceGroup.objects.capacity_values(
qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
)
# Tower is only given 1 instance
assert capacities['tower']['running_capacity'] == 43
# Large IG has 2 instances
assert capacities['ig_large']['running_capacity'] == 43 * 2
assert capacities['ig_small']['running_capacity'] == 43
def test_offline_node_running(sample_cluster):
"""
Assure that algorithm doesn't explode if a job is marked running
in an offline node
"""
tower, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0
tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(
qs=[tower, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43
def test_offline_node_waiting(sample_cluster):
"""
Same but for a waiting job
"""
tower, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0
tasks = [Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(
qs=[tower, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43
def test_RBAC_reduced_filter(sample_cluster):
"""
User can see jobs that are running in `ig_small` and `ig_large` IGs,
but user does not have permission to see those actual instance groups.
Verify that this does not blow everything up.
"""
tower, ig_large, ig_small = sample_cluster()
tasks = [
Job(status='waiting', instance_group=tower),
Job(status='waiting', instance_group=ig_large),
Job(status='waiting', instance_group=ig_small)
]
capacities = InstanceGroup.objects.capacity_values(
qs=[tower], tasks=tasks, breakdown=True
)
# Cross-links between groups not visible to current user,
# so a naive accounting of capacities is returned instead
assert capacities['tower']['committed_capacity'] == 43

View File

@@ -20,7 +20,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
@@ -36,7 +36,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(cache, 'get', return_value=None)
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
-    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
+    @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
     @mock.patch.object(TaskManager, 'get_running_tasks')
     @mock.patch('awx.main.scheduler.logger')
     def test_save_failed(self, logger_mock, get_running_tasks, *args):