feat: introduce siderolink config resource & reconnect

Introduce a new resource, `SiderolinkConfig`, to store SideroLink connection configuration (api endpoint for now).

Introduce a controller for this resource which populates it from the Kernel cmdline.

Rework the SideroLink `ManagerController` to take this new resource as input and reconfigure the link on changes.

Additionally, if the siderolink connection is lost, reconnect to it and reconfigure the links/addresses.

Closes siderolabs/talos#7142, siderolabs/talos#7143.

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
This commit is contained in:
Utku Ozdemir 2023-05-04 11:01:41 +02:00
parent 860002c735
commit 62c6e9655c
No known key found for this signature in database
GPG Key ID: 65933E76F0549B0D
14 changed files with 972 additions and 117 deletions

View File

@ -0,0 +1,11 @@
syntax = "proto3";
package talos.resource.definitions.siderolink;
option go_package = "github.com/siderolabs/talos/pkg/machinery/api/resource/definitions/siderolink";
// ConfigSpec describes KubeSpan configuration..
message ConfigSpec {
string api_endpoint = 1;
}

View File

@ -12,10 +12,11 @@ import (
"go4.org/netipx"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/siderolabs/talos/internal/app/machined/pkg/adapters/wireguard"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)
// PeerStatusSpec adapter provides Wiregard integration and state management.
// PeerStatusSpec adapter provides Wireguard integration and state management.
//
//nolint:revive,golint
func PeerStatusSpec(r *kubespan.PeerStatusSpec) peerStatus {
@ -28,14 +29,6 @@ type peerStatus struct {
*kubespan.PeerStatusSpec
}
// PeerDownInterval is the time since last handshake when established peer is considered to be down.
//
// WG whitepaper defines a downed peer as being:
// Handshake Timeout (180s) + Rekey Timeout (5s) + Rekey Attempt Timeout (90s)
//
// This interval is applied when the link is already established.
const PeerDownInterval = (180 + 5 + 90) * time.Second
// EndpointConnectionTimeout is time to wait for initial handshake when the endpoint is just set.
const EndpointConnectionTimeout = 15 * time.Second
@ -52,7 +45,7 @@ const EndpointConnectionTimeout = 15 * time.Second
// | | |
// T0 T0+endpointConnectionTimeout T0+peerDownInterval
//
// Where T0 = LastEndpontChange
// Where T0 = LastEndpointChange
//
// The question is where is LastHandshakeTimeout vs. those points above:
//
@ -71,9 +64,9 @@ func (a peerStatus) CalculateState() {
// CalculateStateWithDurations calculates the state based on the time since events.
func (a peerStatus) CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange time.Duration) {
switch {
case sinceEndpointChange > PeerDownInterval: // past T0+peerDownInterval
case sinceEndpointChange > wireguard.PeerDownInterval: // past T0+peerDownInterval
// if we got handshake in the last peerDownInterval, endpoint is up
if sinceLastHandshake < PeerDownInterval {
if sinceLastHandshake < wireguard.PeerDownInterval {
a.PeerStatusSpec.State = kubespan.PeerStateUp
} else {
a.PeerStatusSpec.State = kubespan.PeerStateDown

View File

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
kubespanadapter "github.com/siderolabs/talos/internal/app/machined/pkg/adapters/kubespan"
"github.com/siderolabs/talos/internal/app/machined/pkg/adapters/wireguard"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)
@ -80,13 +81,13 @@ func TestPeerStatus_CalculateState(t *testing.T) {
},
{
name: "peer is down",
sinceLastHandshake: 2 * kubespanadapter.PeerDownInterval,
sinceEndpointChange: 2 * kubespanadapter.PeerDownInterval,
sinceLastHandshake: 2 * wireguard.PeerDownInterval,
sinceEndpointChange: 2 * wireguard.PeerDownInterval,
expectedState: kubespan.PeerStateDown,
},
{
name: "fresh peer, no handshake",
sinceLastHandshake: 2 * kubespanadapter.PeerDownInterval,
sinceLastHandshake: 2 * wireguard.PeerDownInterval,
sinceEndpointChange: kubespanadapter.EndpointConnectionTimeout / 2,
expectedState: kubespan.PeerStateUnknown,
},
@ -110,8 +111,8 @@ func TestPeerStatus_CalculateState(t *testing.T) {
},
{
name: "established peer, up",
sinceLastHandshake: kubespanadapter.PeerDownInterval / 2,
sinceEndpointChange: kubespanadapter.PeerDownInterval + 1,
sinceLastHandshake: wireguard.PeerDownInterval / 2,
sinceEndpointChange: wireguard.PeerDownInterval + 1,
expectedState: kubespan.PeerStateUp,
},
} {

View File

@ -0,0 +1,16 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package wireguard implements common wireguard functionality.
package wireguard
import "time"
// PeerDownInterval is the time since last handshake when established peer is considered to be down.
//
// WG whitepaper defines a downed peer as being:
// Handshake Timeout (180s) + Rekey Timeout (5s) + Rekey Attempt Timeout (90s)
//
// This interval is applied when the link is already established.
const PeerDownInterval = (180 + 5 + 90) * time.Second

View File

@ -0,0 +1,88 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package siderolink
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-procfs/procfs"
"go.uber.org/zap"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)
// ConfigController interacts with SideroLink API and brings up the SideroLink Wireguard interface.
type ConfigController struct {
Cmdline *procfs.Cmdline
}
// Name implements controller.Controller interface.
func (ctrl *ConfigController) Name() string {
return "siderolink.ConfigController"
}
// Inputs implements controller.Controller interface.
func (ctrl *ConfigController) Inputs() []controller.Input {
return nil
}
// Outputs implements controller.Controller interface.
func (ctrl *ConfigController) Outputs() []controller.Output {
return []controller.Output{
{
Type: siderolink.ConfigType,
Kind: controller.OutputExclusive,
},
}
}
// Run implements controller.Controller interface.
func (ctrl *ConfigController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}
if err := ctrl.updateConfig(ctx, r); err != nil {
return fmt.Errorf("failed to update config: %w", err)
}
}
}
func (ctrl *ConfigController) updateConfig(ctx context.Context, r controller.Runtime) error {
cfg := siderolink.NewConfig(config.NamespaceName, siderolink.ConfigID)
endpoint := ctrl.apiEndpoint()
if endpoint == "" {
err := r.Destroy(ctx, cfg.Metadata())
if err != nil && !state.IsNotFoundError(err) {
return err
}
return nil
}
return safe.WriterModify(ctx, r, cfg, func(c *siderolink.Config) error {
c.TypedSpec().APIEndpoint = endpoint
return nil
})
}
func (ctrl *ConfigController) apiEndpoint() string {
if ctrl.Cmdline == nil || ctrl.Cmdline.Get(constants.KernelParamSideroLink).First() == nil {
return ""
}
return *ctrl.Cmdline.Get(constants.KernelParamSideroLink).First()
}

View File

@ -8,34 +8,38 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"net/netip"
"net/url"
"os"
"regexp"
"time"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-procfs/procfs"
pb "github.com/siderolabs/siderolink/api/siderolink"
"github.com/siderolabs/siderolink/pkg/wireguard"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"github.com/siderolabs/talos/internal/pkg/smbios"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/nethelpers"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/hardware"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)
// ManagerController interacts with SideroLink API and brings up the SideroLink Wireguard interface.
type ManagerController struct {
Cmdline *procfs.Cmdline
nodeKey wgtypes.Key
}
@ -46,14 +50,7 @@ func (ctrl *ManagerController) Name() string {
// Inputs implements controller.Controller interface.
func (ctrl *ManagerController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: network.NamespaceName,
Type: network.StatusType,
ID: pointer.To(network.StatusID),
Kind: controller.InputWeak,
},
}
return nil
}
// Outputs implements controller.Controller interface.
@ -70,7 +67,7 @@ func (ctrl *ManagerController) Outputs() []controller.Output {
}
}
var urlSchemeMatcher = regexp.MustCompile(`[a-zA-z]+\://`)
var urlSchemeMatcher = regexp.MustCompile(`[a-zA-z]+://`)
type apiEndpoint struct {
Host string
@ -110,51 +107,20 @@ func parseAPIEndpoint(sideroLinkParam string) (apiEndpoint, error) {
// Run implements controller.Controller interface.
//
//nolint:gocyclo
//nolint:gocyclo,cyclop
func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
if ctrl.Cmdline == nil || ctrl.Cmdline.Get(constants.KernelParamSideroLink).First() == nil {
// no SideroLink command line argument, skip controller
return nil
// initially, wait for the network address status to be ready
if err := r.UpdateInputs([]controller.Input{
{
Namespace: network.NamespaceName,
Type: network.StatusType,
ID: pointer.To(network.StatusID),
Kind: controller.InputWeak,
},
}); err != nil {
return err
}
s, err := smbios.GetSMBIOSInfo()
if err != nil {
return fmt.Errorf("error reading node UUID: %w", err)
}
nodeUUID := s.SystemInformation.UUID
var zeroKey wgtypes.Key
if bytes.Equal(ctrl.nodeKey[:], zeroKey[:]) {
ctrl.nodeKey, err = wgtypes.GeneratePrivateKey()
if err != nil {
return fmt.Errorf("error generating Wireguard key: %w", err)
}
}
apiEndpoint := *ctrl.Cmdline.Get(constants.KernelParamSideroLink).First()
parsedEndpoint, err := parseAPIEndpoint(apiEndpoint)
if err != nil {
logger.Warn("failed to parse siderolink kernel parameter", zap.Error(err))
}
var transportCredentials credentials.TransportCredentials
if parsedEndpoint.Insecure {
transportCredentials = insecure.NewCredentials()
} else {
transportCredentials = credentials.NewTLS(&tls.Config{})
}
conn, err := grpc.DialContext(ctx, parsedEndpoint.Host, grpc.WithTransportCredentials(transportCredentials))
if err != nil {
return fmt.Errorf("error dialing SideroLink endpoint %q: %w", apiEndpoint, err)
}
sideroLinkClient := pb.NewProvisionServiceClient(conn)
for {
select {
case <-ctx.Done():
@ -162,7 +128,7 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
case <-r.EventCh():
}
netStatus, err := r.Get(ctx, resource.NewMetadata(network.NamespaceName, network.StatusType, network.StatusID, resource.VersionUndefined))
netStatus, err := safe.ReaderGet[*network.Status](ctx, r, network.NewStatus(network.NamespaceName, network.StatusID).Metadata())
if err != nil {
if state.IsNotFoundError(err) {
// no network state yet
@ -172,18 +138,143 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
return fmt.Errorf("error reading network status: %w", err)
}
if !netStatus.(*network.Status).TypedSpec().AddressReady {
if !netStatus.TypedSpec().AddressReady {
// wait for address
continue
}
resp, err := sideroLinkClient.Provision(ctx, &pb.ProvisionRequest{
NodeUuid: nodeUUID,
NodePublicKey: ctrl.nodeKey.PublicKey().String(),
JoinToken: parsedEndpoint.JoinToken,
})
break
}
// normal reconcile loop
if err := r.UpdateInputs([]controller.Input{
{
Namespace: config.NamespaceName,
Type: siderolink.ConfigType,
ID: pointer.To(siderolink.ConfigID),
Kind: controller.InputWeak,
},
{
Namespace: hardware.NamespaceName,
Type: hardware.SystemInformationType,
ID: pointer.To(hardware.SystemInformationID),
Kind: controller.InputWeak,
},
}); err != nil {
return err
}
r.QueueReconcile()
wgClient, wgClientErr := wgctrl.New()
if wgClientErr != nil {
return wgClientErr
}
defer func() {
if closeErr := wgClient.Close(); closeErr != nil {
logger.Error("failed to close wg client", zap.Error(closeErr))
}
}()
var zeroKey wgtypes.Key
if bytes.Equal(ctrl.nodeKey[:], zeroKey[:]) {
var err error
ctrl.nodeKey, err = wgtypes.GeneratePrivateKey()
if err != nil {
return fmt.Errorf("error accessing SideroLink API: %w", err)
return fmt.Errorf("error generating Wireguard key: %w", err)
}
}
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
reconnect, err := ctrl.shouldReconnect(wgClient)
if err != nil {
return err
}
if !reconnect {
// nothing to do
continue
}
case <-r.EventCh():
}
cfg, err := safe.ReaderGet[*siderolink.Config](ctx, r, siderolink.NewConfig(config.NamespaceName, siderolink.ConfigID).Metadata())
if err != nil {
if state.IsNotFoundError(err) {
if cleanupErr := ctrl.cleanup(ctx, r, nil, nil, logger); cleanupErr != nil {
return fmt.Errorf("failed to do cleanup: %w", cleanupErr)
}
// no config
continue
}
return fmt.Errorf("failed to get siderolink config: %w", err)
}
sysInfo, err := safe.ReaderGet[*hardware.SystemInformation](ctx, r, hardware.NewSystemInformation(hardware.SystemInformationID).Metadata())
if err != nil {
if state.IsNotFoundError(err) {
// no system information
continue
}
return fmt.Errorf("failed to get system information: %w", err)
}
nodeUUID := sysInfo.TypedSpec().UUID
endpoint := cfg.TypedSpec().APIEndpoint
parsedEndpoint, err := parseAPIEndpoint(endpoint)
if err != nil {
return fmt.Errorf("failed to parse siderolink endpoint: %w", err)
}
var transportCredentials credentials.TransportCredentials
if parsedEndpoint.Insecure {
transportCredentials = insecure.NewCredentials()
} else {
transportCredentials = credentials.NewTLS(&tls.Config{})
}
provision := func() (*pb.ProvisionResponse, error) {
connCtx, connCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer connCtxCancel()
conn, connErr := grpc.DialContext(connCtx, parsedEndpoint.Host, grpc.WithTransportCredentials(transportCredentials))
if connErr != nil {
return nil, fmt.Errorf("error dialing SideroLink endpoint %q: %w", endpoint, connErr)
}
defer func() {
if closeErr := conn.Close(); closeErr != nil {
logger.Error("failed to close SideroLink provisioning GRPC connection", zap.Error(closeErr))
}
}()
sideroLinkClient := pb.NewProvisionServiceClient(conn)
return sideroLinkClient.Provision(ctx, &pb.ProvisionRequest{
NodeUuid: nodeUUID,
NodePublicKey: ctrl.nodeKey.PublicKey().String(),
JoinToken: parsedEndpoint.JoinToken,
})
}
resp, err := provision()
if err != nil {
return err
}
serverAddress, err := netip.ParseAddr(resp.ServerAddress)
@ -196,10 +287,12 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
return fmt.Errorf("error parsing node address: %w", err)
}
if err = r.Modify(ctx,
network.NewLinkSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.LinkID(constants.SideroLinkName))),
func(r resource.Resource) error {
spec := r.(*network.LinkSpec).TypedSpec()
linkSpec := network.NewLinkSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.LinkID(constants.SideroLinkName)))
addressSpec := network.NewAddressSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.AddressID(constants.SideroLinkName, nodeAddress)))
if err = safe.WriterModify(ctx, r, linkSpec,
func(res *network.LinkSpec) error {
spec := res.TypedSpec()
spec.ConfigLayer = network.ConfigOperator
spec.Name = constants.SideroLinkName
@ -231,10 +324,9 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
return fmt.Errorf("error creating siderolink spec: %w", err)
}
if err = r.Modify(ctx,
network.NewAddressSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.AddressID(constants.SideroLinkName, nodeAddress))),
func(r resource.Resource) error {
spec := r.(*network.AddressSpec).TypedSpec()
if err = safe.WriterModify(ctx, r, addressSpec,
func(res *network.AddressSpec) error {
spec := res.TypedSpec()
spec.ConfigLayer = network.ConfigOperator
spec.Address = nodeAddress
@ -248,7 +340,113 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
return fmt.Errorf("error creating address spec: %w", err)
}
// all done, terminate controller
return nil
keepLinkSpecSet := map[resource.ID]struct{}{
linkSpec.Metadata().ID(): {},
}
keepAddressSpecSet := map[resource.ID]struct{}{
addressSpec.Metadata().ID(): {},
}
if err = ctrl.cleanup(ctx, r, keepLinkSpecSet, keepAddressSpecSet, logger); err != nil {
return err
}
logger.Info(
"siderolink connection configured",
zap.String("endpoint", endpoint),
zap.String("node_uuid", nodeUUID),
zap.String("node_address", nodeAddress.String()),
)
}
}
func (ctrl *ManagerController) cleanup(
ctx context.Context,
r controller.Runtime,
keepLinkSpecIDSet, keepAddressSpecIDSet map[resource.ID]struct{},
logger *zap.Logger,
) error {
if err := ctrl.cleanupLinkSpecs(ctx, r, keepLinkSpecIDSet, logger); err != nil {
return err
}
return ctrl.cleanupAddressSpecs(ctx, r, keepAddressSpecIDSet, logger)
}
//nolint:dupl
func (ctrl *ManagerController) cleanupLinkSpecs(ctx context.Context, r controller.Runtime, keepSet map[resource.ID]struct{}, logger *zap.Logger) error {
list, err := safe.ReaderList[*network.LinkSpec](ctx, r, network.NewLinkSpec(network.ConfigNamespaceName, "").Metadata())
if err != nil {
return err
}
for iter := safe.IteratorFromList(list); iter.Next(); {
link := iter.Value()
if link.Metadata().Owner() != ctrl.Name() {
continue
}
if _, ok := keepSet[link.Metadata().ID()]; ok {
continue
}
if destroyErr := r.Destroy(ctx, link.Metadata()); destroyErr != nil && !state.IsNotFoundError(destroyErr) {
return destroyErr
}
logger.Info("destroyed link spec", zap.String("link_id", link.Metadata().ID()))
}
return nil
}
//nolint:dupl
func (ctrl *ManagerController) cleanupAddressSpecs(ctx context.Context, r controller.Runtime, keepSet map[resource.ID]struct{}, logger *zap.Logger) error {
list, err := safe.ReaderList[*network.AddressSpec](ctx, r, network.NewAddressSpec(network.ConfigNamespaceName, "").Metadata())
if err != nil {
return err
}
for iter := safe.IteratorFromList(list); iter.Next(); {
address := iter.Value()
if address.Metadata().Owner() != ctrl.Name() {
continue
}
if _, ok := keepSet[address.Metadata().ID()]; ok {
continue
}
if destroyErr := r.Destroy(ctx, address.Metadata()); destroyErr != nil && !state.IsNotFoundError(destroyErr) {
return destroyErr
}
logger.Info("destroyed address spec", zap.String("address_id", address.Metadata().ID()))
}
return nil
}
func (ctrl *ManagerController) shouldReconnect(wgClient *wgctrl.Client) (bool, error) {
wgDevice, err := wgClient.Device(constants.SideroLinkName)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// no Wireguard device, so no need to reconnect
return false, nil
}
return false, fmt.Errorf("error reading Wireguard device: %w", err)
}
if len(wgDevice.Peers) != 1 {
return false, fmt.Errorf("unexpected number of Wireguard peers: %d", len(wgDevice.Peers))
}
peer := wgDevice.Peers[0]
since := time.Since(peer.LastHandshakeTime)
return since >= wireguard.PeerDownInterval, nil
}

