geo-rep: Refactoring Config and Arguments parsing

- Fixed Python pep8 issues
- Removed dead code
- Rewritten configuration management
- Rewritten Arguments/subcommands handling
- Added Args upgrade to accommodate all these changes without changing
  glusterd code
- Removed use of md5, which was used to hash the brick path for the workdir

Both Master and Slave nodes will have a subdirectory per session, in
the format "<mastervol>_<primary_slave_host>_<slavevol>":

  $GLUSTER_LOGDIR/geo-replication/<mastervol>_<primary_slave_host>_<slavevol>
  $GLUSTER_LOGDIR/geo-replication-slaves/<mastervol>_<primary_slave_host>_<slavevol>
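
For example, with master volume "gv1" and slave "f241::gv2" (the
sample names used in comments elsewhere in this patch) and the
default logdir /var/log/glusterfs, the session directories become:

  /var/log/glusterfs/geo-replication/gv1_f241_gv2
  /var/log/glusterfs/geo-replication-slaves/gv1_f241_gv2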

Log file paths are renamed, since the session info is now available in
the directory name itself.

  $LOG_DIR_MASTER/
      - gsyncd.log - Gsyncd, Worker monitor logs
      - mnt-<brick-path>.log - Aux mount logs, mounted by each worker
- changes-<brick-path>.log - Changelog related logs (one per brick)

  $LOG_DIR_SLAVE/
      - gsyncd.log - Slave Gsyncd logs
      - mnt-<master-node>-<master-brick-path>.log - Aux mount logs,
        mounted for each connection from master-node:master-brick
      - mnt-mbr-<master-node>-<master-brick-path>.log - Same as above,
        but mountbroker setup

Fixes: #73
Change-Id: I2ec2a21e4e2a92fd92899d026e8543725276f021
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Aravinda VK 2017-06-21 12:56:14 +05:30 committed by Amar Tumballi
parent da825da950
commit 705ec05504
20 changed files with 2621 additions and 2324 deletions

.gitignore

@ -64,6 +64,7 @@ extras/systemd/glusterd.service
extras/systemd/glusterfssharedstorage.service
extras/who-wrote-glusterfs/gitdm
geo-replication/.tox
geo-replication/gsyncd.conf
geo-replication/src/gsyncd
geo-replication/src/peer_gsec_create
geo-replication/src/peer_mountbroker


@ -45,6 +45,7 @@ AC_CONFIG_FILES([Makefile
geo-replication/src/peer_georep-sshkey.py
extras/peer_add_secret_pub
geo-replication/syncdaemon/conf.py
geo-replication/gsyncd.conf
extras/snap_scheduler/conf.py
glusterfsd/Makefile
glusterfsd/src/Makefile


@ -1,3 +1,8 @@
SUBDIRS = syncdaemon src
CLEANFILES =
EXTRA_DIST = gsyncd.conf.in
gsyncdconfdir = $(sysconfdir)/glusterfs/
gsyncdconf_DATA = gsyncd.conf


@ -0,0 +1,301 @@
[__meta__]
version = 4.0
[glusterd-workdir]
value = @GLUSTERD_WORKDIR@
[gluster-logdir]
value = /var/log/glusterfs
[gluster-rundir]
value = /var/run/gluster
[gsyncd-miscdir]
value = /var/lib/misc/gluster/gsyncd
[stime-xattr-prefix]
value=
[checkpoint]
value=0
help=Set Checkpoint
validation=unixtime
type=int
[gluster-cli-options]
value=
help=Gluster CLI Options
[pid-file]
value=${gluster_rundir}/gsyncd-${master}-${primary_slave_host}-${slavevol}.pid
configurable=false
template = true
help=PID file path
[state-file]
value=${glusterd_workdir}/geo-replication/${master}_${primary_slave_host}_${slavevol}/monitor.status
configurable=false
template=true
help=Status File path
[georep-session-working-dir]
value=${glusterd_workdir}/geo-replication/${master}_${primary_slave_host}_${slavevol}/
template=true
help=Session Working directory
configurable=false
[access-mount]
value=false
help=Do not unmount the Aux mounts when stopped or crash
[isolated-slaves]
value=
help=List of Slave nodes which are isolated
[changelog-batch-size]
# Max size of Changelogs to process per batch. Changelog processing is
# not limited by the number of changelogs, but by the total size of the
# changelog files. One sample changelog file size was 145408 bytes,
# with ~1000 CREATE and ~1000 DATA entries; 5 such files in one batch
# is 727040 bytes.
# If a geo-rep worker crashes while processing a batch, it has to retry
# only that batch, since stime gets updated after each batch.
value=727040
help=Max size of Changelogs to process per batch.
type=int
[slave-timeout]
value=120
type=int
help=Timeout in seconds for Slave Gsyncd. If no activity from master for this timeout, Slave gsyncd will be disconnected. Set Timeout to zero to skip this check.
[connection-timeout]
value=60
type=int
help=Timeout for mounts
[replica-failover-interval]
value=1
type=int
help=Minimum time interval in seconds for passive worker to become Active
[changelog-archive-format]
value=%Y%m
help=Processed changelogs will be archived in the working directory. Pattern for the archive file name.
[use-meta-volume]
value=false
type=bool
help=Use the meta volume to decide the Active/Passive state of workers.
[meta-volume-mnt]
value=/var/run/gluster/shared_storage
help=Meta Volume or Shared Volume mount path
[allow-network]
value=
[change-interval]
value=5
type=int
[use-tarssh]
value=false
type=bool
help=Use sync-mode as tarssh
[remote-gsyncd]
value =
help=If SSH keys are not secured with the gsyncd prefix then use this configuration to set the actual path of gsyncd (usually /usr/libexec/glusterfs/gsyncd)
[gluster-command-dir]
value=/usr/local/sbin/
help=Directory where Gluster binary exists
[gluster-params]
value = aux-gfid-mount acl
help=Parameters for Gluster Geo-rep mount in Master
[slave-gluster-params]
value = aux-gfid-mount acl
help=Parameters for Gluster Geo-rep mount in Slave
[ignore-deletes]
value = false
type=bool
help=Do not sync deletes in Slave
[special-sync-mode]
# tunables for failover/failback mechanism:
# None - gsyncd behaves as normal
# blind - gsyncd works with xtime pairs to identify
# candidates for synchronization
# wrapup - same as normal mode but does not assign
# xtimes to orphaned files
# see crawl() for usage of the above tunables
value =
help=
[working-dir]
value = ${gsyncd_miscdir}/${master}_${primary_slave_host}_${slavevol}/
template=true
configurable=false
help=Working directory for storing Changelogs
[change-detector]
value=changelog
help=Change detector
validation=choice
allowed_values=changelog,xsync
[cli-log-file]
value=${gluster_logdir}/geo-replication/cli.log
template=true
configurable=false
[cli-log-level]
value=INFO
help=Set CLI Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[log-file]
value=${gluster_logdir}/geo-replication/${master}_${primary_slave_host}_${slavevol}/gsyncd.log
configurable=false
template=true
[changelog-log-file]
value=${gluster_logdir}/geo-replication/${master}_${primary_slave_host}_${slavevol}/changes-${local_id}.log
configurable=false
template=true
[gluster-log-file]
value=${gluster_logdir}/geo-replication/${master}_${primary_slave_host}_${slavevol}/mnt-${local_id}.log
template=true
configurable=false
[slave-log-file]
value=${gluster_logdir}/geo-replication-slaves/${master}_${primary_slave_host}_${slavevol}/gsyncd.log
template=true
configurable=false
[slave-gluster-log-file]
value=${gluster_logdir}/geo-replication-slaves/${master}_${primary_slave_host}_${slavevol}/mnt-${master_node}-${master_brick_id}.log
template=true
configurable=false
[slave-gluster-log-file-mbr]
value=${gluster_logdir}/geo-replication-slaves/${master}_${primary_slave_host}_${slavevol}/mnt-mbr-${master_node}-${master_brick_id}.log
template=true
configurable=false
[log-level]
value=INFO
help=Set Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[gluster-log-level]
value=INFO
help=Set Gluster mount Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[changelog-log-level]
value=INFO
help=Set Changelog Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[slave-log-level]
value=INFO
help=Set Slave Gsyncd Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[slave-gluster-log-level]
value=INFO
help=Set Slave Gluster mount Log Level
validation=choice
allowed_values=ERROR,INFO,WARNING,DEBUG
[ssh-port]
value=22
validation=int
help=Set SSH port
[ssh-command]
value=ssh
help=Set ssh binary path
validation=execpath
[tar-command]
value=tar
help=Set tar command path
validation=execpath
[ssh-options]
value = -oPasswordAuthentication=no -oStrictHostKeyChecking=no -i ${glusterd_workdir}/geo-replication/secret.pem
template=true
[ssh-options-tar]
value = -oPasswordAuthentication=no -oStrictHostKeyChecking=no -i ${glusterd_workdir}/geo-replication/tar_ssh.pem
template=true
[gluster-command]
value=gluster
help=Set gluster binary path
validation=execpath
[sync-jobs]
value=3
help=Number of Syncer jobs
validation=minmax
min=1
max=100
type=int
[rsync-command]
value=rsync
help=Set rsync command path
validation=execpath
[rsync-options]
value=
[rsync-ssh-options]
value=
[rsync-opt-ignore-missing-args]
value=
[rsync-opt-existing]
value=
[log-rsync-performance]
value=false
help=Log Rsync performance
validation=bool
type=bool
[use-rsync-xattrs]
value=false
type=bool
[sync-xattrs]
value=true
type=bool
[sync-acls]
value=true
type=bool
[max-rsync-retries]
value=10
type=int
[state_socket_unencoded]
# Unused, kept for backward compatibility
value=
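
Per-session overrides are written to a "[vars]" section in the
session's own gsyncd.conf (see the config-set handling below); a
sketch with made-up values:

  [vars]
  log-level = DEBUG
  sync-jobs = 6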


@ -1,8 +1,8 @@
syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \
$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
gsyncdstatus.py conf.py
gsyncdstatus.py conf.py logutils.py subcmds.py argsupgrade.py
CLEANFILES =


@ -0,0 +1,335 @@
# Converts old style args into new style args
import sys
from argparse import ArgumentParser
import socket
import os
from syncdutils import GsyncdError
from conf import GLUSTERD_WORKDIR
def gethostbyname(hnam):
"""gethostbyname wrapper"""
try:
return socket.gethostbyname(hnam)
except socket.gaierror:
ex = sys.exc_info()[1]
raise GsyncdError("failed to resolve %s: %s" %
(hnam, ex.strerror))
def slave_url(urldata):
urldata = urldata.replace("ssh://", "")
host, vol = urldata.split("::")
vol = vol.split(":")[0]
return "%s::%s" % (host, vol)
def init_gsyncd_template_conf():
path = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf"
dname = os.path.dirname(path)
if not os.path.exists(dname):
try:
os.mkdir(dname)
except OSError:
pass
if not os.path.exists(path):
fd = os.open(path, os.O_CREAT | os.O_RDWR)
os.close(fd)
def init_gsyncd_session_conf(master, slave):
slave = slave_url(slave)
master = master.strip(":")
slavehost, slavevol = slave.split("::")
slavehost = slavehost.split("@")[-1]
# Session Config File
path = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
GLUSTERD_WORKDIR, master, slavehost, slavevol)
if os.path.exists(os.path.dirname(path)) and not os.path.exists(path):
fd = os.open(path, os.O_CREAT | os.O_RDWR)
os.close(fd)
def init_gsyncd_conf(path):
dname = os.path.dirname(path)
if not os.path.exists(dname):
try:
os.mkdir(dname)
except OSError:
pass
if os.path.exists(dname) and not os.path.exists(path):
fd = os.open(path, os.O_CREAT | os.O_RDWR)
os.close(fd)
def upgrade():
# Create a dummy (empty) template conf, a hack to avoid glusterd
# failing when it stats the file to check for its existence.
init_gsyncd_template_conf()
if "--monitor" in sys.argv:
# python gsyncd.py --path=/bricks/b1
# --monitor -c gsyncd.conf
# --iprefix=/var :gv1
# --glusterd-uuid=f26ac7a8-eb1b-4ea7-959c-80b27d3e43d0
# f241::gv2
p = ArgumentParser()
p.add_argument("master")
p.add_argument("slave")
p.add_argument("--glusterd-uuid")
p.add_argument("-c")
p.add_argument("--iprefix")
p.add_argument("--path", action="append")
pargs = p.parse_known_args(sys.argv[1:])[0]
# Overwrite the sys.argv after rearrange
init_gsyncd_session_conf(pargs.master, pargs.slave)
sys.argv = [
sys.argv[0],
"monitor",
pargs.master.strip(":"),
slave_url(pargs.slave),
"--local-node-id",
pargs.glusterd_uuid
]
elif "--status-get" in sys.argv:
# -c gsyncd.conf --iprefix=/var :gv1 f241::gv2
# --status-get --path /bricks/b1
p = ArgumentParser()
p.add_argument("master")
p.add_argument("slave")
p.add_argument("-c")
p.add_argument("--path")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
sys.argv = [
sys.argv[0],
"status",
pargs.master.strip(":"),
slave_url(pargs.slave),
"--local-path",
pargs.path
]
elif "--canonicalize-url" in sys.argv:
# This accepts multiple URLs and converts each URL to the
# format ssh://USER@IP:gluster://127.0.0.1:VOLUME
# This format is not used in gsyncd, but is added for glusterd
# compatibility
p = ArgumentParser()
p.add_argument("--canonicalize-url", nargs="+")
pargs = p.parse_known_args(sys.argv[1:])[0]
for url in pargs.canonicalize_url:
host, vol = url.split("::")
host = host.replace("ssh://", "")
remote_addr = host
if "@" not in remote_addr:
remote_addr = "root@" + remote_addr
user, hname = remote_addr.split("@")
print("ssh://%s@%s:gluster://127.0.0.1:%s" % (
user, gethostbyname(hname), vol))
sys.exit(0)
elif "--normalize-url" in sys.argv:
# Adds the scheme prefix ssh://
# This format is not used in gsyncd, but is added for glusterd
# compatibility
p = ArgumentParser()
p.add_argument("--normalize-url")
pargs = p.parse_known_args(sys.argv[1:])[0]
print("ssh://%s" % slave_url(pargs.normalize_url))
sys.exit(0)
elif "--config-get-all" in sys.argv:
# -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get-all
p = ArgumentParser()
p.add_argument("master")
p.add_argument("slave")
p.add_argument("-c")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
sys.argv = [
sys.argv[0],
"config-get",
pargs.master.strip(":"),
slave_url(pargs.slave),
"--show-defaults",
"--use-underscore"
]
elif "--verify" in sys.argv and "spawning" in sys.argv:
# Just checks whether gsyncd can be spawned or not
sys.exit(0)
elif "--slavevoluuid-get" in sys.argv:
# --slavevoluuid-get f241::gv2
p = ArgumentParser()
p.add_argument("--slavevoluuid-get")
p.add_argument("-c")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
host, vol = pargs.slavevoluuid_get.split("::")
# Modified sys.argv
sys.argv = [
sys.argv[0],
"voluuidget",
host,
vol
]
elif "--config-set-rx" in sys.argv:
# Not required since the default conf is not generated
# and the custom conf is generated only when required
# -c gsyncd.conf --config-set-rx remote-gsyncd
# /usr/local/libexec/glusterfs/gsyncd . .
# Touch the gsyncd.conf file and create session
# directory if required
p = ArgumentParser()
p.add_argument("-c", dest="config_file")
pargs = p.parse_known_args(sys.argv[1:])[0]
# If it is not the template conf then glusterd is trying to create
# the session config; create an empty file instead
if pargs.config_file.endswith("gsyncd.conf"):
init_gsyncd_conf(pargs.config_file)
sys.exit(0)
elif "--create" in sys.argv:
# To update monitor status file
# --create Created -c gsyncd.conf
# --iprefix=/var :gv1 f241::gv2
p = ArgumentParser()
p.add_argument("--create")
p.add_argument("master")
p.add_argument("slave")
p.add_argument("-c")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
# Modified sys.argv
sys.argv = [
sys.argv[0],
"monitor-status",
pargs.master.strip(":"),
slave_url(pargs.slave),
pargs.create
]
elif "--config-get" in sys.argv:
# -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get pid-file
p = ArgumentParser()
p.add_argument("--config-get")
p.add_argument("master")
p.add_argument("slave")
p.add_argument("-c")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
# Modified sys.argv
sys.argv = [
sys.argv[0],
"config-get",
pargs.master.strip(":"),
slave_url(pargs.slave),
"--only-value",
"--show-defaults",
"--name",
pargs.config_get.replace("_", "-")
]
elif "--config-set" in sys.argv:
# ignore session-owner
if "session-owner" in sys.argv:
sys.exit(0)
# --path=/bricks/b1 -c gsyncd.conf :gv1 f241::gv2
# --config-set log_level DEBUG
p = ArgumentParser()
p.add_argument("master")
p.add_argument("slave")
p.add_argument("--config-set", nargs=2)
p.add_argument("-c")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
# Modified sys.argv
sys.argv = [
sys.argv[0],
"config-set",
pargs.master.strip(":"),
slave_url(pargs.slave),
pargs.config_set[0],
pargs.config_set[1]
]
elif "--config-check" in sys.argv:
# --config-check georep_session_working_dir
p = ArgumentParser()
p.add_argument("--config-check")
p.add_argument("-c")
pargs = p.parse_known_args(sys.argv[1:])[0]
# Modified sys.argv
sys.argv = [
sys.argv[0],
"config-check",
pargs.config_check.replace("_", "-")
]
elif "--config-del" in sys.argv:
# -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-del log_level
p = ArgumentParser()
p.add_argument("--config-del")
p.add_argument("master")
p.add_argument("slave")
p.add_argument("-c")
p.add_argument("--iprefix")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
# Modified sys.argv
sys.argv = [
sys.argv[0],
"config-reset",
pargs.master.strip(":"),
slave_url(pargs.slave),
pargs.config_del.replace("_", "-")
]
elif "--delete" in sys.argv:
# --delete -c gsyncd.conf --iprefix=/var
# --path-list=--path=/bricks/b1 :gv1 f241::gv2
p = ArgumentParser()
p.add_argument("--reset-sync-time", action="store_true")
p.add_argument("--path-list")
p.add_argument("master")
p.add_argument("slave")
p.add_argument("--iprefix")
p.add_argument("-c")
pargs = p.parse_known_args(sys.argv[1:])[0]
init_gsyncd_session_conf(pargs.master, pargs.slave)
paths = pargs.path_list.split("--path=")
paths = ["--path=%s" % x.strip() for x in paths if x.strip() != ""]
# Modified sys.argv
sys.argv = [
sys.argv[0],
"delete",
pargs.master.strip(":"),
slave_url(pargs.slave)
]
sys.argv += paths
if pargs.reset_sync_time:
sys.argv.append("--reset-sync-time")
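
As an illustration (not part of the patch), here is the rewrite
upgrade() performs for the "--monitor" invocation, using the
hypothetical names from the comments above:

  # Old style argv, as passed by an unmodified glusterd:
  #   gsyncd.py --path=/bricks/b1 --monitor -c gsyncd.conf \
  #       --iprefix=/var :gv1 \
  #       --glusterd-uuid=f26ac7a8-eb1b-4ea7-959c-80b27d3e43d0 f241::gv2
  # After upgrade(), sys.argv becomes:
  #   gsyncd.py monitor gv1 f241::gv2 \
  #       --local-node-id f26ac7a8-eb1b-4ea7-959c-80b27d3e43d0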

View File

@ -9,7 +9,6 @@
# cases as published by the Free Software Foundation.
#
import os
import logging
import syncdutils
from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION


@ -13,3 +13,5 @@ GLUSTERD_WORKDIR = "@GLUSTERD_WORKDIR@"
LOCALSTATEDIR = "@localstatedir@"
UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
GLUSTERFS_CONFDIR = "@SYSCONF_DIR@/glusterfs"
GCONF_VERSION = 4.0


@ -1,434 +0,0 @@
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
try:
import ConfigParser
except ImportError:
# py 3
import configparser as ConfigParser
import re
from string import Template
import os
import errno
import sys
from stat import ST_DEV, ST_INO, ST_MTIME
import tempfile
import shutil
from syncdutils import escape, unescape, norm, update_file, GsyncdError
from conf import GLUSTERD_WORKDIR, LOCALSTATEDIR
SECT_ORD = '__section_order__'
SECT_META = '__meta__'
config_version = 2.0
re_type = type(re.compile(''))
TMPL_CONFIG_FILE = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf"
# (SECTION, OPTION, OLD VALUE, NEW VALUE)
CONFIGS = (
("peersrx . .",
"georep_session_working_dir",
"",
GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_"
"${slavevol}/"),
("peersrx .",
"gluster_params",
"aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true",
"aux-gfid-mount"),
("peersrx .",
"gluster_params",
"aux-gfid-mount",
"aux-gfid-mount acl"),
("peersrx . .",
"ssh_command_tar",
"",
"ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no "
"-i " + GLUSTERD_WORKDIR + "/geo-replication/tar_ssh.pem"),
("peersrx . .",
"changelog_log_file",
"",
"${iprefix}/log/glusterfs/geo-replication/${mastervol}"
"/${eSlave}${local_id}-changes.log"),
("peersrx . .",
"working_dir",
LOCALSTATEDIR + "/run/gluster/${mastervol}/${eSlave}",
"${iprefix}/lib/misc/glusterfsd/${mastervol}/${eSlave}"),
("peersrx . .",
"ignore_deletes",
"true",
"false"),
("peersrx . .",
"pid-file",
GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_"
"${slavevol}/${eSlave}.pid",
GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_"
"${slavevol}/monitor.pid"),
("peersrx . .",
"state-file",
GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_"
"${slavevol}/${eSlave}.status",
GLUSTERD_WORKDIR + "/geo-replication/${mastervol}_${remotehost}_"
"${slavevol}/monitor.status"),
("peersrx .",
"log_file",
"${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${eSlave}.log",
"${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${local_node}${local_id}.${slavevol}.log"),
("peersrx .",
"log_file_mbr",
"${iprefix}/log/glusterfs/geo-replication-slaves/mbr/${session_owner}:${eSlave}.log",
"${iprefix}/log/glusterfs/geo-replication-slaves/mbr/${session_owner}:${local_node}${local_id}.${slavevol}.log"),
("peersrx .",
"gluster_log_file",
"${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${eSlave}.gluster.log",
"${iprefix}/log/glusterfs/geo-replication-slaves/${session_owner}:${local_node}${local_id}.${slavevol}.gluster.log")
)
def upgrade_config_file(path, confdata):
config_change = False
config = ConfigParser.RawConfigParser()
# If confdata.rx present then glusterd is adding config values,
# it will create config file if not exists. config.read is fine in
# this case since any other error will be raised during write.
if getattr(confdata, "rx", False):
config.read(path)
else:
with open(path) as fp:
config.readfp(fp)
for sec, opt, oldval, newval in CONFIGS:
try:
val = config.get(sec, opt)
except ConfigParser.NoOptionError:
# the new config opt does not exist yet
config_change = True
config.set(sec, opt, newval)
continue
except ConfigParser.Error:
"""
When gsyncd invoked at the time of create, config file
will not be their. Ignore any ConfigParser errors
"""
continue
if val == newval:
# value is same as new val
continue
if val == oldval:
# config value needs update
config_change = True
config.set(sec, opt, newval)
# To convert from old peers section format to new peers section format.
# Old format: peers gluster://<master ip>:<master vol> \
# ssh://root@<slave ip>:gluster://<master ip>:<slave vol>
# New format: peers <master vol name> <slave vol name>
for old_sect in config.sections():
if old_sect.startswith("peers "):
peers_data = old_sect.split(" ")
mvol = peers_data[1].split("%3A")[-1]
svol = peers_data[2].split("%3A")[-1]
new_sect = "peers {0} {1}".format(mvol, svol)
if old_sect == new_sect:
# Already in new format "peers mastervol slavevol"
continue
# Create new section if not exists
try:
config.add_section(new_sect)
except ConfigParser.DuplicateSectionError:
pass
config_change = True
# Add all the items of old_sect to new_sect
for key, val in config.items(old_sect):
config.set(new_sect, key, val)
# Delete old section
config.remove_section(old_sect)
if config_change:
tempConfigFile = tempfile.NamedTemporaryFile(mode="wb", delete=False)
with open(tempConfigFile.name, 'wb') as configFile:
config.write(configFile)
# If src and dst are on two different file systems, then os.rename
# fails. In that case, if the temp file is created in /tmp and /tmp is
# a separate fs, os.rename gives the following error, so use shutil:
# OSError: [Errno 18] Invalid cross-device link
# mail.python.org/pipermail/python-list/2005-February/342893.html
shutil.move(tempConfigFile.name, path)
class MultiDict(object):
"""a virtual dict-like class which functions as the union
of underlying dicts"""
def __init__(self, *dd):
self.dicts = dd
def __getitem__(self, key):
val = None
for d in self.dicts:
if d.get(key) is not None:
val = d[key]
if val is None:
raise KeyError(key)
return val
class GConffile(object):
"""A high-level interface to ConfigParser which flattens the two-tiered
config layout by implementing automatic section dispatch based on initial
parameters.
Also ensure section ordering in terms of their time of addition -- a compat
hack for Python < 2.7.
"""
def _normconfig(self):
"""normalize config keys by s/-/_/g"""
for n, s in self.config._sections.items():
if n.find('__') == 0:
continue
s2 = type(s)()
for k, v in s.items():
if k.find('__') != 0:
k = norm(k)
s2[k] = v
self.config._sections[n] = s2
def __init__(self, path, peers, confdata, *dd):
"""
- .path: location of config file
- .config: underlying ConfigParser instance
- .peers: on behalf of whom we flatten .config
(master, or master-slave url pair)
- .auxdicts: template substituents
"""
self.peers = peers
self.path = path
self.auxdicts = dd
self.config = ConfigParser.RawConfigParser()
if getattr(confdata, "rx", False):
self.config.read(path)
else:
with open(path) as fp:
self.config.readfp(fp)
self.dev, self.ino, self.mtime = -1, -1, -1
self._normconfig()
def _load(self):
try:
sres = os.stat(self.path)
self.dev = sres[ST_DEV]
self.ino = sres[ST_INO]
self.mtime = sres[ST_MTIME]
except (OSError, IOError):
if sys.exc_info()[1].errno == errno.ENOENT:
sres = None
self.config = ConfigParser.RawConfigParser()
with open(self.path) as fp:
self.config.readfp(fp)
self._normconfig()
def get_realtime(self, opt, default_value=None):
try:
sres = os.stat(self.path)
except (OSError, IOError):
if sys.exc_info()[1].errno == errno.ENOENT:
sres = None
else:
raise
# compare file system stat with that of our stream file handle
if not sres or sres[ST_DEV] != self.dev or \
sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]:
self._load()
return self.get(opt, printValue=False, default_value=default_value)
def section(self, rx=False):
"""get the section name of the section representing .peers
in .config"""
peers = self.peers
if not peers:
peers = ['.', '.']
rx = True
if rx:
return ' '.join(['peersrx'] + [escape(u) for u in peers])
else:
return ' '.join(['peers'] + [u.split(':')[-1] for u in peers])
@staticmethod
def parse_section(section):
"""retrieve peers sequence encoded by section name
(as urls or regexen, depending on section type)
"""
sl = section.split()
st = sl.pop(0)
sl = [unescape(u) for u in sl]
if st == 'peersrx':
sl = [re.compile(u) for u in sl]
return sl
def ord_sections(self):
"""Return an ordered list of sections.
Ordering happens based on the auxiliary
SECT_ORD section storing indices for each
section added through the config API.
To avoid corruption in case of manually
written config files, we take care to also
append those sections which are not registered
in SECT_ORD.
Needed for python 2.{4,5,6} where ConfigParser
cannot yet order sections/options internally.
"""
so = {}
if self.config.has_section(SECT_ORD):
so = self.config._sections[SECT_ORD]
so2 = {}
for k, v in so.items():
if k != '__name__':
so2[k] = int(v)
tv = 0
if so2:
tv = max(so2.values()) + 1
ss = [s for s in self.config.sections() if s.find('__') != 0]
for s in ss:
if s in so.keys():
continue
so2[s] = tv
tv += 1
def scmp(x, y):
return cmp(*(so2[s] for s in (x, y)))
ss.sort(scmp)
return ss
def update_to(self, dct, allow_unresolved=False):
"""update @dct from key/values of ours.
key/values are collected from .config by filtering the regexp sections
according to match, and from .section. The values are treated as
templates, which are substituted from .auxdicts and (in case of regexp
sections) match groups.
"""
if not self.peers:
raise GsyncdError('no peers given, cannot select matching options')
def update_from_sect(sect, mud):
for k, v in self.config._sections[sect].items():
# Template expects String to be passed
# if any config value is not string then it
# fails with ValueError
v = "{0}".format(v)
if k == '__name__':
continue
if allow_unresolved:
dct[k] = Template(v).safe_substitute(mud)
else:
dct[k] = Template(v).substitute(mud)
for sect in self.ord_sections():
sp = self.parse_section(sect)
if isinstance(sp[0], re_type) and len(sp) == len(self.peers):
match = True
mad = {}
for i in range(len(sp)):
m = sp[i].search(self.peers[i])
if not m:
match = False
break
for j in range(len(m.groups())):
mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j]
if match:
update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts))
if self.config.has_section(self.section()):
update_from_sect(self.section(), MultiDict(dct, *self.auxdicts))
def get(self, opt=None, printValue=True, default_value=None):
"""print the matching key/value pairs from .config,
or if @opt given, the value for @opt (according to the
logic described in .update_to)
"""
d = {}
self.update_to(d, allow_unresolved=True)
if opt:
opt = norm(opt)
v = d.get(opt, default_value)
if printValue:
if v is not None:
print(v)
else:
return v
else:
for k, v in d.iteritems():
if k == '__name__':
continue
print("%s: %s" % (k, v))
def write(self, trfn, opt, *a, **kw):
"""update on-disk config transactionally
@trfn is the transaction function
"""
def mergeconf(f):
self.config = ConfigParser.RawConfigParser()
self.config.readfp(f)
self._normconfig()
if not self.config.has_section(SECT_META):
self.config.add_section(SECT_META)
self.config.set(SECT_META, 'version', config_version)
return trfn(norm(opt), *a, **kw)
def updateconf(f):
self.config.write(f)
update_file(self.path, updateconf, mergeconf)
def _set(self, opt, val, rx=False):
"""set @opt to @val in .section"""
sect = self.section(rx)
if not self.config.has_section(sect):
self.config.add_section(sect)
# regarding SECT_ORD, cf. ord_sections
if not self.config.has_section(SECT_ORD):
self.config.add_section(SECT_ORD)
self.config.set(
SECT_ORD, sect, len(self.config._sections[SECT_ORD]))
self.config.set(sect, opt, val)
return True
def set(self, opt, *a, **kw):
"""perform ._set transactionally"""
self.write(self._set, opt, *a, **kw)
def _delete(self, opt, rx=False):
"""delete @opt from .section"""
sect = self.section(rx)
if self.config.has_section(sect):
return self.config.remove_option(sect, opt)
def delete(self, opt, *a, **kw):
"""perform ._delete transactionally"""
self.write(self._delete, opt, *a, **kw)

(File diff suppressed because it is too large.)


@ -0,0 +1,365 @@
from ConfigParser import ConfigParser, NoSectionError
import os
from string import Template
from datetime import datetime
# Global object which can be used in other modules
# once load() is called
_gconf = {}
class GconfNotConfigurable(Exception):
pass
class GconfInvalidValue(Exception):
pass
class Gconf(object):
def __init__(self, default_conf_file, custom_conf_file=None,
args={}, extra_tmpl_args={}):
self.default_conf_file = default_conf_file
self.custom_conf_file = custom_conf_file
self.tmp_conf_file = None
self.gconf = {}
self.gconfdata = {}
self.gconf_typecast = {}
self.template_conf = []
self.non_configurable_configs = []
self.prev_mtime = 0
if custom_conf_file is not None:
self.tmp_conf_file = custom_conf_file + ".tmp"
self.session_conf_items = []
self.args = args
self.extra_tmpl_args = extra_tmpl_args
self._load()
def _tmpl_substitute(self):
tmpl_values = {}
for k, v in self.gconf.items():
tmpl_values[k.replace("-", "_")] = v
# override the config file values with the ones the user passed
for k, v in self.args.items():
# override the existing value only if set by user
if v is not None:
tmpl_values[k] = v
for k, v in self.extra_tmpl_args.items():
tmpl_values[k] = v
for k, v in self.gconf.items():
if k in self.template_conf and \
(isinstance(v, str) or isinstance(v, unicode)):
self.gconf[k] = Template(v).safe_substitute(tmpl_values)
def _do_typecast(self):
for k, v in self.gconf.items():
cast_func = globals().get(
"to_" + self.gconf_typecast.get(k, "string"), None)
if cast_func is not None:
self.gconf[k] = cast_func(v)
def reset(self, name):
# If the custom conf file is not set then the configs are read-only
if self.custom_conf_file is None:
raise GconfNotConfigurable()
# If a config can not be modified
if name != "all" and not self._is_configurable(name):
raise GconfNotConfigurable()
cnf = ConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
# Nothing to Reset, Not configured
if name != "all":
if not cnf.has_option("vars", name):
return True
# Remove option from custom conf file
cnf.remove_option("vars", name)
else:
# Remove and add empty section, do not disturb if config file
# already has any other section
try:
cnf.remove_section("vars")
except NoSectionError:
pass
cnf.add_section("vars")
with open(self.tmp_conf_file, "w") as fw:
cnf.write(fw)
os.rename(self.tmp_conf_file, self.custom_conf_file)
self.reload()
return True
def set(self, name, value):
if self.custom_conf_file is None:
raise GconfNotConfigurable()
if not self._is_configurable(name):
raise GconfNotConfigurable()
if not self._is_valid_value(name, value):
raise GconfInvalidValue()
curr_val = self.gconf.get(name, None)
if curr_val == value:
return True
cnf = ConfigParser()
with open(self.custom_conf_file) as f:
cnf.readfp(f)
if not cnf.has_section("vars"):
cnf.add_section("vars")
cnf.set("vars", name, value)
with open(self.tmp_conf_file, "w") as fw:
cnf.write(fw)
os.rename(self.tmp_conf_file, self.custom_conf_file)
self.reload()
return True
def check(self, name, value=None, with_conffile=True):
if with_conffile and self.custom_conf_file is None:
raise GconfNotConfigurable()
if not self._is_configurable(name):
raise GconfNotConfigurable()
if value is not None and not self._is_valid_value(name, value):
raise GconfInvalidValue()
def _load(self):
self.gconf = {}
self.template_conf = []
self.gconf_typecast = {}
self.non_configurable_configs = []
self.session_conf_items = []
conf = ConfigParser()
# Default Template config file
with open(self.default_conf_file) as f:
conf.readfp(f)
# Custom Config file
if self.custom_conf_file is not None:
with open(self.custom_conf_file) as f:
conf.readfp(f)
# Get version from default conf file
self.version = conf.get("__meta__", "version")
# Populate default values
for sect in conf.sections():
if sect in ["__meta__", "vars"]:
continue
# Collect list of available options with help details
self.gconfdata[sect] = {}
for k, v in conf.items(sect):
self.gconfdata[sect][k] = v.strip()
# Collect the Type cast information
if conf.has_option(sect, "type"):
self.gconf_typecast[sect] = conf.get(sect, "type")
# Prepare list of configurable conf
if conf.has_option(sect, "configurable"):
if conf.get(sect, "configurable").lower() == "false":
self.non_configurable_configs.append(sect)
# if it is a template conf value which needs to be substituted
if conf.has_option(sect, "template"):
if conf.get(sect, "template").lower().strip() == "true":
self.template_conf.append(sect)
# Set default values
if conf.has_option(sect, "value"):
self.gconf[sect] = conf.get(sect, "value").strip()
# Load the custom conf elements and overwrite
if conf.has_section("vars"):
for k, v in conf.items("vars"):
self.session_conf_items.append(k)
self.gconf[k] = v.strip()
self._tmpl_substitute()
self._do_typecast()
def reload(self):
if self._is_config_changed():
self._load()
def get(self, name, default_value=None):
return self.gconf.get(name, default_value)
def getall(self, show_defaults=False, show_non_configurable=False):
cnf = {}
if not show_defaults:
for k in self.session_conf_items:
if k not in self.non_configurable_configs:
cnf[k] = self.get(k)
return cnf
# Show all configs including defaults
for k, v in self.gconf.items():
if show_non_configurable:
cnf[k] = v
else:
if k not in self.non_configurable_configs:
cnf[k] = v
return cnf
def getr(self, name, default_value=None):
self.reload()
return self.get(name, default_value)
def get_help(self, name=None):
pass
def _is_configurable(self, name):
item = self.gconfdata.get(name, None)
if item is None:
return False
return item.get("configurable", True)
def _is_valid_value(self, name, value):
item = self.gconfdata.get(name, None)
if item is None:
return False
# If validation func not defined
if item.get("validation", None) is None:
return True
# minmax validation
if item["validation"] == "minmax":
return validate_minmax(value, item["min"], item["max"])
if item["validation"] == "choice":
return validate_choice(value, item["allowed_values"])
if item["validation"] == "bool":
return validate_bool(value)
if item["validation"] == "execpath":
return validate_execpath(value)
if item["validation"] == "unixtime":
return validate_unixtime(value)
return False
def _is_config_changed(self):
if self.custom_conf_file is not None and \
os.path.exists(self.custom_conf_file):
st = os.lstat(self.custom_conf_file)
if st.st_mtime > self.prev_mtime:
self.prev_mtime = st.st_mtime
return True
return False
def validate_unixtime(value):
try:
y = datetime.fromtimestamp(int(value)).strftime("%Y")
if y == "1970":
return False
return True
except ValueError:
return False
def validate_minmax(value, minval, maxval):
value = int(value)
minval = int(minval)
maxval = int(maxval)
return value >= minval and value <= maxval
def validate_choice(value, allowed_values):
allowed_values = allowed_values.split(",")
allowed_values = [v.strip() for v in allowed_values]
return value in allowed_values
def validate_bool(value):
return value in ["true", "false"]
def validate_execpath(value):
return os.path.isfile(value) and os.access(value, os.X_OK)
def validate_filepath(value):
return os.path.isfile(value)
def validate_path(value):
return os.path.exists(value)
def to_int(value):
return int(value)
def to_float(value):
return float(value)
def to_bool(value):
return True if value == "true" else False
def get(name, default_value=None):
return _gconf.get(name, default_value)
def getall(show_defaults=False, show_non_configurable=False):
return _gconf.getall(show_defaults=show_defaults,
show_non_configurable=show_non_configurable)
def getr(name, default_value=None):
return _gconf.getr(name, default_value)
def load(default_conf, custom_conf=None, args={}, extra_tmpl_args={}):
global _gconf
_gconf = Gconf(default_conf, custom_conf, args, extra_tmpl_args)
def setconfig(name, value):
global _gconf
_gconf.set(name, value)
def resetconfig(name):
global _gconf
_gconf.reset(name)
def check(name, value=None, with_conffile=True):
global _gconf
_gconf.check(name, value=value, with_conffile=with_conffile)
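
A minimal usage sketch of the module-level API above (the conf file
paths are hypothetical):

  import gsyncdconfig as gconf

  gconf.load("/etc/glusterfs/gsyncd.conf",
             "/var/lib/glusterd/geo-replication/gv1_f241_gv2/gsyncd.conf",
             args={"master": "gv1", "local_path": "/bricks/b1"})
  gconf.get("log-level")                 # "INFO" unless overridden in [vars]
  gconf.setconfig("log-level", "DEBUG")  # validated, then written to [vars]
  gconf.getr("checkpoint")               # reloads first if custom conf changed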


@ -9,13 +9,15 @@
#
import os
from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, get_errno, byref, c_ulong
from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
get_errno, byref, c_ulong
from ctypes.util import find_library
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
class Changes(object):
libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL, use_errno=True)
libgfc = CDLL(find_library("gfchangelog"), mode=RTLD_GLOBAL,
use_errno=True)
@classmethod
def geterrno(cls):


@ -0,0 +1,66 @@
import logging
from logging import Logger, handlers
import sys
import time
class GLogger(Logger):
"""Logger customizations for gsyncd.
It implements a log format similar to that of glusterfs.
"""
def makeRecord(self, name, level, *a):
rv = Logger.makeRecord(self, name, level, *a)
rv.nsecs = (rv.created - int(rv.created)) * 1000000
fr = sys._getframe(4)
callee = fr.f_locals.get('self')
if callee:
ctx = str(type(callee)).split("'")[1].split('.')[-1]
else:
ctx = '<top>'
if not hasattr(rv, 'funcName'):
rv.funcName = fr.f_code.co_name
rv.lvlnam = logging.getLevelName(level)[0]
rv.ctx = ctx
return rv
LOGFMT = ("[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s{0}"
":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s")
def setup_logging(level="INFO", label="", log_file=""):
if label:
label = "(" + label + ")"
filename = None
stream = None
if log_file:
if log_file in ('-', '/dev/stderr'):
stream = sys.stderr
elif log_file == '/dev/stdout':
stream = sys.stdout
else:
filename = log_file
datefmt = "%Y-%m-%d %H:%M:%S"
fmt = LOGFMT.format(label)
logging.root = GLogger("root", level)
logging.setLoggerClass(GLogger)
logging.Formatter.converter = time.gmtime # Log in GMT/UTC time
logging.getLogger().handlers = []
logging.getLogger().setLevel(level)
if filename is not None:
logging_handler = handlers.WatchedFileHandler(filename)
formatter = logging.Formatter(fmt=fmt,
datefmt=datefmt)
logging_handler.setFormatter(formatter)
logging.getLogger().addHandler(logging_handler)
else:
logging.basicConfig(stream=stream,
format=fmt,
datefmt=datefmt,
level=level)
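
A small usage sketch of the helper above (the label and log file path
are made-up examples):

  from logutils import setup_logging

  setup_logging(level="DEBUG", label="worker /bricks/b1",
                log_file="/var/log/glusterfs/geo-replication/gsyncd.log")
  # Subsequent logging.* calls use the glusterfs-style format, with
  # timestamps in UTC (GMT).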


@ -12,7 +12,6 @@ import os
import sys
import time
import stat
import json
import logging
import fcntl
import string
@ -21,9 +20,11 @@ import tarfile
from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
from threading import Condition, Lock
from datetime import datetime
from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape_space_newline
from syncdutils import unescape_space_newline, gauxpfx, md5hex, selfkill
import gsyncdconfig as gconf
from rconf import rconf
from syncdutils import Thread, GsyncdError, escape_space_newline
from syncdutils import unescape_space_newline, gauxpfx, escape
from syncdutils import lstat, errno_wrap, FreeObject, lf, matching_disk_gfid
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
@ -85,24 +86,41 @@ def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
modemixin = gconf.special_sync_mode
modemixin = gconf.get("special-sync-mode")
if not modemixin:
modemixin = 'normal'
changemixin = 'xsync' if gconf.change_detector == 'xsync' \
else excrawl or gconf.change_detector
if gconf.get("change-detector") == 'xsync':
changemixin = 'xsync'
elif excrawl:
changemixin = excrawl
else:
changemixin = gconf.get("change-detector")
logging.debug(lf('setting up change detection mode',
mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(
gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
purgemixin = boolify(
gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
if gconf.get("use-rsync-xattrs"):
sendmarkmixin = SendmarkRsyncMixin
else:
sendmarkmixin = SendmarkNormalMixin
if gconf.get("ignore-deletes"):
purgemixin = PurgeNoopMixin
else:
purgemixin = PurgeNormalMixin
if gconf.get("sync-method") == "tarssh":
syncengine = TarSSHEngine
else:
syncengine = RsyncEngine
class _GMaster(crawlmixin, modemixin, sendmarkmixin,
purgemixin, syncengine):
pass
return _GMaster
@ -139,9 +157,9 @@ class NormalMixin(object):
return xt0 >= xt1
def make_xtime_opts(self, is_master, opts):
if not 'create' in opts:
if 'create' not in opts:
opts['create'] = is_master
if not 'default_xtime' in opts:
if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def xtime_low(self, rsc, path, **opts):
@ -212,9 +230,9 @@ class RecoverMixin(NormalMixin):
@staticmethod
def make_xtime_opts(is_master, opts):
if not 'create' in opts:
if 'create' not in opts:
opts['create'] = False
if not 'default_xtime' in opts:
if 'default_xtime' not in opts:
opts['default_xtime'] = URXTIME
def keepalive_payload_hook(self, timo, gap):
@ -385,7 +403,7 @@ class GMasterCommon(object):
self.master = master
self.slave = slave
self.jobtab = {}
if boolify(gconf.use_tarssh):
if gconf.get("sync-method") == "tarssh":
self.syncer = Syncer(slave, self.slave.tarssh, [2])
else:
# partial transfer (cf. rsync(1)), that's normal
@ -401,7 +419,7 @@ class GMasterCommon(object):
# 0.
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
self.total_turns = rconf.turns
self.crawl_start = datetime.now()
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
self.start = None
@ -414,7 +432,7 @@ class GMasterCommon(object):
def init_keep_alive(cls):
"""start the keep-alive thread """
timo = int(gconf.timeout or 0)
timo = gconf.get("slave-timeout", 0)
if timo > 0:
def keep_alive():
while True:
@ -427,28 +445,28 @@ class GMasterCommon(object):
def mgmt_lock(self):
"""Take management volume lock """
if gconf.mgmt_lock_fd:
if rconf.mgmt_lock_fd:
try:
fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
if not gconf.active_earlier:
gconf.active_earlier = True
fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
if not rconf.active_earlier:
rconf.active_earlier = True
logging.info(lf("Got lock Becoming ACTIVE",
brick=gconf.local_path))
brick=rconf.args.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
if not gconf.passive_earlier:
gconf.passive_earlier = True
if not rconf.passive_earlier:
rconf.passive_earlier = True
logging.info(lf("Didn't get lock Becoming PASSIVE",
brick=gconf.local_path))
brick=rconf.local_path))
return False
raise
fd = None
bname = str(self.uuid) + "_" + str(gconf.slave_id) + "_subvol_" \
+ str(gconf.subvol_num) + ".lock"
mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+ str(rconf.args.subvol_num) + ".lock"
mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
logging.debug(lf("lock file path", path=path))
try:
@ -471,30 +489,30 @@ class GMasterCommon(object):
try:
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# Save latest FD for future use
gconf.mgmt_lock_fd = fd
rconf.mgmt_lock_fd = fd
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
# cannot grab, it's taken
if not gconf.passive_earlier:
gconf.passive_earlier = True
if not rconf.passive_earlier:
rconf.passive_earlier = True
logging.info(lf("Didn't get lock Becoming PASSIVE",
brick=gconf.local_path))
gconf.mgmt_lock_fd = fd
brick=rconf.args.local_path))
rconf.mgmt_lock_fd = fd
return False
raise
if not gconf.active_earlier:
gconf.active_earlier = True
if not rconf.active_earlier:
rconf.active_earlier = True
logging.info(lf("Got lock Becoming ACTIVE",
brick=gconf.local_path))
brick=rconf.args.local_path))
return True
def should_crawl(self):
if not boolify(gconf.use_meta_volume):
return gconf.glusterd_uuid in self.master.server.node_uuid()
if not gconf.get("use-meta-volume"):
return rconf.args.local_node_id in self.master.server.node_uuid()
if not os.path.ismount(gconf.meta_volume_mnt):
if not os.path.ismount(gconf.get("meta-volume-mnt")):
logging.error("Meta-volume is not mounted. Worker Exiting...")
sys.exit(1)
return self.mgmt_lock()
@ -532,7 +550,7 @@ class GMasterCommon(object):
logging.debug("%s master with volume id %s ..." %
(inter_master and "intermediate" or "primary",
self.uuid))
gconf.configinterface.set('volume_id', self.uuid)
rconf.volume_id = self.uuid
if self.volinfo:
if self.volinfo['retval']:
logging.warn(lf("master cluster's info may not be valid",
@ -557,7 +575,7 @@ class GMasterCommon(object):
turns=self.turns,
time=self.start)
t1 = time.time()
if int(t1 - t0) >= int(gconf.replica_failover_interval):
if int(t1 - t0) >= gconf.get("replica-failover-interval"):
crawl = self.should_crawl()
t0 = t1
self.update_worker_remote_node()
@ -567,7 +585,7 @@ class GMasterCommon(object):
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
'.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
'.', '.'.join([str(self.uuid), rconf.args.slave_id]))
logging.debug(lf("Crawl info",
cluster_stime=cluster_stime,
brick_stime=brick_stime))
@ -675,6 +693,7 @@ class XCrawlMetadata(object):
self.st_atime = float(st_atime)
self.st_mtime = float(st_mtime)
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
@ -701,34 +720,34 @@ class GMasterChangelogMixin(GMasterCommon):
def init_fop_batch_stats(self):
self.batch_stats = {
"CREATE":0,
"MKNOD":0,
"UNLINK":0,
"MKDIR":0,
"RMDIR":0,
"LINK":0,
"SYMLINK":0,
"RENAME":0,
"SETATTR":0,
"SETXATTR":0,
"XATTROP":0,
"DATA":0,
"ENTRY_SYNC_TIME":0,
"META_SYNC_TIME":0,
"DATA_START_TIME":0
"CREATE": 0,
"MKNOD": 0,
"UNLINK": 0,
"MKDIR": 0,
"RMDIR": 0,
"LINK": 0,
"SYMLINK": 0,
"RENAME": 0,
"SETATTR": 0,
"SETXATTR": 0,
"XATTROP": 0,
"DATA": 0,
"ENTRY_SYNC_TIME": 0,
"META_SYNC_TIME": 0,
"DATA_START_TIME": 0
}
def update_fop_batch_stats(self, ty):
if ty in ['FSETXATTR']:
ty = 'SETXATTR'
self.batch_stats[ty] = self.batch_stats.get(ty,0) + 1
ty = 'SETXATTR'
self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
def archive_and_purge_changelogs(self, changelogs):
# Creates tar file instead of tar.gz, since changelogs will
# be appended to existing tar. archive name is
# archive_<YEAR><MONTH>.tar
archive_name = "archive_%s.tar" % datetime.today().strftime(
gconf.changelog_archive_format)
gconf.get("changelog-archive-format"))
try:
tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
@ -764,13 +783,9 @@ class GMasterChangelogMixin(GMasterCommon):
else:
raise
def fallback_xsync(self):
logging.info('falling back to xsync mode')
gconf.configinterface.set('change-detector', 'xsync')
selfkill()
def setup_working_dir(self):
workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
workdir = os.path.join(gconf.get("working-dir"),
escape(rconf.args.local_path))
logging.debug('changelog working dir %s' % workdir)
return workdir
@ -804,27 +819,30 @@ class GMasterChangelogMixin(GMasterCommon):
logging.info(lf('Fixing gfid mismatch in slave. Deleting'
' the entry', retry_count=retry_count,
entry=repr(failure)))
#Add deletion to fix_entry_ops list
# Add deletion to fix_entry_ops list
if failure[2]['slave_isdir']:
fix_entry_ops.append(edct('RMDIR',
gfid=failure[2]['slave_gfid'],
entry=pbname))
fix_entry_ops.append(
edct('RMDIR',
gfid=failure[2]['slave_gfid'],
entry=pbname))
else:
fix_entry_ops.append(edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
fix_entry_ops.append(
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
elif not isinstance(st, int):
#The file exists on master but with different name.
#Probabaly renamed and got missed during xsync crawl.
# The file exists on master but with a different name.
# Probably renamed and got missed during the xsync crawl.
if failure[2]['slave_isdir']:
logging.info(lf('Fixing gfid mismatch in slave',
retry_count=retry_count,
entry=repr(failure)))
realpath = os.readlink(os.path.join(gconf.local_path,
".glusterfs",
slave_gfid[0:2],
slave_gfid[2:4],
slave_gfid))
realpath = os.readlink(os.path.join(
rconf.args.local_path,
".glusterfs",
slave_gfid[0:2],
slave_gfid[2:4],
slave_gfid))
dst_entry = os.path.join(pfx, realpath.split('/')[-2],
realpath.split('/')[-1])
rename_dict = edct('RENAME', gfid=slave_gfid,
@ -840,19 +858,20 @@ class GMasterChangelogMixin(GMasterCommon):
' Deleting the entry',
retry_count=retry_count,
entry=repr(failure)))
fix_entry_ops.append(edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
fix_entry_ops.append(
edct('UNLINK',
gfid=failure[2]['slave_gfid'],
entry=pbname))
logging.error(lf('Entry cannot be fixed in slave due '
'to GFID mismatch, find respective '
'path for the GFID and trigger sync',
gfid=slave_gfid))
if fix_entry_ops:
#Process deletions of entries whose gfids are mismatched
# Process deletions of entries whose gfids are mismatched
failures1 = self.slave.server.entry_ops(fix_entry_ops)
if not failures1:
logging.info ("Sucessfully fixed entry ops with gfid mismatch")
logging.info("Sucessfully fixed entry ops with gfid mismatch")
return failures1
@ -880,12 +899,11 @@ class GMasterChangelogMixin(GMasterCommon):
for failure in failures1:
logging.error("Failed to fix entry ops %s", repr(failure))
else:
#Retry original entry list 5 times
# Retry original entry list 5 times
failures = self.slave.server.entry_ops(entries)
self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
@ -930,7 +948,7 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
if rconf.args.is_hottier and et == self.TYPE_ENTRY:
logging.debug(lf('skip ENTRY op if hot tier brick',
op=ec[self.POS_TYPE]))
continue
@ -978,7 +996,7 @@ class GMasterChangelogMixin(GMasterCommon):
'master', gfid=gfid, pgfid_bname=en))
continue
if not boolify(gconf.ignore_deletes):
if not gconf.get("ignore-deletes"):
if not ignore_entry_ops:
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
@ -1084,12 +1102,11 @@ class GMasterChangelogMixin(GMasterCommon):
st_mtime=ec[6])))
else:
meta_gfid.add((os.path.join(pfx, ec[0]), ))
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
if not gconf.get("sync-method") == "tarssh" and \
(gconf.get("sync-xattrs") or gconf.get("sync-acls")):
datas.add(os.path.join(pfx, ec[0]))
else:
logging.warn(lf('got invalid fop type',
@ -1102,8 +1119,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("data", len(datas))
self.batch_stats["DATA"] += self.files_in_batch - \
self.batch_stats["SETXATTR"] - \
self.batch_stats["XATTROP"]
self.batch_stats["SETXATTR"] - \
self.batch_stats["XATTROP"]
entry_start_time = time.time()
# sync namespace
@ -1185,7 +1202,7 @@ class GMasterChangelogMixin(GMasterCommon):
# with data of other changelogs.
if retry:
if tries == (int(gconf.max_rsync_retries) - 1):
if tries == (gconf.get("max-rsync-retries") - 1):
# Enable Error logging if it is last retry
self.syncer.enable_errorlog()
@ -1243,7 +1260,7 @@ class GMasterChangelogMixin(GMasterCommon):
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
if tries == int(gconf.max_rsync_retries):
if tries == gconf.get("max-rsync-retries"):
logging.error(lf('changelogs could not be processed '
'completely - moving on...',
files=map(os.path.basename, changes)))
@ -1331,8 +1348,7 @@ class GMasterChangelogMixin(GMasterCommon):
# Update last_synced_time in status file based on stime
# only update stime if stime xattr set to Brick root
if path == self.FLAT_DIR_HIERARCHY:
chkpt_time = gconf.configinterface.get_realtime(
"checkpoint")
chkpt_time = gconf.getr("checkpoint")
checkpoint_time = 0
if chkpt_time is not None:
checkpoint_time = int(chkpt_time)
@ -1340,7 +1356,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.set_last_synced(stime, checkpoint_time)
def update_worker_remote_node(self):
node = sys.argv[-1]
node = rconf.args.resource_remote
node_data = node.split("@")
node = node_data[-1]
remote_node_ip = node.split(":")[0]
@ -1351,7 +1367,7 @@ class GMasterChangelogMixin(GMasterCommon):
current_size = 0
for c in changes:
si = os.lstat(c).st_size
if (si + current_size) > int(gconf.changelog_batch_size):
if (si + current_size) > gconf.get("changelog-batch-size"):
# Create new batch if single Changelog file greater than
# Max Size! or current batch size exceeds Max size
changelogs_batches.append([c])
@ -1397,7 +1413,7 @@ class GMasterChangelogMixin(GMasterCommon):
def register(self, register_time, changelog_agent, status):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.sleep_interval = gconf.get("change-interval")
self.changelog_done_func = self.changelog_agent.done
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
@ -1437,13 +1453,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs; if the user configured a different
# location then consuming history will not work (known issue as of now)
changelog_path = os.path.join(gconf.local_path,
changelog_path = os.path.join(rconf.args.local_path,
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
data_stime[0],
end_time,
int(gconf.sync_jobs))
gconf.get("sync-jobs"))
# scan followed by getchanges till scan returns zero.
# history_scan() is blocking call, till it gets the number
@ -1736,7 +1752,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
[gfid, 'MKNOD', str(mo),
str(0), str(0),
escape_space_newline(
os.path.join(pargfid, bname))])
os.path.join(
pargfid, bname))])
else:
self.write_entry_change(
"E", [gfid, 'LINK', escape_space_newline(
@ -1837,8 +1854,8 @@ class Syncer(object):
self.pb = PostBox()
self.sync_engine = sync_engine
self.errnos_ok = resilient_errnos
for i in range(int(gconf.sync_jobs)):
t = Thread(target=self.syncjob, args=(i+1, ))
for i in range(gconf.get("sync-jobs")):
t = Thread(target=self.syncjob, args=(i + 1, ))
t.start()
def syncjob(self, job_id):


@ -13,21 +13,19 @@ import sys
import time
import signal
import logging
import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
from resource import FILE, GLUSTER, SSH
from threading import Lock
from errno import ECHILD, ESRCH
import re
import random
from gconf import gconf
from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
from syncdutils import Volinfo, Popen
from resource import SSH
import gsyncdconfig as gconf
from rconf import rconf
from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import Thread, finalize, Popen, Volinfo
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
from gsyncdstatus import GeorepStatus, set_monitor_status
@ -82,7 +80,8 @@ def get_slave_bricks_status(host, vol):
try:
for el in vi.findall('volStatus/volumes/volume/node'):
if el.find('status').text == '1':
up_hosts.add(el.find('hostname').text)
up_hosts.add((el.find('hostname').text,
el.find('peerid').text))
except (ParseError, AttributeError, ValueError) as e:
logging.info(lf("Parsing failed to get list of up nodes, "
"returning empty list",
@ -116,7 +115,8 @@ class Monitor(object):
# give a chance to graceful exit
errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])
def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master):
def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master,
suuid):
"""the monitor loop
Basic logic is a blatantly simple, blunt heuristic:
@ -136,7 +136,7 @@ class Monitor(object):
due to the keep-alive thread)
"""
if not self.status.get(w[0]['dir'], None):
self.status[w[0]['dir']] = GeorepStatus(gconf.state_file,
self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
w[0]['host'],
w[0]['dir'],
w[0]['uuid'],
@ -144,7 +144,7 @@ class Monitor(object):
"%s::%s" % (slave_host,
slave_vol))
set_monitor_status(gconf.state_file, self.ST_STARTED)
set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
ret = 0
@ -172,26 +172,22 @@ class Monitor(object):
return os.WEXITSTATUS(s)
return 1
conn_timeout = int(gconf.connection_timeout)
conn_timeout = gconf.get("connection-timeout")
while ret in (0, 1):
remote_host = w[1]
remote_user, remote_host = w[1][0].split("@")
remote_id = w[1][1]
# Check the status of the connected slave node
# If the connected slave node is down then try to connect to
# different up node.
m = re.match("(ssh|gluster|file):\/\/(.+)@([^:]+):(.+)",
remote_host)
if m:
current_slave_host = m.group(3)
slave_up_hosts = get_slave_bricks_status(
slave_host, slave_vol)
current_slave_host = remote_host
slave_up_hosts = get_slave_bricks_status(
slave_host, slave_vol)
if current_slave_host not in slave_up_hosts:
if len(slave_up_hosts) > 0:
remote_host = "%s://%s@%s:%s" % (m.group(1),
m.group(2),
random.choice(
slave_up_hosts),
m.group(4))
if (current_slave_host, remote_id) not in slave_up_hosts:
if len(slave_up_hosts) > 0:
remote_new = random.choice(slave_up_hosts)
remote_host = "%s@%s" % (remote_user, remote_new[0])
remote_id = remote_new[1]
# Spawn the worker and agent in lock to avoid fd leak
self.lock.acquire()
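Note: with the URL re-parsing gone, failover reduces to tuple membership. A small sketch with hypothetical hosts, mirroring the logic above (get_slave_bricks_status() now returns (hostname, peerid) pairs):

    import random

    def pick_slave(remote_user, current, up_hosts):
        # current and the up_hosts members are (hostname, peerid) tuples
        if current in up_hosts or not up_hosts:
            return "%s@%s" % (remote_user, current[0]), current[1]
        host, peerid = random.choice(list(up_hosts))
        return "%s@%s" % (remote_user, host), peerid

    print(pick_slave("geoaccount", ("snode1", "uuid-1"),
                     {("snode2", "uuid-2"), ("snode3", "uuid-3")}))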
@ -213,33 +209,58 @@ class Monitor(object):
if apid == 0:
os.close(rw)
os.close(ww)
os.execv(sys.executable, argv + ['--local-path', w[0]['dir'],
'--local-node', w[0]['host'],
'--local-node-id',
w[0]['uuid'],
'--agent',
'--rpc-fd',
','.join([str(ra), str(wa),
str(rw), str(ww)])])
args_to_agent = argv + [
'agent',
rconf.args.master,
rconf.args.slave,
'--local-path', w[0]['dir'],
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
'--rpc-fd', ','.join([str(ra), str(wa), str(rw), str(ww)])
]
if rconf.args.config_file is not None:
args_to_agent += ['-c', rconf.args.config_file]
if rconf.args.debug:
args_to_agent.append("--debug")
os.execv(sys.executable, args_to_agent)
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
os.close(pr)
os.close(ra)
os.close(wa)
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
'--local-path', w[0]['dir'],
'--local-node', w[0]['host'],
'--local-node-id',
w[0]['uuid'],
'--local-id',
'.' + escape(w[0]['dir']),
'--rpc-fd',
','.join([str(rw), str(ww),
str(ra), str(wa)]),
'--subvol-num', str(w[2])] +
(['--is-hottier'] if w[3] else []) +
['--resource-remote', remote_host])
args_to_worker = argv + [
'worker',
rconf.args.master,
rconf.args.slave,
'--feedback-fd', str(pw),
'--local-path', w[0]['dir'],
'--local-node', w[0]['host'],
'--local-node-id', w[0]['uuid'],
'--slave-id', suuid,
'--rpc-fd',
','.join([str(rw), str(ww), str(ra), str(wa)]),
'--subvol-num', str(w[2]),
'--resource-remote', remote_host,
'--resource-remote-id', remote_id
]
if rconf.args.config_file is not None:
args_to_worker += ['-c', rconf.args.config_file]
if w[3]:
args_to_worker.append("--is-hottier")
if rconf.args.debug:
args_to_worker.append("--debug")
os.execv(sys.executable, args_to_worker)
cpids.add(cpid)
agents.add(apid)
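Note: workers and agents are now spawned with positional subcommands plus explicit options instead of the old flag-only argv surgery. Under hypothetical session values, the exec'd worker command line takes roughly this shape:

    import sys

    # Hypothetical values, shown only to illustrate the argv layout:
    argv = [sys.executable, "gsyncd.py", "worker", "mastervol",
            "geoaccount@snode1::slavevol",
            "--feedback-fd", "5",
            "--local-path", "/bricks/b1",
            "--local-node", "mnode1",
            "--slave-id", "3f9362ac-aaaa-bbbb-cccc-000000000000"]
    print(" ".join(argv))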
@ -290,7 +311,8 @@ class Monitor(object):
logging.info(lf("Changelog Agent died, Aborting "
"Worker",
brick=w[0]['dir']))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
errno_wrap(os.kill, [cpid, signal.SIGKILL],
[ESRCH])
nwait(cpid)
nwait(apid)
break
@ -333,12 +355,7 @@ class Monitor(object):
return ret
def multiplex(self, wspx, suuid, slave_vol, slave_host, master):
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
argv.remove(o)
argv.extend(('-N', '-p', '', '--slave-id', suuid))
argv.insert(0, os.path.basename(sys.executable))
argv = [os.path.basename(sys.executable), sys.argv[0]]
cpids = set()
agents = set()
@ -346,7 +363,7 @@ class Monitor(object):
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol,
slave_host, master)
slave_host, master, suuid)
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
@ -362,55 +379,39 @@ class Monitor(object):
t.join()
def distribute(*resources):
master, slave = resources
def distribute(master, slave):
mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
prelude = []
si = slave
slave_host = None
slave_vol = None
if isinstance(slave, SSH):
prelude = gconf.ssh_command.split() + [slave.remote_addr]
si = slave.inner_rsc
logging.debug('slave SSH gateway: ' + slave.remote_addr)
if isinstance(si, FILE):
sbricks = {'host': 'localhost', 'dir': si.path}
suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
elif isinstance(si, GLUSTER):
svol = Volinfo(si.volume, slave.remote_addr.split('@')[-1])
sbricks = svol.bricks
suuid = svol.uuid
slave_host = slave.remote_addr.split('@')[-1]
slave_vol = si.volume
prelude = [gconf.get("ssh-command")] + \
gconf.get("ssh-options").split() + \
["-p", str(gconf.get("ssh-port"))] + \
[slave.remote_addr]
logging.debug('slave SSH gateway: ' + slave.remote_addr)
svol = Volinfo(slave.volume, "localhost", prelude)
sbricks = svol.bricks
suuid = svol.uuid
slave_host = slave.remote_addr.split('@')[-1]
slave_vol = slave.volume
# save this xattr for the session delete command
old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
svol.uuid
if not old_stime_xattr_prefix or \
old_stime_xattr_prefix != new_stime_xattr_prefix:
gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)
# save this xattr for the session delete command
old_stime_xattr_name = getattr(gconf, "master.stime_xattr_name", None)
new_stime_xattr_name = "trusted.glusterfs." + mvol.uuid + "." + \
svol.uuid + ".stime"
if not old_stime_xattr_name or \
old_stime_xattr_name != new_stime_xattr_name:
gconf.configinterface.set("master.stime_xattr_name",
new_stime_xattr_name)
else:
raise GsyncdError("unknown slave type " + slave.url)
logging.debug('slave bricks: ' + repr(sbricks))
if isinstance(si, FILE):
slaves = [slave.url]
else:
slavenodes = set(b['host'] for b in sbricks)
if isinstance(slave, SSH) and not gconf.isolated_slave:
rap = SSH.parse_ssh_address(slave)
slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
for h in slavenodes]
else:
slavevols = [h + ':' + si.volume for h in slavenodes]
if isinstance(slave, SSH):
slaves = ['ssh://' + rap.remote_addr + ':' + v
for v in slavevols]
else:
slaves = slavevols
slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
rap = SSH.parse_ssh_address(slave)
slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]
workerspex = []
for idx, brick in enumerate(mvol.bricks):
@ -424,12 +425,47 @@ def distribute(*resources):
return workerspex, suuid, slave_vol, slave_host, master
def monitor(*resources):
def monitor(local, remote):
# Check if gsyncd restarted in pause state. If
# yes, send SIGSTOP to negative of monitor pid
# to go back to pause state.
if gconf.pause_on_start:
if rconf.args.pause_on_start:
errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])
"""oh yeah, actually Monitor is used as singleton, too"""
return Monitor().multiplex(*distribute(*resources))
return Monitor().multiplex(*distribute(local, remote))
def startup(go_daemon=True):
"""set up logging, pidfile grabbing, daemonization"""
pid_file = gconf.get("pid-file")
if not grabpidfile():
sys.stderr.write("pidfile is taken, exiting.\n")
sys.exit(2)
rconf.pid_file_owned = True
if not go_daemon:
return
x, y = os.pipe()
cpid = os.fork()
if cpid:
os.close(x)
sys.exit()
os.close(y)
os.setsid()
dn = os.open(os.devnull, os.O_RDWR)
for f in (sys.stdin, sys.stdout, sys.stderr):
os.dup2(dn, f.fileno())
if not grabpidfile(pid_file + '.tmp'):
raise GsyncdError("cannot grab temporary pidfile")
os.rename(pid_file + '.tmp', pid_file)
# wait for parent to terminate
# so we can start up with
# no messing from the dirty
# ol' bustard
select((x,), (), ())
os.close(x)
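Note on the '.tmp' dance above: the parent still holds the original pidfile until it exits, so the daemonized child grabs <pid-file>.tmp first and rename()s it over the real path; rename() is atomic on POSIX, so a locked pidfile exists at every instant. The bare pattern, with a hypothetical path:

    import os

    pid_file = "/tmp/example-gsyncd.pid"
    with open(pid_file + ".tmp", "w") as f:
        f.write(str(os.getpid()) + "\n")
    os.rename(pid_file + ".tmp", pid_file)  # atomic replacement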


@ -9,9 +9,9 @@
#
class GConf(object):
class RConf(object):
"""singleton class to store globals
"""singleton class to store runtime globals
shared between gsyncd modules"""
ssh_ctl_dir = None
@ -28,5 +28,7 @@ class GConf(object):
active_earlier = False
passive_earlier = False
mgmt_lock_fd = None
args = None
turns = 0
gconf = GConf()
rconf = RConf()


@ -13,21 +13,9 @@ import sys
import time
import logging
from threading import Condition
try:
import thread
except ImportError:
# py 3
import _thread as thread
try:
from Queue import Queue
except ImportError:
# py 3
from queue import Queue
try:
import cPickle as pickle
except ImportError:
# py 3
import pickle
import thread
from Queue import Queue
import cPickle as pickle
from syncdutils import Thread, select, lf

File diff suppressed because it is too large


@ -0,0 +1,306 @@
import logging
from syncdutils import lf
import gsyncdconfig as gconf
ERROR_CONFIG_INVALID = 2
ERROR_CONFIG_INVALID_VALUE = 3
ERROR_CONFIG_NOT_CONFIGURABLE = 4
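Note: gsyncd.py routes each parsed subcommand to the matching subcmd_* function in this file; its diff is suppressed above, so the dispatch below is a hypothetical illustration, not its actual code:

    import subcmds

    def dispatch(args):
        # assumes an argparse namespace carrying the subcommand name in
        # args.subcmd (hypothetical field name)
        func = getattr(subcmds,
                       "subcmd_" + args.subcmd.replace("-", "_"), None)
        if func is None:
            raise SystemExit("unknown subcommand: %s" % args.subcmd)
        func(args)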
def subcmd_monitor_status(args):
from gsyncdstatus import set_monitor_status
from rconf import rconf
set_monitor_status(gconf.get("state-file"), args.status)
rconf.log_exit = False
logging.info(lf("Monitor Status Change", status=args.status))
def subcmd_status(args):
from gsyncdstatus import GeorepStatus
master_name = args.master.replace(":", "")
slave_data = args.slave.replace("ssh://", "")
brick_status = GeorepStatus(gconf.get("state-file"),
"",
args.local_path,
"",
master_name,
slave_data,
gconf.get("pid-file"))
checkpoint_time = gconf.get("checkpoint", 0)
brick_status.print_status(checkpoint_time=checkpoint_time)
def subcmd_monitor(args):
import monitor
from resource import GLUSTER, SSH, Popen
go_daemon = False if args.debug else True
monitor.startup(go_daemon)
Popen.init_errhandler()
local = GLUSTER("localhost", args.master)
slavehost, slavevol = args.slave.split("::")
remote = SSH(slavehost, slavevol)
return monitor.monitor(local, remote)
def subcmd_verify_spawning(args):
logging.info("Able to spawn gsyncd.py")
def subcmd_worker(args):
import os
import fcntl
from resource import GLUSTER, SSH, Popen
Popen.init_errhandler()
fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
local = GLUSTER("localhost", args.master)
slavehost, slavevol = args.slave.split("::")
remote = SSH(slavehost, slavevol)
remote.connect_remote()
local.connect()
logging.info("Closing feedback fd, waking up the monitor")
os.close(args.feedback_fd)
local.service_loop(remote)
def subcmd_slave(args):
from resource import GLUSTER, Popen
Popen.init_errhandler()
slavevol = args.slave.split("::")[-1]
local = GLUSTER("localhost", slavevol)
local.connect()
local.service_loop()
def subcmd_agent(args):
import os
from changelogagent import agent, Changelog
from syncdutils import lf
os.setsid()
logging.debug(lf("RPC FD",
rpc_fd=repr(args.rpc_fd)))
return agent(Changelog(), args.rpc_fd)
def subcmd_voluuidget(args):
from subprocess import Popen, PIPE
import xml.etree.ElementTree as XET
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
po = Popen(['gluster', '--xml', '--remote-host=' + args.host,
'volume', 'info', args.volname], bufsize=0,
stdin=None, stdout=PIPE, stderr=PIPE)
vix, err = po.communicate()
if po.returncode != 0:
logging.info(lf("Volume info failed, unable to get "
"volume uuid of slavevol, "
"returning empty string",
slavevol=args.volname,
slavehost=args.host,
error=po.returncode))
return ""
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
logging.info(lf("Unable to get volume uuid of slavevol, "
"returning empty string",
slavevol=args.volname,
slavehost=args.host,
error=vi.find('opErrstr').text))
return ""
try:
voluuid = vi.find("volInfo/volumes/volume/id").text
except (ParseError, AttributeError, ValueError) as e:
logging.info(lf("Parsing failed to volume uuid of slavevol, "
"returning empty string",
slavevol=args.volname,
slavehost=args.host,
error=e))
voluuid = ""
print(voluuid)
def _unlink(path):
import os
from errno import ENOENT
from syncdutils import GsyncdError
import sys
try:
os.unlink(path)
except (OSError, IOError):
if sys.exc_info()[1].errno == ENOENT:
pass
else:
raise GsyncdError('Unlink error: %s' % path)
def subcmd_delete(args):
import logging
import shutil
import glob
import sys
from errno import ENOENT, ENODATA
import struct
from syncdutils import GsyncdError, Xattr, errno_wrap
import gsyncdconfig as gconf
logging.info('geo-replication delete')
# remove the stime xattr from all the brick paths so that
# a re-create of the session will start syncing all over again
stime_xattr_prefix = gconf.get('stime-xattr-prefix', None)
# Delete pid file, status file, socket file
cleanup_paths = []
cleanup_paths.append(gconf.get("pid-file"))
# Cleanup Session dir
try:
shutil.rmtree(gconf.get("georep-session-working-dir"))
except (IOError, OSError):
if sys.exc_info()[1].errno == ENOENT:
pass
else:
raise GsyncdError(
'Error while removing working dir: %s' %
gconf.get("georep-session-working-dir"))
# Cleanup changelog working dirs
try:
shutil.rmtree(gconf.get("working-dir"))
except (IOError, OSError):
if sys.exc_info()[1].errno == ENOENT:
pass
else:
raise GsyncdError(
'Error while removing working dir: %s' %
gconf.get("working-dir"))
for path in cleanup_paths:
# To delete temp files
for f in glob.glob(path + "*"):
_unlink(f)
if args.reset_sync_time and stime_xattr_prefix:
for p in args.paths:
if p != "":
# set stime to (0,0) to trigger full volume content resync
# to slave on session recreation
# look at master.py::Xcrawl hint: zero_zero
errno_wrap(Xattr.lsetxattr,
(p, stime_xattr_prefix + ".stime",
struct.pack("!II", 0, 0)),
[ENOENT, ENODATA])
errno_wrap(Xattr.lremovexattr,
(p, stime_xattr_prefix + ".entry_stime"),
[ENOENT, ENODATA])
return
def print_config(name, value, only_value=False, use_underscore=False):
val = value
if isinstance(value, bool):
val = str(value).lower()
if only_value:
print(val)
else:
if use_underscore:
name = name.replace("-", "_")
print("%s:%s" % (name, val))
def config_name_format(val):
return val.replace("_", "-")
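Note: expected behavior of the two helpers above, with example option names:

    print_config("sync-jobs", 3)                       # sync-jobs:3
    print_config("use-tarssh", True)                   # use-tarssh:true
    print_config("sync-jobs", 3, only_value=True)      # 3
    print_config("sync-jobs", 3, use_underscore=True)  # sync_jobs:3
    print(config_name_format("sync_jobs"))             # sync-jobs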
def subcmd_config_get(args):
import sys
all_config = gconf.getall(show_defaults=args.show_defaults,
show_non_configurable=True)
if args.name is not None:
val = all_config.get(config_name_format(args.name), None)
if val is None:
sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_INVALID)
print_config(args.name, val, only_value=args.only_value,
use_underscore=args.use_underscore)
return
for k in sorted(all_config):
print_config(k, all_config[k], use_underscore=args.use_underscore)
def subcmd_config_check(args):
import sys
try:
gconf.check(config_name_format(args.name), value=args.value,
with_conffile=False)
except gconf.GconfNotConfigurable:
cnf_val = gconf.get(config_name_format(args.name), None)
if cnf_val is None:
sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_INVALID)
# Not configurable
sys.stderr.write("Not configurable \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)
except gconf.GconfInvalidValue:
sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name,
args.value))
sys.exit(ERROR_CONFIG_INVALID_VALUE)
def subcmd_config_set(args):
import sys
try:
gconf.setconfig(config_name_format(args.name), args.value)
except gconf.GconfNotConfigurable:
cnf_val = gconf.get(config_name_format(args.name), None)
if cnf_val is None:
sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_INVALID)
# Not configurable
sys.stderr.write("Not configurable \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)
except gconf.GconfInvalidValue:
sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name,
args.value))
sys.exit(ERROR_CONFIG_INVALID_VALUE)
def subcmd_config_reset(args):
import sys
try:
gconf.resetconfig(config_name_format(args.name))
except gconf.GconfNotConfigurable:
cnf_val = gconf.get(config_name_format(args.name), None)
if cnf_val is None:
sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_INVALID)
# Not configurable
sys.stderr.write("Not configurable \"%s\"\n" % args.name)
sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)


@ -15,19 +15,19 @@ import time
import fcntl
import shutil
import logging
import socket
import errno
import threading
import subprocess
from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
from errno import EINTR, ENOENT, ESTALE, EBUSY, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
import xml.etree.ElementTree as XET
from select import error as SelectError
from cPickle import PickleError
from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@ -46,25 +46,10 @@ except ImportError:
EVENT_GEOREP_PASSIVE = None
EVENT_GEOREP_CHECKPOINT_COMPLETED = None
try:
from cPickle import PickleError
except ImportError:
# py 3
from pickle import PickleError
import gsyncdconfig as gconf
from rconf import rconf
from gconf import gconf
try:
# py 3
from urllib import parse as urllib
except ImportError:
import urllib
try:
from hashlib import md5 as md5
except ImportError:
# py 2.4
from md5 import new as md5
from hashlib import md5 as md5
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
@ -80,6 +65,10 @@ SPACE_ESCAPE_CHAR = "%20"
NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
final_lock = Lock()
mntpt_list = []
def sup(x, *a, **kw):
"""a rubyesque "super" for python ;)
@ -93,12 +82,7 @@ def sup(x, *a, **kw):
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
return urllib.quote_plus(s)
def unescape(s):
"""inverse of .escape"""
return urllib.unquote_plus(s)
return s.replace("/", "-").strip("-")
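Note: escape() no longer URL-quotes (and unescape() is gone); it now just turns a brick path into a filesystem-safe token for log and working-dir file names:

    print(escape("/bricks/b1"))        # bricks-b1
    print(escape("/data/gluster/b2"))  # data-gluster-b2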
def escape_space_newline(s):
@ -170,17 +154,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
"""
Setup GConf ssh control path parameters
"""
gconf.ssh_ctl_dir = ctld
rconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
resource_url)
content_md5 = md5hex(content)
fname = os.path.join(gconf.ssh_ctl_dir,
fname = os.path.join(rconf.ssh_ctl_dir,
"%s.mft" % content_md5)
create_manifest(fname, content)
ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir,
ssh_ctl_path = os.path.join(rconf.ssh_ctl_dir,
"%s.sock" % content_md5)
gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
def grabfile(fname, content=None):
@ -207,32 +191,30 @@ def grabfile(fname, content=None):
except:
f.close()
raise
gconf.permanent_handles.append(f)
rconf.permanent_handles.append(f)
return f
def grabpidfile(fname=None, setpid=True):
""".grabfile customization for pid files"""
if not fname:
fname = gconf.pid_file
fname = gconf.get("pid-file")
content = None
if setpid:
content = str(os.getpid()) + '\n'
return grabfile(fname, content=content)
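Note: grabpidfile() now defaults to the configured pid-file and delegates to grabfile() above. A hedged sketch of the general lock-and-write pattern such a helper implements (not the exact grabfile code):

    import fcntl
    import os

    def grab_sketch(path, content):
        # open without truncating, take an exclusive non-blocking lock,
        # then rewrite; return None if another process holds the lock
        f = os.fdopen(os.open(path, os.O_CREAT | os.O_RDWR), "r+")
        try:
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            f.close()
            return None
        f.truncate(0)
        f.write(content)
        f.flush()
        return f  # keep the handle open to keep the lock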
final_lock = Lock()
mntpt_list = []
def finalize(*a, **kw):
def finalize(*args, **kwargs):
"""all those messy final steps we go trough upon termination
Do away with pidfile, ssh control dir and logging.
"""
final_lock.acquire()
if getattr(gconf, 'pid_file', None):
rm_pidf = gconf.pid_file_owned
if gconf.cpid:
if gconf.get('pid_file'):
rm_pidf = rconf.pid_file_owned
if rconf.cpid:
# exit path from parent branch of daemonization
rm_pidf = False
while True:
@ -240,37 +222,31 @@ def finalize(*a, **kw):
if not f:
# child has already taken over pidfile
break
if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid:
# child has terminated
rm_pidf = True
break
time.sleep(0.1)
if rm_pidf:
try:
os.unlink(gconf.pid_file)
os.unlink(rconf.pid_file)
except:
ex = sys.exc_info()[1]
if ex.errno == ENOENT:
pass
else:
raise
if gconf.ssh_ctl_dir and not gconf.cpid:
if rconf.ssh_ctl_dir and not rconf.cpid:
def handle_rm_error(func, path, exc_info):
if exc_info[1].errno == ENOENT:
return
raise exc_info[1]
shutil.rmtree(gconf.ssh_ctl_dir, onerror=handle_rm_error)
if getattr(gconf, 'state_socket', None):
try:
os.unlink(gconf.state_socket)
except:
if sys.exc_info()[0] == OSError:
pass
shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error)
""" Unmount if not done """
for mnt in mntpt_list:
p0 = subprocess.Popen (["umount", "-l", mnt], stderr=subprocess.PIPE)
p0 = subprocess.Popen(["umount", "-l", mnt], stderr=subprocess.PIPE)
_, errdata = p0.communicate()
if p0.returncode == 0:
try:
@ -280,12 +256,11 @@ def finalize(*a, **kw):
else:
pass
if gconf.log_exit:
if rconf.log_exit:
logging.info("exiting.")
sys.stdout.flush()
sys.stderr.flush()
os._exit(kw.get('exval', 0))
os._exit(kwargs.get('exval', 0))
def log_raise_exception(excont):
@ -315,9 +290,9 @@ def log_raise_exception(excont):
((isinstance(exc, OSError) or isinstance(exc, IOError)) and
exc.errno == EPIPE):
logging.error('connection to peer is broken')
if hasattr(gconf, 'transport'):
gconf.transport.wait()
if gconf.transport.returncode == 127:
if hasattr(rconf, 'transport'):
rconf.transport.wait()
if rconf.transport.returncode == 127:
logging.error("getting \"No such file or directory\""
"errors is most likely due to "
"MISCONFIGURATION, please remove all "
@ -331,7 +306,7 @@ def log_raise_exception(excont):
"<SLAVEVOL> config remote-gsyncd "
"<GSYNCD_PATH> (Example GSYNCD_PATH: "
"`/usr/libexec/glusterfs/gsyncd`)")
gconf.transport.terminate_geterr()
rconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
ECONNABORTED):
logging.error(lf('glusterfs session went down',
@ -365,20 +340,20 @@ class Thread(baseThread):
function coughs up an exception
"""
def __init__(self, *a, **kw):
tf = kw.get('target')
def __init__(self, *args, **kwargs):
tf = kwargs.get('target')
if tf:
def twrap(*aa):
def twrap(*aargs):
excont = FreeObject(exval=0)
try:
tf(*aa)
tf(*aargs)
except:
try:
log_raise_exception(excont)
finally:
finalize(exval=excont.exval)
kw['target'] = twrap
baseThread.__init__(self, *a, **kw)
kwargs['target'] = twrap
baseThread.__init__(self, *args, **kwargs)
self.setDaemon(True)
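Note: usage of the Thread wrapper above is unchanged from threading.Thread; only the failure behavior differs, since an uncaught exception in the target now runs log_raise_exception() and finalize() instead of dying silently:

    # hypothetical target: a crash here tears down the whole process
    # (via finalize) instead of leaving a half-dead worker behind
    def keep_alive():
        raise IOError("peer went away")

    t = Thread(target=keep_alive)  # daemonized automatically
    t.start()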
@ -443,7 +418,7 @@ def boolify(s):
lstr = s.lower()
if lstr in true_list:
rv = True
elif not lstr in false_list:
elif lstr not in false_list:
logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
"defaulting to False",
str=s))
@ -451,29 +426,33 @@ def boolify(s):
return rv
def eintr_wrap(func, exc, *a):
def eintr_wrap(func, exc, *args):
"""
wrapper around syscalls resilient to interrupt caused
by signals
"""
while True:
try:
return func(*a)
return func(*args)
except exc:
ex = sys.exc_info()[1]
if not ex.args[0] == EINTR:
raise
def select(*a):
return eintr_wrap(oselect.select, oselect.error, *a)
def select(*args):
return eintr_wrap(oselect.select, oselect.error, *args)
def waitpid(*a):
return eintr_wrap(owaitpid, OSError, *a)
def waitpid(*args):
return eintr_wrap(owaitpid, OSError, *args)
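Note: the renamed wrappers behave as before, retrying the syscall whenever a signal interrupts it, so call sites need no EINTR handling of their own:

    import os

    select([], [], [], 0.5)  # a plain half-second sleep, EINTR-proof

    pid = os.fork()
    if pid == 0:
        os._exit(0)
    print(waitpid(pid, 0))   # retried transparently on EINTR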
def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
def term_handler_default_hook(signum, frame):
finalize(signum, frame, exval=1)
def set_term_handler(hook=term_handler_default_hook):
signal(SIGTERM, hook)
@ -550,7 +529,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
ex = sys.exc_info()[1]
if ex.errno in errnos:
return ex.errno
if not ex.errno in retry_errnos:
if ex.errno not in retry_errnos:
raise
nr_tries += 1
if nr_tries == GF_OP_RETRIES: