mirror of
https://github.com/OpenNebula/one.git
synced 2025-01-11 05:17:41 +03:00
F #4809: Work on log management and replication
This commit is contained in:
parent
116425fc99
commit
dd0598aaa6
@ -248,6 +248,14 @@ protected:
|
||||
virtual ExtendedAttribute * attribute_factory(VectorAttribute * va,
|
||||
int id) const = 0;
|
||||
|
||||
/**
|
||||
* @return the number of elements in the set
|
||||
*/
|
||||
unsigned int size()
|
||||
{
|
||||
return a_set.size();
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------- */
|
||||
/* ---------------------------------------------------------------------- */
|
||||
|
||||
|
@ -27,19 +27,22 @@
|
||||
class LogDB : public SqlDB, Callbackable
|
||||
{
|
||||
public:
|
||||
LogDB(SqlDB * _db):db(_db), next_index(0), term(0){};
|
||||
|
||||
virtual ~LogDB(){};
|
||||
|
||||
void set_term(unsigned int t)
|
||||
LogDB(SqlDB * _db):db(_db), next_index(0)
|
||||
{
|
||||
term = t;
|
||||
}
|
||||
pthread_mutex_init(&mutex, 0);
|
||||
};
|
||||
|
||||
void set_index(unsigned int i)
|
||||
virtual ~LogDB()
|
||||
{
|
||||
next_index = i;
|
||||
}
|
||||
std::map<unsigned int, LogDBRequest *>::iterator it;
|
||||
|
||||
for ( it = requests.begin(); it != requests.end(); ++it )
|
||||
{
|
||||
delete it->second;
|
||||
}
|
||||
|
||||
delete db;
|
||||
};
|
||||
|
||||
/**
|
||||
* Return the request associated to the given logdb record. If there is
|
||||
@ -59,12 +62,12 @@ public:
|
||||
*/
|
||||
int exec_wr(ostringstream& cmd);
|
||||
|
||||
virtual int exec_bootstrap(ostringstream& cmd)
|
||||
int exec_bootstrap(ostringstream& cmd)
|
||||
{
|
||||
return db->exec_bootstrap(cmd);
|
||||
}
|
||||
|
||||
virtual int exec_rd(ostringstream& cmd, Callbackable* obj)
|
||||
int exec_rd(ostringstream& cmd, Callbackable* obj)
|
||||
{
|
||||
return db->exec_rd(cmd, obj);
|
||||
}
|
||||
@ -84,6 +87,16 @@ public:
|
||||
return db->multiple_values_support();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Database methods
|
||||
// -------------------------------------------------------------------------
|
||||
int bootstrap()
|
||||
{
|
||||
ostringstream oss(db_bootstrap);
|
||||
|
||||
return db->exec_bootstrap(oss);
|
||||
}
|
||||
|
||||
protected:
|
||||
int exec(ostringstream& cmd, Callbackable* obj, bool quiet)
|
||||
{
|
||||
@ -91,6 +104,8 @@ protected:
|
||||
}
|
||||
|
||||
private:
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
/**
|
||||
* Pointer to the underlying DB store
|
||||
*/
|
||||
@ -101,12 +116,6 @@ private:
|
||||
*/
|
||||
unsigned int next_index;
|
||||
|
||||
/**
|
||||
* Current term to be included in new LogDB records generated during the
|
||||
* term.
|
||||
*/
|
||||
unsigned int term;
|
||||
|
||||
/**
|
||||
* List of pending requests (a client is waiting for the log entry to be
|
||||
* replicated in a majority of followers)
|
||||
|
@ -17,6 +17,8 @@
|
||||
#ifndef LOG_DB_MANAGER_H_
|
||||
#define LOG_DB_MANAGER_H_
|
||||
|
||||
#include <xmlrpc-c/client.hpp>
|
||||
|
||||
#include "ActionManager.h"
|
||||
#include "ZoneServer.h"
|
||||
|
||||
@ -35,7 +37,8 @@ public:
|
||||
START,
|
||||
STOP,
|
||||
REPLICATE,
|
||||
DELETE_SERVER
|
||||
DELETE_SERVER,
|
||||
ADD_SERVER
|
||||
};
|
||||
|
||||
LogDBAction(Actions a):ActionRequest(ActionRequest::USER), _action(a){};
|
||||
@ -62,8 +65,23 @@ private:
|
||||
|
||||
class LogDBManager : public ActionListener
|
||||
{
|
||||
private:
|
||||
public:
|
||||
LogDBManager(){};
|
||||
|
||||
virtual ~LogDBManager(){};
|
||||
|
||||
/**
|
||||
* Triggers specific actions to the LogDBManager
|
||||
* @param action to trigger in the manager
|
||||
*/
|
||||
void trigger(LogDBAction::Actions action)
|
||||
{
|
||||
LogDBAction log_action(action);
|
||||
|
||||
am.trigger(log_action);
|
||||
}
|
||||
|
||||
private:
|
||||
friend void * logdb_manager_loop(void *arg);
|
||||
|
||||
friend void * replication_thread(void *arg);
|
||||
@ -73,58 +91,113 @@ private:
|
||||
*/
|
||||
ActionManager am;
|
||||
|
||||
/**
|
||||
* Servers in the zone, managed by Zone::add/delete_server
|
||||
*/
|
||||
ZoneServers * servers;
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Replication thread class
|
||||
// Replication thread class & pool
|
||||
// -------------------------------------------------------------------------
|
||||
class ReplicaThread
|
||||
{
|
||||
public:
|
||||
ReplicaThread(ZoneServer * z):zserver(z)
|
||||
{
|
||||
pthread_mutex_init(&mutex, 0);
|
||||
|
||||
pthread_cond_init(&cond, 0);
|
||||
};
|
||||
ReplicaThread(ZoneServer * server, ZoneServer * leader);
|
||||
|
||||
virtual ~ReplicaThread(){};
|
||||
|
||||
void do_replication();
|
||||
|
||||
void finalize();
|
||||
|
||||
pthread_t * thread_id()
|
||||
{
|
||||
return &_thread_id;
|
||||
}
|
||||
|
||||
private:
|
||||
pthread_t thread_id;
|
||||
friend void * replication_thread(void *arg);
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// pthread synchronization variables
|
||||
// ---------------------------------------------------------------------
|
||||
pthread_t _thread_id;
|
||||
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
pthread_cond_t cond;
|
||||
|
||||
ZoneServer * zserver;
|
||||
bool _finalize;
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// Information of the replication target server and leader
|
||||
// ---------------------------------------------------------------------
|
||||
ZoneServer * server;
|
||||
|
||||
ZoneServer * leader;
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// XML-RPC client variables to talk with this server
|
||||
// ---------------------------------------------------------------------
|
||||
xmlrpc_c::clientXmlTransport_curl transport;
|
||||
|
||||
xmlrpc_c::client_xml client;
|
||||
|
||||
static const std::string replica_method;
|
||||
};
|
||||
|
||||
std::map<int, ReplicaThread *> thread_pool;
|
||||
|
||||
ReplicaThread * get_thread(int server_id)
|
||||
{
|
||||
std::map<int, ReplicaThread *>::iterator it;
|
||||
|
||||
it = thread_pool.find(server_id);
|
||||
|
||||
if ( it == thread_pool.end() )
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
return it->second;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Action Listener interface
|
||||
// -------------------------------------------------------------------------
|
||||
void finalize_action(const ActionRequest& ar);
|
||||
|
||||
/**
|
||||
* Start the replication threads, one for each server in the zone
|
||||
*/
|
||||
void start(const ActionRequest& ar);
|
||||
|
||||
/**
|
||||
* Stop the replication threads (leader becomes follower)
|
||||
*/
|
||||
void stop(const ActionRequest& ar);
|
||||
|
||||
/**
|
||||
* Notify threads there is a new log entry to replicate on followers
|
||||
*/
|
||||
void replicate(const ActionRequest& ar);
|
||||
|
||||
/**
|
||||
* Event dispatcher function
|
||||
*/
|
||||
void user_action(const ActionRequest& ar);
|
||||
|
||||
/**
|
||||
* Termination function
|
||||
*/
|
||||
void finalize_action(const ActionRequest& ar);
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// LogDBManager actions
|
||||
// -------------------------------------------------------------------------
|
||||
/**
|
||||
* Start the replication threads, one for each server in the zone
|
||||
*/
|
||||
void start_action();
|
||||
|
||||
/**
|
||||
* Stop the replication threads (leader becomes follower)
|
||||
*/
|
||||
void stop_action();
|
||||
|
||||
/**
|
||||
* Notify threads there is a new log entry to replicate on followers
|
||||
*/
|
||||
void replicate_action();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
void delete_server_action();
|
||||
|
||||
};
|
||||
|
||||
#endif /*LOG_DB_MANAGER_H_*/
|
||||
|
@ -17,7 +17,7 @@
|
||||
#ifndef NEBULA_H_
|
||||
#define NEBULA_H_
|
||||
|
||||
#include "SqlDB.h"
|
||||
#include "LogDB.h"
|
||||
#include "SystemDB.h"
|
||||
|
||||
#include "NebulaTemplate.h"
|
||||
@ -76,6 +76,10 @@ public:
|
||||
// --------------------------------------------------------------
|
||||
// Pool Accessors
|
||||
// --------------------------------------------------------------
|
||||
LogDB * get_logdb()
|
||||
{
|
||||
return logdb;
|
||||
};
|
||||
|
||||
VirtualMachinePool * get_vmpool()
|
||||
{
|
||||
@ -400,6 +404,11 @@ public:
|
||||
return zone_id;
|
||||
};
|
||||
|
||||
int get_server_id()
|
||||
{
|
||||
return server_id;
|
||||
};
|
||||
|
||||
const string& get_master_oned()
|
||||
{
|
||||
return master_oned;
|
||||
@ -660,7 +669,7 @@ private:
|
||||
"/DEFAULT_GROUP_QUOTAS/NETWORK_QUOTA",
|
||||
"/DEFAULT_GROUP_QUOTAS/IMAGE_QUOTA",
|
||||
"/DEFAULT_GROUP_QUOTAS/VM_QUOTA"),
|
||||
system_db(0), db(0),
|
||||
system_db(0), logdb(0),
|
||||
vmpool(0), hpool(0), vnpool(0), upool(0), ipool(0), gpool(0), tpool(0),
|
||||
dspool(0), clpool(0), docpool(0), zonepool(0), secgrouppool(0),
|
||||
vdcpool(0), vrouterpool(0), marketpool(0), apppool(0), vmgrouppool(0),
|
||||
@ -730,7 +739,7 @@ private:
|
||||
delete marketm;
|
||||
delete ipamm;
|
||||
delete nebula_configuration;
|
||||
delete db;
|
||||
delete logdb;
|
||||
delete system_db;
|
||||
};
|
||||
|
||||
@ -765,6 +774,7 @@ private:
|
||||
bool federation_enabled;
|
||||
bool federation_master;
|
||||
int zone_id;
|
||||
int server_id;
|
||||
string master_oned;
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
@ -784,7 +794,7 @@ private:
|
||||
// Nebula Pools
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
SqlDB * db;
|
||||
LogDB * logdb;
|
||||
VirtualMachinePool * vmpool;
|
||||
HostPool * hpool;
|
||||
VirtualNetworkPool * vnpool;
|
||||
|
@ -23,7 +23,7 @@
|
||||
using namespace std;
|
||||
|
||||
class ZoneServers;
|
||||
|
||||
class ZoneServer;
|
||||
/**
|
||||
* The Zone class.
|
||||
*/
|
||||
@ -64,6 +64,25 @@ public:
|
||||
*/
|
||||
int delete_server(int id, string& error);
|
||||
|
||||
/**
|
||||
* @return the servers in this zone
|
||||
*/
|
||||
ZoneServers * get_servers()
|
||||
{
|
||||
return servers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param server_id
|
||||
* @return the server
|
||||
*/
|
||||
ZoneServer * get_server(int server_id);
|
||||
|
||||
/**
|
||||
* @return the number of servers in this zone
|
||||
*/
|
||||
unsigned int servers_size();
|
||||
|
||||
private:
|
||||
// -------------------------------------------------------------------------
|
||||
// Friends
|
||||
|
@ -72,6 +72,42 @@ public:
|
||||
return ExtendedAttribute::get_id();
|
||||
}
|
||||
|
||||
int get_term() const
|
||||
{
|
||||
return term;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return highest log known to be commited
|
||||
*/
|
||||
int get_commit() const
|
||||
{
|
||||
return commit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return highest log applied to DB
|
||||
*/
|
||||
int get_applied() const
|
||||
{
|
||||
return applied;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return next log to send to this server
|
||||
*/
|
||||
int get_next() const
|
||||
{
|
||||
return next;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return highest log replicated in this server
|
||||
*/
|
||||
int get_match() const
|
||||
{
|
||||
return match;
|
||||
}
|
||||
/**
|
||||
* Initialized follower data
|
||||
* @param last log index
|
||||
@ -82,7 +118,22 @@ public:
|
||||
match = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the server is offline
|
||||
*/
|
||||
bool is_offline()
|
||||
{
|
||||
return state == OFFLINE;
|
||||
}
|
||||
|
||||
private:
|
||||
State state;
|
||||
|
||||
/**
|
||||
* Current term
|
||||
*/
|
||||
unsigned int term;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// Volatile log index variables
|
||||
// - commit, highest log known to be commited
|
||||
@ -217,6 +268,13 @@ public:
|
||||
return static_cast<ZoneServer *>(delete_attribute(id));
|
||||
};
|
||||
|
||||
/**
|
||||
* @return servers in zone
|
||||
*/
|
||||
unsigned int size()
|
||||
{
|
||||
return ExtendedAttributeSet::size();
|
||||
}
|
||||
|
||||
protected:
|
||||
ExtendedAttribute * attribute_factory(VectorAttribute * va, int id) const
|
||||
|
@ -16,16 +16,17 @@
|
||||
|
||||
#include "LogDB.h"
|
||||
#include "Nebula.h"
|
||||
#include "ZoneServer.h"
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
const char * LogDB::table = "logdb";
|
||||
|
||||
const char * LogDB::db_names = "index, term, sql";
|
||||
const char * LogDB::db_names = "log_index, term, sql";
|
||||
|
||||
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
|
||||
"logdb (index INTEGER, term INTEGER, sql MEDIUMTEXT, PRIMARY KEY(index))";
|
||||
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sql MEDIUMTEXT)";
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
/* -------------------------------------------------------------------------- */
|
||||
@ -34,21 +35,34 @@ LogDBRequest * LogDB::get_request(unsigned int index)
|
||||
{
|
||||
std::map<unsigned int, LogDBRequest *>::iterator it;
|
||||
|
||||
LogDBRequest * req = 0;
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
it = requests.find(index);
|
||||
|
||||
if ( it == requests.end() )
|
||||
if ( it != requests.end() )
|
||||
{
|
||||
req = it->second;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
if ( req == 0 )
|
||||
{
|
||||
LogDBRequest * req = select(index);
|
||||
|
||||
if ( req != 0 )
|
||||
{
|
||||
requests.insert(std::make_pair(index, req));
|
||||
}
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
return req;
|
||||
requests.insert(std::make_pair(index, req));
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
|
||||
return it->second;
|
||||
return req;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
@ -57,18 +71,70 @@ int LogDB::exec_wr(ostringstream& cmd)
|
||||
{
|
||||
int rc;
|
||||
|
||||
// Insert log entry in the Database
|
||||
Nebula& nd = Nebula::instance();
|
||||
ZonePool * zpool = nd.get_zonepool();
|
||||
|
||||
int server_id = nd.get_server_id();
|
||||
int zone_id = nd.get_zone_id();
|
||||
|
||||
ZoneServer * server = 0;
|
||||
unsigned int term = 0;
|
||||
|
||||
if ( server_id != -1 )
|
||||
{
|
||||
Zone * zone = zpool->get(zone_id, true);
|
||||
|
||||
if ( zone != 0 )
|
||||
{
|
||||
if ( zone->servers_size() > 1 )
|
||||
{
|
||||
server = zone->get_server(server_id);
|
||||
term = server->get_term();
|
||||
}
|
||||
|
||||
zone->unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// OpenNebula was started in solo mode
|
||||
// -------------------------------------------------------------------------
|
||||
if ( server == 0 )
|
||||
{
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
LogDBRequest lr(next_index, term, cmd);
|
||||
|
||||
next_index++;
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
if ( insert_replace(&lr, false) != 0 )
|
||||
{
|
||||
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
|
||||
}
|
||||
|
||||
return db->exec_wr(cmd);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Insert log entry in the database and replicate on followers
|
||||
// -------------------------------------------------------------------------
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
LogDBRequest * lr = new LogDBRequest(next_index, term, cmd);
|
||||
|
||||
requests.insert(std::make_pair(next_index, lr));
|
||||
|
||||
next_index++;
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
if ( insert_replace(lr, false) != 0 )
|
||||
{
|
||||
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
|
||||
}
|
||||
|
||||
// Store the replication request in the active requests map
|
||||
requests.insert(std::make_pair(next_index, lr));
|
||||
|
||||
next_index++;
|
||||
|
||||
//LogDBManager->triger(NEW_LOG_RECORD);
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "LogDBManager.h"
|
||||
#include "Nebula.h"
|
||||
#include "NebulaLog.h"
|
||||
#include "Client.h"
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Thread wrapper functions
|
||||
@ -67,24 +68,210 @@ void LogDBManager::finalize_action(const ActionRequest& ar)
|
||||
|
||||
void LogDBManager::user_action(const ActionRequest& ar)
|
||||
{
|
||||
const LogDBAction& la = static_cast<const LogDBAction& >(ar);
|
||||
|
||||
switch(la.action())
|
||||
{
|
||||
case LogDBAction::START:
|
||||
case LogDBAction::ADD_SERVER:
|
||||
start_action();
|
||||
break;
|
||||
|
||||
case LogDBAction::STOP:
|
||||
stop_action();
|
||||
break;
|
||||
|
||||
case LogDBAction::REPLICATE:
|
||||
replicate_action();
|
||||
break;
|
||||
|
||||
case LogDBAction::DELETE_SERVER:
|
||||
delete_server_action();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
void LogDBManager::start(const ActionRequest& ar)
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
void LogDBManager::start_action()
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
Nebula& nd = Nebula::instance();
|
||||
ZonePool * zpool = nd.get_zonepool();
|
||||
|
||||
int this_id = nd.get_server_id();
|
||||
int zone_id = nd.get_zone_id();
|
||||
|
||||
Zone * zone = zpool->get(zone_id, true);
|
||||
|
||||
if ( zone == 0 )
|
||||
{
|
||||
oss << "start replicas: zone " << zone_id << "does not exist.";
|
||||
|
||||
NebulaLog::log("DBM", Log::ERROR, oss);
|
||||
return;
|
||||
}
|
||||
|
||||
ZoneServers * servers = zone->get_servers();
|
||||
ZoneServer * this_server = servers->get_server(this_id);
|
||||
|
||||
if ( this_server == 0 )
|
||||
{
|
||||
oss << "start replicas: server " << zone_id << "does not exist.";
|
||||
|
||||
NebulaLog::log("DBM", Log::ERROR, oss);
|
||||
return;
|
||||
}
|
||||
|
||||
ZoneServers::zone_iterator it;
|
||||
|
||||
for (it = servers->begin() ; it != servers->end() ; ++it )
|
||||
{
|
||||
pthread_attr_t pattr;
|
||||
|
||||
int id = (*it)->get_id();
|
||||
|
||||
if ( id == this_id || (*it)->is_offline() || get_thread(id) != 0 )
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// Initialize follower
|
||||
// ---------------------------------------------------------------------
|
||||
(*it)->init_follower(this_server->get_applied());
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// Create replication thread for this follower
|
||||
// ---------------------------------------------------------------------
|
||||
ReplicaThread * rthread = new ReplicaThread(*it, this_server);
|
||||
|
||||
thread_pool.insert(std::make_pair(id, rthread));
|
||||
|
||||
pthread_attr_init (&pattr);
|
||||
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
oss << "Starting replication thread for server " << id;
|
||||
|
||||
NebulaLog::log("DBM", Log::INFO, oss);
|
||||
|
||||
pthread_create(rthread->thread_id(), &pattr, replication_thread,
|
||||
(void *) rthread);
|
||||
}
|
||||
|
||||
zone->unlock();
|
||||
};
|
||||
|
||||
|
||||
void LogDBManager::stop_action()
|
||||
{
|
||||
std::map<int, ReplicaThread *>::iterator it;
|
||||
|
||||
for ( it = thread_pool.begin() ; it != thread_pool.end() ; ++it )
|
||||
{
|
||||
it->second->finalize();
|
||||
|
||||
pthread_join(*(it->second->thread_id()), 0);
|
||||
|
||||
delete it->second;
|
||||
}
|
||||
|
||||
thread_pool.clear();
|
||||
};
|
||||
|
||||
void LogDBManager::replicate_action()
|
||||
{
|
||||
|
||||
};
|
||||
|
||||
void LogDBManager::stop(const ActionRequest& ar)
|
||||
{
|
||||
|
||||
};
|
||||
|
||||
void LogDBManager::replicate(const ActionRequest& ar)
|
||||
void LogDBManager::delete_server_action()
|
||||
{
|
||||
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
// Replication thread logic
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
const std::string LogDBManager::ReplicaThread::replica_method =
|
||||
"one.zone.replicate";
|
||||
|
||||
LogDBManager::ReplicaThread::ReplicaThread(ZoneServer * z, ZoneServer *l):
|
||||
_finalize(false), server(z), leader(l), client(&transport)
|
||||
{
|
||||
pthread_mutex_init(&mutex, 0);
|
||||
|
||||
pthread_cond_init(&cond, 0);
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
//
|
||||
void LogDBManager::ReplicaThread::do_replication()
|
||||
{
|
||||
std::string server_endpoint = server->vector_value("ENDPOINT");
|
||||
|
||||
std::string secret, error;
|
||||
|
||||
Client::read_oneauth(secret, error);
|
||||
|
||||
while ( _finalize == false )
|
||||
{
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
if ( _finalize )
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
int next = server->get_next();
|
||||
/*
|
||||
LogDBRequest
|
||||
|
||||
xmlrpc_c::carriageParm_curl0 carriage(server_endpoint);
|
||||
|
||||
xmlrpc_c::paramList replica_params;
|
||||
|
||||
replica_params.add(xmlrpc_c::value_string(secret));
|
||||
replica_params.add(xmlrpc_c::value_int(leader->term()));
|
||||
replica_params.add(xmlrpc_c::value_int(secret));
|
||||
replica_params.add(xmlrpc_c::value_string(secret));
|
||||
|
||||
|
||||
|
||||
client->call("one.vm.deploy", // methodName
|
||||
"iibi", // arguments format
|
||||
&deploy_result, // resultP
|
||||
vid, // argument 1 (VM)
|
||||
hid, // argument 2 (HOST)
|
||||
false, // argument 3 (ENFORCE)
|
||||
dsid); // argument 5 (SYSTEM SD)
|
||||
|
||||
sampleAddParms.add(xmlrpc_c::value_int(5));
|
||||
sampleAddParms.add(xmlrpc_c::value_int(7));
|
||||
|
||||
xmlrpc_c::rpcPtr myRpcP(methodName, sampleAddParms);
|
||||
|
||||
string const serverUrl("http://localhost:8080/RPC2");
|
||||
*/
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
void LogDBManager::ReplicaThread::finalize()
|
||||
{
|
||||
pthread_mutex_lock(&mutex);
|
||||
|
||||
_finalize = true;
|
||||
|
||||
pthread_cond_signal(&cond);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ void Nebula::start(bool bootstrap_only)
|
||||
int signal;
|
||||
char hn[80];
|
||||
string scripts_remote_dir;
|
||||
SqlDB * db_backend;
|
||||
|
||||
if ( gethostname(hn,79) != 0 )
|
||||
{
|
||||
@ -161,6 +162,7 @@ void Nebula::start(bool bootstrap_only)
|
||||
federation_enabled = false;
|
||||
federation_master = false;
|
||||
zone_id = 0;
|
||||
server_id = -1;
|
||||
master_oned = "";
|
||||
|
||||
const VectorAttribute * vatt = nebula_configuration->get("FEDERATION");
|
||||
@ -210,6 +212,11 @@ void Nebula::start(bool bootstrap_only)
|
||||
"FEDERATION MASTER_ONED endpoint is missing.");
|
||||
}
|
||||
}
|
||||
|
||||
if ( vatt->vector_value("SERVER_ID", server_id) != 0 )
|
||||
{
|
||||
server_id = -1;
|
||||
}
|
||||
}
|
||||
|
||||
Log::set_zone_id(zone_id);
|
||||
@ -278,13 +285,15 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
if ( db_is_sqlite )
|
||||
{
|
||||
db = new SqliteDB(var_location + "one.db");
|
||||
db_backend = new SqliteDB(var_location + "one.db");
|
||||
}
|
||||
else
|
||||
{
|
||||
db = new MySqlDB(server, port, user, passwd, db_name);
|
||||
db_backend = new MySqlDB(server, port, user, passwd, db_name);
|
||||
}
|
||||
|
||||
logdb = new LogDB(db_backend);
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// Prepare the SystemDB and check versions
|
||||
// ---------------------------------------------------------------------
|
||||
@ -294,7 +303,7 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
NebulaLog::log("ONE",Log::INFO,"Checking database version.");
|
||||
|
||||
system_db = new SystemDB(db);
|
||||
system_db = new SystemDB(logdb);
|
||||
|
||||
rc = system_db->check_db_version(is_federation_slave(),
|
||||
local_bootstrap,
|
||||
@ -311,19 +320,20 @@ void Nebula::start(bool bootstrap_only)
|
||||
NebulaLog::log("ONE",Log::INFO,
|
||||
"Bootstrapping OpenNebula database, stage 1.");
|
||||
|
||||
rc += VirtualMachinePool::bootstrap(db);
|
||||
rc += HostPool::bootstrap(db);
|
||||
rc += VirtualNetworkPool::bootstrap(db);
|
||||
rc += ImagePool::bootstrap(db);
|
||||
rc += VMTemplatePool::bootstrap(db);
|
||||
rc += DatastorePool::bootstrap(db);
|
||||
rc += ClusterPool::bootstrap(db);
|
||||
rc += DocumentPool::bootstrap(db);
|
||||
rc += UserQuotas::bootstrap(db);
|
||||
rc += GroupQuotas::bootstrap(db);
|
||||
rc += SecurityGroupPool::bootstrap(db);
|
||||
rc += VirtualRouterPool::bootstrap(db);
|
||||
rc += VMGroupPool::bootstrap(db);
|
||||
rc += VirtualMachinePool::bootstrap(logdb);
|
||||
rc += HostPool::bootstrap(logdb);
|
||||
rc += VirtualNetworkPool::bootstrap(logdb);
|
||||
rc += ImagePool::bootstrap(logdb);
|
||||
rc += VMTemplatePool::bootstrap(logdb);
|
||||
rc += DatastorePool::bootstrap(logdb);
|
||||
rc += ClusterPool::bootstrap(logdb);
|
||||
rc += DocumentPool::bootstrap(logdb);
|
||||
rc += UserQuotas::bootstrap(logdb);
|
||||
rc += GroupQuotas::bootstrap(logdb);
|
||||
rc += SecurityGroupPool::bootstrap(logdb);
|
||||
rc += VirtualRouterPool::bootstrap(logdb);
|
||||
rc += VMGroupPool::bootstrap(logdb);
|
||||
rc += logdb->bootstrap();
|
||||
|
||||
// Create the system tables only if bootstrap went well
|
||||
if (rc == 0)
|
||||
@ -341,13 +351,13 @@ void Nebula::start(bool bootstrap_only)
|
||||
NebulaLog::log("ONE",Log::INFO,
|
||||
"Bootstrapping OpenNebula database, stage 2.");
|
||||
|
||||
rc += GroupPool::bootstrap(db);
|
||||
rc += UserPool::bootstrap(db);
|
||||
rc += AclManager::bootstrap(db);
|
||||
rc += ZonePool::bootstrap(db);
|
||||
rc += VdcPool::bootstrap(db);
|
||||
rc += MarketPlacePool::bootstrap(db);
|
||||
rc += MarketPlaceAppPool::bootstrap(db);
|
||||
rc += GroupPool::bootstrap(logdb);
|
||||
rc += UserPool::bootstrap(logdb);
|
||||
rc += AclManager::bootstrap(logdb);
|
||||
rc += ZonePool::bootstrap(logdb);
|
||||
rc += VdcPool::bootstrap(logdb);
|
||||
rc += MarketPlacePool::bootstrap(logdb);
|
||||
rc += MarketPlaceAppPool::bootstrap(logdb);
|
||||
|
||||
// Create the system tables only if bootstrap went well
|
||||
if ( rc == 0 )
|
||||
@ -425,7 +435,8 @@ void Nebula::start(bool bootstrap_only)
|
||||
// ---- ACL Manager ----
|
||||
try
|
||||
{
|
||||
aclm = new AclManager(db, zone_id, is_federation_slave(), timer_period);
|
||||
aclm = new AclManager(logdb, zone_id, is_federation_slave(),
|
||||
timer_period);
|
||||
}
|
||||
catch (bad_alloc&)
|
||||
{
|
||||
@ -449,7 +460,7 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
vnc_conf = nebula_configuration->get("VNC_PORTS");
|
||||
|
||||
clpool = new ClusterPool(db, vnc_conf);
|
||||
clpool = new ClusterPool(logdb, vnc_conf);
|
||||
|
||||
/* --------------------- VirtualMachine Pool ------------------------ */
|
||||
vector<const VectorAttribute *> vm_hooks;
|
||||
@ -489,7 +500,7 @@ void Nebula::start(bool bootstrap_only)
|
||||
disk_cost = 0;
|
||||
}
|
||||
|
||||
vmpool = new VirtualMachinePool(db, vm_hooks, hook_location,
|
||||
vmpool = new VirtualMachinePool(logdb, vm_hooks, hook_location,
|
||||
remotes_location, vm_restricted_attrs, vm_expiration,
|
||||
vm_submit_on_hold, cpu_cost, mem_cost, disk_cost);
|
||||
|
||||
@ -500,9 +511,10 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
nebula_configuration->get("HOST_HOOK", host_hooks);
|
||||
|
||||
nebula_configuration->get("HOST_MONITORING_EXPIRATION_TIME", host_expiration);
|
||||
nebula_configuration->get("HOST_MONITORING_EXPIRATION_TIME",
|
||||
host_expiration);
|
||||
|
||||
hpool = new HostPool(db, host_hooks, hook_location, remotes_location,
|
||||
hpool = new HostPool(logdb, host_hooks, hook_location, remotes_location,
|
||||
host_expiration);
|
||||
|
||||
/* --------------------- VirtualRouter Pool ------------------------- */
|
||||
@ -510,7 +522,8 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
nebula_configuration->get("VROUTER_HOOK", vrouter_hooks);
|
||||
|
||||
vrouterpool = new VirtualRouterPool(db, vrouter_hooks, remotes_location);
|
||||
vrouterpool = new VirtualRouterPool(logdb, vrouter_hooks,
|
||||
remotes_location);
|
||||
|
||||
/* -------------------- VirtualNetwork Pool ------------------------- */
|
||||
int size;
|
||||
@ -537,8 +550,9 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
vxlan_id = nebula_configuration->get("VXLAN_IDS");
|
||||
|
||||
vnpool = new VirtualNetworkPool(db, mac_prefix, size, vnet_restricted_attrs,
|
||||
vnet_hooks, remotes_location, inherit_vnet_attrs, vlan_id, vxlan_id);
|
||||
vnpool = new VirtualNetworkPool(logdb, mac_prefix, size,
|
||||
vnet_restricted_attrs, vnet_hooks, remotes_location,
|
||||
inherit_vnet_attrs, vlan_id, vxlan_id);
|
||||
|
||||
/* ----------------------- Group/User Pool -------------------------- */
|
||||
vector<const VectorAttribute *> user_hooks;
|
||||
@ -548,15 +562,15 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
nebula_configuration->get("GROUP_HOOK", group_hooks);
|
||||
|
||||
gpool = new GroupPool(db, group_hooks, remotes_location,
|
||||
gpool = new GroupPool(logdb, group_hooks, remotes_location,
|
||||
is_federation_slave());
|
||||
|
||||
nebula_configuration->get("SESSION_EXPIRATION_TIME", expiration_time);
|
||||
|
||||
nebula_configuration->get("USER_HOOK", user_hooks);
|
||||
|
||||
upool = new UserPool(db, expiration_time, user_hooks, remotes_location,
|
||||
is_federation_slave());
|
||||
upool = new UserPool(logdb, expiration_time, user_hooks,
|
||||
remotes_location, is_federation_slave());
|
||||
|
||||
/* -------------------- Image/Datastore Pool ------------------------ */
|
||||
string image_type;
|
||||
@ -578,27 +592,27 @@ void Nebula::start(bool bootstrap_only)
|
||||
|
||||
nebula_configuration->get("INHERIT_IMAGE_ATTR", inherit_image_attrs);
|
||||
|
||||
ipool = new ImagePool(db, image_type, device_prefix, cd_dev_prefix,
|
||||
ipool = new ImagePool(logdb, image_type, device_prefix, cd_dev_prefix,
|
||||
img_restricted_attrs, image_hooks, remotes_location,
|
||||
inherit_image_attrs);
|
||||
|
||||
nebula_configuration->get("INHERIT_DATASTORE_ATTR", inherit_ds_attrs);
|
||||
|
||||
dspool = new DatastorePool(db, inherit_ds_attrs);
|
||||
dspool = new DatastorePool(logdb, inherit_ds_attrs);
|
||||
|
||||
/* ----- Document, Zone, VDC, VMTemplate, SG and Makerket Pools ----- */
|
||||
docpool = new DocumentPool(db);
|
||||
zonepool = new ZonePool(db, is_federation_slave());
|
||||
vdcpool = new VdcPool(db, is_federation_slave());
|
||||
docpool = new DocumentPool(logdb);
|
||||
zonepool = new ZonePool(logdb, is_federation_slave());
|
||||
vdcpool = new VdcPool(logdb, is_federation_slave());
|
||||
|
||||
tpool = new VMTemplatePool(db);
|
||||
tpool = new VMTemplatePool(logdb);
|
||||
|
||||
secgrouppool = new SecurityGroupPool(db);
|
||||
secgrouppool = new SecurityGroupPool(logdb);
|
||||
|
||||
marketpool = new MarketPlacePool(db, is_federation_slave());
|
||||
apppool = new MarketPlaceAppPool(db, is_federation_slave());
|
||||
marketpool = new MarketPlacePool(logdb, is_federation_slave());
|
||||
apppool = new MarketPlaceAppPool(logdb, is_federation_slave());
|
||||
|
||||
vmgrouppool = new VMGroupPool(db);
|
||||
vmgrouppool = new VMGroupPool(logdb);
|
||||
|
||||
default_user_quota.select();
|
||||
default_group_quota.select();
|
||||
@ -977,6 +991,20 @@ void Nebula::start(bool bootstrap_only)
|
||||
throw runtime_error("Could not start the Request Manager");
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------
|
||||
// Start HA mode if working in a cluster of oned's
|
||||
// -----------------------------------------------------------
|
||||
|
||||
if ( server_id != -1 )
|
||||
{
|
||||
NebulaLog::log("ONE", Log::INFO, "No SERVER_ID defined, starting "
|
||||
"oned in solo mode.");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------
|
||||
// Wait for a SIGTERM or SIGINT signal
|
||||
// -----------------------------------------------------------
|
||||
|
@ -427,6 +427,7 @@ void OpenNebulaTemplate::set_conf_default()
|
||||
vvalue.clear();
|
||||
vvalue.insert(make_pair("MODE","STANDALONE"));
|
||||
vvalue.insert(make_pair("ZONE_ID","0"));
|
||||
vvalue.insert(make_pair("SERVER_ID","-1"));
|
||||
vvalue.insert(make_pair("MASTER_ONED",""));
|
||||
|
||||
vattribute = new VectorAttribute("FEDERATION",vvalue);
|
||||
|
@ -68,6 +68,7 @@ env.Prepend(LIBS=[
|
||||
'nebula_marketplace',
|
||||
'nebula_ipamm',
|
||||
'nebula_vmgroup',
|
||||
'nebula_logdb',
|
||||
'crypto',
|
||||
'xml2'
|
||||
])
|
||||
|
@ -344,3 +344,13 @@ int Zone::delete_server(int id, string& error)
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned int Zone::servers_size()
|
||||
{
|
||||
return servers->size();
|
||||
}
|
||||
|
||||
ZoneServer * Zone::get_server(int server_id)
|
||||
{
|
||||
return servers->get_server(server_id);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user