From 1112557c791735e15ed5de6071efdda1df290738 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Tue, 25 Jul 2017 15:35:44 -0400 Subject: [PATCH] set capacity to 0 if instance has not checked in lately --- awx/main/isolated/isolated_manager.py | 18 +++++-- .../management/commands/register_instance.py | 2 +- awx/main/models/ha.py | 10 ++++ awx/main/tasks.py | 48 ++++++++++++++----- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/awx/main/isolated/isolated_manager.py b/awx/main/isolated/isolated_manager.py index 1ac035ad4b..9627d1fe8d 100644 --- a/awx/main/isolated/isolated_manager.py +++ b/awx/main/isolated/isolated_manager.py @@ -406,15 +406,25 @@ class IsolatedManager(object): try: task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname] except (KeyError, IndexError): - logger.exception('Failed to read status from isolated instance {}.'.format(instance.hostname)) - continue + task_result = {} if 'capacity' in task_result: instance.version = task_result['version'] + if instance.capacity == 0 and task_result['capacity']: + logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname)) instance.capacity = int(task_result['capacity']) instance.save(update_fields=['capacity', 'version', 'modified']) + elif instance.capacity == 0: + logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( + instance.hostname)) else: - logger.warning('Could not update capacity of {}, msg={}'.format( - instance.hostname, task_result.get('msg', 'unknown failure'))) + logger.warning('Could not update status of isolated instance {}, msg={}'.format( + instance.hostname, task_result.get('msg', 'unknown failure') + )) + if instance.is_lost(isolated=True): + instance.capacity = 0 + instance.save(update_fields=['capacity']) + logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format( + instance.hostname, instance.modified)) @staticmethod def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'): diff --git a/awx/main/management/commands/register_instance.py b/awx/main/management/commands/register_instance.py index 0a14206ef4..9cf28a95b8 100644 --- a/awx/main/management/commands/register_instance.py +++ b/awx/main/management/commands/register_instance.py @@ -30,7 +30,7 @@ class Command(BaseCommand): with advisory_lock('instance_registration_%s' % hostname): instance = Instance.objects.filter(hostname=hostname) if instance.exists(): - print("Instance already registered {}".format(instance[0])) + print("Instance already registered {}".format(instance[0].hostname)) return instance = Instance(uuid=self.uuid, hostname=hostname) instance.save() diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 2790f89ca6..509e37b4ac 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -5,6 +5,8 @@ from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver from django.utils.translation import ugettext_lazy as _ +from django.conf import settings +from django.utils.timezone import now, timedelta from solo.models import SingletonModel @@ -53,6 +55,14 @@ class Instance(models.Model): # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing return "awx" + def is_lost(self, ref_time=None, isolated=False): + if ref_time is None: + ref_time = now() + grace_period = 120 + if isolated: + grace_period = settings.AWX_ISOLATED_PERIODIC_CHECK * 2 + return self.modified < ref_time - timedelta(seconds=grace_period) + class InstanceGroup(models.Model): """A model representing a Queue/Group of AWX Instances.""" diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c95ff9710c..b7c9644a89 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -33,7 +33,7 @@ from celery.signals import celeryd_init, worker_process_init # Django from django.conf import settings -from django.db import transaction, DatabaseError, IntegrityError +from django.db import transaction, DatabaseError, IntegrityError, OperationalError from django.utils.timezone import now, timedelta from django.utils.encoding import smart_str from django.core.mail import send_mail @@ -184,32 +184,54 @@ def purge_old_stdout_files(self): def cluster_node_heartbeat(self): logger.debug("Cluster node heartbeat task.") nowtime = now() - inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID) - if inst.exists(): - inst = inst[0] - inst.capacity = get_system_task_capacity() - inst.version = awx_application_version - inst.save() + instance_list = list(Instance.objects.filter(rampart_groups__controller__isnull=True).distinct()) + this_inst = None + lost_instances = [] + for inst in list(instance_list): + if inst.hostname == settings.CLUSTER_HOST_ID: + this_inst = inst + instance_list.remove(inst) + elif inst.is_lost(ref_time=nowtime): + lost_instances.append(inst) + instance_list.remove(inst) + if this_inst: + startup_event = this_inst.is_lost(ref_time=nowtime) + if this_inst.capacity == 0: + logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname)) + this_inst.capacity = get_system_task_capacity() + this_inst.version = awx_application_version + this_inst.save(update_fields=['capacity', 'version', 'modified']) + if startup_event: + return else: raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) - recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID) # IFF any node has a greater version than we do, then we'll shutdown services - for other_inst in recent_inst: + for other_inst in instance_list: if other_inst.version == "": continue if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG: logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname, other_inst.version, - inst.hostname, - inst.version)) + this_inst.hostname, + this_inst.version)) # Set the capacity to zero to ensure no Jobs get added to this instance. # The heartbeat task will reset the capacity to the system capacity after upgrade. - inst.capacity = 0 - inst.save() + this_inst.capacity = 0 + this_inst.save(update_fields=['capacity']) stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact']) # We wait for the Popen call inside stop_local_services above # so the line below will rarely if ever be executed. raise RuntimeError("Shutting down.") + for other_inst in lost_instances: + if other_inst.capacity == 0: + continue + try: + other_inst.capacity = 0 + other_inst.save(update_fields=['capacity']) + logger.error("Host {} last checked in at {}, marked as lost.".format( + other_inst.hostname, other_inst.modified)) + except (IntegrityError, OperationalError): + pass # another instance is updating the lost instance @task(bind=True, base=LogErrorsTask)