mirror of
https://github.com/OpenNebula/one.git
synced 2025-03-13 12:58:17 +03:00
F #4809: Fix deadlock when stopping replica threads
This commit is contained in:
parent
95f728253c
commit
6d61d510b6
@ -221,6 +221,8 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
|
||||
|
||||
void FedReplicaManager::add_zone(int zone_id)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
Nebula& nd = Nebula::instance();
|
||||
ZonePool * zpool = nd.get_zonepool();
|
||||
|
||||
@ -234,15 +236,21 @@ void FedReplicaManager::add_zone(int zone_id)
|
||||
|
||||
zones.insert(make_pair(zone_id, zs));
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
oss << "Starting federation replication thread for slave: " << zone_id;
|
||||
|
||||
NebulaLog::log("FRM", Log::INFO, oss);
|
||||
|
||||
add_replica_thread(zone_id);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
void FedReplicaManager::delete_zone(int zone_id)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
std::map<int, ZoneServers *>::iterator it;
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
@ -258,9 +266,13 @@ void FedReplicaManager::delete_zone(int zone_id)
|
||||
|
||||
zones.erase(it);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
oss << "Stopping replication thread for slave: " << zone_id;
|
||||
|
||||
NebulaLog::log("FRM", Log::INFO, oss);
|
||||
|
||||
delete_replica_thread(zone_id);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
@ -291,6 +291,8 @@ int RaftManager::get_leader_endpoint(std::string& endpoint)
|
||||
|
||||
void RaftManager::add_server(int follower_id, const std::string& endpoint)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
LogDB * logdb = Nebula::instance().get_logdb();
|
||||
|
||||
unsigned int log_index, log_term;
|
||||
@ -299,6 +301,12 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint)
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
if ( state != LEADER )
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
num_servers++;
|
||||
servers.insert(std::make_pair(follower_id, endpoint));
|
||||
|
||||
@ -306,6 +314,11 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint)
|
||||
|
||||
match.insert(std::make_pair(follower_id, 0));
|
||||
|
||||
oss << "Starting replication and heartbeat threads for follower: "
|
||||
<< follower_id;
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, oss);
|
||||
|
||||
replica_manager.add_replica_thread(follower_id);
|
||||
|
||||
heartbeat_manager.add_replica_thread(follower_id);
|
||||
@ -317,10 +330,17 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint)
|
||||
|
||||
void RaftManager::delete_server(int follower_id)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
std::map<int, std::string> _servers;
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
if ( state != LEADER )
|
||||
{
|
||||
pthread_mutex_unlock(&mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
num_servers--;
|
||||
servers.erase(follower_id);
|
||||
|
||||
@ -328,6 +348,11 @@ void RaftManager::delete_server(int follower_id)
|
||||
|
||||
match.erase(follower_id);
|
||||
|
||||
oss << "Stopping replication and heartbeat threads for follower: "
|
||||
<< follower_id;
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, oss);
|
||||
|
||||
replica_manager.delete_replica_thread(follower_id);
|
||||
|
||||
heartbeat_manager.delete_replica_thread(follower_id);
|
||||
@ -367,11 +392,7 @@ void RaftManager::leader()
|
||||
|
||||
if ( state != CANDIDATE )
|
||||
{
|
||||
NebulaLog::log("RCM", Log::INFO, "Cannot become leader, no longer "
|
||||
"candidate");
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -60,8 +60,6 @@ void ReplicaManager::stop_replica_threads()
|
||||
{
|
||||
it->second->finalize();
|
||||
|
||||
pthread_join(it->second->thread_id(), 0);
|
||||
|
||||
delete it->second;
|
||||
}
|
||||
|
||||
@ -98,7 +96,6 @@ void ReplicaManager::replicate(int follower)
|
||||
|
||||
void ReplicaManager::delete_replica_thread(int follower_id)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
std::map<int, ReplicaThread *>::iterator it;
|
||||
|
||||
@ -109,14 +106,8 @@ void ReplicaManager::delete_replica_thread(int follower_id)
|
||||
return;
|
||||
}
|
||||
|
||||
oss << "Stopping replication thread for follower: " << follower_id;
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, oss);
|
||||
|
||||
it->second->finalize();
|
||||
|
||||
pthread_join(it->second->thread_id(), 0);
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
|
||||
|
||||
delete it->second;
|
||||
@ -129,8 +120,6 @@ void ReplicaManager::delete_replica_thread(int follower_id)
|
||||
|
||||
void ReplicaManager::add_replica_thread(int follower_id)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
pthread_attr_t pattr;
|
||||
pthread_t thid;
|
||||
|
||||
@ -147,11 +136,7 @@ void ReplicaManager::add_replica_thread(int follower_id)
|
||||
thread_pool.insert(std::make_pair(follower_id, rthread));
|
||||
|
||||
pthread_attr_init (&pattr);
|
||||
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
oss << "Starting replication thread for follower: " << follower_id;
|
||||
|
||||
NebulaLog::log("RCM", Log::INFO, oss);
|
||||
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
|
||||
|
||||
pthread_create(&thid, &pattr, replication_thread, (void *) rthread);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user