diff --git a/include/LogDB.h b/include/LogDB.h index 1eb46362ed..664c14f19f 100644 --- a/include/LogDB.h +++ b/include/LogDB.h @@ -79,7 +79,7 @@ private: * This class implements a generic DB interface with replication. The associated * DB stores a log to replicate on followers. */ -class LogDB : public SqlDB, Callbackable +class LogDB : public SqlDB { public: LogDB(SqlDB * _db, bool solo, unsigned int log_retention, @@ -104,7 +104,7 @@ public: * timestamp of the record is updated. * @param index of the log record */ - int apply_log_records(unsigned int commit_index); + int apply_log_records(unsigned int commit_index); /** * Deletes the record in start_index and all that follow it @@ -218,6 +218,10 @@ public: return db->limit_support(); } + bool fts_available() + { + return db->fts_available(); + } // ------------------------------------------------------------------------- // Database methods // ------------------------------------------------------------------------- @@ -251,16 +255,11 @@ public: int next_federated(int index); - bool fts_available() - { - return db->fts_available(); - } - protected: - int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet) + std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet) { - return -1; - } + return SqlError::INTERNAL; + }; private: pthread_mutex_t mutex; @@ -338,11 +337,6 @@ private: */ int _exec_wr(ostringstream& cmd, int federated); - /** - * Callback to store the IDs of federated records in the federated log. - */ - int index_cb(void *null, int num, char **values, char **names); - /** * Applies the SQL command of the given record to the database. The * timestamp of the record is updated. @@ -427,10 +421,10 @@ public: } protected: - int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet) + std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet) { - return -1; - } + return SqlError::INTERNAL; + }; private: diff --git a/include/MySqlDB.h b/include/MySqlDB.h index 87f7ef4a79..ff4570d5a3 100644 --- a/include/MySqlDB.h +++ b/include/MySqlDB.h @@ -96,7 +96,7 @@ protected: * @param obj Callbackable obj to call if the query succeeds * @return 0 on success */ - int exec(ostringstream& cmd, Callbackable* obj, bool quiet); + std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet); private: diff --git a/include/SqlDB.h b/include/SqlDB.h index 64b8a4b3b8..bc02c07357 100644 --- a/include/SqlDB.h +++ b/include/SqlDB.h @@ -20,7 +20,32 @@ #include #include "Callbackable.h" -using namespace std; +/** + * Abstract error conditions on SQL backends + */ +enum class SqlError +{ + SUCCESS = 0x0000000, + INTERNAL = 0x0000001, + CONNECTION = 0x1000000, + SQL = 0x2000000, + SQL_DUP_KEY= 0x2000001 +}; + +namespace std +{ + template<> + struct is_error_code_enum : true_type {}; +}; + +struct SqlErrorCategory : std::error_category +{ + const char * name() const noexcept override; + + std::string message(int e) const override; +}; + +std::error_code make_error_code(SqlError e); /** * SqlDB class.Provides an abstract interface to implement a SQL backend @@ -46,33 +71,40 @@ public: * @param callbak function to execute on each data returned * @return 0 on success */ - virtual int exec_local_wr(ostringstream& cmd) + virtual int exec_local_wr(std::ostringstream& cmd) { return exec(cmd, 0, false); } - virtual int exec_rd(ostringstream& cmd, Callbackable* obj) + virtual int exec_rd(std::ostringstream& cmd, Callbackable* obj) { return exec(cmd, obj, false); } - virtual int exec_wr(ostringstream& cmd) + virtual int exec_wr(std::ostringstream& cmd) { return exec(cmd, 0, false); } - virtual int exec_wr(ostringstream& cmd, Callbackable* obj) + virtual int exec_wr(std::ostringstream& cmd, Callbackable* obj) { return exec(cmd, obj, false); } + /* ---------------------------------------------------------------------- */ + + std::error_code execute_wr(std::ostringstream& cmd) + { + return execute(cmd, 0, false); + } + /** * This function returns a legal SQL string that can be used in an SQL * statement. * @param str the string to be escaped * @return a valid SQL string or NULL in case of failure */ - virtual char * escape_str(const string& str) = 0; + virtual char * escape_str(const std::string& str) = 0; /** * Frees a previously scaped string @@ -109,7 +141,22 @@ protected: * @param quiet True to log errors with DDEBUG level instead of ERROR * @return 0 on success */ - virtual int exec(ostringstream& cmd, Callbackable* obj, bool quiet) = 0; + int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet) + { + int rc = 0; + + std::error_code ec = execute(cmd, obj, quiet); + + if (ec != SqlError::SUCCESS) + { + rc = -1; + }; + + return rc; + } + + virtual std::error_code execute(std::ostringstream& cmd, Callbackable *obj, + bool quiet) = 0; }; #endif /*SQL_DB_H_*/ diff --git a/include/SqliteDB.h b/include/SqliteDB.h index 13c2974cca..8f88807401 100644 --- a/include/SqliteDB.h +++ b/include/SqliteDB.h @@ -92,7 +92,7 @@ protected: * @param arg to pass to the callback function * @return 0 on success */ - int exec(ostringstream& cmd, Callbackable* obj, bool quiet); + std::error_code execute(std::ostringstream& cmd, Callbackable *obj, bool quiet); private: /** diff --git a/src/nebula/Nebula.cc b/src/nebula/Nebula.cc index 291d42a52f..5f4353c1fc 100644 --- a/src/nebula/Nebula.cc +++ b/src/nebula/Nebula.cc @@ -331,7 +331,7 @@ void Nebula::start(bool bootstrap_only) if ( (solo && local_bootstrap) || bootstrap_only) { - if ( logdb->bootstrap(db_backend) != 0 ) + if ( LogDB::bootstrap(db_backend) != 0 ) { throw runtime_error("Error bootstrapping database."); } diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index a79333ccfc..d22c8ca309 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -352,12 +352,13 @@ int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp, int LogDB::apply_log_record(LogDBRecord * lr) { ostringstream oss_sql; + int rc = -1; oss_sql.str(lr->sql); - int rc = db->exec_wr(oss_sql); + std::error_code ec = db->execute_wr(oss_sql); - if ( rc == 0 ) + if (ec == SqlError::SUCCESS || ec == SqlError::SQL_DUP_KEY) { std::ostringstream oss; @@ -370,6 +371,8 @@ int LogDB::apply_log_record(LogDBRecord * lr) } last_applied = lr->index; + + rc = 0; } return rc; @@ -525,13 +528,13 @@ int LogDB::delete_log_records(unsigned int start_index) if ( rc == 0 ) { - LogDBRecord lr; + LogDBRecord lr; next_index = start_index; last_index = start_index - 1; - if ( get_log_record(last_index, lr) == 0 ) + if ( get_log_record(last_index, lr) == 0 ) { last_term = lr.term; } @@ -539,7 +542,7 @@ int LogDB::delete_log_records(unsigned int start_index) pthread_mutex_unlock(&mutex); - return rc; + return rc; } /* -------------------------------------------------------------------------- */ @@ -549,26 +552,26 @@ int LogDB::apply_log_records(unsigned int commit_index) { pthread_mutex_lock(&mutex); - while (last_applied < commit_index ) - { - LogDBRecord lr; + while (last_applied < commit_index ) + { + LogDBRecord lr; - if ( get_log_record(last_applied + 1, lr) != 0 ) - { + if ( get_log_record(last_applied + 1, lr) != 0 ) + { pthread_mutex_unlock(&mutex); - return -1; - } + return -1; + } - if ( apply_log_record(&lr) != 0 ) - { + if ( apply_log_record(&lr) != 0 ) + { pthread_mutex_unlock(&mutex); - return -1; - } - } + return -1; + } + } pthread_mutex_unlock(&mutex); - return 0; + return 0; } /* -------------------------------------------------------------------------- */ @@ -609,7 +612,7 @@ int LogDB::purge_log() /* ---------------------------------------------------------------------- */ /* Federated records. Keep last log_retention federated records */ /* ---------------------------------------------------------------------- */ - if ( fed_log.size() < log_retention ) + if ( fed_log.size() < log_retention ) { pthread_mutex_unlock(&mutex); @@ -642,7 +645,7 @@ int LogDB::purge_log() build_federated_index(); pthread_mutex_unlock(&mutex); - + return rc; } @@ -670,7 +673,7 @@ int LogDB::replicate(int rindex) } else if ( rr.result == true ) //Record replicated on majority of followers { - rc = apply_log_records(rindex); + rc = apply_log_records(rindex); } else { @@ -688,17 +691,6 @@ int LogDB::replicate(int rindex) /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ -int LogDB::index_cb(void *null, int num, char **values, char **names) -{ - if ( num == 0 || values == 0 || values[0] == 0 ) - { - return -1; - } - - fed_log.insert(atoi(values[0])); - - return 0; -} void LogDB::build_federated_index() { @@ -706,13 +698,15 @@ void LogDB::build_federated_index() fed_log.clear(); - set_callback(static_cast(&LogDB::index_cb), 0); + set_cb cb; + + cb.set_callback(&fed_log); oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 "; - db->exec_rd(oss, this); + db->exec_rd(oss, &cb); - unset_callback(); + cb.unset_callback(); } /* -------------------------------------------------------------------------- */ diff --git a/src/sql/MySqlDB.cc b/src/sql/MySqlDB.cc index d1771764e8..68871135b2 100644 --- a/src/sql/MySqlDB.cc +++ b/src/sql/MySqlDB.cc @@ -16,9 +16,10 @@ #include "MySqlDB.h" #include +#include /********* - * Doc: http://dev.mysql.com/doc/refman/5.5/en/c-api-function-overview.html + * Doc: https://dev.mysql.com/doc/refman/8.0/en/c-api.html ********/ /* -------------------------------------------------------------------------- */ @@ -162,11 +163,13 @@ bool MySqlDB::limit_support() /* -------------------------------------------------------------------------- */ -int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) +std::error_code MySqlDB::execute(std::ostringstream& cmd, Callbackable *obj, bool quiet) { string str = cmd.str(); const char * c_str = str.c_str(); + std::error_code ec = SqlError::SUCCESS; + Log::MessageType error_level = quiet ? Log::DDEBUG : Log::ERROR; struct timespec timer; @@ -179,37 +182,52 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) if (rc != 0) { - ostringstream oss; - const char * err_msg = mysql_error(db); - int err_num = mysql_errno(db); + ostringstream oss; - if( err_num == CR_SERVER_GONE_ERROR || err_num == CR_SERVER_LOST ) + const char * err_msg = mysql_error(db); + int err_num = mysql_errno(db); + + switch(err_num) { - oss << "MySQL connection error " << err_num << " : " << err_msg; + case CR_SERVER_GONE_ERROR: + case CR_SERVER_LOST: + oss << "MySQL connection error " << err_num << " : " << err_msg; - // Try to re-connect - if (mysql_real_connect(db, server.c_str(), user.c_str(), - password.c_str(), database.c_str(), - port, NULL, 0)) - { - oss << "... Reconnected."; - } - else - { - oss << "... Reconnection attempt failed."; - } + // Try to re-connect + if (mysql_real_connect(db, server.c_str(), user.c_str(), + password.c_str(), database.c_str(), port, NULL, 0)) + { + oss << "... Reconnected."; + } + else + { + oss << "... Reconnection attempt failed."; + } + + ec = SqlError::CONNECTION; + break; + + // Error codes that should be considered applied for the RAFT log. + case ER_DUP_ENTRY: + ec = SqlError::SQL_DUP_KEY; + break; + + default: + ec = SqlError::SQL; //Default exit code for errors + break; } - else + + if (ec != SqlError::CONNECTION) { oss << "SQL command was: " << c_str; oss << ", error " << err_num << " : " << err_msg; } - NebulaLog::log("ONE",error_level,oss); + NebulaLog::log("ONE", error_level, oss); free_db_connection(db); - return -1; + return ec; } if (obj != 0) @@ -238,12 +256,12 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) free_db_connection(db); - return -1; + return make_error_code(SqlError::SQL); } // Fetch the names of the fields - num_fields = mysql_num_fields(result); - fields = mysql_fetch_fields(result); + num_fields = mysql_num_fields(result); + fields = mysql_fetch_fields(result); char ** names = new char*[num_fields]; @@ -257,7 +275,7 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) { if ( obj->do_callback(num_fields, row, names) != 0 ) { - rc = -1; + ec = make_error_code(SqlError::SQL); break; } } @@ -288,7 +306,7 @@ int MySqlDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) NebulaLog::log("SQL", Log::WARNING, oss); } - return rc; + return ec; } /* -------------------------------------------------------------------------- */ @@ -362,4 +380,4 @@ bool MySqlDB::fts_available() } } -/* -------------------------------------------------------------------------- */ \ No newline at end of file +/* -------------------------------------------------------------------------- */ diff --git a/src/sql/SConstruct b/src/sql/SConstruct index 04cae3380e..91e16fcdad 100644 --- a/src/sql/SConstruct +++ b/src/sql/SConstruct @@ -20,7 +20,7 @@ Import('env') lib_name='nebula_sql' -source_files=['LogDB.cc'] +source_files=['LogDB.cc', 'SqlDB.cc'] # Sources to generate the library if env['sqlite']=='yes': diff --git a/src/sql/SqliteDB.cc b/src/sql/SqliteDB.cc index 37c735f14f..780aa5d9c5 100644 --- a/src/sql/SqliteDB.cc +++ b/src/sql/SqliteDB.cc @@ -43,11 +43,14 @@ extern "C" int sqlite_callback ( SqliteDB::SqliteDB(const string& db_name) { - int rc; - pthread_mutex_init(&mutex,0); - rc = sqlite3_open(db_name.c_str(), &db); + int rc = sqlite3_open(db_name.c_str(), &db); + + if ( rc != SQLITE_OK ) + { + throw runtime_error("Could not open database."); + } enable_limit = sqlite3_compileoption_used("SQLITE_ENABLE_UPDATE_DELETE_LIMIT"); @@ -56,10 +59,7 @@ SqliteDB::SqliteDB(const string& db_name) NebulaLog::log("ONE",Log::INFO , "sqlite has enabled: SQLITE_ENABLE_UPDATE_DELETE_LIMIT"); } - if ( rc != SQLITE_OK ) - { - throw runtime_error("Could not open database."); - } + sqlite3_extended_result_codes(db, 1); } /* -------------------------------------------------------------------------- */ @@ -89,19 +89,24 @@ bool SqliteDB::limit_support() /* -------------------------------------------------------------------------- */ -int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) +std::error_code SqliteDB::execute(std::ostringstream& cmd, Callbackable *obj, bool quiet) { - int rc; + int rc; const char * c_str; string str; - int counter = 0; - char * err_msg = 0; + int counter = 0; + char * err_msg = 0; int (*callback)(void*,int,char**,char**); void * arg; + Log::MessageType error_level; + std::ostringstream oss; + + std::error_code ec; + str = cmd.str(); c_str = str.c_str(); @@ -131,8 +136,7 @@ int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) select(0, NULL, NULL, NULL, &timeout); } - }while( (rc == SQLITE_BUSY || rc == SQLITE_IOERR) && - (counter < 10)); + }while((rc == SQLITE_BUSY || rc == SQLITE_IOERR) && (counter < 10)); if (obj != 0 && obj->get_affected_rows() == 0) { @@ -146,24 +150,38 @@ int SqliteDB::exec(ostringstream& cmd, Callbackable* obj, bool quiet) unlock(); - if (rc != SQLITE_OK) + switch(rc) { - if (err_msg != 0) - { - Log::MessageType error_level = quiet ? Log::DDEBUG : Log::ERROR; + case SQLITE_BUSY: + case SQLITE_IOERR: + ec = SqlError::CONNECTION; + break; - ostringstream oss; + case SQLITE_OK: + ec = SqlError::SUCCESS; + break; - oss << "SQL command was: " << c_str << ", error: " << err_msg; - NebulaLog::log("ONE",error_level,oss); + // Error codes that should be considered applied for the RAFT log. + case SQLITE_CONSTRAINT_UNIQUE: + ec = SqlError::SQL_DUP_KEY; + break; - sqlite3_free(err_msg); - } - - return -1; + default: + ec = SqlError::SQL; + break; } - return 0; + if ( ec != SqlError::SUCCESS && err_msg != NULL ) + { + error_level = quiet ? Log::DDEBUG : Log::ERROR; + + oss << "SQL command was: " << c_str << ", error: " << err_msg; + NebulaLog::log("ONE",error_level,oss); + + sqlite3_free(err_msg); + } + + return ec; } /* -------------------------------------------------------------------------- */