1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-11 04:58:16 +03:00

F #4809: Get fed index from the DB (needed by followers in HA). Use Zone

ENDPOINT to replicate log instead of server list. Fix bug when replicate
fails in a zone.
This commit is contained in:
Ruben S. Montero 2017-06-26 19:52:46 +02:00
parent d831d3ebf4
commit d5d6cb9667
3 changed files with 74 additions and 64 deletions

View File

@ -147,11 +147,16 @@ public:
};
/**
* Return the last index of the fed log
* @return the last index of the fed log (from DB to use this method in
* HA followers)
*/
int get_last_index() const
unsigned int get_last_index() const
{
return last_index;
unsigned int li;
get_last_index(li);
return li;
}
private:
@ -193,14 +198,14 @@ private:
// -------------------------------------------------------------------------
struct ZoneServers
{
ZoneServers(int z, unsigned int l, const std::map<int,std::string>& s):
zone_id(z), servers(s), next(l){};
ZoneServers(int z, unsigned int l, const std::string& s):
zone_id(z), endpoint(s), next(l){};
~ZoneServers(){};
int zone_id;
std::map<int, std::string> servers;
std::string endpoint;
unsigned int next;
};
@ -260,7 +265,7 @@ private:
* @param index
* @return 0 on success
*/
int get_last_index(unsigned int& index);
int get_last_index(unsigned int& index) const;
/**
* Get the nest record to replicate in a zone
@ -270,7 +275,7 @@ private:
* @return 0 on success, -1 otherwise
*/
int get_next_record(int zone_id, int& index, std::string& sql,
std::map<int, std::string>& zservers);
std::string& zservers);
};
#endif /*FED_REPLICA_MANAGER_H_*/

View File

@ -174,7 +174,7 @@ class OneZoneHelper < OpenNebulaHelper::OneHelper
end.show([zone_hash['ZONE']['SERVER_POOL']['SERVER']].flatten, {})
puts
CLIHelper.print_header(str_h1 % "RAFT & FEDERATION SYNC STATUS",false)
CLIHelper.print_header(str_h1 % "HA & FEDERATION SYNC STATUS",false)
CLIHelper::ShowTable.new(nil, self) do

View File

@ -180,8 +180,6 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
vector<int>::iterator it;
std::map<int, std::string> zone_servers;
int zone_id = nd.get_zone_id();
if ( zpool->list_zones(zone_ids) != 0 )
@ -201,15 +199,26 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
}
else
{
zpool->get_zone_servers(*it, zone_servers);
Zone * zone = zpool->get(*it, true);
ZoneServers * zs = new ZoneServers(*it, last_index, zone_servers);
if ( zone == 0 )
{
it = zone_ids.erase(it);
}
else
{
std::string zedp;
zones.insert(make_pair(*it, zs));
zone->get_template_attribute("ENDPOINT", zedp);
zone_servers.clear();
zone->unlock();
++it;
ZoneServers * zs = new ZoneServers(*it, last_index, zedp);
zones.insert(make_pair(*it, zs));
++it;
}
}
}
@ -223,16 +232,25 @@ void FedReplicaManager::add_zone(int zone_id)
{
std::ostringstream oss;
std::string zedp;
Nebula& nd = Nebula::instance();
ZonePool * zpool = nd.get_zonepool();
std::map<int, std::string> zone_servers;
Zone * zone = zpool->get(zone_id, true);
zpool->get_zone_servers(zone_id, zone_servers);
if ( zone == 0 )
{
return;
}
zone->get_template_attribute("ENDPOINT", zedp);
zone->unlock();
pthread_mutex_lock(&mutex);
ZoneServers * zs = new ZoneServers(zone_id, last_index, zone_servers);
ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
zones.insert(make_pair(zone_id, zs));
@ -336,8 +354,8 @@ void FedReplicaManager::timer_action(const ActionRequest& ar)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_next_record(int zone_id, int& index, std::string& sql,
std::map<int, std::string>& zservers)
int FedReplicaManager::get_next_record(int zone_id, int& index,
std::string& sql, std::string& zedp)
{
pthread_mutex_lock(&mutex);
@ -349,10 +367,10 @@ int FedReplicaManager::get_next_record(int zone_id, int& index, std::string& sql
return -1;
}
index = it->second->next;
zservers = it->second->servers;
index = it->second->next;
zedp = it->second->endpoint;
int rc = get_log_record(index, sql);
int rc = get_log_record(index, sql);
pthread_mutex_unlock(&mutex);
@ -400,7 +418,7 @@ int FedReplicaManager::insert_log_record(int index, const std::string& sql)
/* -------------------------------------------------------------------------- */
int FedReplicaManager::get_last_index(unsigned int& index)
int FedReplicaManager::get_last_index(unsigned int& index) const
{
ostringstream oss;
@ -477,7 +495,7 @@ void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
if ( last_zone >= 0 )
{
zs->next = last_zone + 1;
zs->next = last_zone - 1;
}
}
@ -496,25 +514,17 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
static const std::string replica_method = "one.zone.fedreplicate";
int index;
std::string sql, secret;
std::map<int, std::string> zservers;
std::map<int, std::string>::iterator it;
std::string sql, secret, zedp;
int xml_rc = 0;
if ( get_next_record(zone_id, index, sql, zservers) != 0 )
if ( get_next_record(zone_id, index, sql, zedp) != 0 )
{
error = "Failed to load federation log record";
return -1;
}
if ( zservers.size() == 0 )
{
error = "No servers defined in the zone";
return -1;
}
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
// -------------------------------------------------------------------------
@ -533,42 +543,37 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
// -------------------------------------------------------------------------
// Do the XML-RPC call
// -------------------------------------------------------------------------
for (it=zservers.begin(); it != zservers.end(); ++it)
xml_rc = Client::client()->call(zedp, replica_method, replica_params,
xmlrpc_timeout_ms, &result, error);
if ( xml_rc == 0 )
{
xml_rc = Client::client()->call(it->second, replica_method,
replica_params, xmlrpc_timeout_ms, &result, error);
vector<xmlrpc_c::value> values;
if ( xml_rc == 0 )
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
vector<xmlrpc_c::value> values;
values = xmlrpc_c::value_array(result).vectorValueValue();
success = xmlrpc_c::value_boolean(values[0]);
if ( success ) //values[2] = error code (string)
{
last = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
last = xmlrpc_c::value_int(values[3]);
}
break;
last = xmlrpc_c::value_int(values[1]);
}
else
{
std::ostringstream ess;
ess << "Error replicating log entry " << index << " on zone server "
<< it->second << ": " << error;
NebulaLog::log("FRM", Log::ERROR, error);
error = ess.str();
error = xmlrpc_c::value_string(values[1]);
last = xmlrpc_c::value_int(values[3]);
}
}
else
{
std::ostringstream ess;
ess << "Error replicating log entry " << index << " on zone "
<< zone_id << " (" << zedp << "): " << error;
NebulaLog::log("FRM", Log::ERROR, error);
error = ess.str();
}
return xml_rc;
}