chore: modernize machined/pkg/controllers/k8s

This is going to be multipart effort to finally use safe.* wrappers in the production code.
Quick regexp search shows that there are around 150 direct type assertions on resources (excluding the ones in this commit).

Also - migrate from `interface{}` to `any` and use `slices.Sort*` instead of `sort.*` where possible.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
This commit is contained in:
Dmitriy Matrenichev 2023-12-04 17:47:41 +03:00
parent 760f793d55
commit 59b62398f6
No known key found for this signature in database
GPG Key ID: D3363CF894E68892
22 changed files with 122 additions and 120 deletions

View File

@ -11,7 +11,6 @@ import (
"slices"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
@ -97,20 +96,16 @@ func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runti
serviceCIDRs = append(serviceCIDRs, ipPrefix)
}
if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterNoK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()
spec.ExcludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
if err = safe.WriterModify(ctx, r, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterNoK8s), func(r *network.NodeAddressFilter) error {
r.TypedSpec().ExcludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
return nil
}); err != nil {
return fmt.Errorf("error updating output resource: %w", err)
}
if err = r.Modify(ctx, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterOnlyK8s), func(r resource.Resource) error {
spec := r.(*network.NodeAddressFilter).TypedSpec()
spec.IncludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
if err = safe.WriterModify(ctx, r, network.NewNodeAddressFilter(network.NamespaceName, k8s.NodeAddressFilterOnlyK8s), func(r *network.NodeAddressFilter) error {
r.TypedSpec().IncludeSubnets = append(slices.Clone(podCIDRs), serviceCIDRs...)
return nil
}); err != nil {

View File

@ -14,6 +14,7 @@ import (
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/gen/optional"
@ -107,7 +108,7 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
}
// wait for etcd to be healthy as kube-apiserver is using local etcd instance
etcdResource, err := r.Get(ctx, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, "etcd", resource.VersionUndefined))
etcdResource, err := safe.ReaderGetByID[*v1alpha1.Service](ctx, r, "etcd")
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -120,11 +121,11 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}
if !etcdResource.(*v1alpha1.Service).TypedSpec().Healthy {
if !etcdResource.TypedSpec().Healthy {
continue
}
secretsStatusResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.SecretsStatusType, k8s.StaticPodSecretsStaticPodID, resource.VersionUndefined))
secretsStatusResource, err := safe.ReaderGetByID[*k8s.SecretsStatus](ctx, r, k8s.StaticPodSecretsStaticPodID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -137,9 +138,9 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}
secretsVersion := secretsStatusResource.(*k8s.SecretsStatus).TypedSpec().Version
secretsVersion := secretsStatusResource.TypedSpec().Version
configStatusResource, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.ConfigStatusType, k8s.ConfigStatusStaticPodID, resource.VersionUndefined))
configStatusResource, err := safe.ReaderGetByID[*k8s.ConfigStatus](ctx, r, k8s.ConfigStatusStaticPodID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -152,7 +153,7 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
return err
}
configVersion := configStatusResource.(*k8s.ConfigStatus).TypedSpec().Version
configVersion := configStatusResource.TypedSpec().Version
touchedIDs := map[string]struct{}{}
@ -339,7 +340,7 @@ func goGCEnvFromResources(resources v1.ResourceRequirements) (envVar v1.EnvVar)
return envVar
}
func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context, r controller.Runtime, logger *zap.Logger,
func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context, r controller.Runtime, _ *zap.Logger,
configResource resource.Resource, secretsVersion, configVersion string,
) (string, error) {
cfg := configResource.(*k8s.APIServerConfig).TypedSpec()
@ -445,8 +446,8 @@ func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context
env = append(env, goGCEnv)
}
return k8s.APIServerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.APIServerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.APIServerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.APIServerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
@ -555,7 +556,7 @@ func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context
}
func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context.Context, r controller.Runtime,
logger *zap.Logger, configResource resource.Resource, secretsVersion, configVersion string,
_ *zap.Logger, configResource resource.Resource, secretsVersion, _ string,
) (string, error) {
cfg := configResource.(*k8s.ControllerManagerConfig).TypedSpec()
@ -619,8 +620,8 @@ func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context
env = append(env, goGCEnv)
}
return k8s.ControllerManagerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.ControllerManagerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.ControllerManagerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.ControllerManagerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",
@ -712,7 +713,7 @@ func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context
}
func (ctrl *ControlPlaneStaticPodController) manageScheduler(ctx context.Context, r controller.Runtime,
logger *zap.Logger, configResource resource.Resource, secretsVersion, configVersion string,
_ *zap.Logger, configResource resource.Resource, secretsVersion, _ string,
) (string, error) {
cfg := configResource.(*k8s.SchedulerConfig).TypedSpec()
@ -758,8 +759,8 @@ func (ctrl *ControlPlaneStaticPodController) manageScheduler(ctx context.Context
env = append(env, goGCEnv)
}
return k8s.SchedulerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, k8s.SchedulerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
return k8s.SchedulerID, safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, k8s.SchedulerID), func(r *k8s.StaticPod) error {
return k8sadapter.StaticPod(r).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Pod",

View File

@ -319,13 +319,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
APIServerConfig: &v1alpha1.APIServerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "100m",
"memory": "1Gi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 2,
"memory": "1500Mi",
},
@ -335,13 +335,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
ControllerManagerConfig: &v1alpha1.ControllerManagerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "50m",
"memory": "500Mi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 1,
"memory": "1000Mi",
},
@ -351,13 +351,13 @@ func (suite *K8sControlPlaneSuite) TestReconcileResources() {
SchedulerConfig: &v1alpha1.SchedulerConfig{
ResourcesConfig: &v1alpha1.ResourcesConfig{
Requests: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": "150m",
"memory": "2Gi",
},
},
Limits: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"cpu": 3,
"memory": "2000Mi",
},

