feat: add etcd API

This adds RPCs for basic etcd management tasks.

Signed-off-by: Andrew Rynhard <andrew@rynhard.io>
This commit is contained in:
Andrew Rynhard 2020-10-03 12:04:13 -07:00 committed by Andrew Rynhard
parent 9c969a4be5
commit 4eeef28e90
7 changed files with 1445 additions and 431 deletions

View File

@ -14,7 +14,8 @@ import "common/common.proto";
// The machine service definition.
service MachineService {
rpc ApplyConfiguration(ApplyConfigurationRequest) returns (ApplyConfigurationResponse);
rpc ApplyConfiguration(ApplyConfigurationRequest)
returns (ApplyConfigurationResponse);
rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse);
rpc Containers(ContainersRequest) returns (ContainersResponse);
rpc Copy(CopyRequest) returns (stream common.Data);
@ -22,6 +23,11 @@ service MachineService {
rpc DiskStats(google.protobuf.Empty) returns (DiskStatsResponse);
rpc Dmesg(DmesgRequest) returns (stream common.Data);
rpc Events(EventsRequest) returns (stream Event);
rpc EtcdMemberList(EtcdMemberListRequest) returns (EtcdMemberListResponse);
rpc EtcdLeaveCluster(EtcdLeaveClusterRequest)
returns (EtcdLeaveClusterResponse);
rpc EtcdForfeitLeadership(EtcdForfeitLeadershipRequest)
returns (EtcdForfeitLeadershipResponse);
rpc Hostname(google.protobuf.Empty) returns (HostnameResponse);
rpc Kubeconfig(google.protobuf.Empty) returns (stream common.Data);
rpc List(ListRequest) returns (stream FileInfo);
@ -29,7 +35,8 @@ service MachineService {
rpc Logs(LogsRequest) returns (stream common.Data);
rpc Memory(google.protobuf.Empty) returns (MemoryResponse);
rpc Mounts(google.protobuf.Empty) returns (MountsResponse);
rpc NetworkDeviceStats(google.protobuf.Empty) returns (NetworkDeviceStatsResponse);
rpc NetworkDeviceStats(google.protobuf.Empty)
returns (NetworkDeviceStatsResponse);
rpc Processes(google.protobuf.Empty) returns (ProcessesResponse);
rpc Read(ReadRequest) returns (stream common.Data);
rpc Reboot(google.protobuf.Empty) returns (RebootResponse);
@ -49,10 +56,9 @@ service MachineService {
}
// rpc applyConfiguration
// ApplyConfiguration describes a request to assert a new configuration upon a node.
message ApplyConfigurationRequest {
bytes data = 1;
}
// ApplyConfiguration describes a request to assert a new configuration upon a
// node.
message ApplyConfigurationRequest { bytes data = 1; }
// ApplyConfigurationResponse describes the response to a configuration request.
message ApplyConfiguration { common.Metadata metadata = 1; }
@ -499,9 +505,7 @@ message MemInfo {
// rpc Hostname
message HostnameResponse {
repeated Hostname messages = 1;
}
message HostnameResponse { repeated Hostname messages = 1; }
message Hostname {
common.Metadata metadata = 1;
@ -510,9 +514,7 @@ message Hostname {
// rpc LoadAvg
message LoadAvgResponse {
repeated LoadAvg messages = 1;
}
message LoadAvgResponse { repeated LoadAvg messages = 1; }
message LoadAvg {
common.Metadata metadata = 1;
@ -521,12 +523,9 @@ message LoadAvg {
double load15 = 4;
}
// rpc SystemStat
message SystemStatResponse {
repeated SystemStat messages = 1;
}
message SystemStatResponse { repeated SystemStat messages = 1; }
message SystemStat {
common.Metadata metadata = 1;
@ -569,12 +568,9 @@ message SoftIRQStat {
uint64 rcu = 10;
}
// rpc CPUInfo
message CPUInfoResponse {
repeated CPUsInfo messages = 1;
}
message CPUInfoResponse { repeated CPUsInfo messages = 1; }
message CPUsInfo {
common.Metadata metadata = 1;
@ -610,12 +606,9 @@ message CPUInfo {
string power_management = 26;
}
// rpc NetworkDeviceStats
message NetworkDeviceStatsResponse {
repeated NetworkDeviceStats messages = 1;
}
message NetworkDeviceStatsResponse { repeated NetworkDeviceStats messages = 1; }
message NetworkDeviceStats {
common.Metadata metadata = 1;
@ -643,12 +636,9 @@ message NetDev {
uint64 tx_compressed = 17;
}
// rpc DiskStats
message DiskStatsResponse {
repeated DiskStats messages = 1;
}
message DiskStatsResponse { repeated DiskStats messages = 1; }
message DiskStats {
common.Metadata metadata = 1;
@ -674,3 +664,23 @@ message DiskStat {
uint64 discard_sectors = 15;
uint64 discard_time_ms = 16;
}
// rpc EtcdLeaveCluster
// EtcdLeaveClusterRequest is the request for the EtcdLeaveCluster RPC; it
// carries no parameters.
message EtcdLeaveClusterRequest {}
// EtcdLeaveCluster is a single reply to EtcdLeaveCluster; it carries only the
// common metadata.
message EtcdLeaveCluster { common.Metadata metadata = 1; }
// EtcdLeaveClusterResponse is the response of the EtcdLeaveCluster RPC.
message EtcdLeaveClusterResponse { repeated EtcdLeaveCluster messages = 1; }
// rpc EtcdForfeitLeadership
// EtcdForfeitLeadershipRequest is the request for the EtcdForfeitLeadership
// RPC; it carries no parameters.
message EtcdForfeitLeadershipRequest {}
// EtcdForfeitLeadership is a single reply to EtcdForfeitLeadership.
message EtcdForfeitLeadership {
common.Metadata metadata = 1;
// member is the name of the etcd member leadership was moved to; it is empty
// when no leadership transfer took place.
string member = 2;
}
// EtcdForfeitLeadershipResponse is the response of the EtcdForfeitLeadership
// RPC.
message EtcdForfeitLeadershipResponse {
repeated EtcdForfeitLeadership messages = 1;
}
// rpc EtcdMemberList
// EtcdMemberListRequest is the request for the EtcdMemberList RPC; it carries
// no parameters.
message EtcdMemberListRequest {}
// EtcdMemberList is a single reply to EtcdMemberList.
message EtcdMemberList {
common.Metadata metadata = 1;
// members holds the names of the etcd cluster members.
repeated string members = 2;
}
// EtcdMemberListResponse is the response of the EtcdMemberList RPC.
message EtcdMemberListResponse { repeated EtcdMemberList messages = 1; }

View File

@ -14,7 +14,6 @@ import (
"os"
"github.com/kubernetes-sigs/bootkube/pkg/recovery"
"go.etcd.io/etcd/clientv3"
k8saes "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
"github.com/talos-systems/talos/internal/pkg/etcd"
@ -40,7 +39,7 @@ func recoverAssets(config config.Provider) error {
switch *recoverSource {
case machineapi.RecoverRequest_ETCD.String():
var client *clientv3.Client
var client *etcd.Client
client, err = etcd.NewClient([]string{"127.0.0.1:2379"})
if err != nil {
@ -54,7 +53,7 @@ func recoverAssets(config config.Provider) error {
return err
}
backend = recovery.NewEtcdBackendWithTransformer(client, "/registry", transform)
backend = recovery.NewEtcdBackendWithTransformer(client.Client, "/registry", transform)
case machineapi.RecoverRequest_APISERVER.String():
backend, err = recovery.NewAPIServerBackend(constants.RecoveryKubeconfig)
if err != nil {

View File

@ -240,7 +240,12 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
return nil, fmt.Errorf("error validating installer image %q: %w", in.GetImage(), err)
}
if err = etcd.ValidateForUpgrade(s.Controller.Runtime().Config(), in.GetPreserve()); err != nil {
client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
if err = client.ValidateForUpgrade(ctx, s.Controller.Runtime().Config(), in.GetPreserve()); err != nil {
return nil, fmt.Errorf("error validating etcd for upgrade: %w", err)
}
@ -1200,3 +1205,84 @@ func (s *Server) Memory(ctx context.Context, in *empty.Empty) (reply *machine.Me
return reply, err
}
// EtcdMemberList implements the machine.MachineServer interface.
//
// It builds an etcd client with etcd.NewClientFromControlPlaneIPs using the
// cluster CA and endpoint from the machine configuration, lists the cluster
// members and replies with a single message holding the member names.
// Errors are wrapped with the step that failed, matching the other etcd RPC
// handlers.
func (s *Server) EtcdMemberList(ctx context.Context, in *machine.EtcdMemberListRequest) (reply *machine.EtcdMemberListResponse, err error) {
	client, err := etcd.NewClientFromControlPlaneIPs(ctx, s.Controller.Runtime().Config().Cluster().CA(), s.Controller.Runtime().Config().Cluster().Endpoint())
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// nolint: errcheck
	defer client.Close()

	resp, err := client.MemberList(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list etcd members: %w", err)
	}

	members := make([]string, 0, len(resp.Members))

	for _, member := range resp.Members {
		members = append(members, member.GetName())
	}

	reply = &machine.EtcdMemberListResponse{
		Messages: []*machine.EtcdMemberList{
			{
				Members: members,
			},
		},
	}

	return reply, nil
}
// EtcdLeaveCluster implements the machine.MachineServer interface.
//
// It creates an etcd client from the cluster CA and endpoint found in the
// machine configuration and asks it to remove this node from the etcd
// cluster. On success the reply contains a single empty message.
func (s *Server) EtcdLeaveCluster(ctx context.Context, in *machine.EtcdLeaveClusterRequest) (reply *machine.EtcdLeaveClusterResponse, err error) {
	cluster := s.Controller.Runtime().Config().Cluster()

	etcdClient, err := etcd.NewClientFromControlPlaneIPs(ctx, cluster.CA(), cluster.Endpoint())
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// nolint: errcheck
	defer etcdClient.Close()

	err = etcdClient.LeaveCluster(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to leave cluster: %w", err)
	}

	return &machine.EtcdLeaveClusterResponse{
		Messages: []*machine.EtcdLeaveCluster{
			{},
		},
	}, nil
}
// EtcdForfeitLeadership implements the machine.MachineServer interface.
//
// It creates an etcd client from the cluster CA and endpoint found in the
// machine configuration and asks it to forfeit leadership. The reply contains
// a single message whose Member field is the value returned by
// Client.ForfeitLeadership.
func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForfeitLeadershipRequest) (reply *machine.EtcdForfeitLeadershipResponse, err error) {
	cluster := s.Controller.Runtime().Config().Cluster()

	etcdClient, err := etcd.NewClientFromControlPlaneIPs(ctx, cluster.CA(), cluster.Endpoint())
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// nolint: errcheck
	defer etcdClient.Close()

	newLeader, err := etcdClient.ForfeitLeadership(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to forfeit leadership: %w", err)
	}

	result := &machine.EtcdForfeitLeadership{
		Member: newLeader,
	}

	return &machine.EtcdForfeitLeadershipResponse{
		Messages: []*machine.EtcdForfeitLeadership{result},
	}, nil
}

View File

@ -1160,53 +1160,16 @@ func UncordonNode(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
// nolint: gocyclo
func LeaveEtcd(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
hostname, err := os.Hostname()
if err != nil {
return err
}
client, err := etcd.NewClientFromControlPlaneIPs(ctx, r.Config().Cluster().CA(), r.Config().Cluster().Endpoint())
if err != nil {
return err
return fmt.Errorf("failed to create etcd client: %w", err)
}
// nolint: errcheck
defer client.Close()
resp, err := client.MemberList(ctx)
if err != nil {
return err
}
var id *uint64
for _, member := range resp.Members {
if member.Name == hostname {
member := member
id = &member.ID
break
}
}
if id == nil {
return fmt.Errorf("failed to find %q in list of etcd members", hostname)
}
logger.Println("leaving etcd cluster")
_, err = client.MemberRemove(ctx, *id)
if err != nil {
return err
}
if err = system.Services(nil).Stop(ctx, "etcd"); err != nil {
return err
}
// Once the member is removed, the data is no longer valid.
if err = os.RemoveAll(constants.EtcdDataPath); err != nil {
return err
if err = client.LeaveCluster(ctx); err != nil {
return fmt.Errorf("failed to leave cluster: %w", err)
}
return nil

View File

@ -0,0 +1,139 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// +build integration_api
package api
import (
"context"
"io"
"testing"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/talos-systems/talos/internal/integration/base"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
// EtcdSuite is the integration test suite for the etcd RPCs of the machine API.
type EtcdSuite struct {
base.APISuite
// ctx is the per-test context with a timeout; it is set in SetupTest.
ctx context.Context
// ctxCancel cancels ctx; it is invoked in TearDownTest.
ctxCancel context.CancelFunc
}
// SuiteName returns the name under which this suite is identified.
func (suite *EtcdSuite) SuiteName() string {
	const name = "api.EtcdSuite"

	return name
}
// SetupTest skips the test in short mode and otherwise prepares the per-test
// context.
func (suite *EtcdSuite) SetupTest() {
	if testing.Short() {
		suite.T().Skip("skipping in short mode")
	}

	// Upper bound for a single test, so that a stuck test is aborted
	// eventually while still leaving plenty of room for the etcd operations.
	const testTimeout = 30 * time.Minute

	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), testTimeout)
}
// TearDownTest cancels the per-test context if SetupTest created one.
func (suite *EtcdSuite) TearDownTest() {
	cancel := suite.ctxCancel
	if cancel == nil {
		return
	}

	cancel()
}
// TestEtcdForfeitLeadership tests moving etcd leadership to another member.
//
// The EtcdForfeitLeadership RPC is sent to every control plane node in turn.
// Each call must succeed without a per-node error, and at least one of the
// calls must report the member leadership was moved to.
func (suite *EtcdSuite) TestEtcdForfeitLeadership() {
	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state etcd test is not reliable (can't wait for cluster readiness in between resets)")
	}

	nodes := suite.DiscoverNodes().NodesByType(machine.TypeControlPlane)

	// Leadership cannot be handed over when there is no second member; the
	// RPC would fail by design, so skip instead of reporting a failure.
	if len(nodes) < 2 {
		suite.T().Skip("test requires at least two control plane nodes")
	}

	var leader string

	for _, node := range nodes {
		resp, err := suite.Client.MachineClient.EtcdForfeitLeadership(client.WithNodes(suite.ctx, node), &machineapi.EtcdForfeitLeadershipRequest{})
		suite.Require().NoError(err)

		// Guard the index below so that an empty reply fails the test
		// instead of panicking.
		suite.Require().NotEmpty(resp.GetMessages(), "node: %s", node)

		msg := resp.GetMessages()[0]

		// Generated getters are nil-safe, unlike direct field access on
		// a possibly missing metadata pointer.
		suite.Assert().Empty(msg.GetMetadata().GetError(), "node: %s", node)

		if member := msg.GetMember(); member != "" {
			leader = member

			suite.T().Log("Moved leadership to", leader)
		}
	}

	suite.Assert().NotEmpty(leader)
}
// TestEtcdLeaveCluster tests removing an etcd member.
//
// The third control plane node forfeits leadership and leaves the etcd
// cluster. The test then checks that the etcd service on that node is
// finished and that the etcd data directory is gone, and finally reboots the
// node so that it can rejoin the cluster.
func (suite *EtcdSuite) TestEtcdLeaveCluster() {
	if !suite.Capabilities().SupportsReboot {
		suite.T().Skip("cluster doesn't support reboot (and reset)")
	}

	if suite.Cluster == nil {
		suite.T().Skip("without full cluster state reset test is not reliable (can't wait for cluster readiness in between resets)")
	}

	nodes := suite.DiscoverNodes().NodesByType(machine.TypeControlPlane)

	// The test operates on the third control plane node; indexing without
	// this check would panic on smaller clusters.
	if len(nodes) < 3 {
		suite.T().Skip("test requires at least three control plane nodes")
	}

	node := nodes[2]

	suite.T().Log("Removing etcd member", node)

	nodeCtx := client.WithNodes(suite.ctx, node)

	_, err := suite.Client.MachineClient.EtcdForfeitLeadership(nodeCtx, &machineapi.EtcdForfeitLeadershipRequest{})
	suite.Require().NoError(err)

	_, err = suite.Client.MachineClient.EtcdLeaveCluster(nodeCtx, &machineapi.EtcdLeaveClusterRequest{})
	suite.Require().NoError(err)

	services, err := suite.Client.MachineClient.ServiceList(nodeCtx, &empty.Empty{})
	suite.Require().NoError(err)
	suite.Require().NotEmpty(services.Messages)

	for _, service := range services.Messages[0].GetServices() {
		if service.Id == "etcd" {
			suite.Assert().Equal("Finished", service.State)
		}
	}

	stream, err := suite.Client.MachineClient.List(nodeCtx, &machineapi.ListRequest{Root: constants.EtcdDataPath})
	suite.Require().NoError(err)

	for {
		info, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled {
				break
			}

			// Any other stream error leaves info nil; fail the test
			// here rather than dereferencing it below.
			suite.Require().NoError(recvErr)
		}

		suite.Assert().Equal("rpc error: code = Unknown desc = lstat /var/lib/etcd: no such file or directory", info.GetMetadata().GetError())
	}

	// NB: Reboot the node so that it can rejoin the etcd cluster. This allows us
	// to check the cluster health and catch any issues in rejoining.
	suite.AssertRebooted(suite.ctx, node, func(nodeCtx context.Context) error {
		_, err = suite.Client.MachineClient.Reboot(nodeCtx, &empty.Empty{})

		return err
	}, 10*time.Minute)
}
func init() {
allSuites = append(allSuites, new(EtcdSuite))
}

View File

@ -7,10 +7,13 @@ package etcd
import (
"context"
"fmt"
"log"
"net/url"
"os"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc"
@ -18,15 +21,21 @@ import (
"github.com/talos-systems/net"
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
)
// Client is a wrapper around the official etcd client.
//
// The official client is embedded, so all of its methods are available on
// Client directly, and the underlying *clientv3.Client can be reached through
// the Client field when an API requires the unwrapped type.
type Client struct {
*clientv3.Client
}
// NewClient initializes and returns an etcd client configured to talk to
// a local endpoint.
func NewClient(endpoints []string) (client *clientv3.Client, err error) {
func NewClient(endpoints []string) (client *Client, err error) {
tlsInfo := transport.TLSInfo{
CertFile: constants.KubernetesEtcdPeerCert,
KeyFile: constants.KubernetesEtcdPeerKey,
@ -38,7 +47,7 @@ func NewClient(endpoints []string) (client *clientv3.Client, err error) {
return nil, err
}
client, err = clientv3.New(clientv3.Config{
c, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
@ -48,12 +57,12 @@ func NewClient(endpoints []string) (client *clientv3.Client, err error) {
return nil, err
}
return client, nil
return &Client{Client: c}, nil
}
// NewClientFromControlPlaneIPs initializes and returns an etcd client
// configured to talk to all members.
func NewClientFromControlPlaneIPs(ctx context.Context, creds *x509.PEMEncodedCertificateAndKey, endpoint *url.URL) (client *clientv3.Client, err error) {
func NewClientFromControlPlaneIPs(ctx context.Context, creds *x509.PEMEncodedCertificateAndKey, endpoint *url.URL) (client *Client, err error) {
h, err := kubernetes.NewTemporaryClientFromPKI(creds, endpoint)
if err != nil {
return nil, err
@ -75,17 +84,9 @@ func NewClientFromControlPlaneIPs(ctx context.Context, creds *x509.PEMEncodedCer
// ValidateForUpgrade validates the etcd cluster state to ensure that performing
// an upgrade is safe.
func ValidateForUpgrade(config config.Provider, preserve bool) error {
func (c *Client) ValidateForUpgrade(ctx context.Context, config config.Provider, preserve bool) error {
if config.Machine().Type() != machine.TypeJoin {
client, err := NewClientFromControlPlaneIPs(context.TODO(), config.Cluster().CA(), config.Cluster().Endpoint())
if err != nil {
return err
}
// nolint: errcheck
defer client.Close()
resp, err := client.MemberList(context.Background())
resp, err := c.MemberList(context.Background())
if err != nil {
return err
}
@ -106,3 +107,112 @@ func ValidateForUpgrade(config config.Provider, preserve bool) error {
return nil
}
// LeaveCluster removes the current member from the etcd cluster.
//
// The current member is the one whose name equals the local hostname. After
// the member is removed, the etcd service is stopped and the etcd data
// directory is deleted. Every failure is returned wrapped with the step that
// failed.
//
// nolint: gocyclo
func (c *Client) LeaveCluster(ctx context.Context) error {
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("failed to get hostname: %w", err)
	}

	resp, err := c.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to list etcd members: %w", err)
	}

	var (
		id    uint64
		found bool
	)

	for _, member := range resp.Members {
		if member.Name == hostname {
			id, found = member.ID, true

			break
		}
	}

	if !found {
		return fmt.Errorf("failed to find %q in list of etcd members", hostname)
	}

	if _, err = c.MemberRemove(ctx, id); err != nil {
		return fmt.Errorf("failed to remove member %d: %w", id, err)
	}

	if err = system.Services(nil).Stop(ctx, "etcd"); err != nil {
		return fmt.Errorf("failed to stop etcd: %w", err)
	}

	// Once the member is removed, the data is no longer valid.
	if err = os.RemoveAll(constants.EtcdDataPath); err != nil {
		return fmt.Errorf("failed to remove %s: %w", constants.EtcdDataPath, err)
	}

	return nil
}
// ForfeitLeadership transfers leadership from the current member to another
// member.
//
// The current member is the one whose name equals the local hostname. The
// returned string is the name of the member leadership was moved to. It is
// empty, with a nil error, when the current member is not the leader or
// advertises no client URLs. An error is returned when the cluster has only
// one member, when the current member is not found in the member list, or
// when any etcd request fails.
//
// NOTE: on a successful transfer the client's endpoints are left set to the
// current member's client URL.
//
// nolint: gocyclo
func (c *Client) ForfeitLeadership(ctx context.Context) (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "", fmt.Errorf("failed to get hostname: %w", err)
	}

	resp, err := c.MemberList(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to list etcd members: %w", err)
	}

	if len(resp.Members) == 1 {
		return "", fmt.Errorf("cannot forfeit leadership, only one member")
	}

	var member *etcdserverpb.Member

	for _, m := range resp.Members {
		if m.Name == hostname {
			member = m

			break
		}
	}

	if member == nil {
		return "", fmt.Errorf("failed to find %q in list of etcd members", hostname)
	}

	for _, ep := range member.GetClientURLs() {
		var status *clientv3.StatusResponse

		status, err = c.Status(ctx, ep)
		if err != nil {
			return "", fmt.Errorf("failed to get status of endpoint %q: %w", ep, err)
		}

		if status.Leader != member.GetID() {
			// This member is not the leader, so there is nothing to forfeit.
			return "", nil
		}

		for _, m := range resp.Members {
			if m.GetID() == member.GetID() {
				continue
			}

			log.Printf("moving leadership from %q to %q", member.GetName(), m.GetName())

			// Pin the client to this member's own endpoint before the
			// request (etcd expects MoveLeader to be sent to the current
			// leader, which at this point is this member).
			c.SetEndpoints(ep)

			_, err = c.MoveLeader(ctx, m.GetID())
			if err != nil {
				return "", fmt.Errorf("failed to move leadership to %q: %w", m.GetName(), err)
			}

			return m.GetName(), nil
		}
	}

	return "", nil
}

File diff suppressed because it is too large Load Diff