From b26e5a716af4df1c4afe29b5419684f54ef6021c Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Sun, 23 Apr 2017 01:43:01 +0200 Subject: [PATCH] F #4809: Replication logic --- include/LogDBManager.h | 2 + include/ZoneServer.h | 24 ++++++ src/logdb/LogDBManager.cc | 152 +++++++++++++++++++++++++++++++------- 3 files changed, 153 insertions(+), 25 deletions(-) diff --git a/include/LogDBManager.h b/include/LogDBManager.h index bf5273b180..d45ce2375a 100644 --- a/include/LogDBManager.h +++ b/include/LogDBManager.h @@ -124,6 +124,8 @@ private: bool _finalize; + bool _pending_requests; + // --------------------------------------------------------------------- // Information of the replication target server and leader // --------------------------------------------------------------------- diff --git a/include/ZoneServer.h b/include/ZoneServer.h index b79408f5cc..a6dab8ccad 100644 --- a/include/ZoneServer.h +++ b/include/ZoneServer.h @@ -135,6 +135,30 @@ public: return state == LEADER; } + /** + * Decrease the next log entry to send to this follower + */ + void dec_next() + { + next--; + } + + /** + * Increase the next log entry to send to this follower + */ + void inc_next() + { + next++; + } + + /** + * Set the index of the highest log entry on this follower + */ + void set_match(unsigned int _match) + { + match = _match; + } + private: State state; diff --git a/src/logdb/LogDBManager.cc b/src/logdb/LogDBManager.cc index bad4f85630..12c7e7efc8 100644 --- a/src/logdb/LogDBManager.cc +++ b/src/logdb/LogDBManager.cc @@ -229,11 +229,20 @@ void LogDBManager::ReplicaThread::do_replication() { pthread_mutex_lock(&mutex); - if ( _finalize ) + while ( _pending_requests == false ) { - return; + pthread_cond_wait(&cond,&mutex); + + if ( _finalize ) + { + return; + } } + _pending_requests = false; + + pthread_mutex_unlock(&mutex); + Zone * zone = zpool->get(zone_id, true); if ( zone == 0 ) @@ -280,6 +289,13 @@ void LogDBManager::ReplicaThread::do_replication() 
lr->unlock(); + ostringstream oss; + + oss << "Replicating log entry " << id << "-" << term << " on server: " + << follower_id << " (" << follower_edp <<")"; + + NebulaLog::log("DBM", Log::DDEBUG, oss); + xmlrpc_c::carriageParm_curl0 carriage(follower_edp); xmlrpc_c::paramList replica_params; @@ -295,36 +311,120 @@ void LogDBManager::ReplicaThread::do_replication() xmlrpc_c::rpc rpc_client(replica_method, replica_params); - rpc_client.call(&client, &carriage); - - /* try - xmlrpc_c::rpcPtr rpc(method, plist); - xmlrpc_c::carriageParm_curl0 cparam(one_endpoint); + { + rpc_client.call(&client, &carriage); - rpc->start(&client, &cparam); + if ( rpc_client.isSuccessful() ) + { + xmlrpc_c::value result = rpc_client.getResult(); - client.finishAsync(xmlrpc_c::timeout(timeout)); + vector values = + xmlrpc_c::value_array(result).vectorValueValue(); - if (!rpc->isFinished()) - { - rpc->finishErr(girerr::error("XMLRPC method " + method + - " timeout, resetting call")); - } + bool success = xmlrpc_c::value_boolean(values[0]); - if (rpc->isSuccessful()) - { - *result = rpc->getResult(); - } - else - { - xmlrpc_c::fault failure = rpc->getFault(); + if ( success ) + { + zone = zpool->get(zone_id, true); - girerr::error(failure.getDescription()); - } + if ( zone == 0 ) + { + continue; + } -*/ - pthread_mutex_unlock(&mutex); + ZoneServer * follower = zone->get_server(follower_id); + + if ( follower == 0 ) + { + zone->unlock(); + + continue; + } + + follower->inc_next(); + + follower->set_match(id); + + zone->unlock(); + } + else + { + int follower_term = xmlrpc_c::value_boolean(values[1]); + + if ( follower_term > term ) + { + //Convert to follower + // - Update term + // - Set state to follower + // - Stop replica threads + ostringstream ess; + + ess << "Detected a higher term on follower: " + << follower_id << " giving up leadership"; + + NebulaLog::log("DBM", Log::WARNING, ess); + } + else + { + //Log inconsistency in follower + // - Decrease follower index + // - Retry + 
ostringstream ess; + + ess << "Log inconsistency detected on follower: " + << follower_id; + + NebulaLog::log("DBM", Log::WARNING, ess); + + zone = zpool->get(zone_id, true); + + if ( zone == 0 ) + { + continue; + } + + ZoneServer * follower = zone->get_server(follower_id); + + if ( follower == 0 ) + { + zone->unlock(); + + continue; + } + + follower->dec_next(); + + zone->unlock(); + + _pending_requests = true; + } + } + } + else //RPC failed, will retry on next replication request + { + ostringstream ess; + + xmlrpc_c::fault failure = rpc_client.getFault(); + + ess << "Error replicating log entry " << id << "-" << term + << " on follower " << follower_id << ": " + << failure.getDescription(); + + NebulaLog::log("DBM", Log::ERROR, ess); + } + } + catch (exception const& e) + { + ostringstream ess; + + ess << "Error replicating log entry " << id << "-" << term + << " on follower " << follower_id << ": " << e.what(); + + NebulaLog::log("DBM", Log::ERROR, ess); + + continue; + } } } @@ -337,6 +437,8 @@ void LogDBManager::ReplicaThread::finalize() _finalize = true; + _pending_requests = false; + pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);