2015-11-01 16:35:01 +01:00
package provider
2015-09-12 15:10:03 +02:00
2015-09-09 22:39:08 +02:00
import (
2015-11-01 16:35:01 +01:00
"errors"
2015-11-13 11:50:32 +01:00
"net/url"
2016-05-31 23:23:23 +02:00
"sort"
2015-09-24 17:16:13 +02:00
"strconv"
2016-02-01 16:08:58 +01:00
"strings"
2015-09-24 17:16:13 +02:00
"text/template"
2016-08-13 12:55:15 -04:00
"math"
"net/http"
"time"
2015-09-10 22:54:37 +02:00
"github.com/BurntSushi/ty/fun"
2016-09-19 19:08:39 +02:00
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
2016-09-23 18:27:01 +02:00
"github.com/containous/traefik/log"
2016-03-31 18:57:08 +02:00
"github.com/containous/traefik/safe"
2016-02-24 16:43:39 +01:00
"github.com/containous/traefik/types"
2015-09-12 15:10:03 +02:00
"github.com/gambol99/go-marathon"
2015-09-09 22:39:08 +02:00
)
2016-08-16 19:13:18 +02:00
// Compile-time assertion that *Marathon satisfies the Provider interface.
var _ Provider = ( * Marathon ) ( nil )
2015-11-01 19:29:47 +01:00
// Marathon holds configuration of the Marathon provider.
2015-11-02 19:48:34 +01:00
type Marathon struct {
2016-06-01 16:47:39 +02:00
BaseProvider
2016-10-06 17:42:19 +02:00
Endpoint string ` description:"Marathon server endpoint. You can also specify multiple endpoint for Marathon" `
Domain string ` description:"Default domain used" `
ExposedByDefault bool ` description:"Expose Marathon apps by default" `
GroupsAsSubDomains bool ` description:"Convert Marathon groups to subdomains" `
DCOSToken string ` description:"DCOSToken for DCOS environment, This will override the Authorization header" `
MarathonLBCompatibility bool ` description:"Add compatibility with marathon-lb labels" `
TLS * ClientTLS ` description:"Enable Docker TLS support" `
Basic * MarathonBasic
marathonClient marathon . Marathon
2015-11-13 11:50:32 +01:00
}
2016-01-18 11:52:18 +01:00
// MarathonBasic holds basic authentication specific configurations
type MarathonBasic struct {
// HTTPBasicAuthUser is copied by Provide into the Marathon client
// configuration field of the same name.
HTTPBasicAuthUser string
// HTTPBasicPassword is copied by Provide into the Marathon client
// configuration field of the same name.
HTTPBasicPassword string
}
2015-11-13 11:50:32 +01:00
type lightMarathonClient interface {
2015-11-22 23:32:31 +01:00
AllTasks ( v url . Values ) ( * marathon . Tasks , error )
2016-05-03 16:52:14 +02:00
Applications ( url . Values ) ( * marathon . Applications , error )
2015-09-09 22:39:08 +02:00
}
2015-11-01 19:29:47 +01:00
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
2016-11-09 19:27:04 +01:00
func ( provider * Marathon ) Provide ( configurationChan chan <- types . ConfigMessage , pool * safe . Pool , constraints types . Constraints ) error {
2016-05-30 15:05:58 +02:00
provider . Constraints = append ( provider . Constraints , constraints ... )
2016-04-15 18:59:51 +02:00
operation := func ( ) error {
config := marathon . NewDefaultConfig ( )
config . URL = provider . Endpoint
config . EventsTransport = marathon . EventsTransportSSE
if provider . Basic != nil {
config . HTTPBasicAuthUser = provider . Basic . HTTPBasicAuthUser
config . HTTPBasicPassword = provider . Basic . HTTPBasicPassword
}
2016-06-18 14:51:52 +02:00
if len ( provider . DCOSToken ) > 0 {
config . DCOSToken = provider . DCOSToken
}
2016-06-27 16:14:56 +02:00
TLSConfig , err := provider . TLS . CreateTLSConfig ( )
if err != nil {
return err
}
2016-04-15 18:59:51 +02:00
config . HTTPClient = & http . Client {
Transport : & http . Transport {
2016-06-27 16:14:56 +02:00
TLSClientConfig : TLSConfig ,
2016-04-15 18:59:51 +02:00
} ,
}
client , err := marathon . NewClient ( config )
if err != nil {
log . Errorf ( "Failed to create a client for marathon, error: %s" , err )
return err
}
provider . marathonClient = client
update := make ( marathon . EventsChannel , 5 )
if provider . Watch {
2016-06-18 14:51:52 +02:00
if err := client . AddEventsListener ( update , marathon . EventIDApplications ) ; err != nil {
2016-04-15 18:59:51 +02:00
log . Errorf ( "Failed to register for events, %s" , err )
return err
}
2016-04-13 20:36:23 +02:00
pool . Go ( func ( stop chan bool ) {
2016-04-19 12:00:22 +02:00
defer close ( update )
2015-09-24 17:16:13 +02:00
for {
2016-04-13 20:36:23 +02:00
select {
case <- stop :
return
case event := <- update :
log . Debug ( "Marathon event receveived" , event )
configuration := provider . loadMarathonConfig ( )
if configuration != nil {
configurationChan <- types . ConfigMessage {
ProviderName : "marathon" ,
Configuration : configuration ,
}
2015-11-13 11:50:32 +01:00
}
2015-09-09 23:09:16 +02:00
}
2015-09-24 17:16:13 +02:00
}
2016-03-31 18:57:08 +02:00
} )
2015-09-09 23:09:16 +02:00
}
2016-04-19 12:00:22 +02:00
configuration := provider . loadMarathonConfig ( )
configurationChan <- types . ConfigMessage {
ProviderName : "marathon" ,
Configuration : configuration ,
}
2016-04-15 18:59:51 +02:00
return nil
2015-09-09 22:39:08 +02:00
}
2015-09-24 17:16:13 +02:00
2016-04-15 18:59:51 +02:00
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Marathon connection error %+v, retrying in %s" , err , time )
}
2016-09-19 19:08:39 +02:00
err := backoff . RetryNotify ( operation , job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , notify )
2016-04-15 18:59:51 +02:00
if err != nil {
2016-08-19 10:36:54 +02:00
log . Errorf ( "Cannot connect to Marathon server %+v" , err )
2015-11-13 11:50:32 +01:00
}
2015-10-01 12:04:25 +02:00
return nil
2015-09-09 22:39:08 +02:00
}
2015-11-02 19:48:34 +01:00
func ( provider * Marathon ) loadMarathonConfig ( ) * types . Configuration {
2015-10-08 21:21:51 +02:00
var MarathonFuncMap = template . FuncMap {
2016-08-13 12:55:15 -04:00
"getBackend" : provider . getBackend ,
"getPort" : provider . getPort ,
"getWeight" : provider . getWeight ,
"getDomain" : provider . getDomain ,
"getProtocol" : provider . getProtocol ,
"getPassHostHeader" : provider . getPassHostHeader ,
"getPriority" : provider . getPriority ,
"getEntryPoints" : provider . getEntryPoints ,
"getFrontendRule" : provider . getFrontendRule ,
"getFrontendBackend" : provider . getFrontendBackend ,
"replace" : replace ,
"hasCircuitBreakerLabels" : provider . hasCircuitBreakerLabels ,
"hasLoadBalancerLabels" : provider . hasLoadBalancerLabels ,
"hasMaxConnLabels" : provider . hasMaxConnLabels ,
"getMaxConnExtractorFunc" : provider . getMaxConnExtractorFunc ,
"getMaxConnAmount" : provider . getMaxConnAmount ,
"getLoadBalancerMethod" : provider . getLoadBalancerMethod ,
"getCircuitBreakerExpression" : provider . getCircuitBreakerExpression ,
2016-05-13 10:22:11 -04:00
"getSticky" : provider . getSticky ,
2015-10-08 21:21:51 +02:00
}
2015-09-09 22:39:08 +02:00
applications , err := provider . marathonClient . Applications ( nil )
2015-09-12 15:10:03 +02:00
if err != nil {
2015-09-24 14:32:37 +02:00
log . Errorf ( "Failed to create a client for marathon, error: %s" , err )
2015-09-09 22:39:08 +02:00
return nil
}
2015-09-10 22:54:37 +02:00
2016-02-09 23:10:24 +01:00
tasks , err := provider . marathonClient . AllTasks ( & marathon . AllTasksOpts { Status : "running" } )
2015-09-12 15:10:03 +02:00
if err != nil {
2015-09-24 14:32:37 +02:00
log . Errorf ( "Failed to create a client for marathon, error: %s" , err )
2015-09-09 22:39:08 +02:00
return nil
}
2015-09-10 22:54:37 +02:00
//filter tasks
filteredTasks := fun . Filter ( func ( task marathon . Task ) bool {
2016-09-20 09:44:32 +01:00
return provider . taskFilter ( task , applications , provider . ExposedByDefault )
2015-09-10 22:54:37 +02:00
} , tasks . Tasks ) . ( [ ] marathon . Task )
//filter apps
filteredApps := fun . Filter ( func ( app marathon . Application ) bool {
2016-09-20 09:44:32 +01:00
return provider . applicationFilter ( app , filteredTasks )
2015-09-10 22:54:37 +02:00
} , applications . Apps ) . ( [ ] marathon . Application )
2015-09-09 22:39:08 +02:00
templateObjects := struct {
Applications [ ] marathon . Application
Tasks [ ] marathon . Task
Domain string
} {
2015-09-10 22:54:37 +02:00
filteredApps ,
filteredTasks ,
2015-09-09 22:39:08 +02:00
provider . Domain ,
}
2015-11-13 11:50:32 +01:00
configuration , err := provider . getConfiguration ( "templates/marathon.tmpl" , MarathonFuncMap , templateObjects )
2015-09-09 22:39:08 +02:00
if err != nil {
2015-11-13 11:50:32 +01:00
log . Error ( err )
2015-09-09 22:39:08 +02:00
}
2015-11-13 11:50:32 +01:00
return configuration
}
2015-09-09 22:39:08 +02:00
2016-09-20 09:44:32 +01:00
func ( provider * Marathon ) taskFilter ( task marathon . Task , applications * marathon . Applications , exposedByDefaultFlag bool ) bool {
2015-11-13 11:50:32 +01:00
if len ( task . Ports ) == 0 {
log . Debug ( "Filtering marathon task without port %s" , task . AppID )
return false
}
2015-12-05 10:59:01 -08:00
application , err := getApplication ( task , applications . Apps )
if err != nil {
2015-11-13 11:50:32 +01:00
log . Errorf ( "Unable to get marathon application from task %s" , task . AppID )
return false
}
2016-09-23 22:05:11 +01:00
label , _ := provider . getLabel ( application , "traefik.tags" )
constraintTags := strings . Split ( label , "," )
2016-10-06 17:42:19 +02:00
if provider . MarathonLBCompatibility {
if label , err := provider . getLabel ( application , "HAPROXY_GROUP" ) ; err == nil {
constraintTags = append ( constraintTags , label )
}
}
2016-09-23 22:05:11 +01:00
if ok , failingConstraint := provider . MatchConstraints ( constraintTags ) ; ! ok {
if failingConstraint != nil {
log . Debugf ( "Application %v pruned by '%v' constraint" , application . ID , failingConstraint . String ( ) )
2016-09-20 09:44:32 +01:00
}
2016-09-23 22:05:11 +01:00
return false
2016-09-20 09:44:32 +01:00
}
2016-03-21 12:37:02 +03:00
if ! isApplicationEnabled ( application , exposedByDefaultFlag ) {
2015-11-13 11:50:32 +01:00
log . Debugf ( "Filtering disabled marathon task %s" , task . AppID )
return false
}
2015-12-05 10:59:01 -08:00
//filter indeterminable task port
2016-06-18 14:51:52 +02:00
portIndexLabel := ( * application . Labels ) [ "traefik.portIndex" ]
portValueLabel := ( * application . Labels ) [ "traefik.port" ]
2015-12-05 10:59:01 -08:00
if portIndexLabel != "" && portValueLabel != "" {
log . Debugf ( "Filtering marathon task %s specifying both traefik.portIndex and traefik.port labels" , task . AppID )
return false
}
2016-10-27 15:49:34 +02:00
2015-12-05 10:59:01 -08:00
if portIndexLabel != "" {
2016-06-18 14:51:52 +02:00
index , err := strconv . Atoi ( ( * application . Labels ) [ "traefik.portIndex" ] )
2015-12-05 10:59:01 -08:00
if err != nil || index < 0 || index > len ( application . Ports ) - 1 {
log . Debugf ( "Filtering marathon task %s with unexpected value for traefik.portIndex label" , task . AppID )
return false
}
}
if portValueLabel != "" {
2016-06-18 14:51:52 +02:00
port , err := strconv . Atoi ( ( * application . Labels ) [ "traefik.port" ] )
2015-12-05 10:59:01 -08:00
if err != nil {
log . Debugf ( "Filtering marathon task %s with unexpected value for traefik.port label" , task . AppID )
return false
}
var foundPort bool
for _ , exposedPort := range task . Ports {
if port == exposedPort {
foundPort = true
break
}
}
if ! foundPort {
log . Debugf ( "Filtering marathon task %s without a matching port for traefik.port label" , task . AppID )
return false
}
}
2015-11-13 11:50:32 +01:00
//filter healthchecks
if application . HasHealthChecks ( ) {
if task . HasHealthCheckResults ( ) {
2016-02-09 23:10:24 +01:00
for _ , healthcheck := range task . HealthCheckResults {
2015-11-13 11:50:32 +01:00
// found one bad healthcheck, return false
if ! healthcheck . Alive {
log . Debugf ( "Filtering marathon task %s with bad healthcheck" , task . AppID )
return false
}
}
}
2015-09-09 22:39:08 +02:00
}
2015-11-13 11:50:32 +01:00
return true
}
2015-09-09 22:39:08 +02:00
2016-09-20 09:44:32 +01:00
func ( provider * Marathon ) applicationFilter ( app marathon . Application , filteredTasks [ ] marathon . Task ) bool {
2016-09-23 22:05:11 +01:00
label , _ := provider . getLabel ( app , "traefik.tags" )
constraintTags := strings . Split ( label , "," )
2016-10-06 17:42:19 +02:00
if provider . MarathonLBCompatibility {
if label , err := provider . getLabel ( app , "HAPROXY_GROUP" ) ; err == nil {
constraintTags = append ( constraintTags , label )
}
}
2016-09-20 09:44:32 +01:00
if ok , failingConstraint := provider . MatchConstraints ( constraintTags ) ; ! ok {
if failingConstraint != nil {
log . Debugf ( "Application %v pruned by '%v' constraint" , app . ID , failingConstraint . String ( ) )
}
return false
}
2015-11-13 11:50:32 +01:00
return fun . Exists ( func ( task marathon . Task ) bool {
return task . AppID == app . ID
} , filteredTasks )
2015-09-10 22:54:37 +02:00
}
2015-10-27 00:26:35 +01:00
func getApplication ( task marathon . Task , apps [ ] marathon . Application ) ( marathon . Application , error ) {
2015-09-10 22:54:37 +02:00
for _ , application := range apps {
2015-09-12 15:10:03 +02:00
if application . ID == task . AppID {
2015-10-27 00:26:35 +01:00
return application , nil
2015-09-10 22:54:37 +02:00
}
}
2015-10-27 00:26:35 +01:00
return marathon . Application { } , errors . New ( "Application not found: " + task . AppID )
2015-09-12 15:10:03 +02:00
}
2015-10-23 09:49:19 +02:00
2016-03-21 12:37:02 +03:00
func isApplicationEnabled ( application marathon . Application , exposedByDefault bool ) bool {
2016-06-18 14:51:52 +02:00
return exposedByDefault && ( * application . Labels ) [ "traefik.enable" ] != "false" || ( * application . Labels ) [ "traefik.enable" ] == "true"
2016-03-21 12:37:02 +03:00
}
2015-11-02 19:48:34 +01:00
func ( provider * Marathon ) getLabel ( application marathon . Application , label string ) ( string , error ) {
2016-06-18 14:51:52 +02:00
for key , value := range * application . Labels {
2015-10-23 09:49:19 +02:00
if key == label {
return value , nil
}
}
return "" , errors . New ( "Label not found:" + label )
}
2015-12-05 10:59:01 -08:00
func ( provider * Marathon ) getPort ( task marathon . Task , applications [ ] marathon . Application ) string {
application , err := getApplication ( task , applications )
if err != nil {
log . Errorf ( "Unable to get marathon application from task %s" , task . AppID )
return ""
}
if portIndexLabel , err := provider . getLabel ( application , "traefik.portIndex" ) ; err == nil {
if index , err := strconv . Atoi ( portIndexLabel ) ; err == nil {
return strconv . Itoa ( task . Ports [ index ] )
}
}
if portValueLabel , err := provider . getLabel ( application , "traefik.port" ) ; err == nil {
return portValueLabel
}
2015-11-13 11:50:32 +01:00
for _ , port := range task . Ports {
return strconv . Itoa ( port )
}
return ""
}
// getWeight returns the traefik.weight label of the task's application, or
// "0" when the application is unknown or the label is absent.
func (provider *Marathon) getWeight(task marathon.Task, applications []marathon.Application) string {
	const defaultWeight = "0"

	application, errApp := getApplication(task, applications)
	if errApp != nil {
		log.Errorf("Unable to get marathon application from task %s", task.AppID)
		return defaultWeight
	}
	weight, err := provider.getLabel(application, "traefik.weight")
	if err != nil {
		return defaultWeight
	}
	return weight
}
// getDomain returns the traefik.domain label of the application, falling
// back to the provider's Domain when the label is absent.
func (provider *Marathon) getDomain(application marathon.Application) string {
	domain, err := provider.getLabel(application, "traefik.domain")
	if err != nil {
		return provider.Domain
	}
	return domain
}
// getProtocol returns the traefik.protocol label of the task's application,
// or "http" when the application is unknown or the label is absent.
func (provider *Marathon) getProtocol(task marathon.Task, applications []marathon.Application) string {
	const defaultProtocol = "http"

	application, errApp := getApplication(task, applications)
	if errApp != nil {
		log.Errorf("Unable to get marathon application from task %s", task.AppID)
		return defaultProtocol
	}
	protocol, err := provider.getLabel(application, "traefik.protocol")
	if err != nil {
		return defaultProtocol
	}
	return protocol
}
2016-05-13 10:22:11 -04:00
func ( provider * Marathon ) getSticky ( application marathon . Application ) string {
2016-09-28 13:28:20 +02:00
if sticky , err := provider . getLabel ( application , "traefik.backend.loadbalancer.sticky" ) ; err == nil {
2016-05-13 10:22:11 -04:00
return sticky
}
return "false"
}
2015-11-13 11:50:32 +01:00
func ( provider * Marathon ) getPassHostHeader ( application marathon . Application ) string {
if passHostHeader , err := provider . getLabel ( application , "traefik.frontend.passHostHeader" ) ; err == nil {
return passHostHeader
}
2016-05-10 07:43:24 -04:00
return "true"
2015-10-23 09:49:19 +02:00
}
2016-06-06 22:30:23 +02:00
// getPriority returns the traefik.frontend.priority label of the
// application, or "0" when the label is absent.
func (provider *Marathon) getPriority(application marathon.Application) string {
	value, err := provider.getLabel(application, "traefik.frontend.priority")
	if err != nil {
		return "0"
	}
	return value
}
2016-02-01 16:08:58 +01:00
// getEntryPoints returns the comma-separated entries of the
// traefik.frontend.entryPoints label, or an empty (non-nil) slice when the
// label is absent.
func (provider *Marathon) getEntryPoints(application marathon.Application) []string {
	raw, err := provider.getLabel(application, "traefik.frontend.entryPoints")
	if err != nil {
		return []string{}
	}
	return strings.Split(raw, ",")
}
2015-11-13 11:50:32 +01:00
// getFrontendRule returns the frontend rule for the specified application, using
2015-11-01 19:29:47 +01:00
// it's label. It returns a default one (Host) if the label is not present.
2015-11-13 11:50:32 +01:00
func ( provider * Marathon ) getFrontendRule ( application marathon . Application ) string {
2015-10-23 09:49:19 +02:00
if label , err := provider . getLabel ( application , "traefik.frontend.rule" ) ; err == nil {
return label
}
2016-10-06 17:42:19 +02:00
if provider . MarathonLBCompatibility {
if label , err := provider . getLabel ( application , "HAPROXY_0_VHOST" ) ; err == nil {
return "Host:" + label
}
}
2016-06-01 16:47:39 +02:00
return "Host:" + provider . getSubDomain ( application . ID ) + "." + provider . Domain
2015-10-23 09:49:19 +02:00
}
2016-01-20 18:55:10 +00:00
2016-02-12 14:45:36 +01:00
// getBackend returns the backend name of the task's application (see
// getFrontendBackend), or "" when the application is unknown.
func (provider *Marathon) getBackend(task marathon.Task, applications []marathon.Application) string {
	application, errApp := getApplication(task, applications)
	if errApp == nil {
		return provider.getFrontendBackend(application)
	}
	log.Errorf("Unable to get marathon application from task %s", task.AppID)
	return ""
}
func ( provider * Marathon ) getFrontendBackend ( application marathon . Application ) string {
2016-01-20 18:55:10 +00:00
if label , err := provider . getLabel ( application , "traefik.backend" ) ; err == nil {
return label
}
return replace ( "/" , "-" , application . ID )
}
2016-05-31 23:23:23 +02:00
2016-06-01 16:47:39 +02:00
func ( provider * Marathon ) getSubDomain ( name string ) string {
if provider . GroupsAsSubDomains {
splitedName := strings . Split ( strings . TrimPrefix ( name , "/" ) , "/" )
sort . Sort ( sort . Reverse ( sort . StringSlice ( splitedName ) ) )
reverseName := strings . Join ( splitedName , "." )
return reverseName
}
return strings . Replace ( strings . TrimPrefix ( name , "/" ) , "/" , "-" , - 1 )
2016-05-31 23:23:23 +02:00
}
2016-08-13 12:55:15 -04:00
// hasCircuitBreakerLabels reports whether the application carries the
// traefik.backend.circuitbreaker.expression label.
func (provider *Marathon) hasCircuitBreakerLabels(application marathon.Application) bool {
	_, err := provider.getLabel(application, "traefik.backend.circuitbreaker.expression")
	return err == nil
}
func ( provider * Marathon ) hasLoadBalancerLabels ( application marathon . Application ) bool {
2016-09-28 13:28:20 +02:00
_ , errMethod := provider . getLabel ( application , "traefik.backend.loadbalancer.method" )
_ , errSticky := provider . getLabel ( application , "traefik.backend.loadbalancer.sticky" )
if errMethod != nil && errSticky != nil {
2016-08-13 12:55:15 -04:00
return false
}
return true
}
// hasMaxConnLabels reports whether the application carries both the
// traefik.backend.maxconn.amount and traefik.backend.maxconn.extractorfunc
// labels.
func (provider *Marathon) hasMaxConnLabels(application marathon.Application) bool {
	_, errAmount := provider.getLabel(application, "traefik.backend.maxconn.amount")
	_, errExtractor := provider.getLabel(application, "traefik.backend.maxconn.extractorfunc")
	return errAmount == nil && errExtractor == nil
}
// getMaxConnAmount returns the traefik.backend.maxconn.amount label parsed as
// a base-10 int64. It returns math.MaxInt64 when the label is absent or
// cannot be parsed; the latter case is also logged.
func (provider *Marathon) getMaxConnAmount(application marathon.Application) int64 {
	label, err := provider.getLabel(application, "traefik.backend.maxconn.amount")
	if err != nil {
		return math.MaxInt64
	}
	amount, errConv := strconv.ParseInt(label, 10, 64)
	if errConv != nil {
		log.Errorf("Unable to parse traefik.backend.maxconn.amount %s", label)
		return math.MaxInt64
	}
	return amount
}
// getMaxConnExtractorFunc returns the traefik.backend.maxconn.extractorfunc
// label of the application, or "request.host" when the label is absent.
func (provider *Marathon) getMaxConnExtractorFunc(application marathon.Application) string {
	extractor, err := provider.getLabel(application, "traefik.backend.maxconn.extractorfunc")
	if err != nil {
		return "request.host"
	}
	return extractor
}
// getLoadBalancerMethod returns the traefik.backend.loadbalancer.method label
// of the application, or "wrr" when the label is absent.
func (provider *Marathon) getLoadBalancerMethod(application marathon.Application) string {
	method, err := provider.getLabel(application, "traefik.backend.loadbalancer.method")
	if err != nil {
		return "wrr"
	}
	return method
}
// getCircuitBreakerExpression returns the
// traefik.backend.circuitbreaker.expression label of the application, or
// "NetworkErrorRatio() > 1" when the label is absent.
func (provider *Marathon) getCircuitBreakerExpression(application marathon.Application) string {
	expression, err := provider.getLabel(application, "traefik.backend.circuitbreaker.expression")
	if err != nil {
		return "NetworkErrorRatio() > 1"
	}
	return expression
}