1
0
mirror of https://github.com/containous/traefik.git synced 2025-10-12 07:33:19 +03:00

Throttle Kubernetes config refresh

This commit is contained in:
Ben Weissmann
2019-08-30 06:16:04 -04:00
committed by Traefiker Bot
parent f1d016b893
commit bee370ec6b
2 changed files with 61 additions and 3 deletions

View File

@@ -16,6 +16,7 @@ import (
"time"
"github.com/cenk/backoff"
"github.com/containous/flaeg"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
@@ -68,6 +69,7 @@ type Provider struct {
LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"`
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"`
IngressEndpoint *IngressEndpoint `description:"Kubernetes Ingress Endpoint"`
ThrottleDuration flaeg.Duration `description:"Ingress refresh throttle duration"`
lastConfiguration safe.Safe
}
@@ -137,16 +139,26 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
return nil
}
}
throttleDuration := time.Duration(p.ThrottleDuration)
eventsChanToRead := throttleEvents(throttleDuration, stop, eventsChan)
for {
select {
case <-stop:
return nil
case event := <-eventsChan:
case event := <-eventsChanToRead:
// Note that event is the *first* event that came in during this
// throttling interval -- if we're hitting our throttle, we may have
// dropped events. This is fine, because we don't treat different
// event types differently. But if we do in the future, we'll need to
// track more information about the dropped events.
log.Debugf("Received Kubernetes event kind %T", event)
templateObjects, err := p.loadIngresses(k8sClient)
if err != nil {
return err
}
if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping Kubernetes event kind %T", event)
} else {
@@ -156,6 +168,11 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
Configuration: p.loadConfig(*templateObjects),
}
}
// If we're throttling, we sleep here for the throttle duration to
// enforce that we don't refresh faster than our throttle. time.Sleep
// returns immediately if p.ThrottleDuration is 0 (no throttle).
time.Sleep(throttleDuration)
}
}
}
@@ -599,6 +616,39 @@ func (p *Provider) addGlobalBackend(cl Client, i *extensionsv1beta1.Ingress, tem
return nil
}
func throttleEvents(throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
if throttleDuration == 0 {
return nil
}
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling)
eventsChanBuffered := make(chan interface{}, 1)
// Run a goroutine that reads events from eventChan and does a
// non-blocking write to pendingEvent. This guarantees that writing to
// eventChan will never block, and that pendingEvent will have
// something in it if there's been an event since we read from that channel.
go func() {
for {
select {
case <-stop:
return
case nextEvent := <-eventsChan:
select {
case eventsChanBuffered <- nextEvent:
default:
// We already have an event in eventsChanBuffered, so we'll
// do a refresh as soon as our throttle allows us to. It's fine
// to drop the event and keep whatever's in the buffer -- we
// don't do different things for different events
log.Debugf("Dropping event kind %T due to throttling", nextEvent)
}
}
}
}()
return eventsChanBuffered
}
func getRuleForPath(pa extensionsv1beta1.HTTPIngressPath, i *extensionsv1beta1.Ingress) (string, error) {
if len(pa.Path) == 0 {
return "", nil