feat: support control plane upgrades with Talos managed control plane

The upgrade is performed by updating the node configuration (node by node, service
by service), watching the internal resource state to pick up the new configuration
version, and verifying that a pod with the matching version is successfully
propagated to the API server state and becomes ready.

The process is similar to a rolling update of a DaemonSet.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
Andrey Smirnov 2021-02-20 20:00:22 +03:00 committed by talos-bot
parent 8789849c70
commit e2f1fbcfdb
4 changed files with 251 additions and 6 deletions
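To see the flow described above from the caller's side, here is a hedged sketch of invoking the new upgrade path. The clusterProvider value is an assumption (any implementation of UpgradeProvider that can build both a Kubernetes client and a Talos API client), and the version numbers are examples only:

    // Hypothetical caller; FromVersion/ToVersion gate which upgrade paths are allowed.
    err := kubernetes.UpgradeTalosManaged(ctx, clusterProvider, kubernetes.UpgradeOptions{
        FromVersion: "1.19.4",
        ToVersion:   "1.20.1",
    })
    if err != nil {
        log.Fatalf("control plane upgrade failed: %v", err)
    }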

View File

@@ -74,7 +74,7 @@ const (
previousK8sVersion = "1.19.4"
stableK8sVersion = "1.20.1"
-currentK8sVersion = "1.20.4" //nolint: deadcode,varcheck
+currentK8sVersion = "1.20.4"
)
var (
@@ -128,7 +128,7 @@ func upgradeStableReleaseToCurrent() upgradeSpec {
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
-TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion,
+TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,
MasterNodes: DefaultSettings.MasterNodes,
@@ -150,7 +150,7 @@ func upgradeSingeNodePreserve() upgradeSpec {
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
-TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
+TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,
MasterNodes: 1,
@@ -173,7 +173,7 @@ func upgradeSingeNodeStage() upgradeSpec {
TargetInstallerImage: fmt.Sprintf("%s/%s:%s", DefaultSettings.TargetInstallImageRegistry, images.DefaultInstallerImageName, DefaultSettings.CurrentVersion),
TargetVersion: DefaultSettings.CurrentVersion,
-TargetK8sVersion: stableK8sVersion, // TODO: skip k8s upgrade: currentK8sVersion
+TargetK8sVersion: currentK8sVersion,
TargetSelfHosted: false,
MasterNodes: 1,

View File

@@ -32,6 +32,7 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/generate"
machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/k8s"
@@ -63,7 +64,7 @@ func ConvertToStaticPods(ctx context.Context, cluster ConvertProvider, options C
return fmt.Errorf("error building kubernetes client: %w", err)
}
-options.masterNodes, err = k8sClient.MasterIPs(ctx)
+options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane)
if err != nil {
return fmt.Errorf("error fetching master nodes: %w", err)
}
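Note the discovery change above: control plane nodes are now found via NodeIPs with an explicit machine type rather than the older MasterIPs helper. The same call shape presumably extends to other machine types; a sketch (assuming the machine package also exposes TypeInit, which is not shown in this diff):

    initIPs, err := k8sClient.NodeIPs(ctx, machinetype.TypeInit)
    if err != nil {
        return fmt.Errorf("error fetching init nodes: %w", err)
    }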

View File

@@ -6,9 +6,22 @@ package kubernetes
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/os-runtime/pkg/state"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/machinery/client"
v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
machinetype "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/config"
)
// UpgradeProvider defines the cluster interfaces required by the upgrade process.
@@ -18,6 +31,236 @@ type UpgradeProvider interface {
}
// UpgradeTalosManaged upgrades the Talos-managed Kubernetes control plane.
//
//nolint: gocyclo
func UpgradeTalosManaged(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
return fmt.Errorf("not implemented yet")
switch {
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.19."):
case strings.HasPrefix(options.FromVersion, "1.19.") && strings.HasPrefix(options.ToVersion, "1.20."):
options.extraUpdaters = append(options.extraUpdaters, addControlPlaneToleration())
case strings.HasPrefix(options.FromVersion, "1.20.") && strings.HasPrefix(options.ToVersion, "1.20."):
default:
return fmt.Errorf("unsupported upgrade from %q to %q", options.FromVersion, options.ToVersion)
}
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return fmt.Errorf("error building kubernetes client: %w", err)
}
options.masterNodes, err = k8sClient.NodeIPs(ctx, machinetype.TypeControlPlane)
if err != nil {
return fmt.Errorf("error fetching master nodes: %w", err)
}
if len(options.masterNodes) == 0 {
return fmt.Errorf("no master nodes discovered")
}
fmt.Printf("discovered master nodes %q\n", options.masterNodes)
for _, service := range []string{kubeAPIServer, kubeControllerManager, kubeScheduler} {
if err = upgradeConfigPatch(ctx, cluster, options, service); err != nil {
return fmt.Errorf("failed updating service %q: %w", service, err)
}
}
if err = hyperkubeUpgradeDs(ctx, k8sClient.Clientset, kubeProxy, options); err != nil {
return fmt.Errorf("error updating kube-proxy: %w", err)
}
return nil
}
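The switch at the top of UpgradeTalosManaged acts as a version gate: 1.19.x to 1.19.x, 1.19.x to 1.20.x (which additionally appends a control plane toleration updater used during the DaemonSet update), and 1.20.x to 1.20.x are accepted; anything else fails fast. Restated as a standalone predicate (an illustrative helper, not part of this commit):

    func supportedUpgradePath(from, to string) bool {
        switch {
        case strings.HasPrefix(from, "1.19.") && strings.HasPrefix(to, "1.19."):
            return true
        case strings.HasPrefix(from, "1.19.") && strings.HasPrefix(to, "1.20."):
            return true // also needs the control plane toleration updater
        case strings.HasPrefix(from, "1.20.") && strings.HasPrefix(to, "1.20."):
            return true
        default:
            return false
        }
    }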
func upgradeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service string) error {
fmt.Printf("updating %q to version %q\n", service, options.ToVersion)
for _, node := range options.masterNodes {
if err := upgradeNodeConfigPatch(ctx, cluster, options, service, node); err != nil {
return fmt.Errorf("error updating node %q: %w", node, err)
}
}
return nil
}
//nolint: gocyclo
func upgradeNodeConfigPatch(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, service, node string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, err := cluster.Client()
if err != nil {
return fmt.Errorf("error building Talos API client: %w", err)
}
ctx = client.WithNodes(ctx, node)
fmt.Printf(" > updating node %q\n", node)
watchClient, err := c.Resources.Watch(ctx, config.NamespaceName, config.K8sControlPlaneType, service)
if err != nil {
return fmt.Errorf("error watching service configuration: %w", err)
}
// first response is resource definition
_, err = watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}
// second is the initial state
watchInitial, err := watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}
if watchInitial.EventType != state.Created {
return fmt.Errorf("unexpected event type: %d", watchInitial.EventType)
}
skipConfigWait := false
err = patchNodeConfig(ctx, cluster, node, upgradeConfigPatcher(options, service))
if err != nil {
if errors.Is(err, errUpdateSkipped) {
skipConfigWait = true
} else {
return fmt.Errorf("error patching node config: %w", err)
}
}
var expectedConfigVersion string
if !skipConfigWait {
watchUpdated, err := watchClient.Recv()
if err != nil {
return fmt.Errorf("error watching config: %w", err)
}
if watchUpdated.EventType != state.Updated {
return fmt.Errorf("unexpected event type: %d", watchInitial.EventType)
}
expectedConfigVersion = watchUpdated.Resource.Metadata().Version().String()
} else {
expectedConfigVersion = watchInitial.Resource.Metadata().Version().String()
}
return retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second), retry.WithErrorLogging(true)).Retry(func() error {
return checkPodStatus(ctx, cluster, service, node, expectedConfigVersion)
})
}
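Note the watch handshake in upgradeNodeConfigPatch: the first message received on the resource watch carries the resource definition and is discarded, the second carries the current K8sControlPlane configuration as a state.Created event, and only then do state.Updated events follow. Recording the initial version before patching is what allows the code to fall back to it when the patch is skipped because the target image is already in place.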
var errUpdateSkipped = fmt.Errorf("update skipped")
//nolint: gocyclo
func upgradeConfigPatcher(options UpgradeOptions, service string) func(config *v1alpha1config.Config) error {
return func(config *v1alpha1config.Config) error {
if config.ClusterConfig == nil {
config.ClusterConfig = &v1alpha1config.ClusterConfig{}
}
switch service {
case kubeAPIServer:
if config.ClusterConfig.APIServerConfig == nil {
config.ClusterConfig.APIServerConfig = &v1alpha1config.APIServerConfig{}
}
image := fmt.Sprintf("%s:v%s", constants.KubernetesAPIServerImage, options.ToVersion)
if config.ClusterConfig.APIServerConfig.ContainerImage == image {
return errUpdateSkipped
}
config.ClusterConfig.APIServerConfig.ContainerImage = image
case kubeControllerManager:
if config.ClusterConfig.ControllerManagerConfig == nil {
config.ClusterConfig.ControllerManagerConfig = &v1alpha1config.ControllerManagerConfig{}
}
image := fmt.Sprintf("%s:v%s", constants.KubernetesControllerManagerImage, options.ToVersion)
if config.ClusterConfig.ControllerManagerConfig.ContainerImage == image {
return errUpdateSkipped
}
config.ClusterConfig.ControllerManagerConfig.ContainerImage = image
case kubeScheduler:
if config.ClusterConfig.SchedulerConfig == nil {
config.ClusterConfig.SchedulerConfig = &v1alpha1config.SchedulerConfig{}
}
image := fmt.Sprintf("%s:v%s", constants.KubernetesSchedulerImage, options.ToVersion)
if config.ClusterConfig.SchedulerConfig.ContainerImage == image {
return errUpdateSkipped
}
config.ClusterConfig.SchedulerConfig.ContainerImage = image
default:
return fmt.Errorf("unsupported service %q", service)
}
return nil
}
}
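To make the skip semantics concrete, here is a minimal sketch of exercising the patcher in isolation (the empty config literal and error handling are illustrative):

    cfg := &v1alpha1config.Config{}
    patch := upgradeConfigPatcher(UpgradeOptions{ToVersion: "1.20.1"}, kubeAPIServer)

    // First application installs the kube-apiserver image and returns nil.
    if err := patch(cfg); err != nil {
        return err
    }

    // A second application finds the image already set and reports
    // errUpdateSkipped, which upgradeNodeConfigPatch uses to avoid
    // waiting for a configuration update that will never arrive.
    if err := patch(cfg); !errors.Is(err, errUpdateSkipped) {
        return fmt.Errorf("expected errUpdateSkipped, got: %w", err)
    }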
//nolint: gocyclo
func checkPodStatus(ctx context.Context, cluster UpgradeProvider, service, node, configVersion string) error {
k8sClient, err := cluster.K8sHelper(ctx)
if err != nil {
return retry.UnexpectedError(fmt.Errorf("error building kubernetes client: %w", err))
}
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("k8s-app = %s", service),
})
if err != nil {
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsInternalError(err) {
return retry.ExpectedError(err)
}
return retry.UnexpectedError(err)
}
podFound := false
for _, pod := range pods.Items {
if pod.Status.HostIP != node {
continue
}
podFound = true
if pod.Annotations[constants.AnnotationStaticPodConfigVersion] != configVersion {
return retry.ExpectedError(fmt.Errorf("config version mismatch: got %q, expected %q", pod.Annotations[constants.AnnotationStaticPodConfigVersion], configVersion))
}
ready := false
for _, condition := range pod.Status.Conditions {
if condition.Type != "Ready" {
continue
}
if condition.Status == "True" {
ready = true
break
}
}
if !ready {
return retry.ExpectedError(fmt.Errorf("pod is not ready"))
}
break
}
if !podFound {
return retry.ExpectedError(fmt.Errorf("pod not found in the API server state"))
}
return nil
}
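The retry.ExpectedError / retry.UnexpectedError split in checkPodStatus is what drives the poll loop in upgradeNodeConfigPatch: expected errors (transient API server timeouts, a pod that has not yet picked up the new configuration version, a pod that is not yet ready) are retried every ten seconds for up to three minutes, while unexpected errors abort the wait immediately.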

View File

@@ -28,6 +28,7 @@ type UpgradeOptions struct {
extraUpdaters []daemonsetUpdater
podCheckpointerExtraUpdaters []daemonsetUpdater
+masterNodes []string
}
type daemonsetUpdater func(ds string, daemonset *appsv1.DaemonSet) error
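A daemonsetUpdater lets the upgrade inject extra mutations into the kube-proxy and pod-checkpointer DaemonSets before they are applied; this is how addControlPlaneToleration is wired in for the 1.19 to 1.20 path. A hedged sketch of a custom updater (the annotation key and value are invented for illustration):

    func annotateDaemonSet(key, value string) daemonsetUpdater {
        return func(ds string, daemonset *appsv1.DaemonSet) error {
            if daemonset.Spec.Template.Annotations == nil {
                daemonset.Spec.Template.Annotations = map[string]string{}
            }
            daemonset.Spec.Template.Annotations[key] = value
            return nil
        }
    }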