geo-rep: Structured log support

Changed all log messages to structured log format

Change-Id: Idae25f8b4ad0bbae38f4362cbda7bbf51ce7607b
Updates: #240
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: https://review.gluster.org/17551
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
This commit is contained in:
Aravinda VK 2017-06-15 18:09:36 +05:30
parent 52d0886cfb
commit 0a8dac38ac
7 changed files with 289 additions and 210 deletions

View File

@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
from libcxattr import Xattr
import struct
from syncdutils import get_master_and_slave_data_from_args
from syncdutils import get_master_and_slave_data_from_args, lf
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@ -127,24 +127,30 @@ def slave_vol_uuid_get(host, vol):
stdin=None, stdout=PIPE, stderr=PIPE)
vix, err = po.communicate()
if po.returncode != 0:
logging.info("Volume info failed, unable to get "
"volume uuid of %s present in %s,"
"returning empty string: %s" %
(vol, host, po.returncode))
logging.info(lf("Volume info failed, unable to get "
"volume uuid of slavevol, "
"returning empty string",
slavevol=vol,
slavehost=host,
error=po.returncode))
return ""
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
logging.info("Unable to get volume uuid of %s, "
"present in %s returning empty string: %s" %
(vol, host, vi.find('opErrstr').text))
logging.info(lf("Unable to get volume uuid of slavevol, "
"returning empty string",
slavevol=vol,
slavehost=host,
error=vi.find('opErrstr').text))
return ""
try:
voluuid = vi.find("volInfo/volumes/volume/id").text
except (ParseError, AttributeError, ValueError) as e:
logging.info("Parsing failed to volume uuid of %s, "
"present in %s returning empty string: %s" %
(vol, host, e))
logging.info(lf("Parsing failed to volume uuid of slavevol, "
"returning empty string",
slavevol=vol,
slavehost=host,
error=e))
voluuid = ""
return voluuid
@ -692,16 +698,18 @@ def main_i():
if confdata.op == 'set':
if confdata.opt == 'checkpoint':
logging.info("Checkpoint Set: %s" % (
human_time_utc(confdata.val)))
logging.info(lf("Checkpoint Set",
time=human_time_utc(confdata.val)))
else:
logging.info("Config Set: %s = %s" % (
confdata.opt, confdata.val))
logging.info(lf("Config Set",
config=confdata.opt,
value=confdata.val))
elif confdata.op == 'del':
if confdata.opt == 'checkpoint':
logging.info("Checkpoint Reset")
else:
logging.info("Config Reset: %s" % confdata.opt)
logging.info(lf("Config Reset",
config=confdata.opt))
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# directory of log path is not present,
@ -722,7 +730,8 @@ def main_i():
try:
GLogger._gsyncd_loginit(log_file=gconf.log_file, label='monitor')
gconf.log_exit = False
logging.info("Monitor Status: %s" % create)
logging.info(lf("Monitor Status Change",
status=create))
except IOError:
if sys.exc_info()[1].errno == ENOENT:
# If log dir not present
@ -772,7 +781,8 @@ def main_i():
if be_agent:
os.setsid()
logging.debug('rpc_fd: %s' % repr(gconf.rpc_fd))
logging.debug(lf("RPC FD",
rpc_fd=repr(gconf.rpc_fd)))
return agent(Changelog(), gconf.rpc_fd)
if be_monitor:
@ -786,7 +796,7 @@ def main_i():
remote.connect_remote(go_daemon='done')
local.connect()
if ffd:
logging.info ("Closing feedback fd, waking up the monitor")
logging.info("Closing feedback fd, waking up the monitor")
os.close(ffd)
local.service_loop(*[r for r in [remote] if r])

View File

