fix: watch buffer overrun for RouteStatus

Fixes #8157

This PR contains two fixes, both related to the same problem.

Several routes for different links but the same IPv6 destination might exist
at the same time, so the route resource ID should handle that. The problem
was that these routes were misreported, internally causing updates to
the same resource multiple times (equal to the number of links).

Don't trigger controllers more often than 10 times per second (with a burst
of 5) for kernel notifications. This ensures Talos doesn't try to reflect
the current state of the network subsystem as resources too often, which
causes excessive CPU usage and might lead to a buffer overrun under
a high rate of changes.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2024-01-17 18:40:36 +04:00
parent cc06b5d7a6
commit 474eccdc4c
No known key found for this signature in database
GPG Key ID: FE042E3D4085A811
16 changed files with 152 additions and 19 deletions

View File

@ -423,7 +423,7 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
if err = safe.WriterModify(ctx, r,
network.NewRouteSpec(
network.ConfigNamespaceName,
network.LayeredID(network.ConfigOperator, network.RouteID(spec.Table, spec.Family, spec.Destination, spec.Gateway, spec.Priority)),
network.LayeredID(network.ConfigOperator, network.RouteID(spec.Table, spec.Family, spec.Destination, spec.Gateway, spec.Priority, spec.OutLinkName)),
),
func(r *network.RouteSpec) error {
*r.TypedSpec() = spec

View File

@ -145,6 +145,7 @@ func (suite *ManagerSuite) TestReconcile() {
netip.Prefix{},
netip.Addr{},
1,
"kubespan",
),
),
func(res *network.RouteSpec, asrt *assert.Assertions) {},
@ -160,6 +161,7 @@ func (suite *ManagerSuite) TestReconcile() {
netip.Prefix{},
netip.Addr{},
1,
"kubespan",
),
),
func(res *network.RouteSpec, asrt *assert.Assertions) {},

View File

@ -54,7 +54,7 @@ func (ctrl *AddressSpecController) Outputs() []controller.Output {
//nolint:gocyclo
func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
// watch link changes as some address might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ func (ctrl *AddressStatusController) Outputs() []controller.Output {
//
//nolint:gocyclo
func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR)
if err != nil {
return err
}

View File

@ -67,7 +67,7 @@ func (ctrl *LinkSpecController) Run(ctx context.Context, r controller.Runtime, l
}
// watch link changes as some routes might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}

View File

