mirror of
https://github.com/containous/traefik.git
synced 2025-01-11 05:17:52 +03:00
Reflect changes in catalog healthy nodes in healthCheck watch
This commit is contained in:
parent
6bc59f8b33
commit
1d7bf200a8
@ -1,6 +1,7 @@
|
|||||||
package integration
|
package integration
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
@ -41,7 +42,7 @@ func (s *ConsulCatalogSuite) waitToElectConsulLeader() error {
|
|||||||
leader, err := s.consulClient.Status().Leader()
|
leader, err := s.consulClient.Status().Leader()
|
||||||
|
|
||||||
if err != nil || len(leader) == 0 {
|
if err != nil || len(leader) == 0 {
|
||||||
return fmt.Errorf("Leader not found. %v", err)
|
return fmt.Errorf("leader not found. %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -55,9 +56,6 @@ func (s *ConsulCatalogSuite) createConsulClient(config *api.Config, c *check.C)
|
|||||||
s.consulClient = consulClient
|
s.consulClient = consulClient
|
||||||
return consulClient
|
return consulClient
|
||||||
}
|
}
|
||||||
func (s *ConsulCatalogSuite) startConsulService(c *check.C) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error {
|
func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error {
|
||||||
catalog := s.consulClient.Catalog()
|
catalog := s.consulClient.Catalog()
|
||||||
@ -80,7 +78,7 @@ func (s *ConsulCatalogSuite) registerService(name string, address string, port i
|
|||||||
|
|
||||||
func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string) error {
|
func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string) error {
|
||||||
agent := s.consulClient.Agent()
|
agent := s.consulClient.Agent()
|
||||||
err := agent.ServiceRegister(
|
return agent.ServiceRegister(
|
||||||
&api.AgentServiceRegistration{
|
&api.AgentServiceRegistration{
|
||||||
ID: address,
|
ID: address,
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
@ -93,13 +91,24 @@ func (s *ConsulCatalogSuite) registerAgentService(name string, address string, p
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
return err
|
}
|
||||||
|
|
||||||
|
func (s *ConsulCatalogSuite) registerCheck(name string, address string, port int) error {
|
||||||
|
agent := s.consulClient.Agent()
|
||||||
|
checkRegistration := &api.AgentCheckRegistration{
|
||||||
|
ID: fmt.Sprintf("%s-%s", name, address),
|
||||||
|
Name: name,
|
||||||
|
ServiceID: address,
|
||||||
|
}
|
||||||
|
checkRegistration.HTTP = fmt.Sprintf("http://%s:%d/health", address, port)
|
||||||
|
checkRegistration.Interval = "2s"
|
||||||
|
checkRegistration.CheckID = address
|
||||||
|
return agent.CheckRegister(checkRegistration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ConsulCatalogSuite) deregisterAgentService(address string) error {
|
func (s *ConsulCatalogSuite) deregisterAgentService(address string) error {
|
||||||
agent := s.consulClient.Agent()
|
agent := s.consulClient.Agent()
|
||||||
err := agent.ServiceDeregister(address)
|
return agent.ServiceDeregister(address)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ConsulCatalogSuite) deregisterService(name string, address string) error {
|
func (s *ConsulCatalogSuite) deregisterService(name string, address string) error {
|
||||||
@ -514,3 +523,76 @@ func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) {
|
|||||||
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
|
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ConsulCatalogSuite) TestServiceWithMultipleHealthCheck(c *check.C) {
|
||||||
|
//Scale consul to 0 to be able to start traefik before and test retry
|
||||||
|
s.composeProject.Scale(c, "consul", 0)
|
||||||
|
|
||||||
|
cmd, display := s.traefikCmd(
|
||||||
|
withConfigFile("fixtures/consul_catalog/simple.toml"),
|
||||||
|
"--consulCatalog",
|
||||||
|
"--consulCatalog.watch=false",
|
||||||
|
"--consulCatalog.exposedByDefault=true",
|
||||||
|
"--consulCatalog.endpoint="+s.consulIP+":8500",
|
||||||
|
"--consulCatalog.domain=consul.localhost")
|
||||||
|
defer display(c)
|
||||||
|
err := cmd.Start()
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer cmd.Process.Kill()
|
||||||
|
|
||||||
|
// Wait for Traefik to turn ready.
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
req.Host = "test.consul.localhost"
|
||||||
|
|
||||||
|
// Request should fail
|
||||||
|
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody())
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Scale consul to 1
|
||||||
|
s.composeProject.Scale(c, "consul", 1)
|
||||||
|
s.waitToElectConsulLeader()
|
||||||
|
|
||||||
|
whoami := s.composeProject.Container(c, "whoami1")
|
||||||
|
// Register service
|
||||||
|
err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"})
|
||||||
|
c.Assert(err, checker.IsNil, check.Commentf("Error registering agent service"))
|
||||||
|
defer s.deregisterAgentService(whoami.NetworkSettings.IPAddress)
|
||||||
|
|
||||||
|
// Register one healthcheck
|
||||||
|
err = s.registerCheck("test", whoami.NetworkSettings.IPAddress, 80)
|
||||||
|
c.Assert(err, checker.IsNil, check.Commentf("Error registering check"))
|
||||||
|
|
||||||
|
// Provider consul catalog should be present
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8080/api/providers", 10*time.Second, try.BodyContains("consul_catalog"))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Should be ok
|
||||||
|
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Change health value of service to critical
|
||||||
|
reqHealth, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:80/health", whoami.NetworkSettings.IPAddress), bytes.NewBuffer([]byte("500")))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
reqHealth.Host = "test.consul.localhost"
|
||||||
|
|
||||||
|
err = try.Request(reqHealth, 10*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Should be a 404
|
||||||
|
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody())
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Change health value of service to passing
|
||||||
|
reqHealth, err = http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:80/health", whoami.NetworkSettings.IPAddress), bytes.NewBuffer([]byte("200")))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
err = try.Request(reqHealth, 10*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Should be a 200
|
||||||
|
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package consulcatalog
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -255,7 +256,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
|||||||
|
|
||||||
safe.Go(func() {
|
safe.Go(func() {
|
||||||
// variable to hold previous state
|
// variable to hold previous state
|
||||||
var flashback []string
|
var flashback map[string][]string
|
||||||
|
|
||||||
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
||||||
|
|
||||||
@ -267,19 +268,28 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Listening to changes that leads to `passing` state or degrades from it.
|
// Listening to changes that leads to `passing` state or degrades from it.
|
||||||
healthyState, meta, err := health.State("passing", options)
|
healthyState, meta, err := health.State("any", options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to retrieve health checks")
|
log.WithError(err).Error("Failed to retrieve health checks")
|
||||||
notifyError(err)
|
notifyError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var current []string
|
var current = make(map[string][]string)
|
||||||
|
var currentFailing = make(map[string]*api.HealthCheck)
|
||||||
if healthyState != nil {
|
if healthyState != nil {
|
||||||
for _, healthy := range healthyState {
|
for _, healthy := range healthyState {
|
||||||
current = append(current, healthy.ServiceID)
|
key := fmt.Sprintf("%s-%s", healthy.Node, healthy.ServiceID)
|
||||||
|
_, failing := currentFailing[key]
|
||||||
|
if healthy.Status == "passing" && !failing {
|
||||||
|
current[key] = append(current[key], healthy.Node)
|
||||||
|
} else {
|
||||||
|
currentFailing[key] = healthy
|
||||||
|
if _, ok := current[key]; ok {
|
||||||
|
delete(current, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If LastIndex didn't change then it means `Get` returned
|
// If LastIndex didn't change then it means `Get` returned
|
||||||
@ -302,7 +312,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
|||||||
// A critical note is that the return of a blocking request is no guarantee of a change.
|
// A critical note is that the return of a blocking request is no guarantee of a change.
|
||||||
// It is possible that there was an idempotent write that does not affect the result of the query.
|
// It is possible that there was an idempotent write that does not affect the result of the query.
|
||||||
// Thus it is required to do extra check for changes...
|
// Thus it is required to do extra check for changes...
|
||||||
addedKeys, removedKeys := getChangedStringKeys(current, flashback)
|
addedKeys, removedKeys, changedKeys := getChangedHealth(current, flashback)
|
||||||
|
|
||||||
if len(addedKeys) > 0 {
|
if len(addedKeys) > 0 {
|
||||||
log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.")
|
log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.")
|
||||||
@ -315,6 +325,13 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
|||||||
watchCh <- data
|
watchCh <- data
|
||||||
flashback = current
|
flashback = current
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(changedKeys) > 0 {
|
||||||
|
log.WithField("ChangedServices", changedKeys).Debug("Health State change detected.")
|
||||||
|
watchCh <- data
|
||||||
|
flashback = current
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -394,6 +411,27 @@ func getChangedStringKeys(currState []string, prevState []string) ([]string, []s
|
|||||||
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getChangedHealth(current map[string][]string, previous map[string][]string) ([]string, []string, []string) {
|
||||||
|
currKeySet := fun.Set(fun.Keys(current).([]string)).(map[string]bool)
|
||||||
|
prevKeySet := fun.Set(fun.Keys(previous).([]string)).(map[string]bool)
|
||||||
|
|
||||||
|
addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool)
|
||||||
|
removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool)
|
||||||
|
|
||||||
|
var changedKeys []string
|
||||||
|
|
||||||
|
for key, value := range current {
|
||||||
|
if prevValue, ok := previous[key]; ok {
|
||||||
|
addedNodesKeys, removedNodesKeys := getChangedStringKeys(value, prevValue)
|
||||||
|
if len(addedNodesKeys) > 0 || len(removedNodesKeys) > 0 {
|
||||||
|
changedKeys = append(changedKeys, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string), changedKeys
|
||||||
|
}
|
||||||
|
|
||||||
func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) {
|
func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) {
|
||||||
currKeySet := fun.Set(currState).(map[int]bool)
|
currKeySet := fun.Set(currState).(map[int]bool)
|
||||||
prevKeySet := fun.Set(prevState).(map[int]bool)
|
prevKeySet := fun.Set(prevState).(map[int]bool)
|
||||||
|
Loading…
Reference in New Issue
Block a user