#!@PYTHON3@ # Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions # of the GNU General Public License v.2. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Copyright 2015-2016, Vratislav Podzimek import subprocess import shlex import os import traceback import sys import tempfile import time import select try: import simplejson as json except ImportError: import json from lvmdbusd.cfg import LVM_CMD from lvmdbusd.utils import log_debug, log_error, add_no_notify, make_non_block,\ read_decoded def _quote_arg(arg): if len(shlex.split(arg)) > 1: return '"%s"' % arg else: return arg class LVMShellProxy(object): # Read REPORT FD until we have a complete and valid JSON record or give # up trying to get one. # # Returns stdout, report (JSON), stderr def _read_response(self): stdout = "" report = "" stderr = "" keep_reading = True extra_passes = 3 report_json = {} prev_report_len = 0 # Try reading from all FDs to prevent one from filling up and causing # a hang. Keep reading until we get the prompt back and the report # FD does not contain valid JSON while keep_reading: try: rd_fd = [ self.lvm_shell.stdout.fileno(), self.report_stream.fileno(), self.lvm_shell.stderr.fileno()] ready = select.select(rd_fd, [], [], 2) for r in ready[0]: if r == self.lvm_shell.stdout.fileno(): stdout += read_decoded(self.lvm_shell.stdout) elif r == self.report_stream.fileno(): report += read_decoded(self.report_stream) elif r == self.lvm_shell.stderr.fileno(): stderr += read_decoded(self.lvm_shell.stderr) # Check to see if the lvm process died on us if self.lvm_shell.poll() is not None: raise Exception(self.lvm_shell.returncode, "%s" % stderr) cur_report_len = len(report) if cur_report_len != 0: # Only bother to parse if we have more data and the last 2 characters match expected # complete JSON, prevents excessive JSON parsing attempts if prev_report_len != cur_report_len and report[-2:] == "}\n": prev_report_len = cur_report_len # Parse the JSON if it's good we are done, # if not we will try to read some more. try: report_json = json.loads(report) keep_reading = False except ValueError: pass # As long as lvm is spewing something on one of the FDs we will # keep trying. If we get a few timeouts with no activity, and # we don't have valid JSON, we will raise an error. if len(ready) == 0 and keep_reading: extra_passes -= 1 if extra_passes <= 0: if len(report): raise ValueError("Invalid json: %s" % report) else: raise ValueError( "lvm returned no JSON output!") except IOError as ioe: log_debug(str(ioe)) pass return stdout, report_json, stderr def _write_cmd(self, cmd): cmd_bytes = bytes(cmd, "utf-8") num_written = self.lvm_shell.stdin.write(cmd_bytes) assert (num_written == len(cmd_bytes)) self.lvm_shell.stdin.flush() def __init__(self): # Create a temp directory tmp_dir = tempfile.mkdtemp(prefix="lvmdbus_") tmp_file = "%s/lvmdbus_report" % (tmp_dir) # Create a fifo for the report output os.mkfifo(tmp_file, 0o600) # Open the fifo for use to read and for lvm child process to write to. self.report_fd = os.open(tmp_file, os.O_NONBLOCK) self.report_stream = os.fdopen(self.report_fd, 'rb', 0) lvm_fd = os.open(tmp_file, os.O_WRONLY) # Set up the environment for using our own socket for reporting and disable the abort # logic if lvm logs too much, which easily happens when utilizing the lvm shell. local_env = {"LC_ALL": "C", "LVM_REPORT_FD": "%s" % lvm_fd, "LVM_COMMAND_PROFILE": "lvmdbusd", "LVM_LOG_FILE_MAX_LINES": "0"} # If any env variables contain LVM we will propagate them too for k, v in os.environ.items(): if "LVM" in k: local_env[k] = v # run the lvm shell self.lvm_shell = subprocess.Popen( [LVM_CMD], stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=local_env, stderr=subprocess.PIPE, close_fds=True, pass_fds=(lvm_fd,), shell=False) try: make_non_block(self.lvm_shell.stdout) make_non_block(self.lvm_shell.stderr) # Close our copy of the lvm_fd, child process is open in its process space os.close(lvm_fd) # Assume we are ready as we may not get the lvm prompt message depending on # if we are using readline or editline. except: raise finally: # These will get deleted when the FD count goes to zero, so we # can be sure to clean up correctly no matter how we finish os.unlink(tmp_file) os.rmdir(tmp_dir) def get_last_log(self): self._write_cmd('lastlog\n') report_json= self._read_response()[1] return LVMShellProxy.get_error_msg(report_json) @staticmethod def get_error_msg(report_json): # Get the error message from the returned JSON if 'log' in report_json: error_msg = "" # Walk the entire log array and build an error string for log_entry in report_json['log']: if log_entry['log_type'] == "error": if error_msg: error_msg += ', ' + log_entry['log_message'] else: error_msg = log_entry['log_message'] return error_msg return None def call_lvm(self, argv, debug=False): rc = 1 error_msg = "" if self.lvm_shell.poll(): raise Exception( self.lvm_shell.returncode, "Underlying lvm shell process is not present!") argv = add_no_notify(argv) # create the command string cmd = " ".join(_quote_arg(arg) for arg in argv) cmd += "\n" # run the command by writing it to the shell's STDIN self._write_cmd(cmd) # read everything from the STDOUT to the next prompt stdout, report_json, stderr = self._read_response() # Parse the report to see what happened if 'log' in report_json: ret_code = int(report_json['log'][-1:][0]['log_ret_code']) # If we have an exported vg we get a log_ret_code == 5 when # we do a 'fullreport' # Note: 0 == error if (ret_code == 1) or (ret_code == 5 and argv[0] == 'fullreport'): rc = 0 else: # Depending on where lvm fails the command, it may not have anything # to report for "lastlog", so we need to check for a message in the # report json too. error_msg = self.get_last_log() if error_msg is None: error_msg = LVMShellProxy.get_error_msg(report_json) if error_msg is None: error_msg = 'No error reason provided! (missing "log" section)' if debug or rc != 0: log_error(('CMD: %s' % cmd)) log_error(("EC = %d" % rc)) log_error(("ERROR_MSG=\n %s\n" % error_msg)) return rc, report_json, error_msg def exit_shell(self): try: self._write_cmd('exit\n') except Exception as e: log_error(str(e)) def __del__(self): try: self.lvm_shell.terminate() except: pass if __name__ == "__main__": print("USING LVM BINARY: %s " % LVM_CMD) try: if len(sys.argv) > 1 and sys.argv[1] == "bisect": shell = LVMShellProxy() shell.exit_shell() else: shell = LVMShellProxy() in_line = "start" try: while in_line: in_line = input("lvm> ") if in_line: start = time.time() ret, out, err = shell.call_lvm(in_line.split()) end = time.time() print(("RC: %d" % ret)) print(("OUT:\n%s" % out)) print(("ERR:\n%s" % err)) print("Command = %f seconds" % (end - start)) except KeyboardInterrupt: pass except EOFError: pass except Exception: traceback.print_exc(file=sys.stdout) sys.exit(1) sys.exit(0)