mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-11 05:17:41 +03:00
461 lines
13 KiB
C++
461 lines
13 KiB
C++
/* -------------------------------------------------------------------------- */
|
|
/* Copyright 2002-2022, OpenNebula Project, OpenNebula Systems */
|
|
/* */
|
|
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
|
|
/* not use this file except in compliance with the License. You may obtain */
|
|
/* a copy of the License at */
|
|
/* */
|
|
/* http://www.apache.org/licenses/LICENSE-2.0 */
|
|
/* */
|
|
/* Unless required by applicable law or agreed to in writing, software */
|
|
/* distributed under the License is distributed on an "AS IS" BASIS, */
|
|
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
|
|
/* See the License for the specific language governing permissions and */
|
|
/* limitations under the License. */
|
|
/* -------------------------------------------------------------------------- */
|
|
|
|
#ifndef RAFT_MANAGER_H_
|
|
#define RAFT_MANAGER_H_
|
|
|
|
#include "Listener.h"
|
|
#include "ReplicaManager.h"
|
|
#include "ReplicaRequest.h"
|
|
#include "Template.h"
|
|
#include "ExecuteHook.h"
|
|
|
|
class LogDBRecord;
|
|
|
|
/* -------------------------------------------------------------------------- */
|
|
/* -------------------------------------------------------------------------- */
|
|
|
|
class RaftManager
|
|
{
|
|
public:
|
|
/**
|
|
* State of this server
|
|
*/
|
|
enum State {
|
|
SOLO = 0,
|
|
CANDIDATE = 1,
|
|
FOLLOWER = 2,
|
|
LEADER = 3
|
|
};
|
|
|
|
/**
|
|
* Raft manager constructor
|
|
* @param server_id of this server
|
|
* @param leader_hook_mad to be executed when follower->leader
|
|
* @param follower_hook_mad to be executed when leader->follower
|
|
* @param log_purge period to purge logDB records
|
|
* @param bcast heartbeat broadcast timeout
|
|
* @param election timeout
|
|
* @param xmlrpc timeout for RAFT related xmlrpc API calls
|
|
**/
|
|
RaftManager(int server_id, const VectorAttribute * leader_hook_mad,
|
|
const VectorAttribute * follower_hook_mad, time_t log_purge,
|
|
long long bcast, long long election, time_t xmlrpc,
|
|
const std::string& remotes_location);
|
|
|
|
~RaftManager() = default;
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Raft associated actions (synchronous)
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* Follower successfully replicated a log entry:
|
|
* - Increment next entry to send to follower
|
|
* - Update match entry on follower
|
|
* - Evaluate majority to apply changes to DB
|
|
*/
|
|
void replicate_success(int follower_id);
|
|
|
|
/**
|
|
* Follower failed to replicate a log entry because an inconsistency was
|
|
* detected (same index, different term):
|
|
* - Decrease follower next_index
|
|
* - Retry (do not wait for replica events)
|
|
*/
|
|
void replicate_failure(int follower_id);
|
|
|
|
/**
|
|
* Triggers a REPLICATE event, it will notify the replica threads to
|
|
* send the log to the followers
|
|
*/
|
|
void replicate_log(ReplicaRequest * rr);
|
|
|
|
/**
|
|
* Allocate a replica request fot the given index.
|
|
* @param rindex of the record for the request
|
|
*/
|
|
void replicate_allocate(uint64_t rindex)
|
|
{
|
|
requests.allocate(rindex);
|
|
}
|
|
|
|
/**
|
|
* Termination function
|
|
*/
|
|
void finalize();
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Raft state query functions
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* Return the Raft status in XML format
|
|
* @return xml document with the raft state
|
|
*/
|
|
std::string& to_xml(std::string& state_xml);
|
|
|
|
/**
|
|
* Makes this server follower. Stop associated replication facilities
|
|
*/
|
|
void follower(unsigned int term);
|
|
|
|
static std::string state_to_str(State _state)
|
|
{
|
|
std::string st;
|
|
|
|
switch (_state)
|
|
{
|
|
case SOLO:
|
|
st = "SOLO";
|
|
break;
|
|
case CANDIDATE:
|
|
st = "CANDIDATE";
|
|
break;
|
|
case FOLLOWER:
|
|
st = "FOLLOWER";
|
|
break;
|
|
case LEADER:
|
|
st = "LEADER";
|
|
break;
|
|
}
|
|
return st;
|
|
}
|
|
|
|
State get_state()
|
|
{
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
return state;
|
|
}
|
|
|
|
unsigned int get_term()
|
|
{
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
return term;
|
|
}
|
|
|
|
uint64_t get_commit()
|
|
{
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
return commit;
|
|
}
|
|
|
|
/**
|
|
* Update the commit index = min(leader_commit, log index).
|
|
* @param leader_commit index sent by leader in a replicate xml-rpc call
|
|
* @param index of the last record inserted in the database
|
|
* @return the updated commit index
|
|
*/
|
|
uint64_t update_commit(uint64_t leader_commit, uint64_t index);
|
|
|
|
/**
|
|
* Evaluates a vote request. It is granted if no vote has been granted in
|
|
* this term or it is requested by the same candidate.
|
|
* @param _votedfor the candidate id
|
|
* @return -1 if vote is not granted
|
|
*/
|
|
int update_votedfor(int _votedfor);
|
|
|
|
/**
|
|
* Update the last_heartbeat time recieved from server. It stores the id
|
|
* of the leader.
|
|
* @param leader_id id of server, -1 if there is no leader set (e.g.
|
|
* during a election because a vote request was received)
|
|
*/
|
|
void update_last_heartbeat(int leader_id);
|
|
|
|
/**
|
|
* @return true if the server is the leader of the zone, runs in solo mode
|
|
* or is a follower
|
|
*/
|
|
bool is_leader()
|
|
{
|
|
return test_state(LEADER);
|
|
}
|
|
|
|
bool is_follower()
|
|
{
|
|
return test_state(FOLLOWER);
|
|
}
|
|
|
|
bool is_candidate()
|
|
{
|
|
return test_state(CANDIDATE);
|
|
}
|
|
|
|
bool is_solo()
|
|
{
|
|
return test_state(SOLO);
|
|
}
|
|
|
|
bool is_reconciling()
|
|
{
|
|
bool _reconciling;
|
|
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
_reconciling = reconciling;
|
|
|
|
return _reconciling;
|
|
}
|
|
|
|
/**
|
|
* Get next index to send to the follower
|
|
* @param follower server id
|
|
* @return UINT64_MAX on failure, the next index if success
|
|
*/
|
|
uint64_t get_next_index(int follower_id)
|
|
{
|
|
uint64_t _index = UINT64_MAX;
|
|
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
auto it = next.find(follower_id);
|
|
|
|
if ( it != next.end() )
|
|
{
|
|
_index = it->second;
|
|
}
|
|
|
|
return _index;
|
|
}
|
|
|
|
/**
|
|
* Gets the endpoint for xml-rpc calls of the current leader
|
|
* @param endpoint
|
|
* @return 0 on success, -1 if no leader found
|
|
*/
|
|
int get_leader_endpoint(std::string& endpoint);
|
|
|
|
// -------------------------------------------------------------------------
|
|
// XML-RPC Raft API calls
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* Calls the follower xml-rpc method
|
|
* @param follower_id to make the call
|
|
* @param lr the record to replicate
|
|
* @param success of the xml-rpc method
|
|
* @param ft term in the follower as returned by the replicate call
|
|
* @param error describing error if any
|
|
* @return -1 if a XMl-RPC (network) error occurs, 0 otherwise
|
|
*/
|
|
int xmlrpc_replicate_log(int follower_id, LogDBRecord * lr, bool& success,
|
|
unsigned int& ft, std::string& error);
|
|
|
|
/**
|
|
* Calls the request vote xml-rpc method
|
|
* @param follower_id to make the call
|
|
* @param lindex highest last log index
|
|
* @param lterm highest last log term
|
|
* @param success of the xml-rpc method
|
|
* @param ft term in the follower as returned by the replicate call
|
|
* @param error describing error if any
|
|
* @return -1 if a XMl-RPC (network) error occurs, 0 otherwise
|
|
*/
|
|
int xmlrpc_request_vote(int follower_id, uint64_t lindex,
|
|
unsigned int lterm, bool& success, unsigned int& fterm,
|
|
std::string& error);
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Server related interface
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* Adds a new server to the follower list and starts associated replica
|
|
* thread.
|
|
* @param follower_id id of new server
|
|
* @param xmlep xmlrpc endpoint for new server
|
|
*/
|
|
void add_server(int follower_id, const std::string& xmlep);
|
|
|
|
/**
|
|
* Deletes a new server to the follower list and stops associated replica
|
|
* thread.
|
|
* @param follower_id id of server
|
|
*/
|
|
void delete_server(int follower_id);
|
|
|
|
/**
|
|
* Reset index for a follower.
|
|
* @param follower_id id of server
|
|
*/
|
|
void reset_index(int follower_id);
|
|
|
|
private:
|
|
std::mutex raft_mutex;
|
|
|
|
/**
|
|
* Clients waiting for a log replication
|
|
*/
|
|
ReplicaRequestMap requests;
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Raft state
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* Server state
|
|
*/
|
|
State state;
|
|
|
|
/**
|
|
* Server id
|
|
*/
|
|
int server_id;
|
|
|
|
/**
|
|
* Current term
|
|
*/
|
|
unsigned int term;
|
|
|
|
/**
|
|
* Number of servers in zone
|
|
*/
|
|
unsigned int num_servers;
|
|
|
|
/**
|
|
* Time when the last heartbeat was sent (LEADER) or received (FOLLOWER)
|
|
*/
|
|
struct timespec last_heartbeat;
|
|
|
|
/**
|
|
* ID of the last candidate we voted for ( -1 if none )
|
|
*/
|
|
int votedfor;
|
|
|
|
/**
|
|
* ID of leader for the current term
|
|
*/
|
|
int leader_id;
|
|
|
|
/**
|
|
* This is the raft persistent state: votedfor and current term. It is
|
|
* stored along the log in a special record (0, -1 , TEMPLATE, 0)
|
|
*/
|
|
Template raft_state;
|
|
|
|
/**
|
|
* Value for name column in system_attributes table for raft state.
|
|
*/
|
|
static const std::string raft_state_name;
|
|
|
|
/**
|
|
* After becoming a leader it is replicating and applying any pending
|
|
* log entry.
|
|
*/
|
|
bool reconciling;
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Timers
|
|
// - timer_period_ms. Base timer to wake up the manager (10ms)
|
|
// - purge_period_ms. How often the LogDB is purged (600s)
|
|
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
|
|
// - election_timeout. Timeout leader heartbeats (followers)
|
|
// - broadcast_timeout. To send heartbeat to followers (leader)
|
|
//--------------------------------------------------------------------------
|
|
static const time_t timer_period_ms;
|
|
|
|
time_t purge_period_ms;
|
|
|
|
time_t xmlrpc_timeout_ms;
|
|
|
|
struct timespec election_timeout;
|
|
|
|
struct timespec broadcast_timeout;
|
|
|
|
/**
|
|
* Timer action async execution
|
|
*/
|
|
Timer timer_thread;
|
|
|
|
Timer purge_thread;
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Volatile log index variables
|
|
// - commit, highest log known to be committed
|
|
// - applied, highest log applied to DB (in LogDB)
|
|
//
|
|
//---------------------------- LEADER VARIABLES ----------------------------
|
|
//
|
|
// - next, next log to send to each follower <follower, next>
|
|
// - match, highest log replicated in this server <follower, match>
|
|
// - servers, list of servers in zone and xml-rpc edp <follower, edp>
|
|
// -------------------------------------------------------------------------
|
|
RaftReplicaManager replica_manager;
|
|
|
|
HeartBeatManager heartbeat_manager;
|
|
|
|
uint64_t commit;
|
|
|
|
std::map<int, uint64_t> next;
|
|
|
|
std::map<int, uint64_t> match;
|
|
|
|
std::map<int, std::string> servers;
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Hooks
|
|
// -------------------------------------------------------------------------
|
|
|
|
std::unique_ptr<ExecuteHook> leader_hook;
|
|
std::unique_ptr<ExecuteHook> follower_hook;
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Internal Raft functions
|
|
// -------------------------------------------------------------------------
|
|
/**
|
|
* This function is executed periodically to vote leader
|
|
*/
|
|
void timer_action();
|
|
|
|
/**
|
|
* This function is executed periodically to purge the state log
|
|
*/
|
|
void purge_action();
|
|
|
|
/**
|
|
* @param s the state to check
|
|
* @return true if the server states matches the provided one
|
|
*/
|
|
bool test_state(State s)
|
|
{
|
|
bool _is_state;
|
|
|
|
std::lock_guard<std::mutex> lock(raft_mutex);
|
|
|
|
_is_state = state == s;
|
|
|
|
return _is_state;
|
|
}
|
|
|
|
/**
|
|
* Request votes of followers
|
|
*/
|
|
void request_vote();
|
|
|
|
/**
|
|
* Makes this server leader, and start replica threads
|
|
*/
|
|
void leader();
|
|
|
|
/**
|
|
* Init the raft state status row.
|
|
*/
|
|
int init_raft_state(const std::string& raft_xml);
|
|
};
|
|
|
|
#endif /*RAFT_MANAGER_H_*/
|
|
|