geo-rep: Consume Changelog History API

Every time geo-rep restarts, it first does an FS crawl using
XCrawl and then switches to Changelog Mode. This is because the changelog
previously had only a live API, i.e., changes could be obtained only after registering.

Now this patch (http://review.gluster.org/#/c/6930/) introduces a History
API for changelogs. If history is available, then geo-rep will use it
instead of an FS crawl.

The History API returns a timestamp (TS) up to which history is available
for the given start and end times. If TS < endtime, then switch to FS Crawl.
(History => FS Crawl => Live Changelog)

If TS >= endtime, then switch directly to Changelog mode
(History => Live Changelog)

Change-Id: I4922f62b9f899c40643bd35720e0c81c36b2f255
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/6938
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Humble Devassy Chirammal <humble.devassy@gmail.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
This commit is contained in:
Aravinda VK 2014-02-07 11:24:41 +05:30 committed by Vijay Bellur
parent d09b327a27
commit 45d70cc748
4 changed files with 198 additions and 31 deletions

View File

@ -13,6 +13,10 @@ from ctypes import CDLL, create_string_buffer, get_errno
from ctypes.util import find_library
class ChangelogException(OSError):
pass
class Changes(object):
libgfc = CDLL(find_library("gfchangelog"), use_errno=True)
@ -21,9 +25,9 @@ class Changes(object):
return get_errno()
@classmethod
def raise_oserr(cls):
def raise_changelog_err(cls):
errn = cls.geterrno()
raise OSError(errn, os.strerror(errn))
raise ChangelogException(errn, os.strerror(errn))
@classmethod
def _get_api(cls, call):
@ -35,19 +39,19 @@ class Changes(object):
log_file,
log_level, retries)
if ret == -1:
cls.raise_oserr()
cls.raise_changelog_err()
@classmethod
def cl_scan(cls):
ret = cls._get_api('gf_changelog_scan')()
if ret == -1:
cls.raise_oserr()
cls.raise_changelog_err()
@classmethod
def cl_startfresh(cls):
ret = cls._get_api('gf_changelog_start_fresh')()
if ret == -1:
cls.raise_oserr()
cls.raise_changelog_err()
@classmethod
def cl_getchanges(cls):
@ -64,7 +68,7 @@ class Changes(object):
break
changes.append(buf.raw[:ret - 1])
if ret == -1:
cls.raise_oserr()
cls.raise_changelog_err()
# cleanup tracker
cls.cl_startfresh()
return sorted(changes, key=clsort)
@ -73,4 +77,48 @@ class Changes(object):
def cl_done(cls, clfile):
ret = cls._get_api('gf_changelog_done')(clfile)
if ret == -1:
cls.raise_oserr()
cls.raise_changelog_err()
@classmethod
def cl_history_scan(cls):
ret = cls._get_api('gf_history_changelog_scan')()
if ret == -1:
cls.raise_changelog_err()
return ret
@classmethod
def cl_history_changelog(cls, changelog_path, start, end):
ret = cls._get_api('gf_history_changelog')(changelog_path, start, end)
if ret == -1:
cls.raise_changelog_err()
return ret
@classmethod
def cl_history_startfresh(cls):
ret = cls._get_api('gf_history_changelog_start_fresh')()
if ret == -1:
cls.raise_changelog_err()
@classmethod
def cl_history_getchanges(cls):
changes = []
buf = create_string_buffer('\0', 4096)
call = cls._get_api('gf_history_changelog_next_change')
while True:
ret = call(buf, 4096)
if ret in (0, -1):
break
changes.append(buf.raw[:ret - 1])
if ret == -1:
cls.raise_changelog_err()
return changes
@classmethod
def cl_history_done(cls, clfile):
ret = cls._get_api('gf_history_changelog_done')(clfile)
if ret == -1:
cls.raise_changelog_err()

View File

