fix: make talosctl command return nonzero error codes if it had errors

Multinode requests were printing out the errors for each node to stderr,
but they didn't set the global error.

Refactor the code a bit to use a single function for handling that logic
to avoid rewriting it in many other places.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
This commit is contained in:
Artem Chernyshev 2022-08-11 15:06:30 +03:00
parent dce923f747
commit 5c6648e3d2
No known key found for this signature in database
GPG Key ID: 9B9D0328B57B443F
17 changed files with 352 additions and 240 deletions

View File

@ -7,14 +7,13 @@ package talos
import (
"context"
"fmt"
"io"
"os"
"text/tabwriter"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
)
@ -62,17 +61,13 @@ var duCmd = &cobra.Command{
Paths: paths,
})
if err != nil {
return fmt.Errorf("error fetching logs: %s", err)
return fmt.Errorf("error fetching disk usage: %s", err)
}
addedHeader := false
defaultNode := client.RemotePeer(stream.Context())
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
multipleNodes := false
node := defaultNode
stringifySize := func(s int64) string {
if humanizeFlag {
return humanize.Bytes(uint64(s))
@ -81,20 +76,11 @@ var duCmd = &cobra.Command{
return fmt.Sprintf("%d", s)
}
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return w.Flush()
}
return fmt.Errorf("error streaming results: %s", err)
}
defer w.Flush() //nolint:errcheck
return helpers.ReadGRPCStream(stream, func(info *machineapi.DiskUsageInfo, node string, multipleNodes bool) error {
if info.Error != "" {
fmt.Fprintf(os.Stderr, "%s: error reading file %s: %s\n", node, info.Name, info.Error)
continue
return helpers.NonFatalError(fmt.Errorf(info.Error))
}
pattern := "%s\t%s\n"
@ -119,19 +105,15 @@ var duCmd = &cobra.Command{
addedHeader = true
}
if info.Metadata != nil && info.Metadata.Error != "" {
fmt.Fprintf(os.Stderr, "%s: %s\n", node, info.Metadata.Error)
continue
}
if multipleNodes {
pattern = "%s\t%s\t%s\n"
args = append([]interface{}{node}, args...)
}
fmt.Fprintf(w, pattern, args...)
}
return nil
})
})
},
}

View File

@ -7,12 +7,11 @@ package talos
import (
"context"
"fmt"
"io"
"os"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/machinery/api/common"
"github.com/talos-systems/talos/pkg/machinery/client"
)
@ -31,33 +30,13 @@ var dmesgCmd = &cobra.Command{
return fmt.Errorf("error getting dmesg: %w", err)
}
defaultNode := client.RemotePeer(stream.Context())
for {
resp, err := stream.Recv()
if err != nil {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
break
}
return fmt.Errorf("error reading from stream: %w", err)
return helpers.ReadGRPCStream(stream, func(data *common.Data, node string, multipleNodes bool) error {
if data.Bytes != nil {
fmt.Printf("%s: %s", node, data.Bytes)
}
node := defaultNode
if resp.Metadata != nil {
node = resp.Metadata.Hostname
if resp.Metadata.Error != "" {
fmt.Fprintf(os.Stderr, "%s: %s\n", node, resp.Metadata.Error)
}
}
if resp.Bytes != nil {
fmt.Printf("%s: %s", node, resp.Bytes)
}
}
return nil
return nil
})
})
},
}

View File

