1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #3856 from ansible/revert-3842-callback-receiver-status

Revert "add support for `awx-manage run_callback_receiver --status`"
This commit is contained in:
Ryan Petrello 2019-10-18 10:10:58 -04:00 committed by GitHub
commit bd8b3a4f74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 7 additions and 39 deletions

View File

@ -15,19 +15,18 @@ class Control(object):
services = ('dispatcher', 'callback_receiver')
result = None
def __init__(self, service, queuename=None, routing_key=None):
def __init__(self, service, host=None):
if service not in self.services:
raise RuntimeError('{} must be in {}'.format(service, self.services))
self.service = service
self.queuename = queuename or get_local_queuename()
self.routing_key = routing_key or self.queuename
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.routing_key)
self.queuename = host or get_local_queuename()
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename)
def publish(self, msg, conn, **kwargs):
producer = Producer(
exchange=self.queue.exchange,
channel=conn,
routing_key=self.routing_key
routing_key=self.queuename
)
producer.publish(msg, expiration=5, **kwargs)

View File

@ -280,11 +280,6 @@ class WorkerPool(object):
logger.exception('could not kill {}'.format(worker.pid))
def cleanup(self):
for worker in self.workers:
worker.calculate_managed_tasks()
class AutoscalePool(WorkerPool):
'''
An extended pool implementation that automatically scales workers up and

View File

@ -56,18 +56,8 @@ class AWXConsumer(ConsumerMixin):
@property
def listening_on(self):
def qname(q):
if q.routing_key != q.name:
return ':'.join([q.name, q.routing_key])
return q.name
def qtype(q):
if q.exchange.type != 'direct':
return ' [{}]'.format(q.exchange.type)
return ''
return 'listening on {}'.format([
'{}{}'.format(qname(q), qtype(q)) for q in self.queues
'{} [{}]'.format(q.name, q.exchange.type) for q in self.queues
])
def control(self, body, message):

View File

@ -5,8 +5,6 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Exchange, Queue
from awx.main.dispatch import get_local_queuename
from awx.main.dispatch.control import Control
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
@ -19,20 +17,7 @@ class Command(BaseCommand):
'''
help = 'Launch the job callback receiver'
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true',
help='print the internal state of any running callback receiver')
def handle(self, *arg, **options):
control_routing_key = 'callback_receiver-{}-control'.format(get_local_queuename())
if options.get('status'):
print(Control(
'callback_receiver',
queuename=settings.CALLBACK_QUEUE,
routing_key=control_routing_key
).status())
return
with Connection(settings.BROKER_URL) as conn:
consumer = None
try:
@ -44,9 +29,8 @@ class Command(BaseCommand):
Queue(
settings.CALLBACK_QUEUE,
Exchange(settings.CALLBACK_QUEUE, type='direct'),
routing_key=key
routing_key=settings.CALLBACK_QUEUE
)
for key in [settings.CALLBACK_QUEUE, control_routing_key]
]
)
consumer.run()

View File

@ -1319,7 +1319,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
timeout = 5
try:
running = self.celery_task_id in ControlDispatcher(
'dispatcher', queuename=self.execution_node
'dispatcher', self.execution_node
).running(timeout=timeout)
except socket.timeout:
logger.error('could not reach dispatcher on {} within {}s'.format(