mirror of
git://sourceware.org/git/lvm2.git
synced 2025-01-18 10:04:20 +03:00
lvmdbusd: Fix hang when lvm compiled with 'enable-notify-dbus'
The following operations would hang if lvm was compiled with 'enable-notify-dbus' and the client specified -1 for the timeout: * LV snapshot merge * VG move * LV move This was caused because the implementation of these three dbus methods is different. Most of the dbus method calls are executed by gathering information needed to fulfill it, placing that information on a thread safe queue and returning. The results later to be returned to the client with callbacks. With this approach we can process an arbitrary number of commands without any of them blocking other dbus commands. However, the 3 dbus methods listed above did not utilize this functionality because they were implemented with a separate thread that handles the fork & exec of lvm. This is done because these operations can be very slow to complete. However, because of this the lvm command that we were waiting on is trying to call back into the dbus service to notify it that something changed. Because the code was blocking the process that handles the incoming dbus activity the lvm command blocked. We were stuck until the client timed-out the connection, which then causes the service to unblock and continue. If the client did not have a timeout, we would have been hung indefinitely. The fix is to always utilize the worker queue on all dbus methods. We need to ensure that lvm is tested with 'enable-notify-dbus' enabled and disabled.
This commit is contained in:
parent
5274c2f11b
commit
dd5d865020
@ -43,40 +43,22 @@ def lv_merge_cmd(merge_options, lv_full_name):
|
|||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
|
|
||||||
def _create_background_dbus_job(job_state):
|
def _move_merge(interface_name, cmd, job_state):
|
||||||
job_obj = Job(None, job_state)
|
|
||||||
cfg.om.register_object(job_obj)
|
|
||||||
return job_obj.dbus_object_path()
|
|
||||||
|
|
||||||
|
|
||||||
def _move_merge(interface_name, cmd, time_out):
|
|
||||||
# Create job object to be used while running the command
|
|
||||||
rc = '/'
|
|
||||||
job_state = JobState(None)
|
|
||||||
add(cmd, job_state)
|
add(cmd, job_state)
|
||||||
|
|
||||||
if time_out == -1:
|
done = job_state.Wait(-1)
|
||||||
# Waiting forever
|
|
||||||
done = job_state.Wait(time_out)
|
|
||||||
if not done:
|
if not done:
|
||||||
ec, err_msg = job_state.GetError
|
ec, err_msg = job_state.GetError
|
||||||
raise dbus.exceptions.DBusException(
|
raise dbus.exceptions.DBusException(
|
||||||
interface_name,
|
interface_name,
|
||||||
'Exit code %s, stderr = %s' % (str(ec), err_msg))
|
'Exit code %s, stderr = %s' % (str(ec), err_msg))
|
||||||
elif time_out == 0:
|
|
||||||
# Immediately create and return a job
|
|
||||||
rc = _create_background_dbus_job(job_state)
|
|
||||||
else:
|
|
||||||
# Willing to wait for a bit
|
|
||||||
done = job_state.Wait(time_out)
|
|
||||||
if not done:
|
|
||||||
rc = _create_background_dbus_job(job_state)
|
|
||||||
|
|
||||||
return rc
|
cfg.load()
|
||||||
|
return '/'
|
||||||
|
|
||||||
|
|
||||||
def move(interface_name, lv_name, pv_src_obj, pv_source_range,
|
def move(interface_name, lv_name, pv_src_obj, pv_source_range,
|
||||||
pv_dests_and_ranges, move_options, time_out):
|
pv_dests_and_ranges, move_options, job_state):
|
||||||
"""
|
"""
|
||||||
Common code for the pvmove handling.
|
Common code for the pvmove handling.
|
||||||
:param interface_name: What dbus interface we are providing for
|
:param interface_name: What dbus interface we are providing for
|
||||||
@ -85,8 +67,8 @@ def move(interface_name, lv_name, pv_src_obj, pv_source_range,
|
|||||||
:param pv_source_range: (0,0 to ignore, else start, end segments)
|
:param pv_source_range: (0,0 to ignore, else start, end segments)
|
||||||
:param pv_dests_and_ranges: Array of PV object paths and start/end segs
|
:param pv_dests_and_ranges: Array of PV object paths and start/end segs
|
||||||
:param move_options: Hash with optional arguments
|
:param move_options: Hash with optional arguments
|
||||||
:param time_out:
|
:param job_state: Used to convey information about jobs between processes
|
||||||
:return: Object path to job object
|
:return: '/' When complete, the empty object path
|
||||||
"""
|
"""
|
||||||
pv_dests = []
|
pv_dests = []
|
||||||
pv_src = cfg.om.get_object_by_path(pv_src_obj)
|
pv_src = cfg.om.get_object_by_path(pv_src_obj)
|
||||||
@ -112,18 +94,18 @@ def move(interface_name, lv_name, pv_src_obj, pv_source_range,
|
|||||||
pv_source_range,
|
pv_source_range,
|
||||||
pv_dests)
|
pv_dests)
|
||||||
|
|
||||||
return _move_merge(interface_name, cmd, time_out)
|
return _move_merge(interface_name, cmd, job_state)
|
||||||
else:
|
else:
|
||||||
raise dbus.exceptions.DBusException(
|
raise dbus.exceptions.DBusException(
|
||||||
interface_name, 'pv_src_obj (%s) not found' % pv_src_obj)
|
interface_name, 'pv_src_obj (%s) not found' % pv_src_obj)
|
||||||
|
|
||||||
|
|
||||||
def merge(interface_name, lv_uuid, lv_name, merge_options, time_out):
|
def merge(interface_name, lv_uuid, lv_name, merge_options, job_state):
|
||||||
# Make sure we have a dbus object representing it
|
# Make sure we have a dbus object representing it
|
||||||
dbo = cfg.om.get_object_by_uuid_lvm_id(lv_uuid, lv_name)
|
dbo = cfg.om.get_object_by_uuid_lvm_id(lv_uuid, lv_name)
|
||||||
if dbo:
|
if dbo:
|
||||||
cmd = lv_merge_cmd(merge_options, dbo.lvm_id)
|
cmd = lv_merge_cmd(merge_options, dbo.lvm_id)
|
||||||
return _move_merge(interface_name, cmd, time_out)
|
return _move_merge(interface_name, cmd, job_state)
|
||||||
else:
|
else:
|
||||||
raise dbus.exceptions.DBusException(
|
raise dbus.exceptions.DBusException(
|
||||||
interface_name,
|
interface_name,
|
||||||
@ -143,17 +125,6 @@ def background_reaper():
|
|||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
|
|
||||||
|
|
||||||
def process_background_result(job_object, exit_code, error_msg):
|
|
||||||
cfg.load()
|
|
||||||
job_object.set_result(exit_code, error_msg)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyUnusedLocal
|
|
||||||
def empty_cb(disregard):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def background_execute(command, background_job, skip_first_line=False):
|
def background_execute(command, background_job, skip_first_line=False):
|
||||||
|
|
||||||
# Wrap this whole operation in an exception handler, otherwise if we
|
# Wrap this whole operation in an exception handler, otherwise if we
|
||||||
@ -181,23 +152,15 @@ def background_execute(command, background_job, skip_first_line=False):
|
|||||||
if process.returncode == 0:
|
if process.returncode == 0:
|
||||||
background_job.Percent = 100
|
background_job.Percent = 100
|
||||||
|
|
||||||
# Queue up the result so that it gets executed in same thread as others.
|
background_job.set_result(process.returncode, out[1])
|
||||||
r = RequestEntry(
|
|
||||||
-1, process_background_result,
|
|
||||||
(background_job, process.returncode, out[1]),
|
|
||||||
empty_cb, empty_cb, False)
|
|
||||||
cfg.worker_q.put(r)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
# In the unlikely event that we blew up, lets notify fill out the
|
# In the unlikely event that we blow up, we need to unblock caller which
|
||||||
# job object so that the client doesn't hang potentially forever!
|
# is waiting on an answer.
|
||||||
st = traceback.format_exc()
|
st = traceback.format_exc()
|
||||||
error = "Exception in background thread: \n%s" % st
|
error = "Exception in background thread: \n%s" % st
|
||||||
log_error(error)
|
log_error(error)
|
||||||
r = RequestEntry(
|
background_job.set_result(1, error)
|
||||||
-1, process_background_result,
|
|
||||||
(background_job, 1, error),
|
|
||||||
empty_cb, empty_cb, False)
|
|
||||||
cfg.worker_q.put(r)
|
|
||||||
|
|
||||||
|
|
||||||
def add(command, reporting_job):
|
def add(command, reporting_job):
|
||||||
|
@ -17,7 +17,7 @@ import threading
|
|||||||
|
|
||||||
# noinspection PyPep8Naming
|
# noinspection PyPep8Naming
|
||||||
class JobState(object):
|
class JobState(object):
|
||||||
def __init__(self, request):
|
def __init__(self, request=None):
|
||||||
self.rlock = threading.RLock()
|
self.rlock = threading.RLock()
|
||||||
|
|
||||||
self._percent = 0
|
self._percent = 0
|
||||||
|
@ -22,6 +22,7 @@ from .loader import common
|
|||||||
from .state import State
|
from .state import State
|
||||||
from . import background
|
from . import background
|
||||||
from .utils import round_size
|
from .utils import round_size
|
||||||
|
from .job import JobState
|
||||||
|
|
||||||
|
|
||||||
# Try and build a key for a LV, so that we sort the LVs with least dependencies
|
# Try and build a key for a LV, so that we sort the LVs with least dependencies
|
||||||
@ -444,14 +445,21 @@ class Lv(LvCommon):
|
|||||||
@dbus.service.method(
|
@dbus.service.method(
|
||||||
dbus_interface=LV_INTERFACE,
|
dbus_interface=LV_INTERFACE,
|
||||||
in_signature='o(tt)a(ott)ia{sv}',
|
in_signature='o(tt)a(ott)ia{sv}',
|
||||||
out_signature='o')
|
out_signature='o',
|
||||||
|
async_callbacks=('cb', 'cbe'))
|
||||||
def Move(self, pv_src_obj, pv_source_range,
|
def Move(self, pv_src_obj, pv_source_range,
|
||||||
pv_dests_and_ranges,
|
pv_dests_and_ranges,
|
||||||
tmo, move_options):
|
tmo, move_options, cb, cbe):
|
||||||
return background.move(
|
|
||||||
LV_INTERFACE, self.lvm_id, pv_src_obj,
|
job_state = JobState()
|
||||||
pv_source_range, pv_dests_and_ranges,
|
|
||||||
move_options, tmo)
|
r = RequestEntry(
|
||||||
|
tmo, background.move,
|
||||||
|
(LV_INTERFACE, self.lvm_id, pv_src_obj, pv_source_range,
|
||||||
|
pv_dests_and_ranges, move_options, job_state), cb, cbe, False,
|
||||||
|
job_state)
|
||||||
|
|
||||||
|
cfg.worker_q.put(r)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _snap_shot(lv_uuid, lv_name, name, optional_size,
|
def _snap_shot(lv_uuid, lv_name, name, optional_size,
|
||||||
@ -875,7 +883,13 @@ class LvSnapShot(Lv):
|
|||||||
@dbus.service.method(
|
@dbus.service.method(
|
||||||
dbus_interface=SNAPSHOT_INTERFACE,
|
dbus_interface=SNAPSHOT_INTERFACE,
|
||||||
in_signature='ia{sv}',
|
in_signature='ia{sv}',
|
||||||
out_signature='o')
|
out_signature='o',
|
||||||
def Merge(self, tmo, merge_options):
|
async_callbacks=('cb', 'cbe'))
|
||||||
return background.merge(SNAPSHOT_INTERFACE, self.Uuid, self.lvm_id,
|
def Merge(self, tmo, merge_options, cb, cbe):
|
||||||
merge_options, tmo)
|
job_state = JobState()
|
||||||
|
|
||||||
|
r = RequestEntry(tmo, background.merge,
|
||||||
|
(SNAPSHOT_INTERFACE, self.Uuid, self.lvm_id,
|
||||||
|
merge_options, job_state), cb, cbe, False,
|
||||||
|
job_state)
|
||||||
|
cfg.worker_q.put(r)
|
||||||
|
@ -18,7 +18,7 @@ from .utils import log_error
|
|||||||
|
|
||||||
class RequestEntry(object):
|
class RequestEntry(object):
|
||||||
def __init__(self, tmo, method, arguments, cb, cb_error,
|
def __init__(self, tmo, method, arguments, cb, cb_error,
|
||||||
return_tuple=True):
|
return_tuple=True, job_state=None):
|
||||||
self.tmo = tmo
|
self.tmo = tmo
|
||||||
self.method = method
|
self.method = method
|
||||||
self.arguments = arguments
|
self.arguments = arguments
|
||||||
@ -33,6 +33,7 @@ class RequestEntry(object):
|
|||||||
self._rc = 0
|
self._rc = 0
|
||||||
self._rc_error = None
|
self._rc_error = None
|
||||||
self._return_tuple = return_tuple
|
self._return_tuple = return_tuple
|
||||||
|
self._job_state = job_state
|
||||||
|
|
||||||
if self.tmo < 0:
|
if self.tmo < 0:
|
||||||
# Client is willing to block forever
|
# Client is willing to block forever
|
||||||
@ -53,7 +54,7 @@ class RequestEntry(object):
|
|||||||
r.timer_expired()
|
r.timer_expired()
|
||||||
|
|
||||||
def _return_job(self):
|
def _return_job(self):
|
||||||
self._job = Job(self)
|
self._job = Job(self, self._job_state)
|
||||||
cfg.om.register_object(self._job, True)
|
cfg.om.register_object(self._job, True)
|
||||||
if self._return_tuple:
|
if self._return_tuple:
|
||||||
self.cb(('/', self._job.dbus_object_path()))
|
self.cb(('/', self._job.dbus_object_path()))
|
||||||
|
@ -20,6 +20,7 @@ from .loader import common
|
|||||||
from .state import State
|
from .state import State
|
||||||
from . import background
|
from . import background
|
||||||
from .utils import round_size
|
from .utils import round_size
|
||||||
|
from .job import JobState
|
||||||
|
|
||||||
|
|
||||||
# noinspection PyUnusedLocal
|
# noinspection PyUnusedLocal
|
||||||
@ -352,12 +353,20 @@ class Vg(AutomatedProperties):
|
|||||||
@dbus.service.method(
|
@dbus.service.method(
|
||||||
dbus_interface=VG_INTERFACE,
|
dbus_interface=VG_INTERFACE,
|
||||||
in_signature='o(tt)a(ott)ia{sv}',
|
in_signature='o(tt)a(ott)ia{sv}',
|
||||||
out_signature='o')
|
out_signature='o',
|
||||||
|
async_callbacks=('cb', 'cbe'))
|
||||||
def Move(self, pv_src_obj, pv_source_range, pv_dests_and_ranges,
|
def Move(self, pv_src_obj, pv_source_range, pv_dests_and_ranges,
|
||||||
tmo, move_options):
|
tmo, move_options, cb, cbe):
|
||||||
return background.move(
|
|
||||||
VG_INTERFACE, None, pv_src_obj, pv_source_range,
|
job_state = JobState()
|
||||||
pv_dests_and_ranges, move_options, tmo)
|
|
||||||
|
r = RequestEntry(
|
||||||
|
tmo, background.move,
|
||||||
|
(VG_INTERFACE, None, pv_src_obj, pv_source_range,
|
||||||
|
pv_dests_and_ranges, move_options, job_state), cb, cbe, False,
|
||||||
|
job_state)
|
||||||
|
|
||||||
|
cfg.worker_q.put(r)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _lv_create(uuid, vg_name, name, size_bytes, pv_dests_and_ranges,
|
def _lv_create(uuid, vg_name, name, size_bytes, pv_dests_and_ranges,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user