mirror of
git://sourceware.org/git/lvm2.git
synced 2025-01-05 13:18:20 +03:00
70d0b210e1
It appears that the output of lvconvert --merge can vary somewhat. The code was blowing up as it was trying to parse a line of stdout to retrieve the % complete, but the line did not have the needed format and an exception was thrown. The uncaught exception caused the background thread to exit without updating the job object, which caused the client to hang forever waiting. Added a default exception handler to prevent unhandled exceptions causing hangs and removed the parameter skip_first_line as it's no longer needed. The code checks to see if the line can be parsed before doing so. Signed-off-by: Tony Asleson <tasleson@redhat.com>
213 lines
6.0 KiB
Python
213 lines
6.0 KiB
Python
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
|
|
#
|
|
# This copyrighted material is made available to anyone wishing to use,
|
|
# modify, copy, or redistribute it subject to the terms and conditions
|
|
# of the GNU General Public License v.2.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import threading
|
|
import subprocess
|
|
from . import cfg
|
|
import time
|
|
from .cmdhandler import options_to_cli_args
|
|
import dbus
|
|
from .job import Job, JobState
|
|
from .utils import pv_range_append, pv_dest_ranges, log_debug, log_error
|
|
from .request import RequestEntry
|
|
import traceback
|
|
|
|
# Held by add() and background_reaper() whenever they touch _thread_list.
_rlock = threading.RLock()
# Threads started by add(); background_reaper() drops the ones that finished.
_thread_list = []
|
|
|
|
|
|
def pv_move_lv_cmd(move_options, lv_full_name,
                   pv_source, pv_source_range, pv_dest_range_list):
    """
    Assemble the argument list for a ``pvmove`` invocation.

    The command is only built here, nothing is executed.

    :param move_options: Options handed to options_to_cli_args
    :param lv_full_name: When truthy it is added as the value of ``-n``
    :param pv_source: Source PV, handed to pv_range_append
    :param pv_source_range: Unpacked into the trailing arguments of
                            pv_range_append
    :param pv_dest_range_list: Destinations, handed to pv_dest_ranges
    :return: The command as a list
    """
    argv = ['pvmove', '-i', '1']
    argv += options_to_cli_args(move_options)

    if lv_full_name:
        argv += ['-n', lv_full_name]

    # Both helpers receive the list itself so they can add the source and
    # destination arguments to it.
    pv_range_append(argv, pv_source, *pv_source_range)
    pv_dest_ranges(argv, pv_dest_range_list)

    return argv
|
|
|
|
|
|
def lv_merge_cmd(merge_options, lv_full_name):
    """
    Assemble the argument list for an ``lvconvert --merge`` invocation.

    :param merge_options: Options handed to options_to_cli_args
    :param lv_full_name: Placed last on the command line
    :return: The command as a list
    """
    return (['lvconvert', '--merge', '-i', '1'] +
            list(options_to_cli_args(merge_options)) +
            [lv_full_name])
|
|
|
|
|
|
def _create_background_dbus_job(job_state):
    """
    Wrap ``job_state`` in a Job, register it with the object manager and
    hand back the object path of the new job.

    :param job_state: State object the new Job is constructed with
    :return: Result of the job's dbus_object_path()
    """
    job = Job(None, job_state)
    cfg.om.register_object(job)
    path = job.dbus_object_path()
    return path
|
|
|
|
|
|
def _move_merge(interface_name, cmd, time_out):
    """
    Start ``cmd`` on a background thread and wait for it as requested.

    :param interface_name: Passed to DBusException when one is raised
    :param cmd: Command (list) handed to add()
    :param time_out: -1 waits without limit, 0 does not wait at all, any
                     other value is handed to JobState.Wait
    :return: '/' when the wait reported completion, otherwise the object
             path of a newly registered job
    :raises dbus.exceptions.DBusException: time_out is -1 and the wait did
             not report completion
    """
    # State object used to track the command while it runs
    state = JobState(None)
    add(cmd, state)

    if time_out == 0:
        # Caller does not want to wait, hand back a job straight away
        return _create_background_dbus_job(state)

    if state.Wait(time_out):
        return '/'

    if time_out != -1:
        # Caller waited as long as it was willing to, give it a job
        return _create_background_dbus_job(state)

    # We were waiting forever and still came back not done
    ec, err_msg = state.GetError
    raise dbus.exceptions.DBusException(
        interface_name,
        'Exit code %s, stderr = %s' % (str(ec), err_msg))
|
|
|
|
|
|
def move(interface_name, lv_name, pv_src_obj, pv_source_range,
         pv_dests_and_ranges, move_options, time_out):
    """
    Common code for the pvmove handling.
    :param interface_name: What dbus interface we are providing for
    :param lv_name: Optional (None or name of LV to move)
    :param pv_src_obj: dbus object path for source PV
    :param pv_source_range: (0,0 to ignore, else start, end segments)
    :param pv_dests_and_ranges: Array of PV object paths and start/end segs
    :param move_options: Hash with optional arguments
    :param time_out: Passed through to _move_merge
    :return: Whatever _move_merge returns ('/' or a job object path)
    :raises dbus.exceptions.DBusException: The source PV or one of the
            destination PVs has no object registered under the given path
    """
    source_pv = cfg.om.get_object_by_path(pv_src_obj)
    if not source_pv:
        raise dbus.exceptions.DBusException(
            interface_name, 'pv_src_obj (%s) not found' % pv_src_obj)

    # Resolve every requested destination to its lvm id, keeping the
    # start/end values that came with it.
    destinations = []
    if len(pv_dests_and_ranges) > 0:
        for entry in pv_dests_and_ranges:
            dest_pv = cfg.om.get_object_by_path(entry[0])
            if not dest_pv:
                raise dbus.exceptions.DBusException(
                    interface_name,
                    'PV Destination (%s) not found' % entry[0])

            destinations.append((dest_pv.lvm_id, entry[1], entry[2]))

    # Only the command line is generated here, running it is left to
    # _move_merge.
    command = pv_move_lv_cmd(
        move_options, lv_name, source_pv.lvm_id, pv_source_range,
        destinations)

    return _move_merge(interface_name, command, time_out)
