1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-15 18:50:09 +03:00

F #4809: Replication logic

This commit is contained in:
Ruben S. Montero 2017-04-23 01:43:01 +02:00
parent ed0d64a2a0
commit b26e5a716a
3 changed files with 153 additions and 25 deletions

View File

@ -124,6 +124,8 @@ private:
bool _finalize;
bool _pending_requests;
// ---------------------------------------------------------------------
// Information of the replication target server and leader
// ---------------------------------------------------------------------

View File

@ -135,6 +135,30 @@ public:
return state == LEADER;
}
/**
 *  Steps back by one the index of the next log entry that will be sent
 *  to this follower.
 */
void dec_next()
{
    --next;
}
/**
 *  Advances by one the index of the next log entry that will be sent
 *  to this follower.
 */
void inc_next()
{
    ++next;
}
/**
* Set the the index of the highest log entry on this follower
*/
void set_match(unsigned int _match)
{
match = _match;
}
private:
State state;

View File

@ -229,11 +229,20 @@ void LogDBManager::ReplicaThread::do_replication()
{
pthread_mutex_lock(&mutex);
if ( _finalize )
while ( _pending_requests == false )
{
return;
pthread_cond_wait(&cond,&mutex);
if ( _finalize )
{
return;
}
}
_pending_requests = false;
pthread_mutex_unlock(&mutex);
Zone * zone = zpool->get(zone_id, true);
if ( zone == 0 )
@ -280,6 +289,13 @@ void LogDBManager::ReplicaThread::do_replication()
lr->unlock();
ostringstream oss;
oss << "Replicating log entry " << id << "-" << term << " on server: "
<< follower_id << " (" << follower_edp <<")";
NebulaLog::log("DBM", Log::DDEBUG, oss);
xmlrpc_c::carriageParm_curl0 carriage(follower_edp);
xmlrpc_c::paramList replica_params;
@ -295,36 +311,120 @@ void LogDBManager::ReplicaThread::do_replication()
xmlrpc_c::rpc rpc_client(replica_method, replica_params);
rpc_client.call(&client, &carriage);
/*
try
xmlrpc_c::rpcPtr rpc(method, plist);
xmlrpc_c::carriageParm_curl0 cparam(one_endpoint);
{
rpc_client.call(&client, &carriage);
rpc->start(&client, &cparam);
if ( rpc_client.isSuccessful() )
{
xmlrpc_c::value result = rpc_client.getResult();
client.finishAsync(xmlrpc_c::timeout(timeout));
vector<xmlrpc_c::value> values =
xmlrpc_c::value_array(result).vectorValueValue();
if (!rpc->isFinished())
{
rpc->finishErr(girerr::error("XMLRPC method " + method +
" timeout, resetting call"));
}
bool success = xmlrpc_c::value_boolean(values[0]);
if (rpc->isSuccessful())
{
*result = rpc->getResult();
}
else
{
xmlrpc_c::fault failure = rpc->getFault();
if ( success )
{
zone = zpool->get(zone_id, true);
girerr::error(failure.getDescription());
}
if ( zone == 0 )
{
continue;
}
*/
pthread_mutex_unlock(&mutex);
ZoneServer * follower = zone->get_server(follower_id);
if ( follower == 0 )
{
zone->unlock();
continue;
}
follower->inc_next();
follower->set_match(id);
zone->unlock();
}
else
{
int follower_term = xmlrpc_c::value_boolean(values[1]);
if ( follower_term > term )
{
//Convert to follower
// - Update term
// - Set state to follower
// - Stop replica threads
ostringstream ess;
ess << "Detected a higher term on follower: "
<< follower_id << " giving up leadership";
NebulaLog::log("DBM", Log::WARNING, ess);
}
else
{
//Log inconsistency in follower
// - Decrease follower index
// - Retry
ostringstream ess;
ess << "Log inconsistency detected on follower: "
<< follower_id;
NebulaLog::log("DBM", Log::WARNING, ess);
zone = zpool->get(zone_id, true);
if ( zone == 0 )
{
continue;
}
ZoneServer * follower = zone->get_server(follower_id);
if ( follower == 0 )
{
zone->unlock();
continue;
}
follower->dec_next();
zone->unlock();
_pending_requests = true;
}
}
}
else //RPC failed, will retry on next replication request
{
ostringstream ess;
xmlrpc_c::fault failure = rpc_client.getFault();
ess << "Error replicating log entry " << id << "-" << term
<< " on follower " << follower_id << ": "
<< failure.getDescription();
NebulaLog::log("DBM", Log::ERROR, ess);
}
}
catch (exception const& e)
{
ostringstream ess;
ess << "Error replicating log entry " << id << "-" << term
<< " on follower " << follower_id << ": " << e.what();
NebulaLog::log("DBM", Log::ERROR, ess);
continue;
}
}
}
@ -337,6 +437,8 @@ void LogDBManager::ReplicaThread::finalize()
_finalize = true;
_pending_requests = false;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);