feat: implement server-side API for cluster health checks

This implements existing server-side health checks as defined in
`internal/pkg/cluster/checks` in Talos API.

Summary of changes:

* new `cluster` API

* `apid` now listens without auth on local file socket

* `cluster` API is for now implemented in `machined`, but we can move it
to the new service if we find it more appropriate

* `talosctl health` by default now does server-side health check

UX: `talosctl health` without arguments runs a server-side health check for
the cluster, relying on a healthy Kubernetes API to discover the master and
worker node lists. If needed, the node lists can be overridden with flags.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2020-07-13 23:48:41 +03:00 committed by talos-bot
parent 7d10677ee8
commit c54639e541
19 changed files with 978 additions and 82 deletions

View File

@ -52,6 +52,8 @@ COPY ./api/network/network.proto /api/network/network.proto
RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api network/network.proto RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api network/network.proto
COPY ./api/os/os.proto /api/os/os.proto COPY ./api/os/os.proto /api/os/os.proto
RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api os/os.proto RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api os/os.proto
COPY ./api/cluster/cluster.proto /api/cluster/cluster.proto
RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api cluster/cluster.proto
# Gofumports generated files to adjust import order # Gofumports generated files to adjust import order
RUN gofumports -w -local github.com/talos-systems/talos /api/ RUN gofumports -w -local github.com/talos-systems/talos /api/
@ -63,6 +65,7 @@ COPY --from=generate-build /api/security/security.pb.go /api/security/
COPY --from=generate-build /api/machine/machine.pb.go /api/machine/ COPY --from=generate-build /api/machine/machine.pb.go /api/machine/
COPY --from=generate-build /api/time/time.pb.go /api/time/ COPY --from=generate-build /api/time/time.pb.go /api/time/
COPY --from=generate-build /api/network/network.pb.go /api/network/ COPY --from=generate-build /api/network/network.pb.go /api/network/
COPY --from=generate-build /api/cluster/cluster.pb.go /api/cluster/
# The base target provides a container that can be used to build all Talos # The base target provides a container that can be used to build all Talos
# assets. # assets.

460
api/cluster/cluster.pb.go Normal file
View File

@ -0,0 +1,460 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.23.0
// protoc v3.12.3
// source: cluster/cluster.proto
package cluster
import (
context "context"
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
duration "github.com/golang/protobuf/ptypes/duration"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
common "github.com/talos-systems/talos/api/common"
)
// Generated guards: fail the build if the protobuf runtime is outside the
// window this file was generated for.
const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
// HealthCheckRequest is the request for ClusterService.HealthCheck.
//
// NOTE: this file is protoc-generated; comments here will be lost on regeneration.
type HealthCheckRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// WaitTimeout bounds how long the server-side health check may run.
	WaitTimeout *duration.Duration `protobuf:"bytes,1,opt,name=wait_timeout,json=waitTimeout,proto3" json:"wait_timeout,omitempty"`
	// ClusterInfo optionally pins the node lists to check.
	ClusterInfo *ClusterInfo `protobuf:"bytes,2,opt,name=cluster_info,json=clusterInfo,proto3" json:"cluster_info,omitempty"`
}

// Reset restores the message to its zero value.
func (x *HealthCheckRequest) Reset() {
	*x = HealthCheckRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_cluster_cluster_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

// String renders the message in the standard protobuf text format.
func (x *HealthCheckRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*HealthCheckRequest) ProtoMessage() {}

// ProtoReflect exposes the message through the protobuf reflection API.
func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message {
	mi := &file_cluster_cluster_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead.
func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
	return file_cluster_cluster_proto_rawDescGZIP(), []int{0}
}

// GetWaitTimeout returns the wait_timeout field; nil-safe on a nil receiver.
func (x *HealthCheckRequest) GetWaitTimeout() *duration.Duration {
	if x != nil {
		return x.WaitTimeout
	}
	return nil
}

// GetClusterInfo returns the cluster_info field; nil-safe on a nil receiver.
func (x *HealthCheckRequest) GetClusterInfo() *ClusterInfo {
	if x != nil {
		return x.ClusterInfo
	}
	return nil
}
// ClusterInfo describes the set of nodes the health check should cover.
// Fields left empty may be filled in server-side (see the machined implementation).
type ClusterInfo struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// ControlPlaneNodes lists the control plane node addresses.
	ControlPlaneNodes []string `protobuf:"bytes,1,rep,name=control_plane_nodes,json=controlPlaneNodes,proto3" json:"control_plane_nodes,omitempty"`
	// WorkerNodes lists the worker node addresses.
	WorkerNodes []string `protobuf:"bytes,2,rep,name=worker_nodes,json=workerNodes,proto3" json:"worker_nodes,omitempty"`
	// ForceEndpoint overrides the Kubernetes endpoint from kubeconfig.
	ForceEndpoint string `protobuf:"bytes,3,opt,name=force_endpoint,json=forceEndpoint,proto3" json:"force_endpoint,omitempty"`
}