@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.master.server.changelog_done, changes)
map(self.changelog_done_func, changes)
self.update_worker_files_syncd()
break
@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.master.server.changelog_done, changes)
map(self.changelog_done_func, changes)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon):
purge_time = self.xtime('.', self.slave)
if isinstance(purge_time, int):
purge_time = None
try:
self.master.server.changelog_scan()
self.crawls += 1
except OSError:
self.fallback_xsync()
self.update_worker_crawl_status("Hybrid Crawl")
self.master.server.changelog_scan()
self.crawls += 1
changes = self.master.server.changelog_getchanges()
if changes:
if purge_time:
@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.basename(pr))
self.master.server.changelog_done(pr)
changes.remove(pr)
logging.debug('processing changes %s' % repr(changes))
if changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
def register(self):
(workdir, logfile) = self.setup_working_dir()
self.sleep_interval = int(gconf.change_interval)
self.changelog_done_func = self.master.server.changelog_done
# register with the changelog library
try:
# 9 == log level (DEBUG)
# 5 == connection retries
self.master.server.changelog_register(gconf.local_path,
workdir, logfile, 9, 5)
except OSError:
self.fallback_xsync()
# control should not reach here
raise
# 9 == log level (DEBUG)
# 5 == connection retries
self.master.server.changelog_register(gconf.local_path,
workdir, logfile, 9, 5)
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
def register(self):
super(GMasterChangeloghistoryMixin, self).register()
self.changelog_register_time = int(time.time())
self.changelog_done_func = self.master.server.history_changelog_done
def crawl(self):
self.update_worker_crawl_status("History Crawl")
# get stime (from the brick) and purge changelogs
# that are _historical_ to that time.
purge_time = self.xtime('.', self.slave)
if isinstance(purge_time, int):
purge_time = None
if not purge_time or purge_time == URXTIME:
raise NoPurgeTimeAvailable()
logging.debug("Get changelog history between %s and %s" %
(purge_time[0], self.changelog_register_time))
# Changelogs backend path is hardcoded as
# <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
# location then consuming history will not work(Known issue as of now)
changelog_path = os.path.join(gconf.local_path,
".glusterfs/changelogs")
ts = self.master.server.history_changelog(changelog_path,
purge_time[0],
self.changelog_register_time)
# scan followed by getchanges till scan returns zero.
# history_changelog_scan() is blocking call, till it gets the number
# of changelogs to process. Returns zero when no changelogs
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_changelog_getchanges()
while self.master.server.history_changelog_scan() > 0:
self.crawls += 1
changes = self.master.server.history_changelog_getchanges()
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
processed = [x for x in changes
if int(x.split('.')[-1]) < purge_time[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
self.changelog_done_func(pr)
changes.remove(pr)
if changes:
logging.debug('processing changes %s' % repr(changes))
self.process(changes)
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
# till TS returned from history_changelog
if ts < self.changelog_register_time:
raise PartialHistoryAvailable(str(ts))
class GMasterXsyncMixin(GMasterChangelogMixin):

View File

@ -33,6 +33,8 @@ from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
from libgfchangelog import ChangelogException
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@ -682,6 +684,22 @@ class Server(object):
def changelog_done(cls, clfile):
Changes.cl_done(clfile)
@classmethod
def history_changelog(cls, changelog_path, start, end):
return Changes.cl_history_changelog(changelog_path, start, end)
@classmethod
def history_changelog_scan(cls):
return Changes.cl_history_scan()
@classmethod
def history_changelog_getchanges(cls):
return Changes.cl_history_getchanges()
@classmethod
def history_changelog_done(cls, clfile):
Changes.cl_history_done(clfile)
@classmethod
@_pathguard
def setattr(cls, path, adct):
@ -1213,7 +1231,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
return (gmaster_builder('xsync')(self, slave),
gmaster_builder()(self, slave))
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))
def service_loop(self, *args):
"""enter service loop
@ -1277,20 +1296,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
mark)
),
slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
(g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
g3.master.server = brickserver
else:
(g1, g2) = self.gmaster_instantiate_tuple(slave)
(g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server.aggregated = gmaster.master.server
g2.master.server.aggregated = gmaster.master.server
g3.master.server.aggregated = gmaster.master.server
# bad bad bad: bad way to do things like this
# need to make this elegant
# register the crawlers and start crawling
# g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
# g3 ==> changelog History
g1.register()
g2.register()
g1.crawlwrap(oneshot=True)
g2.crawlwrap()
try:
g2.register()
g3.register()
except ChangelogException as e:
logging.debug("Changelog register failed: %s - %s" %
(e.errno, e.strerror))
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
# Note: if config.change_detector is xsync then
# it will not use changelog history api
try:
g3.crawlwrap(oneshot=True)
except (ChangelogException, NoPurgeTimeAvailable,
PartialHistoryAvailable) as e:
if isinstance(e, ChangelogException):
logging.debug('Changelog history crawl failed, failback '
'to xsync: %s - %s' % (e.errno, e.strerror))
elif isinstance(e, NoPurgeTimeAvailable):
logging.debug('Using xsync crawl since no purge time '
'available')
elif isinstance(e, PartialHistoryAvailable):
logging.debug('Using xsync crawl after consuming history '
'till %s' % str(e))
g1.crawlwrap(oneshot=True)
# crawl loop: Try changelog crawl, if failed
# switch to FS crawl
try:
g2.crawlwrap()
except ChangelogException as e:
logging.debug('Changelog crawl failed, failback to xsync: '
'%s - %s' % (e.errno, e.strerror))
g1.crawlwrap()
else:
sup(self, *args)

View File

@ -488,3 +488,11 @@ def lstat(e):
return ex.errno
else:
raise
class NoPurgeTimeAvailable(Exception):
pass
class PartialHistoryAvailable(Exception):
pass