feat: implement integration with Discovery Service

This provides integration layer with discovery service to provide
cluster discovery (and transitively KubeSpan peer discovery).

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Andrey Smirnov 2021-09-24 15:06:02 +03:00
parent 353d632ae5
commit 30ae714243
No known key found for this signature in database
GPG Key ID: 7B26396447AB6DFD
10 changed files with 745 additions and 3 deletions

1
go.mod
View File

@ -73,6 +73,7 @@ require (
github.com/spf13/cobra v1.2.1
github.com/stretchr/testify v1.7.0
github.com/talos-systems/crypto v0.3.2
github.com/talos-systems/discovery-service v0.0.3-0.20210928170742-e9d5dfa15e92
github.com/talos-systems/go-blockdevice v0.2.4-0.20210925062844-70d28650b398
github.com/talos-systems/go-cmd v0.1.0
github.com/talos-systems/go-debug v0.2.1

2
go.sum
View File

@ -1042,6 +1042,8 @@ github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/talos-systems/crypto v0.3.2 h1:I+MC9ql6K29EMlbPzdSeHZInSRWdze1FX1qGGrlom8Q=
github.com/talos-systems/crypto v0.3.2/go.mod h1:xaNCB2/Bxaj+qrkdeodhRv5eKQVvKOGBBMj58MrIPY8=
github.com/talos-systems/discovery-service v0.0.3-0.20210928170742-e9d5dfa15e92 h1:Z381kVGNLIZyvmCN6yhJSA0k5ArrTApVXwxB6u6gZXM=
github.com/talos-systems/discovery-service v0.0.3-0.20210928170742-e9d5dfa15e92/go.mod h1:+9VWFbTcUChtlE0qc2fQ3Lyj1kj2AakFQ/ITnaB8Pd0=
github.com/talos-systems/go-blockdevice v0.2.3/go.mod h1:qnn/zDc09I1DA2BUDDCOSA2D0P8pIDjN8pGiRoRaQig=
github.com/talos-systems/go-blockdevice v0.2.4-0.20210925062844-70d28650b398 h1:4NH2IPnswmMfhU0Jb39vtik8xa7J3eObB1rbxhKzpO4=
github.com/talos-systems/go-blockdevice v0.2.4-0.20210925062844-70d28650b398/go.mod h1:qnn/zDc09I1DA2BUDDCOSA2D0P8pIDjN8pGiRoRaQig=

View File

@ -6,7 +6,10 @@ package cluster
import (
"context"
"encoding/base64"
"fmt"
"net"
"net/url"
"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
@ -74,6 +77,39 @@ func (ctrl *ConfigController) Run(ctx context.Context, r controller.Runtime, log
if c.Cluster().Discovery().Enabled() {
res.(*cluster.Config).TypedSpec().RegistryKubernetesEnabled = c.Cluster().Discovery().Registries().Kubernetes().Enabled()
res.(*cluster.Config).TypedSpec().RegistryServiceEnabled = c.Cluster().Discovery().Registries().Service().Enabled()
if c.Cluster().Discovery().Registries().Service().Enabled() {
var u *url.URL
u, err = url.ParseRequestURI(c.Cluster().Discovery().Registries().Service().Endpoint())
if err != nil {
return err
}
host := u.Hostname()
port := u.Port()
if port == "" {
port = "443" // use default https port
}
res.(*cluster.Config).TypedSpec().ServiceEndpoint = net.JoinHostPort(host, port)
res.(*cluster.Config).TypedSpec().ServiceEncryptionKey, err = base64.StdEncoding.DecodeString(c.Cluster().Secret())
if err != nil {
return err
}
res.(*cluster.Config).TypedSpec().ServiceClusterID = c.Cluster().ID()
} else {
res.(*cluster.Config).TypedSpec().ServiceEndpoint = ""
res.(*cluster.Config).TypedSpec().ServiceEncryptionKey = nil
res.(*cluster.Config).TypedSpec().ServiceClusterID = ""
}
} else {
res.(*cluster.Config).TypedSpec().RegistryKubernetesEnabled = false
res.(*cluster.Config).TypedSpec().RegistryServiceEnabled = false
}
return nil

View File

@ -1,6 +1,7 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster_test
import (
@ -29,6 +30,8 @@ func (suite *ConfigSuite) TestReconcileConfig() {
cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterID: "cluster1",
ClusterSecret: "kCQsKr4B28VUl7qw1sVkTDNF9fFH++ViIuKsss+C6kc=",
ClusterDiscoveryConfig: v1alpha1.ClusterDiscoveryConfig{
DiscoveryEnabled: true,
},
@ -47,6 +50,57 @@ func (suite *ConfigSuite) TestReconcileConfig() {
suite.Assert().True(spec.DiscoveryEnabled)
suite.Assert().True(spec.RegistryKubernetesEnabled)
suite.Assert().True(spec.RegistryServiceEnabled)
suite.Assert().Equal("discovery.talos.dev:443", spec.ServiceEndpoint)
suite.Assert().Equal("cluster1", spec.ServiceClusterID)
suite.Assert().Equal(
[]byte("\x90\x24\x2c\x2a\xbe\x01\xdb\xc5\x54\x97\xba\xb0\xd6\xc5\x64\x4c\x33\x45\xf5\xf1\x47\xfb\xe5\x62\x22\xe2\xac\xb2\xcf\x82\xea\x47"),
spec.ServiceEncryptionKey)
return nil
},
),
))
}
func (suite *ConfigSuite) TestReconcileConfigCustom() {
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.ConfigController{}))
suite.startRuntime()
cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterID: "cluster1",
ClusterSecret: "kCQsKr4B28VUl7qw1sVkTDNF9fFH++ViIuKsss+C6kc=",
ClusterDiscoveryConfig: v1alpha1.ClusterDiscoveryConfig{
DiscoveryEnabled: true,
DiscoveryRegistries: v1alpha1.DiscoveryRegistriesConfig{
RegistryKubernetes: v1alpha1.RegistryKubernetesConfig{
RegistryDisabled: true,
},
RegistryService: v1alpha1.RegistryServiceConfig{
RegistryEndpoint: "https://[2001:470:6d:30e:565d:e162:e2a0:cf5a]:3456/",
},
},
},
},
})
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))
specMD := resource.NewMetadata(config.NamespaceName, cluster.ConfigType, cluster.ConfigID, resource.VersionUndefined)
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
specMD,
func(res resource.Resource) error {
spec := res.(*cluster.Config).TypedSpec()
suite.Assert().True(spec.DiscoveryEnabled)
suite.Assert().False(spec.RegistryKubernetesEnabled)
suite.Assert().True(spec.RegistryServiceEnabled)
suite.Assert().Equal("[2001:470:6d:30e:565d:e162:e2a0:cf5a]:3456", spec.ServiceEndpoint)
return nil
},

