1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-28 17:47:03 +03:00

Merge branch 'f2722'

This commit is contained in:
Ruben S. Montero 2019-04-15 17:08:01 +02:00
commit e25e25ca6b
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
14 changed files with 422 additions and 236 deletions

View File

@ -56,7 +56,7 @@ public:
* @param sql command to apply to DB
* @return 0 on success, last_index if missing records, -1 on DB error
*/
int apply_log_record(int index, int prev, const std::string& sql);
uint64_t apply_log_record(uint64_t index, uint64_t prev, const std::string& sql);
/**
* Record was successfully replicated on zone, increase next index and
@ -70,7 +70,7 @@ public:
* send any pending records.
* @param zone_id
*/
void replicate_failure(int zone_id, int zone_last);
void replicate_failure(int zone_id, uint64_t zone_last);
/**
* XML-RPC API call to replicate a log entry on slaves
@ -80,7 +80,7 @@ public:
* @param error description if any
* @return 0 on success -1 if a xml-rpc/network error occurred
*/
int xmlrpc_replicate_log(int zone_id, bool& success, int& last,
int xmlrpc_replicate_log(int zone_id, bool& success, uint64_t& last,
std::string& err);
/**
@ -167,8 +167,8 @@ private:
struct ZoneServers
{
ZoneServers(int z, unsigned int l, const std::string& s):
zone_id(z), endpoint(s), next(l), last(-1){};
ZoneServers(int z, uint64_t l, const std::string& s):
zone_id(z), endpoint(s), next(l), last(UINT64_MAX){};
~ZoneServers(){};
@ -176,9 +176,9 @@ private:
std::string endpoint;
int next;
uint64_t next;
int last;
uint64_t last;
};
std::map<int, ZoneServers *> zones;

View File

@ -32,9 +32,9 @@ public:
/**
* Index for this log entry (and previous)
*/
unsigned int index;
uint64_t index;
unsigned int prev_index;
uint64_t prev_index;
/**
* Term where this log (and previous) entry was generated
@ -57,7 +57,7 @@ public:
* The index in the federation, -1 if the log entry is not federated.
* At master fed_index is equal to index.
*/
int fed_index;
uint64_t fed_index;
/**
* Sets callback to load register from DB
@ -82,8 +82,8 @@ private:
class LogDB : public SqlDB
{
public:
LogDB(SqlDB * _db, bool solo, bool cache, unsigned int log_retention,
unsigned int limit_purge);
LogDB(SqlDB * _db, bool solo, bool cache, uint64_t log_retention,
uint64_t limit_purge);
virtual ~LogDB();
@ -97,20 +97,20 @@ public:
* @param lr logDBrecored to load from the DB
* @return 0 on success -1 otherwise
*/
int get_log_record(unsigned int index, LogDBRecord& lr);
int get_log_record(uint64_t index, LogDBRecord& lr);
/**
* Applies the SQL command of the given record to the database. The
* timestamp of the record is updated.
* @param index of the log record
*/
int apply_log_records(unsigned int commit_index);
int apply_log_records(uint64_t commit_index);
/**
* Deletes the record in start_index and all that follow it
* @param start_index first log record to delete
*/
int delete_log_records(unsigned int start_index);
int delete_log_records(uint64_t start_index);
/**
* Inserts a new log record in the database. This method should be used
@ -119,13 +119,13 @@ public:
* @param term for the record
* @param sql command of the record
* @param timestamp associated to this record
* @param fed_index index in the federation -1 if not federated
* @param fed_index index in the federation UINT64_MAX if not federated
* @param replace if true will replace the record if it exists
*
* @return -1 on failure, index of the inserted record on success
* @return 0 on sucess, -1 on failure
*/
int insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index,
int insert_log_record(uint64_t index, unsigned int term,
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
bool replace);
/**
@ -135,7 +135,7 @@ public:
*
* @return 0 on success, -1 in case of failure
*/
int replicate(int rindex);
int replicate(uint64_t rindex);
//--------------------------------------------------------------------------
// Functions to manage the Raft state. Log record 0, term -1
@ -145,14 +145,14 @@ public:
* @param raft attributes in XML format
* @return 0 on success
*/
int update_raft_state(std::string& raft_xml);
int update_raft_state(std::string name, std::string& raft_xml);
/**
* Returns the raft state attributes as stored in the log
* @param raft_xml attributes in xml
* @return 0 on success
*/
int get_raft_state(std::string &raft_xml);
int get_raft_state(std::string name, std::string &raft_xml);
/**
* Purge log records. Delete old records applied to database upto the
@ -170,7 +170,7 @@ public:
*/
int exec_wr(ostringstream& cmd)
{
return _exec_wr(cmd, -1);
return _exec_wr(cmd, UINT64_MAX);
}
int exec_wr(ostringstream& cmd, Callbackable* obj)
@ -183,7 +183,7 @@ public:
return _exec_wr(cmd, 0);
}
int exec_federated_wr(ostringstream& cmd, int index)
int exec_federated_wr(ostringstream& cmd, uint64_t index)
{
return _exec_wr(cmd, index);
}
@ -234,14 +234,14 @@ public:
*
* @return 0 on success
*/
int setup_index(int& last_applied, int& last_index);
int setup_index(uint64_t& last_applied, uint64_t& last_index);
/**
* Gets the index & term of the last record in the log
* @param _i the index
* @param _t the term
*/
void get_last_record_index(unsigned int& _i, unsigned int& _t);
void get_last_record_index(uint64_t& _i, unsigned int& _t);
// -------------------------------------------------------------------------
// Federate log methods
@ -249,11 +249,11 @@ public:
/**
* Get last federated index, and previous
*/
int last_federated();
uint64_t last_federated();
int previous_federated(int index);
uint64_t previous_federated(uint64_t index);
int next_federated(int index);
uint64_t next_federated(uint64_t index);
protected:
int exec_ext(std::ostringstream& cmd, Callbackable *obj, bool quiet)
@ -282,17 +282,17 @@ private:
/**
* Index to be used by the next logDB record
*/
unsigned int next_index;
uint64_t next_index;
/**
* Index of the last log entry applied to the DB state
*/
unsigned int last_applied;
uint64_t last_applied;
/**
* Index of the last (highest) log entry
*/
unsigned int last_index;
uint64_t last_index;
/**
* term of the last (highest) log entry
@ -302,12 +302,12 @@ private:
/**
* Max number of records to keep in the database
*/
unsigned int log_retention;
uint64_t log_retention;
/**
* Max number of logs purged on each call.
*/
unsigned int limit_purge;
uint64_t limit_purge;
// -------------------------------------------------------------------------
// Federated Log
@ -316,7 +316,7 @@ private:
* The federated log stores a map with the federated log index and its
* corresponding local index. For the master both are the same
*/
std::set<int> fed_log;
std::set<uint64_t> fed_log;
/**
* Generates the federated index, it should be called whenever a server
@ -337,10 +337,10 @@ private:
* Replicates writes in the followers and apply changes to DB state once
* it is safe to do so.
*
* @param federated -1 not federated (fed_index = -1), 0 generate fed index
* (fed_index = index), > 0 set (fed_index = federated)
* @param federated UINT64_MAX not federated (fed_index = UINT64_MAX), 0
* generate fed index (fed_index = index), > 0 set (fed_index = federated)
*/
int _exec_wr(ostringstream& cmd, int federated);
int _exec_wr(ostringstream& cmd, uint64_t federated);
/**
* Applies the SQL command of the given record to the database. The
@ -360,8 +360,8 @@ private:
*
* @return 0 on success
*/
int insert(int index, int term, const std::string& sql, time_t ts, int fi,
bool replace);
int insert(uint64_t index, unsigned int term, const std::string& sql,
time_t ts, uint64_t fi, bool replace);
/**
* Inserts a new log record in the database. If the record is successfully
@ -373,8 +373,8 @@ private:
*
* @return -1 on failure, index of the inserted record on success
*/
int insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, int federated);
uint64_t insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, uint64_t fed_index);
};
// -----------------------------------------------------------------------------

