From ffb1707e74a71a55e10e737fdec3e2e0db0dcbce Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 10 Oct 2019 17:06:12 -0400 Subject: [PATCH] add support for `awx-manage run_callback_receiver --status` --- awx/main/dispatch/control.py | 9 +++++---- awx/main/dispatch/pool.py | 5 +++++ awx/main/dispatch/worker/base.py | 12 +++++++++++- .../commands/run_callback_receiver.py | 18 +++++++++++++++++- awx/main/models/unified_jobs.py | 2 +- 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index 5f081e84f2..83e2226012 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -15,18 +15,19 @@ class Control(object): services = ('dispatcher', 'callback_receiver') result = None - def __init__(self, service, host=None): + def __init__(self, service, queuename=None, routing_key=None): if service not in self.services: raise RuntimeError('{} must be in {}'.format(service, self.services)) self.service = service - self.queuename = host or get_local_queuename() - self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename) + self.queuename = queuename or get_local_queuename() + self.routing_key = routing_key or self.queuename + self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.routing_key) def publish(self, msg, conn, **kwargs): producer = Producer( exchange=self.queue.exchange, channel=conn, - routing_key=self.queuename + routing_key=self.routing_key ) producer.publish(msg, expiration=5, **kwargs) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index f5b92ca8f1..3fc502b33e 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -280,6 +280,11 @@ class WorkerPool(object): logger.exception('could not kill {}'.format(worker.pid)) + def cleanup(self): + for worker in self.workers: + worker.calculate_managed_tasks() + + class AutoscalePool(WorkerPool): ''' An extended pool implementation that automatically scales workers up and diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index e73ed4bade..bc440b831e 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -56,8 +56,18 @@ class AWXConsumer(ConsumerMixin): @property def listening_on(self): + def qname(q): + if q.routing_key != q.name: + return ':'.join([q.name, q.routing_key]) + return q.name + + def qtype(q): + if q.exchange.type != 'direct': + return ' [{}]'.format(q.exchange.type) + return '' + return 'listening on {}'.format([ - '{} [{}]'.format(q.name, q.exchange.type) for q in self.queues + '{}{}'.format(qname(q), qtype(q)) for q in self.queues ]) def control(self, body, message): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 51608a8b7a..8e706f5309 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -5,6 +5,8 @@ from django.conf import settings from django.core.management.base import BaseCommand from kombu import Exchange, Queue +from awx.main.dispatch import get_local_queuename +from awx.main.dispatch.control import Control from awx.main.dispatch.kombu import Connection from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker @@ -17,7 +19,20 @@ class Command(BaseCommand): ''' help = 'Launch the job callback receiver' + def add_arguments(self, parser): + parser.add_argument('--status', dest='status', action='store_true', + help='print the internal state of any running callback receiver') + def handle(self, *arg, **options): + control_routing_key = 'callback_receiver-{}-control'.format(get_local_queuename()) + if options.get('status'): + print(Control( + 'callback_receiver', + queuename=settings.CALLBACK_QUEUE, + routing_key=control_routing_key + ).status()) + return + with Connection(settings.BROKER_URL) as conn: consumer = None try: @@ -29,8 +44,9 @@ class Command(BaseCommand): Queue( settings.CALLBACK_QUEUE, Exchange(settings.CALLBACK_QUEUE, type='direct'), - routing_key=settings.CALLBACK_QUEUE + routing_key=key ) + for key in [settings.CALLBACK_QUEUE, control_routing_key] ] ) consumer.run() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 3613ac4d34..398f8bbe88 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1319,7 +1319,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique timeout = 5 try: running = self.celery_task_id in ControlDispatcher( - 'dispatcher', self.execution_node + 'dispatcher', queuename=self.execution_node ).running(timeout=timeout) except socket.timeout: logger.error('could not reach dispatcher on {} within {}s'.format(