diff --git a/Dockerfile b/Dockerfile index 32f688d56..1eb2dca09 100644 --- a/Dockerfile +++ b/Dockerfile @@ -52,6 +52,8 @@ COPY ./api/network/network.proto /api/network/network.proto RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api network/network.proto COPY ./api/os/os.proto /api/os/os.proto RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api os/os.proto +COPY ./api/cluster/cluster.proto /api/cluster/cluster.proto +RUN protoc -I/api --go_out=plugins=grpc,paths=source_relative:/api cluster/cluster.proto # Gofumports generated files to adjust import order RUN gofumports -w -local github.com/talos-systems/talos /api/ @@ -63,6 +65,7 @@ COPY --from=generate-build /api/security/security.pb.go /api/security/ COPY --from=generate-build /api/machine/machine.pb.go /api/machine/ COPY --from=generate-build /api/time/time.pb.go /api/time/ COPY --from=generate-build /api/network/network.pb.go /api/network/ +COPY --from=generate-build /api/cluster/cluster.pb.go /api/cluster/ # The base target provides a container that can be used to build all Talos # assets. diff --git a/api/cluster/cluster.pb.go b/api/cluster/cluster.pb.go new file mode 100644 index 000000000..7cc7460ac --- /dev/null +++ b/api/cluster/cluster.pb.go @@ -0,0 +1,460 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.12.3 +// source: cluster/cluster.proto + +package cluster + +import ( + context "context" + reflect "reflect" + sync "sync" + + proto "github.com/golang/protobuf/proto" + duration "github.com/golang/protobuf/ptypes/duration" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + + common "github.com/talos-systems/talos/api/common" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type HealthCheckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WaitTimeout *duration.Duration `protobuf:"bytes,1,opt,name=wait_timeout,json=waitTimeout,proto3" json:"wait_timeout,omitempty"` + ClusterInfo *ClusterInfo `protobuf:"bytes,2,opt,name=cluster_info,json=clusterInfo,proto3" json:"cluster_info,omitempty"` +} + +func (x *HealthCheckRequest) Reset() { + *x = HealthCheckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_cluster_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckRequest) ProtoMessage() {} + +func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_cluster_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { + return file_cluster_cluster_proto_rawDescGZIP(), []int{0} +} + +func (x *HealthCheckRequest) GetWaitTimeout() *duration.Duration { + if x != nil { + return x.WaitTimeout + } + return nil +} + +func (x *HealthCheckRequest) GetClusterInfo() *ClusterInfo { + if x != nil { + return x.ClusterInfo + } + return nil +} + +type ClusterInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ControlPlaneNodes []string `protobuf:"bytes,1,rep,name=control_plane_nodes,json=controlPlaneNodes,proto3" json:"control_plane_nodes,omitempty"` + WorkerNodes []string `protobuf:"bytes,2,rep,name=worker_nodes,json=workerNodes,proto3" json:"worker_nodes,omitempty"` + ForceEndpoint string `protobuf:"bytes,3,opt,name=force_endpoint,json=forceEndpoint,proto3" json:"force_endpoint,omitempty"` +} + +func (x *ClusterInfo) Reset() { + *x = ClusterInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_cluster_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ClusterInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ClusterInfo) ProtoMessage() {} + +func (x *ClusterInfo) ProtoReflect() protoreflect.Message { + mi := &file_cluster_cluster_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ClusterInfo.ProtoReflect.Descriptor instead. +func (*ClusterInfo) Descriptor() ([]byte, []int) { + return file_cluster_cluster_proto_rawDescGZIP(), []int{1} +} + +func (x *ClusterInfo) GetControlPlaneNodes() []string { + if x != nil { + return x.ControlPlaneNodes + } + return nil +} + +func (x *ClusterInfo) GetWorkerNodes() []string { + if x != nil { + return x.WorkerNodes + } + return nil +} + +func (x *ClusterInfo) GetForceEndpoint() string { + if x != nil { + return x.ForceEndpoint + } + return "" +} + +type HealthCheckProgress struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *common.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *HealthCheckProgress) Reset() { + *x = HealthCheckProgress{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_cluster_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HealthCheckProgress) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckProgress) ProtoMessage() {} + +func (x *HealthCheckProgress) ProtoReflect() protoreflect.Message { + mi := &file_cluster_cluster_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckProgress.ProtoReflect.Descriptor instead. +func (*HealthCheckProgress) Descriptor() ([]byte, []int) { + return file_cluster_cluster_proto_rawDescGZIP(), []int{2} +} + +func (x *HealthCheckProgress) GetMetadata() *common.Metadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *HealthCheckProgress) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_cluster_cluster_proto protoreflect.FileDescriptor + +var file_cluster_cluster_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x1a, 0x13, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8b, 0x01, 0x0a, 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x0c, + 0x77, 0x61, 0x69, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x77, + 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x37, 0x0a, 0x0c, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x22, 0x87, 0x01, 0x0a, 0x0b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x2e, 0x0a, 0x13, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x5f, 0x70, + 0x6c, 0x61, 0x6e, 0x65, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x50, 0x6c, 0x61, 0x6e, 0x65, 0x4e, 0x6f, + 0x64, 0x65, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x6e, 0x6f, + 0x64, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x65, + 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x6f, 0x72, 0x63, 0x65, 0x5f, + 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, + 0x66, 0x6f, 0x72, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x5d, 0x0a, + 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x50, 0x72, 0x6f, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x12, 0x2c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x5c, 0x0a, 0x0e, + 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, + 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x1b, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, + 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, + 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x30, 0x01, 0x42, 0x4b, 0x0a, 0x0f, 0x63, 0x6f, + 0x6d, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x42, 0x0a, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x41, 0x70, 0x69, 0x50, 0x01, 0x5a, 0x2a, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2d, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cluster_cluster_proto_rawDescOnce sync.Once + file_cluster_cluster_proto_rawDescData = file_cluster_cluster_proto_rawDesc +) + +func file_cluster_cluster_proto_rawDescGZIP() []byte { + file_cluster_cluster_proto_rawDescOnce.Do(func() { + file_cluster_cluster_proto_rawDescData = protoimpl.X.CompressGZIP(file_cluster_cluster_proto_rawDescData) + }) + return file_cluster_cluster_proto_rawDescData +} + +var ( + file_cluster_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 3) + file_cluster_cluster_proto_goTypes = []interface{}{ + (*HealthCheckRequest)(nil), // 0: cluster.HealthCheckRequest + (*ClusterInfo)(nil), // 1: cluster.ClusterInfo + (*HealthCheckProgress)(nil), // 2: cluster.HealthCheckProgress + (*duration.Duration)(nil), // 3: google.protobuf.Duration + (*common.Metadata)(nil), // 4: common.Metadata + } +) + +var file_cluster_cluster_proto_depIdxs = []int32{ + 3, // 0: cluster.HealthCheckRequest.wait_timeout:type_name -> google.protobuf.Duration + 1, // 1: cluster.HealthCheckRequest.cluster_info:type_name -> cluster.ClusterInfo + 4, // 2: cluster.HealthCheckProgress.metadata:type_name -> common.Metadata + 0, // 3: cluster.ClusterService.HealthCheck:input_type -> cluster.HealthCheckRequest + 2, // 4: cluster.ClusterService.HealthCheck:output_type -> cluster.HealthCheckProgress + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_cluster_cluster_proto_init() } +func file_cluster_cluster_proto_init() { + if File_cluster_cluster_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cluster_cluster_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_cluster_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ClusterInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_cluster_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HealthCheckProgress); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cluster_cluster_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cluster_cluster_proto_goTypes, + DependencyIndexes: file_cluster_cluster_proto_depIdxs, + MessageInfos: file_cluster_cluster_proto_msgTypes, + }.Build() + File_cluster_cluster_proto = out.File + file_cluster_cluster_proto_rawDesc = nil + file_cluster_cluster_proto_goTypes = nil + file_cluster_cluster_proto_depIdxs = nil +} + +// Reference imports to suppress errors if they are not otherwise used. +var ( + _ context.Context + _ grpc.ClientConnInterface +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// ClusterServiceClient is the client API for ClusterService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ClusterServiceClient interface { + HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (ClusterService_HealthCheckClient, error) +} + +type clusterServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewClusterServiceClient(cc grpc.ClientConnInterface) ClusterServiceClient { + return &clusterServiceClient{cc} +} + +func (c *clusterServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (ClusterService_HealthCheckClient, error) { + stream, err := c.cc.NewStream(ctx, &_ClusterService_serviceDesc.Streams[0], "/cluster.ClusterService/HealthCheck", opts...) + if err != nil { + return nil, err + } + x := &clusterServiceHealthCheckClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ClusterService_HealthCheckClient interface { + Recv() (*HealthCheckProgress, error) + grpc.ClientStream +} + +type clusterServiceHealthCheckClient struct { + grpc.ClientStream +} + +func (x *clusterServiceHealthCheckClient) Recv() (*HealthCheckProgress, error) { + m := new(HealthCheckProgress) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ClusterServiceServer is the server API for ClusterService service. +type ClusterServiceServer interface { + HealthCheck(*HealthCheckRequest, ClusterService_HealthCheckServer) error +} + +// UnimplementedClusterServiceServer can be embedded to have forward compatible implementations. +type UnimplementedClusterServiceServer struct { +} + +func (*UnimplementedClusterServiceServer) HealthCheck(*HealthCheckRequest, ClusterService_HealthCheckServer) error { + return status.Errorf(codes.Unimplemented, "method HealthCheck not implemented") +} + +func RegisterClusterServiceServer(s *grpc.Server, srv ClusterServiceServer) { + s.RegisterService(&_ClusterService_serviceDesc, srv) +} + +func _ClusterService_HealthCheck_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(HealthCheckRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ClusterServiceServer).HealthCheck(m, &clusterServiceHealthCheckServer{stream}) +} + +type ClusterService_HealthCheckServer interface { + Send(*HealthCheckProgress) error + grpc.ServerStream +} + +type clusterServiceHealthCheckServer struct { + grpc.ServerStream +} + +func (x *clusterServiceHealthCheckServer) Send(m *HealthCheckProgress) error { + return x.ServerStream.SendMsg(m) +} + +var _ClusterService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "cluster.ClusterService", + HandlerType: (*ClusterServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "HealthCheck", + Handler: _ClusterService_HealthCheck_Handler, + ServerStreams: true, + }, + }, + Metadata: "cluster/cluster.proto", +} diff --git a/api/cluster/cluster.proto b/api/cluster/cluster.proto new file mode 100644 index 000000000..a74b64b05 --- /dev/null +++ b/api/cluster/cluster.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package cluster; + +option go_package = "github.com/talos-systems/talos/api/cluster"; +option java_multiple_files = true; +option java_outer_classname = "ClusterApi"; +option java_package = "com.cluster.api"; + +import "google/protobuf/duration.proto"; +import "common/common.proto"; + +// The cluster service definition. +service ClusterService { + rpc HealthCheck(HealthCheckRequest) returns (stream HealthCheckProgress); +} + +message HealthCheckRequest { + google.protobuf.Duration wait_timeout = 1; + ClusterInfo cluster_info = 2; +} + +message ClusterInfo { + repeated string control_plane_nodes = 1; + repeated string worker_nodes = 2; + string force_endpoint = 3; +} + +message HealthCheckProgress { + common.Metadata metadata = 1; + string message = 2; +} diff --git a/cmd/talosctl/cmd/talos/crashdump.go b/cmd/talosctl/cmd/talos/crashdump.go index 30dc849fb..6938bf635 100644 --- a/cmd/talosctl/cmd/talos/crashdump.go +++ b/cmd/talosctl/cmd/talos/crashdump.go @@ -14,6 +14,10 @@ import ( "github.com/talos-systems/talos/pkg/client" ) +var crashdumpCmdFlags struct { + clusterState clusterNodes +} + // crashdumpCmd represents the crashdump command. var crashdumpCmd = &cobra.Command{ Use: "crashdump", @@ -29,7 +33,7 @@ var crashdumpCmd = &cobra.Command{ worker := cluster.APICrashDumper{ ClientProvider: clientProvider, - Info: &clusterState, + Info: &crashdumpCmdFlags.clusterState, } worker.CrashDump(ctx, os.Stdout) @@ -41,7 +45,7 @@ var crashdumpCmd = &cobra.Command{ func init() { addCommand(crashdumpCmd) - crashdumpCmd.Flags().StringVar(&clusterState.InitNode, "init-node", "", "specify IPs of init node") - crashdumpCmd.Flags().StringSliceVar(&clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") - crashdumpCmd.Flags().StringSliceVar(&clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") + crashdumpCmd.Flags().StringVar(&crashdumpCmdFlags.clusterState.InitNode, "init-node", "", "specify IPs of init node") + crashdumpCmd.Flags().StringSliceVar(&crashdumpCmdFlags.clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") + crashdumpCmd.Flags().StringSliceVar(&crashdumpCmdFlags.clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") } diff --git a/cmd/talosctl/cmd/talos/health.go b/cmd/talosctl/cmd/talos/health.go index f6dd117e7..63a654362 100644 --- a/cmd/talosctl/cmd/talos/health.go +++ b/cmd/talosctl/cmd/talos/health.go @@ -6,10 +6,17 @@ package talos import ( "context" + "fmt" + "io" + "os" "time" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + clusterapi "github.com/talos-systems/talos/api/cluster" + "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/internal/pkg/cluster" "github.com/talos-systems/talos/internal/pkg/cluster/check" @@ -39,11 +46,12 @@ func (cluster *clusterNodes) NodesByType(t runtime.MachineType) []string { } } -var ( +var healthCmdFlags struct { clusterState clusterNodes clusterWaitTimeout time.Duration forceEndpoint string -) + runOnServer bool +} // healthCmd represents the health command. var healthCmd = &cobra.Command{ @@ -53,38 +61,88 @@ var healthCmd = &cobra.Command{ Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { return WithClient(func(ctx context.Context, c *client.Client) error { - clientProvider := &cluster.ConfigClientProvider{ - DefaultClient: c, - } - defer clientProvider.Close() //nolint: errcheck - - state := struct { - cluster.ClientProvider - cluster.K8sProvider - cluster.Info - }{ - ClientProvider: clientProvider, - K8sProvider: &cluster.KubernetesClient{ - ClientProvider: clientProvider, - ForceEndpoint: forceEndpoint, - }, - Info: &clusterState, + if healthCmdFlags.runOnServer { + return healthOnServer(ctx, c) } - // Run cluster readiness checks - checkCtx, checkCtxCancel := context.WithTimeout(ctx, clusterWaitTimeout) - defer checkCtxCancel() - - return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), check.StderrReporter()) + return healthOnClient(ctx, c) }) }, } +func healthOnClient(ctx context.Context, c *client.Client) error { + clientProvider := &cluster.ConfigClientProvider{ + DefaultClient: c, + } + defer clientProvider.Close() //nolint: errcheck + + state := struct { + cluster.ClientProvider + cluster.K8sProvider + cluster.Info + }{ + ClientProvider: clientProvider, + K8sProvider: &cluster.KubernetesClient{ + ClientProvider: clientProvider, + ForceEndpoint: healthCmdFlags.forceEndpoint, + }, + Info: &healthCmdFlags.clusterState, + } + + // Run cluster readiness checks + checkCtx, checkCtxCancel := context.WithTimeout(ctx, healthCmdFlags.clusterWaitTimeout) + defer checkCtxCancel() + + return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), check.StderrReporter()) +} + +func healthOnServer(ctx context.Context, c *client.Client) error { + if err := helpers.FailIfMultiNodes(ctx, "health"); err != nil { + return err + } + + controlPlaneNodes := healthCmdFlags.clusterState.ControlPlaneNodes + if healthCmdFlags.clusterState.InitNode != "" { + controlPlaneNodes = append(controlPlaneNodes, healthCmdFlags.clusterState.InitNode) + } + + client, err := c.ClusterHealthCheck(ctx, healthCmdFlags.clusterWaitTimeout, &clusterapi.ClusterInfo{ + ControlPlaneNodes: controlPlaneNodes, + WorkerNodes: healthCmdFlags.clusterState.WorkerNodes, + ForceEndpoint: healthCmdFlags.forceEndpoint, + }) + if err != nil { + return err + } + + if err := client.CloseSend(); err != nil { + return err + } + + for { + msg, err := client.Recv() + if err != nil { + if err == io.EOF || status.Code(err) == codes.Canceled { + return nil + } + + return err + } + + if msg.GetMetadata().GetError() != "" { + return fmt.Errorf("healthcheck error: %s", msg.GetMetadata().GetError()) + } + + fmt.Fprintln(os.Stderr, msg.GetMessage()) + } +} + func init() { addCommand(healthCmd) - healthCmd.Flags().StringVar(&clusterState.InitNode, "init-node", "", "specify IPs of init node") - healthCmd.Flags().StringSliceVar(&clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") - healthCmd.Flags().StringSliceVar(&clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") - healthCmd.Flags().DurationVar(&clusterWaitTimeout, "wait-timeout", 20*time.Minute, "timeout to wait for the cluster to be ready") - healthCmd.Flags().StringVar(&forceEndpoint, "k8s-endpoint", "", "use endpoint instead of kubeconfig default") + healthCmd.Flags().StringVar(&healthCmdFlags.clusterState.InitNode, "init-node", "", "specify IPs of init node") + healthCmd.Flags().StringSliceVar(&healthCmdFlags.clusterState.ControlPlaneNodes, "control-plane-nodes", nil, "specify IPs of control plane nodes") + healthCmd.Flags().StringSliceVar(&healthCmdFlags.clusterState.WorkerNodes, "worker-nodes", nil, "specify IPs of worker nodes") + healthCmd.Flags().DurationVar(&healthCmdFlags.clusterWaitTimeout, "wait-timeout", 20*time.Minute, "timeout to wait for the cluster to be ready") + healthCmd.Flags().StringVar(&healthCmdFlags.forceEndpoint, "k8s-endpoint", "", "use endpoint instead of kubeconfig default") + healthCmd.Flags().BoolVar(&healthCmdFlags.runOnServer, "server", true, "run server-side check") } diff --git a/docs/talosctl/talosctl_health.md b/docs/talosctl/talosctl_health.md index 4923e646b..004b1019b 100644 --- a/docs/talosctl/talosctl_health.md +++ b/docs/talosctl/talosctl_health.md @@ -18,6 +18,7 @@ talosctl health [flags] -h, --help help for health --init-node string specify IPs of init node --k8s-endpoint string use endpoint instead of kubeconfig default + --server run server-side check (default true) --wait-timeout duration timeout to wait for the cluster to be ready (default 20m0s) --worker-nodes strings specify IPs of worker nodes ``` diff --git a/internal/app/apid/main.go b/internal/app/apid/main.go index dc3c64156..46c0f91db 100644 --- a/internal/app/apid/main.go +++ b/internal/app/apid/main.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/talos-systems/grpc-proxy/proxy" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -79,6 +80,7 @@ func main() { "/machine.MachineService/Logs", "/machine.MachineService/Read", "/os.OSService/Dmesg", + "/cluster.ClusterService/HealthCheck", } { router.RegisterStreamedRegex("^" + regexp.QuoteMeta(methodName) + "$") } @@ -86,23 +88,45 @@ func main() { // register future pattern: method should have suffix "Stream" router.RegisterStreamedRegex("Stream$") - err = factory.ListenAndServe( - router, - factory.Port(constants.ApidPort), - factory.WithDefaultLog(), - factory.ServerOptions( - grpc.Creds( - credentials.NewTLS(serverTLSConfig), + var errGroup errgroup.Group + + errGroup.Go(func() error { + return factory.ListenAndServe( + router, + factory.Port(constants.ApidPort), + factory.WithDefaultLog(), + factory.ServerOptions( + grpc.Creds( + credentials.NewTLS(serverTLSConfig), + ), + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler( + proxy.TransparentHandler( + router.Director, + proxy.WithStreamedDetector(router.StreamedDetector), + )), ), - grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler( - proxy.TransparentHandler( - router.Director, - proxy.WithStreamedDetector(router.StreamedDetector), - )), - ), - ) - if err != nil { + ) + }) + + errGroup.Go(func() error { + return factory.ListenAndServe( + router, + factory.Network("unix"), + factory.SocketPath(constants.APISocketPath), + factory.WithDefaultLog(), + factory.ServerOptions( + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler( + proxy.TransparentHandler( + router.Director, + proxy.WithStreamedDetector(router.StreamedDetector), + )), + ), + ) + }) + + if err := errGroup.Wait(); err != nil { log.Fatalf("listen: %v", err) } } diff --git a/internal/app/machined/internal/server/v1alpha1/v1alpha1_cluster.go b/internal/app/machined/internal/server/v1alpha1/v1alpha1_cluster.go new file mode 100644 index 000000000..bdaee87f3 --- /dev/null +++ b/internal/app/machined/internal/server/v1alpha1/v1alpha1_cluster.go @@ -0,0 +1,128 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime + +import ( + "context" + "fmt" + "strings" + + clusterapi "github.com/talos-systems/talos/api/cluster" + + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" + "github.com/talos-systems/talos/internal/pkg/cluster" + "github.com/talos-systems/talos/internal/pkg/cluster/check" + "github.com/talos-systems/talos/internal/pkg/conditions" +) + +// HealthCheck implements the cluster.ClusterServer interface. +func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.ClusterService_HealthCheckServer) error { + clientProvider := &cluster.LocalClientProvider{} + defer clientProvider.Close() //nolint: errcheck + + k8sProvider := &cluster.KubernetesClient{ + ClientProvider: clientProvider, + ForceEndpoint: in.GetClusterInfo().GetForceEndpoint(), + } + clusterState := clusterState{ + controlPlaneNodes: in.GetClusterInfo().GetControlPlaneNodes(), + workerNodes: in.GetClusterInfo().GetWorkerNodes(), + } + + state := struct { + cluster.ClientProvider + cluster.K8sProvider + cluster.Info + }{ + ClientProvider: clientProvider, + K8sProvider: k8sProvider, + Info: &clusterState, + } + + // Run cluster readiness checks + checkCtx, checkCtxCancel := context.WithTimeout(srv.Context(), in.WaitTimeout.AsDuration()) + defer checkCtxCancel() + + if err := clusterState.resolve(checkCtx, k8sProvider); err != nil { + return fmt.Errorf("error discovering nodes: %w", err) + } + + if err := srv.Send(&clusterapi.HealthCheckProgress{ + Message: fmt.Sprintf("discovered nodes: %s", &clusterState), + }); err != nil { + return err + } + + return check.Wait(checkCtx, &state, append(check.DefaultClusterChecks(), check.ExtraClusterChecks()...), &healthReporter{srv: srv}) +} + +type healthReporter struct { + srv clusterapi.ClusterService_HealthCheckServer + lastLine string +} + +func (hr *healthReporter) Update(condition conditions.Condition) { + line := fmt.Sprintf("waiting for %s", condition) + + if line != hr.lastLine { + hr.srv.Send(&clusterapi.HealthCheckProgress{ //nolint: errcheck + Message: strings.TrimSpace(line), + }) + + hr.lastLine = line + } +} + +type clusterState struct { + controlPlaneNodes []string + workerNodes []string +} + +func (cluster *clusterState) Nodes() []string { + return append(cluster.controlPlaneNodes, cluster.workerNodes...) +} + +func (cluster *clusterState) NodesByType(t runtime.MachineType) []string { + switch t { + case runtime.MachineTypeInit: + return nil + case runtime.MachineTypeControlPlane: + return cluster.controlPlaneNodes + case runtime.MachineTypeJoin: + return cluster.workerNodes + default: + panic("unsupported machine type") + } +} + +func (cluster *clusterState) resolve(ctx context.Context, k8sProvider *cluster.KubernetesClient) error { + var err error + + if len(cluster.controlPlaneNodes) == 0 { + if _, err = k8sProvider.K8sClient(ctx); err != nil { + return err + } + + if cluster.controlPlaneNodes, err = k8sProvider.KubeHelper.MasterIPs(ctx); err != nil { + return err + } + } + + if len(cluster.workerNodes) == 0 { + if _, err = k8sProvider.K8sClient(ctx); err != nil { + return err + } + + if cluster.workerNodes, err = k8sProvider.KubeHelper.WorkerIPs(ctx); err != nil { + return err + } + } + + return nil +} + +func (cluster *clusterState) String() string { + return fmt.Sprintf("control plane: %q, worker: %q", cluster.controlPlaneNodes, cluster.workerNodes) +} diff --git a/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go b/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go index 469994fa5..69f7804b2 100644 --- a/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go +++ b/internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go @@ -32,6 +32,7 @@ import ( "golang.org/x/sys/unix" "google.golang.org/grpc" + "github.com/talos-systems/talos/api/cluster" "github.com/talos-systems/talos/api/common" "github.com/talos-systems/talos/api/machine" osapi "github.com/talos-systems/talos/api/os" @@ -68,6 +69,7 @@ func (s *Server) Register(obj *grpc.Server) { machine.RegisterMachineServiceServer(obj, s) osapi.RegisterOSServiceServer(obj, &osdServer{Server: s}) //nolint: staticcheck + cluster.RegisterClusterServiceServer(obj, s) } // Reboot implements the machine.MachineServer interface. diff --git a/internal/app/machined/pkg/system/services/apid.go b/internal/app/machined/pkg/system/services/apid.go index 836d3b7b0..0175b3991 100644 --- a/internal/app/machined/pkg/system/services/apid.go +++ b/internal/app/machined/pkg/system/services/apid.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "net" + "os" "path/filepath" "strings" "time" @@ -73,11 +74,19 @@ func (o *APID) DependsOn(r runtime.Runtime) []string { return []string{"containerd", "networkd", "timed"} } +// Runner implements the Service interface. +// +//nolint: gocyclo func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { image := "talos/apid" endpoints := []string{"127.0.0.1"} + // Ensure socket dir exists + if err := os.MkdirAll(filepath.Dir(constants.APISocketPath), 0750); err != nil { + return nil, err + } + if r.Config().Machine().Type() == runtime.MachineTypeJoin { opts := []retry.Option{retry.WithUnits(3 * time.Second), retry.WithJitter(time.Second)} @@ -117,6 +126,7 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) { {Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}}, {Type: "bind", Destination: constants.ConfigPath, Source: constants.ConfigPath, Options: []string{"rbind", "ro"}}, {Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "ro"}}, + {Type: "bind", Destination: filepath.Dir(constants.APISocketPath), Source: filepath.Dir(constants.APISocketPath), Options: []string{"rbind", "rw"}}, } env := []string{} diff --git a/internal/app/routerd/main.go b/internal/app/routerd/main.go index 040f4eb77..9ba249e67 100644 --- a/internal/app/routerd/main.go +++ b/internal/app/routerd/main.go @@ -33,6 +33,7 @@ func main() { router.RegisterLocalBackend("machine.MachineService", machinedBackend) router.RegisterLocalBackend("time.TimeService", backend.NewLocal("timed", constants.TimeSocketPath)) router.RegisterLocalBackend("network.NetworkService", backend.NewLocal("networkd", constants.NetworkSocketPath)) + router.RegisterLocalBackend("cluster.ClusterService", machinedBackend) err := factory.ListenAndServe( router, diff --git a/internal/integration/cli/health.go b/internal/integration/cli/health.go index e695fb178..ff469e58e 100644 --- a/internal/integration/cli/health.go +++ b/internal/integration/cli/health.go @@ -25,13 +25,13 @@ func (suite *HealthSuite) SuiteName() string { return "cli.HealthSuite" } -// TestRun does successful health check run. -func (suite *HealthSuite) TestRun() { +// TestClientSide does successful health check run from client-side. +func (suite *HealthSuite) TestClientSide() { if suite.Cluster == nil { suite.T().Skip("Cluster is not available, skipping test") } - args := []string{} + args := []string{"--server=false"} bootstrapAPIIsUsed := true @@ -86,6 +86,15 @@ func (suite *HealthSuite) TestRun() { ) } +// TestServerSide does successful health check run from server-side. +func (suite *HealthSuite) TestServerSide() { + suite.RunCLI([]string{"health"}, + base.StderrNotEmpty(), + base.StdoutEmpty(), + base.StderrShouldMatch(regexp.MustCompile(`waiting for all k8s nodes to report ready`)), + ) +} + func init() { allSuites = append(allSuites, new(HealthSuite)) } diff --git a/internal/pkg/cluster/kubelet.go b/internal/pkg/cluster/kubelet.go new file mode 100644 index 000000000..8415e58d3 --- /dev/null +++ b/internal/pkg/cluster/kubelet.go @@ -0,0 +1,37 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package cluster + +import ( + "context" + + k8s "k8s.io/client-go/kubernetes" + + "github.com/talos-systems/talos/pkg/kubernetes" +) + +// KubernetesFromKubeletClient provides Kubernetes client built from local kubelet config. +type KubernetesFromKubeletClient struct { + KubeHelper *kubernetes.Client + clientset *k8s.Clientset +} + +// K8sClient builds Kubernetes client from local kubelet config. +// +// Kubernetes client instance is cached. +func (k *KubernetesFromKubeletClient) K8sClient(ctx context.Context) (*k8s.Clientset, error) { + if k.clientset != nil { + return k.clientset, nil + } + + var err error + if k.KubeHelper, err = kubernetes.NewClientFromKubeletKubeconfig(); err != nil { + return nil, err + } + + k.clientset = k.KubeHelper.Clientset + + return k.clientset, nil +} diff --git a/internal/pkg/cluster/kubernetes.go b/internal/pkg/cluster/kubernetes.go index f7e0da4d0..096f557bb 100644 --- a/internal/pkg/cluster/kubernetes.go +++ b/internal/pkg/cluster/kubernetes.go @@ -12,6 +12,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + + k8s "github.com/talos-systems/talos/pkg/kubernetes" ) // KubernetesClient provides Kubernetes client built via Talos API Kubeconfig. @@ -22,7 +24,8 @@ type KubernetesClient struct { // ForceEndpoint overrides default Kubernetes API endpoint. ForceEndpoint string - clientset *kubernetes.Clientset + KubeHelper *k8s.Client + clientset *kubernetes.Clientset } // K8sClient builds Kubernetes client via Talos Kubeconfig API. @@ -57,10 +60,11 @@ func (k *KubernetesClient) K8sClient(ctx context.Context) (*kubernetes.Clientset config.Host = fmt.Sprintf("%s:%d", k.ForceEndpoint, 6443) } - clientset, err := kubernetes.NewForConfig(config) - if err == nil { - k.clientset = clientset + if k.KubeHelper, err = k8s.NewForConfig(config); err != nil { + return nil, err } - return clientset, err + k.clientset = k.KubeHelper.Clientset + + return k.clientset, nil } diff --git a/internal/pkg/cluster/local.go b/internal/pkg/cluster/local.go new file mode 100644 index 000000000..c15b5f94e --- /dev/null +++ b/internal/pkg/cluster/local.go @@ -0,0 +1,51 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package cluster + +import ( + "context" + "fmt" + + "google.golang.org/grpc" + + "github.com/talos-systems/talos/pkg/client" + "github.com/talos-systems/talos/pkg/constants" +) + +// LocalClientProvider builds Talos client to connect to same-node apid instance over file socket. +type LocalClientProvider struct { + client *client.Client +} + +// Client returns Talos client instance for default (if no endpoints are given) or +// specific endpoints. +// +// Client implements ClientProvider interface. +func (c *LocalClientProvider) Client(endpoints ...string) (*client.Client, error) { + if len(endpoints) > 0 { + return nil, fmt.Errorf("custom endpoints not supported with LocalClientProvider") + } + + var err error + + if c.client == nil { + c.client, err = client.New(context.TODO(), client.WithUnixSocket(constants.APISocketPath), client.WithGRPCDialOptions(grpc.WithInsecure())) + } + + return c.client, err +} + +// Close all the client connections. +func (c *LocalClientProvider) Close() error { + if c.client != nil { + if err := c.client.Close(); err != nil { + return err + } + + c.client = nil + } + + return nil +} diff --git a/pkg/client/client.go b/pkg/client/client.go index f64367ee3..5cca0c59f 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -16,15 +16,18 @@ import ( "fmt" "io" "strings" + "time" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" grpctls "github.com/talos-systems/talos/pkg/grpc/tls" + clusterapi "github.com/talos-systems/talos/api/cluster" "github.com/talos-systems/talos/api/common" machineapi "github.com/talos-systems/talos/api/machine" networkapi "github.com/talos-systems/talos/api/network" @@ -49,11 +52,16 @@ type Client struct { MachineClient machineapi.MachineServiceClient TimeClient timeapi.TimeServiceClient NetworkClient networkapi.NetworkServiceClient + ClusterClient clusterapi.ClusterServiceClient } func (c *Client) resolveConfigContext() error { var ok bool + if c.options.unixSocketPath != "" { + return nil + } + if c.options.configContext != nil { return nil } @@ -82,6 +90,10 @@ func (c *Client) resolveConfigContext() error { } func (c *Client) getEndpoints() []string { + if c.options.unixSocketPath != "" { + return []string{c.options.unixSocketPath} + } + if len(c.options.endpointsOverride) > 0 { return c.options.endpointsOverride } @@ -121,6 +133,7 @@ func New(ctx context.Context, opts ...OptionFunc) (c *Client, err error) { c.MachineClient = machineapi.NewMachineServiceClient(c.conn) c.TimeClient = timeapi.NewTimeServiceClient(c.conn) c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn) + c.ClusterClient = clusterapi.NewClusterServiceClient(c.conn) return c, nil } @@ -128,38 +141,46 @@ func New(ctx context.Context, opts ...OptionFunc) (c *Client, err error) { func (c *Client) getConn(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error) { endpoints := c.getEndpoints() - // NB: we use the `dns` scheme here in order to handle fancier situations - // when there is a single endpoint. - // Such possibilities include SRV records, multiple IPs from A and/or AAAA - // records, and descriptive TXT records which include things like load - // balancer specs. - target := fmt.Sprintf("dns:///%s:%d", net.FormatAddress(endpoints[0]), constants.ApidPort) + var target string - // If we are given more than one endpoint, we need to use our custom list resolver - if len(endpoints) > 1 { + switch { + case c.options.unixSocketPath != "": + target = fmt.Sprintf("unix:///%s", c.options.unixSocketPath) + case len(endpoints) > 1: target = fmt.Sprintf("%s:///%s", talosListResolverScheme, strings.Join(endpoints, ",")) + default: + // NB: we use the `dns` scheme here in order to handle fancier situations + // when there is a single endpoint. + // Such possibilities include SRV records, multiple IPs from A and/or AAAA + // records, and descriptive TXT records which include things like load + // balancer specs. + target = fmt.Sprintf("dns:///%s:%d", net.FormatAddress(endpoints[0]), constants.ApidPort) } - // Add TLS credentials to gRPC DialOptions - tlsConfig := c.options.tlsConfig - if tlsConfig == nil { - creds, err := CredentialsFromConfigContext(c.options.configContext) - if err != nil { - return nil, fmt.Errorf("failed to acquire credentials: %w", err) + dialOpts := []grpc.DialOption(nil) + + if c.options.unixSocketPath == "" { + // Add TLS credentials to gRPC DialOptions + tlsConfig := c.options.tlsConfig + if tlsConfig == nil { + creds, err := CredentialsFromConfigContext(c.options.configContext) + if err != nil { + return nil, fmt.Errorf("failed to acquire credentials: %w", err) + } + + tlsConfig, err = grpctls.New( + grpctls.WithKeypair(creds.Crt), + grpctls.WithClientAuthType(grpctls.Mutual), + grpctls.WithCACertPEM(creds.CA), + ) + if err != nil { + return nil, fmt.Errorf("failed to construct TLS credentials: %w", err) + } } - tlsConfig, err = grpctls.New( - grpctls.WithKeypair(creds.Crt), - grpctls.WithClientAuthType(grpctls.Mutual), - grpctls.WithCACertPEM(creds.CA), + dialOpts = append(dialOpts, + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), ) - if err != nil { - return nil, fmt.Errorf("failed to construct TLS credentials: %w", err) - } - } - - dialOpts := []grpc.DialOption{ - grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), } dialOpts = append(dialOpts, c.options.grpcDialOptions...) @@ -255,6 +276,7 @@ func NewClient(cfg *tls.Config, endpoints []string, port int, opts ...grpc.DialO c.MachineClient = machineapi.NewMachineServiceClient(c.conn) c.TimeClient = timeapi.NewTimeServiceClient(c.conn) c.NetworkClient = networkapi.NewNetworkServiceClient(c.conn) + c.ClusterClient = clusterapi.NewClusterServiceClient(c.conn) return c, nil } @@ -704,6 +726,14 @@ func (c *Client) Read(ctx context.Context, path string) (io.ReadCloser, <-chan e return ReadStream(stream) } +// ClusterHealthCheck runs a Talos cluster health check. +func (c *Client) ClusterHealthCheck(ctx context.Context, waitTimeout time.Duration, clusterInfo *clusterapi.ClusterInfo) (clusterapi.ClusterService_HealthCheckClient, error) { + return c.ClusterClient.HealthCheck(ctx, &clusterapi.HealthCheckRequest{ + WaitTimeout: durationpb.New(waitTimeout), + ClusterInfo: clusterInfo, + }) +} + // MachineStream is a common interface for streams returned by streaming APIs. type MachineStream interface { Recv() (*common.Data, error) diff --git a/pkg/client/options.go b/pkg/client/options.go index 65666816d..2fd2f0817 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -23,6 +23,8 @@ type Options struct { contextOverride string contextOverrideSet bool + + unixSocketPath string } // OptionFunc sets an option for the creation of the Client. @@ -108,3 +110,16 @@ func WithConfigFromFile(fn string) OptionFunc { return nil } } + +// WithUnixSocket creates a Client which connects to apid over local file socket. +// +// This option disables config parsing and TLS. +// +// Connection over unix socket is only used within the Talos node. +func WithUnixSocket(path string) OptionFunc { + return func(o *Options) error { + o.unixSocketPath = path + + return nil + } +} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 5c45404fd..8a67fc66a 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -227,6 +227,9 @@ const ( // TalosConfigEnvVar is the environment variable for setting the Talos configuration file path. TalosConfigEnvVar = "TALOSCONFIG" + // APISocketPath is the path to file socket of apid. + APISocketPath = SystemRunPath + "/apid/apid.sock" + // MachineSocketPath is the path to file socket of machine API. MachineSocketPath = SystemRunPath + "/machined/machine.sock" diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go index 7ad50b742..945086af0 100644 --- a/pkg/kubernetes/kubernetes.go +++ b/pkg/kubernetes/kubernetes.go @@ -143,7 +143,7 @@ func NewTemporaryClientFromPKI(ca *x509.PEMEncodedCertificateAndKey, endpoint *u return h, nil } -// MasterIPs cordons and drains a node in one call. +// MasterIPs returns a list of control plane endpoints (IP addresses). func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) { endpoints, err := h.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{}) if err != nil { @@ -161,6 +161,30 @@ func (h *Client) MasterIPs(ctx context.Context) (addrs []string, err error) { return addrs, nil } +// WorkerIPs returns list of worker nodes IP addresses. +func (h *Client) WorkerIPs(ctx context.Context) (addrs []string, err error) { + resp, err := h.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + addrs = []string{} + + for _, node := range resp.Items { + if _, ok := node.Labels[constants.LabelNodeRoleMaster]; ok { + continue + } + + for _, nodeAddress := range node.Status.Addresses { + if nodeAddress.Type == corev1.NodeInternalIP { + addrs = append(addrs, nodeAddress.Address) + } + } + } + + return addrs, nil +} + // LabelNodeAsMaster labels a node with the required master label. func (h *Client) LabelNodeAsMaster(name string) (err error) { n, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})