diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index e984227598..d92ed3638f 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -605,7 +605,7 @@ void RaftManager::replicate_success(int follower_id) } } - if ( db_last_index > replicated_index ) + if ((db_last_index > replicated_index) && (state == LEADER)) { replica_manager.replicate(follower_id); } @@ -632,7 +632,10 @@ void RaftManager::replicate_failure(int follower_id) } } - replica_manager.replicate(follower_id); + if ( state == LEADER ) + { + replica_manager.replicate(follower_id); + } pthread_mutex_unlock(&mutex); } diff --git a/src/raft/ReplicaManager.cc b/src/raft/ReplicaManager.cc index 3fe62bf13a..140e42a000 100644 --- a/src/raft/ReplicaManager.cc +++ b/src/raft/ReplicaManager.cc @@ -59,10 +59,6 @@ void ReplicaManager::stop_replica_threads() for ( it = thread_pool.begin() ; it != thread_pool.end() ; ++it ) { it->second->finalize(); - - pthread_cancel(it->second->thread_id()); - - delete it->second; } thread_pool.clear(); @@ -110,10 +106,6 @@ void ReplicaManager::delete_replica_thread(int follower_id) it->second->finalize(); - NebulaLog::log("RCM", Log::INFO, "Replication thread stopped"); - - delete it->second; - thread_pool.erase(it); }; diff --git a/src/raft/ReplicaThread.cc b/src/raft/ReplicaThread.cc index 20512a7f21..c7c7cc51ba 100644 --- a/src/raft/ReplicaThread.cc +++ b/src/raft/ReplicaThread.cc @@ -55,6 +55,10 @@ extern "C" void * replication_thread(void *arg) rt->do_replication(); + NebulaLog::log("RCM", Log::INFO, "Replication thread stopped"); + + delete rt; + return 0; }