diff --git a/awxkit/awxkit/ws.py b/awxkit/awxkit/ws.py index 8005a8ef66..11668641bd 100644 --- a/awxkit/awxkit/ws.py +++ b/awxkit/awxkit/ws.py @@ -4,6 +4,7 @@ import logging import atexit import json import ssl +from datetime import datetime from six.moves.queue import Queue, Empty from six.moves.urllib.parse import urlparse @@ -93,6 +94,7 @@ class WSClient(object): cookie=auth_cookie) self._message_cache = [] self._should_subscribe_to_pending_job = False + self._pending_unsubscribe = threading.Event() def connect(self): wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE})) @@ -184,11 +186,17 @@ class WSClient(object): payload['xrftoken'] = self.csrftoken self._send(json.dumps(payload)) - def unsubscribe(self): - self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) - # it takes time for the unsubscribe event to be recieved and consumed and for - # messages to stop being put on the queue for daphne to send to us - time.sleep(5) + def unsubscribe(self, wait=True, timeout=10): + time_start = datetime.now() + if wait: + # Other unnsubscribe events could have caused the edge to trigger. + # This way the _next_ event will trigger our waiting. + self._pending_unsubscribe.clear() + self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) + if not self._pending_unsubscribe.wait(timeout): + raise RuntimeError(f"Failed while waiting on unsubscribe reply because timeout of {timeout} seconds was reached.") + else: + self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) def _on_message(self, message): message = json.loads(message) @@ -202,6 +210,10 @@ class WSClient(object): self._should_subscribe_to_pending_job['events'] == 'project_update_events'): self._update_subscription(message['unified_job_id']) + # unsubscribe acknowledgement + if 'groups_current' in message: + self._pending_unsubscribe.set() + return self._recv_queue.put(message) def _update_subscription(self, job_id):