mirror of https://github.com/ansible/awx.git synced 2024-10-27 00:55:06 +03:00

remove multiprocessing.Queue usage from the callback receiver

instead, just have each worker connect directly to redis
this has a few benefits:

- it's simpler to explain and debug
- back pressure on the queue keeps messages around in redis (which is
  observable, and survives the restart of Python processes)
- it's likely notably more performant at high loads
Ryan Petrello 2020-09-21 10:48:42 -04:00
parent aac17b9d2c
commit cd0b9de7b9
4 changed files with 67 additions and 26 deletions
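
To make the new shape of the pipeline concrete before the diffs, here is a minimal sketch of the consumption model the commit message describes: each worker owns its own redis connection and pops events directly off a shared list. The queue name, URL, and handler below are illustrative assumptions, not AWX's real identifiers.

import json

import redis


def handle(body):
    # stand-in for real event processing
    print(body.get('event'))


def worker_loop(url='redis://localhost:6379', queue='callback_tasks'):
    # each worker process connects directly; no multiprocessing.Queue broker
    conn = redis.Redis.from_url(url)
    while True:
        # blpop blocks until a message lands on the list; unconsumed
        # messages stay in redis, so back pressure is visible there
        _, payload = conn.blpop(queue)
        try:
            handle(json.loads(payload))
        except json.JSONDecodeError:
            continue  # skip undecodable messages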


@@ -25,8 +25,14 @@ class Control(object):

     def status(self, *args, **kwargs):
         r = redis.Redis.from_url(settings.BROKER_URL)
-        stats = r.get(f'awx_{self.service}_statistics') or b''
-        return stats.decode('utf-8')
+        if self.service == 'dispatcher':
+            stats = r.get(f'awx_{self.service}_statistics') or b''
+            return stats.decode('utf-8')
+        else:
+            workers = []
+            for key in r.keys('awx_callback_receiver_statistics_*'):
+                workers.append(r.get(key).decode('utf-8'))
+            return '\n'.join(workers)

     def running(self, *args, **kwargs):
         return self.control_with_reply('running', *args, **kwargs)
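
With this change, each callback worker maintains its own statistics key, and a status query is just a scan-and-concatenate over those keys. A usage sketch (the broker URL is an assumption):

import redis

conn = redis.Redis.from_url('redis://localhost:6379')  # assumed BROKER_URL
workers = [conn.get(key).decode('utf-8')
           for key in conn.keys('awx_callback_receiver_statistics_*')]
print('\n'.join(workers))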


@@ -132,22 +132,9 @@ class AWXConsumerRedis(AWXConsumerBase):
         super(AWXConsumerRedis, self).run(*args, **kwargs)
         self.worker.on_start()

-        time_to_sleep = 1
-        while True:
-            try:
-                res = self.redis.blpop(self.queues)
-                time_to_sleep = 1
-                res = json.loads(res[1])
-                self.process_task(res)
-            except redis.exceptions.RedisError:
-                time_to_sleep = min(time_to_sleep * 2, 30)
-                logger.exception(f"encountered an error communicating with redis. Reconnect attempt in {time_to_sleep} seconds")
-                time.sleep(time_to_sleep)
-            except (json.JSONDecodeError, KeyError):
-                logger.exception("failed to decode JSON message from redis")
-            if self.should_stop:
-                return
-
+        while True:
+            logger.debug(f'{os.getpid()} is alive')
+            time.sleep(60)


 class AWXConsumerPG(AWXConsumerBase):
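
The parent consumer no longer proxies messages at all; it only logs liveness, while the workers drain the queue themselves. One consequence the commit message highlights is that the backlog now lives in redis and can be inspected directly, e.g. (queue name is an assumption):

import redis

conn = redis.Redis.from_url('redis://localhost:6379')
print('events waiting:', conn.llen('callback_tasks'))  # list length = backlog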


@@ -1,4 +1,5 @@
 import cProfile
+import json
 import logging
 import os
 import pstats
@@ -6,13 +7,16 @@
 import signal
 import tempfile
 import time
 import traceback
-from queue import Empty as QueueEmpty

 from django.conf import settings
 from django.utils.timezone import now as tz_now
 from django.db import DatabaseError, OperationalError, connection as django_connection
 from django.db.utils import InterfaceError, InternalError, IntegrityError

+import psutil
+import redis
+
 from awx.main.consumers import emit_channel_notification
 from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent,
                              InventoryUpdateEvent, SystemJobEvent, UnifiedJob,
@@ -24,10 +28,6 @@ from .base import BaseWorker

 logger = logging.getLogger('awx.main.commands.run_callback_receiver')

-# the number of seconds to buffer events in memory before flushing
-# using JobEvent.objects.bulk_create()
-BUFFER_SECONDS = .1
-

 class CallbackBrokerWorker(BaseWorker):
     '''
@@ -39,21 +39,57 @@ class CallbackBrokerWorker(BaseWorker):
     '''
     MAX_RETRIES = 2
     last_stats = time.time()
+    total = 0
+    last_event = ''
     prof = None

     def __init__(self):
         self.buff = {}
+        self.pid = os.getpid()
+        self.redis = redis.Redis.from_url(settings.BROKER_URL)
+        for key in self.redis.keys('awx_callback_receiver_statistics_*'):
+            self.redis.delete(key)

     def read(self, queue):
         try:
-            return queue.get(block=True, timeout=BUFFER_SECONDS)
-        except QueueEmpty:
-            return {'event': 'FLUSH'}
+            res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=settings.JOB_EVENT_BUFFER_SECONDS)
+            if res is None:
+                return {'event': 'FLUSH'}
+            self.total += 1
+            return json.loads(res[1])
+        except redis.exceptions.RedisError:
+            logger.exception("encountered an error communicating with redis")
+            time.sleep(1)
+        except (json.JSONDecodeError, KeyError):
+            logger.exception("failed to decode JSON message from redis")
+        finally:
+            self.record_statistics()
+        return {'event': 'FLUSH'}
+
+    def record_statistics(self):
+        # buffer stat recording to once per (by default) 5s
+        if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL:
+            try:
+                self.redis.set(f'awx_callback_receiver_statistics_{self.pid}', self.debug())
+                self.last_stats = time.time()
+            except Exception:
+                logger.exception("encountered an error communicating with redis")
+                self.last_stats = time.time()
+
+    def debug(self):
+        return f'. worker[pid:{self.pid}] sent={self.total} rss={self.mb}MB {self.last_event}'
+
+    @property
+    def mb(self):
+        return '{:0.3f}'.format(
+            psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0
+        )
+
     def toggle_profiling(self, *args):
         if self.prof:
             self.prof.disable()
-            filename = f'callback-{os.getpid()}.pstats'
+            filename = f'callback-{self.pid}.pstats'
             filepath = os.path.join(tempfile.gettempdir(), filename)
             with open(filepath, 'w') as f:
                 pstats.Stats(self.prof, stream=f).sort_stats('cumulative').print_stats()
@@ -108,6 +144,8 @@ class CallbackBrokerWorker(BaseWorker):
     def perform_work(self, body):
         try:
             flush = body.get('event') == 'FLUSH'
+            if flush:
+                self.last_event = ''
             if not flush:
                 event_map = {
                     'job_id': JobEvent,
@@ -123,6 +161,8 @@ class CallbackBrokerWorker(BaseWorker):
                     job_identifier = body[key]
                     break

+            self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})'  # noqa
+
             if body.get('event') == 'EOF':
                 try:
                     final_counter = body.get('final_counter', 0)
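
The subtle point in the new read() above is that blpop's timeout doubles as the flush timer: when no event arrives within JOB_EVENT_BUFFER_SECONDS, the None result is translated into a synthetic FLUSH message, so buffered bulk_create writes still drain during quiet periods. A condensed, standalone sketch of that pattern (names and values are illustrative):

import json
import time

import redis

BUFFER_SECONDS = 1  # stand-in for settings.JOB_EVENT_BUFFER_SECONDS


def read_one(conn, queue='callback_tasks'):
    try:
        res = conn.blpop(queue, timeout=BUFFER_SECONDS)
        if res is None:
            # timed out with nothing to do: tell the caller to flush
            return {'event': 'FLUSH'}
        return json.loads(res[1])
    except redis.exceptions.RedisError:
        time.sleep(1)  # brief pause before the caller retries
    except json.JSONDecodeError:
        pass  # drop undecodable messages
    return {'event': 'FLUSH'}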


@@ -197,6 +197,14 @@ LOCAL_STDOUT_EXPIRE_TIME = 2592000
 # events into the database
 JOB_EVENT_WORKERS = 4

+# The number of seconds (must be an integer) to buffer callback receiver bulk
+# writes in memory before flushing via JobEvent.objects.bulk_create()
+JOB_EVENT_BUFFER_SECONDS = 1
+
+# The interval at which callback receiver statistics should be
+# recorded
+JOB_EVENT_STATISTICS_INTERVAL = 5
+
 # The maximum size of the job event worker queue before requests are blocked
 JOB_EVENT_MAX_QUEUE_SIZE = 10000
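
Both knobs are plain settings, so a deployment can tune the flush cadence and statistics frequency without code changes; a hypothetical local-settings override:

# e.g. in a local settings file layered over awx/settings/defaults.py
JOB_EVENT_BUFFER_SECONDS = 2        # flush buffered events every 2 seconds
JOB_EVENT_STATISTICS_INTERVAL = 10  # record worker stats at most every 10s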