From 38a08d163c31d934ecc02fd8a57ae3f3e85e45c9 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 6 Feb 2020 07:08:27 -0500 Subject: [PATCH] get rid of celery/celerybeat alternative to https://github.com/ansible/awx/pull/2530 which makes use of https://pypi.org/project/schedule/ this doesn't have support for any persistence (like how celery beat uses a shelve file), because all of our periodic jobs run at most every few minutes --- awx/main/dispatch/periodic.py | 52 +++++++++++++ .../management/commands/run_dispatcher.py | 74 ++----------------- awx/main/models/unified_jobs.py | 2 + awx/settings/defaults.py | 5 -- docs/licenses/billiard.txt | 28 ------- docs/licenses/celery.txt | 54 -------------- docs/licenses/schedule.txt | 21 ++++++ requirements/README.md | 5 -- requirements/requirements.in | 4 +- requirements/requirements.txt | 9 +-- 10 files changed, 87 insertions(+), 167 deletions(-) create mode 100644 awx/main/dispatch/periodic.py delete mode 100644 docs/licenses/billiard.txt delete mode 100644 docs/licenses/celery.txt create mode 100644 docs/licenses/schedule.txt diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py new file mode 100644 index 0000000000..4d839d0521 --- /dev/null +++ b/awx/main/dispatch/periodic.py @@ -0,0 +1,52 @@ +import logging +import threading +import time + +from django.conf import settings +from django.db import connections +from schedule import Scheduler + +from awx.main.dispatch.worker import TaskWorker + +logger = logging.getLogger('awx.main.dispatch.periodic') + + +class Scheduler(Scheduler): + + def run_continuously(self): + cease_continuous_run = threading.Event() + idle_seconds = max( + 1, + min(self.jobs).period.total_seconds() / 2 + ) + + class ScheduleThread(threading.Thread): + @classmethod + def run(cls): + while not cease_continuous_run.is_set(): + try: + for conn in connections.all(): + # If the database connection has a hiccup, re-establish a new + # connection + conn.close_if_unusable_or_obsolete() + self.run_pending() + except Exception: + logger.exception( + 'encountered an error while scheduling periodic tasks' + ) + time.sleep(idle_seconds) + logger.debug('periodic thread exiting...') + + thread = ScheduleThread() + thread.daemon = True + thread.start() + return cease_continuous_run + + +def run_continuously(): + scheduler = Scheduler() + for task in settings.CELERYBEAT_SCHEDULE.values(): + apply_async = TaskWorker.resolve_callable(task['task']).apply_async + total_seconds = task['schedule'].total_seconds() + scheduler.every(total_seconds).seconds.do(apply_async) + return scheduler.run_continuously() diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index b57034b9f8..13c7d21369 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -16,6 +16,7 @@ from awx.main.dispatch.control import Control from awx.main.dispatch.kombu import Connection from awx.main.dispatch.pool import AutoscalePool from awx.main.dispatch.worker import AWXConsumer, TaskWorker +from awx.main.dispatch import periodic logger = logging.getLogger('awx.main.dispatch') @@ -36,71 +37,6 @@ class Command(BaseCommand): help=('cause the dispatcher to recycle all of its worker processes;' 'running jobs will run to completion first')) - def beat(self): - from celery import Celery - from celery.beat import PersistentScheduler - from celery.apps import beat - - class AWXScheduler(PersistentScheduler): - - def __init__(self, *args, **kwargs): - self.ppid = os.getppid() - super(AWXScheduler, self).__init__(*args, **kwargs) - - def setup_schedule(self): - super(AWXScheduler, self).setup_schedule() - self.update_from_dict(settings.CELERYBEAT_SCHEDULE) - - def tick(self, *args, **kwargs): - if os.getppid() != self.ppid: - # if the parent PID changes, this process has been orphaned - # via e.g., segfault or sigkill, we should exit too - raise SystemExit() - return super(AWXScheduler, self).tick(*args, **kwargs) - - def apply_async(self, entry, producer=None, advance=True, **kwargs): - for conn in connections.all(): - # If the database connection has a hiccup, re-establish a new - # connection - conn.close_if_unusable_or_obsolete() - task = TaskWorker.resolve_callable(entry.task) - result, queue = task.apply_async() - - class TaskResult(object): - id = result['uuid'] - - return TaskResult() - - sched_file = '/var/lib/awx/beat.db' - app = Celery() - app.conf.BROKER_URL = settings.BROKER_URL - app.conf.CELERY_TASK_RESULT_EXPIRES = False - - # celery in py3 seems to have a bug where the celerybeat schedule - # shelve can become corrupted; we've _only_ seen this in Ubuntu and py36 - # it can be avoided by detecting and removing the corrupted file - # at some point, we'll just stop using celerybeat, because it's clearly - # buggy, too -_- - # - # https://github.com/celery/celery/issues/4777 - sched = AWXScheduler(schedule_filename=sched_file, app=app) - try: - sched.setup_schedule() - except Exception: - logger.exception('{} is corrupted, removing.'.format(sched_file)) - sched._remove_db() - finally: - try: - sched.close() - except Exception: - logger.exception('{} failed to sync/close'.format(sched_file)) - - beat.Beat( - 30, - app, - schedule=sched_file, scheduler_cls=AWXScheduler - ).run() - def handle(self, *arg, **options): if options.get('status'): print(Control('dispatcher').status()) @@ -116,9 +52,10 @@ class Command(BaseCommand): # for the DB and memcached connections (that way lies race conditions) django_connection.close() django_cache.close() - beat = Process(target=self.beat) - beat.daemon = True - beat.start() + + # spawn a daemon thread to periodically enqueues scheduled tasks + # (like the node heartbeat) + cease_continuous_run = periodic.run_continuously() reaper.reap() consumer = None @@ -152,6 +89,7 @@ class Command(BaseCommand): ) consumer.run() except KeyboardInterrupt: + cease_continuous_run.set() logger.debug('Terminating Task Dispatcher') if consumer: consumer.stop() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 67883744de..666cf8c6bf 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1014,6 +1014,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique dir=settings.JOBOUTPUT_ROOT, encoding='utf-8' ) + from awx.main.tasks import purge_old_stdout_files # circular import + purge_old_stdout_files.apply_async() # Before the addition of event-based stdout, older versions of # awx stored stdout as raw text blobs in a certain database column diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 4efc70b0f2..c5c0ad5e89 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -434,10 +434,6 @@ CELERYBEAT_SCHEDULE = { 'schedule': timedelta(seconds=60), 'options': {'expires': 50,} }, - 'purge_stdout_files': { - 'task': 'awx.main.tasks.purge_old_stdout_files', - 'schedule': timedelta(days=7) - }, 'gather_analytics': { 'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5) @@ -454,7 +450,6 @@ CELERYBEAT_SCHEDULE = { }, # 'isolated_heartbeat': set up at the end of production.py and development.py } -AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3 AWX_CELERY_QUEUES_STATIC = [ CELERY_DEFAULT_QUEUE, diff --git a/docs/licenses/billiard.txt b/docs/licenses/billiard.txt deleted file mode 100644 index 5163879147..0000000000 --- a/docs/licenses/billiard.txt +++ /dev/null @@ -1,28 +0,0 @@ -Copyright (c) 2006-2008, R Oudkerk and Contributors - -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - -1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. -3. Neither the name of author nor the names of any contributors may be - used to endorse or promote products derived from this software - without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -SUCH DAMAGE. diff --git a/docs/licenses/celery.txt b/docs/licenses/celery.txt deleted file mode 100644 index 92a530c9be..0000000000 --- a/docs/licenses/celery.txt +++ /dev/null @@ -1,54 +0,0 @@ -Copyright (c) 2015 Ask Solem & contributors. All rights reserved. -Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved. -Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved. - -Celery is licensed under The BSD License (3 Clause, also known as -the new BSD license). The license is an OSI approved Open Source -license and is GPL-compatible(1). - -The license text can also be found here: -http://www.opensource.org/licenses/BSD-3-Clause - -License -======= - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - * Neither the name of Ask Solem, nor the - names of its contributors may be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS -BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. - -Documentation License -===================== - -The documentation portion of Celery (the rendered contents of the -"docs" directory of a software distribution or checkout) is supplied -under the "Creative Commons Attribution-ShareAlike 4.0 -International" (CC BY-SA 4.0) License as described by -http://creativecommons.org/licenses/by-sa/4.0/ - -Footnotes -========= -(1) A GPL-compatible license makes it possible to - combine Celery with other software that is released - under the GPL, it does not mean that we're distributing - Celery under the GPL license. The BSD license, unlike the GPL, - let you distribute a modified version without making your - changes open source. diff --git a/docs/licenses/schedule.txt b/docs/licenses/schedule.txt new file mode 100644 index 0000000000..7c51781e88 --- /dev/null +++ b/docs/licenses/schedule.txt @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2013 Daniel Bader (http://dbader.org) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/requirements/README.md b/requirements/README.md index 40b68f0172..3485ed0593 100644 --- a/requirements/README.md +++ b/requirements/README.md @@ -137,11 +137,6 @@ in the top-level Makefile. ## Library Notes -### celery - -This is only used for the beat feature (running periodic tasks). -This could be replaced, see: https://github.com/ansible/awx/pull/2530 - ### requests-futures This can be removed when a solution for the external log queuing is ready. diff --git a/requirements/requirements.in b/requirements/requirements.in index 4bc1157ea1..7cf75de287 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -4,7 +4,6 @@ asgi-amqp>=1.1.4 # see library notes, related to channels 2 azure-keyvault==1.1.0 # see UPGRADE BLOCKERs boto # replacement candidate https://github.com/ansible/awx/issues/2115 channels==1.1.8 # UPGRADE BLOCKER: Last before backwards-incompatible channels 2 upgrade -celery==4.3.0 # see library notes daphne==1.4.2 # UPGRADE BLOCKER: last before channels 2 but not pinned by other deps django==2.2.8 # see UPGRADE BLOCKERs django-auth-ldap @@ -35,6 +34,7 @@ pyparsing python-memcached python-radius python3-saml +schedule==0.6.0 social-auth-core==3.2.0 # see UPGRADE BLOCKERs social-auth-app-django==3.1.0 # see UPGRADE BLOCKERs requests @@ -45,4 +45,4 @@ twilio uWSGI uwsgitop pip==19.3.1 # see UPGRADE BLOCKERs -setuptools==41.6.0 # see UPGRADE BLOCKERs \ No newline at end of file +setuptools==41.6.0 # see UPGRADE BLOCKERs diff --git a/requirements/requirements.txt b/requirements/requirements.txt index b41c75ab81..4e0d5b6c39 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -10,10 +10,8 @@ automat==0.8.0 # via twisted azure-common==1.1.23 # via azure-keyvault azure-keyvault==1.1.0 azure-nspkg==3.0.2 # via azure-keyvault -billiard==3.6.1.0 # via celery boto==2.49.0 cachetools==3.1.1 # via google-auth -celery==4.3.0 certifi==2019.11.28 # via kubernetes, msrest, requests cffi==1.13.2 # via cryptography channels==1.1.8 @@ -62,7 +60,7 @@ jaraco.text==3.2.0 # via irc, jaraco.collections jinja2==2.10.3 jsonpickle==1.2 # via asgi-amqp jsonschema==3.2.0 -kombu==4.6.7 # via asgi-amqp, celery +kombu==4.6.7 # via asgi-amqp kubernetes==10.0.1 # via openshift lockfile==0.12.2 # via python-daemon lxml==4.4.2 # via xmlsec @@ -98,7 +96,7 @@ python-radius==1.0 python-string-utils==0.6.0 # via openshift python3-openid==3.1.0 # via social-auth-core python3-saml==1.9.0 -pytz==2019.3 # via celery, django, irc, tempora, twilio +pytz==2019.3 # via django, irc, tempora, twilio pyyaml==5.2 # via ansible-runner, djangorestframework-yaml, kubernetes requests-futures==1.0.0 requests-oauthlib==1.3.0 # via kubernetes, msrest, social-auth-core @@ -106,6 +104,7 @@ requests==2.22.0 rsa==4.0 # via google-auth ruamel.yaml.clib==0.2.0 # via ruamel.yaml ruamel.yaml==0.16.5 # via openshift +schedule==0.6.0 six==1.13.0 # via ansible-runner, asgi-amqp, asgiref, autobahn, automat, cryptography, django-extensions, google-auth, isodate, jaraco.classes, jaraco.collections, jaraco.itertools, jaraco.logging, jaraco.text, jsonschema, kubernetes, openshift, pygerduty, pyhamcrest, pyrad, pyrsistent, python-dateutil, python-memcached, slackclient, social-auth-app-django, social-auth-core, tacacs-plus, tempora, twilio, txaio, websocket-client slackclient==1.1.2 smmap2==2.0.5 # via gitdb2 @@ -120,7 +119,7 @@ txaio==18.8.1 # via autobahn urllib3==1.25.7 # via kubernetes, requests uwsgi==2.0.18 uwsgitop==0.11 -vine==1.3.0 # via amqp, celery +vine==1.3.0 # via amqp websocket-client==0.56.0 # via kubernetes, slackclient xmlsec==1.3.3 # via python3-saml zipp==0.6.0 # via importlib-metadata