mirror of
https://github.com/ansible/awx.git
synced 2024-11-02 01:21:21 +03:00
Merge pull request #477 from ansible/remove_callback_blocked_dependency
Remove the logic blocking dependent inventory updates on callbacks
This commit is contained in:
commit
a5c028cd45
@ -5,6 +5,7 @@
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
|
import json
|
||||||
from sets import Set
|
from sets import Set
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@ -37,6 +38,7 @@ from awx.main.signals import disable_activity_stream
|
|||||||
|
|
||||||
from awx.main.scheduler.dependency_graph import DependencyGraph
|
from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||||
from awx.main import tasks as awx_tasks
|
from awx.main import tasks as awx_tasks
|
||||||
|
from awx.main.utils import decrypt_field
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery.task.control import inspect
|
from celery.task.control import inspect
|
||||||
@ -390,17 +392,22 @@ class TaskManager():
|
|||||||
dependencies.append(latest_project_update)
|
dependencies.append(latest_project_update)
|
||||||
|
|
||||||
# Inventory created 2 seconds behind job
|
# Inventory created 2 seconds behind job
|
||||||
if task.launch_type != 'callback':
|
try:
|
||||||
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
|
start_args = json.loads(decrypt_field(task, field_name="start_args"))
|
||||||
if not inventory_source.update_on_launch:
|
except ValueError:
|
||||||
continue
|
start_args = dict()
|
||||||
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
|
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
|
||||||
if self.should_update_inventory_source(task, latest_inventory_update):
|
if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
|
||||||
inventory_task = self.create_inventory_update(task, inventory_source)
|
continue
|
||||||
dependencies.append(inventory_task)
|
if not inventory_source.update_on_launch:
|
||||||
else:
|
continue
|
||||||
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
|
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
|
||||||
dependencies.append(latest_inventory_update)
|
if self.should_update_inventory_source(task, latest_inventory_update):
|
||||||
|
inventory_task = self.create_inventory_update(task, inventory_source)
|
||||||
|
dependencies.append(inventory_task)
|
||||||
|
else:
|
||||||
|
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
|
||||||
|
dependencies.append(latest_inventory_update)
|
||||||
|
|
||||||
if len(dependencies) > 0:
|
if len(dependencies) > 0:
|
||||||
self.capture_chain_failure_dependencies(task, dependencies)
|
self.capture_chain_failure_dependencies(task, dependencies)
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import mock
|
import mock
|
||||||
|
import json
|
||||||
from datetime import timedelta, datetime
|
from datetime import timedelta, datetime
|
||||||
|
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.utils.timezone import now as tz_now
|
from django.utils.timezone import now as tz_now
|
||||||
|
|
||||||
from awx.main.scheduler import TaskManager
|
from awx.main.scheduler import TaskManager
|
||||||
|
from awx.main.utils import encrypt_field
|
||||||
from awx.main.models import (
|
from awx.main.models import (
|
||||||
Job,
|
Job,
|
||||||
Instance,
|
Instance,
|
||||||
@ -154,7 +156,36 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group,
|
|||||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
TaskManager().schedule()
|
TaskManager().schedule()
|
||||||
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
||||||
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
||||||
|
inventory='inv', credential='cred',
|
||||||
|
jobs=["job_should_start"])
|
||||||
|
j = objects.jobs["job_should_start"]
|
||||||
|
j.status = 'pending'
|
||||||
|
j.save()
|
||||||
|
i = objects.inventory
|
||||||
|
ii = inventory_source_factory("ec2")
|
||||||
|
ii.source = "ec2"
|
||||||
|
ii.update_on_launch = True
|
||||||
|
ii.update_cache_timeout = 0
|
||||||
|
ii.save()
|
||||||
|
i.inventory_sources.add(ii)
|
||||||
|
j.start_args = json.dumps(dict(inventory_sources_already_updated=[ii.id]))
|
||||||
|
j.save()
|
||||||
|
j.start_args = encrypt_field(j, field_name="start_args")
|
||||||
|
j.save()
|
||||||
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
|
tm = TaskManager()
|
||||||
|
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
|
||||||
|
tm.schedule()
|
||||||
|
mock_iu.assert_not_called()
|
||||||
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||||
|
TaskManager().schedule()
|
||||||
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
||||||
|
Loading…
Reference in New Issue
Block a user