1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-03 01:17:41 +03:00

F #4809: First version of replication thread for federated zones

This commit is contained in:
Ruben S. Montero 2017-05-19 20:08:45 +02:00
parent c5a82aba4e
commit c5f54f8117
8 changed files with 318 additions and 34 deletions

View File

@ -61,6 +61,31 @@ public:
*/
int apply_log_record(int index, const std::string& sql);
/**
* Record was successfully replicated on zone, increase next index and
* send any pending records.
* @param zone_id
*/
void replicate_success(int zone_id);
/**
* Record could not be replicated on zone, decrease next index and
* send any pending records.
* @param zone_id
*/
void replicate_failure(int zone_id, int zone_last);
/**
* XML-RPC API call to replicate a log entry on slaves
* @param zone_id
* @param success status of API call
* @param last index replicate in zone slave
* @param error description if any
* @return 0 on success -1 if a xml-rpc/network error occurred
*/
int xmlrpc_replicate_log(int zone_id, bool& success, int& last,
std::string& err);
/**
* Finalizes the Federation Replica Manager
*/
@ -93,6 +118,9 @@ public:
*/
static int bootstrap(SqlDB *_db);
/**
* @return the id of fed. replica thread
*/
pthread_t get_thread_id() const
{
return frm_thread;
@ -132,7 +160,7 @@ private:
// Synchronization variables
// - last_index in the replication log
// - zones list of zones in the federation with:
// - list of servers
// - list of servers <id, xmlrpc endpoint>
// - next index to send to this zone
// -------------------------------------------------------------------------
struct ZoneServers
@ -190,7 +218,6 @@ private:
*/
int get_log_record(int index, std::string& sql);
/**
* Inserts a new record in the log ans updates the last_index variable
* (memory and db)
@ -206,6 +233,16 @@ private:
* @return 0 on success
*/
int get_last_index(unsigned int& index);
/**
* Get the nest record to replicate in a zone
* @param zone_id of the zone
* @param index of the next record to send
* @param sql command to replicate
* @return 0 on success, -1 otherwise
*/
int get_next_record(int zone_id, int& index, std::string& sql,
std::map<int, std::string>& zservers);
};
#endif /*FED_REPLICA_MANAGER_H_*/

View File

@ -130,10 +130,16 @@ private:
RaftManager * raftm;
};
// -----------------------------------------------------------------------------
// Federation replica thread. It replicates SQL commands on zone slaves for
// federated pools
// -----------------------------------------------------------------------------
class FedReplicaManager;
class FedReplicaThread : public ReplicaThread
{
public:
FedReplicaThread(int zone_id):ReplicaThread(zone_id){};
FedReplicaThread(int zone_id);
virtual ~FedReplicaThread(){};
@ -141,11 +147,12 @@ private:
/**
* Specific logic for the replicate process
*/
int replicate()
{
//TODO
return 0;
};
int replicate();
/**
* Pointers to other components
*/
FedReplicaManager * frm;
};
#endif

View File

@ -137,4 +137,23 @@ public:
RequestAttributes& att);
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class ZoneReplicateFedLog : public RequestManagerZone
{
public:
ZoneReplicateFedLog():
RequestManagerZone("one.zone.fedreplicate", "Replicate a fed log record",
"A:sis")
{
log_method_call = false;
};
~ZoneReplicateFedLog(){};
void request_execute(xmlrpc_c::paramList const& _paramList,
RequestAttributes& att);
};
#endif

View File

@ -17,6 +17,7 @@
#include "FedReplicaManager.h"
#include "ReplicaThread.h"
#include "Nebula.h"
#include "Client.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -76,6 +77,8 @@ FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d,
get_last_index(last_index);
// Replica threads are started on master in solo mode.
// HA start/stop the replica threads on leader/follower states will
if ( nd.is_federation_master() && s )
{
start_replica_threads(zone_ids);
@ -155,11 +158,11 @@ int FedReplicaManager::apply_log_record(int index, const std::string& sql)
std::ostringstream oss(sql);
rc = logdb->exec_wr(oss);
logdb->exec_wr(oss);
pthread_mutex_unlock(&mutex);
return rc;
return 0;
}
/* -------------------------------------------------------------------------- */
@ -206,7 +209,7 @@ int FedReplicaManager::start()
void FedReplicaManager::finalize_action(const ActionRequest& ar)
{
NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager...");
NebulaLog::log("FRM", Log::INFO, "Raft Consensus Manager...");
}
/* -------------------------------------------------------------------------- */
@ -307,6 +310,31 @@ void FedReplicaManager::timer_action(const ActionRequest& ar)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_next_record(int zone_id, int& index, std::string& sql,
std::map<int, std::string>& zservers)
{
pthread_mutex_lock(&mutex);
std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
if ( it == zones.end() )
{
pthread_mutex_unlock(&mutex);
return -1;
}
index = it->second->next;
zservers = it->second->servers;
int rc = get_log_record(index, sql);
pthread_mutex_unlock(&mutex);
return rc;
}
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_log_record(int index, std::string& sql)
{
ostringstream oss;
@ -375,3 +403,130 @@ int FedReplicaManager::bootstrap(SqlDB *_db)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void FedReplicaManager::replicate_success(int zone_id)
{
pthread_mutex_lock(&mutex);
std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
if ( it == zones.end() )
{
pthread_mutex_unlock(&mutex);
return;
}
ZoneServers * zs = it->second;
zs->next++;
if ( last_index >= zs->next )
{
ReplicaManager::replicate(zone_id);
}
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
{
pthread_mutex_lock(&mutex);
std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
if ( it != zones.end() )
{
ZoneServers * zs = it->second;
if ( last_zone >= 0 )
{
zs->next = last_zone + 1;
}
}
ReplicaManager::replicate(zone_id);
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
int& last, std::string& error)
{
static const std::string replica_method = "one.zone.fedreplicate";
int index;
std::string sql, secret;
std::map<int, std::string> zservers;
std::map<int, std::string>::iterator it;
int xml_rc = 0;
if ( get_next_record(zone_id, index, sql, zservers) != 0 )
{
error = "Failed to load federation log record";
return -1;
}
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
// -------------------------------------------------------------------------
if ( Client::read_oneauth(secret, error) == -1 )
{
return -1;
}
xmlrpc_c::value result;
xmlrpc_c::paramList replica_params;
replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(index));
replica_params.add(xmlrpc_c::value_string(sql));
// -------------------------------------------------------------------------
// Do the XML-RPC call
// -------------------------------------------------------------------------
for (it=zservers.begin(); it != zservers.end(); ++it)
{
xml_rc = Client::client()->call(it->second, replica_method,
replica_params, xmlrpc_timeout_ms, &result, error);
if ( xml_rc == 0 )
{
vector<xmlrpc_c::value> values;
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
last = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
last = xmlrpc_c::value_int(values[3]);
}
break;
}
else
{
std::ostringstream ess;
ess << "Error replicating log entry " << index << " on zone server "
<< it->second << ": " << error;
NebulaLog::log("FRM", Log::ERROR, error);
error = ess.str();
}
}
return xml_rc;
}

