1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #4369 from chrismeyersfsu/enhancement-dependencies

Enhancement dependencies
This commit is contained in:
Chris Meyers 2016-12-12 10:13:59 -05:00 committed by GitHub
commit c7f935367f
6 changed files with 136 additions and 10 deletions

View File

@ -353,6 +353,10 @@ class UnifiedJobTypeStringMixin(object):
def _underscore_to_camel(cls, word):
return ''.join(x.capitalize() or '_' for x in word.split('_'))
@classmethod
def _camel_to_underscore(cls, word):
return re.sub('(?!^)([A-Z]+)', r'_\1', word).lower()
@classmethod
def _model_type(cls, job_type):
# Django >= 1.9
@ -371,6 +375,9 @@ class UnifiedJobTypeStringMixin(object):
return None
return model.objects.get(id=job_id)
def model_to_str(self):
return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__)
class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin):
'''

View File

@ -166,6 +166,9 @@ class TaskManager():
return (active_task_queues, active_tasks)
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
def start_task(self, task, dependent_tasks=[]):
from awx.main.tasks import handle_work_error, handle_work_success
@ -179,6 +182,17 @@ class TaskManager():
success_handler = handle_work_success.s(task_actual=task_actual)
job_obj = task.get_full()
'''
This is to account for when there isn't enough capacity to execute all
dependent jobs (i.e. proj or inv update) within the same schedule()
call.
Proceeding calls to schedule() need to recontruct the proj or inv
update -> job fail logic dependency. The below call recontructs that
failure dependency.
'''
if len(dependencies) == 0:
dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj)
job_obj.status = 'waiting'
(start_status, opts) = job_obj.pre_start()
@ -230,16 +244,41 @@ class TaskManager():
return inventory_task
'''
Since we are dealing with partial objects we don't get to take advantage
of Django to resolve the type of related Many to Many field dependent_job.
Hence the, potentional, double query in this method.
'''
def get_related_dependent_jobs_as_patials(self, job_ids):
dependent_partial_jobs = []
for id in job_ids:
if ProjectUpdate.objects.filter(id=id).exists():
dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial())
elif InventoryUpdate.objects.filter(id=id).exists():
dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial())
return dependent_partial_jobs
def capture_chain_failure_dependencies(self, task, dependencies):
for dep in dependencies:
dep_obj = task.get_full()
dep_obj.dependent_jobs.add(task['id'])
dep_obj.save()
def generate_dependencies(self, task):
dependencies = []
# TODO: What if the project is null ?
if type(task) is JobDict:
if task['project__scm_update_on_launch'] is True and \
self.graph.should_update_related_project(task):
project_task = self.create_project_update(task)
dependencies.append(project_task)
# Inventory created 2 seconds behind job
'''
Inventory may have already been synced from a provision callback.
'''
inventory_sources_already_updated = task.get_inventory_sources_already_updated()
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
@ -248,6 +287,8 @@ class TaskManager():
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
inventory_task = self.create_inventory_update(task, inventory_source_task)
dependencies.append(inventory_task)
self.capture_chain_failure_dependencies(task, dependencies)
return dependencies
def process_latest_project_updates(self, latest_project_updates):

View File

@ -117,10 +117,6 @@ class DependencyGraph(object):
if not latest_inventory_update:
return True
# TODO: Other finished, failed cases? i.e. error ?
if latest_inventory_update['status'] in ['failed', 'canceled']:
return True
'''
This is a bit of fuzzy logic.
If the latest inventory update has a created time == job_created_time-2
@ -138,7 +134,11 @@ class DependencyGraph(object):
timeout_seconds = timedelta(seconds=latest_inventory_update['inventory_source__update_cache_timeout'])
if (latest_inventory_update['finished'] + timeout_seconds) < now:
return True
if latest_inventory_update['inventory_source__update_on_launch'] is True and \
latest_inventory_update['status'] in ['failed', 'canceled', 'error']:
return True
return False
def mark_system_job(self):

View File

