fix: use 'no block' etcd dial with multiple endpoints

The problem showed up on 'reset' of a Talos node which had multiple
endpoints for other control plane nodes, many of which weren't actually
available.

When 'grpc.WithBlock()' is used, etcd will try to dial the first
endpoint and return an error if the dial fails.

Use 'no block' mode by default with multiple endpoints, and blocking mode
with a single endpoint.

Pass the context to etcd to properly abort dial operations if the
context gets canceled.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Andrey Smirnov 2023-03-20 22:34:58 +04:00
parent 28713c2c4d
commit bec89bf6e5
No known key found for this signature in database
GPG Key ID: 7B26396447AB6DFD
7 changed files with 29 additions and 22 deletions

View File

@ -1720,7 +1720,7 @@ func (s *Server) EtcdMemberList(ctx context.Context, in *machine.EtcdMemberListR
)
if in.QueryLocal {
client, err = etcd.NewLocalClient()
client, err = etcd.NewLocalClient(ctx)
} else {
client, err = etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().State().V1Alpha2().Resources())
}
@ -1879,7 +1879,10 @@ func (s *Server) EtcdSnapshot(in *machine.EtcdSnapshotRequest, srv machine.Machi
return err
}
client, err := etcd.NewLocalClient()
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()
client, err := etcd.NewLocalClient(ctx)
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
@ -1892,9 +1895,6 @@ func (s *Server) EtcdSnapshot(in *machine.EtcdSnapshotRequest, srv machine.Machi
return fmt.Errorf("failed reading etcd snapshot: %w", err)
}
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()
chunker := stream.NewChunker(ctx, rd)
chunkCh := chunker.Read()
@ -2006,7 +2006,7 @@ func (s *Server) EtcdAlarmList(ctx context.Context, in *emptypb.Empty) (*machine
return nil, err
}
client, err := etcd.NewLocalClient()
client, err := etcd.NewLocalClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
@ -2036,7 +2036,7 @@ func (s *Server) EtcdAlarmDisarm(ctx context.Context, in *emptypb.Empty) (*machi
return nil, err
}
client, err := etcd.NewLocalClient()
client, err := etcd.NewLocalClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
@ -2069,7 +2069,7 @@ func (s *Server) EtcdDefragment(ctx context.Context, in *emptypb.Empty) (*machin
return nil, err
}
client, err := etcd.NewLocalClient()
client, err := etcd.NewLocalClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
@ -2097,7 +2097,7 @@ func (s *Server) EtcdStatus(ctx context.Context, in *emptypb.Empty) (*machine.Et
return nil, err
}
client, err := etcd.NewLocalClient()
client, err := etcd.NewLocalClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}

View File