@ -20,7 +20,7 @@ from errno import EACCES, EAGAIN, ENOENT
import logging
from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED
from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf
DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
@ -225,10 +225,10 @@ class GeorepStatus(object):
data["checkpoint_time"] = checkpoint_time
data["checkpoint_completion_time"] = curr_time
data["checkpoint_completed"] = "Yes"
logging.info("Checkpoint completed. Checkpoint "
"Time: %s, Completion Time: %s" % (
human_time_utc(checkpoint_time),
human_time_utc(curr_time)))
logging.info(lf("Checkpoint completed",
checkpoint_time=human_time_utc(
checkpoint_time),
completion_time=human_time_utc(curr_time)))
self.trigger_gf_event_checkpoint_completion(
checkpoint_time, curr_time)
@ -238,11 +238,13 @@ class GeorepStatus(object):
def set_worker_status(self, status):
if self.set_field("worker_status", status):
logging.info("Worker Status: %s" % status)
logging.info(lf("Worker Status Change",
status=status))
def set_worker_crawl_status(self, status):
if self.set_field("crawl_status", status):
logging.info("Crawl Status: %s" % status)
logging.info(lf("Crawl Status Change",
status=status))
def set_slave_node(self, slave_node):
def merger(data):
@ -269,12 +271,14 @@ class GeorepStatus(object):
def set_active(self):
if self.set_field("worker_status", "Active"):
logging.info("Worker Status: Active")
logging.info(lf("Worker Status Change",
status="Active"))
self.send_event(EVENT_GEOREP_ACTIVE)
def set_passive(self):
if self.set_field("worker_status", "Passive"):
logging.info("Worker Status: Passive")
logging.info(lf("Worker Status Change",
status="Passive"))
self.send_event(EVENT_GEOREP_PASSIVE)
def get_monitor_status(self):

View File