@ -72,14 +72,14 @@ func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime,
// create watch connections to rtnetlink and ethtool via genetlink
// these connections are used only to join multicast groups and receive notifications on changes
// other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses
rtnetlinkWatcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
rtnetlinkWatcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}
defer rtnetlinkWatcher.Done()
ethtoolWatcher, err := watch.NewEthtool(r)
ethtoolWatcher, err := watch.NewEthtool(watch.NewDefaultRateLimitedTrigger(ctx, r))
if err != nil {
logger.Warn("ethtool watcher failed to start", zap.Error(err))
} else {

View File

@ -302,7 +302,10 @@ func (ctrl *OperatorSpecController) reconcileOperatorOutputs(ctx context.Context
if err := apply(
network.NewRouteSpec(
network.ConfigNamespaceName,
fmt.Sprintf("%s/%s", op.Operator.Prefix(), network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority)),
fmt.Sprintf("%s/%s",
op.Operator.Prefix(),
network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority, routeSpec.OutLinkName),
),
),
func(r resource.Resource) {
*r.(*network.RouteSpec).TypedSpec() = routeSpec

View File

@ -281,7 +281,10 @@ func (ctrl *PlatformConfigController) apply(ctx context.Context, r controller.Ru
idBuilder: func(spec interface{}) (resource.ID, error) {
routeSpec := spec.(network.RouteSpecSpec) //nolint:errcheck,forcetypeassert
return network.LayeredID(network.ConfigPlatform, network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority)), nil
return network.LayeredID(
network.ConfigPlatform,
network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority, routeSpec.OutLinkName),
), nil
},
resourceBuilder: func(id string) resource.Resource {
return network.NewRouteSpec(network.ConfigNamespaceName, id)

View File

@ -155,7 +155,7 @@ func (ctrl *RouteConfigController) apply(ctx context.Context, r controller.Runti
for _, route := range routes {
route := route
id := network.LayeredID(route.ConfigLayer, network.RouteID(route.Table, route.Family, route.Destination, route.Gateway, route.Priority))
id := network.LayeredID(route.ConfigLayer, network.RouteID(route.Table, route.Family, route.Destination, route.Gateway, route.Priority, route.OutLinkName))
if err := r.Modify(
ctx,

View File

@ -230,7 +230,7 @@ func (suite *RouteConfigSuite) TestMachineConfiguration() {
suite.assertRoutes(
[]string{
"configuration/inet6/2001:470:6d:30e:8ed2:b60c:9d2f:803b//1024",
"configuration/eth2/inet6/2001:470:6d:30e:8ed2:b60c:9d2f:803b//1024",
"configuration/inet4/10.0.3.1/10.0.3.0/24/1024",
"configuration/inet4/192.168.0.25/192.168.0.0/18/25",
"configuration/inet4/192.244.0.1/192.244.0.0/24/1024",

View File

@ -73,7 +73,7 @@ func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime,
for _, res := range list.Items {
route := res.(*network.RouteSpec) //nolint:errcheck,forcetypeassert
id := network.RouteID(route.TypedSpec().Table, route.TypedSpec().Family, route.TypedSpec().Destination, route.TypedSpec().Gateway, route.TypedSpec().Priority)
id := network.RouteID(route.TypedSpec().Table, route.TypedSpec().Family, route.TypedSpec().Destination, route.TypedSpec().Gateway, route.TypedSpec().Priority, route.TypedSpec().OutLinkName)
existing, ok := routes[id]
if ok && existing.TypedSpec().ConfigLayer > route.TypedSpec().ConfigLayer {

View File

@ -51,7 +51,7 @@ func (ctrl *RouteSpecController) Outputs() []controller.Output {
//nolint:gocyclo
func (ctrl *RouteSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
// watch link changes as some routes might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK|unix.RTMGRP_IPV4_ROUTE)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK|unix.RTMGRP_IPV4_ROUTE)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ func (ctrl *RouteStatusController) Outputs() []controller.Output {
//
//nolint:gocyclo
func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_IPV4_MROUTE|unix.RTMGRP_IPV4_ROUTE|unix.RTMGRP_IPV6_MROUTE|unix.RTMGRP_IPV6_ROUTE)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_IPV4_MROUTE|unix.RTMGRP_IPV4_ROUTE|unix.RTMGRP_IPV6_MROUTE|unix.RTMGRP_IPV6_ROUTE)
if err != nil {
return err
}
@ -104,7 +104,9 @@ func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime
dstPrefix := netip.PrefixFrom(dstAddr, int(route.DstLength))
srcAddr, _ := netip.AddrFromSlice(route.Attributes.Src)
gatewayAddr, _ := netip.AddrFromSlice(route.Attributes.Gateway)
id := network.RouteID(nethelpers.RoutingTable(route.Table), nethelpers.Family(route.Family), dstPrefix, gatewayAddr, route.Attributes.Priority)
outLinkName := linkLookup[route.Attributes.OutIface]
id := network.RouteID(nethelpers.RoutingTable(route.Table), nethelpers.Family(route.Family), dstPrefix, gatewayAddr, route.Attributes.Priority, outLinkName)
if err = r.Modify(ctx, network.NewRouteStatus(network.NamespaceName, id), func(r resource.Resource) error {
status := r.(*network.RouteStatus).TypedSpec()
@ -114,7 +116,7 @@ func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime
status.Source = srcAddr
status.Gateway = gatewayAddr
status.OutLinkIndex = route.Attributes.OutIface
status.OutLinkName = linkLookup[route.Attributes.OutIface]
status.OutLinkName = outLinkName
status.Priority = route.Attributes.Priority
status.Table = nethelpers.RoutingTable(route.Table)
status.Scope = nethelpers.Scope(route.Scope)

View File

@ -0,0 +1,74 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package watch
import (
"context"
"golang.org/x/time/rate"
)
// RateLimitedTrigger wraps a Trigger with rate limiting.
//
// Calls to QueueReconcile are handed to a background goroutine (see run) which
// waits on the limiter before forwarding each call to the wrapped trigger.
type RateLimitedTrigger struct {
// trigger is the wrapped Trigger which receives the rate-limited QueueReconcile calls.
trigger Trigger
// limiter paces how often the wrapped trigger is invoked.
limiter *rate.Limiter
// ch carries reconcile signals from QueueReconcile to the background goroutine.
ch chan struct{}
}
// Compile-time assertion that *RateLimitedTrigger satisfies Trigger.
var _ Trigger = (*RateLimitedTrigger)(nil)
// NewRateLimitedTrigger creates a new RateLimitedTrigger with specified params.
//
// rateLimit and burst configure the rate.Limiter which paces the calls
// forwarded to the wrapped trigger.
//
// The trigger's goroutine exits when the context is canceled.
func NewRateLimitedTrigger(ctx context.Context, trigger Trigger, rateLimit rate.Limit, burst int) *RateLimitedTrigger {
	t := &RateLimitedTrigger{
		trigger: trigger,
		limiter: rate.NewLimiter(rateLimit, burst),
		// A capacity of one keeps a single signal pending while the goroutine is
		// busy (waiting on the limiter or calling the wrapped trigger). With an
		// unbuffered channel a notification arriving right after the wrapped
		// trigger was invoked, but before the goroutine is receiving again,
		// would be lost and the latest change might never be reconciled.
		// Further signals still coalesce into the pending one.
		ch: make(chan struct{}, 1),
	}

	go t.run(ctx)

	return t
}
// NewDefaultRateLimitedTrigger creates a new RateLimitedTrigger with default params.
//
// The defaults are a rate of 10 events per second with a burst of 5 events.
func NewDefaultRateLimitedTrigger(ctx context.Context, trigger Trigger) *RateLimitedTrigger {
	const defaultRate, defaultBurst = 10, 5

	return NewRateLimitedTrigger(ctx, trigger, defaultRate, defaultBurst)
}
// QueueReconcile implements Trigger interface.
//
// The signal is handed over with a non-blocking send: when the channel cannot
// take it right now, the signal is dropped instead of waiting.
// This function returns immediately in both cases.
func (t *RateLimitedTrigger) QueueReconcile() {
	var signal struct{}

	select {
	case t.ch <- signal:
		// signal accepted
	default:
		// channel not ready: drop the signal
	}
}
// run is the background loop started by NewRateLimitedTrigger.
//
// For every signal received on the channel it waits on the limiter and then
// calls QueueReconcile on the wrapped trigger. The loop stops when the context
// is done or when waiting on the limiter returns an error.
func (t *RateLimitedTrigger) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.ch:
			if t.limiter.Wait(ctx) != nil {
				return
			}

			t.trigger.QueueReconcile()
		}
	}
}

View File

@ -0,0 +1,45 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package watch_test
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/network/watch"
)
// mockTrigger is a test double which counts how many times it was asked to reconcile.
type mockTrigger struct {
	count atomic.Int64
}

// QueueReconcile records one more reconcile request.
func (m *mockTrigger) QueueReconcile() {
	m.count.Add(1)
}

// Get returns the number of reconcile requests recorded so far.
func (m *mockTrigger) Get() int64 {
	total := m.count.Load()

	return total
}
// TestRateLimitedTrigger hammers the rate-limited trigger for one second and
// checks how many calls reached the wrapped mock.
func TestRateLimitedTrigger(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	mock := &mockTrigger{}
	trigger := watch.NewRateLimitedTrigger(ctx, mock, 10, 5)

	// Queue reconciles as fast as possible until one second has elapsed.
	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
		trigger.QueueReconcile()
	}

	// With a rate of 10/s and a burst of 5, expect 14 forwarded calls, give or take 5.
	assert.InDelta(t, int64(14), mock.Get(), 5)
}

View File

@ -36,17 +36,21 @@ func LinkID(linkName string) string {
}
// RouteID builds ID (primary key) for the route.
func RouteID(table nethelpers.RoutingTable, family nethelpers.Family, destination netip.Prefix, gateway netip.Addr, priority uint32) string {
func RouteID(table nethelpers.RoutingTable, family nethelpers.Family, destination netip.Prefix, gateway netip.Addr, priority uint32, outLinkName string) string {
dst, _ := destination.MarshalText() //nolint:errcheck
gw, _ := gateway.MarshalText() //nolint:errcheck
tablePrefix := ""
prefix := ""
if table != nethelpers.TableMain {
tablePrefix = fmt.Sprintf("%s/", table)
prefix = fmt.Sprintf("%s/", table)
}
return fmt.Sprintf("%s%s/%s/%s/%d", tablePrefix, family, string(gw), string(dst), priority)
if family == nethelpers.FamilyInet6 {
prefix += fmt.Sprintf("%s/", outLinkName)
}
return fmt.Sprintf("%s%s/%s/%s/%d", prefix, family, string(gw), string(dst), priority)
}
// OperatorID builds ID (primary key) for the operators.