mirror of
https://github.com/containous/traefik.git
synced 2025-01-11 05:17:52 +03:00
cbccdd51c5
- log & error: remove format if not necessary, add if necessary. - add constants for k8s annotations. - fix typos
356 lines
9.7 KiB
Go
356 lines
9.7 KiB
Go
package consul
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/BurntSushi/ty/fun"
|
|
"github.com/cenk/backoff"
|
|
"github.com/containous/traefik/job"
|
|
"github.com/containous/traefik/log"
|
|
"github.com/containous/traefik/provider"
|
|
"github.com/containous/traefik/safe"
|
|
"github.com/containous/traefik/types"
|
|
"github.com/hashicorp/consul/api"
|
|
)
|
|
|
|
const (
|
|
// DefaultWatchWaitTime is the duration to wait when polling consul
|
|
DefaultWatchWaitTime = 15 * time.Second
|
|
)
|
|
|
|
var _ provider.Provider = (*CatalogProvider)(nil)
|
|
|
|
// CatalogProvider holds configurations of the Consul catalog provider.
|
|
type CatalogProvider struct {
|
|
provider.BaseProvider `mapstructure:",squash"`
|
|
Endpoint string `description:"Consul server endpoint"`
|
|
Domain string `description:"Default domain used"`
|
|
Prefix string `description:"Prefix used for Consul catalog tags"`
|
|
client *api.Client
|
|
}
|
|
|
|
type serviceUpdate struct {
|
|
ServiceName string
|
|
Attributes []string
|
|
}
|
|
|
|
type catalogUpdate struct {
|
|
Service *serviceUpdate
|
|
Nodes []*api.ServiceEntry
|
|
}
|
|
|
|
type nodeSorter []*api.ServiceEntry
|
|
|
|
func (a nodeSorter) Len() int {
|
|
return len(a)
|
|
}
|
|
|
|
func (a nodeSorter) Swap(i int, j int) {
|
|
a[i], a[j] = a[j], a[i]
|
|
}
|
|
|
|
func (a nodeSorter) Less(i int, j int) bool {
|
|
lentr := a[i]
|
|
rentr := a[j]
|
|
|
|
ls := strings.ToLower(lentr.Service.Service)
|
|
lr := strings.ToLower(rentr.Service.Service)
|
|
|
|
if ls != lr {
|
|
return ls < lr
|
|
}
|
|
if lentr.Service.Address != rentr.Service.Address {
|
|
return lentr.Service.Address < rentr.Service.Address
|
|
}
|
|
if lentr.Node.Address != rentr.Node.Address {
|
|
return lentr.Node.Address < rentr.Node.Address
|
|
}
|
|
return lentr.Service.Port < rentr.Service.Port
|
|
}
|
|
|
|
func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
|
|
watchCh := make(chan map[string][]string)
|
|
|
|
catalog := p.client.Catalog()
|
|
health := p.client.Health()
|
|
var lastHealthIndex uint64
|
|
|
|
safe.Go(func() {
|
|
defer close(watchCh)
|
|
|
|
catalogOptions := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
|
healthOptions := &api.QueryOptions{}
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
data, catalogMeta, err := catalog.Services(catalogOptions)
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to list services")
|
|
return
|
|
}
|
|
|
|
// Listening to changes that leads to `passing` state or degrades from it.
|
|
// The call is used just as a trigger for further actions
|
|
// (intentionally there is no interest in the received data).
|
|
_, healthMeta, err := health.State("passing", healthOptions)
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to retrieve health checks")
|
|
return
|
|
}
|
|
|
|
// If LastIndex didn't change then it means `Get` returned
|
|
// because of the WaitTime and the key didn't changed.
|
|
sameServiceAmount := catalogOptions.WaitIndex == catalogMeta.LastIndex
|
|
sameServiceHealth := lastHealthIndex == healthMeta.LastIndex
|
|
if sameServiceAmount && sameServiceHealth {
|
|
continue
|
|
}
|
|
catalogOptions.WaitIndex = catalogMeta.LastIndex
|
|
lastHealthIndex = healthMeta.LastIndex
|
|
|
|
if data != nil {
|
|
watchCh <- data
|
|
}
|
|
}
|
|
})
|
|
|
|
return watchCh
|
|
}
|
|
|
|
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
|
|
health := p.client.Health()
|
|
opts := &api.QueryOptions{}
|
|
data, _, err := health.Service(service, "", true, opts)
|
|
if err != nil {
|
|
log.WithError(err).Errorf("Failed to fetch details of %s", service)
|
|
return catalogUpdate{}, err
|
|
}
|
|
|
|
nodes := fun.Filter(func(node *api.ServiceEntry) bool {
|
|
constraintTags := p.getContraintTags(node.Service.Tags)
|
|
ok, failingConstraint := p.MatchConstraints(constraintTags)
|
|
if ok == false && failingConstraint != nil {
|
|
log.Debugf("Service %v pruned by '%v' constraint", service, failingConstraint.String())
|
|
}
|
|
return ok
|
|
}, data).([]*api.ServiceEntry)
|
|
|
|
//Merge tags of nodes matching constraints, in a single slice.
|
|
tags := fun.Foldl(func(node *api.ServiceEntry, set []string) []string {
|
|
return fun.Keys(fun.Union(
|
|
fun.Set(set),
|
|
fun.Set(node.Service.Tags),
|
|
).(map[string]bool)).([]string)
|
|
}, []string{}, nodes).([]string)
|
|
|
|
return catalogUpdate{
|
|
Service: &serviceUpdate{
|
|
ServiceName: service,
|
|
Attributes: tags,
|
|
},
|
|
Nodes: nodes,
|
|
}, nil
|
|
}
|
|
|
|
func (p *CatalogProvider) getEntryPoints(list string) []string {
|
|
return strings.Split(list, ",")
|
|
}
|
|
|
|
func (p *CatalogProvider) getBackend(node *api.ServiceEntry) string {
|
|
return strings.ToLower(node.Service.Service)
|
|
}
|
|
|
|
func (p *CatalogProvider) getFrontendRule(service serviceUpdate) string {
|
|
customFrontendRule := p.getAttribute("frontend.rule", service.Attributes, "")
|
|
if customFrontendRule != "" {
|
|
return customFrontendRule
|
|
}
|
|
return "Host:" + service.ServiceName + "." + p.Domain
|
|
}
|
|
|
|
func (p *CatalogProvider) getBackendAddress(node *api.ServiceEntry) string {
|
|
if node.Service.Address != "" {
|
|
return node.Service.Address
|
|
}
|
|
return node.Node.Address
|
|
}
|
|
|
|
func (p *CatalogProvider) getBackendName(node *api.ServiceEntry, index int) string {
|
|
serviceName := strings.ToLower(node.Service.Service) + "--" + node.Service.Address + "--" + strconv.Itoa(node.Service.Port)
|
|
|
|
for _, tag := range node.Service.Tags {
|
|
serviceName += "--" + provider.Normalize(tag)
|
|
}
|
|
|
|
serviceName = strings.Replace(serviceName, ".", "-", -1)
|
|
serviceName = strings.Replace(serviceName, "=", "-", -1)
|
|
|
|
// unique int at the end
|
|
serviceName += "--" + strconv.Itoa(index)
|
|
return serviceName
|
|
}
|
|
|
|
func (p *CatalogProvider) getAttribute(name string, tags []string, defaultValue string) string {
|
|
for _, tag := range tags {
|
|
if strings.Index(strings.ToLower(tag), p.Prefix+".") == 0 {
|
|
if kv := strings.SplitN(tag[len(p.Prefix+"."):], "=", 2); len(kv) == 2 && strings.ToLower(kv[0]) == strings.ToLower(name) {
|
|
return kv[1]
|
|
}
|
|
}
|
|
}
|
|
return defaultValue
|
|
}
|
|
|
|
func (p *CatalogProvider) getContraintTags(tags []string) []string {
|
|
var list []string
|
|
|
|
for _, tag := range tags {
|
|
if strings.Index(strings.ToLower(tag), p.Prefix+".tags=") == 0 {
|
|
splitedTags := strings.Split(tag[len(p.Prefix+".tags="):], ",")
|
|
list = append(list, splitedTags...)
|
|
}
|
|
}
|
|
|
|
return list
|
|
}
|
|
|
|
func (p *CatalogProvider) buildConfig(catalog []catalogUpdate) *types.Configuration {
|
|
var FuncMap = template.FuncMap{
|
|
"getBackend": p.getBackend,
|
|
"getFrontendRule": p.getFrontendRule,
|
|
"getBackendName": p.getBackendName,
|
|
"getBackendAddress": p.getBackendAddress,
|
|
"getAttribute": p.getAttribute,
|
|
"getEntryPoints": p.getEntryPoints,
|
|
"hasMaxconnAttributes": p.hasMaxconnAttributes,
|
|
}
|
|
|
|
allNodes := []*api.ServiceEntry{}
|
|
services := []*serviceUpdate{}
|
|
for _, info := range catalog {
|
|
for _, node := range info.Nodes {
|
|
isEnabled := p.getAttribute("enable", node.Service.Tags, "true")
|
|
if isEnabled != "false" && len(info.Nodes) > 0 {
|
|
services = append(services, info.Service)
|
|
allNodes = append(allNodes, info.Nodes...)
|
|
break
|
|
}
|
|
|
|
}
|
|
}
|
|
// Ensure a stable ordering of nodes so that identical configurations may be detected
|
|
sort.Sort(nodeSorter(allNodes))
|
|
|
|
templateObjects := struct {
|
|
Services []*serviceUpdate
|
|
Nodes []*api.ServiceEntry
|
|
}{
|
|
Services: services,
|
|
Nodes: allNodes,
|
|
}
|
|
|
|
configuration, err := p.GetConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to create config")
|
|
}
|
|
|
|
return configuration
|
|
}
|
|
|
|
func (p *CatalogProvider) hasMaxconnAttributes(attributes []string) bool {
|
|
amount := p.getAttribute("backend.maxconn.amount", attributes, "")
|
|
extractorfunc := p.getAttribute("backend.maxconn.extractorfunc", attributes, "")
|
|
if amount != "" && extractorfunc != "" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, error) {
|
|
visited := make(map[string]bool)
|
|
|
|
nodes := []catalogUpdate{}
|
|
for service := range index {
|
|
name := strings.ToLower(service)
|
|
if !strings.Contains(name, " ") && !visited[name] {
|
|
visited[name] = true
|
|
log.WithField("service", name).Debug("Fetching service")
|
|
healthy, err := p.healthyNodes(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// healthy.Nodes can be empty if constraints do not match, without throwing error
|
|
if healthy.Service != nil && len(healthy.Nodes) > 0 {
|
|
nodes = append(nodes, healthy)
|
|
}
|
|
}
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
|
|
stopCh := make(chan struct{})
|
|
serviceCatalog := p.watchServices(stopCh)
|
|
|
|
defer close(stopCh)
|
|
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
return nil
|
|
case index, ok := <-serviceCatalog:
|
|
if !ok {
|
|
return errors.New("Consul service list nil")
|
|
}
|
|
log.Debug("List of services changed")
|
|
nodes, err := p.getNodes(index)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
configuration := p.buildConfig(nodes)
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: "consul_catalog",
|
|
Configuration: configuration,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Provide allows the consul catalog provider to provide configurations to traefik
|
|
// using the given configuration channel.
|
|
func (p *CatalogProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
|
|
config := api.DefaultConfig()
|
|
config.Address = p.Endpoint
|
|
client, err := api.NewClient(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
p.client = client
|
|
p.Constraints = append(p.Constraints, constraints...)
|
|
|
|
pool.Go(func(stop chan bool) {
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
|
|
}
|
|
operation := func() error {
|
|
return p.watch(configurationChan, stop)
|
|
}
|
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
|
if err != nil {
|
|
log.Errorf("Cannot connect to consul server %+v", err)
|
|
}
|
|
})
|
|
|
|
return err
|
|
}
|