@ -6,6 +6,7 @@ package talos
import (
"context"
"errors"
"fmt"
"os"
"strings"
@ -14,6 +15,7 @@ import (
"github.com/spf13/cobra"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/generic/slices"
@ -49,70 +51,64 @@ var eventsCmd = &cobra.Command{
opts = append(opts, client.WithTailID(eventsCmdFlags.tailID))
}
return c.EventsWatch(ctx, func(ch <-chan client.Event) {
for {
var (
event client.Event
ok bool
)
events, err := c.Events(ctx, opts...)
if err != nil {
return err
}
select {
case event, ok = <-ch:
if !ok {
return
}
case <-ctx.Done():
return
return helpers.ReadGRPCStream(events, func(ev *machine.Event, node string, multipleNodes bool) error {
format := "%s\t%s\t%s\t%s\t%s\n"
event, err := client.UnmarshalEvent(ev)
if err != nil {
if errors.Is(err, client.ErrEventNotSupported) {
return nil
}
format := "%s\t%s\t%s\t%s\t%s\n"
var args []interface{}
switch msg := event.Payload.(type) {
case *machine.SequenceEvent:
args = []interface{}{msg.GetSequence()}
if msg.Error != nil {
args = append(args, "error:"+" "+msg.GetError().GetMessage())
} else {
args = append(args, msg.GetAction().String())
}
case *machine.PhaseEvent:
args = []interface{}{msg.GetPhase(), msg.GetAction().String()}
case *machine.TaskEvent:
args = []interface{}{msg.GetTask(), msg.GetAction().String()}
case *machine.ServiceStateEvent:
args = []interface{}{msg.GetService(), fmt.Sprintf("%s: %s", msg.GetAction(), msg.GetMessage())}
case *machine.ConfigLoadErrorEvent:
args = []interface{}{"error", msg.GetError()}
case *machine.ConfigValidationErrorEvent:
args = []interface{}{"error", msg.GetError()}
case *machine.AddressEvent:
args = []interface{}{msg.GetHostname(), fmt.Sprintf("ADDRESSES: %s", strings.Join(msg.GetAddresses(), ","))}
case *machine.MachineStatusEvent:
args = []interface{}{
msg.GetStage().String(),
fmt.Sprintf("ready: %v, unmet conditions: %v",
msg.GetStatus().Ready,
slices.Map(msg.GetStatus().GetUnmetConditions(),
func(c *machine.MachineStatusEvent_MachineStatus_UnmetCondition) string {
return c.Name
},
),
),
}
default:
// We haven't implemented the handling of this event yet.
continue
}
args = append([]interface{}{event.Node, event.ID, event.TypeURL}, args...)
fmt.Fprintf(w, format, args...)
//nolint:errcheck
w.Flush()
return err
}
}, opts...)
var args []interface{}
switch msg := event.Payload.(type) {
case *machine.SequenceEvent:
args = []interface{}{msg.GetSequence()}
if msg.Error != nil {
args = append(args, "error:"+" "+msg.GetError().GetMessage())
} else {
args = append(args, msg.GetAction().String())
}
case *machine.PhaseEvent:
args = []interface{}{msg.GetPhase(), msg.GetAction().String()}
case *machine.TaskEvent:
args = []interface{}{msg.GetTask(), msg.GetAction().String()}
case *machine.ServiceStateEvent:
args = []interface{}{msg.GetService(), fmt.Sprintf("%s: %s", msg.GetAction(), msg.GetMessage())}
case *machine.ConfigLoadErrorEvent:
args = []interface{}{"error", msg.GetError()}
case *machine.ConfigValidationErrorEvent:
args = []interface{}{"error", msg.GetError()}
case *machine.AddressEvent:
args = []interface{}{msg.GetHostname(), fmt.Sprintf("ADDRESSES: %s", strings.Join(msg.GetAddresses(), ","))}
case *machine.MachineStatusEvent:
args = []interface{}{
msg.GetStage().String(),
fmt.Sprintf("ready: %v, unmet conditions: %v",
msg.GetStatus().Ready,
slices.Map(msg.GetStatus().GetUnmetConditions(),
func(c *machine.MachineStatusEvent_MachineStatus_UnmetCondition) string {
return c.Name
},
),
),
}
}
args = append([]interface{}{event.Node, event.ID, event.TypeURL}, args...)
fmt.Fprintf(w, format, args...)
return w.Flush()
})
})
},
}

View File

