geo-rep: Archive Changelogs and avoid generating empty XSync changelogs
With this patch: - Hybrid Crawl will not generate empty Changelogs - Changelogs are archived once they have been processed (in Hybrid (XSync), History, and Changelog Crawl) - A Passive worker cleans up its processing directory. BUG: 1169331 Change-Id: I1383ffaed261cdf50da91b14260b4d43177657d1 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/9453 Reviewed-by: Venky Shankar <vshankar@redhat.com> Tested-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
parent
7847db9a6d
commit
1226083d0f
@ -252,6 +252,8 @@ def main_i():
|
||||
op.add_option('--sync-jobs', metavar='N', type=int, default=3)
|
||||
op.add_option('--replica-failover-interval', metavar='N',
|
||||
type=int, default=1)
|
||||
op.add_option('--changelog-archive-format', metavar='N',
|
||||
type=str, default="%Y%m")
|
||||
op.add_option(
|
||||
'--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
|
||||
op.add_option('--allow-network', metavar='IPS', default='')
|
||||
|
@ -17,6 +17,7 @@ import logging
|
||||
import socket
|
||||
import string
|
||||
import errno
|
||||
import tarfile
|
||||
from errno import ENOENT, ENODATA, EPIPE, EEXIST
|
||||
from threading import Condition, Lock
|
||||
from datetime import datetime
|
||||
@ -533,6 +534,19 @@ class GMasterCommon(object):
|
||||
if brick_stime < cluster_stime:
|
||||
self.slave.server.set_stime(
|
||||
self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
|
||||
# Purge all changelogs available in processing dir
|
||||
# less than cluster_stime
|
||||
proc_dir = os.path.join(self.setup_working_dir(),
|
||||
".processing")
|
||||
|
||||
if os.path.exists(proc_dir):
|
||||
to_purge = [f for f in os.listdir(proc_dir)
|
||||
if (f.startswith("CHANGELOG.") and
|
||||
int(f.split('.')[-1]) <
|
||||
cluster_stime[0])]
|
||||
for f in to_purge:
|
||||
os.remove(os.path.join(proc_dir, f))
|
||||
|
||||
time.sleep(5)
|
||||
continue
|
||||
self.update_worker_health("Active")
|
||||
@ -775,6 +789,47 @@ class GMasterChangelogMixin(GMasterCommon):
|
||||
CHANGELOG_LOG_LEVEL = 9
|
||||
CHANGELOG_CONN_RETRIES = 5
|
||||
|
||||
def archive_and_purge_changelogs(self, changelogs):
    """Append processed changelogs to the current archive, then delete them.

    Every entry of ``changelogs`` is reduced to its base name and looked
    up inside ``self.processed_changelogs_dir``.  The files are added to
    ``archive_<stamp>.tar`` in that same directory, where ``<stamp>`` is
    today's date formatted with ``gconf.changelog_archive_format``, and
    are removed from the directory once the archive has been closed.

    A plain tar file is created instead of tar.gz, since changelogs are
    appended to an existing tar.

    changelogs -- list of changelog paths or file names; an empty list
                  is a no-op and no archive file is created.

    Files that have already disappeared (ENOENT) are skipped in both the
    archive and the purge phase; any other OSError/IOError propagates to
    the caller after the archive has been closed.
    """
    if not changelogs:
        # Nothing was processed: do not create or touch an archive.
        return

    archive_name = "archive_%s.tar" % datetime.today().strftime(
        gconf.changelog_archive_format)
    archive_path = os.path.join(self.processed_changelogs_dir,
                                archive_name)

    # Resolve every changelog once; both phases work on the same paths.
    paths = [os.path.join(self.processed_changelogs_dir,
                          os.path.basename(f))
             for f in changelogs]

    try:
        tar = tarfile.open(archive_path, "a")
    except tarfile.ReadError:
        # The existing file could not be read as a tar archive, so it
        # cannot be appended to; start a fresh archive in its place.
        tar = tarfile.open(archive_path, "w")

    try:
        for path in paths:
            try:
                tar.add(path, arcname=os.path.basename(path))
            except (OSError, IOError) as e:
                # A changelog that vanished in the meantime is skipped.
                if e.errno != ENOENT:
                    raise
    finally:
        # Always release the archive handle, whatever happened above.
        tar.close()

    for path in paths:
        try:
            os.remove(path)
        except OSError as e:
            if e.errno != ENOENT:
                raise
|
||||
|
||||
def fallback_xsync(self):
|
||||
logging.info('falling back to xsync mode')
|
||||
gconf.configinterface.set('change-detector', 'xsync')
|
||||
@ -990,6 +1045,7 @@ class GMasterChangelogMixin(GMasterCommon):
|
||||
xtl = (int(change.split('.')[-1]) - 1, 0)
|
||||
self.upd_stime(xtl)
|
||||
map(self.changelog_done_func, changes)
|
||||
self.archive_and_purge_changelogs(changes)
|
||||
self.update_worker_files_syncd()
|
||||
break
|
||||
|
||||
@ -1009,6 +1065,7 @@ class GMasterChangelogMixin(GMasterCommon):
|
||||
xtl = (int(change.split('.')[-1]) - 1, 0)
|
||||
self.upd_stime(xtl)
|
||||
map(self.changelog_done_func, changes)
|
||||
self.archive_and_purge_changelogs(changes)
|
||||
break
|
||||
# it's either entry_ops() or Rsync that failed to do its
|
||||
# job. Mostly it's entry_ops() [which currently has a problem
|
||||
@ -1204,6 +1261,7 @@ class GMasterChangelogMixin(GMasterCommon):
|
||||
os.path.basename(pr))
|
||||
self.changelog_done_func(pr)
|
||||
changes.remove(pr)
|
||||
self.archive_and_purge_changelogs(processed)
|
||||
|
||||
if changes:
|
||||
logging.debug('processing changes %s' % repr(changes))
|
||||
@ -1213,6 +1271,8 @@ class GMasterChangelogMixin(GMasterCommon):
|
||||
self.changelog_agent = changelog_agent
|
||||
self.sleep_interval = int(gconf.change_interval)
|
||||
self.changelog_done_func = self.changelog_agent.done
|
||||
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
|
||||
".processed")
|
||||
|
||||
|
||||
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
|
||||
@ -1222,6 +1282,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
|
||||
self.history_crawl_start_time = register_time
|
||||
self.changelog_done_func = self.changelog_agent.history_done
|
||||
self.history_turns = 0
|
||||
self.processed_changelogs_dir = os.path.join(self.setup_working_dir(),
|
||||
".history/.processed")
|
||||
|
||||
def crawl(self, no_stime_update=False):
|
||||
self.history_turns += 1
|
||||
@ -1314,6 +1376,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
self.sleep_interval = 60
|
||||
self.tempdir = self.setup_working_dir()
|
||||
self.tempdir = os.path.join(self.tempdir, 'xsync')
|
||||
self.processed_changelogs_dir = self.tempdir
|
||||
logging.info('xsync temp directory: %s' % self.tempdir)
|
||||
try:
|
||||
os.makedirs(self.tempdir)
|
||||
@ -1348,6 +1411,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
elif item[0] == 'xsync':
|
||||
logging.info('processing xsync changelog %s' % (item[1]))
|
||||
self.process([item[1]], 0)
|
||||
self.archive_and_purge_changelogs([item[1]])
|
||||
elif item[0] == 'stime':
|
||||
if not no_stime_update:
|
||||
# xsync is started after running history but if
|
||||
@ -1367,6 +1431,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
time.sleep(1)
|
||||
|
||||
def write_entry_change(self, prefix, data=()):
    """Write one "<prefix> <data...>" line to the xsync changelog.

    The changelog is opened on demand: when ``self.fh`` is missing or
    falsy, ``self.open()`` is called before the first write.

    prefix -- leading token of the record
    data   -- iterable of strings, joined with single spaces and written
              after the prefix (defaults to no fields)
    """
    if not getattr(self, "fh", None):
        self.open()

    self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
