mirror of
https://github.com/ansible/awx.git
synced 2024-10-31 23:51:09 +03:00
Merge pull request #5837 from ryanpetrello/celery-tastes-gross
get rid of celerybeat (and celery + billiard dependency) Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
commit
47383e05d6
52
awx/main/dispatch/periodic.py
Normal file
52
awx/main/dispatch/periodic.py
Normal file
@ -0,0 +1,52 @@
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import connections
|
||||
from schedule import Scheduler
|
||||
|
||||
from awx.main.dispatch.worker import TaskWorker
|
||||
|
||||
logger = logging.getLogger('awx.main.dispatch.periodic')
|
||||
|
||||
|
||||
class Scheduler(Scheduler):
|
||||
|
||||
def run_continuously(self):
|
||||
cease_continuous_run = threading.Event()
|
||||
idle_seconds = max(
|
||||
1,
|
||||
min(self.jobs).period.total_seconds() / 2
|
||||
)
|
||||
|
||||
class ScheduleThread(threading.Thread):
|
||||
@classmethod
|
||||
def run(cls):
|
||||
while not cease_continuous_run.is_set():
|
||||
try:
|
||||
for conn in connections.all():
|
||||
# If the database connection has a hiccup, re-establish a new
|
||||
# connection
|
||||
conn.close_if_unusable_or_obsolete()
|
||||
self.run_pending()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
'encountered an error while scheduling periodic tasks'
|
||||
)
|
||||
time.sleep(idle_seconds)
|
||||
logger.debug('periodic thread exiting...')
|
||||
|
||||
thread = ScheduleThread()
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
return cease_continuous_run
|
||||
|
||||
|
||||
def run_continuously():
|
||||
scheduler = Scheduler()
|
||||
for task in settings.CELERYBEAT_SCHEDULE.values():
|
||||
apply_async = TaskWorker.resolve_callable(task['task']).apply_async
|
||||
total_seconds = task['schedule'].total_seconds()
|
||||
scheduler.every(total_seconds).seconds.do(apply_async)
|
||||
return scheduler.run_continuously()
|
@ -16,6 +16,7 @@ from awx.main.dispatch.control import Control
|
||||
from awx.main.dispatch.kombu import Connection
|
||||
from awx.main.dispatch.pool import AutoscalePool
|
||||
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
|
||||
from awx.main.dispatch import periodic
|
||||
|
||||
logger = logging.getLogger('awx.main.dispatch')
|
||||
|
||||
@ -36,71 +37,6 @@ class Command(BaseCommand):
|
||||
help=('cause the dispatcher to recycle all of its worker processes;'
|
||||
'running jobs will run to completion first'))
|
||||
|
||||
def beat(self):
|
||||
from celery import Celery
|
||||
from celery.beat import PersistentScheduler
|
||||
from celery.apps import beat
|
||||
|
||||
class AWXScheduler(PersistentScheduler):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.ppid = os.getppid()
|
||||
super(AWXScheduler, self).__init__(*args, **kwargs)
|
||||
|
||||
def setup_schedule(self):
|
||||
super(AWXScheduler, self).setup_schedule()
|
||||
self.update_from_dict(settings.CELERYBEAT_SCHEDULE)
|
||||
|
||||
def tick(self, *args, **kwargs):
|
||||
if os.getppid() != self.ppid:
|
||||
# if the parent PID changes, this process has been orphaned
|
||||
# via e.g., segfault or sigkill, we should exit too
|
||||
raise SystemExit()
|
||||
return super(AWXScheduler, self).tick(*args, **kwargs)
|
||||
|
||||
def apply_async(self, entry, producer=None, advance=True, **kwargs):
|
||||
for conn in connections.all():
|
||||
# If the database connection has a hiccup, re-establish a new
|
||||
# connection
|
||||
conn.close_if_unusable_or_obsolete()
|
||||
task = TaskWorker.resolve_callable(entry.task)
|
||||
result, queue = task.apply_async()
|
||||
|
||||
class TaskResult(object):
|
||||
id = result['uuid']
|
||||
|
||||
return TaskResult()
|
||||
|
||||
sched_file = '/var/lib/awx/beat.db'
|
||||
app = Celery()
|
||||
app.conf.BROKER_URL = settings.BROKER_URL
|
||||
app.conf.CELERY_TASK_RESULT_EXPIRES = False
|
||||
|
||||
# celery in py3 seems to have a bug where the celerybeat schedule
|
||||
# shelve can become corrupted; we've _only_ seen this in Ubuntu and py36
|
||||
# it can be avoided by detecting and removing the corrupted file
|
||||
# at some point, we'll just stop using celerybeat, because it's clearly
|
||||
# buggy, too -_-
|
||||
#
|
||||
# https://github.com/celery/celery/issues/4777
|
||||
sched = AWXScheduler(schedule_filename=sched_file, app=app)
|
||||
try:
|
||||
sched.setup_schedule()
|
||||
except Exception:
|
||||
logger.exception('{} is corrupted, removing.'.format(sched_file))
|
||||
sched._remove_db()
|
||||
finally:
|
||||
try:
|
||||
sched.close()
|
||||
except Exception:
|
||||
logger.exception('{} failed to sync/close'.format(sched_file))
|
||||
|
||||
beat.Beat(
|
||||
30,
|
||||
app,
|
||||
schedule=sched_file, scheduler_cls=AWXScheduler
|
||||
).run()
|
||||
|
||||
def handle(self, *arg, **options):
|
||||
if options.get('status'):
|
||||
print(Control('dispatcher').status())
|
||||
@ -116,9 +52,10 @@ class Command(BaseCommand):
|
||||
# for the DB and memcached connections (that way lies race conditions)
|
||||
django_connection.close()
|
||||
django_cache.close()
|
||||
beat = Process(target=self.beat)
|
||||
beat.daemon = True
|
||||
beat.start()
|
||||
|
||||
# spawn a daemon thread to periodically enqueues scheduled tasks
|
||||
# (like the node heartbeat)
|
||||
cease_continuous_run = periodic.run_continuously()
|
||||
|
||||
reaper.reap()
|
||||
consumer = None
|
||||
@ -152,6 +89,7 @@ class Command(BaseCommand):
|
||||
)
|
||||
consumer.run()
|
||||
except KeyboardInterrupt:
|
||||
cease_continuous_run.set()
|
||||
logger.debug('Terminating Task Dispatcher')
|
||||
if consumer:
|
||||
consumer.stop()
|
||||
|
@ -1014,6 +1014,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
dir=settings.JOBOUTPUT_ROOT,
|
||||
encoding='utf-8'
|
||||
)
|
||||
from awx.main.tasks import purge_old_stdout_files # circular import
|
||||
purge_old_stdout_files.apply_async()
|
||||
|
||||
# Before the addition of event-based stdout, older versions of
|
||||
# awx stored stdout as raw text blobs in a certain database column
|
||||
|
@ -434,10 +434,6 @@ CELERYBEAT_SCHEDULE = {
|
||||
'schedule': timedelta(seconds=60),
|
||||
'options': {'expires': 50,}
|
||||
},
|
||||
'purge_stdout_files': {
|
||||
'task': 'awx.main.tasks.purge_old_stdout_files',
|
||||
'schedule': timedelta(days=7)
|
||||
},
|
||||
'gather_analytics': {
|
||||
'task': 'awx.main.tasks.gather_analytics',
|
||||
'schedule': timedelta(minutes=5)
|
||||
@ -454,7 +450,6 @@ CELERYBEAT_SCHEDULE = {
|
||||
},
|
||||
# 'isolated_heartbeat': set up at the end of production.py and development.py
|
||||
}
|
||||
AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
|
||||
|
||||
AWX_CELERY_QUEUES_STATIC = [
|
||||
CELERY_DEFAULT_QUEUE,
|
||||
|
@ -1,28 +0,0 @@
|
||||
Copyright (c) 2006-2008, R Oudkerk and Contributors
|
||||
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions
|
||||
are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
2. Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
3. Neither the name of author nor the names of any contributors may be
|
||||
used to endorse or promote products derived from this software
|
||||
without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
||||
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
||||
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||||
SUCH DAMAGE.
|
@ -1,54 +0,0 @@
|
||||
Copyright (c) 2015 Ask Solem & contributors. All rights reserved.
|
||||
Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
|
||||
Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved.
|
||||
|
||||
Celery is licensed under The BSD License (3 Clause, also known as
|
||||
the new BSD license). The license is an OSI approved Open Source
|
||||
license and is GPL-compatible(1).
|
||||
|
||||
The license text can also be found here:
|
||||
http://www.opensource.org/licenses/BSD-3-Clause
|
||||
|
||||
License
|
||||
=======
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
* Neither the name of Ask Solem, nor the
|
||||
names of its contributors may be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||||
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
||||
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS
|
||||
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
Documentation License
|
||||
=====================
|
||||
|
||||
The documentation portion of Celery (the rendered contents of the
|
||||
"docs" directory of a software distribution or checkout) is supplied
|
||||
under the "Creative Commons Attribution-ShareAlike 4.0
|
||||
International" (CC BY-SA 4.0) License as described by
|
||||
http://creativecommons.org/licenses/by-sa/4.0/
|
||||
|
||||
Footnotes
|
||||
=========
|
||||
(1) A GPL-compatible license makes it possible to
|
||||
combine Celery with other software that is released
|
||||
under the GPL, it does not mean that we're distributing
|
||||
Celery under the GPL license. The BSD license, unlike the GPL,
|
||||
let you distribute a modified version without making your
|
||||
changes open source.
|
21
docs/licenses/schedule.txt
Normal file
21
docs/licenses/schedule.txt
Normal file
@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Daniel Bader (http://dbader.org)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
@ -137,11 +137,6 @@ in the top-level Makefile.
|
||||
|
||||
## Library Notes
|
||||
|
||||
### celery
|
||||
|
||||
This is only used for the beat feature (running periodic tasks).
|
||||
This could be replaced, see: https://github.com/ansible/awx/pull/2530
|
||||
|
||||
### requests-futures
|
||||
|
||||
This can be removed when a solution for the external log queuing is ready.
|
||||
|
@ -4,7 +4,6 @@ asgi-amqp>=1.1.4 # see library notes, related to channels 2
|
||||
azure-keyvault==1.1.0 # see UPGRADE BLOCKERs
|
||||
boto # replacement candidate https://github.com/ansible/awx/issues/2115
|
||||
channels==1.1.8 # UPGRADE BLOCKER: Last before backwards-incompatible channels 2 upgrade
|
||||
celery==4.3.0 # see library notes
|
||||
daphne==1.4.2 # UPGRADE BLOCKER: last before channels 2 but not pinned by other deps
|
||||
django==2.2.8 # see UPGRADE BLOCKERs
|
||||
django-auth-ldap
|
||||
@ -35,6 +34,7 @@ pyparsing
|
||||
python-memcached
|
||||
python-radius
|
||||
python3-saml
|
||||
schedule==0.6.0
|
||||
social-auth-core==3.2.0 # see UPGRADE BLOCKERs
|
||||
social-auth-app-django==3.1.0 # see UPGRADE BLOCKERs
|
||||
requests
|
||||
@ -45,4 +45,4 @@ twilio
|
||||
uWSGI
|
||||
uwsgitop
|
||||
pip==19.3.1 # see UPGRADE BLOCKERs
|
||||
setuptools==41.6.0 # see UPGRADE BLOCKERs
|
||||
setuptools==41.6.0 # see UPGRADE BLOCKERs
|
||||
|
@ -10,10 +10,8 @@ automat==0.8.0 # via twisted
|
||||
azure-common==1.1.23 # via azure-keyvault
|
||||
azure-keyvault==1.1.0
|
||||
azure-nspkg==3.0.2 # via azure-keyvault
|
||||
billiard==3.6.1.0 # via celery
|
||||
boto==2.49.0
|
||||
cachetools==3.1.1 # via google-auth
|
||||
celery==4.3.0
|
||||
certifi==2019.11.28 # via kubernetes, msrest, requests
|
||||
cffi==1.13.2 # via cryptography
|
||||
channels==1.1.8
|
||||
@ -62,7 +60,7 @@ jaraco.text==3.2.0 # via irc, jaraco.collections
|
||||
jinja2==2.10.3
|
||||
jsonpickle==1.2 # via asgi-amqp
|
||||
jsonschema==3.2.0
|
||||
kombu==4.6.7 # via asgi-amqp, celery
|
||||
kombu==4.6.7 # via asgi-amqp
|
||||
kubernetes==10.0.1 # via openshift
|
||||
lockfile==0.12.2 # via python-daemon
|
||||
lxml==4.4.2 # via xmlsec
|
||||
@ -98,7 +96,7 @@ python-radius==1.0
|
||||
python-string-utils==0.6.0 # via openshift
|
||||
python3-openid==3.1.0 # via social-auth-core
|
||||
python3-saml==1.9.0
|
||||
pytz==2019.3 # via celery, django, irc, tempora, twilio
|
||||
pytz==2019.3 # via django, irc, tempora, twilio
|
||||
pyyaml==5.2 # via ansible-runner, djangorestframework-yaml, kubernetes
|
||||
requests-futures==1.0.0
|
||||
requests-oauthlib==1.3.0 # via kubernetes, msrest, social-auth-core
|
||||
@ -106,6 +104,7 @@ requests==2.22.0
|
||||
rsa==4.0 # via google-auth
|
||||
ruamel.yaml.clib==0.2.0 # via ruamel.yaml
|
||||
ruamel.yaml==0.16.5 # via openshift
|
||||
schedule==0.6.0
|
||||
six==1.13.0 # via ansible-runner, asgi-amqp, asgiref, autobahn, automat, cryptography, django-extensions, google-auth, isodate, jaraco.classes, jaraco.collections, jaraco.itertools, jaraco.logging, jaraco.text, jsonschema, kubernetes, openshift, pygerduty, pyhamcrest, pyrad, pyrsistent, python-dateutil, python-memcached, slackclient, social-auth-app-django, social-auth-core, tacacs-plus, tempora, twilio, txaio, websocket-client
|
||||
slackclient==1.1.2
|
||||
smmap2==2.0.5 # via gitdb2
|
||||
@ -120,7 +119,7 @@ txaio==18.8.1 # via autobahn
|
||||
urllib3==1.25.7 # via kubernetes, requests
|
||||
uwsgi==2.0.18
|
||||
uwsgitop==0.11
|
||||
vine==1.3.0 # via amqp, celery
|
||||
vine==1.3.0 # via amqp
|
||||
websocket-client==0.56.0 # via kubernetes, slackclient
|
||||
xmlsec==1.3.3 # via python3-saml
|
||||
zipp==0.6.0 # via importlib-metadata
|
||||
|
Loading…
Reference in New Issue
Block a user