1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #2174 from chrismeyersfsu/fix-reaper_hostname_map

fix celery task reaper
This commit is contained in:
Chris Meyers 2018-06-15 17:06:04 -04:00 committed by GitHub
commit ee179c3693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 97 additions and 19 deletions

View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-06-14 17:23
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
    """Add the ``system_hostname`` column to ``main.Instance``.

    Generated by Django 1.11 on 2018-06-14; depends on the
    0040_v330_unifiedjob_controller_node migration.
    """
    # NOTE(review): this migration declares the column with unique=True and a
    # bytes default, while the model definition elsewhere in this commit uses
    # db_index=True and no unique constraint -- confirm the divergence is
    # intentional before editing either side.

    dependencies = [
        ('main', '0040_v330_unifiedjob_controller_node'),
    ]

    operations = [
        migrations.AddField(
            model_name='instance',
            name='system_hostname',
            field=models.CharField(default=b'', max_length=255, unique=True),
        ),
    ]

View File

@@ -45,6 +45,11 @@ class Instance(BaseModel):
uuid = models.CharField(max_length=40) uuid = models.CharField(max_length=40)
hostname = models.CharField(max_length=250, unique=True) hostname = models.CharField(max_length=250, unique=True)
system_hostname = models.CharField(
max_length=255,
db_index=True,
help_text="Machine hostname",
)
created = models.DateTimeField(auto_now_add=True) created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True) modified = models.DateTimeField(auto_now=True)
last_isolated_check = models.DateTimeField( last_isolated_check = models.DateTimeField(
@ -109,6 +114,10 @@ class Instance(BaseModel):
def jobs_total(self): def jobs_total(self):
return UnifiedJob.objects.filter(execution_node=self.hostname).count() return UnifiedJob.objects.filter(execution_node=self.hostname).count()
@property
def celery_system_hostname(self):
    """This instance's name as celery reports it: ``celery@<system_hostname>``."""
    return 'celery@%s' % self.system_hostname
def is_lost(self, ref_time=None, isolated=False): def is_lost(self, ref_time=None, isolated=False):
if ref_time is None: if ref_time is None:
ref_time = now() ref_time = now()

View File

@@ -8,6 +8,7 @@ import uuid
import json import json
import six import six
import random import random
import itertools
from sets import Set from sets import Set
# Django # Django
@ -158,6 +159,33 @@ class TaskManager():
return (active_task_queues, queues) return (active_task_queues, queues)
def map_system_hostname_to_instance_hostname(self, in_map):
    '''
    Re-key a dict of per-node data from celery's system hostnames to the
    matching Instance.hostname values, e.g.,

    map_system_hostname_to_instance_hostname({
        'node1.example.org': ABC,
        'node2.example.org': DEF,
    })
    =>
    {
        Instance.objects.get(system_hostname='node1.example.org').hostname: ABC,
        Instance.objects.get(system_hostname='node2.example.org').hostname: DEF,
    }

    Entries whose system hostname does not match any registered Instance are
    dropped (with a warning).
    '''
    # Build the system_hostname -> Instance lookup once, fetching only the
    # two columns this method reads.
    instances_by_system_hostname = dict()
    for inst in Instance.objects.only('system_hostname', 'hostname'):
        instances_by_system_hostname[inst.system_hostname] = inst

    out_map = dict()
    for system_hostname, node_data in in_map.iteritems():
        matched = instances_by_system_hostname.get(system_hostname)
        if not matched:
            logger.warn("Could not map celery system hostname {} to Instance hostname".format(system_hostname))
        else:
            out_map[matched.hostname] = node_data
    return out_map
def get_latest_project_update_tasks(self, all_sorted_tasks): def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = Set() project_ids = Set()
for task in all_sorted_tasks: for task in all_sorted_tasks:
@ -554,10 +582,12 @@ class TaskManager():
Rectify tower db <-> celery inconsistent view of jobs state Rectify tower db <-> celery inconsistent view of jobs state
''' '''
last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc) last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL: time_since_last_cleanup_sec = (tz_now() - last_cleanup).seconds
cleanup_diff = settings.AWX_INCONSISTENT_TASK_INTERVAL - time_since_last_cleanup_sec
if cleanup_diff > 0:
logger.debug("Skipping job reaper. Can run again in {} seconds".format(cleanup_diff))
return return
logger.debug("Failing inconsistent running jobs.")
celery_task_start_time = tz_now() celery_task_start_time = tz_now()
active_task_queues, active_queues = self.get_active_tasks() active_task_queues, active_queues = self.get_active_tasks()
cache.set('last_celery_task_cleanup', tz_now()) cache.set('last_celery_task_cleanup', tz_now())
@ -566,21 +596,21 @@ class TaskManager():
logger.error('Failed to retrieve active tasks from celery') logger.error('Failed to retrieve active tasks from celery')
return None return None
remapped_active_queues = self.map_system_hostname_to_instance_hostname(active_queues)
''' '''
Only consider failing tasks on instances for which we obtained a task Only consider failing tasks on instances for which we obtained a task
list from celery for. list from celery for.
''' '''
running_tasks, waiting_tasks = self.get_running_tasks() running_tasks, waiting_tasks = self.get_running_tasks()
all_celery_task_ids = [] all_celery_task_ids = list(itertools.chain.from_iterable(remapped_active_queues.values()))
for node, node_jobs in active_queues.iteritems():
all_celery_task_ids.extend(node_jobs)
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time) self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
for node, node_jobs in running_tasks.iteritems(): for node, node_jobs in running_tasks.iteritems():
isolated = False isolated = False
if node in active_queues: if node in remapped_active_queues:
active_tasks = active_queues[node] active_tasks = remapped_active_queues[node]
else: else:
''' '''
Node task list not found in celery. We may branch into cases: Node task list not found in celery. We may branch into cases:
@ -599,11 +629,17 @@ class TaskManager():
node, [j.log_format for j in node_jobs])) node, [j.log_format for j in node_jobs]))
active_tasks = [] active_tasks = []
elif instance.capacity == 0: elif instance.capacity == 0:
logger.info("Instance {} is known to be offline and did not reply "
"with a list of running celery tasks. Going to fail all running"
"jobs associated with this instance.".format(instance.hostname))
active_tasks = [] active_tasks = []
elif instance.rampart_groups.filter(controller__isnull=False).exists(): elif instance.rampart_groups.filter(controller__isnull=False).exists():
logger.info("Failing all jobs for Isolated Instance {} ".format(instance.hostname))
active_tasks = all_celery_task_ids active_tasks = all_celery_task_ids
isolated = True isolated = True
else: else:
logger.info("Instance {} did not reply with a list of running "
"celery tasks and the Instance does not look offline.".format(instance.hostname))
continue continue
self.fail_jobs_if_not_in_celery( self.fail_jobs_if_not_in_celery(

View File

@ -20,6 +20,7 @@ import time
import traceback import traceback
import six import six
import urlparse import urlparse
import socket
from distutils.version import LooseVersion as Version from distutils.version import LooseVersion as Version
import yaml import yaml
import fcntl import fcntl
@ -231,11 +232,21 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
@celeryd_after_setup.connect
def handle_update_celery_hostname(sender, instance, **kwargs):
    '''
    Register this node (if needed) and record its machine hostname on the
    Instance row, so celery worker names (``celery@<system_hostname>``) can
    later be mapped back to Instance.hostname by the task reaper.

    Celery will appear to infinitely reboot if an error occurs here, so the
    exception is logged before being re-raised.
    '''
    try:
        (changed, tower_instance) = Instance.objects.get_or_register()
        if changed:
            logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
        # Persist the machine hostname only when it differs from what is
        # already stored, avoiding a redundant DB write on every worker start.
        system_hostname = socket.gethostname()
        if system_hostname != tower_instance.system_hostname:
            tower_instance.system_hostname = system_hostname
            tower_instance.save(update_fields=['system_hostname'])
            logger.warn(six.text_type("Set system hostname to {}").format(tower_instance.system_hostname))
        # NOTE(review): the previous version also renamed the celery worker
        # itself (instance.hostname = 'celery@<tower hostname>'); that
        # assignment is not visible in this view -- confirm whether it was
        # intentionally dropped.
    except Exception as e:
        logger.error("Error encountered while starting celery and getting system hostname {}".format(e))
        # Bare `raise` re-raises the active exception and preserves the
        # original traceback; `raise e` would reset the traceback to this
        # line under Python 2.
        raise
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE) @shared_task(queue=settings.CELERY_DEFAULT_QUEUE)

View File

@ -265,10 +265,10 @@ class TestReaper():
def all_jobs(self, mocker): def all_jobs(self, mocker):
now = tz_now() now = tz_now()
Instance.objects.create(hostname='host1', capacity=100) Instance.objects.create(hostname='host1', system_hostname='host1_not_really', capacity=100)
Instance.objects.create(hostname='host2', capacity=100) Instance.objects.create(hostname='host2', system_hostname='host2', capacity=100)
Instance.objects.create(hostname='host3_split', capacity=100) Instance.objects.create(hostname='host3_split', system_hostname='host3_not_really', capacity=100)
Instance.objects.create(hostname='host4_offline', capacity=0) Instance.objects.create(hostname='host4_offline', system_hostname='host4_offline', capacity=0)
j1 = Job.objects.create(status='pending', execution_node='host1') j1 = Job.objects.create(status='pending', execution_node='host1')
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2') j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
@ -327,7 +327,7 @@ class TestReaper():
@pytest.fixture @pytest.fixture
def active_tasks(self): def active_tasks(self):
return ([], { return ([], {
'host1': ['considered_j2', 'considered_j3', 'considered_j4',], 'host1_not_really': ['considered_j2', 'considered_j3', 'considered_j4',],
'host2': ['considered_j6', 'considered_j7'], 'host2': ['considered_j6', 'considered_j7'],
}) })

View File

@ -26,6 +26,7 @@ class TestCleanupInconsistentCeleryTasks():
def test_instance_does_not_exist(self, logger_mock, *args): def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked")) logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
tm = TaskManager() tm = TaskManager()
tm.map_system_hostname_to_instance_hostname = lambda *args: dict()
with pytest.raises(RuntimeError) as excinfo: with pytest.raises(RuntimeError) as excinfo:
tm.cleanup_inconsistent_celery_tasks() tm.cleanup_inconsistent_celery_tasks()
@ -45,6 +46,7 @@ class TestCleanupInconsistentCeleryTasks():
job.websocket_emit_status = mock.MagicMock() job.websocket_emit_status = mock.MagicMock()
get_running_tasks.return_value = ({'host1': [job]}, []) get_running_tasks.return_value = ({'host1': [job]}, [])
tm = TaskManager() tm = TaskManager()
tm.map_system_hostname_to_instance_hostname = lambda *args: dict(host1=[])
with mock.patch.object(job, 'save', side_effect=DatabaseError): with mock.patch.object(job, 'save', side_effect=DatabaseError):
tm.cleanup_inconsistent_celery_tasks() tm.cleanup_inconsistent_celery_tasks()