diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index f4e8a6ea4c..d8fa4a88c0 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
 
 @task()
 def awx_periodic_scheduler():
-    run_now = now()
-    state = TowerScheduleState.get_solo()
-    last_run = state.schedule_last_run
-    logger.debug("Last scheduler run was: %s", last_run)
-    state.schedule_last_run = run_now
-    state.save()
+    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
+        if acquired is False:
+            logger.debug("Not running periodic scheduler, another task holds lock")
+            return
+        logger.debug("Starting periodic scheduler")
 
-    old_schedules = Schedule.objects.enabled().before(last_run)
-    for schedule in old_schedules:
-        schedule.save()
-    schedules = Schedule.objects.enabled().between(last_run, run_now)
+        run_now = now()
+        state = TowerScheduleState.get_solo()
+        last_run = state.schedule_last_run
+        logger.debug("Last scheduler run was: %s", last_run)
+        state.schedule_last_run = run_now
+        state.save()
 
-    invalid_license = False
-    try:
-        access_registry[Job](None).check_license()
-    except PermissionDenied as e:
-        invalid_license = e
+        old_schedules = Schedule.objects.enabled().before(last_run)
+        for schedule in old_schedules:
+            schedule.save()
+        schedules = Schedule.objects.enabled().between(last_run, run_now)
 
-    for schedule in schedules:
-        template = schedule.unified_job_template
-        schedule.save() # To update next_run timestamp.
-        if template.cache_timeout_blocked:
-            logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
-            continue
+        invalid_license = False
         try:
-            job_kwargs = schedule.get_job_kwargs()
-            new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
-            logger.info('Spawned {} from schedule {}-{}.'.format(
-                new_unified_job.log_format, schedule.name, schedule.pk))
+            access_registry[Job](None).check_license()
+        except PermissionDenied as e:
+            invalid_license = e
 
-            if invalid_license:
+        for schedule in schedules:
+            template = schedule.unified_job_template
+            schedule.save() # To update next_run timestamp.
+            if template.cache_timeout_blocked:
+                logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
+                continue
+            try:
+                job_kwargs = schedule.get_job_kwargs()
+                new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
+                logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
+                    new_unified_job.log_format, schedule.name, schedule.pk))
+
+                if invalid_license:
+                    new_unified_job.status = 'failed'
+                    new_unified_job.job_explanation = str(invalid_license)
+                    new_unified_job.save(update_fields=['status', 'job_explanation'])
+                    new_unified_job.websocket_emit_status("failed")
+                    raise invalid_license
+                can_start = new_unified_job.signal_start()
+            except Exception:
+                logger.exception('Error spawning scheduled job.')
+                continue
+            if not can_start:
                 new_unified_job.status = 'failed'
-                new_unified_job.job_explanation = str(invalid_license)
+                new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
                 new_unified_job.save(update_fields=['status', 'job_explanation'])
                 new_unified_job.websocket_emit_status("failed")
-                raise invalid_license
-            can_start = new_unified_job.signal_start()
-        except Exception:
-            logger.exception('Error spawning scheduled job.')
-            continue
-        if not can_start:
-            new_unified_job.status = 'failed'
-            new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
-            new_unified_job.save(update_fields=['status', 'job_explanation'])
-            new_unified_job.websocket_emit_status("failed")
-        emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
-    state.save()
+            emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
+        state.save()
 
 
 @task()
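Reviewer note: `advisory_lock` here is presumably the django-pglocks context manager (the import is not shown in this hunk). With `wait=False` it does not block; it simply reports whether this process obtained the PostgreSQL advisory lock, which is what prevents overlapping invocations from running the periodic scheduler concurrently. Below is a minimal sketch of that pattern, with illustrative names, assuming a Django project configured against PostgreSQL with django-pglocks installed; it is not code from this PR.

    # Sketch only: the non-blocking advisory-lock guard used in the diff above.
    # Assumes Django settings point at PostgreSQL and django-pglocks is installed;
    # the lock name and function below are illustrative, not part of AWX.
    from django_pglocks import advisory_lock


    def run_exclusively():
        # wait=False returns immediately; `acquired` is True only if this
        # process obtained the lock, False if another process holds it.
        with advisory_lock('example_periodic_lock', wait=False) as acquired:
            if not acquired:
                print('another worker holds the lock; skipping this run')
                return
            print('lock held; safe to do the work exactly once cluster-wide')
        # The advisory lock is released when the with-block exits.

The early return path mirrors the diff: when the lock is already held, the task logs and exits instead of re-running the scheduler and spawning duplicate jobs from the same schedules.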