[geo-rep]: Worker still ACTIVE after killing bricks

Problem: In the changelog xlator, after destroying the listener it calls
         unlink to delete the changelog socket file, but the socket file
         reference is not cleaned up from process memory

Solution: 1) To clean up the reference completely from process memory,
             serialize transport cleanup for the changelog and then
             unlink the socket file
          2) The brick xlator will notify GF_EVENT_PARENT_DOWN to the next
             xlator only after all xprts have been cleaned up

Test: To test the same, run the steps below
      1) Set up a volume and enable brick multiplexing
      2) Kill any one brick with gf_attach
      3) Check the changelog socket specific to the killed brick
         in lsof; it should be cleaned up completely

fixes: bz#1600145

Change-Id: Iba06cbf77d8a87b34a60fce50f6d8c0d427fa491
Signed-off-by: Mohit Agrawal <moagrawal@redhat.com>
This commit is contained in:
Mohit Agrawal 2018-11-23 09:39:43 +05:30 committed by Amar Tumballi
parent 52d3f82db2
commit fb917bf10b
11 changed files with 328 additions and 41 deletions

View File

@ -1036,7 +1036,7 @@ xlator_mem_free(xlator_t *xl)
static void
xlator_call_fini(xlator_t *this)
{
if (!this || this->cleanup_starting)
if (!this || this->call_cleanup)
return;
this->cleanup_starting = 1;
this->call_cleanup = 1;

View File

@ -3009,6 +3009,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
* thread context while we are using it here.
*/
priv->idx = idx;
priv->gen = gen;
if (poll_err) {
socket_event_poll_err(this, gen, idx);
goto out;
}
if (poll_in) {
int aflags = 0;

View File

@ -0,0 +1,109 @@
#!/bin/bash
. $(dirname $0)/../include.rc
. $(dirname $0)/../volume.rc
. $(dirname $0)/../geo-rep.rc
. $(dirname $0)/../env.rc
### Basic Tests with Distribute Replicate volumes
##Cleanup and start glusterd
cleanup;
SCRIPT_TIMEOUT=600
TEST glusterd;
TEST pidof glusterd
##Variables
GEOREP_CLI="$CLI volume geo-replication"
master=$GMV0
SH0="127.0.0.1"
slave=${SH0}::${GSV0}
num_active=2
num_passive=2
master_mnt=$M0
slave_mnt=$M1
############################################################
#SETUP VOLUMES AND GEO-REPLICATION
############################################################
##create_and_start_master_volume
TEST $CLI volume create $GMV0 replica 2 $H0:$B0/${GMV0}{1,2};
gluster v set all cluster.brick-multiplex on
TEST $CLI volume start $GMV0
##create_and_start_slave_volume
TEST $CLI volume create $GSV0 replica 2 $H0:$B0/${GSV0}{1,2};
TEST $CLI volume start $GSV0
##Create, start and mount meta_volume
TEST $CLI volume create $META_VOL replica 3 $H0:$B0/${META_VOL}{1,2,3};
TEST $CLI volume start $META_VOL
TEST mkdir -p $META_MNT
TEST glusterfs -s $H0 --volfile-id $META_VOL $META_MNT
############################################################
#BASIC GEO-REPLICATION TESTS
############################################################
#Create geo-rep session
TEST create_georep_session $master $slave
#Config gluster-command-dir
TEST $GEOREP_CLI $master $slave config gluster-command-dir ${GLUSTER_CMD_DIR}
#Config gluster-command-dir
TEST $GEOREP_CLI $master $slave config slave-gluster-command-dir ${GLUSTER_CMD_DIR}
#Enable_metavolume
TEST $GEOREP_CLI $master $slave config use_meta_volume true
#Wait for common secret pem file to be created
EXPECT_WITHIN $GEO_REP_TIMEOUT 0 check_common_secret_file
#Verify the keys are distributed
EXPECT_WITHIN $GEO_REP_TIMEOUT 0 check_keys_distributed
#Count no. of changelog socket
brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
n=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
#Start_georep
TEST $GEOREP_CLI $master $slave start
EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Active"
EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Passive"
#Count no. of changelog socket
brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
c=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
let expected=n+2
TEST [ "$c" -eq "$expected" ]
#Kill the "Active" brick
brick=$($GEOREP_CLI $master $slave status | grep -F "Active" | awk {'print $3'})
cat /proc/$brick_pid/net/unix | grep "changelog"
TEST kill_brick $GMV0 $H0 $brick
#Expect geo-rep status to be "Faulty"
EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Faulty"
EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Active"
#Count no. of changelog socket
brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
cat /proc/$brick_pid/net/unix | grep "changelog"
ls -lrth /proc/$brick_pid/fd | grep "socket"
c=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
TEST [ "$c" -eq "$n" ]
#Stop Geo-rep
TEST $GEOREP_CLI $master $slave stop
#Delete Geo-rep
TEST $GEOREP_CLI $master $slave delete
#Cleanup authorized keys
sed -i '/^command=.*SSH_ORIGINAL_COMMAND#.*/d' ~/.ssh/authorized_keys
sed -i '/^command=.*gsyncd.*/d' ~/.ssh/authorized_keys
cleanup;

View File

@ -2,6 +2,7 @@
. $(dirname $0)/../../include.rc
. $(dirname $0)/../../volume.rc
SCRIPT_TIMEOUT=400
cleanup

View File

@ -134,6 +134,8 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_clnt_t *c_clnt = NULL;
changelog_priv_t *priv = NULL;
changelog_ev_selector_t *selection = NULL;
uint64_t clntcnt = 0;
uint64_t xprtcnt = 0;
crpc = mydata;
this = crpc->this;
@ -144,6 +146,7 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
switch (event) {
case RPC_CLNT_CONNECT:
selection = &priv->ev_selection;
GF_ATOMIC_INC(priv->clntcnt);
LOCK(&c_clnt->wait_lock);
{
@ -176,12 +179,23 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_set_disconnect_flag(crpc, _gf_true);
}
UNLOCK(&crpc->lock);
LOCK(&c_clnt->active_lock);
{
list_del_init(&crpc->list);
}
UNLOCK(&c_clnt->active_lock);
break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
/* Free up mydata */
changelog_rpc_clnt_unref(crpc);
clntcnt = GF_ATOMIC_DEC(priv->clntcnt);
xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
if (this->cleanup_starting) {
if (!clntcnt && !xprtcnt)
changelog_process_cleanup_event(this);
}
break;
case RPC_CLNT_PING:
break;

View File

@ -131,4 +131,6 @@ changelog_ev_queue_connection(changelog_clnt_t *, changelog_rpc_clnt_t *);
void
changelog_ev_cleanup_connections(xlator_t *, changelog_clnt_t *);
void
changelog_process_cleanup_event(xlator_t *);
#endif

View File

@ -307,6 +307,24 @@ struct changelog_priv {
/* glusterfind dependency to capture paths on deleted entries*/
gf_boolean_t capture_del_path;
/* Save total no. of listners */
gf_atomic_t listnercnt;
/* Save total no. of xprt are associated with listner */
gf_atomic_t xprtcnt;
/* Save xprt list */
struct list_head xprt_list;
/* Save total no. of client connection */
gf_atomic_t clntcnt;
/* Save cleanup brick in victim */
xlator_t *victim;
/* Status to save cleanup notify status */
gf_boolean_t notify_down;
};
struct changelog_local {

View File

@ -52,6 +52,7 @@ GLFS_MSGID(
CHANGELOG_MSG_FSTAT_OP_FAILED, CHANGELOG_MSG_LSEEK_OP_FAILED,
CHANGELOG_MSG_STRSTR_OP_FAILED, CHANGELOG_MSG_UNLINK_OP_FAILED,
CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED,
CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED);
CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED,
CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED);
#endif /* !_CHANGELOG_MESSAGES_H_ */

View File

@ -260,6 +260,7 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
rpcsvc_listener_t *listener = NULL;
rpcsvc_listener_t *next = NULL;
struct rpcsvc_program *prog = NULL;
rpc_transport_t *trans = NULL;
while (*progs) {
prog = *progs;
@ -269,22 +270,25 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
list_for_each_entry_safe(listener, next, &rpc->listeners, list)
{
rpcsvc_listener_destroy(listener);
if (listener->trans) {
trans = listener->trans;
rpc_transport_disconnect(trans, _gf_false);
}
}
(void)rpcsvc_unregister_notify(rpc, fn, this);
sys_unlink(sockfile);
if (rpc->rxpool) {
mem_pool_destroy(rpc->rxpool);
rpc->rxpool = NULL;
}
/* TODO Avoid freeing rpc object in case of brick multiplex
after freeing rpc object svc->rpclock corrupted and it takes
more time to detach a brick
*/
if (!this->cleanup_starting)
if (!this->cleanup_starting) {
if (rpc->rxpool) {
mem_pool_destroy(rpc->rxpool);
rpc->rxpool = NULL;
}
GF_FREE(rpc);
}
}
rpcsvc_t *

View File

@ -43,9 +43,6 @@ changelog_cleanup_rpc_threads(xlator_t *this, changelog_priv_t *priv)
/** terminate dispatcher thread(s) */
changelog_cleanup_dispatchers(this, priv, priv->nr_dispatchers);
/* TODO: what about pending and waiting connections? */
changelog_ev_cleanup_connections(this, conn);
/* destroy locks */
ret = pthread_mutex_destroy(&conn->pending_lock);
if (ret != 0)
@ -147,48 +144,146 @@ int
changelog_rpcsvc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
void *data)
{
xlator_t *this = NULL;
rpc_transport_t *trans = NULL;
rpc_transport_t *xprt = NULL;
rpc_transport_t *xp_next = NULL;
changelog_priv_t *priv = NULL;
uint64_t listnercnt = 0;
uint64_t xprtcnt = 0;
uint64_t clntcnt = 0;
rpcsvc_listener_t *listener = NULL;
rpcsvc_listener_t *next = NULL;
gf_boolean_t listner_found = _gf_false;
socket_private_t *sockpriv = NULL;
if (!xl || !data || !rpc) {
gf_msg_callingfn("changelog", GF_LOG_WARNING, 0,
CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED,
"Calling rpc_notify without initializing");
goto out;
}
this = xl;
trans = data;
priv = this->private;
if (!priv) {
gf_msg_callingfn("changelog", GF_LOG_WARNING, 0,
CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED,
"Calling rpc_notify without priv initializing");
goto out;
}
if (event == RPCSVC_EVENT_ACCEPT) {
GF_ATOMIC_INC(priv->xprtcnt);
LOCK(&priv->lock);
{
list_add_tail(&trans->list, &priv->xprt_list);
}
UNLOCK(&priv->lock);
goto out;
}
if (event == RPCSVC_EVENT_DISCONNECT) {
list_for_each_entry_safe(listener, next, &rpc->listeners, list)
{
if (listener && listener->trans) {
if (listener->trans == trans) {
listnercnt = GF_ATOMIC_DEC(priv->listnercnt);
listner_found = _gf_true;
rpcsvc_listener_destroy(listener);
}
}
}
if (listnercnt > 0) {
goto out;
}
if (listner_found) {
LOCK(&priv->lock);
list_for_each_entry_safe(xprt, xp_next, &priv->xprt_list, list)
{
sockpriv = (socket_private_t *)(xprt->private);
gf_log("changelog", GF_LOG_INFO,
"Send disconnect"
" on socket %d",
sockpriv->sock);
rpc_transport_disconnect(xprt, _gf_false);
}
UNLOCK(&priv->lock);
goto out;
}
LOCK(&priv->lock);
{
list_del_init(&trans->list);
}
UNLOCK(&priv->lock);
xprtcnt = GF_ATOMIC_DEC(priv->xprtcnt);
clntcnt = GF_ATOMIC_GET(priv->clntcnt);
if (!xprtcnt && !clntcnt) {
changelog_process_cleanup_event(this);
}
}
out:
return 0;
}
void
changelog_process_cleanup_event(xlator_t *this)
{
gf_boolean_t cleanup_notify = _gf_false;
changelog_priv_t *priv = NULL;
char sockfile[UNIX_PATH_MAX] = {
0,
};
if (!this)
return;
priv = this->private;
if (!priv)
return;
LOCK(&priv->lock);
{
cleanup_notify = priv->notify_down;
priv->notify_down = _gf_true;
}
UNLOCK(&priv->lock);
if (priv->victim && !cleanup_notify) {
default_notify(this, GF_EVENT_PARENT_DOWN, priv->victim);
if (priv->rpc) {
/* sockfile path could have been saved to avoid this */
CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile,
UNIX_PATH_MAX);
sys_unlink(sockfile);
(void)rpcsvc_unregister_notify(priv->rpc, changelog_rpcsvc_notify,
this);
if (priv->rpc->rxpool) {
mem_pool_destroy(priv->rpc->rxpool);
priv->rpc->rxpool = NULL;
}
GF_FREE(priv->rpc);
priv->rpc = NULL;
}
}
}
void
changelog_destroy_rpc_listner(xlator_t *this, changelog_priv_t *priv)
{
char sockfile[UNIX_PATH_MAX] = {
0,
};
changelog_clnt_t *c_clnt = &priv->connections;
changelog_rpc_clnt_t *crpc = NULL;
int nofconn = 0;
/* sockfile path could have been saved to avoid this */
CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX);
changelog_rpc_server_destroy(this, priv->rpc, sockfile,
changelog_rpcsvc_notify, changelog_programs);
/* TODO Below approach is not perfect to wait for cleanup
all active connections without this code brick process
can be crash in case of brick multiplexing if any in-progress
request process on rpc by changelog xlator after
cleanup resources
*/
if (c_clnt) {
do {
nofconn = 0;
LOCK(&c_clnt->active_lock);
list_for_each_entry(crpc, &c_clnt->active, list) { nofconn++; }
UNLOCK(&c_clnt->active_lock);
LOCK(&c_clnt->wait_lock);
list_for_each_entry(crpc, &c_clnt->waitq, list) { nofconn++; }
UNLOCK(&c_clnt->wait_lock);
pthread_mutex_lock(&c_clnt->pending_lock);
list_for_each_entry(crpc, &c_clnt->pending, list) { nofconn++; }
pthread_mutex_unlock(&c_clnt->pending_lock);
} while (nofconn); /* Wait for all connection cleanup */
}
(void)changelog_cleanup_rpc_threads(this, priv);
}
rpcsvc_t *

