mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 16:51:11 +03:00
Continuously stream data from verbose jobs
In verbose unified job models (inventory updates, system jobs, etc.), do not delay dispatch just because the encoded event data is not part of the data written to the buffer. This allows output from these commands to be submitted to the callback queue as they are produced, instead of waiting until the buffer is closed.
This commit is contained in:
parent
3a3c883504
commit
8c167e50c9
@ -55,7 +55,7 @@ from awx.main.queue import CallbackQueueDispatcher
|
||||
from awx.main.expect import run, isolated_manager
|
||||
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
||||
check_proot_installed, build_proot_temp_dir, get_licenser,
|
||||
wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields,
|
||||
wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields,
|
||||
ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
|
||||
from awx.main.utils.reload import restart_local_services, stop_local_services
|
||||
from awx.main.utils.pglock import advisory_lock
|
||||
@ -811,19 +811,26 @@ class BaseTask(LogErrorsTask):
|
||||
|
||||
def get_stdout_handle(self, instance):
|
||||
'''
|
||||
Return an virtual file object for capturing stdout and events.
|
||||
Return an virtual file object for capturing stdout and/or events.
|
||||
'''
|
||||
dispatcher = CallbackQueueDispatcher()
|
||||
|
||||
def event_callback(event_data):
|
||||
event_data.setdefault(self.event_data_key, instance.id)
|
||||
if 'uuid' in event_data:
|
||||
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
|
||||
if cache_event is not None:
|
||||
event_data.update(cache_event)
|
||||
dispatcher.dispatch(event_data)
|
||||
if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)):
|
||||
def event_callback(event_data):
|
||||
event_data.setdefault(self.event_data_key, instance.id)
|
||||
if 'uuid' in event_data:
|
||||
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
|
||||
if cache_event is not None:
|
||||
event_data.update(cache_event)
|
||||
dispatcher.dispatch(event_data)
|
||||
|
||||
return OutputEventFilter(event_callback)
|
||||
return OutputEventFilter(event_callback)
|
||||
else:
|
||||
def event_callback(event_data):
|
||||
event_data.setdefault(self.event_data_key, instance.id)
|
||||
dispatcher.dispatch(event_data)
|
||||
|
||||
return OutputVerboseFilter(event_callback)
|
||||
|
||||
def pre_run_hook(self, instance, **kwargs):
|
||||
'''
|
||||
|
@ -5,7 +5,7 @@ from StringIO import StringIO
|
||||
|
||||
from six.moves import xrange
|
||||
|
||||
from awx.main.utils import OutputEventFilter
|
||||
from awx.main.utils import OutputEventFilter, OutputVerboseFilter
|
||||
|
||||
MAX_WIDTH = 78
|
||||
EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd'
|
||||
@ -145,3 +145,55 @@ def test_large_stdout_blob():
|
||||
f = OutputEventFilter(_callback)
|
||||
for x in range(1024 * 10):
|
||||
f.write('x' * 1024)
|
||||
|
||||
|
||||
def test_verbose_line_buffering():
|
||||
events = []
|
||||
|
||||
def _callback(event_data):
|
||||
events.append(event_data)
|
||||
|
||||
f = OutputVerboseFilter(_callback)
|
||||
f.write('one two\r\n\r\n')
|
||||
|
||||
assert len(events) == 2
|
||||
assert events[0]['start_line'] == 0
|
||||
assert events[0]['end_line'] == 1
|
||||
assert events[0]['stdout'] == 'one two'
|
||||
|
||||
assert events[1]['start_line'] == 1
|
||||
assert events[1]['end_line'] == 2
|
||||
assert events[1]['stdout'] == ''
|
||||
|
||||
f.write('three')
|
||||
assert len(events) == 2
|
||||
f.write('\r\nfou')
|
||||
|
||||
# three is not pushed to buffer until its line completes
|
||||
assert len(events) == 3
|
||||
assert events[2]['start_line'] == 2
|
||||
assert events[2]['end_line'] == 3
|
||||
assert events[2]['stdout'] == 'three'
|
||||
|
||||
f.write('r\r')
|
||||
f.write('\nfi')
|
||||
|
||||
assert events[3]['start_line'] == 3
|
||||
assert events[3]['end_line'] == 4
|
||||
assert events[3]['stdout'] == 'four'
|
||||
|
||||
f.write('ve')
|
||||
f.write('\r\n')
|
||||
|
||||
assert len(events) == 5
|
||||
assert events[4]['start_line'] == 4
|
||||
assert events[4]['end_line'] == 5
|
||||
assert events[4]['stdout'] == 'five'
|
||||
|
||||
f.close()
|
||||
|
||||
from pprint import pprint
|
||||
pprint(events)
|
||||
assert len(events) == 6
|
||||
|
||||
assert events[5]['event'] == 'EOF'
|
||||
|
@ -48,7 +48,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
|
||||
'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean',
|
||||
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
|
||||
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
|
||||
'get_current_apps', 'set_current_apps', 'OutputEventFilter',
|
||||
'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter',
|
||||
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity',
|
||||
'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict',
|
||||
'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
|
||||
@ -1009,6 +1009,32 @@ class OutputEventFilter(object):
|
||||
self._current_event_data = None
|
||||
|
||||
|
||||
class OutputVerboseFilter(OutputEventFilter):
|
||||
'''
|
||||
File-like object that dispatches stdout data.
|
||||
Does not search for encoded job event data.
|
||||
Use for unified job types that do not encode job event data.
|
||||
'''
|
||||
def write(self, data):
|
||||
self._buffer.write(data)
|
||||
|
||||
# if the current chunk contains a line break
|
||||
if data and '\n' in data:
|
||||
# emit events for all complete lines we know about
|
||||
lines = self._buffer.getvalue().splitlines(True) # keep ends
|
||||
remainder = None
|
||||
# if last line is not a complete line, then exclude it
|
||||
if '\n' not in lines[-1]:
|
||||
remainder = lines.pop()
|
||||
# emit all complete lines
|
||||
for line in lines:
|
||||
self._emit_event(line)
|
||||
self._buffer = StringIO()
|
||||
# put final partial line back on buffer
|
||||
if remainder:
|
||||
self._buffer.write(remainder)
|
||||
|
||||
|
||||
def is_ansible_variable(key):
|
||||
return key.startswith('ansible_')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user