1
0
mirror of https://github.com/containous/traefik.git synced 2024-12-23 17:34:13 +03:00
traefik/pkg/server/configurationwatcher.go

249 lines
7.3 KiB
Go

package server
import (
"context"
"encoding/json"
"reflect"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/provider"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/tls"
"github.com/traefik/traefik/v3/pkg/types"
)
// ConfigurationWatcher watches configuration changes.
type ConfigurationWatcher struct {
providerAggregator provider.Provider
defaultEntryPoints []string
allProvidersConfigs chan dynamic.Message
newConfigs chan dynamic.Configurations
requiredProvider string
configurationListeners []func(dynamic.Configuration)
routinesPool *safe.Pool
}
// NewConfigurationWatcher creates a new ConfigurationWatcher.
func NewConfigurationWatcher(
routinesPool *safe.Pool,
pvd provider.Provider,
defaultEntryPoints []string,
requiredProvider string,
) *ConfigurationWatcher {
return &ConfigurationWatcher{
providerAggregator: pvd,
allProvidersConfigs: make(chan dynamic.Message, 100),
newConfigs: make(chan dynamic.Configurations),
routinesPool: routinesPool,
defaultEntryPoints: defaultEntryPoints,
requiredProvider: requiredProvider,
}
}
// Start the configuration watcher.
func (c *ConfigurationWatcher) Start() {
c.routinesPool.GoCtx(c.receiveConfigurations)
c.routinesPool.GoCtx(c.applyConfigurations)
c.startProviderAggregator()
}
// Stop the configuration watcher.
func (c *ConfigurationWatcher) Stop() {
close(c.allProvidersConfigs)
close(c.newConfigs)
}
// AddListener adds a new listener function used when new configuration is provided.
func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration)) {
if c.configurationListeners == nil {
c.configurationListeners = make([]func(dynamic.Configuration), 0)
}
c.configurationListeners = append(c.configurationListeners, listener)
}
func (c *ConfigurationWatcher) startProviderAggregator() {
log.Info().Msgf("Starting provider aggregator %T", c.providerAggregator)
safe.Go(func() {
err := c.providerAggregator.Provide(c.allProvidersConfigs, c.routinesPool)
if err != nil {
log.Error().Err(err).Msgf("Error starting provider aggregator %T", c.providerAggregator)
}
})
}
// receiveConfigurations receives configuration changes from the providers.
// The configuration message then gets passed along a series of check, notably
// to verify that, for a given provider, the configuration that was just received
// is at least different from the previously received one.
// The full set of configurations is then sent to the throttling goroutine,
// (throttleAndApplyConfigurations) via a RingChannel, which ensures that we can
// constantly send in a non-blocking way to the throttling goroutine the last
// global state we are aware of.
func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
newConfigurations := make(dynamic.Configurations)
var output chan dynamic.Configurations
for {
select {
case <-ctx.Done():
return
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
case output <- newConfigurations.DeepCopy():
output = nil
default:
select {
case <-ctx.Done():
return
case configMsg, ok := <-c.allProvidersConfigs:
if !ok {
return
}
logger := log.Ctx(ctx).With().Str(logs.ProviderName, configMsg.ProviderName).Logger()
if configMsg.Configuration == nil {
logger.Debug().Msg("Skipping nil configuration")
continue
}
if isEmptyConfiguration(configMsg.Configuration) {
logger.Debug().Msg("Skipping empty configuration")
continue
}
logConfiguration(logger, configMsg)
if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) {
// no change, do nothing
logger.Debug().Msg("Skipping unchanged configuration")
continue
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy()
output = c.newConfigs
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
case output <- newConfigurations.DeepCopy():
output = nil
}
}
}
}
// applyConfigurations blocks on a RingChannel that receives the new
// set of configurations that is compiled and sent by receiveConfigurations as soon
// as a provider change occurs. If the new set is different from the previous set
// that had been applied, the new set is applied, and we sleep for a while before
// listening on the channel again.
func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) {
var lastConfigurations dynamic.Configurations
for {
select {
case <-ctx.Done():
return
case newConfigs, ok := <-c.newConfigs:
if !ok {
return
}
// We wait for first configuration of the required provider before applying configurations.
if _, ok := newConfigs[c.requiredProvider]; c.requiredProvider != "" && !ok {
continue
}
if reflect.DeepEqual(newConfigs, lastConfigurations) {
continue
}
conf := mergeConfiguration(newConfigs.DeepCopy(), c.defaultEntryPoints)
conf = applyModel(conf)
for _, listener := range c.configurationListeners {
listener(conf)
}
lastConfigurations = newConfigs
}
}
}
func logConfiguration(logger zerolog.Logger, configMsg dynamic.Message) {
if logger.GetLevel() > zerolog.DebugLevel {
return
}
copyConf := configMsg.Configuration.DeepCopy()
if copyConf.TLS != nil {
copyConf.TLS.Certificates = nil
if copyConf.TLS.Options != nil {
cleanedOptions := make(map[string]tls.Options, len(copyConf.TLS.Options))
for name, option := range copyConf.TLS.Options {
option.ClientAuth.CAFiles = []types.FileOrContent{}
cleanedOptions[name] = option
}
copyConf.TLS.Options = cleanedOptions
}
for k := range copyConf.TLS.Stores {
st := copyConf.TLS.Stores[k]
st.DefaultCertificate = nil
copyConf.TLS.Stores[k] = st
}
}
if copyConf.HTTP != nil {
for _, transport := range copyConf.HTTP.ServersTransports {
transport.Certificates = tls.Certificates{}
transport.RootCAs = []types.FileOrContent{}
}
}
if copyConf.TCP != nil {
for _, transport := range copyConf.TCP.ServersTransports {
if transport.TLS != nil {
transport.TLS.Certificates = tls.Certificates{}
transport.TLS.RootCAs = []types.FileOrContent{}
}
}
}
jsonConf, err := json.Marshal(copyConf)
if err != nil {
logger.Error().Err(err).Msg("Could not marshal dynamic configuration")
logger.Debug().Msgf("Configuration received: [struct] %#v", copyConf)
} else {
logger.Debug().RawJSON("config", jsonConf).Msg("Configuration received")
}
}
func isEmptyConfiguration(conf *dynamic.Configuration) bool {
if conf.TCP == nil {
conf.TCP = &dynamic.TCPConfiguration{}
}
if conf.HTTP == nil {
conf.HTTP = &dynamic.HTTPConfiguration{}
}
if conf.UDP == nil {
conf.UDP = &dynamic.UDPConfiguration{}
}
httpEmpty := conf.HTTP.Routers == nil && conf.HTTP.Services == nil && conf.HTTP.Middlewares == nil
tlsEmpty := conf.TLS == nil || conf.TLS.Certificates == nil && conf.TLS.Stores == nil && conf.TLS.Options == nil
tcpEmpty := conf.TCP.Routers == nil && conf.TCP.Services == nil && conf.TCP.Middlewares == nil
udpEmpty := conf.UDP.Routers == nil && conf.UDP.Services == nil
return httpEmpty && tlsEmpty && tcpEmpty && udpEmpty
}