fix: log explicitly when kubelet has no nodeIP match

Fixes #7487

When the `.kubelet.nodeIP` filters yield no match, Talos should not
start the kubelet: an empty address list results in an empty
`--node-ip=` kubelet argument, which makes the kubelet pick "the
first" address on its own.

Instead, skip updating (or creating) the NodeIP resource and log an
explicit warning.
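
For illustration only, a minimal, self-contained sketch of the guard described
above; `filterNodeIPs` and the hard-coded addresses are hypothetical stand-ins
for the controller's real filtering logic and COSI resources:

    package main

    import (
        "log"
        "net/netip"
    )

    // filterNodeIPs keeps only the addresses contained in one of the
    // configured valid subnets (a stand-in for the real filter step).
    func filterNodeIPs(addrs []netip.Addr, valid []netip.Prefix) []netip.Addr {
        var out []netip.Addr

        for _, addr := range addrs {
            for _, subnet := range valid {
                if subnet.Contains(addr) {
                    out = append(out, addr)

                    break
                }
            }
        }

        return out
    }

    func main() {
        addrs := []netip.Addr{netip.MustParseAddr("10.0.0.5")}
        valid := []netip.Prefix{netip.MustParsePrefix("192.168.0.0/16")} // matches nothing

        ips := filterNodeIPs(addrs, valid)
        if len(ips) == 0 {
            // Skip writing the node IP instead of emitting an empty list,
            // which would become an empty --node-ip= flag and let the
            // kubelet guess an address.
            log.Println("no suitable node IP found, check the .machine.kubelet.nodeIP filters")

            return
        }

        log.Printf("using node IPs: %s", ips)
    }

The actual change in `NodeIPController.Run` (see the diff below) also leaves a
previously written NodeIP resource untouched, which the new
`TestReconcileNoMatch` test verifies.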

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
Author: Andrey Smirnov
Date:   2023-07-20 00:41:47 +04:00
parent  6b39c6a4d3
commit  9ef4e5efca
18 changed files with 93 additions and 105 deletions

@@ -176,5 +176,7 @@ func (suite *K8sAddressFilterSuite) TearDownTest() {
}
func TestK8sAddressFilterSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(K8sAddressFilterSuite))
}

@@ -773,5 +773,7 @@ func (suite *ControlPlaneStaticPodSuite) TearDownTest() {
}
func TestControlPlaneStaticPodSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(ControlPlaneStaticPodSuite))
}

@@ -472,6 +472,8 @@ metadata:
}
func TestK8sControlPlaneSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &K8sControlPlaneSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 10 * time.Second,

@@ -157,5 +157,7 @@ func (suite *ExtraManifestSuite) TearDownTest() {
}
func TestExtraManifestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(ExtraManifestSuite))
}

@@ -261,5 +261,7 @@ func (suite *KubeletConfigSuite) TearDownTest() {
}
func TestKubeletConfigSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(KubeletConfigSuite))
}

@@ -335,10 +335,14 @@ func (suite *KubeletSpecSuite) TearDownTest() {
}
func TestKubeletSpecSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(KubeletSpecSuite))
}
func TestNewKubeletConfigurationFail(t *testing.T) {
t.Parallel()
for _, tt := range []struct {
name string
cfgSpec *k8s.KubeletConfigSpec
@@ -386,6 +390,8 @@ func TestNewKubeletConfigurationFail(t *testing.T) {
t.Run(
tt.name, func(t *testing.T) {
t.Parallel()
_, err := k8sctrl.NewKubeletConfiguration(tt.cfgSpec)
require.Error(t, err)
@@ -396,6 +402,8 @@ func TestNewKubeletConfigurationFail(t *testing.T) {
}
func TestNewKubeletConfigurationMerge(t *testing.T) {
t.Parallel()
defaultKubeletConfig := kubeletconfig.KubeletConfiguration{
TypeMeta: metav1.TypeMeta{
APIVersion: kubeletconfig.SchemeGroupVersion.String(),
@@ -530,6 +538,8 @@ func TestNewKubeletConfigurationMerge(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
expected := defaultKubeletConfig
tt.expectedOverrides(&expected)

@@ -135,6 +135,8 @@ func (suite *KubePrismConfigControllerSuite) TestGeneration() {
}
func TestEndpointsBalancerConfigControllerSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &KubePrismConfigControllerSuite{
DefaultSuite: ctest.DefaultSuite{
AfterSetup: func(suite *ctest.DefaultSuite) {

@@ -120,6 +120,8 @@ func must[T any](res T, err error) func(t *require.Assertions) T {
}
func TestEndpointsBalancerControllerSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &KubePrismControllerSuite{
DefaultSuite: ctest.DefaultSuite{
AfterSetup: func(suite *ctest.DefaultSuite) {

@@ -356,5 +356,7 @@ func (suite *ManifestSuite) TearDownTest() {
}
func TestManifestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(ManifestSuite))
}

@@ -29,6 +29,8 @@ type NodeLabelsSuite struct {
}
func TestNodeLabelsSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &NodeLabelsSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 5 * time.Second,

@@ -31,6 +31,8 @@ type NodeTaintsSuite struct {
}
func TestNodeTaintsSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &NodeTaintsSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 5 * time.Second,

@@ -109,6 +109,10 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log
return fmt.Errorf("error filtering IPs: %w", err)
}
if len(ips) == 0 {
logger.Warn("no suitable node IP found, please make sure .machine.kubelet.nodeIP filters and pod/service subnets are set up correctly")
}
// filter down to make sure only one IPv4 and one IPv6 address stays
var hasIPv4, hasIPv6 bool

@@ -219,5 +219,7 @@ func (suite *NodeIPConfigSuite) TearDownTest() {
}
func TestNodeIPConfigSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(NodeIPConfigSuite))
}

@@ -2,76 +2,34 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint:dupl
package k8s_test
import (
"context"
"fmt"
"log"
"net/netip"
"sync"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/siderolabs/go-retry/retry"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
k8sctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/k8s"
"github.com/siderolabs/talos/pkg/logging"
"github.com/siderolabs/talos/pkg/machinery/resources/k8s"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
)
type NodeIPSuite struct {
suite.Suite
state state.State
runtime *runtime.Runtime
wg sync.WaitGroup
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
}
func (suite *NodeIPSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)
suite.state = state.WrapCore(namespaced.NewState(inmem.Build))
var err error
suite.runtime, err = runtime.NewRuntime(suite.state, logging.Wrap(log.Writer()))
suite.Require().NoError(err)
suite.Require().NoError(suite.runtime.RegisterController(&k8sctrl.NodeIPController{}))
suite.startRuntime()
}
func (suite *NodeIPSuite) startRuntime() {
suite.wg.Add(1)
go func() {
defer suite.wg.Done()
suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
ctest.DefaultSuite
}
func (suite *NodeIPSuite) TestReconcileIPv4() {
cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID)
cfg.TypedSpec().ValidSubnets = []string{"10.0.0.0/24", "::/0"}
cfg.TypedSpec().ExcludeSubnets = []string{"10.0.0.2"}
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))
suite.Require().NoError(suite.State().Create(suite.Ctx(), cfg))
addresses := network.NewNodeAddress(
network.NamespaceName,
@@ -83,87 +41,73 @@ func (suite *NodeIPSuite) TestReconcileIPv4() {
netip.MustParsePrefix("10.0.0.5/24"),
}
suite.Require().NoError(suite.state.Create(suite.ctx, addresses))
suite.Require().NoError(suite.State().Create(suite.Ctx(), addresses))
suite.Assert().NoError(
retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
NodeIP, err := suite.state.Get(
suite.ctx,
resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPType, k8s.KubeletID, resource.VersionUndefined),
)
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}
return err
}
spec := NodeIP.(*k8s.NodeIP).TypedSpec()
suite.Assert().Equal("[10.0.0.5]", fmt.Sprintf("%s", spec.Addresses))
return nil
},
),
)
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{k8s.KubeletID}, func(nodeIP *k8s.NodeIP, asrt *assert.Assertions) {
asrt.Equal("[10.0.0.5]", fmt.Sprintf("%s", nodeIP.TypedSpec().Addresses))
})
}
func (suite *NodeIPSuite) TestReconcileDefaultSubnets() {
cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID)
cfg.TypedSpec().ValidSubnets = []string{"0.0.0.0/0", "::/0"}
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))
suite.Require().NoError(suite.State().Create(suite.Ctx(), cfg))
addresses := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
)
addresses.TypedSpec().Addresses = []netip.Prefix{
netip.MustParsePrefix("10.0.0.5/24"),
netip.MustParsePrefix("192.168.1.1/24"),
netip.MustParsePrefix("2001:0db8:85a3:0000:0000:8a2e:0370:7334/64"),
netip.MustParsePrefix("2001:0db8:85a3:0000:0000:8a2e:0370:7335/64"),
}
suite.Require().NoError(suite.State().Create(suite.Ctx(), addresses))
suite.Require().NoError(suite.state.Create(suite.ctx, addresses))
suite.Assert().NoError(
retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
NodeIP, err := suite.state.Get(
suite.ctx,
resource.NewMetadata(k8s.NamespaceName, k8s.NodeIPType, k8s.KubeletID, resource.VersionUndefined),
)
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}
return err
}
spec := NodeIP.(*k8s.NodeIP).TypedSpec()
suite.Assert().Equal("[10.0.0.5 2001:db8:85a3::8a2e:370:7334]", fmt.Sprintf("%s", spec.Addresses))
return nil
},
),
)
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{k8s.KubeletID}, func(nodeIP *k8s.NodeIP, asrt *assert.Assertions) {
asrt.Equal("[10.0.0.5 2001:db8:85a3::8a2e:370:7334]", fmt.Sprintf("%s", nodeIP.TypedSpec().Addresses))
})
}
func (suite *NodeIPSuite) TearDownTest() {
suite.T().Log("tear down")
func (suite *NodeIPSuite) TestReconcileNoMatch() {
cfg := k8s.NewNodeIPConfig(k8s.NamespaceName, k8s.KubeletID)
cfg.TypedSpec().ValidSubnets = []string{"0.0.0.0/0"}
suite.Require().NoError(suite.State().Create(suite.Ctx(), cfg))
suite.ctxCancel()
addresses := network.NewNodeAddress(
network.NamespaceName,
network.FilteredNodeAddressID(network.NodeAddressRoutedID, k8s.NodeAddressFilterNoK8s),
)
addresses.TypedSpec().Addresses = []netip.Prefix{
netip.MustParsePrefix("10.0.0.2/32"),
netip.MustParsePrefix("10.0.0.5/24"),
}
suite.Require().NoError(suite.State().Create(suite.Ctx(), addresses))
suite.wg.Wait()
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{k8s.KubeletID}, func(nodeIP *k8s.NodeIP, asrt *assert.Assertions) {
asrt.Equal("[10.0.0.2]", fmt.Sprintf("%s", nodeIP.TypedSpec().Addresses))
})
cfg.TypedSpec().ValidSubnets = nil
cfg.TypedSpec().ExcludeSubnets = []string{"10.0.0.2"}
suite.Require().NoError(suite.State().Update(suite.Ctx(), cfg))
// the node IP doesn't change, as there's no match for the filter
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{k8s.KubeletID}, func(nodeIP *k8s.NodeIP, asrt *assert.Assertions) {
asrt.Equal("[10.0.0.2]", fmt.Sprintf("%s", nodeIP.TypedSpec().Addresses))
})
}
func TestNodeIPSuite(t *testing.T) {
suite.Run(t, new(NodeIPSuite))
t.Parallel()
suite.Run(t, &NodeIPSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 5 * time.Second,
AfterSetup: func(s *ctest.DefaultSuite) {
s.Require().NoError(s.Runtime().RegisterController(&k8sctrl.NodeIPController{}))
},
},
})
}

@@ -185,5 +185,7 @@ func (suite *NodenameSuite) TearDownTest() {
}
func TestNodenameSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(NodenameSuite))
}

@@ -55,6 +55,8 @@ func (suite *StaticEndpointControllerSuite) TestReconcile() {
}
func TestStaticEndpointControllerSuite(t *testing.T) {
t.Parallel()
suite.Run(t, &StaticEndpointControllerSuite{
DefaultSuite: ctest.DefaultSuite{
AfterSetup: func(suite *ctest.DefaultSuite) {

@@ -200,5 +200,7 @@ func (suite *StaticPodConfigSuite) TearDownTest() {
}
func TestStaticPodConfigSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(StaticPodConfigSuite))
}

@@ -186,5 +186,7 @@ func (suite *StaticPodListSuite) TearDownTest() {
}
func TestStaticPodListSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(StaticPodListSuite))
}