#!/usr/bin/env python3

# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2015-2016, Vratislav Podzimek

import subprocess
import shlex
from fcntl import fcntl, F_GETFL, F_SETFL
import os
import traceback
import sys
import tempfile
import time
import select
import copy

try:
    import simplejson as json
except ImportError:
    import json

from lvmdbusd.cfg import LVM_CMD
from lvmdbusd.utils import log_debug, log_error, add_no_notify

# Prompt the lvm shell prints on stdout when it is ready for the next command.
SHELL_PROMPT = "lvm> "


def _quote_arg(arg):
    """Wrap *arg* in double quotes when it would split into several words.

    Arguments that ``shlex.split`` keeps as a single token (or that split
    into nothing) are returned unchanged.
    """
    # NOTE(review): shlex.split() raises ValueError on unbalanced quotes and
    # embedded double quotes are not escaped here -- confirm callers never
    # pass such arguments.
    if len(shlex.split(arg)) > 1:
        return '"%s"' % arg
    return arg


class LVMShellProxy(object):
    """Drive a long-running ``lvm`` shell process through its standard streams.

    Commands are written to the shell's stdin; the JSON report is read back
    from a FIFO that the shell has open as file descriptor 32.
    """

    @staticmethod
    def _read(stream):
        """Return whatever *stream* currently yields, decoded as UTF-8.

        Returns an empty string when the read produces nothing.
        """
        data = stream.read()
        if data:
            return data.decode("utf-8")
        return ''

    def _read_until_prompt(self, no_output=False):
        """Read until we get the prompt back and a result.

        :param no_output: Caller expects no output on the report FD, so
            seeing the prompt alone is enough to stop reading.
        :returns: tuple ``(stdout, report_json, stderr)``; ``report_json`` is
            the parsed JSON report (``{}`` when nothing was parsed).
        :raises Exception: ``(returncode, stderr)`` if the lvm process exited.
        :raises ValueError: if the prompt came back but the report FD never
            produced valid JSON within the extra passes allowed.
        """
        stdout = ""
        report = ""
        stderr = ""
        keep_reading = True
        extra_passes = 3
        report_json = {}
        prev_report_len = 0

        # Try reading from all FDs to prevent one from filling up and causing
        # a hang.  Keep reading until we get the prompt back and the report
        # FD contains valid JSON.
        while keep_reading:
            try:
                stdout_fd = self.lvm_shell.stdout.fileno()
                report_fd = self.report_stream.fileno()
                stderr_fd = self.lvm_shell.stderr.fileno()

                ready = select.select(
                    [stdout_fd, report_fd, stderr_fd], [], [], 2)

                for r in ready[0]:
                    if r == stdout_fd:
                        stdout += LVMShellProxy._read(self.lvm_shell.stdout)
                    elif r == report_fd:
                        report += LVMShellProxy._read(self.report_stream)
                    elif r == stderr_fd:
                        stderr += LVMShellProxy._read(self.lvm_shell.stderr)

                # Check to see if the lvm process died on us.  poll() returns
                # None while the process is alive and the return code once it
                # has exited, so compare against None: an exit status of 0
                # must be detected too.
                if self.lvm_shell.poll() is not None:
                    raise Exception(self.lvm_shell.returncode, "%s" % stderr)

                if stdout.endswith(SHELL_PROMPT):
                    if no_output:
                        keep_reading = False
                    else:
                        cur_report_len = len(report)
                        # Only bother to parse if we have more data
                        if (cur_report_len != 0 and
                                prev_report_len != cur_report_len):
                            prev_report_len = cur_report_len
                            # Parse the JSON if it's good we are done,
                            # if not we will try to read some more.
                            try:
                                report_json = json.loads(report)
                                keep_reading = False
                            except ValueError:
                                pass

                        if keep_reading:
                            # Prompt is back but the report is not usable
                            # yet; allow a few more passes before giving up.
                            extra_passes -= 1
                            if extra_passes <= 0:
                                if report:
                                    raise ValueError(
                                        "Invalid json: %s" % report)
                                raise ValueError(
                                    "lvm returned no JSON output!")

            except IOError as ioe:
                log_debug(str(ioe))

        return stdout, report_json, stderr

    def _write_cmd(self, cmd):
        """Write the string *cmd* to the shell's stdin as UTF-8 and flush."""
        cmd_bytes = bytes(cmd, "utf-8")
        num_written = self.lvm_shell.stdin.write(cmd_bytes)
        assert (num_written == len(cmd_bytes))
        self.lvm_shell.stdin.flush()

    @staticmethod
    def _make_non_block(stream):
        """Set ``O_NONBLOCK`` on *stream* while keeping its other flags."""
        flags = fcntl(stream, F_GETFL)
        fcntl(stream, F_SETFL, flags | os.O_NONBLOCK)

    def __init__(self):
        """Start the lvm shell and wait for its first prompt.

        :raises RuntimeError: if the shell wrote anything to stderr before
            the first prompt appeared.
        """
        # Create a temp directory
        tmp_dir = tempfile.mkdtemp(prefix="lvmdbus_")
        tmp_file = "%s/lvmdbus_report" % (tmp_dir)

        try:
            # Lets create fifo for the report output
            os.mkfifo(tmp_file, 0o600)
        except FileExistsError:
            pass

        # We have to open non-blocking as the other side isn't open until
        # we actually fork the process.
        self.report_fd = os.open(tmp_file, os.O_NONBLOCK)
        self.report_stream = os.fdopen(self.report_fd, 'rb', 0)

        # Setup the environment for using our own FIFO for reporting
        local_env = copy.deepcopy(os.environ)
        local_env["LVM_REPORT_FD"] = "32"
        local_env["LVM_COMMAND_PROFILE"] = "lvmdbusd"

        # Disable the abort logic if lvm logs too much, which easily happens
        # when utilizing the lvm shell.
        local_env["LVM_LOG_FILE_MAX_LINES"] = "0"

        # run the lvm shell; it is started through /bin/sh so that the
        # "32>" redirection attaches FD 32 to the FIFO created above.
        self.lvm_shell = subprocess.Popen(
            [LVM_CMD + " 32>%s" % tmp_file],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=local_env,
            stderr=subprocess.PIPE, close_fds=True, shell=True)

        try:
            LVMShellProxy._make_non_block(self.lvm_shell.stdout)
            LVMShellProxy._make_non_block(self.lvm_shell.stderr)

            # wait for the first prompt
            errors = self._read_until_prompt(no_output=True)[2]
            if errors:
                raise RuntimeError(errors)
        finally:
            # These will get deleted when the FD count goes to zero so we
            # can be sure to clean up correctly no matter how we finish
            os.unlink(tmp_file)
            os.rmdir(tmp_dir)

    def get_error_msg(self):
        """Ask the shell for its last log and return the error messages.

        :returns: the ``log_message`` of every ``"error"`` entry joined with
            ``', '``, or a fixed explanatory string when the report has no
            ``"log"`` section.
        """
        # We got an error, lets go fetch the error message
        self._write_cmd('lastlog\n')

        # read everything from the STDOUT to the next prompt
        stdout, report_json, stderr = self._read_until_prompt()

        if 'log' in report_json:
            error_msg = ""
            # Walk the entire log array and build an error string
            for log_entry in report_json['log']:
                if log_entry['log_type'] == "error":
                    if error_msg:
                        error_msg += ', ' + log_entry['log_message']
                    else:
                        error_msg = log_entry['log_message']

            return error_msg

        return 'No error reason provided! (missing "log" section)'

    def call_lvm(self, argv, debug=False):
        """Run one lvm command in the shell and report how it went.

        :param argv: command and arguments as a list of strings.
        :param debug: when true, log the command and result even on success.
        :returns: tuple ``(rc, report_json, error_msg)`` where ``rc`` is 0 on
            success and 1 otherwise.
        :raises Exception: if the underlying lvm shell process has exited.
        """
        rc = 1
        error_msg = ""

        # poll() yields None only while the process is still running; any
        # return code (including 0) means the shell is gone.
        if self.lvm_shell.poll() is not None:
            raise Exception(
                self.lvm_shell.returncode,
                "Underlying lvm shell process is not present!")

        argv = add_no_notify(argv)

        # create the command string
        cmd = " ".join(_quote_arg(arg) for arg in argv)
        cmd += "\n"

        # run the command by writing it to the shell's STDIN
        self._write_cmd(cmd)

        # read everything from the STDOUT to the next prompt
        stdout, report_json, stderr = self._read_until_prompt()

        # Parse the report to see what happened
        if 'log' in report_json:
            log_entries = report_json['log']
            # The last log entry carries the command's return code; an empty
            # log is treated as a failure instead of raising IndexError.
            if log_entries and log_entries[-1]['log_ret_code'] == '1':
                rc = 0
            else:
                error_msg = self.get_error_msg()

        if debug or rc != 0:
            log_error('CMD: %s' % cmd)
            log_error("EC = %d" % rc)
            log_error("ERROR_MSG=\n %s\n" % error_msg)

        return rc, report_json, error_msg

    def exit_shell(self):
        """Send ``exit`` to the shell; failures are logged, not raised."""
        try:
            self._write_cmd('exit\n')
        except Exception as e:
            log_error(str(e))

    def __del__(self):
        """Best-effort termination of the shell process."""
        try:
            self.lvm_shell.terminate()
        except Exception:
            # The process may already be gone, or __init__ may have failed
            # before lvm_shell was assigned; nothing useful to do here.
            pass


if __name__ == "__main__":
    shell = LVMShellProxy()
    in_line = "start"
    try:
        while in_line:
            in_line = input("lvm> ")
            if in_line:
                start = time.time()
                ret, out, err = shell.call_lvm(in_line.split())
                end = time.time()

                print("RC: %d" % ret)
                print("OUT:\n%s" % out)
                print("ERR:\n%s" % err)

                print("Command = %f seconds" % (end - start))
    except KeyboardInterrupt:
        pass
    except EOFError:
        pass
    except Exception:
        traceback.print_exc(file=sys.stdout)