See https://review.gluster.org/#/c/19788/, https://review.gluster.org/#/c/19871/, https://review.gluster.org/#/c/19952/, https://review.gluster.org/#/c/20104/, https://review.gluster.org/#/c/20162/, https://review.gluster.org/#/c/20185/, https://review.gluster.org/#/c/20207/, https://review.gluster.org/#/c/20227/, https://review.gluster.org/#/c/20307/, https://review.gluster.org/#/c/20320/, https://review.gluster.org/#/c/20332/, https://review.gluster.org/#/c/20364/, https://review.gluster.org/#/c/20441/, and https://review.gluster.org/#/c/20484: shebangs changed from /usr/bin/python2 to /usr/bin/python3. (Reminder: various distribution packaging guidelines require an explicit python version and don't allow '#!/usr/bin/env python', however handy that idiom may be.)

glusterfs.spec(.in): package python{2,3}-gluster and python2 or python3 dependencies as appropriate.

configure(.ac):
+ Test for and use python2 or python3 as appropriate. If the build machine has both python2 and python3, use python3. Override by setting PYTHON=/usr/bin/python2 when running configure.
+ PYTHONDEV_CPPFLAGS from python[23]-config --includes is a better match for the original python sysconfig.get_python_inc(); all the other extraneous flags break the build.
+ Only change the shebangs once. Changing them over and over again, e.g. during a `make glusterrpms` in extras/LinuxRPM, just sends make (is it really make that's looping?) into an infinite loop. If you figure out why, let me know.
+ The oldest python2 is python2.6, on CentOS 6 and Debian 8 (Jessie); everything else has 2.7 or 3.x.
+ Logic from https://review.gluster.org/c/glusterfs/+/21050, which needs to be removed/merged after that patch is merged.

Builds on CentOS 6, CentOS 7, Fedora 28, Fedora rawhide, and the mysterious RHEL > 7.

Change-Id: Idae21d3b6f58b32372e1daa0d234e491e563198f
updates: #411
Signed-off-by: Kaleb S. KEITHLEY <kkeithle@redhat.com>
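(For context, and not part of the change itself: a minimal sketch, assuming the distutils module is available, of the single include directory the build actually needs. `python[23]-config --includes` reports the same -I path, without the other extraneous flags.)

    # Illustrative only: print the -I flag that configure actually needs.
    from distutils import sysconfig
    print("-I" + sysconfig.get_python_inc())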
#!/usr/bin/python3
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

from __future__ import print_function
import fcntl
import os
import tempfile
try:
    import urllib.parse as urllib
except ImportError:
    import urllib
import json
import time
from datetime import datetime
from errno import EACCES, EAGAIN, ENOENT
import logging

from syncdutils import EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event
from syncdutils import EVENT_GEOREP_CHECKPOINT_COMPLETED, lf

DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
STATUS_VALUES = (DEFAULT_STATUS,
                 "Initializing...",
                 "Active",
                 "Passive",
                 "Faulty")

CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
                       "Hybrid Crawl",
                       "History Crawl",
                       "Changelog Crawl")


def human_time(ts):
    try:
        return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return DEFAULT_STATUS


def human_time_utc(ts):
    try:
        return datetime.utcfromtimestamp(
            float(ts)).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return DEFAULT_STATUS
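# Added note: for example, human_time_utc(1534000000) returns
# "2018-08-11 15:06:40", while a non-numeric input such as "N/A" raises
# ValueError inside float() and falls back to DEFAULT_STATUS.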
def get_default_values():
    return {
        "slave_node": DEFAULT_STATUS,
        "worker_status": DEFAULT_STATUS,
        "last_synced": 0,
        "last_synced_entry": 0,
        "crawl_status": DEFAULT_STATUS,
        "entry": 0,
        "data": 0,
        "meta": 0,
        "failures": 0,
        "checkpoint_completed": DEFAULT_STATUS,
        "checkpoint_time": 0,
        "checkpoint_completion_time": 0}

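# Added note: counters and timestamps default to 0 above, while text fields
# default to "N/A"; GeorepStatus.get_status() further below converts the zero
# values to "N/A" or to human-readable timestamps before display.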
class LockedOpen(object):

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        """
        If two processes compete to update a file, the first process
        gets the lock and the second process is blocked in the fcntl.flock()
        call. When the first process replaces the file and releases the lock,
        the already open file descriptor in the second process now points
        to a "ghost" file (not reachable by any path name) with the old
        contents. To avoid that conflict, check whether the fd opened
        earlier still refers to the same file; if not, open a new one.
        """
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        fcntl.flock(self.fileobj, fcntl.LOCK_UN)
        self.fileobj.close()

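# Added commentary: set_monitor_status() below (and GeorepStatus._update()
# later in this file) update status files with the same crash-safe sequence
# while holding the LockedOpen lock: write the new contents to a
# NamedTemporaryFile in the same directory, os.rename() it over the status
# file (an atomic replace on POSIX), then fsync() the directory so the rename
# itself is made durable.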
def set_monitor_status(status_file, status):
    fd = os.open(status_file, os.O_CREAT | os.O_RDWR)
    os.close(fd)
    with LockedOpen(status_file, 'r+'):
        with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file),
                                         delete=False) as tf:
            tf.write(status)
            tempname = tf.name

        os.rename(tempname, status_file)
        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
                        os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)

class GeorepStatus(object):
    def __init__(self, monitor_status_file, master_node, brick, master_node_id,
                 master, slave, monitor_pid_file=None):
        self.master = master
        slv_data = slave.split("::")
        self.slave_host = slv_data[0]
        self.slave_volume = slv_data[1].split(":")[0]  # Remove Slave UUID
        self.work_dir = os.path.dirname(monitor_status_file)
        self.monitor_status_file = monitor_status_file
        self.filename = os.path.join(self.work_dir,
                                     "brick_%s.status"
                                     % urllib.quote_plus(brick))

        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.master_node = master_node
        self.master_node_id = master_node_id
        self.brick = brick
        self.default_values = get_default_values()
        self.monitor_pid_file = monitor_pid_file

    def send_event(self, event_type, **kwargs):
        gf_event(event_type,
                 master_volume=self.master,
                 master_node=self.master_node,
                 master_node_id=self.master_node_id,
                 slave_host=self.slave_host,
                 slave_volume=self.slave_volume,
                 brick_path=self.brick,
                 **kwargs)

    def _update(self, mergerfunc):
        data = self.default_values
        with LockedOpen(self.filename, 'r+') as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass

            data = mergerfunc(data)
            # mergerfunc returns the updated state as a JSON string, or a
            # falsy value if the data was not changed; skip the write then
            if not data:
                return False

            with tempfile.NamedTemporaryFile(
                    'w',
                    dir=os.path.dirname(self.filename),
                    delete=False) as tf:
                tf.write(data)
                tempname = tf.name

            os.rename(tempname, self.filename)
            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
                            os.O_DIRECTORY)
            os.fsync(dirfd)
            os.close(dirfd)
            return True

    def reset_on_worker_start(self):
        def merger(data):
            data["slave_node"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = 0
            data["data"] = 0
            data["meta"] = 0
            return json.dumps(data)

        self._update(merger)

    def set_field(self, key, value):
        def merger(data):
            # Current data and previous data are the same; nothing to update
            if data[key] == value:
                return {}

            data[key] = value
            return json.dumps(data)

        return self._update(merger)

    def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
                                               checkpoint_completion_time):
        self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
                        checkpoint_time=checkpoint_time,
                        checkpoint_completion_time=checkpoint_completion_time)

    def set_last_synced(self, value, checkpoint_time):
        def merger(data):
            data["last_synced"] = value[0]

            # If checkpoint is not set or reset
            # or if last set checkpoint is changed
            if checkpoint_time == 0 or \
               checkpoint_time != data["checkpoint_time"]:
                data["checkpoint_time"] = 0
                data["checkpoint_completion_time"] = 0
                data["checkpoint_completed"] = "No"

            # If checkpoint is completed and not marked as completed
            # previously then update the checkpoint completed time
            if checkpoint_time > 0 and checkpoint_time <= value[0]:
                if data["checkpoint_completed"] == "No":
                    curr_time = int(time.time())
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = curr_time
                    data["checkpoint_completed"] = "Yes"
                    logging.info(lf("Checkpoint completed",
                                    checkpoint_time=human_time_utc(
                                        checkpoint_time),
                                    completion_time=human_time_utc(curr_time)))
                    self.trigger_gf_event_checkpoint_completion(
                        checkpoint_time, curr_time)

            return json.dumps(data)

        self._update(merger)

    def set_worker_status(self, status):
        if self.set_field("worker_status", status):
            logging.info(lf("Worker Status Change",
                            status=status))

    def set_worker_crawl_status(self, status):
        if self.set_field("crawl_status", status):
            logging.info(lf("Crawl Status Change",
                            status=status))

    def set_slave_node(self, slave_node):
        def merger(data):
            data["slave_node"] = slave_node
            return json.dumps(data)

        self._update(merger)

    def inc_value(self, key, value):
        def merger(data):
            data[key] = data.get(key, 0) + value
            return json.dumps(data)

        self._update(merger)

    def dec_value(self, key, value):
        def merger(data):
            data[key] = data.get(key, 0) - value
            if data[key] < 0:
                data[key] = 0
            return json.dumps(data)

        self._update(merger)

    def set_active(self):
        if self.set_field("worker_status", "Active"):
            logging.info(lf("Worker Status Change",
                            status="Active"))
            self.send_event(EVENT_GEOREP_ACTIVE)

    def set_passive(self):
        if self.set_field("worker_status", "Passive"):
            logging.info(lf("Worker Status Change",
                            status="Passive"))
            self.send_event(EVENT_GEOREP_PASSIVE)

    def get_monitor_status(self):
        data = ""
        with open(self.monitor_status_file, "r") as f:
            data = f.read().strip()
        return data

    def get_status(self, checkpoint_time=0):
        """
        Monitor Status --->        Created     Started     Paused      Stopped
        ----------------------------------------------------------------------
        slave_node                 N/A         VALUE       VALUE       N/A
        status                     Created     VALUE       Paused      Stopped
        last_synced                N/A         VALUE       VALUE       VALUE
        last_synced_entry          N/A         VALUE       VALUE       VALUE
        crawl_status               N/A         VALUE       N/A         N/A
        entry                      N/A         VALUE       N/A         N/A
        data                       N/A         VALUE       N/A         N/A
        meta                       N/A         VALUE       N/A         N/A
        failures                   N/A         VALUE       VALUE       VALUE
        checkpoint_completed       N/A         VALUE       VALUE       VALUE
        checkpoint_time            N/A         VALUE       VALUE       VALUE
        checkpoint_completed_time  N/A         VALUE       VALUE       VALUE
        """
        data = self.default_values
        with open(self.filename) as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass
        monitor_status = self.get_monitor_status()

        # Verify whether the monitor process is running and adjust the status
        if monitor_status in ["Started", "Paused"]:
            try:
                with open(self.monitor_pid_file, "r+") as f:
                    fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    monitor_status = "Stopped"
            except (IOError, OSError) as e:
                # If the pid file does not exist, either the monitor died or
                # Geo-rep was not even started once
                if e.errno == ENOENT:
                    monitor_status = "Stopped"
                elif e.errno in (EACCES, EAGAIN):
                    # Cannot grab the lock, so the monitor process is still
                    # running; move on
                    pass
                else:
                    raise

if monitor_status in ["Created", "Paused", "Stopped"]:
|
|
data["worker_status"] = monitor_status
|
|
|
|
if monitor_status == "":
|
|
data["worker_status"] = "Stopped"
|
|
|
|
# Checkpoint adjustments
|
|
if checkpoint_time == 0:
|
|
data["checkpoint_completed"] = DEFAULT_STATUS
|
|
data["checkpoint_time"] = DEFAULT_STATUS
|
|
data["checkpoint_completion_time"] = DEFAULT_STATUS
|
|
else:
|
|
if checkpoint_time != data["checkpoint_time"]:
|
|
if checkpoint_time <= data["last_synced"]:
|
|
data["checkpoint_completed"] = "Yes"
|
|
data["checkpoint_time"] = checkpoint_time
|
|
data["checkpoint_completion_time"] = data["last_synced"]
|
|
else:
|
|
data["checkpoint_completed"] = "No"
|
|
data["checkpoint_time"] = checkpoint_time
|
|
data["checkpoint_completion_time"] = DEFAULT_STATUS
|
|
|
|
if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
|
|
chkpt_time = data["checkpoint_time"]
|
|
data["checkpoint_time"] = human_time(chkpt_time)
|
|
data["checkpoint_time_utc"] = human_time_utc(chkpt_time)
|
|
|
|
if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
|
|
chkpt_completion_time = data["checkpoint_completion_time"]
|
|
data["checkpoint_completion_time"] = human_time(
|
|
chkpt_completion_time)
|
|
data["checkpoint_completion_time_utc"] = human_time_utc(
|
|
chkpt_completion_time)
|
|
|
|
if data["last_synced"] == 0:
|
|
data["last_synced"] = DEFAULT_STATUS
|
|
data["last_synced_utc"] = DEFAULT_STATUS
|
|
else:
|
|
last_synced = data["last_synced"]
|
|
data["last_synced"] = human_time(last_synced)
|
|
data["last_synced_utc"] = human_time_utc(last_synced)
|
|
|
|
if data["worker_status"] != "Active":
|
|
data["last_synced"] = DEFAULT_STATUS
|
|
data["last_synced_utc"] = DEFAULT_STATUS
|
|
data["crawl_status"] = DEFAULT_STATUS
|
|
data["entry"] = DEFAULT_STATUS
|
|
data["data"] = DEFAULT_STATUS
|
|
data["meta"] = DEFAULT_STATUS
|
|
data["failures"] = DEFAULT_STATUS
|
|
data["checkpoint_completed"] = DEFAULT_STATUS
|
|
data["checkpoint_time"] = DEFAULT_STATUS
|
|
data["checkpoint_completed_time"] = DEFAULT_STATUS
|
|
data["checkpoint_time_utc"] = DEFAULT_STATUS
|
|
data["checkpoint_completion_time_utc"] = DEFAULT_STATUS
|
|
|
|
if data["worker_status"] not in ["Active", "Passive"]:
|
|
data["slave_node"] = DEFAULT_STATUS
|
|
|
|
if data.get("last_synced_utc", 0) == 0:
|
|
data["last_synced_utc"] = DEFAULT_STATUS
|
|
|
|
if data.get("checkpoint_completion_time_utc", 0) == 0:
|
|
data["checkpoint_completion_time_utc"] = DEFAULT_STATUS
|
|
|
|
if data.get("checkpoint_time_utc", 0) == 0:
|
|
data["checkpoint_time_utc"] = DEFAULT_STATUS
|
|
|
|
return data
|
|
|
|
    def print_status(self, checkpoint_time=0, json_output=False):
        status_out = self.get_status(checkpoint_time)
        if json_output:
            out = {}
            # Convert all values to strings
            for k, v in status_out.items():
                out[k] = str(v)
            print(json.dumps(out))
            return

        for key, value in status_out.items():
            print("%s: %s" % (key, value))
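# Example (added; illustrative only, with hypothetical paths and values) of
# how a geo-replication worker typically drives this module:
#
#     status = GeorepStatus(
#         "/var/lib/glusterd/geo-replication/example/monitor.status",
#         "master-node-1", "/bricks/brick1", "<master-node-uuid>",
#         "mastervol", "slavehost::slavevol")
#     status.set_active()
#     status.set_worker_crawl_status("Changelog Crawl")
#     status.inc_value("entry", 10)
#     status.print_status(json_output=True)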