diff --git a/include/RaftManager.h b/include/RaftManager.h index 57a51cf63f..0cd2a72aaa 100644 --- a/include/RaftManager.h +++ b/include/RaftManager.h @@ -91,6 +91,15 @@ public: */ void replicate_log(ReplicaRequest * rr); + /** + * Allocate a replica request for the given index. + * @param rindex index of the record for the request + */ + void replicate_allocate(int rindex) + { + requests.allocate(rindex); + } + /** * Finalizes the Raft Consensus Manager */ diff --git a/include/ReplicaRequest.h b/include/ReplicaRequest.h index da9bdf2f3d..a567413ef0 100644 --- a/include/ReplicaRequest.h +++ b/include/ReplicaRequest.h @@ -135,7 +135,7 @@ public: std::map::iterator it = requests.find(rindex); - if ( it != requests.end() ) + if ( it != requests.end() && it->second != 0 ) { it->second->add_replica(); @@ -152,6 +152,20 @@ public: return to_commit; } + /** + * Allocate an empty (null) replica request. It marks that a writer thread + * will wait on this request. + * @param rindex index of the request + */ + void allocate(int rindex) + { + pthread_mutex_lock(&mutex); + + requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0)); + + pthread_mutex_unlock(&mutex); + } + /** * Set the replication request associated to this index. If there is no * previous request associated to the index it is created. 
@@ -168,6 +182,28 @@ public: { requests.insert(std::make_pair(rindex, rr)); } + else if ( it->second == 0 ) + { + it->second = rr; + } + + pthread_mutex_unlock(&mutex); + } + + /** + * Remove the replication request associated to this index, if any + * @param rindex index of the request + */ + void remove(int rindex) + { + pthread_mutex_lock(&mutex); + + std::map::iterator it = requests.find(rindex); + + if ( it != requests.end() ) + { + requests.erase(it); + } pthread_mutex_unlock(&mutex); } @@ -200,6 +236,23 @@ public: pthread_mutex_unlock(&mutex); } + /** + * @return true if the index has no entry or its replica request is already set + */ + bool is_replicable(int rindex) + { + pthread_mutex_lock(&mutex); + + std::map::iterator it = requests.find(rindex); + + bool rc = it == requests.end() || + (it != requests.end() && it->second != 0); + + pthread_mutex_unlock(&mutex); + + return rc; + } + private: pthread_mutex_t mutex; diff --git a/src/raft/RaftManager.cc b/src/raft/RaftManager.cc index d079c9cda4..aa948ed4c9 100644 --- a/src/raft/RaftManager.cc +++ b/src/raft/RaftManager.cc @@ -582,6 +582,8 @@ void RaftManager::replicate_log(ReplicaRequest * request) { request->notify(); + requests.remove(request->index()); + pthread_mutex_unlock(&mutex); return; } @@ -593,9 +595,15 @@ void RaftManager::replicate_log(ReplicaRequest * request) for (it = next.begin(); it != next.end() && to_commit > 0; ++it) { - if ( request->index() < (int) it->second ) + int rindex = request->index(); + + if ( rindex < (int) it->second ) { to_commit--; + } + else if ( rindex == (int) it->second ) + { + replica_manager.replicate(it->first); } } @@ -607,6 +615,8 @@ void RaftManager::replicate_log(ReplicaRequest * request) request->timeout = false; commit = request->index(); + + requests.remove(request->index()); } else { @@ -615,11 +625,6 @@ void RaftManager::replicate_log(ReplicaRequest * request) requests.set(request->index(), request); } - if ( num_servers > 1 ) - { - replica_manager.replicate(); - } - 
pthread_mutex_unlock(&mutex); } @@ -661,7 +666,8 @@ void RaftManager::replicate_success(int follower_id) commit = replicated_index; } - if ( db_lindex > replicated_index && state == LEADER ) + if (db_lindex > replicated_index && state == LEADER && + requests.is_replicable(replicated_index + 1)) { replica_manager.replicate(follower_id); } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index a50f6a49ce..4bd585079d 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -405,6 +405,12 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql, return -1; } + // Allocate a replication request if the log record is going to be replicated + if ( timestamp == 0 ) + { + Nebula::instance().get_raftm()->replicate_allocate(next_index); + } + last_index = next_index; last_term = term;