2016-08-16 20:13:18 +03:00
package cluster
import (
2016-08-16 18:26:10 +03:00
"context"
2016-12-30 11:21:13 +03:00
"time"
2016-09-23 19:27:01 +03:00
"github.com/cenk/backoff"
2016-08-18 15:20:11 +03:00
"github.com/containous/traefik/log"
2016-08-16 20:13:18 +03:00
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/leadership"
)
// Leadership allows leadership election using a KV store
type Leadership struct {
2016-08-18 14:03:10 +03:00
* safe . Pool
* types . Cluster
2016-08-16 20:13:18 +03:00
candidate * leadership . Candidate
2016-08-18 15:20:11 +03:00
leader safe . Safe
listeners [ ] LeaderListener
2016-08-16 20:13:18 +03:00
}
2016-08-18 14:03:10 +03:00
// NewLeadership creates a leadership
func NewLeadership ( ctx context . Context , cluster * types . Cluster ) * Leadership {
return & Leadership {
Pool : safe . NewPool ( ctx ) ,
Cluster : cluster ,
candidate : leadership . NewCandidate ( cluster . Store , cluster . Store . Prefix + "/leader" , cluster . Node , 20 * time . Second ) ,
2016-08-18 15:20:11 +03:00
listeners : [ ] LeaderListener { } ,
2016-08-18 14:03:10 +03:00
}
}
2016-08-18 15:20:11 +03:00
// LeaderListener is a callback invoked after every election result.
// elected is true when this node has been elected leader and false
// otherwise. A non-nil error is logged by the election handler and does
// not stop the remaining listeners from being called.
type LeaderListener func(elected bool) error
2016-08-16 20:13:18 +03:00
// Participate tries to be a leader
2016-08-18 14:03:10 +03:00
func ( l * Leadership ) Participate ( pool * safe . Pool ) {
pool . GoCtx ( func ( ctx context . Context ) {
log . Debugf ( "Node %s running for election" , l . Cluster . Node )
defer log . Debugf ( "Node %s no more running for election" , l . Cluster . Node )
2016-08-16 20:13:18 +03:00
backOff := backoff . NewExponentialBackOff ( )
operation := func ( ) error {
2016-11-16 11:56:52 +03:00
return l . run ( ctx , l . candidate )
2016-08-16 20:13:18 +03:00
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Leadership election error %+v, retrying in %s" , err , time )
}
err := backoff . RetryNotify ( operation , backOff , notify )
if err != nil {
log . Errorf ( "Cannot elect leadership %+v" , err )
}
} )
}
2016-08-18 15:20:11 +03:00
// AddListener registers a leadership listener. Listeners are appended to
// the end of the list and are called in that order after each election
// result. The append is not guarded by a lock in this method.
func (l *Leadership) AddListener(listener LeaderListener) {
	l.listeners = append(l.listeners, listener)
}
2016-08-16 20:13:18 +03:00
// Resign resigns from being a leader
func ( l * Leadership ) Resign ( ) {
2016-08-18 14:03:10 +03:00
l . candidate . Resign ( )
2016-08-18 15:20:11 +03:00
log . Infof ( "Node %s resigned" , l . Cluster . Node )
2016-08-16 20:13:18 +03:00
}
2016-11-16 11:56:52 +03:00
func ( l * Leadership ) run ( ctx context . Context , candidate * leadership . Candidate ) error {
2016-08-16 20:13:18 +03:00
electedCh , errCh := candidate . RunForElection ( )
for {
select {
case elected := <- electedCh :
2016-08-18 14:03:10 +03:00
l . onElection ( elected )
2016-08-16 20:13:18 +03:00
case err := <- errCh :
return err
2016-08-18 14:03:10 +03:00
case <- ctx . Done ( ) :
l . candidate . Resign ( )
2016-08-16 20:13:18 +03:00
return nil
}
}
}
2016-08-18 14:03:10 +03:00
func ( l * Leadership ) onElection ( elected bool ) {
if elected {
log . Infof ( "Node %s elected leader ♚" , l . Cluster . Node )
2016-08-18 15:20:11 +03:00
l . leader . Set ( true )
2016-08-18 14:03:10 +03:00
l . Start ( )
} else {
log . Infof ( "Node %s elected slave ♝" , l . Cluster . Node )
2016-08-18 15:20:11 +03:00
l . leader . Set ( false )
2016-08-18 14:03:10 +03:00
l . Stop ( )
}
2016-08-18 15:20:11 +03:00
for _ , listener := range l . listeners {
err := listener ( elected )
if err != nil {
log . Errorf ( "Error calling Leadership listener: %s" , err )
}
}
}
// IsLeader returns true if current node is leader
func ( l * Leadership ) IsLeader ( ) bool {
return l . leader . Get ( ) . ( bool )
2016-08-18 14:03:10 +03:00
}