View File

@ -9,11 +9,11 @@ import (
"fmt"
"net/netip"
"reflect"
"sort"
"slices"
"time"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
@ -80,7 +80,7 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
case <-r.EventCh():
}
machineTypeRes, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineTypeType, config.MachineTypeID, resource.VersionUndefined))
machineTypeRes, err := safe.ReaderGetByID[*config.MachineType](ctx, r, config.MachineTypeID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -89,7 +89,7 @@ func (ctrl *EndpointController) Run(ctx context.Context, r controller.Runtime, l
return fmt.Errorf("error getting machine type: %w", err)
}
machineType := machineTypeRes.(*config.MachineType).MachineType()
machineType := machineTypeRes.MachineType()
switch machineType { //nolint:exhaustive
case machine.TypeWorker:
@ -170,7 +170,7 @@ func (ctrl *EndpointController) watchEndpointsOnControlPlane(ctx context.Context
return nil
}
secretsResources, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesType, secrets.KubernetesID, resource.VersionUndefined))
secretsResources, err := safe.ReaderGetByID[*secrets.Kubernetes](ctx, r, secrets.KubernetesID)
if err != nil {
if state.IsNotFoundError(err) {
return nil
@ -179,7 +179,7 @@ func (ctrl *EndpointController) watchEndpointsOnControlPlane(ctx context.Context
return err
}
secrets := secretsResources.(*secrets.Kubernetes).TypedSpec()
secrets := secretsResources.TypedSpec()
kubeconfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
// using here kubeconfig with cluster control plane endpoint, as endpoint discovery should work before local API server is ready
@ -207,16 +207,17 @@ func (ctrl *EndpointController) updateEndpointsResource(ctx context.Context, r c
}
}
sort.Slice(addrs, func(i, j int) bool { return addrs[i].Compare(addrs[j]) < 0 })
slices.SortFunc(addrs, func(a, b netip.Addr) int { return a.Compare(b) })
if err := r.Modify(ctx,
if err := safe.WriterModify(ctx,
r,
k8s.NewEndpoint(k8s.ControlPlaneNamespaceName, k8s.ControlPlaneAPIServerEndpointsID),
func(r resource.Resource) error {
if !reflect.DeepEqual(r.(*k8s.Endpoint).TypedSpec().Addresses, addrs) {
func(r *k8s.Endpoint) error {
if !reflect.DeepEqual(r.TypedSpec().Addresses, addrs) {
logger.Debug("updated controlplane endpoints", zap.Any("endpoints", addrs))
}
r.(*k8s.Endpoint).TypedSpec().Addresses = addrs
r.TypedSpec().Addresses = addrs
return nil
},
@ -287,9 +288,9 @@ func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *
}
if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { notifyCh <- obj.(*corev1.Endpoints) },
DeleteFunc: func(_ interface{}) { notifyCh <- &corev1.Endpoints{} },
UpdateFunc: func(_, obj interface{}) { notifyCh <- obj.(*corev1.Endpoints) },
AddFunc: func(obj any) { notifyCh <- obj.(*corev1.Endpoints) },
DeleteFunc: func(_ any) { notifyCh <- &corev1.Endpoints{} },
UpdateFunc: func(_, obj any) { notifyCh <- obj.(*corev1.Endpoints) },
}); err != nil {
return nil, nil, fmt.Errorf("error adding watch event handler: %w", err)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-getter/v2"
@ -73,7 +74,7 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
}
// wait for network to be ready as networking is required to download extra manifests
networkResource, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
networkResource, err := safe.ReaderGetByID[*network.Status](ctx, r, network.StatusID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -82,13 +83,13 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
return err
}
networkStatus := networkResource.(*network.Status).TypedSpec()
networkStatus := networkResource.TypedSpec()
if !(networkStatus.AddressReady && networkStatus.ConnectivityReady) {
continue
}
configResource, err := r.Get(ctx, k8s.NewExtraManifestsConfig().Metadata())
configResource, err := safe.ReaderGetByID[*k8s.ExtraManifestsConfig](ctx, r, k8s.ExtraManifestsConfigID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -101,7 +102,7 @@ func (ctrl *ExtraManifestController) Run(ctx context.Context, r controller.Runti
return err
}
config := *configResource.(*k8s.ExtraManifestsConfig).TypedSpec()
config := *configResource.TypedSpec()
var multiErr *multierror.Error
@ -211,9 +212,9 @@ func (ctrl *ExtraManifestController) processURL(ctx context.Context, r controlle
return
}
if err = r.Modify(ctx, k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r resource.Resource) error {
return k8sadapter.Manifest(r.(*k8s.Manifest)).SetYAML(contents)
if err = safe.WriterModify(ctx, r, k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r *k8s.Manifest) error {
return k8sadapter.Manifest(r).SetYAML(contents)
}); err != nil {
err = fmt.Errorf("error updating manifests: %w", err)
@ -224,11 +225,12 @@ func (ctrl *ExtraManifestController) processURL(ctx context.Context, r controlle
}
func (ctrl *ExtraManifestController) processInline(ctx context.Context, r controller.Runtime, manifest k8s.ExtraManifest, id resource.ID) error {
err := r.Modify(
err := safe.WriterModify(
ctx,
r,
k8s.NewManifest(k8s.ControlPlaneNamespaceName, id),
func(r resource.Resource) error {
return k8sadapter.Manifest(r.(*k8s.Manifest)).SetYAML([]byte(manifest.InlineManifest))
func(r *k8s.Manifest) error {
return k8sadapter.Manifest(r).SetYAML([]byte(manifest.InlineManifest))
},
)
if err != nil {

View File

@ -60,7 +60,7 @@ func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan str
notifyCh := make(chan struct{}, 1)
notify := func(_ interface{}) {
notify := func(_ any) {
select {
case notifyCh <- struct{}{}:
default:
@ -78,7 +78,7 @@ func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan str
if _, err := r.nodes.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: notify,
DeleteFunc: notify,
UpdateFunc: func(_, _ interface{}) { notify(nil) },
UpdateFunc: func(_, _ any) { notify(nil) },
}); err != nil {
return nil, nil, fmt.Errorf("failed to add event handler: %w", err)
}

View File

@ -102,7 +102,7 @@ func (suite *KubeletConfigSuite) TestReconcile() {
},
},
KubeletExtraConfig: v1alpha1.Unstructured{
Object: map[string]interface{}{
Object: map[string]any{
"serverTLSBootstrap": true,
},
},
@ -170,7 +170,7 @@ func (suite *KubeletConfigSuite) TestReconcile() {
spec.ExtraMounts,
)
suite.Assert().Equal(
map[string]interface{}{
map[string]any{
"serverTLSBootstrap": true,
},
spec.ExtraConfig,

View File

@ -213,7 +213,7 @@ func (ctrl *KubeletSpecController) Run(ctx context.Context, r controller.Runtime
}
}
func prepareExtraConfig(extraConfig map[string]interface{}) (*kubeletconfig.KubeletConfiguration, error) {
func prepareExtraConfig(extraConfig map[string]any) (*kubeletconfig.KubeletConfiguration, error) {
// check for fields that can't be overridden via extraConfig
var multiErr *multierror.Error

View File

@ -78,7 +78,7 @@ func (suite *KubeletSpecSuite) TestReconcileDefault() {
)
asrt.Equal(cfg.TypedSpec().ExtraMounts, spec.ExtraMounts)
asrt.Equal([]interface{}{"10.96.0.10"}, spec.Config["clusterDNS"])
asrt.Equal([]any{"10.96.0.10"}, spec.Config["clusterDNS"])
asrt.Equal("cluster.local", spec.Config["clusterDomain"])
})
}
@ -164,7 +164,7 @@ func (suite *KubeletSpecSuite) TestReconcileWithExtraConfig() {
cfg.TypedSpec().Image = "kubelet:v2.0.0"
cfg.TypedSpec().ClusterDNS = []string{"10.96.0.11"}
cfg.TypedSpec().ClusterDomain = "some.local"
cfg.TypedSpec().ExtraConfig = map[string]interface{}{
cfg.TypedSpec().ExtraConfig = map[string]any{
"serverTLSBootstrap": true,
}
@ -270,7 +270,7 @@ func TestNewKubeletConfigurationFail(t *testing.T) {
cfgSpec: &k8s.KubeletConfigSpec{
ClusterDNS: []string{"10.96.0.10"},
ClusterDomain: "cluster.svc",
ExtraConfig: map[string]interface{}{
ExtraConfig: map[string]any{
"API": "v1",
"foo": "bar",
"Port": "xyz",
@ -283,7 +283,7 @@ func TestNewKubeletConfigurationFail(t *testing.T) {
cfgSpec: &k8s.KubeletConfigSpec{
ClusterDNS: []string{"10.96.0.10"},
ClusterDomain: "cluster.svc",
ExtraConfig: map[string]interface{}{
ExtraConfig: map[string]any{
"oomScoreAdj": "v1",
},
},
@ -294,7 +294,7 @@ func TestNewKubeletConfigurationFail(t *testing.T) {
cfgSpec: &k8s.KubeletConfigSpec{
ClusterDNS: []string{"10.96.0.10"},
ClusterDomain: "cluster.svc",
ExtraConfig: map[string]interface{}{
ExtraConfig: map[string]any{
"oomScoreAdj": -300,
"port": 81,
"authentication": nil,
@ -380,7 +380,7 @@ func TestNewKubeletConfigurationMerge(t *testing.T) {
cfgSpec: &k8s.KubeletConfigSpec{
ClusterDNS: []string{"10.0.0.5"},
ClusterDomain: "cluster.local",
ExtraConfig: map[string]interface{}{
ExtraConfig: map[string]any{
"oomScoreAdj": -300,
"enableDebuggingHandlers": true,
},
@ -396,7 +396,7 @@ func TestNewKubeletConfigurationMerge(t *testing.T) {
cfgSpec: &k8s.KubeletConfigSpec{
ClusterDNS: []string{"10.0.0.5"},
ClusterDomain: "cluster.local",
ExtraConfig: map[string]interface{}{
ExtraConfig: map[string]any{
"shutdownGracePeriod": "0s",
"shutdownGracePeriodCriticalPods": "0s",
},

View File

@ -210,8 +210,8 @@ func (ctrl *KubeletStaticPodController) refreshPodStatus(ctx context.Context, r
podsSeen[statusID] = struct{}{}
if err = r.Modify(ctx, k8s.NewStaticPodStatus(k8s.NamespaceName, statusID), func(r resource.Resource) error {
return k8sadapter.StaticPodStatus(r.(*k8s.StaticPodStatus)).SetStatus(&pod.Status)
if err = safe.WriterModify(ctx, r, k8s.NewStaticPodStatus(k8s.NamespaceName, statusID), func(r *k8s.StaticPodStatus) error {
return k8sadapter.StaticPodStatus(r).SetStatus(&pod.Status)
}); err != nil {
return fmt.Errorf("error updating pod status: %w", err)
}

View File

@ -14,6 +14,7 @@ import (
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
@ -70,7 +71,7 @@ func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, l
case <-r.EventCh():
}
configResource, err := r.Get(ctx, k8s.NewBootstrapManifestsConfig().Metadata())
configResource, err := safe.ReaderGetByID[*k8s.BootstrapManifestsConfig](ctx, r, k8s.BootstrapManifestsConfigID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -83,9 +84,9 @@ func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, l
return err
}
config := *configResource.(*k8s.BootstrapManifestsConfig).TypedSpec()
config := *configResource.TypedSpec()
secretsResources, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesRootType, secrets.KubernetesRootID, resource.VersionUndefined))
secretsResources, err := safe.ReaderGetByID[*secrets.KubernetesRoot](ctx, r, secrets.KubernetesRootID)
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.teardownAll(ctx, r); err != nil {
@ -98,7 +99,7 @@ func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, l
return err
}
secrets := secretsResources.(*secrets.KubernetesRoot).TypedSpec()
secrets := secretsResources.TypedSpec()
renderedManifests, err := ctrl.render(config, secrets)
if err != nil {
@ -108,9 +109,9 @@ func (ctrl *ManifestController) Run(ctx context.Context, r controller.Runtime, l
for _, renderedManifest := range renderedManifests {
renderedManifest := renderedManifest
if err = r.Modify(ctx, k8s.NewManifest(k8s.ControlPlaneNamespaceName, renderedManifest.name),
func(r resource.Resource) error {
return k8sadapter.Manifest(r.(*k8s.Manifest)).SetYAML(renderedManifest.data)
if err = safe.WriterModify(ctx, r, k8s.NewManifest(k8s.ControlPlaneNamespaceName, renderedManifest.name),
func(r *k8s.Manifest) error {
return k8sadapter.Manifest(r).SetYAML(renderedManifest.data)
}); err != nil {
return fmt.Errorf("error updating manifests: %w", err)
}

View File

@ -5,12 +5,15 @@
package k8s
import (
"cmp"
"context"
"fmt"
"slices"
"sort"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/hashicorp/go-multierror"
"github.com/siderolabs/gen/optional"
@ -92,7 +95,7 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
case <-r.EventCh():
}
secretsResources, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesType, secrets.KubernetesID, resource.VersionUndefined))
secretsResources, err := safe.ReaderGetByID[*secrets.Kubernetes](ctx, r, secrets.KubernetesID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -101,10 +104,10 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
return err
}
secrets := secretsResources.(*secrets.Kubernetes).TypedSpec()
secrets := secretsResources.TypedSpec()
// wait for etcd to be healthy as controller relies on etcd for locking
etcdResource, err := r.Get(ctx, resource.NewMetadata(v1alpha1.NamespaceName, v1alpha1.ServiceType, "etcd", resource.VersionUndefined))
etcdResource, err := safe.ReaderGetByID[*v1alpha1.Service](ctx, r, "etcd")
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -113,7 +116,7 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
return err
}
if !etcdResource.(*v1alpha1.Service).TypedSpec().Healthy {
if !etcdResource.TypedSpec().Healthy {
continue
}
@ -122,8 +125,8 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
return fmt.Errorf("error listing manifests: %w", err)
}
sort.Slice(manifests.Items, func(i, j int) bool {
return manifests.Items[i].Metadata().ID() < manifests.Items[j].Metadata().ID()
slices.SortFunc(manifests.Items, func(a, b resource.Resource) int {
return cmp.Compare(a.Metadata().ID(), b.Metadata().ID())
})
if len(manifests.Items) > 0 {
@ -163,8 +166,8 @@ func (ctrl *ManifestApplyController) Run(ctx context.Context, r controller.Runti
}
}
if err = r.Modify(ctx, k8s.NewManifestStatus(k8s.ControlPlaneNamespaceName), func(r resource.Resource) error {
status := r.(*k8s.ManifestStatus).TypedSpec()
if err = safe.WriterModify(ctx, r, k8s.NewManifestStatus(k8s.ControlPlaneNamespaceName), func(r *k8s.ManifestStatus) error {
status := r.TypedSpec()
status.ManifestsApplied = xslices.Map(manifests.Items, func(m resource.Resource) string {
return m.Metadata().ID()

View File

@ -225,8 +225,8 @@ func (suite *ManifestSuite) TestReconcileKubeProxyExtraArgs() {
suite.Assert().Equal("DaemonSet", k8sadapter.Manifest(manifest).Objects()[0].GetKind())
ds := k8sadapter.Manifest(manifest).Objects()[0].Object
containerSpec := ds["spec"].(map[string]interface{})["template"].(map[string]interface{})["spec"].(map[string]interface{})["containers"].([]interface{})[0]
args := containerSpec.(map[string]interface{})["command"].([]interface{}) //nolint:errcheck,forcetypeassert
containerSpec := ds["spec"].(map[string]any)["template"].(map[string]any)["spec"].(map[string]any)["containers"].([]any)[0]
args := containerSpec.(map[string]any)["command"].([]any) //nolint:errcheck,forcetypeassert
suite.Assert().Equal("--bind-address=\"::\"", args[len(args)-1])
}

View File

@ -8,7 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"slices"
"time"
"github.com/cosi-project/runtime/pkg/controller"
@ -258,7 +258,7 @@ func umarshalOwnedAnnotation(node *v1.Node, annotation string) (map[string]struc
func marshalOwnedAnnotation(node *v1.Node, annotation string, ownedMap map[string]struct{}) error {
owned := maps.Keys(ownedMap)
sort.Strings(owned)
slices.Sort(owned)
if len(owned) > 0 {
ownedJSON, err := json.Marshal(owned)

View File

@ -5,7 +5,7 @@
package k8s_test
import (
"sort"
"slices"
"testing"
"github.com/siderolabs/gen/maps"
@ -132,7 +132,7 @@ func TestApplyLabels(t *testing.T) {
newOwnedLabels = []string{}
}
sort.Strings(newOwnedLabels)
slices.Sort(newOwnedLabels)
assert.Equal(t, tt.expectedLabels, node.Labels)
assert.Equal(t, tt.expectedOwnedLabels, newOwnedLabels)
@ -317,7 +317,7 @@ func TestApplyTaints(t *testing.T) {
newOwnedTaints = []string{}
}
sort.Strings(newOwnedTaints)
slices.Sort(newOwnedTaints)
assert.Equal(t, tt.expectedTaints, node.Spec.Taints)
assert.Equal(t, tt.expectedOwnedTaints, newOwnedTaints)

View File

@ -139,11 +139,12 @@ func (ctrl *NodeIPController) Run(ctx context.Context, r controller.Runtime, log
}
}
if err = r.Modify(
if err = safe.WriterModify(
ctx,
r,
k8s.NewNodeIP(k8s.NamespaceName, k8s.KubeletID),
func(r resource.Resource) error {
spec := r.(*k8s.NodeIP).TypedSpec()
func(r *k8s.NodeIP) error {
spec := r.TypedSpec()
spec.Addresses = nodeIPs

View File

@ -13,7 +13,7 @@ import (
"path/filepath"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-kubernetes/kubernetes/compatibility"
"go.uber.org/zap"
@ -77,7 +77,7 @@ func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r control
case <-r.EventCh():
}
admissionRes, err := r.Get(ctx, k8s.NewAdmissionControlConfig().Metadata())
admissionRes, err := safe.ReaderGetByID[*k8s.AdmissionControlConfig](ctx, r, k8s.AdmissionControlConfigID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -86,9 +86,9 @@ func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r control
return fmt.Errorf("error getting admission config resource: %w", err)
}
admissionConfig := admissionRes.(*k8s.AdmissionControlConfig).TypedSpec()
admissionConfig := admissionRes.TypedSpec()
auditRes, err := r.Get(ctx, k8s.NewAuditPolicyConfig().Metadata())
auditRes, err := safe.ReaderGetByID[*k8s.AuditPolicyConfig](ctx, r, k8s.AuditPolicyConfigID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -97,9 +97,9 @@ func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r control
return fmt.Errorf("error getting audit config resource: %w", err)
}
auditConfig := auditRes.(*k8s.AuditPolicyConfig).TypedSpec()
auditConfig := auditRes.TypedSpec()
kubeSchedulerRes, err := r.Get(ctx, k8s.NewSchedulerConfig().Metadata())
kubeSchedulerRes, err := safe.ReaderGetByID[*k8s.SchedulerConfig](ctx, r, k8s.SchedulerConfigID)
if err != nil {
if state.IsNotFoundError(err) {
continue
@ -108,7 +108,7 @@ func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r control
return fmt.Errorf("error getting scheduler config resource: %w", err)
}
kubeSchedulerConfig := kubeSchedulerRes.(*k8s.SchedulerConfig).TypedSpec()
kubeSchedulerConfig := kubeSchedulerRes.TypedSpec()
kubeSchedulerVersion := compatibility.VersionFromImageRef(kubeSchedulerConfig.Image)
@ -190,9 +190,9 @@ func (ctrl *RenderConfigsStaticPodController) Run(ctx context.Context, r control
}
}
if err = r.Modify(ctx, k8s.NewConfigStatus(k8s.ControlPlaneNamespaceName, k8s.ConfigStatusStaticPodID), func(r resource.Resource) error {
r.(*k8s.ConfigStatus).TypedSpec().Ready = true
r.(*k8s.ConfigStatus).TypedSpec().Version = admissionRes.Metadata().Version().String() + auditRes.Metadata().Version().String() + kubeSchedulerRes.Metadata().Version().String()
if err = safe.WriterModify(ctx, r, k8s.NewConfigStatus(k8s.ControlPlaneNamespaceName, k8s.ConfigStatusStaticPodID), func(r *k8s.ConfigStatus) error {
r.TypedSpec().Ready = true
r.TypedSpec().Version = admissionRes.Metadata().Version().String() + auditRes.Metadata().Version().String() + kubeSchedulerRes.Metadata().Version().String()
return nil
}); err != nil {

View File

@ -327,9 +327,9 @@ func (ctrl *RenderSecretsStaticPodController) Run(ctx context.Context, r control
}
}
if err = r.Modify(ctx, k8s.NewSecretsStatus(k8s.ControlPlaneNamespaceName, k8s.StaticPodSecretsStaticPodID), func(r resource.Resource) error {
r.(*k8s.SecretsStatus).TypedSpec().Ready = true
r.(*k8s.SecretsStatus).TypedSpec().Version = secretsRes.Metadata().Version().String()
if err = safe.WriterModify(ctx, r, k8s.NewSecretsStatus(k8s.ControlPlaneNamespaceName, k8s.StaticPodSecretsStaticPodID), func(r *k8s.SecretsStatus) error {
r.TypedSpec().Ready = true
r.TypedSpec().Version = secretsRes.Metadata().Version().String()
return nil
}); err != nil {

View File

@ -9,7 +9,6 @@ import (
"fmt"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
@ -100,8 +99,8 @@ func (ctrl *StaticPodConfigController) Run(ctx context.Context, r controller.Run
id := fmt.Sprintf("%s-%s", namespace, name)
if err = r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, id), func(r resource.Resource) error {
r.(*k8s.StaticPod).TypedSpec().Pod = pod
if err = safe.WriterModify(ctx, r, k8s.NewStaticPod(k8s.NamespaceName, id), func(r *k8s.StaticPod) error {
r.TypedSpec().Pod = pod
return nil
}); err != nil {

View File

@ -107,15 +107,15 @@ func (suite *StaticPodConfigSuite) TestReconcile() {
MachineConfig: &v1alpha1.MachineConfig{
MachinePods: []v1alpha1.Unstructured{
{
Object: map[string]interface{}{
Object: map[string]any{
"apiVersion": "v1",
"kind": "pod",
"metadata": map[string]interface{}{
"metadata": map[string]any{
"name": "nginx",
},
"spec": map[string]interface{}{
"containers": []interface{}{
map[string]interface{}{
"spec": map[string]any{
"containers": []any{
map[string]any{
"name": "nginx",
"image": "nginx",
},
@ -148,7 +148,7 @@ func (suite *StaticPodConfigSuite) TestReconcile() {
)
// update the pod changing the namespace
cfg.Container().RawV1Alpha1().MachineConfig.MachinePods[0].Object["metadata"].(map[string]interface{})["namespace"] = "custom"
cfg.Container().RawV1Alpha1().MachineConfig.MachinePods[0].Object["metadata"].(map[string]any)["namespace"] = "custom"
suite.Require().NoError(suite.state.Update(suite.ctx, cfg))
suite.Assert().NoError(

View File

@ -12,7 +12,6 @@ import (
"sync"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
@ -54,7 +53,7 @@ func (ctrl *StaticPodServerController) Outputs() []controller.Output {
}
}
type pod map[string]interface{}
type pod map[string]any
type podList struct {
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
@ -191,10 +190,10 @@ func (ctrl *StaticPodServerController) createServer(ctx context.Context, r contr
shutdownServer()
}()
if err := r.Modify(ctx, k8s.NewStaticPodServerStatus(k8s.NamespaceName, k8s.StaticPodServerStatusResourceID), func(r resource.Resource) error {
if err := safe.WriterModify(ctx, r, k8s.NewStaticPodServerStatus(k8s.NamespaceName, k8s.StaticPodServerStatusResourceID), func(r *k8s.StaticPodServerStatus) error {
url := fmt.Sprintf("http://%s", listener.Addr().String())
r.(*k8s.StaticPodServerStatus).TypedSpec().URL = url
r.TypedSpec().URL = url
return nil
}); err != nil {

View File

@ -110,7 +110,7 @@ func (suite *StaticPodListSuite) getResource(
func newTestPod(name string) *k8s.StaticPod {
testPod := k8s.NewStaticPod(k8s.NamespaceName, name)
testPod.TypedSpec().Pod = map[string]interface{}{
testPod.TypedSpec().Pod = map[string]any{
"metadata": name,
"spec": "testSpec",
}