diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c47101b988..ca6f7dbede 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -55,7 +55,7 @@ from awx.main.queue import CallbackQueueDispatcher from awx.main.expect import run, isolated_manager from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, get_licenser, - wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields, + wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields, ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars) from awx.main.utils.reload import restart_local_services, stop_local_services from awx.main.utils.pglock import advisory_lock @@ -811,19 +811,26 @@ class BaseTask(LogErrorsTask): def get_stdout_handle(self, instance): ''' - Return an virtual file object for capturing stdout and events. + Return an virtual file object for capturing stdout and/or events. ''' dispatcher = CallbackQueueDispatcher() - def event_callback(event_data): - event_data.setdefault(self.event_data_key, instance.id) - if 'uuid' in event_data: - cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) - if cache_event is not None: - event_data.update(cache_event) - dispatcher.dispatch(event_data) + if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)): + def event_callback(event_data): + event_data.setdefault(self.event_data_key, instance.id) + if 'uuid' in event_data: + cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) + if cache_event is not None: + event_data.update(cache_event) + dispatcher.dispatch(event_data) - return OutputEventFilter(event_callback) + return OutputEventFilter(event_callback) + else: + def event_callback(event_data): + event_data.setdefault(self.event_data_key, instance.id) + dispatcher.dispatch(event_data) + + return OutputVerboseFilter(event_callback) def pre_run_hook(self, instance, **kwargs): ''' diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py index 85ecc609d0..fb8f4fa144 100644 --- a/awx/main/tests/unit/utils/test_event_filter.py +++ b/awx/main/tests/unit/utils/test_event_filter.py @@ -5,7 +5,7 @@ from StringIO import StringIO from six.moves import xrange -from awx.main.utils import OutputEventFilter +from awx.main.utils import OutputEventFilter, OutputVerboseFilter MAX_WIDTH = 78 EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd' @@ -145,3 +145,55 @@ def test_large_stdout_blob(): f = OutputEventFilter(_callback) for x in range(1024 * 10): f.write('x' * 1024) + + +def test_verbose_line_buffering(): + events = [] + + def _callback(event_data): + events.append(event_data) + + f = OutputVerboseFilter(_callback) + f.write('one two\r\n\r\n') + + assert len(events) == 2 + assert events[0]['start_line'] == 0 + assert events[0]['end_line'] == 1 + assert events[0]['stdout'] == 'one two' + + assert events[1]['start_line'] == 1 + assert events[1]['end_line'] == 2 + assert events[1]['stdout'] == '' + + f.write('three') + assert len(events) == 2 + f.write('\r\nfou') + + # three is not pushed to buffer until its line completes + assert len(events) == 3 + assert events[2]['start_line'] == 2 + assert events[2]['end_line'] == 3 + assert events[2]['stdout'] == 'three' + + f.write('r\r') + f.write('\nfi') + + assert events[3]['start_line'] == 3 + assert events[3]['end_line'] == 4 + assert events[3]['stdout'] == 'four' + + f.write('ve') + f.write('\r\n') + + assert len(events) == 5 + assert events[4]['start_line'] == 4 + assert events[4]['end_line'] == 5 + assert events[4]['stdout'] == 'five' + + f.close() + + from pprint import pprint + pprint(events) + assert len(events) == 6 + + assert events[5]['event'] == 'EOF' diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index ba3413a133..be531d7e17 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -48,7 +48,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided', - 'get_current_apps', 'set_current_apps', 'OutputEventFilter', + 'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter', 'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity', 'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict', 'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest', @@ -1009,6 +1009,32 @@ class OutputEventFilter(object): self._current_event_data = None +class OutputVerboseFilter(OutputEventFilter): + ''' + File-like object that dispatches stdout data. + Does not search for encoded job event data. + Use for unified job types that do not encode job event data. + ''' + def write(self, data): + self._buffer.write(data) + + # if the current chunk contains a line break + if data and '\n' in data: + # emit events for all complete lines we know about + lines = self._buffer.getvalue().splitlines(True) # keep ends + remainder = None + # if last line is not a complete line, then exclude it + if '\n' not in lines[-1]: + remainder = lines.pop() + # emit all complete lines + for line in lines: + self._emit_event(line) + self._buffer = StringIO() + # put final partial line back on buffer + if remainder: + self._buffer.write(remainder) + + def is_ansible_variable(key): return key.startswith('ansible_')