#!@PYTHON3@ # Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions # of the GNU General Public License v.2. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Copyright 2015-2016, Vratislav Podzimek import subprocess import shlex import os import pty import sys import tempfile import time import select try: import simplejson as json except ImportError: import json import lvmdbusd.cfg as cfg from lvmdbusd.utils import log_debug, log_error, add_no_notify, make_non_block,\ read_decoded, extract_stack_trace, LvmBug SHELL_PROMPT = "lvm> " def _quote_arg(arg): if len(shlex.split(arg)) > 1: return '"%s"' % arg else: return arg class LVMShellProxy(object): # Read REPORT FD until we have a complete and valid JSON record or give # up trying to get one. # # Returns stdout, report (JSON), stderr def _read_response(self, no_output=False): stdout = "" report = "" stderr = "" keep_reading = True extra_passes = 3 report_json = {} prev_report_len = 0 # Try reading from all FDs to prevent one from filling up and causing # a hang. Keep reading until we get the prompt back and the report # FD does not contain valid JSON while keep_reading and cfg.run.value != 0: try: rd_fd = [ self.parent_stdout_fd, self.report_stream.fileno(), self.parent_stderr_fd] ready = select.select(rd_fd, [], [], 2) for r in ready[0]: if r == self.parent_stdout_fd: for line in self.parent_stdout.readlines(): stdout += line elif r == self.report_stream.fileno(): report += read_decoded(self.report_stream) elif r == self.parent_stderr_fd: for line in self.parent_stderr.readlines(): stderr += line # Check to see if the lvm process died on us if self.lvm_shell.poll() is not None: raise Exception(self.lvm_shell.returncode, "%s" % stderr) if stdout.endswith(SHELL_PROMPT): if no_output: keep_reading = False else: cur_report_len = len(report) if cur_report_len != 0: # Only bother to parse if we have more data if prev_report_len != cur_report_len: prev_report_len = cur_report_len # Parse the JSON if it's good we are done, # if not we will try to read some more. try: report_json = json.loads(report) keep_reading = False except ValueError: pass if keep_reading: extra_passes -= 1 if extra_passes <= 0: if len(report): raise LvmBug("Invalid json: %s" % report) else: raise LvmBug( "lvm returned no JSON output!") except IOError as ioe: log_debug(str(ioe)) self.exit_shell() raise ioe if keep_reading and cfg.run.value == 0: # We didn't complete as we are shutting down # Try to clean up lvm shell process log_debug("exiting lvm shell as we are shutting down") self.exit_shell() raise SystemExit return stdout, report_json, stderr def _write_cmd(self, cmd): self.parent_stdin.write(cmd) self.parent_stdin.flush() def __init__(self): # Create a temp directory tmp_dir = tempfile.mkdtemp(prefix="lvmdbus_") tmp_file = "%s/lvmdbus_report" % (tmp_dir) # Create a fifo for the report output os.mkfifo(tmp_file, 0o600) # Open the fifo for use to read and for lvm child process to write to. self.report_fd = os.open(tmp_file, os.O_NONBLOCK) self.report_stream = os.fdopen(self.report_fd, 'rb', 0) lvm_fd = os.open(tmp_file, os.O_WRONLY) # Set up the environment for using our own socket for reporting and disable the abort # logic if lvm logs too much, which easily happens when utilizing the lvm shell. local_env = {"LC_ALL": "C", "LVM_REPORT_FD": "%s" % lvm_fd, "LVM_COMMAND_PROFILE": "lvmdbusd", "LVM_LOG_FILE_MAX_LINES": "0"} # If any env variables contain LVM we will propagate them too for k, v in os.environ.items(): if "LVM" in k: local_env[k] = v self.parent_stdin_fd, child_stdin_fd = pty.openpty() self.parent_stdout_fd, child_stdout_fd = pty.openpty() self.parent_stderr_fd, child_stderr_fd = pty.openpty() self.parent_stdin = os.fdopen(self.parent_stdin_fd, "w") self.parent_stdout = os.fdopen(self.parent_stdout_fd, "r") self.parent_stderr = os.fdopen(self.parent_stderr_fd, "r") # run the lvm shell self.lvm_shell = subprocess.Popen( [cfg.LVM_CMD], stdin=child_stdin_fd, stdout=child_stdout_fd, env=local_env, stderr=child_stderr_fd, close_fds=True, pass_fds=(lvm_fd,), shell=False) try: make_non_block(self.parent_stdout_fd) make_non_block(self.parent_stderr_fd) # Close our copies of the child FDs there were created with the fork, we don't need them open. os.close(lvm_fd) os.close(child_stdin_fd) os.close(child_stdout_fd) os.close(child_stderr_fd) # wait for the first prompt log_debug("waiting for first prompt...") errors = self._read_response(no_output=True)[2] if errors and len(errors): raise LvmBug(errors) log_debug("lvm prompt read!!!") except: raise finally: # These will get deleted when the FD count goes to zero, so we # can be sure to clean up correctly no matter how we finish os.unlink(tmp_file) os.rmdir(tmp_dir) def get_last_log(self): self._write_cmd('lastlog\n') report_json = self._read_response()[1] return LVMShellProxy.get_error_msg(report_json) @staticmethod def get_error_msg(report_json): # Get the error message from the returned JSON if 'log' in report_json: error_msg = "" # Walk the entire log array and build an error string for log_entry in report_json['log']: if log_entry['log_type'] == "error": if error_msg: error_msg += ', ' + log_entry['log_message'] else: error_msg = log_entry['log_message'] return error_msg return None def call_lvm(self, argv, debug=False): rc = 1 error_msg = "" if self.lvm_shell.poll(): raise Exception( self.lvm_shell.returncode, "Underlying lvm shell process is not present!") argv = add_no_notify(argv) # create the command string cmd = " ".join(_quote_arg(arg) for arg in argv) cmd += "\n" # run the command by writing it to the shell's STDIN self._write_cmd(cmd) # read everything from the STDOUT to the next prompt stdout, report_json, stderr = self._read_response() # Parse the report to see what happened if 'log' in report_json: ret_code = int(report_json['log'][-1:][0]['log_ret_code']) # If we have an exported vg we get a log_ret_code == 5 when # we do a 'fullreport' # Note: 0 == error if (ret_code == 1) or (ret_code == 5 and argv[0] == 'fullreport'): rc = 0 else: # Depending on where lvm fails the command, it may not have anything # to report for "lastlog", so we need to check for a message in the # report json too. error_msg = self.get_last_log() if error_msg is None: error_msg = LVMShellProxy.get_error_msg(report_json) if error_msg is None: error_msg = 'No error reason provided! (missing "log" section)' if debug or rc != 0: log_error(("CMD= %s" % cmd)) log_error(("EC= %d" % rc)) log_error(("ERROR_MSG=\n %s\n" % error_msg)) return rc, report_json, error_msg def exit_shell(self): try: self._write_cmd('exit\n') self.lvm_shell.wait(1) self.lvm_shell = None except Exception as _e: log_error(str(_e)) def __del__(self): # Note: When we are shutting down the daemon and the main process has already exited # and this gets called we have a limited set of things we can do, like we cannot call # log_error as _common_log is None!!! if self.lvm_shell is not None: try: self.lvm_shell.wait(1) except subprocess.TimeoutExpired: print("lvm shell child process did not exit as instructed, sending SIGTERM") cfg.ignore_sigterm = True self.lvm_shell.terminate() child_exit_code = self.lvm_shell.wait(1) print("lvm shell process exited with %d" % child_exit_code) if __name__ == "__main__": print("USING LVM BINARY: %s " % cfg.LVM_CMD) try: if len(sys.argv) > 1 and sys.argv[1] == "bisect": shell = LVMShellProxy() shell.exit_shell() else: shell = LVMShellProxy() in_line = "start" try: while in_line: in_line = input("lvm> ") if in_line: if in_line == "exit": shell.exit_shell() sys.exit(0) start = time.time() ret, out, err = shell.call_lvm(in_line.split()) end = time.time() print(("RC: %d" % ret)) print(("OUT:\n%s" % out)) print(("ERR:\n%s" % err)) print("Command = %f seconds" % (end - start)) except KeyboardInterrupt: pass except EOFError: pass except Exception as e: log_error("main process exiting on exception!\n%s" % extract_stack_trace(e)) sys.exit(1) sys.exit(0)