// Reset restores the message to its zero value.
func (x *ClusterInfo) Reset() {
	*x = ClusterInfo{}
	if protoimpl.UnsafeEnabled {
		mi := &file_cluster_cluster_proto_msgTypes[1]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

// String renders the message in the standard protobuf text format.
func (x *ClusterInfo) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*ClusterInfo) ProtoMessage() {}

// ProtoReflect exposes the message through the protobuf reflection API.
func (x *ClusterInfo) ProtoReflect() protoreflect.Message {
	mi := &file_cluster_cluster_proto_msgTypes[1]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use ClusterInfo.ProtoReflect.Descriptor instead.
func (*ClusterInfo) Descriptor() ([]byte, []int) {
	return file_cluster_cluster_proto_rawDescGZIP(), []int{1}
}

// GetControlPlaneNodes returns control_plane_nodes; nil-safe on a nil receiver.
func (x *ClusterInfo) GetControlPlaneNodes() []string {
	if x != nil {
		return x.ControlPlaneNodes
	}
	return nil
}

// GetWorkerNodes returns worker_nodes; nil-safe on a nil receiver.
func (x *ClusterInfo) GetWorkerNodes() []string {
	if x != nil {
		return x.WorkerNodes
	}
	return nil
}

// GetForceEndpoint returns force_endpoint; nil-safe on a nil receiver.
func (x *ClusterInfo) GetForceEndpoint() string {
	if x != nil {
		return x.ForceEndpoint
	}
	return ""
}
// HealthCheckProgress is a single progress update streamed back to the
// caller while the server-side health check runs.
type HealthCheckProgress struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// Metadata carries per-node routing/error information (common.Metadata).
	Metadata *common.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
	// Message is a human-readable progress line.
	Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
}

// Reset restores the message to its zero value.
func (x *HealthCheckProgress) Reset() {
	*x = HealthCheckProgress{}
	if protoimpl.UnsafeEnabled {
		mi := &file_cluster_cluster_proto_msgTypes[2]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

// String renders the message in the standard protobuf text format.
func (x *HealthCheckProgress) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*HealthCheckProgress) ProtoMessage() {}

// ProtoReflect exposes the message through the protobuf reflection API.
func (x *HealthCheckProgress) ProtoReflect() protoreflect.Message {
	mi := &file_cluster_cluster_proto_msgTypes[2]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use HealthCheckProgress.ProtoReflect.Descriptor instead.
func (*HealthCheckProgress) Descriptor() ([]byte, []int) {
	return file_cluster_cluster_proto_rawDescGZIP(), []int{2}
}

// GetMetadata returns the metadata field; nil-safe on a nil receiver.
func (x *HealthCheckProgress) GetMetadata() *common.Metadata {
	if x != nil {
		return x.Metadata
	}
	return nil
}

// GetMessage returns the message field; nil-safe on a nil receiver.
func (x *HealthCheckProgress) GetMessage() string {
	if x != nil {
		return x.Message
	}
	return ""
}
// File_cluster_cluster_proto is the compiled descriptor for cluster/cluster.proto,
// populated by file_cluster_cluster_proto_init.
var File_cluster_cluster_proto protoreflect.FileDescriptor

// file_cluster_cluster_proto_rawDesc is the serialized FileDescriptorProto for
// cluster/cluster.proto (generated data — do not edit by hand).
var file_cluster_cluster_proto_rawDesc = []byte{
	0x0a, 0x15, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65,
	0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72,
	0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
	0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
	0x1a, 0x13, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e,
	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8b, 0x01, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
	0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x0c,
	0x77, 0x61, 0x69, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01,
	0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
	0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x77,
	0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x37, 0x0a, 0x0c, 0x63, 0x6c,
	0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
	0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74,
	0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49,
	0x6e, 0x66, 0x6f, 0x22, 0x87, 0x01, 0x0a, 0x0b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49,
	0x6e, 0x66, 0x6f, 0x12, 0x2e, 0x0a, 0x13, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70,
	0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09,
	0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x4e, 0x6f,
	0x64, 0x65, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x6e, 0x6f,
	0x64, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x65,
	0x72, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x6f, 0x72, 0x63, 0x65, 0x5f,
	0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
	0x66, 0x6f, 0x72, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x5d, 0x0a,
	0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x67,
	0x72, 0x65, 0x73, 0x73, 0x12, 0x2c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
	0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e,
	0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
	0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20,
	0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x5c, 0x0a, 0x0e,
	0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a,
	0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1b, 0x2e,
	0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68,
	0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6c, 0x75,
	0x73, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b,
	0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x30, 0x01, 0x42, 0x4b, 0x0a, 0x0f, 0x63, 0x6f,
	0x6d, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x42, 0x0a, 0x43,
	0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x41, 0x70, 0x69, 0x50, 0x01, 0x5a, 0x2a, 0x67, 0x69, 0x74,
	0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2d, 0x73, 0x79,
	0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f,
	0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
	file_cluster_cluster_proto_rawDescOnce sync.Once
	file_cluster_cluster_proto_rawDescData = file_cluster_cluster_proto_rawDesc
)

// file_cluster_cluster_proto_rawDescGZIP compresses the raw descriptor exactly
// once (replacing the cached data with its gzipped form) and returns it.
func file_cluster_cluster_proto_rawDescGZIP() []byte {
	file_cluster_cluster_proto_rawDescOnce.Do(func() {
		file_cluster_cluster_proto_rawDescData = protoimpl.X.CompressGZIP(file_cluster_cluster_proto_rawDescData)
	})
	return file_cluster_cluster_proto_rawDescData
}
// Generated type tables: Go types for each message in this file plus the
// external types it references, and the dependency index table linking
// fields/methods to those types.
var (
	file_cluster_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
	file_cluster_cluster_proto_goTypes  = []interface{}{
		(*HealthCheckRequest)(nil),  // 0: cluster.HealthCheckRequest
		(*ClusterInfo)(nil),         // 1: cluster.ClusterInfo
		(*HealthCheckProgress)(nil), // 2: cluster.HealthCheckProgress
		(*duration.Duration)(nil),   // 3: google.protobuf.Duration
		(*common.Metadata)(nil),     // 4: common.Metadata
	}
)

var file_cluster_cluster_proto_depIdxs = []int32{
	3, // 0: cluster.HealthCheckRequest.wait_timeout:type_name -> google.protobuf.Duration
	1, // 1: cluster.HealthCheckRequest.cluster_info:type_name -> cluster.ClusterInfo
	4, // 2: cluster.HealthCheckProgress.metadata:type_name -> common.Metadata
	0, // 3: cluster.ClusterService.HealthCheck:input_type -> cluster.HealthCheckRequest
	2, // 4: cluster.ClusterService.HealthCheck:output_type -> cluster.HealthCheckProgress
	4, // [4:5] is the sub-list for method output_type
	3, // [3:4] is the sub-list for method input_type
	3, // [3:3] is the sub-list for extension type_name
	3, // [3:3] is the sub-list for extension extendee
	0, // [0:3] is the sub-list for field type_name
}
func init() { file_cluster_cluster_proto_init() }

// file_cluster_cluster_proto_init builds the file descriptor and message type
// infrastructure; it is idempotent and runs at package init.
func file_cluster_cluster_proto_init() {
	if File_cluster_cluster_proto != nil {
		return
	}
	// Without unsafe access, the runtime needs exporters to reach the
	// unexported bookkeeping fields of each message struct.
	if !protoimpl.UnsafeEnabled {
		file_cluster_cluster_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*HealthCheckRequest); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
		file_cluster_cluster_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*ClusterInfo); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
		file_cluster_cluster_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*HealthCheckProgress); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_cluster_cluster_proto_rawDesc,
			NumEnums:      0,
			NumMessages:   3,
			NumExtensions: 0,
			NumServices:   1,
		},
		GoTypes:           file_cluster_cluster_proto_goTypes,
		DependencyIndexes: file_cluster_cluster_proto_depIdxs,
		MessageInfos:      file_cluster_cluster_proto_msgTypes,
	}.Build()
	File_cluster_cluster_proto = out.File
	// Release build-time tables so they can be garbage collected.
	file_cluster_cluster_proto_rawDesc = nil
	file_cluster_cluster_proto_goTypes = nil
	file_cluster_cluster_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var (
	_ context.Context
	_ grpc.ClientConnInterface
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// ClusterServiceClient is the client API for ClusterService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ClusterServiceClient interface {
	// HealthCheck starts a server-side cluster health check and returns a
	// stream of progress messages.
	HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (ClusterService_HealthCheckClient, error)
}

