diff --git a/docs/content/providers/consul-catalog.md b/docs/content/providers/consul-catalog.md index 35d0d600e..4d4617765 100644 --- a/docs/content/providers/consul-catalog.md +++ b/docs/content/providers/consul-catalog.md @@ -721,3 +721,27 @@ providers: --providers.consulcatalog.namespace=production # ... ``` + +### `watch` + +_Optional, Default=false_ + +When set to `true`, watches for Consul changes ([Consul watches checks](https://www.consul.io/docs/dynamic-app-config/watches#checks)). + +```yaml tab="File (YAML)" +providers: + consulCatalog: + watch: true + # ... +``` + +```toml tab="File (TOML)" +[providers.consulCatalog] + watch = true + # ... +``` + +```bash tab="CLI" +--providers.consulcatalog.watch=true +# ... +``` diff --git a/docs/content/reference/static-configuration/cli-ref.md b/docs/content/reference/static-configuration/cli-ref.md index 38aeeed26..198854d4b 100644 --- a/docs/content/reference/static-configuration/cli-ref.md +++ b/docs/content/reference/static-configuration/cli-ref.md @@ -456,6 +456,9 @@ Name of the Traefik service in Consul Catalog (needs to be registered via the or `--providers.consulcatalog.stale`: Use stale consistency for catalog reads. (Default: ```false```) +`--providers.consulcatalog.watch`: +Watch Consul API events. (Default: ```false```) + `--providers.docker`: Enable Docker backend with default settings. (Default: ```false```) diff --git a/docs/content/reference/static-configuration/env-ref.md b/docs/content/reference/static-configuration/env-ref.md index 6e91952d9..297ba299e 100644 --- a/docs/content/reference/static-configuration/env-ref.md +++ b/docs/content/reference/static-configuration/env-ref.md @@ -423,6 +423,9 @@ Name of the Traefik service in Consul Catalog (needs to be registered via the or `TRAEFIK_PROVIDERS_CONSULCATALOG_STALE`: Use stale consistency for catalog reads. (Default: ```false```) +`TRAEFIK_PROVIDERS_CONSULCATALOG_WATCH`: +Watch Consul API events. (Default: ```false```) + `TRAEFIK_PROVIDERS_CONSUL_ENDPOINTS`: KV store endpoints (Default: ```127.0.0.1:8500```) diff --git a/docs/content/reference/static-configuration/file.toml b/docs/content/reference/static-configuration/file.toml index a995affa5..9de2bc365 100644 --- a/docs/content/reference/static-configuration/file.toml +++ b/docs/content/reference/static-configuration/file.toml @@ -149,6 +149,7 @@ exposedByDefault = true defaultRule = "foobar" namespace = "foobar" + watch = true [providers.consulCatalog.endpoint] address = "foobar" scheme = "foobar" diff --git a/docs/content/reference/static-configuration/file.yaml b/docs/content/reference/static-configuration/file.yaml index c4e5c31ee..f7068ab38 100644 --- a/docs/content/reference/static-configuration/file.yaml +++ b/docs/content/reference/static-configuration/file.yaml @@ -161,6 +161,7 @@ providers: exposedByDefault: true defaultRule: foobar namespace: foobar + watch: true endpoint: address: foobar scheme: foobar diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go index 11b6fbf68..b1e3d3366 100644 --- a/integration/consul_catalog_test.go +++ b/integration/consul_catalog_test.go @@ -234,6 +234,80 @@ func (s *ConsulCatalogSuite) TestSimpleConfiguration(c *check.C) { c.Assert(err, checker.IsNil) } +func (s *ConsulCatalogSuite) TestSimpleConfigurationWithWatch(c *check.C) { + tempObjects := struct { + ConsulAddress string + DefaultRule string + }{ + ConsulAddress: s.consulURL, + DefaultRule: "Host(`{{ normalize .Name }}.consul.localhost`)", + } + + file := s.adaptFile(c, "fixtures/consul_catalog/simple_watch.toml", tempObjects) + defer os.Remove(file) + + reg := &api.AgentServiceRegistration{ + ID: "whoami1", + Name: "whoami", + Tags: []string{"traefik.enable=true"}, + Port: 80, + Address: s.getComposeServiceIP(c, "whoami1"), + } + err := s.registerService(reg, false) + c.Assert(err, checker.IsNil) + + cmd, display := s.traefikCmd(withConfigFile(file)) + defer display(c) + err = cmd.Start() + c.Assert(err, checker.IsNil) + defer s.killCmd(cmd) + + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "whoami.consul.localhost" + + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusOK), try.BodyContainsOr("Hostname: whoami1")) + c.Assert(err, checker.IsNil) + + err = s.deregisterService("whoami1", false) + c.Assert(err, checker.IsNil) + + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound)) + c.Assert(err, checker.IsNil) + + whoamiIP := s.getComposeServiceIP(c, "whoami1") + reg.Check = &api.AgentServiceCheck{ + CheckID: "some-ok-check", + TCP: whoamiIP + ":80", + Name: "some-ok-check", + Interval: "1s", + Timeout: "1s", + } + + err = s.registerService(reg, false) + c.Assert(err, checker.IsNil) + + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusOK), try.BodyContainsOr("Hostname: whoami1")) + c.Assert(err, checker.IsNil) + + reg.Check = &api.AgentServiceCheck{ + CheckID: "some-failing-check", + TCP: ":80", + Name: "some-failing-check", + Interval: "1s", + Timeout: "1s", + } + + err = s.registerService(reg, false) + c.Assert(err, checker.IsNil) + + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound)) + c.Assert(err, checker.IsNil) + + err = s.deregisterService("whoami1", false) + c.Assert(err, checker.IsNil) +} + func (s *ConsulCatalogSuite) TestRegisterServiceWithoutIP(c *check.C) { tempObjects := struct { ConsulAddress string diff --git a/integration/fixtures/consul_catalog/simple_watch.toml b/integration/fixtures/consul_catalog/simple_watch.toml new file mode 100644 index 000000000..9f73dcaf1 --- /dev/null +++ b/integration/fixtures/consul_catalog/simple_watch.toml @@ -0,0 +1,22 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[entryPoints] + [entryPoints.web] + address = ":8000" + +[api] + insecure = true + +[providers] + [providers.consulCatalog] + exposedByDefault = true + refreshInterval = "500ms" + defaultRule = "{{ .DefaultRule }}" + watch = true + [providers.consulCatalog.endpoint] + address = "{{ .ConsulAddress }}" diff --git a/pkg/provider/consulcatalog/connect_tls.go b/pkg/provider/consulcatalog/connect_tls.go index 4c0e437e0..5b7a92739 100644 --- a/pkg/provider/consulcatalog/connect_tls.go +++ b/pkg/provider/consulcatalog/connect_tls.go @@ -12,8 +12,6 @@ import ( type connectCert struct { root []string leaf keyPair - // err is used to propagate to the caller (Provide) any error occurring within the certificate watcher goroutines. - err error } func (c *connectCert) getRoot() []traefiktls.FileOrContent { diff --git a/pkg/provider/consulcatalog/consul_catalog.go b/pkg/provider/consulcatalog/consul_catalog.go index 2f949b047..a51127d5a 100644 --- a/pkg/provider/consulcatalog/consul_catalog.go +++ b/pkg/provider/consulcatalog/consul_catalog.go @@ -56,10 +56,12 @@ type Provider struct { ConnectByDefault bool `description:"Consider every service as Connect capable by default." json:"connectByDefault,omitempty" toml:"connectByDefault,omitempty" yaml:"connectByDefault,omitempty" export:"true"` ServiceName string `description:"Name of the Traefik service in Consul Catalog (needs to be registered via the orchestrator or manually)." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true"` Namespace string `description:"Sets the namespace used to discover services (Consul Enterprise only)." json:"namespace,omitempty" toml:"namespace,omitempty" yaml:"namespace,omitempty" export:"true"` + Watch bool `description:"Watch Consul API events." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"` - client *api.Client - defaultRuleTpl *template.Template - certChan chan *connectCert + client *api.Client + defaultRuleTpl *template.Template + certChan chan *connectCert + watchServicesChan chan struct{} } // EndpointConfig holds configurations of the endpoint. @@ -98,7 +100,9 @@ func (p *Provider) Init() error { } p.defaultRuleTpl = defaultRuleTpl - p.certChan = make(chan *connectCert) + p.certChan = make(chan *connectCert, 1) + p.watchServicesChan = make(chan struct{}, 1) + return nil } @@ -107,27 +111,31 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. var err error p.client, err = createClient(p.Namespace, p.Endpoint) if err != nil { - return fmt.Errorf("unable to create consul client: %w", err) + return fmt.Errorf("failed to create consul client: %w", err) } - if p.ConnectAware { - leafWatcher, rootWatcher, err := p.createConnectTLSWatchers() - if err != nil { - return fmt.Errorf("unable to create consul watch plans: %w", err) - } - pool.GoCtx(func(routineCtx context.Context) { - p.watchConnectTLS(routineCtx, leafWatcher, rootWatcher) - }) - } - - var certInfo *connectCert - pool.GoCtx(func(routineCtx context.Context) { ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulcatalog")) logger := log.FromContext(ctxLog) operation := func() error { - var err error + ctx, cancel := context.WithCancel(ctxLog) + + // When the operation terminates, we want to clean up the + // goroutines in watchConnectTLS and watchServices. + defer cancel() + + errChan := make(chan error, 2) + + if p.ConnectAware { + go func() { + if err := p.watchConnectTLS(ctx); err != nil { + errChan <- fmt.Errorf("failed to watch connect certificates: %w", err) + } + }() + } + + var certInfo *connectCert // If we are running in connect aware mode then we need to // make sure that we obtain the certificates before starting @@ -137,37 +145,46 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. if p.ConnectAware && !certInfo.isReady() { logger.Infof("Waiting for Connect certificate before building first configuration") select { - case <-routineCtx.Done(): + case <-ctx.Done(): return nil + + case err = <-errChan: + return err + case certInfo = <-p.certChan: - if certInfo.err != nil { - return backoff.Permanent(err) - } } } // get configuration at the provider's startup. - err = p.loadConfiguration(ctxLog, certInfo, configurationChan) - if err != nil { + if err = p.loadConfiguration(ctx, certInfo, configurationChan); err != nil { return fmt.Errorf("failed to get consul catalog data: %w", err) } - // Periodic refreshes. - ticker := time.NewTicker(time.Duration(p.RefreshInterval)) - defer ticker.Stop() + go func() { + // Periodic refreshes. + if !p.Watch { + repeatSend(ctx, time.Duration(p.RefreshInterval), p.watchServicesChan) + return + } + + if err := p.watchServices(ctx); err != nil { + errChan <- fmt.Errorf("failed to watch services: %w", err) + } + }() for { select { - case <-routineCtx.Done(): + case <-ctx.Done(): return nil - case <-ticker.C: + + case err = <-errChan: + return err + case certInfo = <-p.certChan: - if certInfo.err != nil { - return backoff.Permanent(err) - } + case <-p.watchServicesChan: } - err = p.loadConfiguration(ctxLog, certInfo, configurationChan) - if err != nil { + + if err = p.loadConfiguration(ctx, certInfo, configurationChan); err != nil { return fmt.Errorf("failed to refresh consul catalog data: %w", err) } } @@ -330,6 +347,67 @@ func (p *Provider) fetchService(ctx context.Context, name string, connectEnabled return consulServices, statuses, err } +// watchServices watches for update events of the services list and statuses, +// and transmits them to the caller through the p.watchServicesChan. +func (p *Provider) watchServices(ctx context.Context) error { + servicesWatcher, err := watch.Parse(map[string]interface{}{"type": "services"}) + if err != nil { + return fmt.Errorf("failed to create services watcher plan: %w", err) + } + + servicesWatcher.HybridHandler = func(_ watch.BlockingParamVal, _ interface{}) { + select { + case <-ctx.Done(): + case p.watchServicesChan <- struct{}{}: + default: + // Event chan is full, discard event. + } + } + + checksWatcher, err := watch.Parse(map[string]interface{}{"type": "checks"}) + if err != nil { + return fmt.Errorf("failed to create checks watcher plan: %w", err) + } + + checksWatcher.HybridHandler = func(_ watch.BlockingParamVal, _ interface{}) { + select { + case <-ctx.Done(): + case p.watchServicesChan <- struct{}{}: + default: + // Event chan is full, discard event. + } + } + + logger := hclog.New(&hclog.LoggerOptions{ + Name: "consulcatalog", + Level: hclog.LevelFromString(logrus.GetLevel().String()), + JSONFormat: true, + }) + + errChan := make(chan error, 2) + + defer func() { + servicesWatcher.Stop() + checksWatcher.Stop() + }() + + go func() { + errChan <- servicesWatcher.RunWithClientAndHclog(p.client, logger) + }() + + go func() { + errChan <- checksWatcher.RunWithClientAndHclog(p.client, logger) + }() + + select { + case <-ctx.Done(): + return nil + + case err = <-errChan: + return fmt.Errorf("services or checks watcher terminated: %w", err) + } +} + func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.BlockingParamVal, interface{}) { return func(_ watch.BlockingParamVal, raw interface{}) { if raw == nil { @@ -348,7 +426,10 @@ func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.Blo roots = append(roots, root.RootCertPEM) } - dest <- roots + select { + case <-ctx.Done(): + case dest <- roots: + } } } @@ -370,65 +451,59 @@ func leafWatcherHandler(ctx context.Context, dest chan<- keyPair) func(watch.Blo return } - dest <- keyPair{ + kp := keyPair{ cert: v.CertPEM, key: v.PrivateKeyPEM, } + + select { + case <-ctx.Done(): + case dest <- kp: + } } } -func (p *Provider) createConnectTLSWatchers() (*watch.Plan, *watch.Plan, error) { +// watchConnectTLS watches for updates of the root certificate or the leaf +// certificate, and transmits them to the caller via p.certChan. +func (p *Provider) watchConnectTLS(ctx context.Context) error { + leafChan := make(chan keyPair) leafWatcher, err := watch.Parse(map[string]interface{}{ "type": "connect_leaf", "service": p.ServiceName, }) if err != nil { - return nil, nil, fmt.Errorf("failed to create leaf cert watcher plan: %w", err) + return fmt.Errorf("failed to create leaf cert watcher plan: %w", err) } + leafWatcher.HybridHandler = leafWatcherHandler(ctx, leafChan) - rootWatcher, err := watch.Parse(map[string]interface{}{ + rootsChan := make(chan []string) + rootsWatcher, err := watch.Parse(map[string]interface{}{ "type": "connect_roots", }) if err != nil { - return nil, nil, fmt.Errorf("failed to create root cert watcher plan: %w", err) + return fmt.Errorf("failed to create roots cert watcher plan: %w", err) } + rootsWatcher.HybridHandler = rootsWatchHandler(ctx, rootsChan) - return leafWatcher, rootWatcher, nil -} - -// watchConnectTLS watches for updates of the root certificate or the leaf -// certificate, and transmits them to the caller via p.certChan. Any error is also -// propagated up through p.certChan, in connectCert.err. -func (p *Provider) watchConnectTLS(ctx context.Context, leafWatcher *watch.Plan, rootWatcher *watch.Plan) { - ctxLog := log.With(ctx, log.Str(log.ProviderName, "consulcatalog")) - logger := log.FromContext(ctxLog) - - leafChan := make(chan keyPair) - rootChan := make(chan []string) - - leafWatcher.HybridHandler = leafWatcherHandler(ctx, leafChan) - rootWatcher.HybridHandler = rootsWatchHandler(ctx, rootChan) - - logOpts := &hclog.LoggerOptions{ + hclogger := hclog.New(&hclog.LoggerOptions{ Name: "consulcatalog", Level: hclog.LevelFromString(logrus.GetLevel().String()), JSONFormat: true, - } + }) - hclogger := hclog.New(logOpts) + errChan := make(chan error, 2) - go func() { - err := leafWatcher.RunWithClientAndHclog(p.client, hclogger) - if err != nil { - p.certChan <- &connectCert{err: err} - } + defer func() { + leafWatcher.Stop() + rootsWatcher.Stop() }() go func() { - err := rootWatcher.RunWithClientAndHclog(p.client, hclogger) - if err != nil { - p.certChan <- &connectCert{err: err} - } + errChan <- leafWatcher.RunWithClientAndHclog(p.client, hclogger) + }() + + go func() { + errChan <- rootsWatcher.RunWithClientAndHclog(p.client, hclogger) }() var ( @@ -440,20 +515,28 @@ func (p *Provider) watchConnectTLS(ctx context.Context, leafWatcher *watch.Plan, for { select { case <-ctx.Done(): - leafWatcher.Stop() - rootWatcher.Stop() - return - case rootCerts = <-rootChan: + return nil + + case err := <-errChan: + return fmt.Errorf("leaf or roots watcher terminated: %w", err) + + case rootCerts = <-rootsChan: case leafCerts = <-leafChan: } + newCertInfo := &connectCert{ root: rootCerts, leaf: leafCerts, } if newCertInfo.isReady() && !newCertInfo.equals(certInfo) { - logger.Debugf("Updating connect certs for service %s", p.ServiceName) + log.FromContext(ctx).Debugf("Updating connect certs for service %s", p.ServiceName) + certInfo = newCertInfo - p.certChan <- newCertInfo + + select { + case <-ctx.Done(): + case p.certChan <- newCertInfo: + } } } } @@ -487,3 +570,25 @@ func createClient(namespace string, endpoint *EndpointConfig) (*api.Client, erro return api.NewClient(&config) } + +func repeatSend(ctx context.Context, interval time.Duration, c chan<- struct{}) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + select { + case <-ctx.Done(): + return + + case c <- struct{}{}: + default: + // Chan is full, discard event. + } + } + } +}