From 43aa0fc74194b265d54617a2cbb253a93f9ac18e Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 7 Sep 2018 09:29:27 -0400 Subject: [PATCH] consolidate celery init signals to avoid an instance registration race --- awx/main/tasks.py | 55 ++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 490dc48b44..5c31641518 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -32,7 +32,7 @@ except Exception: from kombu import Queue, Exchange from kombu.common import Broadcast from celery import Task, shared_task -from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup +from celery.signals import celeryd_init, worker_shutdown # Django from django.conf import settings @@ -108,6 +108,31 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo): @celeryd_init.connect def celery_startup(conf=None, **kwargs): + # + # When celeryd starts, if the instance cannot be found in the database, + # automatically register it. This is mostly useful for openshift-based + # deployments where: + # + # 2 Instances come online + # Instance B encounters a network blip, Instance A notices, and + # deprovisions it + # Instance B's connectivity is restored, celeryd starts, and it + # re-registers itself + # + # In traditional container-less deployments, instances don't get + # deprovisioned when they miss their heartbeat, so this code is mostly a + # no-op. 
+ # + if kwargs['instance'].hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID): + error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format( + kwargs['instance'].hostname, settings.CLUSTER_HOST_ID + ) + logger.error(error) + raise RuntimeError(error) + (changed, tower_instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) + startup_logger = logging.getLogger('awx.main.tasks') startup_logger.info("Syncing Schedules") for sch in Schedule.objects.all(): @@ -268,34 +293,6 @@ def handle_setting_changes(self, setting_keys): cache.delete_many(cache_keys) -@celeryd_after_setup.connect -def auto_register_ha_instance(sender, instance, **kwargs): - # - # When celeryd starts, if the instance cannot be found in the database, - # automatically register it. This is mostly useful for openshift-based - # deployments where: - # - # 2 Instances come online - # Instance B encounters a network blip, Instance A notices, and - # deprovisions it - # Instance B's connectivity is restored, celeryd starts, and it - # re-registers itself - # - # In traditional container-less deployments, instances don't get - # deprovisioned when they miss their heartbeat, so this code is mostly a - # no-op. - # - if instance.hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID): - error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format( - instance.hostname, settings.CLUSTER_HOST_ID - ) - logger.error(error) - raise RuntimeError(error) - (changed, tower_instance) = Instance.objects.get_or_register() - if changed: - logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) - - @shared_task(queue=settings.CELERY_DEFAULT_QUEUE) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list):