1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-02-24 17:57:48 +03:00

lvmdbusd: Use one thread to fetch state updates

In preparation to have more than one thread issuing commands to lvm
at the same time we need to serialize updates to the dbus state and
retrieving the global lvm state.  To achieve this we have one thread
handling this with a thread safe queue taking and coalescing requests.
This commit is contained in:
Tony Asleson 2016-11-10 12:19:48 -06:00
parent affe2cebf5
commit fa444906bb
6 changed files with 131 additions and 100 deletions

View File

@ -78,6 +78,7 @@ hidden_lv = itertools.count()
# Used to prevent circular imports... # Used to prevent circular imports...
load = None load = None
event = None
# Global cached state # Global cached state
db = None db = None

View File

@ -11,7 +11,10 @@ from .pv import load_pvs
from .vg import load_vgs from .vg import load_vgs
from .lv import load_lvs from .lv import load_lvs
from . import cfg from . import cfg
from .utils import MThreadRunner, log_debug from .utils import MThreadRunner, log_debug, log_error
import threading
import queue
import traceback
def _main_thread_load(refresh=True, emit_signal=True): def _main_thread_load(refresh=True, emit_signal=True):
@ -45,3 +48,112 @@ def load(refresh=True, emit_signal=True, cache_refresh=True, log=True,
rc = _main_thread_load(refresh, emit_signal) rc = _main_thread_load(refresh, emit_signal)
return rc return rc
# Even though lvm can handle multiple changes concurrently it really doesn't
# make sense to make a 1-1 fetch of data for each change of lvm because when
# we fetch the data once all previous changes are reflected.
class StateUpdate(object):
class UpdateRequest(object):
def __init__(self, refresh, emit_signal, cache_refresh, log,
need_main_thread):
self.is_done = False
self.refresh = refresh
self.emit_signal = emit_signal
self.cache_refresh = cache_refresh
self.log = log
self.need_main_thread = need_main_thread
self.result = None
self.cond = threading.Condition(threading.Lock())
def done(self):
with self.cond:
if not self.is_done:
self.cond.wait()
return self.result
def set_result(self, result):
with self.cond:
self.result = result
self.is_done = True
self.cond.notify_all()
@staticmethod
def update_thread(obj):
while cfg.run.value != 0:
# noinspection PyBroadException
try:
queued_requests = []
refresh = True
emit_signal = True
cache_refresh = True
log = True
need_main_thread = True
with obj.lock:
wait = not obj.deferred
obj.deferred = False
if wait:
queued_requests.append(obj.queue.get(True, 2))
# Ok we have one or the deferred queue has some,
# check if any others
try:
while True:
queued_requests.append(obj.queue.get(False))
except queue.Empty:
pass
log_debug("Processing %d updates!" % len(queued_requests))
# We have what we can, run the update with the needed options
for i in queued_requests:
if not i.refresh:
refresh = False
if not i.emit_signal:
emit_signal = False
if not i.cache_refresh:
cache_refresh = False
if not i.log:
log = False
if not i.need_main_thread:
need_main_thread = False
num_changes = load(refresh, emit_signal, cache_refresh, log,
need_main_thread)
# Update is done, let everyone know!
for i in queued_requests:
i.set_result(num_changes)
except queue.Empty:
pass
except Exception:
st = traceback.format_exc()
log_error("update_thread exception: \n%s" % st)
def __init__(self):
self.lock = threading.RLock()
self.queue = queue.Queue()
self.deferred = False
# Do initial load
load(refresh=False, emit_signal=False, need_main_thread=False)
self.thread = threading.Thread(target=StateUpdate.update_thread,
args=(self,))
def load(self, refresh=True, emit_signal=True, cache_refresh=True,
log=True, need_main_thread=True):
# Place this request on the queue and wait for it to be completed
req = StateUpdate.UpdateRequest(refresh, emit_signal, cache_refresh,
log, need_main_thread)
self.queue.put(req)
return req.done()
def event(self):
with self.lock:
self.deferred = True

View File

@ -20,7 +20,7 @@ import dbus.mainloop.glib
from . import lvmdb from . import lvmdb
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from gi.repository import GLib from gi.repository import GLib
from .fetch import load from .fetch import StateUpdate
from .manager import Manager from .manager import Manager
import traceback import traceback
import queue import queue
@ -29,7 +29,6 @@ from .utils import log_debug, log_error
import argparse import argparse
import os import os
import sys import sys
from .refresh import handle_external_event, event_complete
class Lvm(objectmanager.ObjectManager): class Lvm(objectmanager.ObjectManager):
@ -37,54 +36,15 @@ class Lvm(objectmanager.ObjectManager):
super(Lvm, self).__init__(object_path, BASE_INTERFACE) super(Lvm, self).__init__(object_path, BASE_INTERFACE)
def _discard_pending_refreshes():
# We just handled a refresh, if we have any in the queue they can be
# removed because by definition they are older than the refresh we just did.
# As we limit the number of refreshes getting into the queue
# we should only ever have one to remove.
requests = []
while not cfg.worker_q.empty():
try:
r = cfg.worker_q.get(block=False)
if r.method != handle_external_event:
requests.append(r)
else:
# Make sure we make this event complete even though it didn't
# run, otherwise no other events will get processed
event_complete()
break
except queue.Empty:
break
# Any requests we removed, but did not discard need to be re-queued
for r in requests:
cfg.worker_q.put(r)
def process_request(): def process_request():
while cfg.run.value != 0: while cfg.run.value != 0:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
req = cfg.worker_q.get(True, 5) req = cfg.worker_q.get(True, 5)
start = cfg.db.num_refreshes
log_debug( log_debug(
"Running method: %s with args %s" % "Running method: %s with args %s" %
(str(req.method), str(req.arguments))) (str(req.method), str(req.arguments)))
req.run_cmd() req.run_cmd()
end = cfg.db.num_refreshes
num_refreshes = end - start
if num_refreshes > 0:
_discard_pending_refreshes()
if num_refreshes > 1:
log_debug(
"Inspect method %s for too many refreshes" %
(str(req.method)))
log_debug("Method complete ") log_debug("Method complete ")
except queue.Empty: except queue.Empty:
pass pass
@ -152,20 +112,25 @@ def main():
cfg.om = Lvm(BASE_OBJ_PATH) cfg.om = Lvm(BASE_OBJ_PATH)
cfg.om.register_object(Manager(MANAGER_OBJ_PATH)) cfg.om.register_object(Manager(MANAGER_OBJ_PATH))
cfg.load = load
cfg.db = lvmdb.DataStore(cfg.args.use_json) cfg.db = lvmdb.DataStore(cfg.args.use_json)
# Using a thread to process requests, we cannot hang the dbus library # Using a thread to process requests, we cannot hang the dbus library
# thread that is handling the dbus interface # thread that is handling the dbus interface
thread_list.append(threading.Thread(target=process_request)) thread_list.append(threading.Thread(target=process_request))
cfg.load(refresh=False, emit_signal=False, need_main_thread=False) # Have a single thread handling updating lvm and the dbus model so we don't
# have multiple threads doing this as the same time
updater = StateUpdate()
thread_list.append(updater.thread)
cfg.load = updater.load
cfg.event = updater.event
cfg.loop = GLib.MainLoop() cfg.loop = GLib.MainLoop()
for process in thread_list: for thread in thread_list:
process.damon = True thread.damon = True
process.start() thread.start()
# Add udev watching # Add udev watching
if cfg.args.use_udev: if cfg.args.use_udev:
@ -187,8 +152,8 @@ def main():
cfg.loop.run() cfg.loop.run()
udevwatch.remove() udevwatch.remove()
for process in thread_list: for thread in thread_list:
process.join() thread.join()
except KeyboardInterrupt: except KeyboardInterrupt:
utils.handler(signal.SIGINT, None) utils.handler(signal.SIGINT, None)
return 0 return 0

View File

@ -14,9 +14,7 @@ from .cfg import MANAGER_INTERFACE
import dbus import dbus
from . import cfg from . import cfg
from . import cmdhandler from . import cmdhandler
from .fetch import load_pvs, load_vgs
from .request import RequestEntry from .request import RequestEntry
from .refresh import event_add
from . import udevwatch from . import udevwatch
@ -183,7 +181,8 @@ class Manager(AutomatedProperties):
"udev monitoring") "udev monitoring")
# We are dependent on external events now to stay current! # We are dependent on external events now to stay current!
cfg.ee = True cfg.ee = True
event_add((command,)) utils.log_debug("ExternalEvent %s" % command)
cfg.event()
return dbus.Int32(0) return dbus.Int32(0)
@staticmethod @staticmethod

View File

@ -1,45 +0,0 @@
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Try and minimize the refreshes we do.
import threading
from .request import RequestEntry
from . import cfg
from . import utils
_rlock = threading.RLock()
_count = 0
def handle_external_event(command):
utils.log_debug("External event: '%s'" % command)
event_complete()
cfg.load()
def event_add(params):
global _rlock
global _count
with _rlock:
if _count == 0:
_count += 1
r = RequestEntry(
-1, handle_external_event,
params, None, None, False)
cfg.worker_q.put(r)
def event_complete():
global _rlock
global _count
with _rlock:
if _count > 0:
_count -= 1
return _count

View File

@ -9,7 +9,6 @@
import pyudev import pyudev
import threading import threading
from .refresh import event_add
from . import cfg from . import cfg
observer = None observer = None
@ -38,7 +37,7 @@ def filter_event(action, device):
refresh = True refresh = True
if refresh: if refresh:
event_add(('udev',)) cfg.event()
def add(): def add():