View File

@ -95,7 +95,7 @@ public:
* Allocate a replica request fot the given index.
* @param rindex of the record for the request
*/
void replicate_allocate(int rindex)
void replicate_allocate(uint64_t rindex)
{
requests.allocate(rindex);
}
@ -145,9 +145,9 @@ public:
return _term;
}
unsigned int get_commit()
uint64_t get_commit()
{
unsigned int _commit;
uint64_t _commit;
pthread_mutex_lock(&mutex);
@ -164,7 +164,7 @@ public:
* @param index of the last record inserted in the database
* @return the updated commit index
*/
unsigned int update_commit(unsigned int leader_commit, unsigned int index);
uint64_t update_commit(uint64_t leader_commit, uint64_t index);
/**
* Evaluates a vote request. It is granted if no vote has been granted in
@ -222,12 +222,12 @@ public:
/**
* Get next index to send to the follower
* @param follower server id
* @return -1 on failure, the next index if success
* @return UINT64_MAX on failure, the next index if success
*/
int get_next_index(int follower_id)
uint64_t get_next_index(int follower_id)
{
std::map<int, unsigned int>::iterator it;
unsigned int _index = -1;
std::map<int, uint64_t>::iterator it;
uint64_t _index = UINT64_MAX;
pthread_mutex_lock(&mutex);
@ -275,7 +275,7 @@ public:
* @param error describing error if any
* @return -1 if a XMl-RPC (network) error occurs, 0 otherwise
*/
int xmlrpc_request_vote(int follower_id, unsigned int lindex,
int xmlrpc_request_vote(int follower_id, uint64_t lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error);
@ -369,6 +369,11 @@ private:
*/
Template raft_state;
/**
* Value for name column in system_attributes table for raft state.
*/
static const string raft_state_name;
/**
* After becoming a leader it is replicating and applying any pending
* log entry.
@ -408,11 +413,11 @@ private:
HeartBeatManager heartbeat_manager;
unsigned int commit;
uint64_t commit;
std::map<int, unsigned int> next;
std::map<int, uint64_t> next;
std::map<int, unsigned int> match;
std::map<int, uint64_t> match;
std::map<int, std::string> servers;
@ -465,6 +470,11 @@ private:
* Makes this server leader, and start replica threads
*/
void leader();
/**
* Init the raft state status row.
*/
int init_raft_state(const std::string& raft_xml);
};
#endif /*RAFT_MANAGER_H_*/

View File

@ -27,7 +27,7 @@
class ReplicaRequest : public SyncRequest
{
public:
ReplicaRequest(unsigned int i):_index(i), _to_commit(-1), _replicas(1){};
ReplicaRequest(uint64_t i):_index(i), _to_commit(-1), _replicas(1){};
~ReplicaRequest(){};
@ -64,7 +64,7 @@ public:
/* ---------------------------------------------------------------------- */
/* Class access methods */
/* ---------------------------------------------------------------------- */
int index()
uint64_t index()
{
return _index;
}
@ -88,7 +88,7 @@ private:
/**
* Index for this log entry
*/
unsigned int _index;
uint64_t _index;
/**
* Remaining number of servers that need to replicate this record to commit
@ -127,13 +127,13 @@ public:
*
* @return the number of replicas to commit, if 0 it can be committed
*/
int add_replica(int rindex)
int add_replica(uint64_t rindex)
{
int to_commit = -1;
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() && it->second != 0 )
{
@ -157,7 +157,7 @@ public:
* on this request.
* @param rindex of the request
*/
void allocate(int rindex)
void allocate(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
@ -172,11 +172,11 @@ public:
* @param rindex of the request
* @param rr replica request pointer
*/
void set(int rindex, ReplicaRequest * rr)
void set(uint64_t rindex, ReplicaRequest * rr)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
@ -194,11 +194,11 @@ public:
* Remove a replication request associated to this index
* @param rindex of the request
*/
void remove(int rindex)
void remove(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
{
@ -215,7 +215,7 @@ public:
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it;
std::map<uint64_t, ReplicaRequest *>::iterator it;
for ( it = requests.begin() ; it != requests.end() ; ++it )
{
@ -239,13 +239,13 @@ public:
/**
* @return true if a replica request is set for this index
*/
bool is_replicable(int rindex)
bool is_replicable(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
std::map<int, ReplicaRequest *>::iterator it = requests.find(rindex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
bool rc = it == requests.end() ||
bool rc = it == requests.end() ||
(it != requests.end() && it->second != 0);
pthread_mutex_unlock(&mutex);
@ -260,7 +260,7 @@ private:
/**
* Clients waiting for a log replication
*/
std::map<int, ReplicaRequest *> requests;
std::map<uint64_t, ReplicaRequest *> requests;
};
#endif /*REPLICA_REQUEST_H_*/

View File

@ -55,11 +55,14 @@ public:
int resp_id; /**< Id of the object */
string resp_msg; /**< Additional response message */
uint64_t replication_idx;
RequestAttributes()
{
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
replication_idx = UINT64_MAX;
};
RequestAttributes(const RequestAttributes& ra)
@ -84,6 +87,8 @@ public:
resp_obj = ra.resp_obj;
resp_id = ra.resp_id;
resp_msg = ra.resp_msg;
replication_idx = ra.replication_idx;
};
RequestAttributes(int _uid, int _gid, const RequestAttributes& ra)
@ -110,6 +115,8 @@ public:
resp_obj = PoolObjectSQL::NONE;
resp_id = -1;
resp_msg = "";
replication_idx = UINT64_MAX;
};
bool is_admin() const
@ -140,15 +147,16 @@ public:
* Error codes for the XML-RPC API
*/
enum ErrorCode {
SUCCESS = 0x0000,
AUTHENTICATION = 0x0100,
AUTHORIZATION = 0x0200,
NO_EXISTS = 0x0400,
ACTION = 0x0800,
XML_RPC_API = 0x1000,
INTERNAL = 0x2000,
ALLOCATE = 0x4000,
LOCKED = 0x8000
SUCCESS = 0x00000,
AUTHENTICATION = 0x00100,
AUTHORIZATION = 0x00200,
NO_EXISTS = 0x00400,
ACTION = 0x00800,
XML_RPC_API = 0x01000,
INTERNAL = 0x02000,
ALLOCATE = 0x04000,
LOCKED = 0x08000,
REPLICATION = 0x10000
};
/**
@ -287,6 +295,14 @@ protected:
*/
void success_response(bool val, RequestAttributes& att);
/**
* Builds an XML-RPC response updating retval. After calling this function
* the xml-rpc excute method should return
* @param val to be returned to the client
* @param att the specific request attributes
*/
void success_response(uint64_t val, RequestAttributes& att);
/**
* Builds an XML-RPC response updating retval. After calling this function
* the xml-rpc excute method should return. A descriptive error message

View File

@ -1487,7 +1487,8 @@ ONEDB_LOCAL_MIGRATOR_FILES="src/onedb/local/4.5.80_to_4.7.80.rb \
src/onedb/local/5.4.1_to_5.5.80.rb \
src/onedb/local/5.5.80_to_5.6.0.rb \
src/onedb/local/5.6.0_to_5.7.80.rb \
src/onedb/local/5.7.80_to_5.8.0.rb"
src/onedb/local/5.7.80_to_5.8.0.rb \
src/onedb/local/5.8.0_to_5.9.80.rb"
ONEDB_PATCH_FILES="src/onedb/patches/4.14_monitoring.rb \
src/onedb/patches/history_times.rb"

View File

@ -104,10 +104,14 @@ class OneDBBacKEnd
index_sqlite: ["CREATE INDEX state_oid_idx ON vm_pool (state, oid);",
"CREATE INDEX applied_idx ON logdb (applied);"]
},
"5.9.80" => {
logdb: "log_index BIGINT UNSIGNED PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, " <<
"timestamp INTEGER, fed_index BIGINT UNSIGNED, applied BOOLEAN"
}
}
LATEST_DB_VERSION = "5.7.80"
LATEST_DB_VERSION = "5.9.80"
def get_schema(type, version = nil)
if !version

View File

@ -0,0 +1,67 @@
# -------------------------------------------------------------------------- #
# Copyright 2002-2019, OpenNebula Project, OpenNebula Systems #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
#--------------------------------------------------------------------------- #
require 'set'
require 'base64'
require 'zlib'
require 'pathname'
require 'yaml'
require 'opennebula'
require 'vcenter_driver'
$LOAD_PATH << File.dirname(__FILE__)
include OpenNebula
module Migrator
def db_version
'5.9.80'
end
def one_version
'OpenNebula 5.9.80'
end
def up
feature_2722
true
end
private
def feature_2722
@db.run 'DROP TABLE IF EXISTS old_logdb;'
@db.run 'ALTER TABLE logdb RENAME TO old_logdb;'
create_table(:logdb)
@db.run 'INSERT INTO system_attributes (name, body)' <<
'SELECT \'RAFT_STATE\', sqlcmd FROM old_logdb WHERE log_index = -1;'
@db.run 'DELETE FROM old_logdb WHERE log_index = -1;'
db.transaction do
# update virtual networks
@db.fetch('SELECT * FROM old_logdb') do |row|
row[:fed_index] = 18446744073709551615 if row[:fed_index] < 0
@db[:logdb].insert(row)
end
end
end
end

View File

@ -58,14 +58,14 @@ FedReplicaManager::~FedReplicaManager()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int FedReplicaManager::apply_log_record(int index, int prev,
uint64_t FedReplicaManager::apply_log_record(uint64_t index, uint64_t prev,
const std::string& sql)
{
int rc;
uint64_t rc;
pthread_mutex_lock(&mutex);
int last_index = logdb->last_federated();
uint64_t last_index = logdb->last_federated();
if ( prev != last_index )
{
@ -80,7 +80,7 @@ int FedReplicaManager::apply_log_record(int index, int prev,
if ( logdb->exec_federated_wr(oss, index) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
return UINT64_MAX;
}
pthread_mutex_unlock(&mutex);
@ -154,7 +154,7 @@ void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
pthread_mutex_lock(&mutex);
int last_index = logdb->last_federated();
uint64_t last_index = logdb->last_federated();
zones.clear();
@ -291,7 +291,7 @@ int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
zedp = zs->endpoint;
//Initialize next index for the zone
if ( zs->next == -1 )
if ( zs->next == UINT64_MAX )
{
zs->next= logdb->last_federated();
}
@ -301,10 +301,10 @@ int FedReplicaManager::get_next_record(int zone_id, std::string& zedp,
{
zs->next = logdb->next_federated(zs->next);
if ( zs->last == zs->next ) //no new records
if ( zs->next == UINT64_MAX ) //no new records
{
pthread_mutex_unlock(&mutex);
return -1;
return -2;
}
}
@ -346,7 +346,7 @@ void FedReplicaManager::replicate_success(int zone_id)
zs->next = logdb->next_federated(zs->next);
if ( zs->next != -1 )
if ( zs->next != UINT64_MAX )
{
ReplicaManager::replicate(zone_id);
}
@ -356,7 +356,7 @@ void FedReplicaManager::replicate_success(int zone_id)
/* -------------------------------------------------------------------------- */
void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
void FedReplicaManager::replicate_failure(int zone_id, uint64_t last_zone)
{
pthread_mutex_lock(&mutex);
@ -366,14 +366,14 @@ void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
{
ZoneServers * zs = it->second;
if ( last_zone >= 0 )
if ( last_zone != UINT64_MAX )
{
zs->last = last_zone;
zs->next = logdb->next_federated(zs->last);
}
if ( zs->next != -1 )
if ( zs->next != UINT64_MAX )
{
ReplicaManager::replicate(zone_id);
}
@ -387,7 +387,7 @@ void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
/* -------------------------------------------------------------------------- */
int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
int& last, std::string& error)
uint64_t& last, std::string& error)
{
static const std::string replica_method = "one.zone.fedreplicate";
@ -397,12 +397,14 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
LogDBRecord lr;
if ( get_next_record(zone_id, zedp, lr, error) != 0 )
int rc = get_next_record(zone_id, zedp, lr, error);
if ( rc != 0 )
{
return -1;
return rc;
}
int prev_index = logdb->previous_federated(lr.index);
uint64_t prev_index = logdb->previous_federated(lr.index);
// -------------------------------------------------------------------------
// Get parameters to call append entries on follower
@ -416,8 +418,8 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
}
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_int(lr.index));
replica_params.add(xmlrpc_c::value_int(prev_index));
replica_params.add(xmlrpc_c::value_i8(lr.index));
replica_params.add(xmlrpc_c::value_i8(prev_index));
replica_params.add(xmlrpc_c::value_string(lr.sql));
// -------------------------------------------------------------------------
@ -435,12 +437,12 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
if ( success ) //values[2] = error code (string)
{
last = xmlrpc_c::value_int(values[1]);
last = xmlrpc_c::value_i8(values[1]);
}
else
{
error = xmlrpc_c::value_string(values[1]);
last = xmlrpc_c::value_int(values[3]);
last = xmlrpc_c::value_i8(values[4]);
}
}
else
@ -457,3 +459,4 @@ int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
return xml_rc;
}

View File

@ -27,6 +27,8 @@
/* -------------------------------------------------------------------------- */
const time_t RaftManager::timer_period_ms = 50;
const string RaftManager::raft_state_name = "RAFT_STATE";
static void set_timeout(long long ms, struct timespec& timeout)
{
std::lldiv_t d;
@ -68,20 +70,20 @@ RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
// - term
// -------------------------------------------------------------------------
if ( logdb->get_raft_state(raft_xml) != 0 )
if ( logdb->get_raft_state(raft_state_name, raft_xml) != 0 )
{
std::ostringstream bsr;
bsr << "bootstrap state";
bsr << "<MESSAGE>bootstrap state</MESSAGE>";
logdb->insert_log_record(-1, -1, bsr, 0, -1, false);
init_raft_state(bsr.str());
raft_state.replace("TERM", 0);
raft_state.replace("VOTEDFOR", -1);
raft_state.to_xml(raft_xml);
logdb->update_raft_state(raft_xml);
logdb->update_raft_state(raft_state_name, raft_xml);
votedfor = -1;
term = 0;
@ -300,7 +302,8 @@ void RaftManager::add_server(int follower_id, const std::string& endpoint)
LogDB * logdb = Nebula::instance().get_logdb();
unsigned int log_index, log_term;
unsigned int log_term;
uint64_t log_index;
logdb->get_last_record_index(log_index, log_term);
@ -379,7 +382,7 @@ extern "C" void * reconciling_thread(void *arg)
LogDB * logdb = nd.get_logdb();
RaftManager * rm = nd.get_raftm();
int * index = static_cast<int *>(arg);
uint64_t * index = static_cast<uint64_t *>(arg);
NebulaLog::log("RCM", Log::INFO, "Replicating log to followers");
@ -410,7 +413,7 @@ void RaftManager::leader()
std::map<int, std::string>::iterator it;
std::vector<int> _follower_ids;
int index, _applied, _next_index;
uint64_t index, _applied, _next_index;
std::map<int, std::string> _servers;
@ -508,7 +511,7 @@ void RaftManager::leader()
void RaftManager::follower(unsigned int _term)
{
int lapplied, lindex;
uint64_t lapplied, lindex;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
@ -567,7 +570,7 @@ void RaftManager::follower(unsigned int _term)
if (!raft_state_xml.empty())
{
logdb->update_raft_state(raft_state_xml);
logdb->update_raft_state(raft_state_name, raft_state_xml);
}
}
@ -591,13 +594,11 @@ void RaftManager::replicate_log(ReplicaRequest * request)
//Count servers that need to replicate this record
int to_commit = num_servers / 2;
std::map<int, unsigned int>::iterator it;
std::map<int, uint64_t>::iterator it;
for (it = next.begin(); it != next.end() ; ++it)
{
int rindex = request->index();
if ( rindex < (int) it->second )
if ( request->index() < it->second )
{
to_commit--;
}
@ -635,13 +636,14 @@ void RaftManager::replicate_success(int follower_id)
{
std::map<int, ReplicaRequest *>::iterator it;
std::map<int, unsigned int>::iterator next_it;
std::map<int, unsigned int>::iterator match_it;
std::map<int, uint64_t>::iterator next_it;
std::map<int, uint64_t>::iterator match_it;
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
unsigned int db_lindex, db_lterm;
unsigned int db_lterm;
uint64_t db_lindex;
logdb->get_last_record_index(db_lindex, db_lterm);
@ -656,7 +658,7 @@ void RaftManager::replicate_success(int follower_id)
return;
}
unsigned int replicated_index = next_it->second;
uint64_t replicated_index = next_it->second;
match_it->second = replicated_index;
next_it->second = replicated_index + 1;
@ -680,7 +682,7 @@ void RaftManager::replicate_success(int follower_id)
void RaftManager::replicate_failure(int follower_id)
{
std::map<int, unsigned int>::iterator next_it;
std::map<int, uint64_t>::iterator next_it;
pthread_mutex_lock(&mutex);
@ -722,10 +724,9 @@ void RaftManager::update_last_heartbeat(int _leader_id)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
unsigned int RaftManager::update_commit(unsigned int leader_commit,
unsigned int index)
uint64_t RaftManager::update_commit(uint64_t leader_commit, uint64_t index)
{
unsigned int _commit;
uint64_t _commit;
pthread_mutex_lock(&mutex);
@ -775,7 +776,7 @@ int RaftManager::update_votedfor(int _votedfor)
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_xml);
logdb->update_raft_state(raft_state_name, raft_state_xml);
return 0;
}
@ -886,7 +887,8 @@ void RaftManager::timer_action(const ActionRequest& ar)
void RaftManager::request_vote()
{
unsigned int lindex, lterm, fterm, _term;
unsigned int lterm, fterm, _term;
uint64_t lindex;
int _server_id;
std::map<int, std::string> _servers;
@ -947,7 +949,7 @@ void RaftManager::request_vote()
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_xml);
logdb->update_raft_state(raft_state_name, raft_state_xml);
logdb->get_last_record_index(lindex, lterm);
@ -1028,7 +1030,7 @@ void RaftManager::request_vote()
pthread_mutex_unlock(&mutex);
logdb->update_raft_state(raft_state_xml);
logdb->update_raft_state(raft_state_name, raft_state_xml);
srand(_server_id+1);
@ -1037,7 +1039,7 @@ void RaftManager::request_vote()
oss.str("");
oss << "No leader found, starting new election in " << ms << "ms";
oss << "No leader found, starting new election in " << ms << "ms";
NebulaLog::log("RCM", Log::INFO, oss);
set_timeout(ms, etimeout);
@ -1054,8 +1056,8 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
bool& success, unsigned int& fterm, std::string& error)
{
int _server_id;
int _commit;
int _term;
uint64_t _commit;
unsigned int _term;
std::string xmlrpc_secret;
static const std::string replica_method = "one.zone.replicate";
@ -1099,13 +1101,13 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(_commit));
replica_params.add(xmlrpc_c::value_i8(_commit));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(lr->index));
replica_params.add(xmlrpc_c::value_i8(lr->index));
replica_params.add(xmlrpc_c::value_int(lr->term));
replica_params.add(xmlrpc_c::value_int(lr->prev_index));
replica_params.add(xmlrpc_c::value_i8(lr->prev_index));
replica_params.add(xmlrpc_c::value_int(lr->prev_term));
replica_params.add(xmlrpc_c::value_int(lr->fed_index));
replica_params.add(xmlrpc_c::value_i8(lr->fed_index));
replica_params.add(xmlrpc_c::value_string(lr->sql));
// -------------------------------------------------------------------------
@ -1147,12 +1149,12 @@ int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
int RaftManager::xmlrpc_request_vote(int follower_id, uint64_t lindex,
unsigned int lterm, bool& success, unsigned int& fterm,
std::string& error)
{
int _server_id;
int _term;
unsigned int _term;
std::string xmlrpc_secret;
static const std::string replica_method = "one.zone.voterequest";
@ -1196,7 +1198,7 @@ int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
replica_params.add(xmlrpc_c::value_string(xmlrpc_secret));
replica_params.add(xmlrpc_c::value_int(_term));
replica_params.add(xmlrpc_c::value_int(_server_id));
replica_params.add(xmlrpc_c::value_int(lindex));
replica_params.add(xmlrpc_c::value_i8(lindex));
replica_params.add(xmlrpc_c::value_int(lterm));
// -------------------------------------------------------------------------
@ -1243,8 +1245,8 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
Nebula& nd = Nebula::instance();
LogDB * logdb = nd.get_logdb();
unsigned int lindex, lterm;
unsigned int lterm;
uint64_t lindex;
std::ostringstream oss;
logdb->get_last_record_index(lindex, lterm);
@ -1292,9 +1294,10 @@ std::string& RaftManager::to_xml(std::string& raft_xml)
void RaftManager::reset_index(int follower_id)
{
std::map<int, unsigned int>::iterator next_it;
std::map<int, uint64_t>::iterator next_it;
unsigned int log_index, log_term;
unsigned int log_term;
uint64_t log_index;
LogDB * logdb = Nebula::instance().get_logdb();
@ -1311,3 +1314,15 @@ void RaftManager::reset_index(int follower_id)
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int RaftManager::init_raft_state(const std::string& raft_xml)
{
string error;
Nebula& nd = Nebula::instance();
return nd.insert_sys_attribute(raft_state_name, raft_xml, error);
}

View File

@ -184,7 +184,19 @@ int RaftReplicaThread::replicate()
unsigned int term = raftm->get_term();
int next_index = raftm->get_next_index(follower_id);
uint64_t next_index = raftm->get_next_index(follower_id);
if ( next_index == UINT64_MAX )
{
ostringstream ess;
ess << "Failed to get next replica index for follower: " << follower_id;
NebulaLog::log("RCM", Log::ERROR, ess);
return -1;
}
if ( logdb->get_log_record(next_index, lr) != 0 )
{
@ -255,13 +267,19 @@ int FedReplicaThread::replicate()
bool success = false;
int last;
uint64_t last;
if ( frm->xmlrpc_replicate_log(follower_id, success, last, error) != 0 )
int rc = frm->xmlrpc_replicate_log(follower_id, success, last, error);
if ( rc == -1 )
{
NebulaLog::log("FRM", Log::ERROR, error);
return -1;
}
else if ( rc == -2 )
{
return 0;
}
if ( success )
{
@ -311,7 +329,7 @@ int HeartBeatThread::replicate()
lr.sql = "";
lr.timestamp = 0;
lr.fed_index = -1;
lr.fed_index = UINT64_MAX;
rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);

View File

@ -330,6 +330,9 @@ void Request::log_xmlrpc_value(const xmlrpc_c::value& v, std::ostringstream& oss
oss << ", "
<< static_cast<double>(xmlrpc_c::value_double(v));
break;
case xmlrpc_c::value::TYPE_I8:
oss << ", " << static_cast<uint64_t>(xmlrpc_c::value_i8(v));
break;
default:
oss << ", unknown param type";
break;
@ -728,6 +731,7 @@ void Request::failure_response(ErrorCode ec, const string& str_val,
arrayData.push_back(xmlrpc_c::value_string(str_val));
arrayData.push_back(xmlrpc_c::value_int(ec));
arrayData.push_back(xmlrpc_c::value_int(att.resp_id));
arrayData.push_back(xmlrpc_c::value_i8(att.replication_idx));
xmlrpc_c::value_array arrayresult(arrayData);
@ -809,6 +813,18 @@ string Request::failure_message(ErrorCode ec, RequestAttributes& att)
oss << " [" << att.resp_id << "].";
}
break;
case REPLICATION:
oss << "Error replicating log entry " << att.replication_idx;
if (att.resp_msg.empty())
{
oss << ".";
}
else
{
oss << ": " << att.resp_msg << ".";
}
break;
}
return oss.str();
@ -874,6 +890,23 @@ void Request::success_response(bool val, RequestAttributes& att)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void Request::success_response(uint64_t val, RequestAttributes& att)
{
vector<xmlrpc_c::value> arrayData;
arrayData.push_back(xmlrpc_c::value_boolean(true));
arrayData.push_back(xmlrpc_c::value_i8(val));
arrayData.push_back(xmlrpc_c::value_int(SUCCESS));
xmlrpc_c::value_array arrayresult(arrayData);
*(att.retval) = arrayresult;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int Request::get_info(
PoolSQL * pool,
int id,

View File

@ -297,15 +297,15 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
RaftManager * raftm = nd.get_raftm();
int leader_id = xmlrpc_c::value_int(paramList.getInt(1));
int leader_commit = xmlrpc_c::value_int(paramList.getInt(2));
int leader_id = xmlrpc_c::value_int(paramList.getInt(1));
uint64_t leader_commit = xmlrpc_c::value_i8(paramList.getI8(2));
unsigned int leader_term = xmlrpc_c::value_int(paramList.getInt(3));
unsigned int index = xmlrpc_c::value_int(paramList.getInt(4));
uint64_t index = xmlrpc_c::value_i8(paramList.getI8(4));
unsigned int term = xmlrpc_c::value_int(paramList.getInt(5));
unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
uint64_t prev_index = xmlrpc_c::value_i8(paramList.getI8(6));
unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7));
unsigned int fed_index = xmlrpc_c::value_int(paramList.getInt(8));
uint64_t fed_index = xmlrpc_c::value_i8(paramList.getI8(8));
string sql = xmlrpc_c::value_string(paramList.getString(9));
@ -370,11 +370,12 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
if ( index == 0 && prev_index == 0 && term == 0 && prev_term == 0 &&
sql.empty() )
{
unsigned int lindex, lterm;
unsigned int lterm;
uint64_t lindex;
logdb->get_last_record_index(lindex, lterm);
unsigned int new_commit = raftm->update_commit(leader_commit, lindex);
uint64_t new_commit = raftm->update_commit(leader_commit, lindex);
logdb->apply_log_records(new_commit);
@ -443,7 +444,7 @@ void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
return;
}
unsigned int new_commit = raftm->update_commit(leader_commit, index);
uint64_t new_commit = raftm->update_commit(leader_commit, index);
logdb->apply_log_records(new_commit);
@ -464,12 +465,13 @@ void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
unsigned int candidate_term = xmlrpc_c::value_int(paramList.getInt(1));
unsigned int candidate_id = xmlrpc_c::value_int(paramList.getInt(2));
unsigned int candidate_log_index = xmlrpc_c::value_int(paramList.getInt(3));
uint64_t candidate_log_index = xmlrpc_c::value_i8(paramList.getI8(3));
unsigned int candidate_log_term = xmlrpc_c::value_int(paramList.getInt(4));
unsigned int current_term = raftm->get_term();
unsigned int log_index, log_term;
unsigned int log_term;
uint64_t log_index;
logdb->get_last_record_index(log_index, log_term);
@ -568,13 +570,13 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
FedReplicaManager * frm = nd.get_frm();
int index = xmlrpc_c::value_int(paramList.getInt(1));
int prev = xmlrpc_c::value_int(paramList.getInt(2));
uint64_t index = xmlrpc_c::value_i8(paramList.getI8(1));
uint64_t prev = xmlrpc_c::value_i8(paramList.getI8(2));
string sql = xmlrpc_c::value_string(paramList.getString(3));
if (!att.is_oneadmin())
{
att.resp_id = -1;
att.replication_idx = UINT64_MAX;
failure_response(AUTHORIZATION, att);
return;
@ -583,7 +585,7 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
if ( nd.is_cache() )
{
att.resp_msg = "Server is in cache mode.";
att.resp_id = -1;
att.replication_idx = UINT64_MAX;
failure_response(ACTION, att);
return;
@ -596,9 +598,9 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::ERROR, oss);
att.resp_msg = oss.str();
att.resp_id = -1;
att.replication_idx = UINT64_MAX;
failure_response(ACTION, att);
failure_response(REPLICATION, att);
return;
}
@ -609,28 +611,28 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = - 1;
att.replication_idx = UINT64_MAX;
failure_response(ACTION, att);
failure_response(REPLICATION, att);
return;
}
int rc = frm->apply_log_record(index, prev, sql);
uint64_t rc = frm->apply_log_record(index, prev, sql);
if ( rc == 0 )
{
success_response(index, att);
}
else if ( rc < 0 )
else if ( rc == UINT64_MAX )
{
oss << "Error replicating log entry " << index << " in zone";
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = index;
att.replication_idx = index;
failure_response(ACTION, att);
failure_response(REPLICATION, att);
}
else // rc == last_index in log
{
@ -639,9 +641,9 @@ void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
NebulaLog::log("ReM", Log::INFO, oss);
att.resp_msg = oss.str();
att.resp_id = rc;
att.replication_idx = rc;
failure_response(ACTION, att);
failure_response(REPLICATION, att);
}
return;

View File

@ -28,8 +28,8 @@ const char * LogDB::table = "logdb";
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index, applied";
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
"timestamp INTEGER, fed_index INTEGER, applied BOOLEAN)";
"logdb (log_index BIGINT UNSIGNED PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
"timestamp INTEGER, fed_index BIGINT UNSIGNED, applied BOOLEAN)";
/* -------------------------------------------------------------------------- */
@ -68,15 +68,30 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
std::string * _sql;
index = static_cast<unsigned int>(atoi(values[0]));
term = static_cast<unsigned int>(atoi(values[1]));
zsql = values[2];
std::istringstream iss;
timestamp = static_cast<unsigned int>(atoi(values[3]));
iss.str(string(values[0]));
iss >> index;
iss.clear();
fed_index = static_cast<unsigned int>(atoi(values[4]));
iss.str(string(values[1]));
iss >> term;
iss.clear();
zsql = values[2];
iss.str(string(values[3]));
iss >> timestamp;
iss.clear();
iss.str(string(values[4]));
iss >> fed_index;
iss.clear();
iss.str(string(values[5]));
iss >> prev_index;
iss.clear();
prev_index = static_cast<unsigned int>(atoi(values[5]));
prev_term = static_cast<unsigned int>(atoi(values[6]));
_sql = one_util::zlib_decompress(zsql, true);
@ -104,11 +119,11 @@ int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, unsigned int _lret, unsigned int _lp):
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1), last_index(-1),
last_term(-1), log_retention(_lret), limit_purge(_lp)
LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp):
solo(_solo), cache(_cache), db(_db), next_index(0), last_applied(-1),
last_index(-1), last_term(-1), log_retention(_lret), limit_purge(_lp)
{
int r, i;
uint64_t r, i;
pthread_mutex_init(&mutex, 0);
@ -120,7 +135,7 @@ LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, unsigned int _lret, unsigned
oss << time(0);
insert_log_record(0, 0, oss, time(0), -1, false);
insert_log_record(0, 0, oss, time(0), UINT64_MAX, false);
}
setup_index(r, i);
@ -134,18 +149,18 @@ LogDB::~LogDB()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::setup_index(int& _last_applied, int& _last_index)
int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
{
int rc = 0;
std::ostringstream oss;
single_cb<int> cb;
single_cb<uint64_t> cb;
LogDBRecord lr;
_last_applied = 0;
_last_index = -1;
_last_index = UINT64_MAX;
pthread_mutex_lock(&mutex);
@ -155,14 +170,14 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
rc += db->exec_rd(oss, &cb);
cb.unset_callback();
if ( rc == 0 )
if ( rc == 0 && cb.get_affected_rows() != 0)
{
next_index = _last_index + 1;
last_index = _last_index;
}
cb.unset_callback();
oss.str("");
cb.set_callback(&_last_applied);
@ -195,11 +210,11 @@ int LogDB::setup_index(int& _last_applied, int& _last_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
int LogDB::get_log_record(uint64_t index, LogDBRecord& lr)
{
ostringstream oss;
unsigned int prev_index = index - 1;
uint64_t prev_index = index - 1;
if ( index == 0 )
{
@ -228,7 +243,7 @@ int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
void LogDB::get_last_record_index(uint64_t& _i, unsigned int& _t)
{
pthread_mutex_lock(&mutex);
@ -241,13 +256,13 @@ void LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::get_raft_state(std::string &raft_xml)
int LogDB::get_raft_state(std::string name, std::string &raft_xml)
{
ostringstream oss;
single_cb<std::string> cb;
oss << "SELECT sqlcmd FROM logdb WHERE log_index = -1 AND term = -1";
oss << "SELECT body FROM system_attributes WHERE name = '" << name << "'";
cb.set_callback(&raft_xml);
@ -266,7 +281,7 @@ int LogDB::get_raft_state(std::string &raft_xml)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::update_raft_state(std::string& raft_xml)
int LogDB::update_raft_state(std::string name, std::string& raft_xml)
{
std::ostringstream oss;
@ -277,7 +292,8 @@ int LogDB::update_raft_state(std::string& raft_xml)
return -1;
}
oss << "UPDATE logdb SET sqlcmd ='" << sql_db << "' WHERE log_index = -1";
oss << "UPDATE system_attributes SET body ='" << sql_db
<< "' WHERE name = '" << name << "'";
db->free_str(sql_db);
@ -287,8 +303,8 @@ int LogDB::update_raft_state(std::string& raft_xml)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
int fed_index, bool replace)
int LogDB::insert(uint64_t index, unsigned int term, const std::string& sql,
time_t tstamp, uint64_t fed_index, bool replace)
{
std::ostringstream oss;
@ -361,8 +377,8 @@ int LogDB::apply_log_record(LogDBRecord * lr)
{
std::ostringstream oss;
oss << "UPDATE logdb SET timestamp = " << time(0) << ", applied = 1" << " WHERE "
<< "log_index = " << lr->index << " AND timestamp = 0";
oss << "UPDATE logdb SET timestamp = " << time(0) << ", applied = 1"
<< " WHERE log_index = " << lr->index << " AND timestamp = 0";
if ( db->exec_wr(oss) != 0 )
{
@ -384,14 +400,14 @@ int LogDB::apply_log_record(LogDBRecord * lr)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, int fed_index)
uint64_t LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, uint64_t fed_index)
{
pthread_mutex_lock(&mutex);
unsigned int index = next_index;
uint64_t index = next_index;
int _fed_index;
uint64_t _fed_index;
if ( fed_index == 0 )
{
@ -408,7 +424,7 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
pthread_mutex_unlock(&mutex);
return -1;
return UINT64_MAX;
}
//allocate a replication request if log record is going to be replicated
@ -423,7 +439,7 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
next_index++;
if ( fed_index != -1 )
if ( _fed_index != UINT64_MAX )
{
fed_log.insert(_fed_index);
}
@ -436,8 +452,9 @@ int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::insert_log_record(unsigned int index, unsigned int term,
std::ostringstream& sql, time_t timestamp, int fed_index, bool replace)
int LogDB::insert_log_record(uint64_t index, unsigned int term,
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
bool replace)
{
int rc;
@ -456,7 +473,7 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term,
next_index = last_index + 1;
}
if ( fed_index != -1 )
if ( fed_index != UINT64_MAX )
{
fed_log.insert(fed_index);
}
@ -470,7 +487,7 @@ int LogDB::insert_log_record(unsigned int index, unsigned int term,
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
{
int rc;
@ -485,7 +502,7 @@ int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
if ( rc == 0 && Nebula::instance().is_federation_enabled() )
{
insert_log_record(0, cmd, time(0), federated_index);
insert_log_record(0, cmd, time(0), federated);
pthread_mutex_lock(&mutex);
@ -510,9 +527,9 @@ int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
// -------------------------------------------------------------------------
// Insert log entry in the database and replicate on followers
// -------------------------------------------------------------------------
int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
uint64_t rindex = insert_log_record(raftm->get_term(), cmd, 0, federated);
if ( rindex == -1 )
if ( rindex == UINT64_MAX )
{
return -1;
}
@ -523,7 +540,7 @@ int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::delete_log_records(unsigned int start_index)
int LogDB::delete_log_records(uint64_t start_index)
{
std::ostringstream oss;
int rc;
@ -556,7 +573,7 @@ int LogDB::delete_log_records(unsigned int start_index)
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::apply_log_records(unsigned int commit_index)
int LogDB::apply_log_records(uint64_t commit_index)
{
pthread_mutex_lock(&mutex);
@ -591,10 +608,10 @@ int LogDB::purge_log()
empty_cb cb;
multiple_cb<std::vector, int> cb_info;
multiple_cb<std::vector, uint64_t> cb_info;
single_cb<string> cb_min_idx;
std::vector<int> maxmin_i;
std::vector<int> maxmin_e;
std::vector<uint64_t> maxmin_i;
std::vector<uint64_t> maxmin_e;
string min_idx;
int rc = 0;
@ -618,8 +635,8 @@ int LogDB::purge_log()
oss.str("");
oss << " SELECT MIN(i.log_index) FROM ("
<< " SELECT log_index FROM logdb WHERE fed_index = -1 AND"
<< " applied = 1 AND log_index >= 0 "
<< " SELECT log_index FROM logdb WHERE fed_index = " << UINT64_MAX
<< " AND applied = 1 AND log_index >= 0 "
<< " ORDER BY log_index DESC LIMIT " << log_retention
<< " ) AS i";
@ -633,7 +650,7 @@ int LogDB::purge_log()
oss.str("");
oss << "DELETE FROM logdb WHERE applied = 1 AND log_index >= 0 "
<< "AND fed_index = -1 AND log_index < " << min_idx;
<< "AND fed_index = " << UINT64_MAX << " AND log_index < " << min_idx;
if ( db->limit_support() )
{
@ -681,8 +698,8 @@ int LogDB::purge_log()
oss.str("");
oss << " SELECT MIN(i.log_index) FROM ("
<< " SELECT log_index FROM logdb WHERE fed_index != -1 AND"
<< " applied = 1 AND log_index >= 0 "
<< " SELECT log_index FROM logdb WHERE fed_index != " << UINT64_MAX
<< " AND applied = 1 AND log_index >= 0 "
<< " ORDER BY log_index DESC LIMIT " << log_retention
<< " ) AS i";
@ -696,7 +713,7 @@ int LogDB::purge_log()
oss.str("");
oss << "DELETE FROM logdb WHERE applied = 1 AND log_index >= 0 "
<< "AND fed_index != -1 AND log_index < " << min_idx;
<< "AND fed_index != " << UINT64_MAX << " AND log_index < " << min_idx;
if ( db->limit_support() )
{
@ -724,7 +741,7 @@ int LogDB::purge_log()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::replicate(int rindex)
int LogDB::replicate(uint64_t rindex)
{
int rc;
@ -770,11 +787,11 @@ void LogDB::build_federated_index()
fed_log.clear();
set_cb<int> cb;
set_cb<uint64_t> cb;
cb.set_callback(&fed_log);
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != " << UINT64_MAX;
db->exec_rd(oss, &cb);
@ -784,15 +801,15 @@ void LogDB::build_federated_index()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int LogDB::last_federated()
uint64_t LogDB::last_federated()
{
pthread_mutex_lock(&mutex);
int findex = -1;
uint64_t findex = -1;
if ( !fed_log.empty() )
{
set<int>::reverse_iterator rit;
set<uint64_t>::reverse_iterator rit;
rit = fed_log.rbegin();
@ -806,13 +823,13 @@ int LogDB::last_federated()
/* -------------------------------------------------------------------------- */
int LogDB::previous_federated(int i)
uint64_t LogDB::previous_federated(uint64_t i)
{
set<int>::iterator it;
set<uint64_t>::iterator it;
pthread_mutex_lock(&mutex);
int findex = -1;
uint64_t findex = UINT64_MAX;
it = fed_log.find(i);
@ -828,13 +845,13 @@ int LogDB::previous_federated(int i)
/* -------------------------------------------------------------------------- */
int LogDB::next_federated(int i)
uint64_t LogDB::next_federated(uint64_t i)
{
set<int>::iterator it;
set<uint64_t>::iterator it;
pthread_mutex_lock(&mutex);
int findex = -1;
uint64_t findex = UINT64_MAX;
it = fed_log.find(i);