type clusterServiceClient struct {
	cc grpc.ClientConnInterface
}

// NewClusterServiceClient returns a ClusterServiceClient backed by cc.
func NewClusterServiceClient(cc grpc.ClientConnInterface) ClusterServiceClient {
	return &clusterServiceClient{cc}
}

func (c *clusterServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (ClusterService_HealthCheckClient, error) {
	stream, err := c.cc.NewStream(ctx, &_ClusterService_serviceDesc.Streams[0], "/cluster.ClusterService/HealthCheck", opts...)
	if err != nil {
		return nil, err
	}
	x := &clusterServiceHealthCheckClient{stream}
	// Server-streaming RPC: the single request is sent up front and the send
	// direction is closed immediately.
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

// ClusterService_HealthCheckClient is the client-side view of the
// HealthCheck response stream.
type ClusterService_HealthCheckClient interface {
	Recv() (*HealthCheckProgress, error)
	grpc.ClientStream
}

type clusterServiceHealthCheckClient struct {
	grpc.ClientStream
}

// Recv reads the next progress message from the stream.
func (x *clusterServiceHealthCheckClient) Recv() (*HealthCheckProgress, error) {
	m := new(HealthCheckProgress)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}
// ClusterServiceServer is the server API for ClusterService service.
type ClusterServiceServer interface {
	// HealthCheck runs the cluster health check, writing progress to srv.
	HealthCheck(*HealthCheckRequest, ClusterService_HealthCheckServer) error
}

// UnimplementedClusterServiceServer can be embedded to have forward compatible implementations.
type UnimplementedClusterServiceServer struct {
}

func (*UnimplementedClusterServiceServer) HealthCheck(*HealthCheckRequest, ClusterService_HealthCheckServer) error {
	return status.Errorf(codes.Unimplemented, "method HealthCheck not implemented")
}

// RegisterClusterServiceServer registers srv with the grpc server s.
func RegisterClusterServiceServer(s *grpc.Server, srv ClusterServiceServer) {
	s.RegisterService(&_ClusterService_serviceDesc, srv)
}

// _ClusterService_HealthCheck_Handler decodes the single request message and
// dispatches to the registered server implementation.
func _ClusterService_HealthCheck_Handler(srv interface{}, stream grpc.ServerStream) error {
	m := new(HealthCheckRequest)
	if err := stream.RecvMsg(m); err != nil {
		return err
	}
	return srv.(ClusterServiceServer).HealthCheck(m, &clusterServiceHealthCheckServer{stream})
}

// ClusterService_HealthCheckServer is the server-side view of the
// HealthCheck response stream.
type ClusterService_HealthCheckServer interface {
	Send(*HealthCheckProgress) error
	grpc.ServerStream
}

type clusterServiceHealthCheckServer struct {
	grpc.ServerStream
}

// Send writes a progress message to the client.
func (x *clusterServiceHealthCheckServer) Send(m *HealthCheckProgress) error {
	return x.ServerStream.SendMsg(m)
}
// _ClusterService_serviceDesc wires ClusterService into the grpc runtime:
// one server-streaming method, no unary methods.
var _ClusterService_serviceDesc = grpc.ServiceDesc{
	ServiceName: "cluster.ClusterService",
	HandlerType: (*ClusterServiceServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "HealthCheck",
			Handler:       _ClusterService_HealthCheck_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "cluster/cluster.proto",
}

32
api/cluster/cluster.proto Normal file
View File

@ -0,0 +1,32 @@
syntax = "proto3";

package cluster;

option go_package = "github.com/talos-systems/talos/api/cluster";
option java_multiple_files = true;
option java_outer_classname = "ClusterApi";
option java_package = "com.cluster.api";

import "google/protobuf/duration.proto";
import "common/common.proto";

// The cluster service definition.
//
// ClusterService runs cluster-wide health checks server-side, streaming
// progress back to the caller.
service ClusterService {
  rpc HealthCheck(HealthCheckRequest) returns (stream HealthCheckProgress);
}

message HealthCheckRequest {
  // wait_timeout bounds the total runtime of the health check.
  google.protobuf.Duration wait_timeout = 1;
  // cluster_info optionally pins the node lists to check.
  ClusterInfo cluster_info = 2;
}

// ClusterInfo describes the nodes the health check should cover.
message ClusterInfo {
  repeated string control_plane_nodes = 1;
  repeated string worker_nodes = 2;
  // force_endpoint overrides the Kubernetes endpoint from kubeconfig.
  string force_endpoint = 3;
}

// HealthCheckProgress is a single progress update from the streaming check.
message HealthCheckProgress {
  // metadata carries routing/error information for the responding node.
  common.Metadata metadata = 1;
  // message is a human-readable progress line.
  string message = 2;
}

View File

@ -14,6 +14,10 @@ import (
"github.com/talos-systems/talos/pkg/client" "github.com/talos-systems/talos/pkg/client"
) )
var crashdumpCmdFlags struct {
clusterState clusterNodes
}
// crashdumpCmd represents the crashdump command. // crashdumpCmd represents the crashdump command.
var crashdumpCmd = &cobra.Command{ var crashdumpCmd = &cobra.Command{
Use: "crashdump", Use: "crashdump",
@ -29,7 +33,7 @@ var crashdumpCmd = &cobra.Command{
worker := cluster.APICrashDumper{ worker := cluster.APICrashDumper{
ClientProvider: clientProvider, ClientProvider: clientProvider,
Info: &clusterState, Info: &crashdumpCmdFlags.clusterState,
} }
worker.CrashDump(ctx, os.Stdout) worker.CrashDump(ctx, os.Stdout)
@ -41,7 +45,7 @@ var crashdumpCmd = &cobra.Command{
func init() { func init() {
addCommand(crashdumpCmd) addCommand(crashdumpCmd)
crashdumpCmd.Flags().StringVar(&clusterState.InitNode, "init-node", "", "specify IPs of init node") crashdumpCmd.Flags().StringVar(&crashdumpCmdFlags.clusterState.InitNode, "init-node", "", "specify IPs of init node")
crashdumpCmd.Flags().StringSliceVar(&clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") crashdumpCmd.Flags().StringSliceVar(&crashdumpCmdFlags.clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes")
crashdumpCmd.Flags().StringSliceVar(&clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") crashdumpCmd.Flags().StringSliceVar(&crashdumpCmdFlags.clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes")
} }

View File

@ -6,10 +6,17 @@ package talos
import ( import (
"context" "context"
"fmt"
"io"
"os"
"time" "time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
clusterapi "github.com/talos-systems/talos/api/cluster"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/cluster" "github.com/talos-systems/talos/internal/pkg/cluster"
"github.com/talos-systems/talos/internal/pkg/cluster/check" "github.com/talos-systems/talos/internal/pkg/cluster/check"
@ -39,11 +46,12 @@ func (cluster *clusterNodes) NodesByType(t runtime.MachineType) []string {
} }
} }
var ( var healthCmdFlags struct {
clusterState clusterNodes clusterState clusterNodes
clusterWaitTimeout time.Duration clusterWaitTimeout time.Duration
forceEndpoint string forceEndpoint string
) runOnServer bool
}
// healthCmd represents the health command. // healthCmd represents the health command.
var healthCmd = &cobra.Command{ var healthCmd = &cobra.Command{
@ -53,38 +61,88 @@ var healthCmd = &cobra.Command{
Args: cobra.NoArgs, Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(func(ctx context.Context, c *client.Client) error { return WithClient(func(ctx context.Context, c *client.Client) error {
clientProvider := &cluster.ConfigClientProvider{ if healthCmdFlags.runOnServer {
DefaultClient: c, return healthOnServer(ctx, c)
}
defer clientProvider.Close() //nolint: errcheck
state := struct {
cluster.ClientProvider
cluster.K8sProvider
cluster.Info
}{
ClientProvider: clientProvider,
K8sProvider: &cluster.KubernetesClient{
ClientProvider: clientProvider,
ForceEndpoint: forceEndpoint,
},
Info: &clusterState,
} }
// Run cluster readiness checks return healthOnClient(ctx, c)
checkCtx, checkCtxCancel := context.WithTimeout(ctx, clusterWaitTimeout)
defer checkCtxCancel()
return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), check.StderrReporter())
}) })
}, },
} }
// healthOnClient runs the cluster readiness checks from the client side,
// reaching the cluster through the supplied Talos client and the Kubernetes
// API, and reporting progress to stderr.
func healthOnClient(ctx context.Context, c *client.Client) error {
	provider := &cluster.ConfigClientProvider{
		DefaultClient: c,
	}
	defer provider.Close() //nolint: errcheck

	state := struct {
		cluster.ClientProvider
		cluster.K8sProvider
		cluster.Info
	}{
		ClientProvider: provider,
		K8sProvider: &cluster.KubernetesClient{
			ClientProvider: provider,
			ForceEndpoint:  healthCmdFlags.forceEndpoint,
		},
		Info: &healthCmdFlags.clusterState,
	}

	// Bound the whole run by the --wait-timeout flag.
	checkCtx, cancel := context.WithTimeout(ctx, healthCmdFlags.clusterWaitTimeout)
	defer cancel()

	checks := append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...)

	return check.Wait(checkCtx, &state, checks, check.StderrReporter())
}
// healthOnServer delegates the health check to the node's cluster API and
// relays the streamed progress messages to stderr. It requires exactly one
// target node.
func healthOnServer(ctx context.Context, c *client.Client) error {
	if err := helpers.FailIfMultiNodes(ctx, "health"); err != nil {
		return err
	}

	// The server-side API has no separate init-node notion; fold the init
	// node into the control plane list.
	controlPlaneNodes := healthCmdFlags.clusterState.ControlPlaneNodes
	if healthCmdFlags.clusterState.InitNode != "" {
		controlPlaneNodes = append(controlPlaneNodes, healthCmdFlags.clusterState.InitNode)
	}

	stream, err := c.ClusterHealthCheck(ctx, healthCmdFlags.clusterWaitTimeout, &clusterapi.ClusterInfo{
		ControlPlaneNodes: controlPlaneNodes,
		WorkerNodes:       healthCmdFlags.clusterState.WorkerNodes,
		ForceEndpoint:     healthCmdFlags.forceEndpoint,
	})
	if err != nil {
		return err
	}

	if err = stream.CloseSend(); err != nil {
		return err
	}

	for {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			// EOF / cancellation are the normal ways the stream ends.
			if recvErr == io.EOF || status.Code(recvErr) == codes.Canceled {
				return nil
			}

			return recvErr
		}

		if msg.GetMetadata().GetError() != "" {
			return fmt.Errorf("healthcheck error: %s", msg.GetMetadata().GetError())
		}

		fmt.Fprintln(os.Stderr, msg.GetMessage())
	}
}
func init() { func init() {
addCommand(healthCmd) addCommand(healthCmd)
healthCmd.Flags().StringVar(&clusterState.InitNode, "init-node", "", "specify IPs of init node") healthCmd.Flags().StringVar(&healthCmdFlags.clusterState.InitNode, "init-node", "", "specify IPs of init node")
healthCmd.Flags().StringSliceVar(&clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") healthCmd.Flags().StringSliceVar(&healthCmdFlags.clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes")
healthCmd.Flags().StringSliceVar(&clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") healthCmd.Flags().StringSliceVar(&healthCmdFlags.clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes")
healthCmd.Flags().DurationVar(&clusterWaitTimeout, "wait-timeout", 20*time.Minute, "timeout to wait for the cluster to be ready") healthCmd.Flags().DurationVar(&healthCmdFlags.clusterWaitTimeout, "wait-timeout", 20*time.Minute, "timeout to wait for the cluster to be ready")
healthCmd.Flags().StringVar(&forceEndpoint, "k8s-endpoint", "", "use endpoint instead of kubeconfig default") healthCmd.Flags().StringVar(&healthCmdFlags.forceEndpoint, "k8s-endpoint", "", "use endpoint instead of kubeconfig default")
healthCmd.Flags().BoolVar(&healthCmdFlags.runOnServer, "server", true, "run server-side check")
} }

