decide the node a job will run early
* Deciding the Instance that a Job runs on at celery task run-time makes it hard to evenly distribute tasks among Instances. Instead, the task manager now looks at the world of running jobs and chooses the instance node to run on up front, applying a deterministic job distribution algorithm.
commit e720fe5dd0
parent f394c02afd
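
For reference, a minimal standalone sketch of the distribution policy the message describes, matching the InstanceGroup helpers added in the first two hunks below; the namedtuple stand-in, hostnames, and capacity figures are illustrative only, not AWX's real models:

# Toy model of the placement policy: prefer the instance with the most
# remaining capacity that can still fit the task; iterating in hostname
# order keeps the choice deterministic across scheduler runs.
from collections import namedtuple

Instance = namedtuple('Instance', ['hostname', 'capacity', 'consumed'])


def remaining(i):
    return i.capacity - i.consumed


def fit_task(instances, task_impact):
    best = None
    for i in sorted(instances, key=lambda x: x.hostname):
        if remaining(i) >= task_impact and (best is None or remaining(i) > remaining(best)):
            best = i
    return best


nodes = [Instance('node1', 100, 90), Instance('node2', 100, 40)]
print(fit_task(nodes, task_impact=20).hostname)  # -> node2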
@@ -92,6 +92,10 @@ class Instance(BaseModel):
         return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname,
                                                                     status__in=('running', 'waiting')))
 
+    @property
+    def remaining_capacity(self):
+        return self.capacity - self.consumed_capacity
+
     @property
     def role(self):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
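
For concreteness, the new remaining_capacity property composes with the consumed_capacity shown in context above, which sums task_impact over the node's running and waiting jobs; the numbers here are made up:

# A node with capacity 100 whose running/waiting jobs carry task_impact
# 30 and 25 has consumed_capacity == 55 and remaining_capacity == 45.
capacity, impacts = 100, [30, 25]
print(capacity - sum(impacts))  # -> 45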
@@ -187,6 +191,25 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
         validate_queuename(self.name)
         return self.name
 
+    def fit_task_to_most_remaining_capacity_instance(self, task):
+        instance_most_capacity = None
+        for i in self.instances.order_by('hostname'):
+            if i.remaining_capacity >= task.task_impact and \
+                    (instance_most_capacity is None or
+                     i.remaining_capacity > instance_most_capacity.remaining_capacity):
+                instance_most_capacity = i
+        return instance_most_capacity
+
+    def find_largest_idle_instance(self):
+        largest_instance = None
+        for i in self.instances.order_by('hostname'):
+            if i.jobs_running == 0:
+                if largest_instance is None:
+                    largest_instance = i
+                elif i.capacity > largest_instance.capacity:
+                    largest_instance = i
+        return largest_instance
+
 
 class TowerScheduleState(SingletonModel):
     schedule_last_run = models.DateTimeField(auto_now_add=True)
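
A self-contained toy of find_largest_idle_instance's selection rule, mirroring the method just added; the Node class, hostnames, and capacities are illustrative:

class Node:
    def __init__(self, hostname, capacity, jobs_running):
        self.hostname = hostname
        self.capacity = capacity
        self.jobs_running = jobs_running


def find_largest_idle_instance(nodes):
    # Among idle nodes (no running jobs), the highest capacity wins;
    # hostname ordering keeps repeated runs deterministic.
    largest = None
    for n in sorted(nodes, key=lambda x: x.hostname):
        if n.jobs_running == 0:
            if largest is None or n.capacity > largest.capacity:
                largest = n
    return largest


nodes = [Node('a', 50, 0), Node('b', 80, 0), Node('c', 200, 3)]
print(find_largest_idle_instance(nodes).hostname)  # -> b ('c' is busy)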
@@ -1228,9 +1228,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             raise RuntimeError("Expected celery_task_id to be set on model.")
         kwargs['task_id'] = self.celery_task_id
         task_class = self._get_task_class()
+        args = [self.pk]
         from awx.main.models.ha import InstanceGroup
         ig = InstanceGroup.objects.get(name=queue)
-        args = [self.pk]
         if ig.controller_id:
             if self.supports_isolation():  # case of jobs and ad hoc commands
                 isolated_instance = ig.instances.order_by('-capacity').first()
@@ -234,7 +234,7 @@ class TaskManager():
     def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
         return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
 
-    def start_task(self, task, rampart_group, dependent_tasks=None):
+    def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
         from awx.main.tasks import handle_work_error, handle_work_success
 
         dependent_tasks = dependent_tasks or []
@@ -269,7 +269,11 @@ class TaskManager():
                             task.log_format, task.instance_group_id, rampart_group.controller_id)
         else:
             task.instance_group = rampart_group
-            logger.info('Submitting %s to instance group %s.', task.log_format, task.instance_group_id)
+            if instance is not None:
+                task.execution_node = instance.hostname
+            logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format))
+            logger.info(six.text_type('Submitting {} to <instance group, instance> <{},{}>.').format(
+                task.log_format, task.instance_group_id, task.execution_node))
         with disable_activity_stream():
             task.celery_task_id = str(uuid.uuid4())
             task.save()
@@ -280,8 +284,8 @@ class TaskManager():
         def post_commit():
             task.websocket_emit_status(task.status)
             if task.status != 'failed':
-                if rampart_group is not None:
-                    actual_queue=rampart_group.name
+                if instance is not None:
+                    actual_queue=instance.hostname
                 else:
                     actual_queue=settings.CELERY_DEFAULT_QUEUE
                 task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue)
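
Condensed, the post_commit change routes the celery message to the chosen node's own queue instead of the instance group's queue; pick_queue below is a hypothetical helper for illustration, and 'celery' stands in for settings.CELERY_DEFAULT_QUEUE:

def pick_queue(instance_hostname, default_queue='celery'):
    # Mirrors the new branch: per-instance queue when a node was chosen
    # up front, otherwise the default queue.
    return instance_hostname if instance_hostname is not None else default_queue


print(pick_queue('node2'))  # -> node2
print(pick_queue(None))     # -> celery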
@@ -433,17 +437,32 @@ class TaskManager():
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
+            idle_instance_that_fits = None
             for rampart_group in preferred_instance_groups:
+                if idle_instance_that_fits is None:
+                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 if self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug(six.text_type("Skipping group {} capacity <= 0").format(rampart_group.name))
                     continue
-                if not self.would_exceed_capacity(task, rampart_group.name):
-                    logger.debug(six.text_type("Starting dependent {} in group {}").format(task.log_format, rampart_group.name))
+
+                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+                if execution_instance:
+                    logger.debug(six.text_type("Starting dependent {} in group {} instance {}").format(
+                        task.log_format, rampart_group.name, execution_instance.hostname))
+                elif not execution_instance and idle_instance_that_fits:
+                    execution_instance = idle_instance_that_fits
+                    logger.debug(six.text_type("Starting dependent {} in group {} on idle instance {}").format(
+                        task.log_format, rampart_group.name, execution_instance.hostname))
+                if execution_instance:
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
                     tasks_to_fail += [dependent_task]
-                    self.start_task(task, rampart_group, tasks_to_fail)
+                    self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
                     found_acceptable_queue = True
+                    break
+                else:
+                    logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
+                        rampart_group.name, task.log_format, task.task_impact))
             if not found_acceptable_queue:
                 logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
 
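One subtlety worth noting: as this hunk reads, the idle-instance fallback does not re-check remaining_capacity, so a dependency whose task_impact exceeds every node's free capacity can still be dispatched to the largest idle node rather than waiting for the next scheduling cycle.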
@@ -455,25 +474,35 @@ class TaskManager():
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
+            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
-                self.start_task(task, None, task.get_jobs_fail_chain())
+                self.start_task(task, None, task.get_jobs_fail_chain(), None)
                 continue
             for rampart_group in preferred_instance_groups:
+                if idle_instance_that_fits is None:
+                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if remaining_capacity <= 0:
                     logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
                         rampart_group.name, remaining_capacity))
                     continue
-                if not self.would_exceed_capacity(task, rampart_group.name):
-                    logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
-                        task.log_format, rampart_group.name, remaining_capacity))
+
+                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+                if execution_instance:
+                    logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+                        task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                elif not execution_instance and idle_instance_that_fits:
+                    execution_instance = idle_instance_that_fits
+                    logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+                        task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                if execution_instance:
                     self.graph[rampart_group.name]['graph'].add_job(task)
-                    self.start_task(task, rampart_group, task.get_jobs_fail_chain())
+                    self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
                     break
                 else:
-                    logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
-                        task.log_format, rampart_group.name, remaining_capacity))
+                    logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
+                        rampart_group.name, task.log_format, task.task_impact))
             if not found_acceptable_queue:
                 logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
 
@@ -872,10 +872,12 @@ class BaseTask(Task):
         '''
         Run the job/task and capture its output.
         '''
+        '''
         execution_node = settings.CLUSTER_HOST_ID
         if isolated_host is not None:
             execution_node = isolated_host
-        instance = self.update_model(pk, status='running', execution_node=execution_node,
+        '''
+        instance = self.update_model(pk, status='running',
                                      start_args='')  # blank field to remove encrypted passwords
 
         instance.websocket_emit_status("running")
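Note that the run-time execution_node computation is disabled by wrapping it in a string literal rather than deleted, and execution_node is dropped from the update_model() call: with this commit the task manager assigns task.execution_node before dispatch, so BaseTask no longer derives it from CLUSTER_HOST_ID or the isolated host at run time.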