mirror of
https://github.com/ansible/awx.git
synced 2024-11-01 08:21:15 +03:00
fix scheduled jobs race condition
* The periodic scheduler that runs and spawns jobs from Schedule()'s can end up spawning more jobs than intended, for a single Schedule. Specifically, when tower clustering is involed. This change adds a "global" database lock around this critical code. If another process is already doing the scheduling, short circuit.
This commit is contained in:
parent
1328fb80a0
commit
d4c3c089df
@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
|
||||
|
||||
@task()
|
||||
def awx_periodic_scheduler():
|
||||
run_now = now()
|
||||
state = TowerScheduleState.get_solo()
|
||||
last_run = state.schedule_last_run
|
||||
logger.debug("Last scheduler run was: %s", last_run)
|
||||
state.schedule_last_run = run_now
|
||||
state.save()
|
||||
with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
|
||||
if acquired is False:
|
||||
logger.debug("Not running periodic scheduler, another task holds lock")
|
||||
return
|
||||
logger.debug("Starting periodic scheduler")
|
||||
|
||||
old_schedules = Schedule.objects.enabled().before(last_run)
|
||||
for schedule in old_schedules:
|
||||
schedule.save()
|
||||
schedules = Schedule.objects.enabled().between(last_run, run_now)
|
||||
run_now = now()
|
||||
state = TowerScheduleState.get_solo()
|
||||
last_run = state.schedule_last_run
|
||||
logger.debug("Last scheduler run was: %s", last_run)
|
||||
state.schedule_last_run = run_now
|
||||
state.save()
|
||||
|
||||
invalid_license = False
|
||||
try:
|
||||
access_registry[Job](None).check_license()
|
||||
except PermissionDenied as e:
|
||||
invalid_license = e
|
||||
old_schedules = Schedule.objects.enabled().before(last_run)
|
||||
for schedule in old_schedules:
|
||||
schedule.save()
|
||||
schedules = Schedule.objects.enabled().between(last_run, run_now)
|
||||
|
||||
for schedule in schedules:
|
||||
template = schedule.unified_job_template
|
||||
schedule.save() # To update next_run timestamp.
|
||||
if template.cache_timeout_blocked:
|
||||
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
|
||||
continue
|
||||
invalid_license = False
|
||||
try:
|
||||
job_kwargs = schedule.get_job_kwargs()
|
||||
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
|
||||
logger.info('Spawned {} from schedule {}-{}.'.format(
|
||||
new_unified_job.log_format, schedule.name, schedule.pk))
|
||||
access_registry[Job](None).check_license()
|
||||
except PermissionDenied as e:
|
||||
invalid_license = e
|
||||
|
||||
if invalid_license:
|
||||
for schedule in schedules:
|
||||
template = schedule.unified_job_template
|
||||
schedule.save() # To update next_run timestamp.
|
||||
if template.cache_timeout_blocked:
|
||||
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
|
||||
continue
|
||||
try:
|
||||
job_kwargs = schedule.get_job_kwargs()
|
||||
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
|
||||
logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
|
||||
new_unified_job.log_format, schedule.name, schedule.pk))
|
||||
|
||||
if invalid_license:
|
||||
new_unified_job.status = 'failed'
|
||||
new_unified_job.job_explanation = str(invalid_license)
|
||||
new_unified_job.save(update_fields=['status', 'job_explanation'])
|
||||
new_unified_job.websocket_emit_status("failed")
|
||||
raise invalid_license
|
||||
can_start = new_unified_job.signal_start()
|
||||
except Exception:
|
||||
logger.exception('Error spawning scheduled job.')
|
||||
continue
|
||||
if not can_start:
|
||||
new_unified_job.status = 'failed'
|
||||
new_unified_job.job_explanation = str(invalid_license)
|
||||
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
|
||||
new_unified_job.save(update_fields=['status', 'job_explanation'])
|
||||
new_unified_job.websocket_emit_status("failed")
|
||||
raise invalid_license
|
||||
can_start = new_unified_job.signal_start()
|
||||
except Exception:
|
||||
logger.exception('Error spawning scheduled job.')
|
||||
continue
|
||||
if not can_start:
|
||||
new_unified_job.status = 'failed'
|
||||
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
|
||||
new_unified_job.save(update_fields=['status', 'job_explanation'])
|
||||
new_unified_job.websocket_emit_status("failed")
|
||||
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
|
||||
state.save()
|
||||
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
|
||||
state.save()
|
||||
|
||||
|
||||
@task()
|
||||
|
Loading…
Reference in New Issue
Block a user