geo-rep: Kill Geo-rep Worker when Agent process dies
When the Changelog agent process dies, Geo-replication fails to detect it, and the worker keeps running without its Changelog agent. Status shows Active/Passive without any progress. With this patch, the Worker process is killed whenever the Changelog agent dies. Change-Id: I30b4cc77f924f7e8174b8bfe415ac17f0b3851b4 Signed-off-by: Aravinda VK <avishwan@redhat.com> BUG: 1277076 Reviewed-on: http://review.gluster.org/12485 Tested-by: NetBSD Build System <jenkins@build.gluster.org> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Venky Shankar <vshankar@redhat.com> Reviewed-by: Kotresh HR <khiremat@redhat.com>
This commit is contained in:
parent
f68c95a429
commit
5d1ff7efd6
@ -66,8 +66,6 @@ class Changelog(object):
|
||||
class ChangelogAgent(object):
|
||||
def __init__(self, obj, fd_tup):
|
||||
(inf, ouf, rw, ww) = fd_tup.split(',')
|
||||
os.close(int(rw))
|
||||
os.close(int(ww))
|
||||
repce = RepceServer(obj, int(inf), int(ouf), 1)
|
||||
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
|
||||
syncdutils.finalize()))
|
||||
|
@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET
|
||||
from subprocess import PIPE
|
||||
from resource import Popen, FILE, GLUSTER, SSH
|
||||
from threading import Lock
|
||||
from errno import EEXIST
|
||||
from errno import ECHILD
|
||||
import re
|
||||
import random
|
||||
from gconf import gconf
|
||||
@ -188,10 +188,18 @@ class Monitor(object):
|
||||
ret = 0
|
||||
|
||||
def nwait(p, o=0):
|
||||
p2, r = waitpid(p, o)
|
||||
if not p2:
|
||||
return
|
||||
return r
|
||||
try:
|
||||
p2, r = waitpid(p, o)
|
||||
if not p2:
|
||||
return
|
||||
return r
|
||||
except OSError as e:
|
||||
# no child process, this happens if the child process
|
||||
# already died and has been cleaned up
|
||||
if e.errno == ECHILD:
|
||||
return -1
|
||||
else:
|
||||
raise
|
||||
|
||||
def exit_signalled(s):
|
||||
""" child teminated due to receipt of SIGUSR1 """
|
||||
@ -240,6 +248,8 @@ class Monitor(object):
|
||||
# spawn the agent process
|
||||
apid = os.fork()
|
||||
if apid == 0:
|
||||
os.close(rw)
|
||||
os.close(ww)
|
||||
os.execv(sys.executable, argv + ['--local-path', w[0],
|
||||
'--agent',
|
||||
'--rpc-fd',
|
||||
@ -249,6 +259,8 @@ class Monitor(object):
|
||||
cpid = os.fork()
|
||||
if cpid == 0:
|
||||
os.close(pr)
|
||||
os.close(ra)
|
||||
os.close(wa)
|
||||
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
|
||||
'--local-path', w[0],
|
||||
'--local-id',
|
||||
@ -277,30 +289,52 @@ class Monitor(object):
|
||||
|
||||
if so:
|
||||
ret = nwait(cpid, os.WNOHANG)
|
||||
ret_agent = nwait(apid, os.WNOHANG)
|
||||
|
||||
if ret_agent is not None:
|
||||
# Agent is died Kill Worker
|
||||
logging.info("Changelog Agent died, "
|
||||
"Aborting Worker(%s)" % w[0])
|
||||
os.kill(cpid, signal.SIGKILL)
|
||||
nwait(cpid)
|
||||
nwait(apid)
|
||||
|
||||
if ret is not None:
|
||||
logging.info("worker(%s) died before establishing "
|
||||
"connection" % w[0])
|
||||
nwait(apid) #wait for agent
|
||||
nwait(apid) # wait for agent
|
||||
else:
|
||||
logging.debug("worker(%s) connected" % w[0])
|
||||
while time.time() < t0 + conn_timeout:
|
||||
ret = nwait(cpid, os.WNOHANG)
|
||||
ret_agent = nwait(apid, os.WNOHANG)
|
||||
|
||||
if ret is not None:
|
||||
logging.info("worker(%s) died in startup "
|
||||
"phase" % w[0])
|
||||
nwait(apid) #wait for agent
|
||||
nwait(apid) # wait for agent
|
||||
break
|
||||
|
||||
if ret_agent is not None:
|
||||
# Agent is died Kill Worker
|
||||
logging.info("Changelog Agent died, Aborting "
|
||||
"Worker(%s)" % w[0])
|
||||
os.kill(cpid, signal.SIGKILL)
|
||||
nwait(cpid)
|
||||
nwait(apid)
|
||||
break
|
||||
|
||||
time.sleep(1)
|
||||
else:
|
||||
logging.info("worker(%s) not confirmed in %d sec, "
|
||||
"aborting it" % (w[0], conn_timeout))
|
||||
os.kill(cpid, signal.SIGKILL)
|
||||
nwait(apid) #wait for agent
|
||||
nwait(apid) # wait for agent
|
||||
ret = nwait(cpid)
|
||||
if ret is None:
|
||||
self.status[w[0]].set_worker_status(self.ST_STABLE)
|
||||
#If worker dies, agent terminates on EOF.
|
||||
#So lets wait for agent first.
|
||||
# If worker dies, agent terminates on EOF.
|
||||
# So lets wait for agent first.
|
||||
nwait(apid)
|
||||
ret = nwait(cpid)
|
||||
if exit_signalled(ret):
|
||||
|
@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
|
||||
# g3 ==> changelog History
|
||||
changelog_register_failed = False
|
||||
(inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
|
||||
os.close(int(ra))
|
||||
os.close(int(wa))
|
||||
changelog_agent = RepceClient(int(inf), int(ouf))
|
||||
status = GeorepStatus(gconf.state_file, gconf.local_path)
|
||||
status.reset_on_worker_start()
|
||||
|
Loading…
x
Reference in New Issue
Block a user