1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

fix scheduled jobs race condition

* The periodic scheduler that runs and spawns jobs from Schedule()'s can
end up spawning more jobs than intended, for a single Schedule.
Specifically, when tower clustering is involed. This change adds a
"global" database lock around this critical code. If another process is
already doing the scheduling, short circuit.
This commit is contained in:
chris meyers 2019-02-07 16:57:06 -05:00
parent 1328fb80a0
commit d4c3c089df

View File

@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
@task()
def awx_periodic_scheduler():
run_now = now()
state = TowerScheduleState.get_solo()
last_run = state.schedule_last_run
logger.debug("Last scheduler run was: %s", last_run)
state.schedule_last_run = run_now
state.save()
with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
if acquired is False:
logger.debug("Not running periodic scheduler, another task holds lock")
return
logger.debug("Starting periodic scheduler")
old_schedules = Schedule.objects.enabled().before(last_run)
for schedule in old_schedules:
schedule.save()
schedules = Schedule.objects.enabled().between(last_run, run_now)
run_now = now()
state = TowerScheduleState.get_solo()
last_run = state.schedule_last_run
logger.debug("Last scheduler run was: %s", last_run)
state.schedule_last_run = run_now
state.save()
invalid_license = False
try:
access_registry[Job](None).check_license()
except PermissionDenied as e:
invalid_license = e
old_schedules = Schedule.objects.enabled().before(last_run)
for schedule in old_schedules:
schedule.save()
schedules = Schedule.objects.enabled().between(last_run, run_now)
for schedule in schedules:
template = schedule.unified_job_template
schedule.save() # To update next_run timestamp.
if template.cache_timeout_blocked:
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
continue
invalid_license = False
try:
job_kwargs = schedule.get_job_kwargs()
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
logger.info('Spawned {} from schedule {}-{}.'.format(
new_unified_job.log_format, schedule.name, schedule.pk))
access_registry[Job](None).check_license()
except PermissionDenied as e:
invalid_license = e
if invalid_license:
for schedule in schedules:
template = schedule.unified_job_template
schedule.save() # To update next_run timestamp.
if template.cache_timeout_blocked:
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
continue
try:
job_kwargs = schedule.get_job_kwargs()
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
new_unified_job.log_format, schedule.name, schedule.pk))
if invalid_license:
new_unified_job.status = 'failed'
new_unified_job.job_explanation = str(invalid_license)
new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed")
raise invalid_license
can_start = new_unified_job.signal_start()
except Exception:
logger.exception('Error spawning scheduled job.')
continue
if not can_start:
new_unified_job.status = 'failed'
new_unified_job.job_explanation = str(invalid_license)
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed")
raise invalid_license
can_start = new_unified_job.signal_start()
except Exception:
logger.exception('Error spawning scheduled job.')
continue
if not can_start:
new_unified_job.status = 'failed'
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed")
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
state.save()
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
state.save()
@task()