@ -24,7 +24,7 @@ from datetime import datetime
from gconf import gconf
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
from syncdutils import lstat, errno_wrap, FreeObject, lf
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@ -54,8 +54,8 @@ def _volinfo_hook_relax_foreign(self):
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
logging.info('foreign volume info found, waiting %d sec for expiry' %
expiry)
logging.info(lf('foreign volume info found, waiting for expiry',
expiry=expiry))
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
return volinfo_sys
@ -90,7 +90,8 @@ def gmaster_builder(excrawl=None):
modemixin = 'normal'
changemixin = 'xsync' if gconf.change_detector == 'xsync' \
else excrawl or gconf.change_detector
logging.debug('setting up %s change detection mode' % changemixin)
logging.debug(lf('setting up change detection mode',
mode=changemixin))
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
sendmarkmixin = boolify(
@ -256,7 +257,7 @@ class TarSSHEngine(object):
"""
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
logging.debug(lf("Files", files=files))
for f in files:
pb = self.syncer.add(f)
@ -264,7 +265,7 @@ class TarSSHEngine(object):
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
logging.debug('synced ' + se)
logging.debug(lf('synced', file=se))
return True
else:
# stat check for file presence
@ -290,16 +291,16 @@ class RsyncEngine(object):
"""Sync engine that uses rsync(1) for data transfers"""
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
logging.debug(lf("files", files=files))
for f in files:
logging.debug('candidate for syncing %s' % f)
logging.debug(lf('candidate for syncing', file=f))
pb = self.syncer.add(f)
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
logging.debug('synced ' + se)
logging.debug(lf('synced', file=se))
return True
else:
# stat to check if the file exist
@ -431,16 +432,16 @@ class GMasterCommon(object):
fcntl.lockf(gconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
if not gconf.active_earlier:
gconf.active_earlier = True
logging.info("Got lock : %s : Becoming ACTIVE"
% gconf.local_path)
logging.info(lf("Got lock Becoming ACTIVE",
brick=gconf.local_path))
return True
except:
ex = sys.exc_info()[1]
if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
if not gconf.passive_earlier:
gconf.passive_earlier = True
logging.info("Didn't get lock : %s : Becoming PASSIVE"
% gconf.local_path)
logging.info(lf("Didn't get lock Becoming PASSIVE",
brick=gconf.local_path))
return False
raise
@ -449,7 +450,7 @@ class GMasterCommon(object):
+ str(gconf.subvol_num) + ".lock"
mgmt_lock_dir = os.path.join(gconf.meta_volume_mnt, "geo-rep")
path = os.path.join(mgmt_lock_dir, bname)
logging.debug("lock_file_path: %s" % path)
logging.debug(lf("lock file path", path=path))
try:
fd = os.open(path, os.O_CREAT | os.O_RDWR)
except OSError:
@ -477,15 +478,16 @@ class GMasterCommon(object):
# cannot grab, it's taken
if not gconf.passive_earlier:
gconf.passive_earlier = True
logging.info("Didn't get lock : %s : Becoming PASSIVE"
% gconf.local_path)
logging.info(lf("Didn't get lock Becoming PASSIVE",
brick=gconf.local_path))
gconf.mgmt_lock_fd = fd
return False
raise
if not gconf.active_earlier:
gconf.active_earlier = True
logging.info("Got lock : %s : Becoming ACTIVE" % gconf.local_path)
logging.info(lf("Got lock Becoming ACTIVE",
brick=gconf.local_path))
return True
def should_crawl(self):
@ -533,8 +535,8 @@ class GMasterCommon(object):
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
logging.warn(lf("master cluster's info may not be valid",
error=self.volinfo['retval']))
else:
raise GsyncdError("master volinfo unavailable")
self.lastreport['time'] = time.time()
@ -566,8 +568,10 @@ class GMasterCommon(object):
brick_stime = self.xtime('.', self.slave)
cluster_stime = self.master.server.aggregated.stime_mnt(
'.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
logging.debug("Cluster stime: %s | Brick stime: %s" %
(repr(cluster_stime), repr(brick_stime)))
logging.debug(lf("Crawl info",
cluster_stime=cluster_stime,
brick_stime=brick_stime))
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
self.slave.server.set_stime(
@ -773,8 +777,8 @@ class GMasterChangelogMixin(GMasterCommon):
st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
if not isinstance(st, int):
num_failures += 1
logging.error('%s FAILED: %s' % (log_prefix,
repr(failure)))
logging.error(lf('%s FAILED' % log_prefix,
data=failure))
if failure[0]['op'] == 'MKDIR':
raise GsyncdError("The above directory failed to sync."
" Please fix it to proceed further.")
@ -826,8 +830,8 @@ class GMasterChangelogMixin(GMasterCommon):
if self.name == 'live_changelog' or \
self.name == 'history_changelog':
if boolify(gconf.is_hottier) and et == self.TYPE_ENTRY:
logging.debug('skip ENTRY op: %s if hot tier brick'
% (ec[self.POS_TYPE]))
logging.debug(lf('skip ENTRY op if hot tier brick',
op=ec[self.POS_TYPE]))
continue
# Data and Meta operations are decided while parsing
@ -917,7 +921,8 @@ class GMasterChangelogMixin(GMasterCommon):
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
logging.debug('file %s got purged in the interim' % go)
logging.debug(lf('file got purged in the interim',
file=go))
continue
if ty == 'LINK':
@ -930,7 +935,9 @@ class GMasterChangelogMixin(GMasterCommon):
entries.append(
edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
logging.warn(lf('ignoring op',
gfid=gfid,
type=ty))
elif et == self.TYPE_GFID:
# If self.unlinked_gfids is available, then that means it is
# retrying the changelog second time. Do not add the GFID's
@ -962,7 +969,8 @@ class GMasterChangelogMixin(GMasterCommon):
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
datas.add(os.path.join(pfx, ec[0]))
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.warn(lf('got invalid fop type',
type=et))
logging.debug('entries: %s' % repr(entries))
# Increment counters for Status
@ -1011,7 +1019,8 @@ class GMasterChangelogMixin(GMasterCommon):
else:
st = lstat(go[0])
if isinstance(st, int):
logging.debug('file %s got purged in the interim' % go[0])
logging.debug(lf('file got purged in the interim',
file=go[0]))
continue
meta_entries.append(edct('META', go=go[0], stat=st))
if meta_entries:
@ -1067,7 +1076,8 @@ class GMasterChangelogMixin(GMasterCommon):
self.a_syncdata(self.datas_in_batch)
else:
for change in changes:
logging.debug('processing change %s' % change)
logging.debug(lf('processing change',
changelog=change))
self.process_change(change, done, retry)
if not retry:
# number of changelogs processed in the batch
@ -1111,9 +1121,9 @@ class GMasterChangelogMixin(GMasterCommon):
retry = True
tries += 1
if tries == int(gconf.max_rsync_retries):
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
logging.error(lf('changelogs could not be processed '
'completely - moving on...',
files=map(os.path.basename, changes)))
# Reset data counter on failure
self.status.dec_value("data", self.files_in_batch)
@ -1133,8 +1143,8 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
logging.warn(lf('incomplete sync, retrying changelogs',
files=map(os.path.basename, changes)))
# Reset the Data counter before Retry
self.status.dec_value("data", self.files_in_batch)
@ -1145,43 +1155,44 @@ class GMasterChangelogMixin(GMasterCommon):
# Log the Skipped Entry ops range if any
if self.skipped_entry_changelogs_first is not None and \
self.skipped_entry_changelogs_last is not None:
logging.info("Skipping already processed entry "
"ops from CHANGELOG.{0} to CHANGELOG.{1} "
"Num: {2}".format(
self.skipped_entry_changelogs_first,
self.skipped_entry_changelogs_last,
self.num_skipped_entry_changelogs))
logging.info(lf("Skipping already processed entry ops",
from_changelog=self.skipped_entry_changelogs_first,
to_changelog=self.skipped_entry_changelogs_last,
num_changelogs=self.num_skipped_entry_changelogs))
# Log Current batch details
if changes:
logging.info(
"Entry Time Taken (UNL:{0} RMD:{1} CRE:{2} MKN:{3} "
"MKD:{4} REN:{5} LIN:{6} SYM:{7}): {8:.4f} "
"secs ".format (
self.batch_stats["UNLINK"], self.batch_stats["RMDIR"],
self.batch_stats["CREATE"], self.batch_stats["MKNOD"],
self.batch_stats["MKDIR"], self.batch_stats["RENAME"],
self.batch_stats["LINK"], self.batch_stats["SYMLINK"],
self.batch_stats["ENTRY_SYNC_TIME"]))
lf("Entry Time Taken",
UNL=self.batch_stats["UNLINK"],
RMD=self.batch_stats["RMDIR"],
CRE=self.batch_stats["CREATE"],
MKN=self.batch_stats["MKNOD"],
MKD=self.batch_stats["MKDIR"],
REN=self.batch_stats["RENAME"],
LIN=self.batch_stats["LINK"],
SYM=self.batch_stats["SYMLINK"],
duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))
logging.info(
"Metadata Time Taken (SETA:{0}): {1:.4f} secs. "
"Data Time Taken (SETX:{2} XATT:{3} DATA:{4}): "
"{5:.4f} secs".format(
self.batch_stats["SETATTR"],
self.batch_stats["META_SYNC_TIME"],
self.batch_stats["SETXATTR"], self.batch_stats["XATTROP"],
self.batch_stats["DATA"],
time.time() - self.batch_stats["DATA_START_TIME"]))
lf("Data/Metadata Time Taken",
SETA=self.batch_stats["SETATTR"],
meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
SETX=self.batch_stats["SETXATTR"],
XATT=self.batch_stats["XATTROP"],
DATA=self.batch_stats["DATA"],
data_duration="%.4f" % (
time.time() - self.batch_stats["DATA_START_TIME"])))
logging.info(
"{0} mode completed in {1:.4f} seconds "
"({2} - {3} Num: {4}) stime: {5}, entry_stime: {6}".format(
self.name,
time.time() - self.batch_start_time,
changes[0].split("/")[-1],
changes[-1].split("/")[-1],
len(changes),
repr(self.get_data_stime()),
repr(self.get_entry_stime())))
lf("Batch Completed",
mode=self.name,
duration="%.4f" % (time.time() - self.batch_start_time),
changelog_start=changes[0].split(".")[-1],
changelog_end=changes[-1].split(".")[-1],
num_changelogs=len(changes),
stime=self.get_data_stime(),
entry_stime=self.get_entry_stime()))
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
@ -1231,7 +1242,8 @@ class GMasterChangelogMixin(GMasterCommon):
changelogs_batches[-1].append(c)
for batch in changelogs_batches:
logging.debug('processing changes %s' % repr(batch))
logging.debug(lf('processing changes',
batch=batch))
self.process(batch)
def crawl(self):
@ -1246,13 +1258,14 @@ class GMasterChangelogMixin(GMasterCommon):
changes = self.changelog_agent.getchanges()
if changes:
if data_stime:
logging.info("slave's time: %s" % repr(data_stime))
logging.info(lf("slave's time",
stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info(
'skipping already processed change: %s...' %
os.path.basename(pr))
logging.debug(
lf('skipping already processed change',
changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
self.archive_and_purge_changelogs(processed)
@ -1289,10 +1302,11 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
data_stime = self.get_data_stime()
end_time = int(time.time())
logging.info('starting history crawl... turns: %s, stime: %s, '
'etime: %s, entry_stime: %s'
% (self.history_turns, repr(data_stime),
repr(end_time), self.get_entry_stime()))
logging.info(lf('starting history crawl',
turns=self.history_turns,
stime=data_stime,
etime=end_time,
entry_stime=self.get_entry_stime()))
if not data_stime or data_stime == URXTIME:
raise NoStimeAvailable()
@ -1320,12 +1334,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
changes = self.changelog_agent.history_getchanges()
if changes:
if data_stime:
logging.info("slave's time: %s" % repr(data_stime))
logging.info(lf("slave's time",
stime=data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
logging.info('skipping already processed change: '
'%s...' % os.path.basename(pr))
logging.debug(lf('skipping already processed change',
changelog=os.path.basename(pr)))
self.changelog_done_func(pr)
changes.remove(pr)
@ -1333,10 +1348,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
history_turn_time = int(time.time()) - self.history_crawl_start_time
logging.info('finished history crawl syncing, endtime: %s, '
'stime: %s, entry_stime: %s'
% (actual_end, repr(self.get_data_stime()),
self.get_entry_stime()))
logging.info(lf('finished history crawl',
endtime=actual_end,
stime=self.get_data_stime(),
entry_stime=self.get_entry_stime()))
# If TS returned from history_changelog is < register_time
# then FS crawl may be required, since history is only available
@ -1376,7 +1391,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.stimes = []
self.sleep_interval = 60
self.tempdir = self.setup_working_dir()
logging.info('Working dir: %s' % self.tempdir)
logging.info(lf('Working dir',
path=self.tempdir))
self.tempdir = os.path.join(self.tempdir, 'xsync')
self.processed_changelogs_dir = self.tempdir
self.name = "xsync"
@ -1400,25 +1416,28 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.Xcrawl()
t = Thread(target=Xsyncer)
t.start()
logging.info('starting hybrid crawl..., stime: %s'
% repr(self.get_data_stime()))
logging.info(lf('starting hybrid crawl',
stime=self.get_data_stime()))
self.status.set_worker_crawl_status("Hybrid Crawl")
while True:
try:
item = self.comlist.pop(0)
if item[0] == 'finale':
logging.info('finished hybrid crawl syncing, stime: %s'
% repr(self.get_data_stime()))
logging.info(lf('finished hybrid crawl',
stime=self.get_data_stime()))
break
elif item[0] == 'xsync':
logging.info('processing xsync changelog %s' % (item[1]))
logging.info(lf('processing xsync changelog',
path=item[1]))
self.process([item[1]], 0)
self.archive_and_purge_changelogs([item[1]])
elif item[0] == 'stime':
logging.debug('setting slave time: %s' % repr(item[1]))
logging.debug(lf('setting slave time',
time=item[1]))
self.upd_stime(item[1][1], item[1][0])
else:
logging.warn('unknown tuple in comlist (%s)' % repr(item))
logging.warn(lf('unknown tuple in comlist',
entry=item))
except IndexError:
time.sleep(1)
@ -1496,8 +1515,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
logging.warn("slave cluster not returning the "
"correct xtime for root (%d)" % xtr_root)
logging.warn(lf("slave cluster not returning the "
"correct xtime for root",
xtime=xtr_root))
xtr_root = self.minus_infinity
xtl = self.xtime(path)
if isinstance(xtl, int):
@ -1505,8 +1525,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
logging.warn("slave cluster not returning the "
"correct xtime for %s (%d)" % (path, xtr))
logging.warn(lf("slave cluster not returning the "
"correct xtime",
path=path,
xtime=xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
zero_zero = (0, 0)
@ -1521,27 +1543,32 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
dem = self.master.server.entries(path)
pargfid = self.master.server.gfid(path)
if isinstance(pargfid, int):
logging.warn('skipping directory %s' % (path))
logging.warn(lf('skipping directory',
path=path))
for e in dem:
bname = e
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
logging.warn("irregular xtime for %s: %s" %
(e, errno.errorcode[xte]))
logging.warn(lf("irregular xtime",
path=e,
error=errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
st = self.master.server.lstat(e)
if isinstance(st, int):
logging.warn('%s got purged in the interim ...' % e)
logging.warn(lf('got purged in the interim',
path=e))
continue
if self.is_sticky(e, st.st_mode):
logging.debug('ignoring sticky bit file %s' % e)
logging.debug(lf('ignoring sticky bit file',
path=e))
continue
gfid = self.master.server.gfid(e)
if isinstance(gfid, int):
logging.warn('skipping entry %s..' % e)
logging.warn(lf('skipping entry',
path=e))
continue
mo = st.st_mode
self.counter += 1 if ((stat.S_ISDIR(mo) or
@ -1704,14 +1731,12 @@ class Syncer(object):
pb.close()
start = time.time()
po = self.sync_engine(pb, self.log_err)
logging.info("Sync Time Taken (Job:{0} "
"Files:{1} ReturnCode:{2}): "
"{3:.4f} secs".format(
job_id,
len(pb),
po.returncode,
time.time() - start
))
logging.info(lf("Sync Time Taken",
job=job_id,
num_files=len(pb),
return_code=po.returncode,
duration="%.4f" % (time.time() - start)))
if po.returncode == 0:
ret = (True, 0)
elif po.returncode in self.errnos_ok:

View File

@ -22,7 +22,7 @@ from errno import ECHILD, ESRCH
import re
import random
from gconf import gconf
from syncdutils import select, waitpid, errno_wrap
from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
@ -63,15 +63,17 @@ def get_slave_bricks_status(host, vol):
po.wait()
po.terminate_geterr(fail_on_err=False)
if po.returncode != 0:
logging.info("Volume status command failed, unable to get "
"list of up nodes of %s, returning empty list: %s" %
(vol, po.returncode))
logging.info(lf("Volume status command failed, unable to get "
"list of up nodes, returning empty list",
volume=vol,
error=po.returncode))
return []
vi = XET.fromstring(vix)
if vi.find('opRet').text != '0':
logging.info("Unable to get list of up nodes of %s, "
"returning empty list: %s" %
(vol, vi.find('opErrstr').text))
logging.info(lf("Unable to get list of up nodes, "
"returning empty list",
volume=vol,
error=vi.find('opErrstr').text))
return []
up_hosts = set()
@ -81,8 +83,10 @@ def get_slave_bricks_status(host, vol):
if el.find('status').text == '1':
up_hosts.add(el.find('hostname').text)
except (ParseError, AttributeError, ValueError) as e:
logging.info("Parsing failed to get list of up nodes of %s, "
"returning empty list: %s" % (vol, e))
logging.info(lf("Parsing failed to get list of up nodes, "
"returning empty list",
volume=vol,
error=e))
return list(up_hosts)
@ -271,8 +275,9 @@ class Monitor(object):
# Spawn the worker and agent in lock to avoid fd leak
self.lock.acquire()
logging.info('starting gsyncd worker(%s). Slave node: %s' %
(w[0]['dir'], remote_host))
logging.info(lf('starting gsyncd worker',
brick=w[0]['dir'],
slave_node=remote_host))
# Couple of pipe pairs for RPC communication b/w
# worker and changelog agent.
@ -336,15 +341,16 @@ class Monitor(object):
if ret_agent is not None:
# Agent is died Kill Worker
logging.info("Changelog Agent died, "
"Aborting Worker(%s)" % w[0]['dir'])
logging.info(lf("Changelog Agent died, Aborting Worker",
brick=w[0]['dir']))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(cpid)
nwait(apid)
if ret is not None:
logging.info("worker(%s) died before establishing "
"connection" % w[0]['dir'])
logging.info(lf("worker died before establishing "
"connection",
brick=w[0]['dir']))
nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0]['dir'])
@ -353,15 +359,16 @@ class Monitor(object):
ret_agent = nwait(apid, os.WNOHANG)
if ret is not None:
logging.info("worker(%s) died in startup "
"phase" % w[0]['dir'])
logging.info(lf("worker died in startup phase",
brick=w[0]['dir']))
nwait(apid) # wait for agent
break
if ret_agent is not None:
# Agent is died Kill Worker
logging.info("Changelog Agent died, Aborting "
"Worker(%s)" % w[0]['dir'])
logging.info(lf("Changelog Agent died, Aborting "
"Worker",
brick=w[0]['dir']))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(cpid)
nwait(apid)
@ -369,13 +376,15 @@ class Monitor(object):
time.sleep(1)
else:
logging.info("worker(%s) not confirmed in %d sec, aborting it. "
"Gsyncd invocation on remote slave via SSH or "
"gluster master mount might have hung. Please "
"check the above logs for exact issue and check "
"master or slave volume for errors. Restarting "
"master/slave volume accordingly might help."
% (w[0]['dir'], conn_timeout))
logging.info(
lf("Worker not confirmed after wait, aborting it. "
"Gsyncd invocation on remote slave via SSH or "
"gluster master mount might have hung. Please "
"check the above logs for exact issue and check "
"master or slave volume for errors. Restarting "
"master/slave volume accordingly might help.",
brick=w[0]['dir'],
timeout=conn_timeout))
errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
nwait(apid) # wait for agent
ret = nwait(cpid)

View File

@ -29,7 +29,7 @@ except ImportError:
# py 3
import pickle
from syncdutils import Thread, select
from syncdutils import Thread, select, lf
pickle_proto = -1
repce_version = 1.0
@ -203,8 +203,10 @@ class RepceClient(object):
meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
logging.error('call %s (%s) failed on peer with %s' %
(repr(rjob), meth, str(type(res).__name__)))
logging.error(lf('call failed on peer',
call=repr(rjob),
method=meth,
error=str(type(res).__name__)))
raise res
logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
return res

View File

@ -41,7 +41,7 @@ from syncdutils import get_changelog_log_level, get_rsync_version
from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
from syncdutils import mntpt_list
from syncdutils import mntpt_list, lf
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I)
@ -228,11 +228,9 @@ class Popen(subprocess.Popen):
def errlog(self):
"""make a log about child's failure event"""
filling = ""
if self.elines:
filling = ", saying:"
logging.error("""command "%s" returned with %s%s""" %
(" ".join(self.args), repr(self.returncode), filling))
logging.error(lf("command returned error",
cmd=" ".join(self.args),
error=self.returncode))
lp = ''
def logerr(l):
@ -725,11 +723,12 @@ class Server(object):
def rename_with_disk_gfid_confirmation(gfid, entry, en):
if not matching_disk_gfid(gfid, entry):
logging.error("RENAME ignored: "
"source entry:%s(gfid:%s) does not match with "
"on-disk gfid(%s), when attempting to rename "
"to %s" %
(entry, gfid, cls.gfid_mnt(entry), en))
logging.error(lf("RENAME ignored: source entry does not match "
"with on-disk gfid",
source=entry,
gfid=gfid,
disk_gfid=cls.gfid_mnt(entry),
target=en))
return
cmd_ret = errno_wrap(os.rename,
@ -769,12 +768,17 @@ class Server(object):
logging.debug("Removed %s => %s/%s recursively" %
(gfid, pg, bname))
else:
logging.warn("Recursive remove %s => %s/%s"
"failed: %s" % (gfid, pg, bname,
os.strerror(er1)))
logging.warn(lf("Recursive remove failed",
gfid=gfid,
pgfid=pg,
bname=bname,
error=os.strerror(er1)))
else:
logging.warn("Failed to remove %s => %s/%s. %s" %
(gfid, pg, bname, os.strerror(er)))
logging.warn(lf("Failed to remove",
gfid=gfid,
pgfid=pg,
bname=bname,
error=os.strerror(er)))
elif op in ['CREATE', 'MKNOD']:
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@ -833,10 +837,11 @@ class Server(object):
except OSError as e:
if e.errno == ENOTEMPTY:
logging.error(
"Unable to delete directory "
"{0}, Both Old({1}) and New{2}"
" directories exists".format(
entry, entry, en))
lf("Unable to delete directory"
", Both Old and New"
" directories exists",
old=entry,
new=en))
else:
raise
else:
@ -1011,8 +1016,8 @@ class SlaveLocal(object):
time.sleep(int(gconf.timeout))
if lp == self.server.last_keep_alive:
logging.info(
"connection inactive for %d seconds, stopping" %
int(gconf.timeout))
lf("connection inactive, stopping",
timeout=int(gconf.timeout)))
break
else:
select((), (), ())
@ -1114,7 +1119,9 @@ class SlaveRemote(object):
if kw.get("log_err", False):
for errline in stderr.strip().split("\n")[:-1]:
logging.error("SYNC Error(Rsync): %s" % errline)
logging.error(lf("SYNC Error",
sync_engine="Rsync",
error=errline))
if log_rsync_performance:
rsync_msg = []
@ -1129,7 +1136,8 @@ class SlaveRemote(object):
line.startswith("Total bytes received:") or \
line.startswith("sent "):
rsync_msg.append(line)
logging.info("rsync performance: %s" % ", ".join(rsync_msg))
logging.info(lf("rsync performance",
data=", ".join(rsync_msg)))
return po
@ -1169,7 +1177,9 @@ class SlaveRemote(object):
if log_err:
for errline in stderr1.strip().split("\n")[:-1]:
logging.error("SYNC Error(Untar): %s" % errline)
logging.error(lf("SYNC Error",
sync_engine="Tarssh",
error=errline))
return p1
@ -1389,7 +1399,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
if rv:
rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
(os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
logging.warn('stale mount possibly left behind on ' + d)
logging.warn(lf('stale mount possibly left behind',
path=d))
raise GsyncdError("cleaning up temp mountpoint %s "
"failed with status %d" %
(d, rv))
@ -1478,7 +1489,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# if cli terminated with error due to being
# refused by glusterd, what it put
# out on stdout is a diagnostic message
logging.error('glusterd answered: %s' % self.mntpt)
logging.error(lf('glusterd answered', mnt=self.mntpt))
def connect(self):
"""inhibit the resource beyond
@ -1488,7 +1499,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
with given backend
"""
logging.info ("Mounting gluster volume locally...")
logging.info("Mounting gluster volume locally...")
t0 = time.time()
label = getattr(gconf, 'mountbroker', None)
if not label and not privileged():
@ -1500,8 +1511,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
mounter(params).inhibit(*[l for l in [label] if l])
logging.info ("Mounted gluster volume. Time taken: {0:.4f} "
"secs".format((time.time() - t0)))
logging.info(lf("Mounted gluster volume",
duration="%.4f" % (time.time() - t0)))
def connect_remote(self, *a, **kw):
sup(self, *a, **kw)
@ -1643,11 +1654,12 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
g2.register(register_time, changelog_agent, status)
g3.register(register_time, changelog_agent, status)
except ChangelogException as e:
logging.error("Changelog register failed, %s" % e)
logging.error(lf("Changelog register failed", error=e))
sys.exit(1)
g1.register(status=status)
logging.info("Register time: %s" % register_time)
logging.info(lf("Register time",
time=register_time))
# oneshot: Try to use changelog history api, if not
# available switch to FS crawl
# Note: if config.change_detector is xsync then
@ -1655,8 +1667,9 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
try:
g3.crawlwrap(oneshot=True)
except PartialHistoryAvailable as e:
logging.info('Partial history available, using xsync crawl'
' after consuming history till %s' % str(e))
logging.info(lf('Partial history available, using xsync crawl'
' after consuming history',
till=e))
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogHistoryNotAvailable:
logging.info('Changelog history not available, using xsync')
@ -1665,13 +1678,14 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.info('No stime available, using xsync crawl')
g1.crawlwrap(oneshot=True, register_time=register_time)
except ChangelogException as e:
logging.error("Changelog History Crawl failed, %s" % e)
logging.error(lf("Changelog History Crawl failed",
error=e))
sys.exit(1)
try:
g2.crawlwrap()
except ChangelogException as e:
logging.error("Changelog crawl failed, %s" % e)
logging.error(lf("Changelog crawl failed", error=e))
sys.exit(1)
else:
sup(self, *args)
@ -1763,14 +1777,14 @@ class SSH(AbstractUrl, SlaveRemote):
self.inner_rsc.url)
deferred = go_daemon == 'postconn'
logging.info ("Initializing SSH connection between master and slave...")
logging.info("Initializing SSH connection between master and slave...")
t0 = time.time()
ret = sup(self, gconf.ssh_command.split() +
["-p", str(gconf.ssh_port)] +
gconf.ssh_ctl_args + [self.remote_addr],
slave=self.inner_rsc.url, deferred=deferred)
logging.info ("SSH connection between master and slave established. "
"Time taken: {0:.4f} secs".format((time.time() - t0)))
logging.info(lf("SSH connection between master and slave established.",
duration="%.4f" % (time.time() - t0)))
if deferred:
# send a message to peer so that we can wait for