@ -7,7 +7,6 @@ package talos
import (
"context"
"fmt"
"io"
"os"
"strings"
"text/tabwriter"
@ -15,8 +14,8 @@ import (
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
)
@ -89,44 +88,15 @@ var lsCmd = &cobra.Command{
return fmt.Errorf("error fetching logs: %s", err)
}
defaultNode := client.RemotePeer(stream.Context())
if !long {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
fmt.Fprintln(w, "NODE\tNAME")
multipleNodes := false
node := defaultNode
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
if multipleNodes {
return w.Flush()
}
return nil
}
return fmt.Errorf("error streaming results: %s", err)
}
if info.Metadata != nil && info.Metadata.Hostname != "" {
multipleNodes = true
node = info.Metadata.Hostname
}
if info.Metadata != nil && info.Metadata.Error != "" {
fmt.Fprintf(os.Stderr, "%s: %s\n", node, info.Metadata.Error)
continue
}
defer w.Flush() //nolint:errcheck
return helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
if info.Error != "" {
fmt.Fprintf(os.Stderr, "%s: error reading file %s: %s\n", node, info.Name, info.Error)
continue
return helpers.NonFatalError(fmt.Errorf("%s: error reading file %s: %s", node, info.Name, info.Error))
}
if !multipleNodes {
@ -138,36 +108,18 @@ var lsCmd = &cobra.Command{
)
}
}
return nil
})
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', 0)
defer w.Flush() //nolint:errcheck
fmt.Fprintln(w, "NODE\tMODE\tUID\tGID\tSIZE(B)\tLASTMOD\tNAME")
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return w.Flush()
}
return fmt.Errorf("error streaming results: %s", err)
}
node := defaultNode
if info.Metadata != nil && info.Metadata.Hostname != "" {
node = info.Metadata.Hostname
}
return helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
if info.Error != "" {
fmt.Fprintf(os.Stderr, "%s: error reading file %s: %s\n", node, info.Name, info.Error)
continue
}
if info.Metadata != nil && info.Metadata.Error != "" {
fmt.Fprintf(os.Stderr, "%s: %s\n", node, info.Metadata.Error)
continue
return helpers.NonFatalError(fmt.Errorf("%s: error reading file %s: %s", node, info.Name, info.Error))
}
display := info.RelativeName
@ -203,7 +155,9 @@ var lsCmd = &cobra.Command{
timestampFormatted,
display,
)
}
return nil
})
})
},
}

View File

@ -69,6 +69,8 @@ var logsCmd = &cobra.Command{
respCh, errCh := newLineSlicer(stream)
var gotErrors bool
for data := range respCh {
if data.Metadata != nil && data.Metadata.Error != "" {
_, err = fmt.Fprintf(os.Stderr, "ERROR: %s\n", data.Metadata.Error)
@ -76,6 +78,8 @@ var logsCmd = &cobra.Command{
return err
}
gotErrors = true
continue
}
@ -94,6 +98,10 @@ var logsCmd = &cobra.Command{
return fmt.Errorf("error getting logs: %v", err)
}
if gotErrors {
os.Exit(1)
}
return nil
})
},

View File