@ -118,7 +118,7 @@ func (ctrl *AdvertisedPeerController) updateAdvertisedPeers(ctx context.Context,
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
client, err := etcdcli.NewLocalClient()
client, err := etcdcli.NewLocalClient(ctx)
if err != nil {
return fmt.Errorf("error creating etcd client: %w", err)
}

View File

@ -105,7 +105,7 @@ func (ctrl *MemberController) getLocalMemberID(ctx context.Context) (uint64, err
return ctrl.GetLocalMemberIDFunc(ctx)
}
client, err := pkgetcd.NewLocalClient()
client, err := pkgetcd.NewLocalClient(ctx)
if err != nil {
return 0, err
}

View File

@ -211,7 +211,7 @@ func (vip *VIP) campaign(ctx context.Context, notifyCh chan<- struct{}) error {
return fmt.Errorf("refusing to join election without a hostname")
}
ec, err := etcd.NewLocalClient()
ec, err := etcd.NewLocalClient(ctx)
if err != nil {
return fmt.Errorf("failed to create local etcd client: %w", err)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/siderolabs/go-retry/retry"
clientv3 "go.etcd.io/etcd/client/v3"
snapshot "go.etcd.io/etcd/etcdutl/v3/snapshot"
"google.golang.org/grpc"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/system"
@ -234,7 +235,7 @@ func (e *Etcd) HealthFunc(runtime.Runtime) health.Check {
if e.client == nil {
var err error
e.client, err = etcd.NewLocalClient()
e.client, err = etcd.NewLocalClient(ctx)
if err != nil {
return err
}
@ -583,6 +584,7 @@ func promoteMember(ctx context.Context, r runtime.Runtime, memberID uint64) erro
return retry.Constant(10*time.Minute,
retry.WithUnits(15*time.Second),
retry.WithAttemptTimeout(30*time.Second),
retry.WithJitter(time.Second),
retry.WithErrorLogging(true),
).RetryWithContext(ctx, func(ctx context.Context) error {
@ -617,7 +619,7 @@ func promoteMember(ctx context.Context, r runtime.Runtime, memberID uint64) erro
}
func attemptPromote(ctx context.Context, endpoint string, memberID uint64) error {
client, err := etcd.NewClient([]string{endpoint})
client, err := etcd.NewClient(ctx, []string{endpoint}, grpc.WithBlock())
if err != nil {
return err
}

View File

@ -40,7 +40,7 @@ type Client struct {
// NewClient initializes and returns an etcd client configured to talk to
// a list of endpoints.
func NewClient(endpoints []string) (client *Client, err error) {
func NewClient(ctx context.Context, endpoints []string, dialOpts ...grpc.DialOption) (client *Client, err error) {
tlsInfo := transport.TLSInfo{
CertFile: constants.EtcdAdminCert,
KeyFile: constants.EtcdAdminKey,
@ -55,7 +55,8 @@ func NewClient(endpoints []string) (client *Client, err error) {
c, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Context: ctx,
DialOptions: dialOpts,
TLS: tlsConfig,
Logger: zap.NewNop(),
})
@ -67,13 +68,17 @@ func NewClient(endpoints []string) (client *Client, err error) {
}
// NewLocalClient initializes and returns etcd client configured to talk to localhost endpoint.
func NewLocalClient() (client *Client, err error) {
return NewClient([]string{nethelpers.JoinHostPort("localhost", constants.EtcdClientPort)})
func NewLocalClient(ctx context.Context, dialOpts ...grpc.DialOption) (client *Client, err error) {
return NewClient(
ctx,
[]string{nethelpers.JoinHostPort("localhost", constants.EtcdClientPort)},
append([]grpc.DialOption{grpc.WithBlock()}, dialOpts...)...,
)
}
// NewClientFromControlPlaneIPs initializes and returns an etcd client
// configured to talk to all members.
func NewClientFromControlPlaneIPs(ctx context.Context, resources state.State) (client *Client, err error) {
func NewClientFromControlPlaneIPs(ctx context.Context, resources state.State, dialOpts ...grpc.DialOption) (client *Client, err error) {
endpoints, err := GetEndpoints(ctx, resources)
if err != nil {
return nil, err
@ -82,7 +87,7 @@ func NewClientFromControlPlaneIPs(ctx context.Context, resources state.State) (c
// Shuffle endpoints to establish random order on each call to avoid patterns based on sorted IP list.
rand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
return NewClient(endpoints)
return NewClient(ctx, endpoints, dialOpts...)
}
// ValidateForUpgrade validates the etcd cluster state to ensure that performing
@ -141,7 +146,7 @@ func (c *Client) ValidateQuorum(ctx context.Context) (err error) {
}
func validateMemberHealth(ctx context.Context, memberURIs []string) (err error) {
c, err := NewClient(memberURIs)
c, err := NewClient(ctx, memberURIs)
if err != nil {
return fmt.Errorf("failed to create client to member: %w", err)
}

View File

@ -14,7 +14,7 @@ import (
// WithLock executes the given function exclusively by acquiring an Etcd lock with the given key.
func WithLock(ctx context.Context, key string, logger *zap.Logger, f func() error) error {
etcdClient, err := NewLocalClient()
etcdClient, err := NewLocalClient(ctx)
if err != nil {
return fmt.Errorf("error creating etcd client: %w", err)
}