@ -1,6 +1,7 @@
# Python
import json
import itertools
# AWX
from awx.main.utils import decrypt_field_value
@ -61,13 +62,36 @@ class PartialModelDict(object):
def task_impact(self):
raise RuntimeError("Inherit and implement me")
@classmethod
def merge_values(cls, values):
grouped_results = itertools.groupby(values, key=lambda value: value['id'])
merged_values = []
for k, g in grouped_results:
print k
groups = list(g)
merged_value = {}
for group in groups:
for key, val in group.iteritems():
if not merged_value.get(key):
merged_value[key] = val
elif val != merged_value[key]:
if isinstance(merged_value[key], list):
if val not in merged_value[key]:
merged_value[key].append(val)
else:
old_val = merged_value[key]
merged_value[key] = [old_val, val]
merged_values.append(merged_value)
return merged_values
class JobDict(PartialModelDict):
FIELDS = (
'id', 'status', 'job_template_id', 'inventory_id', 'project_id',
'launch_type', 'limit', 'allow_simultaneous', 'created',
'job_type', 'celery_task_id', 'project__scm_update_on_launch',
'forks', 'start_args',
'forks', 'start_args', 'dependent_jobs__id',
)
model = Job
@ -85,6 +109,14 @@ class JobDict(PartialModelDict):
start_args = start_args or {}
return start_args.get('inventory_sources_already_updated', [])
@classmethod
def filter_partial(cls, status=[]):
kv = {
'status__in': status
}
merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values()))
return [cls(o) for o in merged]
class ProjectUpdateDict(PartialModelDict):
FIELDS = (
@ -134,7 +166,8 @@ class InventoryUpdateDict(PartialModelDict):
#'inventory_source__update_on_launch',
#'inventory_source__update_cache_timeout',
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', 'inventory_source__inventory_id',
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'inventory_source__inventory_id',
)
model = InventoryUpdate
@ -151,6 +184,7 @@ class InventoryUpdateLatestDict(InventoryUpdateDict):
FIELDS = (
'id', 'status', 'created', 'celery_task_id', 'inventory_source_id',
'finished', 'inventory_source__update_cache_timeout', 'launch_type',
'inventory_source__update_on_launch',
)
model = InventoryUpdate
@ -217,7 +251,7 @@ class SystemJobDict(PartialModelDict):
class AdHocCommandDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'inventory_id',
'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id',
)
model = AdHocCommand

View File

@ -36,6 +36,7 @@ def scheduler_factory(mocker, epoch):
def no_create_project_update(task):
raise RuntimeError("create_project_update should not be called")
mocker.patch.object(sched, 'capture_chain_failure_dependencies')
mocker.patch.object(sched, 'get_tasks', return_value=tasks)
mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[])
mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources)

View File

@ -26,6 +26,22 @@ def successful_inventory_update_latest_cache_expired(inventory_update_latest_fac
return iu
@pytest.fixture
def failed_inventory_update_latest_cache_zero(failed_inventory_update_latest):
iu = failed_inventory_update_latest
iu['inventory_source__update_cache_timeout'] = 0
iu['inventory_source__update_on_launch'] = True
iu['finished'] = iu['created'] + timedelta(seconds=2)
iu['status'] = 'failed'
return iu
@pytest.fixture
def failed_inventory_update_latest_cache_non_zero(failed_inventory_update_latest_cache_zero):
failed_inventory_update_latest_cache_zero['inventory_source__update_cache_timeout'] = 10000000
return failed_inventory_update_latest_cache_zero
class TestStartInventoryUpdate():
def test_pending(self, scheduler_factory, pending_inventory_update):
scheduler = scheduler_factory(tasks=[pending_inventory_update])
@ -79,11 +95,38 @@ class TestCreateDependentInventoryUpdate():
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
def test_last_update_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest, waiting_inventory_update, inventory_id_sources):
def test_last_update_timeout_zero_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest_cache_zero, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[failed_inventory_update, pending_job],
latest_inventory_updates=[failed_inventory_update_latest],
latest_inventory_updates=[failed_inventory_update_latest_cache_zero],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
def test_last_update_timeout_non_zero_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest_cache_non_zero, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[failed_inventory_update, pending_job],
latest_inventory_updates=[failed_inventory_update_latest_cache_non_zero],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job])
class TestCaptureChainFailureDependencies():
@pytest.fixture
def inventory_id_sources(self, inventory_source_factory):
return [
(1, [inventory_source_factory(id=1)]),
]
def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources):
scheduler = scheduler_factory(tasks=[pending_job],
create_inventory_update=waiting_inventory_update,
inventory_sources=inventory_id_sources)
scheduler._schedule()
scheduler.capture_chain_failure_dependencies.assert_called_with(pending_job, [waiting_inventory_update])