1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-25 02:50:08 +03:00

Merge branch 'bug-sqlerror'

This commit is contained in:
Ruben S. Montero 2019-02-13 11:40:10 +01:00
commit 063c429083
9 changed files with 187 additions and 116 deletions

View File

@ -79,7 +79,7 @@ private:
* This class implements a generic DB interface with replication. The associated
* DB stores a log to replicate on followers.
*/
class LogDB : public SqlDB, Callbackable
class LogDB : public SqlDB
{
public:
LogDB(SqlDB * _db, bool solo, unsigned int log_retention,
@ -104,7 +104,7 @@ public:
* timestamp of the record is updated.
* @param index of the log record
*/
int apply_log_records(unsigned int commit_index);
int apply_log_records(unsigned int commit_index);
/**
* Deletes the record in start_index and all that follow it
@ -218,6 +218,10 @@ public:
return db->limit_support();
}
bool fts_available()
{
return db->fts_available();
}
// -------------------------------------------------------------------------
// Database methods
// -------------------------------------------------------------------------
@ -251,16 +255,11 @@ public:
int next_federated(int index);
bool fts_available()
{
return db->fts_available();
}
protected:
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet)
{
return -1;
}
return SqlError::INTERNAL;
};
private:
pthread_mutex_t mutex;
@ -338,11 +337,6 @@ private:
*/
int _exec_wr(ostringstream& cmd, int federated);
/**
* Callback to store the IDs of federated records in the federated log.
*/
int index_cb(void *null, int num, char **values, char **names);
/**
* Applies the SQL command of the given record to the database. The
* timestamp of the record is updated.
@ -427,10 +421,10 @@ public:
}
protected:
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet)
{
return -1;
}
return SqlError::INTERNAL;
};
private:

View File

@ -96,7 +96,7 @@ protected:
* @param obj Callbackable obj to call if the query succeeds
* @return 0 on success
*/
int exec(ostringstream& cmd, Callbackable* obj, bool quiet);
std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet);
private:

View File

@ -20,7 +20,32 @@
#include <sstream>
#include "Callbackable.h"
using namespace std;
/**
* Abstract error conditions on SQL backends
*/
enum class SqlError
{
SUCCESS = 0x0000000,
INTERNAL = 0x0000001,
CONNECTION = 0x1000000,
SQL = 0x2000000,
SQL_DUP_KEY= 0x2000001
};
namespace std
{
template<>
struct is_error_code_enum<SqlError> : true_type {};
};
struct SqlErrorCategory : std::error_category
{
const char * name() const noexcept override;
std::string message(int e) const override;
};
std::error_code make_error_code(SqlError e);
/**
* SqlDB class.Provides an abstract interface to implement a SQL backend
@ -46,33 +71,40 @@ public:
* @param callbak function to execute on each data returned
* @return 0 on success
*/
virtual int exec_local_wr(ostringstream& cmd)
virtual int exec_local_wr(std::ostringstream& cmd)
{
return exec(cmd, 0, false);
}
virtual int exec_rd(ostringstream& cmd, Callbackable* obj)
virtual int exec_rd(std::ostringstream& cmd, Callbackable* obj)
{
return exec(cmd, obj, false);
}
virtual int exec_wr(ostringstream& cmd)
virtual int exec_wr(std::ostringstream& cmd)
{
return exec(cmd, 0, false);
}
virtual int exec_wr(ostringstream& cmd, Callbackable* obj)
virtual int exec_wr(std::ostringstream& cmd, Callbackable* obj)
{
return exec(cmd, obj, false);
}
/* ---------------------------------------------------------------------- */
std::error_code execute_wr(std::ostringstream& cmd)
{
return execute(cmd, 0, false);
}
/**
* This function returns a legal SQL string that can be used in an SQL
* statement.
* @param str the string to be escaped
* @return a valid SQL string or NULL in case of failure
*/
virtual char * escape_str(const string& str) = 0;
virtual char * escape_str(const std::string& str) = 0;
/**
* Frees a previously scaped string
@ -109,7 +141,22 @@ protected:
* @param quiet True to log errors with DDEBUG level instead of ERROR
* @return 0 on success
*/
virtual int exec(ostringstream& cmd, Callbackable* obj, bool quiet) = 0;
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
{
int rc = 0;
std::error_code ec = execute(cmd, obj, quiet);
if (ec != SqlError::SUCCESS)
{
rc = -1;
};
return rc;
}
virtual std::error_code execute(std::ostringstream& cmd, Callbackable *obj,
bool quiet) = 0;
};
#endif /*SQL_DB_H_*/

View File

