geo-rep: Post process Data and Meta Changelogs

With this patch, Data and Meta GFIDs are post-processed. If a Changelog has
an UNLINK entry, the GFID is removed from the Data and Meta GFID lists (only
if a stat on the GFID returns ENOENT on the Master).

While processing Changelogs:

- Collect all the Data and Meta operations in a temporary database
- Delete all Data and Meta GFIDs which are already unlinked according to the
  Changelogs (only if a stat on the GFID returns ENOENT on the Master)
- Process all Entry operations as usual
- Process Data and Meta operations in batches (fetched from the Db in pages)
- Data sync is further batched based on the number of changelogs (default: one
  day of changelogs). Once a batch is synced, update the last Changelog's time
  as the last_synced time as usual (see the sketch below).
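
A simplified sketch of that per-batch flow, assuming the changelogsdb.py module
added by this patch is importable; parse_changelog, sync_entry and
stat_is_enoent are hypothetical stand-ins for the real master.py and
changelog-agent machinery:

    import os
    from changelogsdb import (db_init, db_commit, db_record_data,
                              db_record_meta, db_remove_data, db_remove_meta,
                              db_delete_meta_if_exists_in_data)

    def process_changelog_batch(changes, tempdir, parse_changelog, sync_entry,
                                stat_is_enoent):
        """Entry ops sync inline; Data/Meta GFIDs are post-processed via Db."""
        db_init(os.path.join(tempdir, "temp_changelogs.db"))
        for change in changes:
            change_ts = change.split(".")[-1]
            for kind, op, gfid in parse_changelog(change):
                if kind == "E":                      # entry operation
                    if op in ("UNLINK", "RMDIR") and stat_is_enoent(gfid):
                        db_remove_data(gfid)         # already gone on Master,
                        db_remove_meta(gfid)         # nothing left to sync
                    sync_entry(op, gfid)             # entry ops as usual
                elif kind == "D":
                    db_record_data(gfid, change_ts)
                elif kind == "M":
                    db_record_meta(gfid, change_ts)
        # rsync/tar preserve permissions, so drop meta GFIDs already in data
        db_delete_meta_if_exists_in_data()
        db_commit()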

Additionally, maintain entry_stime on the Brick root and ignore Entry ops if
the Changelog suffix time is not greater than entry_stime. data_stime can be
greater than entry_stime only when a passive worker updates stime on its own
from the mount point stime; use entry_stime = data_stime in that case.
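
A self-contained sketch of that decision, mirroring the logic added to
process_change() below (the function name is illustrative; stimes are
(seconds, nseconds) tuples or None, as returned by the brick xattr helpers):

    def should_ignore_entry_ops(change_ts, entry_stime, data_stime):
        """Return True if Entry ops of this Changelog were already processed."""
        if entry_stime is None or data_stime is None:
            return False
        # data_stime can be ahead only when a passive worker updated stime
        # from the mount point; trust data_stime in that case
        if data_stime[0] > entry_stime[0]:
            entry_stime = data_stime
        return int(change_ts) <= entry_stime[0]

    # CHANGELOG.1470656400 is skipped for Entry ops if entry_stime already
    # covers that time
    assert should_ignore_entry_ops("1470656400", (1470656400, 0),
                                   (1470656300, 0))
    assert not should_ignore_entry_ops("1470656415", (1470656400, 0),
                                       (1470656300, 0))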

New configurations:

max-rsync-retries - Maximum number of rsync retries per batch. Default value
is 10.
max-data-changelogs-in-batch - Maximum number of changelogs considered in a
batch for data syncing. Default value is 5760 (4 changelogs per minute * 60
minutes * 24 hours, i.e. one day of changelogs).
max-history-changelogs-in-batch - Maximum number of History changelogs
processed at once. Default value is 86400 (4 changelogs per minute * 60
minutes * 24 hours * 15 days). See the arithmetic check below.
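
The defaults follow from the 15-second changelog rollover
(CHANGELOG_ROLLOVER_TIME in master.py); a quick check of the arithmetic:

    CHANGELOG_ROLLOVER_TIME = 15                 # seconds per changelog
    per_minute = 60 // CHANGELOG_ROLLOVER_TIME   # 4 changelogs per minute
    print(per_minute * 60 * 24)       # 5760  -> max-data-changelogs-in-batch
    print(per_minute * 60 * 24 * 15)  # 86400 -> max-history-changelogs-in-batch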

BUG: 1364420
Change-Id: I7b665895bf4806035c2a8573d361257cbadbea17
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15110
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
This commit is contained in:
Aravinda VK 2016-08-08 17:02:37 +05:30
parent 4a3454753f
commit 6c283f107b
7 changed files with 514 additions and 184 deletions

View File

@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
gsyncdstatus.py
gsyncdstatus.py changelogsdb.py
CLEANFILES =

View File

@ -0,0 +1,111 @@
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
import os
import sqlite3
from errno import ENOENT
conn = None
cursor = None
def db_commit():
conn.commit()
def db_init(db_path):
global conn, cursor
# Remove Temp Db
try:
os.unlink(db_path)
os.unlink(db_path + "-journal")
except OSError as e:
if e.errno != ENOENT:
raise
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS data")
cursor.execute("DROP TABLE IF EXISTS meta")
query = """CREATE TABLE IF NOT EXISTS data(
gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
changelog_time VARCHAR(100)
)"""
cursor.execute(query)
query = """CREATE TABLE IF NOT EXISTS meta(
gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
changelog_time VARCHAR(100)
)"""
cursor.execute(query)
def db_record_data(gfid, changelog_time):
query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)"
cursor.execute(query, (gfid, changelog_time))
def db_record_meta(gfid, changelog_time):
query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)"
cursor.execute(query, (gfid, changelog_time))
def db_remove_meta(gfid):
query = "DELETE FROM meta WHERE gfid = ?"
cursor.execute(query, (gfid, ))
def db_remove_data(gfid):
query = "DELETE FROM data WHERE gfid = ?"
cursor.execute(query, (gfid, ))
def db_get_data(start, end, limit, offset):
query = """SELECT gfid FROM data WHERE changelog_time
BETWEEN ? AND ? LIMIT ? OFFSET ?"""
cursor.execute(query, (start, end, limit, offset))
out = []
for row in cursor:
out.append(row[0])
return out
def db_get_meta(start, end, limit, offset):
query = """SELECT gfid FROM meta WHERE changelog_time
BETWEEN ? AND ? LIMIT ? OFFSET ?"""
cursor.execute(query, (start, end, limit, offset))
out = []
for row in cursor:
out.append(row[0])
return out
def db_delete_meta_if_exists_in_data():
query = """
DELETE FROM meta WHERE gfid in
(SELECT M.gfid
FROM meta M INNER JOIN data D
ON M.gfid = D.gfid)
"""
cursor.execute(query)
def db_get_data_count():
query = "SELECT COUNT(gfid) FROM data"
cursor.execute(query)
return cursor.fetchone()[0]
def db_get_meta_count():
query = "SELECT COUNT(gfid) FROM meta"
cursor.execute(query)
return cursor.fetchone()[0]
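
A hedged usage sketch of this module, roughly as master.py drives it below
(assumes changelogsdb.py is on the import path; GFIDs and timestamps are made
up):

    import changelogsdb as cdb

    cdb.db_init("/tmp/temp_changelogs.db")

    # Record operations while parsing changelogs
    cdb.db_record_data(".gfid/aaaa-1111", "1470656400")
    cdb.db_record_meta(".gfid/aaaa-1111", "1470656400")
    cdb.db_record_meta(".gfid/bbbb-2222", "1470656415")

    # A GFID already unlinked on the Master (stat returned ENOENT)
    cdb.db_remove_data(".gfid/cccc-3333")
    cdb.db_remove_meta(".gfid/cccc-3333")

    # rsync/tar preserve permissions, so meta is redundant where data exists
    cdb.db_delete_meta_if_exists_in_data()
    cdb.db_commit()

    # Paginate data GFIDs of a batch window and hand them to the syncer
    offset = 0
    while True:
        gfids = cdb.db_get_data("1470656400", "1470742800", 1000, offset)
        if not gfids:
            break
        offset += 1000
        # a_syncdata(gfids) in master.py would queue these for rsync/tar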

View File

@ -274,6 +274,28 @@ def main_i():
op.add_option('--sync-acls', default=True, action='store_true')
op.add_option('--log-rsync-performance', default=False,
action='store_true')
op.add_option('--max-rsync-retries', type=int, default=10)
# This is for stime granularity: a bigger batch will be split into
# multiple data batches, and on failure the crawl resumes from this point
# Default value is 1 day of changelogs
# (4 * 60 * 24 = 5760)
# 4 changelogs per minute
# 60 min per hr
# 24 hrs per day
op.add_option('--max-data-changelogs-in-batch', type=int, default=5760)
# While processing Historical Changelogs the above batch size is not
# considered, since all Changelogs are post-processed at once; batching
# them causes more rsync retries. (4 * 60 * 24 * 15 = 86400)
# 4 changelogs per minute
# 60 min per hr
# 24 hrs per day
# 15 days
# This means 15 days of changelogs can be processed at once in case of
# a History scan
op.add_option('--max-history-changelogs-in-batch', type=int, default=86400)
op.add_option('--pause-on-start', default=False, action='store_true')
op.add_option('-L', '--log-level', metavar='LVL')
op.add_option('-r', '--remote-gsyncd', metavar='CMD',
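
A minimal, hypothetical sketch of the batching these comments describe:
changelogs are grouped so that stime advances one batch at a time
(split_into_batches is not a real gsyncd function; the real bookkeeping lives
in process_change() in master.py, which builds the same [start, end, changes]
structure):

    def split_into_batches(changes, max_per_batch=5760):
        """Group changelog paths into [start_ts, end_ts, changelogs] batches."""
        batches = []
        for change in changes:
            ts = change.split(".")[-1]
            if not batches or len(batches[-1][2]) >= max_per_batch:
                batches.append([ts, ts, [change]])
            else:
                batches[-1][1] = ts
                batches[-1][2].append(change)
        return batches

    # e.g. three changelogs with a batch size of two -> two batches
    print(split_into_batches(
        ["CHANGELOG.100", "CHANGELOG.115", "CHANGELOG.130"], max_per_batch=2))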

View File

@ -52,6 +52,7 @@ def get_default_values():
"slave_node": DEFAULT_STATUS,
"worker_status": DEFAULT_STATUS,
"last_synced": 0,
"last_synced_entry": 0,
"crawl_status": DEFAULT_STATUS,
"entry": 0,
"data": 0,
@ -239,6 +240,7 @@ class GeorepStatus(object):
slave_node N/A VALUE VALUE N/A
status Created VALUE Paused Stopped
last_synced N/A VALUE VALUE VALUE
last_synced_entry N/A VALUE VALUE VALUE
crawl_status N/A VALUE N/A N/A
entry N/A VALUE N/A N/A
data N/A VALUE N/A N/A

View File

@ -25,7 +25,13 @@ from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from changelogsdb import db_init, db_record_data, db_record_meta
from changelogsdb import db_remove_data, db_remove_meta
from changelogsdb import db_get_data, db_get_meta, db_commit
from changelogsdb import db_get_data_count, db_get_meta_count
from changelogsdb import db_delete_meta_if_exists_in_data
URXTIME = (-1, 0)
@ -45,6 +51,10 @@ CHANGELOG_ROLLOVER_TIME = 15
# that batch since stime will get updated after each batch.
MAX_CHANGELOG_BATCH_SIZE = 727040
# Number of records to query at a time
DB_PAGINATION_SIZE_META = 100
DB_PAGINATION_SIZE_DATA = 1000
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@ -69,6 +79,24 @@ def _volinfo_hook_relax_foreign(self):
return volinfo_sys
def edct(op, **ed):
dct = {}
dct['op'] = op
for k in ed:
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
if st:
dst['uid'] = st.st_uid
dst['gid'] = st.st_gid
dst['mode'] = st.st_mode
dst['atime'] = st.st_atime
dst['mtime'] = st.st_mtime
else:
dct[k] = ed[k]
return dct
# The API!
def gmaster_builder(excrawl=None):
@ -259,7 +287,7 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
self.unlinked_gfids.add(se)
db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@ -294,7 +322,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
self.unlinked_gfids.add(se)
db_remove_data(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@ -340,6 +368,18 @@ class GMasterCommon(object):
if self.volinfo:
return self.volinfo['volume_mark']
def get_entry_stime(self):
data = self.slave.server.entry_stime(".", self.uuid)
if isinstance(data, int):
data = None
return data
def get_data_stime(self):
data = self.slave.server.stime(".", self.uuid)
if isinstance(data, int):
data = None
return data
def xtime(self, path, *a, **opts):
"""get amended xtime
@ -387,7 +427,6 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@ -553,7 +592,7 @@ class GMasterCommon(object):
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
# Purge all changelogs available in processing dir
# less than cluster_stime
proc_dir = os.path.join(self.setup_working_dir(),
proc_dir = os.path.join(self.tempdir,
".processing")
if os.path.exists(proc_dir):
@ -627,6 +666,11 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
# All the unlinked GFIDs removed from Data and Meta list
# Commit the Transaction
db_commit()
if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@ -670,9 +714,6 @@ class GMasterChangelogMixin(GMasterCommon):
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
# maximum retries per changelog before giving up
MAX_RETRIES = 10
CHANGELOG_LOG_LEVEL = 9
CHANGELOG_CONN_RETRIES = 5
@ -727,21 +768,48 @@ class GMasterChangelogMixin(GMasterCommon):
logging.debug('changelog working dir %s' % workdir)
return workdir
def get_purge_time(self):
purge_time = self.xtime('.', self.slave)
if isinstance(purge_time, int):
purge_time = None
return purge_time
def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
num_failures = 0
for failure in failures:
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
logging.error('%s FAILED: %s' % (log_prefix,
repr(failure)))
self.status.inc_value("failures", num_failures)
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
entries = []
meta_gfid = set()
datas = set()
change_ts = change.split(".")[-1]
# self.data_batch_start is None only in beginning and during
# new batch start
if self.data_batch_start is None:
self.data_batch_start = change_ts
# Ignore entry ops which are already processed in Changelog modes
ignore_entry_ops = False
entry_stime = None
data_stime = None
if self.name in ["live_changelog", "history_changelog"]:
entry_stime = self.get_entry_stime()
data_stime = self.get_data_stime()
if entry_stime is not None and data_stime is not None:
# if entry_stime is not None but data_stime > entry_stime
# This situation is caused by the stime update of Passive worker
# Consider data_stime in this case.
if data_stime[0] > entry_stime[0]:
entry_stime = data_stime
# Compare the entry_stime with changelog file suffix
# if changelog time is less than entry_stime then ignore
if int(change_ts) <= entry_stime[0]:
ignore_entry_ops = True
# basic crawl stats: files and bytes
files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@ -749,42 +817,6 @@ class GMasterChangelogMixin(GMasterCommon):
except IOError:
raise
def edct(op, **ed):
dct = {}
dct['op'] = op
for k in ed:
if k == 'stat':
st = ed[k]
dst = dct['stat'] = {}
if st:
dst['uid'] = st.st_uid
dst['gid'] = st.st_gid
dst['mode'] = st.st_mode
dst['atime'] = st.st_atime
dst['mtime'] = st.st_mtime
else:
dct[k] = ed[k]
return dct
# entry counts (not purges)
def entry_update():
files_pending['count'] += 1
# purge count
def purge_update():
files_pending['purge'] += 1
def log_failures(failures, entry_key, gfid_prefix, log_prefix):
num_failures = 0
for failure in failures:
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
logging.error('%s FAILED: %s' % (log_prefix,
repr(failure)))
self.status.inc_value("failures", num_failures)
for e in clist:
e = e.strip()
et = e[self.IDX_START:self.IDX_END] # entry type
@ -792,12 +824,20 @@ class GMasterChangelogMixin(GMasterCommon):
# skip ENTRY operation if hot tier brick
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
logging.debug('skip ENTRY op: %s if hot tier brick'
% (ec[self.POS_TYPE]))
continue
# Data and Meta operations are decided while parsing UNLINK/RMDIR/MKNOD
# records, so when ignore_entry_ops is True skip all other entry ops
# here; the UNLINK/RMDIR/MKNOD entry ops themselves are dropped later
if ignore_entry_ops and et == self.TYPE_ENTRY and \
ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
continue
if et == self.TYPE_ENTRY:
# extract information according to the type of
# the entry operation. create(), mkdir() and mknod()
@ -819,15 +859,16 @@ class GMasterChangelogMixin(GMasterCommon):
# Remove from DATA list, so that rsync will
# not fail
pt = os.path.join(pfx, ec[0])
if pt in datas:
datas.remove(pt)
st = lstat(pt)
if isinstance(st, int):
# file got unlinked, maybe in a historical Changelog
db_remove_data(pt)
db_remove_meta(pt)
if not boolify(gconf.ignore_deletes):
purge_update()
entries.append(edct(ty, gfid=gfid, entry=en))
if not ignore_entry_ops:
entries.append(edct(ty, gfid=gfid, entry=en))
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
entry_update()
# Special case: record mknod as link
if ty in ['MKNOD']:
mode = int(ec[2])
@ -845,10 +886,10 @@ class GMasterChangelogMixin(GMasterCommon):
# CREATED if source not exists.
entries.append(edct('LINK', stat=st, entry=en,
gfid=gfid))
# Here, we have the assumption that only
# tier-gfid.linkto causes this mknod. Add data
datas.add(os.path.join(pfx, ec[0]))
db_record_data(os.path.join(pfx, ec[0]),
change_ts)
continue
# stat info. present in the changelog itself
@ -867,7 +908,6 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
rl = None
entry_update()
e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
stat=st, link=rl))
@ -880,120 +920,102 @@ class GMasterChangelogMixin(GMasterCommon):
continue
if ty == 'LINK':
entry_update()
entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
elif ty == 'SYMLINK':
rl = errno_wrap(os.readlink, [en], [ENOENT], [ESTALE])
if isinstance(rl, int):
continue
entry_update()
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
# If self.unlinked_gfids is available, then that means it is
# retrying the changelog second time. Do not add the GFID's
# to rsync job if failed previously but unlinked in master
if self.unlinked_gfids and \
os.path.join(pfx, ec[0]) in self.unlinked_gfids:
logging.debug("ignoring data, since file purged interim")
else:
datas.add(os.path.join(pfx, ec[0]))
db_record_data(os.path.join(pfx, ec[0]), change_ts)
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
if len(ec) == 5:
# In xsync crawl, we already have stat data
# avoid doing stat again
meta_gfid.add((os.path.join(pfx, ec[0]),
XCrawlMetadata(st_uid=ec[2],
st_gid=ec[3],
st_mode=ec[4],
st_atime=ec[5],
st_mtime=ec[6])))
else:
meta_gfid.add((os.path.join(pfx, ec[0]), ))
db_record_meta(os.path.join(pfx, ec[0]), change_ts)
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
datas.add(os.path.join(pfx, ec[0]))
db_record_data(os.path.join(pfx, ec[0]), change_ts)
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
self.status.inc_value("entry", len(entries))
self.files_in_batch += len(datas)
self.status.inc_value("data", len(datas))
# sync namespace
if entries:
if entries and not ignore_entry_ops:
# Increment counters for Status
self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
# sync metadata
if meta_gfid:
meta_entries = []
for go in meta_gfid:
if len(go) > 1:
st = go[1]
else:
st = lstat(go[0])
if isinstance(st, int):
logging.debug('file %s got purged in the interim' % go[0])
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
self.status.inc_value("meta", len(entries))
failures = self.slave.server.meta_ops(meta_entries)
log_failures(failures, 'go', '', 'META')
self.status.dec_value("meta", len(entries))
# Update Entry stime in Brick Root only in case of Changelog mode
if self.name in ["live_changelog", "history_changelog"]:
entry_stime_to_update = (int(change_ts) - 1, 0)
self.upd_entry_stime(entry_stime_to_update)
self.status.set_field("last_synced_entry",
entry_stime_to_update[0])
# sync data
if datas:
if ignore_entry_ops:
# Book keeping, to show in logs the range of Changelogs skipped
self.num_skipped_entry_changelogs += 1
if self.skipped_entry_changelogs_first is None:
self.skipped_entry_changelogs_first = change_ts
self.skipped_entry_changelogs_last = change_ts
# Batch data based on the number of changelogs, as configured by
# gconf.max_data_changelogs_in_batch (default is 24 hrs of changelogs).
# stime is set after completion of each batch, so on failure
# Geo-rep will progress day by day
if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
# (Start Changelog TS, End Changelog TS, [Changes])
self.data_batches.append([self.data_batch_start, change_ts,
[change]])
self.data_batch_start = None
self.num_changelogs = 0
else:
self.data_batches[-1][1] = change_ts
self.data_batches[-1][2].append(change)
def datas_to_queue(self, start, end):
# Paginate db entries and add it to Rsync PostBox
offset = 0
total_datas = 0
while True:
# Db Pagination
datas = db_get_data(start=start, end=end,
limit=DB_PAGINATION_SIZE_DATA,
offset=offset)
if len(datas) == 0:
break
offset += DB_PAGINATION_SIZE_DATA
total_datas += len(datas)
self.a_syncdata(datas)
self.datas_in_batch.update(datas)
return total_datas
def process(self, changes, done=1):
tries = 0
def handle_data_sync(self, start, end, changes, done, total_datas):
"""
Wait till all rsync jobs are complete and handle retries.
Update data stime once the rsync jobs are complete.
"""
retry = False
self.unlinked_gfids = set()
self.files_in_batch = 0
self.datas_in_batch = set()
tries = 0
# Error log disabled till the last round
self.syncer.disable_errorlog()
while True:
# first, fire all changelog transfers in parallel. entry and
# metadata are performed synchronously, therefore in serial.
# However at the end of each changelog, data is synchronized
# with syncdata_async() - which means it is serial w.r.t
# entries/metadata of that changelog but happens in parallel
# with data of other changelogs.
if retry:
if tries == (self.MAX_RETRIES - 1):
# Enable Error logging if it is last retry
self.syncer.enable_errorlog()
self.datas_to_queue(start, end)
# Remove Unlinked GFIDs from Queue
for unlinked_gfid in self.unlinked_gfids:
if unlinked_gfid in self.datas_in_batch:
self.datas_in_batch.remove(unlinked_gfid)
# Retry only Sync. Do not retry entry ops
if self.datas_in_batch:
self.a_syncdata(self.datas_in_batch)
else:
for change in changes:
logging.debug('processing change %s' % change)
self.process_change(change, done, retry)
if not retry:
# number of changelogs processed in the batch
self.turns += 1
if retry and tries == (gconf.max_rsync_retries - 1):
# Enable Error logging if it is last retry
self.syncer.enable_errorlog()
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@ -1016,38 +1038,34 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
self.unlinked_gfids = set()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
self.datas_in_batch = set()
self.status.dec_value("data", total_datas)
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
if tries == self.MAX_RETRIES:
if tries >= gconf.max_rsync_retries:
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
self.datas_in_batch = set()
self.status.dec_value("data", total_datas)
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry but failing to return an errno]
@ -1058,11 +1076,114 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
# Reset the Data counter before Retry
self.status.dec_value("data", self.files_in_batch)
self.files_in_batch = 0
time.sleep(0.5)
# Log Current batch details
if changes:
logging.info(
"{0} mode completed in {1:.4f} seconds "
"({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
self.name,
time.time() - self.batch_start_time,
changes[0].split("/")[-1],
changes[-1].split("/")[-1],
len(changes),
repr(self.get_data_stime()),
repr(self.get_entry_stime())))
def process(self, changes, done=1):
retry = False
first_changelog_ts = changes[0].split(".")[-1]
db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
self.skipped_entry_changelogs_first = None
self.skipped_entry_changelogs_last = None
self.num_skipped_entry_changelogs = 0
self.batch_start_time = time.time()
# (Start Changelog TS, End Changelog TS, [Changes])
self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
self.data_batch_start = None
self.num_changelogs = 0
for change in changes:
logging.debug('processing change %s' % change)
self.process_change(change, done, retry)
# number of changelogs processed in the batch
self.turns += 1
# Rsync/Tar preserve permissions, so if a GFID exists in the data
# queue its meta details are synced too. Remove from the meta table
# all GFIDs that also exist in the data table
db_delete_meta_if_exists_in_data()
# All the Data/Meta populated, Commit the Changes in Db
db_commit()
# Log the Skipped Entry ops range if any
if self.skipped_entry_changelogs_first is not None and \
self.skipped_entry_changelogs_last is not None:
logging.info("Skipping already processed entry "
"ops from CHANGELOG.{0} to CHANGELOG.{1} "
"Num: {2}".format(
self.skipped_entry_changelogs_first,
self.skipped_entry_changelogs_last,
self.num_skipped_entry_changelogs))
# Entry Changelogs syncing finished
logging.info("Syncing Entries completed in {0:.4f} seconds "
"CHANGELOG.{1} - CHANGELOG.{2} "
"Num: {3}".format(
time.time() - self.batch_start_time,
changes[0].split(".")[-1],
changes[-1].split(".")[-1],
len(changes)))
# Update Status Data and Meta Count
self.status.inc_value("data", db_get_data_count())
self.status.inc_value("meta", db_get_meta_count())
for b in self.data_batches:
# Add to the data queue, so that rsync starts in parallel
# while meta ops are being synced
total_datas = self.datas_to_queue(b[0], b[1])
# Sync Meta
offset = 0
while True:
# Db Pagination
meta_gfids = db_get_meta(start=b[0], end=b[1],
limit=DB_PAGINATION_SIZE_META,
offset=offset)
if len(meta_gfids) == 0:
break
offset += DB_PAGINATION_SIZE_META
# Collect required information for GFIDs which
# exist on the Master
meta_entries = []
for go in meta_gfids:
st = lstat(go)
if isinstance(st, int):
logging.debug('file %s got purged in the '
'interim' % go)
continue
meta_entries.append(edct('META', go=go, stat=st))
if meta_entries:
failures = self.slave.server.meta_ops(meta_entries)
self.log_failures(failures, 'go', '', 'META')
self.status.dec_value("meta", len(meta_entries))
# Sync Data, Rsync already started syncing the files
# wait for the completion and retry if required.
self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
self.uuid,
stime)
def upd_stime(self, stime, path=None):
if not path:
path = self.FLAT_DIR_HIERARCHY
@ -1085,7 +1206,12 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
def changelogs_batch_process(self, changes):
def changelogs_batch_process(self, changes, single_batch=False):
if single_batch and changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
return
changelogs_batches = []
current_size = 0
for c in changes:
@ -1112,16 +1238,15 @@ class GMasterChangelogMixin(GMasterCommon):
changes = []
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
purge_time = self.get_purge_time()
data_stime = self.get_data_stime()
self.changelog_agent.scan()
self.crawls += 1
changes = self.changelog_agent.getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
if data_stime:
processed = [x for x in changes
if int(x.split('.')[-1]) < purge_time[0]]
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info(
'skipping already processed change: %s...' %
@ -1136,7 +1261,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.changelog_agent = changelog_agent
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.changelog_agent.done
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".processed")
self.name = "live_changelog"
self.status = status
@ -1149,7 +1275,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.history_crawl_start_time = register_time
self.changelog_done_func = self.changelog_agent.history_done
self.history_turns = 0
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
self.tempdir = self.setup_working_dir()
self.processed_changelogs_dir = os.path.join(self.tempdir,
".history/.processed")
self.name = "history_changelog"
self.status = status
@ -1157,15 +1284,17 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def crawl(self):
self.history_turns += 1
self.status.set_worker_crawl_status("History Crawl")
purge_time = self.get_purge_time()
data_stime = self.get_data_stime()
end_time = int(time.time())
logging.info('starting history crawl... turns: %s, stime: %s, etime: %s'
% (self.history_turns, repr(purge_time), repr(end_time)))
logging.info('starting history crawl... turns: %s, stime: %s, '
'etime: %s, entry_stime: %s'
% (self.history_turns, repr(data_stime),
repr(end_time), self.get_entry_stime()))
if not purge_time or purge_time == URXTIME:
if not data_stime or data_stime == URXTIME:
logging.info("stime not available, abandoning history crawl")
raise NoPurgeTimeAvailable()
raise NoStimeAvailable()
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
@ -1174,7 +1303,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
".glusterfs/changelogs")
ret, actual_end = self.changelog_agent.history(
changelog_path,
purge_time[0],
data_stime[0],
end_time,
int(gconf.sync_jobs))
@ -1184,27 +1313,42 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
while self.changelog_agent.history_scan() > 0:
num_scanned_changelogs = self.changelog_agent.history_scan()
num_changelogs = num_scanned_changelogs
changes = []
while num_scanned_changelogs > 0:
self.crawls += 1
changes = self.changelog_agent.history_getchanges()
changes += self.changelog_agent.history_getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
if data_stime:
processed = [x for x in changes
if int(x.split('.')[-1]) < purge_time[0]]
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
self.changelogs_batch_process(changes)
if num_changelogs > gconf.max_history_changelogs_in_batch:
self.changelogs_batch_process(changes, single_batch=True)
num_changelogs = 0
changes = []
num_scanned_changelogs = self.changelog_agent.history_scan()
num_changelogs += num_scanned_changelogs
# Process the last batch if it was not handled by the
# max_history_changelogs_in_batch condition above
if changes:
self.changelogs_batch_process(changes, single_batch=True)
history_turn_time = int(time.time()) - self.history_crawl_start_time
logging.info('finished history crawl syncing, endtime: %s, stime: %s'
% (actual_end, repr(self.get_purge_time())))
logging.info('finished history crawl syncing, endtime: %s, '
'stime: %s, entry_stime: %s'
% (actual_end, repr(self.get_data_stime()),
self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@ -1269,14 +1413,14 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
t = Thread(target=Xsyncer)
t.start()
logging.info('starting hybrid crawl..., stime: %s'
% repr(self.get_purge_time()))
% repr(self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
logging.info('finished hybrid crawl syncing, stime: %s'
% repr(self.get_purge_time()))
% repr(self.get_data_stime()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))

View File

@ -31,11 +31,10 @@ import shutil
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
@ -521,6 +520,29 @@ class Server(object):
else:
raise
@classmethod
@_pathguard
def entry_stime(cls, path, uuid):
"""
entry_stime xattr reduces the number of retries of Entry changes when
the Geo-rep worker crashes and restarts. entry_stime is updated after
processing every changelog file, so on failure and restart the worker
only has to reprocess the last changelog for Entry ops.
Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime
"""
try:
val = Xattr.lgetxattr(path,
'.'.join([cls.GX_NSPACE, uuid,
'entry_stime']),
8)
return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
return ex.errno
else:
raise
@classmethod
def node_uuid(cls, path='.'):
try:
@ -540,6 +562,16 @@ class Server(object):
struct.pack('!II', *mark)],
[ENOENT])
@classmethod
@_pathguard
def set_entry_stime(cls, path, uuid, mark):
"""set @mark as stime for @uuid on @path"""
errno_wrap(Xattr.lsetxattr,
[path,
'.'.join([cls.GX_NSPACE, uuid, 'entry_stime']),
struct.pack('!II', *mark)],
[ENOENT])
@classmethod
@_pathguard
def set_xtime(cls, path, uuid, mark):
@ -1376,6 +1408,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
from master import gmaster_builder
return (gmaster_builder('xsync')(self, slave),
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))
@ -1436,6 +1469,13 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
uuid + '.' + gconf.slave_id)
),
slave.server)
slave.server.entry_stime = types.MethodType(
lambda _self, path, uuid: (
brickserver.entry_stime(
path,
uuid + '.' + gconf.slave_id)
),
slave.server)
slave.server.set_stime = types.MethodType(
lambda _self, path, uuid, mark: (
brickserver.set_stime(path,
@ -1443,6 +1483,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
mark)
),
slave.server)
slave.server.set_entry_stime = types.MethodType(
lambda _self, path, uuid, mark: (
brickserver.set_entry_stime(
path,
uuid + '.' + gconf.slave_id,
mark)
),
slave.server)
(g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@ -1506,7 +1554,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ChangelogHistoryNotAvailable:
logging.info('Changelog history not available, using xsync')
g1.crawlwrap(oneshot=True, register_time=register_time)
except NoPurgeTimeAvailable:
except NoStimeAvailable:
logging.info('No stime available, using xsync crawl')
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogException as e:

View File

@ -45,7 +45,7 @@ except ImportError:
# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
GF_OP_RETRIES = 20
GF_OP_RETRIES = 10
CHANGELOG_AGENT_SERVER_VERSION = 1.0
CHANGELOG_AGENT_CLIENT_VERSION = 1.0
@ -494,15 +494,18 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
def lstat(e):
return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE])
class NoPurgeTimeAvailable(Exception):
class NoStimeAvailable(Exception):
pass
class PartialHistoryAvailable(Exception):
pass
class ChangelogHistoryNotAvailable(Exception):
pass
class ChangelogException(OSError):
pass