mirror of
git://sourceware.org/git/lvm2.git
synced 2025-01-05 13:18:20 +03:00
20b21f4fd8
When a client is doing a wait on a job, any other clients will hang when trying to do anything with the service. This is caused by the wait code which was placing the thread that handles incoming dbus requests to sleep until either the timeout expired or the job operation completed. This change creates a thread for the wait request, so that the thread processing incoming requests can continue to run.
202 lines
5.6 KiB
Python
202 lines
5.6 KiB
Python
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
|
|
#
|
|
# This copyrighted material is made available to anyone wishing to use,
|
|
# modify, copy, or redistribute it subject to the terms and conditions
|
|
# of the GNU General Public License v.2.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import threading
|
|
import subprocess
|
|
from . import cfg
|
|
import time
|
|
from .cmdhandler import options_to_cli_args
|
|
import dbus
|
|
from .utils import pv_range_append, pv_dest_ranges, log_error, log_debug
|
|
import traceback
|
|
|
|
_rlock = threading.RLock()
|
|
_thread_list = list()
|
|
|
|
|
|
def pv_move_lv_cmd(move_options, lv_full_name,
|
|
pv_source, pv_source_range, pv_dest_range_list):
|
|
cmd = ['pvmove', '-i', '1']
|
|
cmd.extend(options_to_cli_args(move_options))
|
|
|
|
if lv_full_name:
|
|
cmd.extend(['-n', lv_full_name])
|
|
|
|
pv_range_append(cmd, pv_source, *pv_source_range)
|
|
pv_dest_ranges(cmd, pv_dest_range_list)
|
|
|
|
return cmd
|
|
|
|
|
|
def lv_merge_cmd(merge_options, lv_full_name):
|
|
cmd = ['lvconvert', '--merge', '-i', '1']
|
|
cmd.extend(options_to_cli_args(merge_options))
|
|
cmd.append(lv_full_name)
|
|
return cmd
|
|
|
|
|
|
def _move_merge(interface_name, cmd, job_state):
|
|
add(cmd, job_state)
|
|
|
|
done = job_state.Wait(-1)
|
|
if not done:
|
|
ec, err_msg = job_state.GetError
|
|
raise dbus.exceptions.DBusException(
|
|
interface_name,
|
|
'Exit code %s, stderr = %s' % (str(ec), err_msg))
|
|
|
|
cfg.load()
|
|
return '/'
|
|
|
|
|
|
def move(interface_name, lv_name, pv_src_obj, pv_source_range,
|
|
pv_dests_and_ranges, move_options, job_state):
|
|
"""
|
|
Common code for the pvmove handling.
|
|
:param interface_name: What dbus interface we are providing for
|
|
:param lv_name: Optional (None or name of LV to move)
|
|
:param pv_src_obj: dbus object patch for source PV
|
|
:param pv_source_range: (0,0 to ignore, else start, end segments)
|
|
:param pv_dests_and_ranges: Array of PV object paths and start/end segs
|
|
:param move_options: Hash with optional arguments
|
|
:param job_state: Used to convey information about jobs between processes
|
|
:return: '/' When complete, the empty object path
|
|
"""
|
|
pv_dests = []
|
|
pv_src = cfg.om.get_object_by_path(pv_src_obj)
|
|
if pv_src:
|
|
|
|
# Check to see if we are handling a move to a specific
|
|
# destination(s)
|
|
if len(pv_dests_and_ranges):
|
|
for pr in pv_dests_and_ranges:
|
|
pv_dbus_obj = cfg.om.get_object_by_path(pr[0])
|
|
if not pv_dbus_obj:
|
|
raise dbus.exceptions.DBusException(
|
|
interface_name,
|
|
'PV Destination (%s) not found' % pr[0])
|
|
|
|
pv_dests.append((pv_dbus_obj.lvm_id, pr[1], pr[2]))
|
|
|
|
# Generate the command line for this command, but don't
|
|
# execute it.
|
|
cmd = pv_move_lv_cmd(move_options,
|
|
lv_name,
|
|
pv_src.lvm_id,
|
|
pv_source_range,
|
|
pv_dests)
|
|
|
|
return _move_merge(interface_name, cmd, job_state)
|
|
else:
|
|
raise dbus.exceptions.DBusException(
|
|
interface_name, 'pv_src_obj (%s) not found' % pv_src_obj)
|
|
|
|
|
|
def merge(interface_name, lv_uuid, lv_name, merge_options, job_state):
|
|
# Make sure we have a dbus object representing it
|
|
dbo = cfg.om.get_object_by_uuid_lvm_id(lv_uuid, lv_name)
|
|
if dbo:
|
|
cmd = lv_merge_cmd(merge_options, dbo.lvm_id)
|
|
return _move_merge(interface_name, cmd, job_state)
|
|
else:
|
|
raise dbus.exceptions.DBusException(
|
|
interface_name,
|
|
'LV with uuid %s and name %s not present!' % (lv_uuid, lv_name))
|
|
|
|
|
|
def background_reaper():
|
|
while cfg.run.value != 0:
|
|
with _rlock:
|
|
num_threads = len(_thread_list) - 1
|
|
if num_threads >= 0:
|
|
for i in range(num_threads, -1, -1):
|
|
_thread_list[i].join(0)
|
|
if not _thread_list[i].is_alive():
|
|
log_debug("Removing thread: %s" % _thread_list[i].name)
|
|
_thread_list.pop(i)
|
|
|
|
time.sleep(3)
|
|
|
|
|
|
def background_execute(command, background_job):
|
|
|
|
# Wrap this whole operation in an exception handler, otherwise if we
|
|
# hit a code bug we will silently exit this thread without anyone being
|
|
# the wiser.
|
|
try:
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE, close_fds=True)
|
|
lines_iterator = iter(process.stdout.readline, b"")
|
|
for line in lines_iterator:
|
|
line_str = line.decode("utf-8")
|
|
|
|
# Check to see if the line has the correct number of separators
|
|
try:
|
|
if line_str.count(':') == 2:
|
|
(device, ignore, percentage) = line_str.split(':')
|
|
background_job.Percent = \
|
|
round(float(percentage.strip()[:-1]), 1)
|
|
except ValueError:
|
|
log_error("Trying to parse percentage which failed for %s" %
|
|
line_str)
|
|
|
|
out = process.communicate()
|
|
|
|
if process.returncode == 0:
|
|
background_job.Percent = 100
|
|
|
|
background_job.set_result(process.returncode, out[1])
|
|
|
|
except Exception:
|
|
# In the unlikely event that we blow up, we need to unblock caller which
|
|
# is waiting on an answer.
|
|
st = traceback.format_exc()
|
|
error = "Exception in background thread: \n%s" % st
|
|
log_error(error)
|
|
background_job.set_result(1, error)
|
|
|
|
|
|
def add(command, reporting_job):
|
|
# Create the thread, get it running and then add it to the list
|
|
t = threading.Thread(
|
|
target=background_execute,
|
|
name="thread: " + ' '.join(command),
|
|
args=(command, reporting_job))
|
|
t.start()
|
|
|
|
with _rlock:
|
|
_thread_list.append(t)
|
|
|
|
|
|
def wait_thread(job, timeout, cb, cbe):
|
|
# We need to put the wait on it's own thread, so that we don't block the
|
|
# entire dbus queue processing thread
|
|
try:
|
|
cb(job.state.Wait(timeout))
|
|
except Exception as e:
|
|
cbe("Wait exception: %s" % str(e))
|
|
return 0
|
|
|
|
|
|
def add_wait(job, timeout, cb, cbe):
|
|
|
|
if timeout == 0:
|
|
# Users are basically polling, do not create thread
|
|
cb(job.Complete)
|
|
else:
|
|
t = threading.Thread(
|
|
target=wait_thread,
|
|
name="thread job.Wait: %s" % job.dbus_object_path(),
|
|
args=(job, timeout, cb, cbe)
|
|
)
|
|
|
|
t.start()
|
|
with _rlock:
|
|
_thread_list.append(t)
|