gsyncd: distribute the crawling load

* Also consume the changelog for change detection (see the sketch below).
* Status fixes.
* Use the new libgfchangelog done API.
* Process (and sync) one changelog at a time.
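
For context, a minimal sketch (not the daemon's actual wiring) of the changelog consume cycle this change introduces, written against the libgfchangelog ctypes wrapper added later in this patch; the brick path, working directory and the process_change() callback are illustrative placeholders:

    import time
    from libgfchangelog import Changes   # ctypes wrapper added by this patch

    def consume_changelogs(process_change):
        # register the brick with the changelog translator (example paths)
        Changes.cl_register('/bricks/b1', '/var/lib/misc/gsyncd-work',
                            'changes.log', 9, 5)
        while True:
            Changes.cl_scan()                       # ask the library for a fresh batch
            for clfile in Changes.cl_getchanges():  # one changelog file at a time
                process_change(clfile)              # parse and sync its records
                Changes.cl_done(clfile)             # ack it via the new done API
            time.sleep(3)                           # cf. the --change-interval option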

Change-Id: I24891615bb762e0741b1819ddfdef8802326cb16
BUG: 847839
Original Author: Csaba Henk <csaba@redhat.com>
Original Author: Aravinda VK <avishwan@redhat.com>
Original Author: Venky Shankar <vshankar@redhat.com>
Original Author: Amar Tumballi <amarts@redhat.com>
Original Author: Avra Sengupta <asengupt@redhat.com>
Signed-off-by: Avra Sengupta <asengupt@redhat.com>
Reviewed-on: http://review.gluster.org/5131
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
Avra Sengupta authored 2013-06-01 16:17:57 +05:30; committed by Vijay Bellur
parent 4944fc943e
commit b13c483dca
27 changed files with 1788 additions and 242 deletions


@ -29,6 +29,7 @@ AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([Makefile
libglusterfs/Makefile
libglusterfs/src/Makefile
geo-replication/src/peer_gsec_create
glusterfsd/Makefile
glusterfsd/src/Makefile
rpc/Makefile
@ -149,6 +150,7 @@ AC_CONFIG_FILES([Makefile
extras/ocf/glusterd
extras/ocf/volume
extras/LinuxRPM/Makefile
extras/geo-rep/Makefile
contrib/fuse-util/Makefile
contrib/uuid/uuid_types.h
glusterfs-api.pc


@ -2,7 +2,7 @@
EditorModedir = $(docdir)
EditorMode_DATA = glusterfs-mode.el glusterfs.vim
SUBDIRS = init.d systemd benchmarking hook-scripts $(OCF_SUBDIR) LinuxRPM
SUBDIRS = init.d systemd benchmarking hook-scripts $(OCF_SUBDIR) LinuxRPM geo-rep
confdir = $(sysconfdir)/glusterfs
conf_DATA = glusterfs-logrotate


@ -0,0 +1,2 @@
EXTRA_DIST = gsync-sync-gfid.c gsync-upgrade.sh generate-gfid-file.sh \
get-gfid.sh slave-upgrade.sh


@ -0,0 +1,53 @@
#!/bin/bash
#Usage: generate-gfid-file.sh <master-volfile-server:master-volume> <path-to-get-gfid.sh> <output-file>
function get_gfids()
{
GET_GFID_CMD=$1
OUTPUT_FILE=$2
find . -exec $GET_GFID_CMD {} \; > $OUTPUT_FILE
}
function mount_client()
{
local T; # temporary mount
local i; # inode number
VOLFILE_SERVER=$1;
VOLUME=$2;
GFID_CMD=$3;
OUTPUT=$4;
T=$(mktemp -d);
glusterfs -s $VOLFILE_SERVER --volfile-id $VOLUME $T;
i=$(stat -c '%i' $T);
[ "x$i" = "x1" ] || fatal "could not mount volume $MASTER on $T";
cd $T;
get_gfids $GFID_CMD $OUTPUT
cd -;
umount $T || fatal "could not umount $MASTER from $T";
rmdir $T || warn "rmdir of $T failed";
}
function main()
{
SLAVE=$1
GET_GFID_CMD=$2
OUTPUT=$3
VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'`
VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'`
mount_client $VOLFILE_SERVER $VOLUME_NAME $GET_GFID_CMD $OUTPUT
}
main "$@";

extras/geo-rep/get-gfid.sh Executable file

@ -0,0 +1,7 @@
#!/bin/bash
ATTR_STR=`getfattr -h $1 -n glusterfs.gfid.string`
GLFS_PATH=`echo $ATTR_STR | sed -e 's/# file: \(.*\) glusterfs.gfid.string*/\1/g'`
GFID=`echo $ATTR_STR | sed -e 's/.*glusterfs.gfid.string="\(.*\)"/\1/g'`
echo "$GFID $GLFS_PATH"


@ -0,0 +1,106 @@
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <limits.h>
#include <sys/types.h>
#include <attr/xattr.h>
#include <libgen.h>
#include <ctype.h>
#include <stdlib.h>
#ifndef UUID_CANONICAL_FORM_LEN
#define UUID_CANONICAL_FORM_LEN 36
#endif
#ifndef GF_FUSE_AUX_GFID_HEAL
#define GF_FUSE_AUX_GFID_HEAL "glusterfs.gfid.heal"
#endif
#define GLFS_LINE_MAX (PATH_MAX + (2 * UUID_CANONICAL_FORM_LEN))
int
main (int argc, char *argv[])
{
char *file = NULL;
char *tmp = NULL;
char *tmp1 = NULL;
char *parent_dir = NULL;
char *gfid = NULL;
char *bname = NULL;
int ret = -1;
int len = 0;
FILE *fp = NULL;
char line[GLFS_LINE_MAX] = {0,};
char *path = NULL;
void *blob = NULL;
void *tmp_blob = NULL;
if (argc != 2) {
/* each line in the file has the following format
* uuid-in-canonical-form path-relative-to-gluster-mount.
* Both uuid and relative path are from master mount.
*/
fprintf (stderr, "usage: %s <file-of-paths-to-be-synced>\n",
argv[0]);
goto out;
}
file = argv[1];
fp = fopen (file, "r");
if (fp == NULL) {
fprintf (stderr, "cannot open %s for reading (%s)\n",
file, strerror (errno));
goto out;
}
while (fgets (line, GLFS_LINE_MAX, fp) != NULL) {
tmp = line;
path = gfid = line;
path += UUID_CANONICAL_FORM_LEN + 1;
while(isspace (*path))
path++;
if ((strlen (line) < GLFS_LINE_MAX) &&
(line[strlen (line) - 1] == '\n'))
line[strlen (line) - 1] = '\0';
line[UUID_CANONICAL_FORM_LEN] = '\0';
tmp = strdup (path);
tmp1 = strdup (path);
parent_dir = dirname (tmp);
bname = basename (tmp1);
/* gfid + '\0' + bname + '\0' */
len = UUID_CANONICAL_FORM_LEN + 1 + strlen (bname) + 1;
blob = calloc (1, len);
memcpy (blob, gfid, UUID_CANONICAL_FORM_LEN);
tmp_blob = blob + UUID_CANONICAL_FORM_LEN + 1;
memcpy (tmp_blob, bname, strlen (bname));
ret = setxattr (parent_dir, GF_FUSE_AUX_GFID_HEAL, blob, len,
0);
if (ret < 0) {
fprintf (stderr, "setxattr on %s/%s failed (%s)\n",
parent_dir, bname, strerror (errno));
}
memset (line, 0, GLFS_LINE_MAX);
free (blob);
free (tmp); free (tmp1);
blob = NULL;
}
ret = 0;
out:
return ret;
}


@ -0,0 +1,123 @@
#!/bin/bash
#usage: gsync-upgrade.sh <slave-volfile-server:slave-volume> <gfid-file>
# <path-to-gsync-sync-gfid> <ssh-identity-file>
#<slave-volfile-server>: a machine on which gluster cli can fetch slave volume info.
# slave-volfile-server defaults to localhost.
#
#<gfid-file>: a file containing paths and their associated gfids
# on master. The paths are relative to master mount point
# (not absolute). An example extract of <gfid-file> can be,
#
# <extract>
# 22114455-57c5-46e9-a783-c40f83a72b09 /dir
# 25772386-3eb8-4550-a802-c3fdc938ca80 /dir/file
# </extract>
#
#<ssh-identity-file>: file from which the identity (private key) for public key authentication is read.
SLAVE_MOUNT='/tmp/glfs_slave'
function SSH()
{
HOST=$1
SSHKEY=$2
shift 2
ssh -qi $SSHKEY \
-oPasswordAuthentication=no \
-oStrictHostKeyChecking=no \
"$HOST" "$@";
}
function get_bricks()
{
SSHKEY=$3
SSH $1 $SSHKEY "gluster volume info $2" | grep -E 'Brick[0-9]+' | sed -e 's/[^:]*:\(.*\)/\1/g'
}
function cleanup_brick()
{
HOST=$1
BRICK=$2
SSHKEY=$3
# TODO: write a C program that receives a list of files and does the cleanup on
# them, instead of spawning a new setfattr process for each file, if
# performance is bad.
SSH -i $SSHKEY $HOST "rm -rf $BRICK/.glusterfs/* && find $BRICK -exec setfattr -x trusted.gfid {} \;"
}
function cleanup_slave()
{
SSHKEY=$2
VOLFILE_SERVER=`echo $1 | sed -e 's/\(.*\):.*/\1/'`
VOLUME_NAME=`echo $1 | sed -e 's/.*:\(.*\)/\1/'`
BRICKS=`get_bricks $VOLFILE_SERVER $VOLUME_NAME $SSHKEY`
for i in $BRICKS; do
HOST=`echo $i | sed -e 's/\(.*\):.*/\1/'`
BRICK=`echo $i | sed -e 's/.*:\(.*\)/\1/'`
cleanup_brick $HOST $BRICK $SSHKEY
done
SSH -i $SSHKEY $VOLFILE_SERVER "gluster --mode=script volume stop $VOLUME_NAME; gluster volume start $VOLUME_NAME";
}
function mount_client()
{
local T; # temporary mount
local i; # inode number
GFID_FILE=$3
SYNC_CMD=$4
T=$(mktemp -d);
glusterfs --aux-gfid-mount -s $1 --volfile-id $2 $T;
i=$(stat -c '%i' $T);
[ "x$i" = "x1" ] || fatal "could not mount volume $MASTER on $T";
cd $T;
$SYNC_CMD $GFID_FILE
cd -;
umount -l $T || fatal "could not umount $MASTER from $T";
rmdir $T || warn "rmdir of $T failed";
}
function sync_gfids()
{
SLAVE=$1
GFID_FILE=$2
SLAVE_VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'`
SLAVE_VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'`
if [ "x$SLAVE_VOLFILE_SERVER" = "x" ]; then
SLAVE_VOLFILE_SERVER="localhost"
fi
mount_client $SLAVE_VOLFILE_SERVER $SLAVE_VOLUME_NAME $GFID_FILE $3
}
function upgrade()
{
SLAVE=$1
GFID_FILE=$2
SYNC_CMD=$3
SSHKEY=$4
cleanup_slave $SLAVE $SSHKEY
sync_gfids $SLAVE $GFID_FILE $SYNC_CMD
}
upgrade "$@"


@ -0,0 +1,102 @@
#!/bin/bash
#usage: slave-upgrade.sh <volfile-server:volname> <gfid-file>
# <path-to-gsync-sync-gfid>
#<slave-volfile-server>: a machine on which gluster cli can fetch slave volume info.
# slave-volfile-server defaults to localhost.
#
#<gfid-file>: a file containing paths and their associated gfids
# on master. The paths are relative to master mount point
# (not absolute). An example extract of <gfid-file> can be,
#
# <extract>
# 22114455-57c5-46e9-a783-c40f83a72b09 /dir
# 25772386-3eb8-4550-a802-c3fdc938ca80 /dir/file
# </extract>
function get_bricks()
{
gluster volume info $1 | grep -E 'Brick[0-9]+' | sed -e 's/[^:]*:\(.*\)/\1/g'
}
function cleanup_brick()
{
HOST=$1
BRICK=$2
# TODO: write a C program that receives a list of files and does the cleanup on
# them, instead of spawning a new setfattr process for each file, if
# performance is bad.
ssh $HOST "rm -rf $BRICK/.glusterfs/* && find $BRICK -exec setfattr -x trusted.gfid {} \; 2>/dev/null"
}
function cleanup_slave()
{
VOLUME_NAME=`echo $1 | sed -e 's/.*:\(.*\)/\1/'`
BRICKS=`get_bricks $VOLUME_NAME`
for i in $BRICKS; do
HOST=`echo $i | sed -e 's/\(.*\):.*/\1/'`
BRICK=`echo $i | sed -e 's/.*:\(.*\)/\1/'`
cleanup_brick $HOST $BRICK
done
# Now restart the volume
gluster --mode=script volume stop $VOLUME_NAME;
gluster volume start $VOLUME_NAME;
}
function mount_client()
{
local T; # temporary mount
local i; # inode number
VOLUME_NAME=$2;
GFID_FILE=$3
SYNC_CMD=$4
T=$(mktemp -d);
glusterfs --aux-gfid-mount -s $1 --volfile-id $VOLUME_NAME $T;
i=$(stat -c '%i' $T);
cd $T;
$SYNC_CMD $GFID_FILE
cd -;
umount $T || fatal "could not umount $MASTER from $T";
rmdir $T || warn "rmdir of $T failed";
}
function sync_gfids()
{
SLAVE=$1
GFID_FILE=$2
SYNC_CMD=$3
SLAVE_VOLFILE_SERVER=`echo $SLAVE | sed -e 's/\(.*\):.*/\1/'`
SLAVE_VOLUME_NAME=`echo $SLAVE | sed -e 's/.*:\(.*\)/\1/'`
if [ "x$SLAVE_VOLFILE_SERVER" = "x" ]; then
SLAVE_VOLFILE_SERVER="localhost"
fi
mount_client $SLAVE_VOLFILE_SERVER $SLAVE_VOLUME_NAME $GFID_FILE $SYNC_CMD
}
function upgrade()
{
SLAVE=$1
GFID_FILE=$2
SYNC_CMD=$3
cleanup_slave $SLAVE
sync_gfids $SLAVE $GFID_FILE $SYNC_CMD
}
upgrade "$@"


@ -1 +1 @@
EXTRA_DIST = S29CTDBsetup.sh S30samba-start.sh S30samba-stop.sh S30samba-set.sh
EXTRA_DIST = S29CTDBsetup.sh S30samba-start.sh S30samba-stop.sh S30samba-set.sh Sglusterd-geo-rep-create-post.sh


@ -1,5 +1,12 @@
gsyncddir = $(libexecdir)/glusterfs
gsyncd_SCRIPTS = gverify.sh peer_add_secret_pub peer_gsec_create
# peer_gsec_create is not added to EXTRA_DIST as it's derived
# from a .in file
EXTRA_DIST = gverify.sh peer_add_secret_pub
gsyncd_PROGRAMS = gsyncd
gsyncd_SOURCES = gsyncd.c procdiggy.c


@ -285,6 +285,46 @@ invoke_rsync (int argc, char **argv)
return 1;
}
static int
invoke_gluster (int argc, char **argv)
{
int i = 0;
int j = 0;
int optsover = 0;
char *ov = NULL;
for (i = 1; i < argc; i++) {
ov = strtail (argv[i], "--");
if (ov && !optsover) {
if (*ov == '\0')
optsover = 1;
continue;
}
switch (++j) {
case 1:
if (strcmp (argv[i], "volume") != 0)
goto error;
break;
case 2:
if (strcmp (argv[i], "info") != 0)
goto error;
break;
case 3:
break;
default:
goto error;
}
}
argv[0] = "gluster";
execvp (SBIN_DIR"/gluster", argv);
fprintf (stderr, "exec of gluster failed\n");
return 127;
error:
fprintf (stderr, "disallowed gluster invocation\n");
return 1;
}
struct invocable {
char *name;
@ -292,8 +332,9 @@ struct invocable {
};
struct invocable invocables[] = {
{ "rsync", invoke_rsync },
{ "gsyncd", invoke_gsyncd },
{ "rsync", invoke_rsync },
{ "gsyncd", invoke_gsyncd },
{ "gluster", invoke_gluster },
{ NULL, NULL}
};

geo-replication/src/gverify.sh Executable file

@ -0,0 +1,142 @@
#!/bin/bash
# Script to verify the Master and Slave Gluster compatibility.
# Usage: ./gverify <master volume> <slave host> <slave volume>
# Returns 0 if master and slave are compatible.
BUFFER_SIZE=1000;
slave_log_file=`gluster --print-logdir`/geo-replication-slaves/slave.log
function SSHM()
{
ssh -q \
-oPasswordAuthentication=no \
-oStrictHostKeyChecking=no \
-oControlMaster=yes \
"$@";
}
function cmd_master()
{
VOL=$1;
local cmd_line;
cmd_line=$(cat <<EOF
function do_verify() {
v=\$1;
d=\$(mktemp -d 2>/dev/null);
glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id \$v -l $slave_log_file \$d;
i=\$(stat -c "%i" \$d);
if [[ "\$i" -ne "1" ]]; then
echo 0:0;
exit 1;
fi;
cd \$d;
available_size=\$(df \$d | tail -1 | awk "{print \\\$2}");
umount -l \$d;
rmdir \$d;
ver=\$(gluster --version | head -1 | cut -f2 -d " ");
echo \$available_size:\$ver;
};
cd /tmp;
[ x$VOL != x ] && do_verify $VOL;
EOF
);
echo $cmd_line;
}
function cmd_slave()
{
VOL=$1;
local cmd_line;
cmd_line=$(cat <<EOF
function do_verify() {
v=\$1;
d=\$(mktemp -d 2>/dev/null);
glusterfs -s localhost --xlator-option="*dht.lookup-unhashed=off" --volfile-id \$v -l $slave_log_file \$d;
i=\$(stat -c "%i" \$d);
if [[ "\$i" -ne "1" ]]; then
echo 0:0;
exit 1;
fi;
cd \$d;
available_size=\$(df \$d | tail -1 | awk "{print \\\$4}");
umount -l \$d;
rmdir \$d;
ver=\$(gluster --version | head -1 | cut -f2 -d " ");
echo \$available_size:\$ver;
};
cd /tmp;
[ x$VOL != x ] && do_verify $VOL;
EOF
);
echo $cmd_line;
}
function master_stats()
{
MASTERVOL=$1;
local cmd_line;
cmd_line=$(cmd_master $MASTERVOL);
bash -c "$cmd_line";
}
function slave_stats()
{
SLAVEHOST=$1;
SLAVEVOL=$2;
local cmd_line;
cmd_line=$(cmd_slave $SLAVEVOL);
SSHM $SLAVEHOST bash -c "'$cmd_line'";
}
function main()
{
ERRORS=0;
master_data=$(master_stats $1);
slave_data=$(slave_stats $2 $3);
master_size=$(echo $master_data | cut -f1 -d':');
slave_size=$(echo $slave_data | cut -f1 -d':');
master_version=$(echo $master_data | cut -f2 -d':');
slave_version=$(echo $slave_data | cut -f2 -d':');
log_file=$4
if [[ "x$master_size" = "x" || "x$master_version" = "x" || "$master_size" -eq "0" ]]; then
echo "Unable to fetch master volume details." > $log_file;
exit 1;
fi;
if [[ "x$slave_size" = "x" || "x$slave_version" = "x" || "$slave_size" -eq "0" ]]; then
ping -w 5 $2;
if [ $? -ne 0 ]; then
echo "$2 not reachable." > $log_file
exit 1;
fi;
echo "Unable to fetch slave volume details." > $log_file;
exit 1;
fi;
if [ $slave_size -ge $(($master_size - $BUFFER_SIZE )) ]; then
echo "Total size of master is lesser than available size of slave." > $log_file;
else
echo "Total size of master is greater than available size of slave." > $log_file;
ERRORS=$(($ERRORS + 1));
exit $ERRORS;
fi;
if [[ $master_version < $slave_version || $master_version == $slave_version ]]; then
echo "Gluster version of master and slave matches." > $log_file;
else
echo "Gluster version mismatch between master and slave." > $log_file;
ERRORS=$(($ERRORS + 1));
exit $ERRORS;
fi;
exit $ERRORS;
}
main "$@";


@ -0,0 +1,3 @@
#!/bin/bash
cat $1 >> ~/.ssh/authorized_keys


@ -0,0 +1,12 @@
#!/bin/bash
prefix=@prefix@
exec_prefix=@exec_prefix@
if [ ! -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub ]; then
\rm -rf "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem*
ssh-keygen -N '' -f "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem > /dev/null
fi
output=`echo command=\"@libexecdir@/glusterfs/gsyncd\" " "``cat "$GLUSTERD_WORKING_DIR"/geo-replication/secret.pem.pub`
echo $output


@ -2,6 +2,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
$(top_builddir)/contrib/ipaddr-py/ipaddr.py
$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py
CLEANFILES =


@ -11,13 +11,13 @@ Requirements are categorized according to this.
* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)
* rsync (both)
* glusterfs with marker support (master); glusterfs (optional on slave)
* FUSE; for supported versions consult glusterfs
* glusterfs: with marker and changelog support (master & slave);
* FUSE: glusterfs fuse module with auxiliary gfid-based access support
INSTALLATION
------------
As of now, the supported way of operation is running from the source directory.
As of now, the supported way of operation is running from the source directory or using the provided RPMs.
If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
@ -46,34 +46,11 @@ USAGE
-----
gsyncd is a utility for continuous mirroring, i.e. it mirrors master to slave incrementally.
Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors
for it with gsyncd:
Assume we have a gluster volume _pop_ at localhost. We try to set up the mirroring for volume
_pop_ using gsyncd for gluster volume _moz_ on remote machine/cluster @ example.com. The
respective gsyncd invocations are (demoing some syntax sugaring):
1. _/data/mirror_
2. local gluster volume _yow_
3. _/data/far_mirror_ at example.com
4. gluster volume _moz_ at example.com
The respective gsyncd invocations are (demoing some syntax sugaring):
1.
gsyncd.py gluster://localhost:pop file:///data/mirror
or short form
gsyncd.py :pop /data/mirror
2. `gsyncd :pop :yow`
3.
gsyncd.py :pop ssh://example.com:/data/far_mirror
or short form
gsyncd.py :pop example.com:/data/far_mirror
4. `gsyncd.py :pop example.com::moz`
`gsyncd.py :pop example.com::moz`
gsyncd has to be available on both sides; its location on the remote side has to be specified
via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be


@ -24,9 +24,9 @@ class MultiDict(object):
def __getitem__(self, key):
val = None
for d in self.dicts:
if d.get(key):
if d.get(key) != None:
val = d[key]
if not val:
if val == None:
raise KeyError(key)
return val
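
The point of comparing against None explicitly is that a stored but falsy value (0, '' or False) must not look like a missing key. A small illustrative sketch of the behaviour the new checks give (the dict contents are made up):

    dicts = [{'timeout': 0}, {'log_level': 'INFO'}]

    def lookup(key):
        val = None
        for d in dicts:
            if d.get(key) != None:   # the test MultiDict.__getitem__ now uses
                val = d[key]
        if val == None:
            raise KeyError(key)
        return val

    # with the old truthiness tests this would raise KeyError; now it returns 0
    assert lookup('timeout') == 0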


@ -2,10 +2,12 @@
import os
import os.path
import glob
import sys
import time
import logging
import signal
import shutil
import optparse
import fcntl
import fnmatch
@ -17,7 +19,7 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
from syncdutils import GsyncdError, select, set_term_handler, privileged
from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file
from configinterface import GConffile
import resource
from monitor import monitor
@ -109,6 +111,17 @@ def startup(**kw):
GLogger._gsyncd_loginit(**kw)
def _unlink(path):
try:
os.unlink(path)
except (OSError, IOError):
if sys.exc_info()[1].errno == ENOENT:
pass
else:
raise GsyncdError('Unlink error: %s' % path)
def main():
"""main routine, signal/exception handling boilerplates"""
gconf.starttime = time.time()
@ -153,21 +166,27 @@ def main_i():
op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
op.add_option('--gluster-log-level', metavar='LVL')
op.add_option('--gluster-params', metavar='PRMS', default='')
op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP)
op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-')
op.add_option('--mountbroker', metavar='LABEL')
op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs)
op.add_option('--ignore-deletes', default=False, action='store_true')
op.add_option('--isolated-slave', default=False, action='store_true')
op.add_option('--use-rsync-xattrs', default=False, action='store_true')
op.add_option('-L', '--log-level', metavar='LVL')
op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
op.add_option('--volume-id', metavar='UUID')
op.add_option('--slave-id', metavar='ID')
op.add_option('--session-owner', metavar='ID')
op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
op.add_option('--rsync-command', metavar='CMD', default='rsync')
op.add_option('--rsync-options', metavar='OPTS', default='--sparse')
op.add_option('--rsync-options', metavar='OPTS', default='')
op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
op.add_option('--timeout', metavar='SEC', type=int, default=120)
op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP)
@ -186,15 +205,28 @@ def main_i():
# see crawl() for usage of the above tunables
op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
# changelog or xtime? (TODO: Change the default)
op.add_option('--change-detector', metavar='MODE', type=str, default='xtime')
# sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time)
op.add_option('--change-interval', metavar='SEC', type=int, default=3)
# working directory for changelog based mechanism
op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs)
op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local)
op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local)
op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local)
op.add_option('--create', type=str, dest="create", action='callback', callback=store_local)
op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True))
op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
setattr(a[-1].values, 'log_file', '-'),
setattr(a[-1].values, 'log_level', 'DEBUG'))),
op.add_option('--path', type=str, action='append')
for a in ('check', 'get'):
op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
@ -225,6 +257,19 @@ def main_i():
# values container.
defaults = op.get_default_values()
opts, args = op.parse_args(values=optparse.Values())
args_orig = args[:]
r = rconf.get('resource_local')
if r:
if len(args) == 0:
args.append(None)
args[0] = r
r = rconf.get('resource_remote')
if r:
if len(args) == 0:
raise GsyncdError('local resource unspecified')
elif len(args) == 1:
args.append(None)
args[1] = r
confdata = rconf.get('config')
if not (len(args) == 2 or \
(len(args) == 1 and rconf.get('listen')) or \
@ -234,6 +279,12 @@ def main_i():
sys.stderr.write(op.get_usage() + "\n")
sys.exit(1)
verify = rconf.get('verify')
if verify:
logging.info (verify)
logging.info ("Able to spawn gsyncd.py")
return
restricted = os.getenv('_GSYNCD_RESTRICTED_')
if restricted:
@ -250,6 +301,17 @@ def main_i():
(k, v))
confrx = getattr(confdata, 'rx', None)
def makersc(aa, check=True):
if not aa:
return ([], None, None)
ra = [resource.parse_url(u) for u in aa]
local = ra[0]
remote = None
if len(ra) > 1:
remote = ra[1]
if check and not local.can_connect_to(remote):
raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
return (ra, local, remote)
if confrx:
# peers are regexen, don't try to parse them
if confrx == 'glob':
@ -257,27 +319,20 @@ def main_i():
canon_peers = args
namedict = {}
else:
rscs = [resource.parse_url(u) for u in args]
dc = rconf.get('url_print')
rscs, local, remote = makersc(args_orig, not dc)
if dc:
for r in rscs:
print(r.get_url(**{'normal': {},
'canon': {'canonical': True},
'canon_esc': {'canonical': True, 'escaped': True}}[dc]))
return
local = remote = None
if rscs:
local = rscs[0]
if len(rscs) > 1:
remote = rscs[1]
if not local.can_connect_to(remote):
raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
pa = ([], [], [])
urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
for x in rscs:
for i in range(len(pa)):
pa[i].append(x.get_url(**urlprms[i]))
peers, canon_peers, canon_esc_peers = pa
_, canon_peers, canon_esc_peers = pa
# creating the namedict, a dict representing various ways of referring to / representing
# peers to be fillable in config templates
mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
@ -327,6 +382,39 @@ def main_i():
gconf.__dict__.update(opts.__dict__)
gconf.configinterface = gcnf
delete = rconf.get('delete')
if delete:
logging.info ('geo-replication delete')
# Delete pid file, status file, socket file
cleanup_paths = []
if getattr(gconf, 'pid_file', None):
cleanup_paths.append(gconf.pid_file)
if getattr(gconf, 'state_file', None):
cleanup_paths.append(gconf.state_file)
if getattr(gconf, 'state_detail_file', None):
cleanup_paths.append(gconf.state_detail_file)
if getattr(gconf, 'state_socket_unencoded', None):
cleanup_paths.append(gconf.state_socket_unencoded)
# Cleanup changelog working dirs
if getattr(gconf, 'working_dir', None):
try:
shutil.rmtree(gconf.working_dir)
except (IOError, OSError):
if sys.exc_info()[1].errno == ENOENT:
pass
else:
raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir)
for path in cleanup_paths:
# To delete temp files
for f in glob.glob(path + "*"):
_unlink(f)
return
if restricted and gconf.allow_network:
ssh_conn = os.getenv('SSH_CONNECTION')
if not ssh_conn:
@ -380,9 +468,16 @@ def main_i():
raise
return
create = rconf.get('create')
if create:
if getattr(gconf, 'state_file', None):
update_file(gconf.state_file, lambda f: f.write(create + '\n'))
return
go_daemon = rconf['go_daemon']
be_monitor = rconf.get('monitor')
rscs, local, remote = makersc(args)
if not be_monitor and isinstance(remote, resource.SSH) and \
go_daemon == 'should':
go_daemon = 'postconn'
@ -393,16 +488,16 @@ def main_i():
label = 'monitor'
elif remote:
#master
label = ''
label = gconf.local_path
else:
label = 'slave'
startup(go_daemon=go_daemon, log_file=log_file, label=label)
resource.Popen.init_errhandler()
if be_monitor:
return monitor()
return monitor(*rscs)
logging.info("syncing: %s" % " -> ".join(peers))
resource.Popen.init_errhandler()
logging.info("syncing: %s" % " -> ".join(r.url for r in rscs))
if remote:
go_daemon = remote.connect_remote(go_daemon=go_daemon)
if go_daemon:


@ -42,6 +42,16 @@ class Xattr(object):
def lgetxattr(cls, path, attr, siz=0):
return cls._query_xattr( path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
"""lgetxattr variant with size discovery"""
size = cls.lgetxattr(path, attr)
if size == -1:
cls.raise_oserr()
if size == 0:
return ''
return cls.lgetxattr(path, attr, size)
@classmethod
def llistxattr(cls, path, siz=0):
ret = cls._query_xattr(path, siz, 'llistxattr')
@ -55,6 +65,11 @@ class Xattr(object):
if ret == -1:
cls.raise_oserr()
@classmethod
def lsetxattr_l(cls, path, attr, val):
""" lazy lsetxattr(): caller handles errno """
cls.libc.lsetxattr(path, attr, val, len(val), 0)
@classmethod
def lremovexattr(cls, path, attr):
ret = cls.libc.lremovexattr(path, attr)
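
A hedged usage sketch of the new lgetxattr_buf() helper and the two-call size-discovery pattern it wraps; the path and xattr name below are examples only:

    from libcxattr import Xattr

    # single call: query the value size first, then fetch with a buffer that big
    val = Xattr.lgetxattr_buf('/bricks/b1/dir', 'trusted.gfid')

    # the explicit form of the same pattern
    size = Xattr.lgetxattr('/bricks/b1/dir', 'trusted.gfid')
    if size > 0:
        val = Xattr.lgetxattr('/bricks/b1/dir', 'trusted.gfid', size)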


@ -0,0 +1,64 @@
import os
from ctypes import *
from ctypes.util import find_library
class Changes(object):
libgfc = CDLL(find_library("gfchangelog"), use_errno=True)
@classmethod
def geterrno(cls):
return get_errno()
@classmethod
def raise_oserr(cls):
errn = cls.geterrno()
raise OSError(errn, os.strerror(errn))
@classmethod
def _get_api(cls, call):
return getattr(cls.libgfc, call)
@classmethod
def cl_register(cls, brick, path, log_file, log_level, retries = 0):
ret = cls._get_api('gf_changelog_register')(brick, path,
log_file, log_level, retries)
if ret == -1:
cls.raise_oserr()
@classmethod
def cl_scan(cls):
ret = cls._get_api('gf_changelog_scan')()
if ret == -1:
cls.raise_oserr()
@classmethod
def cl_startfresh(cls):
ret = cls._get_api('gf_changelog_start_fresh')()
if ret == -1:
cls.raise_oserr()
@classmethod
def cl_getchanges(cls):
""" remove hardcoding for path name length """
def clsort(f):
return f.split('.')[-1]
changes = []
buf = create_string_buffer('\0', 4096)
call = cls._get_api('gf_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break;
changes.append(buf.raw[:ret-1])
if ret == -1:
cls.raise_oserr()
# cleanup tracker
cls.cl_startfresh()
return sorted(changes, key=clsort)
@classmethod
def cl_done(cls, clfile):
ret = cls._get_api('gf_changelog_done')(clfile)
if ret == -1:
cls.raise_oserr()


@ -4,22 +4,20 @@ import time
import stat
import random
import signal
import json
import logging
import socket
import string
import errno
import re
from errno import ENOENT, ENODATA, EPIPE
from shutil import copyfileobj
from errno import ENOENT, ENODATA, EPIPE, EEXIST
from threading import currentThread, Condition, Lock
from datetime import datetime
try:
from hashlib import md5 as md5
except ImportError:
# py 2.4
from md5 import new as md5
from gconf import gconf
from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
escape, unescape, select
from tempfile import mkdtemp, NamedTemporaryFile
from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
unescape, select, gauxpfx, md5hex, selfkill, entry2pb
URXTIME = (-1, 0)
@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self):
# The API!
def gmaster_builder():
def gmaster_builder(excrawl=None):
"""produce the GMaster class variant corresponding
to sync mode"""
this = sys.modules[__name__]
modemixin = gconf.special_sync_mode
if not modemixin:
modemixin = 'normal'
logging.info('setting up master for %s sync mode' % modemixin)
changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector
logging.info('setting up %s change detection mode' % changemixin)
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
pass
return _GMaster
@ -100,12 +100,9 @@ class NormalMixin(object):
def make_xtime_opts(self, is_master, opts):
if not 'create' in opts:
opts['create'] = is_master and not self.inter_master
opts['create'] = is_master
if not 'default_xtime' in opts:
if is_master and self.inter_master:
opts['default_xtime'] = ENODATA
else:
opts['default_xtime'] = URXTIME
opts['default_xtime'] = URXTIME
def xtime_low(self, server, path, **opts):
xt = server.xtime(path, self.uuid)
@ -114,7 +111,7 @@ class NormalMixin(object):
if xt == ENODATA or xt < self.volmark:
if opts['create']:
xt = _xtime_now()
server.set_xtime(path, self.uuid, xt)
server.aggregated.set_xtime(path, self.uuid, xt)
else:
xt = opts['default_xtime']
return xt
@ -151,6 +148,13 @@ class NormalMixin(object):
def set_slave_xtime(self, path, mark):
self.slave.server.set_xtime(path, self.uuid, mark)
class PartialMixin(NormalMixin):
"""a variant tuned towards operation with a master
that has partial info of the slave (brick typically)"""
def xtime_reversion_hook(self, path, xtl, xtr):
pass
class WrapupMixin(NormalMixin):
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin):
opts['default_xtime'] = URXTIME
@staticmethod
def keepalive_payload_hook(timo, gap):
def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
@ -236,19 +240,19 @@ class BlindMixin(object):
# from interrupted gsyncd transfer
logging.warn('have to fix up missing xtime on ' + path)
xt0 = _xtime_now()
server.set_xtime(path, self.uuid, xt0)
server.aggregated.set_xtime(path, self.uuid, xt0)
else:
xt0 = opts['default_xtime']
xt = (xt0, xt[1])
return xt
@staticmethod
def keepalive_payload_hook(timo, gap):
def keepalive_payload_hook(self, timo, gap):
return (None, gap)
def volinfo_hook(self):
res = _volinfo_hook_relax_foreign(self)
volinfo_r_new = self.slave.server.native_volume_info()
volinfo_r_new = self.slave.server.aggregated.native_volume_info()
if volinfo_r_new['retval']:
raise GsyncdError("slave is corrupt")
if getattr(self, 'volinfo_r', None):
@ -321,9 +325,7 @@ class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
class GMasterBase(object):
class GMasterCommon(object):
"""abstract class impementling master role"""
KFGN = 0
@ -334,8 +336,8 @@ class GMasterBase(object):
err out on multiple foreign masters
"""
fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
self.master.server.native_volume_info()
fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
self.master.server.aggregated.native_volume_info()
fgn_vi = None
if fgn_vis:
if len(fgn_vis) > 1:
@ -376,6 +378,33 @@ class GMasterBase(object):
self.make_xtime_opts(rsc == self.master, opts)
return self.xtime_low(rsc.server, path, **opts)
def get_initial_crawl_data(self):
default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
if getattr(gconf, 'state_detail_file', None):
try:
return json.load(open(gconf.state_detail_file))
except (IOError, OSError):
ex = sys.exc_info()[1]
if ex.errno == ENOENT:
# Create file with initial data
with open(gconf.state_detail_file, 'wb') as f:
json.dump(default_data, f)
return default_data
else:
raise
return default_data
def update_crawl_data(self):
if getattr(gconf, 'state_detail_file', None):
try:
same_dir = os.path.dirname(gconf.state_detail_file)
with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
json.dump(self.total_crawl_stats, tmp)
os.rename(tmp.name, gconf.state_detail_file)
except (IOError, OSError):
raise
def __init__(self, master, slave):
self.master = master
self.slave = slave
@ -392,15 +421,12 @@ class GMasterBase(object):
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
self.lastreport = {'crawls': 0, 'turns': 0}
self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
self.total_crawl_stats = self.get_initial_crawl_data()
self.start = None
self.change_seen = None
self.syncTime=0
self.lastSyncTime=0
self.crawlStartTime=0
self.crawlTime=0
self.filesSynced=0
self.bytesSynced=0
# the authoritative (foreign, native) volinfo pair
# which lets us deduce what to do when we refetch
# the volinfos from system
@ -409,8 +435,94 @@ class GMasterBase(object):
# the actual volinfo we make use of
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
self.checkpoint_thread = None
def init_keep_alive(cls):
"""start the keep-alive thread """
timo = int(gconf.timeout or 0)
if timo > 0:
def keep_alive():
while True:
vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
cls.slave.server.keep_alive(vi)
time.sleep(gap)
t = Thread(target=keep_alive)
t.start()
def volinfo_query(self):
"""volume info state machine"""
volinfo_sys, state_change = self.volinfo_hook()
if self.inter_master:
self.volinfo = volinfo_sys[self.KFGN]
else:
self.volinfo = volinfo_sys[self.KNAT]
if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
logging.info('new master is %s', self.uuid)
if self.volinfo:
logging.info("%s master with volume id %s ..." % \
(self.inter_master and "intermediate" or "primary",
self.uuid))
if state_change == self.KFGN:
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
raise GsyncdError ("master is corrupt")
self.start_checkpoint_thread()
else:
if should_display_info or self.crawls == 0:
if self.inter_master:
logging.info("waiting for being synced from %s ..." % \
self.volinfo_state[self.KFGN]['uuid'])
else:
logging.info("waiting for volume info ...")
return True
def should_crawl(cls):
return (gconf.glusterd_uuid in cls.master.server.node_uuid())
def register(self):
self.register()
def crawlwrap(self, oneshot=False):
if oneshot:
# it's important to do this during the oneshot crawl as
# for a passive gsyncd (ie. in a replicate scenario)
# the keepalive thread would keep the connection alive.
self.init_keep_alive()
self.lastreport['time'] = time.time()
self.crawl_stats['crawl_starttime'] = datetime.now()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
t0 = time.time()
crawl = self.should_crawl()
while not self.terminate:
if self.volinfo_query():
continue
t1 = time.time()
if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
crawl = self.should_crawl()
t0 = t1
if not crawl:
time.sleep(5)
continue
if self.start:
logging.debug("... crawl #%d done, took %.6f seconds" % \
(self.crawls, time.time() - self.start))
self.start = t1
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
logging.info("%d crawls, %d turns",
self.crawls - self.lastreport['crawls'],
self.turns - self.lastreport['turns'])
self.lastreport.update(crawls = self.crawls,
turns = self.turns,
time = self.start)
self.crawl()
if oneshot:
return
time.sleep(self.sleep_interval)
@classmethod
def _checkpt_param(cls, chkpt, prm, xtimish=True):
"""use config backend to lookup a parameter belonging to
@ -443,32 +555,37 @@ class GMasterBase(object):
return ts
def get_extra_info(self):
str_info="\nFile synced : %d" %(self.filesSynced)
str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced)
str_info+="\nSync Time : %f seconds" %(self.syncTime)
self.crawlTime=datetime.now()-self.crawlStartTime
years , days =divmod(self.crawlTime.days,365.25)
years=int(years)
days=int(days)
str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
str_info += "\0"
logging.debug(str_info)
return str_info
def _crawl_time_format(self, crawl_time):
# Ex: 5 years, 4 days, 20:23:10
years, days = divmod(crawl_time.days, 365.25)
years = int(years)
days = int(days)
date=""
m, s = divmod(self.crawlTime.seconds, 60)
m, s = divmod(crawl_time.seconds, 60)
h, m = divmod(m, 60)
if years!=0 :
date+=str(years)+" year "
if days!=0 :
date+=str(days)+" day "
if h!=0 :
date+=str(h)+" H : "
if m!=0 or h!=0 :
date+=str(m)+" M : "
if years != 0:
date += "%s %s " % (years, "year" if years == 1 else "years")
if days != 0:
date += "%s %s " % (days, "day" if days == 1 else "days")
date+=str(s)+" S"
self.crawlTime=date
str_info+="\nCrawl Time : %s" %(str(self.crawlTime))
str_info+="\n\0"
return str_info
date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
return date
def checkpt_service(self, chan, chkpt, tgt):
"""checkpoint service loop
@ -517,7 +634,7 @@ class GMasterBase(object):
try:
conn, _ = chan.accept()
try:
conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info()))
conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
except:
exc = sys.exc_info()[1]
if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
@ -536,7 +653,7 @@ class GMasterBase(object):
):
return
chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
try:
os.unlink(state_socket)
except:
@ -559,22 +676,6 @@ class GMasterBase(object):
t.start()
self.checkpoint_thread = t
def crawl_loop(self):
"""start the keep-alive thread and iterate .crawl"""
timo = int(gconf.timeout or 0)
if timo > 0:
def keep_alive():
while True:
vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
self.slave.server.keep_alive(vi)
time.sleep(gap)
t = Thread(target=keep_alive)
t.start()
self.lastreport['time'] = time.time()
self.crawlStartTime=datetime.now()
while not self.terminate:
self.crawl()
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
if self.jobtab.get(path) == None:
@ -600,7 +701,7 @@ class GMasterBase(object):
ret = j[-1]()
if not ret:
succeed = False
if succeed:
if succeed and not args[0] == None:
self.sendmark(path, *args)
return succeed
@ -653,6 +754,319 @@ class GMasterBase(object):
tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
return newstate, param.state_change
class GMasterChangelogMixin(GMasterCommon):
""" changelog based change detection and syncing """
# index for change type and entry
IDX_START = 0
IDX_END = 2
POS_GFID = 0
POS_TYPE = 1
POS_ENTRY1 = 2
POS_ENTRY2 = 3 # renames
_CL_TYPE_DATA_PFX = "D "
_CL_TYPE_METADATA_PFX = "M "
_CL_TYPE_ENTRY_PFX = "E "
TYPE_GFID = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
def fallback_xsync(self):
logging.info('falling back to xsync mode')
gconf.configinterface.set('change-detector', 'xsync')
selfkill()
def setup_working_dir(self):
workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
logfile = os.path.join(workdir, 'changes.log')
logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
return (workdir, logfile)
def lstat(self, e):
try:
return os.lstat(e)
except (IOError, OSError):
ex = sys.exc_info()[1]
if ex.errno == ENOENT:
return ex.errno
else:
raise
# sync data
def syncdata(self, datas):
logging.debug('datas: %s' % (datas))
for data in datas:
logging.debug('candidate for syncing %s' % data)
pb = self.syncer.add(data)
timeA = datetime.now()
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
logging.debug('synced ' + se)
# update stats
timeB = datetime.now()
self.crawl_stats['last_synctime'] = timeB - timeA
self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
self.crawl_stats['files_synced'] += 1
self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
# cumulative statistics
self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
self.total_crawl_stats['files_synced'] += 1
return True
else:
if rv[1] in [23, 24]:
# stat to check if the file exist
st = self.lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
return True
logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
if self.wait(self.FLAT_DIR_HIERARCHY, None):
self.update_crawl_data()
return True
def process_change(self, change, done):
clist = []
entries = []
purges = set()
links = set()
datas = set()
pfx = gauxpfx()
try:
f = open(change, "r")
clist = f.readlines()
f.close()
except IOError:
raise
def edct(op, **ed):
dct = {}
dct['op'] = op
for k in ed:
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
dst['uid'] = st.st_uid
dst['gid'] = st.st_gid
dst['mode'] = st.st_mode
else:
dct[k] = ed[k]
return dct
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END]
ec = e[self.IDX_END:].split(' ')
if et in self.TYPE_ENTRY:
ty = ec[self.POS_TYPE]
en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
gfid = ec[self.POS_GFID]
# definitely need a better way to bucketize entry ops
if ty in ['UNLINK', 'RMDIR']:
entries.append(edct(ty, gfid=gfid, entry=en))
purges.update([os.path.join(pfx, gfid)])
continue
if not ty == 'RENAME':
go = os.path.join(pfx, gfid)
st = self.lstat(go)
if isinstance(st, int):
logging.debug('file %s got purged in the interim' % go)
continue
if ty in ['CREATE', 'MKDIR', 'MKNOD']:
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'LINK':
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
links.update([os.path.join(pfx, gfid)])
elif ty == 'SYMLINK':
entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
elif ty == 'RENAME':
e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
else:
pass
elif et in self.TYPE_GFID:
da = os.path.join(pfx, ec[0])
st = self.lstat(da)
if isinstance(st, int):
logging.debug('file %s got purged in the interim' % da)
continue
datas.update([da])
logging.debug('entries: %s' % repr(entries))
# sync namespace
if (entries):
self.slave.server.entry_ops(entries)
# sync data
if self.syncdata(datas - (purges - links)):
if done:
self.master.server.changelog_done(change)
return True
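
To make the record layout concrete, a small sketch of how one entry-type record decomposes under the class constants above; the gfid values and file name are invented for illustration:

    # Entry record: "E <gfid> <op> <pargfid>/<basename>"; data record: "D <gfid>";
    # metadata records ("M ...") are ignored by this mixin.
    record = "E 22114455-57c5-46e9-a783-c40f83a72b09 CREATE " \
             "00000000-0000-0000-0000-000000000001/file1"

    et = record[0:2]             # record[IDX_START:IDX_END] -> "E "
    ec = record[2:].split(' ')   # ["<gfid>", "CREATE", "<pargfid>/file1"]
    gfid = ec[0]                 # POS_GFID
    op = ec[1]                   # POS_TYPE
    entry = ec[2]                # POS_ENTRY1; process_change() joins it with the
                                 # aux-gfid prefix (gauxpfx()) before use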
def process(self, changes, done=1):
for change in changes:
times = 0
while True:
times += 1
logging.debug('processing change %s [%d time(s)]' % (change, times))
if self.process_change(change, done):
break
# it's either entry_ops() or Rsync that failed to do its
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry without returning an errno].
# Therefore we do not know whether it was Rsync or the freaking
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when this gets fixed.
logging.warn('incomplete sync, retrying changelog: %s' % change)
time.sleep(0.5)
self.turns += 1
def upd_stime(self, stime):
if stime:
self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
def crawl(self):
changes = []
try:
self.master.server.changelog_scan()
self.crawls += 1
except OSError:
self.fallback_xsync()
changes = self.master.server.changelog_getchanges()
if changes:
xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
if isinstance(xtl, int):
raise GsyncdError('master is corrupt')
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
self.upd_stime(xtl)
def register(self):
(workdir, logfile) = self.setup_working_dir()
self.sleep_interval = int(gconf.change_interval)
# register with the changelog library
try:
# 9 == log level (DEBUG)
# 5 == connection retries
self.master.server.changelog_register(gconf.local_path,
workdir, logfile, 9, 5)
except OSError:
self.fallback_xsync()
# control should not reach here
raise
class GMasterXsyncMixin(GMasterChangelogMixin):
"""
This crawl needs to be xtime based (as of now
it's not; this is because we generate a CHANGELOG
file during each crawl, which is then processed
by process_change()).
For now it's used as a one-shot initial sync
mechanism and only syncs directories, regular
files and symlinks.
"""
def register(self):
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()[0]
self.tempdir = os.path.join(self.tempdir, 'xsync')
logging.info('xsync temp directory: %s' % self.tempdir)
try:
os.makedirs(self.tempdir)
except OSError:
ex = sys.exc_info()[1]
if ex.errno == EEXIST and os.path.isdir(self.tempdir):
pass
else:
raise
def write_entry_change(self, prefix, data=[]):
self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
def open(self):
try:
self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
self.fh = open(self.xsync_change, 'w')
except IOError:
raise
def close(self):
self.fh.close()
def fname(self):
return self.xsync_change
def crawl(self, path='.', xtr=None, done=0):
""" generate a CHANGELOG file consumable by process_change """
if path == '.':
self.open()
self.crawls += 1
if not xtr:
# get the root stime and use it for all comparisons
xtr = self.xtime('.', self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
raise GsyncdError('slave is corrupt')
xtr = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
raise GsyncdError('master is corrupt')
if xtr == xtl:
if path == '.':
self.close()
return
self.xtime_reversion_hook(path, xtl, xtr)
logging.debug("entering " + path)
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
logging.warn('skipping directory %s' % (path))
for e in dem:
bname = e
e = os.path.join(path, e)
st = self.lstat(e)
if isinstance(st, int):
logging.warn('%s got purged in the interim..' % e)
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
logging.warn('skipping entry %s..' % (e))
continue
xte = self.xtime(e)
if isinstance(xte, int):
raise GsyncdError('master is corrupt')
if not self.need_sync(e, xte, xtr):
continue
mo = st.st_mode
if stat.S_ISDIR(mo):
self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
self.crawl(e, xtr)
elif stat.S_ISREG(mo):
self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
self.write_entry_change("D", [gfid])
elif stat.S_ISLNK(mo):
self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
else:
logging.info('ignoring %s' % e)
if path == '.':
logging.info('processing xsync changelog %s' % self.fname())
self.close()
self.process([self.fname()], done)
self.upd_stime(xtl)
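
As a rough illustration of what one generated XSYNC-CHANGELOG file could contain for a tree with one directory holding one regular file, based on the write_entry_change() calls above (all gfids fabricated, path is a stand-in):

    lines = [
        "E 1111dir0-0000-0000-0000-000000000000 MKDIR 0000root-0000-0000-0000-000000000000/dir",
        "E 2222file-0000-0000-0000-000000000000 CREATE 1111dir0-0000-0000-0000-000000000000/file",
        "D 2222file-0000-0000-0000-000000000000",
    ]
    with open('/tmp/XSYNC-CHANGELOG.demo', 'w') as fh:
        fh.write('\n'.join(lines) + '\n')
    # such a file is handed to process([fname], done=0): it flows through the same
    # process_change() path as a brick changelog but is never done-acked.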
class GMasterXtimeMixin(GMasterCommon):
""" xtime based change detection and syncing """
def register(self):
pass
def crawl(self, path='.', xtl=None):
"""crawling...
@ -691,46 +1105,6 @@ class GMasterBase(object):
assert that the file systems (master / slave) underneath do not change and actions
taken upon some condition will not lose their context by the time they are performed.
"""
if path == '.':
if self.start:
self.crawls += 1
logging.debug("... crawl #%d done, took %.6f seconds" % \
(self.crawls, time.time() - self.start))
time.sleep(1)
self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
if should_display_info:
logging.info("completed %d crawls, %d turns",
self.crawls - self.lastreport['crawls'],
self.turns - self.lastreport['turns'])
self.lastreport.update(crawls = self.crawls,
turns = self.turns,
time = self.start)
volinfo_sys, state_change = self.volinfo_hook()
if self.inter_master:
self.volinfo = volinfo_sys[self.KFGN]
else:
self.volinfo = volinfo_sys[self.KNAT]
if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
logging.info('new master is %s', self.uuid)
if self.volinfo:
logging.info("%s master with volume id %s ..." % \
(self.inter_master and "intermediate" or "primary",
self.uuid))
if state_change == self.KFGN:
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
raise GsyncdError ("master is corrupt")
self.start_checkpoint_thread()
else:
if should_display_info or self.crawls == 0:
if self.inter_master:
logging.info("waiting for being synced from %s ..." % \
self.volinfo_state[self.KFGN]['uuid'])
else:
logging.info("waiting for volume info ...")
return
logging.debug("entering " + path)
if not xtl:
xtl = self.xtime(path)
@ -806,6 +1180,7 @@ class GMasterBase(object):
st = indulgently(e, lambda e: os.lstat(e))
if st == False:
continue
mo = st.st_mode
adct = {'own': (st.st_uid, st.st_gid)}
if stat.S_ISLNK(mo):
@ -815,16 +1190,19 @@ class GMasterBase(object):
elif stat.S_ISREG(mo):
logging.debug("syncing %s ..." % e)
pb = self.syncer.add(e)
timeA=datetime.now()
timeA = datetime.now()
def regjob(e, xte, pb):
if pb.wait():
if pb.wait()[0]:
logging.debug("synced " + e)
self.sendmark_regular(e, xte)
timeB=datetime.now()
self.lastSyncTime=timeB-timeA
self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6)
self.filesSynced=self.filesSynced+1
# update stats
timeB = datetime.now()
self.crawl_stats['last_synctime'] = timeB - timeA
self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
self.crawl_stats['files_synced'] += 1
self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
self.total_crawl_stats['files_synced'] += 1
self.update_crawl_data()
return True
else:
logging.warn("failed to sync " + e)
@ -841,6 +1219,7 @@ class GMasterBase(object):
if path == '.':
self.wait(path, xtl)
class BoxClosedErr(Exception):
pass
@ -920,7 +1299,7 @@ class Syncer(object):
self.slave = slave
self.lock = Lock()
self.pb = PostBox()
self.bytesSynced=0
self.bytes_synced = 0
for i in range(int(gconf.sync_jobs)):
t = Thread(target=self.syncjob)
t.start()
@ -940,13 +1319,10 @@ class Syncer(object):
pb.close()
po = self.slave.rsync(pb)
if po.returncode == 0:
regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE)
if regEx:
self.bytesSynced+=(int(regEx.group(1)))/1024
ret = True
ret = (True, 0)
elif po.returncode in (23, 24):
# partial transfer (cf. rsync(1)), that's normal
ret = False
ret = (False, po.returncode)
else:
po.errfail()
pb.wakeup(ret)


@ -3,26 +3,94 @@ import sys
import time
import signal
import logging
import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
from gconf import gconf
from syncdutils import update_file, select, waitpid, set_term_handler
from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
class Volinfo(object):
def __init__(self, vol, host='localhost', prelude=[]):
po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol],
stdout=PIPE, stderr=PIPE)
vix = po.stdout.read()
po.wait()
po.terminate_geterr()
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
if prelude:
via = '(via %s) ' % prelude.join(' ')
else:
via = ' '
raise GsyncdError('getting volume info of %s%s failed with errorcode %s',
(vol, via, vi.find('opErrno').text))
self.tree = vi
self.volume = vol
self.host = host
def get(self, elem):
return self.tree.findall('.//' + elem)
@property
@memoize
def bricks(self):
def bparse(b):
host, dirp = b.text.split(':', 2)
return {'host': host, 'dir': dirp}
return [ bparse(b) for b in self.get('brick') ]
@property
@memoize
def uuid(self):
ids = self.get('id')
if len(ids) != 1:
raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid",
self.volume, self.host)
return ids[0].text
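
A hedged sketch of the XML shape this parsing assumes; the element names come from the lookups above (opRet, brick, id), while the sample values are made up and the real `gluster --xml volume info` output carries more fields:

    import xml.etree.ElementTree as XET

    sample = ("<cliOutput><opRet>0</opRet><opErrno>0</opErrno>"
              "<volInfo><volumes><volume>"
              "<id>7c2ba191-3ee1-4af4-8e38-4ba1c0ba9ee1</id>"
              "<bricks><brick>nodeA:/bricks/b1</brick>"
              "<brick>nodeB:/bricks/b2</brick></bricks>"
              "</volume></volumes></volInfo></cliOutput>")

    vi = XET.fromstring(sample)
    assert vi.find('opRet').text == '0'
    bricks = [dict(zip(('host', 'dir'), b.text.split(':', 2)))
              for b in vi.findall('.//brick')]
    # -> [{'host': 'nodeA', 'dir': '/bricks/b1'}, {'host': 'nodeB', 'dir': '/bricks/b2'}]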
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
def __init__(self):
self.state = None
ST_INIT = 'Initializing...'
ST_STABLE = 'Stable'
ST_FAULTY = 'faulty'
ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def set_state(self, state):
def __init__(self):
self.lock = Lock()
self.state = {}
def set_state(self, state, w=None):
"""set the state that can be used by external agents
like glusterd for status reporting"""
if state == self.state:
return
self.state = state
logging.info('new state: %s' % state)
if getattr(gconf, 'state_file', None):
update_file(gconf.state_file, lambda f: f.write(state + '\n'))
computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())]
if w:
self.lock.acquire()
old_state = computestate()
self.state[w] = state
state = computestate()
self.lock.release()
if state != old_state:
self.set_state(state)
else:
logging.info('new state: %s' % state)
if getattr(gconf, 'state_file', None):
update_file(gconf.state_file, lambda f: f.write(state + '\n'))
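
The per-worker aggregation above reports the worst state across all workers, where "worst" follows the _ST_ORD ordering (Stable < Initializing... < faulty < inconsistent). A small worked sketch with arbitrary worker names:

    ST_ORD = ['Stable', 'Initializing...', 'faulty', 'inconsistent']
    worker_state = {'/bricks/b1': 'Stable', '/bricks/b2': 'faulty'}

    # same expression as the computestate lambda in set_state()
    overall = ST_ORD[max(ST_ORD.index(s) for s in worker_state.values())]
    assert overall == 'faulty'   # one faulty worker marks the whole session faulty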
def monitor(self):
@staticmethod
def terminate():
# relax one SIGTERM by setting a handler that sets back
# standard handler
set_term_handler(lambda *a: set_term_handler())
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
def monitor(self, w, argv, cpids):
"""the monitor loop
Basic logic is a blatantly simple, blunt heuristic:
@ -41,27 +109,8 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
def sigcont_handler(*a):
"""
Re-init logging and send group kill signal
"""
md = gconf.log_metadata
logging.shutdown()
lcls = logging.getLoggerClass()
lcls.setup(label=md.get('saved_label'), **md)
pid = os.getpid()
os.kill(-pid, signal.SIGUSR1)
signal.signal(signal.SIGUSR1, lambda *a: ())
signal.signal(signal.SIGCONT, sigcont_handler)
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
argv.remove(o)
argv.extend(('-N', '-p', ''))
argv.insert(0, os.path.basename(sys.executable))
self.set_state('starting...')
self.set_state(self.ST_INIT, w)
ret = 0
def nwait(p, o=0):
p2, r = waitpid(p, o)
@ -83,7 +132,13 @@ class Monitor(object):
cpid = os.fork()
if cpid == 0:
os.close(pr)
os.execv(sys.executable, argv + ['--feedback-fd', str(pw)])
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
'--local-path', w[0],
'--local-id', '.' + escape(w[0]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
self.lock.release()
os.close(pw)
t0 = time.time()
so = select((pr,), (), (), conn_timeout)[0]
@ -103,27 +158,104 @@ class Monitor(object):
else:
logging.debug("worker not confirmed in %d sec, aborting it" % \
conn_timeout)
# relax one SIGTERM by setting a handler that sets back
# standard handler
set_term_handler(lambda *a: set_term_handler())
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
self.terminate()
time.sleep(1)
os.kill(cpid, signal.SIGKILL)
ret = nwait(cpid)
if ret == None:
self.set_state('OK')
self.set_state(self.ST_STABLE, w)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
else:
ret = exit_status(ret)
if ret in (0,1):
self.set_state('faulty')
self.set_state(self.ST_FAULTY, w)
time.sleep(10)
self.set_state('inconsistent')
self.set_state(self.ST_INCON, w)
return ret
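# Start-up handshake in short (per the hunks above): the monitor forks, the
# child execs gsyncd with '--feedback-fd <pw>' plus the per-brick options, and
# the parent select()s on the read end for up to conn_timeout seconds; a
# worker that does not report back in time is sent SIGTERM, then SIGKILL, and
# the loop respawns it.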
def monitor():
def multiplex(self, wspx, suuid):
def sigcont_handler(*a):
"""
Re-init logging and send group kill signal
"""
md = gconf.log_metadata
logging.shutdown()
lcls = logging.getLoggerClass()
lcls.setup(label=md.get('saved_label'), **md)
pid = os.getpid()
os.kill(-pid, signal.SIGUSR1)
signal.signal(signal.SIGUSR1, lambda *a: ())
signal.signal(signal.SIGCONT, sigcont_handler)
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
argv.remove(o)
argv.extend(('-N', '-p', '', '--slave-id', suuid))
argv.insert(0, os.path.basename(sys.executable))
cpids = set()
ta = []
for wx in wspx:
def wmon(w):
cpid, _ = self.monitor(w, argv, cpids)
terminate()
time.sleep(1)
self.lock.acquire()
for cpid in cpids:
os.kill(cpid, signal.SIGKILL)
self.lock.release()
finalize(exval=1)
t = Thread(target = wmon, args=[wx])
t.start()
ta.append(t)
for t in ta:
t.join()
def distribute(*resources):
master, slave = resources
mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
locmbricks = [ b['dir'] for b in mvol.bricks if is_host_local(b['host']) ]
prelude = []
si = slave
if isinstance(slave, SSH):
prelude = gconf.ssh_command.split() + [slave.remote_addr]
si = slave.inner_rsc
logging.debug('slave SSH gateway: ' + slave.remote_addr)
if isinstance(si, FILE):
sbricks = {'host': 'localhost', 'dir': si.path}
suuid = uuid.uuid5(uuid.NAMESPACE_URL, slave.get_url(canonical=True))
elif isinstance(si, GLUSTER):
svol = Volinfo(si.volume, si.host, prelude)
sbricks = svol.bricks
suuid = svol.uuid
else:
raise GsyncdError("unkown slave type " + slave.url)
logging.info('slave bricks: ' + repr(sbricks))
if isinstance(si, FILE):
slaves = [ slave.url ]
else:
slavenodes = set(b['host'] for b in sbricks)
if isinstance(slave, SSH) and not gconf.isolated_slave:
rap = SSH.parse_ssh_address(slave.remote_addr)
slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ]
else:
slavevols = [ h + ':' + si.volume for h in slavenodes ]
if isinstance(slave, SSH):
slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ]
else:
slaves = slavevols
locmbricks.sort()
slaves.sort()
workerspex = []
for i in range(len(locmbricks)):
workerspex.append((locmbricks[i], slaves[i % len(slaves)]))
logging.info('worker specs: ' + repr(workerspex))
return workerspex, suuid
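# Pairing sketch (hypothetical brick and node names, for illustration only):
# with three local master bricks and two slave nodes, the sorted round-robin
# assignment above yields
#
#   [('/bricks/b1', 'ssh://root@s1:gluster://localhost:svol'),
#    ('/bricks/b2', 'ssh://root@s2:gluster://localhost:svol'),
#    ('/bricks/b3', 'ssh://root@s1:gluster://localhost:svol')]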
def monitor(*resources):
"""oh yeah, actually Monitor is used as singleton, too"""
return Monitor().monitor()
return Monitor().multiplex(*distribute(*resources))

View File

@ -5,13 +5,14 @@ import stat
import time
import fcntl
import errno
import types
import struct
import socket
import logging
import tempfile
import threading
import subprocess
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY
from select import error as SelectError
from gconf import gconf
@ -19,7 +20,8 @@ import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@ -105,7 +107,18 @@ class _MetaXattr(object):
setattr(self, m, getattr(LXattr, m))
return getattr(self, meth)
class _MetaChangelog(object):
def __getattr__(self, meth):
from libgfchangelog import Changes as LChanges
xmeth = [ m for m in dir(LChanges) if m[0] != '_' ]
if not meth in xmeth:
return
for m in xmeth:
setattr(self, m, getattr(LChanges, m))
return getattr(self, meth)
Xattr = _MetaXattr()
Changes = _MetaChangelog()
class Popen(subprocess.Popen):
@ -245,10 +258,24 @@ class Server(object):
and classmethods and is used directly, without instantiation.)
"""
GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs"
GX_NSPACE_PFX = (privileged() and "trusted" or "system")
GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
NTV_FMTSTR = "!" + "B"*19 + "II"
FRGN_XTRA_FMT = "I"
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
local_path = ''
@classmethod
def _fmt_mknod(cls, l):
return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
@classmethod
def _fmt_mkdir(cls, l):
return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
@classmethod
def _fmt_symlink(cls, l1, l2):
return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)
def _pathguard(f):
"""decorator method that checks
@ -257,22 +284,21 @@ class Server(object):
point out of the managed tree
"""
fc = getattr(f, 'func_code', None)
if not fc:
# python 3
fc = f.__code__
fc = funcode(f)
pi = list(fc.co_varnames).index('path')
def ff(*a):
path = a[pi]
ps = path.split('/')
if path[0] == '/' or '..' in ps:
raise ValueError('unsafe path')
a = list(a)
a[pi] = os.path.join(a[0].local_path, path)
return f(*a)
return ff
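# Behaviour sketch (hypothetical paths, for illustration only): a guarded call
# such as
#
#   Server.entries('../outside')   # raises ValueError('unsafe path')
#   Server.entries('/abs/path')    # likewise rejected
#
# while a relative path like 'dir/sub' is joined onto cls.local_path before
# the wrapped method runs.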
@staticmethod
@classmethod
@_pathguard
def entries(path):
def entries(cls, path):
"""directory entries in an array"""
# prevent symlinks being followed
if not stat.S_ISDIR(os.lstat(path).st_mode):
@ -370,6 +396,18 @@ class Server(object):
else:
raise
@classmethod
def gfid(cls, gfidpath):
return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
@classmethod
def node_uuid(cls, path='.'):
try:
uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
return uuid_l[:-1].split(' ')
except OSError:
raise
@classmethod
def xtime_vec(cls, path, *uuids):
"""vectored version of @xtime
@ -402,9 +440,96 @@ class Server(object):
for u,t in mark_dct.items():
cls.set_xtime(path, u, t)
@staticmethod
@classmethod
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
# regular file
def entry_pack_reg(gf, bn, st):
blen = len(bn)
mo = st['mode']
return struct.pack(cls._fmt_mknod(blen),
st['uid'], st['gid'],
gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
# mkdir
def entry_pack_mkdir(gf, bn, st):
blen = len(bn)
mo = st['mode']
return struct.pack(cls._fmt_mkdir(blen),
st['uid'], st['gid'],
gf, mo, bn,
stat.S_IMODE(mo), umask())
#symlink
def entry_pack_symlink(gf, bn, lnk, st):
blen = len(bn)
llen = len(lnk)
return struct.pack(cls._fmt_symlink(blen, llen),
st['uid'], st['gid'],
gf, st['mode'], bn, lnk)
def entry_purge(entry, gfid):
# This code is extremely racy and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
disk_gfid = cls.gfid(entry)
if isinstance(disk_gfid, int):
return
if not gfid == disk_gfid:
return
er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
if isinstance(er, int):
if er == EISDIR:
er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY])
if er == ENOTEMPTY:
return er
for e in entries:
blob = None
op = e['op']
gfid = e['gfid']
entry = e['entry']
(pg, bname) = entry2pb(entry)
if op in ['RMDIR', 'UNLINK']:
while True:
er = entry_purge(entry, gfid)
if isinstance(er, int):
time.sleep(1)
else:
break
elif op == 'CREATE':
blob = entry_pack_reg(gfid, bname, e['stat'])
elif op == 'MKDIR':
blob = entry_pack_mkdir(gfid, bname, e['stat'])
elif op == 'LINK':
errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST])
elif op == 'SYMLINK':
blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
elif op == 'RENAME':
en = e['entry1']
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST])
@classmethod
def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
@classmethod
def changelog_scan(cls):
Changes.cl_scan()
@classmethod
def changelog_getchanges(cls):
return Changes.cl_getchanges()
@classmethod
def changelog_done(cls, clfile):
Changes.cl_done(clfile)
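# Consumption cycle sketch (illustration only; the actual driving happens in
# the master-side crawl): the wrappers above are expected to be called roughly
# as
#
#   changelog_register(brick, workdir, logfile, loglevel)
#   changelog_scan()
#   for cl in changelog_getchanges():
#       # ...process and sync the entries recorded in cl...
#       changelog_done(cl)      # ack one changelog at a time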
@classmethod
@_pathguard
def setattr(path, adct):
def setattr(cls, path, adct):
"""set file attributes
@adct is a dict, where 'own', 'mode' and 'times'
@ -537,10 +662,10 @@ class SlaveRemote(object):
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
argv = gconf.rsync_command.split() + \
['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
['.'] + list(args)
po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)
for f in files:
po.stdin.write(f)
po.stdin.write('\0')
@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def can_connect_to(self, remote):
"""determine our position in the connectibility matrix"""
return True
return not remote or \
(isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
class Mounter(object):
"""Abstract base class for mounter backends"""
@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
sup(self, *a, **kw)
self.slavedir = "/proc/%d/cwd" % self.server.pid()
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl' class instance"""
return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave))
def service_loop(self, *args):
"""enter service loop
@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
- else do what's inherited
"""
if args:
gmaster_builder()(self, args[0]).crawl_loop()
slave = args[0]
if gconf.local_path:
class brickserver(FILE.FILEServer):
local_path = gconf.local_path
aggregated = self.server
@classmethod
def entries(cls, path):
e = super(brickserver, cls).entries(path)
# on the brick don't mess with /.glusterfs
if path == '.':
try:
e.remove('.glusterfs')
except ValueError:
pass
return e
if gconf.slave_id:
# define {,set_}xtime in slave, thus preempting
# the call to remote, so that it takes data from
# the local brick
slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
else:
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server.aggregated = gmaster.master.server
g2.master.server.aggregated = gmaster.master.server
# bad bad bad: bad way to do things like this
# need to make this elegant
# register the crawlers and start crawling
g1.register()
g2.register()
g1.crawlwrap(oneshot=True)
g2.crawlwrap()
else:
sup(self, *args)
@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote):
'^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
self.inner_rsc = parse_url(inner_url)
def canonical_path(self):
m = re.match('([^@]+)@(.+)', self.remote_addr)
@staticmethod
def parse_ssh_address(addr):
m = re.match('([^@]+)@(.+)', addr)
if m:
u, h = m.groups()
else:
u, h = syncdutils.getusername(), self.remote_addr
remote_addr = '@'.join([u, gethostbyname(h)])
u, h = syncdutils.getusername(), addr
return {'user': u, 'host': h}
def canonical_path(self):
rap = self.parse_ssh_address(self.remote_addr)
remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])])
return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
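# Parsing sketch (hypothetical addresses, for illustration only):
#
#   SSH.parse_ssh_address('geo@slave1')  # -> {'user': 'geo', 'host': 'slave1'}
#   SSH.parse_ssh_address('slave1')      # -> {'user': <current user>, 'host': 'slave1'}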
def can_connect_to(self, remote):

View File

@ -5,8 +5,9 @@ import time
import fcntl
import shutil
import logging
import socket
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode
from signal import signal, SIGTERM, SIGKILL
from time import sleep
import select as oselect
@ -25,6 +26,15 @@ try:
except ImportError:
import urllib
try:
from hashlib import md5 as md5
except ImportError:
# py 2.4
from md5 import new as md5
# auxiliary gfid-based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
@ -286,3 +296,93 @@ def waitpid (*a):
def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
signal(SIGTERM, hook)
def is_host_local(host):
locaddr = False
for ai in socket.getaddrinfo(host, None):
# cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125
if ai[0] == socket.AF_INET:
if ai[-1][0].split(".")[0] == "127":
locaddr = True
break
elif ai[0] == socket.AF_INET6:
if ai[-1][0] == "::1":
locaddr = True
break
else:
continue
try:
# use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
# cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
except socket.error:
ex = sys.exc_info()[1]
if ex.errno != EPERM:
raise
f = None
try:
f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
if int(f.read()) != 0:
raise GsyncdError(
"non-local bind is set and not allowed to create raw sockets, "
"cannot determine if %s is local" % host)
s = socket.socket(ai[0], socket.SOCK_DGRAM)
finally:
if f:
f.close()
try:
s.bind(ai[-1])
locaddr = True
break
except:
pass
s.close()
return locaddr
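# Usage sketch (illustration only): is_host_local('127.0.0.1') and, typically,
# is_host_local(socket.gethostname()) return True on the local node, while an
# address that belongs to another peer returns False; distribute() uses this
# to pick only the master bricks hosted on this node.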
def funcode(f):
fc = getattr(f, 'func_code', None)
if not fc:
# python 3
fc = f.__code__
return fc
def memoize(f):
fc = funcode(f)
fn = fc.co_name
def ff(self, *a, **kw):
rv = getattr(self, '_' + fn, None)
if rv == None:
rv = f(self, *a, **kw)
setattr(self, '_' + fn, rv)
return rv
return ff
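# Caching sketch (hypothetical class, for illustration only): the result is
# stashed on the instance under '_<funcname>', so repeated property access
# does not re-run the underlying computation (e.g. the gluster CLI query in
# Volinfo.bricks).
#
#   class Demo(object):
#       @property
#       @memoize
#       def answer(self):
#           return 42     # computed once, then read back from self._answer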
def umask():
return os.umask(0)
def entry2pb(e):
return e.rsplit('/', 1)
def gauxpfx():
return _CL_AUX_GFID_PFX
def md5hex(s):
return md5(s).hexdigest()
def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
def errno_wrap(call, arg=[], errnos=[]):
""" wrapper around calls resilient to errnos.
retry in case of ESTALE
"""
while True:
try:
return call(*arg)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in errnos:
return ex.errno
if not ex.errno == ESTALE:
raise
time.sleep(0.5) # retry the call
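# Usage sketch (illustration only), mirroring the entry_ops() calls above:
#
#   errno_wrap(os.unlink, ['some/victim'], [ENOENT, EISDIR])
#
# returns the errno (so the caller can react) when unlink fails with ENOENT or
# EISDIR, retries transparently on ESTALE, and re-raises anything else.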

View File

@ -520,6 +520,7 @@ fi
%defattr(-,root,root)
%{_libexecdir}/glusterfs/gsyncd
%{_libexecdir}/glusterfs/python/syncdaemon/*
%{_libexecdir}/glusterfs/gverify.sh
%ghost %dir %attr(0755,-,-) %{_sharedstatedir}/glusterd/geo-replication
%ghost %attr(0644,-,-) %{_sharedstatedir}/glusterd/geo-replication/gsyncd.conf
%endif
@ -696,6 +697,9 @@ if [ $1 -ge 1 ]; then
fi
%changelog
* Thu Jul 25 2013 Aravinda VK <avishwan@redhat.com>
- Added gverify.sh to %{_libexecdir}/glusterfs directory.
* Thu Jul 25 2013 Harshavardhana <fharshav@redhat.com>
- Allow to build with '--without bd' to disable 'bd' xlator

View File

@ -33,6 +33,10 @@ static char *gsync_reserved_opts[] = {
"session-owner",
"state-socket-unencoded",
"socketdir",
"ignore-deletes",
"local-id",
"local-path",
"slave-id",
NULL
};

View File

@ -633,10 +633,20 @@ configure_syncdaemon (glusterd_conf_t *conf)
runinit_gsyncd_setrx (&runner, conf);
runner_add_args (&runner,
"gluster-log-file",
DEFAULT_LOG_FILE_DIRECTORY"/"GEOREP"/${mastervol}/${eSlave}.gluster.log",
DEFAULT_LOG_FILE_DIRECTORY"/"GEOREP"/${mastervol}/${eSlave}${local_id}.gluster.log",
".", ".", NULL);
RUN_GSYNCD_CMD;
/* ignore-deletes */
runinit_gsyncd_setrx (&runner, conf);
runner_add_args (&runner, "ignore-deletes", "true", ".", ".", NULL);
RUN_GSYNCD_CMD;
/* special-sync-mode */
runinit_gsyncd_setrx (&runner, conf);
runner_add_args (&runner, "special-sync-mode", "partial", ".", ".", NULL);
RUN_GSYNCD_CMD;
/************
* slave pre-configuration
************/