diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 0ffa218348..14e44496bb 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -162,7 +162,7 @@ class CallbackBrokerWorker(ConsumerMixin): if body.get('event') == 'EOF': try: - final_line_count = body.get('final_line_count', 0) + final_counter = body.get('final_counter', 0) logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier)) # EOF events are sent when stdout for the running task is # closed. don't actually persist them to the database; we @@ -170,7 +170,7 @@ class CallbackBrokerWorker(ConsumerMixin): # approximation for when a job is "done" emit_channel_notification( 'jobs-summary', - dict(group_name='jobs', unified_job_id=job_identifier, final_line_count=final_line_count) + dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter) ) # Additionally, when we've processed all events, we should # have all the data we need to send out success/failure diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 59499acb2e..ac1f30d25e 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -989,7 +989,7 @@ class OutputEventFilter(object): if value: self._emit_event(value) self._buffer = StringIO() - self._event_callback(dict(event='EOF', final_line_count=self._start_line)) + self._event_callback(dict(event='EOF', final_counter=self._counter - 1)) def _emit_event(self, buffered_stdout, next_event_data=None): next_event_data = next_event_data or {} diff --git a/awx/ui/client/features/output/index.controller.js b/awx/ui/client/features/output/index.controller.js index 64d53b4faa..7ffb8e2f7e 100644 --- a/awx/ui/client/features/output/index.controller.js +++ b/awx/ui/client/features/output/index.controller.js @@ -163,6 +163,11 @@ function startListening () { listeners.push($scope.$on(resource.ws.events, (scope, data) => handleJobEvent(data))); listeners.push($scope.$on(resource.ws.status, (scope, data) => handleStatusEvent(data))); + + if (resource.model.get('type') === 'job') return; + if (resource.model.get('type') === 'project_update') return; + + listeners.push($scope.$on(resource.ws.summary, (scope, data) => handleSummaryEvent(data))); } function handleStatusEvent (data) { @@ -174,6 +179,13 @@ function handleJobEvent (data) { status.pushJobEvent(data); } +function handleSummaryEvent (data) { + if (resource.model.get('id') !== data.unified_job_id) return; + if (!data.final_counter) return; + + stream.setFinalCounter(data.final_counter); +} + function OutputIndexController ( _$compile_, _$q_, diff --git a/awx/ui/client/features/output/index.js b/awx/ui/client/features/output/index.js index 9f0d05df3c..6d990772ca 100644 --- a/awx/ui/client/features/output/index.js +++ b/awx/ui/client/features/output/index.js @@ -106,6 +106,7 @@ function resolveResource ( ws: { events: `${WS_PREFIX}-${key}-${id}`, status: `${WS_PREFIX}-${name}`, + summary: `${WS_PREFIX}-${name}-summary`, }, page: { cache: PAGE_CACHE, diff --git a/awx/ui/client/features/output/stream.service.js b/awx/ui/client/features/output/stream.service.js index c710a1fc1a..953c886882 100644 --- a/awx/ui/client/features/output/stream.service.js +++ b/awx/ui/client/features/output/stream.service.js @@ -14,10 +14,10 @@ function OutputStream ($q) { this.counters = { used: [], + ready: [], min: 1, max: 0, - last: null, - ready: false, + final: null, }; this.state = { @@ -81,16 +81,14 @@ function OutputStream ($q) { if (maxReady) { minReady = this.counters.min; - this.counters.ready = true; this.counters.min = maxReady + 1; this.counters.used = this.counters.used.filter(c => c > maxReady); - } else { - this.counters.ready = false; } this.counters.missing = missing; + this.counters.ready = [minReady, maxReady]; - return [minReady, maxReady]; + return this.counters.ready; }; this.pushJobEvent = data => { @@ -100,7 +98,7 @@ function OutputStream ($q) { .then(() => { if (data.event === JOB_END) { this.state.ending = true; - this.counters.last = data.counter; + this.counters.final = data.counter; } const [minReady, maxReady] = this.updateCounterState(data); @@ -117,7 +115,7 @@ function OutputStream ($q) { return $q.resolve(); } - const isLastFrame = this.state.ending && (maxReady >= this.counters.last); + const isLastFrame = this.state.ending && (maxReady >= this.counters.final); const events = this.hooks.bufferEmpty(minReady, maxReady); return this.emitFrames(events, isLastFrame); @@ -127,6 +125,27 @@ function OutputStream ($q) { return this.chain; }; + this.setFinalCounter = counter => { + this.chain = this.chain + .then(() => { + this.state.ending = true; + this.counters.final = counter; + + if (counter >= this.counters.min) { + return $q.resolve(); + } + + let events = []; + if (this.counters.ready.length > 0) { + events = this.hooks.bufferEmpty(...this.counters.ready); + } + + return this.emitFrames(events, true); + }); + + return this.chain; + }; + this.emitFrames = (events, last) => this.hooks.onFrames(events) .then(() => { if (last) { @@ -136,6 +155,7 @@ function OutputStream ($q) { this.hooks.onStop(); } + this.counters.ready.length = 0; return $q.resolve(); }); }