1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-04-02 10:50:07 +03:00

F #4809: Raft servers / zone slave lists updated on server-add in the

rigth place (leaders/masters)
This commit is contained in:
Ruben S. Montero 2017-05-23 13:31:18 +02:00
parent afca14d75c
commit a216bb9eb4
7 changed files with 230 additions and 63 deletions

View File

@ -261,8 +261,9 @@ public:
* Adds a new server to the follower list and starts associated replica
* thread.
* @param follower_id id of new server
* @param xmlep xmlrpc endpoint for new server
*/
void add_server(int follower_id);
void add_server(int follower_id, const std::string& xmlep);
/**
* Deletes a new server to the follower list and stops associated replica

View File

@ -33,9 +33,10 @@ protected:
virtual ~RequestManagerUpdateDB(){};
/* -------------------------------------------------------------------- */
/* ---------------------------------------------------------------------- */
void request_execute(xmlrpc_c::paramList const& pl, RequestAttributes& att)
virtual void request_execute(xmlrpc_c::paramList const& pl,
RequestAttributes& att)
{
int oid = xmlrpc_c::value_int(pl.getInt(1));
std::string xml = xmlrpc_c::value_string(pl.getString(2));
@ -46,13 +47,30 @@ protected:
return;
}
ErrorCode ec = request_execute(oid, xml, att);
if ( ec == SUCCESS )
{
success_response(oid, att);
}
else
{
failure_response(ec, att);
}
}
/* ---------------------------------------------------------------------- */
/* ---------------------------------------------------------------------- */
ErrorCode request_execute(int oid, const std::string& xml,
RequestAttributes& att)
{
PoolObjectSQL * object = pool->get(oid,true);
if ( object == 0 )
{
att.resp_id = oid;
failure_response(NO_EXISTS, att);
return;
return NO_EXISTS;
}
string old_xml;
@ -62,35 +80,32 @@ protected:
if ( object->from_xml(xml) != 0 )
{
object->from_xml(old_xml);
object->unlock();
att.resp_msg = "Cannot update object from XML";
failure_response(INTERNAL, att);
object->unlock();
return;
return INTERNAL;
}
if ( object->get_oid() != oid )
{
object->from_xml(old_xml);
object->unlock();
att.resp_msg = "Consistency check failed";
failure_response(INTERNAL, att);
object->unlock();
return;
return INTERNAL;
}
pool->update(object);
object->unlock();
success_response(oid, att);
return;
return SUCCESS;
}
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class MarketPlaceAppUpdateDB : public RequestManagerUpdateDB
{
public:
@ -103,6 +118,8 @@ public:
~MarketPlaceAppUpdateDB(){};
};
/* -------------------------------------------------------------------------- */
class MarketPlaceUpdateDB : public RequestManagerUpdateDB
{
public:
@ -115,4 +132,45 @@ public:
~MarketPlaceUpdateDB(){};
};
/* -------------------------------------------------------------------------- */
class ZoneUpdateDB : public RequestManagerUpdateDB
{
public:
ZoneUpdateDB():RequestManagerUpdateDB("one.zone.updatedb")
{
auth_object = PoolObjectSQL::ZONE;
pool = Nebula::instance().get_zonepool();
}
~ZoneUpdateDB(){};
virtual void request_execute(xmlrpc_c::paramList const& pl,
RequestAttributes& att)
{
int oid = xmlrpc_c::value_int(pl.getInt(1));
std::string xml = xmlrpc_c::value_string(pl.getString(2));
if ( att.uid != UserPool::ONEADMIN_ID )
{
failure_response(AUTHORIZATION, att);
return;
}
ErrorCode ec = RequestManagerUpdateDB::request_execute(oid, xml, att);
if ( ec == SUCCESS )
{
std::vector<int> zids;
success_response(oid, att);
Nebula::instance().get_frm()->update_zones(zids);
}
else
{
failure_response(ec, att);
}
}
};
#endif /* REQUEST_MANAGER_UPDATE_DB_H */

View File

