2017-04-17 13:50:02 +03:00
package consul
2016-02-02 20:03:40 +03:00
import (
2017-05-08 20:46:53 +03:00
"bytes"
2016-02-02 20:03:40 +03:00
"errors"
2016-06-20 20:13:22 +03:00
"sort"
2016-04-15 10:56:06 +03:00
"strconv"
2016-02-02 20:03:40 +03:00
"strings"
"text/template"
"time"
2016-05-20 18:17:38 +03:00
"github.com/BurntSushi/ty/fun"
2016-09-19 20:08:39 +03:00
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
2016-09-23 19:27:01 +03:00
"github.com/containous/traefik/log"
2017-04-17 13:50:02 +03:00
"github.com/containous/traefik/provider"
2016-03-31 19:57:08 +03:00
"github.com/containous/traefik/safe"
2016-02-24 18:43:39 +03:00
"github.com/containous/traefik/types"
2016-02-02 20:03:40 +03:00
"github.com/hashicorp/consul/api"
)
const (
// DefaultWatchWaitTime is the duration to wait when polling consul
DefaultWatchWaitTime = 15 * time . Second
)
2017-04-17 13:50:02 +03:00
var _ provider . Provider = ( * CatalogProvider ) ( nil )
2016-08-16 20:13:18 +03:00
2017-04-17 13:50:02 +03:00
// CatalogProvider holds configurations of the Consul catalog provider.
type CatalogProvider struct {
2017-10-02 11:32:02 +03:00
provider . BaseProvider ` mapstructure:",squash" export:"true" `
2017-04-17 13:50:02 +03:00
Endpoint string ` description:"Consul server endpoint" `
Domain string ` description:"Default domain used" `
2017-10-02 11:32:02 +03:00
ExposedByDefault bool ` description:"Expose Consul services by default" export:"true" `
Prefix string ` description:"Prefix used for Consul catalog tags" export:"true" `
FrontEndRule string ` description:"Frontend rule used for Consul services" export:"true" `
2017-04-17 13:50:02 +03:00
client * api . Client
2017-05-08 20:46:53 +03:00
frontEndRuleTemplate * template . Template
2016-04-12 10:49:37 +03:00
}
type serviceUpdate struct {
ServiceName string
Attributes [ ] string
2016-02-02 20:03:40 +03:00
}
type catalogUpdate struct {
2016-04-12 10:49:37 +03:00
Service * serviceUpdate
2016-02-02 20:03:40 +03:00
Nodes [ ] * api . ServiceEntry
}
2016-06-20 20:13:22 +03:00
type nodeSorter [ ] * api . ServiceEntry
func ( a nodeSorter ) Len ( ) int {
return len ( a )
}
func ( a nodeSorter ) Swap ( i int , j int ) {
a [ i ] , a [ j ] = a [ j ] , a [ i ]
}
func ( a nodeSorter ) Less ( i int , j int ) bool {
lentr := a [ i ]
rentr := a [ j ]
ls := strings . ToLower ( lentr . Service . Service )
lr := strings . ToLower ( rentr . Service . Service )
if ls != lr {
return ls < lr
}
if lentr . Service . Address != rentr . Service . Address {
return lentr . Service . Address < rentr . Service . Address
}
if lentr . Node . Address != rentr . Node . Address {
return lentr . Node . Address < rentr . Node . Address
}
return lentr . Service . Port < rentr . Service . Port
}
2017-11-13 14:14:02 +03:00
func hasChanged ( current map [ string ] Service , previous map [ string ] Service ) bool {
addedServiceKeys , removedServiceKeys := getChangedServiceKeys ( current , previous )
return len ( removedServiceKeys ) > 0 || len ( addedServiceKeys ) > 0 || hasNodeOrTagsChanged ( current , previous )
}
func getChangedServiceKeys ( current map [ string ] Service , previous map [ string ] Service ) ( [ ] string , [ ] string ) {
currKeySet := fun . Set ( fun . Keys ( current ) . ( [ ] string ) ) . ( map [ string ] bool )
prevKeySet := fun . Set ( fun . Keys ( previous ) . ( [ ] string ) ) . ( map [ string ] bool )
2016-02-02 20:03:40 +03:00
2017-06-18 12:38:35 +03:00
addedKeys := fun . Difference ( currKeySet , prevKeySet ) . ( map [ string ] bool )
removedKeys := fun . Difference ( prevKeySet , currKeySet ) . ( map [ string ] bool )
return fun . Keys ( addedKeys ) . ( [ ] string ) , fun . Keys ( removedKeys ) . ( [ ] string )
}
2017-11-13 14:14:02 +03:00
func hasNodeOrTagsChanged ( current map [ string ] Service , previous map [ string ] Service ) bool {
var added [ ] string
var removed [ ] string
for key , value := range current {
if prevValue , ok := previous [ key ] ; ok {
addedNodesKeys , removedNodesKeys := getChangedStringKeys ( value . Nodes , prevValue . Nodes )
added = append ( added , addedNodesKeys ... )
removed = append ( removed , removedNodesKeys ... )
addedTagsKeys , removedTagsKeys := getChangedStringKeys ( value . Tags , prevValue . Tags )
added = append ( added , addedTagsKeys ... )
removed = append ( removed , removedTagsKeys ... )
2017-09-08 21:50:04 +03:00
}
}
2017-11-13 14:14:02 +03:00
return len ( added ) > 0 || len ( removed ) > 0
2017-09-08 21:50:04 +03:00
}
2017-11-13 14:14:02 +03:00
func getChangedStringKeys ( currState [ ] string , prevState [ ] string ) ( [ ] string , [ ] string ) {
2017-09-08 21:50:04 +03:00
currKeySet := fun . Set ( currState ) . ( map [ string ] bool )
prevKeySet := fun . Set ( prevState ) . ( map [ string ] bool )
addedKeys := fun . Difference ( currKeySet , prevKeySet ) . ( map [ string ] bool )
removedKeys := fun . Difference ( prevKeySet , currKeySet ) . ( map [ string ] bool )
return fun . Keys ( addedKeys ) . ( [ ] string ) , fun . Keys ( removedKeys ) . ( [ ] string )
}
2017-10-16 17:58:03 +03:00
func ( p * CatalogProvider ) watchHealthState ( stopCh <- chan struct { } , watchCh chan <- map [ string ] [ ] string , errorCh chan <- error ) {
2017-03-07 02:46:37 +03:00
health := p . client . Health ( )
2017-06-18 12:38:35 +03:00
catalog := p . client . Catalog ( )
2016-02-02 20:03:40 +03:00
2016-03-31 19:57:08 +03:00
safe . Go ( func ( ) {
2017-06-18 12:38:35 +03:00
// variable to hold previous state
2017-09-08 21:50:04 +03:00
var flashback [ ] string
2016-02-02 20:03:40 +03:00
2017-06-18 12:38:35 +03:00
options := & api . QueryOptions { WaitTime : DefaultWatchWaitTime }
2016-02-02 20:03:40 +03:00
for {
select {
case <- stopCh :
return
default :
}
2017-03-07 02:46:37 +03:00
// Listening to changes that leads to `passing` state or degrades from it.
2017-09-08 21:50:04 +03:00
healthyState , meta , err := health . State ( "passing" , options )
2017-03-07 02:46:37 +03:00
if err != nil {
2017-05-26 18:03:14 +03:00
log . WithError ( err ) . Error ( "Failed to retrieve health checks" )
2017-10-16 17:58:03 +03:00
errorCh <- err
2017-03-07 02:46:37 +03:00
return
}
2017-09-08 21:50:04 +03:00
var current [ ] string
if healthyState != nil {
for _ , healthy := range healthyState {
current = append ( current , healthy . ServiceID )
}
}
2016-02-02 20:03:40 +03:00
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
2017-06-18 12:38:35 +03:00
if options . WaitIndex == meta . LastIndex {
2016-02-02 20:03:40 +03:00
continue
}
2017-06-18 12:38:35 +03:00
options . WaitIndex = meta . LastIndex
// The response should be unified with watchCatalogServices
data , _ , err := catalog . Services ( & api . QueryOptions { } )
if err != nil {
log . Errorf ( "Failed to list services: %s" , err )
2017-10-16 17:58:03 +03:00
errorCh <- err
2017-06-18 12:38:35 +03:00
return
}
2016-02-02 20:03:40 +03:00
if data != nil {
2017-06-18 12:38:35 +03:00
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
2017-11-13 14:14:02 +03:00
addedKeys , removedKeys := getChangedStringKeys ( current , flashback )
2017-06-18 12:38:35 +03:00
if len ( addedKeys ) > 0 {
log . WithField ( "DiscoveredServices" , addedKeys ) . Debug ( "Health State change detected." )
watchCh <- data
2017-09-08 21:50:04 +03:00
flashback = current
2017-06-18 12:38:35 +03:00
}
if len ( removedKeys ) > 0 {
log . WithField ( "MissingServices" , removedKeys ) . Debug ( "Health State change detected." )
watchCh <- data
2017-09-08 21:50:04 +03:00
flashback = current
2017-06-18 12:38:35 +03:00
}
2016-02-02 20:03:40 +03:00
}
}
2016-03-31 19:57:08 +03:00
} )
2017-06-18 12:38:35 +03:00
}
2017-09-08 21:50:04 +03:00
// Service represent a Consul service.
type Service struct {
Name string
Tags [ ] string
Nodes [ ] string
}
2017-10-16 17:58:03 +03:00
func ( p * CatalogProvider ) watchCatalogServices ( stopCh <- chan struct { } , watchCh chan <- map [ string ] [ ] string , errorCh chan <- error ) {
2017-06-18 12:38:35 +03:00
catalog := p . client . Catalog ( )
safe . Go ( func ( ) {
// variable to hold previous state
2017-09-08 21:50:04 +03:00
var flashback map [ string ] Service
2017-06-18 12:38:35 +03:00
options := & api . QueryOptions { WaitTime : DefaultWatchWaitTime }
2016-02-02 20:03:40 +03:00
2017-06-18 12:38:35 +03:00
for {
select {
case <- stopCh :
return
default :
}
data , meta , err := catalog . Services ( options )
if err != nil {
log . Errorf ( "Failed to list services: %s" , err )
2017-10-16 17:58:03 +03:00
errorCh <- err
2017-06-18 12:38:35 +03:00
return
}
if options . WaitIndex == meta . LastIndex {
continue
}
options . WaitIndex = meta . LastIndex
if data != nil {
2017-09-29 17:30:03 +03:00
current := make ( map [ string ] Service )
2017-09-08 21:50:04 +03:00
for key , value := range data {
2017-09-20 20:08:02 +03:00
nodes , _ , err := catalog . Service ( key , "" , & api . QueryOptions { } )
2017-09-08 21:50:04 +03:00
if err != nil {
log . Errorf ( "Failed to get detail of service %s: %s" , key , err )
2017-10-16 17:58:03 +03:00
errorCh <- err
2017-09-08 21:50:04 +03:00
return
}
nodesID := getServiceIds ( nodes )
if service , ok := current [ key ] ; ok {
service . Tags = value
service . Nodes = nodesID
} else {
service := Service {
Name : key ,
Tags : value ,
Nodes : nodesID ,
}
current [ key ] = service
}
}
2017-06-18 12:38:35 +03:00
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
2017-11-13 14:14:02 +03:00
if hasChanged ( current , flashback ) {
2017-06-18 12:38:35 +03:00
watchCh <- data
2017-09-29 17:30:03 +03:00
flashback = current
2017-06-18 12:38:35 +03:00
}
}
}
} )
2016-02-02 20:03:40 +03:00
}
2017-11-13 14:14:02 +03:00
2017-09-08 21:50:04 +03:00
func getServiceIds ( services [ ] * api . CatalogService ) [ ] string {
var serviceIds [ ] string
for _ , service := range services {
serviceIds = append ( serviceIds , service . ServiceID )
}
return serviceIds
}
2016-02-02 20:03:40 +03:00
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) healthyNodes ( service string ) ( catalogUpdate , error ) {
health := p . client . Health ( )
2016-02-02 20:03:40 +03:00
opts := & api . QueryOptions { }
data , _ , err := health . Service ( service , "" , true , opts )
if err != nil {
2017-05-26 18:03:14 +03:00
log . WithError ( err ) . Errorf ( "Failed to fetch details of %s" , service )
2016-02-02 20:03:40 +03:00
return catalogUpdate { } , err
}
2016-05-20 18:17:38 +03:00
nodes := fun . Filter ( func ( node * api . ServiceEntry ) bool {
2017-08-25 18:32:03 +03:00
return p . nodeFilter ( service , node )
2016-05-20 18:17:38 +03:00
} , data ) . ( [ ] * api . ServiceEntry )
//Merge tags of nodes matching constraints, in a single slice.
tags := fun . Foldl ( func ( node * api . ServiceEntry , set [ ] string ) [ ] string {
return fun . Keys ( fun . Union (
fun . Set ( set ) ,
fun . Set ( node . Service . Tags ) ,
) . ( map [ string ] bool ) ) . ( [ ] string )
} , [ ] string { } , nodes ) . ( [ ] string )
2016-04-12 10:49:37 +03:00
2016-02-02 20:03:40 +03:00
return catalogUpdate {
2016-04-12 10:49:37 +03:00
Service : & serviceUpdate {
ServiceName : service ,
Attributes : tags ,
} ,
2016-05-30 16:05:58 +03:00
Nodes : nodes ,
2016-02-02 20:03:40 +03:00
} , nil
}
2017-08-25 18:32:03 +03:00
func ( p * CatalogProvider ) nodeFilter ( service string , node * api . ServiceEntry ) bool {
// Filter disabled application.
if ! p . isServiceEnabled ( node ) {
log . Debugf ( "Filtering disabled Consul service %s" , service )
return false
}
// Filter by constraints.
constraintTags := p . getConstraintTags ( node . Service . Tags )
ok , failingConstraint := p . MatchConstraints ( constraintTags )
if ! ok && failingConstraint != nil {
log . Debugf ( "Service %v pruned by '%v' constraint" , service , failingConstraint . String ( ) )
return false
}
return true
}
func ( p * CatalogProvider ) isServiceEnabled ( node * api . ServiceEntry ) bool {
enable , err := strconv . ParseBool ( p . getAttribute ( "enable" , node . Service . Tags , strconv . FormatBool ( p . ExposedByDefault ) ) )
if err != nil {
log . Debugf ( "Invalid value for enable, set to %b" , p . ExposedByDefault )
return p . ExposedByDefault
}
return enable
}
2017-05-08 20:46:53 +03:00
func ( p * CatalogProvider ) getPrefixedName ( name string ) string {
if len ( p . Prefix ) > 0 {
return p . Prefix + "." + name
}
return name
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getEntryPoints ( list string ) [ ] string {
2016-04-12 10:49:37 +03:00
return strings . Split ( list , "," )
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getBackend ( node * api . ServiceEntry ) string {
2016-02-02 20:03:40 +03:00
return strings . ToLower ( node . Service . Service )
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getFrontendRule ( service serviceUpdate ) string {
customFrontendRule := p . getAttribute ( "frontend.rule" , service . Attributes , "" )
2017-05-08 20:46:53 +03:00
if customFrontendRule == "" {
customFrontendRule = p . FrontEndRule
}
t := p . frontEndRuleTemplate
t , err := t . Parse ( customFrontendRule )
if err != nil {
log . Errorf ( "failed to parse Consul Catalog custom frontend rule: %s" , err )
return ""
}
templateObjects := struct {
ServiceName string
Domain string
Attributes [ ] string
} {
ServiceName : service . ServiceName ,
Domain : p . Domain ,
Attributes : service . Attributes ,
}
var buffer bytes . Buffer
err = t . Execute ( & buffer , templateObjects )
if err != nil {
log . Errorf ( "failed to execute Consul Catalog custom frontend rule template: %s" , err )
return ""
2016-04-12 10:49:37 +03:00
}
2017-05-08 20:46:53 +03:00
return buffer . String ( )
2016-04-12 10:49:37 +03:00
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getBackendAddress ( node * api . ServiceEntry ) string {
2016-04-15 10:56:06 +03:00
if node . Service . Address != "" {
return node . Service . Address
}
return node . Node . Address
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getBackendName ( node * api . ServiceEntry , index int ) string {
2016-06-20 20:13:22 +03:00
serviceName := strings . ToLower ( node . Service . Service ) + "--" + node . Service . Address + "--" + strconv . Itoa ( node . Service . Port )
2016-05-02 17:14:21 +03:00
for _ , tag := range node . Service . Tags {
2017-04-17 13:50:02 +03:00
serviceName += "--" + provider . Normalize ( tag )
2016-04-15 10:56:06 +03:00
}
2016-05-02 17:14:21 +03:00
2016-04-15 10:56:06 +03:00
serviceName = strings . Replace ( serviceName , "." , "-" , - 1 )
serviceName = strings . Replace ( serviceName , "=" , "-" , - 1 )
// unique int at the end
serviceName += "--" + strconv . Itoa ( index )
return serviceName
}
2017-09-07 16:28:02 +03:00
func ( p * CatalogProvider ) getBasicAuth ( tags [ ] string ) [ ] string {
list := p . getAttribute ( "frontend.auth.basic" , tags , "" )
if list != "" {
return strings . Split ( list , "," )
}
return [ ] string { }
}
2017-10-16 18:38:03 +03:00
func ( p * CatalogProvider ) getSticky ( tags [ ] string ) string {
2017-10-10 12:10:02 +03:00
stickyTag := p . getTag ( types . LabelBackendLoadbalancerSticky , tags , "" )
if len ( stickyTag ) > 0 {
2017-10-12 18:50:03 +03:00
log . Warnf ( "Deprecated configuration found: %s. Please use %s." , types . LabelBackendLoadbalancerSticky , types . LabelBackendLoadbalancerStickiness )
2017-10-16 18:38:03 +03:00
} else {
stickyTag = "false"
2017-10-10 12:10:02 +03:00
}
2017-10-16 18:38:03 +03:00
return stickyTag
}
2017-10-10 12:10:02 +03:00
2017-10-16 18:38:03 +03:00
func ( p * CatalogProvider ) hasStickinessLabel ( tags [ ] string ) bool {
stickinessTag := p . getTag ( types . LabelBackendLoadbalancerStickiness , tags , "" )
return len ( stickinessTag ) > 0 && strings . EqualFold ( strings . TrimSpace ( stickinessTag ) , "true" )
2017-10-10 12:10:02 +03:00
}
func ( p * CatalogProvider ) getStickinessCookieName ( tags [ ] string ) string {
return p . getTag ( types . LabelBackendLoadbalancerStickinessCookieName , tags , "" )
}
func ( p * CatalogProvider ) getAttribute ( name string , tags [ ] string , defaultValue string ) string {
return p . getTag ( p . getPrefixedName ( name ) , tags , defaultValue )
}
2017-05-08 20:46:53 +03:00
func ( p * CatalogProvider ) hasTag ( name string , tags [ ] string ) bool {
// Very-very unlikely that a Consul tag would ever start with '=!='
tag := p . getTag ( name , tags , "=!=" )
return tag != "=!="
}
func ( p * CatalogProvider ) getTag ( name string , tags [ ] string , defaultValue string ) string {
2016-04-12 10:49:37 +03:00
for _ , tag := range tags {
2017-05-08 20:46:53 +03:00
// Given the nature of Consul tags, which could be either singular markers, or key=value pairs, we check if the consul tag starts with 'name'
if strings . Index ( strings . ToLower ( tag ) , strings . ToLower ( name ) ) == 0 {
// In case, where a tag might be a key=value, try to split it by the first '='
// - If the first element (which would always be there, even if the tag is a singular marker without '=' in it
if kv := strings . SplitN ( tag , "=" , 2 ) ; strings . ToLower ( kv [ 0 ] ) == strings . ToLower ( name ) {
// If the returned result is a key=value pair, return the 'value' component
if len ( kv ) == 2 {
return kv [ 1 ]
}
// If the returned result is a singular marker, return the 'key' component
return kv [ 0 ]
2016-04-12 10:49:37 +03:00
}
}
}
return defaultValue
2016-02-02 20:03:40 +03:00
}
2017-05-08 20:46:53 +03:00
func ( p * CatalogProvider ) getConstraintTags ( tags [ ] string ) [ ] string {
2016-05-30 16:05:58 +03:00
var list [ ] string
for _ , tag := range tags {
2017-08-26 13:12:44 +03:00
// We look for a Consul tag named 'traefik.tags' (unless different 'prefix' is configured)
2017-05-08 20:46:53 +03:00
if strings . Index ( strings . ToLower ( tag ) , p . getPrefixedName ( "tags=" ) ) == 0 {
// If 'traefik.tags=' tag is found, take the tag value and split by ',' adding the result to the list to be returned
splitedTags := strings . Split ( tag [ len ( p . getPrefixedName ( "tags=" ) ) : ] , "," )
2016-05-30 16:05:58 +03:00
list = append ( list , splitedTags ... )
}
}
return list
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) buildConfig ( catalog [ ] catalogUpdate ) * types . Configuration {
2016-02-02 20:03:40 +03:00
var FuncMap = template . FuncMap {
2017-10-10 12:10:02 +03:00
"getBackend" : p . getBackend ,
"getFrontendRule" : p . getFrontendRule ,
"getBackendName" : p . getBackendName ,
"getBackendAddress" : p . getBackendAddress ,
"getBasicAuth" : p . getBasicAuth ,
2017-10-16 18:38:03 +03:00
"getSticky" : p . getSticky ,
2017-10-10 12:10:02 +03:00
"hasStickinessLabel" : p . hasStickinessLabel ,
"getStickinessCookieName" : p . getStickinessCookieName ,
"getAttribute" : p . getAttribute ,
"getTag" : p . getTag ,
"hasTag" : p . hasTag ,
"getEntryPoints" : p . getEntryPoints ,
"hasMaxconnAttributes" : p . hasMaxconnAttributes ,
2016-02-02 20:03:40 +03:00
}
allNodes := [ ] * api . ServiceEntry { }
2016-04-12 10:49:37 +03:00
services := [ ] * serviceUpdate { }
2016-02-02 20:03:40 +03:00
for _ , info := range catalog {
2017-08-25 18:32:03 +03:00
if len ( info . Nodes ) > 0 {
services = append ( services , info . Service )
allNodes = append ( allNodes , info . Nodes ... )
2016-02-02 20:03:40 +03:00
}
}
2016-06-20 20:13:22 +03:00
// Ensure a stable ordering of nodes so that identical configurations may be detected
sort . Sort ( nodeSorter ( allNodes ) )
2016-02-02 20:03:40 +03:00
templateObjects := struct {
2016-04-12 10:49:37 +03:00
Services [ ] * serviceUpdate
2016-02-02 20:03:40 +03:00
Nodes [ ] * api . ServiceEntry
} {
2016-04-12 10:49:37 +03:00
Services : services ,
2016-02-02 20:03:40 +03:00
Nodes : allNodes ,
}
2017-04-17 13:50:02 +03:00
configuration , err := p . GetConfiguration ( "templates/consul_catalog.tmpl" , FuncMap , templateObjects )
2016-02-02 20:03:40 +03:00
if err != nil {
log . WithError ( err ) . Error ( "Failed to create config" )
}
return configuration
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) hasMaxconnAttributes ( attributes [ ] string ) bool {
amount := p . getAttribute ( "backend.maxconn.amount" , attributes , "" )
extractorfunc := p . getAttribute ( "backend.maxconn.extractorfunc" , attributes , "" )
2016-08-25 06:46:47 +03:00
if amount != "" && extractorfunc != "" {
return true
}
return false
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) getNodes ( index map [ string ] [ ] string ) ( [ ] catalogUpdate , error ) {
2016-02-02 20:03:40 +03:00
visited := make ( map [ string ] bool )
nodes := [ ] catalogUpdate { }
for service := range index {
name := strings . ToLower ( service )
if ! strings . Contains ( name , " " ) && ! visited [ name ] {
visited [ name ] = true
2017-05-26 18:03:14 +03:00
log . WithField ( "service" , name ) . Debug ( "Fetching service" )
2017-04-17 13:50:02 +03:00
healthy , err := p . healthyNodes ( name )
2016-02-02 20:03:40 +03:00
if err != nil {
return nil , err
}
2016-05-30 16:05:58 +03:00
// healthy.Nodes can be empty if constraints do not match, without throwing error
if healthy . Service != nil && len ( healthy . Nodes ) > 0 {
nodes = append ( nodes , healthy )
}
2016-02-02 20:03:40 +03:00
}
}
return nodes , nil
}
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) watch ( configurationChan chan <- types . ConfigMessage , stop chan bool ) error {
2016-02-02 20:03:40 +03:00
stopCh := make ( chan struct { } )
2017-06-18 12:38:35 +03:00
watchCh := make ( chan map [ string ] [ ] string )
2017-10-16 17:58:03 +03:00
errorCh := make ( chan error )
2017-06-18 12:38:35 +03:00
2017-10-16 17:58:03 +03:00
p . watchHealthState ( stopCh , watchCh , errorCh )
p . watchCatalogServices ( stopCh , watchCh , errorCh )
2016-02-02 20:03:40 +03:00
defer close ( stopCh )
2017-06-18 12:38:35 +03:00
defer close ( watchCh )
2016-02-02 20:03:40 +03:00
for {
select {
2016-04-13 21:36:23 +03:00
case <- stop :
return nil
2017-06-18 12:38:35 +03:00
case index , ok := <- watchCh :
2016-02-02 20:03:40 +03:00
if ! ok {
return errors . New ( "Consul service list nil" )
}
log . Debug ( "List of services changed" )
2017-04-17 13:50:02 +03:00
nodes , err := p . getNodes ( index )
2016-02-02 20:03:40 +03:00
if err != nil {
return err
}
2017-04-17 13:50:02 +03:00
configuration := p . buildConfig ( nodes )
2016-02-02 20:03:40 +03:00
configurationChan <- types . ConfigMessage {
ProviderName : "consul_catalog" ,
Configuration : configuration ,
}
2017-10-16 17:58:03 +03:00
case err := <- errorCh :
return err
2016-02-02 20:03:40 +03:00
}
}
}
2017-05-08 20:46:53 +03:00
func ( p * CatalogProvider ) setupFrontEndTemplate ( ) {
var FuncMap = template . FuncMap {
"getAttribute" : p . getAttribute ,
"getTag" : p . getTag ,
"hasTag" : p . hasTag ,
}
t := template . New ( "consul catalog frontend rule" ) . Funcs ( FuncMap )
p . frontEndRuleTemplate = t
}
2017-04-17 13:50:02 +03:00
// Provide allows the consul catalog provider to provide configurations to traefik
2016-02-02 20:03:40 +03:00
// using the given configuration channel.
2017-04-17 13:50:02 +03:00
func ( p * CatalogProvider ) Provide ( configurationChan chan <- types . ConfigMessage , pool * safe . Pool , constraints types . Constraints ) error {
2016-02-02 20:03:40 +03:00
config := api . DefaultConfig ( )
2017-04-17 13:50:02 +03:00
config . Address = p . Endpoint
2016-02-02 20:03:40 +03:00
client , err := api . NewClient ( config )
if err != nil {
return err
}
2017-04-17 13:50:02 +03:00
p . client = client
p . Constraints = append ( p . Constraints , constraints ... )
2017-05-08 20:46:53 +03:00
p . setupFrontEndTemplate ( )
2016-02-02 20:03:40 +03:00
2016-04-13 21:36:23 +03:00
pool . Go ( func ( stop chan bool ) {
2016-02-02 20:03:40 +03:00
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Consul connection error %+v, retrying in %s" , err , time )
}
2016-08-19 15:24:09 +03:00
operation := func ( ) error {
2017-04-17 13:50:02 +03:00
return p . watch ( configurationChan , stop )
2016-02-02 20:03:40 +03:00
}
2016-12-08 15:32:12 +03:00
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , notify )
2016-02-02 20:03:40 +03:00
if err != nil {
2016-08-19 11:36:54 +03:00
log . Errorf ( "Cannot connect to consul server %+v" , err )
2016-02-02 20:03:40 +03:00
}
2016-03-31 19:57:08 +03:00
} )
2016-02-02 20:03:40 +03:00
return err
}