mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 06:51:10 +03:00
Merge pull request #382 from jangsutsr/fix-264
Implement workflow job failure
This commit is contained in:
commit
bcd2a8f211
@ -468,9 +468,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
||||
def _get_unified_job_template_class(cls):
|
||||
return WorkflowJobTemplate
|
||||
|
||||
def _has_failed(self):
|
||||
return False
|
||||
|
||||
def socketio_emit_data(self):
|
||||
return {}
|
||||
|
||||
|
@ -62,6 +62,7 @@ class WorkflowDAG(SimpleDAG):
|
||||
def is_workflow_done(self):
|
||||
root_nodes = self.get_root_nodes()
|
||||
nodes = root_nodes
|
||||
is_failed = False
|
||||
|
||||
for index, n in enumerate(nodes):
|
||||
obj = n['node_object']
|
||||
@ -69,24 +70,29 @@ class WorkflowDAG(SimpleDAG):
|
||||
|
||||
if obj.unified_job_template is None:
|
||||
continue
|
||||
if not job:
|
||||
return False
|
||||
# Job is about to run or is running. Hold our horses and wait for
|
||||
# the job to finish. We can't proceed down the graph path until we
|
||||
# have the job result.
|
||||
elif job.status in ['canceled', 'error']:
|
||||
continue
|
||||
elif job.status not in ['failed', 'successful']:
|
||||
return False
|
||||
elif job.status == 'failed':
|
||||
children_failed = self.get_dependencies(obj, 'failure_nodes')
|
||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
||||
children_all = children_failed + children_always
|
||||
nodes.extend(children_all)
|
||||
elif job.status == 'successful':
|
||||
children_success = self.get_dependencies(obj, 'success_nodes')
|
||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
||||
children_all = children_success + children_always
|
||||
nodes.extend(children_all)
|
||||
return True
|
||||
elif not job:
|
||||
return False, False
|
||||
|
||||
children_success = self.get_dependencies(obj, 'success_nodes')
|
||||
children_failed = self.get_dependencies(obj, 'failure_nodes')
|
||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
||||
if not is_failed and job.status != 'successful':
|
||||
children_all = children_success + children_failed + children_always
|
||||
for child in children_all:
|
||||
if child['node_object'].job:
|
||||
break
|
||||
else:
|
||||
is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
|
||||
|
||||
if job.status in ['canceled', 'error']:
|
||||
continue
|
||||
elif job.status == 'failed':
|
||||
nodes.extend(children_failed + children_always)
|
||||
elif job.status == 'successful':
|
||||
nodes.extend(children_success + children_always)
|
||||
else:
|
||||
# Job is about to run or is running. Hold our horses and wait for
|
||||
# the job to finish. We can't proceed down the graph path until we
|
||||
# have the job result.
|
||||
return False, False
|
||||
return True, is_failed
|
||||
|
@ -219,12 +219,12 @@ class TaskManager():
|
||||
workflow_job.save()
|
||||
dag.cancel_node_jobs()
|
||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||
elif dag.is_workflow_done():
|
||||
else:
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
if not is_done:
|
||||
continue
|
||||
result.append(workflow_job.id)
|
||||
if workflow_job._has_failed():
|
||||
workflow_job.status = 'failed'
|
||||
else:
|
||||
workflow_job.status = 'successful'
|
||||
workflow_job.status = 'failed' if has_failed else 'successful'
|
||||
workflow_job.save()
|
||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||
return result
|
||||
@ -363,7 +363,7 @@ class TaskManager():
|
||||
return False
|
||||
|
||||
'''
|
||||
If the latest project update has a created time == job_created_time-1
|
||||
If the latest project update has a created time == job_created_time-1
|
||||
then consider the project update found. This is so we don't enter an infinite loop
|
||||
of updating the project when cache timeout is 0.
|
||||
'''
|
||||
@ -515,7 +515,7 @@ class TaskManager():
|
||||
return None
|
||||
|
||||
'''
|
||||
Only consider failing tasks on instances for which we obtained a task
|
||||
Only consider failing tasks on instances for which we obtained a task
|
||||
list from celery for.
|
||||
'''
|
||||
running_tasks, waiting_tasks = self.get_running_tasks()
|
||||
|
@ -4,7 +4,7 @@ import pytest
|
||||
|
||||
# AWX
|
||||
from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode, WorkflowJobTemplate
|
||||
from awx.main.models.jobs import Job
|
||||
from awx.main.models.jobs import JobTemplate, Job
|
||||
from awx.main.models.projects import ProjectUpdate
|
||||
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
||||
|
||||
@ -15,9 +15,28 @@ from django.core.exceptions import ValidationError
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestWorkflowDAGFunctional(TransactionTestCase):
|
||||
def workflow_job(self):
|
||||
def workflow_job(self, states=['new', 'new', 'new', 'new', 'new']):
|
||||
"""
|
||||
Workflow topology:
|
||||
node[0]
|
||||
/\
|
||||
s/ \f
|
||||
/ \
|
||||
node[1] node[3]
|
||||
/ \
|
||||
s/ \f
|
||||
/ \
|
||||
node[2] node[4]
|
||||
"""
|
||||
wfj = WorkflowJob.objects.create()
|
||||
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj) for i in range(0, 5)]
|
||||
jt = JobTemplate.objects.create(name='test-jt')
|
||||
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 5)]
|
||||
for node, state in zip(nodes, states):
|
||||
if state:
|
||||
node.job = jt.create_job()
|
||||
node.job.status = state
|
||||
node.job.save()
|
||||
node.save()
|
||||
nodes[0].success_nodes.add(nodes[1])
|
||||
nodes[1].success_nodes.add(nodes[2])
|
||||
nodes[0].failure_nodes.add(nodes[3])
|
||||
@ -35,6 +54,41 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
|
||||
with self.assertNumQueries(4):
|
||||
dag._init_graph(wfj)
|
||||
|
||||
def test_workflow_done(self):
|
||||
wfj = self.workflow_job(states=['failed', None, None, 'successful', None])
|
||||
dag = WorkflowDAG(workflow_job=wfj)
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
self.assertTrue(is_done)
|
||||
self.assertFalse(has_failed)
|
||||
|
||||
def test_workflow_fails_for_unfinished_node(self):
|
||||
wfj = self.workflow_job(states=['error', None, None, None, None])
|
||||
dag = WorkflowDAG(workflow_job=wfj)
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
self.assertTrue(is_done)
|
||||
self.assertTrue(has_failed)
|
||||
|
||||
def test_workflow_fails_for_no_error_handler(self):
|
||||
wfj = self.workflow_job(states=['successful', 'failed', None, None, None])
|
||||
dag = WorkflowDAG(workflow_job=wfj)
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
self.assertTrue(is_done)
|
||||
self.assertTrue(has_failed)
|
||||
|
||||
def test_workflow_fails_leaf(self):
|
||||
wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None])
|
||||
dag = WorkflowDAG(workflow_job=wfj)
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
self.assertTrue(is_done)
|
||||
self.assertTrue(has_failed)
|
||||
|
||||
def test_workflow_not_finished(self):
|
||||
wfj = self.workflow_job(states=['new', None, None, None, None])
|
||||
dag = WorkflowDAG(workflow_job=wfj)
|
||||
is_done, has_failed = dag.is_workflow_done()
|
||||
self.assertFalse(is_done)
|
||||
self.assertFalse(has_failed)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestWorkflowJob:
|
||||
@ -164,37 +218,3 @@ class TestWorkflowJobTemplate:
|
||||
wfjt2.validate_unique()
|
||||
wfjt2 = WorkflowJobTemplate(name='foo', organization=None)
|
||||
wfjt2.validate_unique()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestWorkflowJobFailure:
|
||||
"""
|
||||
Tests to re-implement if workflow failure status is introduced in
|
||||
a future Tower version.
|
||||
"""
|
||||
@pytest.fixture
|
||||
def wfj(self):
|
||||
return WorkflowJob.objects.create(name='test-wf-job')
|
||||
|
||||
def test_workflow_not_failed_unran_job(self, wfj):
|
||||
"""
|
||||
Test that an un-ran node will not mark workflow job as failed
|
||||
"""
|
||||
WorkflowJobNode.objects.create(workflow_job=wfj)
|
||||
assert not wfj._has_failed()
|
||||
|
||||
def test_workflow_not_failed_successful_job(self, wfj):
|
||||
"""
|
||||
Test that a sucessful node will not mark workflow job as failed
|
||||
"""
|
||||
job = Job.objects.create(name='test-job', status='successful')
|
||||
WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
|
||||
assert not wfj._has_failed()
|
||||
|
||||
def test_workflow_not_failed_failed_job_but_okay(self, wfj):
|
||||
"""
|
||||
Test that a failed node will not mark workflow job as failed
|
||||
"""
|
||||
job = Job.objects.create(name='test-job', status='failed')
|
||||
WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
|
||||
assert not wfj._has_failed()
|
||||
|
@ -48,6 +48,10 @@ Workflow job summary:
|
||||
|
||||
Starting from Tower 3.2, Workflow jobs support simultaneous job runs just like that of ordinary jobs. It is controlled by `allow_simultaneous` field of underlying workflow job template. By default, simultaneous workflow job runs are disabled and users should be prudent in enabling this functionality. Because the performance boost of simultaneous workflow runs will only manifest when a large portion of jobs contained by a workflow allow simultaneous runs. Otherwise it is expected to have some long-running workflow jobs since its spawned jobs can be in pending state for a long time.
|
||||
|
||||
Before Tower 3.3, the 'failed' status of workflow job is not defined. Starting from 3.3 we define a finished workflow job to fail, if at least one of the conditions below satisfies:
|
||||
* At least one node runs into states `canceled` or `error`.
|
||||
* At least one leaf node runs into states `failed`, but no child node is spawned to run (no error handler).
|
||||
|
||||
### Workflow Copy and Relaunch
|
||||
Other than the normal way of creating workflow job templates, it is also possible to copy existing workflow job templates. The resulting new workflow job template will be mostly identical to the original, except for `name` field which will be appended a text to indicate it's a copy.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user