1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-22 13:33:52 +03:00

Improve checks to prevent unneeded replication triggers

(cherry picked from commit 50a588c38b)
(cherry picked from commit d21a0c9582)
(cherry picked from commit 64e7c5bbd0)
(cherry picked from commit 48881ae10a)
This commit is contained in:
Ruben S. Montero 2018-09-06 17:22:52 +02:00
parent 0557b79950
commit dd07086791
4 changed files with 82 additions and 8 deletions

View File

@ -91,6 +91,15 @@ public:
*/
void replicate_log(ReplicaRequest * rr);
/**
* Allocate a replica request fot the given index.
* @param rindex of the record for the request
*/
void replicate_allocate(int rindex)
{
requests.allocate(rindex);
}
/**
* Finalizes the Raft Consensus Manager
*/

View File

@ -135,7 +135,7 @@ public:
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
if ( it != requests.end() && it->second != 0 )
{
it->second->add_replica();
@ -152,6 +152,20 @@ public:
return to_commit;
}
/**
* Allocated an empty replica request. It marks a writer thread will wait
* on this request.
* @param rindex of the request
*/
void allocate(int rindex)
{
pthread_mutex_lock(&mutex);
requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0));
pthread_mutex_unlock(&mutex);
}
/**
* Set the replication request associated to this index. If there is no
* previous request associated to the index it is created.
@ -168,6 +182,28 @@ public:
{
requests.insert(std::make_pair(rindex, rr));
}
else if ( it->second == 0 )
{
it->second = rr;
}
pthread_mutex_unlock(&mutex);
}
/**
* Remove a replication request associated to this index
* @param rindex of the request
*/
void remove(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
{
requests.erase(it);
}
pthread_mutex_unlock(&mutex);
}
@ -200,6 +236,23 @@ public:
pthread_mutex_unlock(&mutex);
}
/**
* @return true if a replica request is set for this index
*/
bool is_replicable(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
bool rc = it == requests.end() ||
(it != requests.end() && it->second != 0);
pthread_mutex_unlock(&mutex);
return rc;
}
private:
pthread_mutex_t mutex;

View File

@ -582,6 +582,8 @@ void RaftManager::replicate_log(ReplicaRequest * request)
{
request->notify();
requests.remove(request->index());
pthread_mutex_unlock(&mutex);
return;
}
@ -593,9 +595,15 @@ void RaftManager::replicate_log(ReplicaRequest * request)
for (it = next.begin(); it != next.end() && to_commit > 0; ++it)
{
if ( request->index() < (int) it->second )
int rindex = request->index();
if ( rindex < (int) it->second )
{
to_commit--;
}
else if ( rindex == (int) it->second )
{
replica_manager.replicate(it->first);
}
}
@ -607,6 +615,8 @@ void RaftManager::replicate_log(ReplicaRequest * request)
request->timeout = false;
commit = request->index();
requests.remove(request->index());
}
else
{
@ -615,11 +625,6 @@ void RaftManager::replicate_log(ReplicaRequest * request)
requests.set(request->index(), request);
}
if ( num_servers > 1 )
{
replica_manager.replicate();
}
pthread_mutex_unlock(&mutex);
}
@ -661,7 +666,8 @@ void RaftManager::replicate_success(int follower_id)
commit = replicated_index;
}
if ( db_lindex > replicated_index && state == LEADER )
if (db_lindex > replicated_index && state == LEADER &&
requests.is_replicable(replicated_index + 1))
{
replica_manager.replicate(follower_id);
}

View File

@ -405,6 +405,12 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
return -1;
}
//allocate a replication request if log record is going to be replicated
if ( timestamp == 0 )
{
Nebula::instance().get_raftm()->replicate_allocate(next_index);
}
last_index = next_index;
last_term = term;