View File

@ -18,6 +18,7 @@ talosctl health [flags]
-h, --help help for health -h, --help help for health
--init-node string specify IPs of init node --init-node string specify IPs of init node
--k8s-endpoint string use endpoint instead of kubeconfig default --k8s-endpoint string use endpoint instead of kubeconfig default
--server run server-side check (default true)
--wait-timeout duration timeout to wait for the cluster to be ready (default 20m0s) --wait-timeout duration timeout to wait for the cluster to be ready (default 20m0s)
--worker-nodes strings specify IPs of worker nodes --worker-nodes strings specify IPs of worker nodes
``` ```

View File

@ -11,6 +11,7 @@ import (
"strings" "strings"
"github.com/talos-systems/grpc-proxy/proxy" "github.com/talos-systems/grpc-proxy/proxy"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -79,6 +80,7 @@ func main() {
"/machine.MachineService/Logs", "/machine.MachineService/Logs",
"/machine.MachineService/Read", "/machine.MachineService/Read",
"/os.OSService/Dmesg", "/os.OSService/Dmesg",
"/cluster.ClusterService/HealthCheck",
} { } {
router.RegisterStreamedRegex("^" + regexp.QuoteMeta(methodName) + "$") router.RegisterStreamedRegex("^" + regexp.QuoteMeta(methodName) + "$")
} }
@ -86,23 +88,45 @@ func main() {
// register future pattern: method should have suffix "Stream" // register future pattern: method should have suffix "Stream"
router.RegisterStreamedRegex("Stream$") router.RegisterStreamedRegex("Stream$")
err = factory.ListenAndServe( var errGroup errgroup.Group
router,
factory.Port(constants.ApidPort), errGroup.Go(func() error {
factory.WithDefaultLog(), return factory.ListenAndServe(
factory.ServerOptions( router,
grpc.Creds( factory.Port(constants.ApidPort),
credentials.NewTLS(serverTLSConfig), factory.WithDefaultLog(),
factory.ServerOptions(
grpc.Creds(
credentials.NewTLS(serverTLSConfig),
),
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(
proxy.TransparentHandler(
router.Director,
proxy.WithStreamedDetector(router.StreamedDetector),
)),
), ),
grpc.CustomCodec(proxy.Codec()), )
grpc.UnknownServiceHandler( })
proxy.TransparentHandler(
router.Director, errGroup.Go(func() error {
proxy.WithStreamedDetector(router.StreamedDetector), return factory.ListenAndServe(
)), router,
), factory.Network("unix"),
) factory.SocketPath(constants.APISocketPath),
if err != nil { factory.WithDefaultLog(),
factory.ServerOptions(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(
proxy.TransparentHandler(
router.Director,
proxy.WithStreamedDetector(router.StreamedDetector),
)),
),
)
})
if err := errGroup.Wait(); err != nil {
log.Fatalf("listen: %v", err) log.Fatalf("listen: %v", err)
} }
} }

