From f2039e0260752de7e377de97df5395cbed167e5e Mon Sep 17 00:00:00 2001 From: "Ruben S. Montero" Date: Sun, 23 Apr 2017 13:09:12 +0200 Subject: [PATCH] F #4809: Added LogDBManger to Zone server --- include/LogDBManager.h | 7 +++++- include/ZoneServer.h | 46 ++++++++++++++++++++++++++++++++++++++- src/logdb/LogDBManager.cc | 30 +++++++++++++++++++++++-- src/zone/Zone.cc | 36 ++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 4 deletions(-) diff --git a/include/LogDBManager.h b/include/LogDBManager.h index d45ce2375a..0a229a3715 100644 --- a/include/LogDBManager.h +++ b/include/LogDBManager.h @@ -68,7 +68,10 @@ class LogDBManager : public ActionListener public: LogDBManager(){}; - virtual ~LogDBManager(){}; + virtual ~LogDBManager() + { + stop_action(); + }; /** * Triggers specific actions to the LogDBManager @@ -103,6 +106,8 @@ private: void do_replication(); + void add_request(); + void finalize(); pthread_t * thread_id() diff --git a/include/ZoneServer.h b/include/ZoneServer.h index a6dab8ccad..648ab327c7 100644 --- a/include/ZoneServer.h +++ b/include/ZoneServer.h @@ -22,6 +22,8 @@ #include "ExtendedAttribute.h" +class LogDBManager; + /** * The VirtualMachine DISK attribute */ @@ -36,7 +38,8 @@ public: LEADER = 3 }; - ZoneServer(VectorAttribute *va, int id):ExtendedAttribute(va, id){}; + ZoneServer(VectorAttribute *va, int id):ExtendedAttribute(va, id), + state(FOLLOWER), dbm(0){}; virtual ~ZoneServer(){}; @@ -64,6 +67,9 @@ public: return 0; } + //-------------------------------------------------------------------------- + // Server attributes + //-------------------------------------------------------------------------- /** * @return the ID of the server */ @@ -159,6 +165,39 @@ public: match = _match; } + /** + * Set the state for this follower + */ + void set_state(State s) + { + state = s; + } + + //-------------------------------------------------------------------------- + // LogDBManager interface + //-------------------------------------------------------------------------- + /** + * Start the LogDB manager and associated replica threads for followers + * @param 0 on success + */ + int logdbm_start(); + + /** + * Stop the LogDB manager and threads + * @param 0 on success + */ + void logdbm_stop(); + + /** + * Start replica threads for new servers in zone + */ + void logdbm_addserver(); + + /** + * Start replication of a new log entry on followers + */ + void logdbm_replicate(); + private: State state; @@ -184,6 +223,11 @@ private: unsigned int next; unsigned int match; + + /** + * Replication Manager for this server (only started in self) + */ + LogDBManager * dbm; }; diff --git a/src/logdb/LogDBManager.cc b/src/logdb/LogDBManager.cc index 12c7e7efc8..a28047417e 100644 --- a/src/logdb/LogDBManager.cc +++ b/src/logdb/LogDBManager.cc @@ -153,7 +153,7 @@ void LogDBManager::start_action() thread_pool.insert(std::make_pair(id, rthread)); pthread_attr_init (&pattr); - pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); + pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE); oss << "Starting replication thread for server " << id; @@ -168,6 +168,8 @@ void LogDBManager::start_action() zone->unlock(); }; +// ----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- void LogDBManager::stop_action() { @@ -177,15 +179,25 @@ void LogDBManager::stop_action() { it->second->finalize(); + pthread_join(*(it->second->thread_id()),0); + delete it->second; } thread_pool.clear(); }; +// ----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- + void LogDBManager::replicate_action() { + std::map::iterator it; + for ( it = thread_pool.begin() ; it != thread_pool.end() ; ++it ) + { + it->second->add_request(); + } }; void LogDBManager::delete_server_action() @@ -212,7 +224,7 @@ LogDBManager::ReplicaThread::ReplicaThread(int f, int l): // ----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- -// + void LogDBManager::ReplicaThread::do_replication() { std::string secret, error; @@ -443,3 +455,17 @@ void LogDBManager::ReplicaThread::finalize() pthread_mutex_unlock(&mutex); } + +// ----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- + +void LogDBManager::ReplicaThread::add_request() +{ + pthread_mutex_lock(&mutex); + + _pending_requests = true; + + pthread_cond_signal(&cond); + + pthread_mutex_unlock(&mutex); +} diff --git a/src/zone/Zone.cc b/src/zone/Zone.cc index d0b634eee8..56a2d4e0f1 100644 --- a/src/zone/Zone.cc +++ b/src/zone/Zone.cc @@ -16,6 +16,7 @@ #include "Zone.h" #include "ZoneServer.h" +#include "LogDBManager.h" /* ------------------------------------------------------------------------ */ @@ -354,3 +355,38 @@ ZoneServer * Zone::get_server(int server_id) { return servers->get_server(server_id); } + +//-------------------------------------------------------------------------- +// ZoneServer LogDBManager implementation +//-------------------------------------------------------------------------- +int ZoneServer::logdbm_start() +{ + if ( state != LEADER ) + { + return -1; + } + + if ( dbm == 0 ) + { + dbm = new LogDBManager(); + } + + dbm->trigger(LogDBAction::START); + + return 0; +} + +void ZoneServer::logdbm_stop() +{ + dbm->trigger(LogDBAction::STOP); +} + +void ZoneServer::logdbm_addserver() +{ + dbm->trigger(LogDBAction::ADD_SERVER); +} + +void ZoneServer::logdbm_replicate() +{ + dbm->trigger(LogDBAction::REPLICATE); +}