/* -------------------------------------------------------------------------- */ /* Copyright 2002-2020, OpenNebula Project, OpenNebula Systems */ /* */ /* Licensed under the Apache License, Version 2.0 (the "License"); you may */ /* not use this file except in compliance with the License. You may obtain */ /* a copy of the License at */ /* */ /* http://www.apache.org/licenses/LICENSE-2.0 */ /* */ /* Unless required by applicable law or agreed to in writing, software */ /* distributed under the License is distributed on an "AS IS" BASIS, */ /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ /* See the License for the specific language governing permissions and */ /* limitations under the License. */ /* -------------------------------------------------------------------------- */ #ifndef RAFT_MANAGER_H_ #define RAFT_MANAGER_H_ #include "ActionManager.h" #include "ReplicaManager.h" #include "ReplicaRequest.h" #include "Template.h" #include "ExecuteHook.h" extern "C" void * raft_manager_loop(void *arg); extern "C" void * reconciling_thread(void *arg); class LogDBRecord; /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ class RaftManager : public ActionListener { public: /** * State of this server */ enum State { SOLO = 0, CANDIDATE = 1, FOLLOWER = 2, LEADER = 3 }; /** * Raft manager constructor * @param server_id of this server * @param leader_hook_mad to be executed when follower->leader * @param follower_hook_mad to be executed when leader->follower * @param log_purge period to purge logDB records * @param bcast heartbeat broadcast timeout * @param election timeout * @param xmlrpc timeout for RAFT related xmlrpc API calls **/ RaftManager(int server_id, const VectorAttribute * leader_hook_mad, const VectorAttribute * follower_hook_mad, time_t log_purge, long long bcast, long long election, time_t xmlrpc, const string& remotes_location); ~RaftManager() { delete leader_hook; delete follower_hook; pthread_mutex_destroy(&mutex); }; // ------------------------------------------------------------------------- // Raft associated actions (synchronous) // ------------------------------------------------------------------------- /** * Follower successfully replicated a log entry: * - Increment next entry to send to follower * - Update match entry on follower * - Evaluate majority to apply changes to DB */ void replicate_success(int follower_id); /** * Follower failed to replicate a log entry because an inconsistency was * detected (same index, different term): * - Decrease follower next_index * - Retry (do not wait for replica events) */ void replicate_failure(int follower_id); /** * Triggers a REPLICATE event, it will notify the replica threads to * send the log to the followers */ void replicate_log(ReplicaRequest * rr); /** * Allocate a replica request fot the given index. * @param rindex of the record for the request */ void replicate_allocate(uint64_t rindex) { requests.allocate(rindex); } /** * Finalizes the Raft Consensus Manager */ void finalize() { am.finalize(); } /** * Starts the Raft Consensus Manager */ int start(); pthread_t get_thread_id() const { return raft_thread; }; // ------------------------------------------------------------------------- // Raft state query functions // ------------------------------------------------------------------------- /** * Return the Raft status in XML format * @return xml document with the raft state */ std::string& to_xml(std::string& state_xml); /** * Makes this server follower. Stop associated replication facilities */ void follower(unsigned int term); static std::string state_to_str(State _state) { string st; switch (_state) { case SOLO: st = "SOLO"; break; case CANDIDATE: st = "CANDIDATE"; break; case FOLLOWER: st = "FOLLOWER"; break; case LEADER: st = "LEADER"; break; } return st; } State get_state() { State _state; pthread_mutex_lock(&mutex); _state = state; pthread_mutex_unlock(&mutex); return _state; } unsigned int get_term() { unsigned int _term; pthread_mutex_lock(&mutex); _term = term; pthread_mutex_unlock(&mutex); return _term; } uint64_t get_commit() { uint64_t _commit; pthread_mutex_lock(&mutex); _commit = commit; pthread_mutex_unlock(&mutex); return _commit; } /** * Update the commit index = min(leader_commit, log index). * @param leader_commit index sent by leader in a replicate xml-rpc call * @param index of the last record inserted in the database * @return the updated commit index */ uint64_t update_commit(uint64_t leader_commit, uint64_t index); /** * Evaluates a vote request. It is granted if no vote has been granted in * this term or it is requested by the same candidate. * @param _votedfor the candidate id * @return -1 if vote is not granted */ int update_votedfor(int _votedfor); /** * Update the last_heartbeat time recieved from server. It stores the id * of the leader. * @param leader_id id of server, -1 if there is no leader set (e.g. * during a election because a vote request was received) */ void update_last_heartbeat(int leader_id); /** * @return true if the server is the leader of the zone, runs in solo mode * or is a follower */ bool is_leader() { return test_state(LEADER); } bool is_follower() { return test_state(FOLLOWER); } bool is_candidate() { return test_state(CANDIDATE); } bool is_solo() { return test_state(SOLO); } bool is_reconciling() { bool _reconciling; pthread_mutex_lock(&mutex); _reconciling = reconciling; pthread_mutex_unlock(&mutex); return _reconciling; } /** * Get next index to send to the follower * @param follower server id * @return UINT64_MAX on failure, the next index if success */ uint64_t get_next_index(int follower_id) { std::map::iterator it; uint64_t _index = UINT64_MAX; pthread_mutex_lock(&mutex); it = next.find(follower_id); if ( it != next.end() ) { _index = it->second; } pthread_mutex_unlock(&mutex); return _index; } /** * Gets the endpoint for xml-rpc calls of the current leader * @param endpoint * @return 0 on success, -1 if no leader found */ int get_leader_endpoint(std::string& endpoint); // ------------------------------------------------------------------------- // XML-RPC Raft API calls // ------------------------------------------------------------------------- /** * Calls the follower xml-rpc method * @param follower_id to make the call * @param lr the record to replicate * @param success of the xml-rpc method * @param ft term in the follower as returned by the replicate call * @param error describing error if any * @return -1 if a XMl-RPC (network) error occurs, 0 otherwise */ int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success, unsigned int& ft, std::string& error); /** * Calls the request vote xml-rpc method * @param follower_id to make the call * @param lindex highest last log index * @param lterm highest last log term * @param success of the xml-rpc method * @param ft term in the follower as returned by the replicate call * @param error describing error if any * @return -1 if a XMl-RPC (network) error occurs, 0 otherwise */ int xmlrpc_request_vote(int follower_id, uint64_t lindex, unsigned int lterm, bool& success, unsigned int& fterm, std::string& error); // ------------------------------------------------------------------------- // Server related interface // ------------------------------------------------------------------------- /** * Adds a new server to the follower list and starts associated replica * thread. * @param follower_id id of new server * @param xmlep xmlrpc endpoint for new server */ void add_server(int follower_id, const std::string& xmlep); /** * Deletes a new server to the follower list and stops associated replica * thread. * @param follower_id id of server */ void delete_server(int follower_id); /** * Reset index for a follower. * @param follower_id id of server */ void reset_index(int follower_id); private: friend void * raft_manager_loop(void *arg); friend void * reconciling_thread(void *arg); /** * Thread id of the main event loop */ pthread_t raft_thread; pthread_mutex_t mutex; /** * Event engine for the RaftManager */ ActionManager am; /** * Clients waiting for a log replication */ ReplicaRequestMap requests; // ------------------------------------------------------------------------- // Raft state // ------------------------------------------------------------------------- /** * Server state */ State state; /** * Server id */ int server_id; /** * Current term */ unsigned int term; /** * Number of servers in zone */ unsigned int num_servers; /** * Time when the last heartbeat was sent (LEADER) or received (FOLLOWER) */ struct timespec last_heartbeat; /** * ID of the last candidate we voted for ( -1 if none ) */ int votedfor; /** * ID of leader for the current term */ int leader_id; /** * This is the raft persistent state: votedfor and current term. It is * stored along the log in a special record (0, -1 , TEMPLATE, 0) */ Template raft_state; /** * Value for name column in system_attributes table for raft state. */ static const string raft_state_name; /** * After becoming a leader it is replicating and applying any pending * log entry. */ bool reconciling; //-------------------------------------------------------------------------- // Timers // - timer_period_ms. Base timer to wake up the manager (10ms) // - purge_period_ms. How often the LogDB is purged (600s) // - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log // - election_timeout. Timeout leader heartbeats (followers) // - broadcast_timeout. To send heartbeat to followers (leader) //-------------------------------------------------------------------------- static const time_t timer_period_ms; time_t purge_period_ms; time_t xmlrpc_timeout_ms; struct timespec election_timeout; struct timespec broadcast_timeout; //-------------------------------------------------------------------------- // Volatile log index variables // - commit, highest log known to be committed // - applied, highest log applied to DB (in LogDB) // //---------------------------- LEADER VARIABLES ---------------------------- // // - next, next log to send to each follower // - match, highest log replicated in this server // - servers, list of servers in zone and xml-rpc edp // ------------------------------------------------------------------------- RaftReplicaManager replica_manager; HeartBeatManager heartbeat_manager; uint64_t commit; std::map next; std::map match; std::map servers; // ------------------------------------------------------------------------- // Hooks // ------------------------------------------------------------------------- ExecuteHook * leader_hook; ExecuteHook * follower_hook; // ------------------------------------------------------------------------- // Action Listener interface // ------------------------------------------------------------------------- /** * Termination function */ void finalize_action(const ActionRequest& ar); /** * This function is executed periodically to purge the state log */ void timer_action(const ActionRequest& ar); /** * @param s the state to check * @return true if the server states matches the provided one */ bool test_state(State s) { bool _is_state; pthread_mutex_lock(&mutex); _is_state = state == s; pthread_mutex_unlock(&mutex); return _is_state; } // ------------------------------------------------------------------------- // Internal Raft functions // ------------------------------------------------------------------------- /** * Request votes of followers */ void request_vote(); /** * Makes this server leader, and start replica threads */ void leader(); /** * Init the raft state status row. */ int init_raft_state(const std::string& raft_xml); }; #endif /*RAFT_MANAGER_H_*/