1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-24 02:03:52 +03:00

B #4636: Fix Federation replication bug (#4638)

This commit is contained in:
Christian González 2020-05-04 14:38:50 +02:00 committed by GitHub
parent 562855c08d
commit 7b6b054b1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 15 deletions

View File

@ -94,20 +94,21 @@ public:
* Loads a log record from the database. Memory is allocated by this class * Loads a log record from the database. Memory is allocated by this class
* and needs to be freed. * and needs to be freed.
* @param index of the associated logDB entry * @param index of the associated logDB entry
* @param prev_index index of the log record that precedes the associated logDB entry
* @param lr LogDBRecord to load from the DB * @param lr LogDBRecord to load from the DB
* @return 0 on success -1 otherwise * @return 0 on success -1 otherwise
*/ */
int get_log_record(uint64_t index, LogDBRecord& lr); int get_log_record(uint64_t index, uint64_t prev_index, LogDBRecord& lr);
/** /**
* Applies the SQL command of the given record to the database. The * Applies the SQL command of the given record to the database. The
* timestamp of the record is updated. * timestamp of the record is updated. (Do not use for Federation)
* @param index of the log record * @param index of the log record
*/ */
int apply_log_records(uint64_t commit_index); int apply_log_records(uint64_t commit_index);
/** /**
* Deletes the record in start_index and all that follow it * Deletes the record in start_index and all that follow it (do not use for Federation)
* @param start_index first log record to delete * @param start_index first log record to delete
*/ */
int delete_log_records(uint64_t start_index); int delete_log_records(uint64_t start_index);

View File

@ -310,7 +310,22 @@ int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
return -2; return -2;
} }
int rc = logdb->get_log_record(zs->next, lr); int prev_index = logdb->previous_federated(zs->next);
if ( prev_index == UINT64_MAX )
{
std::ostringstream oss;
oss << "Missing federation record previous to: " << zs->next;
error = oss.str();
pthread_mutex_unlock(&mutex);
return -1;
}
int rc = logdb->get_log_record(zs->next, prev_index, lr);
if ( rc == -1 ) if ( rc == -1 )
{ {

View File

@ -199,7 +199,7 @@ int RaftReplicaThread::replicate()
} }
if ( logdb->get_log_record(next_index, lr) != 0 ) if ( logdb->get_log_record(next_index, next_index - 1, lr) != 0 )
{ {
ostringstream ess; ostringstream ess;

View File

@ -404,7 +404,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
if ( index > 0 ) if ( index > 0 )
{ {
if ( logdb->get_log_record(prev_index, prev_lr) != 0 ) if ( logdb->get_log_record(prev_index, prev_index - 1, prev_lr) != 0 )
{ {
att.resp_msg = "Error loading previous log record"; att.resp_msg = "Error loading previous log record";
att.resp_id = current_term; att.resp_id = current_term;
@ -423,7 +423,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
} }
} }
if ( logdb->get_log_record(index, lr) == 0 ) if ( logdb->get_log_record(index, index - 1, lr) == 0 )
{ {
if ( lr.term != term ) if ( lr.term != term )
{ {

View File

@ -131,7 +131,7 @@ LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp)
LogDBRecord lr; LogDBRecord lr;
if ( get_log_record(0, lr) != 0 ) if ( get_log_record(0, 0, lr) != 0 )
{ {
std::ostringstream oss; std::ostringstream oss;
@ -195,7 +195,7 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
last_applied = _last_applied; last_applied = _last_applied;
} }
rc += get_log_record(last_index, lr); rc += get_log_record(last_index, last_index - 1, lr);
if ( rc == 0 ) if ( rc == 0 )
{ {
@ -212,12 +212,10 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */
int LogDB::get_log_record(uint64_t index, LogDBRecord& lr) int LogDB::get_log_record(uint64_t index, uint64_t prev_index, LogDBRecord& lr)
{ {
ostringstream oss; ostringstream oss;
uint64_t prev_index = index - 1;
if ( index == 0 ) if ( index == 0 )
{ {
prev_index = 0; prev_index = 0;
@ -353,8 +351,22 @@ int LogDB::insert(uint64_t index, unsigned int term, const std::string& sql,
{ {
//Check for duplicate (leader retrying i.e. xmlrpc client timeout) //Check for duplicate (leader retrying i.e. xmlrpc client timeout)
LogDBRecord lr; LogDBRecord lr;
int prev_index;
if ( get_log_record(index, lr) == 0 ) if (fed_index == UINT64_MAX)
{
prev_index = index - 1;
}
else
{
prev_index = previous_federated(index);
}
if ( fed_index != UINT64_MAX && prev_index == UINT64_MAX )
{
rc = -1;
}
else if ( get_log_record(index, prev_index, lr) == 0 )
{ {
NebulaLog::log("DBM", Log::ERROR, "Duplicated log record"); NebulaLog::log("DBM", Log::ERROR, "Duplicated log record");
rc = 0; rc = 0;
@ -565,7 +577,7 @@ int LogDB::delete_log_records(uint64_t start_index)
last_index = start_index - 1; last_index = start_index - 1;
if ( get_log_record(last_index, lr) == 0 ) if ( get_log_record(last_index, last_index - 1, lr) == 0 )
{ {
last_term = lr.term; last_term = lr.term;
} }
@ -587,7 +599,7 @@ int LogDB::apply_log_records(uint64_t commit_index)
{ {
LogDBRecord lr; LogDBRecord lr;
if ( get_log_record(last_applied + 1, lr) != 0 ) if ( get_log_record(last_applied + 1, last_applied, lr) != 0 )
{ {
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
return -1; return -1;