@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cli"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
@ -42,10 +43,16 @@ var memoryCmd = &cobra.Command{
}
if verbose {
return verboseRender(&remotePeer, resp)
verboseRender(&remotePeer, resp)
} else {
err = briefRender(&remotePeer, resp)
if err != nil {
return err
}
}
return briefRender(&remotePeer, resp)
return helpers.CheckErrors(resp.Messages...)
})
},
}
@ -79,7 +86,7 @@ func briefRender(remotePeer *peer.Peer, resp *machineapi.MemoryResponse) error {
return w.Flush()
}
func verboseRender(remotePeer *peer.Peer, resp *machineapi.MemoryResponse) error {
func verboseRender(remotePeer *peer.Peer, resp *machineapi.MemoryResponse) {
defaultNode := client.AddrFromPeer(remotePeer)
// Dump as /proc/meminfo
@ -140,8 +147,6 @@ func verboseRender(remotePeer *peer.Peer, resp *machineapi.MemoryResponse) error
fmt.Printf("%s: %d %s\n", "DirectMap2M", msg.Meminfo.Directmap2M, "kB")
fmt.Printf("%s: %d %s\n", "DirectMap1G", msg.Meminfo.Directmap1G, "kB")
}
return nil
}
func init() {

View File

@ -21,6 +21,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cli"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
@ -89,9 +90,17 @@ func processesUI(ctx context.Context, c *client.Client) {
cli.Should(err)
// x, y, w, h
l.SetRect(0, 0, w, h)
l.WrapText = false
processOutput, err = processesOutput(ctx, c)
cli.Should(err)
if err != nil {
l.Text = err.Error()
l.WrapText = true
ui.Render(l)
return
}
// Dont refresh if we dont have any output
if processOutput == "" {
@ -174,11 +183,7 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err
resp, err := c.Processes(ctx, grpc.Peer(&remotePeer))
if err != nil {
// TODO: Figure out how to expose errors to client without messing
// up display
// TODO: Update server side code to not throw an error when process
// no longer exists ( /proc/1234/comm no such file or directory )
return output, nil //nolint:nilerr
return output, err
}
defaultNode := client.AddrFromPeer(&remotePeer)
@ -221,5 +226,10 @@ func processesOutput(ctx context.Context, c *client.Client) (output string, err
}
}
return columnize.SimpleFormat(s), err
res := columnize.SimpleFormat(s)
if err != nil {
return res, err
}
return res, helpers.CheckErrors(resp.Messages...)
}

View File

@ -9,9 +9,9 @@ import (
"fmt"
"io"
"os"
"sync"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/machinery/client"
@ -44,24 +44,30 @@ var readCmd = &cobra.Command{
defer r.Close() //nolint:errcheck
var wg sync.WaitGroup
var eg errgroup.Group
eg.Go(func() error {
var errors error
wg.Add(1)
go func() {
defer wg.Done()
for err := range errCh {
fmt.Fprintln(os.Stderr, err.Error())
if err != nil {
errors = helpers.AppendErrors(errors, err)
}
}
}()
defer wg.Wait()
return errors
})
_, err = io.Copy(os.Stdout, r)
if err != nil {
return fmt.Errorf("error reading: %w", err)
}
return r.Close()
if err = r.Close(); err != nil {
return err
}
return eg.Wait()
})
},
}

View File

@ -89,7 +89,7 @@ var supportCmd = &cobra.Command{
return nil
})
err = collectData(archive, progress)
collectErr := collectData(archive, progress)
close(progress)
@ -97,15 +97,23 @@ var supportCmd = &cobra.Command{
return e
}
if err != nil {
if err = printErrors(err); err != nil {
if collectErr != nil {
if err = printErrors(collectErr); err != nil {
return err
}
}
fmt.Printf("Support bundle is written to %s\n", supportCmdFlags.output)
return archive.Archive.Close()
if err = archive.Archive.Close(); err != nil {
return err
}
if collectErr != nil {
os.Exit(1)
}
return nil
},
}

View File

@ -9,6 +9,8 @@ import (
"fmt"
"google.golang.org/grpc/metadata"
"github.com/talos-systems/talos/pkg/machinery/api/common"
)
// FailIfMultiNodes checks if ctx contains multi-node request metadata.
@ -24,3 +26,17 @@ func FailIfMultiNodes(ctx context.Context, command string) error {
return fmt.Errorf("command %q is not supported with multiple nodes", command)
}
// CheckErrors goes through the returned message list and checks if any messages have errors set.
func CheckErrors[T interface{ GetMetadata() *common.Metadata }](messages ...T) error {
var err error
for _, msg := range messages {
md := msg.GetMetadata()
if md.Error != "" {
err = AppendErrors(err, fmt.Errorf(md.Error))
}
}
return err
}

