mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
always chain failures
* When inv and proj updates trigger from a JT run, if either update fails then the job template should get marked failed. Before this commit, the job template would get marked failed ONLY if there was enough capacity to run all the associated updates within the same schedule() call. If, instead, the associated updates were ran in another schedule() call, the failure chain was lost. This changeset fixes that by saving the necessary data in the dependent_jobs relationship so that the failure is always chained.
This commit is contained in:
parent
0d8f14647f
commit
cc7c2957cf
@ -353,6 +353,10 @@ class UnifiedJobTypeStringMixin(object):
|
||||
def _underscore_to_camel(cls, word):
|
||||
return ''.join(x.capitalize() or '_' for x in word.split('_'))
|
||||
|
||||
@classmethod
|
||||
def _camel_to_underscore(cls, word):
|
||||
return re.sub('(?!^)([A-Z]+)', r'_\1', word).lower()
|
||||
|
||||
@classmethod
|
||||
def _model_type(cls, job_type):
|
||||
# Django >= 1.9
|
||||
@ -371,6 +375,9 @@ class UnifiedJobTypeStringMixin(object):
|
||||
return None
|
||||
return model.objects.get(id=job_id)
|
||||
|
||||
def model_to_str(self):
|
||||
return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__)
|
||||
|
||||
|
||||
class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin):
|
||||
'''
|
||||
|
@ -166,6 +166,9 @@ class TaskManager():
|
||||
|
||||
return (active_task_queues, active_tasks)
|
||||
|
||||
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
|
||||
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
|
||||
|
||||
def start_task(self, task, dependent_tasks=[]):
|
||||
from awx.main.tasks import handle_work_error, handle_work_success
|
||||
|
||||
@ -179,6 +182,17 @@ class TaskManager():
|
||||
success_handler = handle_work_success.s(task_actual=task_actual)
|
||||
|
||||
job_obj = task.get_full()
|
||||
'''
|
||||
This is to account for when there isn't enough capacity to execute all
|
||||
dependent jobs (i.e. proj or inv update) within the same schedule()
|
||||
call.
|
||||
|
||||
Proceeding calls to schedule() need to recontruct the proj or inv
|
||||
update -> job fail logic dependency. The below call recontructs that
|
||||
failure dependency.
|
||||
'''
|
||||
if len(dependencies) == 0:
|
||||
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj)
|
||||
job_obj.status = 'waiting'
|
||||
|
||||
(start_status, opts) = job_obj.pre_start()
|
||||
@ -230,10 +244,32 @@ class TaskManager():
|
||||
|
||||
return inventory_task
|
||||
|
||||
'''
|
||||
Since we are dealing with partial objects we don't get to take advantage
|
||||
of Django to resolve the type of related Many to Many field dependent_job.
|
||||
|
||||
Hence the, potentional, double query in this method.
|
||||
'''
|
||||
def get_related_dependent_jobs_as_patials(self, job_ids):
|
||||
dependent_partial_jobs = []
|
||||
for id in job_ids:
|
||||
if ProjectUpdate.objects.filter(id=id).exists():
|
||||
dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial())
|
||||
elif InventoryUpdate.objects.filter(id=id).exists():
|
||||
dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial())
|
||||
return dependent_partial_jobs
|
||||
|
||||
def capture_chain_failure_dependencies(self, task, dependencies):
|
||||
for dep in dependencies:
|
||||
dep_obj = task.get_full()
|
||||
dep_obj.dependent_jobs.add(task['id'])
|
||||
dep_obj.save()
|
||||
|
||||
def generate_dependencies(self, task):
|
||||
dependencies = []
|
||||
# TODO: What if the project is null ?
|
||||
if type(task) is JobDict:
|
||||
|
||||
if task['project__scm_update_on_launch'] is True and \
|
||||
self.graph.should_update_related_project(task):
|
||||
project_task = self.create_project_update(task)
|
||||
@ -248,6 +284,8 @@ class TaskManager():
|
||||
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
|
||||
inventory_task = self.create_inventory_update(task, inventory_source_task)
|
||||
dependencies.append(inventory_task)
|
||||
|
||||
self.capture_chain_failure_dependencies(task, dependencies)
|
||||
return dependencies
|
||||
|
||||
def process_latest_project_updates(self, latest_project_updates):
|
||||
|
@ -1,6 +1,7 @@
|
||||
|
||||
# Python
|
||||
import json
|
||||
import itertools
|
||||
|
||||
# AWX
|
||||
from awx.main.utils import decrypt_field_value
|
||||
@ -61,13 +62,36 @@ class PartialModelDict(object):
|
||||
def task_impact(self):
|
||||
raise RuntimeError("Inherit and implement me")
|
||||
|
||||
@classmethod
|
||||
def merge_values(cls, values):
|
||||
grouped_results = itertools.groupby(values, key=lambda value: value['id'])
|
||||
|
||||
merged_values = []
|
||||
for k, g in grouped_results:
|
||||
print k
|
||||
groups = list(g)
|
||||
merged_value = {}
|
||||
for group in groups:
|
||||
for key, val in group.iteritems():
|
||||
if not merged_value.get(key):
|
||||
merged_value[key] = val
|
||||
elif val != merged_value[key]:
|
||||
if isinstance(merged_value[key], list):
|
||||
if val not in merged_value[key]:
|
||||
merged_value[key].append(val)
|
||||
else:
|
||||
old_val = merged_value[key]
|
||||
merged_value[key] = [old_val, val]
|
||||
merged_values.append(merged_value)
|
||||
return merged_values
|
||||
|
||||
|
||||
class JobDict(PartialModelDict):
|
||||
FIELDS = (
|
||||
'id', 'status', 'job_template_id', 'inventory_id', 'project_id',
|
||||
'launch_type', 'limit', 'allow_simultaneous', 'created',
|
||||
'job_type', 'celery_task_id', 'project__scm_update_on_launch',
|
||||
'forks', 'start_args',
|
||||
'forks', 'start_args', 'dependent_jobs__id',
|
||||
)
|
||||
model = Job
|
||||
|
||||
@ -85,6 +109,14 @@ class JobDict(PartialModelDict):
|
||||
start_args = start_args or {}
|
||||
return start_args.get('inventory_sources_already_updated', [])
|
||||
|
||||
@classmethod
|
||||
def filter_partial(cls, status=[]):
|
||||
kv = {
|
||||
'status__in': status
|
||||
}
|
||||
merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values()))
|
||||
return [cls(o) for o in merged]
|
||||
|
||||
|
||||
class ProjectUpdateDict(PartialModelDict):
|
||||
FIELDS = (
|
||||
@ -134,7 +166,8 @@ class InventoryUpdateDict(PartialModelDict):
|
||||
#'inventory_source__update_on_launch',
|
||||
#'inventory_source__update_cache_timeout',
|
||||
FIELDS = (
|
||||
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', 'inventory_source__inventory_id',
|
||||
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
|
||||
'inventory_source__inventory_id',
|
||||
)
|
||||
model = InventoryUpdate
|
||||
|
||||
@ -217,7 +250,7 @@ class SystemJobDict(PartialModelDict):
|
||||
|
||||
class AdHocCommandDict(PartialModelDict):
|
||||
FIELDS = (
|
||||
'id', 'created', 'status', 'inventory_id',
|
||||
'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id',
|
||||
)
|
||||
model = AdHocCommand
|
||||
|
||||
|
@ -36,6 +36,7 @@ def scheduler_factory(mocker, epoch):
|
||||
def no_create_project_update(task):
|
||||
raise RuntimeError("create_project_update should not be called")
|
||||
|
||||
mocker.patch.object(sched, 'capture_chain_failure_dependencies')
|
||||
mocker.patch.object(sched, 'get_tasks', return_value=tasks)
|
||||
mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[])
|
||||
mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources)
|
||||
|
@ -87,3 +87,21 @@ class TestCreateDependentInventoryUpdate():
|
||||
scheduler._schedule()
|
||||
|
||||
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
|
||||
|
||||
|
||||
class TestCaptureChainFailureDependencies():
|
||||
@pytest.fixture
|
||||
def inventory_id_sources(self, inventory_source_factory):
|
||||
return [
|
||||
(1, [inventory_source_factory(id=1)]),
|
||||
]
|
||||
|
||||
def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources):
|
||||
scheduler = scheduler_factory(tasks=[pending_job],
|
||||
create_inventory_update=waiting_inventory_update,
|
||||
inventory_sources=inventory_id_sources)
|
||||
|
||||
scheduler._schedule()
|
||||
|
||||
scheduler.capture_chain_failure_dependencies.assert_called_with(pending_job, [waiting_inventory_update])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user