diff --git a/src/raft/FedReplicaManager.cc b/src/raft/FedReplicaManager.cc index 4e964709ea..1bef300c29 100644 --- a/src/raft/FedReplicaManager.cc +++ b/src/raft/FedReplicaManager.cc @@ -221,6 +221,8 @@ void FedReplicaManager::update_zones(std::vector& zone_ids) void FedReplicaManager::add_zone(int zone_id) { + std::ostringstream oss; + Nebula& nd = Nebula::instance(); ZonePool * zpool = nd.get_zonepool(); @@ -234,15 +236,21 @@ void FedReplicaManager::add_zone(int zone_id) zones.insert(make_pair(zone_id, zs)); - pthread_mutex_unlock(&mutex); + oss << "Starting federation replication thread for slave: " << zone_id; + + NebulaLog::log("FRM", Log::INFO, oss); add_replica_thread(zone_id); + + pthread_mutex_unlock(&mutex); } /* -------------------------------------------------------------------------- */ void FedReplicaManager::delete_zone(int zone_id) { + std::ostringstream oss; + std::map::iterator it; pthread_mutex_lock(&mutex); @@ -258,9 +266,13 @@ void FedReplicaManager::delete_zone(int zone_id) zones.erase(it); - pthread_mutex_unlock(&mutex); + oss << "Stopping replication thread for slave: " << zone_id; + + NebulaLog::log("FRM", Log::INFO, oss); delete_replica_thread(zone_id); + + pthread_mutex_unlock(&mutex); }; /* -------------------------------------------------------------------------- */ diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index 6d83b08e05..e984227598 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -291,6 +291,8 @@ int RaftManager::get_leader_endpoint(std::string& endpoint) void RaftManager::add_server(int follower_id, const std::string& endpoint) { + std::ostringstream oss; + LogDB * logdb = Nebula::instance().get_logdb(); unsigned int log_index, log_term; @@ -299,6 +301,12 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint) pthread_mutex_lock(&mutex); + if ( state != LEADER ) + { + pthread_mutex_unlock(&mutex); + return; + } + num_servers++; servers.insert(std::make_pair(follower_id, endpoint)); @@ -306,6 +314,11 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint) match.insert(std::make_pair(follower_id, 0)); + oss << "Starting replication and heartbeat threads for follower: " + << follower_id; + + NebulaLog::log("RCM", Log::INFO, oss); + replica_manager.add_replica_thread(follower_id); heartbeat_manager.add_replica_thread(follower_id); @@ -317,10 +330,17 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint) void RaftManager::delete_server(int follower_id) { + std::ostringstream oss; std::map _servers; pthread_mutex_lock(&mutex); + if ( state != LEADER ) + { + pthread_mutex_unlock(&mutex); + return; + } + num_servers--; servers.erase(follower_id); @@ -328,6 +348,11 @@ void RaftManager::delete_server(int follower_id) match.erase(follower_id); + oss << "Stopping replication and heartbeat threads for follower: " + << follower_id; + + NebulaLog::log("RCM", Log::INFO, oss); + replica_manager.delete_replica_thread(follower_id); heartbeat_manager.delete_replica_thread(follower_id); @@ -367,11 +392,7 @@ void RaftManager::leader() if ( state != CANDIDATE ) { - NebulaLog::log("RCM", Log::INFO, "Cannot become leader, no longer " - "candidate"); - pthread_mutex_unlock(&mutex); - return; } diff --git a/src/raft/ReplicaManager.cc b/src/raft/ReplicaManager.cc index 4aaaabde7d..cc7a0e44f7 100644 --- a/src/raft/ReplicaManager.cc +++ b/src/raft/ReplicaManager.cc @@ -60,8 +60,6 @@ void ReplicaManager::stop_replica_threads() { it->second->finalize(); - pthread_join(it->second->thread_id(), 0); - delete it->second; } @@ -98,7 +96,6 @@ void ReplicaManager::replicate(int follower) void ReplicaManager::delete_replica_thread(int follower_id) { - std::ostringstream oss; std::map::iterator it; @@ -109,14 +106,8 @@ void ReplicaManager::delete_replica_thread(int follower_id) return; } - oss << "Stopping replication thread for follower: " << follower_id; - - NebulaLog::log("RCM", Log::INFO, oss); - it->second->finalize(); - pthread_join(it->second->thread_id(), 0); - NebulaLog::log("RCM", Log::INFO, "Replication thread stopped"); delete it->second; @@ -129,8 +120,6 @@ void ReplicaManager::delete_replica_thread(int follower_id) void ReplicaManager::add_replica_thread(int follower_id) { - std::ostringstream oss; - pthread_attr_t pattr; pthread_t thid; @@ -147,11 +136,7 @@ void ReplicaManager::add_replica_thread(int follower_id) thread_pool.insert(std::make_pair(follower_id, rthread)); pthread_attr_init (&pattr); - pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE); - - oss << "Starting replication thread for follower: " << follower_id; - - NebulaLog::log("RCM", Log::INFO, oss); + pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); pthread_create(&thid, &pattr, replication_thread, (void *) rthread);