/* -------------------------------------------------------------------------- */ /* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */ /* */ /* Licensed under the Apache License, Version 2.0 (the "License"); you may */ /* not use this file except in compliance with the License. You may obtain */ /* a copy of the License at */ /* */ /* http://www.apache.org/licenses/LICENSE-2.0 */ /* */ /* Unless required by applicable law or agreed to in writing, software */ /* distributed under the License is distributed on an "AS IS" BASIS, */ /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ /* See the License for the specific language governing permissions and */ /* limitations under the License. */ /* -------------------------------------------------------------------------- */ #ifndef RAFT_MANAGER_H_ #define RAFT_MANAGER_H_ #include "ActionManager.h" #include "ReplicaManager.h" #include "ReplicaRequest.h" extern "C" void * raft_manager_loop(void *arg); /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ class RaftManager : public ActionListener { public: /** * State of this server */ enum State { SOLO = 0, CANDIDATE = 1, FOLLOWER = 2, LEADER = 3 }; /** * Raft manager constructor * @param server_id of this server * @param log_purge period to purge logDB records * @param bcast heartbeat broadcast timeout * @param election timeout * @param xmlrpc timeout for RAFT related xmlrpc API calls **/ RaftManager(int server_id, time_t log_purge, long long bcast, long long election, time_t xmlrpc); virtual ~RaftManager(){}; // ------------------------------------------------------------------------- // Raft associated actions (synchronous) // ------------------------------------------------------------------------- /** * Follower successfully replicated a log entry: * - Increment next entry to send to follower * - Update match entry on follower * - Evaluate majority to apply changes to DB */ void replicate_success(unsigned int follower_id); /** * Follower failed to replicate a log entry because an inconsistency was * detected (same index, different term): * - Decrease follower next_index * - Retry (do not wait for replica events) */ void replicate_failure(unsigned int follower_id); /** * Triggers a REPLICATE event, it will notify the replica threads to * send the log to the followers */ void replicate_log(ReplicaRequest * rr); /** * Finalizes the Raft Consensus Manager */ void finalize() { am.finalize(); } /** * Starts the Raft Consensus Manager */ int start(); pthread_t get_thread_id() const { return raft_thread; }; // ------------------------------------------------------------------------- // Raft state query functions // ------------------------------------------------------------------------- /** * Return the Raft status in XML format * @return xml document with the raft state */ std::string& to_xml(std::string& state_xml); /** * Makes this server follower. Stop associated replication facilities */ void follower(unsigned int term); unsigned int get_term() { unsigned int _term; pthread_mutex_lock(&mutex); _term = term; pthread_mutex_unlock(&mutex); return _term; } unsigned int get_commit() { unsigned int _commit; pthread_mutex_lock(&mutex); _commit = commit; pthread_mutex_unlock(&mutex); return _commit; } /** * @param leader_commit index sent by leader in a replicate xml-rpc call * @param index of the last record inserted in the database * @return the updated commit index */ unsigned int update_commit(unsigned int leader_commit, unsigned int index) { unsigned int _commit; pthread_mutex_lock(&mutex); if ( leader_commit > commit ) { if ( index < leader_commit ) { commit = index; } else { commit = leader_commit; } } _commit = commit; pthread_mutex_unlock(&mutex); return _commit; } /** * Evaluates a vote request. It is granted if no vote has been granted in * this term or it is requested by the same candidate. * @param _votedfor the candidate id * @return -1 if vote is not granted */ int update_votedfor(int _votedfor); /** * Update the last_heartbeat time recieved from server */ void update_last_heartbeat(); /** * @return true if the server is the leader of the zone, runs in solo mode * or is a follower */ bool is_leader() { return test_state(LEADER); } bool is_follower() { return test_state(FOLLOWER); } bool is_candidate() { return test_state(CANDIDATE); } bool is_solo() { return test_state(SOLO); } /** * Get next index to send to the follower * @param follower server id * @return -1 on failure, the next index if success */ int get_next_index(unsigned int follower_id) { std::map::iterator it; unsigned int _index = -1; pthread_mutex_lock(&mutex); it = next.find(follower_id); if ( it != next.end() ) { _index = it->second; } pthread_mutex_unlock(&mutex); return _index; } // ------------------------------------------------------------------------- // XML-RPC Raft API calls // ------------------------------------------------------------------------- /** * Calls the follower xml-rpc method * @param follower_id to make the call * @param lr the record to replicate * @param success of the xml-rpc method * @param ft term in the follower as returned by the replicate call * @param error describing error if any * @return -1 if a XMl-RPC (network) error occurs, 0 otherwise */ int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success, unsigned int& ft, std::string& error); /** * Calls the request vote xml-rpc method * @param follower_id to make the call * @param lindex highest last log index * @param lterm highest last log term * @param success of the xml-rpc method * @param ft term in the follower as returned by the replicate call * @param error describing error if any * @return -1 if a XMl-RPC (network) error occurs, 0 otherwise */ int xmlrpc_request_vote(int follower_id, unsigned int lindex, unsigned int lterm, bool& success, unsigned int& fterm, std::string& error); // ------------------------------------------------------------------------- // Server related interface // ------------------------------------------------------------------------- /** * Adds a new server to the follower list and starts associated replica * thread. * @param follower_id id of new server */ void add_server(unsigned int follower_id); /** * Deletes a new server to the follower list and stops associated replica * thread. * @param follower_id id of server */ void delete_server(unsigned int follower_id); private: friend void * raft_manager_loop(void *arg); /** * Thread id of the main event loop */ pthread_t raft_thread; pthread_mutex_t mutex; /** * Event engine for the RaftManager */ ActionManager am; /** * Clients waiting for a log replication */ std::map requests; // ------------------------------------------------------------------------- // Raft state // ------------------------------------------------------------------------- /** * Server state */ State state; /** * Server id */ int server_id; /** * Current term */ unsigned int term; /** * Number of servers in zone */ unsigned int num_servers; /** * Time when the last heartbeat was sent (LEADER) or received (FOLLOWER) */ struct timespec last_heartbeat; /** * ID of the last candidate we voted for ( -1 if none ) */ int votedfor; /** * This is the raft persistent state: votedfor and current term. It is * stored along the log in a special record (0, -1 , TEMPLATE, 0) */ Template raft_state; //-------------------------------------------------------------------------- // Timers // - timer_period_ms. Base timer to wake up the manager (10ms) // - purge_period_ms. How often the LogDB is purged (600s) // - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log // - election_timeout. Timeout leader heartbeats (followers) // - broadcast_timeout. To send heartbeat to followers (leader) //-------------------------------------------------------------------------- static const time_t timer_period_ms; time_t purge_period_ms; time_t xmlrpc_timeout_ms; struct timespec election_timeout; struct timespec broadcast_timeout; //-------------------------------------------------------------------------- // Volatile log index variables // - commit, highest log known to be committed // - applied, highest log applied to DB (in LogDB) // //---------------------------- LEADER VARIABLES ---------------------------- // // - next, next log to send to each follower // - match, highest log replicated in this server // - servers, list of servers in zone and xml-rpc edp // ------------------------------------------------------------------------- ReplicaManager replica_manager; unsigned int commit; std::map next; std::map match; std::map servers; // ------------------------------------------------------------------------- // Action Listener interface // ------------------------------------------------------------------------- /** * Termination function */ void finalize_action(const ActionRequest& ar); /** * This function is executed periodically to purge the state log */ void timer_action(const ActionRequest& ar); /** * @param s the state to check * @return true if the server states matches the provided one */ bool test_state(State s) { bool _is_state; pthread_mutex_lock(&mutex); _is_state = state == s; pthread_mutex_unlock(&mutex); return _is_state; } // ------------------------------------------------------------------------- // Internal Raft functions // ------------------------------------------------------------------------- /** * Send the heartbeat to the followers. */ void send_heartbeat(); /** * Request votes of followers */ void request_vote(); /** * Makes this server leader, and start replica threads */ void leader(); }; #endif /*RAFT_MANAGER_H_*/