View File

@ -0,0 +1,398 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster
import (
"context"
"crypto/aes"
"crypto/cipher"
"errors"
"fmt"
"time"
"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/talos-systems/discovery-service/api/v1alpha1/client/pb"
discoveryclient "github.com/talos-systems/discovery-service/pkg/client"
"go.uber.org/zap"
"inet.af/netaddr"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/proto"
"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/version"
)
const defaultDiscoveryTTL = 30 * time.Minute
// DiscoveryServiceController pushes Affiliate resource to the Kubernetes registry.
type DiscoveryServiceController struct {
Insecure bool // only for testing
localAffiliateID resource.ID
}
// Name implements controller.Controller interface.
func (ctrl *DiscoveryServiceController) Name() string {
return "cluster.DiscoveryServiceController"
}
// Inputs implements controller.Controller interface.
func (ctrl *DiscoveryServiceController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: cluster.ConfigType,
ID: pointer.ToString(cluster.ConfigID),
Kind: controller.InputWeak,
},
{
Namespace: cluster.NamespaceName,
Type: cluster.IdentityType,
ID: pointer.ToString(cluster.LocalIdentity),
Kind: controller.InputWeak,
},
}
}
// Outputs implements controller.Controller interface.
func (ctrl *DiscoveryServiceController) Outputs() []controller.Output {
return []controller.Output{
{
Type: cluster.AffiliateType,
Kind: controller.OutputShared,
},
}
}
// Run implements controller.Controller interface.
//
//nolint:gocyclo,cyclop
func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
var (
client *discoveryclient.Client
clientCtxCancel context.CancelFunc
)
clientErrCh := make(chan error, 1)
defer func() {
if clientCtxCancel != nil {
clientCtxCancel()
<-clientErrCh
}
}()
notifyCh := make(chan struct{}, 1)
var (
prevLocalData *pb.Affiliate
prevLocalEndpoints []*pb.Endpoint
)
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
case <-notifyCh:
case err := <-clientErrCh:
if clientCtxCancel != nil {
clientCtxCancel()
}
clientCtxCancel = nil
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error from discovery client: %w", err)
}
}
discoveryConfig, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, cluster.ConfigType, cluster.ConfigID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting discovery config: %w", err)
}
continue
}
if !discoveryConfig.(*cluster.Config).TypedSpec().RegistryServiceEnabled {
if clientCtxCancel != nil {
clientCtxCancel()
<-clientErrCh
clientCtxCancel = nil
client = nil
prevLocalData = nil
prevLocalEndpoints = nil
}
continue
}
identity, err := r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting local identity: %w", err)
}
continue
}
localAffiliateID := identity.(*cluster.Identity).TypedSpec().NodeID
if ctrl.localAffiliateID != localAffiliateID {
ctrl.localAffiliateID = localAffiliateID
if err = r.UpdateInputs(append(ctrl.Inputs(),
controller.Input{
Namespace: cluster.NamespaceName,
Type: cluster.AffiliateType,
ID: pointer.ToString(ctrl.localAffiliateID),
Kind: controller.InputWeak,
},
)); err != nil {
return err
}
if clientCtxCancel != nil {
clientCtxCancel()
<-clientErrCh
clientCtxCancel = nil
client = nil
prevLocalData = nil
prevLocalEndpoints = nil
}
}
affiliate, err := r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.AffiliateType, ctrl.localAffiliateID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting local affiliate: %w", err)
}
continue
}
affiliateSpec := affiliate.(*cluster.Affiliate).TypedSpec()
if client == nil {
var cipher cipher.Block
cipher, err = aes.NewCipher(discoveryConfig.(*cluster.Config).TypedSpec().ServiceEncryptionKey)
if err != nil {
return fmt.Errorf("error initializing AES cipher: %w", err)
}
client, err = discoveryclient.NewClient(discoveryclient.Options{
Cipher: cipher,
Endpoint: discoveryConfig.(*cluster.Config).TypedSpec().ServiceEndpoint,
ClusterID: discoveryConfig.(*cluster.Config).TypedSpec().ServiceClusterID,
AffiliateID: localAffiliateID,
TTL: defaultDiscoveryTTL,
Insecure: ctrl.Insecure,
ClientVersion: version.Tag,
})
if err != nil {
return fmt.Errorf("error initializing discovery client: %w", err)
}
var clientCtx context.Context
clientCtx, clientCtxCancel = context.WithCancel(ctx) //nolint:govet
go func() {
clientErrCh <- client.Run(clientCtx, logger, notifyCh)
}()
}
localData := pbAffiliate(affiliateSpec)
localEndpoints := pbEndpoints(affiliateSpec)
// don't send updates on localData if it hasn't changed: this introduces positive feedback loop,
// as the watch loop will notify on self update
if !proto.Equal(localData, prevLocalData) || !equalEndpoints(localEndpoints, prevLocalEndpoints) {
if err = client.SetLocalData(&discoveryclient.Affiliate{
Affiliate: localData,
Endpoints: localEndpoints,
}, nil); err != nil {
return fmt.Errorf("error setting local affiliate data: %w", err) //nolint:govet
}
prevLocalData = localData
prevLocalEndpoints = localEndpoints
}
touchedIDs := make(map[resource.ID]struct{})
for _, discoveredAffiliate := range client.GetAffiliates() {
id := fmt.Sprintf("service/%s", discoveredAffiliate.Affiliate.NodeId)
discoveredAffiliate := discoveredAffiliate
if err = r.Modify(ctx, cluster.NewAffiliate(cluster.RawNamespaceName, id), func(res resource.Resource) error {
*res.(*cluster.Affiliate).TypedSpec() = specAffiliate(discoveredAffiliate.Affiliate, discoveredAffiliate.Endpoints)
return nil
}); err != nil {
return err
}
touchedIDs[id] = struct{}{}
}
// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(cluster.RawNamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}
for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}
if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}
}
}
func pbAffiliate(affiliate *cluster.AffiliateSpec) *pb.Affiliate {
addresses := make([][]byte, len(affiliate.Addresses))
for i := range addresses {
addresses[i], _ = affiliate.Addresses[i].MarshalBinary() //nolint:errcheck // doesn't fail
}
var kubeSpan *pb.KubeSpan
if affiliate.KubeSpan.PublicKey != "" {
kubeSpan = &pb.KubeSpan{
PublicKey: affiliate.KubeSpan.PublicKey,
}
kubeSpan.Address, _ = affiliate.KubeSpan.Address.MarshalBinary() //nolint:errcheck // doesn't fail
additionalAddresses := make([]*pb.IPPrefix, len(affiliate.KubeSpan.AdditionalAddresses))
for i := range additionalAddresses {
additionalAddresses[i] = &pb.IPPrefix{
Bits: uint32(affiliate.KubeSpan.AdditionalAddresses[i].Bits()),
}
additionalAddresses[i].Ip, _ = affiliate.KubeSpan.AdditionalAddresses[i].IP().MarshalBinary() //nolint:errcheck // doesn't fail
}
kubeSpan.AdditionalAddresses = additionalAddresses
}
return &pb.Affiliate{
NodeId: affiliate.NodeID,
Addresses: addresses,
Hostname: affiliate.Hostname,
Nodename: affiliate.Nodename,
MachineType: affiliate.MachineType.String(),
OperatingSystem: affiliate.OperatingSystem,
Kubespan: kubeSpan,
}
}
func pbEndpoints(affiliate *cluster.AffiliateSpec) []*pb.Endpoint {
if affiliate.KubeSpan.PublicKey == "" || len(affiliate.KubeSpan.Endpoints) == 0 {
return nil
}
result := make([]*pb.Endpoint, len(affiliate.KubeSpan.Endpoints))
for i := range result {
result[i] = &pb.Endpoint{
Port: uint32(affiliate.KubeSpan.Endpoints[i].Port()),
}
result[i].Ip, _ = affiliate.KubeSpan.Endpoints[i].IP().MarshalBinary() //nolint:errcheck // doesn't fail
}
return result
}
func equalEndpoints(a []*pb.Endpoint, b []*pb.Endpoint) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
if len(a) != len(b) {
return false
}
for i := range a {
if !proto.Equal(a[i], b[i]) {
return false
}
}
return true
}
func specAffiliate(affiliate *pb.Affiliate, endpoints []*pb.Endpoint) cluster.AffiliateSpec {
result := cluster.AffiliateSpec{
NodeID: affiliate.NodeId,
Hostname: affiliate.Hostname,
Nodename: affiliate.Nodename,
OperatingSystem: affiliate.OperatingSystem,
}
result.MachineType, _ = machine.ParseType(affiliate.MachineType) //nolint:errcheck // ignore parse error (machine.TypeUnknown)
result.Addresses = make([]netaddr.IP, 0, len(affiliate.Addresses))
for i := range affiliate.Addresses {
var ip netaddr.IP
if err := ip.UnmarshalBinary(affiliate.Addresses[i]); err == nil {
result.Addresses = append(result.Addresses, ip)
}
}
if affiliate.Kubespan != nil {
result.KubeSpan.PublicKey = affiliate.Kubespan.PublicKey
result.KubeSpan.Address.UnmarshalBinary(affiliate.Kubespan.Address) //nolint:errcheck // ignore error, address will be zero
result.KubeSpan.AdditionalAddresses = make([]netaddr.IPPrefix, 0, len(affiliate.Kubespan.AdditionalAddresses))
for i := range affiliate.Kubespan.AdditionalAddresses {
var ip netaddr.IP
if err := ip.UnmarshalBinary(affiliate.Kubespan.AdditionalAddresses[i].Ip); err == nil {
result.KubeSpan.AdditionalAddresses = append(result.KubeSpan.AdditionalAddresses, netaddr.IPPrefixFrom(ip, uint8(affiliate.Kubespan.AdditionalAddresses[i].Bits)))
}
}
result.KubeSpan.Endpoints = make([]netaddr.IPPort, 0, len(endpoints))
for i := range endpoints {
var ip netaddr.IP
if err := ip.UnmarshalBinary(endpoints[i].Ip); err == nil {
result.KubeSpan.Endpoints = append(result.KubeSpan.Endpoints, netaddr.IPPortFrom(ip, uint16(endpoints[i].Port)))
}
}
}
return result
}

