tools/glusterfind: kill remote processes and separate run-time directories
Problem #1: Hitting CTRL+C leaves stale processes on remote nodes if a
glusterfind pre has been initiated.

Solution #1: Add "-t -t" to the ssh command line, which forces a
pseudo-terminal to be allocated for the remote process. When the local
process receives the KeyboardInterrupt, SIGHUP is immediately conveyed
to the remote terminal, causing the remote changelog.py process to
terminate immediately.

Problem #2: Concurrent glusterfind pre runs are not possible on the same
glusterfind session in case of a runaway process.

Solution #2: glusterfind pre runs now add a random directory name to the
working directory to store and manage the temporary database and
changelog processing. If a KeyboardInterrupt is received, the call
run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename) cleans up the
run-specific directory on the remote nodes.

Other fixes:
* Patch 7571380 (cli/xml: Fix wrong XML format in volume get command)
  broke "gluster volume get <vol> changelog.rollover-time --xml"; the
  lookup in utils.py::get_changelog_rollover_time() is now fixed.
* Fixed a spurious trailing space being written when the second path is
  empty in main.py::write_output().
* Fixed repetitive changelog processing in changelog.py::get_changes().

Change-Id: Ia8d96e2cd47bf2a64416bece312e67631a1dbf29
BUG: 1382236
Signed-off-by: Milind Changire <mchangir@redhat.com>
Reviewed-on: http://review.gluster.org/15609
Smoke: Gluster Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Aravinda VK <avishwan@redhat.com>
parent 62238f9fb3
commit feea851fad
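For illustration, a minimal sketch of the "-t -t" behaviour described in Solution #1; the host name and the remote "sleep" command are placeholders, not part of this patch. Forcing pseudo-terminal allocation means that when the local ssh process dies on CTRL+C, the remote pty is hung up and the remote command receives SIGHUP instead of lingering:

# Toy sketch only: "node1.example.com" and "sleep 600" are placeholders.
import subprocess

cmd = ["ssh",
       "-oNumberOfPasswordPrompts=0",
       "-oStrictHostKeyChecking=no",
       "-t", "-t",                      # force pseudo-terminal allocation
       "root@node1.example.com",
       "sleep", "600"]

proc = subprocess.Popen(cmd)
try:
    proc.wait()
except KeyboardInterrupt:
    # Terminating the local ssh hangs up the remote pty, so the remote
    # command gets SIGHUP and exits instead of becoming a stale process.
    proc.terminate()
    proc.wait()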
@@ -284,7 +284,7 @@ def get_changes(brick, hash_dir, log_file, start, end, args):
     # history_getchanges()
     changes = []
     while libgfchangelog.cl_history_scan() > 0:
-        changes += libgfchangelog.cl_history_getchanges()
+        changes = libgfchangelog.cl_history_getchanges()

         for change in changes:
             # Ignore if last processed changelog comes
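A toy illustration of the repetitive-processing fix above, assuming (as a simplification, not the actual libgfchangelog API) that each scan's getchanges() call returns only the newly scanned batch; with "+=" every earlier batch is walked again on each pass of the while loop:

# Stand-in batches; each element represents what one history scan returns.
batches = [["CHANGELOG.1"], ["CHANGELOG.2"], ["CHANGELOG.3"]]

# Old pattern (changes += ...): the list keeps growing, so earlier
# changelogs are processed again on every iteration.
processed_old = []
changes = []
for batch in batches:
    changes += batch
    for change in changes:
        processed_old.append(change)

# New pattern (changes = ...): each batch is processed exactly once.
processed_new = []
for batch in batches:
    changes = batch
    for change in changes:
        processed_new.append(change)

print(len(processed_old))  # 6 -- CHANGELOG.1 and .2 processed repeatedly
print(len(processed_new))  # 3 -- each changelog processed once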
@@ -18,6 +18,9 @@ import xml.etree.cElementTree as etree
 from argparse import ArgumentParser, RawDescriptionHelpFormatter, Action
 import logging
 import shutil
+import tempfile
+import signal
+from datetime import datetime

 from utils import execute, is_host_local, mkdirp, fail
 from utils import setup_logger, human_time, handle_rm_error
@@ -34,6 +37,7 @@ ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
 logger = logging.getLogger()
 node_outfiles = []
 vol_statusStr = ""
+gtmpfilename = None


 class StoreAbsPath(Action):
@@ -71,6 +75,8 @@ def node_cmd(host, host_uuid, task, cmd, args, opts):
         cmd = ["ssh",
                "-oNumberOfPasswordPrompts=0",
                "-oStrictHostKeyChecking=no",
+               "-t",
+               "-t",
                "-i", pem_key_path,
                "root@%s" % host] + cmd

@@ -98,8 +104,13 @@ def run_cmd_nodes(task, args, **kwargs):
         host_uuid = node[0]
         cmd = []
         opts = {}
+
+        # tmpfilename is valid only for tasks: pre, query and cleanup
+        tmpfilename = kwargs.get("tmpfilename", "BADNAME")
+
         node_outfile = os.path.join(conf.get_opt("working_dir"),
                                     args.session, args.volume,
+                                    tmpfilename,
                                     "tmp_output_%s" % num)

         if task == "pre":
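With the run-specific tmpfilename component, every pre/query run gets its own subdirectory under the session working directory, so the temporary databases and node output files of concurrent runs cannot collide. A sketch of the resulting layout; the working directory, session, volume and random token below are made-up values:

import os

# Hypothetical values, for illustration only.
working_dir = "/var/lib/glusterfind"           # placeholder working_dir
session = "sess1"
volume = "gv0"
tmpfilename = "20161007-114530-123456-abc123"  # per-run random directory
num = 0

node_outfile = os.path.join(working_dir, session, volume,
                            tmpfilename, "tmp_output_%s" % num)
print(node_outfile)
# /var/lib/glusterfind/sess1/gv0/20161007-114530-123456-abc123/tmp_output_0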
@@ -117,6 +128,9 @@ def run_cmd_nodes(task, args, **kwargs):
             tag = '""' if not is_host_local(host_uuid) else ""

             node_outfiles.append(node_outfile)
+            # remote file will be copied into this directory
+            mkdirp(os.path.dirname(node_outfile),
+                   exit_on_err=True, logger=logger)

             cmd = [change_detector,
                    args.session,
@@ -144,6 +158,9 @@ def run_cmd_nodes(task, args, **kwargs):
             tag = '""' if not is_host_local(host_uuid) else ""

             node_outfiles.append(node_outfile)
+            # remote file will be copied into this directory
+            mkdirp(os.path.dirname(node_outfile),
+                   exit_on_err=True, logger=logger)

             cmd = [change_detector,
                    args.session,
@@ -162,8 +179,9 @@ def run_cmd_nodes(task, args, **kwargs):
             opts["node_outfile"] = node_outfile
             opts["copy_outfile"] = True
         elif task == "cleanup":
-            # After pre run, cleanup the working directory and other temp files
-            # Remove the copied node_outfile in main node
+            # After pre/query run, cleanup the working directory and other
+            # temp files. Remove the directory to which node_outfile has
+            # been copied in main node
             try:
                 os.remove(node_outfile)
             except (OSError, IOError):
@@ -174,7 +192,9 @@ def run_cmd_nodes(task, args, **kwargs):
             cmd = [conf.get_opt("nodeagent"),
                    "cleanup",
                    args.session,
-                   args.volume] + (["--debug"] if args.debug else [])
+                   args.volume,
+                   os.path.dirname(node_outfile)] + \
+                (["--debug"] if args.debug else [])
         elif task == "create":
             if vol_statusStr != "Started":
                 fail("Volume %s is not online" % args.volume,
@@ -422,8 +442,8 @@ def enable_volume_options(args):
              % args.volume)


-def write_output(args, outfilemerger):
-    with codecs.open(args.outfile, "a", encoding="utf-8") as f:
+def write_output(outfile, outfilemerger):
+    with codecs.open(outfile, "a", encoding="utf-8") as f:
         for row in outfilemerger.get():
             # Multiple paths in case of Hardlinks
             paths = row[1].split(",")
@@ -438,9 +458,10 @@ def write_output(args, outfilemerger):
                 if p_rep == row_2_rep:
                     continue

-                f.write(u"{0} {1} {2}\n".format(row[0],
-                                                p_rep,
-                                                row_2_rep))
+                if row_2_rep and row_2_rep != "":
+                    f.write(u"{0} {1} {2}\n".format(row[0], p_rep, row_2_rep))
+                else:
+                    f.write(u"{0} {1}\n".format(row[0], p_rep))


 def mode_create(session_dir, args):
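The guard above exists because the old unconditional three-field format emitted a trailing space when the second path was empty. A small illustration with made-up row values:

row_0 = "RENAME"
p_rep = "dir1/file1"
row_2_rep = ""  # empty second path

old_line = u"{0} {1} {2}\n".format(row_0, p_rep, row_2_rep)
new_line = u"{0} {1}\n".format(row_0, p_rep)

print(repr(old_line))  # note the trailing space before the newline
print(repr(new_line))  # no trailing space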
@@ -490,6 +511,8 @@ def mode_create(session_dir, args):


 def mode_query(session_dir, args):
+    global gtmpfilename
+
     # Verify volume status
     cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
     _, data, _ = execute(cmd,
@@ -533,7 +556,10 @@ def mode_query(session_dir, args):
                 "Start time: %s"
                 % ("default", args.volume, start))

-    run_cmd_nodes("query", args, start=start)
+    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+    gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+    run_cmd_nodes("query", args, start=start, tmpfilename=gtmpfilename)

     # Merger
     if args.full:
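The generated gtmpfilename is a timestamp prefix plus a random token from tempfile._get_candidate_names() (a private but long-standing stdlib helper), for example:

import tempfile
from datetime import datetime

prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
gtmpfilename = prefix + next(tempfile._get_candidate_names())
print(gtmpfilename)
# e.g. 20161007-114530-123456-6_kp30hs (the random suffix varies per run)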
@@ -545,7 +571,7 @@ def mode_query(session_dir, args):
     # Read each Changelogs db and generate finaldb
     create_file(args.outfile, exit_on_err=True, logger=logger)
     outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
-    write_output(args, outfilemerger)
+    write_output(args.outfile, outfilemerger)

     try:
         os.remove(args.outfile + ".db")
@@ -558,6 +584,8 @@ def mode_query(session_dir, args):


 def mode_pre(session_dir, args):
+    global gtmpfilename
+
     """
     Read from Session file and write to session.pre file
     """
@@ -587,7 +615,10 @@ def mode_pre(session_dir, args):
                 "Start time: %s, End time: %s"
                 % (args.session, args.volume, start, endtime_to_update))

-    run_cmd_nodes("pre", args, start=start)
+    prefix = datetime.now().strftime("%Y%m%d-%H%M%S-%f-")
+    gtmpfilename = prefix + next(tempfile._get_candidate_names())
+
+    run_cmd_nodes("pre", args, start=start, tmpfilename=gtmpfilename)

     # Merger
     if args.full:
@@ -599,8 +630,7 @@ def mode_pre(session_dir, args):
     # Read each Changelogs db and generate finaldb
     create_file(args.outfile, exit_on_err=True, logger=logger)
     outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
-
-    write_output(args, outfilemerger)
+    write_output(args.outfile, outfilemerger)

     try:
         os.remove(args.outfile + ".db")
@@ -713,6 +743,10 @@ def mode_list(session_dir, args):


 def main():
+    global gtmpfilename
+
+    args = None
+
     try:
         args = _get_args()
         mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
@@ -756,5 +790,13 @@ def main():
         # mode_<args.mode> will be the function name to be called
         globals()["mode_" + args.mode](session_dir, args)
     except KeyboardInterrupt:
+        if args is not None:
+            if args.mode == "pre" or args.mode == "query":
+                # cleanup session
+                if gtmpfilename is not None:
+                    # no more interrupts until we clean up
+                    signal.signal(signal.SIGINT, signal.SIG_IGN)
+                    run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename)
+
         # Interrupted, exit with non zero error code
         sys.exit(2)
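The handler above masks SIGINT before triggering the remote cleanup so that a second CTRL+C cannot abort the cleanup itself. The same pattern in isolation; cleanup() is a placeholder for run_cmd_nodes("cleanup", ...):

import signal
import sys
import time


def cleanup():
    # Placeholder for run_cmd_nodes("cleanup", args, tmpfilename=gtmpfilename).
    time.sleep(2)


try:
    time.sleep(60)  # stands in for the long-running pre/query work
except KeyboardInterrupt:
    # No more interrupts until the cleanup finishes.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    cleanup()
    sys.exit(2)     # interrupted: exit with a non-zero code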
|
@ -26,7 +26,8 @@ logger = logging.getLogger()
|
||||
def mode_cleanup(args):
|
||||
working_dir = os.path.join(conf.get_opt("working_dir"),
|
||||
args.session,
|
||||
args.volume)
|
||||
args.volume,
|
||||
args.tmpfilename)
|
||||
|
||||
mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
|
||||
exit_on_err=True)
|
||||
@@ -98,6 +99,7 @@ def _get_args():
     parser_cleanup = subparsers.add_parser('cleanup')
     parser_cleanup.add_argument("session", help="Session Name")
     parser_cleanup.add_argument("volume", help="Volume Name")
+    parser_cleanup.add_argument("tmpfilename", help="Temporary File Name")
     parser_cleanup.add_argument("--debug", help="Debug", action="store_true")

     parser_session_create = subparsers.add_parser('create')
@@ -227,7 +227,7 @@ def get_changelog_rollover_time(volumename):

     try:
         tree = etree.fromstring(out)
-        return int(tree.find('volGetopts/Value').text)
+        return int(tree.find('volGetopts/Opt/Value').text)
     except ParseError:
         return DEFAULT_CHANGELOG_INTERVAL

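A sketch of the XPath change above, using a hand-written XML snippet that assumes the post-7571380 "gluster volume get <vol> changelog.rollover-time --xml" output nests each option inside an <Opt> element; the surrounding elements are illustrative, not captured from a live cluster:

import xml.etree.ElementTree as etree

# Hypothetical --xml output; only the volGetopts/Opt/Value nesting matters
# for the fixed lookup.
out = """<cliOutput>
  <volGetopts>
    <Opt>
      <Option>changelog.rollover-time</Option>
      <Value>15</Value>
    </Opt>
  </volGetopts>
</cliOutput>"""

tree = etree.fromstring(out)
print(tree.find('volGetopts/Value'))                 # None with this nesting
print(int(tree.find('volGetopts/Opt/Value').text))   # 15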