1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #4111 from jangsutsr/4086_provide_linkage_from_spawned_job_to_wfj

Provide linkage from spawned job to wfj
This commit is contained in:
Aaron Tan 2016-12-01 21:59:47 -05:00 committed by GitHub
commit 7af675b031
5 changed files with 26 additions and 12 deletions

View File

@ -599,8 +599,22 @@ class UnifiedJobSerializer(BaseSerializer):
res['stdout'] = reverse('api:job_stdout', args=(obj.pk,)) res['stdout'] = reverse('api:job_stdout', args=(obj.pk,))
elif isinstance(obj, AdHocCommand): elif isinstance(obj, AdHocCommand):
res['stdout'] = reverse('api:ad_hoc_command_stdout', args=(obj.pk,)) res['stdout'] = reverse('api:ad_hoc_command_stdout', args=(obj.pk,))
if obj.workflow_job_id:
res['source_workflow_job'] = reverse('api:workflow_job_detail', args=(obj.workflow_job_id,))
return res return res
def get_summary_fields(self, obj):
summary_fields = super(UnifiedJobSerializer, self).get_summary_fields(obj)
if obj.spawned_by_workflow:
summary_fields['source_workflow_job'] = {}
summary_obj = obj.unified_job_node.workflow_job
for field in SUMMARIZABLE_FK_FIELDS['job']:
val = getattr(summary_obj, field, None)
if val is not None:
summary_fields['source_workflow_job'][field] = val
return summary_fields
def to_representation(self, obj): def to_representation(self, obj):
serializer_class = None serializer_class = None
if type(self) is UnifiedJobSerializer: if type(self) is UnifiedJobSerializer:

View File

@ -1865,6 +1865,7 @@ class UnifiedJobAccess(BaseAccess):
qs = qs.select_related( qs = qs.select_related(
'created_by', 'created_by',
'modified_by', 'modified_by',
'unified_job_node__workflow_job',
) )
qs = qs.prefetch_related( qs = qs.prefetch_related(
'unified_job_template', 'unified_job_template',

View File

@ -520,7 +520,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
related_name='%(class)s_labels' related_name='%(class)s_labels'
) )
def get_absolute_url(self): def get_absolute_url(self):
real_instance = self.get_real_instance() real_instance = self.get_real_instance()
if real_instance != self: if real_instance != self:

View File

@ -18,8 +18,8 @@ from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.partial import ( from awx.main.scheduler.partial import (
JobDict, JobDict,
ProjectUpdateDict, ProjectUpdateDict,
ProjectUpdateLatestDict, ProjectUpdateLatestDict,
InventoryUpdateDict, InventoryUpdateDict,
InventoryUpdateLatestDict, InventoryUpdateLatestDict,
@ -103,7 +103,7 @@ class TaskManager():
for task in all_sorted_tasks: for task in all_sorted_tasks:
if type(task) is JobDict: if type(task) is JobDict:
inventory_ids.add(task['inventory_id']) inventory_ids.add(task['inventory_id'])
for inventory_id in inventory_ids: for inventory_id in inventory_ids:
results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id))) results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id)))
@ -174,10 +174,10 @@ class TaskManager():
'id': task['id'], 'id': task['id'],
} }
dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks] dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks]
error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies) error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
success_handler = handle_work_success.s(task_actual=task_actual) success_handler = handle_work_success.s(task_actual=task_actual)
job_obj = task.get_full() job_obj = task.get_full()
job_obj.status = 'waiting' job_obj.status = 'waiting'
@ -201,7 +201,7 @@ class TaskManager():
job_obj.websocket_emit_status(job_obj.status) job_obj.websocket_emit_status(job_obj.status)
if job_obj.status != 'failed': if job_obj.status != 'failed':
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
connection.on_commit(post_commit) connection.on_commit(post_commit)
def process_runnable_tasks(self, runnable_tasks): def process_runnable_tasks(self, runnable_tasks):
@ -279,7 +279,7 @@ class TaskManager():
if not self.graph.is_job_blocked(task): if not self.graph.is_job_blocked(task):
dependencies = self.generate_dependencies(task) dependencies = self.generate_dependencies(task)
self.process_dependencies(task, dependencies) self.process_dependencies(task, dependencies)
# Spawning deps might have blocked us # Spawning deps might have blocked us
if not self.graph.is_job_blocked(task): if not self.graph.is_job_blocked(task):
self.graph.add_job(task) self.graph.add_job(task)
@ -295,7 +295,7 @@ class TaskManager():
for task in all_running_sorted_tasks: for task in all_running_sorted_tasks:
if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# NOTE: Pull status again and make sure it didn't finish in # NOTE: Pull status again and make sure it didn't finish in
# the meantime? # the meantime?
# TODO: try catch the getting of the job. The job COULD have been deleted # TODO: try catch the getting of the job. The job COULD have been deleted
task_obj = task.get_full() task_obj = task.get_full()
@ -346,7 +346,7 @@ class TaskManager():
latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
self.process_latest_inventory_updates(latest_inventory_updates) self.process_latest_inventory_updates(latest_inventory_updates)
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks) inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_inventory_sources(inventory_id_sources) self.process_inventory_sources(inventory_id_sources)
@ -371,4 +371,3 @@ class TaskManager():
# Operations whose queries rely on modifications made during the atomic scheduling session # Operations whose queries rely on modifications made during the atomic scheduling session
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs): for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
_send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed') _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')

View File

@ -34,7 +34,8 @@ def project_update(mocker):
@pytest.fixture @pytest.fixture
def job(mocker, job_template, project_update): def job(mocker, job_template, project_update):
return mocker.MagicMock(pk=5, job_template=job_template, project_update=project_update) return mocker.MagicMock(pk=5, job_template=job_template, project_update=project_update,
workflow_job_id=None)
@pytest.fixture @pytest.fixture