View File

@ -12,7 +12,6 @@ import (
"testing"
"time"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-procfs/procfs"
@ -26,7 +25,10 @@ import (
siderolinkctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/siderolink"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/nethelpers"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/hardware"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)
func TestManagerSuite(t *testing.T) {
@ -43,10 +45,10 @@ func TestManagerSuite(t *testing.T) {
}()
cmdline := procfs.NewCmdline(fmt.Sprintf("%s=%s", constants.KernelParamSideroLink, lis.Addr().String()))
configController := siderolinkctrl.ConfigController{Cmdline: cmdline}
suite.Require().NoError(suite.Runtime().RegisterController(&siderolinkctrl.ManagerController{
Cmdline: cmdline,
}))
suite.Require().NoError(suite.Runtime().RegisterController(&siderolinkctrl.ManagerController{}))
suite.Require().NoError(suite.Runtime().RegisterController(&configController))
}
suite.Run(t, &m)
@ -68,7 +70,7 @@ const (
mockNodeAddressPrefix = "fdae:41e4:649b:9303:2a07:9c7:5b08:aef7/64"
)
func (srv mockServer) Provision(ctx context.Context, req *pb.ProvisionRequest) (*pb.ProvisionResponse, error) {
func (srv mockServer) Provision(_ context.Context, _ *pb.ProvisionRequest) (*pb.ProvisionResponse, error) {
return &pb.ProvisionResponse{
ServerEndpoint: mockServerEndpoint,
ServerAddress: mockServerAddress,
@ -83,21 +85,18 @@ func (suite *ManagerSuite) TestReconcile() {
suite.Require().NoError(suite.State().Create(suite.Ctx(), networkStatus))
systemInformation := hardware.NewSystemInformation(hardware.SystemInformationID)
systemInformation.TypedSpec().UUID = "71233efd-7a07-43f8-b6ba-da90fae0e88b"
suite.Require().NoError(suite.State().Create(suite.Ctx(), systemInformation))
nodeAddress := netip.MustParsePrefix(mockNodeAddressPrefix)
addressSpec := network.NewAddressSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.AddressID(constants.SideroLinkName, nodeAddress)))
linkSpec := network.NewLinkSpec(network.ConfigNamespaceName, network.LayeredID(network.ConfigOperator, network.LinkID(constants.SideroLinkName)))
suite.AssertWithin(10*time.Second, 100*time.Millisecond, func() error {
addressResource, err := ctest.Get[*network.AddressSpec](
suite,
resource.NewMetadata(
network.ConfigNamespaceName,
network.AddressSpecType,
network.LayeredID(
network.ConfigOperator,
network.AddressID(constants.SideroLinkName, nodeAddress),
),
resource.VersionUndefined,
),
)
addressResource, err := ctest.Get[*network.AddressSpec](suite, addressSpec.Metadata())
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
@ -113,15 +112,7 @@ func (suite *ManagerSuite) TestReconcile() {
suite.Assert().Equal(nethelpers.FamilyInet6, address.Family)
suite.Assert().Equal(constants.SideroLinkName, address.LinkName)
linkResource, err := ctest.Get[*network.LinkSpec](
suite,
resource.NewMetadata(
network.ConfigNamespaceName,
network.LinkSpecType,
network.LayeredID(network.ConfigOperator, network.LinkID(constants.SideroLinkName)),
resource.VersionUndefined,
),
)
linkResource, err := ctest.Get[*network.LinkSpec](suite, linkSpec.Metadata())
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
@ -153,6 +144,30 @@ func (suite *ManagerSuite) TestReconcile() {
return nil
})
// remove config
configPtr := siderolink.NewConfig(config.NamespaceName, siderolink.ConfigID).Metadata()
destroyErr := suite.State().Destroy(suite.Ctx(), configPtr,
state.WithDestroyOwner(pointer.To(siderolinkctrl.ConfigController{}).Name()))
suite.Require().NoError(destroyErr)
suite.AssertWithin(10*time.Second, 100*time.Millisecond, func() error {
_, err := ctest.Get[*network.LinkSpec](suite, linkSpec.Metadata())
if err == nil {
return retry.ExpectedError(fmt.Errorf("link resource still exists"))
}
suite.Assert().Truef(state.IsNotFoundError(err), "unexpected error: %v", err)
_, err = ctest.Get[*network.AddressSpec](suite, addressSpec.Metadata())
if err == nil {
return retry.ExpectedError(fmt.Errorf("address resource still exists"))
}
suite.Assert().Truef(state.IsNotFoundError(err), "unexpected error: %v", err)
return nil
})
}
func TestParseJoinToken(t *testing.T) {

View File

@ -251,9 +251,10 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
&secrets.KubernetesController{},
&secrets.RootController{},
&secrets.TrustdController{},
&siderolink.ManagerController{
&siderolink.ConfigController{
Cmdline: procfs.ProcCmdline(),
},
&siderolink.ManagerController{},
&timecontrollers.SyncController{
V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(),
},

View File

@ -27,6 +27,7 @@ import (
"github.com/siderolabs/talos/pkg/machinery/resources/perf"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"github.com/siderolabs/talos/pkg/machinery/resources/secrets"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
"github.com/siderolabs/talos/pkg/machinery/resources/time"
"github.com/siderolabs/talos/pkg/machinery/resources/v1alpha1"
)
@ -181,6 +182,7 @@ func NewState() (*State, error) {
&secrets.KubernetesRoot{},
&secrets.OSRoot{},
&secrets.Trustd{},
&siderolink.Config{},
&time.Status{},
} {
if err := s.resourceRegistry.Register(ctx, r); err != nil {

View File

@ -0,0 +1,153 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v4.22.2
// source: resource/definitions/siderolink/siderolink.proto
package siderolink
import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// ConfigSpec describes KubeSpan configuration..
type ConfigSpec struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ApiEndpoint string `protobuf:"bytes,1,opt,name=api_endpoint,json=apiEndpoint,proto3" json:"api_endpoint,omitempty"`
}
func (x *ConfigSpec) Reset() {
*x = ConfigSpec{}
if protoimpl.UnsafeEnabled {
mi := &file_resource_definitions_siderolink_siderolink_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ConfigSpec) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConfigSpec) ProtoMessage() {}
func (x *ConfigSpec) ProtoReflect() protoreflect.Message {
mi := &file_resource_definitions_siderolink_siderolink_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConfigSpec.ProtoReflect.Descriptor instead.
func (*ConfigSpec) Descriptor() ([]byte, []int) {
return file_resource_definitions_siderolink_siderolink_proto_rawDescGZIP(), []int{0}
}
func (x *ConfigSpec) GetApiEndpoint() string {
if x != nil {
return x.ApiEndpoint
}
return ""
}
var File_resource_definitions_siderolink_siderolink_proto protoreflect.FileDescriptor
var file_resource_definitions_siderolink_siderolink_proto_rawDesc = []byte{
0x0a, 0x30, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e,
0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x6c, 0x69, 0x6e,
0x6b, 0x2f, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x25, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72,
0x63, 0x65, 0x2e, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x73,
0x69, 0x64, 0x65, 0x72, 0x6f, 0x6c, 0x69, 0x6e, 0x6b, 0x22, 0x2f, 0x0a, 0x0a, 0x43, 0x6f, 0x6e,
0x66, 0x69, 0x67, 0x53, 0x70, 0x65, 0x63, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x70, 0x69, 0x5f, 0x65,
0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61,
0x70, 0x69, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x42, 0x4f, 0x5a, 0x4d, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x6c,
0x61, 0x62, 0x73, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6d, 0x61,
0x63, 0x68, 0x69, 0x6e, 0x65, 0x72, 0x79, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x72, 0x65, 0x73, 0x6f,
0x75, 0x72, 0x63, 0x65, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x2f, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x6c, 0x69, 0x6e, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
file_resource_definitions_siderolink_siderolink_proto_rawDescOnce sync.Once
file_resource_definitions_siderolink_siderolink_proto_rawDescData = file_resource_definitions_siderolink_siderolink_proto_rawDesc
)
func file_resource_definitions_siderolink_siderolink_proto_rawDescGZIP() []byte {
file_resource_definitions_siderolink_siderolink_proto_rawDescOnce.Do(func() {
file_resource_definitions_siderolink_siderolink_proto_rawDescData = protoimpl.X.CompressGZIP(file_resource_definitions_siderolink_siderolink_proto_rawDescData)
})
return file_resource_definitions_siderolink_siderolink_proto_rawDescData
}
var file_resource_definitions_siderolink_siderolink_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_resource_definitions_siderolink_siderolink_proto_goTypes = []interface{}{
(*ConfigSpec)(nil), // 0: talos.resource.definitions.siderolink.ConfigSpec
}
var file_resource_definitions_siderolink_siderolink_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_resource_definitions_siderolink_siderolink_proto_init() }
func file_resource_definitions_siderolink_siderolink_proto_init() {
if File_resource_definitions_siderolink_siderolink_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_resource_definitions_siderolink_siderolink_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ConfigSpec); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_resource_definitions_siderolink_siderolink_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_resource_definitions_siderolink_siderolink_proto_goTypes,
DependencyIndexes: file_resource_definitions_siderolink_siderolink_proto_depIdxs,
MessageInfos: file_resource_definitions_siderolink_siderolink_proto_msgTypes,
}.Build()
File_resource_definitions_siderolink_siderolink_proto = out.File
file_resource_definitions_siderolink_siderolink_proto_rawDesc = nil
file_resource_definitions_siderolink_siderolink_proto_goTypes = nil
file_resource_definitions_siderolink_siderolink_proto_depIdxs = nil
}

View File

@ -0,0 +1,260 @@
// Code generated by protoc-gen-go-vtproto. DO NOT EDIT.
// protoc-gen-go-vtproto version: v0.4.0
// source: resource/definitions/siderolink/siderolink.proto
package siderolink
import (
fmt "fmt"
io "io"
bits "math/bits"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
func (m *ConfigSpec) MarshalVT() (dAtA []byte, err error) {
if m == nil {
return nil, nil
}
size := m.SizeVT()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBufferVT(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ConfigSpec) MarshalToVT(dAtA []byte) (int, error) {
size := m.SizeVT()
return m.MarshalToSizedBufferVT(dAtA[:size])
}
func (m *ConfigSpec) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
if m == nil {
return 0, nil
}
i := len(dAtA)
_ = i
var l int
_ = l
if m.unknownFields != nil {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.ApiEndpoint) > 0 {
i -= len(m.ApiEndpoint)
copy(dAtA[i:], m.ApiEndpoint)
i = encodeVarint(dAtA, i, uint64(len(m.ApiEndpoint)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarint(dAtA []byte, offset int, v uint64) int {
offset -= sov(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *ConfigSpec) SizeVT() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.ApiEndpoint)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
n += len(m.unknownFields)
return n
}
func sov(x uint64) (n int) {
return (bits.Len64(x|1) + 6) / 7
}
func soz(x uint64) (n int) {
return sov(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *ConfigSpec) UnmarshalVT(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ConfigSpec: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ConfigSpec: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ApiEndpoint", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ApiEndpoint = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLength
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skip(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflow
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflow
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflow
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLength
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroup
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLength
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLength = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflow = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,13 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Code generated by "deep-copy -type ConfigSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go ."; DO NOT EDIT.
package siderolink
// DeepCopy generates a deep copy of ConfigSpec.
func (o ConfigSpec) DeepCopy() ConfigSpec {
var cp ConfigSpec = o
return cp
}

View File

@ -0,0 +1,70 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package siderolink contains SideroLink-related resources.
package siderolink
import (
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/resource/typed"
"github.com/siderolabs/talos/pkg/machinery/proto"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
)
//nolint:lll
//go:generate deep-copy -type ConfigSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go .
// ConfigType is type of Config resource.
const ConfigType = resource.Type("SiderolinkConfigs.siderolink.talos.dev")
// ConfigID the singleton config resource ID.
const ConfigID = resource.ID("siderolink")
// Config resource holds KubeSpan configuration.
type Config = typed.Resource[ConfigSpec, ConfigExtension]
// ConfigSpec describes KubeSpan configuration..
//
//gotagsrewrite:gen
type ConfigSpec struct {
APIEndpoint string `yaml:"apiEndpoint" protobuf:"1"`
}
// NewConfig initializes a Config resource.
func NewConfig(namespace resource.Namespace, id resource.ID) *Config {
return typed.NewResource[ConfigSpec, ConfigExtension](
resource.NewMetadata(namespace, ConfigType, id, resource.VersionUndefined),
ConfigSpec{},
)
}
// ConfigExtension provides auxiliary methods for Config.
type ConfigExtension struct{}
// ResourceDefinition implements [typed.Extension] interface.
func (ConfigExtension) ResourceDefinition() meta.ResourceDefinitionSpec {
return meta.ResourceDefinitionSpec{
Type: ConfigType,
Aliases: []resource.Type{},
DefaultNamespace: config.NamespaceName,
PrintColumns: []meta.PrintColumn{
{
Name: "API Endpoint",
JSONPath: `{.apiEndpoint}`,
},
},
}
}
func init() {
proto.RegisterDefaultTypes()
err := protobuf.RegisterDynamic[ConfigSpec](ConfigType, &Config{})
if err != nil {
panic(err)
}
}

View File

@ -197,6 +197,9 @@ description: Talos gRPC API reference.
- [OSRootSpec](#talos.resource.definitions.secrets.OSRootSpec)
- [TrustdCertsSpec](#talos.resource.definitions.secrets.TrustdCertsSpec)
- [resource/definitions/siderolink/siderolink.proto](#resource/definitions/siderolink/siderolink.proto)
- [ConfigSpec](#talos.resource.definitions.siderolink.ConfigSpec)
- [resource/definitions/time/time.proto](#resource/definitions/time/time.proto)
- [StatusSpec](#talos.resource.definitions.time.StatusSpec)
@ -3568,6 +3571,37 @@ TrustdCertsSpec describes etcd certs secrets.
<!-- end messages -->
<!-- end enums -->
<!-- end HasExtensions -->
<!-- end services -->
<a name="resource/definitions/siderolink/siderolink.proto"></a>
<p align="right"><a href="#top">Top</a></p>
## resource/definitions/siderolink/siderolink.proto
<a name="talos.resource.definitions.siderolink.ConfigSpec"></a>
### ConfigSpec
ConfigSpec describes KubeSpan configuration..
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| api_endpoint | [string](#string) | | |
<!-- end messages -->
<!-- end enums -->