1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-22 22:03:39 +03:00

F #4809: Forward request to leader

This commit is contained in:
Ruben S. Montero 2017-05-08 19:48:41 +02:00
parent 59cf651dd6
commit 590b3548e9
6 changed files with 193 additions and 125 deletions

View File

@ -75,6 +75,29 @@ public:
*/
static int read_oneauth(std::string &secret, std::string& error);
/**
* Performs a xmlrpc call to the initialized server
* @param method name
* @param plist initialized param list
* @param result of the xmlrpc call
*/
void call(const std::string& method, const xmlrpc_c::paramList& plist,
xmlrpc_c::value * const result);
/**
* Performs a xmlrpc call
* @param endpoint of server
* @param method name
* @param plist initialized param list
* @param timeout for the request, set 0 for global xml_rpc timeout
* @param result of the xmlrpc call
* @param error string if any
* @return 0
*/
static int call(const std::string& endpoint, const std::string& method,
const xmlrpc_c::paramList& plist, long long _timeout,
xmlrpc_c::value * const result, std::string& error);
/**
* Performs an xmlrpc call to the initialized server and credentials.
* This method automatically adds the credential argument.
@ -87,14 +110,6 @@ public:
void call(const std::string &method, const std::string format,
xmlrpc_c::value * const result, ...);
/**
* Performs a xmlrpc call to the initialized server
* @param method name
* @param plist initialized param list
* @param result of the xmlrpc call
*/
void call(const std::string& method, const xmlrpc_c::paramList& plist,
xmlrpc_c::value * const result);
private:
/**
* Creates a new xml-rpc client with specified options.

View File

@ -208,6 +208,13 @@ public:
return _index;
}
/**
* Gets the endpoint for xml-rpc calls of the current leader
* @param endpoint
* @return 0 on success, -1 if no leader found
*/
int get_leader_endpoint(std::string& endpoint);
// -------------------------------------------------------------------------
// XML-RPC Raft API calls
// -------------------------------------------------------------------------

View File

@ -179,6 +179,8 @@ protected:
bool leader_only; //Method can be only execute by leaders or solo servers
long long xmlrpc_timeout;
/* ---------------------------------------------------------------------- */
/* Class Constructors */
/* ---------------------------------------------------------------------- */
@ -193,6 +195,9 @@ protected:
log_method_call = true;
leader_only = true;
//TODO Get this from oned.conf
xmlrpc_timeout = 500;
};
virtual ~Request(){};

View File

@ -223,3 +223,52 @@ void Client::call(const std::string& method, const xmlrpc_c::paramList& plist,
girerr::error(failure.getDescription());
}
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int Client::call(const std::string& endpoint, const std::string& method,
const xmlrpc_c::paramList& plist, long long _timeout,
xmlrpc_c::value * const result, std::string& error)
{
xmlrpc_c::carriageParm_curl0 carriage(endpoint);
xmlrpc_c::clientXmlTransport_curl transport;
xmlrpc_c::client_xml client(&transport);
xmlrpc_c::rpcPtr rpc_client(method, plist);
int xml_rc = 0;
try
{
rpc_client->start(&client, &carriage);
client.finishAsync(xmlrpc_c::timeout(_timeout));
if (!rpc_client->isFinished())
{
rpc_client->finishErr(girerr::error("XMLRPC method "+ method
+ " timeout, resetting call"));
}
if ( rpc_client->isSuccessful() )
{
*result = rpc_client->getResult();
}
else //RPC failed
{
xmlrpc_c::fault failure = rpc_client->getFault();
error = failure.getDescription();
xml_rc = -1;
}
}
catch (exception const& e)
{
error = e.what();
xml_rc = -1;
}
return xml_rc;
}

View File