View File

@ -304,8 +304,8 @@ def log_raise_exception(excont):
gconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
ECONNABORTED):
logging.error('glusterfs session went down [%s]',
errorcode[exc.errno])
logging.error(lf('glusterfs session went down',
error=errorcode[exc.errno]))
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
@ -387,8 +387,9 @@ def boolify(s):
if lstr in true_list:
rv = True
elif not lstr in false_list:
logging.warn("Unknown string (%s) in string to boolean conversion "
"defaulting to False\n" % (s))
logging.warn(lf("Unknown string in \"string to boolean\" conversion, "
"defaulting to False",
str=s))
return rv
@ -497,8 +498,9 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
nr_tries += 1
if nr_tries == GF_OP_RETRIES:
# probably a screwed state, cannot do much...
logging.warn('reached maximum retries (%s)...%s' %
(repr(arg), ex))
logging.warn(lf('reached maximum retries',
args=repr(arg),
error=ex))
raise
time.sleep(0.250) # retry the call
@ -572,3 +574,16 @@ def get_rsync_version(rsync_cmd):
rsync_version = out.split(" ", 4)[3]
return rsync_version
def lf(event, **kwargs):
"""
Log Format helper function, log messages can be
easily modified to structured log format.
lf("Config Change", sync_jobs=4, brick=/bricks/b1) will be
converted as "Config Change<TAB>brick=/bricks/b1<TAB>sync_jobs=4"
"""
msg = event
for k, v in kwargs.items():
msg += "\t{0}={1}".format(k, v)
return msg