|
||||
|
||||
def open(self):
|
||||
@ -1378,7 +1445,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
raise
|
||||
|
||||
def close(self):
|
||||
self.fh.close()
|
||||
if getattr(self, "fh", None):
|
||||
self.fh.flush()
|
||||
os.fsync(self.fh.fileno())
|
||||
self.fh.close()
|
||||
self.fh = None
|
||||
|
||||
def fname(self):
|
||||
return self.xsync_change
|
||||
@ -1389,11 +1460,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
def sync_xsync(self, last):
|
||||
"""schedule a processing of changelog"""
|
||||
self.close()
|
||||
self.put('xsync', self.fname())
|
||||
if self.counter > 0:
|
||||
self.put('xsync', self.fname())
|
||||
self.counter = 0
|
||||
if not last:
|
||||
time.sleep(1) # make sure changelogs are 1 second apart
|
||||
self.open()
|
||||
|
||||
def sync_stime(self, stime=None, last=False):
|
||||
"""schedule a stime synchronization"""
|
||||
@ -1427,7 +1498,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
the filesystem tree, but set after directory synchronization.
|
||||
"""
|
||||
if path == '.':
|
||||
self.open()
|
||||
self.crawls += 1
|
||||
if not xtr_root:
|
||||
# get the root stime and use it for all comparisons
|
||||
@ -1479,7 +1549,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
|
||||
logging.warn('skipping entry %s..' % e)
|
||||
continue
|
||||
mo = st.st_mode
|
||||
self.counter += 1
|
||||
self.counter += 1 if ((stat.S_ISDIR(mo) or
|
||||
stat.S_ISLNK(mo) or
|
||||
stat.S_ISREG(mo))) else 0
|
||||
if self.counter == self.XSYNC_MAX_ENTRIES:
|
||||
self.sync_done(self.stimes, False)
|
||||
self.stimes = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user