@ -92,7 +92,7 @@ protected:
* @param arg to pass to the callback function
* @return 0 on success
*/
int exec(ostringstream& cmd, Callbackable* obj, bool quiet);
std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet);
private:
/**

View File

@ -331,7 +331,7 @@ void Nebula::start(bool bootstrap_only)
if ( (solo && local_bootstrap) || bootstrap_only)
{
if ( logdb->bootstrap(db_backend) != 0 )
if ( LogDB::bootstrap(db_backend) != 0 )
{
throw runtime_error("Error bootstrapping database.");
}

View File

@ -352,12 +352,13 @@ int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
int LogDB::apply_log_record(LogDBRecord * lr)
{
ostringstream oss_sql;
int rc = -1;
oss_sql.str(lr->sql);
int rc = db->exec_wr(oss_sql);
std::error_code ec = db->execute_wr(oss_sql);
if ( rc == 0 )
if (ec == SqlError::SUCCESS || ec == SqlError::SQL_DUP_KEY)
{
std::ostringstream oss;
@ -370,6 +371,8 @@ int LogDB::apply_log_record(LogDBRecord * lr)
}
last_applied = lr->index;
rc = 0;
}
return rc;
@ -525,13 +528,13 @@ int LogDB::delete_log_records(unsigned int start_index)
if ( rc == 0 )
{
LogDBRecord lr;
LogDBRecord lr;
next_index = start_index;
last_index = start_index - 1;
if ( get_log_record(last_index, lr) == 0 )
if ( get_log_record(last_index, lr) == 0 )
{
last_term = lr.term;
}
@ -539,7 +542,7 @@ int LogDB::delete_log_records(unsigned int start_index)
pthread_mutex_unlock(&mutex);
return rc;
return rc;
}
/* -------------------------------------------------------------------------- */
@ -549,26 +552,26 @@ int LogDB::apply_log_records(unsigned int commit_index)
{
pthread_mutex_lock(&mutex);
while (last_applied < commit_index )
{
LogDBRecord lr;
while (last_applied < commit_index )
{
LogDBRecord lr;
if ( get_log_record(last_applied + 1, lr) != 0 )
{
if ( get_log_record(last_applied + 1, lr) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
return -1;
}
if ( apply_log_record(&lr) != 0 )
{
if ( apply_log_record(&lr) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
}
return -1;
}
}
pthread_mutex_unlock(&mutex);
return 0;
return 0;
}
/* -------------------------------------------------------------------------- */
@ -609,7 +612,7 @@ int LogDB::purge_log()
/* ---------------------------------------------------------------------- */
/* Federated records. Keep last log_retention federated records */
/* ---------------------------------------------------------------------- */
if ( fed_log.size() < log_retention )
if ( fed_log.size() < log_retention )
{
pthread_mutex_unlock(&mutex);
@ -642,7 +645,7 @@ int LogDB::purge_log()
build_federated_index();
pthread_mutex_unlock(&mutex);
return rc;
}
@ -670,7 +673,7 @@ int LogDB::replicate(int rindex)
}
else if ( rr.result == true ) //Record replicated on majority of followers
{
rc = apply_log_records(rindex);
rc = apply_log_records(rindex);
}
else
{
@ -688,17 +691,6 @@ int LogDB::replicate(int rindex)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::index_cb(void *null, int num, char **values, char **names)
{
if ( num == 0 || values == 0 || values[0] == 0 )
{
return -1;
}
fed_log.insert(atoi(values[0]));
return 0;
}
void LogDB::build_federated_index()
{
@ -706,13 +698,15 @@ void LogDB::build_federated_index()
fed_log.clear();
set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
set_cb<int> cb;
cb.set_callback(&fed_log);
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
db->exec_rd(oss, this);
db->exec_rd(oss, &cb);
unset_callback();
cb.unset_callback();
}
/* -------------------------------------------------------------------------- */

View File

@ -16,9 +16,10 @@
#include "MySqlDB.h"
#include <mysql/errmsg.h>
#include <mysqld_error.h>
/*********
* Doc: http://dev.mysql.com/doc/refman/5.5/en/c-api-function-overview.html
* Doc: https://dev.mysql.com/doc/refman/8.0/en/c-api.html
********/
/* -------------------------------------------------------------------------- */
@ -162,11 +163,13 @@ bool MySqlDB::limit_support()
/* -------------------------------------------------------------------------- */
int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
std::error_code MySqlDB::execute(std::ostringstream& cmd, Callbackable *obj, bool quiet)
{
string str = cmd.str();
const char * c_str = str.c_str();
std::error_code ec = SqlError::SUCCESS;
Log::MessageType error_level = quiet ? Log::DDEBUG : Log::ERROR;
struct timespec timer;
@ -179,37 +182,52 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
if (rc != 0)
{
ostringstream oss;
const char * err_msg = mysql_error(db);
int err_num = mysql_errno(db);
ostringstream oss;
if( err_num == CR_SERVER_GONE_ERROR || err_num == CR_SERVER_LOST )
const char * err_msg = mysql_error(db);
int err_num = mysql_errno(db);
switch(err_num)
{
oss << "MySQL connection error " << err_num << " : " << err_msg;
case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
oss << "MySQL connection error " << err_num << " : " << err_msg;
// Try to re-connect
if (mysql_real_connect(db, server.c_str(), user.c_str(),
password.c_str(), database.c_str(),
port, NULL, 0))
{
oss << "... Reconnected.";
}
else
{
oss << "... Reconnection attempt failed.";
}
// Try to re-connect
if (mysql_real_connect(db, server.c_str(), user.c_str(),
password.c_str(), database.c_str(), port, NULL, 0))
{
oss << "... Reconnected.";
}
else
{
oss << "... Reconnection attempt failed.";
}
ec = SqlError::CONNECTION;
break;
// Error codes that should be considered applied for the RAFT log.
case ER_DUP_ENTRY:
ec = SqlError::SQL_DUP_KEY;
break;
default:
ec = SqlError::SQL; //Default exit code for errors
break;
}
else
if (ec != SqlError::CONNECTION)
{
oss << "SQL command was: " << c_str;
oss << ", error " << err_num << " : " << err_msg;
}
NebulaLog::log("ONE",error_level,oss);
NebulaLog::log("ONE", error_level, oss);
free_db_connection(db);
return -1;
return ec;
}
if (obj != 0)
@ -238,12 +256,12 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
free_db_connection(db);
return -1;
return make_error_code(SqlError::SQL);
}
// Fetch the names of the fields
num_fields = mysql_num_fields(result);
fields = mysql_fetch_fields(result);
num_fields = mysql_num_fields(result);
fields = mysql_fetch_fields(result);
char ** names = new char*[num_fields];
@ -257,7 +275,7 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
{
if ( obj->do_callback(num_fields, row, names) != 0 )
{
rc = -1;
ec = make_error_code(SqlError::SQL);
break;
}
}
@ -288,7 +306,7 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
NebulaLog::log("SQL", Log::WARNING, oss);
}
return rc;
return ec;
}
/* -------------------------------------------------------------------------- */
@ -362,4 +380,4 @@ bool MySqlDB::fts_available()
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

