mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 06:51:10 +03:00
run dependencies when capacity is available
This commit is contained in:
parent
0f98e1edec
commit
ed37e68c53
@ -157,8 +157,6 @@ class Scheduler():
|
|||||||
def start_task(self, task, dependent_tasks=[]):
|
def start_task(self, task, dependent_tasks=[]):
|
||||||
from awx.main.tasks import handle_work_error, handle_work_success
|
from awx.main.tasks import handle_work_error, handle_work_success
|
||||||
|
|
||||||
status_changed = False
|
|
||||||
|
|
||||||
task_actual = {
|
task_actual = {
|
||||||
'type':task.get_job_type_str(),
|
'type':task.get_job_type_str(),
|
||||||
'id': task['id'],
|
'id': task['id'],
|
||||||
@ -169,13 +167,10 @@ class Scheduler():
|
|||||||
success_handler = handle_work_success.s(task_actual=task_actual)
|
success_handler = handle_work_success.s(task_actual=task_actual)
|
||||||
|
|
||||||
job_obj = task.get_full()
|
job_obj = task.get_full()
|
||||||
if job_obj.status == 'pending':
|
|
||||||
status_changed = True
|
|
||||||
job_obj.status = 'waiting'
|
job_obj.status = 'waiting'
|
||||||
|
|
||||||
(start_status, opts) = job_obj.pre_start()
|
(start_status, opts) = job_obj.pre_start()
|
||||||
if not start_status:
|
if not start_status:
|
||||||
status_changed = True
|
|
||||||
job_obj.status = 'failed'
|
job_obj.status = 'failed'
|
||||||
if job_obj.job_explanation:
|
if job_obj.job_explanation:
|
||||||
job_obj.job_explanation += ' '
|
job_obj.job_explanation += ' '
|
||||||
@ -185,15 +180,12 @@ class Scheduler():
|
|||||||
else:
|
else:
|
||||||
if type(job_obj) is WorkflowJob:
|
if type(job_obj) is WorkflowJob:
|
||||||
job_obj.status = 'running'
|
job_obj.status = 'running'
|
||||||
status_changed = True
|
|
||||||
|
|
||||||
if status_changed is True:
|
|
||||||
job_obj.save()
|
job_obj.save()
|
||||||
|
|
||||||
self.consume_capacity(task)
|
self.consume_capacity(task)
|
||||||
|
|
||||||
def post_commit():
|
def post_commit():
|
||||||
if status_changed:
|
|
||||||
job_obj.websocket_emit_status(job_obj.status)
|
job_obj.websocket_emit_status(job_obj.status)
|
||||||
if job_obj.status != 'failed':
|
if job_obj.status != 'failed':
|
||||||
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
|
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
|
||||||
@ -201,17 +193,14 @@ class Scheduler():
|
|||||||
connection.on_commit(post_commit)
|
connection.on_commit(post_commit)
|
||||||
|
|
||||||
def process_runnable_tasks(self, runnable_tasks):
|
def process_runnable_tasks(self, runnable_tasks):
|
||||||
for i, task in enumerate(runnable_tasks):
|
map(lambda task: self.graph.add_job(task), runnable_tasks)
|
||||||
# TODO: maybe batch process new tasks.
|
|
||||||
# Processing a new task individually seems to be expensive
|
|
||||||
self.graph.add_job(task)
|
|
||||||
|
|
||||||
def create_project_update(self, task):
|
def create_project_update(self, task):
|
||||||
dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency')
|
dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency')
|
||||||
|
|
||||||
# Project created 1 seconds behind
|
# Project created 1 seconds behind
|
||||||
dep.created = task['created'] - timedelta(seconds=1)
|
dep.created = task['created'] - timedelta(seconds=1)
|
||||||
dep.status = 'waiting'
|
dep.status = 'pending'
|
||||||
dep.save()
|
dep.save()
|
||||||
|
|
||||||
project_task = ProjectUpdateDict.get_partial(dep.id)
|
project_task = ProjectUpdateDict.get_partial(dep.id)
|
||||||
@ -222,7 +211,7 @@ class Scheduler():
|
|||||||
dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency')
|
dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency')
|
||||||
|
|
||||||
dep.created = task['created'] - timedelta(seconds=2)
|
dep.created = task['created'] - timedelta(seconds=2)
|
||||||
dep.status = 'waiting'
|
dep.status = 'pending'
|
||||||
dep.save()
|
dep.save()
|
||||||
|
|
||||||
inventory_task = InventoryUpdateDict.get_partial(dep.id)
|
inventory_task = InventoryUpdateDict.get_partial(dep.id)
|
||||||
@ -267,6 +256,9 @@ class Scheduler():
|
|||||||
|
|
||||||
def process_pending_tasks(self, pending_tasks):
|
def process_pending_tasks(self, pending_tasks):
|
||||||
for task in pending_tasks:
|
for task in pending_tasks:
|
||||||
|
# Stop processing tasks if we know we are out of capacity
|
||||||
|
if self.get_remaining_capacity() <= 0:
|
||||||
|
return
|
||||||
|
|
||||||
if not self.graph.is_job_blocked(task):
|
if not self.graph.is_job_blocked(task):
|
||||||
dependencies = self.generate_dependencies(task)
|
dependencies = self.generate_dependencies(task)
|
||||||
@ -280,10 +272,6 @@ class Scheduler():
|
|||||||
else:
|
else:
|
||||||
self.graph.add_job(task)
|
self.graph.add_job(task)
|
||||||
|
|
||||||
# Stop processing tasks if we know we are out of capacity
|
|
||||||
if self.get_remaining_capacity() <= 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
|
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
|
||||||
'''
|
'''
|
||||||
Rectify tower db <-> celery inconsistent view of jobs state
|
Rectify tower db <-> celery inconsistent view of jobs state
|
||||||
@ -330,7 +318,7 @@ class Scheduler():
|
|||||||
|
|
||||||
self.process_runnable_tasks(runnable_tasks)
|
self.process_runnable_tasks(runnable_tasks)
|
||||||
|
|
||||||
pending_tasks = filter(lambda t: t['status'] == 'pending', all_sorted_tasks)
|
pending_tasks = filter(lambda t: t['status'] in 'pending', all_sorted_tasks)
|
||||||
self.process_pending_tasks(pending_tasks)
|
self.process_pending_tasks(pending_tasks)
|
||||||
|
|
||||||
def _schedule(self):
|
def _schedule(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user