|
|
|
|
|
|
def merge(interface_name, lv_uuid, lv_name, merge_options, time_out):
    """
    Common code for the lvconvert --merge handling.
    :param interface_name: What dbus interface we are providing for
    :param lv_uuid: uuid used to look up the LV object
    :param lv_name: Name used to look up the LV object
    :param merge_options: Hash with optional arguments
    :param time_out: Passed through to _move_merge
    :return: Whatever _move_merge returns ('/' or a job object path)
    :raises dbus.exceptions.DBusException: No object is known for the LV
    """
    # We can only continue when a dbus object represents the LV
    lv = cfg.om.get_object_by_uuid_lvm_id(lv_uuid, lv_name)
    if not lv:
        raise dbus.exceptions.DBusException(
            interface_name,
            'LV with uuid %s and name %s not present!' % (lv_uuid, lv_name))

    command = lv_merge_cmd(merge_options, lv.lvm_id)
    return _move_merge(interface_name, command, time_out)
|
|
|
|
|
|
def background_reaper():
    """
    Loop for as long as cfg.run.value is non-zero, removing threads that
    are no longer alive from _thread_list and sleeping 3 seconds between
    passes.
    """
    while cfg.run.value != 0:
        with _rlock:
            # Visit the entries last to first so that removing one never
            # changes the index of an entry still to be visited.
            for idx in reversed(range(len(_thread_list))):
                worker = _thread_list[idx]
                worker.join(0)
                if not worker.is_alive():
                    _thread_list.pop(idx)

        time.sleep(3)
|
|
|
|
|
|
def process_background_result(job_object, exit_code, error_msg):
    """
    Call cfg.load() and then store the outcome on the job.

    :param job_object: Object whose set_result is called
    :param exit_code: First argument to set_result
    :param error_msg: Second argument to set_result
    :return: None
    """
    cfg.load()
    job_object.set_result(exit_code, error_msg)
|
|
|
|
|
|
# noinspection PyUnusedLocal
def empty_cb(disregard):
    """
    Callback that does nothing with its argument.

    :param disregard: Ignored
    :return: None
    """
    return None
|
|
|
|
|
|
def background_execute(command, background_job, skip_first_line=False):
    """
    Run ``command``, update the job's Percent from its stdout and queue
    the final result on cfg.worker_q.

    :param command: Command (list) handed to subprocess.Popen
    :param background_job: Object whose Percent is updated and which is
                           passed on to process_background_result
    :param skip_first_line: Not referenced by this function
    :return: None
    """

    def _queue_result(exit_code, message):
        # Queue up the result so that it gets executed in same thread as
        # others.
        cfg.worker_q.put(RequestEntry(
            -1, process_background_result,
            (background_job, exit_code, message),
            empty_cb, empty_cb, False))

    # The whole operation sits inside an exception handler, without it a
    # code bug would end this thread silently and nobody would be the wiser.
    try:
        child = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            close_fds=True)

        for raw_line in iter(child.stdout.readline, b""):
            # Decoding is deliberately outside the ValueError handler below
            # so that a decode failure is reported by the outer handler.
            text = raw_line.decode("utf-8")
            fields = text.split(':')

            # Lines without exactly two separators are not parsed
            if len(fields) != 3:
                continue

            try:
                # The last character of the final field is dropped before
                # the remainder is converted to a number.
                background_job.Percent = round(
                    float(fields[2].strip()[:-1]), 1)
            except ValueError:
                log_error("Trying to parse percentage which failed for %s" %
                          text)

        stderr_data = child.communicate()[1]
        exit_code = child.returncode

        if exit_code == 0:
            background_job.Percent = 100

        _queue_result(exit_code, stderr_data)
    except Exception:
        # In the unlikely event that we blew up, fill out the job object so
        # that the client doesn't hang potentially forever!
        failure = "Exception in background thread: \n%s" % \
            traceback.format_exc()
        log_error(failure)
        _queue_result(1, failure)
|
|
|
|
|
|
def add(command, reporting_job):
    """
    Start a thread running background_execute for ``command`` and record
    the thread in _thread_list.

    :param command: Command (list of strings), also used to name the thread
    :param reporting_job: Passed to background_execute as its job argument
    :return: None
    """
    worker = threading.Thread(
        target=background_execute,
        name="thread: %s" % ' '.join(command),
        args=(command, reporting_job))

    # Get the thread going first, then put it on the list
    worker.start()

    with _rlock:
        _thread_list.append(worker)
|