View File

@ -0,0 +1,33 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package helpers
import (
"fmt"
"strings"
"github.com/fatih/color"
"github.com/gertd/go-pluralize"
"github.com/hashicorp/go-multierror"
)
// AppendErrors adds errors to the multierr wrapper.
func AppendErrors(err error, errs ...error) error {
res := multierror.Append(err, errs...)
res.ErrorFormat = func(errs []error) string {
lines := []string{}
for _, err := range errs {
lines = append(lines, fmt.Sprintf(" %s", err.Error()))
}
count := pluralize.NewClient().Pluralize("error", len(lines), true)
return color.RedString(fmt.Sprintf("%s occurred:\n%s", count, strings.Join(lines, "\n")))
}
return res
}

View File

@ -0,0 +1,95 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package helpers
import (
"errors"
"fmt"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/talos-systems/talos/pkg/machinery/api/common"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/proto"
)
// Stream implements the contract for the grpc stream of a specific type.
type Stream[T proto.Message] interface {
Recv() (T, error)
grpc.ClientStream
}
// Message defines the contract for the grpc message.
type Message interface {
GetMetadata() *common.Metadata
proto.Message
}
// ReadGRPCStream consumes all messages from the gRPC stream, handles errors, calls the passed handler for each message.
func ReadGRPCStream[S Stream[T], T Message](stream S, handler func(T, string, bool) error) error {
var streamErrs error
defaultNode := client.RemotePeer(stream.Context())
multipleNodes := false
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return streamErrs
}
return fmt.Errorf("error streaming results: %s", err)
}
node := defaultNode
if info.GetMetadata() != nil {
if info.GetMetadata().Hostname != "" {
multipleNodes = true
node = info.GetMetadata().Hostname
}
if info.GetMetadata().Error != "" {
streamErrs = AppendErrors(streamErrs, fmt.Errorf(info.GetMetadata().Error))
continue
}
}
if err = handler(info, node, multipleNodes); err != nil {
var errNonFatal *ErrNonFatalError
if errors.As(err, &errNonFatal) {
streamErrs = AppendErrors(streamErrs, err)
continue
}
return err
}
}
}
// ErrNonFatalError represents the error that can be returned from the handler in the gRPC stream reader
// which doesn't mean that we should stop iterating over the messages in the stream, but log this error
// and continue the process.
type ErrNonFatalError struct {
err error
}
// Error implements error interface.
func (e *ErrNonFatalError) Error() string {
return e.err.Error()
}
// NonFatalError wraps another error into a ErrNonFatal.
func NonFatalError(err error) error {
return &ErrNonFatalError{
err: err,
}
}

2
go.mod
View File

