1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-08 21:17:43 +03:00

F #4809: Some implementation files. Fix compilation issues

This commit is contained in:
Ruben S. Montero 2017-04-20 16:13:41 +02:00
parent bae57600fe
commit c8981e82a3
8 changed files with 487 additions and 61 deletions

View File

@ -55,6 +55,7 @@ main_env.Append(CPPPATH=[
main_env.Append(LIBPATH=[
cwd+'/src/common',
cwd+'/src/log',
cwd+'/src/logdb',
cwd+'/src/sql',
cwd+'/src/host',
cwd+'/src/cluster',
@ -211,6 +212,7 @@ main_env.ParseConfig('xml2-config --libs --cflags')
build_scripts=[
'src/sql/SConstruct',
'src/log/SConstruct',
'src/logdb/SConstruct',
'src/common/SConstruct',
'src/template/SConstruct',
'src/host/SConstruct',

View File

@ -19,14 +19,15 @@
#include <string>
#include <sstream>
#include <map>
#include "SqlDB.h"
#include "LogDBRequest.h"
class LogDB : public SqlDB
class LogDB : public SqlDB, Callbackable
{
public:
LogDB(SqlDB * _db):db(_db), term(0), index(0){};
LogDB(SqlDB * _db):db(_db), next_index(0), term(0){};
virtual ~LogDB(){};
@ -40,36 +41,34 @@ public:
next_index = i;
}
int exec_wr(ostringstream& cmd)
/**
* Return the request associated to the given logdb record. If there is
* no client waiting for its replication it is loaded from the DB.
* @param index of the associated logDB entry
* @return the LogDB replication request
*
*/
LogDBRequest * get_request(unsigned int index);
// -------------------------------------------------------------------------
// SQL interface
// -------------------------------------------------------------------------
/**
* This function replicates the DB changes on followers before updating
* the DB state
*/
int exec_wr(ostringstream& cmd);
virtual int exec_bootstrap(ostringstream& cmd)
{
int rc;
//TODO: WRITE RECORD IN DB
//
LogDBRecord * lr = new LogDBRequest(next_index, term, cmd);
next_index++;
//LogDBManager->triger(NEW_LOG_RECORD);
lr.wait();
if ( lr.result == true )
{
rc = exec(cmd, 0, false);
}
else
{
rc = -1;
//Nebula::Log
}
return rc;
return db->exec_bootstrap(cmd);
}
virtual int exec_rd(ostringstream& cmd, Callbackable* obj)
{
return db->exec_rd(cmd, obj);
}
// -------------------------------------------------------------------------
// SQL interface. Use database store implementation
// -------------------------------------------------------------------------
char * escape_str(const string& str)
{
return db->escape_str(str);
@ -88,7 +87,7 @@ public:
protected:
int exec(ostringstream& cmd, Callbackable* obj, bool quiet)
{
return db->exec(cmd, obj, quiet);
return -1;
}
private:
@ -108,6 +107,40 @@ private:
*/
unsigned int term;
/**
* List of pending requests (a client is waiting for the log entry to be
* replicated in a majority of followers)
*/
std::map<unsigned int, LogDBRequest *> requests;
// -------------------------------------------------------------------------
// DataBase implementation
// -------------------------------------------------------------------------
static const char * table;
static const char * db_names;
static const char * db_bootstrap;
/**
* This function loads a log record from the database and returns the an
* associated replication request
* @param index of the record
*
* @return the request 0 if failure
*/
int select_cb(void *req, int num, char **values, char **names);
LogDBRequest * select(int index);
/**
* Inserts or update a log record in the database
* @param request associated to the logDB entry to be inserted/updated
* @param replace true to replace an existing entry
*
* @return 0 on success
*/
int insert_replace(LogDBRequest * request, bool replace);
};
#endif /*LOG_DB_H_*/

View File

@ -20,7 +20,7 @@
#include <string>
#include <sstream>
#include "SqlDB.h"
#include "SyncRequest.h"
/**
* This class represents a log entry replication request. The replication request
@ -30,11 +30,9 @@
class LogDBRequest : public SyncRequest
{
public:
LogDBRequest(unsigned int i, unsigned int t, std::ostringstream o):
index(i), term(t), sql(o.str()), to_commit(-1), replicas(1)
{
pthread_mutex_init(&mutex, 0);
};
LogDBRequest(unsigned int i, unsigned int t, const std::ostringstream& o);
LogDBRequest(unsigned int i, unsigned int t, const char * s);
virtual ~LogDBRequest(){};
@ -43,30 +41,25 @@ public:
* this entry. If it reaches 0, the client is notified
* @return number of replicas for this log
*/
int replicated()
int replicated();
/* ---------------------------------------------------------------------- */
/* Class access methods */
/* ---------------------------------------------------------------------- */
unsigned int index()
{
int _replicas;
return _index;
};
lock();
unsigned int term()
{
return _term;
};
replicas++;
to_commit--;
_replicas = replicas;
if ( to_commit == 0 )
{
result = true;
timeout = false;
notify();
}
unlock();
return _replicas;
}
const std::string& sql()
{
return _sql;
};
private:
pthread_mutex_t mutex;
@ -74,17 +67,17 @@ private:
/**
* Index for this log entry
*/
unsigned int index;
unsigned int _index;
/**
* Term where this log entry was generated
*/
unsigned int term;
unsigned int _term;
/**
* SQL command to exec in the DB to update (INSERT, REPLACE, DROP)
*/
std::string sql;
std::string _sql;
/**
* Remaining number of servers that need to replicate this record to commit

View File

@ -48,7 +48,7 @@ public:
/**
* Error message for negative results
*/
string message;
std::string message;
/**
* Time out, true if the request ended because of a time out
@ -78,6 +78,14 @@ public:
am.loop();
};
/**
* Wait for the AuthRequest to be completed
*/
void wait(time_t t)
{
am.loop(t);
};
protected:
friend class MadManager;
@ -91,6 +99,18 @@ protected:
* The ActionManager that will be notify when the request is ready.
*/
ActionManager am;
/**
* Timer action to finalize time-out waits
*/
void timer_action(const ActionRequest& ar)
{
result = false;
timeout = true;
message = "Operation time out";
am.finalize();
};
};
#endif /*SYNC_REQUEST_H_*/

168
src/logdb/LogDB.cc Normal file
View File

@ -0,0 +1,168 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "LogDB.h"
#include "Nebula.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
const char * LogDB::table = "logdb";
const char * LogDB::db_names = "index, term, sql";
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
"logdb (index INTEGER, term INTEGER, sql MEDIUMTEXT, PRIMARY KEY(index))";
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDBRequest * LogDB::get_request(unsigned int index)
{
std::map<unsigned int, LogDBRequest *>::iterator it;
it = requests.find(index);
if ( it == requests.end() )
{
LogDBRequest * req = select(index);
if ( req != 0 )
{
requests.insert(std::make_pair(index, req));
}
return req;
}
return it->second;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::exec_wr(ostringstream& cmd)
{
int rc;
// Insert log entry in the Database
LogDBRequest * lr = new LogDBRequest(next_index, term, cmd);
if ( insert_replace(lr, false) != 0 )
{
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
}
// Store the replication request in the active requests map
requests.insert(std::make_pair(next_index, lr));
next_index++;
//LogDBManager->triger(NEW_LOG_RECORD);
// Wait for completion
lr->wait();
if ( lr->result == true ) //Record replicated on majority of followers
{
rc = db->exec_wr(cmd);
}
else
{
std::ostringstream oss;
oss << "Cannot replicate log record on followers: " << lr->message;
NebulaLog::log("DBM", Log::ERROR, oss);
rc = -1;
}
return rc;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::select_cb(void *req, int num, char **values, char **names)
{
if ( (!values[0]) || (!values[1]) || (!values[2]) || (num != 1) )
{
return -1;
}
LogDBRequest ** request = static_cast<LogDBRequest **>(req);
unsigned int index = static_cast<unsigned int>(atoi(values[0]));
unsigned int term = static_cast<unsigned int>(atoi(values[1]));
*request = new LogDBRequest(index, term, values[2]);
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDBRequest * LogDB::select(int index)
{
ostringstream oss;
LogDBRequest * request = 0;
oss << "SELECT index, term, sql FROM logdb WHERE index = " << index;
set_callback(static_cast<Callbackable::Callback>(&LogDB::select_cb),
static_cast<void *>(&request));
db->exec_rd(oss, this);
return request;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert_replace(LogDBRequest * request, bool replace)
{
std::ostringstream oss;
char * sql_db = db->escape_str(request->sql().c_str());
if ( sql_db == 0 )
{
return -1;
}
if (replace)
{
oss << "REPLACE";
}
else
{
oss << "INSERT";
}
oss << " INTO " << table << " ("<< db_names <<") VALUES ("
<< request->index() << ","
<< request->term() << ","
<< "'" << sql_db << "')";
int rc = db->exec_wr(oss);
db->free_str(sql_db);
return rc;
}

115
src/logdb/LogDBManager.cc Normal file
View File

@ -0,0 +1,115 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#ifndef LOG_DB_MANAGER_H_
#define LOG_DB_MANAGER_H_
#include "ActionManager.h"
#include "LogDBRecord.h"
#include "ZoneServer.h"
extern "C" void * logdb_manager_loop(void *arg);
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class LogDBAction : public ActionRequest
{
public:
enum Actions
{
NEW_LOGDB_RECORD,
DELETE_SERVER
}
LogDBAction(Actions a, LogDBRequest * r):ActionRequest(ActionRequest::USER),
_action(a), _request(r){};
LogDBAction(const LogDBAction& o):ActionRequest(o._type),
_action(o._action), _request(o._request){};
Actions action() const
{
return _action;
}
LogDBRequest * request() const
{
return _request;
}
ActionRequest * clone() const
{
return new LogDBAction(*this);
}
private:
Action _action;
LogDBRequest * _request;
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
class LogDBManager : public ActionListener
{
private:
class LogDBManagerThread
{
public:
LogDBManagerThread(ZoneServer * z):replicate(false), zserver(z)
{
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
};
virtual ~LogDBManagerThread(){};
void do_replication();
private:
pthread_t thread_id;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool replicate;
ZoneServer * zserver;
}
/**
* LogDB records being replicated on followers
*/
std::map<unsigned int, LogDBRecord *> log;
// -------------------------------------------------------------------------
// Action Listener interface
// -------------------------------------------------------------------------
void finalize_action(const ActionRequest& ar)
{
NebulaLog::log("DBM",Log::INFO,"Stopping LogDB Manager...");
};
void user_action(const ActionRequest& ar);
}
#endif /*LOG_DB_H_*/

65
src/logdb/LogDBRequest.cc Normal file
View File

@ -0,0 +1,65 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "LogDBRequest.h"
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
LogDBRequest::LogDBRequest(unsigned int i, unsigned int t,
const std::ostringstream& o):
_index(i), _term(t), _sql(o.str()), to_commit(-1), replicas(1)
{
pthread_mutex_init(&mutex, 0);
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
LogDBRequest::LogDBRequest(unsigned int i, unsigned int t, const char * s):
_index(i), _term(t), _sql(s), to_commit(-1), replicas(1)
{
pthread_mutex_init(&mutex, 0);
};
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
int LogDBRequest::replicated()
{
int _replicas;
lock();
replicas++;
to_commit--;
_replicas = replicas;
if ( to_commit == 0 )
{
result = true;
timeout = false;
notify();
}
unlock();
return _replicas;
}

30
src/logdb/SConstruct Normal file
View File

@ -0,0 +1,30 @@
# SConstruct for src/log
# -------------------------------------------------------------------------- #
# Copyright 2002-2016, OpenNebula Project, OpenNebula Systems #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
Import('env')
lib_name='nebula_logdb'
# Sources to generate the library
source_files=[
'LogDB.cc',
'LogDBRequest.cc'
]
# Build library
env.StaticLibrary(lib_name, source_files)