View File

@ -0,0 +1,128 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package runtime
import (
"context"
"fmt"
"strings"
clusterapi "github.com/talos-systems/talos/api/cluster"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/pkg/cluster"
"github.com/talos-systems/talos/internal/pkg/cluster/check"
"github.com/talos-systems/talos/internal/pkg/conditions"
)
// HealthCheck implements the cluster.ClusterServer interface.
// HealthCheck implements the cluster.ClusterServer interface.
//
// It runs the same readiness checks as the client-side `talosctl health`,
// streaming progress messages back to the caller. Node lists come from the
// request; any list left empty is discovered via the Kubernetes API (see
// clusterState.resolve).
func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.ClusterService_HealthCheckServer) error {
	clientProvider := &cluster.LocalClientProvider{}
	defer clientProvider.Close() //nolint: errcheck

	k8sProvider := &cluster.KubernetesClient{
		ClientProvider: clientProvider,
		ForceEndpoint:  in.GetClusterInfo().GetForceEndpoint(),
	}

	clusterState := clusterState{
		controlPlaneNodes: in.GetClusterInfo().GetControlPlaneNodes(),
		workerNodes:       in.GetClusterInfo().GetWorkerNodes(),
	}

	// Assemble the provider bundle the check package expects.
	state := struct {
		cluster.ClientProvider
		cluster.K8sProvider
		cluster.Info
	}{
		ClientProvider: clientProvider,
		K8sProvider:    k8sProvider,
		Info:           &clusterState,
	}

	// Run cluster readiness checks, bounded by the client-requested timeout.
	// Use the nil-safe GetWaitTimeout() accessor for consistency with the
	// other field accesses above (an omitted wait_timeout yields 0).
	checkCtx, checkCtxCancel := context.WithTimeout(srv.Context(), in.GetWaitTimeout().AsDuration())
	defer checkCtxCancel()

	if err := clusterState.resolve(checkCtx, k8sProvider); err != nil {
		return fmt.Errorf("error discovering nodes: %w", err)
	}

	if err := srv.Send(&clusterapi.HealthCheckProgress{
		Message: fmt.Sprintf("discovered nodes: %s", &clusterState),
	}); err != nil {
		return err
	}

	return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), &healthReporter{srv: srv})
}
// healthReporter adapts the streaming gRPC response to the reporter
// interface used by the cluster check package.
type healthReporter struct {
	srv      clusterapi.ClusterService_HealthCheckServer
	lastLine string
}

// Update forwards a condition change to the client, suppressing consecutive
// duplicate lines.
func (hr *healthReporter) Update(condition conditions.Condition) {
	line := fmt.Sprintf("waiting for %s", condition)
	if line == hr.lastLine {
		return
	}

	hr.lastLine = line

	// Best-effort delivery: a failed Send surfaces through the stream itself.
	hr.srv.Send(&clusterapi.HealthCheckProgress{ //nolint: errcheck
		Message: strings.TrimSpace(line),
	})
}
// clusterState implements the cluster.Info interface from explicit node lists.
type clusterState struct {
	controlPlaneNodes []string
	workerNodes       []string
}

