1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-02 09:51:09 +03:00

fix task manager running task structures

- prevent dual-entry for first item in running jobs due to
   setdefault syntax
 - fix issue where queues (celery tasks) only returned the last
   item in the inspector output due to looping problem
   this caused reaper bugs in production
This commit is contained in:
AlanCoding 2017-09-21 14:38:58 -04:00
parent d0cb11acb3
commit dba83674a2
No known key found for this signature in database
GPG Key ID: FD2C3C012A72926B
2 changed files with 22 additions and 5 deletions

View File

@ -97,7 +97,7 @@ class TaskManager():
~Q(polymorphic_ctype_id=workflow_ctype_id)) ~Q(polymorphic_ctype_id=workflow_ctype_id))
for j in jobs: for j in jobs:
if j.execution_node: if j.execution_node:
execution_nodes.setdefault(j.execution_node, [j]).append(j) execution_nodes.setdefault(j.execution_node, []).append(j)
else: else:
waiting_jobs.append(j) waiting_jobs.append(j)
return (execution_nodes, waiting_jobs) return (execution_nodes, waiting_jobs)
@ -142,10 +142,10 @@ class TaskManager():
active_tasks = set() active_tasks = set()
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
# celery worker name is of the form celery@myhost.com # celery worker name is of the form celery@myhost.com
queue_name = queue.split('@') queue_name = queue.split('@')
queue_name = queue_name[1 if len(queue_name) > 1 else 0] queue_name = queue_name[1 if len(queue_name) > 1 else 0]
queues[queue_name] = active_tasks queues[queue_name] = active_tasks
else: else:
if not hasattr(settings, 'CELERY_UNIT_TEST'): if not hasattr(settings, 'CELERY_UNIT_TEST'):
return (None, None) return (None, None)

View File

@ -50,3 +50,20 @@ class TestCleanupInconsistentCeleryTasks():
tm.cleanup_inconsistent_celery_tasks() tm.cleanup_inconsistent_celery_tasks()
job.save.assert_called_once() job.save.assert_called_once()
logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.") logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.")
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch('awx.main.scheduler.task_manager.inspect')
def test_multiple_active_instances_sanity_check(self, inspect_mock, *args):
class MockInspector:
pass
mock_inspector = MockInspector()
mock_inspector.active = lambda: {
'celery@host1': [],
'celery@host2': []
}
inspect_mock.return_value = mock_inspector
tm = TaskManager()
active_task_queues, queues = tm.get_active_tasks()
assert 'host1' in queues
assert 'host2' in queues