@ -49,6 +49,7 @@ require (
github.com/fatih/color v1.13.0
github.com/fsnotify/fsnotify v1.5.4
github.com/gdamore/tcell/v2 v2.5.2
github.com/gertd/go-pluralize v0.2.1
github.com/gizak/termui/v3 v3.1.0
github.com/godbus/dbus/v5 v5.1.0
github.com/golang/mock v1.6.0
@ -168,7 +169,6 @@ require (
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/gertd/go-pluralize v0.2.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect

View File

@ -120,6 +120,7 @@ func (suite *TalosconfigSuite) TestNew() {
readerOpts: []base.RunOption{
base.StdoutEmpty(),
base.StderrShouldMatch(regexp.MustCompile(`\Qrpc error: code = PermissionDenied desc = not authorized`)),
base.ShouldFail(),
},
},
{
@ -142,7 +143,7 @@ func (suite *TalosconfigSuite) TestNew() {
args: []string{"kubeconfig", "--force", tempDir},
adminOpts: []base.RunOption{base.StdoutEmpty()},
readerOpts: []base.RunOption{
base.ShouldFail(), // why this one fails, but not others?
base.ShouldFail(),
base.StdoutEmpty(),
base.StderrShouldMatch(regexp.MustCompile(`\Qrpc error: code = PermissionDenied desc = not authorized`)),
},

View File

@ -133,8 +133,10 @@ func (suite *DiskUsageSuite) TestError() {
"usage", "--nodes",
suite.RandomDiscoveredNodeInternalIP(), "/no/such/folder/here/just/for/sure",
},
base.ShouldFail(),
base.StderrNotEmpty(),
base.StdoutEmpty())
base.StdoutEmpty(),
)
}
func init() {

View File

@ -56,6 +56,7 @@ func (suite *LogsSuite) TestServiceNotFound() {
base.StdoutEmpty(),
base.StderrNotEmpty(),
base.StderrShouldMatch(regexp.MustCompile(`ERROR:.+ log "servicenotfound" was not registered`)),
base.ShouldFail(),
)
}

View File

@ -18,6 +18,9 @@ import (
"github.com/talos-systems/talos/pkg/machinery/proto"
)
// ErrEventNotSupported is returned from the event decoder when we encounter an unknown event.
var ErrEventNotSupported = fmt.Errorf("event is not supported")
// EventsOptionFunc defines the options for the Events API.
type EventsOptionFunc func(opts *machineapi.EventsRequest)
@ -103,53 +106,66 @@ func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event),
return fmt.Errorf("failed to watch events: %w", err)
}
typeURL := event.GetData().GetTypeUrl()
var msg proto.Message
for _, eventType := range []proto.Message{
&machineapi.SequenceEvent{},
&machineapi.PhaseEvent{},
&machineapi.TaskEvent{},
&machineapi.ServiceStateEvent{},
&machineapi.ConfigLoadErrorEvent{},
&machineapi.ConfigValidationErrorEvent{},
&machineapi.AddressEvent{},
&machineapi.MachineStatusEvent{},
} {
if typeURL == "talos/runtime/"+string(eventType.ProtoReflect().Descriptor().FullName()) {
msg = eventType
break
}
}
if msg == nil {
// We haven't implemented the handling of this event yet.
ev, err := UnmarshalEvent(event)
if err != nil {
continue
}
if err = proto.Unmarshal(event.GetData().GetValue(), msg); err != nil {
log.Printf("failed to unmarshal message: %v", err) // TODO: this should be fixed to return errors
continue
}
ev := Event{
Node: defaultNode,
TypeURL: typeURL,
ID: event.Id,
Payload: msg,
}
if event.Metadata != nil {
ev.Node = event.Metadata.Hostname
if ev.Node == "" {
ev.Node = defaultNode
}
select {
case ch <- ev:
case ch <- *ev:
case <-ctx.Done():
return nil
}
}
}
// UnmarshalEvent decodes the event coming from the gRPC stream from any to the exact type.
func UnmarshalEvent(event *machineapi.Event) (*Event, error) {
typeURL := event.GetData().GetTypeUrl()
var msg proto.Message
for _, eventType := range []proto.Message{
&machineapi.SequenceEvent{},
&machineapi.PhaseEvent{},
&machineapi.TaskEvent{},
&machineapi.ServiceStateEvent{},
&machineapi.ConfigLoadErrorEvent{},
&machineapi.ConfigValidationErrorEvent{},
&machineapi.AddressEvent{},
&machineapi.MachineStatusEvent{},
} {
if typeURL == "talos/runtime/"+string(eventType.ProtoReflect().Descriptor().FullName()) {
msg = eventType
break
}
}
if msg == nil {
// We haven't implemented the handling of this event yet.
return nil, ErrEventNotSupported
}
if err := proto.Unmarshal(event.GetData().GetValue(), msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
return nil, err
}
ev := Event{
TypeURL: typeURL,
ID: event.Id,
Payload: msg,
}
if event.Metadata != nil {
ev.Node = event.Metadata.Hostname
}
return &ev, nil
}