// Nodes returns all node addresses: control plane nodes first, then workers.
func (cluster *clusterState) Nodes() []string {
	// Build a fresh slice rather than appending to controlPlaneNodes directly:
	// if that slice has spare capacity, append would overwrite its backing
	// array, corrupting data shared with the caller.
	nodes := make([]string, 0, len(cluster.controlPlaneNodes)+len(cluster.workerNodes))
	nodes = append(nodes, cluster.controlPlaneNodes...)
	nodes = append(nodes, cluster.workerNodes...)

	return nodes
}
// NodesByType returns the node addresses of the given machine type. Init
// nodes are never tracked separately in this server-side state, so that case
// yields nil; unknown types are a programming error.
func (cluster *clusterState) NodesByType(t runtime.MachineType) []string {
	if t == runtime.MachineTypeInit {
		return nil
	}

	if t == runtime.MachineTypeControlPlane {
		return cluster.controlPlaneNodes
	}

	if t == runtime.MachineTypeJoin {
		return cluster.workerNodes
	}

	panic("unsupported machine type")
}
// resolve fills in any node list that was not supplied in the request by
// discovering it through the Kubernetes API.
func (cluster *clusterState) resolve(ctx context.Context, k8sProvider *cluster.KubernetesClient) error {
	var err error
	if len(cluster.controlPlaneNodes) == 0 {
		// Initialize the Kubernetes client before touching KubeHelper;
		// presumably K8sClient caches after the first call — TODO confirm.
		if _, err = k8sProvider.K8sClient(ctx); err != nil {
			return err
		}
		if cluster.controlPlaneNodes, err = k8sProvider.KubeHelper.MasterIPs(ctx); err != nil {
			return err
		}
	}
	if len(cluster.workerNodes) == 0 {
		if _, err = k8sProvider.K8sClient(ctx); err != nil {
			return err
		}
		if cluster.workerNodes, err = k8sProvider.KubeHelper.WorkerIPs(ctx); err != nil {
			return err
		}
	}
	return nil
}
// String implements the fmt.Stringer interface, formatting the discovered
// control plane and worker node lists for progress messages.
func (cluster *clusterState) String() string {
	return fmt.Sprintf("control plane: %q, worker: %q", cluster.controlPlaneNodes, cluster.workerNodes)
}

View File

@ -32,6 +32,7 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/talos-systems/talos/api/cluster"
"github.com/talos-systems/talos/api/common" "github.com/talos-systems/talos/api/common"
"github.com/talos-systems/talos/api/machine" "github.com/talos-systems/talos/api/machine"
osapi "github.com/talos-systems/talos/api/os" osapi "github.com/talos-systems/talos/api/os"
@ -68,6 +69,7 @@ func (s *Server) Register(obj *grpc.Server) {
machine.RegisterMachineServiceServer(obj, s) machine.RegisterMachineServiceServer(obj, s)
osapi.RegisterOSServiceServer(obj, &osdServer{Server: s}) //nolint: staticcheck osapi.RegisterOSServiceServer(obj, &osdServer{Server: s}) //nolint: staticcheck
cluster.RegisterClusterServiceServer(obj, s)
} }
// Reboot implements the machine.MachineServer interface. // Reboot implements the machine.MachineServer interface.

View File

@ -9,6 +9,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
@ -73,11 +74,19 @@ func (o *APID) DependsOn(r runtime.Runtime) []string {
return []string{"containerd", "networkd", "timed"} return []string{"containerd", "networkd", "timed"}
} }
// Runner implements the Service interface.
//
//nolint: gocyclo
func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
image := "talos/apid" image := "talos/apid"
endpoints := []string{"127.0.0.1"} endpoints := []string{"127.0.0.1"}
// Ensure socket dir exists
if err := os.MkdirAll(filepath.Dir(constants.APISocketPath), 0750); err != nil {
return nil, err
}
if r.Config().Machine().Type() == runtime.MachineTypeJoin { if r.Config().Machine().Type() == runtime.MachineTypeJoin {
opts := []retry.Option{retry.WithUnits(3 * time.Second), retry.WithJitter(time.Second)} opts := []retry.Option{retry.WithUnits(3 * time.Second), retry.WithJitter(time.Second)}
@ -117,6 +126,7 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}}, {Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: constants.ConfigPath, Source: constants.ConfigPath, Options: []string{"rbind", "ro"}}, {Type: "bind", Destination: constants.ConfigPath, Source: constants.ConfigPath, Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "ro"}}, {Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.APISocketPath), Source: filepath.Dir(constants.APISocketPath), Options: []string{"rbind", "rw"}},
} }
env := []string{} env := []string{}

View File