View File

@ -20,7 +20,7 @@ Import('env')
lib_name='nebula_sql'
source_files=['LogDB.cc']
source_files=['LogDB.cc', 'SqlDB.cc']
# Sources to generate the library
if env['sqlite']=='yes':

View File

@ -43,11 +43,14 @@ extern "C" int sqlite_callback (
SqliteDB::SqliteDB(const string& db_name)
{
int rc;
pthread_mutex_init(&mutex,0);
rc = sqlite3_open(db_name.c_str(), &db);
int rc = sqlite3_open(db_name.c_str(), &db);
if ( rc != SQLITE_OK )
{
throw runtime_error("Could not open database.");
}
enable_limit = sqlite3_compileoption_used("SQLITE_ENABLE_UPDATE_DELETE_LIMIT");
@ -56,10 +59,7 @@ SqliteDB::SqliteDB(const string& db_name)
NebulaLog::log("ONE",Log::INFO , "sqlite has enabled: SQLITE_ENABLE_UPDATE_DELETE_LIMIT");
}
if ( rc != SQLITE_OK )
{
throw runtime_error("Could not open database.");
}
sqlite3_extended_result_codes(db, 1);
}
/* -------------------------------------------------------------------------- */
@ -89,19 +89,24 @@ bool SqliteDB::limit_support()
/* -------------------------------------------------------------------------- */
int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
std::error_code SqliteDB::execute(std::ostringstream& cmd, Callbackable *obj, bool quiet)
{
int rc;
int rc;
const char * c_str;
string str;
int counter = 0;
char * err_msg = 0;
int counter = 0;
char * err_msg = 0;
int (*callback)(void*,int,char**,char**);
void * arg;
Log::MessageType error_level;
std::ostringstream oss;
std::error_code ec;
str = cmd.str();
c_str = str.c_str();
@ -131,8 +136,7 @@ int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
select(0, NULL, NULL, NULL, &timeout);
}
}while( (rc == SQLITE_BUSY || rc == SQLITE_IOERR) &&
(counter < 10));
}while((rc == SQLITE_BUSY || rc == SQLITE_IOERR) && (counter < 10));
if (obj != 0 && obj->get_affected_rows() == 0)
{
@ -146,24 +150,38 @@ int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet)
unlock();
if (rc != SQLITE_OK)
switch(rc)
{
if (err_msg != 0)
{
Log::MessageType error_level = quiet ? Log::DDEBUG : Log::ERROR;
case SQLITE_BUSY:
case SQLITE_IOERR:
ec = SqlError::CONNECTION;
break;
ostringstream oss;
case SQLITE_OK:
ec = SqlError::SUCCESS;
break;
oss << "SQL command was: " << c_str << ", error: " << err_msg;
NebulaLog::log("ONE",error_level,oss);
// Error codes that should be considered applied for the RAFT log.
case SQLITE_CONSTRAINT_UNIQUE:
ec = SqlError::SQL_DUP_KEY;
break;
sqlite3_free(err_msg);
}
return -1;
default:
ec = SqlError::SQL;
break;
}
return 0;
if ( ec != SqlError::SUCCESS && err_msg != NULL )
{
error_level = quiet ? Log::DDEBUG : Log::ERROR;
oss << "SQL command was: " << c_str << ", error: " << err_msg;
NebulaLog::log("ONE",error_level,oss);
sqlite3_free(err_msg);
}
return ec;
}
/* -------------------------------------------------------------------------- */