2017-04-25 18:15:31 +03:00
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
# ifndef RAFT_MANAGER_H_
# define RAFT_MANAGER_H_
# include "ActionManager.h"
2017-04-27 02:03:44 +03:00
# include "ReplicaManager.h"
# include "ReplicaRequest.h"
2017-05-15 00:48:46 +03:00
# include "Template.h"
2017-04-25 18:15:31 +03:00
extern " C " void * raft_manager_loop ( void * arg ) ;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class RaftManager : public ActionListener
{
public :
2017-04-27 02:03:44 +03:00
/**
* State of this server
*/
enum State {
SOLO = 0 ,
CANDIDATE = 1 ,
FOLLOWER = 2 ,
LEADER = 3
} ;
2017-05-02 18:57:55 +03:00
/**
* Raft manager constructor
* @ param server_id of this server
* @ param log_purge period to purge logDB records
* @ param bcast heartbeat broadcast timeout
* @ param election timeout
* @ param xmlrpc timeout for RAFT related xmlrpc API calls
* */
RaftManager ( int server_id , time_t log_purge , long long bcast ,
long long election , time_t xmlrpc ) ;
2017-04-25 18:15:31 +03:00
virtual ~ RaftManager ( ) { } ;
2017-04-27 02:03:44 +03:00
// -------------------------------------------------------------------------
2017-04-28 23:23:32 +03:00
// Raft associated actions (synchronous)
2017-04-27 02:03:44 +03:00
// -------------------------------------------------------------------------
2017-04-25 18:15:31 +03:00
/**
2017-04-28 23:23:32 +03:00
* Follower successfully replicated a log entry :
* - Increment next entry to send to follower
* - Update match entry on follower
* - Evaluate majority to apply changes to DB
2017-04-25 18:15:31 +03:00
*/
2017-04-28 23:23:32 +03:00
void replicate_success ( unsigned int follower_id ) ;
2017-04-25 18:15:31 +03:00
2017-04-27 02:03:44 +03:00
/**
2017-04-28 23:23:32 +03:00
* Follower failed to replicate a log entry because an inconsistency was
* detected ( same index , different term ) :
* - Decrease follower next_index
* - Retry ( do not wait for replica events )
2017-04-27 02:03:44 +03:00
*/
2017-04-28 23:23:32 +03:00
void replicate_failure ( unsigned int follower_id ) ;
2017-04-27 02:03:44 +03:00
/**
* Triggers a REPLICATE event , it will notify the replica threads to
* send the log to the followers
*/
2017-04-28 23:23:32 +03:00
void replicate_log ( ReplicaRequest * rr ) ;
2017-04-27 02:03:44 +03:00
/**
* Finalizes the Raft Consensus Manager
*/
2017-04-25 18:15:31 +03:00
void finalize ( )
{
am . finalize ( ) ;
}
2017-04-27 02:03:44 +03:00
/**
* Starts the Raft Consensus Manager
*/
2017-04-25 18:15:31 +03:00
int start ( ) ;
pthread_t get_thread_id ( ) const
{
return raft_thread ;
} ;
2017-04-27 02:03:44 +03:00
// -------------------------------------------------------------------------
// Raft state query functions
// -------------------------------------------------------------------------
2017-05-03 21:04:42 +03:00
/**
2017-05-06 01:17:27 +03:00
* Return the Raft status in XML format
* @ return xml document with the raft state
*/
std : : string & to_xml ( std : : string & state_xml ) ;
/**
2017-05-03 21:04:42 +03:00
* Makes this server follower . Stop associated replication facilities
*/
void follower ( unsigned int term ) ;
2017-04-27 02:03:44 +03:00
unsigned int get_term ( )
{
unsigned int _term ;
pthread_mutex_lock ( & mutex ) ;
_term = term ;
pthread_mutex_unlock ( & mutex ) ;
return _term ;
}
unsigned int get_commit ( )
{
unsigned int _commit ;
pthread_mutex_lock ( & mutex ) ;
_commit = commit ;
pthread_mutex_unlock ( & mutex ) ;
return _commit ;
}
2017-04-27 13:42:09 +03:00
/**
2017-05-07 00:36:08 +03:00
* Update the commit index = min ( leader_commit , log index ) .
2017-04-27 13:42:09 +03:00
* @ param leader_commit index sent by leader in a replicate xml - rpc call
* @ param index of the last record inserted in the database
* @ return the updated commit index
*/
2017-05-07 00:36:08 +03:00
unsigned int update_commit ( unsigned int leader_commit , unsigned int index ) ;
2017-04-27 02:03:44 +03:00
2017-05-03 00:43:18 +03:00
/**
* Evaluates a vote request . It is granted if no vote has been granted in
* this term or it is requested by the same candidate .
* @ param _votedfor the candidate id
* @ return - 1 if vote is not granted
*/
int update_votedfor ( int _votedfor ) ;
2017-04-30 20:56:47 +03:00
/**
2017-05-08 11:12:49 +03:00
* Update the last_heartbeat time recieved from server . It stores the id
* of the leader .
* @ param leader_id id of server , - 1 if there is no leader set ( e . g .
* during a election because a vote request was received )
2017-04-30 20:56:47 +03:00
*/
2017-05-08 11:12:49 +03:00
void update_last_heartbeat ( int leader_id ) ;
2017-04-30 20:56:47 +03:00
2017-04-27 02:03:44 +03:00
/**
2017-04-27 13:42:09 +03:00
* @ return true if the server is the leader of the zone , runs in solo mode
* or is a follower
2017-04-27 02:03:44 +03:00
*/
2017-04-27 13:42:09 +03:00
bool is_leader ( )
2017-04-27 02:03:44 +03:00
{
2017-04-27 13:42:09 +03:00
return test_state ( LEADER ) ;
}
2017-04-27 02:03:44 +03:00
2017-04-27 13:42:09 +03:00
bool is_follower ( )
{
return test_state ( FOLLOWER ) ;
2017-04-27 02:03:44 +03:00
}
2017-05-05 16:46:57 +03:00
bool is_candidate ( )
{
return test_state ( CANDIDATE ) ;
}
2017-04-28 23:46:37 +03:00
bool is_solo ( )
{
return test_state ( SOLO ) ;
}
2017-04-27 02:03:44 +03:00
/**
* Get next index to send to the follower
* @ param follower server id
* @ return - 1 on failure , the next index if success
*/
int get_next_index ( unsigned int follower_id )
{
std : : map < unsigned int , unsigned int > : : iterator it ;
unsigned int _index = - 1 ;
pthread_mutex_lock ( & mutex ) ;
it = next . find ( follower_id ) ;
if ( it ! = next . end ( ) )
{
_index = it - > second ;
}
pthread_mutex_unlock ( & mutex ) ;
return _index ;
}
2017-05-08 20:48:41 +03:00
/**
* Gets the endpoint for xml - rpc calls of the current leader
* @ param endpoint
* @ return 0 on success , - 1 if no leader found
*/
int get_leader_endpoint ( std : : string & endpoint ) ;
2017-04-30 20:56:47 +03:00
// -------------------------------------------------------------------------
// XML-RPC Raft API calls
// -------------------------------------------------------------------------
/**
* Calls the follower xml - rpc method
* @ param follower_id to make the call
* @ param lr the record to replicate
* @ param success of the xml - rpc method
* @ param ft term in the follower as returned by the replicate call
* @ param error describing error if any
* @ return - 1 if a XMl - RPC ( network ) error occurs , 0 otherwise
*/
int xmlrpc_replicate_log ( int follower_id , LogDBRecord * lr , bool & success ,
unsigned int & ft , std : : string & error ) ;
2017-05-03 21:04:42 +03:00
/**
* Calls the request vote xml - rpc method
* @ param follower_id to make the call
* @ param lindex highest last log index
* @ param lterm highest last log term
* @ param success of the xml - rpc method
* @ param ft term in the follower as returned by the replicate call
* @ param error describing error if any
* @ return - 1 if a XMl - RPC ( network ) error occurs , 0 otherwise
*/
int xmlrpc_request_vote ( int follower_id , unsigned int lindex ,
unsigned int lterm , bool & success , unsigned int & fterm ,
std : : string & error ) ;
2017-04-30 20:56:47 +03:00
// -------------------------------------------------------------------------
// Server related interface
// -------------------------------------------------------------------------
2017-05-02 18:57:55 +03:00
/**
* Adds a new server to the follower list and starts associated replica
* thread .
* @ param follower_id id of new server
*/
2017-05-02 02:38:30 +03:00
void add_server ( unsigned int follower_id ) ;
2017-05-02 18:57:55 +03:00
/**
* Deletes a new server to the follower list and stops associated replica
* thread .
* @ param follower_id id of server
*/
2017-05-02 02:38:30 +03:00
void delete_server ( unsigned int follower_id ) ;
2017-04-30 20:56:47 +03:00
2017-04-25 18:15:31 +03:00
private :
friend void * raft_manager_loop ( void * arg ) ;
2017-04-27 02:03:44 +03:00
/**
* Thread id of the main event loop
*/
2017-04-25 18:15:31 +03:00
pthread_t raft_thread ;
2017-04-27 02:03:44 +03:00
pthread_mutex_t mutex ;
2017-04-25 18:15:31 +03:00
/**
* Event engine for the RaftManager
*/
ActionManager am ;
2017-04-27 02:03:44 +03:00
/**
* Clients waiting for a log replication
*/
std : : map < unsigned int , ReplicaRequest * > requests ;
// -------------------------------------------------------------------------
// Raft state
// -------------------------------------------------------------------------
/**
* Server state
*/
State state ;
2017-04-30 20:56:47 +03:00
/**
* Server id
*/
int server_id ;
2017-04-27 02:03:44 +03:00
/**
* Current term
*/
unsigned int term ;
2017-04-28 20:35:57 +03:00
/**
* Number of servers in zone
*/
unsigned int num_servers ;
2017-04-30 20:56:47 +03:00
/**
* Time when the last heartbeat was sent ( LEADER ) or received ( FOLLOWER )
*/
struct timespec last_heartbeat ;
2017-04-30 00:25:53 +03:00
2017-05-03 00:43:18 +03:00
/**
* ID of the last candidate we voted for ( - 1 if none )
*/
int votedfor ;
2017-05-08 11:12:49 +03:00
/**
* ID of leader for the current term
*/
int leader_id ;
2017-05-03 00:43:18 +03:00
/**
* This is the raft persistent state : votedfor and current term . It is
* stored along the log in a special record ( 0 , - 1 , TEMPLATE , 0 )
*/
Template raft_state ;
2017-05-02 02:38:30 +03:00
//--------------------------------------------------------------------------
// Timers
// - timer_period_ms. Base timer to wake up the manager (10ms)
// - purge_period_ms. How often the LogDB is purged (600s)
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
// - election_timeout. Timeout leader heartbeats (followers)
// - broadcast_timeout. To send heartbeat to followers (leader)
//--------------------------------------------------------------------------
2017-04-30 20:56:47 +03:00
static const time_t timer_period_ms ;
2017-05-02 02:38:30 +03:00
2017-05-02 18:57:55 +03:00
time_t purge_period_ms ;
2017-04-30 20:56:47 +03:00
2017-05-02 18:57:55 +03:00
time_t xmlrpc_timeout_ms ;
2017-04-30 20:56:47 +03:00
struct timespec election_timeout ;
struct timespec broadcast_timeout ;
2017-04-30 00:25:53 +03:00
2017-04-27 02:03:44 +03:00
//--------------------------------------------------------------------------
// Volatile log index variables
// - commit, highest log known to be committed
2017-04-27 12:12:30 +03:00
// - applied, highest log applied to DB (in LogDB)
2017-04-27 02:03:44 +03:00
//
//---------------------------- LEADER VARIABLES ----------------------------
//
// - next, next log to send to each follower <follower, next>
// - match, highest log replicated in this server <follower, match>
2017-05-02 02:38:30 +03:00
// - servers, list of servers in zone and xml-rpc edp <follower, edp>
2017-04-27 02:03:44 +03:00
// -------------------------------------------------------------------------
2017-05-15 00:48:46 +03:00
RaftReplicaManager replica_manager ;
2017-04-27 02:03:44 +03:00
unsigned int commit ;
std : : map < unsigned int , unsigned int > next ;
std : : map < unsigned int , unsigned int > match ;
2017-04-30 20:56:47 +03:00
std : : map < unsigned int , std : : string > servers ;
2017-04-25 18:15:31 +03:00
// -------------------------------------------------------------------------
// Action Listener interface
// -------------------------------------------------------------------------
/**
* Termination function
*/
void finalize_action ( const ActionRequest & ar ) ;
2017-04-30 00:25:53 +03:00
/**
* This function is executed periodically to purge the state log
*/
void timer_action ( const ActionRequest & ar ) ;
2017-04-27 13:42:09 +03:00
/**
* @ param s the state to check
* @ return true if the server states matches the provided one
*/
bool test_state ( State s )
{
bool _is_state ;
pthread_mutex_lock ( & mutex ) ;
_is_state = state = = s ;
pthread_mutex_unlock ( & mutex ) ;
return _is_state ;
}
2017-04-30 20:56:47 +03:00
2017-05-02 02:38:30 +03:00
// -------------------------------------------------------------------------
2017-05-02 18:57:55 +03:00
// Internal Raft functions
2017-05-02 02:38:30 +03:00
// -------------------------------------------------------------------------
2017-04-30 20:56:47 +03:00
/**
2017-05-02 18:57:55 +03:00
* Send the heartbeat to the followers .
2017-04-30 20:56:47 +03:00
*/
void send_heartbeat ( ) ;
2017-05-03 21:04:42 +03:00
/**
* Request votes of followers
*/
void request_vote ( ) ;
/**
* Makes this server leader , and start replica threads
*/
void leader ( ) ;
2017-04-25 18:15:31 +03:00
} ;
# endif /*RAFT_MANAGER_H_*/