1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-04 09:18:36 +03:00

lvmdbusd: Add support for using lvm shell

With the addition of JSON and the ability to get output which is known to
not contain any extraneous text we can now leverage lvm shell, so that we
don't fork and exec lvm command line repeatedly.
This commit is contained in:
Tony Asleson 2016-08-12 15:23:05 -05:00
parent a0a2c84a26
commit 2352ff24a5
3 changed files with 180 additions and 110 deletions

View File

@ -12,6 +12,7 @@ import time
import threading import threading
from itertools import chain from itertools import chain
import collections import collections
import traceback
try: try:
from . import cfg from . import cfg
@ -119,27 +120,22 @@ def call_lvm(command, debug=False):
def _shell_cfg(): def _shell_cfg():
global _t_call global _t_call
log_debug('Using lvm shell!') try:
lvm_shell = LVMShellProxy() lvm_shell = LVMShellProxy()
_t_call = lvm_shell.call_lvm _t_call = lvm_shell.call_lvm
cfg.USE_SHELL = True
except Exception:
if cfg.USE_SHELL:
_shell_cfg()
else:
_t_call = call_lvm _t_call = call_lvm
log_error(traceback.format_exc())
log_error("Unable to utilize lvm shell, dropping back to fork & exec")
def set_execution(shell): def set_execution(shell):
global _t_call global _t_call
with cmd_lock: with cmd_lock:
_t_call = None
if shell:
log_debug('Using lvm shell!')
lvm_shell = LVMShellProxy()
_t_call = lvm_shell.call_lvm
else:
_t_call = call_lvm _t_call = call_lvm
if shell:
_shell_cfg()
def time_wrapper(command, debug=False): def time_wrapper(command, debug=False):
@ -219,6 +215,13 @@ def pv_remove(device, remove_options):
return call(cmd) return call(cmd)
def _qt(tag_name):
# When running in lvm shell you need to quote the tags
if cfg.USE_SHELL:
return '"%s"' % tag_name
return tag_name
def _tag(operation, what, add, rm, tag_options): def _tag(operation, what, add, rm, tag_options):
cmd = [operation] cmd = [operation]
cmd.extend(options_to_cli_args(tag_options)) cmd.extend(options_to_cli_args(tag_options))
@ -229,9 +232,11 @@ def _tag(operation, what, add, rm, tag_options):
cmd.append(what) cmd.append(what)
if add: if add:
cmd.extend(list(chain.from_iterable(('--addtag', x) for x in add))) cmd.extend(list(chain.from_iterable(
('--addtag', _qt(x)) for x in add)))
if rm: if rm:
cmd.extend(list(chain.from_iterable(('--deltag', x) for x in rm))) cmd.extend(list(chain.from_iterable(
('--deltag', _qt(x)) for x in rm)))
return call(cmd, False) return call(cmd, False)
@ -435,6 +440,9 @@ def supports_json():
cmd = ['help'] cmd = ['help']
rc, out, err = call(cmd) rc, out, err = call(cmd)
if rc == 0: if rc == 0:
if cfg.USE_SHELL:
return True
else:
if 'fullreport' in err: if 'fullreport' in err:
return True return True
return False return False
@ -477,6 +485,13 @@ def lvm_full_report_json():
rc, out, err = call(cmd) rc, out, err = call(cmd)
if rc == 0: if rc == 0:
# With the current implementation, if we are using the shell then we
# are using JSON and JSON is returned back to us as it was parsed to
# figure out if we completed OK or not
if cfg.USE_SHELL:
assert(type(out) == dict)
return out
else:
return json.loads(out) return json.loads(out)
return None return None

View File