@ -33,6 +33,7 @@ func main() {
router.RegisterLocalBackend("machine.MachineService", machinedBackend) router.RegisterLocalBackend("machine.MachineService", machinedBackend)
router.RegisterLocalBackend("time.TimeService", backend.NewLocal("timed", constants.TimeSocketPath)) router.RegisterLocalBackend("time.TimeService", backend.NewLocal("timed", constants.TimeSocketPath))
router.RegisterLocalBackend("network.NetworkService", backend.NewLocal("networkd", constants.NetworkSocketPath)) router.RegisterLocalBackend("network.NetworkService", backend.NewLocal("networkd", constants.NetworkSocketPath))
router.RegisterLocalBackend("cluster.ClusterService", machinedBackend)
err := factory.ListenAndServe( err := factory.ListenAndServe(
router, router,

View File

@ -25,13 +25,13 @@ func (suite *HealthSuite) SuiteName() string {
return "cli.HealthSuite" return "cli.HealthSuite"
} }
// TestRun does successful health check run. // TestClientSide does successful health check run from client-side.
func (suite *HealthSuite) TestRun() { func (suite *HealthSuite) TestClientSide() {
if suite.Cluster == nil { if suite.Cluster == nil {
suite.T().Skip("Cluster is not available, skipping test") suite.T().Skip("Cluster is not available, skipping test")
} }
args := []string{} args := []string{"--server=false"}
bootstrapAPIIsUsed := true bootstrapAPIIsUsed := true
@ -86,6 +86,15 @@ func (suite *HealthSuite) TestRun() {
) )
} }
// TestServerSide does successful health check run from server-side.
func (suite *HealthSuite) TestServerSide() {
suite.RunCLI([]string{"health"},
base.StderrNotEmpty(),
base.StdoutEmpty(),
base.StderrShouldMatch(regexp.MustCompile(`waiting for all k8s nodes to report ready`)),
)
}
func init() { func init() {
allSuites = append(allSuites, new(HealthSuite)) allSuites = append(allSuites, new(HealthSuite))
} }

View File

@ -0,0 +1,37 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster
import (
"context"
k8s "k8s.io/client-go/kubernetes"
"github.com/talos-systems/talos/pkg/kubernetes"
)
// KubernetesFromKubeletClient provides Kubernetes client built from local kubelet config.
type KubernetesFromKubeletClient struct {
KubeHelper *kubernetes.Client
clientset *k8s.Clientset
}
// K8sClient builds Kubernetes client from local kubelet config.
//
// Kubernetes client instance is cached.
func (k *KubernetesFromKubeletClient) K8sClient(ctx context.Context) (*k8s.Clientset, error) {
if k.clientset != nil {
return k.clientset, nil
}
var err error
if k.KubeHelper, err = kubernetes.NewClientFromKubeletKubeconfig(); err != nil {
return nil, err
}
k.clientset = k.KubeHelper.Clientset
return k.clientset, nil
}

View File

@ -12,6 +12,8 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api" clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
k8s "github.com/talos-systems/talos/pkg/kubernetes"
) )
// KubernetesClient provides Kubernetes client built via Talos API Kubeconfig. // KubernetesClient provides Kubernetes client built via Talos API Kubeconfig.
@ -22,7 +24,8 @@ type KubernetesClient struct {
// ForceEndpoint overrides default Kubernetes API endpoint. // ForceEndpoint overrides default Kubernetes API endpoint.
ForceEndpoint string ForceEndpoint string
clientset *kubernetes.Clientset KubeHelper *k8s.Client
clientset *kubernetes.Clientset
} }
// K8sClient builds Kubernetes client via Talos Kubeconfig API. // K8sClient builds Kubernetes client via Talos Kubeconfig API.
@ -57,10 +60,11 @@ func (k *KubernetesClient) K8sClient(ctx context.Context) (*kubernetes.Clientset
config.Host = fmt.Sprintf("%s:%d", k.ForceEndpoint, 6443) config.Host = fmt.Sprintf("%s:%d", k.ForceEndpoint, 6443)
} }
clientset, err := kubernetes.NewForConfig(config) if k.KubeHelper, err = k8s.NewForConfig(config); err != nil {
if err == nil { return nil, err
k.clientset = clientset
} }
return clientset, err k.clientset = k.KubeHelper.Clientset
return k.clientset, nil
} }

View File

@ -0,0 +1,51 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/talos-systems/talos/pkg/client"
"github.com/talos-systems/talos/pkg/constants"
)
// LocalClientProvider builds Talos client to connect to same-node apid instance over file socket.
type LocalClientProvider struct {
client *client.Client
}
// Client returns Talos client instance for default (if no endpoints are given) or
// specific endpoints.
//
// Client implements ClientProvider interface.
func (c *LocalClientProvider) Client(endpoints ...string) (*client.Client, error) {
if len(endpoints) > 0 {
return nil, fmt.Errorf("custom endpoints not supported with LocalClientProvider")
}
var err error
if c.client == nil {
c.client, err = client.New(context.TODO(), client.WithUnixSocket(constants.APISocketPath), client.WithGRPCDialOptions(grpc.WithInsecure()))
}
return c.client, err
}
// Close all the client connections.
func (c *LocalClientProvider) Close() error {
if c.client != nil {
if err := c.client.Close(); err != nil {
return err
}
c.client = nil
}
return nil
}

View File

