
Implementation of serving not ready endpoints

This commit is contained in:
Valéry Fouques 2024-12-11 13:54:05 +01:00 committed by GitHub
parent a4c0b1649d
commit 9588e51146
13 changed files with 204 additions and 42 deletions

View File

@ -982,6 +982,9 @@ More information in the dedicated [mirroring](../services/index.md#mirroring-ser
As explained in the section about [Sticky sessions](../../services/#sticky-sessions), for stickiness to work all the way,
it must be specified at each load-balancing level.
When stickiness is enabled, Traefik uses the Kubernetes [serving](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#serving) endpoint status to detect and mark servers as fenced.
Fenced servers can still process requests tied to sticky cookies while they are terminating.
For instance, in the example below, there is a first level of load-balancing because there is a (Weighted Round Robin) load-balancing of the two `whoami` services,
and there is a second level because each whoami service is a `replicaset` and is thus handled as a load-balancer of servers.
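
Editor's note: the fencing decision above rests on the two EndpointSlice conditions, `Ready` and `Serving`. The sketch below is not part of this change; it only illustrates how a terminating endpoint is classified under the definitions used in this diff. The `stillServing` check mirrors the `EndpointServing` helper introduced further down, while `fenced` models "serving but no longer ready" as the paragraph above describes it; the providers' exact expression may differ.

```go
package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/utils/ptr"
)

func main() {
	// A terminating pod: its endpoint is no longer Ready,
	// but the kubelet still reports it as Serving.
	terminating := discoveryv1.Endpoint{
		Conditions: discoveryv1.EndpointConditions{
			Ready:   ptr.To(false),
			Serving: ptr.To(true),
		},
	}

	// Same condition as the EndpointServing helper added by this commit:
	// the endpoint stays in the server list if it is Ready or still Serving.
	stillServing := ptr.Deref(terminating.Conditions.Ready, false) ||
		ptr.Deref(terminating.Conditions.Serving, false)

	// Assumption for this sketch: a kept endpoint is fenced when it is
	// Serving but not Ready, so it only receives sticky traffic.
	fenced := ptr.Deref(terminating.Conditions.Serving, false) &&
		!ptr.Deref(terminating.Conditions.Ready, false)

	fmt.Println(stillServing, fenced) // true true
}
```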

View File

@ -391,6 +391,11 @@ which in turn will create the resulting routers, services, handlers, etc.
traefik.ingress.kubernetes.io/service.sticky.cookie.path: /foobar
```
## Stickiness and load-balancing
When stickiness is enabled, Traefik uses the Kubernetes [serving](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#serving) endpoint status to detect and mark servers as fenced.
Fenced servers can still process requests tied to sticky cookies while they are terminating.
## Path Types on Kubernetes 1.18+
If the Kubernetes cluster version is 1.18+,

View File

@ -257,6 +257,7 @@ type Server struct {
URL string `json:"url,omitempty" toml:"url,omitempty" yaml:"url,omitempty" label:"-"`
Weight *int `json:"weight,omitempty" toml:"weight,omitempty" yaml:"weight,omitempty" label:"weight" export:"true"`
PreservePath bool `json:"preservePath,omitempty" toml:"preservePath,omitempty" yaml:"preservePath,omitempty" label:"-" export:"true"`
Fenced bool `json:"fenced,omitempty" toml:"-" yaml:"-" label:"-" file:"-" kv:"-"`
Scheme string `json:"-" toml:"-" yaml:"-" file:"-"`
Port string `json:"-" toml:"-" yaml:"-" file:"-"`
}
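
Editor's note on the struct tags above: `fenced` is serialized only in the JSON view of the dynamic configuration, while the `-` tags keep it out of file, TOML, YAML, label and KV based configuration, so in practice the flag is set programmatically by providers such as the Kubernetes ones in this diff. A minimal, hypothetical sketch of the resulting JSON:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/traefik/traefik/v3/pkg/config/dynamic"
)

func main() {
	// Hypothetical server, as a Kubernetes provider would build it for a
	// terminating endpoint; only the JSON tags expose the fenced flag.
	s := dynamic.Server{URL: "http://10.10.0.3:80", Fenced: true}

	out, _ := json.Marshal(s)
	fmt.Println(string(out)) // {"url":"http://10.10.0.3:80","fenced":true}
}
```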

View File

@ -13,9 +13,11 @@ import (
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/provider"
traefikv1alpha1 "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
"github.com/traefik/traefik/v3/pkg/provider/kubernetes/k8s"
"github.com/traefik/traefik/v3/pkg/tls"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
)
const (
@ -544,7 +546,7 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L
}
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
if !k8s.EndpointServing(endpoint) {
continue
}
@ -555,7 +557,8 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L
addresses[address] = struct{}{}
servers = append(servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
Fenced: ptr.Deref(endpoint.Conditions.Serving, false),
})
}
}

View File

@ -4737,6 +4737,14 @@ func TestLoadIngressRoutes(t *testing.T) {
{
URL: "http://10.10.0.2:80",
},
{
URL: "http://10.10.0.3:80",
Fenced: true,
},
{
URL: "http://10.10.0.4:80",
Fenced: true,
},
{
URL: "http://10.10.0.5:80",
},

View File

@ -18,7 +18,7 @@ func Test_convertSlice_corev1_to_networkingv1(t *testing.T) {
{
Port: 123,
Protocol: "https",
Error: ptr("test"),
Error: pointer("test"),
},
},
},
@ -35,7 +35,7 @@ func Test_convertSlice_corev1_to_networkingv1(t *testing.T) {
{
Port: 123,
Protocol: "https",
Error: ptr("test"),
Error: pointer("test"),
},
},
},
@ -52,7 +52,7 @@ func Test_convert(t *testing.T) {
{
Port: 123,
Protocol: "https",
Error: ptr("test"),
Error: pointer("test"),
},
},
}
@ -67,14 +67,10 @@ func Test_convert(t *testing.T) {
{
Port: 123,
Protocol: "https",
Error: ptr("test"),
Error: pointer("test"),
},
},
}
assert.Equal(t, expected, actual)
}
func ptr[T any](v T) *T {
return &v
}

View File

@ -30,6 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/utils/ptr"
)
const (
@ -587,7 +588,7 @@ func (p *Provider) loadService(client Client, namespace string, backend netv1.In
protocol := getProtocol(portSpec, portName, svcConfig)
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
if !k8s.EndpointServing(endpoint) {
continue
}
@ -598,7 +599,8 @@ func (p *Provider) loadService(client Client, namespace string, backend netv1.In
addresses[address] = struct{}{}
svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
Fenced: ptr.Deref(endpoint.Conditions.Serving, false),
})
}
}

View File

@ -0,0 +1,11 @@
package k8s
import (
v1 "k8s.io/api/discovery/v1"
"k8s.io/utils/ptr"
)
// EndpointServing returns true if the endpoint is still serving the service.
func EndpointServing(endpoint v1.Endpoint) bool {
return ptr.Deref(endpoint.Conditions.Ready, false) || ptr.Deref(endpoint.Conditions.Serving, false)
}

View File

@ -0,0 +1,75 @@
package k8s
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/discovery/v1"
)
func TestEndpointServing(t *testing.T) {
tests := []struct {
name string
endpoint v1.Endpoint
want bool
}{
{
name: "no status",
endpoint: v1.Endpoint{
Conditions: v1.EndpointConditions{
Ready: nil,
Serving: nil,
},
},
want: false,
},
{
name: "ready",
endpoint: v1.Endpoint{
Conditions: v1.EndpointConditions{
Ready: pointer(true),
Serving: nil,
},
},
want: true,
},
{
name: "not ready",
endpoint: v1.Endpoint{
Conditions: v1.EndpointConditions{
Ready: pointer(false),
Serving: nil,
},
},
want: false,
},
{
name: "not ready and serving",
endpoint: v1.Endpoint{
Conditions: v1.EndpointConditions{
Ready: pointer(false),
Serving: pointer(true),
},
},
want: true,
},
{
name: "not ready and not serving",
endpoint: v1.Endpoint{
Conditions: v1.EndpointConditions{
Ready: pointer(false),
Serving: pointer(false),
},
},
want: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := EndpointServing(test.endpoint)
assert.Equal(t, test.want, got)
})
}
}
func pointer[T any](v T) *T { return &v }

View File

@ -4,7 +4,6 @@ package server
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"

View File

@ -64,12 +64,15 @@ type Balancer struct {
// updaters is the list of hooks that are run (to update the Balancer
// parent(s)), whenever the Balancer status changes.
updaters []func(bool)
// fenced is the list of terminating yet still serving child services.
fenced map[string]struct{}
}
// New creates a new load balancer.
func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
balancer := &Balancer{
status: make(map[string]struct{}),
fenced: make(map[string]struct{}),
handlerMap: make(map[string]*namedHandler),
wantsHealthCheck: wantHealthCheck,
}
@ -179,7 +182,7 @@ func (b *Balancer) nextServer() (*namedHandler, error) {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()
if len(b.handlers) == 0 || len(b.status) == 0 {
if len(b.handlers) == 0 || len(b.status) == 0 || len(b.fenced) == len(b.handlers) {
return nil, errNoAvailableServer
}
@ -194,7 +197,10 @@ func (b *Balancer) nextServer() (*namedHandler, error) {
heap.Push(b, handler)
if _, ok := b.status[handler.name]; ok {
break
if _, ok := b.fenced[handler.name]; !ok {
// do not select a fenced handler.
break
}
}
}
@ -255,7 +261,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Add adds a handler.
// A handler with a non-positive weight is ignored.
func (b *Balancer) Add(name string, handler http.Handler, weight *int) {
func (b *Balancer) Add(name string, handler http.Handler, weight *int, fenced bool) {
w := 1
if weight != nil {
w = *weight
@ -271,6 +277,9 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) {
h.deadline = b.curDeadline + 1/h.weight
heap.Push(b, h)
b.status[name] = struct{}{}
if fenced {
b.fenced[name] = struct{}{}
}
b.handlerMap[name] = h
b.handlerMap[hash(name)] = h
b.handlersMu.Unlock()

View File

@ -18,12 +18,12 @@ func TestBalancer(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(3))
}), pointer(3), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for range 4 {
@ -49,9 +49,9 @@ func TestBalancerOneServerZeroWeight(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for range 3 {
@ -70,11 +70,11 @@ func TestBalancerNoServiceUp(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), pointer(1))
}), pointer(1), false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "first", false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
@ -91,11 +91,11 @@ func TestBalancerOneServerDown(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), pointer(1))
}), pointer(1), false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -112,12 +112,12 @@ func TestBalancerDownThenUp(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -141,30 +141,30 @@ func TestBalancerPropagate(t *testing.T) {
balancer1.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer1.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer2 := New(nil, true)
balancer2.Add("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "third")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer2.Add("fourth", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "fourth")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
topBalancer := New(nil, true)
topBalancer.Add("balancer1", balancer1, pointer(1))
topBalancer.Add("balancer1", balancer1, pointer(1), false)
_ = balancer1.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up)
// TODO(mpl): if test gets flaky, add channel or something here to signal that
// propagation is done, and wait on it before sending request.
})
topBalancer.Add("balancer2", balancer2, pointer(1))
topBalancer.Add("balancer2", balancer2, pointer(1), false)
_ = balancer2.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer2", up)
})
@ -211,8 +211,20 @@ func TestBalancerPropagate(t *testing.T) {
func TestBalancerAllServersZeroWeight(t *testing.T) {
balancer := New(nil, false)
balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false)
balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false)
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusServiceUnavailable, recorder.Result().StatusCode)
}
func TestBalancerAllServersFenced(t *testing.T) {
balancer := New(nil, false)
balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(1), true)
balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(1), true)
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
@ -235,12 +247,12 @@ func TestSticky(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(2))
}), pointer(2), false)
recorder := &responseRecorder{
ResponseRecorder: httptest.NewRecorder(),
@ -277,12 +289,12 @@ func TestSticky_FallBack(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1))
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(2))
}), pointer(2), false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -306,12 +318,12 @@ func TestBalancerBias(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "A")
rw.WriteHeader(http.StatusOK)
}), pointer(11))
}), pointer(11), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "B")
rw.WriteHeader(http.StatusOK)
}), pointer(3))
}), pointer(3), false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -341,3 +353,41 @@ func (r *responseRecorder) WriteHeader(statusCode int) {
}
r.ResponseRecorder.WriteHeader(statusCode)
}
// TestSticky_Fenced checks that fenced nodes receive traffic if their sticky cookie matches.
func TestSticky_Fenced(t *testing.T) {
balancer := New(&dynamic.Sticky{Cookie: &dynamic.Cookie{Name: "test"}}, false)
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), pointer(1), false)
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), pointer(1), false)
balancer.Add("fenced", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "fenced")
rw.WriteHeader(http.StatusOK)
}), pointer(1), true)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
stickyReq := httptest.NewRequest(http.MethodGet, "/", nil)
stickyReq.AddCookie(&http.Cookie{Name: "test", Value: "fenced"})
req := httptest.NewRequest(http.MethodGet, "/", nil)
for range 4 {
recorder.ResponseRecorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, stickyReq)
balancer.ServeHTTP(recorder, req)
}
assert.Equal(t, 4, recorder.save["fenced"])
assert.Equal(t, 2, recorder.save["first"])
assert.Equal(t, 2, recorder.save["second"])
}

View File

@ -258,7 +258,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
return nil, err
}
balancer.Add(service.Name, serviceHandler, service.Weight)
balancer.Add(service.Name, serviceHandler, service.Weight, false)
if config.HealthCheck == nil {
continue
@ -397,7 +397,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
proxy, _ = capture.Wrap(proxy)
}
lb.Add(proxyName, proxy, server.Weight)
lb.Add(proxyName, proxy, server.Weight, server.Fenced)
// servers are considered UP by default.
info.UpdateServerStatus(target.String(), runtime.StatusUp)