diff --git a/.gitignore b/.gitignore index abcaa6dc04..4981eb3afd 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ awx/ui/static/js/awx.min.js awx/ui/static/js/local_config.js awx/ui/static/css/awx.min.css awx/main/fixtures -awx/tower_warnings.log +awx/*.log tower/tower_warnings.log celerybeat-schedule awx/ui/static/docs diff --git a/Makefile b/Makefile index 442106ae50..12bd816eaf 100644 --- a/Makefile +++ b/Makefile @@ -208,6 +208,7 @@ server_noattach: server: server_noattach tmux -2 attach-session -t tower +# Use with iterm2's native tmux protocol support servercc: server_noattach tmux -2 -CC attach-session -t tower diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 01994b0a95..e883a1963c 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -23,6 +23,8 @@ from django.db import connection from awx.main.models import * # noqa from awx.main.socket import Socket +logger = logging.getLogger('awx.main.commands.run_callback_receiver') + MAX_REQUESTS = 10000 WORKERS = 4 @@ -31,9 +33,6 @@ class CallbackReceiver(object): def __init__(self): self.parent_mappings = {} - def print_log(self, message): - print("[%s] %s" % (now().isoformat(), message)) - def run_subscriber(self, use_workers=True): def shutdown_handler(active_workers): def _handler(signum, frame): @@ -62,10 +61,10 @@ class CallbackReceiver(object): w = Process(target=self.callback_worker, args=(queue_actual,)) w.start() if settings.DEBUG: - self.print_log('Started worker %s' % str(idx)) + logger.info('Started worker %s' % str(idx)) worker_queues.append([0, queue_actual, w]) elif settings.DEBUG: - self.print_log('Started callback receiver (no workers)') + logger.warn('Started callback receiver (no workers)') main_process = Process( target=self.callback_handler, @@ -216,12 +215,12 @@ class CallbackReceiver(object): return job_event except DatabaseError as e: # Log the error and try again. - self.print_log('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) + logger.error('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) # We failed too many times, and are giving up. - self.print_log('Failed to save job event after %d retries.', retry_count) + logger.error('Failed to save job event after %d retries.', retry_count) return None def callback_worker(self, queue_actual): @@ -234,7 +233,7 @@ class CallbackReceiver(object): self.process_job_event(message) messages_processed += 1 if messages_processed >= MAX_REQUESTS: - self.print_log("Shutting down message receiver") + logger.info("Shutting down message receiver") break class Command(NoArgsCommand): @@ -245,19 +244,7 @@ class Command(NoArgsCommand): ''' help = 'Launch the job callback receiver' - def init_logging(self): - log_levels = dict(enumerate([logging.ERROR, logging.INFO, - logging.DEBUG, 0])) - self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') - self.logger.setLevel(log_levels.get(self.verbosity, 0)) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(handler) - self.logger.propagate = False - def handle_noargs(self, **options): - self.verbosity = int(options.get('verbosity', 1)) - self.init_logging() cr = CallbackReceiver() try: cr.run_subscriber() diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py index 625f4e70bc..2a47818893 100644 --- a/awx/main/management/commands/run_socketio_service.py +++ b/awx/main/management/commands/run_socketio_service.py @@ -23,8 +23,7 @@ from socketio import socketio_manage from socketio.server import SocketIOServer from socketio.namespace import BaseNamespace -def print_log(message): - print("[%s] %s" % (now().isoformat(), message)) +logger = logging.getLogger('awx.main.commands.run_socketio_service') valid_sockets = [] @@ -45,6 +44,7 @@ class TowerBaseNamespace(BaseNamespace): valid_sockets = valid_sockets[1:] return set(['recv_connect'] + self.get_allowed_methods()) else: + logger.warn("Authentication Failure validating user") self.emit("connect_failed", "Authentication failed") return set(['recv_connect']) @@ -67,6 +67,7 @@ class TowerBaseNamespace(BaseNamespace): else: return False except Exception, e: + logger.error("Exception validating user: " + str(e)) return False def recv_connect(self): @@ -76,20 +77,20 @@ class TowerBaseNamespace(BaseNamespace): class TestNamespace(TowerBaseNamespace): def recv_connect(self): - print_log("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR'])) + logger.info("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR'])) self.emit('test', "If you see this then you attempted to connect to the test socket endpoint") super(TestNamespace, self).recv_connect() class JobNamespace(TowerBaseNamespace): def recv_connect(self): - print_log("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR'])) + logger.info("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR'])) super(JobNamespace, self).recv_connect() class JobEventNamespace(TowerBaseNamespace): def recv_connect(self): - print_log("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR'])) + logger.info("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR'])) super(JobEventNamespace, self).recv_connect() class ScheduleNamespace(TowerBaseNamespace): @@ -98,7 +99,7 @@ class ScheduleNamespace(TowerBaseNamespace): return ["schedule_changed"] def recv_connect(self): - print_log("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR'])) + logger.info("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR'])) super(ScheduleNamespace, self).recv_connect() class TowerSocket(object): @@ -111,6 +112,7 @@ class TowerSocket(object): '/socket.io/job_events': JobEventNamespace, '/socket.io/schedules': ScheduleNamespace}) else: + logger.warn("Invalid connect path received: " + path) start_response('404 Not Found', []) return ['Tower version %s' % awx.__version__] @@ -141,28 +143,16 @@ class Command(NoArgsCommand): make_option('--socketio_port', dest='socketio_port', type='int', default=8080, help='Port to accept socketio requests from clients'),) - def init_logging(self): - log_levels = dict(enumerate([logging.ERROR, logging.INFO, - logging.DEBUG, 0])) - self.logger = logging.getLogger('awx.main.commands.run_socketio_service') - self.logger.setLevel(log_levels.get(self.verbosity, 0)) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(handler) - self.logger.propagate = False - def handle_noargs(self, **options): - self.verbosity = int(options.get('verbosity', 1)) - self.init_logging() socketio_listen_port = settings.SOCKETIO_LISTEN_PORT try: if os.path.exists('/etc/tower/tower.cert') and os.path.exists('/etc/tower/tower.key'): - print 'Listening on port https://0.0.0.0:' + str(socketio_listen_port) + logger.info('Listening on port https://0.0.0.0:' + str(socketio_listen_port)) server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io', keyfile='/etc/tower/tower.key', certfile='/etc/tower/tower.cert') else: - print 'Listening on port http://0.0.0.0:' + str(socketio_listen_port) + logger.info('Listening on port http://0.0.0.0:' + str(socketio_listen_port)) server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io') handler_thread = Thread(target=notification_handler, args=(server,)) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 24721b1919..31acbb6af4 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -22,14 +22,10 @@ from awx.main.utils import get_system_task_capacity # Celery from celery.task.control import inspect +logger = logging.getLogger('awx.main.commands.run_task_system') queue = FifoQueue('tower_task_manager') - -def print_log(message): - print("[%s] %s" % (now().isoformat(), message)) - - class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' @@ -165,7 +161,7 @@ def rebuild_graph(message): if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): active_task_queues = inspector.active() else: - print_log("Ignoring celery task inspector") + logger.warn("Ignoring celery task inspector") active_task_queues = None all_sorted_tasks = get_tasks() @@ -177,10 +173,9 @@ def rebuild_graph(message): for queue in active_task_queues: active_tasks += [at['id'] for at in active_task_queues[queue]] else: - if settings.DEBUG: - print_log("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. + logger.error("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. if not hasattr(settings, 'CELERY_UNIT_TEST'): return None running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) @@ -188,7 +183,7 @@ def rebuild_graph(message): new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) # Check running tasks and make sure they are active in celery - print_log("Active celery tasks: " + str(active_tasks)) + logger.debug("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): # NOTE: Pull status again and make sure it didn't finish in @@ -201,13 +196,13 @@ def rebuild_graph(message): task.save() task.socketio_emit_status("failed") running_tasks.pop(running_tasks.index(task)) - print_log("Task %s appears orphaned... marking as failed" % task) + logger.error("Task %s appears orphaned... marking as failed" % task) # Create and process dependencies for new tasks for task in new_tasks: - print_log("Checking dependencies for: %s" % str(task)) + logger.debug("Checking dependencies for: %s" % str(task)) task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) # TODO: other 'new' tasks? Need to investigate this scenario - print_log("New dependencies: %s" % str(task_dependencies)) + logger.debug("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: # We recalculate the created time for the moment to ensure the # dependencies are always sorted in the right order relative to @@ -246,11 +241,11 @@ def process_graph(graph, task_capacity): running_impact = sum([t['node_object'].task_impact for t in running_nodes]) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) remaining_volume = task_capacity - running_impact - print_log('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' - 'Remaining Capacity: %s' % - (str(running_nodes), str(task_capacity), - str(running_impact), str(remaining_volume))) - print_log("Ready Nodes: %s" % str(ready_nodes)) + logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) + logger.info("Ready Nodes: %s" % str(ready_nodes)) for task_node in ready_nodes: node_obj = task_node['node_object'] # NOTE: This could be used to pass metadata through the task system @@ -277,9 +272,9 @@ def process_graph(graph, task_capacity): continue remaining_volume -= impact running_impact += impact - print_log('Started Node: %s (capacity hit: %s) ' - 'Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) + logger.info('Started Node: %s (capacity hit: %s) ' + 'Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(): """Receive task start and finish signals to rebuild a dependency graph @@ -319,7 +314,7 @@ def run_taskmanager(): # appropriate. if (datetime.datetime.now() - last_rebuild).seconds > 10: if message is not None and 'pause' in message: - print_log("Pause command received: %s" % str(message)) + logger.info("Pause command received: %s" % str(message)) paused = message['pause'] graph = rebuild_graph(message) if not paused and graph is not None: @@ -340,19 +335,7 @@ class Command(NoArgsCommand): """ help = 'Launch the Tower task management system' - def init_logging(self): - log_levels = dict(enumerate([logging.ERROR, logging.INFO, - logging.DEBUG, 0])) - self.logger = logging.getLogger('awx.main.commands.run_task_system') - self.logger.setLevel(log_levels.get(self.verbosity, 0)) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(handler) - self.logger.propagate = False - def handle_noargs(self, **options): - self.verbosity = int(options.get('verbosity', 1)) - self.init_logging() try: run_taskmanager() except KeyboardInterrupt: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e1edfe8443..fed14e4fa5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -93,6 +93,9 @@ PROJECTS_ROOT = os.path.join(BASE_DIR, 'projects') # directory should not be web-accessible JOBOUTPUT_ROOT = os.path.join(BASE_DIR, 'job_output') +# Absolute filesystem path to the directory to store logs +LOG_ROOT = os.path.join(BASE_DIR) + # The heartbeat file for the tower scheduler SCHEDULE_METADATA_LOCATION = os.path.join(BASE_DIR, '.tower_cycle') @@ -554,11 +557,38 @@ LOGGING = { 'filters': ['require_debug_false'], 'class': 'django.utils.log.AdminEmailHandler', }, - 'rotating_file': { + 'tower_warnings': { 'level': 'WARNING', 'class':'logging.handlers.RotatingFileHandler', 'filters': ['require_debug_false'], - 'filename': os.path.join(BASE_DIR, 'tower_warnings.log'), + 'filename': os.path.join(LOG_ROOT, 'tower_warnings.log'), + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', + }, + 'callback_receiver': { + 'level': 'WARNING', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': os.path.join(LOG_ROOT, 'callback_receiver.log'), + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', + }, + 'socketio_service': { + 'level': 'WARNING', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': os.path.join(LOG_ROOT, 'socketio_service.log'), + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', + }, + 'task_system': { + 'level': 'INFO', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': os.path.join(LOG_ROOT, 'task_system.log'), 'maxBytes': 1024*1024*5, # 5 MB 'backupCount': 5, 'formatter':'simple', @@ -569,12 +599,12 @@ LOGGING = { 'handlers': ['console'], }, 'django.request': { - 'handlers': ['mail_admins', 'console', 'file', 'syslog', 'rotating_file'], + 'handlers': ['mail_admins', 'console', 'file', 'tower_warnings'], 'level': 'WARNING', 'propagate': False, }, 'rest_framework.request': { - 'handlers': ['mail_admins', 'console', 'file', 'syslog', 'rotating_file'], + 'handlers': ['mail_admins', 'console', 'file', 'tower_warnings'], 'level': 'WARNING', 'propagate': False, }, @@ -582,9 +612,21 @@ LOGGING = { 'handlers': ['console'], }, 'awx': { - 'handlers': ['console', 'file', 'syslog', 'rotating_file'], + 'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG', }, + 'awx.main.commands.run_callback_receiver': { + 'handlers': ['console', 'file', 'callback_receiver'], + 'propagate': False + }, + 'awx.main.commands.run_socketio_service': { + 'handlers': ['console', 'file', 'socketio_service'], + 'propagate': False + }, + 'awx.main.commands.run_task_system': { + 'handlers': ['console', 'file', 'task_system'], + 'propagate': False + }, 'awx.main.access': { 'handlers': ['null'], 'propagate': False, diff --git a/awx/settings/production.py b/awx/settings/production.py index 9ded1f47ca..e93f08e59e 100644 --- a/awx/settings/production.py +++ b/awx/settings/production.py @@ -44,7 +44,7 @@ JOBOUTPUT_ROOT = '/var/lib/awx/job_status/' # The heartbeat file for the tower scheduler SCHEDULE_METADATA_LOCATION = '/var/lib/awx/.tower_cycle' -LOGGING['handlers']['rotating_file'] = { +LOGGING['handlers']['tower_warnings'] = { 'level': 'WARNING', 'class':'logging.handlers.RotatingFileHandler', 'filters': ['require_debug_false'], @@ -54,6 +54,37 @@ LOGGING['handlers']['rotating_file'] = { 'formatter':'simple', } + +LOGGING['handlers']['callback_receiver'] = { + 'level': 'WARNING', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': '/var/log/tower/callback_receiver.log', + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', +} + +LOGGING['handlers']['socketio_service'] = { + 'level': 'WARNING', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': '/var/log/tower/socketio_service.log', + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', +} + +LOGGING['handlers']['task_system'] = { + 'level': 'INFO', + 'class':'logging.handlers.RotatingFileHandler', + 'filters': ['require_debug_false'], + 'filename': '/var/log/tower/task_system.log', + 'maxBytes': 1024*1024*5, # 5 MB + 'backupCount': 5, + 'formatter':'simple', +} + # Load settings from any .py files in the global conf.d directory specified in # the environment, defaulting to /etc/tower/conf.d/. settings_dir = os.environ.get('AWX_SETTINGS_DIR', '/etc/tower/conf.d/')