View File

@ -574,7 +574,10 @@ void RaftManager::replicate_failure(int follower_id)
if ( next_it != next.end() )
{
next_it->second = next_it->second - 1;
if ( next_it->second > 0 )
{
next_it->second = next_it->second - 1;
}
}
replica_manager.replicate(follower_id);

View File

@ -210,44 +210,39 @@ int RaftReplicaThread::replicate()
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/*
void FederationReplicaThread::replicate()
FedReplicaThread::FedReplicaThread(int zone_id):ReplicaThread(zone_id)
{
Nebula& nd = Nebula::instance();
frm = nd.get_frm();
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
int FedReplicaThread::replicate()
{
std::string error;
LogDBRecord lr;
bool success = false;
unsigned int follower_term = -1;
int last;
int next_index = raftm->get_next_index(follower_id);
if ( fedlogdb->get_log_record(next_index, lr) != 0 )
{
ostringstream ess;
ess << "Failed to load log record at index: " << next_index;
NebulaLog::log("RCM", Log::ERROR, ess);
return;
}
if ( fedm->xmlrpc_replicate_log(follower_id, &lr, success, error) != 0 )
if ( frm->xmlrpc_replicate_log(follower_id, success, last, error) != 0 )
{
NebulaLog::log("FRM", Log::ERROR, error);
return -1;
}
if ( success )
{
fedm->replicate_success(follower_id);
frm->replicate_success(follower_id);
}
else
{
fedm->replicate_failure(follower_id);
frm->replicate_failure(follower_id, last);
}
return 0;
}
*/

View File

@ -790,6 +790,7 @@ void RequestManager::register_xml_methods()
xmlrpc_c::methodPtr zone_replicatelog(new ZoneReplicateLog());
xmlrpc_c::methodPtr zone_voterequest(new ZoneVoteRequest());
xmlrpc_c::methodPtr zone_raftstatus(new ZoneRaftStatus());
xmlrpc_c::methodPtr zone_fedreplicatelog(new ZoneReplicateFedLog());
xmlrpc_c::methodPtr zone_info(new ZoneInfo());
xmlrpc_c::methodPtr zonepool_info(new ZonePoolInfo());
@ -800,6 +801,7 @@ void RequestManager::register_xml_methods()
RequestManagerRegistry.addMethod("one.zone.info", zone_info);
RequestManagerRegistry.addMethod("one.zone.rename", zone_rename);
RequestManagerRegistry.addMethod("one.zone.replicate",zone_replicatelog);
RequestManagerRegistry.addMethod("one.zone.fedreplicate",zone_fedreplicatelog);
RequestManagerRegistry.addMethod("one.zone.voterequest",zone_voterequest);
RequestManagerRegistry.addMethod("one.zone.raftstatus", zone_raftstatus);

View File

@ -344,3 +344,69 @@ void ZoneRaftStatus::request_execute(xmlrpc_c::paramList const& paramList,
success_response(raft_xml, att);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
RequestAttributes& att)
{
std::ostringstream oss;
Nebula& nd = Nebula::instance();
FedReplicaManager * frm = nd.get_frm();
int index = xmlrpc_c::value_int(paramList.getInt(1));
string sql = xmlrpc_c::value_string(paramList.getString(2));
if ( att.uid != 0 )
{
att.resp_id = -1;
failure_response(AUTHORIZATION, att);
return;
}
if ( !nd.is_federation_slave() )
{
oss << "Cannot replicate federate log records on federation master";
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = - 1;
failure_response(ACTION, att);
}
int rc = frm->apply_log_record(index, sql);
if ( rc == 0 )
{
success_response(index, att);
}
else if ( rc < 0 )
{
oss << "Error replicating log entry " << index << "in zone";
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = index - 1;
failure_response(ACTION, att);
}
else // rc == last_index in log
{
oss << "Zone log is outdated last log index is " << rc;
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = rc;
failure_response(ACTION, att);
}
return;
}