1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-03-30 22:50:10 +03:00

Revert "F #2722: Increase the size of the replication and federation log indexes"

This reverts commit 1da93f37326a5bc294f74f6faedbcc0182b03adf.
This commit is contained in:
Ruben S. Montero 2019-04-08 17:09:56 +02:00
parent 1da93f3732
commit b59ee42750
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
14 changed files with 219 additions and 401 deletions

View File

@ -56,7 +56,7 @@ public:
* @param sql command to apply to DB
* @return 0 on success, last_index if missing records, -1 on DB error
*/
uint64_t apply_log_record(uint64_t index, uint64_t prev, const std::string& sql);
int apply_log_record(int index, int prev, const std::string& sql);
/**
* Record was successfully replicated on zone, increase next index and
@ -70,7 +70,7 @@ public:
* send any pending records.
* @param zone_id
*/
void replicate_failure(int zone_id, uint64_t zone_last);
void replicate_failure(int zone_id, int zone_last);
/**
* XML-RPC API call to replicate a log entry on slaves
@ -80,7 +80,7 @@ public:
* @param error description if any
* @return 0 on success -1 if a xml-rpc/network error occurred
*/
int xmlrpc_replicate_log(int zone_id, bool& success, uint64_t& last,
int xmlrpc_replicate_log(int zone_id, bool& success, int& last,
std::string& err);
/**
@ -167,8 +167,8 @@ private:
struct ZoneServers
{
ZoneServers(int z, uint64_t l, const std::string& s):
zone_id(z), endpoint(s), next(l), last(UINT64_MAX){};
ZoneServers(int z, unsigned int l, const std::string& s):
zone_id(z), endpoint(s), next(l), last(-1){};
~ZoneServers(){};
@ -176,9 +176,9 @@ private:
std::string endpoint;
uint64_t next;
int next;
uint64_t last;
int last;
};
std::map<int, ZoneServers *> zones;

View File

@ -32,9 +32,9 @@ public:
/**
* Index for this log entry (and previous)
*/
uint64_t index;
unsigned int index;
uint64_t prev_index;
unsigned int prev_index;
/**
* Term where this log (and previous) entry was generated
@ -57,7 +57,7 @@ public:
* The index in the federation, -1 if the log entry is not federated.
* At master fed_index is equal to index.
*/
uint64_t fed_index;
int fed_index;
/**
* Sets callback to load register from DB
@ -82,8 +82,8 @@ private:
class LogDB : public SqlDB
{
public:
LogDB(SqlDB * _db, bool solo, bool cache, uint64_t log_retention,
uint64_t limit_purge);
LogDB(SqlDB * _db, bool solo, bool cache, unsigned int log_retention,
unsigned int limit_purge);
virtual ~LogDB();
@ -97,20 +97,20 @@ public:
* @param lr logDBrecored to load from the DB
* @return 0 on success -1 otherwise
*/
int get_log_record(uint64_t index, LogDBRecord& lr);
int get_log_record(unsigned int index, LogDBRecord& lr);
/**
* Applies the SQL command of the given record to the database. The
* timestamp of the record is updated.
* @param index of the log record
*/
int apply_log_records(uint64_t commit_index);
int apply_log_records(unsigned int commit_index);
/**
* Deletes the record in start_index and all that follow it
* @param start_index first log record to delete
*/
int delete_log_records(uint64_t start_index);
int delete_log_records(unsigned int start_index);
/**
* Inserts a new log record in the database. This method should be used
@ -124,8 +124,8 @@ public:
*
* @return -1 on failure, index of the inserted record on success
*/
int insert_log_record(uint64_t index, unsigned int term,
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
int insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index,
bool replace);
/**
@ -135,7 +135,7 @@ public:
*
* @return 0 on success, -1 in case of failure
*/
int replicate(uint64_t rindex);
int replicate(int rindex);
//--------------------------------------------------------------------------
// Functions to manage the Raft state. Log record 0, term -1
@ -145,14 +145,14 @@ public:
* @param raft attributes in XML format
* @return 0 on success
*/
int update_raft_state(std::string name, std::string& raft_xml);
int update_raft_state(std::string& raft_xml);
/**
* Returns the raft state attributes as stored in the log
* @param raft_xml attributes in xml
* @return 0 on success
*/
int get_raft_state(std::string name, std::string &raft_xml);
int get_raft_state(std::string &raft_xml);
/**
* Purge log records. Delete old records applied to database upto the
@ -170,7 +170,7 @@ public:
*/
int exec_wr(ostringstream& cmd)
{
return _exec_wr(cmd, UINT64_MAX);
return _exec_wr(cmd, -1);
}
int exec_wr(ostringstream& cmd, Callbackable* obj)
@ -183,7 +183,7 @@ public:
return _exec_wr(cmd, 0);
}
int exec_federated_wr(ostringstream& cmd, uint64_t index)
int exec_federated_wr(ostringstream& cmd, int index)
{
return _exec_wr(cmd, index);
}
@ -234,14 +234,14 @@ public:
*
* @return 0 on success
*/
int setup_index(uint64_t& last_applied, uint64_t& last_index);
int setup_index(int& last_applied, int& last_index);
/**
* Gets the index & term of the last record in the log
* @param _i the index
* @param _t the term
*/
void get_last_record_index(uint64_t& _i, unsigned int& _t);
void get_last_record_index(unsigned int& _i, unsigned int& _t);
// -------------------------------------------------------------------------
// Federate log methods
@ -249,11 +249,11 @@ public:
/**
* Get last federated index, and previous
*/
uint64_t last_federated();
int last_federated();
uint64_t previous_federated(uint64_t index);
int previous_federated(int index);
uint64_t next_federated(uint64_t index);
int next_federated(int index);
protected:
int exec_ext(std::ostringstream& cmd, Callbackable *obj, bool quiet)
@ -282,17 +282,17 @@ private:
/**
* Index to be used by the next logDB record
*/
uint64_t next_index;
unsigned int next_index;
/**
* Index of the last log entry applied to the DB state
*/
uint64_t last_applied;
unsigned int last_applied;
/**
* Index of the last (highest) log entry
*/
uint64_t last_index;
unsigned int last_index;
/**
* term of the last (highest) log entry
@ -302,12 +302,12 @@ private:
/**
* Max number of records to keep in the database
*/
uint64_t log_retention;
unsigned int log_retention;
/**
* Max number of logs purged on each call.
*/
uint64_t limit_purge;
unsigned int limit_purge;
// -------------------------------------------------------------------------
// Federated Log
@ -316,7 +316,7 @@ private:
* The federated log stores a map with the federated log index and its
* corresponding local index. For the master both are the same
*/
std::set<uint64_t> fed_log;
std::set<int> fed_log;
/**
* Generates the federated index, it should be called whenever a server
@ -340,7 +340,7 @@ private:
* @param federated -1 not federated (fed_index = -1), 0 generate fed index
* (fed_index = index), > 0 set (fed_index = federated)
*/
int _exec_wr(ostringstream& cmd, uint64_t federated);
int _exec_wr(ostringstream& cmd, int federated);
/**
* Applies the SQL command of the given record to the database. The
@ -360,8 +360,8 @@ private:
*
* @return 0 on success
*/
int insert(uint64_t index, int term, const std::string& sql, time_t ts,
uint64_t fi, bool replace);
int insert(int index, int term, const std::string& sql, time_t ts, int fi,
bool replace);
/**
* Inserts a new log record in the database. If the record is successfully
@ -373,8 +373,8 @@ private:
*
* @return -1 on failure, index of the inserted record on success
*/
uint64_t insert_log_record(uint64_t term, std::ostringstream& sql,
time_t timestamp, uint64_t fed_index);
int insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, int federated);
};
// -----------------------------------------------------------------------------

