Merge pull request #992 from ryanpetrello/optimize-output-event-filter
optimize OutputEventFilter for large stdout streams
This commit is contained in:
commit 5387846cbb
@@ -147,7 +147,7 @@ class CallbackBrokerWorker(ConsumerMixin):
             from pprint import pformat
             logger.info('Body: {}'.format(
                 highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
-            ))
+            )[:1024 * 4])
 
         def _save_event_data():
             for key, cls in event_map.items():
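The only change in this hunk is the [:1024 * 4] slice: when DEBUG logging is on, the pretty-printed, syntax-highlighted message body is capped at 4 KiB before it reaches the logger, so a single large event payload cannot flood the callback receiver's log. A rough standalone illustration of the same idea (not AWX code; the body value here is made up):

    import logging
    from pprint import pformat

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('callback_receiver_sketch')

    body = {'stdout': 'x' * (1024 * 1024)}          # hypothetical multi-megabyte payload
    rendered = pformat(body, width=160)             # the expensive-to-log representation
    logger.info('Body: %s', rendered[:1024 * 4])    # log at most 4 KiB of it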
@@ -1,6 +1,7 @@
 import pytest
 import base64
 import json
+from StringIO import StringIO
 
 from awx.main.utils import OutputEventFilter
 
@@ -99,3 +100,46 @@ def test_large_data_payload(fake_callback, fake_cache, wrapped_handle):
     assert recomb_data['role'] == 'some_path_to_role'
     assert 'event' in recomb_data
     assert recomb_data['event'] == 'foo'
+
+
+def test_event_lazy_parsing(fake_callback, fake_cache, wrapped_handle):
+    # Pretend that this is done by the Ansible callback module
+    fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}
+    buff = StringIO()
+    event_data_to_encode = {
+        'uuid': EXAMPLE_UUID,
+        'host': 'localhost',
+        'role': 'some_path_to_role'
+    }
+    write_encoded_event_data(buff, event_data_to_encode)
+
+    # write the data to the event filter in chunks to test lazy event matching
+    buff.seek(0)
+    start_token_chunk = buff.read(1)  # \x1b
+    start_token_remainder = buff.read(2)  # [K
+    body = buff.read(15)  # next 15 bytes of base64 data
+    remainder = buff.read()  # the remainder
+    for chunk in (start_token_chunk, start_token_remainder, body, remainder):
+        wrapped_handle.write(chunk)
+
+    wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n')
+    wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n')
+    write_encoded_event_data(wrapped_handle, {})
+    # stop pretending
+
+    assert len(fake_callback) == 1
+    recomb_data = fake_callback[0]
+    assert 'role' in recomb_data
+    assert recomb_data['role'] == 'some_path_to_role'
+    assert 'event' in recomb_data
+    assert recomb_data['event'] == 'foo'
+
+
+@pytest.mark.timeout(1)
+def test_large_stdout_blob():
+    def _callback(*args, **kw):
+        pass
+
+    f = OutputEventFilter(_callback)
+    for x in range(1024 * 10):
+        f.write('x' * 1024)
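The first new test splits an encoded event marker across several write() calls (one byte of \x1b, then [K, then a slice of the base64 body, then the rest) and checks that the filter still recombines it into a single event; the second writes 10 MiB of event-free output under a one second pytest-timeout budget to guard against quadratic buffering. For reference, a sketch of the payload shape these tests exercise, modeled on the write_encoded_event_data helper (the helper is not shown in this diff, so treat the exact marker layout as an assumption): event metadata is JSON, base64-encoded, and wrapped in \x1b[K markers embedded in the stdout stream.

    import base64
    import json

    def encode_event_marker(event_data):
        # assumed marker layout: \x1b[K <base64(json)> \x1b[K
        blob = base64.b64encode(json.dumps(event_data).encode('utf-8')).decode('ascii')
        return '\x1b[K%s\x1b[K' % blob

    marker = encode_event_marker({'uuid': 'some-uuid', 'host': 'localhost'})
    # Feeding marker[:1], marker[1:3], marker[3:18], marker[18:] to separate
    # write() calls should still yield exactly one parsed event.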
@@ -18,6 +18,7 @@ import contextlib
 import tempfile
 import six
 import psutil
+from StringIO import StringIO
 
 # Decorator
 from decorator import decorator
@@ -867,7 +868,8 @@ class OutputEventFilter(object):
         self._event_ct = 0
         self._counter = 1
         self._start_line = 0
-        self._buffer = ''
+        self._buffer = StringIO()
+        self._last_chunk = ''
         self._current_event_data = None
 
     def flush(self):
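Two pieces of state change here: _buffer becomes a StringIO, so appends no longer rebuild the whole accumulated string, and _last_chunk remembers the most recent write. The latter matters because an event token can arrive split across two write() calls; checking only the new chunk would miss it. A small illustration of that boundary case (plain Python, not AWX code):

    chunks = ['ordinary output ending in \x1b', '[KQmFzZTY0RGF0YQ==']
    last = ''
    for chunk in chunks:
        seen = '\x1b[K' in (last + chunk)   # False, then True: the token straddles the
        last = chunk                        # chunk boundary and only the sliding window sees it
        print(seen)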
@@ -878,9 +880,19 @@
         pass
 
     def write(self, data):
-        self._buffer += data
-        while True:
-            match = self.EVENT_DATA_RE.search(self._buffer)
+        self._buffer.write(data)
+
+        # keep a sliding window of the last chunk written so we can detect
+        # event tokens and determine if we need to perform a search of the full
+        # buffer
+        should_search = '\x1b[K' in (self._last_chunk + data)
+        self._last_chunk = data
+
+        # Only bother searching the buffer if we recently saw a start/end
+        # token (\x1b[K)
+        while should_search:
+            value = self._buffer.getvalue()
+            match = self.EVENT_DATA_RE.search(value)
             if not match:
                 break
             try:
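The rewritten write() is where the optimization lives: output is appended to a StringIO, and the full-buffer regex search only runs when a \x1b[K token appeared in the last two chunks. Previously every write concatenated onto a plain string and re-searched the entire buffer, so N event-free writes cost roughly O(N^2) work overall. A rough, self-contained timing sketch of the difference (the pattern and chunk sizes are illustrative, not taken from AWX):

    import re
    from io import StringIO                  # Python 3 stand-in for the StringIO module used here
    from timeit import timeit

    TOKEN_RE = re.compile(r'\x1b\[K(?P<data>[A-Za-z0-9+/=]+)\x1b\[K')   # stand-in for EVENT_DATA_RE
    CHUNK = 'x' * 1024

    def old_style(writes=2000):
        buf = ''
        for _ in range(writes):
            buf += CHUNK
            TOKEN_RE.search(buf)             # scans the whole buffer on every write

    def new_style(writes=2000):
        buf, last = StringIO(), ''
        for _ in range(writes):
            buf.write(CHUNK)
            if '\x1b[K' in (last + CHUNK):   # token gate: never true for this event-free stream
                TOKEN_RE.search(buf.getvalue())
            last = CHUNK

    print(timeit(old_style, number=1))       # grows roughly quadratically with `writes`
    print(timeit(new_style, number=1))       # stays roughly linear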
@@ -888,13 +900,17 @@
                 event_data = json.loads(base64.b64decode(base64_data))
             except ValueError:
                 event_data = {}
-            self._emit_event(self._buffer[:match.start()], event_data)
-            self._buffer = self._buffer[match.end():]
+            self._emit_event(value[:match.start()], event_data)
+            remainder = value[match.end():]
+            self._buffer = StringIO()
+            self._buffer.write(remainder)
+            self._last_chunk = remainder
 
     def close(self):
-        if self._buffer:
-            self._emit_event(self._buffer)
-            self._buffer = ''
+        value = self._buffer.getvalue()
+        if value:
+            self._emit_event(value)
+            self._buffer = StringIO()
         self._event_callback(dict(event='EOF'))
 
     def _emit_event(self, buffered_stdout, next_event_data=None):
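close() follows the same pattern: whatever is left in the StringIO buffer is handed to _emit_event, and then the EOF event is sent to the callback. A hedged end-to-end usage sketch, assuming an AWX checkout on the path as in the tests; the single-dict callback signature is inferred from the EOF call, and the real constructor and emitted payloads may carry more detail:

    from awx.main.utils import OutputEventFilter

    received = []

    def on_event(event_data):
        received.append(event_data)

    f = OutputEventFilter(on_event)
    f.write('ordinary stdout with no event markers\n')
    f.close()   # flushes any remaining buffered stdout via _emit_event, then sends {'event': 'EOF'}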
@@ -10,6 +10,7 @@ pytest-cov
 pytest-django
 pytest-pythonpath
 pytest-mock
+pytest-timeout
 logutils
 flower
 uwsgitop
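pytest-timeout is added to the dev requirements so that test_large_stdout_blob fails fast instead of hanging CI if the quadratic behaviour ever regresses. A minimal reminder of how the marker is used (same mechanism as in the test above; the test body here is just a placeholder):

    import pytest

    @pytest.mark.timeout(1)          # seconds; the test is failed instead of hanging
    def test_finishes_quickly():
        assert sum(range(10000)) > 0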