
Merge pull request #1572 from chrismeyersfsu/fix-instance_removed_from_group

handle unicode things in task logger
Chris Meyers 2018-03-15 16:25:13 -04:00 committed by GitHub
commit 0689cea806

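The diff below wraps each log format string in six.text_type so the message is built as text (unicode on Python 2, str on Python 3) before any arguments are interpolated. A minimal sketch of the Python 2 failure mode this guards against follows; the exception text and variable names are illustrative, not taken from the diff:

# -*- coding: utf-8 -*-
# Sketch only: shows the Python 2 behavior the commit works around.
import six

exc = RuntimeError(u'instance "gr\u00fcppe-01" removed from group')

# Old pattern: on Python 2, str() of an exception whose message holds
# non-ASCII unicode raises UnicodeEncodeError, so the logging call itself fails.
try:
    msg = str(exc)
except UnicodeEncodeError:
    msg = None

# New pattern: a text (unicode) format string keeps the whole message as text,
# so formatting succeeds and the logger receives a unicode object.
# (On Python 3, six.text_type is str and both forms behave the same.)
msg = six.text_type("{}").format(exc)
print(msg)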

@@ -84,13 +84,13 @@ class LogErrorsTask(Task):
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         if getattr(exc, 'is_awx_task_error', False):
             # Error caused by user / tracked in job output
-            logger.warning(str(exc))
+            logger.warning(six.text_type("{}").format(exc))
         elif isinstance(self, BaseTask):
-            logger.exception(
-                '%s %s execution encountered exception.',
-                get_type_for_model(self.model), args[0])
+            logger.exception(six.text_type(
+                '{!s} {!s} execution encountered exception.')
+                .format(get_type_for_model(self.model), args[0]))
         else:
-            logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
+            logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
         super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@@ -107,7 +107,7 @@ def celery_startup(conf=None, **kwargs):
             with disable_activity_stream():
                 sch.save()
         except Exception:
-            logger.exception("Failed to rebuild schedule {}.".format(sch))
+            logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
 @worker_process_init.connect
@@ -126,8 +126,8 @@ def inform_cluster_of_shutdown(*args, **kwargs):
         this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
         this_inst.capacity = 0 # No thank you to new jobs while shut down
         this_inst.save(update_fields=['capacity', 'modified'])
-        logger.warning('Normal shutdown signal for instance {}, '
-                       'removed self from capacity pool.'.format(this_inst.hostname))
+        logger.warning(six.text_type('Normal shutdown signal for instance {}, '
+                                     'removed self from capacity pool.').format(this_inst.hostname))
     except Exception:
         # General exception because LogErrorsTask not used with celery signals
         logger.exception('Encountered problem with normal shutdown signal.')
@@ -146,7 +146,7 @@ def apply_cluster_membership_policies(self):
     # Process policy instance list first, these will represent manually managed instances
     # that will not go through automatic policy determination
     for ig in InstanceGroup.objects.all():
-        logger.info("Considering group {}".format(ig.name))
+        logger.info(six.text_type("Considering group {}").format(ig.name))
         ig.instances.clear()
         group_actual = Group(obj=ig, instances=[])
         for i in ig.policy_instance_list:
@@ -154,7 +154,7 @@ def apply_cluster_membership_policies(self):
             if not inst.exists():
                 continue
             inst = inst[0]
-            logger.info("Policy List, adding {} to {}".format(inst.hostname, ig.name))
+            logger.info(six.text_type("Policy List, adding {} to {}").format(inst.hostname, ig.name))
             group_actual.instances.append(inst.id)
             ig.instances.add(inst)
             filtered_instances.append(inst)
@@ -167,7 +167,7 @@ def apply_cluster_membership_policies(self):
         for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
             if len(g.instances) >= g.obj.policy_instance_minimum:
                 break
-            logger.info("Policy minimum, adding {} to {}".format(i.obj.hostname, g.obj.name))
+            logger.info(six.text_type("Policy minimum, adding {} to {}").format(i.obj.hostname, g.obj.name))
             g.obj.instances.add(i.obj)
             g.instances.append(i.obj.id)
             i.groups.append(g.obj.id)
@@ -176,7 +176,7 @@ def apply_cluster_membership_policies(self):
         for i in sorted(actual_instances, cmp=lambda x,y: len(x.groups) - len(y.groups)):
             if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
                 break
-            logger.info("Policy percentage, adding {} to {}".format(i.obj.hostname, g.obj.name))
+            logger.info(six.text_type("Policy percentage, adding {} to {}").format(i.obj.hostname, g.obj.name))
             g.instances.append(i.obj.id)
             g.obj.instances.add(i.obj)
             i.groups.append(g.obj.id)
@@ -205,24 +205,24 @@ def handle_setting_changes(self, setting_keys):
 @shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
 def handle_ha_toplogy_changes(self):
     instance = Instance.objects.me()
-    logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
+    logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
     awx_app = Celery('awx')
     awx_app.config_from_object('django.conf:settings')
     instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
     for instance in instances:
-        logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
+        logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
                     .format(instance.hostname, removed_queues, added_queues))
     updated_routes = update_celery_worker_routes(instance, settings)
-    logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
+    logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}")
                 .format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
 @worker_ready.connect
 def handle_ha_toplogy_worker_ready(sender, **kwargs):
-    logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
+    logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
     instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
     for instance in instances:
-        logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
+        logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}")
                     .format(instance.hostname, removed_queues, added_queues))
     # Expedite the first hearbeat run so a node comes online quickly.
@@ -233,10 +233,10 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
 @celeryd_init.connect
 def handle_update_celery_routes(sender=None, conf=None, **kwargs):
     conf = conf if conf else sender.app.conf
-    logger.debug("Registering celery routes for {}".format(sender))
+    logger.debug(six.text_type("Registering celery routes for {}").format(sender))
     instance = Instance.objects.me()
     added_routes = update_celery_worker_routes(instance, conf)
-    logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
+    logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
                 .format(instance.hostname, added_routes, conf.CELERY_ROUTES))
@@ -244,7 +244,7 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
 def handle_update_celery_hostname(sender, instance, **kwargs):
     tower_instance = Instance.objects.me()
     instance.hostname = 'celery@{}'.format(tower_instance.hostname)
-    logger.warn("Set hostname to {}".format(instance.hostname))
+    logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
 @shared_task(queue='tower', base=LogErrorsTask)
@@ -264,7 +264,7 @@ def send_notifications(notification_list, job_id=None):
             notification.status = "successful"
             notification.notifications_sent = sent
         except Exception as e:
-            logger.error("Send Notification Failed {}".format(e))
+            logger.error(six.text_type("Send Notification Failed {}").format(e))
             notification.status = "failed"
             notification.error = smart_str(e)
         finally:
@@ -299,7 +299,7 @@ def purge_old_stdout_files(self):
     for f in os.listdir(settings.JOBOUTPUT_ROOT):
         if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT,f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
             os.unlink(os.path.join(settings.JOBOUTPUT_ROOT,f))
-            logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
+            logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
 @shared_task(bind=True, base=LogErrorsTask)
@@ -320,7 +320,7 @@ def cluster_node_heartbeat(self):
     if this_inst:
         startup_event = this_inst.is_lost(ref_time=nowtime)
         if this_inst.capacity == 0 and this_inst.enabled:
-            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            logger.warning(six.text_type('Rejoining the cluster as instance {}.').format(this_inst.hostname))
         if this_inst.enabled:
             this_inst.refresh_capacity()
             handle_ha_toplogy_changes.apply_async()
@@ -337,10 +337,11 @@ def cluster_node_heartbeat(self):
         if other_inst.version == "":
             continue
         if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
-            logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
-                                                                                                        other_inst.version,
-                                                                                                        this_inst.hostname,
-                                                                                                        this_inst.version))
+            logger.error(six.text_type("Host {} reports version {}, but this node {} is at {}, shutting down")
+                         .format(other_inst.hostname,
+                                 other_inst.version,
+                                 this_inst.hostname,
+                                 this_inst.version))
             # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
             # The heartbeat task will reset the capacity to the system capacity after upgrade.
             stop_local_services(['uwsgi', 'celery', 'beat', 'callback'], communicate=False)
@@ -357,17 +358,17 @@ def cluster_node_heartbeat(self):
             if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
                 other_inst.capacity = 0
                 other_inst.save(update_fields=['capacity'])
-                logger.error("Host {} last checked in at {}, marked as lost.".format(
+                logger.error(six.text_type("Host {} last checked in at {}, marked as lost.").format(
                     other_inst.hostname, other_inst.modified))
             elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
                 deprovision_hostname = other_inst.hostname
                 other_inst.delete()
-                logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
+                logger.info(six.text_type("Host {} Automatically Deprovisioned.").format(deprovision_hostname))
         except DatabaseError as e:
             if 'did not affect any rows' in str(e):
-                logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
+                logger.debug(six.text_type('Another instance has marked {} as lost').format(other_inst.hostname))
             else:
-                logger.exception('Error marking {} as lost'.format(other_inst.hostname))
+                logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname))
 @shared_task(bind=True, base=LogErrorsTask)
@@ -390,7 +391,7 @@ def awx_isolated_heartbeat(self):
             isolated_instance.save(update_fields=['last_isolated_check'])
     # Slow pass looping over isolated IGs and their isolated instances
     if len(isolated_instance_qs) > 0:
-        logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
+        logger.debug(six.text_type("Managing isolated instances {}.").format(','.join([inst.hostname for inst in isolated_instance_qs])))
         isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
@@ -551,7 +552,7 @@ def update_host_smart_inventory_memberships():
                 changed_inventories.add(smart_inventory)
             SmartInventoryMembership.objects.bulk_create(memberships)
     except IntegrityError as e:
-        logger.error("Update Host Smart Inventory Memberships failed due to an exception: " + str(e))
+        logger.error(six.text_type("Update Host Smart Inventory Memberships failed due to an exception: {}").format(e))
         return
     # Update computed fields for changed inventories outside atomic action
     for smart_inventory in changed_inventories:
@@ -576,7 +577,7 @@ def delete_inventory(self, inventory_id, user_id):
                 'inventories-status_changed',
                 {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'}
             )
-            logger.debug('Deleted inventory %s as user %s.' % (inventory_id, user_id))
+            logger.debug(six.text_type('Deleted inventory {} as user {}.').format(inventory_id, user_id))
         except Inventory.DoesNotExist:
             logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
             return
@@ -598,7 +599,7 @@ def with_path_cleanup(f):
                     elif os.path.exists(p):
                         os.remove(p)
                 except OSError:
-                    logger.exception("Failed to remove tmp file: {}".format(p))
+                    logger.exception(six.text_type("Failed to remove tmp file: {}").format(p))
             self.cleanup_paths = []
     return _wrapped
@@ -1011,7 +1012,7 @@ class BaseTask(LogErrorsTask):
         try:
             self.post_run_hook(instance, status, **kwargs)
         except Exception:
-            logger.exception('{} Post run hook errored.'.format(instance.log_format))
+            logger.exception(six.text_type('{} Post run hook errored.').format(instance.log_format))
         instance = self.update_model(pk)
         if instance.cancel_flag:
             status = 'canceled'
@@ -1023,7 +1024,7 @@ class BaseTask(LogErrorsTask):
         try:
             self.final_run_hook(instance, status, **kwargs)
         except Exception:
-            logger.exception('%s Final run hook errored.', instance.log_format)
+            logger.exception(six.text_type('{} Final run hook errored.').format(instance.log_format))
         instance.websocket_emit_status(status)
         if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
             # Raising an exception will mark the job as 'failed' in celery
@@ -1616,8 +1617,8 @@ class RunProjectUpdate(BaseTask):
                 task_instance.request.id = project_request_id
                 task_instance.run(local_inv_update.id)
             except Exception:
-                logger.exception('%s Unhandled exception updating dependent SCM inventory sources.',
-                                 project_update.log_format)
+                logger.exception(six.text_type('{} Unhandled exception updating dependent SCM inventory sources.')
+                                 .format(project_update.log_format))
             try:
                 project_update.refresh_from_db()
@@ -1630,10 +1631,10 @@ class RunProjectUpdate(BaseTask):
                 logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
                 continue
             if project_update.cancel_flag:
-                logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
+                logger.info(six.text_type('Project update {} was canceled while updating dependent inventories.').format(project_update.log_format))
                 break
             if local_inv_update.cancel_flag:
-                logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
+                logger.info(six.text_type('Continuing to process project dependencies after {} was canceled').format(local_inv_update.log_format))
             if local_inv_update.status == 'successful':
                 inv_src.scm_last_revision = scm_revision
                 inv_src.save(update_fields=['scm_last_revision'])
@@ -1642,7 +1643,7 @@ class RunProjectUpdate(BaseTask):
         try:
             fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
         except IOError as e:
-            logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, instance.get_lock_file(), e.strerror))
             os.close(self.lock_fd)
             raise
@@ -1660,14 +1661,14 @@ class RunProjectUpdate(BaseTask):
         try:
             self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT)
         except OSError as e:
-            logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to open lock file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
             raise
         try:
             fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
         except IOError as e:
             os.close(self.lock_fd)
-            logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
+            logger.error(six.text_type("I/O error({0}) while trying to aquire lock on file [{1}]: {2}").format(e.errno, lock_path, e.strerror))
             raise
     def pre_run_hook(self, instance, **kwargs):
@@ -1687,7 +1688,7 @@ class RunProjectUpdate(BaseTask):
             if lines:
                 p.scm_revision = lines[0].strip()
             else:
-                logger.info("%s Could not find scm revision in check", instance.log_format)
+                logger.info(six.text_type("{} Could not find scm revision in check").format(instance.log_format))
             p.playbook_files = p.playbooks
             p.inventory_files = p.inventories
             p.save()
@@ -2297,7 +2298,7 @@ class RunSystemJob(BaseTask):
             if 'granularity' in json_vars:
                 args.extend(['--granularity', str(json_vars['granularity'])])
         except Exception:
-            logger.exception("%s Failed to parse system job", system_job.log_format)
+            logger.exception(six.text_type("{} Failed to parse system job").format(system_job.log_format))
         return args
     def build_env(self, instance, **kwargs):
@@ -2332,7 +2333,7 @@ def deep_copy_model_obj(
     self, model_module, model_name, obj_pk, new_obj_pk,
     user_pk, sub_obj_list, permission_check_func=None
 ):
-    logger.info('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
+    logger.info(six.text_type('Deep copy {} from {} to {}.').format(model_name, obj_pk, new_obj_pk))
     from awx.api.generics import CopyAPIView
     model = getattr(importlib.import_module(model_module), model_name, None)
     if model is None: