fix: improve the drain function

The critical bug (I believe) was that the drain code re-entered the
eviction loop even after the wait for the pod to be deleted had
returned success, effectively evicting the pod a second time once it
got rescheduled to a different node.
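
For illustration, a minimal sketch of the corrected control flow as a
standalone helper (not the exact method in this patch; the real change
lives in Client.evict in the diff below):

    package drain

    import (
        "context"
        "time"

        corev1 "k8s.io/api/core/v1"
        policy "k8s.io/api/policy/v1beta1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
    )

    // evictOnce is a sketch only (not the code from this commit): it mirrors
    // the fixed eviction flow. The important part is the default branch,
    // which returns after a successful wait instead of looping around and
    // evicting the (by then possibly rescheduled) pod again.
    func evictOnce(ctx context.Context, c kubernetes.Interface, p corev1.Pod, grace int64,
        waitDeleted func(context.Context, *corev1.Pod) error,
    ) error {
        for {
            err := c.CoreV1().Pods(p.GetNamespace()).Evict(ctx, &policy.Eviction{
                ObjectMeta:    metav1.ObjectMeta{Namespace: p.GetNamespace(), Name: p.GetName()},
                DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &grace},
            })

            switch {
            case apierrors.IsTooManyRequests(err):
                // Eviction blocked (e.g. by a PodDisruptionBudget): retry later.
                time.Sleep(5 * time.Second)
            case apierrors.IsNotFound(err):
                return nil // pod is already gone
            case err != nil:
                return err
            default:
                // The fix: once the eviction is accepted and the wait succeeds,
                // stop here rather than going around the loop again.
                return waitDeleted(ctx, &p)
            }
        }
    }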

Add a global timeout to prevent the draining code from running forever.
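
Concretely, the drain is now bounded by deriving a context with a
deadline, roughly as in this sketch (DrainTimeout matches the constant
added in this diff; the helper name is illustrative, the patch does the
wrapping at the top of Client.Drain):

    package drain

    import (
        "context"
        "time"
    )

    // DrainTimeout bounds the whole drain operation so it cannot run forever.
    const DrainTimeout = 5 * time.Minute

    // withDrainTimeout derives a context whose deadline cancels every API call
    // and retry loop performed during the drain (sketch only).
    func withDrainTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
        return context.WithTimeout(ctx, DrainTimeout)
    }

The derived context is then threaded through the List/Evict/Get calls
and the retry loops (via RetryWithContext), so all of them abort once
the deadline expires.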

Filter out more pod types which should never be drained (mirror pods,
pods without a controller, DaemonSet pods).
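
The new skip conditions boil down to something like the helper below
(the helper shape is illustrative; in the patch the checks are inlined
in Client.Drain):

    package drain

    import (
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // skipDrain reports whether a pod should be left alone during a drain:
    // mirror (static) pods, pods with no controller, and DaemonSet pods are
    // never evicted.
    func skipDrain(p *corev1.Pod) bool {
        if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok {
            return true // static pod mirrored into the API server
        }

        controllerRef := metav1.GetControllerOf(p)
        if controllerRef == nil {
            return true // no controller to reschedule it elsewhere
        }

        if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
            return true // DaemonSet pods would be recreated on the same node
        }

        return false
    }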

Fixes #3124

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
Author:    Andrey Smirnov
Date:      2021-02-25 16:40:50 +03:00
Committer: talos-bot
Parent:    f24c815373
Commit:    779ac74a08

4 changed files with 60 additions and 39 deletions

go.mod

@@ -68,7 +68,7 @@ require (
     github.com/talos-systems/go-cmd v0.0.0-20210216164758-68eb0067e0f0
     github.com/talos-systems/go-loadbalancer v0.1.0
     github.com/talos-systems/go-procfs v0.0.0-20210108152626-8cbc42d3dc24
-    github.com/talos-systems/go-retry v0.2.0
+    github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133
     github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e
     github.com/talos-systems/grpc-proxy v0.2.0
     github.com/talos-systems/net v0.2.1-0.20210204205549-52c750994376

go.sum

@@ -869,6 +869,8 @@ github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
 github.com/talos-systems/go-retry v0.1.1-0.20201113203059-8c63d290a688/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
 github.com/talos-systems/go-retry v0.2.0 h1:YpQHmtTZ2k0i/bBYRIasdVmF0XaiISVJUOrmZ6FzgLU=
 github.com/talos-systems/go-retry v0.2.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
+github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133 h1:mHnKEViee9x2A6YbsUykwqh7L+tLpm5HTlos2QDlqts=
+github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
 github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e h1:uCp8BfH4Ky2R1XkOKA5pSZpeMMyt0AbH29PIrkoBlaM=
 github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8=
 github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw=


@@ -1242,7 +1242,7 @@ func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExe
         return err
     }

-    if err = kubeHelper.CordonAndDrain(nodename); err != nil {
+    if err = kubeHelper.CordonAndDrain(ctx, nodename); err != nil {
         return err
     }


@@ -11,11 +11,12 @@ import (
     "fmt"
     "log"
     "net/url"
-    "sync"
     "time"

     "github.com/talos-systems/crypto/x509"
     "github.com/talos-systems/go-retry/retry"
+    "golang.org/x/sync/errgroup"
+    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     policy "k8s.io/api/policy/v1beta1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -31,6 +32,11 @@ import (
     "github.com/talos-systems/talos/pkg/machinery/constants"
 )

+const (
+    // DrainTimeout is maximum time to wait for the node to be drained.
+    DrainTimeout = 5 * time.Minute
+)
+
 // Client represents a set of helper methods for interacting with the
 // Kubernetes API.
 type Client struct {
@@ -279,18 +285,18 @@ func (h *Client) WaitUntilReady(name string) error {
 }

 // CordonAndDrain cordons and drains a node in one call.
-func (h *Client) CordonAndDrain(node string) (err error) {
-    if err = h.Cordon(node); err != nil {
+func (h *Client) CordonAndDrain(ctx context.Context, node string) (err error) {
+    if err = h.Cordon(ctx, node); err != nil {
         return err
     }

-    return h.Drain(node)
+    return h.Drain(ctx, node)
 }

 // Cordon marks a node as unschedulable.
-func (h *Client) Cordon(name string) error {
-    err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).Retry(func() error {
-        node, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
+func (h *Client) Cordon(ctx context.Context, name string) error {
+    err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).RetryWithContext(ctx, func(ctx context.Context) error {
+        node, err := h.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
         if err != nil {
             return retry.UnexpectedError(err)
         }
@@ -302,7 +308,7 @@ func (h *Client) Cordon(name string) error {
         node.Annotations[constants.AnnotationCordonedKey] = constants.AnnotationCordonedValue
         node.Spec.Unschedulable = true

-        if _, err := h.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
+        if _, err := h.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
             return retry.ExpectedError(err)
         }

@@ -352,58 +358,69 @@ func (h *Client) Uncordon(name string, force bool) error {
 }

 // Drain evicts all pods on a given node.
-func (h *Client) Drain(node string) error {
+func (h *Client) Drain(ctx context.Context, node string) error {
+    ctx, cancel := context.WithTimeout(ctx, DrainTimeout)
+    defer cancel()
+
     opts := metav1.ListOptions{
         FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node}).String(),
     }

-    pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
+    pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
     if err != nil {
         return fmt.Errorf("cannot get pods for node %s: %w", node, err)
     }

-    var wg sync.WaitGroup
-
-    wg.Add(len(pods.Items))
+    var eg errgroup.Group

     // Evict each pod.
     for _, pod := range pods.Items {
-        go func(p corev1.Pod) {
-            defer wg.Done()
-
-            for _, ref := range p.ObjectMeta.OwnerReferences {
-                if ref.Kind == "DaemonSet" {
-                    log.Printf("skipping DaemonSet pod %s\n", p.GetName())
-
-                    return
-                }
-
-                if ref.Kind == "Node" {
-                    log.Printf("skipping StaticPod pod %s\n", p.GetName())
-
-                    return
-                }
-            }
-
-            if err := h.evict(p, int64(60)); err != nil {
-                log.Printf("WARNING: failed to evict pod: %v", err)
-            }
-        }(pod)
+        p := pod
+
+        eg.Go(func() error {
+            if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok {
+                log.Printf("skipping mirror pod %s/%s\n", p.GetNamespace(), p.GetName())
+
+                return nil
+            }
+
+            controllerRef := metav1.GetControllerOf(&p)
+
+            if controllerRef == nil {
+                log.Printf("skipping unmanaged pod %s/%s\n", p.GetNamespace(), p.GetName())
+
+                return nil
+            }
+
+            if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
+                log.Printf("skipping DaemonSet pod %s/%s\n", p.GetNamespace(), p.GetName())
+
+                return nil
+            }
+
+            if !p.DeletionTimestamp.IsZero() {
+                log.Printf("skipping deleted pod %s/%s\n", p.GetNamespace(), p.GetName())
+            }
+
+            if err := h.evict(ctx, p, int64(60)); err != nil {
+                log.Printf("WARNING: failed to evict pod: %v", err)
+            }
+
+            return nil
+        })
     }

-    wg.Wait()
-
-    return nil
+    return eg.Wait()
 }

-func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
+func (h *Client) evict(ctx context.Context, p corev1.Pod, gracePeriod int64) error {
     for {
         pol := &policy.Eviction{
             ObjectMeta:    metav1.ObjectMeta{Namespace: p.GetNamespace(), Name: p.GetName()},
             DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod},
         }

-        err := h.CoreV1().Pods(p.GetNamespace()).Evict(context.TODO(), pol)
+        err := h.CoreV1().Pods(p.GetNamespace()).Evict(ctx, pol)

         switch {
         case apierrors.IsTooManyRequests(err):
@@ -413,16 +430,18 @@ func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
         case err != nil:
             return fmt.Errorf("failed to evict pod %s/%s: %w", p.GetNamespace(), p.GetName(), err)
         default:
-            if err = h.waitForPodDeleted(&p); err != nil {
+            if err = h.waitForPodDeleted(ctx, &p); err != nil {
                 return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", p.GetNamespace(), p.GetName(), err)
             }
+
+            return nil
         }
     }
 }

-func (h *Client) waitForPodDeleted(p *corev1.Pod) error {
-    return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).Retry(func() error {
-        pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(context.TODO(), p.GetName(), metav1.GetOptions{})
+func (h *Client) waitForPodDeleted(ctx context.Context, p *corev1.Pod) error {
+    return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).RetryWithContext(ctx, func(ctx context.Context) error {
+        pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(ctx, p.GetName(), metav1.GetOptions{})
         switch {
         case apierrors.IsNotFound(err):
             return nil