mirror of https://github.com/ansible/awx.git

Use closing consistently.

Luke Sneeringer 2014-11-04 10:46:17 -06:00
parent 8d99ef116c
commit fd9647ce94
3 changed files with 54 additions and 52 deletions
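
The idiom being standardized is `contextlib.closing` from the standard library: it wraps any object that has a `close()` method in a context manager, so `close()` is guaranteed to run when the `with` block exits, even on an exception. A minimal sketch of the pattern this commit applies throughout (`FakePubSub` is a hypothetical stand-in for AWX's `PubSub` class, whose real `subscribe(wait=...)` polls Redis):

    from contextlib import closing

    class FakePubSub(object):
        # Hypothetical stand-in: anything exposing close() works with closing().
        def __init__(self, channel):
            self.channel = channel

        def subscribe(self, wait=0.1):
            # The real PubSub yields messages from Redis; yield one canned dict.
            yield {'event': 'playbook_on_start', 'job_id': 1}

        def close(self):
            print('channel %r closed' % self.channel)

    # closing(obj) is roughly shorthand for try: ... finally: obj.close()
    with closing(FakePubSub('callbacks')) as callbacks:
        for message in callbacks.subscribe(wait=0.1):
            print(message['event'])
    # By this point close() has run, even if the loop body had raised.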


@@ -9,6 +9,7 @@ import logging
 import json
 import signal
 import time
+from contextlib import closing
 from optparse import make_option
 from multiprocessing import Process, Queue
 
@@ -90,62 +91,62 @@ class CallbackReceiver(object):
             time.sleep(0.1)
 
     def callback_handler(self, use_workers, worker_queues):
-        pubsub = PubSub('callbacks')
         message_number = 0
         total_messages = 0
         last_parent_events = {}
-        for message in pubsub.subscribe():
-            total_messages += 1
-            if not use_workers:
-                self.process_job_event(message)
-            else:
-                job_parent_events = last_parent_events.get(message['job_id'], {})
-                if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'):
-                    parent = job_parent_events.get('playbook_on_start', None)
-                elif message['event'] in ('playbook_on_notify', 'playbook_on_setup',
-                                          'playbook_on_task_start',
-                                          'playbook_on_no_hosts_matched',
-                                          'playbook_on_no_hosts_remaining',
-                                          'playbook_on_import_for_host',
-                                          'playbook_on_not_import_for_host'):
-                    parent = job_parent_events.get('playbook_on_play_start', None)
-                elif message['event'].startswith('runner_on_'):
-                    list_parents = []
-                    list_parents.append(job_parent_events.get('playbook_on_setup', None))
-                    list_parents.append(job_parent_events.get('playbook_on_task_start', None))
-                    list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id)
-                    parent = list_parents[0] if len(list_parents) > 0 else None
-                else:
-                    parent = None
-                if parent is not None:
-                    message['parent'] = parent.id
-                if 'created' in message:
-                    del(message['created'])
-                if message['event'] in ('playbook_on_start', 'playbook_on_play_start',
-                                        'playbook_on_setup', 'playbook_on_task_start'):
-                    job_parent_events[message['event']] = self.process_job_event(message)
-                else:
-                    if message['event'] == 'playbook_on_stats':
-                        job_parent_events = {}
-                    queue_actual_worker = worker_queues[total_messages % WORKERS]
-                    queue_actual_worker[0] += 1
-                    queue_actual_worker[1].put(message)
-                    if queue_actual_worker[0] >= MAX_REQUESTS:
-                        queue_actual_worker[0] = 0
-                        # print("Recycling worker process")
-                        # queue_actual_worker[2].join()
-                        # connection.close()
-                        # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],))
-                        # w.daemon = True
-                        # w.start()
-                        # signal.signal(signal.SIGINT, shutdown_handler([w]))
-                        # signal.signal(signal.SIGTERM, shutdown_handler([w]))
-                        # queue_actual_worker[2] = w
-                last_parent_events[message['job_id']] = job_parent_events
-            self.consumer_subscriber.send("1")
+        with closing(PubSub('callbacks')) as callbacks:
+            for message in callbacks.subscribe(wait=0.1):
+                total_messages += 1
+                if not use_workers:
+                    self.process_job_event(message)
+                else:
+                    job_parent_events = last_parent_events.get(message['job_id'], {})
+                    if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'):
+                        parent = job_parent_events.get('playbook_on_start', None)
+                    elif message['event'] in ('playbook_on_notify', 'playbook_on_setup',
+                                              'playbook_on_task_start',
+                                              'playbook_on_no_hosts_matched',
+                                              'playbook_on_no_hosts_remaining',
+                                              'playbook_on_import_for_host',
+                                              'playbook_on_not_import_for_host'):
+                        parent = job_parent_events.get('playbook_on_play_start', None)
+                    elif message['event'].startswith('runner_on_'):
+                        list_parents = []
+                        list_parents.append(job_parent_events.get('playbook_on_setup', None))
+                        list_parents.append(job_parent_events.get('playbook_on_task_start', None))
+                        list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id)
+                        parent = list_parents[0] if len(list_parents) > 0 else None
+                    else:
+                        parent = None
+                    if parent is not None:
+                        message['parent'] = parent.id
+                    if 'created' in message:
+                        del(message['created'])
+                    if message['event'] in ('playbook_on_start', 'playbook_on_play_start',
+                                            'playbook_on_setup', 'playbook_on_task_start'):
+                        job_parent_events[message['event']] = self.process_job_event(message)
+                    else:
+                        if message['event'] == 'playbook_on_stats':
+                            job_parent_events = {}
+                        queue_actual_worker = worker_queues[total_messages % WORKERS]
+                        queue_actual_worker[0] += 1
+                        queue_actual_worker[1].put(message)
+                        if queue_actual_worker[0] >= MAX_REQUESTS:
+                            queue_actual_worker[0] = 0
+                            # print("Recycling worker process")
+                            # queue_actual_worker[2].join()
+                            # connection.close()
+                            # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],))
+                            # w.daemon = True
+                            # w.start()
+                            # signal.signal(signal.SIGINT, shutdown_handler([w]))
+                            # signal.signal(signal.SIGTERM, shutdown_handler([w]))
+                            # queue_actual_worker[2] = w
+                    last_parent_events[message['job_id']] = job_parent_events
+                self.consumer_subscriber.send("1")
 
     def process_job_event(self, data):
         # Sanity check: Do we need to do anything at all?
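
A side note on the parent lookup retained in the hunk above: for runner_on_* events, the parent is whichever of playbook_on_setup / playbook_on_task_start has the highest id, chosen via a Python 2 cmp-style descending sort. A self-contained sketch of that selection (`Event` is a hypothetical stand-in for the saved event objects; `cmp=` exists only in Python 2, matching this codebase):

    class Event(object):
        # Hypothetical stand-in for a saved job event with a database id.
        def __init__(self, id):
            self.id = id

    candidates = [Event(3), None, Event(7)]
    # Drop the Nones, then sort descending by id (cmp= is Python 2 only).
    chosen = sorted(filter(lambda x: x is not None, candidates),
                    cmp=lambda x, y: y.id - x.id)
    parent = chosen[0] if len(chosen) > 0 else None
    print(parent.id)  # 7: the most recently created candidate wins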


@@ -74,7 +74,7 @@ class PubSub(object):
     Ideally this should be used with `contextmanager.closing` to ensure
     well-behavedness:
 
-        from contextmanager import closing
+        from contextlib import closing
         with closing(PubSub('foobar')) as foobar:
             for message in foobar.subscribe(wait=0.1):
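
To see the "well-behavedness" the docstring promises: `closing()` guarantees `close()` runs even when the subscribe loop raises. A tiny self-contained check (`Conn` is a hypothetical object with a `close()` method, not part of AWX):

    from contextlib import closing

    class Conn(object):
        closed = False
        def close(self):
            self.closed = True

    conn = Conn()
    try:
        with closing(conn):
            raise RuntimeError('failure mid-subscribe')
    except RuntimeError:
        pass
    assert conn.closed  # close() ran despite the exception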


@@ -38,6 +38,7 @@ import sys
 import urllib
 import urlparse
 import time
+from contextlib import closing
 
 # Requests
 import requests
@@ -125,8 +126,8 @@ class CallbackModule(object):
                 self.connection_pid = active_pid
 
             # Publish the callback through Redis.
-            pubsub = PubSub('callbacks')
-            pubsub.publish(msg)
+            with closing(PubSub('callbacks')) as callbacks:
+                callbacks.publish(msg)
             return
         except Exception, e:
             self.logger.info('Publish Exception: %r, retry=%d', e,