1
0
mirror of https://github.com/containous/traefik.git synced 2025-01-22 22:03:51 +03:00
traefik/pkg/server/configurationwatcher_test.go

803 lines
22 KiB
Go

package server
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/provider/aggregator"
"github.com/traefik/traefik/v2/pkg/safe"
th "github.com/traefik/traefik/v2/pkg/testhelpers"
"github.com/traefik/traefik/v2/pkg/tls"
)
type mockProvider struct {
messages []dynamic.Message
wait time.Duration
first chan struct{}
throttleDuration time.Duration
}
func (p *mockProvider) Provide(configurationChan chan<- dynamic.Message, _ *safe.Pool) error {
wait := p.wait
if wait == 0 {
wait = 20 * time.Millisecond
}
if len(p.messages) == 0 {
return fmt.Errorf("no messages available")
}
configurationChan <- p.messages[0]
if p.first != nil {
<-p.first
}
for _, message := range p.messages[1:] {
time.Sleep(wait)
configurationChan <- message
}
return nil
}
// ThrottleDuration returns the throttle duration.
func (p *mockProvider) ThrottleDuration() time.Duration {
return p.throttleDuration
}
func (p *mockProvider) Init() error {
return nil
}
func TestNewConfigurationWatcher(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
t.Cleanup(routinesPool.Stop)
pvd := &mockProvider{
messages: []dynamic.Message{{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("test",
th.WithEntryPoints("e"),
th.WithServiceName("scv"))),
),
},
}},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
run := make(chan struct{})
watcher.AddListener(func(conf dynamic.Configuration) {
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("test@mock",
th.WithEntryPoints("e"),
th.WithServiceName("scv"))),
th.WithMiddlewares(),
th.WithLoadBalancerServices(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
assert.Equal(t, expected, conf)
close(run)
})
watcher.Start()
<-run
}
func TestWaitForRequiredProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
pvdAggregator := &mockProvider{
wait: 5 * time.Millisecond,
}
config := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
ProviderName: "mock",
Configuration: config,
})
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
ProviderName: "required",
Configuration: config,
})
pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
ProviderName: "mock2",
Configuration: config,
})
watcher := NewConfigurationWatcher(routinesPool, pvdAggregator, []string{}, "required")
publishedConfigCount := 0
watcher.AddListener(func(_ dynamic.Configuration) {
publishedConfigCount++
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed
time.Sleep(20 * time.Millisecond)
// after 20 milliseconds we should have 2 configs published
assert.Equal(t, 2, publishedConfigCount, "times configs were published")
}
func TestIgnoreTransientConfiguration(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
config := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
config2 := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("baz", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("toto")),
),
}
watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{}, "")
publishedConfigCount := 0
var lastConfig dynamic.Configuration
blockConfConsumer := make(chan struct{})
watcher.AddListener(func(config dynamic.Configuration) {
publishedConfigCount++
lastConfig = config
<-blockConfConsumer
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
watcher.allProvidersConfigs <- dynamic.Message{
ProviderName: "mock",
Configuration: config,
}
watcher.allProvidersConfigs <- dynamic.Message{
ProviderName: "mock",
Configuration: config2,
}
watcher.allProvidersConfigs <- dynamic.Message{
ProviderName: "mock",
Configuration: config,
}
// give some time before closing the channel.
time.Sleep(20 * time.Millisecond)
close(blockConfConsumer)
// give some time so that the configuration can be processed.
time.Sleep(20 * time.Millisecond)
// after 20 milliseconds we should have 1 config published.
assert.Equal(t, 1, publishedConfigCount, "times configs were published")
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar@mock")),
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
}
assert.Equal(t, expected, lastConfig)
}
func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
pvd := &mockProvider{
wait: 10 * time.Millisecond,
throttleDuration: 30 * time.Millisecond,
}
for i := 0; i < 5; i++ {
pvd.messages = append(pvd.messages, dynamic.Message{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo"+strconv.Itoa(i), th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
},
})
}
providerAggregator := aggregator.ProviderAggregator{}
err := providerAggregator.AddProvider(pvd)
assert.NoError(t, err)
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
publishedConfigCount := 0
watcher.AddListener(func(_ dynamic.Configuration) {
publishedConfigCount++
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond)
// To load 5 new configs it would require 150ms (5 configs * 30ms).
// In 100ms, we should only have time to load 3 configs.
assert.LessOrEqual(t, publishedConfigCount, 3, "config was applied too many times")
assert.Greater(t, publishedConfigCount, 0, "config was not applied at least once")
}
func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
pvd := &mockProvider{
messages: []dynamic.Message{{ProviderName: "mock"}},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
watcher.AddListener(func(_ dynamic.Configuration) {
t.Error("An empty configuration was published but it should not")
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
}
func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
message := dynamic.Message{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
},
}
pvd := &mockProvider{
messages: []dynamic.Message{message, message},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
var configurationReloads int
watcher.AddListener(func(_ dynamic.Configuration) {
configurationReloads++
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, configurationReloads, "Same configuration should not be published multiple times")
}
func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
transientConfiguration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bad")),
),
}
pvd := &mockProvider{
wait: 5 * time.Millisecond, // The last message needs to be received before the second has been fully processed
throttleDuration: 15 * time.Millisecond,
messages: []dynamic.Message{
{ProviderName: "mock", Configuration: configuration},
{ProviderName: "mock", Configuration: transientConfiguration},
{ProviderName: "mock", Configuration: configuration},
},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
var lastConfig dynamic.Configuration
watcher.AddListener(func(conf dynamic.Configuration) {
lastConfig = conf
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar@mock")),
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
}
assert.Equal(t, expected, lastConfig)
}
func TestListenProvidersIgnoreSameConfig(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
transientConfiguration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bad")),
),
}
// The transient configuration is sent alternatively with the configuration we want to be applied.
// It is intended to show that even if the configurations are different,
// those transient configurations will be ignored if they are sent in a time frame
// lower than the provider throttle duration.
pvd := &mockProvider{
wait: 1 * time.Microsecond, // Enqueue them fast
throttleDuration: time.Millisecond,
first: make(chan struct{}),
messages: []dynamic.Message{
{ProviderName: "mock", Configuration: configuration},
{ProviderName: "mock", Configuration: transientConfiguration},
{ProviderName: "mock", Configuration: configuration},
{ProviderName: "mock", Configuration: transientConfiguration},
{ProviderName: "mock", Configuration: configuration},
},
}
providerAggregator := aggregator.ProviderAggregator{}
err := providerAggregator.AddProvider(pvd)
assert.NoError(t, err)
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
var configurationReloads int
var lastConfig dynamic.Configuration
var once sync.Once
watcher.AddListener(func(conf dynamic.Configuration) {
configurationReloads++
lastConfig = conf
// Allows next configurations to be sent by the mock provider as soon as the first configuration message is applied.
once.Do(func() {
pvd.first <- struct{}{}
// Wait for all configuration messages to pile in
time.Sleep(5 * time.Millisecond)
})
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Wait long enough
time.Sleep(50 * time.Millisecond)
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar@mock")),
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
}
assert.Equal(t, expected, lastConfig)
assert.Equal(t, 1, configurationReloads)
}
func TestApplyConfigUnderStress(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{}, "")
routinesPool.GoCtx(func(ctx context.Context) {
i := 0
for {
select {
case <-ctx.Done():
return
case watcher.allProvidersConfigs <- dynamic.Message{ProviderName: "mock", Configuration: &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo"+strconv.Itoa(i), th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}}:
}
i++
}
})
var configurationReloads int
watcher.AddListener(func(conf dynamic.Configuration) {
configurationReloads++
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
time.Sleep(100 * time.Millisecond)
// Ensure that at least two configurations have been applied
// if we simulate being spammed configuration changes by the provider(s).
// In theory, checking at least one would be sufficient,
// but checking for two also ensures that we're looping properly,
// and that the whole algo holds, etc.
t.Log(configurationReloads)
assert.GreaterOrEqual(t, configurationReloads, 2)
}
func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
transientConfiguration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("bad", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bad")),
),
}
transientConfiguration2 := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("bad2", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bad2")),
),
}
finalConfiguration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("final", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("final")),
),
}
pvd := &mockProvider{
wait: 10 * time.Microsecond, // Enqueue them fast
throttleDuration: 10 * time.Millisecond,
messages: []dynamic.Message{
{ProviderName: "mock", Configuration: configuration},
{ProviderName: "mock", Configuration: transientConfiguration},
{ProviderName: "mock", Configuration: transientConfiguration2},
{ProviderName: "mock", Configuration: finalConfiguration},
},
}
providerAggregator := aggregator.ProviderAggregator{}
err := providerAggregator.AddProvider(pvd)
assert.NoError(t, err)
watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
var configurationReloads int
var lastConfig dynamic.Configuration
watcher.AddListener(func(conf dynamic.Configuration) {
configurationReloads++
lastConfig = conf
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Wait long enough
time.Sleep(500 * time.Millisecond)
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("final@mock", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("final@mock")),
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
}
assert.Equal(t, expected, lastConfig)
assert.Equal(t, 2, configurationReloads)
}
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(th.WithRouter("foo", th.WithEntryPoints("ep"))),
th.WithLoadBalancerServices(th.WithService("bar")),
),
}
pvd := &mockProvider{
messages: []dynamic.Message{
{ProviderName: "mock", Configuration: configuration},
{ProviderName: "mock2", Configuration: configuration},
},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
var publishedProviderConfig dynamic.Configuration
watcher.AddListener(func(conf dynamic.Configuration) {
publishedProviderConfig = conf
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
expected := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("foo@mock", th.WithEntryPoints("ep")),
th.WithRouter("foo@mock2", th.WithEntryPoints("ep")),
),
th.WithLoadBalancerServices(
th.WithService("bar@mock"),
th.WithService("bar@mock2"),
),
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
"default": tls.DefaultTLSOptions,
},
Stores: map[string]tls.Store{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
assert.Equal(t, expected, publishedProviderConfig)
}
func TestPublishConfigUpdatedByProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
pvdConfiguration := dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{
"foo": {},
},
},
}
pvd := &mockProvider{
wait: 10 * time.Millisecond,
messages: []dynamic.Message{
{
ProviderName: "mock",
Configuration: &pvdConfiguration,
},
{
ProviderName: "mock",
Configuration: &pvdConfiguration,
},
},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
publishedConfigCount := 0
watcher.AddListener(func(configuration dynamic.Configuration) {
publishedConfigCount++
// Update the provider configuration published in next dynamic Message which should trigger a new publishing.
pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{}
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 2, publishedConfigCount)
}
func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
routinesPool := safe.NewPool(context.Background())
pvd := &mockProvider{
wait: 10 * time.Millisecond,
messages: []dynamic.Message{
{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{
"foo": {},
},
},
},
},
{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{
"foo": {},
},
},
},
},
},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
publishedConfigCount := 0
watcher.AddListener(func(configuration dynamic.Configuration) {
publishedConfigCount++
// Modify the provided configuration.
// This should not modify the configuration stored in the configuration watcher and therefore there will be no new publishing.
configuration.TCP.Routers["foo@mock"].Rule = "bar"
})
watcher.Start()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, publishedConfigCount)
}