1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-22 13:33:52 +03:00

Compute number of replicas needed to commit a log record

(cherry picked from commit 877df62c4f)
This commit is contained in:
Ruben S. Montero 2018-08-30 02:26:30 +02:00
parent a46b8fd1a4
commit 32dd228376
4 changed files with 18 additions and 92 deletions

View File

@ -91,15 +91,6 @@ public:
*/
void replicate_log(ReplicaRequest * rr);
/**
* Allocate a replica request fot the given index.
* @param rindex of the record for the request
*/
void replicate_allocate(int rindex)
{
requests.allocate(rindex);
}
/**
* Finalizes the Raft Consensus Manager
*/

View File

@ -135,7 +135,7 @@ public:
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() && it->second != 0 )
if ( it != requests.end() )
{
it->second->add_replica();
@ -152,25 +152,6 @@ public:
return to_commit;
}
/**
* Allocated an empty replica request. It marks a writer thread will wait
* on this request.
* @param rindex of the request
*/
void allocate(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0));
}
pthread_mutex_unlock(&mutex);
}
/**
* Set the replication request associated to this index. If there is no
* previous request associated to the index it is created.
@ -187,28 +168,6 @@ public:
{
requests.insert(std::make_pair(rindex, rr));
}
else if ( it->second == 0 )
{
it->second = rr;
}
pthread_mutex_unlock(&mutex);
}
/**
* Remove a replication request associated to this index
* @param rindex of the request
*/
void remove(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
{
requests.erase(it);
}
pthread_mutex_unlock(&mutex);
}
@ -241,35 +200,8 @@ public:
pthread_mutex_unlock(&mutex);
}
/**
* @return true if a replica request needs to be replicated
* - last DB index is greater than current replicated index
* - there is no allocation in progress for the next rindex
*/
bool need_replica(int db_index, int rindex)
{
if ( db_index <= rindex )
{
return false;
}
pthread_mutex_lock(&mutex);
std::map<int,ReplicaRequest *>::iterator it = requests.find(rindex + 1);
bool rc = true;
if ( it != requests.end() && it->second == 0 )
{
rc = false;
}
pthread_mutex_unlock(&mutex);
return rc;
}
private:
pthread_mutex_t mutex;
/**

View File

@ -582,13 +582,24 @@ void RaftManager::replicate_log(ReplicaRequest * request)
{
request->notify();
requests.remove(request->index());
pthread_mutex_unlock(&mutex);
return;
}
if ( num_servers <= 1 )
//Count servers that need to replicate this record
int to_commit = num_servers / 2;
std::map<int, unsigned int>::iterator it;
for (it = next.begin(); it != next.end() && to_commit > 0; ++it)
{
if ( request->index() < (int) it->second )
{
to_commit--;
}
}
if ( to_commit <= 0 )
{
request->notify();
@ -596,12 +607,10 @@ void RaftManager::replicate_log(ReplicaRequest * request)
request->timeout = false;
commit = request->index();
requests.remove(request->index());
}
else
{
request->to_commit(num_servers / 2 );
request->to_commit(to_commit);
requests.set(request->index(), request);
}
@ -652,7 +661,7 @@ void RaftManager::replicate_success(int follower_id)
commit = replicated_index;
}
if ( requests.need_replica(db_lindex, replicated_index) && state == LEADER )
if ( db_lindex > replicated_index && state == LEADER )
{
replica_manager.replicate(follower_id);
}

View File

@ -405,12 +405,6 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
return -1;
}
//allocate a replication request if log record is going to be replicated
if ( timestamp == 0 )
{
Nebula::instance().get_raftm()->replicate_allocate(next_index);
}
last_index = next_index;
last_term = term;