
F #4809: start/stop replica threads when adding/deleting servers

Ruben S. Montero 2017-05-02 01:38:30 +02:00
parent 3be9f38d92
commit 071d0d840f
7 changed files with 99 additions and 26 deletions

View File

@@ -221,10 +221,9 @@ public:
// -------------------------------------------------------------------------
// Server related interface
// -------------------------------------------------------------------------
/**
* Update the servers in the zone (number of servers, endpoints...)
*/
void update_zone_servers();
void add_server(unsigned int follower_id);
void delete_server(unsigned int follower_id);
private:
friend void * raft_manager_loop(void *arg);
@@ -274,13 +273,16 @@ private:
*/
struct timespec last_heartbeat;
/**
* Timers
* - timer_period_ms. Base timer to wake up the manager (10ms)
* - purge_period_ms. How often the LogDB is purged (600s)
* -
*/
//--------------------------------------------------------------------------
// Timers
// - timer_period_ms. Base timer to wake up the manager (10ms)
// - purge_period_ms. How often the LogDB is purged (600s)
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
// - election_timeout. Timeout leader heartbeats (followers)
// - broadcast_timeout. To send heartbeat to followers (leader)
//--------------------------------------------------------------------------
static const time_t timer_period_ms;
static const time_t purge_period_ms;
static const time_t xmlrpc_timeout_ms;
@@ -298,6 +300,7 @@ private:
//
// - next, next log to send to each follower <follower, next>
// - match, highest log replicated in this server <follower, match>
// - servers, list of servers in zone and xml-rpc endpoint <follower, edp>
// -------------------------------------------------------------------------
ReplicaManager replica_manager;
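
For context, the next/match maps described above are the standard Raft leader bookkeeping. A minimal self-contained sketch of how such maps are typically advanced, mirroring the replicate_success/replicate_failure hunks further down (illustration only, hypothetical names, not OpenNebula code):

#include <map>

// Illustration only: Raft leader bookkeeping for its followers.
// next  - next log index to send to each follower
// match - highest log index known to be replicated on each follower
struct RaftIndexes
{
    std::map<unsigned int, unsigned int> next;
    std::map<unsigned int, unsigned int> match;

    // A follower acknowledged replication up to 'index'
    void on_replica_success(unsigned int follower_id, unsigned int index)
    {
        match[follower_id] = index;
        next[follower_id]  = index + 1;
    }

    // A follower rejected an entry: back off and retry an earlier index
    void on_replica_failure(unsigned int follower_id)
    {
        if ( next[follower_id] > 0 )
        {
            next[follower_id] = next[follower_id] - 1;
        }
    }
};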
@@ -339,10 +342,22 @@ private:
return _is_state;
}
// -------------------------------------------------------------------------
// Helper functions
// -------------------------------------------------------------------------
/**
* Send the heartbeat to the followers
*/
void send_heartbeat();
/**
* Update the servers in the zone (number of servers, endpoints...). This
* function updates:
* - num_servers
* - server ids
* - server endpoints
*/
void update_zone_servers();
};
#endif /*RAFT_MANAGER_H_*/
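
The timer block documented above implies the usual pattern: the manager loop wakes every timer_period_ms and, on a follower, compares the time elapsed since last_heartbeat against the election timeout. A hedged sketch of that check (illustrative only, not the actual raft_manager_loop):

#include <time.h>

// Illustration only: has the leader's heartbeat expired on a follower?
static bool heartbeat_expired(const struct timespec& last_heartbeat,
                              time_t election_timeout_ms)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME, &now);

    time_t elapsed_ms = (now.tv_sec - last_heartbeat.tv_sec) * 1000
        + (now.tv_nsec - last_heartbeat.tv_nsec) / 1000000;

    return elapsed_ms > election_timeout_ms;
}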

View File

@@ -49,11 +49,12 @@ public:
/**
* Add servers to this zone
* @param tmpl with SERVER definitions
* @param sid id of the new sever
* @param error
*
* @return 0 on success, -1 otherwise
*/
int add_server(Template& tmpl, string& error);
int add_server(Template& tmpl, int& sid, string& error);
/**
* Delete a server from this zone

View File

@@ -159,7 +159,7 @@ public:
return static_cast<ZoneServer *>(get_attribute(id));
}
int add_server(VectorAttribute * va, string& error)
int add_server(VectorAttribute * va, int& sid, string& error)
{
ZoneServer * server = new ZoneServer(va, next_id);
@@ -174,6 +174,8 @@ public:
add_attribute(server, next_id);
sid = next_id;
next_id += 1;
return 0;
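
The new sid out-parameter reports the id taken from next_id back to the caller. A self-contained sketch of the same pattern (hypothetical Registry class, not OpenNebula code):

#include <map>
#include <string>

// Illustration only: the next_id pattern used by ZoneServers::add_server,
// returning the assigned id through an out-parameter.
class Registry
{
public:
    Registry() : next_id(0) {}

    int add(const std::string& name, int& sid, std::string& error)
    {
        if ( name.empty() )
        {
            error = "missing server name";
            sid   = -1;

            return -1;
        }

        servers.insert(std::make_pair(next_id, name));

        sid = next_id; // report the id we just assigned

        next_id += 1;

        return 0;
    }

private:
    int next_id;

    std::map<int, std::string> servers;
};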

View File

@@ -152,6 +152,44 @@ void RaftManager::update_zone_servers()
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
void RaftManager::add_server(unsigned int follower_id)
{
LogDB * logdb = Nebula::instance().get_logdb();
unsigned int index = logdb->last_index();
update_zone_servers();
pthread_mutex_lock(&mutex);
next.insert(std::make_pair(follower_id, index + 1));
match.insert(std::make_pair(follower_id, 0));
replica_manager.add_replica_thread(follower_id);
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
void RaftManager::delete_server(unsigned int follower_id)
{
update_zone_servers();
pthread_mutex_lock(&mutex);
next.erase(follower_id);
match.erase(follower_id);
replica_manager.delete_replica_thread(follower_id);
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
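
Both functions refresh the zone view first (update_zone_servers() takes and releases the mutex internally, as the hunk above shows) and then mutate the replication maps and thread pool in a second critical section. A minimal self-contained sketch of that locked-update shape (illustration only, not oned code):

#include <pthread.h>
#include <map>

// Illustration only: mutate shared replication state under a mutex,
// mirroring RaftManager::add_server/delete_server above.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

static std::map<unsigned int, unsigned int> next_index;
static std::map<unsigned int, unsigned int> match_index;

static void track_follower(unsigned int follower_id, unsigned int last_index)
{
    pthread_mutex_lock(&mutex);

    next_index.insert(std::make_pair(follower_id, last_index + 1));
    match_index.insert(std::make_pair(follower_id, 0u));

    pthread_mutex_unlock(&mutex);
}

static void untrack_follower(unsigned int follower_id)
{
    pthread_mutex_lock(&mutex);

    next_index.erase(follower_id);
    match_index.erase(follower_id);

    pthread_mutex_unlock(&mutex);
}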
@@ -220,10 +258,10 @@ void RaftManager::leader(unsigned int _term)
term = _term;
pthread_mutex_unlock(&mutex);
replica_manager.start_replica_threads(_follower_ids);
pthread_mutex_unlock(&mutex);
NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
}
@@ -237,12 +275,12 @@ void RaftManager::follower(unsigned int _term)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
replica_manager.stop_replica_threads();
logdb->setup_index(lapplied, lindex);
pthread_mutex_lock(&mutex);
replica_manager.stop_replica_threads();
state = FOLLOWER;
term = _term;
@@ -294,12 +332,12 @@ void RaftManager::replicate_log(ReplicaRequest * request)
requests.insert(std::make_pair(request->index(), request));
}
pthread_mutex_unlock(&mutex);
if ( num_servers > 1 )
{
replica_manager.replicate();
}
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
@@ -315,6 +353,8 @@ void RaftManager::replicate_success(unsigned int follower_id)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
int db_last_index = logdb->last_index();
pthread_mutex_lock(&mutex);
next_it = next.find(follower_id);
@@ -345,12 +385,12 @@ void RaftManager::replicate_success(unsigned int follower_id)
}
}
pthread_mutex_unlock(&mutex);
if ( logdb->last_index() > replicated_index )
if ( db_last_index > replicated_index )
{
replica_manager.replicate(follower_id);
}
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
@@ -369,9 +409,9 @@ void RaftManager::replicate_failure(unsigned int follower_id)
next_it->second = next_it->second - 1;
}
pthread_mutex_unlock(&mutex);
replica_manager.replicate(follower_id);
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */

View File

@@ -272,6 +272,8 @@ void ReplicaManager::replicate(int follower)
void ReplicaManager::delete_replica_thread(int follower_id)
{
std::ostringstream oss;
std::map<int, ReplicaThread *>::iterator it;
it = thread_pool.find(follower_id);
@@ -281,10 +283,16 @@ void ReplicaManager::delete_replica_thread(int follower_id)
return;
}
oss << "Stopping replication thread for follower: " << follower_id;
NebulaLog::log("RCM", Log::INFO, oss);
it->second->finalize();
pthread_join(it->second->thread_id(), 0);
NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
delete it->second;
thread_pool.erase(it);
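
The teardown above is the standard pthread worker shutdown: request exit, join, then free. A generic hedged sketch of the same sequence (Worker is a hypothetical stand-in for ReplicaThread, assumed to expose finalize() and thread_id()):

#include <pthread.h>
#include <map>

// Illustration only: stop, join and reap one worker from a thread pool.
template<typename Worker>
void stop_worker(std::map<int, Worker *>& pool, int id)
{
    typename std::map<int, Worker *>::iterator it = pool.find(id);

    if ( it == pool.end() )
    {
        return;
    }

    it->second->finalize();                   // ask the thread loop to exit
    pthread_join(it->second->thread_id(), 0); // wait for it to terminate

    delete it->second;

    pool.erase(it);
}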

View File

@@ -25,6 +25,7 @@ void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
{
int id = xmlrpc_c::value_int(paramList.getInt(1));
string zs_str = xmlrpc_c::value_string(paramList.getString(2));
int zs_id;
string error_str;
@@ -55,7 +56,7 @@ void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
if ( zone->add_server(zs_tmpl, att.resp_msg) == -1 )
if ( zone->add_server(zs_tmpl, zs_id, att.resp_msg) == -1 )
{
failure_response(ACTION, att);
@@ -66,6 +67,8 @@ void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
zone->unlock();
Nebula::instance().get_raftm()->add_server(zs_id);
success_response(id, att);
}
@@ -107,6 +110,8 @@ void ZoneDeleteServer::request_execute(xmlrpc_c::paramList const& paramList,
zone->unlock();
Nebula::instance().get_raftm()->delete_server(zs_id);
success_response(id, att);
}

View File

@@ -300,20 +300,22 @@ int Zone::post_update_template(string& error)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int Zone::add_server(Template& tmpl, string& error)
int Zone::add_server(Template& tmpl, int& sid, string& error)
{
vector<VectorAttribute *> vs;
vector<VectorAttribute *>::iterator it;
VectorAttribute * server;
sid = -1;
tmpl.get(ZoneServers::SERVER_NAME, vs);
for ( it = vs.begin() ; it != vs.end() ; ++it )
{
server = new VectorAttribute(*it);
if ( servers->add_server(server, error) == -1 )
if ( servers->add_server(server, sid, error) == -1 )
{
delete server;