fix: iterate over etcd member endpoints for member promotion

This uses all available (potential) etcd endpoints, which includes the
member being promoted as well. We avoid failures by iterating over the
list of endpoints on each attempt to make sure each and every endpoint
is tried.

Part of #5889

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Andrey Smirnov 2022-07-29 23:18:08 +04:00
parent c70b692fb3
commit 6fc38bae69
No known key found for this signature in database
GPG Key ID: 7B26396447AB6DFD
6 changed files with 160 additions and 85 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/generic/slices"
"github.com/talos-systems/talos/pkg/machinery/nethelpers"
"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)
@ -164,7 +165,7 @@ func (ctrl *K8sControlPlaneController) manageAPIServerConfig(ctx context.Context
Image: cfgProvider.Cluster().APIServer().Image(),
CloudProvider: cloudProvider,
ControlPlaneEndpoint: cfgProvider.Cluster().Endpoint().String(),
EtcdServers: []string{"https://127.0.0.1:2379"},
EtcdServers: []string{fmt.Sprintf("https://%s", nethelpers.JoinHostPort("localhost", constants.EtcdClientPort))},
LocalPort: cfgProvider.Cluster().LocalAPIServerPort(),
ServiceCIDRs: cfgProvider.Cluster().Network().ServiceCIDRs(),
ExtraArgs: cfgProvider.Cluster().APIServer().ExtraArgs(),

View File

@ -92,11 +92,11 @@ func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger
return err
}
if err = os.WriteFile(constants.KubernetesEtcdCACert, rootScrts.TypedSpec().EtcdCA.Crt, 0o400); err != nil {
if err = os.WriteFile(constants.EtcdCACert, rootScrts.TypedSpec().EtcdCA.Crt, 0o400); err != nil {
return fmt.Errorf("failed to write CA certificate: %w", err)
}
if err = os.WriteFile(constants.KubernetesEtcdCAKey, rootScrts.TypedSpec().EtcdCA.Key, 0o400); err != nil {
if err = os.WriteFile(constants.EtcdCAKey, rootScrts.TypedSpec().EtcdCA.Key, 0o400); err != nil {
return fmt.Errorf("failed to write CA key: %w", err)
}
@ -109,18 +109,18 @@ func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger
}{
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.Etcd },
keyPath: constants.KubernetesEtcdKey,
certPath: constants.KubernetesEtcdCert,
keyPath: constants.EtcdKey,
certPath: constants.EtcdCert,
},
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdPeer },
keyPath: constants.KubernetesEtcdPeerKey,
certPath: constants.KubernetesEtcdPeerCert,
keyPath: constants.EtcdPeerKey,
certPath: constants.EtcdPeerCert,
},
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdAdmin },
keyPath: constants.KubernetesEtcdAdminKey,
certPath: constants.KubernetesEtcdAdminCert,
keyPath: constants.EtcdAdminKey,
certPath: constants.EtcdAdminCert,
},
} {
if err = os.WriteFile(keypair.keyPath, keypair.getter().Key, 0o400); err != nil {

View File

@ -305,7 +305,7 @@ func buildInitialCluster(ctx context.Context, r runtime.Runtime, name, ip string
retry.WithErrorLogging(true),
).RetryWithContext(ctx, func(ctx context.Context) error {
var (
peerAddrs = []string{"https://" + net.FormatAddress(ip) + ":2380"}
peerAddrs = []string{"https://" + nethelpers.JoinHostPort(ip, +constants.EtcdPeerPort)}
resp *clientv3.MemberListResponse
)
@ -397,16 +397,16 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error {
"auto-tls": "false",
"peer-auto-tls": "false",
"data-dir": constants.EtcdDataPath,
"listen-peer-urls": "https://" + net.FormatAddress(listenAddress) + ":2380",
"listen-client-urls": "https://" + net.FormatAddress(listenAddress) + ":2379",
"listen-peer-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdPeerPort),
"listen-client-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdClientPort),
"client-cert-auth": "true",
"cert-file": constants.KubernetesEtcdCert,
"key-file": constants.KubernetesEtcdKey,
"trusted-ca-file": constants.KubernetesEtcdCACert,
"cert-file": constants.EtcdCert,
"key-file": constants.EtcdKey,
"trusted-ca-file": constants.EtcdCACert,
"peer-client-cert-auth": "true",
"peer-cert-file": constants.KubernetesEtcdPeerCert,
"peer-key-file": constants.KubernetesEtcdPeerKey,
"peer-trusted-ca-file": constants.KubernetesEtcdCACert,
"peer-cert-file": constants.EtcdPeerCert,
"peer-key-file": constants.EtcdPeerKey,
"peer-trusted-ca-file": constants.EtcdCACert,
"experimental-initial-corrupt-check": "true",
}
@ -427,7 +427,7 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error {
}
if ok {
initialCluster := fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr))
initialCluster := fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort))
if upgraded {
denyListArgs.Set("initial-cluster-state", "existing")
@ -446,13 +446,13 @@ func (e *Etcd) argsForInit(ctx context.Context, r runtime.Runtime) error {
if !extraArgs.Contains("initial-advertise-peer-urls") {
denyListArgs.Set("initial-advertise-peer-urls",
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2380)),
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)),
)
}
if !extraArgs.Contains("advertise-client-urls") {
denyListArgs.Set("advertise-client-urls",
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2379)),
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdClientPort)),
)
}
@ -487,16 +487,16 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error
"auto-tls": "false",
"peer-auto-tls": "false",
"data-dir": constants.EtcdDataPath,
"listen-peer-urls": "https://" + net.FormatAddress(listenAddress) + ":2380",
"listen-client-urls": "https://" + net.FormatAddress(listenAddress) + ":2379",
"listen-peer-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdPeerPort),
"listen-client-urls": "https://" + nethelpers.JoinHostPort(listenAddress, constants.EtcdClientPort),
"client-cert-auth": "true",
"cert-file": constants.KubernetesEtcdCert,
"key-file": constants.KubernetesEtcdKey,
"trusted-ca-file": constants.KubernetesEtcdCACert,
"cert-file": constants.EtcdCert,
"key-file": constants.EtcdKey,
"trusted-ca-file": constants.EtcdCACert,
"peer-client-cert-auth": "true",
"peer-cert-file": constants.KubernetesEtcdPeerCert,
"peer-key-file": constants.KubernetesEtcdPeerKey,
"peer-trusted-ca-file": constants.KubernetesEtcdCACert,
"peer-cert-file": constants.EtcdPeerCert,
"peer-key-file": constants.EtcdPeerKey,
"peer-trusted-ca-file": constants.EtcdCACert,
"experimental-initial-corrupt-check": "true",
}
@ -530,7 +530,7 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error
var initialCluster string
if e.Bootstrap {
initialCluster = fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr))
initialCluster = fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort))
} else {
initialCluster, e.learnerMemberID, err = buildInitialCluster(ctx, r, hostname, primaryAddr)
if err != nil {
@ -543,14 +543,14 @@ func (e *Etcd) argsForControlPlane(ctx context.Context, r runtime.Runtime) error
if !extraArgs.Contains("initial-advertise-peer-urls") {
denyListArgs.Set("initial-advertise-peer-urls",
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2380)),
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)),
)
}
}
if !extraArgs.Contains("advertise-client-urls") {
denyListArgs.Set("advertise-client-urls",
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), 2379)),
fmt.Sprintf("https://%s", nethelpers.JoinHostPort(net.FormatAddress(primaryAddr), constants.EtcdClientPort)),
)
}
@ -581,9 +581,9 @@ func (e *Etcd) recoverFromSnapshot(hostname, primaryAddr string) error {
Name: hostname,
OutputDataDir: constants.EtcdDataPath,
PeerURLs: []string{"https://" + net.FormatAddress(primaryAddr) + ":2380"},
PeerURLs: []string{"https://" + nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)},
InitialCluster: fmt.Sprintf("%s=https://%s:2380", hostname, net.FormatAddress(primaryAddr)),
InitialCluster: fmt.Sprintf("%s=https://%s", hostname, nethelpers.JoinHostPort(primaryAddr, constants.EtcdPeerPort)),
SkipHashCheck: e.RecoverSkipHashCheck,
}); err != nil {
@ -600,24 +600,60 @@ func (e *Etcd) recoverFromSnapshot(hostname, primaryAddr string) error {
// promoteMember asks etcd to promote the member identified by memberID,
// retrying until one promotion call succeeds.
//
// The retry policy is built with retry.Constant(10*time.Minute) using 15 second
// units, one second of jitter and error logging enabled. Every failure inside
// the retried closure is returned wrapped as an expected error.
func promoteMember(ctx context.Context, r runtime.Runtime, memberID uint64) error {
// try to promote a member until it succeeds (call might fail until the member catches up with the leader)
// promote member call will fail until member catches up with the master
//
// iterate over all endpoints until we find the one which works
// if we stick with the default behavior, we might hit the member being promoted, and that will never
// promote itself.
//
// idx is declared outside the retried closure, so the endpoint rotation continues
// from where the previous attempt stopped instead of restarting at the first endpoint.
idx := 0
return retry.Constant(10*time.Minute,
retry.WithUnits(15*time.Second),
retry.WithJitter(time.Second),
retry.WithErrorLogging(true),
).RetryWithContext(ctx, func(ctx context.Context) error {
// NOTE(review): this is a rendered diff without +/- markers; the next line looks like
// the pre-change statement that the GetEndpoints call below replaces — confirm against the actual file.
client, err := etcd.NewClientFromControlPlaneIPsNoDiscovery(ctx, r.State().V1Alpha2().Resources())
endpoints, err := etcd.GetEndpoints(ctx, r.State().V1Alpha2().Resources())
if err != nil {
return retry.ExpectedError(err)
}
// NOTE(review): presumably another removed line (the shared client is closed here) — confirm.
defer client.Close() //nolint:errcheck
if len(endpoints) == 0 {
return retry.ExpectedErrorf("no endpoints")
}
// NOTE(review): presumably the removed single-client promote call; its result is
// overwritten by the per-endpoint loop below — confirm.
_, err = client.MemberPromote(ctx, memberID)
// try to iterate all available endpoints in the time available for an attempt
for i := 0; i < len(endpoints); i++ {
// Stop early (non-blocking check) once the attempt's context is done.
select {
case <-ctx.Done():
return retry.ExpectedError(ctx.Err())
default:
}
// Pick the next endpoint in rotation; the modulo keeps the index in range.
endpoint := endpoints[idx%len(endpoints)]
idx++
err = attemptPromote(ctx, endpoint, memberID)
if err == nil {
return nil
}
}
// Every endpoint failed in this attempt: report the last error so the retry loop tries again.
return retry.ExpectedError(err)
})
}
// attemptPromote performs a single promotion call for the member identified by
// memberID against one etcd endpoint.
//
// A client bound only to that endpoint is created for the call and closed
// before returning. The error from client creation or from the promote call is
// returned unchanged; nil means the promotion call succeeded.
func attemptPromote(ctx context.Context, endpoint string, memberID uint64) error {
	cli, dialErr := etcd.NewClient([]string{endpoint})
	if dialErr != nil {
		return dialErr
	}

	defer cli.Close() //nolint:errcheck

	if _, promoteErr := cli.MemberPromote(ctx, memberID); promoteErr != nil {
		return promoteErr
	}

	return nil
}
// IsDirEmpty checks if a directory is empty or not.
func IsDirEmpty(name string) (bool, error) {
f, err := os.Open(name)

View File

@ -0,0 +1,58 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package etcd
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/nethelpers"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)
// GetEndpoints lists the endpoints at which etcd cluster members are expected
// to be reachable.
//
// An entry in the result does not guarantee that etcd is actually running on
// that endpoint.
func GetEndpoints(ctx context.Context, resources state.State) ([]string, error) {
	// An empty ignore ID is passed so that getEndpoints does not filter out any
	// endpoint resource by ID (assuming no resource carries an empty ID).
	const ignoreNone = ""

	return getEndpoints(ctx, resources, ignoreNone)
}
// getEndpoints lists the k8s.Endpoint resources in the control plane namespace,
// merges them into a single endpoint list and returns each entry joined with
// the etcd client port.
//
// The resource whose ID equals ignoreEndpointID is left out of the merge. An
// error is returned when listing fails or when the merged list is empty.
func getEndpoints(ctx context.Context, resources state.State, ignoreEndpointID string) ([]string, error) {
	listed, err := safe.StateList[*k8s.Endpoint](
		ctx,
		resources,
		resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined),
	)
	if err != nil {
		return nil, fmt.Errorf("error getting endpoints resources: %w", err)
	}

	// Fold every endpoint resource that is not ignored into one list.
	var merged k8s.EndpointList

	for it := safe.IteratorFromList(listed); it.Next(); {
		if it.Value().Metadata().ID() != ignoreEndpointID {
			merged = merged.Merge(it.Value())
		}
	}

	if len(merged) == 0 {
		return nil, fmt.Errorf("no controlplane endpoints discovered yet")
	}

	// Etcd expects host:port format, so append the client port to each address.
	addrs := merged.Strings()
	for i, host := range addrs {
		addrs[i] = nethelpers.JoinHostPort(host, constants.EtcdClientPort)
	}

	return addrs, nil
}

View File

@ -13,9 +13,7 @@ import (
"os"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/talos-systems/net"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/transport"
@ -27,6 +25,7 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/nethelpers"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)
@ -42,9 +41,9 @@ type Client struct {
// a list of endpoints.
func NewClient(endpoints []string) (client *Client, err error) {
tlsInfo := transport.TLSInfo{
CertFile: constants.KubernetesEtcdAdminCert,
KeyFile: constants.KubernetesEtcdAdminKey,
TrustedCAFile: constants.KubernetesEtcdCACert,
CertFile: constants.EtcdAdminCert,
KeyFile: constants.EtcdAdminKey,
TrustedCAFile: constants.EtcdCACert,
}
tlsConfig, err := tlsInfo.ClientConfig()
@ -68,7 +67,7 @@ func NewClient(endpoints []string) (client *Client, err error) {
// NewLocalClient initializes and returns etcd client configured to talk to localhost endpoint.
//
// The client is built by NewClient from a single-element endpoint list.
//
// NOTE(review): this is a rendered diff without +/- markers; the first return
// (hard-coded "127.0.0.1:2379") looks like the removed pre-change line and the
// second (localhost joined with constants.EtcdClientPort) like its replacement —
// confirm against the actual file.
func NewLocalClient() (client *Client, err error) {
return NewClient([]string{"127.0.0.1:2379"})
return NewClient([]string{nethelpers.JoinHostPort("localhost", constants.EtcdClientPort)})
}
// NewClientFromControlPlaneIPs initializes and returns an etcd client
@ -86,31 +85,9 @@ func NewClientFromControlPlaneIPsNoDiscovery(ctx context.Context, resources stat
}
func newClientFromControlPlaneIPs(ctx context.Context, resources state.State, ignoreEndpointID string) (client *Client, err error) {
endpointResources, err := resources.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.EndpointType, "", resource.VersionUndefined))
endpoints, err := getEndpoints(ctx, resources, ignoreEndpointID)
if err != nil {
return nil, fmt.Errorf("error getting endpoints resources: %w", err)
}
var endpointAddrs k8s.EndpointList
// merge all endpoints into a single list
for _, res := range endpointResources.Items {
if res.Metadata().ID() == ignoreEndpointID {
continue
}
endpointAddrs = endpointAddrs.Merge(res.(*k8s.Endpoint))
}
if len(endpointAddrs) == 0 {
return nil, fmt.Errorf("no controlplane endpoints discovered yet")
}
endpoints := endpointAddrs.Strings()
// Etcd expects host:port format.
for i := 0; i < len(endpoints); i++ {
endpoints[i] = net.FormatAddress(endpoints[i]) + ":2379"
return nil, err
}
// Shuffle endpoints to establish random order on each call to avoid patterns based on sorted IP list.

View File

@ -166,32 +166,35 @@ const (
// KubernetesCACert is the path to the root CA certificate.
KubernetesCACert = DefaultCertificatesDir + "/" + "ca.crt"
// KubernetesEtcdCACert is the path to the etcd CA certificate.
KubernetesEtcdCACert = EtcdPKIPath + "/" + "ca.crt"
// EtcdCACert is the path to the etcd CA certificate.
EtcdCACert = EtcdPKIPath + "/" + "ca.crt"
// KubernetesEtcdCAKey is the path to the etcd CA private key.
KubernetesEtcdCAKey = EtcdPKIPath + "/" + "ca.key"
// EtcdCAKey is the path to the etcd CA private key.
EtcdCAKey = EtcdPKIPath + "/" + "ca.key"
// KubernetesEtcdCert is the path to the etcd server certificate.
KubernetesEtcdCert = EtcdPKIPath + "/" + "server.crt"
// EtcdCert is the path to the etcd server certificate.
EtcdCert = EtcdPKIPath + "/" + "server.crt"
// KubernetesEtcdKey is the path to the etcd server private key.
KubernetesEtcdKey = EtcdPKIPath + "/" + "server.key"
// EtcdKey is the path to the etcd server private key.
EtcdKey = EtcdPKIPath + "/" + "server.key"
// KubernetesEtcdPeerCert is the path to the etcd peer certificate.
KubernetesEtcdPeerCert = EtcdPKIPath + "/" + "peer.crt"
// EtcdPeerCert is the path to the etcd peer certificate.
EtcdPeerCert = EtcdPKIPath + "/" + "peer.crt"
// KubernetesEtcdPeerKey is the path to the etcd peer private key.
KubernetesEtcdPeerKey = EtcdPKIPath + "/" + "peer.key"
// EtcdPeerKey is the path to the etcd peer private key.
EtcdPeerKey = EtcdPKIPath + "/" + "peer.key"
// KubernetesEtcdAdminCert is the path to the talos client certificate.
KubernetesEtcdAdminCert = EtcdPKIPath + "/" + "admin.crt"
// EtcdAdminCert is the path to the talos client certificate.
EtcdAdminCert = EtcdPKIPath + "/" + "admin.crt"
// KubernetesEtcdAdminKey is the path to the talos client private key.
KubernetesEtcdAdminKey = EtcdPKIPath + "/" + "admin.key"
// EtcdAdminKey is the path to the talos client private key.
EtcdAdminKey = EtcdPKIPath + "/" + "admin.key"
// KubernetesEtcdListenClientPort defines the port etcd listen on for client traffic.
KubernetesEtcdListenClientPort = "2379"
// EtcdClientPort defines the port etcd listens on for client traffic.
EtcdClientPort = 2379
// EtcdPeerPort defines the port etcd listens on for peer traffic.
EtcdPeerPort = 2380
// KubernetesAdminCertCommonName defines CN property of Kubernetes admin certificate.
KubernetesAdminCertCommonName = "admin"