diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index c78b96b3f6..7e012b60b3 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -5,6 +5,7 @@ from datetime import datetime, timedelta import logging import uuid +import json from sets import Set # Django @@ -37,6 +38,7 @@ from awx.main.signals import disable_activity_stream from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main import tasks as awx_tasks +from awx.main.utils import decrypt_field # Celery from celery.task.control import inspect @@ -390,17 +392,22 @@ class TaskManager(): dependencies.append(latest_project_update) # Inventory created 2 seconds behind job - if task.launch_type != 'callback': - for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: - if not inventory_source.update_on_launch: - continue - latest_inventory_update = self.get_latest_inventory_update(inventory_source) - if self.should_update_inventory_source(task, latest_inventory_update): - inventory_task = self.create_inventory_update(task, inventory_source) - dependencies.append(inventory_task) - else: - if latest_inventory_update.status in ['waiting', 'pending', 'running']: - dependencies.append(latest_inventory_update) + try: + start_args = json.loads(decrypt_field(task, field_name="start_args")) + except ValueError: + start_args = dict() + for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: + if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: + continue + if not inventory_source.update_on_launch: + continue + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + if self.should_update_inventory_source(task, latest_inventory_update): + inventory_task = self.create_inventory_update(task, inventory_source) + dependencies.append(inventory_task) + else: + if latest_inventory_update.status in ['waiting', 'pending', 'running']: + dependencies.append(latest_inventory_update) if len(dependencies) > 0: self.capture_chain_failure_dependencies(task, dependencies) diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 05de8b2a81..9a1ece8ee9 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -1,11 +1,13 @@ import pytest import mock +import json from datetime import timedelta, datetime from django.core.cache import cache from django.utils.timezone import now as tz_now from awx.main.scheduler import TaskManager +from awx.main.utils import encrypt_field from awx.main.models import ( Job, Instance, @@ -154,7 +156,36 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) - + + +@pytest.mark.django_db +def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory): + objects = job_template_factory('jt', organization='org1', project='proj', + inventory='inv', credential='cred', + jobs=["job_should_start"]) + j = objects.jobs["job_should_start"] + j.status = 'pending' + j.save() + i = objects.inventory + ii = inventory_source_factory("ec2") + ii.source = "ec2" + ii.update_on_launch = True + ii.update_cache_timeout = 0 + ii.save() + i.inventory_sources.add(ii) + j.start_args = json.dumps(dict(inventory_sources_already_updated=[ii.id])) + j.save() + j.start_args = encrypt_field(j, field_name="start_args") + j.save() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + tm = TaskManager() + with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: + tm.schedule() + mock_iu.assert_not_called() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + @pytest.mark.django_db def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):