Mirror of https://github.com/ansible/awx.git, synced 2024-11-01 08:21:15 +03:00

Merge pull request #71 from AlanCoding/man_overboard

Set capacity to 0 if instance has not checked in lately
This commit is contained in:
Alan Rominger 2017-07-27 20:55:16 -04:00 committed by GitHub
commit 368ff8a4fb
4 changed files with 60 additions and 18 deletions

View File

@@ -406,15 +406,25 @@ class IsolatedManager(object):
try: try:
task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname] task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
except (KeyError, IndexError): except (KeyError, IndexError):
logger.exception('Failed to read status from isolated instance {}.'.format(instance.hostname)) task_result = {}
continue
if 'capacity' in task_result: if 'capacity' in task_result:
instance.version = task_result['version'] instance.version = task_result['version']
if instance.capacity == 0 and task_result['capacity']:
logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
instance.capacity = int(task_result['capacity']) instance.capacity = int(task_result['capacity'])
instance.save(update_fields=['capacity', 'version', 'modified']) instance.save(update_fields=['capacity', 'version', 'modified'])
elif instance.capacity == 0:
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
instance.hostname))
else: else:
logger.warning('Could not update capacity of {}, msg={}'.format( logger.warning('Could not update status of isolated instance {}, msg={}'.format(
instance.hostname, task_result.get('msg', 'unknown failure'))) instance.hostname, task_result.get('msg', 'unknown failure')
))
if instance.is_lost(isolated=True):
instance.capacity = 0
instance.save(update_fields=['capacity'])
logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(
instance.hostname, instance.modified))
@staticmethod @staticmethod
def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'): def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'):

View File

@@ -30,7 +30,7 @@ class Command(BaseCommand):
with advisory_lock('instance_registration_%s' % hostname): with advisory_lock('instance_registration_%s' % hostname):
instance = Instance.objects.filter(hostname=hostname) instance = Instance.objects.filter(hostname=hostname)
if instance.exists(): if instance.exists():
print("Instance already registered {}".format(instance[0])) print("Instance already registered {}".format(instance[0].hostname))
return return
instance = Instance(uuid=self.uuid, hostname=hostname) instance = Instance(uuid=self.uuid, hostname=hostname)
instance.save() instance.save()

View File

@@ -5,6 +5,8 @@ from django.db import models
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.utils.timezone import now, timedelta
from solo.models import SingletonModel from solo.models import SingletonModel
@@ -53,6 +55,14 @@ class Instance(models.Model):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "awx" return "awx"
def is_lost(self, ref_time=None, isolated=False):
if ref_time is None:
ref_time = now()
grace_period = 120
if isolated:
grace_period = settings.AWX_ISOLATED_PERIODIC_CHECK * 2
return self.modified < ref_time - timedelta(seconds=grace_period)
class InstanceGroup(models.Model): class InstanceGroup(models.Model):
"""A model representing a Queue/Group of AWX Instances.""" """A model representing a Queue/Group of AWX Instances."""

View File

@@ -33,7 +33,7 @@ from celery.signals import celeryd_init, worker_process_init
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError from django.db import transaction, DatabaseError, IntegrityError, OperationalError
from django.utils.timezone import now, timedelta from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str from django.utils.encoding import smart_str
from django.core.mail import send_mail from django.core.mail import send_mail
@@ -184,32 +184,54 @@ def purge_old_stdout_files(self):
def cluster_node_heartbeat(self): def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.") logger.debug("Cluster node heartbeat task.")
nowtime = now() nowtime = now()
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID) instance_list = list(Instance.objects.filter(rampart_groups__controller__isnull=True).distinct())
if inst.exists(): this_inst = None
inst = inst[0] lost_instances = []
inst.capacity = get_system_task_capacity() for inst in list(instance_list):
inst.version = awx_application_version if inst.hostname == settings.CLUSTER_HOST_ID:
inst.save() this_inst = inst
instance_list.remove(inst)
elif inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
if this_inst.capacity == 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
this_inst.capacity = get_system_task_capacity()
this_inst.version = awx_application_version
this_inst.save(update_fields=['capacity', 'version', 'modified'])
if startup_event:
return
else: else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID)
# IFF any node has a greater version than we do, then we'll shutdown services # IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in recent_inst: for other_inst in instance_list:
if other_inst.version == "": if other_inst.version == "":
continue continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG: if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG:
logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname, logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
other_inst.version, other_inst.version,
inst.hostname, this_inst.hostname,
inst.version)) this_inst.version))
# Set the capacity to zero to ensure no Jobs get added to this instance. # Set the capacity to zero to ensure no Jobs get added to this instance.
# The heartbeat task will reset the capacity to the system capacity after upgrade. # The heartbeat task will reset the capacity to the system capacity after upgrade.
inst.capacity = 0 this_inst.capacity = 0
inst.save() this_inst.save(update_fields=['capacity'])
stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact']) stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
# We wait for the Popen call inside stop_local_services above # We wait for the Popen call inside stop_local_services above
# so the line below will rarely if ever be executed. # so the line below will rarely if ever be executed.
raise RuntimeError("Shutting down.") raise RuntimeError("Shutting down.")
for other_inst in lost_instances:
if other_inst.capacity == 0:
continue
try:
other_inst.capacity = 0
other_inst.save(update_fields=['capacity'])
logger.error("Host {} last checked in at {}, marked as lost.".format(
other_inst.hostname, other_inst.modified))
except (IntegrityError, OperationalError):
pass # another instance is updating the lost instance
@task(bind=True, base=LogErrorsTask) @task(bind=True, base=LogErrorsTask)