From a3e14127c342cd1fe9c4c398eae454e3b7023390 Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Thu, 14 Jun 2018 11:52:08 +0200 Subject: [PATCH] B #2144: Fix federation for non HA deployments. Better purge (cherry picked from commit ba0231ab0dd13e3ae2f0f23c98a366a91983bf69) --- src/sql/LogDB.cc | 81 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 16 deletions(-) diff --git a/src/sql/LogDB.cc b/src/sql/LogDB.cc index 6427c4f116..3be7d9f540 100644 --- a/src/sql/LogDB.cc +++ b/src/sql/LogDB.cc @@ -465,6 +465,12 @@ int LogDB::_exec_wr(ostringstream& cmd, int federated_index) if ( rc == 0 && Nebula::instance().is_federation_enabled() ) { insert_log_record(0, cmd, time(0), federated_index); + + pthread_mutex_lock(&mutex); + + last_applied = last_index; + + pthread_mutex_unlock(&mutex); } return rc; @@ -557,34 +563,77 @@ int LogDB::purge_log() { std::ostringstream oss; + int rc = 0; + + int fed_records_delete = 0; + int records_delete = 0 ; + pthread_mutex_lock(&mutex); - if ( last_index < log_retention ) + /* ---------------------------------------------------------------------- */ + /* Non-federated records */ + /* ---------------------------------------------------------------------- */ + int delete_index = last_applied - log_retention; + + if ( delete_index > 0 ) { - pthread_mutex_unlock(&mutex); - return 0; + empty_cb cb; + + // keep the last "log_retention" records as well as those not applied + oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " + << "AND fed_index = -1 AND log_index < " << delete_index; + + if ( db->limit_support() ) + { + oss << " LIMIT " << limit_purge; + } + + rc = db->exec_wr(oss, &cb); + + if ( rc != -1 ) + { + records_delete = cb.get_affected_rows(); + } } - unsigned int delete_index = last_applied - log_retention; - - empty_cb cb; - - // keep the last "log_retention" records as well as those not applied to DB - oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " - << "AND log_index < " << delete_index; - - if ( db->limit_support() ) + /* ---------------------------------------------------------------------- */ + /* Federated records */ + /* ---------------------------------------------------------------------- */ + if ( fed_log.size() < log_retention) { - oss << " LIMIT " << limit_purge; + fed_records_delete = 0; + } + else + { + fed_records_delete = fed_log.size() - log_retention; + + if ( fed_records_delete > limit_purge ) + { + fed_records_delete = limit_purge; + } } - int rc = db->exec_wr(oss, &cb); + std::set::iterator it = fed_log.begin(); + + for (int i=0; i < fed_records_delete; ++i) + { + oss.str(""); + + oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " + << "AND fed_index = " << *it; + + db->exec_wr(oss); + + it = fed_log.erase(it); + } pthread_mutex_unlock(&mutex); - if ( rc != -1 ) + records_delete += fed_records_delete ; + + if ( records_delete > 0 ) { - return cb.get_affected_rows(); + return records_delete; } return rc;