View File

@ -95,7 +95,7 @@ public:
* Allocate a replica request fot the given index.
* @param rindex of the record for the request
*/
void replicate_allocate(uint64_t rindex)
void replicate_allocate(int rindex)
{
requests.allocate(rindex);
}
@ -145,9 +145,9 @@ public:
return _term;
}
uint64_t get_commit()
unsigned int get_commit()
{
uint64_t _commit;
unsigned int _commit;
pthread_mutex_lock(&mutex);
@ -164,7 +164,7 @@ public:
* @param index of the last record inserted in the database
* @return the updated commit index
*/
uint64_t update_commit(uint64_t leader_commit, uint64_t index);
unsigned int update_commit(unsigned int leader_commit, unsigned int index);
/**
* Evaluates a vote request. It is granted if no vote has been granted in
@ -224,10 +224,10 @@ public:
* @param follower server id
* @return -1 on failure, the next index if success
*/
uint64_t get_next_index(int follower_id)
int get_next_index(int follower_id)
{
std::map<int, uint64_t>::iterator it;
uint64_t _index = -1;
std::map<int, unsigned int>::iterator it;
unsigned int _index = -1;
pthread_mutex_lock(&mutex);
@ -275,7 +275,7 @@ public:
* @param error describing error if any
* @return -1 if a XMl-RPC (network) error occurs, 0 otherwise
*/
int xmlrpc_request_vote(int follower_id, uint64_t lindex,
int xmlrpc_request_vote(int follower_id, unsigned int lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error);
@ -369,11 +369,6 @@ private:
*/
Template raft_state;
/**
* Value for name column in system_attributes table for raft state.
*/
static const string raft_state_name;
/**
* After becoming a leader it is replicating and applying any pending
* log entry.
@ -413,11 +408,11 @@ private:
HeartBeatManager heartbeat_manager;
uint64_t commit;
unsigned int commit;
std::map<int, uint64_t> next;
std::map<int, unsigned int> next;
std::map<int, uint64_t> match;
std::map<int, unsigned int> match;
std::map<int, std::string> servers;
@ -470,11 +465,6 @@ private:
* Makes this server leader, and start replica threads
*/
void leader();
/**
* Init the raft state status row.
*/
int init_raft_state(const std::string& raft_xml);
};
#endif /*RAFT_MANAGER_H_*/

View File

@ -27,7 +27,7 @@
class ReplicaRequest : public SyncRequest
{
public:
ReplicaRequest(uint64_t i):_index(i), _to_commit(-1), _replicas(1){};
ReplicaRequest(unsigned int i):_index(i), _to_commit(-1), _replicas(1){};
~ReplicaRequest(){};
@ -64,7 +64,7 @@ public:
/* ---------------------------------------------------------------------- */
/* Class access methods */
/* ---------------------------------------------------------------------- */
uint64_t index()
int index()
{
return _index;
}
@ -88,7 +88,7 @@ private:
/**
* Index for this log entry
*/
uint64_t _index;
unsigned int _index;
/**
* Remaining number of servers that need to replicate this record to commit
@ -127,13 +127,13 @@ public:
*
* @return the number of replicas to commit, if 0 it can be committed
*/
int add_replica(uint64_t rindex)
int add_replica(int rindex)
{
int to_commit = -1;
pthread_mutex_lock(&mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() && it->second != 0 )
{
@ -157,7 +157,7 @@ public:
* on this request.
* @param rindex of the request
*/
void allocate(uint64_t rindex)
void allocate(int rindex)
{
pthread_mutex_lock(&mutex);
@ -172,11 +172,11 @@ public:
* @param rindex of the request
* @param rr replica request pointer
*/
void set(uint64_t rindex, ReplicaRequest * rr)
void set(int rindex, ReplicaRequest * rr)
{
pthread_mutex_lock(&mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
@ -194,11 +194,11 @@ public:
* Remove a replication request associated to this index
* @param rindex of the request
*/
void remove(uint64_t rindex)
void remove(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
{
@ -215,7 +215,7 @@ public:
{
pthread_mutex_lock(&mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it;
std::map<int, ReplicaRequest *>::iterator it;
for ( it = requests.begin() ; it != requests.end() ; ++it )
{
@ -239,13 +239,13 @@ public:
/**
* @return true if a replica request is set for this index
*/
bool is_replicable(uint64_t rindex)
bool is_replicable(int rindex)
{
pthread_mutex_lock(&mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
bool rc = it == requests.end() ||
bool rc = it == requests.end() ||
(it != requests.end() && it->second != 0);
pthread_mutex_unlock(&mutex);
@ -260,7 +260,7 @@ private:
/**
* Clients waiting for a log replication
*/
std::map<uint64_t, ReplicaRequest *> requests;
std::map<int, ReplicaRequest *> requests;
};
#endif /*REPLICA_REQUEST_H_*/

View File

@ -55,14 +55,11 @@ public:
int resp_id; /**< Id of the object */
string resp_msg; /**< Additional response message */
uint64_t replication_idx;
RequestAttributes()
{
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
replication_idx = UINT64_MAX;
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
};
RequestAttributes(const RequestAttributes& ra)
@ -87,8 +84,6 @@ public:
resp_obj = ra.resp_obj;
resp_id = ra.resp_id;
resp_msg = ra.resp_msg;
replication_idx = ra.replication_idx;
};
RequestAttributes(int _uid, int _gid, const RequestAttributes& ra)
@ -115,8 +110,6 @@ public:
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
replication_idx = UINT64_MAX;
};
bool is_admin() const
@ -147,16 +140,15 @@ public:
* Error codes for the XML-RPC API
*/
enum ErrorCode {
SUCCESS = 0x00000,
AUTHENTICATION = 0x00100,
AUTHORIZATION = 0x00200,
NO_EXISTS = 0x00400,
ACTION = 0x00800,
XML_RPC_API = 0x01000,
INTERNAL = 0x02000,
ALLOCATE = 0x04000,
LOCKED = 0x08000,
REPLICATION = 0x10000
SUCCESS = 0x0000,
AUTHENTICATION = 0x0100,
AUTHORIZATION = 0x0200,
NO_EXISTS = 0x0400,
ACTION = 0x0800,
XML_RPC_API = 0x1000,
INTERNAL = 0x2000,
ALLOCATE = 0x4000,
LOCKED = 0x8000
};
/**
@ -295,14 +287,6 @@ protected:
*/
void success_response(bool val, RequestAttributes& att);
/**
* Builds an XML-RPC response updating retval. After calling this function
* the xml-rpc excute method should return
* @param val to be returned to the client
* @param att the specific request attributes
*/
void success_response(uint64_t val, RequestAttributes& att);
/**
* Builds an XML-RPC response updating retval. After calling this function
* the xml-rpc excute method should return. A descriptive error message
@ -313,14 +297,6 @@ protected:
*/
void failure_response(ErrorCode ec, RequestAttributes& ra);
/**
* Builds an XML-RPC response updating retval. After calling this function
* the xml-rpc excute method should return. A descriptive error message
* is constructed for DB replications erros.
* @param att the specific request attributes
*/
void failure_response_replication(RequestAttributes& att);
/**
* Builds an error response. A descriptive error message
* is constructed using att.resp_obj, att.resp_id and/or att.resp_msg and

View File

@ -1479,8 +1479,7 @@ ONEDB_LOCAL_MIGRATOR_FILES="src/onedb/local/4.5.80_to_4.7.80.rb \
src/onedb/local/5.4.1_to_5.5.80.rb \
src/onedb/local/5.5.80_to_5.6.0.rb \
src/onedb/local/5.6.0_to_5.7.80.rb \
src/onedb/local/5.7.80_to_5.8.0.rb \
src/onedb/local/5.8.0_to_5.9.80.rb"
src/onedb/local/5.7.80_to_5.8.0.rb"
ONEDB_PATCH_FILES="src/onedb/patches/4.14_monitoring.rb \
src/onedb/patches/history_times.rb"

View File

@ -104,14 +104,10 @@ class OneDBBacKEnd
index_sqlite: ["CREATE INDEX state_oid_idx ON vm_pool (state, oid);",
"CREATE INDEX applied_idx ON logdb (applied);"]
},
"5.9.80" => {
logdb: "log_index BIGINT UNSIGNED PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, " <<
"timestamp INTEGER, fed_index BIGINT UNSIGNED, applied BOOLEAN"
}
}
LATEST_DB_VERSION = "5.9.80"
LATEST_DB_VERSION = "5.7.80"
def get_schema(type, version = nil)
if !version

View File

@ -1,67 +0,0 @@
# -------------------------------------------------------------------------- #
# Copyright 2002-2019, OpenNebula Project, OpenNebula Systems #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
require 'set'
require 'base64'
require 'zlib'
require 'pathname'
require 'yaml'
require 'opennebula'
require 'vcenter_driver'
$LOAD_PATH << File.dirname(__FILE__)
include OpenNebula
module Migrator
def db_version
'5.9.80'
end
def one_version
'OpenNebula 5.9.80'
end
def up
feature_2722
true
end
private
def feature_2722
@db.run 'DROP TABLE IF EXISTS old_logdb;'
@db.run 'ALTER TABLE logdb RENAME TO old_logdb;'
create_table(:logdb)
@db.run 'INSERT INTO system_attributes (name, body)' <<
'SELECT \'RAFT_STATE\', sqlcmd FROM old_logdb WHERE log_index = -1;'
@db.run 'DELETE FROM old_logdb WHERE log_index = -1;'
db.transaction do
# update virtual networks
@db.fetch('SELECT * FROM old_logdb') do |row|
row[:fed_index] = 18446744073709551615 if row[:fed_index] < 0
@db[:logdb].insert(row)
end
end
end
end

View File

@ -58,14 +58,14 @@ FedReplicaManager::~FedReplicaManager()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
uint64_t FedReplicaManager::apply_log_record(uint64_t index, uint64_t prev,
int FedReplicaManager::apply_log_record(int index, int prev,
const std::string& sql)
{
uint64_t rc;
int rc;
pthread_mutex_lock(&mutex);
uint64_t last_index = logdb->last_federated();
int last_index = logdb->last_federated();
if ( prev != last_index )
{
@ -80,7 +80,7 @@ uint64_t FedReplicaManager::apply_log_record(uint64_t index, uint64_t prev,
if ( logdb->exec_federated_wr(oss, index) != 0 )
{
pthread_mutex_unlock(&mutex);
return UINT64_MAX;
return -1;
}
pthread_mutex_unlock(&mutex);
@ -154,7 +154,7 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
pthread_mutex_lock(&mutex);
uint64_t last_index = logdb->last_federated();
int last_index = logdb->last_federated();
zones.clear();
@ -291,7 +291,7 @@ int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
zedp = zs->endpoint;
//Initialize next index for the zone
if ( zs->next == UINT64_MAX )
if ( zs->next == -1 )
{
zs->next= logdb->last_federated();
}
@ -301,10 +301,10 @@ int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
{
zs->next = logdb->next_federated(zs->next);
if ( zs->next == UINT64_MAX ) //no new records
if ( zs->last == zs->next ) //no new records
{
pthread_mutex_unlock(&mutex);
return -2;
return -1;
}
}
@ -346,7 +346,7 @@ void FedReplicaManager::replicate_success(int zone_id)
zs->next = logdb->next_federated(zs->next);
if ( zs->next != UINT64_MAX )
if ( zs->next != -1 )
{
ReplicaManager::replicate(zone_id);
}
@ -356,7 +356,7 @@ void FedReplicaManager::replicate_success(int zone_id)
/* -------------------------------------------------------------------------- */
void FedReplicaManager::replicate_failure(int zone_id, uint64_t last_zone)
void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
{
pthread_mutex_lock(&mutex);
@ -366,14 +366,14 @@ void FedReplicaManager::replicate_failure(int zone_id, uint64_t last_zone)
{
ZoneServers * zs = it->second;
if ( last_zone != UINT64_MAX )
if ( last_zone >= 0 )
{
zs->last = last_zone;
zs->next = logdb->next_federated(zs->last);
}
if ( zs->next != UINT64_MAX )
if ( zs->next != -1 )
{
ReplicaManager::replicate(zone_id);
}
@ -387,7 +387,7 @@ void FedReplicaManager::replicate_failure(int zone_id, uint64_t last_zone)
/* -------------------------------------------------------------------------- */
int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
uint64_t& last, std::string& error)
int& last, std::string& error)
{
static const std::string replica_method = "one.zone.fedreplicate";
@ -397,14 +397,12 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
LogDBRecord lr;
int rc = get_next_record(zone_id, zedp, lr, error);
if ( rc != 0 )
if ( get_next_record(zone_id, zedp, lr, error) != 0 )
{
return rc;
return -1;
}
uint64_t prev_index = logdb->previous_federated(lr.index);
int prev_index = logdb->previous_federated(lr.index);
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
@ -418,8 +416,8 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
}
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_i8(lr.index));
replica_params.add(xmlrpc_c::value_i8(prev_index));
replica_params.add(xmlrpc_c::value_int(lr.index));
replica_params.add(xmlrpc_c::value_int(prev_index));
replica_params.add(xmlrpc_c::value_string(lr.sql));
// -------------------------------------------------------------------------
@ -437,12 +435,12 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
if ( success ) //values[2] = error code (string)
{
last = xmlrpc_c::value_i8(values[1]);
last = xmlrpc_c::value_int(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
last = xmlrpc_c::value_i8(values[3]);
last = xmlrpc_c::value_int(values[3]);
}
}
else

View File

@ -27,8 +27,6 @@
/* -------------------------------------------------------------------------- */
const time_t RaftManager::timer_period_ms = 50;
const string RaftManager::raft_state_name = "RAFT_STATE";
static void set_timeout(long long ms, struct timespec& timeout)
{
std::lldiv_t d;
@ -70,20 +68,20 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
// - term
// -------------------------------------------------------------------------
if ( logdb->get_raft_state(raft_state_name, raft_xml) != 0 )
if ( logdb->get_raft_state(raft_xml) != 0 )
{
std::ostringstream bsr;
bsr << "<MESSAGE>bootstrap state</MESSAGE>";
bsr << "bootstrap state";
init_raft_state(bsr.str());
logdb->insert_log_record(-1, -1, bsr, 0, -1, false);
raft_state.replace("TERM", 0);
raft_state.replace("VOTEDFOR", -1);
raft_state.to_xml(raft_xml);
logdb->update_raft_state(raft_state_name, raft_xml);
logdb->update_raft_state(raft_xml);
votedfor = -1;
term = 0;
@ -302,8 +300,7 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint)
LogDB * logdb = Nebula::instance().get_logdb();
unsigned int log_term;
uint64_t log_index;
unsigned int log_index, log_term;
logdb->get_last_record_index(log_index, log_term);
@ -382,7 +379,7 @@ extern "C" void * reconciling_thread(void *arg)
LogDB * logdb = nd.get_logdb();
RaftManager * rm = nd.get_raftm();
uint64_t * index = static_cast<uint64_t *>(arg);
int * index = static_cast<int *>(arg);
NebulaLog::log("RCM", Log::INFO, "Replicating log to followers");
@ -413,7 +410,7 @@ void RaftManager::leader()
std::map<int, std::string>::iterator it;
std::vector<int> _follower_ids;
uint64_t index, _applied, _next_index;
int index, _applied, _next_index;
std::map<int, std::string> _servers;
@ -511,7 +508,7 @@ void RaftManager::leader()
void RaftManager::follower(unsigned int _term)
{
uint64_t lapplied, lindex;
int lapplied, lindex;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
@ -570,7 +567,7 @@ void RaftManager::follower(unsigned int _term)
if (!raft_state_xml.empty())
{
logdb->update_raft_state(raft_state_name, raft_state_xml);
logdb->update_raft_state(raft_state_xml);
}
}
@ -594,13 +591,13 @@ void RaftManager::replicate_log(ReplicaRequest * request)
//Count servers that need to replicate this record
int to_commit = num_servers / 2;
std::map<int, uint64_t>::iterator it;
std::map<int, unsigned int>::iterator it;
for (it = next.begin(); it != next.end() ; ++it)
{
uint64_t rindex = request->index();
int rindex = request->index();
if ( rindex < (uint64_t) it->second )
if ( rindex < (int) it->second )
{
to_commit--;
}
@ -638,14 +635,13 @@ void RaftManager::replicate_success(int follower_id)
{
std::map<int, ReplicaRequest *>::iterator it;
std::map<int, uint64_t>::iterator next_it;
std::map<int, uint64_t>::iterator match_it;
std::map<int, unsigned int>::iterator next_it;
std::map<int, unsigned int>::iterator match_it;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
unsigned int db_lterm;
uint64_t db_lindex;
unsigned int db_lindex, db_lterm;
logdb->get_last_record_index(db_lindex, db_lterm);
@ -660,7 +656,7 @@ void RaftManager::replicate_success(int follower_id)
return;
}
uint64_t replicated_index = next_it->second;
unsigned int replicated_index = next_it->second;
match_it->second = replicated_index;
next_it->second = replicated_index + 1;
@ -684,7 +680,7 @@ void RaftManager::replicate_success(int follower_id)
void RaftManager::replicate_failure(int follower_id)
{
std::map<int, uint64_t>::iterator next_it;
std::map<int, unsigned int>::iterator next_it;
pthread_mutex_lock(&mutex);
@ -726,9 +722,10 @@ void RaftManager::update_last_heartbeat(int _leader_id)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
uint64_t RaftManager::update_commit(uint64_t leader_commit, uint64_t index)
unsigned int RaftManager::update_commit(unsigned int leader_commit,
unsigned int index)
{
uint64_t _commit;
unsigned int _commit;
pthread_mutex_lock(&mutex);
@ -778,7 +775,7 @@ int RaftManager::update_votedfor(int _votedfor)
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_name, raft_state_xml);
logdb->update_raft_state(raft_state_xml);
return 0;
}
@ -889,8 +886,7 @@ void RaftManager::timer_action(const ActionRequest& ar)
void RaftManager::request_vote()
{
unsigned int lterm, fterm, _term;
uint64_t lindex;
unsigned int lindex, lterm, fterm, _term;
int _server_id;
std::map<int, std::string> _servers;
@ -951,7 +947,7 @@ void RaftManager::request_vote()
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_name, raft_state_xml);
logdb->update_raft_state(raft_state_xml);
logdb->get_last_record_index(lindex, lterm);
@ -1032,7 +1028,7 @@ void RaftManager::request_vote()
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_name, raft_state_xml);
logdb->update_raft_state(raft_state_xml);
srand(_server_id+1);
@ -1041,7 +1037,7 @@ void RaftManager::request_vote()
oss.str("");
oss << "No leader found, starting new election in " << ms << "ms";
oss << "No leader found, starting new election in " << ms << "ms";
NebulaLog::log("RCM", Log::INFO, oss);
set_timeout(ms, etimeout);
@ -1058,7 +1054,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
bool& success, unsigned int& fterm, std::string& error)
{
int _server_id;
uint64_t _commit;
int _commit;
int _term;
std::string xmlrpc_secret;
@ -1103,13 +1099,13 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_i8(_commit));
replica_params.add(xmlrpc_c::value_int(_commit));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_i8(lr->index));
replica_params.add(xmlrpc_c::value_int(lr->index));
replica_params.add(xmlrpc_c::value_int(lr->term));
replica_params.add(xmlrpc_c::value_i8(lr->prev_index));
replica_params.add(xmlrpc_c::value_int(lr->prev_index));
replica_params.add(xmlrpc_c::value_int(lr->prev_term));
replica_params.add(xmlrpc_c::value_i8(lr->fed_index));
replica_params.add(xmlrpc_c::value_int(lr->fed_index));
replica_params.add(xmlrpc_c::value_string(lr->sql));
// -------------------------------------------------------------------------
@ -1151,7 +1147,7 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::xmlrpc_request_vote(int follower_id, uint64_t lindex,
int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error)
{
@ -1200,7 +1196,7 @@ int RaftManager::xmlrpc_request_vote(int follower_id, uint64_t lindex,
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_i8(lindex));
replica_params.add(xmlrpc_c::value_int(lindex));
replica_params.add(xmlrpc_c::value_int(lterm));
// -------------------------------------------------------------------------
@ -1247,8 +1243,8 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
unsigned int lterm;
uint64_t lindex;
unsigned int lindex, lterm;
std::ostringstream oss;
logdb->get_last_record_index(lindex, lterm);
@ -1296,10 +1292,9 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
void RaftManager::reset_index(int follower_id)
{
std::map<int, uint64_t>::iterator next_it;
std::map<int, unsigned int>::iterator next_it;
unsigned int log_term;
uint64_t log_index;
unsigned int log_index, log_term;
LogDB * logdb = Nebula::instance().get_logdb();
@ -1316,15 +1311,3 @@ void RaftManager::reset_index(int follower_id)
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::init_raft_state(const std::string& raft_xml)
{
string error;
Nebula& nd = Nebula::instance();
return nd.insert_sys_attribute(raft_state_name, raft_xml, error);
}

View File

@ -184,7 +184,7 @@ int RaftReplicaThread::replicate()
unsigned int term = raftm->get_term();
uint64_t next_index = raftm->get_next_index(follower_id);
int next_index = raftm->get_next_index(follower_id);
if ( logdb->get_log_record(next_index, lr) != 0 )
{
@ -255,19 +255,13 @@ int FedReplicaThread::replicate()
bool success = false;
uint64_t last;
int last;
int rc = frm->xmlrpc_replicate_log(follower_id, success, last, error);
if ( rc == -1 )
if ( frm->xmlrpc_replicate_log(follower_id, success, last, error) != 0 )
{
NebulaLog::log("FRM", Log::ERROR, error);
return -1;
}
else if ( rc == -2 )
{
return 0;
}
if ( success )
{
@ -317,7 +311,7 @@ int HeartBeatThread::replicate()
lr.sql = "";
lr.timestamp = 0;
lr.fed_index = UINT64_MAX;
lr.fed_index = -1;
rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);

View File

@ -330,9 +330,6 @@ void Request::log_xmlrpc_value(const xmlrpc_c::value& v, std::ostringstream& oss
oss << ", "
<< static_cast<double>(xmlrpc_c::value_double(v));
break;
case xmlrpc_c::value::TYPE_I8:
oss << ", " << static_cast<uint64_t>(xmlrpc_c::value_i8(v));
break;
default:
oss << ", unknown param type";
break;
@ -731,21 +728,6 @@ void Request::failure_response(ErrorCode ec, const string& str_val,
arrayData.push_back(xmlrpc_c::value_string(str_val));
arrayData.push_back(xmlrpc_c::value_int(ec));
arrayData.push_back(xmlrpc_c::value_int(att.resp_id));
arrayData.push_back(xmlrpc_c::value_i8(att.replication_idx));
xmlrpc_c::value_array arrayresult(arrayData);
*(att.retval) = arrayresult;
}
void Request::failure_response_replication(RequestAttributes& att)
{
vector<xmlrpc_c::value> arrayData;
arrayData.push_back(xmlrpc_c::value_boolean(false));
arrayData.push_back(xmlrpc_c::value_string(att.resp_msg));
arrayData.push_back(xmlrpc_c::value_int(REPLICATION));
arrayData.push_back(xmlrpc_c::value_i8(att.replication_idx));
xmlrpc_c::value_array arrayresult(arrayData);
@ -827,9 +809,6 @@ string Request::failure_message(ErrorCode ec, RequestAttributes& att)
oss << " [" << att.resp_id << "].";
}
break;
case REPLICATION:
oss << "Error replicating log entry " << att.replication_idx << ".";
break;
}
return oss.str();
@ -895,23 +874,6 @@ void Request::success_response(bool val, RequestAttributes& att)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void Request::success_response(uint64_t val, RequestAttributes& att)
{
vector<xmlrpc_c::value> arrayData;
arrayData.push_back(xmlrpc_c::value_boolean(true));
arrayData.push_back(xmlrpc_c::value_i8(val));
arrayData.push_back(xmlrpc_c::value_int(SUCCESS));
xmlrpc_c::value_array arrayresult(arrayData);
*(att.retval) = arrayresult;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int Request::get_info(
PoolSQL * pool,
int id,

View File

@ -297,15 +297,15 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
RaftManager * raftm = nd.get_raftm();
int leader_id = xmlrpc_c::value_int(paramList.getInt(1));
uint64_t leader_commit = xmlrpc_c::value_i8(paramList.getI8(2));
int leader_id = xmlrpc_c::value_int(paramList.getInt(1));
int leader_commit = xmlrpc_c::value_int(paramList.getInt(2));
unsigned int leader_term = xmlrpc_c::value_int(paramList.getInt(3));
uint64_t index = xmlrpc_c::value_i8(paramList.getI8(4));
unsigned int index = xmlrpc_c::value_int(paramList.getInt(4));
unsigned int term = xmlrpc_c::value_int(paramList.getInt(5));
uint64_t prev_index = xmlrpc_c::value_i8(paramList.getI8(6));
unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7));
uint64_t fed_index = xmlrpc_c::value_i8(paramList.getI8(8));
unsigned int fed_index = xmlrpc_c::value_int(paramList.getInt(8));
string sql = xmlrpc_c::value_string(paramList.getString(9));
@ -370,12 +370,11 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
if ( index == 0 && prev_index == 0 && term == 0 && prev_term == 0 &&
sql.empty() )
{
unsigned int lterm;
uint64_t lindex;
unsigned int lindex, lterm;
logdb->get_last_record_index(lindex, lterm);
uint64_t new_commit = raftm->update_commit(leader_commit, lindex);
unsigned int new_commit = raftm->update_commit(leader_commit, lindex);
logdb->apply_log_records(new_commit);
@ -444,7 +443,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
uint64_t new_commit = raftm->update_commit(leader_commit, index);
unsigned int new_commit = raftm->update_commit(leader_commit, index);
logdb->apply_log_records(new_commit);
@ -465,13 +464,12 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
unsigned int candidate_term = xmlrpc_c::value_int(paramList.getInt(1));
unsigned int candidate_id = xmlrpc_c::value_int(paramList.getInt(2));
uint64_t candidate_log_index = xmlrpc_c::value_i8(paramList.getI8(3));
unsigned int candidate_log_index = xmlrpc_c::value_int(paramList.getInt(3));
unsigned int candidate_log_term = xmlrpc_c::value_int(paramList.getInt(4));
unsigned int current_term = raftm->get_term();
unsigned int log_term;
uint64_t log_index;
unsigned int log_index, log_term;
logdb->get_last_record_index(log_index, log_term);
@ -570,13 +568,13 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
FedReplicaManager * frm = nd.get_frm();
uint64_t index = xmlrpc_c::value_i8(paramList.getI8(1));
uint64_t prev = xmlrpc_c::value_i8(paramList.getI8(2));
int index = xmlrpc_c::value_int(paramList.getInt(1));
int prev = xmlrpc_c::value_int(paramList.getInt(2));
string sql = xmlrpc_c::value_string(paramList.getString(3));
if (!att.is_oneadmin())
{
att.replication_idx = UINT64_MAX;
att.resp_id = -1;
failure_response(AUTHORIZATION, att);
return;
@ -585,7 +583,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
if ( nd.is_cache() )
{
att.resp_msg = "Server is in cache mode.";
att.replication_idx = UINT64_MAX;
att.resp_id = -1;
failure_response(ACTION, att);
return;
@ -598,9 +596,9 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::ERROR, oss);
att.resp_msg = oss.str();
att.replication_idx = UINT64_MAX;
att.resp_id = -1;
failure_response_replication(att);
failure_response(ACTION, att);
return;
}
@ -611,28 +609,28 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.replication_idx = UINT64_MAX;
att.resp_id = - 1;
failure_response_replication(att);
failure_response(ACTION, att);
return;
}
uint64_t rc = frm->apply_log_record(index, prev, sql);
int rc = frm->apply_log_record(index, prev, sql);
if ( rc == 0 )
{
success_response(index, att);
}
else if ( rc == UINT64_MAX )
else if ( rc < 0 )
{
oss << "Error replicating log entry " << index << " in zone";
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.replication_idx = index;
att.resp_id = index;
failure_response_replication(att);
failure_response(ACTION, att);
}
else // rc == last_index in log
{
@ -641,9 +639,9 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.replication_idx = rc;
att.resp_id = rc;
failure_response_replication(att);
failure_response(ACTION, att);
}
return;

View File

@ -28,8 +28,8 @@ const char * LogDB::table = "logdb";
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index, applied";
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
"logdb (log_index BIGINT UNSIGNED PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
"timestamp INTEGER, fed_index BIGINT UNSIGNED, applied BOOLEAN)";
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
"timestamp INTEGER, fed_index INTEGER, applied BOOLEAN)";
/* -------------------------------------------------------------------------- */
@ -68,25 +68,15 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
std::string * _sql;
std::istringstream iss;
iss.str(string(values[0]));
iss >> index;
iss.clear();
index = static_cast<unsigned int>(atoi(values[0]));
term = static_cast<unsigned int>(atoi(values[1]));
zsql = values[2];
timestamp = static_cast<unsigned int>(atoi(values[3]));
iss.str(string(values[4]));
iss >> fed_index;
iss.clear();
iss.str(string(values[5]));
iss >> prev_index;
iss.clear();
fed_index = static_cast<unsigned int>(atoi(values[4]));
prev_index = static_cast<unsigned int>(atoi(values[5]));
prev_term = static_cast<unsigned int>(atoi(values[6]));
_sql = one_util::zlib_decompress(zsql, true);
@ -114,11 +104,11 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp):
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, unsigned int _lret, unsigned int _lp):
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1), last_index(-1),
last_term(-1), log_retention(_lret), limit_purge(_lp)
{
uint64_t r, i;
int r, i;
pthread_mutex_init(&mutex, 0);
@ -144,18 +134,18 @@ LogDB::~LogDB()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
int LogDB::setup_index(int& _last_applied, int& _last_index)
{
int rc = 0;
std::ostringstream oss;
single_cb<uint64_t> cb;
single_cb<int> cb;
LogDBRecord lr;
_last_applied = 0;
_last_index = UINT64_MAX;
_last_index = -1;
pthread_mutex_lock(&mutex);
@ -165,14 +155,14 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
rc += db->exec_rd(oss, &cb);
if ( rc == 0 && cb.get_affected_rows() != 0)
cb.unset_callback();
if ( rc == 0 )
{
next_index = _last_index + 1;
last_index = _last_index;
}
cb.unset_callback();
oss.str("");
cb.set_callback(&_last_applied);
@ -205,11 +195,11 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::get_log_record(uint64_t index, LogDBRecord& lr)
int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
{
ostringstream oss;
uint64_t prev_index = index - 1;
unsigned int prev_index = index - 1;
if ( index == 0 )
{
@ -238,7 +228,7 @@ int LogDB::get_log_record(uint64_t index, LogDBRecord& lr)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void LogDB::get_last_record_index(uint64_t& _i, unsigned int& _t)
void LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
{
pthread_mutex_lock(&mutex);
@ -251,13 +241,13 @@ void LogDB::get_last_record_index(uint64_t& _i, unsigned int& _t)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::get_raft_state(std::string name, std::string &raft_xml)
int LogDB::get_raft_state(std::string &raft_xml)
{
ostringstream oss;
single_cb<std::string> cb;
oss << "SELECT body FROM system_attributes WHERE name = '" << name << "'";
oss << "SELECT sqlcmd FROM logdb WHERE log_index = -1 AND term = -1";
cb.set_callback(&raft_xml);
@ -276,7 +266,7 @@ int LogDB::get_raft_state(std::string name, std::string &raft_xml)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::update_raft_state(std::string name, std::string& raft_xml)
int LogDB::update_raft_state(std::string& raft_xml)
{
std::ostringstream oss;
@ -287,7 +277,7 @@ int LogDB::update_raft_state(std::string name, std::string& raft_xml)
return -1;
}
oss << "UPDATE system_attributes SET body ='" << sql_db << "' WHERE name = '" << name << "'";
oss << "UPDATE logdb SET sqlcmd ='" << sql_db << "' WHERE log_index = -1";
db->free_str(sql_db);
@ -297,8 +287,8 @@ int LogDB::update_raft_state(std::string name, std::string& raft_xml)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert(uint64_t index, int term, const std::string& sql, time_t tstamp,
uint64_t fed_index, bool replace)
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
int fed_index, bool replace)
{
std::ostringstream oss;
@ -394,14 +384,14 @@ int LogDB::apply_log_record(LogDBRecord * lr)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
time_t timestamp, uint64_t fed_index)
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, int fed_index)
{
pthread_mutex_lock(&mutex);
uint64_t index = next_index;
unsigned int index = next_index;
uint64_t _fed_index;
int _fed_index;
if ( fed_index == 0 )
{
@ -433,7 +423,7 @@ uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
next_index++;
if ( fed_index != UINT64_MAX )
if ( fed_index != -1 )
{
fed_log.insert(_fed_index);
}
@ -446,9 +436,8 @@ uint64_t LogDB::insert_log_record(uint64_t term, std::ostringstream& sql,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(uint64_t index, unsigned int term,
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
bool replace)
int LogDB::insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index, bool replace)
{
int rc;
@ -467,7 +456,7 @@ int LogDB::insert_log_record(uint64_t index, unsigned int term,
next_index = last_index + 1;
}
if ( fed_index != UINT64_MAX )
if ( fed_index != -1 )
{
fed_log.insert(fed_index);
}
@ -481,7 +470,7 @@ int LogDB::insert_log_record(uint64_t index, unsigned int term,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
{
int rc;
@ -496,7 +485,7 @@ int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
if ( rc == 0 && Nebula::instance().is_federation_enabled() )
{
insert_log_record(0, cmd, time(0), federated);
insert_log_record(0, cmd, time(0), federated_index);
pthread_mutex_lock(&mutex);
@ -521,9 +510,9 @@ int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
// -------------------------------------------------------------------------
// Insert log entry in the database and replicate on followers
// -------------------------------------------------------------------------
uint64_t rindex = insert_log_record(raftm->get_term(), cmd, 0, federated);
int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
if ( rindex == UINT64_MAX )
if ( rindex == -1 )
{
return -1;
}
@ -534,7 +523,7 @@ int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::delete_log_records(uint64_t start_index)
int LogDB::delete_log_records(unsigned int start_index)
{
std::ostringstream oss;
int rc;
@ -567,7 +556,7 @@ int LogDB::delete_log_records(uint64_t start_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::apply_log_records(uint64_t commit_index)
int LogDB::apply_log_records(unsigned int commit_index)
{
pthread_mutex_lock(&mutex);
@ -602,10 +591,10 @@ int LogDB::purge_log()
empty_cb cb;
multiple_cb<std::vector, uint64_t> cb_info;
multiple_cb<std::vector, int> cb_info;
single_cb<string> cb_min_idx;
std::vector<uint64_t> maxmin_i;
std::vector<uint64_t> maxmin_e;
std::vector<int> maxmin_i;
std::vector<int> maxmin_e;
string min_idx;
int rc = 0;
@ -629,8 +618,8 @@ int LogDB::purge_log()
oss.str("");
oss << " SELECT MIN(i.log_index) FROM ("
<< " SELECT log_index FROM logdb WHERE fed_index = " << UINT64_MAX
<< " AND applied = 1 AND log_index >= 0 "
<< " SELECT log_index FROM logdb WHERE fed_index = -1 AND"
<< " applied = 1 AND log_index >= 0 "
<< " ORDER BY log_index DESC LIMIT " << log_retention
<< " ) AS i";
@ -644,7 +633,7 @@ int LogDB::purge_log()
oss.str("");
oss << "DELETE FROM logdb WHERE applied = 1 AND log_index >= 0 "
<< "AND fed_index = " << UINT64_MAX << " AND log_index < " << min_idx;
<< "AND fed_index = -1 AND log_index < " << min_idx;
if ( db->limit_support() )
{
@ -692,8 +681,8 @@ int LogDB::purge_log()
oss.str("");
oss << " SELECT MIN(i.log_index) FROM ("
<< " SELECT log_index FROM logdb WHERE fed_index != " << UINT64_MAX
<< " AND applied = 1 AND log_index >= 0 "
<< " SELECT log_index FROM logdb WHERE fed_index != -1 AND"
<< " applied = 1 AND log_index >= 0 "
<< " ORDER BY log_index DESC LIMIT " << log_retention
<< " ) AS i";
@ -707,7 +696,7 @@ int LogDB::purge_log()
oss.str("");
oss << "DELETE FROM logdb WHERE applied = 1 AND log_index >= 0 "
<< "AND fed_index != " << UINT64_MAX << " AND log_index < " << min_idx;
<< "AND fed_index != -1 AND log_index < " << min_idx;
if ( db->limit_support() )
{
@ -735,7 +724,7 @@ int LogDB::purge_log()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::replicate(uint64_t rindex)
int LogDB::replicate(int rindex)
{
int rc;
@ -781,11 +770,11 @@ void LogDB::build_federated_index()
fed_log.clear();
set_cb<uint64_t> cb;
set_cb<int> cb;
cb.set_callback(&fed_log);
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != " << UINT64_MAX;
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
db->exec_rd(oss, &cb);
@ -795,15 +784,15 @@ void LogDB::build_federated_index()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
uint64_t LogDB::last_federated()
int LogDB::last_federated()
{
pthread_mutex_lock(&mutex);
uint64_t findex = -1;
int findex = -1;
if ( !fed_log.empty() )
{
set<uint64_t>::reverse_iterator rit;
set<int>::reverse_iterator rit;
rit = fed_log.rbegin();
@ -817,13 +806,13 @@ uint64_t LogDB::last_federated()
/* -------------------------------------------------------------------------- */
uint64_t LogDB::previous_federated(uint64_t i)
int LogDB::previous_federated(int i)
{
set<uint64_t>::iterator it;
set<int>::iterator it;
pthread_mutex_lock(&mutex);
uint64_t findex = UINT64_MAX;
int findex = -1;
it = fed_log.find(i);
@ -839,13 +828,13 @@ uint64_t LogDB::previous_federated(uint64_t i)
/* -------------------------------------------------------------------------- */
uint64_t LogDB::next_federated(uint64_t i)
int LogDB::next_federated(int i)
{
set<uint64_t>::iterator it;
set<int>::iterator it;
pthread_mutex_lock(&mutex);
uint64_t findex = UINT64_MAX;
int findex = -1;
it = fed_log.find(i);