@ -207,6 +207,39 @@ static unsigned int get_zone_servers(std::map<unsigned int, std::string>& _serv)
return _num_servers;
}
int RaftManager::get_leader_endpoint(std::string& endpoint)
{
int rc;
pthread_mutex_lock(&mutex);
if ( leader_id == -1 )
{
rc = -1;
}
else
{
std::map<unsigned int, std::string>::iterator it;
it = servers.find(leader_id);
if ( it == servers.end() )
{
rc = -1;
}
else
{
endpoint = it->second;
rc = 0;
}
}
pthread_mutex_unlock(&mutex);
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::add_server(unsigned int follower_id)
@ -909,8 +942,6 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
static const std::string replica_method = "one.zone.replicate";
std::ostringstream ess;
std::string secret;
std::string follower_edp;
@ -947,14 +978,9 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
return -1;
}
xmlrpc_c::carriageParm_curl0 carriage(follower_edp);
xmlrpc_c::value result;
xmlrpc_c::paramList replica_params;
xmlrpc_c::clientXmlTransport_curl transport;
xmlrpc_c::client_xml client(&transport);
replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(_commit));
@ -965,63 +991,37 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
replica_params.add(xmlrpc_c::value_int(lr->prev_term));
replica_params.add(xmlrpc_c::value_string(lr->sql));
xmlrpc_c::rpcPtr rpc_client(replica_method, replica_params);
// -------------------------------------------------------------------------
// Do the XML-RPC call
// -------------------------------------------------------------------------
try
xml_rc = Client::client()->call(follower_edp, replica_method, replica_params,
xmlrpc_timeout_ms, &result, error);
if ( xml_rc == 0 )
{
rpc_client->start(&client, &carriage);
vector<xmlrpc_c::value> values;
client.finishAsync(xmlrpc_c::timeout(xmlrpc_timeout_ms));
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if (!rpc_client->isFinished())
if ( success ) //values[2] = error code (string)
{
rpc_client->finishErr(girerr::error("XMLRPC method "+replica_method
+ " timeout, resetting call"));
fterm = xmlrpc_c::value_int(values[1]);
}
if ( rpc_client->isSuccessful() )
else
{
vector<xmlrpc_c::value> values;
xmlrpc_c::value result = rpc_client->getResult();
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
fterm = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
fterm = xmlrpc_c::value_int(values[3]);
}
}
else //RPC failed, will retry on next replication request
{
xmlrpc_c::fault failure = rpc_client->getFault();
ess << "Error replicating log entry " << lr->index
<< " on follower " << follower_id << ": "
<< failure.getDescription();
error = ess.str();
xml_rc = -1;
error = xmlrpc_c::value_string(values[1]);
fterm = xmlrpc_c::value_int(values[3]);
}
}
catch (exception const& e)
else
{
ess << "Error exception replicating log entry " << lr->index
<< " on follower " << follower_id << ": " << e.what();
std::ostringstream ess;
error = ess.str();
ess << "Error replicating log entry " << lr->index << " on follower "
<< follower_id << ": " << error;
xml_rc = -1;
error = ess.str();
}
return xml_rc;
@ -1039,8 +1039,6 @@ int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
static const std::string replica_method = "one.zone.voterequest";
std::ostringstream ess;
std::string secret;
std::string follower_edp;
@ -1076,76 +1074,46 @@ int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
return -1;
}
xmlrpc_c::carriageParm_curl0 carriage(follower_edp);
xmlrpc_c::value result;
xmlrpc_c::paramList replica_params;
xmlrpc_c::clientXmlTransport_curl transport;
xmlrpc_c::client_xml client(&transport);
replica_params.add(xmlrpc_c::value_string(secret));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(lindex));
replica_params.add(xmlrpc_c::value_int(lterm));
xmlrpc_c::rpcPtr rpc_client(replica_method, replica_params);
// -------------------------------------------------------------------------
// Do the XML-RPC call
// -------------------------------------------------------------------------
try
xml_rc = Client::client()->call(follower_edp, replica_method, replica_params,
xmlrpc_timeout_ms, &result, error);
if ( xml_rc == 0 )
{
rpc_client->start(&client, &carriage);
vector<xmlrpc_c::value> values;
client.finishAsync(xmlrpc_c::timeout(xmlrpc_timeout_ms));
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if (!rpc_client->isFinished())
if ( success ) //values[2] = error code (string)
{
rpc_client->finishErr(girerr::error("XMLRPC method "+replica_method
+ " timeout, resetting call"));
fterm = xmlrpc_c::value_int(values[1]);
}
if ( rpc_client->isSuccessful() )
else
{
vector<xmlrpc_c::value> values;
xmlrpc_c::value result = rpc_client->getResult();
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
fterm = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
fterm = xmlrpc_c::value_int(values[3]);
}
}
else //RPC failed, vote not granted
{
xmlrpc_c::fault failure = rpc_client->getFault();
ess << "Error requesting vote from follower " << follower_id << ": "
<< failure.getDescription();
error = ess.str();
xml_rc = -1;
error = xmlrpc_c::value_string(values[1]);
fterm = xmlrpc_c::value_int(values[3]);
}
}
catch (exception const& e)
else
{
ess << "Error requesting vote from follower " << follower_id << ": "
<< e.what();
std::ostringstream ess;
error = ess.str();
ess << "Error requesting vote from follower "<< follower_id << ":"
<< error;
xml_rc = -1;
error = ess.str();
}
return xml_rc;

View File

@ -16,6 +16,7 @@
#include "Request.h"
#include "Nebula.h"
#include "Client.h"
#include "PoolObjectAuth.h"
@ -270,12 +271,15 @@ void Request::execute(
att.retval = _retval;
att.session = xmlrpc_c::value_string (_paramList.getString(0));
att.req_id = (reinterpret_cast<uintptr_t>(this) * rand()) % 10000;
att.req_id = (reinterpret_cast<uintptr_t>(this) * rand()) % 10000;
Nebula& nd = Nebula::instance();
Nebula& nd = Nebula::instance();
UserPool* upool = nd.get_upool();
RaftManager * raftm = nd.get_raftm();
UserPool* upool = nd.get_upool();
bool authenticated = upool->authenticate(att.session, att.password,
att.uid, att.gid, att.uname, att.gname, att.group_ids, att.umask);
if ( log_method_call )
{
@ -283,10 +287,40 @@ void Request::execute(
hidden_params);
}
if ( authenticated == false )
{
failure_response(AUTHENTICATION, att);
log_result(att, method_name);
return;
}
if ( raftm->is_follower() && leader_only)
{
att.resp_msg = "Cannot process request, server is a follower";
failure_response(INTERNAL, att);
string leader_endpoint, error;
if ( raftm->get_leader_endpoint(leader_endpoint) != 0 )
{
att.resp_msg = "Cannot process request, not leader found";
failure_response(INTERNAL, att);
log_result(att, method_name);
return;
}
int rc = Client::call(leader_endpoint, method_name, _paramList,
xmlrpc_timeout, _retval, att.resp_msg);
if ( rc != 0 )
{
failure_response(INTERNAL, att);
log_result(att, method_name);
return;
}
}
else if ( raftm->is_candidate() && leader_only)
{
@ -295,17 +329,7 @@ void Request::execute(
}
else //leader or solo or !leader_only
{
bool authenticated = upool->authenticate(att.session, att.password,
att.uid, att.gid, att.uname, att.gname, att.group_ids, att.umask);
if ( authenticated == false )
{
failure_response(AUTHENTICATION, att);
}
else
{
request_execute(_paramList, att);
}
request_execute(_paramList, att);
}
if ( log_method_call )