@ -50,11 +50,12 @@ public:
* Add servers to this zone
* @param tmpl with SERVER definitions
* @param sid id of the new sever
* @param xmlep endpoint of the new server
* @param error
*
* @return 0 on success, -1 otherwise
*/
int add_server(Template& tmpl, int& sid, string& error);
int add_server(Template& tmpl, int& sid, string& xmlep, string& error);
/**
* Delete a server from this zone

View File

@ -276,7 +276,7 @@ int RaftManager::get_leader_endpoint(std::string& endpoint)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RaftManager::add_server(int follower_id)
void RaftManager::add_server(int follower_id, const std::string& endpoint)
{
LogDB * logdb = Nebula::instance().get_logdb();
@ -284,14 +284,10 @@ void RaftManager::add_server(int follower_id)
logdb->get_last_record_index(log_index, log_term);
std::map<int, std::string> _servers;
unsigned int _num_servers = get_zone_servers(_servers);
pthread_mutex_lock(&mutex);
num_servers = _num_servers;
servers = _servers;
num_servers++;
servers.insert(std::make_pair(follower_id, endpoint));
next.insert(std::make_pair(follower_id, log_index + 1));
@ -308,12 +304,10 @@ void RaftManager::delete_server(int follower_id)
{
std::map<int, std::string> _servers;
unsigned int _num_servers = get_zone_servers(_servers);
pthread_mutex_lock(&mutex);
num_servers = _num_servers;
servers = _servers;
num_servers--;
servers.erase(follower_id);
next.erase(follower_id);

View File

@ -759,8 +759,6 @@ void RequestManager::register_xml_methods()
xmlrpc_c::method * zone_update_pt;
xmlrpc_c::method * zone_delete_pt;
xmlrpc_c::method * zone_rename_pt;
xmlrpc_c::method * zone_addserver_pt;
xmlrpc_c::method * zone_delserver_pt;
if (nebula.is_federation_slave())
{
@ -768,8 +766,6 @@ void RequestManager::register_xml_methods()
zone_update_pt = new RequestManagerProxy("one.zone.update");
zone_delete_pt = new RequestManagerProxy("one.zone.delete");
zone_rename_pt = new RequestManagerProxy("one.zone.rename");
zone_addserver_pt = new RequestManagerProxy("one.zone.addserver");
zone_delserver_pt = new RequestManagerProxy("one.zone.delserver");
}
else
{
@ -777,16 +773,18 @@ void RequestManager::register_xml_methods()
zone_update_pt = new ZoneUpdateTemplate();
zone_delete_pt = new ZoneDelete();
zone_rename_pt = new ZoneRename();
zone_addserver_pt = new ZoneAddServer();
zone_delserver_pt = new ZoneDeleteServer();
xmlrpc_c::methodPtr zone_updatedb(new ZoneUpdateDB());
RequestManagerRegistry.addMethod("one.zone.updatedb", zone_updatedb);
}
xmlrpc_c::methodPtr zone_allocate(zone_allocate_pt);
xmlrpc_c::methodPtr zone_update(zone_update_pt);
xmlrpc_c::methodPtr zone_delete(zone_delete_pt);
xmlrpc_c::methodPtr zone_rename(zone_rename_pt);
xmlrpc_c::methodPtr zone_addserver(zone_addserver_pt);
xmlrpc_c::methodPtr zone_delserver(zone_delserver_pt);
xmlrpc_c::methodPtr zone_addserver(new ZoneAddServer());
xmlrpc_c::methodPtr zone_delserver(new ZoneDeleteServer());
xmlrpc_c::methodPtr zone_replicatelog(new ZoneReplicateLog());
xmlrpc_c::methodPtr zone_voterequest(new ZoneVoteRequest());
xmlrpc_c::methodPtr zone_raftstatus(new ZoneRaftStatus());

View File

@ -16,6 +16,47 @@
#include "RequestManagerZone.h"
#include "Nebula.h"
#include "Client.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
static Request::ErrorCode master_update_zone(int oid, const std::string& xml,
RequestAttributes& att)
{
Client * client = Client::client();
xmlrpc_c::value result;
vector<xmlrpc_c::value> values;
std::ostringstream oss("Cannot update zone at federation master: ",
std::ios::ate);
try
{
client->call("one.zone.updatedb", "is", &result, oid, xml.c_str());
}
catch (exception const& e)
{
oss << e.what();
att.resp_msg = oss.str();
return Request::ACTION;
}
values = xmlrpc_c::value_array(result).vectorValueValue();
if ( xmlrpc_c::value_boolean(values[0]) == false )
{
std::string e = xmlrpc_c::value_string(values[1]);
oss << e;
att.resp_msg = oss.str();
return Request::ACTION;
}
return Request::SUCCESS;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -23,11 +64,22 @@
void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
RequestAttributes& att)
{
Nebula& nd = Nebula::instance();
int id = xmlrpc_c::value_int(paramList.getInt(1));
string zs_str = xmlrpc_c::value_string(paramList.getString(2));
int zs_id;
string error_str;
string error_str, xmlep;
if ( id != nd.get_zone_id() )
{
att.resp_msg = "Servers have to be added through the target zone"
" endpoints";
failure_response(ACTION, att);
return;
}
if ( basic_authorization(id, att) == false )
{
@ -56,21 +108,45 @@ void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
if ( zone->add_server(zs_tmpl, zs_id, att.resp_msg) == -1 )
if ( zone->add_server(zs_tmpl, zs_id, xmlep, att.resp_msg) == -1 )
{
failure_response(ACTION, att);
return;
}
pool->update(zone);
if ( nd.is_federation_master() || !nd.is_federation_enabled() )
{
std::vector<int> zids;
zone->unlock();
pool->update(zone);
std::vector<int> zids;
zone->unlock();
Nebula::instance().get_raftm()->add_server(zs_id);
Nebula::instance().get_frm()->update_zones(zids);
nd.get_frm()->update_zones(zids);
}
else
{
std::string tmpl_xml;
int oid = zone->get_oid();
zone->to_xml(tmpl_xml);
ErrorCode ec = master_update_zone(oid, tmpl_xml, att);
zone->unlock();
if ( ec != SUCCESS )
{
NebulaLog::log("ReM", Log::ERROR, att.resp_msg);
failure_response(ec, att);
return;
}
}
nd.get_raftm()->add_server(zs_id, xmlep);
success_response(id, att);
}
@ -81,11 +157,22 @@ void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
void ZoneDeleteServer::request_execute(xmlrpc_c::paramList const& paramList,
RequestAttributes& att)
{
int id = xmlrpc_c::value_int(paramList.getInt(1));
int zs_id = xmlrpc_c::value_int(paramList.getInt(2));
Nebula& nd = Nebula::instance();
int id = xmlrpc_c::value_int(paramList.getInt(1));
int zs_id = xmlrpc_c::value_int(paramList.getInt(2));
string error_str;
if ( id != nd.get_zone_id() )
{
att.resp_msg = "Servers have to be deleted through the target zone"
" endpoints";
failure_response(ACTION, att);
return;
}
if ( basic_authorization(id, att) == false )
{
return;
@ -109,14 +196,38 @@ void ZoneDeleteServer::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
pool->update(zone);
if ( nd.is_federation_master() )
{
std::vector<int> zids;
zone->unlock();
pool->update(zone);
std::vector<int> zids;
zone->unlock();
Nebula::instance().get_raftm()->delete_server(zs_id);
Nebula::instance().get_frm()->update_zones(zids);
nd.get_frm()->update_zones(zids);
}
else
{
std::string tmpl_xml;
int oid = zone->get_oid();
zone->to_xml(tmpl_xml);
ErrorCode ec = master_update_zone(oid, tmpl_xml, att);
zone->unlock();
if ( ec != SUCCESS )
{
NebulaLog::log("ReM", Log::ERROR, att.resp_msg);
failure_response(ec, att);
return;
}
}
nd.get_raftm()->delete_server(zs_id);
success_response(id, att);
}

View File

@ -300,7 +300,7 @@ int Zone::post_update_template(string& error)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int Zone::add_server(Template& tmpl, int& sid, string& error)
int Zone::add_server(Template& tmpl, int& sid, string& xmlep, string& error)
{
vector<VectorAttribute *> vs;
vector<VectorAttribute *>::iterator it;
@ -309,22 +309,26 @@ int Zone::add_server(Template& tmpl, int& sid, string& error)
sid = -1;
tmpl.get(ZoneServers::SERVER_NAME, vs);
const VectorAttribute * tmpl_server = tmpl.get(ZoneServers::SERVER_NAME);
for ( it = vs.begin() ; it != vs.end() ; ++it )
if ( tmpl_server == 0 )
{
server = new VectorAttribute(*it);
if ( servers->add_server(server, sid, error) == -1 )
{
delete server;
return -1;
}
servers_template.set(server);
return -1;
}
server = new VectorAttribute(tmpl_server);
if ( servers->add_server(server, sid, error) == -1 )
{
delete server;
return -1;
}
xmlep = server->vector_value("ENDPOINT");
servers_template.set(server);
return 0;
}