From 2ea0b31e2b2475406a1bbedd316033ac5c5d8d7f Mon Sep 17 00:00:00 2001
From: chris meyers
Date: Thu, 15 Mar 2018 11:18:42 -0400
Subject: [PATCH] handle unicode things in task logger

Related to https://github.com/ansible/ansible-tower/issues/7957

* The problem presented itself as Instances falling out of Instance
  Groups. This was due to the cluster membership policy decider erroring
  out on a logger message with unicode.
* Fixed up other potential unicode logger issues in tasks.py
---
 awx/main/tasks.py | 95 ++++++++++++++++++++++++++-----------------------
 1 file changed, 48 insertions(+), 47 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 3afb479a25..fdf316b17b 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -84,13 +84,13 @@ class LogErrorsTask(Task):
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         if getattr(exc, 'is_awx_task_error', False):
             # Error caused by user / tracked in job output
-            logger.warning(str(exc))
+            logger.warning(six.text_type("{}").format(exc))
         elif isinstance(self, BaseTask):
-            logger.exception(
-                '%s %s execution encountered exception.',
-                get_type_for_model(self.model), args[0])
+            logger.exception(six.text_type(
+                '{!s} {!s} execution encountered exception.')
+                .format(get_type_for_model(self.model), args[0]))
         else:
-            logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
+            logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
         super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
 
 
@@ -107,7 +107,7 @@ def celery_startup(conf=None, **kwargs):
             with disable_activity_stream():
                 sch.save()
         except Exception:
-            logger.exception("Failed to rebuild schedule {}.".format(sch))
+            logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
 
 
 @worker_process_init.connect
@@ -126,8 +126,8 @@ def inform_cluster_of_shutdown(*args, **kwargs):
         this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
         this_inst.capacity = 0  # No thank you to new jobs while shut down
         this_inst.save(update_fields=['capacity', 'modified'])
-        logger.warning('Normal shutdown signal for instance {}, '
-                       'removed self from capacity pool.'.format(this_inst.hostname))
+        logger.warning(six.text_type('Normal shutdown signal for instance {}, '
+                       'removed self from capacity pool.').format(this_inst.hostname))
     except Exception:
         # General exception because LogErrorsTask not used with celery signals
         logger.exception('Encountered problem with normal shutdown signal.')
@@ -146,7 +146,7 @@ def apply_cluster_membership_policies(self):
     # Process policy instance list first, these will represent manually managed instances
     # that will not go through automatic policy determination
     for ig in InstanceGroup.objects.all():
-        logger.info("Considering group {}".format(ig.name))
+        logger.info(six.text_type("Considering group {}").format(ig.name))
         ig.instances.clear()
         group_actual = Group(obj=ig, instances=[])
         for i in ig.policy_instance_list:
@@ -154,7 +154,7 @@ def apply_cluster_membership_policies(self):
             if not inst.exists():
                 continue
             inst = inst[0]
-            logger.info("Policy List, adding {} to {}".format(inst.hostname, ig.name))
+            logger.info(six.text_type("Policy List, adding {} to {}").format(inst.hostname, ig.name))
             group_actual.instances.append(inst.id)
             ig.instances.add(inst)
             filtered_instances.append(inst)
@@ -167,7 +167,7 @@ def apply_cluster_membership_policies(self):
         for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
             if len(g.instances) >= g.obj.policy_instance_minimum:
                 break
-            logger.info("Policy minimum, adding {} to {}".format(i.obj.hostname, g.obj.name))
+            logger.info(six.text_type("Policy minimum, adding {} to {}").format(i.obj.hostname, g.obj.name))
             g.obj.instances.add(i.obj)
             g.instances.append(i.obj.id)
             i.groups.append(g.obj.id)
@@ -176,7 +176,7 @@ def apply_cluster_membership_policies(self):
         for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
             if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
                 break
-            logger.info("Policy percentage, adding {} to {}".format(i.obj.hostname, g.obj.name))
+            logger.info(six.text_type("Policy percentage, adding {} to {}").format(i.obj.hostname, g.obj.name))
             g.instances.append(i.obj.id)
             g.obj.instances.add(i.obj)
             i.groups.append(g.obj.id)
@@ -205,24 +205,24 @@ def handle_setting_changes(self, setting_keys):
 @shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
 def handle_ha_toplogy_changes(self):
     instance = Instance.objects.me()
-    logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
+    logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
     awx_app = Celery('awx')
     awx_app.config_from_object('django.conf:settings')
     instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
     for instance in instances:
-        logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
+        logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
                     .format(instance.hostname, removed_queues, added_queues))
     updated_routes = update_celery_worker_routes(instance, settings)
-    logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
+    logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}")
                 .format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
 
 
 @worker_ready.connect
 def handle_ha_toplogy_worker_ready(sender, **kwargs):
-    logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
+    logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
     instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
     for instance in instances:
-        logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
+        logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}")
                     .format(instance.hostname, removed_queues, added_queues))
 
     # Expedite the first hearbeat run so a node comes online quickly.
@@ -233,10 +233,10 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
 @celeryd_init.connect
 def handle_update_celery_routes(sender=None, conf=None, **kwargs):
     conf = conf if conf else sender.app.conf
-    logger.debug("Registering celery routes for {}".format(sender))
+    logger.debug(six.text_type("Registering celery routes for {}").format(sender))
     instance = Instance.objects.me()
     added_routes = update_celery_worker_routes(instance, conf)
-    logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
+    logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
                 .format(instance.hostname, added_routes, conf.CELERY_ROUTES))
 
 
@@ -244,7 +244,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
 def handle_update_celery_hostname(sender, instance, **kwargs):
     tower_instance = Instance.objects.me()
     instance.hostname = 'celery@{}'.format(tower_instance.hostname)
-    logger.warn("Set hostname to {}".format(instance.hostname))
+    logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
 
 
 @shared_task(queue='tower', base=LogErrorsTask)
@@ -264,7 +264,7 @@ def send_notifications(notification_list, job_id=None):
                 notification.status = "successful"
                 notification.notifications_sent = sent
             except Exception as e:
-                logger.error("Send Notification Failed {}".format(e))
+                logger.error(six.text_type("Send Notification Failed {}").format(e))
                 notification.status = "failed"
                 notification.error = smart_str(e)
             finally:
@@ -299,7 +299,7 @@ def purge_old_stdout_files(self):
     for f in os.listdir(settings.JOBOUTPUT_ROOT):
         if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
             os.unlink(os.path.join(settings.JOBOUTPUT_ROOT,f))
-            logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
+            logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
 
 
 @shared_task(bind=True, base=LogErrorsTask)
@@ -320,7 +320,7 @@ def cluster_node_heartbeat(self):
     if this_inst:
         startup_event = this_inst.is_lost(ref_time=nowtime)
         if this_inst.capacity == 0 and this_inst.enabled:
-            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
         if this_inst.enabled:
             this_inst.refresh_capacity()
             handle_ha_toplogy_changes.apply_async()
@@ -337,10 +337,11 @@ def cluster_node_heartbeat(self):
         if other_inst.version == "":
             continue
         if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
-            logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
-                                                                                                       other_inst.version,
-                                                                                                       this_inst.hostname,
-                                                                                                       this_inst.version))
+            logger.error(six.text_type("Host {} reports version {}, but this node {} is at {}, shutting down")
+                         .format(other_inst.hostname,
+                                 other_inst.version,
+                                 this_inst.hostname,
+                                 this_inst.version))
             # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
             # The heartbeat task will reset the capacity to the system capacity after upgrade.
             stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False)
@@ -357,17 +358,17 @@ def cluster_node_heartbeat(self):
             if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
                 other_inst.capacity = 0
                 other_inst.save(update_fields=['capacity'])
-                logger.error("Host {} last checked in at {}, marked as lost.".format(
+                logger.error(six.text_type("Host {} last checked in at {}, marked as lost.").format(
                     other_inst.hostname, other_inst.modified))
             elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
                 deprovision_hostname = other_inst.hostname
                 other_inst.delete()
-                logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
+                logger.info(six.text_type("Host {} Automatically Deprovisioned.").format(deprovision_hostname))
         except DatabaseError as e:
             if 'did not affect any rows' in str(e):
-                logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
+                logger.debug(six.text_type('Another instance has marked {} as lost').format(other_inst.hostname))
             else:
-                logger.exception('Error marking {} as lost'.format(other_inst.hostname))
+                logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname))
 
 
 @shared_task(bind=True, base=LogErrorsTask)
@@ -390,7 +391,7 @@ def awx_isolated_heartbeat(self):
             isolated_instance.save(update_fields=['last_isolated_check'])
     # Slow pass looping over isolated IGs and their isolated instances
     if len(isolated_instance_qs) > 0:
-        logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
+        logger.debug(six.text_type("Managing isolated instances {}.").format(','.join([inst.hostname for inst in isolated_instance_qs])))
         isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
 
 
@@ -551,7 +552,7 @@ def update_host_smart_inventory_memberships():
                 changed_inventories.add(smart_inventory)
             SmartInventoryMembership.objects.bulk_create(memberships)
     except IntegrityError as e:
-        logger.error("Update Host Smart Inventory Memberships failed due to an exception: " + str(e))
+        logger.error(six.text_type("Update Host Smart Inventory Memberships failed due to an exception: {}").format(e))
         return
     # Update computed fields for changed inventories outside atomic action
    for smart_inventory in changed_inventories:
@@ -576,7 +577,7 @@ def delete_inventory(self, inventory_id, user_id):
             'inventories-status_changed',
             {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'}
         )
-        logger.debug('Deleted inventory %s as user %s.' % (inventory_id, user_id))
+        logger.debug(six.text_type('Deleted inventory {} as user {}.').format(inventory_id, user_id))
     except Inventory.DoesNotExist:
         logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
         return
@@ -598,7 +599,7 @@ def with_path_cleanup(f):
                 elif os.path.exists(p):
                     os.remove(p)
             except OSError:
-                logger.exception("Failed to remove tmp file: {}".format(p))
+                logger.exception(six.text_type("Failed to remove tmp file: {}").format(p))
         self.cleanup_paths = []
     return _wrapped
 
@@ -1011,7 +1012,7 @@ class BaseTask(LogErrorsTask):
             try:
                 self.post_run_hook(instance, status, **kwargs)
             except Exception:
-                logger.exception('{} Post run hook errored.'.format(instance.log_format))
+                logger.exception(six.text_type('{} Post run hook errored.').format(instance.log_format))
             instance = self.update_model(pk)
             if instance.cancel_flag:
                 status = 'canceled'
@@ -1023,7 +1024,7 @@ class BaseTask(LogErrorsTask):
         try:
             self.final_run_hook(instance, status, **kwargs)
         except Exception:
-            logger.exception('%s Final run hook errored.', instance.log_format)
+            logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format))
         instance.websocket_emit_status(status)
         if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
             # Raising an exception will mark the job as 'failed' in celery
@@ -1616,8 +1617,8 @@ class RunProjectUpdate(BaseTask):
                 task_instance.request.id = project_request_id
                 task_instance.run(local_inv_update.id)
             except Exception:
-                logger.exception('%s Unhandled exception updating dependent SCM inventory sources.',
-                                 project_update.log_format)
+                logger.exception(six.text_type('{} Unhandled exception updating dependent SCM inventory sources.')
+                                 .format(project_update.log_format))
 
             try:
                 project_update.refresh_from_db()
@@ -1630,10 +1631,10 @@ class RunProjectUpdate(BaseTask):
                 logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
                 continue
             if project_update.cancel_flag:
-                logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
+                logger.info(six.text_type('Project update {} was canceled while updating dependent inventories.').format(project_update.log_format))
                 break
             if local_inv_update.cancel_flag:
-                logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
+                logger.info(six.text_type('Continuing to process project dependencies after {} was canceled').format(local_inv_update.log_format))
             if local_inv_update.status == 'successful':
                 inv_src.scm_last_revision = scm_revision
                 inv_src.save(update_fields=['scm_last_revision'])
@@ -1642,7 +1643,7 @@ class RunProjectUpdate(BaseTask):
         try:
             fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
         except IOError as e:
-            logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, instance.get_lock_file(), e.strerror))
             os.close(self.lock_fd)
             raise
 
@@ -1660,14 +1661,14 @@ class RunProjectUpdate(BaseTask):
         try:
             self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT)
         except OSError as e:
-            logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
             raise
 
         try:
             fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
         except IOError as e:
             os.close(self.lock_fd)
-            logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to acquire lock on file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
             raise
 
     def pre_run_hook(self, instance, **kwargs):
@@ -1687,7 +1688,7 @@ class RunProjectUpdate(BaseTask):
                 if lines:
                     p.scm_revision = lines[0].strip()
                 else:
-                    logger.info("%s Could not find scm revision in check", instance.log_format)
+                    logger.info(six.text_type("{} Could not find scm revision in check").format(instance.log_format))
                 p.playbook_files = p.playbooks
                 p.inventory_files = p.inventories
                 p.save()
@@ -2297,7 +2298,7 @@ class RunSystemJob(BaseTask):
             if 'granularity' in json_vars:
                 args.extend(['--granularity', str(json_vars['granularity'])])
         except Exception:
-            logger.exception("%s Failed to parse system job", system_job.log_format)
+            logger.exception(six.text_type("{} Failed to parse system job").format(system_job.log_format))
         return args
 
     def build_env(self, instance, **kwargs):
@@ -2332,7 +2333,7 @@ def deep_copy_model_obj(
     self, model_module, model_name, obj_pk, new_obj_pk, user_pk, sub_obj_list,
     permission_check_func=None
 ):
-    logger.info('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
+    logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk))
     from awx.api.generics import CopyAPIView
     model = getattr(importlib.import_module(model_module), model_name, None)
     if model is None:
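
For illustration, a minimal sketch (not part of the commit) of the Python 2
failure mode behind the Instance Group bug: str.format() on a byte-string
template encodes unicode arguments with the ascii codec, so a non-ASCII
group name raised UnicodeEncodeError before the message ever reached the
logger. The group name below is a hypothetical stand-in for the ig.name
values formatted in apply_cluster_membership_policies:

    # Minimal repro sketch (hypothetical values, not AWX code).
    import six

    group_name = u'instanz-gr\xfcppe'  # an InstanceGroup name with a non-ASCII character

    # Byte-string template: Python 2 tries to ascii-encode the unicode
    # argument into a str result and raises UnicodeEncodeError; Python 3
    # is unaffected because str is already text.
    try:
        "Considering group {}".format(group_name)
    except UnicodeEncodeError as exc:
        print("byte-string template failed: %r" % exc)  # Python 2 path

    # Text-type template: six.text_type is unicode on Python 2 and str on
    # Python 3, so formatting stays in text space and succeeds on both.
    message = six.text_type("Considering group {}").format(group_name)
    assert message == u'Considering group instanz-gr\xfcppe'
    assert isinstance(message, six.text_type)

The same trap applies to calling str() on an exception whose message
contains unicode, which is why the first hunk replaces
logger.warning(str(exc)) with a six.text_type template as well.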