2016-08-16 20:13:18 +03:00
package cluster
import (
2016-08-16 18:26:10 +03:00
"context"
2018-03-22 20:00:05 +03:00
"net/http"
2016-12-30 11:21:13 +03:00
"time"
2016-09-23 19:27:01 +03:00
"github.com/cenk/backoff"
2018-03-22 20:00:05 +03:00
"github.com/containous/mux"
2016-08-18 15:20:11 +03:00
"github.com/containous/traefik/log"
2016-08-16 20:13:18 +03:00
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/leadership"
2018-03-22 20:00:05 +03:00
"github.com/unrolled/render"
2016-08-16 20:13:18 +03:00
)
2018-03-22 20:00:05 +03:00
var templatesRenderer = render . New ( render . Options {
Directory : "nowhere" ,
} )
2016-08-16 20:13:18 +03:00
// Leadership allows leadership election using a KV store
type Leadership struct {
2016-08-18 14:03:10 +03:00
* safe . Pool
* types . Cluster
2016-08-16 20:13:18 +03:00
candidate * leadership . Candidate
2016-12-09 16:26:33 +03:00
leader * safe . Safe
2016-08-18 15:20:11 +03:00
listeners [ ] LeaderListener
2016-08-16 20:13:18 +03:00
}
2016-08-18 14:03:10 +03:00
// NewLeadership creates a leadership
func NewLeadership ( ctx context . Context , cluster * types . Cluster ) * Leadership {
return & Leadership {
Pool : safe . NewPool ( ctx ) ,
Cluster : cluster ,
candidate : leadership . NewCandidate ( cluster . Store , cluster . Store . Prefix + "/leader" , cluster . Node , 20 * time . Second ) ,
2016-08-18 15:20:11 +03:00
listeners : [ ] LeaderListener { } ,
2016-12-09 16:26:33 +03:00
leader : safe . New ( false ) ,
2016-08-18 14:03:10 +03:00
}
}
2016-08-18 15:20:11 +03:00
// LeaderListener is called when leadership has changed
type LeaderListener func ( elected bool ) error
2016-08-16 20:13:18 +03:00
// Participate tries to be a leader
2016-08-18 14:03:10 +03:00
func ( l * Leadership ) Participate ( pool * safe . Pool ) {
pool . GoCtx ( func ( ctx context . Context ) {
log . Debugf ( "Node %s running for election" , l . Cluster . Node )
defer log . Debugf ( "Node %s no more running for election" , l . Cluster . Node )
2016-08-16 20:13:18 +03:00
backOff := backoff . NewExponentialBackOff ( )
operation := func ( ) error {
2016-11-16 11:56:52 +03:00
return l . run ( ctx , l . candidate )
2016-08-16 20:13:18 +03:00
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Leadership election error %+v, retrying in %s" , err , time )
}
2016-12-08 15:32:12 +03:00
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , backOff , notify )
2016-08-16 20:13:18 +03:00
if err != nil {
log . Errorf ( "Cannot elect leadership %+v" , err )
}
} )
}
2017-06-01 23:10:19 +03:00
// AddListener adds a leadership listener
2016-08-18 15:20:11 +03:00
func ( l * Leadership ) AddListener ( listener LeaderListener ) {
l . listeners = append ( l . listeners , listener )
}
2016-08-16 20:13:18 +03:00
// Resign resigns from being a leader
func ( l * Leadership ) Resign ( ) {
2016-08-18 14:03:10 +03:00
l . candidate . Resign ( )
2016-08-18 15:20:11 +03:00
log . Infof ( "Node %s resigned" , l . Cluster . Node )
2016-08-16 20:13:18 +03:00
}
2016-11-16 11:56:52 +03:00
func ( l * Leadership ) run ( ctx context . Context , candidate * leadership . Candidate ) error {
2016-08-16 20:13:18 +03:00
electedCh , errCh := candidate . RunForElection ( )
for {
select {
case elected := <- electedCh :
2016-08-18 14:03:10 +03:00
l . onElection ( elected )
2016-08-16 20:13:18 +03:00
case err := <- errCh :
return err
2016-08-18 14:03:10 +03:00
case <- ctx . Done ( ) :
l . candidate . Resign ( )
2016-08-16 20:13:18 +03:00
return nil
}
}
}
2016-08-18 14:03:10 +03:00
func ( l * Leadership ) onElection ( elected bool ) {
if elected {
log . Infof ( "Node %s elected leader ♚" , l . Cluster . Node )
2016-08-18 15:20:11 +03:00
l . leader . Set ( true )
2016-08-18 14:03:10 +03:00
l . Start ( )
} else {
2017-05-21 15:35:11 +03:00
log . Infof ( "Node %s elected worker ♝" , l . Cluster . Node )
2016-08-18 15:20:11 +03:00
l . leader . Set ( false )
2016-08-18 14:03:10 +03:00
l . Stop ( )
}
2016-08-18 15:20:11 +03:00
for _ , listener := range l . listeners {
err := listener ( elected )
if err != nil {
log . Errorf ( "Error calling Leadership listener: %s" , err )
}
}
}
2018-03-22 20:00:05 +03:00
type leaderResponse struct {
Leader bool ` json:"leader" `
}
func ( l * Leadership ) getLeaderHandler ( response http . ResponseWriter , request * http . Request ) {
leader := & leaderResponse { Leader : l . IsLeader ( ) }
status := http . StatusOK
if ! leader . Leader {
// Set status to be `429`, as this will typically cause load balancers to stop sending requests to the instance without removing them from rotation.
status = http . StatusTooManyRequests
}
err := templatesRenderer . JSON ( response , status , leader )
if err != nil {
log . Error ( err )
}
}
2016-08-18 15:20:11 +03:00
// IsLeader returns true if current node is leader
func ( l * Leadership ) IsLeader ( ) bool {
return l . leader . Get ( ) . ( bool )
2016-08-18 14:03:10 +03:00
}
2018-03-22 20:00:05 +03:00
// AddRoutes add dashboard routes on a router
func ( l * Leadership ) AddRoutes ( router * mux . Router ) {
// Expose cluster leader
router . Methods ( http . MethodGet ) . Path ( "/api/cluster/leader" ) . HandlerFunc ( l . getLeaderHandler )
}