View File

@ -0,0 +1,232 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster_test
import (
"bytes"
"context"
"crypto/aes"
"net"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/discovery-service/api/v1alpha1/client/pb"
serverpb "github.com/talos-systems/discovery-service/api/v1alpha1/server/pb"
"github.com/talos-systems/discovery-service/pkg/client"
"github.com/talos-systems/discovery-service/pkg/server"
"github.com/talos-systems/go-retry/retry"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"inet.af/netaddr"
clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/proto"
"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
)
type DiscoveryServiceSuite struct {
ClusterSuite
}
func setupServer(t *testing.T) (address string) {
t.Helper()
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
logger := zaptest.NewLogger(t)
serverOptions := []grpc.ServerOption{
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(server.FieldExtractor)),
grpc_zap.UnaryServerInterceptor(logger),
),
grpc_middleware.WithStreamServerChain(
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(server.FieldExtractor)),
grpc_zap.StreamServerInterceptor(logger),
),
}
s := grpc.NewServer(serverOptions...)
serverpb.RegisterClusterServer(s, server.NewTestClusterServer())
go func() {
require.NoError(t, s.Serve(lis))
}()
t.Cleanup(s.Stop)
return lis.Addr().String()
}
func (suite *DiscoveryServiceSuite) TestReconcile() {
suite.startRuntime()
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.DiscoveryServiceController{
Insecure: true,
}))
address := setupServer(suite.T())
// regular discovery affiliate
discoveryConfig := cluster.NewConfig(config.NamespaceName, cluster.ConfigID)
discoveryConfig.TypedSpec().DiscoveryEnabled = true
discoveryConfig.TypedSpec().RegistryServiceEnabled = true
discoveryConfig.TypedSpec().ServiceEndpoint = address
discoveryConfig.TypedSpec().ServiceClusterID = "fake"
discoveryConfig.TypedSpec().ServiceEncryptionKey = bytes.Repeat([]byte{1}, 32)
suite.Require().NoError(suite.state.Create(suite.ctx, discoveryConfig))
nodeIdentity := cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity)
suite.Require().NoError(nodeIdentity.TypedSpec().Generate())
suite.Require().NoError(suite.state.Create(suite.ctx, nodeIdentity))
localAffiliate := cluster.NewAffiliate(cluster.NamespaceName, nodeIdentity.TypedSpec().NodeID)
*localAffiliate.TypedSpec() = cluster.AffiliateSpec{
NodeID: nodeIdentity.TypedSpec().NodeID,
Hostname: "foo.com",
Nodename: "bar",
MachineType: machine.TypeControlPlane,
Addresses: []netaddr.IP{netaddr.MustParseIP("192.168.3.4")},
KubeSpan: cluster.KubeSpanAffiliateSpec{
PublicKey: "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
Address: netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")},
Endpoints: []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
},
}
suite.Require().NoError(suite.state.Create(suite.ctx, localAffiliate))
// create a test client connected to the same cluster but under different affiliate ID
cipher, err := aes.NewCipher(discoveryConfig.TypedSpec().ServiceEncryptionKey)
suite.Require().NoError(err)
cli, err := client.NewClient(client.Options{
Cipher: cipher,
Endpoint: address,
ClusterID: discoveryConfig.TypedSpec().ServiceClusterID,
AffiliateID: "fake",
TTL: 5 * time.Minute,
Insecure: true,
})
suite.Require().NoError(err)
errCh := make(chan error, 1)
notifyCh := make(chan struct{}, 1)
cliCtx, cliCtxCancel := context.WithCancel(suite.ctx)
defer cliCtxCancel()
go func() {
errCh <- cli.Run(cliCtx, zaptest.NewLogger(suite.T()), notifyCh)
}()
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
// controller should register its local affiliate, and we should see it being discovered
affiliates := cli.GetAffiliates()
if len(affiliates) != 1 {
return retry.ExpectedErrorf("affiliates len %d != 1", len(affiliates))
}
suite.Require().Len(affiliates[0].Endpoints, 2)
suite.Assert().True(proto.Equal(&pb.Affiliate{
NodeId: nodeIdentity.TypedSpec().NodeID,
Addresses: [][]byte{[]byte("\xc0\xa8\x03\x04")},
Hostname: "foo.com",
Nodename: "bar",
MachineType: "controlplane",
OperatingSystem: "",
Kubespan: &pb.KubeSpan{
PublicKey: "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
Address: []byte("\xfd\x50\x8d\x60\x42\x38\x63\x02\xf8\x57\x23\xff\xfe\x21\xd1\xe0"),
AdditionalAddresses: []*pb.IPPrefix{
{
Ip: []byte("\x0a\xf4\x03\x01"),
Bits: 24,
},
},
},
}, affiliates[0].Affiliate))
suite.Assert().True(proto.Equal(
&pb.Endpoint{
Ip: []byte("\n\x00\x00\x02"),
Port: 51820,
},
affiliates[0].Endpoints[0]), "expected %v", affiliates[0].Endpoints[0])
suite.Assert().True(proto.Equal(
&pb.Endpoint{
Ip: []byte("\xc0\xa8\x03\x04"),
Port: 51820,
},
affiliates[0].Endpoints[1]), "expected %v", affiliates[0].Endpoints[1])
return nil
},
))
// inject some affiliate via our client, controller should publish it as an affiliate
suite.Require().NoError(cli.SetLocalData(&client.Affiliate{
Affiliate: &pb.Affiliate{
NodeId: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
Addresses: [][]byte{[]byte("\xc0\xa8\x03\x05")},
Hostname: "some.com",
Nodename: "some",
MachineType: "worker",
OperatingSystem: "test OS",
Kubespan: &pb.KubeSpan{
PublicKey: "1CXkdhWBm58c36kTpchR8iGlXHG1ruHa5W8gsFqD8Qs=",
Address: []byte("\xfd\x50\x8d\x60\x42\x38\x63\x02\xf8\x57\x23\xff\xfe\x21\xd1\xe1"),
AdditionalAddresses: []*pb.IPPrefix{
{
Ip: []byte("\x0a\xf4\x04\x01"),
Bits: 24,
},
},
},
},
Endpoints: []*pb.Endpoint{
{
Ip: []byte("\xc0\xa8\x03\x05"),
Port: 51820,
},
},
}, nil))
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(*cluster.NewAffiliate(cluster.RawNamespaceName, "service/7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC").Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()
suite.Assert().Equal("7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC", spec.NodeID)
suite.Assert().Equal([]netaddr.IP{netaddr.MustParseIP("192.168.3.5")}, spec.Addresses)
suite.Assert().Equal("some.com", spec.Hostname)
suite.Assert().Equal("some", spec.Nodename)
suite.Assert().Equal(machine.TypeWorker, spec.MachineType)
suite.Assert().Equal("test OS", spec.OperatingSystem)
suite.Assert().Equal(netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e1"), spec.KubeSpan.Address)
suite.Assert().Equal("1CXkdhWBm58c36kTpchR8iGlXHG1ruHa5W8gsFqD8Qs=", spec.KubeSpan.PublicKey)
suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.4.1/24")}, spec.KubeSpan.AdditionalAddresses)
suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("192.168.3.5:51820")}, spec.KubeSpan.Endpoints)
return nil
}),
))
cliCtxCancel()
suite.Assert().NoError(<-errCh)
}
func TestDiscoveryServiceSuite(t *testing.T) {
suite.Run(t, new(DiscoveryServiceSuite))
}

