/* -------------------------------------------------------------------------- */ /* Copyright 2002-2024, OpenNebula Project, OpenNebula Systems */ /* */ /* Licensed under the Apache License, Version 2.0 (the "License"); you may */ /* not use this file except in compliance with the License. You may obtain */ /* a copy of the License at */ /* */ /* http://www.apache.org/licenses/LICENSE-2.0 */ /* */ /* Unless required by applicable law or agreed to in writing, software */ /* distributed under the License is distributed on an "AS IS" BASIS, */ /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ /* See the License for the specific language governing permissions and */ /* limitations under the License. */ /* -------------------------------------------------------------------------- */ #ifndef REPLICA_REQUEST_H_ #define REPLICA_REQUEST_H_ #include "SyncRequest.h" /** * This class represents a log entry replication request. The replication request * is synchronous: once it has been replicated in a majority of followers the * client is notified (SqlDB::exec_wr() call) and DB updated. */ class ReplicaRequest : public SyncRequest { public: ReplicaRequest(uint64_t i):_index(i), _to_commit(-1), _replicas(1) {}; ~ReplicaRequest() {}; /** * This function updates the number of replicas of the record and decrement * the number of servers left to reach majority consensus. If it reaches 0, * the client is notified * @return number of replicas for this log */ int add_replica() { int __replicas; _replicas++; if ( _to_commit > 0 ) { _to_commit--; } __replicas = _replicas; if ( _to_commit == 0 ) { result = true; timeout = false; notify(); } return __replicas; } /* ---------------------------------------------------------------------- */ /* Class access methods */ /* ---------------------------------------------------------------------- */ uint64_t index() { return _index; } int replicas() { return _replicas; } int to_commit() { return _to_commit; } void to_commit(int c) { _to_commit = c; } private: /** * Index for this log entry */ uint64_t _index; /** * Remaining number of servers that need to replicate this record to commit * it. Initialized to ( Number_Servers - 1 ) / 2 */ int _to_commit; /** * Total number of replicas for this entry */ int _replicas; }; /** * This class represents a map of replication requests. It syncs access between * RaftManager and DB writer threads. A DB writer allocates and set the * request and then it waits on it for completion. */ class ReplicaRequestMap { public: ReplicaRequestMap() = default; virtual ~ReplicaRequestMap() = default; /** * Increments the number of replicas of this request. If it can be * committed the request is removed from the map * @param rindex of the request * * @return the number of replicas to commit, if 0 it can be committed */ int add_replica(uint64_t rindex) { int to_commit = -1; std::lock_guard lock(_mutex); auto it = requests.find(rindex); if ( it != requests.end() && it->second ) { it->second->add_replica(); to_commit = it->second->to_commit(); if ( to_commit == 0 ) { requests.erase(it); } } return to_commit; } /** * Allocated an empty replica request. It marks a writer thread will wait * on this request. * @param rindex of the request */ void allocate(uint64_t rindex) { std::lock_guard lock(_mutex); requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0)); } /** * Set the replication request associated to this index. If there is no * previous request associated to the index it is created. * @param rindex of the request * @param rr replica request pointer */ void set(uint64_t rindex, ReplicaRequest * rr) { std::lock_guard lock(_mutex); requests[rindex] = rr; } /** * Remove a replication request associated to this index * @param rindex of the request */ void remove(uint64_t rindex) { std::lock_guard lock(_mutex); requests.erase(rindex); } /** * Notify all writers and clear the replica map */ void clear() { std::lock_guard lock(_mutex); for ( auto it = requests.begin() ; it != requests.end() ; ++it ) { if ( it->second == 0 ) { continue; } it->second->result = false; it->second->timeout = false; it->second->message = "oned is now follower"; it->second->notify(); } requests.clear(); } /** * @return true if a replica request is set for this index */ bool is_replicable(uint64_t rindex) { std::lock_guard lock(_mutex); auto it = requests.find(rindex); return (it == requests.end()) || (it->second != nullptr); } private: std::mutex _mutex; /** * Clients waiting for a log replication */ std::map requests; }; #endif /*REPLICA_REQUEST_H_*/