View File

@ -2004,6 +2004,10 @@ notify(xlator_t *this, int event, void *data, ...)
struct list_head queue = {
0,
};
uint64_t xprtcnt = 0;
uint64_t clntcnt = 0;
changelog_clnt_t *conn = NULL;
gf_boolean_t cleanup_notify = _gf_false;
INIT_LIST_HEAD(&queue);
@ -2011,6 +2015,33 @@ notify(xlator_t *this, int event, void *data, ...)
if (!priv)
goto out;
if (event == GF_EVENT_PARENT_DOWN) {
priv->victim = data;
gf_log(this->name, GF_LOG_INFO,
"cleanup changelog rpc connection of brick %s",
priv->victim->name);
this->cleanup_starting = 1;
changelog_destroy_rpc_listner(this, priv);
conn = &priv->connections;
if (conn)
changelog_ev_cleanup_connections(this, conn);
xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
clntcnt = GF_ATOMIC_GET(priv->clntcnt);
if (!xprtcnt && !clntcnt) {
LOCK(&priv->lock);
{
cleanup_notify = priv->notify_down;
priv->notify_down = _gf_true;
}
UNLOCK(&priv->lock);
if (!cleanup_notify)
default_notify(this, GF_EVENT_PARENT_DOWN, data);
}
goto out;
}
if (event == GF_EVENT_TRANSLATOR_OP) {
dict = data;
@ -2629,8 +2660,10 @@ static void
changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
{
/* terminate rpc server */
changelog_destroy_rpc_listner(this, priv);
if (!this->cleanup_starting)
changelog_destroy_rpc_listner(this, priv);
(void)changelog_cleanup_rpc_threads(this, priv);
/* cleanup rot buffs */
rbuf_dtor(priv->rbuf);
@ -2703,6 +2736,10 @@ init(xlator_t *this)
LOCK_INIT(&priv->lock);
LOCK_INIT(&priv->c_snap_lock);
GF_ATOMIC_INIT(priv->listnercnt, 0);
GF_ATOMIC_INIT(priv->clntcnt, 0);
GF_ATOMIC_INIT(priv->xprtcnt, 0);
INIT_LIST_HEAD(&priv->xprt_list);
ret = changelog_init_options(this, priv);
if (ret)