View File

@ -78,6 +78,7 @@ func (ctrl *Controller) Run(ctx context.Context) error {
},
&cluster.AffiliateMergeController{},
&cluster.ConfigController{},
&cluster.DiscoveryServiceController{},
&cluster.LocalAffiliateController{},
&cluster.MemberController{},
&cluster.KubernetesPullController{},

View File

@ -325,9 +325,17 @@ func (c ClusterDiscoveryConfig) Validate(clusterCfg *ClusterConfig) error {
}
if c.Registries().Service().Enabled() {
_, err := url.ParseRequestURI(c.Registries().Service().Endpoint())
url, err := url.ParseRequestURI(c.Registries().Service().Endpoint())
if err != nil {
result = multierror.Append(result, fmt.Errorf("cluster discovery service registry endpoint is invalid: %w", err))
} else {
if url.Scheme != "https" {
result = multierror.Append(result, fmt.Errorf("cluster discovery service should use TLS"))
}
if url.Path != "" && url.Path != "/" {
result = multierror.Append(result, fmt.Errorf("cluster discovery service path should be empty"))
}
}
if clusterCfg.ID() == "" {

View File

@ -11,6 +11,11 @@ import (
// Message is the main interface for protobuf API v2 messages.
type Message = proto.Message
// Equal reports whether two messages are equal.
func Equal(a, b Message) bool {
return proto.Equal(a, b)
}
// vtprotoMessage is the interface for vtproto additions.
//
// We use only a subset of that interface but include additional methods

View File

@ -27,8 +27,12 @@ type Config struct {
// ConfigSpec describes KubeSpan configuration..
type ConfigSpec struct {
DiscoveryEnabled bool `yaml:"discoveryEnabled"`
RegistryKubernetesEnabled bool `yaml:"registryKubernetesEnabled"`
DiscoveryEnabled bool `yaml:"discoveryEnabled"`
RegistryKubernetesEnabled bool `yaml:"registryKubernetesEnabled"`
RegistryServiceEnabled bool `yaml:"registryServiceEnabled"`
ServiceEndpoint string `yaml:"serviceEndpoint"`
ServiceEncryptionKey []byte `yaml:"serviceEncryptionKey"`
ServiceClusterID string `yaml:"serviceClusterID"`
}
// NewConfig initializes a Config resource.
@ -72,6 +76,7 @@ func (r *Config) ResourceDefinition() meta.ResourceDefinitionSpec {
Aliases: []resource.Type{},
DefaultNamespace: config.NamespaceName,
PrintColumns: []meta.PrintColumn{},
Sensitivity: meta.Sensitive,
}
}