From 1cea20092c26e0667d1459fd64779dd2a5620842 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 8 Jun 2018 11:20:54 -0400 Subject: [PATCH 1/2] remove rampart group queue subscription * We now target Instances in the task manager when transitioning jobs from pending to waiting; whereas before we submitted jobs to Instance Groups to be picked up by Instance's in those Instance Groups. Subscribing Instances to their Instance Groups is no longer needed. This change removes the Instance Group queue subscription. --- awx/main/tests/unit/utils/test_ha.py | 23 ++++++++++++----------- awx/main/utils/ha.py | 7 ++----- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index 94cb7d3606..f73f8e908a 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -18,7 +18,8 @@ from awx.main.utils.ha import ( class TestAddRemoveCeleryWorkerQueues(): @pytest.fixture def instance_generator(self, mocker): - def fn(groups=['east', 'west', 'north', 'south'], hostname='east-1'): + def fn(hostname='east-1'): + groups=['east', 'west', 'north', 'south'] instance = mocker.MagicMock() instance.hostname = hostname instance.rampart_groups = mocker.MagicMock() @@ -40,29 +41,29 @@ class TestAddRemoveCeleryWorkerQueues(): app.control.cancel_consumer = mocker.MagicMock() return app - @pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [ - (['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', ['tower_broadcast_all_east-1'], []), - ([], [], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []), - ([], [], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []), - ([], [], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []), - ([], [], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']), + @pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,hostname,added_expected,removed_expected", [ + (['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], 'east-1', ['tower_broadcast_all_east-1'], []), + ([], [], ['east', 'west', 'east-1'], 'east-1', [], ['east', 'west']), + ([], [], ['east', 'west'], 'east-1', ['east-1'], ['east', 'west']), + ([], [], [], 'east-1', ['east-1'], []), + ([], [], ['china', 'russia'], 'east-1', [ 'east-1'], ['china', 'russia']), ]) def test__add_remove_celery_worker_queues_noop(self, mock_app, instance_generator, worker_queues_generator, broadcast_queues, static_queues, _worker_queues, - groups, hostname, + hostname, added_expected, removed_expected): - instance = instance_generator(groups=groups, hostname=hostname) + instance = instance_generator(hostname=hostname) worker_queues = worker_queues_generator(_worker_queues) with nested( mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues), mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues), mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)): (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname) - assert set(added_queues) == set(added_expected) - assert set(removed_queues) == set(removed_expected) + assert set(added_expected) == set(added_queues) + assert set(removed_expected) == set(removed_queues) class TestUpdateCeleryWorkerRouter(): diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index 49421ad4cb..dd629ca24d 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -17,14 +17,11 @@ def construct_bcast_queue_name(common_name): def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): removed_queues = [] added_queues = [] - ig_names = set() hostnames = set([instance.hostname for instance in controlled_instances]) - for instance in controlled_instances: - ig_names.update(instance.rampart_groups.values_list('name', flat=True)) worker_queue_names = set([q['name'] for q in worker_queues]) bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC]) - all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) + all_queue_names = hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set()) # Remove queues @@ -33,7 +30,7 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) removed_queues.append(queue_name.encode("utf8")) - # Add queues for instance and instance groups + # Add queues for instances for queue_name in all_queue_names: if queue_name not in worker_queue_names: app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) From fb119671146b779a1f710872cccbae9e8adb93c7 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 8 Jun 2018 13:46:58 -0400 Subject: [PATCH 2/2] remove isolated instance group queue listening --- awx/main/tasks.py | 12 ++++++------ awx/main/tests/unit/utils/test_ha.py | 2 +- awx/main/utils/ha.py | 14 ++++---------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index be10d3ef58..c0667d15b6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -209,19 +209,19 @@ def handle_ha_toplogy_changes(self): logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname)) awx_app = Celery('awx') awx_app.config_from_object('django.conf:settings') - instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) + removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") - .format([i.hostname for i in instances], removed_queues, added_queues)) + logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") + .format(self.request.hostname, removed_queues, added_queues)) @worker_ready.connect def handle_ha_toplogy_worker_ready(sender, **kwargs): logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname)) - instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) + removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") - .format([i.hostname for i in instances], removed_queues, added_queues)) + logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") + .format(sender.hostname, removed_queues, added_queues)) # Expedite the first hearbeat run so a node comes online quickly. cluster_node_heartbeat.apply([]) diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index f73f8e908a..35c8005781 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -61,7 +61,7 @@ class TestAddRemoveCeleryWorkerQueues(): mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues), mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues), mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)): - (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname) + (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname) assert set(added_expected) == set(added_queues) assert set(removed_expected) == set(removed_queues) diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index dd629ca24d..1d9b6e08d3 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -14,14 +14,13 @@ def construct_bcast_queue_name(common_name): return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID -def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): +def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name): removed_queues = [] added_queues = [] - hostnames = set([instance.hostname for instance in controlled_instances]) worker_queue_names = set([q['name'] for q in worker_queues]) bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC]) - all_queue_names = hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) + all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC) desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set()) # Remove queues @@ -69,18 +68,13 @@ class AWXCeleryRouter(object): def register_celery_worker_queues(app, celery_worker_name): instance = Instance.objects.me() - controlled_instances = [instance] - if instance.is_controller(): - controlled_instances.extend(Instance.objects.filter( - rampart_groups__controller__instances__hostname=instance.hostname - )) added_queues = [] removed_queues = [] celery_host_queues = app.control.inspect([celery_worker_name]).active_queues() celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] - (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances, + (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name) - return (controlled_instances, removed_queues, added_queues) + return (removed_queues, added_queues)