@ -16,15 +16,18 @@ import (
"fmt" "fmt"
"io" "io"
"strings" "strings"
"time"
"github.com/golang/protobuf/ptypes/empty" "github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
grpctls "github.com/talos-systems/talos/pkg/grpc/tls" grpctls "github.com/talos-systems/talos/pkg/grpc/tls"
clusterapi "github.com/talos-systems/talos/api/cluster"
"github.com/talos-systems/talos/api/common" "github.com/talos-systems/talos/api/common"
machineapi "github.com/talos-systems/talos/api/machine" machineapi "github.com/talos-systems/talos/api/machine"
networkapi "github.com/talos-systems/talos/api/network" networkapi "github.com/talos-systems/talos/api/network"
@ -49,11 +52,16 @@ type Client struct {
MachineClient machineapi.MachineServiceClient MachineClient machineapi.MachineServiceClient
TimeClient timeapi.TimeServiceClient TimeClient timeapi.TimeServiceClient
NetworkClient networkapi.NetworkServiceClient NetworkClient networkapi.NetworkServiceClient
ClusterClient clusterapi.ClusterServiceClient
} }
func (c *Client) resolveConfigContext() error { func (c *Client) resolveConfigContext() error {
var ok bool var ok bool
if c.options.unixSocketPath != "" {
return nil
}
if c.options.configContext != nil { if c.options.configContext != nil {
return nil return nil
} }
@ -82,6 +90,10 @@ func (c *Client) resolveConfigContext() error {
} }
func (c *Client) getEndpoints() []string { func (c *Client) getEndpoints() []string {
if c.options.unixSocketPath != "" {
return []string{c.options.unixSocketPath}
}
if len(c.options.endpointsOverride) > 0 { if len(c.options.endpointsOverride) > 0 {
return c.options.endpointsOverride return c.options.endpointsOverride
} }
@ -121,6 +133,7 @@ func New(ctx context.Context, opts ...OptionFunc) (c *Client, err error) {
c.MachineClient = machineapi.NewMachineServiceClient(c.conn) c.MachineClient = machineapi.NewMachineServiceClient(c.conn)
c.TimeClient = timeapi.NewTimeServiceClient(c.conn) c.TimeClient = timeapi.NewTimeServiceClient(c.conn)
c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn) c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn)
c.ClusterClient = clusterapi.NewClusterServiceClient(c.conn)
return c, nil return c, nil
} }
@ -128,38 +141,46 @@ func New(ctx context.Context, opts ...OptionFunc) (c *Client, err error) {
func (c *Client) getConn(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (c *Client) getConn(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
endpoints := c.getEndpoints() endpoints := c.getEndpoints()
// NB: we use the `dns` scheme here in order to handle fancier situations var target string
// when there is a single endpoint.
// Such possibilities include SRV records, multiple IPs from A and/or AAAA
// records, and descriptive TXT records which include things like load
// balancer specs.
target := fmt.Sprintf("dns:///%s:%d", net.FormatAddress(endpoints[0]), constants.ApidPort)
// If we are given more than one endpoint, we need to use our custom list resolver switch {
if len(endpoints) > 1 { case c.options.unixSocketPath != "":
target = fmt.Sprintf("unix:///%s", c.options.unixSocketPath)
case len(endpoints) > 1:
target = fmt.Sprintf("%s:///%s", talosListResolverScheme, strings.Join(endpoints, ",")) target = fmt.Sprintf("%s:///%s", talosListResolverScheme, strings.Join(endpoints, ","))
default:
// NB: we use the `dns` scheme here in order to handle fancier situations
// when there is a single endpoint.
// Such possibilities include SRV records, multiple IPs from A and/or AAAA
// records, and descriptive TXT records which include things like load
// balancer specs.
target = fmt.Sprintf("dns:///%s:%d", net.FormatAddress(endpoints[0]), constants.ApidPort)
} }
// Add TLS credentials to gRPC DialOptions dialOpts := []grpc.DialOption(nil)
tlsConfig := c.options.tlsConfig
if tlsConfig == nil { if c.options.unixSocketPath == "" {
creds, err := CredentialsFromConfigContext(c.options.configContext) // Add TLS credentials to gRPC DialOptions
if err != nil { tlsConfig := c.options.tlsConfig
return nil, fmt.Errorf("failed to acquire credentials: %w", err) if tlsConfig == nil {
creds, err := CredentialsFromConfigContext(c.options.configContext)
if err != nil {
return nil, fmt.Errorf("failed to acquire credentials: %w", err)
}
tlsConfig, err = grpctls.New(
grpctls.WithKeypair(creds.Crt),
grpctls.WithClientAuthType(grpctls.Mutual),
grpctls.WithCACertPEM(creds.CA),
)
if err != nil {
return nil, fmt.Errorf("failed to construct TLS credentials: %w", err)
}
} }
tlsConfig, err = grpctls.New( dialOpts = append(dialOpts,
grpctls.WithKeypair(creds.Crt), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
grpctls.WithClientAuthType(grpctls.Mutual),
grpctls.WithCACertPEM(creds.CA),
) )
if err != nil {
return nil, fmt.Errorf("failed to construct TLS credentials: %w", err)
}
}
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
} }
dialOpts = append(dialOpts, c.options.grpcDialOptions...) dialOpts = append(dialOpts, c.options.grpcDialOptions...)
@ -255,6 +276,7 @@ func NewClient(cfg *tls.Config, endpoints []string, port int, opts ...grpc.DialO
c.MachineClient = machineapi.NewMachineServiceClient(c.conn) c.MachineClient = machineapi.NewMachineServiceClient(c.conn)
c.TimeClient = timeapi.NewTimeServiceClient(c.conn) c.TimeClient = timeapi.NewTimeServiceClient(c.conn)
c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn) c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn)
c.ClusterClient = clusterapi.NewClusterServiceClient(c.conn)
return c, nil return c, nil
} }
@ -704,6 +726,14 @@ func (c *Client) Read(ctx context.Context, path string) (io.ReadCloser, <-chan e
return ReadStream(stream) return ReadStream(stream)
} }
// ClusterHealthCheck runs a Talos cluster health check.
func (c *Client) ClusterHealthCheck(ctx context.Context, waitTimeout time.Duration, clusterInfo *clusterapi.ClusterInfo) (clusterapi.ClusterService_HealthCheckClient, error) {
return c.ClusterClient.HealthCheck(ctx, &clusterapi.HealthCheckRequest{
WaitTimeout: durationpb.New(waitTimeout),
ClusterInfo: clusterInfo,
})
}
// MachineStream is a common interface for streams returned by streaming APIs. // MachineStream is a common interface for streams returned by streaming APIs.
type MachineStream interface { type MachineStream interface {
Recv() (*common.Data, error) Recv() (*common.Data, error)

View File

@ -23,6 +23,8 @@ type Options struct {
contextOverride string contextOverride string
contextOverrideSet bool contextOverrideSet bool
unixSocketPath string
} }
// OptionFunc sets an option for the creation of the Client. // OptionFunc sets an option for the creation of the Client.
@ -108,3 +110,16 @@ func WithConfigFromFile(fn string) OptionFunc {
return nil return nil
} }
} }
// WithUnixSocket creates a Client which connects to apid over local file socket.
//
// This option disables config parsing and TLS.
//
// Connection over unix socket is only used within the Talos node.
func WithUnixSocket(path string) OptionFunc {
return func(o *Options) error {
o.unixSocketPath = path
return nil
}
}

View File

@ -227,6 +227,9 @@ const (
// TalosConfigEnvVar is the environment variable for setting the Talos configuration file path. // TalosConfigEnvVar is the environment variable for setting the Talos configuration file path.
TalosConfigEnvVar = "TALOSCONFIG" TalosConfigEnvVar = "TALOSCONFIG"
// APISocketPath is the path to file socket of apid.
APISocketPath = SystemRunPath + "/apid/apid.sock"
// MachineSocketPath is the path to file socket of machine API. // MachineSocketPath is the path to file socket of machine API.
MachineSocketPath = SystemRunPath + "/machined/machine.sock" MachineSocketPath = SystemRunPath + "/machined/machine.sock"

View File

@ -143,7 +143,7 @@ func NewTemporaryClientFromPKI(ca *x509.PEMEncodedCertificateAndKey, endpoint *u
return h, nil return h, nil
} }
// MasterIPs cordons and drains a node in one call. // MasterIPs returns a list of control plane endpoints (IP addresses).
func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) { func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) {
endpoints, err := h.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{}) endpoints, err := h.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
if err != nil { if err != nil {
@ -161,6 +161,30 @@ func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) {
return addrs, nil return addrs, nil
} }
// WorkerIPs returns list of worker nodes IP addresses.
func (h *Client) WorkerIPs(ctx context.Context) (addrs []string, err error) {
resp, err := h.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
addrs = []string{}
for _, node := range resp.Items {
if _, ok := node.Labels[constants.LabelNodeRoleMaster]; ok {
continue
}
for _, nodeAddress := range node.Status.Addresses {
if nodeAddress.Type == corev1.NodeInternalIP {
addrs = append(addrs, nodeAddress.Address)
}
}
}
return addrs, nil
}
// LabelNodeAsMaster labels a node with the required master label. // LabelNodeAsMaster labels a node with the required master label.
func (h *Client) LabelNodeAsMaster(name string) (err error) { func (h *Client) LabelNodeAsMaster(name string) (err error) {
n, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) n, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})