2017-04-19 20:44:31 +02:00
/* -------------------------------------------------------------------------- */
2019-01-16 11:27:59 +01:00
/* Copyright 2002-2019, OpenNebula Project, OpenNebula Systems */
2017-04-19 20:44:31 +02:00
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
2017-04-27 01:03:44 +02:00
# ifndef REPLICA_REQUEST_H_
# define REPLICA_REQUEST_H_
2017-04-19 20:44:31 +02:00
2017-04-20 16:13:41 +02:00
# include "SyncRequest.h"
2017-04-19 20:44:31 +02:00
/**
* This class represents a log entry replication request . The replication request
* is synchronous : once it has been replicated in a majority of followers the
* client is notified ( SqlDB : : exec_wr ( ) call ) and DB updated .
*/
2017-04-27 01:03:44 +02:00
class ReplicaRequest : public SyncRequest
2017-04-19 20:44:31 +02:00
{
public :
2019-04-08 17:43:12 +02:00
ReplicaRequest ( uint64_t i ) : _index ( i ) , _to_commit ( - 1 ) , _replicas ( 1 ) { } ;
2017-04-19 20:44:31 +02:00
2017-04-27 01:03:44 +02:00
~ ReplicaRequest ( ) { } ;
2017-04-19 20:44:31 +02:00
/**
2017-04-27 01:03:44 +02:00
* This function updates the number of replicas of the record and decrement
* the number of servers left to reach majority consensus . If it reaches 0 ,
* the client is notified
* @ return number of replicas for this log
2017-04-19 20:44:31 +02:00
*/
2018-07-30 13:14:14 +02:00
int add_replica ( )
2017-04-20 16:13:41 +02:00
{
2017-04-27 01:03:44 +02:00
int __replicas ;
2017-04-19 20:44:31 +02:00
2017-04-27 01:03:44 +02:00
_replicas + + ;
2017-04-19 20:44:31 +02:00
2017-04-27 01:03:44 +02:00
if ( _to_commit > 0 )
{
_to_commit - - ;
}
__replicas = _replicas ;
if ( _to_commit = = 0 )
{
result = true ;
timeout = false ;
notify ( ) ;
}
2017-04-21 22:32:30 +02:00
2017-04-27 01:03:44 +02:00
return __replicas ;
}
/* ---------------------------------------------------------------------- */
/* Class access methods */
/* ---------------------------------------------------------------------- */
2019-04-08 17:43:12 +02:00
uint64_t index ( )
2017-04-20 16:13:41 +02:00
{
2017-04-27 01:03:44 +02:00
return _index ;
}
2017-04-19 20:44:31 +02:00
2017-04-25 11:49:52 +02:00
int replicas ( )
{
return _replicas ;
}
int to_commit ( )
{
return _to_commit ;
}
void to_commit ( int c )
{
_to_commit = c ;
}
2017-04-19 20:44:31 +02:00
private :
/**
* Index for this log entry
*/
2019-04-08 17:43:12 +02:00
uint64_t _index ;
2017-04-19 20:44:31 +02:00
/**
* Remaining number of servers that need to replicate this record to commit
* it . Initialized to ( Number_Servers - 1 ) / 2
*/
2017-04-25 11:49:52 +02:00
int _to_commit ;
2017-04-19 20:44:31 +02:00
/**
* Total number of replicas for this entry
*/
2017-04-25 11:49:52 +02:00
int _replicas ;
2017-04-19 20:44:31 +02:00
} ;
2018-07-30 13:14:14 +02:00
/**
* This class represents a map of replication requests . It syncs access between
* RaftManager and DB writer threads . A DB writer allocates and set the
* request and then it waits on it for completion .
*/
class ReplicaRequestMap
{
public :
ReplicaRequestMap ( )
{
pthread_mutex_init ( & mutex , 0 ) ;
} ;
virtual ~ ReplicaRequestMap ( )
{
pthread_mutex_destroy ( & mutex ) ;
}
/**
* Increments the number of replicas of this request . If it can be
* committed the request is removed from the map
* @ param rindex of the request
*
* @ return the number of replicas to commit , if 0 it can be committed
*/
2019-04-08 17:43:12 +02:00
int add_replica ( uint64_t rindex )
2018-07-30 13:14:14 +02:00
{
int to_commit = - 1 ;
pthread_mutex_lock ( & mutex ) ;
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > : : iterator it = requests . find ( rindex ) ;
2018-07-30 13:14:14 +02:00
2018-09-06 17:22:52 +02:00
if ( it ! = requests . end ( ) & & it - > second ! = 0 )
2018-07-30 13:14:14 +02:00
{
it - > second - > add_replica ( ) ;
to_commit = it - > second - > to_commit ( ) ;
if ( to_commit = = 0 )
{
requests . erase ( it ) ;
}
}
pthread_mutex_unlock ( & mutex ) ;
return to_commit ;
}
2018-09-06 17:22:52 +02:00
/**
* Allocated an empty replica request . It marks a writer thread will wait
* on this request .
* @ param rindex of the request
*/
2019-04-08 17:43:12 +02:00
void allocate ( uint64_t rindex )
2018-09-06 17:22:52 +02:00
{
pthread_mutex_lock ( & mutex ) ;
requests . insert ( std : : make_pair ( rindex , ( ReplicaRequest * ) 0 ) ) ;
pthread_mutex_unlock ( & mutex ) ;
}
2018-07-30 13:14:14 +02:00
/**
* Set the replication request associated to this index . If there is no
* previous request associated to the index it is created .
* @ param rindex of the request
* @ param rr replica request pointer
*/
2019-04-08 17:43:12 +02:00
void set ( uint64_t rindex , ReplicaRequest * rr )
2018-07-30 13:14:14 +02:00
{
pthread_mutex_lock ( & mutex ) ;
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > : : iterator it = requests . find ( rindex ) ;
2018-07-30 13:14:14 +02:00
if ( it = = requests . end ( ) )
{
requests . insert ( std : : make_pair ( rindex , rr ) ) ;
}
2018-09-06 17:22:52 +02:00
else if ( it - > second = = 0 )
{
it - > second = rr ;
}
pthread_mutex_unlock ( & mutex ) ;
}
/**
* Remove a replication request associated to this index
* @ param rindex of the request
*/
2019-04-08 17:43:12 +02:00
void remove ( uint64_t rindex )
2018-09-06 17:22:52 +02:00
{
pthread_mutex_lock ( & mutex ) ;
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > : : iterator it = requests . find ( rindex ) ;
2018-09-06 17:22:52 +02:00
if ( it ! = requests . end ( ) )
{
requests . erase ( it ) ;
}
2018-08-24 17:56:14 +02:00
pthread_mutex_unlock ( & mutex ) ;
}
2018-07-30 13:14:14 +02:00
/**
* Notify all writers and clear the replica map
*/
void clear ( )
{
pthread_mutex_lock ( & mutex ) ;
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > : : iterator it ;
2018-07-30 13:14:14 +02:00
for ( it = requests . begin ( ) ; it ! = requests . end ( ) ; + + it )
{
if ( it - > second = = 0 )
{
continue ;
}
it - > second - > result = false ;
it - > second - > timeout = false ;
it - > second - > message = " oned is now follower " ;
it - > second - > notify ( ) ;
}
requests . clear ( ) ;
pthread_mutex_unlock ( & mutex ) ;
}
2018-09-06 17:22:52 +02:00
/**
* @ return true if a replica request is set for this index
*/
2019-04-08 17:43:12 +02:00
bool is_replicable ( uint64_t rindex )
2018-09-06 17:22:52 +02:00
{
pthread_mutex_lock ( & mutex ) ;
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > : : iterator it = requests . find ( rindex ) ;
2018-09-06 17:22:52 +02:00
2019-04-08 17:43:12 +02:00
bool rc = it = = requests . end ( ) | |
2018-09-06 17:22:52 +02:00
( it ! = requests . end ( ) & & it - > second ! = 0 ) ;
pthread_mutex_unlock ( & mutex ) ;
return rc ;
}
2018-07-30 13:14:14 +02:00
private :
2018-08-30 02:26:30 +02:00
2018-07-30 13:14:14 +02:00
pthread_mutex_t mutex ;
/**
* Clients waiting for a log replication
*/
2019-04-08 17:43:12 +02:00
std : : map < uint64_t , ReplicaRequest * > requests ;
2018-07-30 13:14:14 +02:00
} ;
2017-04-27 01:03:44 +02:00
# endif /*REPLICA_REQUEST_H_*/
2017-04-19 20:44:31 +02:00