From 7861cda6fed80390bd238cba7de997a1a51ce3d9 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 27 Sep 2017 16:00:05 -0400 Subject: [PATCH 1/3] Remove the logic blocking dependent inventory updates on callbacks --- awx/main/scheduler/task_manager.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index c78b96b3f6..2a06d79fbb 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -390,17 +390,16 @@ class TaskManager(): dependencies.append(latest_project_update) # Inventory created 2 seconds behind job - if task.launch_type != 'callback': - for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: - if not inventory_source.update_on_launch: - continue - latest_inventory_update = self.get_latest_inventory_update(inventory_source) - if self.should_update_inventory_source(task, latest_inventory_update): - inventory_task = self.create_inventory_update(task, inventory_source) - dependencies.append(inventory_task) - else: - if latest_inventory_update.status in ['waiting', 'pending', 'running']: - dependencies.append(latest_inventory_update) + for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: + if not inventory_source.update_on_launch: + continue + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + if self.should_update_inventory_source(task, latest_inventory_update): + inventory_task = self.create_inventory_update(task, inventory_source) + dependencies.append(inventory_task) + else: + if latest_inventory_update.status in ['waiting', 'pending', 'running']: + dependencies.append(latest_inventory_update) if len(dependencies) > 0: self.capture_chain_failure_dependencies(task, dependencies) From f4c9617f95b794c59e7da4c18259a3b399039ee0 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 27 Sep 2017 16:31:22 -0400 Subject: [PATCH 2/3] Check for inventory sources already updated from start args In the case where the host didn't exist in the inventory source and was found in the first inventory sync --- awx/main/scheduler/task_manager.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2a06d79fbb..f4731901b9 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -5,6 +5,7 @@ from datetime import datetime, timedelta import logging import uuid +import json from sets import Set # Django @@ -37,6 +38,7 @@ from awx.main.signals import disable_activity_stream from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main import tasks as awx_tasks +from awx.main.utils import decrypt_field # Celery from celery.task.control import inspect @@ -379,6 +381,7 @@ class TaskManager(): def generate_dependencies(self, task): dependencies = [] if type(task) is Job: + start_args = json.loads(decrypt_field(task, field_name="start_args")) # TODO: Can remove task.project None check after scan-job-default-playbook is removed if task.project is not None and task.project.scm_update_on_launch is True: latest_project_update = self.get_latest_project_update(task) @@ -391,6 +394,8 @@ class TaskManager(): # Inventory created 2 seconds behind job for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: + if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: + continue if not inventory_source.update_on_launch: continue latest_inventory_update = self.get_latest_inventory_update(inventory_source) From 3e38a0c17d226bbe3d796e321df7f215f203d575 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 27 Sep 2017 16:52:05 -0400 Subject: [PATCH 3/3] Add unit test for inventory_sources_already_updated --- awx/main/scheduler/task_manager.py | 5 ++- .../task_management/test_scheduler.py | 33 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index f4731901b9..7e012b60b3 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -381,7 +381,6 @@ class TaskManager(): def generate_dependencies(self, task): dependencies = [] if type(task) is Job: - start_args = json.loads(decrypt_field(task, field_name="start_args")) # TODO: Can remove task.project None check after scan-job-default-playbook is removed if task.project is not None and task.project.scm_update_on_launch is True: latest_project_update = self.get_latest_project_update(task) @@ -393,6 +392,10 @@ class TaskManager(): dependencies.append(latest_project_update) # Inventory created 2 seconds behind job + try: + start_args = json.loads(decrypt_field(task, field_name="start_args")) + except ValueError: + start_args = dict() for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: continue diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 05de8b2a81..9a1ece8ee9 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -1,11 +1,13 @@ import pytest import mock +import json from datetime import timedelta, datetime from django.core.cache import cache from django.utils.timezone import now as tz_now from awx.main.scheduler import TaskManager +from awx.main.utils import encrypt_field from awx.main.models import ( Job, Instance, @@ -154,7 +156,36 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) - + + +@pytest.mark.django_db +def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory): + objects = job_template_factory('jt', organization='org1', project='proj', + inventory='inv', credential='cred', + jobs=["job_should_start"]) + j = objects.jobs["job_should_start"] + j.status = 'pending' + j.save() + i = objects.inventory + ii = inventory_source_factory("ec2") + ii.source = "ec2" + ii.update_on_launch = True + ii.update_cache_timeout = 0 + ii.save() + i.inventory_sources.add(ii) + j.start_args = json.dumps(dict(inventory_sources_already_updated=[ii.id])) + j.save() + j.start_args = encrypt_field(j, field_name="start_args") + j.save() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + tm = TaskManager() + with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: + tm.schedule() + mock_iu.assert_not_called() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + @pytest.mark.django_db def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):