@ -14,10 +14,20 @@
import subprocess import subprocess
import shlex import shlex
from fcntl import fcntl, F_GETFL, F_SETFL from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK import os
import traceback import traceback
import sys import sys
import re import tempfile
import time
import select
import copy
try:
from simplejson.scanner import JSONDecodeError
import simplejson as json
except ImportError:
import json
try: try:
from .cfg import LVM_CMD from .cfg import LVM_CMD
@ -38,42 +48,52 @@ def _quote_arg(arg):
class LVMShellProxy(object): class LVMShellProxy(object):
def _read_until_prompt(self): def _read_until_prompt(self):
prev_ec = None
stdout = "" stdout = ""
report = ""
stderr = ""
# Try reading from all FDs to prevent one from filling up and causing
# a hang. We are also assuming that we won't get the lvm prompt back
# until we have already received all the output from stderr and the
# report descriptor too.
while not stdout.endswith(SHELL_PROMPT): while not stdout.endswith(SHELL_PROMPT):
try: try:
rd_fd = [
self.lvm_shell.stdout.fileno(),
self.report_r,
self.lvm_shell.stderr.fileno()]
ready = select.select(rd_fd, [], [], 2)
for r in ready[0]:
if r == self.lvm_shell.stdout.fileno():
while True:
tmp = self.lvm_shell.stdout.read() tmp = self.lvm_shell.stdout.read()
if tmp: if tmp:
stdout += tmp.decode("utf-8") stdout += tmp.decode("utf-8")
except IOError:
# nothing written yet
pass
# strip the prompt from the STDOUT before returning and grab the exit
# code if it's available
m = self.re.match(stdout)
if m:
prev_ec = int(m.group(2))
strip_idx = -1 * len(m.group(1))
else: else:
strip_idx = -1 * len(SHELL_PROMPT) break
return stdout[:strip_idx], prev_ec elif r == self.report_r:
def _read_line(self):
while True: while True:
try: tmp = os.read(self.report_r, 16384)
tmp = self.lvm_shell.stdout.readline()
if tmp: if tmp:
return tmp.decode("utf-8") report += tmp.decode("utf-8")
except IOError: if len(tmp) != 16384:
break
elif r == self.lvm_shell.stderr.fileno():
while True:
tmp = self.lvm_shell.stderr.read()
if tmp:
stderr += tmp.decode("utf-8")
else:
break
except IOError as ioe:
log_debug(str(ioe))
pass pass
def _discard_echo(self, expected): return stdout, report, stderr
line = ""
while line != expected:
# GNU readline inserts some interesting characters at times...
line += self._read_line().replace(' \r', '')
def _write_cmd(self, cmd): def _write_cmd(self, cmd):
cmd_bytes = bytes(cmd, "utf-8") cmd_bytes = bytes(cmd, "utf-8")
@ -81,39 +101,82 @@ class LVMShellProxy(object):
assert (num_written == len(cmd_bytes)) assert (num_written == len(cmd_bytes))
self.lvm_shell.stdin.flush() self.lvm_shell.stdin.flush()
def _lvm_echos(self):
echo = False
cmd = "version\n"
self._write_cmd(cmd)
line = self._read_line()
if line == cmd:
echo = True
self._read_until_prompt()
return echo
def __init__(self): def __init__(self):
self.re = re.compile(".*(\[(-?[0-9]+)\] lvm> $)", re.DOTALL)
# Create a temp directory
tmp_dir = tempfile.mkdtemp(prefix="lvmdbus_")
tmp_file = "%s/lvmdbus_report" % (tmp_dir)
try:
# Lets create fifo for the report output
os.mkfifo(tmp_file, 0o600)
except FileExistsError:
pass
self.report_r = os.open(tmp_file, os.O_NONBLOCK)
# Setup the environment for using our own socket for reporting
local_env = copy.deepcopy(os.environ)
local_env["LVM_REPORT_FD"] = "32"
local_env["LVM_COMMAND_PROFILE"] = "lvmdbusd"
flags = fcntl(self.report_r, F_GETFL)
fcntl(self.report_r, F_SETFL, flags | os.O_NONBLOCK)
# run the lvm shell # run the lvm shell
self.lvm_shell = subprocess.Popen( self.lvm_shell = subprocess.Popen(
[LVM_CMD], stdin=subprocess.PIPE, stdout=subprocess.PIPE, [LVM_CMD + " 32>%s" % tmp_file],
stderr=subprocess.PIPE, close_fds=True) stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=local_env,
stderr=subprocess.PIPE, close_fds=True, shell=True)
flags = fcntl(self.lvm_shell.stdout, F_GETFL) flags = fcntl(self.lvm_shell.stdout, F_GETFL)
fcntl(self.lvm_shell.stdout, F_SETFL, flags | O_NONBLOCK) fcntl(self.lvm_shell.stdout, F_SETFL, flags | os.O_NONBLOCK)
flags = fcntl(self.lvm_shell.stderr, F_GETFL) flags = fcntl(self.lvm_shell.stderr, F_GETFL)
fcntl(self.lvm_shell.stderr, F_SETFL, flags | O_NONBLOCK) fcntl(self.lvm_shell.stderr, F_SETFL, flags | os.O_NONBLOCK)
# wait for the first prompt # wait for the first prompt
self._read_until_prompt() errors = self._read_until_prompt()[2]
if errors and len(errors):
raise RuntimeError(errors)
# Check to see if the version of LVM we are using is running with # These will get deleted when the FD count goes to zero so we can be
# gnu readline which will echo our writes from stdin to stdout # sure to clean up correctly no matter how we finish
self.echo = self._lvm_echos() os.unlink(tmp_file)
os.rmdir(tmp_dir)
def get_error_msg(self):
# We got an error, lets go fetch the error message
self._write_cmd('lastlog\n')
# read everything from the STDOUT to the next prompt
stdout, report, stderr = self._read_until_prompt()
try:
log = json.loads(report)
if 'log' in log:
error_msg = ""
# Walk the entire log array and build an error string
for log_entry in log['log']:
if log_entry['log_type'] == "error":
if error_msg:
error_msg += ', ' + log_entry['log_message']
else:
error_msg = log_entry['log_message']
return error_msg
return 'No error reason provided! (missing "log" section)'
except ValueError:
log_error("Invalid JSON returned from LVM")
log_error("BEGIN>>\n%s\n<<END" % report)
return "Invalid JSON returned from LVM when retrieving exit code"
def call_lvm(self, argv, debug=False): def call_lvm(self, argv, debug=False):
rc = 1
error_msg = ""
json_result = ""
# create the command string # create the command string
cmd = " ".join(_quote_arg(arg) for arg in argv) cmd = " ".join(_quote_arg(arg) for arg in argv)
cmd += "\n" cmd += "\n"
@ -121,46 +184,30 @@ class LVMShellProxy(object):
# run the command by writing it to the shell's STDIN # run the command by writing it to the shell's STDIN
self._write_cmd(cmd) self._write_cmd(cmd)
# If lvm is utilizing gnu readline, it echos stdin to stdout
if self.echo:
self._discard_echo(cmd)
# read everything from the STDOUT to the next prompt # read everything from the STDOUT to the next prompt
stdout, exit_code = self._read_until_prompt() stdout, report, stderr = self._read_until_prompt()
# read everything from STDERR if there's something (we waited for the # Parse the report to see what happened
# prompt on STDOUT so there should be all or nothing at this point on if report and len(report):
# STDERR) json_result = json.loads(report)
stderr = None if 'log' in json_result:
try: if json_result['log'][-1:][0]['log_ret_code'] == '1':
t_error = self.lvm_shell.stderr.read()
if t_error:
stderr = t_error.decode("utf-8")
except IOError:
# nothing on STDERR
pass
if exit_code is not None:
rc = exit_code
else:
# LVM does write to stderr even when it did complete successfully,
# so without having the exit code in the prompt we can never be
# sure.
if stderr:
rc = 1
else:
rc = 0 rc = 0
else:
error_msg = self.get_error_msg()
if debug or rc != 0: if debug or rc != 0:
log_error(('CMD: %s' % cmd)) log_error(('CMD: %s' % cmd))
log_error(("EC = %d" % rc)) log_error(("EC = %d" % rc))
log_error(("STDOUT=\n %s\n" % stdout)) log_error(("ERROR_MSG=\n %s\n" % error_msg))
log_error(("STDERR=\n %s\n" % stderr))
return (rc, stdout, stderr) return rc, json_result, error_msg
def __del__(self): def __del__(self):
try:
self.lvm_shell.terminate() self.lvm_shell.terminate()
except:
pass
if __name__ == "__main__": if __name__ == "__main__":
@ -170,10 +217,15 @@ if __name__ == "__main__":
while in_line: while in_line:
in_line = input("lvm> ") in_line = input("lvm> ")
if in_line: if in_line:
ret, out, err, = shell.call_lvm(in_line.split()) start = time.time()
print(("RET: %d" % ret)) ret, out, err = shell.call_lvm(in_line.split())
print(("OUT:\n%s" % out)) end = time.time()
print(("RC: %d" % ret))
#print(("OUT:\n%s" % out))
print(("ERR:\n%s" % err)) print(("ERR:\n%s" % err))
print("Command = %f seconds" % (end - start))
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
except EOFError: except EOFError:

View File

@ -100,10 +100,12 @@ def main():
parser.add_argument("--debug", action='store_true', parser.add_argument("--debug", action='store_true',
help="Dump debug messages", default=False, help="Dump debug messages", default=False,
dest='debug') dest='debug')
parser.add_argument("--nojson", action='store_false', parser.add_argument("--nojson", action='store_false',
help="Do not use LVM JSON output", default=None, help="Do not use LVM JSON output", default=None,
dest='use_json') dest='use_json')
parser.add_argument("--lvmshell", action='store_true',
help="Use the lvm shell, not fork & exec lvm", default=False,
dest='use_lvm_shell')
use_session = os.getenv('LVMDBUSD_USE_SESSION', False) use_session = os.getenv('LVMDBUSD_USE_SESSION', False)
@ -113,6 +115,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
cfg.DEBUG = args.debug cfg.DEBUG = args.debug
cmdhandler.set_execution(args.use_lvm_shell)
# List of threads that we start up # List of threads that we start up
thread_list = [] thread_list = []
@ -159,7 +162,7 @@ def main():
end = time.time() end = time.time()
log_debug( log_debug(
'Service ready! total time= %.2f, lvm time= %.2f count= %d' % 'Service ready! total time= %.4f, lvm time= %.4f count= %d' %
(end - start, cmdhandler.total_time, cmdhandler.total_count), (end - start, cmdhandler.total_time, cmdhandler.total_count),
'bg_black', 'fg_light_green') 'bg_black', 'fg_light_green')