2017-04-17 13:50:02 +03:00
package mesos
2016-07-20 12:56:14 +03:00
import (
2016-12-30 11:21:13 +03:00
"fmt"
2016-07-20 12:56:14 +03:00
"strings"
2016-11-28 16:59:08 +03:00
"time"
2016-09-19 20:08:39 +03:00
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
2016-09-23 19:27:01 +03:00
"github.com/containous/traefik/log"
2017-04-17 13:50:02 +03:00
"github.com/containous/traefik/provider"
2016-07-20 12:56:14 +03:00
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/mesos/mesos-go/detector"
2018-01-10 06:05:33 +03:00
"github.com/mesosphere/mesos-dns/records"
"github.com/mesosphere/mesos-dns/records/state"
2016-12-30 11:21:13 +03:00
// Register the mesos zoo detector (blank import, used only for its side effects)
_ "github.com/mesos/mesos-go/detector/zoo"
2016-07-20 12:56:14 +03:00
"github.com/mesosphere/mesos-dns/detect"
"github.com/mesosphere/mesos-dns/logging"
"github.com/mesosphere/mesos-dns/util"
)
2017-04-17 13:50:02 +03:00
// Compile-time assertion that *Provider satisfies the provider.Provider
// interface; the blank identifier means no variable is actually kept.
var _ provider . Provider = ( * Provider ) ( nil )
2016-08-16 20:13:18 +03:00
2017-04-17 13:50:02 +03:00
// Provider holds configuration of the Mesos provider.
//
// It embeds provider.BaseProvider; Provide reads the Watch flag through it.
type Provider struct {
provider . BaseProvider
2016-07-20 12:56:14 +03:00
// Endpoint is the Mesos endpoint. Provide treats a value starting with
// "zk://" as a ZooKeeper URL used for master detection; any other value is
// split on "," into a static list of masters.
Endpoint string ` description:"Mesos server endpoint. You can also specify multiple endpoint for Mesos" `
// Domain is the default domain (not read by the code in this part of the file).
Domain string ` description:"Default domain used" `
2017-10-02 11:32:02 +03:00
// ExposedByDefault and GroupsAsSubDomains are not read by the code in this
// part of the file; see their description tags for the intended meaning.
ExposedByDefault bool ` description:"Expose Mesos apps by default" export:"true" `
GroupsAsSubDomains bool ` description:"Convert Mesos groups to subdomains" export:"true" `
// ZkDetectionTimeout is, in seconds, how long Provide waits for a master
// before reporting a detection timeout; values <= 0 suppress the error.
ZkDetectionTimeout int ` description:"Zookeeper timeout (in seconds)" export:"true" `
// RefreshSeconds is the period, in seconds, of the ticker that makes
// Provide re-read the tasks and publish a fresh configuration.
RefreshSeconds int ` description:"Polling interval (in seconds)" export:"true" `
2018-01-10 06:05:59 +03:00
// IPSources is only logged by Provide in this part of the file.
IPSources string ` description:"IPSources (e.g. host, docker, mesos, netinfo)" export:"true" `
2017-10-02 11:32:02 +03:00
// StateTimeoutSecond is, in seconds, the timeout handed to the mesos-dns
// record generator in getTasks.
StateTimeoutSecond int ` description:"HTTP Timeout (in seconds)" export:"true" `
2016-07-20 12:56:14 +03:00
// Masters is the most recently detected list of masters. It is written by
// Provide whenever a new list arrives and read by getTasks.
Masters [ ] string
}
2017-04-17 13:50:02 +03:00
// Provide allows the mesos provider to provide configurations to traefik
2016-07-20 12:56:14 +03:00
// using the given configuration channel.
2017-04-17 13:50:02 +03:00
func ( p * Provider ) Provide ( configurationChan chan <- types . ConfigMessage , pool * safe . Pool , constraints types . Constraints ) error {
2016-07-20 12:56:14 +03:00
operation := func ( ) error {
// initialize logging
logging . SetupLogs ( )
2017-04-17 13:50:02 +03:00
log . Debugf ( "%s" , p . IPSources )
2016-07-20 12:56:14 +03:00
var zk string
var masters [ ] string
2017-04-17 13:50:02 +03:00
if strings . HasPrefix ( p . Endpoint , "zk://" ) {
zk = p . Endpoint
2016-07-20 12:56:14 +03:00
} else {
2017-04-17 13:50:02 +03:00
masters = strings . Split ( p . Endpoint , "," )
2016-07-20 12:56:14 +03:00
}
errch := make ( chan error )
changed := detectMasters ( zk , masters )
2017-04-17 13:50:02 +03:00
reload := time . NewTicker ( time . Second * time . Duration ( p . RefreshSeconds ) )
zkTimeout := time . Second * time . Duration ( p . ZkDetectionTimeout )
2016-07-20 12:56:14 +03:00
timeout := time . AfterFunc ( zkTimeout , func ( ) {
if zkTimeout > 0 {
errch <- fmt . Errorf ( "master detection timed out after %s" , zkTimeout )
}
} )
defer reload . Stop ( )
defer util . HandleCrash ( )
2017-04-17 13:50:02 +03:00
if ! p . Watch {
2016-07-20 12:56:14 +03:00
reload . Stop ( )
timeout . Stop ( )
}
for {
select {
case <- reload . C :
2018-01-10 06:05:33 +03:00
tasks := p . getTasks ( )
configuration := p . buildConfiguration ( tasks )
2016-07-20 12:56:14 +03:00
if configuration != nil {
configurationChan <- types . ConfigMessage {
ProviderName : "mesos" ,
Configuration : configuration ,
}
}
case masters := <- changed :
if len ( masters ) == 0 || masters [ 0 ] == "" {
// no leader
timeout . Reset ( zkTimeout )
} else {
timeout . Stop ( )
}
log . Debugf ( "new masters detected: %v" , masters )
2017-04-17 13:50:02 +03:00
p . Masters = masters
2018-01-10 06:05:33 +03:00
tasks := p . getTasks ( )
configuration := p . buildConfiguration ( tasks )
2016-07-20 12:56:14 +03:00
if configuration != nil {
configurationChan <- types . ConfigMessage {
ProviderName : "mesos" ,
Configuration : configuration ,
}
}
case err := <- errch :
log . Errorf ( "%s" , err )
}
}
}
notify := func ( err error , time time . Duration ) {
2017-05-26 18:03:14 +03:00
log . Errorf ( "Mesos connection error %+v, retrying in %s" , err , time )
2016-07-20 12:56:14 +03:00
}
2016-12-08 15:32:12 +03:00
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , notify )
2016-07-20 12:56:14 +03:00
if err != nil {
2017-05-26 18:03:14 +03:00
log . Errorf ( "Cannot connect to Mesos server %+v" , err )
2016-07-20 12:56:14 +03:00
}
return nil
}
func detectMasters ( zk string , masters [ ] string ) <- chan [ ] string {
changed := make ( chan [ ] string , 1 )
if zk != "" {
log . Debugf ( "Starting master detector for ZK " , zk )
if md , err := detector . New ( zk ) ; err != nil {
2017-05-26 18:03:14 +03:00
log . Errorf ( "Failed to create master detector: %v" , err )
2016-07-20 12:56:14 +03:00
} else if err := md . Detect ( detect . NewMasters ( masters , changed ) ) ; err != nil {
2017-05-26 18:03:14 +03:00
log . Errorf ( "Failed to initialize master detector: %v" , err )
2016-07-20 12:56:14 +03:00
}
} else {
changed <- masters
}
return changed
}
2018-01-10 06:05:33 +03:00
// getTasks loads the Mesos state from one of p.Masters and returns the tasks
// that taskRecords extracts from it.
//
// The record generator is created with a timeout of p.StateTimeoutSecond
// seconds. If no master state can be obtained the error is logged and nil is
// returned.
func (p *Provider) getTasks() []state.Task {
	stateTimeout := time.Second * time.Duration(p.StateTimeoutSecond)
	generator := records.NewRecordGenerator(stateTimeout)

	mesosState, err := generator.FindMaster(p.Masters...)
	if err == nil {
		return taskRecords(mesosState)
	}

	log.Errorf("Failed to create a client for Mesos, error: %v", err)
	return nil
}
// taskRecords flattens the tasks of every framework in st into one slice,
// keeping only tasks whose State is "TASK_RUNNING".
//
// For each kept task, SlaveIP is set to the PID host of the slave whose ID
// equals the task's SlaveID; it is left untouched when no slave matches. The
// tasks are copies, so st itself is not modified. The result is nil when no
// task is running.
func taskRecords(st state.State) []state.Task {
	// Index the slaves by ID once instead of rescanning the whole slave list
	// for every task. Later entries overwrite earlier ones, so a duplicated
	// ID resolves to the last slave carrying it. Only the position is stored:
	// the slave's PID is dereferenced solely for slaves a task refers to.
	slaveIndex := make(map[string]int, len(st.Slaves))
	for i := range st.Slaves {
		slaveIndex[st.Slaves[i].ID] = i
	}

	var tasks []state.Task
	for _, f := range st.Frameworks {
		for _, task := range f.Tasks {
			// only running tasks are exposed
			if task.State != "TASK_RUNNING" {
				continue
			}
			if i, ok := slaveIndex[task.SlaveID]; ok {
				task.SlaveIP = st.Slaves[i].PID.Host
			}
			tasks = append(tasks, task)
		}
	}
	return tasks
}